diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/consumer/KafkaConsumerApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/consumer/KafkaConsumerApplication.java index 822301015..780afa8ee 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/consumer/KafkaConsumerApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/consumer/KafkaConsumerApplication.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,10 +24,10 @@ package com.bakdata.kafka.consumer; +import com.bakdata.kafka.CloseExecutionOptions; import com.bakdata.kafka.KafkaApplication; import com.bakdata.kafka.mixin.ConsumerOptions; import com.bakdata.kafka.mixin.InputOptions; -import java.time.Duration; import java.util.Optional; import lombok.Getter; import lombok.RequiredArgsConstructor; @@ -35,10 +35,8 @@ import lombok.ToString; import lombok.experimental.Delegate; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; import picocli.CommandLine.Command; import picocli.CommandLine.Mixin; -import picocli.CommandLine.Option; /** @@ -72,10 +70,6 @@ public abstract class KafkaConsumerApplication extends @Mixin @Delegate private ConsumerOptions consumerOptions = new ConsumerOptions(); - @Option(names = {"--poll-timeout"}, - description = "The maximum time to block in the consumer poll loop. Examples: 'PT0.1S', 'PT2S', 'PT1M'.", - defaultValue = "PT0.1S") - private Duration pollTimeout = Duration.ofMillis(100); /** * Reset the Kafka Consumer application. Additionally, delete the consumer group. @@ -101,8 +95,9 @@ public void reset() { @Override public final Optional createExecutionOptions() { final ConsumerExecutionOptions executionOptions = ConsumerExecutionOptions.builder() - .volatileGroupInstanceId(this.isVolatileGroupInstanceId()) - .onStart(this::onConsumerStart) + .closeExecutionOptions(CloseExecutionOptions.builder() + .volatileGroupInstanceId(this.isVolatileGroupInstanceId()) + .build()) .pollTimeout(this.getPollTimeout()) .build(); return Optional.of(executionOptions); @@ -128,13 +123,4 @@ public final ConfiguredConsumerApp createConfiguredApp(final T app, public ConsumerAppConfiguration createConfiguration(final ConsumerTopicConfig topics) { return new ConsumerAppConfiguration(topics, this.getGroupId()); } - - /** - * Called after starting Kafka Consumer - * - * @param runningConsumer running {@link ConsumerRunnable} instance along with its {@link ConsumerConfig} - */ - protected void onConsumerStart(final RunningConsumer runningConsumer) { - // do nothing by default - } } diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/consumer/SimpleKafkaConsumerApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/consumer/SimpleKafkaConsumerApplication.java index c89e0ac8d..58d5b34ce 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/consumer/SimpleKafkaConsumerApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/consumer/SimpleKafkaConsumerApplication.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -29,7 +29,7 @@ import lombok.RequiredArgsConstructor; /** - * {@code KafkaConsumerApplication} without any additional configuration options. + * {@link KafkaConsumerApplication} without any additional configuration options. * * @param type of {@link ConsumerApp} created by this application */ diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/consumerproducer/KafkaConsumerProducerApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/consumerproducer/KafkaConsumerProducerApplication.java index 510c7c200..02c69bfed 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/consumerproducer/KafkaConsumerProducerApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/consumerproducer/KafkaConsumerProducerApplication.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,11 +24,14 @@ package com.bakdata.kafka.consumerproducer; +import com.bakdata.kafka.CloseExecutionOptions; import com.bakdata.kafka.KafkaApplication; +import com.bakdata.kafka.consumer.ConsumerExecutionOptions; import com.bakdata.kafka.mixin.ConsumerOptions; import com.bakdata.kafka.mixin.ErrorOptions; import com.bakdata.kafka.mixin.InputOptions; import com.bakdata.kafka.mixin.OutputOptions; +import com.bakdata.kafka.producer.ProducerExecutionOptions; import java.util.Optional; import lombok.Getter; import lombok.RequiredArgsConstructor; @@ -94,10 +97,9 @@ public void clean() { } /** - * Clear all state stores and consumer group offsets associated with the Kafka ConsumerProducer application. + * Reset consumer group offsets associated with the Kafka ConsumerProducer application. */ - @Command(description = "Clear all state stores, consumer group offsets, and internal topics associated with the " - + "Kafka ConsumerProducer application.") + @Command(description = "Reset consumer group offsets associated with the Kafka ConsumerProducer application.") public void reset() { this.prepareClean(); try (final CleanableApp app = this.createCleanableApp()) { @@ -108,7 +110,17 @@ public void reset() { @Override public final Optional createExecutionOptions() { - return Optional.empty(); + final ConsumerProducerExecutionOptions executionOptions = ConsumerProducerExecutionOptions.builder() + .consumerExecutionOptions(ConsumerExecutionOptions.builder() + .closeExecutionOptions(CloseExecutionOptions.builder() + .volatileGroupInstanceId(this.isVolatileGroupInstanceId()) + .build()) + .pollTimeout(this.getPollTimeout()) + .build()) + .producerExecutionOptions(ProducerExecutionOptions.builder() + .build()) + .build(); + return Optional.of(executionOptions); } @Override diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/consumerproducer/SimpleKafkaConsumerProducerApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/consumerproducer/SimpleKafkaConsumerProducerApplication.java index b11ae695b..6618ab9fc 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/consumerproducer/SimpleKafkaConsumerProducerApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/consumerproducer/SimpleKafkaConsumerProducerApplication.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -29,7 +29,7 @@ import lombok.RequiredArgsConstructor; /** - * {@code KafkaConsumerProducerApplication} without any additional configuration options. + * {@link KafkaConsumerProducerApplication} without any additional configuration options. * * @param type of {@link ConsumerProducerApp} created by this application */ diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/mixin/ConsumerOptions.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/mixin/ConsumerOptions.java index db4448f3f..bc055d0b7 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/mixin/ConsumerOptions.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/mixin/ConsumerOptions.java @@ -24,8 +24,10 @@ package com.bakdata.kafka.mixin; +import java.time.Duration; import lombok.Data; import picocli.CommandLine; +import picocli.CommandLine.Option; /** * Shared CLI options to configure Kafka Consumer applications. @@ -38,4 +40,7 @@ public class ConsumerOptions { @CommandLine.Option(names = "--group-id", description = "Unique identifier for the Kafka Consumer applications, used as 'group.id'.") private String groupId; + @Option(names = {"--poll-timeout"}, + description = "The maximum time to block in the consumer poll loop. Examples: 'PT0.1S', 'PT2S', 'PT1M'.") + private Duration pollTimeout = Duration.ofSeconds(10L); } diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/producer/KafkaProducerApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/producer/KafkaProducerApplication.java index 2ed6bb485..9578429e8 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/producer/KafkaProducerApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/producer/KafkaProducerApplication.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -73,7 +73,9 @@ public void clean() { @Override public final Optional createExecutionOptions() { - return Optional.empty(); + final ProducerExecutionOptions executionOptions = ProducerExecutionOptions.builder() + .build(); + return Optional.of(executionOptions); } @Override diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/streams/KafkaStreamsApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/streams/KafkaStreamsApplication.java index 68a6ef033..b589bf0f1 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/streams/KafkaStreamsApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/streams/KafkaStreamsApplication.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,6 +24,7 @@ package com.bakdata.kafka.streams; +import com.bakdata.kafka.CloseExecutionOptions; import com.bakdata.kafka.KafkaApplication; import com.bakdata.kafka.mixin.ErrorOptions; import com.bakdata.kafka.mixin.InputOptions; @@ -116,7 +117,9 @@ public void reset() { @Override public final Optional createExecutionOptions() { final StreamsExecutionOptions options = StreamsExecutionOptions.builder() - .volatileGroupInstanceId(this.isVolatileGroupInstanceId()) + .closeExecutionOptions(CloseExecutionOptions.builder() + .volatileGroupInstanceId(this.isVolatileGroupInstanceId()) + .build()) .uncaughtExceptionHandler(this::createUncaughtExceptionHandler) .stateListener(this::createStateListener) .onStart(this::onStreamsStart) diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/consumerproducer/KafkaConsumerProducerApplicationCliTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/consumerproducer/KafkaConsumerProducerApplicationCliTest.java index b0555eaf7..570b33a5d 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/consumerproducer/KafkaConsumerProducerApplicationCliTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/consumerproducer/KafkaConsumerProducerApplicationCliTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -65,7 +65,7 @@ public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder buil } @Override - public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) { + public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) { throw new UnsupportedOperationException(); } @@ -97,7 +97,7 @@ public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder buil } @Override - public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) { + public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) { throw new UnsupportedOperationException(); } @@ -125,7 +125,7 @@ public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder buil } @Override - public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) { + public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) { throw new UnsupportedOperationException(); } @@ -161,7 +161,7 @@ public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder buil } @Override - public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) { + public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) { throw new UnsupportedOperationException(); } @@ -191,11 +191,11 @@ public ConsumerProducerApp createApp() { return new ConsumerProducerApp() { @Override public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder builder) { - return (consumerConfig, producerConfig) -> {}; + return consumerConfig -> {}; } @Override - public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) { + public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) { return "my-id"; } @@ -223,13 +223,13 @@ void shouldExitWithErrorInBuildRunnable() { () -> new ConsumerProducerApp() { @Override public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder builder) { - return (consumerConfig, producerConfig) -> { + return consumerConfig -> { throw new RuntimeException("Error building runnable"); }; } @Override - public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) { + public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) { return "app"; } @@ -264,8 +264,8 @@ void shouldExitWithSuccessCodeOnShutdown() { () -> new ConsumerProducerApp() { @Override public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder builder) { - return (consumerConfig, producerConfig) -> { - try (final Producer producer = builder.producerBuilder() + return consumerConfig -> { + try (final Producer producer = builder.getProducerBuilder() .createProducer()) { final ProducerRecord producerRecord = new ProducerRecord<>(output, "foo", "bar"); @@ -275,7 +275,7 @@ public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder buil } @Override - public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) { + public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) { return "app"; } @@ -321,7 +321,7 @@ public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder buil } @Override - public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) { + public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) { throw new UnsupportedOperationException(); } @@ -351,7 +351,7 @@ public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder buil } @Override - public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) { + public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) { throw new UnsupportedOperationException(); } diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/consumerproducer/apps/CloseFlagApp.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/consumerproducer/apps/CloseFlagApp.java index 182172524..b433ed67b 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/consumerproducer/apps/CloseFlagApp.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/consumerproducer/apps/CloseFlagApp.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -60,19 +60,19 @@ public ConsumerProducerApp createApp() { @Override public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder builder) { - final Producer producer = builder.producerBuilder().createProducer(); - final Consumer consumer = builder.consumerBuilder().createConsumer(); - builder.consumerBuilder().subscribeToAllTopics(consumer); + final Producer producer = builder.getProducerBuilder().createProducer(); + final Consumer consumer = builder.getConsumerBuilder().createConsumer(); + builder.getConsumerBuilder().subscribeToAllTopics(consumer); final ConsumerRunnable consumerRunnable = - builder.consumerBuilder().createDefaultConsumerRunnable(consumer, records -> + builder.getConsumerBuilder().createDefaultConsumerRunnable(consumer, records -> records.forEach(consumerRecord -> - producer.send(new ProducerRecord<>(builder.topics().getOutputTopic(), + producer.send(new ProducerRecord<>(builder.getTopics().getOutputTopic(), consumerRecord.key(), consumerRecord.value())))); return new DefaultConsumerProducerRunnable<>(producer, consumerRunnable); } @Override - public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) { + public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) { return CloseFlagApp.this.getClass().getSimpleName() + "-" + configuration.getTopics().getOutputTopic(); } diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/consumerproducer/apps/Mirror.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/consumerproducer/apps/Mirror.java index 2e2df3738..3a46644dc 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/consumerproducer/apps/Mirror.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/consumerproducer/apps/Mirror.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -43,19 +43,19 @@ public class Mirror implements ConsumerProducerApp { @Override public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder builder) { - final Producer producer = builder.producerBuilder().createProducer(); - final Consumer consumer = builder.consumerBuilder().createConsumer(); - builder.consumerBuilder().subscribeToAllTopics(consumer); + final Producer producer = builder.getProducerBuilder().createProducer(); + final Consumer consumer = builder.getConsumerBuilder().createConsumer(); + builder.getConsumerBuilder().subscribeToAllTopics(consumer); final ConsumerRunnable - consumerRunnable = builder.consumerBuilder().createDefaultConsumerRunnable(consumer, records -> + consumerRunnable = builder.getConsumerBuilder().createDefaultConsumerRunnable(consumer, records -> records.forEach(consumerRecord -> - producer.send(new ProducerRecord<>(builder.topics().getOutputTopic(), + producer.send(new ProducerRecord<>(builder.getTopics().getOutputTopic(), consumerRecord.key(), consumerRecord.value())))); return new DefaultConsumerProducerRunnable<>(producer, consumerRunnable); } @Override - public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) { + public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) { return this.getClass().getSimpleName() + "-" + configuration.getTopics().getOutputTopic(); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/CloseExecutionOptions.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/CloseExecutionOptions.java new file mode 100644 index 000000000..90afda5d0 --- /dev/null +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/CloseExecutionOptions.java @@ -0,0 +1,91 @@ +/* + * MIT License + * + * Copyright (c) 2026 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import java.time.Duration; +import java.util.Map; +import lombok.Builder; +import org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.streams.KafkaStreams.CloseOptions; +import org.apache.kafka.streams.StreamsConfig; + +/** + * Options to configure closing behavior of Kafka apps + */ +@Builder +public class CloseExecutionOptions { + /** + * Defines if {@link ConsumerConfig#GROUP_INSTANCE_ID_CONFIG} is volatile. If it is configured and non-volatile, + * {@link CloseOptions#leaveGroup(boolean)} is disabled and + * {@link org.apache.kafka.clients.consumer.CloseOptions#withGroupMembershipOperation(GroupMembershipOperation)} is + * set to {@link GroupMembershipOperation#DEFAULT}. + */ + @Builder.Default + private final boolean volatileGroupInstanceId = true; + /** + * Defines {@link CloseOptions#timeout(Duration)} and + * {@link org.apache.kafka.clients.consumer.CloseOptions#withTimeout(Duration)} + */ + @Builder.Default + private final Duration closeTimeout = Duration.ofMillis(Long.MAX_VALUE); + + private static boolean isStaticMembershipDisabled(final Map originals) { + return originals.get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG) == null; + } + + /** + * Create {@link CloseOptions} for {@link org.apache.kafka.streams.KafkaStreams} + * + * @param config streams config + * @return {@link CloseOptions} + * @see org.apache.kafka.streams.KafkaStreams#close(CloseOptions) + */ + public CloseOptions createCloseOptions(final StreamsConfig config) { + final boolean leaveGroup = this.shouldLeaveGroup(config.originals()); + return new CloseOptions().leaveGroup(leaveGroup).timeout(this.closeTimeout); + } + + /** + * Create {@link org.apache.kafka.clients.consumer.CloseOptions} for + * {@link org.apache.kafka.clients.consumer.Consumer} + * + * @param config consumer config + * @return {@link org.apache.kafka.clients.consumer.CloseOptions} + * @see org.apache.kafka.clients.consumer.Consumer#close(org.apache.kafka.clients.consumer.CloseOptions) + */ + public org.apache.kafka.clients.consumer.CloseOptions createCloseOptions(final ConsumerConfig config) { + final boolean leaveGroup = this.shouldLeaveGroup(config.originals()); + final GroupMembershipOperation operation = + leaveGroup ? GroupMembershipOperation.LEAVE_GROUP : GroupMembershipOperation.DEFAULT; + return org.apache.kafka.clients.consumer.CloseOptions.groupMembershipOperation(operation) + .withTimeout(this.closeTimeout); + } + + boolean shouldLeaveGroup(final Map originals) { + final boolean staticMembershipDisabled = isStaticMembershipDisabled(originals); + return staticMembershipDisabled || this.volatileGroupInstanceId; + } +} diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/admin/ConsumerGroupsClient.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/admin/ConsumerGroupsClient.java index 0096663c9..9b6f9c2be 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/admin/ConsumerGroupsClient.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/admin/ConsumerGroupsClient.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import lombok.AccessLevel; import lombok.NonNull; import lombok.RequiredArgsConstructor; @@ -39,7 +40,11 @@ import org.apache.kafka.clients.admin.GroupListing; import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; import org.apache.kafka.clients.admin.ListGroupsResult; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; +import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigResource; @@ -160,6 +165,42 @@ public void deleteIfExists() { } } + /** + * Reset consumer group offset. + * + * @param offsetSpec specification where offsets should be reset to + */ + public void reset(final OffsetSpec offsetSpec) { + final Optional groupDescription = this.describe(); + if (groupDescription.isEmpty()) { + return; + } + if (groupDescription.get().groupState() != GroupState.EMPTY) { + throw new KafkaAdminException( + "Failed to reset offsets for consumer group %s: consumer group is not empty".formatted( + this.groupName)); + } + + final Map groupOffsets = this.listOffsets(); + + final Map request = groupOffsets.keySet().stream() + .collect(Collectors.toMap(tp -> tp, tp -> offsetSpec)); + final KafkaFuture> offsetsFuture = + ConsumerGroupsClient.this.adminClient.listOffsets(request).all(); + final Map offsets = + ConsumerGroupsClient.this.timeout.get(offsetsFuture, + () -> "Failed to reset offsets for consumer group %s: could not find offsets for spec %s" + .formatted(this.groupName, offsetSpec)); + + final Map resetOffsets = offsets.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue().offset()))); + final KafkaFuture alterOffsetResult = + ConsumerGroupsClient.this.adminClient.alterConsumerGroupOffsets(this.groupName, resetOffsets).all(); + ConsumerGroupsClient.this.timeout.get(alterOffsetResult, + () -> "Failed to reset offsets for consumer group %s: could not alter offsets".formatted( + this.groupName)); + } + /** * Create a client for the configuration of this consumer group. * diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/admin/KafkaAdminException.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/admin/KafkaAdminException.java index 116c87743..4dbfe356a 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/admin/KafkaAdminException.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/admin/KafkaAdminException.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -32,4 +32,8 @@ public class KafkaAdminException extends RuntimeException { KafkaAdminException(final String message, final Throwable cause) { super(message, cause); } + + KafkaAdminException(final String message) { + super(message); + } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConfiguredConsumerApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConfiguredConsumerApp.java index 31aa062b4..09b6272b3 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConfiguredConsumerApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConfiguredConsumerApp.java @@ -31,7 +31,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import lombok.Getter; import lombok.NonNull; +import lombok.RequiredArgsConstructor; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.common.IsolationLevel; @@ -41,9 +43,22 @@ * * @param type of {@link ConsumerApp} */ -public record ConfiguredConsumerApp(@NonNull T app, - @NonNull ConsumerAppConfiguration configuration) - implements ConfiguredApp> { +@RequiredArgsConstructor +public class ConfiguredConsumerApp implements ConfiguredApp> { + @Getter + private final @NonNull T app; + private final @NonNull ConsumerAppConfiguration configuration; + + /** + * Base configuration for all consumer apps which includes + *
+     * auto.offset.reset=earliest
+     * enable.auto.commit=false
+     * isolation.level=read_committed
+     * 
+ * + * @return base configuration + */ public static Map createBaseConfig() { final Map kafkaConfig = new HashMap<>(); @@ -59,10 +74,10 @@ public static Map createBaseConfig() { * Configuration is created in the following order *
    *
  • - * Offset management: *
          * auto.offset.reset=earliest
          * enable.auto.commit=false
    +     * isolation.level=read_committed
          * 
    *
  • *
  • @@ -96,8 +111,8 @@ public Map getKafkaProperties(final RuntimeConfiguration runtime * Get unique group identifier of {@link ConsumerApp} * * @return unique group identifier - * @throws IllegalArgumentException if unique group identifier of {@link ConsumerApp} is different from - * provided group identifier in {@link ConsumerAppConfiguration} + * @throws IllegalArgumentException if unique group identifier of {@link ConsumerApp} is different from provided + * group identifier in {@link ConsumerAppConfiguration} * @see ConsumerApp#getUniqueGroupId(ConsumerAppConfiguration) */ public String getUniqueGroupId() { @@ -111,9 +126,9 @@ public String getUniqueGroupId() { } /** - * Create an {@code ExecutableConsumerApp} using the provided {@link RuntimeConfiguration} + * Create an {@link ExecutableConsumerApp} using the provided {@link RuntimeConfiguration} * - * @return {@code ExecutableConsumerApp} + * @return {@link ExecutableConsumerApp} */ @Override public ExecutableConsumerApp withRuntimeConfiguration(final RuntimeConfiguration runtimeConfiguration) { diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerApp.java index 72fdd51b2..ffcaabcd9 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerApp.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -37,12 +37,12 @@ public interface ConsumerApp extends App kafkaProperties, - @NonNull ConsumerExecutionOptions executionOptions) { +@RequiredArgsConstructor +@Value +public class ConsumerBuilder { + + @NonNull + ConsumerTopicConfig topics; + @NonNull + Map kafkaProperties; + @NonNull + ConsumerExecutionOptions executionOptions; /** - * Create a new {@code Consumer} using {@link #kafkaProperties} + * Create a new {@link Consumer} using {@link #kafkaProperties} * * @param type of keys * @param type of values - * @return {@code Consumer} + * @return {@link Consumer} * @see KafkaConsumer#KafkaConsumer(Map) */ public Consumer createConsumer() { diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerCleanUpRunner.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerCleanUpRunner.java index 57fbe220b..5e1b2ed11 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerCleanUpRunner.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerCleanUpRunner.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,25 +24,15 @@ package com.bakdata.kafka.consumer; -import com.bakdata.kafka.CleanUpException; import com.bakdata.kafka.CleanUpRunner; import com.bakdata.kafka.admin.AdminClientX; import com.bakdata.kafka.admin.ConsumerGroupsClient.ConsumerGroupClient; import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; import lombok.AccessLevel; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.admin.ConsumerGroupDescription; -import org.apache.kafka.clients.admin.ListOffsetsResult; import org.apache.kafka.clients.admin.OffsetSpec; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.GroupState; -import org.apache.kafka.common.KafkaFuture; -import org.apache.kafka.common.TopicPartition; /** * Clean up all topics specified by a {@link ConsumerTopicConfig} @@ -50,7 +40,6 @@ @Slf4j @RequiredArgsConstructor(access = AccessLevel.PRIVATE) public final class ConsumerCleanUpRunner implements CleanUpRunner { - private final @NonNull ConsumerTopicConfig topics; private final @NonNull Map kafkaProperties; private final @NonNull String groupId; private final @NonNull ConsumerCleanUpConfiguration cleanHooks; @@ -58,31 +47,27 @@ public final class ConsumerCleanUpRunner implements CleanUpRunner { /** * Create a new {@code ConsumerCleanUpRunner} with default {@link ConsumerCleanUpConfiguration} * - * @param topics topic configuration * @param kafkaProperties configuration to connect to Kafka admin tools * @param groupId consumer group id to clean up * @return {@code ConsumerCleanUpRunner} */ - public static ConsumerCleanUpRunner create(@NonNull final ConsumerTopicConfig topics, - @NonNull final Map kafkaProperties, + public static ConsumerCleanUpRunner create(@NonNull final Map kafkaProperties, @NonNull final String groupId) { - return create(topics, kafkaProperties, groupId, new ConsumerCleanUpConfiguration()); + return create(kafkaProperties, groupId, new ConsumerCleanUpConfiguration()); } /** * Create a new {@code ConsumerCleanUpRunner} * - * @param topics topic configuration * @param kafkaProperties configuration to connect to Kafka admin tools * @param groupId consumer group id to clean up * @param configuration configuration for hooks that are called when running {@link #clean()} * @return {@code ConsumerCleanUpRunner} */ - public static ConsumerCleanUpRunner create(@NonNull final ConsumerTopicConfig topics, - @NonNull final Map kafkaProperties, + public static ConsumerCleanUpRunner create(@NonNull final Map kafkaProperties, @NonNull final String groupId, @NonNull final ConsumerCleanUpConfiguration configuration) { - return new ConsumerCleanUpRunner(topics, kafkaProperties, groupId, configuration); + return new ConsumerCleanUpRunner(kafkaProperties, groupId, configuration); } @Override @@ -120,43 +105,11 @@ private class Task { private void reset() { final ConsumerGroupClient groupClient = this.adminClient.consumerGroups() .group(ConsumerCleanUpRunner.this.groupId); - final Optional groupDescription = groupClient.describe(); - if (groupDescription.isEmpty()) { - return; - } - if (groupDescription.get().groupState() != GroupState.EMPTY) { - throw new CleanUpException("Error resetting application, consumer group is not empty"); - } - - final Map groupOffsets = groupClient.listOffsets(); - - final Map request = groupOffsets.keySet().stream() - .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.earliest())); - final Map earliestOffsets = - runAdminFuture(this.adminClient.admin().listOffsets(request).all(), - "Error resetting application, beginning consumer group offset could not be found"); - - final Map resetOffsets = earliestOffsets.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, - e -> new OffsetAndMetadata(e.getValue().offset()))); - runAdminFuture( - this.adminClient.admin().alterConsumerGroupOffsets(ConsumerCleanUpRunner.this.groupId, resetOffsets) - .all(), "Error resetting application, could not alter consumer group offsets"); + groupClient.reset(OffsetSpec.earliest()); ConsumerCleanUpRunner.this.cleanHooks.runResetHooks(); } - private static T runAdminFuture(final KafkaFuture action, final String errorMessage) { - try { - return action.get(); - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - throw new CleanUpException(errorMessage, e); - } catch (final ExecutionException e) { - throw new CleanUpException(errorMessage, e); - } - } - private void clean() { this.deleteConsumerGroup(); ConsumerCleanUpRunner.this.cleanHooks.runCleanHooks(); diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerExecutionOptions.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerExecutionOptions.java index af8408aef..674f3996c 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerExecutionOptions.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerExecutionOptions.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,13 +24,11 @@ package com.bakdata.kafka.consumer; +import com.bakdata.kafka.CloseExecutionOptions; import java.time.Duration; -import java.util.Map; import lombok.Builder; import lombok.Getter; -import lombok.NonNull; import org.apache.kafka.clients.consumer.CloseOptions; -import org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -41,50 +39,19 @@ public class ConsumerExecutionOptions { /** - * Hook that is called after the {@link ConsumerRunnable} is started + * Defines the behavior when closing a consumer */ @Builder.Default - private final @NonNull java.util.function.Consumer onStart = runningConsumer -> {}; - - /** - * Defines if {@link ConsumerConfig#GROUP_INSTANCE_ID_CONFIG} is volatile. If it is configured and non-volatile, - * {@link Consumer#close(CloseOptions)} is called with - * {@link CloseOptions#groupMembershipOperation(GroupMembershipOperation)} set to - * {@link GroupMembershipOperation#REMAIN_IN_GROUP} - */ - @Builder.Default - private final boolean volatileGroupInstanceId = true; - - /** - * Defines {@link CloseOptions#timeout(Duration)} when calling {@link Consumer#close(CloseOptions)} - */ - @Builder.Default - private final Duration closeTimeout = Duration.ofMillis(Long.MAX_VALUE); + private final CloseExecutionOptions closeExecutionOptions = CloseExecutionOptions.builder().build(); /** * Defines the timeout duration for the {@link Consumer#poll(Duration)} call */ @Builder.Default @Getter - private final Duration pollTimeout = Duration.ofMillis(Long.MAX_VALUE); - - private static boolean isStaticMembershipDisabled(final Map originals) { - return originals.get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG) == null; - } + private final Duration pollTimeout = Duration.ofSeconds(10L); CloseOptions createCloseOptions(final ConsumerConfig config) { - final boolean leaveGroup = this.shouldLeaveGroup(config.originals()); - final GroupMembershipOperation operation = - leaveGroup ? GroupMembershipOperation.LEAVE_GROUP : GroupMembershipOperation.DEFAULT; - return CloseOptions.groupMembershipOperation(operation).withTimeout(this.closeTimeout); - } - - boolean shouldLeaveGroup(final Map originals) { - final boolean staticMembershipDisabled = isStaticMembershipDisabled(originals); - return staticMembershipDisabled || this.volatileGroupInstanceId; - } - - void onStart(final RunningConsumer runningConsumer) { - this.onStart.accept(runningConsumer); + return this.closeExecutionOptions.createCloseOptions(config); } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerRunner.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerRunner.java index 3623eba9f..8a20b7224 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerRunner.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerRunner.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -39,7 +39,6 @@ public class ConsumerRunner implements Runner { private final @NonNull ConsumerRunnable runnable; private final @NonNull ConsumerConfig config; - private final @NonNull ConsumerExecutionOptions executionOptions; @Override public void close() { @@ -49,17 +48,7 @@ public void close() { @Override public void run() { - log.info("Starting consumer"); - this.runConsumer(); - } - - private void runConsumer() { - log.info("Starting Kafka Consumer and calling start hook"); - final RunningConsumer runningConsumer = RunningConsumer.builder() - .consumerRunnable(this.runnable) - .config(this.config) - .build(); - this.executionOptions.onStart(runningConsumer); + log.info("Starting Kafka Consumer"); // Run Kafka consumer until it shuts down this.runnable.run(this.config); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/DefaultConsumerRunnable.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/DefaultConsumerRunnable.java index 1a892e648..856c1edc3 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/DefaultConsumerRunnable.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/DefaultConsumerRunnable.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -27,7 +27,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import lombok.AccessLevel; -import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.CloseOptions; @@ -44,11 +43,10 @@ * @param type of keys * @param type of values */ -@RequiredArgsConstructor(access = AccessLevel.PROTECTED) +@RequiredArgsConstructor(access = AccessLevel.PACKAGE) @Slf4j public class DefaultConsumerRunnable implements ConsumerRunnable { - @Getter private final Consumer consumer; private final ConsumerExecutionOptions executionOptions; /** @@ -83,8 +81,6 @@ private void pollLoop(final ConsumerConfig consumerConfig) { } } catch (final WakeupException exception) { log.info("Consumer poll loop waking up for shutdown", exception); - } catch (final RuntimeException exception) { - log.error("RuntimeException while running consumer loop", exception); } finally { log.info("Closing consumer"); final CloseOptions closeOptions = this.executionOptions.createCloseOptions(consumerConfig); diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ExecutableConsumerApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ExecutableConsumerApp.java index 4474e7d47..bf94a796f 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ExecutableConsumerApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ExecutableConsumerApp.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -31,7 +31,6 @@ import lombok.Builder; import lombok.Getter; import lombok.NonNull; -import lombok.RequiredArgsConstructor; import org.apache.kafka.clients.consumer.ConsumerConfig; /** @@ -49,15 +48,15 @@ public class ExecutableConsumerApp private final @NonNull T app; /** - * Create {@code ConsumerCleanUpRunner} in order to clean application + * Create {@link ConsumerCleanUpRunner} in order to clean application * - * @return {@code ConsumerCleanUpRunner} + * @return {@link ConsumerCleanUpRunner} */ @Override public ConsumerCleanUpRunner createCleanUpRunner() { final AppConfiguration configuration = this.createConfiguration(); final ConsumerCleanUpConfiguration configurer = this.app.setupCleanUp(configuration); - return ConsumerCleanUpRunner.create(this.topics, this.kafkaProperties, this.groupId, configurer); + return ConsumerCleanUpRunner.create(this.kafkaProperties, this.groupId, configurer); } /** @@ -75,7 +74,7 @@ public ConsumerRunner createRunner(final ConsumerExecutionOptions options) { final ConsumerBuilder consumerBuilder = new ConsumerBuilder(this.topics, this.kafkaProperties, options); final AppConfiguration configuration = this.createConfiguration(); this.app.setup(configuration); - return new ConsumerRunner(this.app.buildRunnable(consumerBuilder), this.getConfig(), options); + return new ConsumerRunner(this.app.buildRunnable(consumerBuilder), this.getConfig()); } @Override diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/RunningConsumer.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/RunningConsumer.java deleted file mode 100644 index c0f141b62..000000000 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/RunningConsumer.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2025 bakdata - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.bakdata.kafka.consumer; - -import com.bakdata.kafka.consumer.ConsumerExecutionOptions.ConsumerExecutionOptionsBuilder; -import java.util.function.Consumer; -import lombok.Builder; -import lombok.NonNull; -import lombok.Value; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; - -/** - * A running {@link KafkaConsumer} instance along with its {@link ConsumerConfig} and - * {@link ConsumerRunnable} - * - * @see ConsumerExecutionOptionsBuilder#onStart(Consumer) - */ -@Builder -@Value -public class RunningConsumer { - - @NonNull - ConsumerConfig config; - @NonNull - ConsumerRunnable consumerRunnable; -} diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConfiguredConsumerProducerApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConfiguredConsumerProducerApp.java index 497f1ebb9..48e0e9194 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConfiguredConsumerProducerApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConfiguredConsumerProducerApp.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -32,7 +32,9 @@ import com.bakdata.kafka.producer.ConfiguredProducerApp; import java.util.Map; import java.util.Objects; +import lombok.Getter; import lombok.NonNull; +import lombok.RequiredArgsConstructor; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; @@ -42,9 +44,12 @@ * * @param type of {@link ConsumerProducerApp} */ -public record ConfiguredConsumerProducerApp( - @NonNull T app, @NonNull ConsumerProducerAppConfiguration configuration) +@RequiredArgsConstructor +public class ConfiguredConsumerProducerApp implements ConfiguredApp> { + @Getter + private final @NonNull T app; + private final @NonNull ConsumerProducerAppConfiguration configuration; /** *

    This method creates the configuration to run a {@link ConsumerProducerApp}.

    @@ -89,7 +94,7 @@ public Map getKafkaProperties(final RuntimeConfiguration runtime config.putAll(ConfiguredConsumerApp.createBaseConfig()); final KafkaPropertiesFactory propertiesFactory = this.createPropertiesFactory(runtimeConfiguration, config); return propertiesFactory.createKafkaProperties(Map.of( - CommonClientConfigs.GROUP_ID_CONFIG, this.getUniqueAppId() + CommonClientConfigs.GROUP_ID_CONFIG, this.getUniqueGroupId() )); } @@ -97,24 +102,25 @@ public Map getKafkaProperties(final RuntimeConfiguration runtime * Get unique group identifier of {@link ConsumerProducerApp} * * @return unique group identifier - * @throws IllegalArgumentException if unique group identifier of {@link ConsumerProducerApp} is different - * from provided group identifier in {@link ConsumerProducerAppConfiguration} - * @see ConsumerProducerApp#getUniqueAppId(ConsumerProducerAppConfiguration) + * @throws IllegalArgumentException if unique group identifier of {@link ConsumerProducerApp} is different from + * provided group identifier in {@link ConsumerProducerAppConfiguration} + * @see ConsumerProducerApp#getUniqueGroupId(ConsumerProducerAppConfiguration) */ - public String getUniqueAppId() { - final String uniqueAppId = - Objects.requireNonNull(this.app.getUniqueAppId(this.configuration), "Group ID cannot be null"); - if (this.configuration.getUniqueAppId().map(configuredId -> !uniqueAppId.equals(configuredId)).orElse(false)) { + public String getUniqueGroupId() { + final String uniqueGroupId = + Objects.requireNonNull(this.app.getUniqueGroupId(this.configuration), "Group ID cannot be null"); + if (this.configuration.getUniqueGroupId().map(configuredId -> !uniqueGroupId.equals(configuredId)) + .orElse(false)) { throw new IllegalArgumentException( - "Provided group ID does not match ConsumerProducerApp#getUniqueAppId()"); + "Provided group ID does not match ConsumerProducerApp#getUniqueGroupId()"); } - return uniqueAppId; + return uniqueGroupId; } /** - * Create an {@code ExecutableConsumerProducerApp} using the provided {@link RuntimeConfiguration} + * Create an {@link ExecutableConsumerProducerApp} using the provided {@link RuntimeConfiguration} * - * @return {@code ExecutableConsumerProducerApp} + * @return {@link ExecutableConsumerProducerApp} */ @Override public ExecutableConsumerProducerApp withRuntimeConfiguration(final RuntimeConfiguration runtimeConfiguration) { @@ -124,7 +130,7 @@ public ExecutableConsumerProducerApp withRuntimeConfiguration(final RuntimeCo .producerProperties(properties) .app(this.app) .topics(this.getTopics()) - .groupId(this.getUniqueAppId()) + .groupId(this.getUniqueGroupId()) .build(); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerApp.java index b331f42ea..c6acdd6dc 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerApp.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -38,7 +38,7 @@ public interface ConsumerProducerApp extends App - * User may provide a unique group identifier via {@link ConsumerProducerAppConfiguration#getUniqueAppId()}. + * User may provide a unique group identifier via {@link ConsumerProducerAppConfiguration#getUniqueGroupId()}. * If that is the case, the returned group ID should match the provided one. * * @param configuration provides runtime configuration * @return unique group identifier */ - default String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) { - return configuration.getUniqueAppId() + default String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) { + return configuration.getUniqueGroupId() .orElseThrow(() -> new IllegalArgumentException("Please provide a group ID")); } /** - * @return {@code StreamsCleanUpConfiguration} + * @return {@link StreamsCleanUpConfiguration} * @see StreamsCleanUpRunner */ @Override diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerAppConfiguration.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerAppConfiguration.java index 834695cf6..e3a379b95 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerAppConfiguration.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerAppConfiguration.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -40,7 +40,7 @@ public class ConsumerProducerAppConfiguration { private final String uniqueAppId; /** - * Create a new {@code ConsumerAppConfiguration} with no provided {@link #uniqueAppId} + * Create a new {@code ConsumerProducerAppConfiguration} with no provided {@link #uniqueAppId} * * @param topics topics to use for app */ @@ -52,9 +52,9 @@ public ConsumerProducerAppConfiguration(final ConsumerProducerTopicConfig topics * Get the provided unique group ID. If user did not provide a unique group ID, this will return empty. * * @return provided unique group ID - * @see ConsumerProducerApp#getUniqueAppId(ConsumerProducerAppConfiguration) + * @see ConsumerProducerApp#getUniqueGroupId(ConsumerProducerAppConfiguration) */ - public Optional getUniqueAppId() { + public Optional getUniqueGroupId() { return Optional.ofNullable(this.uniqueAppId); } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerBuilder.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerBuilder.java index 6b1371f43..f954f4e23 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerBuilder.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerBuilder.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -27,13 +27,20 @@ import com.bakdata.kafka.consumer.ConsumerBuilder; import com.bakdata.kafka.producer.ProducerBuilder; import lombok.NonNull; +import lombok.Value; /** * Provides all runtime configurations when running a {@link ConsumerProducerApp} * * @see ConsumerProducerApp#buildRunnable(ConsumerProducerBuilder) */ -public record ConsumerProducerBuilder(@NonNull ConsumerProducerTopicConfig topics, - @NonNull ConsumerBuilder consumerBuilder, - @NonNull ProducerBuilder producerBuilder) { +@Value +public class ConsumerProducerBuilder { + + @NonNull + ConsumerProducerTopicConfig topics; + @NonNull + ConsumerBuilder consumerBuilder; + @NonNull + ProducerBuilder producerBuilder; } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerCleanUpRunner.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerCleanUpRunner.java index d8a813c84..3244c4972 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerCleanUpRunner.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerCleanUpRunner.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -27,7 +27,6 @@ import com.bakdata.kafka.CleanUpRunner; import com.bakdata.kafka.consumer.ConsumerCleanUpConfiguration; import com.bakdata.kafka.consumer.ConsumerCleanUpRunner; -import com.bakdata.kafka.consumer.ConsumerTopicConfig; import com.bakdata.kafka.producer.ProducerCleanUpConfiguration; import com.bakdata.kafka.producer.ProducerCleanUpRunner; import com.bakdata.kafka.producer.ProducerTopicConfig; @@ -68,18 +67,17 @@ public static ConsumerProducerCleanUpRunner create(@NonNull final ConsumerProduc * @param kafkaProperties configuration to connect to Kafka admin tools * @param groupId group id of the consumer * @param configuration configuration for hooks that are called when running {@link #clean()} - * @return {@code ConsumerCleanUpRunner} + * @return {@code ConsumerProducerCleanUpRunner} */ public static ConsumerProducerCleanUpRunner create(@NonNull final ConsumerProducerTopicConfig topics, @NonNull final Map kafkaProperties, @NonNull final String groupId, @NonNull final StreamsCleanUpConfiguration configuration) { - final ConsumerTopicConfig consumerTopicConfig = topics.toConsumerTopicConfig(); final ProducerTopicConfig producerTopicConfig = topics.toProducerTopicConfig(); final ConsumerCleanUpConfiguration consumerConfig = configuration.toConsumerCleanUpConfiguration(); final ProducerCleanUpConfiguration producerConfig = configuration.toProducerCleanUpConfiguration(); final ConsumerCleanUpRunner consumerCleanUpRunner = - ConsumerCleanUpRunner.create(consumerTopicConfig, kafkaProperties, groupId, consumerConfig); + ConsumerCleanUpRunner.create(kafkaProperties, groupId, consumerConfig); final ProducerCleanUpRunner producerCleanUpRunner = ProducerCleanUpRunner.create(producerTopicConfig, kafkaProperties, producerConfig); return new ConsumerProducerCleanUpRunner(consumerCleanUpRunner, producerCleanUpRunner); diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerExecutionOptions.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerExecutionOptions.java index 280c28549..beaf1ec5e 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerExecutionOptions.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerExecutionOptions.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -25,58 +25,28 @@ package com.bakdata.kafka.consumerproducer; import com.bakdata.kafka.consumer.ConsumerExecutionOptions; -import java.time.Duration; +import com.bakdata.kafka.producer.ProducerExecutionOptions; import lombok.Builder; import lombok.Getter; -import lombok.NonNull; -import org.apache.kafka.clients.consumer.CloseOptions; -import org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; /** * Options to run a Kafka ConsumerProducer app */ @Builder +@Getter public final class ConsumerProducerExecutionOptions { /** - * Hook that is called after the {@link ConsumerProducerRunnable} is started + * Defines the execution options for the consumer part of the app. */ @Builder.Default - private final @NonNull java.util.function.Consumer onStart = runningConsumerProducer -> {}; + private final ConsumerExecutionOptions consumerExecutionOptions = ConsumerExecutionOptions.builder() + .build(); /** - * Defines if {@link ConsumerConfig#GROUP_INSTANCE_ID_CONFIG} is volatile. If it is configured and non-volatile, - * {@link Consumer#close(CloseOptions)} is called with - * {@link CloseOptions#groupMembershipOperation(GroupMembershipOperation)} set to - * {@link GroupMembershipOperation#REMAIN_IN_GROUP} + * Defines the execution options for the producer part of the app. */ @Builder.Default - private final boolean volatileGroupInstanceId = true; - - /** - * Defines {@link CloseOptions#timeout(Duration)} when calling {@link Consumer#close(CloseOptions)} - */ - @Builder.Default - private final Duration closeTimeout = Duration.ofMillis(Long.MAX_VALUE); - - /** - * Defines the timeout duration for the {@link Consumer#poll(Duration)} call - */ - @Builder.Default - @Getter - private final Duration pollTimeout = Duration.ofMillis(Long.MAX_VALUE); - - void onStart(final RunningConsumerProducer runningConsumerProducer) { - this.onStart.accept(runningConsumerProducer); - } - - ConsumerExecutionOptions toConsumerExecutionOptions() { - return ConsumerExecutionOptions.builder() - .volatileGroupInstanceId(this.volatileGroupInstanceId) - .pollTimeout(this.pollTimeout) - .closeTimeout(this.closeTimeout) - .build(); - } + private final ProducerExecutionOptions producerExecutionOptions = ProducerExecutionOptions.builder() + .build(); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerRunnable.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerRunnable.java index a8b4bfad7..78fba25a9 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerRunnable.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerRunnable.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -25,7 +25,6 @@ package com.bakdata.kafka.consumerproducer; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; /** * Produce or consume messages to or from Kafka @@ -36,9 +35,8 @@ public interface ConsumerProducerRunnable extends AutoCloseable { * Produce or Consume messages from Kafka * * @param consumerConfig configuration for the consumer - * @param producerConfig configuration for the producer */ - void run(ConsumerConfig consumerConfig, ProducerConfig producerConfig); + void run(ConsumerConfig consumerConfig); @Override default void close() { diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerRunner.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerRunner.java index 23c311469..ddae257bb 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerRunner.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerRunner.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -29,7 +29,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; /** * Runs a Kafka Consumer and Producer application @@ -40,8 +39,6 @@ public class ConsumerProducerRunner implements Runner { private final @NonNull ConsumerProducerRunnable runnable; private final @NonNull ConsumerConfig consumerConfig; - private final @NonNull ProducerConfig producerConfig; - private final @NonNull ConsumerProducerExecutionOptions executionOptions; @Override public void close() { @@ -51,19 +48,8 @@ public void close() { @Override public void run() { - log.info("Starting consumer and producer"); - this.runConsumerProducer(); - } - - private void runConsumerProducer() { - log.info("Starting Kafka ConsumerProducer and calling start hook"); - final RunningConsumerProducer runningConsumer = RunningConsumerProducer.builder() - .consumerProducerRunnable(this.runnable) - .consumerConfig(this.consumerConfig) - .producerConfig(this.producerConfig) - .build(); - this.executionOptions.onStart(runningConsumer); + log.info("Starting Kafka ConsumerProducer"); // Run Kafka application until it shuts down - this.runnable.run(this.consumerConfig, this.producerConfig); + this.runnable.run(this.consumerConfig); } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerTopicConfig.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerTopicConfig.java index af3f40d1a..0f1a4e3ed 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerTopicConfig.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ConsumerProducerTopicConfig.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -70,6 +70,11 @@ public class ConsumerProducerTopicConfig { Map labeledOutputTopics = emptyMap(); String errorTopic; + /** + * Convert this config to a {@link ConsumerTopicConfig} for the consumer part of the app. + * + * @return {@link ConsumerTopicConfig} + */ public ConsumerTopicConfig toConsumerTopicConfig() { return ConsumerTopicConfig.builder() .inputTopics(this.getInputTopics()) @@ -79,6 +84,10 @@ public ConsumerTopicConfig toConsumerTopicConfig() { .build(); } + /** + * Convert this config to a {@link ProducerTopicConfig} for the producer part of the app. + * @return {@link ProducerTopicConfig} + */ public ProducerTopicConfig toProducerTopicConfig() { return ProducerTopicConfig.builder() .outputTopic(this.getOutputTopic()) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/DefaultConsumerProducerRunnable.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/DefaultConsumerProducerRunnable.java index 1a2a3a318..8a050e053 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/DefaultConsumerProducerRunnable.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/DefaultConsumerProducerRunnable.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -25,41 +25,37 @@ package com.bakdata.kafka.consumerproducer; import com.bakdata.kafka.consumer.ConsumerRunnable; -import lombok.AllArgsConstructor; -import lombok.Getter; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -@AllArgsConstructor + +/** + * Creates a {@link ConsumerProducerRunnable} using the provided {@link ConsumerRunnable} and {@link Producer}. + * + * @param type of keys produced by this runnable + * @param type of values produced by this runnable + */ +@RequiredArgsConstructor @Slf4j public class DefaultConsumerProducerRunnable implements ConsumerProducerRunnable { private final Producer producer; - @Getter private final ConsumerRunnable consumerRunnable; @Override - public void run(final ConsumerConfig consumerConfig, final ProducerConfig producerConfig) { + public void run(final ConsumerConfig consumerConfig) { this.consumerRunnable.run(consumerConfig); } @Override public void close() { - try { - log.debug("Closing consumer runnable"); - this.consumerRunnable.close(); - } catch (final RuntimeException e) { - log.warn("Error closing consumer runnable", e); - } + log.debug("Closing consumer runnable"); + this.consumerRunnable.close(); - try { - log.debug("Closing producer"); - this.producer.close(); - } catch (final RuntimeException e) { - log.warn("Error closing producer", e); - } + log.debug("Closing producer"); + this.producer.close(); log.info("ConsumerProducer was shut down gracefully"); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ExecutableConsumerProducerApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ExecutableConsumerProducerApp.java index 8c4c04a4a..bb325138a 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ExecutableConsumerProducerApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/ExecutableConsumerProducerApp.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -27,9 +27,7 @@ import com.bakdata.kafka.AppConfiguration; import com.bakdata.kafka.ExecutableApp; import com.bakdata.kafka.consumer.ConsumerBuilder; -import com.bakdata.kafka.consumer.ConsumerTopicConfig; import com.bakdata.kafka.producer.ProducerBuilder; -import com.bakdata.kafka.producer.ProducerTopicConfig; import com.bakdata.kafka.streams.StreamsCleanUpConfiguration; import java.util.Map; import lombok.AccessLevel; @@ -56,9 +54,9 @@ public class ExecutableConsumerProducerApp private final @NonNull String groupId; /** - * Create {@code ConsumerProducerCleanUpRunner} in order to clean application + * Create {@link ConsumerProducerCleanUpRunner} in order to clean application * - * @return {@code ConsumerProducerCleanUpRunner} + * @return {@link ConsumerProducerCleanUpRunner} */ @Override public ConsumerProducerCleanUpRunner createCleanUpRunner() { @@ -69,9 +67,9 @@ public ConsumerProducerCleanUpRunner createCleanUpRunner() { } /** - * Create {@code ConsumerProducerRunner} in order to run application + * Create {@link ConsumerProducerRunner} in order to run application * - * @return {@code ConsumerProducerRunner} + * @return {@link ConsumerProducerRunner} */ @Override public ConsumerProducerRunner createRunner() { @@ -82,7 +80,7 @@ public ConsumerProducerRunner createRunner() { @Override public ConsumerProducerRunner createRunner(final ConsumerProducerExecutionOptions options) { final ConsumerBuilder consumerBuilder = new ConsumerBuilder(this.topics.toConsumerTopicConfig(), - this.consumerProperties, options.toConsumerExecutionOptions()); + this.consumerProperties, options.getConsumerExecutionOptions()); final ProducerBuilder producerBuilder = new ProducerBuilder(this.topics.toProducerTopicConfig(), this.producerProperties); final ConsumerProducerBuilder @@ -90,9 +88,7 @@ public ConsumerProducerRunner createRunner(final ConsumerProducerExecutionOption final AppConfiguration configuration = this.createConfiguration(); this.app.setup(configuration); return new ConsumerProducerRunner(this.app.buildRunnable(consumerProducerBuilder), - this.getConsumerConfig(), - this.getProducerConfig(), - options); + this.getConsumerConfig()); } @Override diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/RunningConsumerProducer.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/RunningConsumerProducer.java deleted file mode 100644 index 5981d8521..000000000 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/RunningConsumerProducer.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2025 bakdata - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.bakdata.kafka.consumerproducer; - -import com.bakdata.kafka.consumer.ConsumerRunnable; -import lombok.Builder; -import lombok.NonNull; -import lombok.Value; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.ProducerConfig; - -/** - * A running {@link KafkaConsumer} instance along with its {@link ConsumerConfig} and - * {@link ConsumerRunnable} - * - * @see ConsumerProducerExecutionOptions#onStart(RunningConsumerProducer) - */ -@Builder -@Value -public class RunningConsumerProducer { - - @NonNull - ConsumerConfig consumerConfig; - @NonNull - ProducerConfig producerConfig; - @NonNull - ConsumerProducerRunnable consumerProducerRunnable; -} diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/SerializerDeserializerConfig.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/SerializerDeserializerConfig.java index 2855ad47b..2c866e729 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/SerializerDeserializerConfig.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumerproducer/SerializerDeserializerConfig.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -37,7 +37,7 @@ import org.apache.kafka.common.serialization.Serializer; /** - * Defines how to (de)serialize the data in a Kafka consumer or producer + * Defines how to (de-)serialize the data in a Kafka consumer or producer */ @RequiredArgsConstructor @With @@ -46,6 +46,22 @@ public class SerializerDeserializerConfig implements SerializationConfig { private final @NonNull SerializerConfig serializerConfig; private final @NonNull DeserializerConfig deserializerConfig; + /** + * Create a new {@code SerializerDeserializerConfig} + * + * @param keySerializer serializer for keys + * @param valueSerializer serializer for values + * @param keyDeserializer deserializer for keys + * @param valueDeserializer deserializer for values + */ + public SerializerDeserializerConfig(final @NonNull Class keySerializer, + final @NonNull Class valueSerializer, + final @NonNull Class keyDeserializer, + final @NonNull Class valueDeserializer) { + this.serializerConfig = new SerializerConfig(keySerializer, valueSerializer); + this.deserializerConfig = new DeserializerConfig(keyDeserializer, valueDeserializer); + } + @Override public Map createProperties() { return Stream.concat(Stream.of(this.serializerConfig.createProperties()), @@ -54,15 +70,8 @@ public Map createProperties() { .collect(Collectors.toMap( Map.Entry::getKey, Map.Entry::getValue, + // v1 and v2 are always different (v1, v2) -> v2 )); } - - public SerializerDeserializerConfig(final @NonNull Class keySerializer, - final @NonNull Class valueSerializer, - final @NonNull Class keyDeserializer, - final @NonNull Class valueDeserializer) { - this.serializerConfig = new SerializerConfig(keySerializer, valueSerializer); - this.deserializerConfig = new DeserializerConfig(keyDeserializer, valueDeserializer); - } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/producer/ConfiguredProducerApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/producer/ConfiguredProducerApp.java index 747d8bd16..4d5498df3 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/producer/ConfiguredProducerApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/producer/ConfiguredProducerApp.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -48,6 +48,16 @@ public class ConfiguredProducerApp implements ConfiguredA private final @NonNull T app; private final @NonNull ProducerAppConfiguration configuration; + /** + * Base configuration for all producer apps which includes + *
    +     * max.in.flight.requests.per.connection=1
    +     * acks=all
    +     * compression.type=gzip
    +     * 
    + * + * @return base configuration + */ public static Map createBaseConfig() { final Map kafkaConfig = new HashMap<>(); diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsExecutionOptions.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsExecutionOptions.java index 44104d7df..795eb62c9 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsExecutionOptions.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsExecutionOptions.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,13 +24,12 @@ package com.bakdata.kafka.streams; +import com.bakdata.kafka.CloseExecutionOptions; import java.time.Duration; -import java.util.Map; import java.util.function.Consumer; import java.util.function.Supplier; import lombok.Builder; import lombok.NonNull; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.CloseOptions; import org.apache.kafka.streams.KafkaStreams.StateListener; @@ -59,29 +58,19 @@ public class StreamsExecutionOptions { private final @NonNull Supplier uncaughtExceptionHandler = DefaultStreamsUncaughtExceptionHandler::new; /** - * Defines if {@link ConsumerConfig#GROUP_INSTANCE_ID_CONFIG} is volatile. If it is configured and non-volatile, - * {@link KafkaStreams#close(CloseOptions)} is called with {@link CloseOptions#leaveGroup(boolean)} disabled + * Defines the behavior when closing a Kafka Streams app using {@link KafkaStreams#close(CloseOptions)}. */ @Builder.Default - private final boolean volatileGroupInstanceId = true; + private final CloseExecutionOptions closeExecutionOptions = CloseExecutionOptions.builder() + .build(); /** * Defines {@link CloseOptions#timeout(Duration)} when calling {@link KafkaStreams#close(CloseOptions)} */ @Builder.Default private final Duration closeTimeout = Duration.ofMillis(Long.MAX_VALUE); - private static boolean isStaticMembershipDisabled(final Map originals) { - return originals.get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG) == null; - } - CloseOptions createCloseOptions(final StreamsConfig config) { - final boolean leaveGroup = this.shouldLeaveGroup(config.originals()); - return new CloseOptions().leaveGroup(leaveGroup).timeout(this.closeTimeout); - } - - boolean shouldLeaveGroup(final Map originals) { - final boolean staticMembershipDisabled = isStaticMembershipDisabled(originals); - return staticMembershipDisabled || this.volatileGroupInstanceId; + return this.closeExecutionOptions.createCloseOptions(config); } void onStart(final RunningStreams runningStreams) { diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/StreamsExecutionOptionsTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/CloseExecutionOptionsTest.java similarity index 85% rename from streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/StreamsExecutionOptionsTest.java rename to streams-bootstrap-core/src/test/java/com/bakdata/kafka/CloseExecutionOptionsTest.java index a3837a0bc..e694fda20 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/StreamsExecutionOptionsTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/CloseExecutionOptionsTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,7 +22,7 @@ * SOFTWARE. */ -package com.bakdata.kafka.streams; +package com.bakdata.kafka; import static java.util.Collections.emptyMap; import static org.assertj.core.api.Assertions.assertThat; @@ -31,18 +31,18 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.junit.jupiter.api.Test; -class StreamsExecutionOptionsTest { +class CloseExecutionOptionsTest { @Test void shouldLeaveGroup() { - final StreamsExecutionOptions options = StreamsExecutionOptions.builder() + final CloseExecutionOptions options = CloseExecutionOptions.builder() .build(); assertThat(options.shouldLeaveGroup(emptyMap())).isTrue(); } @Test void shouldNotLeaveGroup() { - final StreamsExecutionOptions options = StreamsExecutionOptions.builder() + final CloseExecutionOptions options = CloseExecutionOptions.builder() .volatileGroupInstanceId(false) .build(); assertThat(options.shouldLeaveGroup(Map.of( @@ -52,7 +52,7 @@ void shouldNotLeaveGroup() { @Test void shouldLeaveGroupWithVolatileGroupId() { - final StreamsExecutionOptions options = StreamsExecutionOptions.builder() + final CloseExecutionOptions options = CloseExecutionOptions.builder() .volatileGroupInstanceId(true) .build(); assertThat(options.shouldLeaveGroup(Map.of( diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumer/ConsumerCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumer/ConsumerCleanUpRunnerTest.java index 8f2dafcf8..963de5815 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumer/ConsumerCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumer/ConsumerCleanUpRunnerTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -31,13 +31,13 @@ import static com.bakdata.kafka.consumer.TestHelper.run; import static java.util.concurrent.CompletableFuture.runAsync; -import com.bakdata.kafka.CleanUpException; import com.bakdata.kafka.KafkaTest; import com.bakdata.kafka.KafkaTestClient; import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord; import com.bakdata.kafka.admin.AdminClientX; import com.bakdata.kafka.admin.ConsumerGroupsClient; import com.bakdata.kafka.admin.ConsumerGroupsClient.ConsumerGroupClient; +import com.bakdata.kafka.admin.KafkaAdminException; import com.bakdata.kafka.consumer.apps.StringConsumer; import com.bakdata.kafka.consumer.apps.StringPatternConsumer; import java.time.Duration; @@ -60,14 +60,14 @@ class ConsumerCleanUpRunnerTest extends KafkaTest { @InjectSoftAssertions private SoftAssertions softly; - static ConfiguredConsumerApp createStringApplication() { + static ConfiguredConsumerApp createStringApplication() { final ConsumerTopicConfig topics = ConsumerTopicConfig.builder() .inputTopics(List.of("input")) .build(); return new ConfiguredConsumerApp<>(new StringConsumer(), new ConsumerAppConfiguration(topics)); } - static ConfiguredConsumerApp createStringPatternApplication() { + static ConfiguredConsumerApp createStringPatternApplication() { final ConsumerTopicConfig topics = ConsumerTopicConfig.builder() .inputPattern(Pattern.compile(".*_topic")) .build(); @@ -82,8 +82,8 @@ private void assertSize(final Collection> records @Test void shouldDeleteConsumerGroup() { - try (final ConfiguredConsumerApp app = createStringApplication(); - final ExecutableConsumerApp executableApp = createExecutableApp(app, + try (final ConfiguredConsumerApp app = createStringApplication(); + final ExecutableConsumerApp executableApp = createExecutableApp(app, this.createConfig())) { final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic(app.getTopics().getInputTopics().get(0)); @@ -102,7 +102,7 @@ void shouldDeleteConsumerGroup() { new KeyValue<>("blub", "blub") ); - final StringConsumer stringConsumer = (StringConsumer) app.app(); + final StringConsumer stringConsumer = app.getApp(); run(executableApp); assertContent(this.softly, stringConsumer.getConsumedRecords(), expectedValues, @@ -131,8 +131,8 @@ void shouldDeleteConsumerGroup() { @Test void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() { - try (final ConfiguredConsumerApp app = createStringApplication(); - final ExecutableConsumerApp executableApp = createExecutableApp(app, + try (final ConfiguredConsumerApp app = createStringApplication(); + final ExecutableConsumerApp executableApp = createExecutableApp(app, this.createConfig())) { final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic(app.getTopics().getInputTopics().get(0)); @@ -151,7 +151,7 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() { new KeyValue<>("blub", "blub") ); - final StringConsumer stringConsumer = (StringConsumer) app.app(); + final StringConsumer stringConsumer = app.getApp(); run(executableApp); assertContent(this.softly, stringConsumer.getConsumedRecords(), expectedValues, @@ -179,8 +179,8 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() { @Test void shouldReprocessAlreadySeenRecords() { - try (final ConfiguredConsumerApp app = createStringApplication(); - final ExecutableConsumerApp executableApp = createExecutableApp(app, + try (final ConfiguredConsumerApp app = createStringApplication(); + final ExecutableConsumerApp executableApp = createExecutableApp(app, this.createConfig())) { final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic(app.getTopics().getInputTopics().get(0)); @@ -193,7 +193,7 @@ void shouldReprocessAlreadySeenRecords() { new SimpleProducerRecord<>("blub", "blub") )); - final StringConsumer stringConsumer = (StringConsumer) app.app(); + final StringConsumer stringConsumer = app.getApp(); run(executableApp); this.assertSize(stringConsumer.getConsumedRecords(), 3); @@ -212,8 +212,8 @@ void shouldReprocessAlreadySeenRecords() { @Test void shouldNotThrowExceptionOnMissingInputTopic() { - try (final ConfiguredConsumerApp app = createStringApplication(); - final ExecutableConsumerApp executableApp = createExecutableApp(app, + try (final ConfiguredConsumerApp app = createStringApplication(); + final ExecutableConsumerApp executableApp = createExecutableApp(app, this.createConfig())) { this.softly.assertThatCode(() -> clean(executableApp)).doesNotThrowAnyException(); } @@ -221,8 +221,9 @@ void shouldNotThrowExceptionOnMissingInputTopic() { @Test void shouldThrowExceptionOnResetterError() { - try (final ConfiguredConsumerApp app = createStringApplication(); - final ExecutableConsumerApp executableApp = createExecutableApp(app, this.createConfig()); + try (final ConfiguredConsumerApp app = createStringApplication(); + final ExecutableConsumerApp executableApp = createExecutableApp(app, + this.createConfig()); final ConsumerRunner runner = executableApp.createRunner()) { final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic(app.getTopics().getInputTopics().get(0)); @@ -231,15 +232,16 @@ void shouldThrowExceptionOnResetterError() { awaitActive(executableApp); // should throw exception because consumer group is still active this.softly.assertThatThrownBy(() -> reset(executableApp)) - .isInstanceOf(CleanUpException.class) - .hasMessageContaining("Error resetting application, consumer group is not empty"); + .isInstanceOf(KafkaAdminException.class) + .hasMessageContaining("Failed to reset offsets for consumer group %s: consumer group is not empty", + app.getUniqueGroupId()); } } @Test void shouldReprocessAlreadySeenRecordsWithPattern() { - try (final ConfiguredConsumerApp app = createStringPatternApplication(); - final ExecutableConsumerApp executableApp = createExecutableApp(app, + try (final ConfiguredConsumerApp app = createStringPatternApplication(); + final ExecutableConsumerApp executableApp = createExecutableApp(app, this.createConfig())) { final String topic = "input_topic"; final KafkaTestClient testClient = this.newTestClient(); @@ -253,7 +255,7 @@ void shouldReprocessAlreadySeenRecordsWithPattern() { new SimpleProducerRecord<>("blub", "blub") )); - final StringPatternConsumer stringConsumer = (StringPatternConsumer) app.app(); + final StringPatternConsumer stringConsumer = app.getApp(); run(executableApp); this.assertSize(stringConsumer.getConsumedRecords(), 3); @@ -272,8 +274,9 @@ void shouldReprocessAlreadySeenRecordsWithPattern() { @Test void shouldNotThrowExceptionOnResetIfConsumerGroupNotExists() { - try (final ConfiguredConsumerApp app = createStringApplication(); - final ExecutableConsumerApp executableApp = createExecutableApp(app, this.createConfig())) { + try (final ConfiguredConsumerApp app = createStringApplication(); + final ExecutableConsumerApp executableApp = createExecutableApp(app, + this.createConfig())) { // The app is not run so the consumer group is never created this.softly.assertThatCode(() -> reset(executableApp)).doesNotThrowAnyException(); } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumer/ConsumerExecutionOptionsTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumer/ConsumerExecutionOptionsTest.java deleted file mode 100644 index 0147cf526..000000000 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumer/ConsumerExecutionOptionsTest.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2025 bakdata - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.bakdata.kafka.consumer; - -import static java.util.Collections.emptyMap; -import static org.assertj.core.api.Assertions.assertThat; - -import java.util.Map; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.junit.jupiter.api.Test; - -class ConsumerExecutionOptionsTest { - - @Test - void shouldLeaveGroup() { - final ConsumerExecutionOptions options = ConsumerExecutionOptions.builder() - .build(); - assertThat(options.shouldLeaveGroup(emptyMap())).isTrue(); - } - - @Test - void shouldNotLeaveGroup() { - final ConsumerExecutionOptions options = ConsumerExecutionOptions.builder() - .volatileGroupInstanceId(false) - .build(); - assertThat(options.shouldLeaveGroup(Map.of( - ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "foo" - ))).isFalse(); - } - - @Test - void shouldLeaveGroupWithVolatileGroupId() { - final ConsumerExecutionOptions options = ConsumerExecutionOptions.builder() - .volatileGroupInstanceId(true) - .build(); - assertThat(options.shouldLeaveGroup(Map.of( - ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "foo" - ))).isTrue(); - } -} diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumer/ConsumerRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumer/ConsumerRunnerTest.java index 881010f60..d5ef19980 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumer/ConsumerRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumer/ConsumerRunnerTest.java @@ -50,8 +50,9 @@ class ConsumerRunnerTest extends KafkaTest { @Test void shouldRunApp() { - try (final ConfiguredConsumerApp app = createStringApplication(); - final ExecutableConsumerApp executableApp = createExecutableApp(app, this.createConfig()); + try (final ConfiguredConsumerApp app = createStringApplication(); + final ExecutableConsumerApp executableApp = createExecutableApp(app, + this.createConfig()); final ConsumerRunner runner = executableApp.createRunner()) { runAsync(runner); @@ -60,7 +61,7 @@ void shouldRunApp() { final SimpleProducerRecord simpleProducerRecord = new SimpleProducerRecord<>("foo", "bar"); this.writeInputTopic(app.getTopics().getInputTopics().get(0), simpleProducerRecord); - final StringConsumer stringConsumer = (StringConsumer) app.app(); + final StringConsumer stringConsumer = app.getApp(); awaitProcessing(executableApp); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumer/DefaultConsumerRunnableTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumer/DefaultConsumerRunnableTest.java index 64092977e..1e9cdcb5e 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumer/DefaultConsumerRunnableTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumer/DefaultConsumerRunnableTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -57,14 +57,14 @@ class DefaultConsumerRunnableTest extends KafkaTest { @InjectSoftAssertions private SoftAssertions softly; - static ConfiguredConsumerApp createStringApplication() { + static ConfiguredConsumerApp createStringApplication() { final ConsumerTopicConfig topics = ConsumerTopicConfig.builder() .inputTopics(List.of("input")) .build(); return createStringApplication(new ConsumerAppConfiguration(topics)); } - static ConfiguredConsumerApp createStringApplication(final ConsumerAppConfiguration configuration) { + static ConfiguredConsumerApp createStringApplication(final ConsumerAppConfiguration configuration) { return new ConfiguredConsumerApp<>(new StringConsumer(), configuration); } @@ -79,8 +79,9 @@ static ConfiguredConsumerApp createCustomProcessorConsumer( @Test void shouldRunProcessAndShutdownGracefully() { - try (final ConfiguredConsumerApp app = createStringApplication(); - final ExecutableConsumerApp executableApp = createExecutableApp(app, this.createConfig()); + try (final ConfiguredConsumerApp app = createStringApplication(); + final ExecutableConsumerApp executableApp = createExecutableApp(app, + this.createConfig()); final ConsumerRunner runner = executableApp.createRunner()) { final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic(app.getTopics().getInputTopics().get(0)); @@ -98,7 +99,7 @@ void shouldRunProcessAndShutdownGracefully() { new KeyValue<>("blub", "blub") ); - final StringConsumer stringConsumer = (StringConsumer) app.app(); + final StringConsumer stringConsumer = app.getApp(); runAsync(runner); awaitActive(executableApp); @@ -109,8 +110,9 @@ void shouldRunProcessAndShutdownGracefully() { @Test void shouldCommitOffsets() { - try (final ConfiguredConsumerApp app = createStringApplication(); - final ExecutableConsumerApp executableApp = createExecutableApp(app, this.createConfig()); + try (final ConfiguredConsumerApp app = createStringApplication(); + final ExecutableConsumerApp executableApp = createExecutableApp(app, + this.createConfig()); final ConsumerRunner runner = executableApp.createRunner()) { final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic(app.getTopics().getInputTopics().get(0)); @@ -203,9 +205,10 @@ void shouldSubscribeToInputPattern() { final ConsumerTopicConfig topics = ConsumerTopicConfig.builder() .inputPattern(Pattern.compile("inp.*")) .build(); - try (final ConfiguredConsumerApp app = createStringApplication( + try (final ConfiguredConsumerApp app = createStringApplication( new ConsumerAppConfiguration(topics)); - final ExecutableConsumerApp executableApp = createExecutableApp(app, this.createConfig()); + final ExecutableConsumerApp executableApp = createExecutableApp(app, + this.createConfig()); final ConsumerRunner runner = executableApp.createRunner()) { final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic("input"); @@ -217,7 +220,7 @@ void shouldSubscribeToInputPattern() { )); final List> expectedValues = List.of(new KeyValue<>("blub", "blub")); - final StringConsumer stringConsumer = (StringConsumer) app.app(); + final StringConsumer stringConsumer = app.getApp(); runAsync(runner); awaitActive(executableApp); @@ -231,9 +234,10 @@ void shouldSubscribeToLabeledInputPattern() { final ConsumerTopicConfig topics = ConsumerTopicConfig.builder() .labeledInputPatterns(Map.of("LABEL", Pattern.compile("inp.*"))) .build(); - try (final ConfiguredConsumerApp app = createStringApplication( + try (final ConfiguredConsumerApp app = createStringApplication( new ConsumerAppConfiguration(topics)); - final ExecutableConsumerApp executableApp = createExecutableApp(app, this.createConfig()); + final ExecutableConsumerApp executableApp = createExecutableApp(app, + this.createConfig()); final ConsumerRunner runner = executableApp.createRunner()) { final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic("input"); @@ -245,7 +249,7 @@ void shouldSubscribeToLabeledInputPattern() { )); final List> expectedValues = List.of(new KeyValue<>("blub", "blub")); - final StringConsumer stringConsumer = (StringConsumer) app.app(); + final StringConsumer stringConsumer = app.getApp(); runAsync(runner); awaitActive(executableApp); @@ -259,9 +263,10 @@ void shouldSubscribeToLabeledInputTopics() { final ConsumerTopicConfig topics = ConsumerTopicConfig.builder() .labeledInputTopics(Map.of("LABEL", List.of("input"))) .build(); - try (final ConfiguredConsumerApp app = createStringApplication( + try (final ConfiguredConsumerApp app = createStringApplication( new ConsumerAppConfiguration(topics)); - final ExecutableConsumerApp executableApp = createExecutableApp(app, this.createConfig()); + final ExecutableConsumerApp executableApp = createExecutableApp(app, + this.createConfig()); final ConsumerRunner runner = executableApp.createRunner()) { final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic("input"); @@ -273,7 +278,7 @@ void shouldSubscribeToLabeledInputTopics() { )); final List> expectedValues = List.of(new KeyValue<>("blub", "blub")); - final StringConsumer stringConsumer = (StringConsumer) app.app(); + final StringConsumer stringConsumer = app.getApp(); runAsync(runner); awaitActive(executableApp); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumer/TestHelper.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumer/TestHelper.java index fe8fca84f..d7aefeffd 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumer/TestHelper.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumer/TestHelper.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -59,7 +59,8 @@ public static void assertContent(final SoftAssertions softly, }); } - public static ExecutableConsumerApp createExecutableApp(final ConfiguredConsumerApp app, + public static ExecutableConsumerApp createExecutableApp( + final ConfiguredConsumerApp app, final RuntimeConfiguration runtimeConfiguration) { return app.withRuntimeConfiguration(runtimeConfiguration); } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/ConfiguredConsumerProducerAppTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/ConfiguredConsumerProducerAppTest.java index 4dbc46197..af2b2cf7e 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/ConfiguredConsumerProducerAppTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/ConfiguredConsumerProducerAppTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -183,7 +183,7 @@ public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder buil } @Override - public String getUniqueAppId(final ConsumerProducerAppConfiguration topics) { + public String getUniqueGroupId(final ConsumerProducerAppConfiguration topics) { return "app-id"; } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/ConsumerProducerCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/ConsumerProducerCleanUpRunnerTest.java index abd99a731..686083d18 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/ConsumerProducerCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/ConsumerProducerCleanUpRunnerTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -31,7 +31,6 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import com.bakdata.kafka.AppConfiguration; -import com.bakdata.kafka.CleanUpException; import com.bakdata.kafka.CleanUpRunner; import com.bakdata.kafka.ExecutableApp; import com.bakdata.kafka.HasTopicHooks.TopicHook; @@ -44,6 +43,7 @@ import com.bakdata.kafka.admin.AdminClientX; import com.bakdata.kafka.admin.ConsumerGroupsClient; import com.bakdata.kafka.admin.ConsumerGroupsClient.ConsumerGroupClient; +import com.bakdata.kafka.admin.KafkaAdminException; import com.bakdata.kafka.admin.TopicsClient; import com.bakdata.kafka.consumerproducer.apps.MirrorKeyWithAvroConsumerProducer; import com.bakdata.kafka.consumerproducer.apps.MirrorValueWithAvroConsumerProducer; @@ -97,7 +97,7 @@ private static void clean(final ExecutableApp app } } - static ConfiguredConsumerProducerApp createStringConsumerProducer() { + static ConfiguredConsumerProducerApp createStringConsumerProducer() { return new ConfiguredConsumerProducerApp<>(new StringConsumerProducer(), new ConsumerProducerAppConfiguration(TOPIC_CONFIG)); } @@ -132,16 +132,16 @@ public StreamsCleanUpConfiguration setupCleanUp( }, new ConsumerProducerAppConfiguration(TOPIC_CONFIG)); } - static ExecutableConsumerProducerApp createExecutableApp( - final ConfiguredConsumerProducerApp app, + static ExecutableConsumerProducerApp createExecutableApp( + final ConfiguredConsumerProducerApp app, final RuntimeConfiguration runtimeConfiguration) { return app.withRuntimeConfiguration(runtimeConfiguration); } @Test void shouldDeleteTopic() { - try (final ConfiguredConsumerProducerApp app = createStringConsumerProducer(); - final ExecutableConsumerProducerApp executableApp = createExecutableApp(app, + try (final ConfiguredConsumerProducerApp app = createStringConsumerProducer(); + final ExecutableConsumerProducerApp executableApp = createExecutableApp(app, this.createConfig())) { final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic(app.getTopics().getInputTopics().get(0)); @@ -179,8 +179,8 @@ void shouldDeleteTopic() { @Test void shouldDeleteConsumerGroup() { - try (final ConfiguredConsumerProducerApp app = createStringConsumerProducer(); - final ExecutableConsumerProducerApp executableApp = createExecutableApp(app, + try (final ConfiguredConsumerProducerApp app = createStringConsumerProducer(); + final ExecutableConsumerProducerApp executableApp = createExecutableApp(app, this.createConfig())) { final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic(app.getTopics().getInputTopics().get(0)); @@ -199,7 +199,7 @@ void shouldDeleteConsumerGroup() { new KeyValue<>("blub", "blub") ); - final StringConsumerProducer stringConsumer = (StringConsumerProducer) app.app(); + final StringConsumerProducer stringConsumer = app.getApp(); run(executableApp); this.assertContent(app.getTopics().getOutputTopic(), expectedValues, @@ -207,7 +207,7 @@ void shouldDeleteConsumerGroup() { try (final AdminClientX adminClient = testClient.admin()) { final ConsumerGroupClient consumerGroupClient = - adminClient.consumerGroups().group(app.getUniqueAppId()); + adminClient.consumerGroups().group(app.getUniqueGroupId()); this.softly.assertThat(consumerGroupClient.exists()) .as("Consumer group exists") .isTrue(); @@ -219,7 +219,7 @@ void shouldDeleteConsumerGroup() { try (final AdminClientX adminClient = testClient.admin()) { final ConsumerGroupClient consumerGroupClient = - adminClient.consumerGroups().group(app.getUniqueAppId()); + adminClient.consumerGroups().group(app.getUniqueGroupId()); this.softly.assertThat(consumerGroupClient.exists()) .as("Consumer group is deleted") .isFalse(); @@ -229,8 +229,8 @@ void shouldDeleteConsumerGroup() { @Test void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() { - try (final ConfiguredConsumerProducerApp app = createStringConsumerProducer(); - final ExecutableConsumerProducerApp executableApp = createExecutableApp(app, + try (final ConfiguredConsumerProducerApp app = createStringConsumerProducer(); + final ExecutableConsumerProducerApp executableApp = createExecutableApp(app, this.createConfig())) { final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic(app.getTopics().getOutputTopic()); @@ -255,7 +255,7 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() { try (final AdminClientX adminClient = testClient.admin()) { final ConsumerGroupsClient groups = adminClient.consumerGroups(); - this.softly.assertThat(groups.group(app.getUniqueAppId()).exists()) + this.softly.assertThat(groups.group(app.getUniqueGroupId()).exists()) .as("Consumer group exists") .isTrue(); } @@ -264,8 +264,8 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() { try (final AdminClientX adminClient = testClient.admin()) { final ConsumerGroupsClient groups = adminClient.consumerGroups(); - groups.group(app.getUniqueAppId()).delete(); - this.softly.assertThat(groups.group(app.getUniqueAppId()).exists()) + groups.group(app.getUniqueGroupId()).delete(); + this.softly.assertThat(groups.group(app.getUniqueGroupId()).exists()) .as("Consumer group is deleted") .isFalse(); } @@ -381,8 +381,8 @@ void shouldCallCleanUpHookForAllTopics() { @Test void shouldNotThrowExceptionOnMissingInputTopic() { - try (final ConfiguredConsumerProducerApp app = createStringConsumerProducer(); - final ExecutableConsumerProducerApp executableApp = createExecutableApp(app, + try (final ConfiguredConsumerProducerApp app = createStringConsumerProducer(); + final ExecutableConsumerProducerApp executableApp = createExecutableApp(app, this.createConfig())) { this.softly.assertThatCode(() -> clean(executableApp)).doesNotThrowAnyException(); } @@ -390,8 +390,8 @@ void shouldNotThrowExceptionOnMissingInputTopic() { @Test void shouldThrowExceptionOnResetterError() { - try (final ConfiguredConsumerProducerApp app = createStringConsumerProducer(); - final ExecutableConsumerProducerApp executableApp = createExecutableApp(app, + try (final ConfiguredConsumerProducerApp app = createStringConsumerProducer(); + final ExecutableConsumerProducerApp executableApp = createExecutableApp(app, this.createConfig()); final ConsumerProducerRunner runner = executableApp.createRunner()) { final KafkaTestClient testClient = this.newTestClient(); @@ -401,15 +401,16 @@ void shouldThrowExceptionOnResetterError() { awaitActive(executableApp); // should throw exception because consumer group is still active this.softly.assertThatThrownBy(() -> reset(executableApp)) - .isInstanceOf(CleanUpException.class) - .hasMessageContaining("Error resetting application, consumer group is not empty"); + .isInstanceOf(KafkaAdminException.class) + .hasMessageContaining("Failed to reset offsets for consumer group %s: consumer group is not empty", + app.getUniqueGroupId()); } } @Test void shouldReprocessAlreadySeenRecords() { - try (final ConfiguredConsumerProducerApp app = createStringConsumerProducer(); - final ExecutableConsumerProducerApp executableApp = createExecutableApp(app, + try (final ConfiguredConsumerProducerApp app = createStringConsumerProducer(); + final ExecutableConsumerProducerApp executableApp = createExecutableApp(app, this.createConfig())) { final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic(app.getTopics().getOutputTopic()); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/ConsumerProducerRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/ConsumerProducerRunnerTest.java index 6503d581b..a96dd7fc7 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/ConsumerProducerRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/ConsumerProducerRunnerTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -31,6 +31,7 @@ import com.bakdata.kafka.KafkaTest; import com.bakdata.kafka.KafkaTestClient; import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord; +import com.bakdata.kafka.consumerproducer.apps.StringConsumerProducer; import java.util.List; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; @@ -49,8 +50,8 @@ class ConsumerProducerRunnerTest extends KafkaTest { @Test void shouldRunApp() { - try (final ConfiguredConsumerProducerApp app = createStringConsumerProducer(); - final ExecutableConsumerProducerApp executableApp = createExecutableApp(app, + try (final ConfiguredConsumerProducerApp app = createStringConsumerProducer(); + final ExecutableConsumerProducerApp executableApp = createExecutableApp(app, this.createConfig()); final ConsumerProducerRunner runner = executableApp.createRunner()) { final KafkaTestClient testClient = this.newTestClient(); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/apps/MirrorKeyWithAvroConsumerProducer.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/apps/MirrorKeyWithAvroConsumerProducer.java index 798b7fd30..90742db18 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/apps/MirrorKeyWithAvroConsumerProducer.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/apps/MirrorKeyWithAvroConsumerProducer.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -54,19 +54,19 @@ public SerializerDeserializerConfig defaultSerializationConfig() { @Override public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder builder) { - final Consumer consumer = builder.consumerBuilder().createConsumer(); - builder.consumerBuilder().subscribeToAllTopics(consumer); - final Producer producer = builder.producerBuilder().createProducer(); - final ConsumerRunnable consumerRunnable = builder.consumerBuilder().createDefaultConsumerRunnable(consumer, + final Consumer consumer = builder.getConsumerBuilder().createConsumer(); + builder.getConsumerBuilder().subscribeToAllTopics(consumer); + final Producer producer = builder.getProducerBuilder().createProducer(); + final ConsumerRunnable consumerRunnable = builder.getConsumerBuilder().createDefaultConsumerRunnable(consumer, records -> records.forEach( consumerRecord -> - producer.send(new ProducerRecord<>(builder.topics().getOutputTopic(), + producer.send(new ProducerRecord<>(builder.getTopics().getOutputTopic(), consumerRecord.key(), consumerRecord.value())))); return new DefaultConsumerProducerRunnable<>(producer, consumerRunnable); } @Override - public String getUniqueAppId(final ConsumerProducerAppConfiguration topics) { + public String getUniqueGroupId(final ConsumerProducerAppConfiguration topics) { return "app-id"; } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/apps/MirrorValueWithAvroConsumerProducer.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/apps/MirrorValueWithAvroConsumerProducer.java index 7f42f13f7..97586d4f6 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/apps/MirrorValueWithAvroConsumerProducer.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/apps/MirrorValueWithAvroConsumerProducer.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -54,19 +54,19 @@ public SerializerDeserializerConfig defaultSerializationConfig() { @Override public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder builder) { - final Consumer consumer = builder.consumerBuilder().createConsumer(); - builder.consumerBuilder().subscribeToAllTopics(consumer); - final Producer producer = builder.producerBuilder().createProducer(); - final ConsumerRunnable consumerRunnable = builder.consumerBuilder().createDefaultConsumerRunnable(consumer, + final Consumer consumer = builder.getConsumerBuilder().createConsumer(); + builder.getConsumerBuilder().subscribeToAllTopics(consumer); + final Producer producer = builder.getProducerBuilder().createProducer(); + final ConsumerRunnable consumerRunnable = builder.getConsumerBuilder().createDefaultConsumerRunnable(consumer, records -> records.forEach( consumerRecord -> - producer.send(new ProducerRecord<>(builder.topics().getOutputTopic(), + producer.send(new ProducerRecord<>(builder.getTopics().getOutputTopic(), consumerRecord.key(), consumerRecord.value())))); return new DefaultConsumerProducerRunnable<>(producer, consumerRunnable); } @Override - public String getUniqueAppId(final ConsumerProducerAppConfiguration topics) { + public String getUniqueGroupId(final ConsumerProducerAppConfiguration topics) { return "app-id"; } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/apps/StringConsumerProducer.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/apps/StringConsumerProducer.java index c9a754c23..08c467bf0 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/apps/StringConsumerProducer.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/apps/StringConsumerProducer.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -51,19 +51,19 @@ public SerializerDeserializerConfig defaultSerializationConfig() { @Override public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder builder) { - final Producer producer = builder.producerBuilder().createProducer(); - final Consumer consumer = builder.consumerBuilder().createConsumer(); - builder.consumerBuilder().subscribeToAllTopics(consumer); + final Producer producer = builder.getProducerBuilder().createProducer(); + final Consumer consumer = builder.getConsumerBuilder().createConsumer(); + builder.getConsumerBuilder().subscribeToAllTopics(consumer); final ConsumerRunnable - consumerRunnable = builder.consumerBuilder().createDefaultConsumerRunnable(consumer, records -> + consumerRunnable = builder.getConsumerBuilder().createDefaultConsumerRunnable(consumer, records -> records.forEach(consumerRecord -> - producer.send(new ProducerRecord<>(builder.topics().getOutputTopic(), + producer.send(new ProducerRecord<>(builder.getTopics().getOutputTopic(), consumerRecord.key(), consumerRecord.value())))); return new DefaultConsumerProducerRunnable<>(producer, consumerRunnable); } @Override - public String getUniqueAppId(final ConsumerProducerAppConfiguration topics) { + public String getUniqueGroupId(final ConsumerProducerAppConfiguration topics) { return "app-id"; } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/apps/StringPatternConsumerProducer.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/apps/StringPatternConsumerProducer.java index a29d0334e..f20183b2b 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/apps/StringPatternConsumerProducer.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumerproducer/apps/StringPatternConsumerProducer.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -51,19 +51,19 @@ public SerializerDeserializerConfig defaultSerializationConfig() { @Override public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder builder) { - final Consumer consumer = builder.consumerBuilder().createConsumer(); - builder.consumerBuilder().subscribeToAllTopics(consumer); - final Producer producer = builder.producerBuilder().createProducer(); - final ConsumerRunnable consumerRunnable = builder.consumerBuilder().createDefaultConsumerRunnable(consumer, + final Consumer consumer = builder.getConsumerBuilder().createConsumer(); + builder.getConsumerBuilder().subscribeToAllTopics(consumer); + final Producer producer = builder.getProducerBuilder().createProducer(); + final ConsumerRunnable consumerRunnable = builder.getConsumerBuilder().createDefaultConsumerRunnable(consumer, records -> records.forEach( consumerRecord -> - producer.send(new ProducerRecord<>(builder.topics().getOutputTopic(), + producer.send(new ProducerRecord<>(builder.getTopics().getOutputTopic(), consumerRecord.key(), consumerRecord.value())))); return new DefaultConsumerProducerRunnable<>(producer, consumerRunnable); } @Override - public String getUniqueAppId(final ConsumerProducerAppConfiguration topics) { + public String getUniqueGroupId(final ConsumerProducerAppConfiguration topics) { return "app-id"; } } diff --git a/streams-bootstrap-test/src/testFixtures/java/com/bakdata/kafka/streams/apps/SimpleConsumerProducerApp.java b/streams-bootstrap-test/src/testFixtures/java/com/bakdata/kafka/streams/apps/SimpleConsumerProducerApp.java index f8067f163..c35090d3c 100644 --- a/streams-bootstrap-test/src/testFixtures/java/com/bakdata/kafka/streams/apps/SimpleConsumerProducerApp.java +++ b/streams-bootstrap-test/src/testFixtures/java/com/bakdata/kafka/streams/apps/SimpleConsumerProducerApp.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -47,19 +47,19 @@ public SerializerDeserializerConfig defaultSerializationConfig() { @Override public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder builder) { - final Producer producer = builder.producerBuilder().createProducer(); - final Consumer consumer = builder.consumerBuilder().createConsumer(); - builder.consumerBuilder().subscribeToAllTopics(consumer); + final Producer producer = builder.getProducerBuilder().createProducer(); + final Consumer consumer = builder.getConsumerBuilder().createConsumer(); + builder.getConsumerBuilder().subscribeToAllTopics(consumer); final ConsumerRunnable - consumerRunnable = builder.consumerBuilder().createDefaultConsumerRunnable(consumer, records -> + consumerRunnable = builder.getConsumerBuilder().createDefaultConsumerRunnable(consumer, records -> records.forEach(consumerRecord -> - producer.send(new ProducerRecord<>(builder.topics().getOutputTopic(), + producer.send(new ProducerRecord<>(builder.getTopics().getOutputTopic(), consumerRecord.key(), consumerRecord.value())))); return new DefaultConsumerProducerRunnable<>(producer, consumerRunnable); } @Override - public String getUniqueAppId(final ConsumerProducerAppConfiguration topics) { + public String getUniqueGroupId(final ConsumerProducerAppConfiguration topics) { return "app-id"; } }