
resourcewatch: harden watch loop to prevent thread exhaustion#30944

Open
jcpowermac wants to merge 1 commit into openshift:main from jcpowermac:fix/resourcewatch-thread-exhaustion

Conversation

@jcpowermac
Contributor

Handle watch error/bookmark events safely, always stop watch streams, and add context-aware retry backoff so reconnects do not spin and accumulate threads.

Made-with: Cursor

@openshift-ci-robot

Pipeline controller notification
This repo is configured to use the pipeline controller. Second-stage tests will be triggered either automatically or after lgtm label is added, depending on the repository configuration. The pipeline controller will automatically detect which contexts are required and will utilize /test Prow commands to trigger the second stage.

For optional jobs, comment /test ? to see a list of all defined jobs. To manually trigger all second-stage jobs, use the /pipeline required command.

This repository is configured in: automatic mode

@openshift-ci openshift-ci bot requested review from p0lyn0mial and sjenning March 31, 2026 19:49
@openshift-ci
Contributor

openshift-ci bot commented Mar 31, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by: jcpowermac
Once this PR has been reviewed and has the lgtm label, please assign neisw for approval. For more information see the Code Review Process.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@coderabbitai

coderabbitai bot commented Mar 31, 2026

Walkthrough

Adds retry/backoff and stronger error handling to resource observation: sentinel errors, jittered exponential retry delays with special-casing for NotFound, context-aware wait logic, and stricter watch event processing. New unit tests exercise error-event handling, retry delay behavior, and watch lifecycle/cancellation.

Changes

Cohort / File(s) Summary
Resource Observer Core
pkg/resourcewatch/observe/observe.go
Added sentinel errors (errWatchClosed, errWatchErrorEvent, errUnexpectedObject), backoff constants, and a retry loop around listAndWatchResource with retryAttempt. Introduced nextRetryDelay, waitForRetry, and retryReason. listAndWatchResource now returns sentinel errors for closed watches and error events, handles watch.Bookmark/watch.Error/unexpected objects explicitly, defers resourceWatch.Stop(), and lets outer logic control NotFound pacing.
Resource Observer Tests
pkg/resourcewatch/observe/observe_test.go
Added test helpers (fakeNamespaceableResource, trackingWatch) and unit tests covering: watch Error event -> errWatchErrorEvent with status info and Stop() called; nextRetryDelay behavior including NotFound special-case and backoff bounds; watch Added event processing and cancellation/Stop() behavior; waitForRetry immediate-fail on canceled context.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes



@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (2)
pkg/resourcewatch/observe/observe.go (1)

181-189: Consider removing unreachable default case.

The default branch at line 187-188 is now unreachable because the outer switch (lines 159-174) already handles unknown event types by logging and continuing. Only Added, Modified, and Deleted can reach this inner switch.

♻️ Suggested simplification
 			switch observation.Type {
 			case watch.Added:
 			case watch.Modified:
 				emitUpdate(observedResources, gvr, object, resourceC)
 			case watch.Deleted:
 				emitDelete(observedResources, gvr, object, resourceC)
-			default:
-				log.Info("Unhandled watch event", "type", observation.Type)
 			}
pkg/resourcewatch/observe/observe_test.go (1)

143-147: Minor: Backoff increase test could be flaky due to jitter.

With jitter applied, there's a small theoretical chance that second <= first (e.g., if first gets +25% jitter and second gets -25% jitter). In practice this is extremely unlikely given the 2x multiplier, but for deterministic tests, consider testing without jitter or testing the trend over multiple attempts.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: e77c427c-59b6-46ed-a944-4e1ab724cfc5

📥 Commits

Reviewing files that changed from the base of the PR and between a85a0eb and b38a45a.

📒 Files selected for processing (2)
  • pkg/resourcewatch/observe/observe.go
  • pkg/resourcewatch/observe/observe_test.go

Handle watch error/bookmark events safely, always stop watch streams,
and add context-aware retry backoff so reconnects do not spin and
accumulate threads.

- Dispatch on event type before casting to *unstructured.Unstructured
  so that watch.Error (which carries *metav1.Status) and watch.Bookmark
  events no longer trigger a cast failure and tight retry loop.
- Route watch.Added events through emitUpdate so resources created
  after the initial list are observed immediately.
- Add defer resourceWatch.Stop() so all return paths release the
  underlying watch stream.
- Replace the bare time.Sleep with bounded exponential backoff and
  jitter, clamped to [500ms, 30s], using context-aware waiting for
  prompt cancellation.
- Propagate NotFound from listAndWatchResource as an error so retry
  pacing is governed centrally by ObserveResource.
- Add unit tests for error-event handling, watch cleanup, backoff
  bounds, context cancellation, and Added-event observation.

Made-with: Cursor
@jcpowermac jcpowermac force-pushed the fix/resourcewatch-thread-exhaustion branch from b38a45a to fd007d1 on March 31, 2026 19:55

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 4d90937b-24f5-49bb-b16c-ec4783748900

📥 Commits

Reviewing files that changed from the base of the PR and between b38a45a and fd007d1.

📒 Files selected for processing (2)
  • pkg/resourcewatch/observe/observe.go
  • pkg/resourcewatch/observe/observe_test.go
✅ Files skipped from review due to trivial changes (1)
  • pkg/resourcewatch/observe/observe_test.go

Comment on lines 53 to +66
if err := listAndWatchResource(ctx, log, resourceClient, gvr, observedResources, resourceC); err != nil {
	log.Error(err, "failed to list and watch resource")
	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
		return
	}

	retryDelay := nextRetryDelay(err, retryAttempt)
	log.Error(err, "failed to list and watch resource", "retryReason", retryReason(err), "retryDelay", retryDelay)

	if !waitForRetry(ctx, retryDelay) {
		return
	}

	retryAttempt++
	continue


⚠️ Potential issue | 🟠 Major

Don't retry terminal decode/contract failures forever.

This loop now backs off and retries every non-context error, including malformed watch payloads. Those won't self-heal, so the observer goroutine never returns; pkg/resourcewatch/observe/source.go:113-131 only closes finished after every observer exits. Keep retries for transient list/watch failures, but treat decode/contract errors as terminal.

Possible fix
 		if err := listAndWatchResource(ctx, log, resourceClient, gvr, observedResources, resourceC); err != nil {
 			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
 				return
 			}
+			if errors.Is(err, errUnexpectedObject) {
+				log.Error(err, "terminal resource watch failure")
+				return
+			}

 			retryDelay := nextRetryDelay(err, retryAttempt)
 			log.Error(err, "failed to list and watch resource", "retryReason", retryReason(err), "retryDelay", retryDelay)
 			case watch.Error:
 				status, ok := observation.Object.(*metav1.Status)
 				if !ok {
-					return fmt.Errorf("%w: %T", errWatchErrorEvent, observation.Object)
+					return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object)
 				}

Also applies to: 166-170, 179-181


Comment on lines +58 to +70
	retryDelay := nextRetryDelay(err, retryAttempt)
	log.Error(err, "failed to list and watch resource", "retryReason", retryReason(err), "retryDelay", retryDelay)

	if !waitForRetry(ctx, retryDelay) {
		return
	}

	retryAttempt++
	continue
}

// If a watch cycle ends cleanly, start retries from the base delay.
retryAttempt = 0


⚠️ Potential issue | 🟠 Major

retryAttempt never resets with the current control flow.

The increment at Line 65 runs on every failure, but the reset at Line 70 is unreachable now that listAndWatchResource only returns errors. After enough reconnects, later watch renewals stay pinned near maxRetryDelay even after long healthy periods.

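The reset-on-success control flow this comment asks for can be shown with a toy stand-in for listAndWatchResource (all names here are assumed, not the PR's actual code):

```go
package main

import "fmt"

// makeListOnce simulates listAndWatchResource: it fails twice, then
// completes clean watch cycles (returns nil).
func makeListOnce() func() error {
	calls := 0
	return func() error {
		calls++
		if calls < 3 {
			return fmt.Errorf("transient failure %d", calls)
		}
		return nil // clean cycle: the caller should reset its backoff
	}
}

func main() {
	listOnce := makeListOnce()
	retryAttempt := 0
	for i := 0; i < 4; i++ {
		if err := listOnce(); err != nil {
			retryAttempt++ // grow backoff on consecutive failures
			continue
		}
		retryAttempt = 0 // healthy cycle: future retries start from the base delay
	}
	fmt.Println("retryAttempt after healthy cycle:", retryAttempt) // prints 0
}
```

The key point is that the nil return must be reachable; if listAndWatchResource only ever returns errors, the reset line is dead code and the backoff stays pinned near its maximum.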

@openshift-ci-robot

Scheduling required tests:
/test e2e-aws-csi
/test e2e-aws-ovn-fips
/test e2e-aws-ovn-microshift
/test e2e-aws-ovn-microshift-serial
/test e2e-aws-ovn-serial-1of2
/test e2e-aws-ovn-serial-2of2
/test e2e-gcp-csi
/test e2e-gcp-ovn
/test e2e-gcp-ovn-upgrade
/test e2e-metal-ipi-ovn-ipv6
/test e2e-vsphere-ovn
/test e2e-vsphere-ovn-upi

@neisw
Contributor

neisw commented Mar 31, 2026

Code Review Report

PR: resourcewatch: harden watch loop to prevent thread exhaustion
Author: jcpowermac (Joseph Callen)
Base branch: main
Language: Go


1. Files Reviewed

  • pkg/resourcewatch/observe/observe.go (MODIFIED, +112/-13)
  • pkg/resourcewatch/observe/observe_test.go (NEW, 235 lines)

2. Unit Test Coverage

4 tests cover: error event handling with watch stop, backoff delay calculation (NotFound + attempts 0/1/50), Added event observation emission with cleanup, and context cancellation in waitForRetry.

Gaps:

  • High: errWatchClosed path untested — this is likely the primary trigger for the thread-exhaustion bug (a closed watch channel should trigger retry with backoff)
  • High: Full retry loop integration test missing — no test verifies that ObserveResource retries with increasing delays across multiple iterations
  • Medium: errUnexpectedObject path untested
  • Medium: retryReason mapping function untested
  • Medium: waitForRetry happy path (wait completes normally, returns true) untested
  • Low: nextRetryDelay is a natural candidate for table-driven tests

3. Idiomatic Go Code

Medium-High

Verify nil-watch guard before defer Stop() (observe.go:~147)
The defer resourceWatch.Stop() is an excellent fix. Verify the error check from client.Watch() precedes the defer — if Watch() can return a nil watcher on some error paths, calling Stop() on nil panics. From the diff, the pattern appears correct (error is checked first, then defer), but worth confirming.

Medium

math/rand vs math/rand/v2 (observe.go import)
rand.Int63n is soft-deprecated since Go 1.22. If the module's go.mod permits it, switch to math/rand/v2 with rand.N().

Verify no nil-return tight loop (observe.go:ObserveResource)
retryAttempt resets to 0 when listAndWatchResource returns nil. If there's any path where it returns nil in a tight loop (without having done real work), the backoff resets and the loop spins. From the diff, errWatchClosed is returned as a non-nil error on channel close, so this appears safe — but audit all return paths.

Low

Simplify backoff doubling (observe.go:nextRetryDelay)
The loop with manual overflow guard and break can be simplified with Go 1.21's built-in min:

for i := 0; i < retryAttempt; i++ {
    backoff = min(backoff*2, maxRetryDelay)
}

Use time.NewTimer in waitForRetry — already done correctly in the diff (time.NewTimer + defer timer.Stop()). Good.

Jitter floor clamp (observe.go:nextRetryDelay)
The if backoff < minRetryDelay after jitter is doing real work (jitter can reduce below min). Add a brief comment explaining this.


4. DRY Compliance

  • Medium: Redundant backoff > maxRetryDelay check in nextRetryDelay — appears twice (once after the doubling loop, once after jitter). Consolidate to a single clamp after all mutations.
  • Medium: fakeNamespaceableResource in tests (~100 lines of panic("not implemented") stubs) — consider using k8s.io/client-go/dynamic/fake.NewSimpleDynamicClient, which provides this out of the box and stays in sync with upstream interface changes.
  • Low: The error-wrapping pattern fmt.Errorf("%w: ...", errSentinel, ...) is used consistently — no issue.

5. SOLID Compliance

  • SRP: Good — each new function has a clear single purpose: nextRetryDelay (compute delay), waitForRetry (context-aware sleep), retryReason (error classification for logging)
  • OCP: Good — watch event handling extended cleanly with Bookmark/Error cases without restructuring the existing flow

6. Build Verification

Skipped — remote PR review without local checkout.


7. Overall Verdict: PASS WITH RECOMMENDATIONS

This is a well-targeted fix for a real production issue (thread exhaustion from unbounded watch retries). The defer resourceWatch.Stop() addition alone is a critical fix. The exponential backoff with jitter and context-aware retry are solid patterns.

Required Actions (blocking)

  1. Add a test for the errWatchClosed path — this is the primary trigger for the thread-exhaustion bug and must have coverage before merge

Recommended Improvements (non-blocking)

  1. Add a test exercising the full retry loop in ObserveResource (multiple retries with backoff + context cancellation)
  2. Consolidate the redundant backoff > maxRetryDelay check in nextRetryDelay to a single post-jitter clamp
  3. Consider replacing fakeNamespaceableResource with k8s.io/client-go/dynamic/fake.NewSimpleDynamicClient to reduce ~100 lines of boilerplate
  4. Switch from math/rand to math/rand/v2 if go.mod permits (Go >= 1.22)
  5. Add a test for retryReason error-to-string mapping
  6. Simplify the backoff doubling loop using min() (Go 1.21+)

🤖 Generated with Claude Code

@openshift-ci
Contributor

openshift-ci bot commented Apr 1, 2026

@jcpowermac: all tests passed!

Full PR test history. Your PR dashboard.

