diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java index 1c89f7709521..36ba19d7da3c 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java @@ -19,15 +19,55 @@ import org.joda.time.Instant; +/** + * Bundle management for the {@link DoFnOp} that handles lifecycle of a bundle. It also serves as a + * proxy for the {@link DoFnOp} to process watermark and decides to 1. Hold watermark if there is at + * least one bundle in progress. 2. Propagates the watermark to downstream DAG, if all the previous + * bundles have completed. + * + *

A bundle is considered complete only when the outputs corresponding to each element in the + * bundle have been resolved and the watermark associated with the bundle(if any) is propagated + * downstream. The output of an element is considered resolved based on the nature of the ParDoFn 1. + * In case of synchronous ParDo, outputs of the element is resolved immediately after the + * processElement returns. 2. In case of asynchronous ParDo, outputs of the element is resolved when + * all the future emitted by the processElement is resolved. + * + * @param output type of the {@link DoFnOp} + */ public interface BundleManager { + /** Starts a new bundle if not already started, then adds an element to the existing bundle. */ void tryStartBundle(); + /** + * Signals a watermark event arrived. The BundleManager will decide if the watermark needs to be + * processed, and notify the listener if needed. + * + * @param watermark + * @param emitter + */ void processWatermark(Instant watermark, OpEmitter emitter); + /** + * Signals the BundleManager that a timer is up. + * + * @param keyedTimerData + * @param emitter + */ void processTimer(KeyedTimerData keyedTimerData, OpEmitter emitter); + /** + * Fails the current bundle, throws away the pending output, and resets the bundle to an empty + * state. + * + * @param t the throwable that caused the failure. + */ void signalFailure(Throwable t); + /** + * Tries to close the bundle, and reset the bundle to an empty state. + * + * @param emitter + */ void tryFinishBundle(OpEmitter emitter); /** diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java index 4a90e4688247..47899dd43c02 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java @@ -39,22 +39,10 @@ import org.slf4j.LoggerFactory; /** - * Bundle management for the {@link DoFnOp} that handles lifecycle of a bundle. It also serves as a - * proxy for the {@link DoFnOp} to process watermark and decides to 1. Hold watermark if there is at - * least one bundle in progress. 2. Propagates the watermark to downstream DAG, if all the previous - * bundles have completed. - * - *

A bundle is considered complete only when the outputs corresponding to each element in the - * bundle have been resolved and the watermark associated with the bundle(if any) is propagated - * downstream. The output of an element is considered resolved based on the nature of the ParDoFn 1. - * In case of synchronous ParDo, outputs of the element is resolved immediately after the - * processElement returns. 2. In case of asynchronous ParDo, outputs of the element is resolved when - * all the future emitted by the processElement is resolved. - * - *

This class is not thread safe and the current implementation relies on the assumption that - * messages are dispatched to BundleManager in a single threaded mode. - * - * @param output type of the {@link DoFnOp} + * @inheritDoc Implementation of BundleManager for non-portable mode. Keeps track of the async + * function completions. + *

This class is not thread safe and the current implementation relies on the assumption that + * messages are dispatched to BundleManager in a single threaded mode. */ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java index f8f30a8d2f86..b62b9246eb06 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java @@ -69,7 +69,7 @@ public void setUp() { } @Test - public void testTryStartBundleStartsBundle() { + public void testWhenFirstTryStartBundleThenStartsBundle() { bundleManager.tryStartBundle(); verify(bundleProgressListener, times(1)).onBundleStarted(); @@ -82,29 +82,14 @@ public void testTryStartBundleStartsBundle() { assertTrue("tryStartBundle() did not start the bundle", bundleManager.isBundleStarted()); } - @Test - public void testTryStartBundleThrowsExceptionAndSignalError() { + @Test(expected = IllegalArgumentException.class) + public void testWhenCurrentBundleDoneFutureIsNotNullThenStartBundleFails() { bundleManager.setCurrentBundleDoneFuture(CompletableFuture.completedFuture(null)); - try { - bundleManager.tryStartBundle(); - } catch (IllegalArgumentException e) { - bundleManager.signalFailure(e); - } - - // verify if the signal failure only resets appropriate attributes of bundle - verify(mockFutureCollector, times(1)).prepare(); - verify(mockFutureCollector, times(1)).discard(); - assertEquals( - "Expected the number of element in the current bundle to 0", - 0L, - bundleManager.getCurrentBundleElementCount()); - assertEquals( - "Expected pending bundle count to be 0", 0L, bundleManager.getPendingBundleCount()); - assertFalse("Error didn't reset the bundle as expected.", bundleManager.isBundleStarted()); + bundleManager.tryStartBundle(); } @Test - public void testTryStartBundleThrowsExceptionFromTheListener() { + public void testWhenSignalFailureThenResetCurrentBundle() { doThrow(new RuntimeException("User start bundle threw an exception")) .when(bundleProgressListener) .onBundleStarted(); @@ -128,7 +113,7 @@ public void testTryStartBundleThrowsExceptionFromTheListener() { } @Test - public void testMultipleStartBundle() { + public void testWhenMultipleTryStartThenOnlyStartBundleOnce() { bundleManager.tryStartBundle(); bundleManager.tryStartBundle(); @@ -153,7 +138,7 @@ public void testMultipleStartBundle() { * 2. onBundleFinished callback is invoked on the progress listener */ @Test - public void testTryFinishBundleClosesBundle() { + public void testWhenTryFinishBundleThenBundleIsReset() { OpEmitter mockEmitter = mock(OpEmitter.class); when(mockFutureCollector.finish()) .thenReturn(