Skip to content
Draft
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand All @@ -24,21 +24,19 @@

package com.bakdata.kafka.consumer;

import com.bakdata.kafka.CloseExecutionOptions;
import com.bakdata.kafka.KafkaApplication;
import com.bakdata.kafka.mixin.ConsumerOptions;
import com.bakdata.kafka.mixin.InputOptions;
import java.time.Duration;
import java.util.Optional;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Delegate;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import picocli.CommandLine.Command;
import picocli.CommandLine.Mixin;
import picocli.CommandLine.Option;


/**
Expand Down Expand Up @@ -72,10 +70,6 @@ public abstract class KafkaConsumerApplication<T extends ConsumerApp> extends
@Mixin
@Delegate
private ConsumerOptions consumerOptions = new ConsumerOptions();
@Option(names = {"--poll-timeout"},
description = "The maximum time to block in the consumer poll loop. Examples: 'PT0.1S', 'PT2S', 'PT1M'.",
defaultValue = "PT0.1S")
private Duration pollTimeout = Duration.ofMillis(100);

/**
* Reset the Kafka Consumer application. Additionally, delete the consumer group.
Expand All @@ -101,8 +95,9 @@ public void reset() {
@Override
public final Optional<ConsumerExecutionOptions> createExecutionOptions() {
final ConsumerExecutionOptions executionOptions = ConsumerExecutionOptions.builder()
.volatileGroupInstanceId(this.isVolatileGroupInstanceId())
.onStart(this::onConsumerStart)
.closeExecutionOptions(CloseExecutionOptions.builder()
.volatileGroupInstanceId(this.isVolatileGroupInstanceId())
.build())
.pollTimeout(this.getPollTimeout())
.build();
return Optional.of(executionOptions);
Expand All @@ -128,13 +123,4 @@ public final ConfiguredConsumerApp<T> createConfiguredApp(final T app,
public ConsumerAppConfiguration createConfiguration(final ConsumerTopicConfig topics) {
return new ConsumerAppConfiguration(topics, this.getGroupId());
}

/**
* Called after starting Kafka Consumer
*
* @param runningConsumer running {@link ConsumerRunnable} instance along with its {@link ConsumerConfig}
*/
protected void onConsumerStart(final RunningConsumer runningConsumer) {
// do nothing by default
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -29,7 +29,7 @@
import lombok.RequiredArgsConstructor;

/**
* {@code KafkaConsumerApplication} without any additional configuration options.
* {@link KafkaConsumerApplication} without any additional configuration options.
*
* @param <T> type of {@link ConsumerApp} created by this application
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand All @@ -24,11 +24,14 @@

package com.bakdata.kafka.consumerproducer;

import com.bakdata.kafka.CloseExecutionOptions;
import com.bakdata.kafka.KafkaApplication;
import com.bakdata.kafka.consumer.ConsumerExecutionOptions;
import com.bakdata.kafka.mixin.ConsumerOptions;
import com.bakdata.kafka.mixin.ErrorOptions;
import com.bakdata.kafka.mixin.InputOptions;
import com.bakdata.kafka.mixin.OutputOptions;
import com.bakdata.kafka.producer.ProducerExecutionOptions;
import java.util.Optional;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -94,10 +97,9 @@ public void clean() {
}

/**
* Clear all state stores and consumer group offsets associated with the Kafka ConsumerProducer application.
* Reset consumer group offsets associated with the Kafka ConsumerProducer application.
*/
@Command(description = "Clear all state stores, consumer group offsets, and internal topics associated with the "
+ "Kafka ConsumerProducer application.")
@Command(description = "Reset consumer group offsets associated with the Kafka ConsumerProducer application.")
public void reset() {
this.prepareClean();
try (final CleanableApp<ConsumerProducerCleanUpRunner> app = this.createCleanableApp()) {
Expand All @@ -108,7 +110,17 @@ public void reset() {

@Override
public final Optional<ConsumerProducerExecutionOptions> createExecutionOptions() {
return Optional.empty();
final ConsumerProducerExecutionOptions executionOptions = ConsumerProducerExecutionOptions.builder()
.consumerExecutionOptions(ConsumerExecutionOptions.builder()
.closeExecutionOptions(CloseExecutionOptions.builder()
.volatileGroupInstanceId(this.isVolatileGroupInstanceId())
.build())
.pollTimeout(this.getPollTimeout())
.build())
.producerExecutionOptions(ProducerExecutionOptions.builder()
.build())
.build();
return Optional.of(executionOptions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -29,7 +29,7 @@
import lombok.RequiredArgsConstructor;

/**
* {@code KafkaConsumerProducerApplication} without any additional configuration options.
* {@link KafkaConsumerProducerApplication} without any additional configuration options.
*
* @param <T> type of {@link ConsumerProducerApp} created by this application
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@

package com.bakdata.kafka.mixin;

import java.time.Duration;
import lombok.Data;
import picocli.CommandLine;
import picocli.CommandLine.Option;

/**
* Shared CLI options to configure Kafka Consumer applications.
Expand All @@ -38,4 +40,7 @@ public class ConsumerOptions {
@CommandLine.Option(names = "--group-id",
description = "Unique identifier for the Kafka Consumer applications, used as 'group.id'.")
private String groupId;
@Option(names = {"--poll-timeout"},
description = "The maximum time to block in the consumer poll loop. Examples: 'PT0.1S', 'PT2S', 'PT1M'.")
private Duration pollTimeout = Duration.ofSeconds(10L);
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -73,7 +73,9 @@ public void clean() {

@Override
public final Optional<ProducerExecutionOptions> createExecutionOptions() {
return Optional.empty();
final ProducerExecutionOptions executionOptions = ProducerExecutionOptions.builder()
.build();
return Optional.of(executionOptions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand All @@ -24,6 +24,7 @@

package com.bakdata.kafka.streams;

import com.bakdata.kafka.CloseExecutionOptions;
import com.bakdata.kafka.KafkaApplication;
import com.bakdata.kafka.mixin.ErrorOptions;
import com.bakdata.kafka.mixin.InputOptions;
Expand Down Expand Up @@ -116,7 +117,9 @@ public void reset() {
@Override
public final Optional<StreamsExecutionOptions> createExecutionOptions() {
final StreamsExecutionOptions options = StreamsExecutionOptions.builder()
.volatileGroupInstanceId(this.isVolatileGroupInstanceId())
.closeExecutionOptions(CloseExecutionOptions.builder()
.volatileGroupInstanceId(this.isVolatileGroupInstanceId())
.build())
.uncaughtExceptionHandler(this::createUncaughtExceptionHandler)
.stateListener(this::createStateListener)
.onStart(this::onStreamsStart)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -65,7 +65,7 @@ public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder buil
}

@Override
public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) {
public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) {
throw new UnsupportedOperationException();
}

Expand Down Expand Up @@ -97,7 +97,7 @@ public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder buil
}

@Override
public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) {
public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) {
throw new UnsupportedOperationException();
}

Expand Down Expand Up @@ -125,7 +125,7 @@ public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder buil
}

@Override
public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) {
public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) {
throw new UnsupportedOperationException();
}

Expand Down Expand Up @@ -161,7 +161,7 @@ public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder buil
}

@Override
public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) {
public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) {
throw new UnsupportedOperationException();
}

Expand Down Expand Up @@ -191,11 +191,11 @@ public ConsumerProducerApp createApp() {
return new ConsumerProducerApp() {
@Override
public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder builder) {
return (consumerConfig, producerConfig) -> {};
return consumerConfig -> {};
}

@Override
public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) {
public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) {
return "my-id";
}

Expand Down Expand Up @@ -223,13 +223,13 @@ void shouldExitWithErrorInBuildRunnable() {
() -> new ConsumerProducerApp() {
@Override
public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder builder) {
return (consumerConfig, producerConfig) -> {
return consumerConfig -> {
throw new RuntimeException("Error building runnable");
};
}

@Override
public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) {
public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) {
return "app";
}

Expand Down Expand Up @@ -264,8 +264,8 @@ void shouldExitWithSuccessCodeOnShutdown() {
() -> new ConsumerProducerApp() {
@Override
public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder builder) {
return (consumerConfig, producerConfig) -> {
try (final Producer<String, String> producer = builder.producerBuilder()
return consumerConfig -> {
try (final Producer<String, String> producer = builder.getProducerBuilder()
.createProducer()) {
final ProducerRecord<String, String> producerRecord =
new ProducerRecord<>(output, "foo", "bar");
Expand All @@ -275,7 +275,7 @@ public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder buil
}

@Override
public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) {
public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) {
return "app";
}

Expand Down Expand Up @@ -321,7 +321,7 @@ public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder buil
}

@Override
public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) {
public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) {
throw new UnsupportedOperationException();
}

Expand Down Expand Up @@ -351,7 +351,7 @@ public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder buil
}

@Override
public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) {
public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -60,19 +60,19 @@ public ConsumerProducerApp createApp() {

@Override
public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder builder) {
final Producer<String, String> producer = builder.producerBuilder().createProducer();
final Consumer<String, String> consumer = builder.consumerBuilder().createConsumer();
builder.consumerBuilder().subscribeToAllTopics(consumer);
final Producer<String, String> producer = builder.getProducerBuilder().createProducer();
final Consumer<String, String> consumer = builder.getConsumerBuilder().createConsumer();
builder.getConsumerBuilder().subscribeToAllTopics(consumer);
final ConsumerRunnable consumerRunnable =
builder.consumerBuilder().createDefaultConsumerRunnable(consumer, records ->
builder.getConsumerBuilder().createDefaultConsumerRunnable(consumer, records ->
records.forEach(consumerRecord ->
producer.send(new ProducerRecord<>(builder.topics().getOutputTopic(),
producer.send(new ProducerRecord<>(builder.getTopics().getOutputTopic(),
consumerRecord.key(), consumerRecord.value()))));
return new DefaultConsumerProducerRunnable<>(producer, consumerRunnable);
}

@Override
public String getUniqueAppId(final ConsumerProducerAppConfiguration configuration) {
public String getUniqueGroupId(final ConsumerProducerAppConfiguration configuration) {
return CloseFlagApp.this.getClass().getSimpleName() + "-" + configuration.getTopics().getOutputTopic();
}

Expand Down
Loading
Loading