Simplify budget distribution logic and new worker metadata consumption #32775
scwhittle merged 4 commits into apache:master
Assigning reviewers. If you would like to opt out of this review, comment R: @shunping added as fallback since no labels match configuration Available commands:
The PR bot will only process comments in the main thread (not review comments).
...n/java/org/apache/beam/runners/dataflow/worker/streaming/harness/GlobalDataStreamSender.java
Since started is set inside startStreams after getWorkStream.get() without a mutex guarding them, started can be false here if setBudget is called by a different thread while startStreams is in the middle of execution. Do we need to change the synchronization in this class?
that is in this PR #32774 and i added synchronization there
this is still the memoized Supplier.get() which is threadsafe
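The race raised in this thread can be sketched as follows. This is only an illustration under stated assumptions, not the code from either PR: StreamSenderSketch and all of its fields and methods are hypothetical stand-ins. With both methods synchronized on the same object, a concurrent setBudget call sees started either before startStreams begins or after it completes, never part-way through.

```java
// Hypothetical stand-in for a stream sender; not the actual Beam class.
final class StreamSenderSketch {
  private boolean started = false; // guarded by "this"
  private long pendingItems = 0;   // guarded by "this"
  private long sentItems = 0;      // guarded by "this"

  // Marks the sender as started and flushes any budget set before the start.
  synchronized void startStreams() {
    if (started) {
      return;
    }
    started = true;
    sentItems = pendingItems;
  }

  // Records the budget; forwards it immediately only if already started.
  synchronized void setBudget(long items) {
    pendingItems = items;
    if (started) {
      sentItems = items;
    }
  }

  // Returns the budget that has actually been forwarded so far.
  synchronized long sentItems() {
    return sentItems;
  }
}
```

A budget set before the start is held back and flushed by startStreams; a budget set afterwards is forwarded right away.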
Suggested change:
- private void sendRequestExtension(GetWorkBudget extension) {
+ private void maybeSendRequestExtension(GetWorkBudget extension) {
nit: just do = new Object() here?
…ream, simplify worker metadata consumption
759ca44 to 53af347
53af347 to 496d1ed
| @@ -219,191 +201,195 @@ static FanOutStreamingEngineWorkerHarness forTesting( | |||
| @SuppressWarnings("ReturnValueIgnored") | |||
think this can be removed after removing get()
| @SuppressWarnings("FutureReturnValueIgnored") | ||
| private void closeStaleStreams(WindmillEndpoints newWindmillEndpoints) { | ||
| StreamingEngineBackends currentBackends = backends.get(); | ||
| ImmutableMap<Endpoint, WindmillStreamSender> currentWindmillStreams = |
remove the variable and just do currentBackends.windmillStreams().entrySet() ... below
| newWorkerMetadataConsumer.shutdownNow(); | ||
| Preconditions.checkState(started, "FanOutStreamingEngineWorkerHarness never started."); | ||
| Preconditions.checkNotNull(getWorkerMetadataStream).halfClose(); | ||
| workerMetadataConsumer.shutdownNow(); |
should this shut down all the stream senders? (perhaps could call closeStaleStreams(emptyBackends))
| newWorkerMetadataPublisher.shutdownNow(); | ||
| newWorkerMetadataConsumer.shutdownNow(); | ||
| Preconditions.checkState(started, "FanOutStreamingEngineWorkerHarness never started."); | ||
| Preconditions.checkNotNull(getWorkerMetadataStream).halfClose(); |
seems like we should abort/close the stream below if it doesn't terminate within some amount of time after half-closing.
Done
Used shutdown instead, and then used awaitTermination
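The shutdown-then-await approach described in this reply is commonly written along the following lines. This is a minimal sketch using only java.util.concurrent; GracefulShutdownSketch and shutdownWithTimeout are hypothetical names, and the sketch uses a plain ExecutorService rather than the stream types in the PR.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; illustrates the pattern only, not the PR's code.
final class GracefulShutdownSketch {
  // Returns true if the executor terminated gracefully within the timeout,
  // false if it had to be forced with shutdownNow().
  static boolean shutdownWithTimeout(ExecutorService executor, long timeout, TimeUnit unit) {
    executor.shutdown(); // stop accepting new tasks, let submitted ones finish
    try {
      if (executor.awaitTermination(timeout, unit)) {
        return true;
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
    }
    executor.shutdownNow(); // interrupt whatever is still running
    return false;
  }
}
```

The boolean result lets a caller log or count forced shutdowns separately from clean ones.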
| @VisibleForTesting | ||
| @Override | ||
| public synchronized void shutdown() { |
should we also shut down the windmillStreamManager? (after possibly closing streams below)
| /** Returns the remaining in-flight {@link GetWorkBudget}. */ | ||
| GetWorkBudget remainingBudget(); | ||
| default void setBudget(GetWorkBudget newBudget) { |
should this be the virtual method and the long variant defaulted to making a budget?
The implementation makes a budget object
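The inversion suggested here can be sketched as below. The types are hypothetical stand-ins (BudgetSketch for GetWorkBudget, BudgetSpenderSketch for the interface under review), and the (items, bytes) signature of the primitive overload is an assumption. The point is only the direction of delegation: the abstract method takes the budget object, and the primitive overload is a default that builds it.

```java
// Hypothetical value type standing in for GetWorkBudget.
final class BudgetSketch {
  final long items;
  final long bytes;

  BudgetSketch(long items, long bytes) {
    this.items = items;
    this.bytes = bytes;
  }
}

// Hypothetical interface standing in for the one under review.
interface BudgetSpenderSketch {
  // Abstract method: takes the budget object that implementations work with.
  void setBudget(BudgetSketch newBudget);

  // Convenience overload: builds the object once and delegates.
  default void setBudget(long items, long bytes) {
    setBudget(new BudgetSketch(items, bytes));
  }
}
```

With this shape an implementation that already has a budget object does not need to unpack it into longs only for the callee to rebuild it.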
| new AtomicLong(), new AtomicLong(), new AtomicLong(), new AtomicLong()); | ||
| } | ||
| abstract AtomicLong itemsRequested(); |
how about just using synchronized instead of lots of separate atomics? Multiple atomic ops might be worse performance anyway and it means we might have weird races where they are inconsistently updated.
Done, also added tests for this class
can the members be changed to just raw longs/objects? The accessors just need to be synchronized as well.
Seems like this could be easier without autovalue since we don't need the accessors either.
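A tracker along the lines suggested in this thread might look like the sketch below: plain long fields, every method synchronized, no AutoValue. BudgetTrackerSketch and its method names are hypothetical, and the clamping at zero is an assumption made for the illustration rather than something stated in the review.

```java
// Hypothetical tracker; field and method names are illustrative only.
final class BudgetTrackerSketch {
  // All fields are guarded by "this".
  private long itemsRequested = 0;
  private long bytesRequested = 0;
  private long itemsReceived = 0;
  private long bytesReceived = 0;

  // Items and bytes are updated together under one lock.
  synchronized void recordRequested(long items, long bytes) {
    itemsRequested += items;
    bytesRequested += bytes;
  }

  synchronized void recordReceived(long items, long bytes) {
    itemsReceived += items;
    bytesReceived += bytes;
  }

  // Returns {remainingItems, remainingBytes}, read under one lock so the
  // two values are consistent with each other. Clamped at zero (assumption).
  synchronized long[] remaining() {
    return new long[] {
      Math.max(0, itemsRequested - itemsReceived),
      Math.max(0, bytesRequested - bytesReceived)
    };
  }
}
```

Because a single monitor covers all four counters, a reader can never observe the item count from one update paired with the byte count from another, which is the inconsistency the separate atomics allowed.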
| this.heartbeatSender = heartbeatSender; | ||
| this.workCommitter = workCommitter; | ||
| this.getDataClient = getDataClient; | ||
| this.maxGetWorkBudget = |
can this max be moved into the tracker and synchronized within it?
As is, there are races between setting and using this when calling into the tracker.
| default: | ||
| throw new UnsupportedOperationException( | ||
| "Only IPV6, GCP_SERVICE_ADDRESS, AUTHENTICATED_GCP_SERVICE_ADDRESS are supported WindmillServiceAddresses."); | ||
| "Only IPV6, GCP_SERVICE_ADDRESS, AUTHENTICATED_GCP_SERVICE_ADDRESS are supported" |
remove IPV6 from comment
| ImmutableCollection<T> streams, GetWorkBudget totalGetWorkBudget) { | ||
| GetWorkBudget activeWorkBudget = activeWorkBudgetSupplier.get(); | ||
| LOG.info("Current active work budget: {}", activeWorkBudget); | ||
| // TODO: Fix possibly non-deterministic handing out of budgets. |
remove, doesn't seem like the same if we are not doing deltas
back to you @scwhittle
| () -> closeStreamSender(entry.getKey(), entry.getValue()), | ||
| windmillStreamManager)); | ||
| entry -> { | ||
| CompletableFuture<Void> ignored = |
is this any different than just executing directly? if not it seems simpler to avoid the future.
windmillStreamManager.execute(
() -> closeStreamSender(entry.getKey(), entry.getValue()))
| CompletableFuture.runAsync( | ||
| () -> closeStreamSender(sender.endpoint(), sender), windmillStreamManager)); | ||
| sender -> { | ||
| CompletableFuture<Void> ignored = |
| public static WindmillEndpoints none() { | ||
| return WindmillEndpoints.builder() | ||
| .setVersion(Long.MAX_VALUE) |
min seems safer. Otherwise, if somehow none() was observed, the logic to ensure version is increasing means we'd never process another endpoint set
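The concern can be illustrated with a small version gate. This is a sketch under assumptions: VersionGateSketch is a hypothetical class, and the strictly-increasing check is an assumed simplification of whatever the harness actually does. Seeding the gate with Long.MAX_VALUE blocks every later update, while Long.MIN_VALUE lets any real version through.

```java
// Hypothetical consumer that only accepts strictly newer endpoint versions.
final class VersionGateSketch {
  private long lastVersion; // guarded by "this"

  VersionGateSketch(long initialVersion) {
    this.lastVersion = initialVersion;
  }

  // Returns true and records the version if it is newer than the last one seen.
  synchronized boolean tryAccept(long version) {
    if (version <= lastVersion) {
      return false;
    }
    lastVersion = version;
    return true;
  }
}
```

A gate created with Long.MAX_VALUE rejects every call to tryAccept, whereas one created with Long.MIN_VALUE accepts the first real version and then enforces ordering as usual.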
| + "last sent request: %s. ", | ||
| workItemAssemblers.size(), | ||
| maxGetWorkBudget.get(), | ||
| budgetTracker.maxGetWorkBudget().get(), |
could move html generation into budgettracker and not need all the accessors. If we change how the tracker works in the future we might want to show more too.
| GetWorkStreamTestStub testStub = new GetWorkStreamTestStub(requestObserver); | ||
| stream = createGetWorkStream(testStub, GetWorkBudget.noBudget(), throttleTimer); | ||
| stream.startThrottleTimer(); | ||
| testStub.injectResponse(Windmill.StreamingGetWorkResponseChunk.getDefaultInstance()); |
does this run inline? otherwise it seems like the check below that unthrottling happens could be racy.
yea it does, this is the actual response observer we create in the AbstractWindmillStream
| private static class GetWorkStreamTestStub | ||
| extends CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1ImplBase { | ||
| private final TestGetWorkRequestObserver requestObserver; |
might as well mark volatile to prevent races
made the mutable state in TestGetWorkRequestObserver threadsafe, to prevent races
this field is set once per lifetime of the stream so final is ok i think
| GetWorkBudgetSpender getWorkBudgetSpender = | ||
| spy(createGetWorkBudgetOwnerWithRemainingBudgetOf(GetWorkBudget.noBudget())); | ||
| createBudgetDistributor(1L) | ||
| GetWorkBudgetSpender getWorkBudgetSpender = spy(createGetWorkBudgetOwner()); |
remove spy here? already done in the helper method
ditto for below
| @Test | ||
| public void testDistributeBudget_distributesBudgetEvenlyIfPossible() { | ||
| long totalItemsAndBytes = 10L; |
would be better to have different items and bytes values to confirm distributor doesn't mix them up internally
| @@ -192,17 +82,17 @@ public void testDistributeBudget_distributesBudgetEvenlyIfPossible() { | |||
| streams.forEach( | |||
just skip the math in the test and inline the right values?
The math is just copying what we have in the impl, if there is some bug in the impl hard coding the values at least is a sanity check.
ditto below.
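The two testing suggestions above (distinct item and byte totals, and literal expected values) can be sketched together. Everything here is hypothetical: EvenSplitSketch is not the distributor in the PR, and the round-up rule is an assumption made only so the example has something to check.

```java
// Hypothetical even-split helper; the real distributor may round differently.
final class EvenSplitSketch {
  // Splits a non-negative total across the given number of streams, rounding up.
  static long perStream(long total, int streams) {
    if (streams <= 0) {
      throw new IllegalArgumentException("streams must be positive");
    }
    return (total + streams - 1) / streams;
  }

  // A check in the suggested style: distinct totals, literal expected values.
  static void checkDistinctItemsAndBytes() {
    long itemsPerStream = perStream(10L, 4); // expected 3, written out by hand
    long bytesPerStream = perStream(90L, 4); // expected 23, written out by hand
    if (itemsPerStream != 3L || bytesPerStream != 23L) {
      throw new AssertionError(itemsPerStream + " items, " + bytesPerStream + " bytes");
    }
  }
}
```

Because the item and byte totals differ, a swap inside the implementation would surface as a mismatch, and because the expected values are literals, the check does not repeat the implementation's arithmetic.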
thanks @scwhittle !
Run Java PreCommit
| } | ||
| private static final class GetWorkBudgetTracker { | ||
|
| @GuardedBy("GetWorkBudgetTracker.this") |
It seems "FanOutStreamingEngineWorkerHarnessTest" has become increasingly flaky recently, e.g. on a scheduled run: https://github.com/apache/beam/runs/32620998299 and on a PR run: https://github.com/apache/beam/runs/32622849052 error:
Remove the budget distributor thread and move the managing of each server's internal budget to the direct GetWorkStream.
R: @arunpandianp @scwhittle