diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java index 658d1fc29e32..01e258f5f3cb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java @@ -53,6 +53,7 @@ public static class Configuration { private String topic; private @Nullable String idAttribute; private @Nullable String timestampAttribute; + private boolean publishWithOrderingKey = false; public void setTopic(String topic) { this.topic = topic; @@ -65,6 +66,10 @@ public void setIdLabel(@Nullable String idAttribute) { public void setTimestampAttribute(@Nullable String timestampAttribute) { this.timestampAttribute = timestampAttribute; } + + public void setPublishWithOrderingKey(Boolean publishWithOrderingKey) { + this.publishWithOrderingKey = publishWithOrderingKey != null && publishWithOrderingKey; + } } public static class WriteBuilder @@ -85,6 +90,9 @@ public PTransform, PDone> buildExternal(Configuration config if (config.timestampAttribute != null) { writeBuilder.setTimestampAttribute(config.timestampAttribute); } + if (config.publishWithOrderingKey) { + writeBuilder.setPublishWithOrderingKey(true); + } writeBuilder.setDynamicDestinations(false); return writeBuilder.build(); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index d62d294ed2a7..57005745044b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -1727,7 +1727,10 @@ public void startBundle(StartBundleContext c) throws IOException { this.pubsubClient = getPubsubClientFactory() .newClient( - getTimestampAttribute(), null, c.getPipelineOptions().as(PubsubOptions.class)); + getTimestampAttribute(), + null, + c.getPipelineOptions().as(PubsubOptions.class), + Write.this.getPubsubRootUrl()); } @ProcessElement diff --git a/sdks/python/apache_beam/io/external/gcp/pubsub.py b/sdks/python/apache_beam/io/external/gcp/pubsub.py index a2a3430f9a1a..dc171bb422b2 100644 --- a/sdks/python/apache_beam/io/external/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/external/gcp/pubsub.py @@ -117,6 +117,7 @@ def expand(self, pbegin): # this is not implemented yet on the Java side: # ('with_attributes', bool), ('timestamp_attribute', typing.Optional[str]), + ('publish_with_ordering_key', bool), ]) @@ -135,6 +136,7 @@ def __init__( with_attributes=False, id_label=None, timestamp_attribute=None, + publish_with_ordering_key=False, expansion_service=None): """Initializes ``WriteToPubSub``. @@ -150,18 +152,24 @@ def __init__( in a ReadFromPubSub PTransform to deduplicate messages. timestamp_attribute: If set, will set an attribute for each Cloud Pub/Sub message with the given name and the message's publish time as the value. + publish_with_ordering_key: If True, enables ordering key support when + publishing messages. The ordering key must be set on each + PubsubMessage via the ``ordering_key`` attribute. Requires + messages to be routed to the same region. """ self.params = WriteToPubsubSchema( topic=topic, id_label=id_label, # with_attributes=with_attributes, - timestamp_attribute=timestamp_attribute) + timestamp_attribute=timestamp_attribute, + publish_with_ordering_key=publish_with_ordering_key) self.expansion_service = expansion_service self.with_attributes = with_attributes def expand(self, pvalue): if self.with_attributes: - pcoll = pvalue | 'ToProto' >> Map(pubsub.WriteToPubSub.to_proto_str) + pcoll = pvalue | 'ToProto' >> Map( + lambda m: m._to_proto_str(for_publish=True)) else: pcoll = pvalue | 'ToProto' >> Map( lambda x: pubsub.PubsubMessage(x, {})._to_proto_str()) diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index 59eadee5538e..69effb960eeb 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -658,12 +658,22 @@ def _flush(self): # Deserialize the protobuf to get the original PubsubMessage pubsub_msg = PubsubMessage._from_proto_str(elem) - # Publish with the correct data and attributes + # Publish with the correct data, attributes, and ordering_key if self.with_attributes and pubsub_msg.attributes: future = self._pub_client.publish( - self._topic, pubsub_msg.data, **pubsub_msg.attributes) + self._topic, + pubsub_msg.data, + ordering_key=pubsub_msg.ordering_key + if pubsub_msg.ordering_key else '', + **pubsub_msg.attributes) else: - future = self._pub_client.publish(self._topic, pubsub_msg.data) + if pubsub_msg.ordering_key: + future = self._pub_client.publish( + self._topic, + pubsub_msg.data, + ordering_key=pubsub_msg.ordering_key) + else: + future = self._pub_client.publish(self._topic, pubsub_msg.data) futures.append(future) diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py index 8387fe734fc1..cf8323c45187 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py @@ -305,6 +305,51 @@ def test_batch_write_with_attributes(self): """Test WriteToPubSub in batch mode with attributes.""" self._test_batch_write(with_attributes=True) + @pytest.mark.it_postcommit + def test_batch_write_with_ordering_key(self): + """Test WriteToPubSub in batch mode with ordering keys.""" + from apache_beam.options.pipeline_options import PipelineOptions + from apache_beam.options.pipeline_options import StandardOptions + from apache_beam.transforms import Create + + # Create test messages with ordering keys + test_messages = [ + PubsubMessage( + b'order_data001', {'attr': 'value1'}, ordering_key='key1'), + PubsubMessage( + b'order_data002', {'attr': 'value2'}, ordering_key='key1'), + PubsubMessage( + b'order_data003', {'attr': 'value3'}, ordering_key='key2') + ] + + pipeline_options = PipelineOptions() + pipeline_options.view_as(StandardOptions).streaming = False + + with TestPipeline(options=pipeline_options) as p: + messages = p | 'CreateMessages' >> Create(test_messages) + _ = messages | 'WriteToPubSub' >> WriteToPubSub( + self.output_topic.name, with_attributes=True) + + # Verify messages were published + time.sleep(10) + + response = self.sub_client.pull( + request={ + "subscription": self.output_sub.name, + "max_messages": 10, + }) + + self.assertEqual(len(response.received_messages), len(test_messages)) + + # Verify ordering keys were preserved + for received_message in response.received_messages: + self.assertIn('ordering_key', dir(received_message.message)) + self.sub_client.acknowledge( + request={ + "subscription": self.output_sub.name, + "ack_ids": [received_message.ack_id], + }) + if __name__ == '__main__': logging.getLogger().setLevel(logging.DEBUG) diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index 5650e920e635..14b361ae45fa 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -1098,6 +1098,71 @@ def test_write_to_pubsub_with_attributes_no_overwrite(self, unused_mock): Lineage.query(p.result.metrics(), Lineage.SINK), set(["pubsub:topic:fakeprj.a_topic"])) + def test_write_messages_with_ordering_key(self, mock_pubsub): + """Test WriteToPubSub with ordering_key in messages.""" + data = b'data' + ordering_key = 'order-123' + attributes = {'key': 'value'} + payloads = [PubsubMessage(data, attributes, ordering_key=ordering_key)] + + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + with TestPipeline(options=options) as p: + _ = ( + p + | Create(payloads) + | WriteToPubSub( + 'projects/fakeprj/topics/a_topic', with_attributes=True)) + + # Verify that publish was called with ordering_key + mock_pubsub.return_value.publish.assert_called() + call_args = mock_pubsub.return_value.publish.call_args + + # Check that ordering_key was passed as a keyword argument + self.assertIn('ordering_key', call_args.kwargs) + self.assertEqual(call_args.kwargs['ordering_key'], ordering_key) + + def test_write_messages_with_ordering_key_no_attributes(self, mock_pubsub): + """Test WriteToPubSub with ordering_key but no attributes.""" + data = b'data' + ordering_key = 'order-456' + payloads = [PubsubMessage(data, None, ordering_key=ordering_key)] + + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + with TestPipeline(options=options) as p: + _ = ( + p + | Create(payloads) + | WriteToPubSub( + 'projects/fakeprj/topics/a_topic', with_attributes=True)) + + # Verify that publish was called with ordering_key + mock_pubsub.return_value.publish.assert_called() + call_args = mock_pubsub.return_value.publish.call_args + + # Check that ordering_key was passed + self.assertIn('ordering_key', call_args.kwargs) + self.assertEqual(call_args.kwargs['ordering_key'], ordering_key) + + def test_write_messages_without_ordering_key(self, mock_pubsub): + """Test WriteToPubSub without ordering_key (backward compatibility).""" + data = b'data' + attributes = {'key': 'value'} + payloads = [PubsubMessage(data, attributes)] # No ordering_key + + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + with TestPipeline(options=options) as p: + _ = ( + p + | Create(payloads) + | WriteToPubSub( + 'projects/fakeprj/topics/a_topic', with_attributes=True)) + + # Verify that publish was called + mock_pubsub.return_value.publish.assert_called() + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO)