Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment
R: @chamikaramj
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control |
Map<String, Object> remappedConfig = new HashMap<>();
for (Map.Entry<String, Object> entry : configMap.entrySet()) {
  String paramName = entry.getKey();
  if (mapping.containsKey(paramName)) {
Right now any parameters that aren't recognized are silently dropped. I think we should be at least warning here.
The mapping contains parameter names that should be updated. If a parameter is not included in the mapping, we assume its original name is correct.
But agreed some logging would be helpful. I'll add a log showing the final Row configuration used to build the underlying transform
P.S. Alternatively, we can make it mandatory to include all parameter names in the mapping, regardless of whether they need to be updated. (I'm open to this option, but I find it unnecessarily strict.)
Ah got it - I think leaving it is fine, thanks
> But agreed some logging would be helpful. I'll add a log showing the final Row configuration used to build the underlying transform

Sounds good, thank you!
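The behavior agreed on in this thread (rename only the parameters listed in the mapping, pass every other parameter through under its original name, then log the final configuration) can be sketched roughly as follows. This is a minimal standalone illustration, not the code in this PR: the class name `ParamRemapSketch`, the `remap` helper, and the sample parameter names are all hypothetical, and plain maps stand in for the Row configuration.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; names here are illustrative and not taken from the PR.
public class ParamRemapSketch {

  /** Renames keys that appear in the mapping and keeps all other keys unchanged. */
  public static Map<String, Object> remap(
      Map<String, Object> configMap, Map<String, String> mapping) {
    Map<String, Object> remapped = new LinkedHashMap<>();
    for (Map.Entry<String, Object> entry : configMap.entrySet()) {
      String paramName = entry.getKey();
      // Fall back to the original name when no rename is registered for it.
      String targetName = mapping.getOrDefault(paramName, paramName);
      remapped.put(targetName, entry.getValue());
    }
    return remapped;
  }

  public static void main(String[] args) {
    Map<String, String> mapping = new HashMap<>();
    mapping.put("old_name", "new_name"); // assumed rename, for illustration only

    Map<String, Object> config = new LinkedHashMap<>();
    config.put("old_name", "value-a");
    config.put("topic", "events"); // not in the mapping, so it is kept as-is

    // Stand-in for logging the final configuration used to build the transform.
    System.out.println("Final configuration: " + remap(config, mapping));
  }
}
```

In this sketch no parameter is dropped: anything absent from the mapping is assumed to already have the correct name, which matches the reasoning given above.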
// The config Row object will be used to build the underlying SchemaTransform.
// If a mapping for the SchemaTransform exists, we use it to update parameter names and align
// with the underlying config schema
Map<String, String> mapping = MAPPINGS.get(config.getTransformIdentifier());
What happens if the transform identifier isn't found? It seems like Iceberg already falls into that case. I'm also worried about typos, though, if folks don't use the enum (maybe rare).
Similar to the reasoning above, a mapping exists only if the transform needs one.
In the Iceberg case, we are already producing a configuration schema with snake_case convention, so no remapping of names is needed.
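One way to picture the "a mapping exists only if the transform needs one" rule is a lookup that treats a missing entry as "no renames". The sketch below is an assumption-laden illustration, not the PR's implementation: `MappingLookupSketch`, `mappingFor`, and the identifier strings are hypothetical, and only the `MAPPINGS` name echoes the snippet under review.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; identifiers and parameter names are made up for illustration.
public class MappingLookupSketch {

  // Registry of per-transform renames. Only transforms that need renames are registered.
  static final Map<String, Map<String, String>> MAPPINGS = new HashMap<>();

  static {
    MAPPINGS.put("example:needs_renames", Collections.singletonMap("old_name", "new_name"));
    // A transform whose config schema already uses the expected names has no entry at all.
  }

  /** Returns the rename mapping for a transform, or an empty map when none is registered. */
  public static Map<String, String> mappingFor(String transformIdentifier) {
    Map<String, String> mapping = MAPPINGS.get(transformIdentifier);
    return mapping == null ? Collections.<String, String>emptyMap() : mapping;
  }

  public static void main(String[] args) {
    System.out.println(mappingFor("example:needs_renames"));
    System.out.println(mappingFor("example:no_mapping_needed"));
  }
}
```

Note that in this sketch a mistyped identifier is indistinguishable from a transform that simply has no mapping, so catching typos would have to happen wherever the identifier itself is validated.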
package org.apache.beam.sdk.io.kafka;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
It would also be good to add a basic Kafka IT (though this is about testing Managed as much as it is Kafka). I'm fine deferring that to a future PR where we can address this together with Iceberg or including it here, up to you
Done in #31362. We have existing integration tests for the Kafka SchemaTransforms, and I just switched them to use the Managed API instead (which ultimately uses the SchemaTransforms).
Onboarding KafkaIO to be available via Managed API