diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java index 1c89f7709521..36ba19d7da3c 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java @@ -19,15 +19,55 @@ import org.joda.time.Instant; +/** + * Bundle management for the {@link DoFnOp} that handles lifecycle of a bundle. It also serves as a + * proxy for the {@link DoFnOp} to process watermark and decides to 1. Hold watermark if there is at + * least one bundle in progress. 2. Propagates the watermark to downstream DAG, if all the previous + * bundles have completed. + * + *
A bundle is considered complete only when the outputs corresponding to each element in the
+ * bundle have been resolved and the watermark associated with the bundle(if any) is propagated
+ * downstream. The output of an element is considered resolved based on the nature of the ParDoFn 1.
+ * In case of synchronous ParDo, outputs of the element is resolved immediately after the
+ * processElement returns. 2. In case of asynchronous ParDo, outputs of the element is resolved when
+ * all the future emitted by the processElement is resolved.
+ *
+ * @param A bundle is considered complete only when the outputs corresponding to each element in the
- * bundle have been resolved and the watermark associated with the bundle(if any) is propagated
- * downstream. The output of an element is considered resolved based on the nature of the ParDoFn 1.
- * In case of synchronous ParDo, outputs of the element is resolved immediately after the
- * processElement returns. 2. In case of asynchronous ParDo, outputs of the element is resolved when
- * all the future emitted by the processElement is resolved.
- *
- * This class is not thread safe and the current implementation relies on the assumption that
- * messages are dispatched to BundleManager in a single threaded mode.
- *
- * @param This class is not thread safe and the current implementation relies on the assumption that
+ * messages are dispatched to BundleManager in a single threaded mode.
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java
index f8f30a8d2f86..b62b9246eb06 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java
@@ -69,7 +69,7 @@ public void setUp() {
}
@Test
- public void testTryStartBundleStartsBundle() {
+ public void testWhenFirstTryStartBundleThenStartsBundle() {
bundleManager.tryStartBundle();
verify(bundleProgressListener, times(1)).onBundleStarted();
@@ -82,29 +82,14 @@ public void testTryStartBundleStartsBundle() {
assertTrue("tryStartBundle() did not start the bundle", bundleManager.isBundleStarted());
}
- @Test
- public void testTryStartBundleThrowsExceptionAndSignalError() {
+ @Test(expected = IllegalArgumentException.class)
+ public void testWhenCurrentBundleDoneFutureIsNotNullThenStartBundleFails() {
bundleManager.setCurrentBundleDoneFuture(CompletableFuture.completedFuture(null));
- try {
- bundleManager.tryStartBundle();
- } catch (IllegalArgumentException e) {
- bundleManager.signalFailure(e);
- }
-
- // verify if the signal failure only resets appropriate attributes of bundle
- verify(mockFutureCollector, times(1)).prepare();
- verify(mockFutureCollector, times(1)).discard();
- assertEquals(
- "Expected the number of element in the current bundle to 0",
- 0L,
- bundleManager.getCurrentBundleElementCount());
- assertEquals(
- "Expected pending bundle count to be 0", 0L, bundleManager.getPendingBundleCount());
- assertFalse("Error didn't reset the bundle as expected.", bundleManager.isBundleStarted());
+ bundleManager.tryStartBundle();
}
@Test
- public void testTryStartBundleThrowsExceptionFromTheListener() {
+ public void testWhenSignalFailureThenResetCurrentBundle() {
doThrow(new RuntimeException("User start bundle threw an exception"))
.when(bundleProgressListener)
.onBundleStarted();
@@ -128,7 +113,7 @@ public void testTryStartBundleThrowsExceptionFromTheListener() {
}
@Test
- public void testMultipleStartBundle() {
+ public void testWhenMultipleTryStartThenOnlyStartBundleOnce() {
bundleManager.tryStartBundle();
bundleManager.tryStartBundle();
@@ -153,7 +138,7 @@ public void testMultipleStartBundle() {
* 2. onBundleFinished callback is invoked on the progress listener
*/
@Test
- public void testTryFinishBundleClosesBundle() {
+ public void testWhenTryFinishBundleThenBundleIsReset() {
OpEmitter