Simplify budget distribution logic and new worker metadata consumption #32775

Merged
scwhittle merged 4 commits into apache:master from m-trieu:mt-simplify-budget
Oct 21, 2024

Conversation

@m-trieu
Contributor

@m-trieu m-trieu commented Oct 14, 2024

Remove the budget distributor thread and move the managing of each server's internal budget to the direct GetWorkStream.

R: @arunpandianp @scwhittle


@github-actions
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @shunping added as fallback since no labels match configuration

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Contributor

Since started is set inside startStreams after getWorkStream.get() without mutexes guarding them, started can still be false here if setBudget is called by a different thread while startStreams is in the middle of execution. Do we need to change the synchronization in this class?

Contributor Author

that is in this PR #32774 and i added synchronization there
this is still the memoized Supplier.get() which is threadsafe

Contributor

Suggested change
- private void sendRequestExtension(GetWorkBudget extension) {
+ private void maybeSendRequestExtension(GetWorkBudget extension) {

Contributor Author

done

Contributor

nit: just do = new Object() here?

Contributor Author

done

@m-trieu m-trieu force-pushed the mt-simplify-budget branch from 759ca44 to 53af347 Compare October 15, 2024 17:49
@m-trieu m-trieu force-pushed the mt-simplify-budget branch from 53af347 to 496d1ed Compare October 15, 2024 18:35
@@ -219,191 +201,195 @@ static FanOutStreamingEngineWorkerHarness forTesting(
@SuppressWarnings("ReturnValueIgnored")
Contributor

think this can be removed after removing get()

Contributor Author

Done

@SuppressWarnings("FutureReturnValueIgnored")
private void closeStaleStreams(WindmillEndpoints newWindmillEndpoints) {
StreamingEngineBackends currentBackends = backends.get();
ImmutableMap<Endpoint, WindmillStreamSender> currentWindmillStreams =
Contributor

remove the variable and just do currentBackends.windmillStreams().entrySet() ... below

Contributor Author

Done

newWorkerMetadataConsumer.shutdownNow();
Preconditions.checkState(started, "FanOutStreamingEngineWorkerHarness never started.");
Preconditions.checkNotNull(getWorkerMetadataStream).halfClose();
workerMetadataConsumer.shutdownNow();
Contributor

should this shutdown all the stream senders (perhaps could call closeStaleStreams(emptyBackends))

Contributor Author

Done

newWorkerMetadataPublisher.shutdownNow();
newWorkerMetadataConsumer.shutdownNow();
Preconditions.checkState(started, "FanOutStreamingEngineWorkerHarness never started.");
Preconditions.checkNotNull(getWorkerMetadataStream).halfClose();
Contributor

seems like we should abort/close the stream below if it doesn't terminate within some amount of time after half-closing.

Contributor Author

Done

Used shutdown instead, and then used awaitTermination
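
For readers unfamiliar with the pattern mentioned in the reply, here is a minimal sketch of "shutdown, then awaitTermination, then force-stop on timeout" using only `java.util.concurrent`. The class name `GracefulShutdown`, the helper `shutdownAndAwait`, and the 5-second timeout are illustrative assumptions, not code from this PR.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; not part of the Beam codebase.
final class GracefulShutdown {
  private GracefulShutdown() {}

  /**
   * Stops accepting new tasks, waits up to the timeout for submitted tasks to
   * finish, and interrupts whatever is still running after that. Returns true
   * only if the executor terminated without needing shutdownNow().
   */
  static boolean shutdownAndAwait(ExecutorService executor, long timeout, TimeUnit unit) {
    executor.shutdown();
    try {
      if (executor.awaitTermination(timeout, unit)) {
        return true;
      }
      // Timed out: fall back to the abrupt variant.
      executor.shutdownNow();
      return false;
    } catch (InterruptedException e) {
      executor.shutdownNow();
      // Preserve the interrupt for the caller.
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    ExecutorService consumer = Executors.newSingleThreadExecutor();
    consumer.execute(() -> System.out.println("processing last update"));
    boolean clean = shutdownAndAwait(consumer, 5, TimeUnit.SECONDS);
    System.out.println("terminated cleanly: " + clean);
  }
}
```

Unlike calling `shutdownNow()` first, this lets already-submitted tasks complete, and only interrupts them if they exceed the timeout.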


@VisibleForTesting
@Override
public synchronized void shutdown() {
Contributor

should we also shutdown the windmillStreamManager? (after possibly closing streams below)

Contributor Author

Done


/** Returns the remaining in-flight {@link GetWorkBudget}. */
GetWorkBudget remainingBudget();
default void setBudget(GetWorkBudget newBudget) {
Contributor

should this be the virtual method and the long variant defaulted to making a budget?
The implementation makes a budget object

Contributor Author

Done
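
A minimal sketch of the shape suggested in this thread: the budget-object overload is the abstract method, and the `long` overload is a default that wraps its arguments. `Budget` and `BudgetSpender` are simplified stand-ins (assumptions) for the PR's `GetWorkBudget` and `GetWorkBudgetSpender`, whose real signatures are not shown here.

```java
// Hypothetical stand-in for GetWorkBudget: an immutable (items, bytes) pair.
final class Budget {
  final long items;
  final long bytes;

  Budget(long items, long bytes) {
    this.items = items;
    this.bytes = bytes;
  }
}

// Hypothetical stand-in for the spender interface under review.
interface BudgetSpender {
  /** The one method implementations must provide. */
  void setBudget(Budget newBudget);

  /** Convenience overload: wraps the raw values and delegates. */
  default void setBudget(long items, long bytes) {
    setBudget(new Budget(items, bytes));
  }
}
```

With this arrangement an implementation that already works with budget objects does not need to unpack and repack them, and callers holding raw values can still use the two-argument form.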

new AtomicLong(), new AtomicLong(), new AtomicLong(), new AtomicLong());
}

abstract AtomicLong itemsRequested();
Contributor

how about just using synchronized instead of lots of separate atomics? Multiple atomic ops might be worse performance anyway and it means we might have weird races where they are inconsistently updated.

Contributor Author

Done also added tests for this class

Contributor

can the members be changed to just raw longs/objects? The accessors just need to be synchronized as well.

Seems like this could be easier without autovalue since we don't need the accessors either.

Contributor Author

done
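
To illustrate the suggestion above, here is a rough sketch of a tracker that keeps plain `long` fields behind `synchronized` methods instead of four separate atomics. The class name `BudgetTrackerSketch` and its methods are assumptions for illustration; the merged `GetWorkBudgetTracker` may be structured differently.

```java
// Hypothetical tracker; field names echo the review thread but the API is invented.
final class BudgetTrackerSketch {
  // All four counters are guarded by the monitor of this object.
  private long itemsRequested;
  private long bytesRequested;
  private long itemsReceived;
  private long bytesReceived;

  /** Records budget that was sent to the server in a request or extension. */
  synchronized void recordRequested(long items, long bytes) {
    itemsRequested += items;
    bytesRequested += bytes;
  }

  /** Records budget consumed by work received from the server. */
  synchronized void recordReceived(long items, long bytes) {
    itemsReceived += items;
    bytesReceived += bytes;
  }

  /**
   * Returns {remainingItems, remainingBytes}. Both values are read under the
   * same lock, so a reader never sees items updated but bytes not yet updated.
   */
  synchronized long[] remaining() {
    return new long[] {itemsRequested - itemsReceived, bytesRequested - bytesReceived};
  }
}
```

With independent atomics, each counter is individually consistent, but a reader can observe a state in which only some of the counters of one logical update have changed. A single lock makes each update and each read atomic as a group.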

this.heartbeatSender = heartbeatSender;
this.workCommitter = workCommitter;
this.getDataClient = getDataClient;
this.maxGetWorkBudget =
Contributor

can this max be moved into the tracker and synchronized within it?
As is, there are races between setting and using this when calling into the tracker.

Contributor Author

Done

default:
throw new UnsupportedOperationException(
"Only IPV6, GCP_SERVICE_ADDRESS, AUTHENTICATED_GCP_SERVICE_ADDRESS are supported WindmillServiceAddresses.");
"Only IPV6, GCP_SERVICE_ADDRESS, AUTHENTICATED_GCP_SERVICE_ADDRESS are supported"
Contributor

remove IPV6 from comment

Contributor Author

Done

ImmutableCollection<T> streams, GetWorkBudget totalGetWorkBudget) {
GetWorkBudget activeWorkBudget = activeWorkBudgetSupplier.get();
LOG.info("Current active work budget: {}", activeWorkBudget);
// TODO: Fix possibly non-deterministic handing out of budgets.
Contributor

remove, doesn't seem like the same if we are not doing deltas

Contributor Author

Done

@m-trieu
Contributor Author

m-trieu commented Oct 17, 2024

back to you @scwhittle
Thanks!

() -> closeStreamSender(entry.getKey(), entry.getValue()),
windmillStreamManager));
entry -> {
CompletableFuture<Void> ignored =
Contributor

is this any different than just executing directly? if not it seems simpler to avoid the future.

windmillStreamManager.execute(
() -> closeStreamSender(entry.getKey(), entry.getValue()))

Contributor Author

done
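
As a side-by-side of the two forms discussed above, the sketch below runs one task through `CompletableFuture.runAsync` with an ignored future and one through `Executor.execute`. The class `FireAndForget` and the counter standing in for `closeStreamSender` are assumptions for illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the counter stands in for closing a stream sender.
final class FireAndForget {
  private FireAndForget() {}

  /** Runs one "close" per variant and returns how many completed. */
  static int closeBoth() {
    AtomicInteger closed = new AtomicInteger();
    ExecutorService manager = Executors.newFixedThreadPool(2);

    // Variant 1: the returned future is never read.
    CompletableFuture<Void> ignored =
        CompletableFuture.runAsync(closed::incrementAndGet, manager);

    // Variant 2: same scheduling, no future to ignore.
    manager.execute(closed::incrementAndGet);

    manager.shutdown();
    try {
      manager.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return closed.get();
  }

  public static void main(String[] args) {
    System.out.println("tasks completed: " + closeBoth());
  }
}
```

One behavioral difference worth keeping in mind: an exception thrown inside `runAsync` is captured in the future and is silently lost if the future is ignored, whereas an exception thrown from a task passed to `execute` propagates out of the task and reaches the worker thread's uncaught-exception handling.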

CompletableFuture.runAsync(
() -> closeStreamSender(sender.endpoint(), sender), windmillStreamManager));
sender -> {
CompletableFuture<Void> ignored =
Contributor

ditto

Contributor Author

done


public static WindmillEndpoints none() {
return WindmillEndpoints.builder()
.setVersion(Long.MAX_VALUE)
Contributor

min seems safer. Otherwise, if none() were somehow observed, the logic that ensures the version is increasing means we'd never process another endpoint set.

Contributor Author

done
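
The concern above can be shown with a small sketch of a version gate. `EndpointVersionGate` and `tryAccept` are hypothetical names, and the strict "newer than" comparison is an assumption about how the harness enforces increasing versions.

```java
// Hypothetical gate; only illustrates why the sentinel should be the minimum.
final class EndpointVersionGate {
  // Sentinel for "no endpoints seen yet": any real version compares as newer.
  static final long NONE_VERSION = Long.MIN_VALUE;

  private long latestVersion = NONE_VERSION;

  /** Accepts an update only if its version is strictly newer than the last accepted one. */
  synchronized boolean tryAccept(long version) {
    if (version <= latestVersion) {
      return false;
    }
    latestVersion = version;
    return true;
  }
}
```

If the sentinel were `Long.MAX_VALUE` and it were ever accepted, `tryAccept` would return false for every later version, which is the "never process another endpoint set" failure described in the comment.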

+ "last sent request: %s. ",
workItemAssemblers.size(),
maxGetWorkBudget.get(),
budgetTracker.maxGetWorkBudget().get(),
Contributor

could move html generation into budgettracker and not need all the accessors. If we change how the tracker works in the future we might want to show more too.

Contributor Author

done

GetWorkStreamTestStub testStub = new GetWorkStreamTestStub(requestObserver);
stream = createGetWorkStream(testStub, GetWorkBudget.noBudget(), throttleTimer);
stream.startThrottleTimer();
testStub.injectResponse(Windmill.StreamingGetWorkResponseChunk.getDefaultInstance());
Contributor

does this run inline? otherwise it seems like the check below that unthrottling happens could be racy

Contributor Author

yea it does, this is the actual response observer we create in the AbstractWindmillStream


private static class GetWorkStreamTestStub
extends CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1ImplBase {
private final TestGetWorkRequestObserver requestObserver;
Contributor

might as well mark volatile to prevent races

Contributor Author

made the mutable state in TestGetWorkRequestObserver threadsafe, to prevent races
this field is set once per lifetime of the stream so final is ok i think

GetWorkBudgetSpender getWorkBudgetSpender =
spy(createGetWorkBudgetOwnerWithRemainingBudgetOf(GetWorkBudget.noBudget()));
createBudgetDistributor(1L)
GetWorkBudgetSpender getWorkBudgetSpender = spy(createGetWorkBudgetOwner());
Contributor

remove spy here? already done in the helper method

ditto for below

Contributor Author

done


@Test
public void testDistributeBudget_distributesBudgetEvenlyIfPossible() {
long totalItemsAndBytes = 10L;
Contributor

would be better to have different items and bytes values to confirm distributor doesn't mix them up internally

Contributor Author

done

@@ -192,17 +82,17 @@ public void testDistributeBudget_distributesBudgetEvenlyIfPossible() {
streams.forEach(
Contributor

just skip the math in the test and inline the right values?
The math is just copying what we have in the impl, if there is some bug in the impl hard coding the values at least is a sanity check.

ditto below.

Contributor Author

done

@m-trieu
Contributor Author

m-trieu commented Oct 21, 2024

thanks @scwhittle !
back to you

@scwhittle
Contributor

Run Java PreCommit

}
private static final class GetWorkBudgetTracker {

@GuardedBy("GetWorkBudgetTracker.this")

@scwhittle scwhittle merged commit 68f1543 into apache:master Oct 21, 2024
@Abacn
Contributor

Abacn commented Nov 7, 2024

It seems "FanOutStreamingEngineWorkerHarnessTest" has become increasingly flaky recently, e.g. on a scheduled run:

https://github.com/apache/beam/runs/32620998299

and on a PR run:

https://github.com/apache/beam/runs/32622849052

error:

java.lang.AssertionError
	at org.junit.Assert.fail(Assert.java:87)
	at org.junit.Assert.assertTrue(Assert.java:42)
	at org.junit.Assert.assertTrue(Assert.java:53)
	at org.apache.beam.runners.dataflow.worker.streaming.harness.FanOutStreamingEngineWorkerHarnessTest.testStreamsStartCorrectly(FanOutStreamingEngineWorkerHarnessTest.java:217)

4 participants