diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java index b6b7318e30c7..59f2716e52f2 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java @@ -61,6 +61,7 @@ class MessageDispatcher { private final ApiClock clock; private final Duration ackExpirationPadding; + private final Duration maxAckExtensionPeriod; private final MessageReceiver receiver; private final AckProcessor ackProcessor; @@ -87,20 +88,27 @@ class MessageDispatcher { // it is not modified while inside the queue. // The hashcode and equals methods are explicitly not implemented to discourage // the use of this class as keys in maps or similar containers. - private static class ExtensionJob implements Comparable { + private class ExtensionJob implements Comparable { + Instant creation; Instant expiration; int nextExtensionSeconds; ArrayList ackHandlers; ExtensionJob( - Instant expiration, int initialAckDeadlineExtension, ArrayList ackHandlers) { + Instant creation, + Instant expiration, + int initialAckDeadlineExtension, + ArrayList ackHandlers) { + this.creation = creation; this.expiration = expiration; nextExtensionSeconds = initialAckDeadlineExtension; this.ackHandlers = ackHandlers; } void extendExpiration(Instant now) { - expiration = now.plus(Duration.standardSeconds(nextExtensionSeconds)); + Instant possibleExtension = now.plus(Duration.standardSeconds(nextExtensionSeconds)); + Instant maxExtension = creation.plus(maxAckExtensionPeriod); + expiration = possibleExtension.isBefore(maxExtension) ? possibleExtension : maxExtension; nextExtensionSeconds = Math.min(2 * nextExtensionSeconds, MAX_ACK_DEADLINE_EXTENSION_SECS); } @@ -217,12 +225,14 @@ void sendAckOperations( MessageReceiver receiver, AckProcessor ackProcessor, Duration ackExpirationPadding, + Duration maxAckExtensionPeriod, Distribution ackLatencyDistribution, FlowController flowController, ScheduledExecutorService executor, ApiClock clock) { this.executor = executor; this.ackExpirationPadding = ackExpirationPadding; + this.maxAckExtensionPeriod = maxAckExtensionPeriod; this.receiver = receiver; this.ackProcessor = ackProcessor; this.flowController = flowController; @@ -305,7 +315,11 @@ public void run() { synchronized (outstandingAckHandlers) { outstandingAckHandlers.add( - new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers)); + new ExtensionJob( + new Instant(clock.millisTime()), + expiration, + INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, + ackHandlers)); } setupNextAckDeadlineExtensionAlarm(expiration); @@ -380,6 +394,13 @@ public void run() { && outstandingAckHandlers.peek().expiration.compareTo(cutOverTime) <= 0) { ExtensionJob job = outstandingAckHandlers.poll(); + if (maxAckExtensionPeriod.getMillis() > 0 + && job.creation.plus(maxAckExtensionPeriod).compareTo(now) <= 0) { + // The job has expired, according to the maxAckExtensionPeriod, we are just going to + // drop it. + continue; + } + // If a message has already been acked, remove it, nothing to do. for (int i = 0; i < job.ackHandlers.size(); ) { if (job.ackHandlers.get(i).acked.get()) { diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java index 5f979490979a..ce960b683877 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java @@ -67,6 +67,7 @@ public PollingSubscriberConnection( Credentials credentials, MessageReceiver receiver, Duration ackExpirationPadding, + Duration maxAckExtensionPeriod, Distribution ackLatencyDistribution, Channel channel, FlowController flowController, @@ -82,6 +83,7 @@ public PollingSubscriberConnection( receiver, this, ackExpirationPadding, + maxAckExtensionPeriod, ackLatencyDistribution, flowController, executor, diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java index a85fad6b0cce..4be3745b6251 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java @@ -70,6 +70,7 @@ public StreamingSubscriberConnection( Credentials credentials, MessageReceiver receiver, Duration ackExpirationPadding, + Duration maxAckExtensionPeriod, int streamAckDeadlineSeconds, Distribution ackLatencyDistribution, Channel channel, @@ -85,6 +86,7 @@ public StreamingSubscriberConnection( receiver, this, ackExpirationPadding, + maxAckExtensionPeriod, ackLatencyDistribution, flowController, executor, diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java index f46c5eb349c1..2379388d8781 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java @@ -92,6 +92,7 @@ public class Subscriber extends AbstractApiService { private final String cachedSubscriptionNameString; private final FlowControlSettings flowControlSettings; private final Duration ackExpirationPadding; + private final Duration maxAckExtensionPeriod; private final ScheduledExecutorService executor; private final Distribution ackLatencyDistribution = new Distribution(MAX_ACK_DEADLINE_SECONDS + 1); @@ -113,6 +114,7 @@ private Subscriber(Builder builder) throws IOException { subscriptionName = builder.subscriptionName; cachedSubscriptionNameString = subscriptionName.toString(); ackExpirationPadding = builder.ackExpirationPadding; + maxAckExtensionPeriod = builder.maxAckExtensionPeriod; streamAckDeadlineSeconds = Math.max( INITIAL_ACK_DEADLINE_SECONDS, @@ -245,6 +247,7 @@ private void startStreamingConnections() { credentials, receiver, ackExpirationPadding, + maxAckExtensionPeriod, streamAckDeadlineSeconds, ackLatencyDistribution, channelBuilder.build(), @@ -321,6 +324,7 @@ private void startPollingConnections() { credentials, receiver, ackExpirationPadding, + maxAckExtensionPeriod, ackLatencyDistribution, channelBuilder.build(), flowController, @@ -409,6 +413,7 @@ public void run() { public static final class Builder { private static final Duration MIN_ACK_EXPIRATION_PADDING = Duration.millis(100); private static final Duration DEFAULT_ACK_EXPIRATION_PADDING = Duration.millis(500); + private static final Duration DEFAULT_MAX_ACK_EXTENSION_PERIOD = Duration.standardMinutes(60); static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER = InstantiatingExecutorProvider.newBuilder() @@ -423,6 +428,7 @@ public static final class Builder { MessageReceiver receiver; Duration ackExpirationPadding = DEFAULT_ACK_EXPIRATION_PADDING; + Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD; FlowControlSettings flowControlSettings = FlowControlSettings.getDefaultInstance(); @@ -483,6 +489,21 @@ public Builder setAckExpirationPadding(Duration ackExpirationPadding) { return this; } + /** + * Set the maximum period a message ack deadline will be extended. + * + *

It is recommended to set this value to a reasonable upper bound of the subscriber time to + * process any message. This maximum period avoids messages to be locked by a subscriber + * in cases when the {@link AckReply} is lost. + * + *

A zero duration effectively disables auto deadline extensions. + */ + public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) { + Preconditions.checkArgument(maxAckExtensionPeriod.getMillis() >= 0); + this.maxAckExtensionPeriod = maxAckExtensionPeriod; + return this; + } + /** Gives the ability to set a custom executor. */ public Builder setExecutorProvider(ExecutorProvider executorProvider) { this.executorProvider = Preconditions.checkNotNull(executorProvider); @@ -500,3 +521,4 @@ public Subscriber build() throws IOException { } } } + diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/FakeScheduledExecutorService.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/FakeScheduledExecutorService.java index 0d757a3be6af..069ab4e119f8 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/FakeScheduledExecutorService.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/FakeScheduledExecutorService.java @@ -20,6 +20,8 @@ import com.google.common.primitives.Ints; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; +import java.util.Deque; +import java.util.LinkedList; import java.util.List; import java.util.PriorityQueue; import java.util.concurrent.AbstractExecutorService; @@ -44,6 +46,7 @@ public class FakeScheduledExecutorService extends AbstractExecutorService private final AtomicBoolean shutdown = new AtomicBoolean(false); private final PriorityQueue> pendingCallables = new PriorityQueue<>(); private final FakeClock clock = new FakeClock(); + private final Deque expectedWorkQueue = new LinkedList<>(); public ApiClock getClock() { return clock; @@ -79,6 +82,35 @@ public ScheduledFuture scheduleWithFixedDelay( Duration.millis(unit.toMillis(initialDelay)), command, PendingCallableType.FIXED_DELAY)); } + /** + * This allows for adding expectations on future work to be scheduled ( + * {@link FakeScheduledExecutorService#schedule} + * or {@link FakeScheduledExecutorService#scheduleAtFixedRate} + * or {@link FakeScheduledExecutorService#scheduleWithFixedDelay}) based on its delay. + */ + public void setupScheduleExpectation(Duration delay) { + synchronized (expectedWorkQueue) { + expectedWorkQueue.add(delay); + } + } + + /** + * Blocks the current thread until all the work + * {@link FakeScheduledExecutorService#setupScheduleExpectation(Duration) expected} has been + * scheduled in the executor. + */ + public void waitForExpectedWork() { + synchronized (expectedWorkQueue) { + while (!expectedWorkQueue.isEmpty()) { + try { + expectedWorkQueue.wait(); + } catch (InterruptedException e) { + // Wait uninterruptibly + } + } + } + } + /** * This will advance the reference time of the executor and execute (in the same thread) any * outstanding callable which execution time has passed. @@ -94,13 +126,14 @@ private void work() { for (;;) { PendingCallable callable = null; synchronized (pendingCallables) { - if (pendingCallables.isEmpty() || pendingCallables.peek().getScheduledTime().isAfter(cmpTime)) { + if (pendingCallables.isEmpty() + || pendingCallables.peek().getScheduledTime().isAfter(cmpTime)) { break; } callable = pendingCallables.poll(); } if (callable != null) { - try{ + try { callable.call(); } catch (Exception e) { // We ignore any callable exception, which should be set to the future but not relevant to @@ -182,6 +215,16 @@ ScheduledFuture schedulePendingCallable(PendingCallable callable) { pendingCallables.add(callable); } work(); + synchronized (expectedWorkQueue) { + // We compare by the callable delay in order decide when to remove expectations from the + // expected work queue, i.e. only the expected work that matches the delay of the scheduled + // callable is removed from the queue. + if (!expectedWorkQueue.isEmpty() && expectedWorkQueue.peek().equals(callable.delay)) { + expectedWorkQueue.poll(); + } + expectedWorkQueue.notifyAll(); + } + return callable.getScheduledFuture(); } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java index 19df0b265fb7..74b6e2d2b0ae 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java @@ -18,6 +18,7 @@ import static com.google.cloud.pubsub.spi.v1.MessageDispatcher.PENDING_ACKS_SEND_DELAY; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import com.google.api.gax.grpc.FixedExecutorProvider; import com.google.api.gax.grpc.InstantiatingExecutorProvider; @@ -64,6 +65,8 @@ public class SubscriberImplTest { private static final PubsubMessage TEST_MESSAGE = PubsubMessage.newBuilder().setMessageId("1").build(); + private static final int INITIAL_ACK_DEADLINE_EXTENSION_SECS = 2; + @Parameters public static Collection data() { return Arrays.asList(new Object[][] {{false}}); @@ -276,14 +279,17 @@ public void testModifyAckDeadline() throws Exception { Subscriber subscriber = startSubscriber( getTestSubscriberBuilder(testReceiver) - .setAckExpirationPadding(Duration.standardSeconds(1))); - + .setAckExpirationPadding(Duration.standardSeconds(1)) + .setMaxAckExtensionPeriod(Duration.standardSeconds(13))); // Send messages to be acked List testAckIdsBatch = ImmutableList.of("A", "B", "C"); testReceiver.setExplicitAck(true); + // A modify ack deadline should be scheduled for the next 9s + fakeExecutor.setupScheduleExpectation(Duration.standardSeconds(9)); sendMessages(testAckIdsBatch); + // To ensure first modify ack deadline got scheduled + fakeExecutor.waitForExpectedWork(); - // Trigger modify ack deadline sending - 10s initial stream ack deadline - 1 padding fakeExecutor.advanceTime(Duration.standardSeconds(9)); assertEquivalentWithTransformation( @@ -292,12 +298,11 @@ public void testModifyAckDeadline() throws Exception { new Function() { @Override public ModifyAckDeadline apply(String ack) { - return new ModifyAckDeadline(ack, 2); // 2 seconds is the initial mod ack deadline + return new ModifyAckDeadline(ack, INITIAL_ACK_DEADLINE_EXTENSION_SECS); } }); - // Trigger modify ack deadline sending - 2s of the renewed deadlines - fakeExecutor.advanceTime(Duration.standardSeconds(2)); + fakeExecutor.advanceTime(Duration.standardSeconds(1)); assertEquivalentWithTransformation( testAckIdsBatch, @@ -305,10 +310,73 @@ public ModifyAckDeadline apply(String ack) { new Function() { @Override public ModifyAckDeadline apply(String ack) { - return new ModifyAckDeadline(ack, 4); + return new ModifyAckDeadline(ack, 3); // It is expected that the deadline is renewed + // only three more seconds to not pass the max + // ack deadline ext. } }); + // No more modify ack deadline extension should be triggered at this point + fakeExecutor.advanceTime(Duration.standardSeconds(20)); + + assertTrue(fakeSubscriberServiceImpl.getModifyAckDeadlines().isEmpty()); + + testReceiver.replyAllOutstandingMessage(); + subscriber.stopAsync().awaitTerminated(); + } + + @Test + public void testModifyAckDeadline_defaultMaxExtensionPeriod() throws Exception { + Subscriber subscriber = + startSubscriber( + getTestSubscriberBuilder(testReceiver) + .setAckExpirationPadding(Duration.standardSeconds(1))); + // Send messages to be acked + List testAckIdsBatch = ImmutableList.of("A", "B", "C"); + testReceiver.setExplicitAck(true); + // A modify ack deadline should be schedule for the next 9s + fakeExecutor.setupScheduleExpectation(Duration.standardSeconds(9)); + sendMessages(testAckIdsBatch); + // To ensure the first modify ack deadlines got scheduled + fakeExecutor.waitForExpectedWork(); + + // Next modify ack deadline should be schedule in the next 1s + fakeExecutor.advanceTime(Duration.standardSeconds(9)); + + assertEquivalentWithTransformation( + testAckIdsBatch, + fakeSubscriberServiceImpl.waitAndConsumeModifyAckDeadlines(3), + new Function() { + @Override + public ModifyAckDeadline apply(String ack) { + return new ModifyAckDeadline(ack, INITIAL_ACK_DEADLINE_EXTENSION_SECS); + } + }); + + fakeExecutor.advanceTime(Duration.standardSeconds(1)); + int timeIncrementSecs = INITIAL_ACK_DEADLINE_EXTENSION_SECS; // Second time increment + + // Check ack deadline extensions while the current time has not reached 60 minutes + while (fakeExecutor.getClock().millisTime() + timeIncrementSecs - 1 < 1000 * 60 * 60) { + timeIncrementSecs *= 2; + final int expectedIncrementSecs = Math.min(600, timeIncrementSecs); + assertEquivalentWithTransformation( + testAckIdsBatch, + fakeSubscriberServiceImpl.waitAndConsumeModifyAckDeadlines(3), + new Function() { + @Override + public ModifyAckDeadline apply(String ack) { + return new ModifyAckDeadline(ack, expectedIncrementSecs); + } + }); + fakeExecutor.advanceTime(Duration.standardSeconds(timeIncrementSecs - 1)); + } + + // No more modify ack deadline extension should be triggered at this point + fakeExecutor.advanceTime(Duration.standardSeconds(20)); + + assertTrue(fakeSubscriberServiceImpl.getModifyAckDeadlines().isEmpty()); + testReceiver.replyAllOutstandingMessage(); subscriber.stopAsync().awaitTerminated(); } @@ -486,9 +554,10 @@ private void assertEquivalentWithTransformation( remaining.addAll(target); for (E expectedElem : expectedElems) { - if (!remaining.contains(transform.apply(expectedElem))) { + T expected = transform.apply(expectedElem); + if (!remaining.contains(expected)) { throw new AssertionError( - String.format("Expected element %s is not contained in %s", expectedElem, target)); + String.format("Expected element %s is not contained in %s", expected, target)); } remaining.remove(expectedElem); }