diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index 00e0c3c25433..8675e9535061 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 27 + "modification": 28 } diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index e527185bd571..fe5728c0f16e 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -85,9 +85,7 @@ # occurs. from apache_beam.internal.dill_pickler import dill except ImportError: - # We fall back to using the stock dill library in tests that don't use the - # full Python SDK. - import dill + dill = None __all__ = [ 'Coder', @@ -900,6 +898,13 @@ def to_type_hint(self): class DillCoder(_PickleCoderBase): """Coder using dill's pickle functionality.""" + def __init__(self): + if not dill: + raise RuntimeError( + "This pipeline contains a DillCoder which requires " + "the dill package. Install the dill package with the dill extra " + "e.g. apache-beam[dill]") + def _create_impl(self): return coder_impl.CallbackCoderImpl(maybe_dill_dumps, maybe_dill_loads) diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index 587e5d87522e..1ae9a32790ac 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -59,6 +59,11 @@ except ImportError: dataclasses = None # type: ignore +try: + import dill +except ImportError: + dill = None + MyNamedTuple = collections.namedtuple('A', ['x', 'y']) # type: ignore[name-match] AnotherNamedTuple = collections.namedtuple('AnotherNamedTuple', ['x', 'y']) MyTypedNamedTuple = NamedTuple('MyTypedNamedTuple', [('f1', int), ('f2', str)]) @@ -116,6 +121,7 @@ class UnFrozenDataClass: # These tests need to all be run in the same process due to the asserts # in tearDownClass. @pytest.mark.no_xdist +@pytest.mark.uses_dill class CodersTest(unittest.TestCase): # These class methods ensure that we test each defined coder in both @@ -173,6 +179,9 @@ def tearDownClass(cls): coders.BigIntegerCoder, # tested in DecimalCoder coders.TimestampPrefixingOpaqueWindowCoder, ]) + if not dill: + standard -= set( + [coders.DillCoder, coders.DeterministicFastPrimitivesCoder]) cls.seen_nested -= set( [coders.ProtoCoder, coders.ProtoPlusCoder, CustomCoder]) assert not standard - cls.seen, str(standard - cls.seen) @@ -241,8 +250,13 @@ def test_memoizing_pickle_coder(self): param(compat_version="2.67.0"), ]) def test_deterministic_coder(self, compat_version): + typecoders.registry.update_compatibility_version = compat_version coder = coders.FastPrimitivesCoder() + if not dill and compat_version: + with self.assertRaises(RuntimeError): + coder.as_deterministic_coder(step_label="step") + self.skipTest('Dill not installed') deterministic_coder = coder.as_deterministic_coder(step_label="step") self.check_coder(deterministic_coder, *self.test_values_deterministic) @@ -321,6 +335,11 @@ def test_deterministic_map_coder_is_update_compatible(self, compat_version): coder = coders.MapCoder( coders.FastPrimitivesCoder(), coders.FastPrimitivesCoder()) + if not dill and compat_version: + with self.assertRaises(RuntimeError): + coder.as_deterministic_coder(step_label="step") + self.skipTest('Dill not installed') + deterministic_coder = coder.as_deterministic_coder(step_label="step") assert isinstance( @@ -331,6 +350,11 @@ def test_deterministic_map_coder_is_update_compatible(self, compat_version): self.check_coder(deterministic_coder, *values) def test_dill_coder(self): + if not dill: + with self.assertRaises(RuntimeError): + coders.DillCoder() + self.skipTest('Dill not installed') + cell_value = (lambda x: lambda: x)(0).__closure__[0] self.check_coder(coders.DillCoder(), 'a', 1, cell_value) self.check_coder( @@ -661,6 +685,8 @@ def test_param_windowed_value_coder(self): def test_cross_process_encoding_of_special_types_is_deterministic( self, compat_version): """Test cross-process determinism for all special deterministic types""" + if compat_version: + pytest.importorskip("dill") if sys.executable is None: self.skipTest('No Python interpreter found') diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index 256f88c5453f..e7b404fdc47c 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -29,10 +29,15 @@ """ from apache_beam.internal import cloudpickle_pickler -from apache_beam.internal import dill_pickler + +try: + from apache_beam.internal import dill_pickler +except ImportError: + dill_pickler = None # type: ignore[assignment] USE_CLOUDPICKLE = 'cloudpickle' USE_DILL = 'dill' +USE_DILL_UNSAFE = 'dill_unsafe' DEFAULT_PICKLE_LIB = USE_CLOUDPICKLE desired_pickle_lib = cloudpickle_pickler @@ -74,14 +79,29 @@ def load_session(file_path): def set_library(selected_library=DEFAULT_PICKLE_LIB): """ Sets pickle library that will be used. """ global desired_pickle_lib + + if selected_library == USE_DILL and not dill_pickler: + raise ImportError( + "Pipeline option pickle_library=dill is set, but dill is not " + "installed. Install apache-beam with the dill extras package " + "e.g. apache-beam[dill].") + if selected_library == USE_DILL_UNSAFE and not dill_pickler: + raise ImportError( + "Pipeline option pickle_library=dill_unsafe is set, but dill is not " + "installed. Install dill in job submission and runtime environments.") + + is_currently_dill = (desired_pickle_lib == dill_pickler) + dill_is_requested = ( + selected_library == USE_DILL or selected_library == USE_DILL_UNSAFE) + # If switching to or from dill, update the pickler hook overrides. - if (selected_library == USE_DILL) != (desired_pickle_lib == dill_pickler): + if is_currently_dill != dill_is_requested: dill_pickler.override_pickler_hooks(selected_library == USE_DILL) if selected_library == 'default': selected_library = DEFAULT_PICKLE_LIB - if selected_library == USE_DILL: + if dill_is_requested: desired_pickle_lib = dill_pickler elif selected_library == USE_CLOUDPICKLE: desired_pickle_lib = cloudpickle_pickler diff --git a/sdks/python/apache_beam/internal/pickler_test.py b/sdks/python/apache_beam/internal/pickler_test.py index 7048f680de87..a0135b221e8c 100644 --- a/sdks/python/apache_beam/internal/pickler_test.py +++ b/sdks/python/apache_beam/internal/pickler_test.py @@ -25,6 +25,7 @@ import types import unittest +import pytest from parameterized import param from parameterized import parameterized @@ -34,6 +35,12 @@ from apache_beam.internal.pickler import loads +def maybe_skip_if_no_dill(pickle_library): + if pickle_library == 'dill': + pytest.importorskip("dill") + + +@pytest.mark.uses_dill class PicklerTest(unittest.TestCase): NO_MAPPINGPROXYTYPE = not hasattr(types, "MappingProxyType") @@ -43,6 +50,7 @@ class PicklerTest(unittest.TestCase): param(pickle_lib='cloudpickle'), ]) def test_basics(self, pickle_lib): + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) self.assertEqual([1, 'a', ('z', )], loads(dumps([1, 'a', ('z', )]))) @@ -55,6 +63,7 @@ def test_basics(self, pickle_lib): ]) def test_lambda_with_globals(self, pickle_lib): """Tests that the globals of a function are preserved.""" + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) # The point of the test is that the lambda being called after unpickling @@ -68,6 +77,7 @@ def test_lambda_with_globals(self, pickle_lib): param(pickle_lib='cloudpickle'), ]) def test_lambda_with_main_globals(self, pickle_lib): + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) self.assertEqual(unittest, loads(dumps(lambda: unittest))()) @@ -77,6 +87,7 @@ def test_lambda_with_main_globals(self, pickle_lib): ]) def test_lambda_with_closure(self, pickle_lib): """Tests that the closure of a function is preserved.""" + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) self.assertEqual( 'closure: abc', @@ -88,6 +99,7 @@ def test_lambda_with_closure(self, pickle_lib): ]) def test_class(self, pickle_lib): """Tests that a class object is pickled correctly.""" + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) self.assertEqual(['abc', 'def'], loads(dumps(module_test.Xyz))().foo('abc def')) @@ -98,6 +110,7 @@ def test_class(self, pickle_lib): ]) def test_object(self, pickle_lib): """Tests that a class instance is pickled correctly.""" + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) self.assertEqual(['abc', 'def'], loads(dumps(module_test.XYZ_OBJECT)).foo('abc def')) @@ -108,6 +121,7 @@ def test_object(self, pickle_lib): ]) def test_nested_class(self, pickle_lib): """Tests that a nested class object is pickled correctly.""" + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) self.assertEqual( 'X:abc', loads(dumps(module_test.TopClass.NestedClass('abc'))).datum) @@ -121,6 +135,7 @@ def test_nested_class(self, pickle_lib): ]) def test_dynamic_class(self, pickle_lib): """Tests that a nested class object is pickled correctly.""" + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) self.assertEqual( 'Z:abc', loads(dumps(module_test.create_class('abc'))).get()) @@ -130,6 +145,7 @@ def test_dynamic_class(self, pickle_lib): param(pickle_lib='cloudpickle'), ]) def test_generators(self, pickle_lib): + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) with self.assertRaises(TypeError): dumps((_ for _ in range(10))) @@ -139,6 +155,7 @@ def test_generators(self, pickle_lib): param(pickle_lib='cloudpickle'), ]) def test_recursive_class(self, pickle_lib): + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) self.assertEqual( 'RecursiveClass:abc', @@ -149,6 +166,7 @@ def test_recursive_class(self, pickle_lib): param(pickle_lib='cloudpickle'), ]) def test_pickle_rlock(self, pickle_lib): + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) rlock_instance = threading.RLock() rlock_type = type(rlock_instance) @@ -160,6 +178,7 @@ def test_pickle_rlock(self, pickle_lib): param(pickle_lib='cloudpickle'), ]) def test_save_paths(self, pickle_lib): + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) f = loads(dumps(lambda x: x)) co_filename = f.__code__.co_filename @@ -171,6 +190,7 @@ def test_save_paths(self, pickle_lib): param(pickle_lib='cloudpickle'), ]) def test_dump_and_load_mapping_proxy(self, pickle_lib): + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) self.assertEqual( 'def', loads(dumps(types.MappingProxyType({'abc': 'def'})))['abc']) @@ -184,6 +204,7 @@ def test_dump_and_load_mapping_proxy(self, pickle_lib): param(pickle_lib='cloudpickle'), ]) def test_dataclass(self, pickle_lib): + maybe_skip_if_no_dill(pickle_lib) exec( ''' from apache_beam.internal.module_test import DataClass @@ -195,6 +216,7 @@ def test_dataclass(self, pickle_lib): param(pickle_lib='cloudpickle'), ]) def test_class_states_not_changed_at_subsequent_loading(self, pickle_lib): + maybe_skip_if_no_dill(pickle_lib) pickler.set_library(pickle_lib) class Local: @@ -255,6 +277,7 @@ def maybe_get_sets_with_different_iteration_orders(self): return set1, set2 def test_best_effort_determinism(self): + maybe_skip_if_no_dill('dill') pickler.set_library('dill') set1, set2 = self.maybe_get_sets_with_different_iteration_orders() self.assertEqual( @@ -267,6 +290,7 @@ def test_best_effort_determinism(self): self.skipTest('Set iteration orders matched. Test results inconclusive.') def test_disable_best_effort_determinism(self): + maybe_skip_if_no_dill('dill') pickler.set_library('dill') set1, set2 = self.maybe_get_sets_with_different_iteration_orders() # The test relies on the sets having different iteration orders for the diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py index 5005290ad9e8..c318b1988536 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -63,6 +63,11 @@ except ImportError: raise unittest.SkipTest('GCP dependencies are not installed') +try: + import dill +except ImportError: + dill = None + _LOGGER = logging.getLogger(__name__) _DESTINATION_ELEMENT_PAIRS = [ @@ -406,6 +411,13 @@ def test_partition_files_dofn_size_split(self): label='CheckSinglePartition') +def maybe_skip(compat_version): + if compat_version and not dill: + raise unittest.SkipTest( + 'Dill dependency not installed which is required for compat_version' + ' <= 2.67.0') + + class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp): def test_trigger_load_jobs_with_empty_files(self): destination = "project:dataset.table" @@ -485,7 +497,9 @@ def test_records_traverse_transform_with_mocks(self): param(compat_version=None), param(compat_version="2.64.0"), ]) + @pytest.mark.uses_dill def test_reshuffle_before_load(self, compat_version): + maybe_skip(compat_version) destination = 'project1:dataset1.table1' job_reference = bigquery_api.JobReference() @@ -994,6 +1008,7 @@ def dynamic_destination_resolver(element, *side_inputs): ]) def test_triggering_frequency( self, is_streaming, with_auto_sharding, compat_version): + maybe_skip(compat_version) destination = 'project1:dataset1.table1' job_reference = bigquery_api.JobReference() diff --git a/sdks/python/apache_beam/ml/anomaly/specifiable_test.py b/sdks/python/apache_beam/ml/anomaly/specifiable_test.py index ccd8efd286cb..a222cf57973e 100644 --- a/sdks/python/apache_beam/ml/anomaly/specifiable_test.py +++ b/sdks/python/apache_beam/ml/anomaly/specifiable_test.py @@ -22,6 +22,7 @@ import unittest from typing import Optional +import pytest from parameterized import parameterized from apache_beam.internal.cloudpickle import cloudpickle @@ -323,7 +324,10 @@ def __init__(self, arg): self.my_arg = arg * 10 type(self).counter += 1 - def test_on_pickle(self): + @pytest.mark.uses_dill + def test_on_dill_pickle(self): + pytest.importorskip("dill") + FooForPickle = TestInitCallCount.FooForPickle import dill @@ -339,6 +343,9 @@ def test_on_pickle(self): self.assertEqual(FooForPickle.counter, 1) self.assertEqual(new_foo_2.__dict__, foo.__dict__) + def test_on_pickle(self): + FooForPickle = TestInitCallCount.FooForPickle + # Note that pickle does not support classes/functions nested in a function. import pickle FooForPickle.counter = 0 diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index c30a902063e0..a5f3d81a1564 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1619,7 +1619,7 @@ def _add_argparse_args(cls, parser): help=( 'Chooses which pickle library to use. Options are dill, ' 'cloudpickle or default.'), - choices=['cloudpickle', 'default', 'dill']) + choices=['cloudpickle', 'default', 'dill', 'dill_unsafe']) parser.add_argument( '--save_main_session', default=False, @@ -1701,6 +1701,7 @@ def _add_argparse_args(cls, parser): def validate(self, validator): errors = [] errors.extend(validator.validate_container_prebuilding_options(self)) + errors.extend(validator.validate_pickle_library(self)) return errors diff --git a/sdks/python/apache_beam/options/pipeline_options_validator.py b/sdks/python/apache_beam/options/pipeline_options_validator.py index ebe9c8f223ce..0217363bc9b8 100644 --- a/sdks/python/apache_beam/options/pipeline_options_validator.py +++ b/sdks/python/apache_beam/options/pipeline_options_validator.py @@ -119,6 +119,15 @@ class PipelineOptionsValidator(object): ERR_REPEATABLE_OPTIONS_NOT_SET_AS_LIST = ( '(%s) is a string. Programmatically set PipelineOptions like (%s) ' 'options need to be specified as a list.') + ERR_DILL_NOT_INSTALLED = ( + 'Option pickle_library=dill requires dill==0.3.1.1. Install apache-beam ' + 'with the dill extra e.g. apache-beam[gcp, dill]. Dill package was not ' + 'found') + ERR_UNSAFE_DILL_VERSION = ( + 'Dill version 0.3.1.1 is required when using pickle_library=dill. Other ' + 'versions of dill are untested with Apache Beam. To install the supported' + ' dill version instal apache-beam[dill] extra. To use an unsupported ' + 'dill version, use pickle_library=dill_unsafe. %s') # GCS path specific patterns. GCS_URI = '(?P[^:]+)://(?P[^/]+)(/(?P.*))?' @@ -196,6 +205,25 @@ def validate_gcs_path(self, view, arg_name): return self._validate_error(self.ERR_INVALID_GCS_OBJECT, arg, arg_name) return [] + def validate_pickle_library(self, view): + """Validates the pickle_library option.""" + if view.pickle_library == 'default' or view.pickle_library == 'cloudpickle': + return [] + + if view.pickle_library == 'dill_unsafe': + return [] + + if view.pickle_library == 'dill': + try: + import dill + if dill.__version__ != "0.3.1.1": + return self._validate_error( + self.ERR_UNSAFE_DILL_VERSION, + f"Dill version found {dill.__version__}") + except ImportError: + return self._validate_error(self.ERR_DILL_NOT_INSTALLED) + return [] + def validate_cloud_options(self, view): """Validates job_name and project arguments.""" errors = [] diff --git a/sdks/python/apache_beam/options/pipeline_options_validator_test.py b/sdks/python/apache_beam/options/pipeline_options_validator_test.py index 56f305a01b74..8206d45dcf03 100644 --- a/sdks/python/apache_beam/options/pipeline_options_validator_test.py +++ b/sdks/python/apache_beam/options/pipeline_options_validator_test.py @@ -22,6 +22,7 @@ import logging import unittest +import pytest from hamcrest import assert_that from hamcrest import contains_string from hamcrest import only_contains @@ -244,6 +245,48 @@ def test_is_service_runner(self, runner, options, expected): validator = PipelineOptionsValidator(PipelineOptions(options), runner) self.assertEqual(validator.is_service_runner(), expected) + def test_pickle_library_dill_not_installed_returns_error(self): + runner = MockRunners.OtherRunner() + # Remove default region for this test. + options = PipelineOptions(['--pickle_library=dill']) + validator = PipelineOptionsValidator(options, runner) + errors = validator.validate() + self.assertEqual(len(errors), 1, errors) + self.assertIn("Option pickle_library=dill requires dill", errors[0]) + + @pytest.mark.uses_dill + def test_pickle_library_dill_installed_returns_no_error(self): + pytest.importorskip("dill") + runner = MockRunners.OtherRunner() + # Remove default region for this test. + options = PipelineOptions(['--pickle_library=dill']) + validator = PipelineOptionsValidator(options, runner) + errors = validator.validate() + self.assertEqual(len(errors), 0, errors) + + @pytest.mark.uses_dill + def test_pickle_library_dill_installed_returns_wrong_version(self): + pytest.importorskip("dill") + with unittest.mock.patch('dill.__version__', '0.3.6'): + runner = MockRunners.OtherRunner() + # Remove default region for this test. + options = PipelineOptions(['--pickle_library=dill']) + validator = PipelineOptionsValidator(options, runner) + errors = validator.validate() + self.assertEqual(len(errors), 1, errors) + self.assertIn("Dill version 0.3.1.1 is required when using ", errors[0]) + + @pytest.mark.uses_dill + def test_pickle_library_dill_unsafe_no_error(self): + pytest.importorskip("dill") + with unittest.mock.patch('dill.__version__', '0.3.6'): + runner = MockRunners.OtherRunner() + # Remove default region for this test. + options = PipelineOptions(['--pickle_library=dill_unsafe']) + validator = PipelineOptionsValidator(options, runner) + errors = validator.validate() + self.assertEqual(len(errors), 0, errors) + def test_dataflow_job_file_and_template_location_mutually_exclusive(self): runner = MockRunners.OtherRunner() options = PipelineOptions( diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index dc0d9a7cc58f..6e439aff5848 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -177,7 +177,9 @@ def expand(self, pcoll): _ = pipeline | ParentTransform() | beam.Map(lambda x: x + 1) @mock.patch('logging.info') + @pytest.mark.uses_dill def test_runner_overrides_default_pickler(self, mock_info): + pytest.importorskip("dill") with mock.patch.object(PipelineRunner, 'default_pickle_library_override') as mock_fn: mock_fn.return_value = 'dill' diff --git a/sdks/python/apache_beam/runners/portability/stager_test.py b/sdks/python/apache_beam/runners/portability/stager_test.py index 60e247080665..22a41e592c2b 100644 --- a/sdks/python/apache_beam/runners/portability/stager_test.py +++ b/sdks/python/apache_beam/runners/portability/stager_test.py @@ -173,11 +173,13 @@ def test_no_main_session(self): # xdist adds unpicklable modules to the main session. @pytest.mark.no_xdist + @pytest.mark.uses_dill @unittest.skipIf( sys.platform == "win32" and sys.version_info < (3, 8), 'https://github.com/apache/beam/issues/20659: pytest on Windows pulls ' 'in a zipimporter, unpicklable before py3.8') def test_with_main_session(self): + pytest.importorskip("dill") staging_dir = self.make_temp_dir() options = PipelineOptions() diff --git a/sdks/python/apache_beam/transforms/combinefn_lifecycle_test.py b/sdks/python/apache_beam/transforms/combinefn_lifecycle_test.py index 647e08db7aaa..69172a55f246 100644 --- a/sdks/python/apache_beam/transforms/combinefn_lifecycle_test.py +++ b/sdks/python/apache_beam/transforms/combinefn_lifecycle_test.py @@ -59,7 +59,12 @@ def test_combining_value_state(self): {'runner': fn_api_runner.FnApiRunner, 'pickler': 'dill'}, {'runner': fn_api_runner.FnApiRunner, 'pickler': 'cloudpickle'}, ]) # yapf: disable +@pytest.mark.uses_dill class LocalCombineFnLifecycleTest(unittest.TestCase): + def setUp(self): + if self.pickler == 'dill': + pytest.importorskip("dill") + def tearDown(self): CallSequenceEnforcingCombineFn.instances.clear() diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index b365d9b22090..66e7a9e194d3 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -83,6 +83,11 @@ from apache_beam.utils.windowed_value import PaneInfoTiming from apache_beam.utils.windowed_value import WindowedValue +try: + import dill +except ImportError: + dill = None + warnings.filterwarnings( 'ignore', category=FutureWarning, module='apache_beam.transform.util_test') @@ -112,6 +117,13 @@ def is_deterministic(self): return True +def maybe_skip(compat_version): + if compat_version and not dill: + raise unittest.SkipTest( + 'Dill dependency not installed which is required for compat_version' + ' <= 2.67.0') + + class CoGroupByKeyTest(unittest.TestCase): def test_co_group_by_key_on_tuple(self): with TestPipeline() as pipeline: @@ -997,8 +1009,10 @@ def test_reshuffle_streaming_global_window_with_buckets(self): param(compat_version=None), param(compat_version="2.64.0"), ]) + @pytest.mark.uses_dill def test_reshuffle_custom_window_preserves_metadata(self, compat_version): """Tests that Reshuffle preserves pane info.""" + maybe_skip(compat_version) element_count = 12 timestamp_value = timestamp.Timestamp(0) l = [ @@ -1098,10 +1112,11 @@ def test_reshuffle_custom_window_preserves_metadata(self, compat_version): param(compat_version=None), param(compat_version="2.64.0"), ]) + @pytest.mark.uses_dill def test_reshuffle_default_window_preserves_metadata(self, compat_version): """Tests that Reshuffle preserves timestamp, window, and pane info metadata.""" - + maybe_skip(compat_version) no_firing = PaneInfo( is_first=True, is_last=True, diff --git a/sdks/python/apache_beam/typehints/schemas_test.py b/sdks/python/apache_beam/typehints/schemas_test.py index 6cf37322147e..73db06b9a8d2 100644 --- a/sdks/python/apache_beam/typehints/schemas_test.py +++ b/sdks/python/apache_beam/typehints/schemas_test.py @@ -30,8 +30,8 @@ from typing import Optional from typing import Sequence -import dill import numpy as np +import pytest from hypothesis import given from hypothesis import settings from parameterized import parameterized @@ -711,13 +711,19 @@ def test_named_fields_roundtrip(self, named_fields): 'pickler': pickle, }, { - 'pickler': dill, + 'pickler': 'dill', }, { 'pickler': cloudpickle, }, ]) +@pytest.mark.uses_dill class PickleTest(unittest.TestCase): + def setUp(self): + # pylint: disable=access-member-before-definition + if self.pickler == 'dill': + self.pickler = pytest.importorskip("dill") + def test_generated_class_pickle_instance(self): schema = schema_pb2.Schema( id="some-uuid", @@ -733,7 +739,7 @@ def test_generated_class_pickle_instance(self): self.assertEqual(instance, self.pickler.loads(self.pickler.dumps(instance))) def test_generated_class_pickle(self): - if self.pickler in [pickle, dill]: + if self.pickler in [pickle, pytest.importorskip("dill")]: self.skipTest('https://github.com/apache/beam/issues/22714') schema = schema_pb2.Schema( diff --git a/sdks/python/container/base_image_requirements_manual.txt b/sdks/python/container/base_image_requirements_manual.txt index bef89e9fd31e..536f62c27f5d 100644 --- a/sdks/python/container/base_image_requirements_manual.txt +++ b/sdks/python/container/base_image_requirements_manual.txt @@ -40,3 +40,6 @@ google-crc32c scipy scikit-learn build>=1.0,<2 # tool to build sdist from setup.py in stager. +# Dill 0.3.1.1 is included as a base manual requirement so is avaiable to users +# with pickle_library=dill, but apache-beam does not have a hard dependency. +dill>=0.3.1.1,<0.3.2 diff --git a/sdks/python/pytest.ini b/sdks/python/pytest.ini index cb244025812d..3eee1a5c0e80 100644 --- a/sdks/python/pytest.ini +++ b/sdks/python/pytest.ini @@ -71,6 +71,7 @@ markers = uses_feast: tests that uses feast in some way gemini_postcommit: gemini postcommits that need additional deps. require_docker_in_docker: tests that require running Docker inside Docker (Docker-in-Docker), which is not supported on Beam’s self-hosted runners. Context: https://github.com/apache/beam/pull/35585 + uses_dill: tests that require dill pickle library. # Default timeout intended for unit tests. # If certain tests need a different value, please see the docs on how to diff --git a/sdks/python/setup.py b/sdks/python/setup.py index e7ffc0c9780c..4d7ba0d5a506 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -360,12 +360,6 @@ def get_portability_package_data(): install_requires=[ 'crcmod>=1.7,<2.0', 'orjson>=3.9.7,<4', - # Dill doesn't have forwards-compatibility guarantees within minor - # version. Pickles created with a new version of dill may not unpickle - # using older version of dill. It is best to use the same version of - # dill on client and server, therefore list of allowed versions is - # very narrow. See: https://github.com/uqfoundation/dill/issues/341. - 'dill>=0.3.1.1,<0.3.2', 'fastavro>=0.23.6,<2', 'fasteners>=0.3,<1.0', # TODO(https://github.com/grpc/grpc/issues/37710): Unpin grpc @@ -411,6 +405,15 @@ def get_portability_package_data(): python_requires=python_requires, # BEAM-8840: Do NOT use tests_require or setup_requires. extras_require={ + 'dill': [ + # Dill doesn't have forwards-compatibility guarantees within minor + # version. Pickles created with a new version of dill may not + # unpickle using older version of dill. It is best to use the same + # version of dill on client and server, therefore list of allowed + # versions is very narrow. + # See: https://github.com/uqfoundation/dill/issues/341. + 'dill>=0.3.1.1,<0.3.2', + ], 'docs': [ 'jinja2>=3.0,<3.2', 'Sphinx>=7.0.0,<8.0', diff --git a/sdks/python/test-suites/tox/common.gradle b/sdks/python/test-suites/tox/common.gradle index 75a12cdcf4cb..ac5dc57d8a55 100644 --- a/sdks/python/test-suites/tox/common.gradle +++ b/sdks/python/test-suites/tox/common.gradle @@ -29,6 +29,9 @@ test.dependsOn "testPy${pythonVersionSuffix}Cloud" toxTask "testPy${pythonVersionSuffix}ML", "py${pythonVersionSuffix}-ml", "${posargs}" test.dependsOn "testPy${pythonVersionSuffix}ML" +toxTask "testPy${pythonVersionSuffix}Dill", "py${pythonVersionSuffix}-dill", "${posargs}" +test.dependsOn "testPy${pythonVersionSuffix}Dill" + // toxTask "testPy${pythonVersionSuffix}Dask", "py${pythonVersionSuffix}-dask", "${posargs}" // test.dependsOn "testPy${pythonVersionSuffix}Dask" project.tasks.register("preCommitPy${pythonVersionSuffix}") { diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index f344cfc61ccf..354f03a7dba6 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -571,3 +571,11 @@ commands = /bin/sh -c "pip freeze | grep -E tensorflow" # Allow exit code 5 (no tests run) so that we can run this command safely on arbitrary subdirectories. bash {toxinidir}/scripts/run_pytest.sh {envname} 'apache_beam/ml/transforms/embeddings' + +[testenv:py{310,312}-dill] +extras = test,dill +commands = + # Log dill version for debugging + /bin/sh -c "pip freeze | grep -E dill" + # Run all dill-specific tests + /bin/sh -c 'pytest -o junit_suite_name={envname} --junitxml=pytest_{envname}.xml -n 1 -m uses_dill {posargs}; ret=$?; [ $ret = 5 ] && exit 0 || exit $ret'