Skip to content

feat(core): integrate RetryManager into SegmentDestination upload pipeline#1160

Open
abueide wants to merge 6 commits intotapi/retry-manager-testsfrom
tapi/segment-destination
Open

feat(core): integrate RetryManager into SegmentDestination upload pipeline#1160
abueide wants to merge 6 commits intotapi/retry-manager-testsfrom
tapi/segment-destination

Conversation

@abueide
Copy link
Contributor

@abueide abueide commented Mar 10, 2026

Summary

  • Wire RetryManager into SegmentDestination for TAPI-compliant retry handling
  • Add uploadBatch() with error classification, retry-after parsing, and configurable retry behavior
  • Add aggregateErrors() to separate batch results by status (success/429/transient/permanent)
  • Add updateRetryState() that delegates to RetryManager and returns whether limits were exceeded
  • Implement per-event age pruning via _queuedAt timestamps and pruneExpiredEvents() — events older than maxTotalBackoffDuration are dropped individually
  • Add canRetry() upload gate check before sending batches
  • Add X-Retry-Count header propagation to API requests
  • Replace QueueFlushingPlugin.dequeue() with dequeueByMessageIds() for messageId-based dequeue
  • Replace isPendingUpload flag with promise-based concurrent flush guard
  • Override shutdown() (instead of standalone destroy()) to integrate with plugin lifecycle — prevents auto-flush timer leak on client cleanup
  • Handle RetryResult.limit_exceeded: log warning and let per-event age pruning handle drops (events are NOT bulk-dropped on global counter reset)
  • Add comprehensive tests for pruning logic, retry integration, and API retry count

PR 5 of 5 in the TAPI backoff/retry stack. Depends on #1159.

Test plan

  • All 42 SegmentDestination tests pass
  • All 461 total tests pass
  • shutdown() properly cleans up RetryManager timers via plugin lifecycle
  • Events only dropped by per-event _queuedAt age, not by global retry counter reset

🤖 Generated with Claude Code

@abueide abueide force-pushed the tapi/retry-manager-tests branch 2 times, most recently from ba89956 to aac6169 Compare March 12, 2026 14:57
@abueide abueide force-pushed the tapi/segment-destination branch 2 times, most recently from af03054 to 872790a Compare March 12, 2026 14:57
@abueide abueide force-pushed the tapi/retry-manager-tests branch from aac6169 to cf2abd2 Compare March 12, 2026 15:36
@abueide abueide force-pushed the tapi/segment-destination branch from 872790a to f6282d0 Compare March 12, 2026 15:42
@abueide abueide force-pushed the tapi/retry-manager-tests branch from cf2abd2 to 23ae940 Compare March 12, 2026 16:11
@abueide abueide force-pushed the tapi/segment-destination branch from f6282d0 to a49f9ec Compare March 12, 2026 16:11
@abueide abueide force-pushed the tapi/retry-manager-tests branch from 23ae940 to 9f2575b Compare March 12, 2026 16:40
@abueide abueide force-pushed the tapi/segment-destination branch from 2b7adb3 to 3fb2dce Compare March 12, 2026 16:40
@abueide abueide force-pushed the tapi/retry-manager-tests branch from 9f2575b to 6981059 Compare March 12, 2026 16:48
@abueide abueide force-pushed the tapi/segment-destination branch from 3fb2dce to ef94825 Compare March 12, 2026 16:48
abueide and others added 6 commits March 12, 2026 12:38
…eline

Wire RetryManager into SegmentDestination for TAPI-compliant retry
handling: uploadBatch() with error classification, event pruning on
partial failures, retry count header propagation, and QueueFlushingPlugin
error type handling updates.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Don't reset retry state on partial success when concurrent batches have
  429/transient errors
- Use if/else if so 429 takes precedence over transient error handling
- Remove redundant return Promise.resolve() in async function
- Fix duplicate keepalive property from master merge

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Use res.ok instead of res.status === 200 for 2xx success range
- Remove dead dequeue() method from QueueFlushingPlugin
- Add destroy() to SegmentDestination for RetryManager timer cleanup
- Reset retry state when queue is empty at flush time or after pruning
- Call handle429 per result instead of pre-aggregating, so
  RetryManager.applyRetryStrategy respects eager/lazy consolidation
- Simplify retryAfterSeconds fallback to ?? 60

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ination

Extract pruneExpiredEvents() and updateRetryState() from sendEvents to
improve readability. Remove redundant/obvious comments, merge duplicate
switch cases, and simplify return statements throughout.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… pruning

- Override shutdown() instead of standalone destroy() to integrate with
  the plugin lifecycle — prevents auto-flush timer leak on client cleanup
- Handle RetryResult 'limit_exceeded' from RetryManager: log warning and
  let per-event age pruning (pruneExpiredEvents via _queuedAt) handle
  event drops rather than dropping all retryable events on global counter
  reset
- Import RetryResult type for type-safe limit checking

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@abueide abueide force-pushed the tapi/retry-manager-tests branch from 6981059 to c49e640 Compare March 12, 2026 17:38
@abueide abueide force-pushed the tapi/segment-destination branch from ef94825 to 1be5046 Compare March 12, 2026 17:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant