diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index 05bb1cdac4a8..684735bfbe22 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -967,7 +967,7 @@ def test_to_from_runner_api(self): schema=schema) # pylint: disable=expression-not-assigned - p | 'MyWriteToBigQuery' >> original + p | beam.Create([]) | 'MyWriteToBigQuery' >> original # Run the pipeline through to generate a pipeline proto from an empty # context. This ensures that the serialization code ran. diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index c011732f9352..022b7effcd0d 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -797,6 +797,11 @@ def apply( type_options = self._options.view_as(TypeOptions) if type_options.pipeline_type_check: transform.type_check_inputs(pvalueish) + if isinstance(pvalueish, pvalue.PBegin) and isinstance(transform, ParDo): + full_label = self._current_transform().full_label + raise TypeCheckError( + f"Transform '{full_label}' expects a PCollection as input. " + "Got a PBegin/Pipeline instead.") pvalueish_result = self.runner.apply(transform, pvalueish, self._options) diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index b18bc0d668e2..20aebdd7790f 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -702,6 +702,23 @@ def test_track_pcoll_unbounded_flatten(self): self.assertIs(pcoll2_unbounded.is_bounded, False) self.assertIs(merged.is_bounded, False) + def test_incompatible_pcollection_errmsg(self): + with pytest.raises(Exception, + match=r".*Map\(print\).*Got a PBegin/Pipeline instead."): + with beam.Pipeline() as pipeline: + _ = (pipeline | beam.Map(print)) + + class ParentTransform(PTransform): + def expand(self, pcoll): + return pcoll | beam.Map(print) + + with pytest.raises( + Exception, + match=r".*ParentTransform/Map\(print\).*Got a PBegin/Pipeline instead." + ): + with beam.Pipeline() as pipeline: + _ = (pipeline | ParentTransform()) + def test_incompatible_submission_and_runtime_envs_fail_pipeline(self): with mock.patch( 'apache_beam.transforms.environments.sdk_base_version_capability'