[BEAM-2852] Add support for Kafka as source/sink on Nexmark #5019
R: @echauchot
Thanks @aromanenko-dev!
echauchot left a comment
Thanks!
some small changes + one new small feature + include my PR
/**
 * Send {@code events} to Kafka.
 */
private void sinkEventsToKafka(PCollection<Event> events) {
Can you please wire this method up and add a COMBINED mode similar to what is done in Pub/Sub?
IMHO we should refactor the whole COMBINED mode:
- See NexmarkLauncher#createSource: it switches on the source type to configure the sink.
- It sends synthetic events to the sink when in COMBINED mode, but NexmarkUtils#COMBINED states that the combined mode is for "Both publish and consume, but as separate jobs".
Once it is refactored into something more coherent, implement it for Kafka.
What is important in the connection with the MOM:
- the ability to keep track of the generated events that led to a benchmark result
- the ability to read events from a topic
- the ability to write benchmark results to a topic
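To make the refactoring idea above more concrete, here is a minimal sketch (not the Nexmark code) of a mode enum that states explicitly whether a run publishes events, consumes them, or both. `BrokerRole`, `BrokerMode`, and `ModePlanner` are hypothetical names invented for illustration, and the string steps only stand in for the separate jobs that a real launcher would start.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

/** Hypothetical roles a benchmark job can play against the broker. */
enum BrokerRole { PUBLISH_EVENTS, CONSUME_EVENTS }

/** Hypothetical mode enum; the names are illustrative, not the Nexmark ones. */
enum BrokerMode {
  PUBLISH_ONLY(EnumSet.of(BrokerRole.PUBLISH_EVENTS)),
  SUBSCRIBE_ONLY(EnumSet.of(BrokerRole.CONSUME_EVENTS)),
  COMBINED(EnumSet.allOf(BrokerRole.class));

  private final Set<BrokerRole> roles;

  BrokerMode(Set<BrokerRole> roles) {
    this.roles = roles;
  }

  boolean publishes() {
    return roles.contains(BrokerRole.PUBLISH_EVENTS);
  }

  boolean consumes() {
    return roles.contains(BrokerRole.CONSUME_EVENTS);
  }
}

/** Hypothetical planner: each returned entry stands for one separate job. */
final class ModePlanner {
  private ModePlanner() {}

  static List<String> plan(BrokerMode mode, String eventsTopic) {
    List<String> jobs = new ArrayList<>();
    if (mode.publishes()) {
      jobs.add("publish:" + eventsTopic); // job that writes generated events
    }
    if (mode.consumes()) {
      jobs.add("consume:" + eventsTopic); // job that reads events back
    }
    return jobs;
  }
}
```

With this shape, the source-type switch only needs to ask `mode.publishes()` or `mode.consumes()`, so adding Kafka would not require another copy of the COMBINED logic. Whether that matches the eventual refactoring is left open here.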
 * Send {@code events} to Kafka.
 */
private void sinkEventsToKafka(PCollection<Event> events) {
  PTransform<PCollection<byte[]>, PDone> io = KafkaIO.<Long, byte[]>write()
This is a comment that I had on the previous PR. IMHO this is very weird code: it explicitly uses PTransform in place of the Write transform, and it also specifies a key and the associated coder even though there is no key in the input PCollection. To be quicker, I submitted a PR to your repo so that you can include the fix in this PR's branch. See aromanenko-dev#2
  throw new RuntimeException("Missing --bootstrapServers");
}

KafkaIO.Read<Long, byte[]> io = KafkaIO.<Long, byte[]>read()
private PCollection<Event> sourceEventsFromKafka(Pipeline p) {
  NexmarkUtils.console("Reading events from Kafka Topic %s", options.getKafkaSourceTopic());

  if (Strings.isNullOrEmpty(options.getBootstrapServers())) {
checkArgument(!Strings.isNullOrEmpty(options.getBootstrapServers()),
    "Missing --bootstrapServers");

PTransform<PCollection<String>, PDone> io = KafkaIO.<Long, String>write()
Same comment as above. See my PR #2 on your repo.
2e744e5 to 8f7d724
@aromanenko-dev please update the Gradle build to reflect the Maven one, squash this commit with the code style one, and run "Run Java PreCommit" to launch the Gradle build. LGTM once the Gradle build is green.
…n the PCollection
[BEAM-2852] Adjust gradle build with maven
ca3436c to 6df0add
Run Java PreCommit
Merging this PR; we will tackle COMBINED mode (refactoring this mode in Pub/Sub and applying it to Kafka) in another PR. I opened a ticket for that: https://issues.apache.org/jira/browse/BEAM-4048
Thanks! @aromanenko-dev
@vectorijk Thank you for your initial work!
Allows using Kafka as a source/sink for the Nexmark benchmark.
Based on original implementation of #3937 by @vectorijk