Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,55 @@

import org.joda.time.Instant;

/**
* Bundle management for the {@link DoFnOp} that handles lifecycle of a bundle. It also serves as a
* proxy for the {@link DoFnOp} to process watermark and decides to 1. Hold watermark if there is at
* least one bundle in progress. 2. Propagates the watermark to downstream DAG, if all the previous
* bundles have completed.
*
* <p>A bundle is considered complete only when the outputs corresponding to each element in the
* bundle have been resolved and the watermark associated with the bundle(if any) is propagated
* downstream. The output of an element is considered resolved based on the nature of the ParDoFn 1.
* In case of synchronous ParDo, outputs of the element is resolved immediately after the
* processElement returns. 2. In case of asynchronous ParDo, outputs of the element is resolved when
* all the future emitted by the processElement is resolved.
*
* @param <OutT> output type of the {@link DoFnOp}
*/
public interface BundleManager<OutT> {
/** Starts a new bundle if not already started, then adds an element to the existing bundle. */
void tryStartBundle();

/**
* Signals a watermark event arrived. The BundleManager will decide if the watermark needs to be
* processed, and notify the listener if needed.
*
* @param watermark
* @param emitter
*/
void processWatermark(Instant watermark, OpEmitter<OutT> emitter);

/**
* Signals the BundleManager that a timer is up.
*
* @param keyedTimerData
* @param emitter
*/
void processTimer(KeyedTimerData<Void> keyedTimerData, OpEmitter<OutT> emitter);

/**
* Fails the current bundle, throws away the pending output, and resets the bundle to an empty
* state.
*
* @param t the throwable that caused the failure.
*/
void signalFailure(Throwable t);

/**
* Tries to close the bundle, and reset the bundle to an empty state.
*
* @param emitter
*/
void tryFinishBundle(OpEmitter<OutT> emitter);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,10 @@
import org.slf4j.LoggerFactory;

/**
* Bundle management for the {@link DoFnOp} that handles lifecycle of a bundle. It also serves as a
* proxy for the {@link DoFnOp} to process watermark and decides to 1. Hold watermark if there is at
* least one bundle in progress. 2. Propagates the watermark to downstream DAG, if all the previous
* bundles have completed.
*
* <p>A bundle is considered complete only when the outputs corresponding to each element in the
* bundle have been resolved and the watermark associated with the bundle(if any) is propagated
* downstream. The output of an element is considered resolved based on the nature of the ParDoFn 1.
* In case of synchronous ParDo, outputs of the element is resolved immediately after the
* processElement returns. 2. In case of asynchronous ParDo, outputs of the element is resolved when
* all the future emitted by the processElement is resolved.
*
* <p>This class is not thread safe and the current implementation relies on the assumption that
* messages are dispatched to BundleManager in a single threaded mode.
*
* @param <OutT> output type of the {@link DoFnOp}
* @inheritDoc Implementation of BundleManager for non-portable mode. Keeps track of the async
* function completions.
* <p>This class is not thread safe and the current implementation relies on the assumption that
* messages are dispatched to BundleManager in a single threaded mode.
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void setUp() {
}

@Test
public void testTryStartBundleStartsBundle() {
public void testWhenFirstTryStartBundleThenStartsBundle() {
bundleManager.tryStartBundle();

verify(bundleProgressListener, times(1)).onBundleStarted();
Expand All @@ -82,29 +82,14 @@ public void testTryStartBundleStartsBundle() {
assertTrue("tryStartBundle() did not start the bundle", bundleManager.isBundleStarted());
}

@Test
public void testTryStartBundleThrowsExceptionAndSignalError() {
@Test(expected = IllegalArgumentException.class)
public void testWhenCurrentBundleDoneFutureIsNotNullThenStartBundleFails() {
bundleManager.setCurrentBundleDoneFuture(CompletableFuture.completedFuture(null));
try {
bundleManager.tryStartBundle();
} catch (IllegalArgumentException e) {
bundleManager.signalFailure(e);
}

// verify if the signal failure only resets appropriate attributes of bundle
verify(mockFutureCollector, times(1)).prepare();
verify(mockFutureCollector, times(1)).discard();
assertEquals(
"Expected the number of element in the current bundle to 0",
0L,
bundleManager.getCurrentBundleElementCount());
assertEquals(
"Expected pending bundle count to be 0", 0L, bundleManager.getPendingBundleCount());
assertFalse("Error didn't reset the bundle as expected.", bundleManager.isBundleStarted());
bundleManager.tryStartBundle();
}

@Test
public void testTryStartBundleThrowsExceptionFromTheListener() {
public void testWhenSignalFailureThenResetCurrentBundle() {
doThrow(new RuntimeException("User start bundle threw an exception"))
.when(bundleProgressListener)
.onBundleStarted();
Expand All @@ -128,7 +113,7 @@ public void testTryStartBundleThrowsExceptionFromTheListener() {
}

@Test
public void testMultipleStartBundle() {
public void testWhenMultipleTryStartThenOnlyStartBundleOnce() {
bundleManager.tryStartBundle();
bundleManager.tryStartBundle();

Expand All @@ -153,7 +138,7 @@ public void testMultipleStartBundle() {
* 2. onBundleFinished callback is invoked on the progress listener
*/
@Test
public void testTryFinishBundleClosesBundle() {
public void testWhenTryFinishBundleThenBundleIsReset() {
OpEmitter<String> mockEmitter = mock(OpEmitter.class);
when(mockFutureCollector.finish())
.thenReturn(
Expand Down