From 523ff61ab5ba8d1918a92266948248a160a2d1cc Mon Sep 17 00:00:00 2001 From: Marquis Chamberlain Date: Fri, 27 Oct 2023 10:55:24 -0400 Subject: [PATCH 1/5] trying out naive hack --- pyiceberg/io/pyarrow.py | 141 +++++++++++++++++++++++++--------------- 1 file changed, 89 insertions(+), 52 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index f3ef5ca5a0..5860cb5ab2 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -590,14 +590,16 @@ def _get_file_format(file_format: FileFormat, **kwargs: Dict[str, Any]) -> ds.Fi raise ValueError(f"Unsupported file format: {file_format}") -def _construct_fragment(fs: FileSystem, data_file: DataFile, file_format_kwargs: Dict[str, Any] = EMPTY_DICT) -> ds.Fragment: +def _construct_fragment(fs: FileSystem, data_file: DataFile, + file_format_kwargs: Dict[str, Any] = EMPTY_DICT) -> ds.Fragment: _, _, path = PyArrowFileIO.parse_location(data_file.file_path) return _get_file_format(data_file.file_format, **file_format_kwargs).make_fragment(path, fs) def _read_deletes(fs: FileSystem, data_file: DataFile) -> Dict[str, pa.ChunkedArray]: delete_fragment = _construct_fragment( - fs, data_file, file_format_kwargs={"dictionary_columns": ("file_path",), "pre_buffer": True, "buffer_size": ONE_MEGABYTE} + fs, data_file, + file_format_kwargs={"dictionary_columns": ("file_path",), "pre_buffer": True, "buffer_size": ONE_MEGABYTE} ) table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table() table = table.unify_dictionaries() @@ -729,7 +731,8 @@ def _get_field_doc(field: pa.Field) -> Optional[str]: class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]): - def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results: List[Optional[IcebergType]]) -> List[NestedField]: + def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results: List[Optional[IcebergType]]) -> List[ + NestedField]: fields = [] for i, field in enumerate(arrow_fields): field_id = _get_field_id(field) @@ -753,7 +756,7 @@ def list(self, list_type: pa.ListType, element_result: Optional[IcebergType]) -> return None def map( - self, map_type: pa.MapType, key_result: Optional[IcebergType], value_result: Optional[IcebergType] + self, map_type: pa.MapType, key_result: Optional[IcebergType], value_result: Optional[IcebergType] ) -> Optional[IcebergType]: key_field = map_type.key_field key_id = _get_field_id(key_field) @@ -798,17 +801,39 @@ def primitive(self, primitive: pa.DataType) -> IcebergType: raise TypeError(f"Unsupported type: {primitive}") +# ToDo get guidance on where this should be and if we can find an exhaustive list of magic +parquet_magic_columns = { + """ + Apache Iceberg -> Parquet converts column names like + "foo:bar" to "foo_x3A" within the parquet file itself + """ + ":": "_x3A" +} + +# ToDo get guidance on where this should be, and how we want to flag it +def _hack_names(column_name_list: list[str], enabled: bool): + if enabled: + o = [] + # ToDo fix time and space complexity + for key in parquet_magic_columns.keys(): + for column_name in column_name_list: + if key in column_name: + o.append(column_name.replace(key, parquet_magic_columns[key])) + else: + o.append(column_name) + return o + return column_name_list def _task_to_table( - fs: FileSystem, - task: FileScanTask, - bound_row_filter: BooleanExpression, - projected_schema: Schema, - projected_field_ids: Set[int], - positional_deletes: Optional[List[ChunkedArray]], - case_sensitive: bool, - row_counts: List[int], - limit: Optional[int] = None, + 
fs: FileSystem, + task: FileScanTask, + bound_row_filter: BooleanExpression, + projected_schema: Schema, + projected_field_ids: Set[int], + positional_deletes: Optional[List[ChunkedArray]], + case_sensitive: bool, + row_counts: List[int], + limit: Optional[int] = None, ) -> Optional[pa.Table]: if limit and sum(row_counts) >= limit: return None @@ -823,7 +848,8 @@ def _task_to_table( schema_raw = metadata.get(ICEBERG_SCHEMA) # TODO: if field_ids are not present, Name Mapping should be implemented to look them up in the table schema, # see https://github.com/apache/iceberg/issues/7451 - file_schema = Schema.model_validate_json(schema_raw) if schema_raw is not None else pyarrow_to_schema(physical_schema) + file_schema = Schema.model_validate_json(schema_raw) if schema_raw is not None else pyarrow_to_schema( + physical_schema) pyarrow_filter = None if bound_row_filter is not AlwaysTrue(): @@ -831,7 +857,8 @@ def _task_to_table( bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive) pyarrow_filter = expression_to_pyarrow(bound_file_filter) - file_project_schema = sanitize_column_names(prune_columns(file_schema, projected_field_ids, select_full_types=False)) + file_project_schema = sanitize_column_names( + prune_columns(file_schema, projected_field_ids, select_full_types=False)) if file_schema is None: raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}") @@ -842,7 +869,7 @@ def _task_to_table( # This will push down the query to Arrow. # But in case there are positional deletes, we have to apply them first filter=pyarrow_filter if not positional_deletes else None, - columns=[col.name for col in file_project_schema.columns], + columns=_hack_names([col.name for col in file_project_schema.columns], True), ) if positional_deletes: @@ -902,12 +929,12 @@ def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dic def project_table( - tasks: Iterable[FileScanTask], - table: Table, - row_filter: BooleanExpression, - projected_schema: Schema, - case_sensitive: bool = True, - limit: Optional[int] = None, + tasks: Iterable[FileScanTask], + table: Table, + row_filter: BooleanExpression, + projected_schema: Schema, + case_sensitive: bool = True, + limit: Optional[int] = None, ) -> pa.Table: """Resolve the right columns based on the identifier. 
@@ -992,7 +1019,8 @@ def project_table( def to_requested_schema(requested_schema: Schema, file_schema: Schema, table: pa.Table) -> pa.Table: - struct_array = visit_with_partner(requested_schema, table, ArrowProjectionVisitor(file_schema), ArrowAccessor(file_schema)) + struct_array = visit_with_partner(requested_schema, table, ArrowProjectionVisitor(file_schema), + ArrowAccessor(file_schema)) arrays = [] fields = [] @@ -1015,11 +1043,12 @@ def cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array: return values.cast(schema_to_pyarrow(promote(file_field.field_type, field.field_type))) return values - def schema(self, schema: Schema, schema_partner: Optional[pa.Array], struct_result: Optional[pa.Array]) -> Optional[pa.Array]: + def schema(self, schema: Schema, schema_partner: Optional[pa.Array], struct_result: Optional[pa.Array]) -> Optional[ + pa.Array]: return struct_result def struct( - self, struct: StructType, struct_array: Optional[pa.Array], field_results: List[Optional[pa.Array]] + self, struct: StructType, struct_array: Optional[pa.Array], field_results: List[Optional[pa.Array]] ) -> Optional[pa.Array]: if struct_array is None: return None @@ -1042,7 +1071,8 @@ def struct( def field(self, field: NestedField, _: Optional[pa.Array], field_array: Optional[pa.Array]) -> Optional[pa.Array]: return field_array - def list(self, list_type: ListType, list_array: Optional[pa.Array], value_array: Optional[pa.Array]) -> Optional[pa.Array]: + def list(self, list_type: ListType, list_array: Optional[pa.Array], value_array: Optional[pa.Array]) -> Optional[ + pa.Array]: return ( pa.ListArray.from_arrays(list_array.offsets, self.cast_if_needed(list_type.element_field, value_array)) if isinstance(list_array, pa.ListArray) @@ -1050,7 +1080,8 @@ def list(self, list_type: ListType, list_array: Optional[pa.Array], value_array: ) def map( - self, map_type: MapType, map_array: Optional[pa.Array], key_result: Optional[pa.Array], value_result: Optional[pa.Array] + self, map_type: MapType, map_array: Optional[pa.Array], key_result: Optional[pa.Array], + value_result: Optional[pa.Array] ) -> Optional[pa.Array]: return ( pa.MapArray.from_arrays( @@ -1171,7 +1202,8 @@ class StatsAggregator: current_max: Any trunc_length: Optional[int] - def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, trunc_length: Optional[int] = None) -> None: + def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, + trunc_length: Optional[int] = None) -> None: self.current_min = None self.current_max = None self.trunc_length = trunc_length @@ -1284,27 +1316,30 @@ def __init__(self, schema: Schema, properties: Dict[str, str]): self._properties = properties self._default_mode = self._properties.get(DEFAULT_METRICS_MODE_KEY) - def schema(self, schema: Schema, struct_result: Callable[[], List[StatisticsCollector]]) -> List[StatisticsCollector]: + def schema(self, schema: Schema, struct_result: Callable[[], List[StatisticsCollector]]) -> List[ + StatisticsCollector]: return struct_result() def struct( - self, struct: StructType, field_results: List[Callable[[], List[StatisticsCollector]]] + self, struct: StructType, field_results: List[Callable[[], List[StatisticsCollector]]] ) -> List[StatisticsCollector]: return list(chain(*[result() for result in field_results])) - def field(self, field: NestedField, field_result: Callable[[], List[StatisticsCollector]]) -> List[StatisticsCollector]: + def field(self, field: NestedField, field_result: Callable[[], List[StatisticsCollector]]) -> 
List[ + StatisticsCollector]: self._field_id = field.field_id return field_result() - def list(self, list_type: ListType, element_result: Callable[[], List[StatisticsCollector]]) -> List[StatisticsCollector]: + def list(self, list_type: ListType, element_result: Callable[[], List[StatisticsCollector]]) -> List[ + StatisticsCollector]: self._field_id = list_type.element_id return element_result() def map( - self, - map_type: MapType, - key_result: Callable[[], List[StatisticsCollector]], - value_result: Callable[[], List[StatisticsCollector]], + self, + map_type: MapType, + key_result: Callable[[], List[StatisticsCollector]], + value_result: Callable[[], List[StatisticsCollector]], ) -> List[StatisticsCollector]: self._field_id = map_type.key_id k = key_result() @@ -1327,8 +1362,8 @@ def primitive(self, primitive: PrimitiveType) -> List[StatisticsCollector]: metrics_mode = match_metrics_mode(col_mode) if ( - not (isinstance(primitive, StringType) or isinstance(primitive, BinaryType)) - and metrics_mode.type == MetricModeTypes.TRUNCATE + not (isinstance(primitive, StringType) or isinstance(primitive, BinaryType)) + and metrics_mode.type == MetricModeTypes.TRUNCATE ): metrics_mode = MetricsMode(MetricModeTypes.FULL) @@ -1337,12 +1372,13 @@ def primitive(self, primitive: PrimitiveType) -> List[StatisticsCollector]: if is_nested and metrics_mode.type in [MetricModeTypes.TRUNCATE, MetricModeTypes.FULL]: metrics_mode = MetricsMode(MetricModeTypes.COUNTS) - return [StatisticsCollector(field_id=self._field_id, iceberg_type=primitive, mode=metrics_mode, column_name=column_name)] + return [StatisticsCollector(field_id=self._field_id, iceberg_type=primitive, mode=metrics_mode, + column_name=column_name)] def compute_statistics_plan( - schema: Schema, - table_properties: Dict[str, str], + schema: Schema, + table_properties: Dict[str, str], ) -> Dict[int, StatisticsCollector]: """ Compute the statistics plan for all columns. @@ -1381,7 +1417,8 @@ def __init__(self) -> None: def schema(self, schema: Schema, struct_result: Callable[[], List[ID2ParquetPath]]) -> List[ID2ParquetPath]: return struct_result() - def struct(self, struct: StructType, field_results: List[Callable[[], List[ID2ParquetPath]]]) -> List[ID2ParquetPath]: + def struct(self, struct: StructType, field_results: List[Callable[[], List[ID2ParquetPath]]]) -> List[ + ID2ParquetPath]: return list(chain(*[result() for result in field_results])) def field(self, field: NestedField, field_result: Callable[[], List[ID2ParquetPath]]) -> List[ID2ParquetPath]: @@ -1399,10 +1436,10 @@ def list(self, list_type: ListType, element_result: Callable[[], List[ID2Parquet return result def map( - self, - map_type: MapType, - key_result: Callable[[], List[ID2ParquetPath]], - value_result: Callable[[], List[ID2ParquetPath]], + self, + map_type: MapType, + key_result: Callable[[], List[ID2ParquetPath]], + value_result: Callable[[], List[ID2ParquetPath]], ) -> List[ID2ParquetPath]: self._field_id = map_type.key_id self._path.append("key_value.key") @@ -1419,7 +1456,7 @@ def primitive(self, primitive: PrimitiveType) -> List[ID2ParquetPath]: def parquet_path_to_id_mapping( - schema: Schema, + schema: Schema, ) -> Dict[str, int]: """ Compute the mapping of parquet column path to Iceberg ID. 
@@ -1438,11 +1475,11 @@ def parquet_path_to_id_mapping( def fill_parquet_file_metadata( - df: DataFile, - parquet_metadata: pq.FileMetaData, - file_size: int, - stats_columns: Dict[int, StatisticsCollector], - parquet_column_mapping: Dict[str, int], + df: DataFile, + parquet_metadata: pq.FileMetaData, + file_size: int, + stats_columns: Dict[int, StatisticsCollector], + parquet_column_mapping: Dict[str, int], ) -> None: """ Compute and fill the following fields of the DataFile object. From b8c2ae398f43a0b4eae9db94815a0abbb32b1c2e Mon Sep 17 00:00:00 2001 From: Marquis Chamberlain Date: Fri, 27 Oct 2023 11:19:40 -0400 Subject: [PATCH 2/5] removing whole file reformatting --- pyiceberg/io/pyarrow.py | 117 ++++++++++++++++++---------------------- 1 file changed, 51 insertions(+), 66 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 5860cb5ab2..34c7308a4d 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -590,16 +590,14 @@ def _get_file_format(file_format: FileFormat, **kwargs: Dict[str, Any]) -> ds.Fi raise ValueError(f"Unsupported file format: {file_format}") -def _construct_fragment(fs: FileSystem, data_file: DataFile, - file_format_kwargs: Dict[str, Any] = EMPTY_DICT) -> ds.Fragment: +def _construct_fragment(fs: FileSystem, data_file: DataFile, file_format_kwargs: Dict[str, Any] = EMPTY_DICT) -> ds.Fragment: _, _, path = PyArrowFileIO.parse_location(data_file.file_path) return _get_file_format(data_file.file_format, **file_format_kwargs).make_fragment(path, fs) def _read_deletes(fs: FileSystem, data_file: DataFile) -> Dict[str, pa.ChunkedArray]: delete_fragment = _construct_fragment( - fs, data_file, - file_format_kwargs={"dictionary_columns": ("file_path",), "pre_buffer": True, "buffer_size": ONE_MEGABYTE} + fs, data_file, file_format_kwargs={"dictionary_columns": ("file_path",), "pre_buffer": True, "buffer_size": ONE_MEGABYTE} ) table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table() table = table.unify_dictionaries() @@ -731,8 +729,7 @@ def _get_field_doc(field: pa.Field) -> Optional[str]: class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]): - def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results: List[Optional[IcebergType]]) -> List[ - NestedField]: + def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results: List[Optional[IcebergType]]) -> List[NestedField]: fields = [] for i, field in enumerate(arrow_fields): field_id = _get_field_id(field) @@ -756,7 +753,7 @@ def list(self, list_type: pa.ListType, element_result: Optional[IcebergType]) -> return None def map( - self, map_type: pa.MapType, key_result: Optional[IcebergType], value_result: Optional[IcebergType] + self, map_type: pa.MapType, key_result: Optional[IcebergType], value_result: Optional[IcebergType] ) -> Optional[IcebergType]: key_field = map_type.key_field key_id = _get_field_id(key_field) @@ -825,15 +822,15 @@ def _hack_names(column_name_list: list[str], enabled: bool): return column_name_list def _task_to_table( - fs: FileSystem, - task: FileScanTask, - bound_row_filter: BooleanExpression, - projected_schema: Schema, - projected_field_ids: Set[int], - positional_deletes: Optional[List[ChunkedArray]], - case_sensitive: bool, - row_counts: List[int], - limit: Optional[int] = None, + fs: FileSystem, + task: FileScanTask, + bound_row_filter: BooleanExpression, + projected_schema: Schema, + projected_field_ids: Set[int], + positional_deletes: Optional[List[ChunkedArray]], + case_sensitive: bool, + 
row_counts: List[int], + limit: Optional[int] = None, ) -> Optional[pa.Table]: if limit and sum(row_counts) >= limit: return None @@ -848,8 +845,7 @@ def _task_to_table( schema_raw = metadata.get(ICEBERG_SCHEMA) # TODO: if field_ids are not present, Name Mapping should be implemented to look them up in the table schema, # see https://github.com/apache/iceberg/issues/7451 - file_schema = Schema.model_validate_json(schema_raw) if schema_raw is not None else pyarrow_to_schema( - physical_schema) + file_schema = Schema.model_validate_json(schema_raw) if schema_raw is not None else pyarrow_to_schema(physical_schema) pyarrow_filter = None if bound_row_filter is not AlwaysTrue(): @@ -857,8 +853,7 @@ def _task_to_table( bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive) pyarrow_filter = expression_to_pyarrow(bound_file_filter) - file_project_schema = sanitize_column_names( - prune_columns(file_schema, projected_field_ids, select_full_types=False)) + file_project_schema = sanitize_column_names(prune_columns(file_schema, projected_field_ids, select_full_types=False)) if file_schema is None: raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}") @@ -929,12 +924,12 @@ def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dic def project_table( - tasks: Iterable[FileScanTask], - table: Table, - row_filter: BooleanExpression, - projected_schema: Schema, - case_sensitive: bool = True, - limit: Optional[int] = None, + tasks: Iterable[FileScanTask], + table: Table, + row_filter: BooleanExpression, + projected_schema: Schema, + case_sensitive: bool = True, + limit: Optional[int] = None, ) -> pa.Table: """Resolve the right columns based on the identifier. @@ -1019,8 +1014,7 @@ def project_table( def to_requested_schema(requested_schema: Schema, file_schema: Schema, table: pa.Table) -> pa.Table: - struct_array = visit_with_partner(requested_schema, table, ArrowProjectionVisitor(file_schema), - ArrowAccessor(file_schema)) + struct_array = visit_with_partner(requested_schema, table, ArrowProjectionVisitor(file_schema), ArrowAccessor(file_schema)) arrays = [] fields = [] @@ -1043,12 +1037,11 @@ def cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array: return values.cast(schema_to_pyarrow(promote(file_field.field_type, field.field_type))) return values - def schema(self, schema: Schema, schema_partner: Optional[pa.Array], struct_result: Optional[pa.Array]) -> Optional[ - pa.Array]: + def schema(self, schema: Schema, schema_partner: Optional[pa.Array], struct_result: Optional[pa.Array]) -> Optional[pa.Array]: return struct_result def struct( - self, struct: StructType, struct_array: Optional[pa.Array], field_results: List[Optional[pa.Array]] + self, struct: StructType, struct_array: Optional[pa.Array], field_results: List[Optional[pa.Array]] ) -> Optional[pa.Array]: if struct_array is None: return None @@ -1071,8 +1064,7 @@ def struct( def field(self, field: NestedField, _: Optional[pa.Array], field_array: Optional[pa.Array]) -> Optional[pa.Array]: return field_array - def list(self, list_type: ListType, list_array: Optional[pa.Array], value_array: Optional[pa.Array]) -> Optional[ - pa.Array]: + def list(self, list_type: ListType, list_array: Optional[pa.Array], value_array: Optional[pa.Array]) -> Optional[pa.Array]: return ( pa.ListArray.from_arrays(list_array.offsets, self.cast_if_needed(list_type.element_field, value_array)) if isinstance(list_array, pa.ListArray) @@ -1080,8 +1072,7 @@ def list(self, list_type: 
ListType, list_array: Optional[pa.Array], value_array: ) def map( - self, map_type: MapType, map_array: Optional[pa.Array], key_result: Optional[pa.Array], - value_result: Optional[pa.Array] + self, map_type: MapType, map_array: Optional[pa.Array], key_result: Optional[pa.Array], value_result: Optional[pa.Array] ) -> Optional[pa.Array]: return ( pa.MapArray.from_arrays( @@ -1202,8 +1193,7 @@ class StatsAggregator: current_max: Any trunc_length: Optional[int] - def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, - trunc_length: Optional[int] = None) -> None: + def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, trunc_length: Optional[int] = None) -> None: self.current_min = None self.current_max = None self.trunc_length = trunc_length @@ -1316,30 +1306,27 @@ def __init__(self, schema: Schema, properties: Dict[str, str]): self._properties = properties self._default_mode = self._properties.get(DEFAULT_METRICS_MODE_KEY) - def schema(self, schema: Schema, struct_result: Callable[[], List[StatisticsCollector]]) -> List[ - StatisticsCollector]: + def schema(self, schema: Schema, struct_result: Callable[[], List[StatisticsCollector]]) -> List[StatisticsCollector]: return struct_result() def struct( - self, struct: StructType, field_results: List[Callable[[], List[StatisticsCollector]]] + self, struct: StructType, field_results: List[Callable[[], List[StatisticsCollector]]] ) -> List[StatisticsCollector]: return list(chain(*[result() for result in field_results])) - def field(self, field: NestedField, field_result: Callable[[], List[StatisticsCollector]]) -> List[ - StatisticsCollector]: + def field(self, field: NestedField, field_result: Callable[[], List[StatisticsCollector]]) -> List[StatisticsCollector]: self._field_id = field.field_id return field_result() - def list(self, list_type: ListType, element_result: Callable[[], List[StatisticsCollector]]) -> List[ - StatisticsCollector]: + def list(self, list_type: ListType, element_result: Callable[[], List[StatisticsCollector]]) -> List[StatisticsCollector]: self._field_id = list_type.element_id return element_result() def map( - self, - map_type: MapType, - key_result: Callable[[], List[StatisticsCollector]], - value_result: Callable[[], List[StatisticsCollector]], + self, + map_type: MapType, + key_result: Callable[[], List[StatisticsCollector]], + value_result: Callable[[], List[StatisticsCollector]], ) -> List[StatisticsCollector]: self._field_id = map_type.key_id k = key_result() @@ -1362,8 +1349,8 @@ def primitive(self, primitive: PrimitiveType) -> List[StatisticsCollector]: metrics_mode = match_metrics_mode(col_mode) if ( - not (isinstance(primitive, StringType) or isinstance(primitive, BinaryType)) - and metrics_mode.type == MetricModeTypes.TRUNCATE + not (isinstance(primitive, StringType) or isinstance(primitive, BinaryType)) + and metrics_mode.type == MetricModeTypes.TRUNCATE ): metrics_mode = MetricsMode(MetricModeTypes.FULL) @@ -1372,13 +1359,12 @@ def primitive(self, primitive: PrimitiveType) -> List[StatisticsCollector]: if is_nested and metrics_mode.type in [MetricModeTypes.TRUNCATE, MetricModeTypes.FULL]: metrics_mode = MetricsMode(MetricModeTypes.COUNTS) - return [StatisticsCollector(field_id=self._field_id, iceberg_type=primitive, mode=metrics_mode, - column_name=column_name)] + return [StatisticsCollector(field_id=self._field_id, iceberg_type=primitive, mode=metrics_mode, column_name=column_name)] def compute_statistics_plan( - schema: Schema, - table_properties: Dict[str, str], + 
schema: Schema, + table_properties: Dict[str, str], ) -> Dict[int, StatisticsCollector]: """ Compute the statistics plan for all columns. @@ -1417,8 +1403,7 @@ def __init__(self) -> None: def schema(self, schema: Schema, struct_result: Callable[[], List[ID2ParquetPath]]) -> List[ID2ParquetPath]: return struct_result() - def struct(self, struct: StructType, field_results: List[Callable[[], List[ID2ParquetPath]]]) -> List[ - ID2ParquetPath]: + def struct(self, struct: StructType, field_results: List[Callable[[], List[ID2ParquetPath]]]) -> List[ID2ParquetPath]: return list(chain(*[result() for result in field_results])) def field(self, field: NestedField, field_result: Callable[[], List[ID2ParquetPath]]) -> List[ID2ParquetPath]: @@ -1436,10 +1421,10 @@ def list(self, list_type: ListType, element_result: Callable[[], List[ID2Parquet return result def map( - self, - map_type: MapType, - key_result: Callable[[], List[ID2ParquetPath]], - value_result: Callable[[], List[ID2ParquetPath]], + self, + map_type: MapType, + key_result: Callable[[], List[ID2ParquetPath]], + value_result: Callable[[], List[ID2ParquetPath]], ) -> List[ID2ParquetPath]: self._field_id = map_type.key_id self._path.append("key_value.key") @@ -1456,7 +1441,7 @@ def primitive(self, primitive: PrimitiveType) -> List[ID2ParquetPath]: def parquet_path_to_id_mapping( - schema: Schema, + schema: Schema, ) -> Dict[str, int]: """ Compute the mapping of parquet column path to Iceberg ID. @@ -1475,11 +1460,11 @@ def parquet_path_to_id_mapping( def fill_parquet_file_metadata( - df: DataFile, - parquet_metadata: pq.FileMetaData, - file_size: int, - stats_columns: Dict[int, StatisticsCollector], - parquet_column_mapping: Dict[str, int], + df: DataFile, + parquet_metadata: pq.FileMetaData, + file_size: int, + stats_columns: Dict[int, StatisticsCollector], + parquet_column_mapping: Dict[str, int], ) -> None: """ Compute and fill the following fields of the DataFile object. From 775091531ec85b8543e246e0eb295652a1f26a37 Mon Sep 17 00:00:00 2001 From: Marquis Chamberlain Date: Fri, 27 Oct 2023 21:27:26 -0400 Subject: [PATCH 3/5] reset pyarrow.py and add specific test --- pyiceberg/io/pyarrow.py | 24 +----------------------- tests/io/test_pyarrow.py | 1 + 2 files changed, 2 insertions(+), 23 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 34c7308a4d..f3ef5ca5a0 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -798,28 +798,6 @@ def primitive(self, primitive: pa.DataType) -> IcebergType: raise TypeError(f"Unsupported type: {primitive}") -# ToDo get guidance on where this should be and if we can find an exhaustive list of magic -parquet_magic_columns = { - """ - Apache Iceberg -> Parquet converts column names like - "foo:bar" to "foo_x3A" within the parquet file itself - """ - ":": "_x3A" -} - -# ToDo get guidance on where this should be, and how we want to flag it -def _hack_names(column_name_list: list[str], enabled: bool): - if enabled: - o = [] - # ToDo fix time and space complexity - for key in parquet_magic_columns.keys(): - for column_name in column_name_list: - if key in column_name: - o.append(column_name.replace(key, parquet_magic_columns[key])) - else: - o.append(column_name) - return o - return column_name_list def _task_to_table( fs: FileSystem, @@ -864,7 +842,7 @@ def _task_to_table( # This will push down the query to Arrow. 
# But in case there are positional deletes, we have to apply them first filter=pyarrow_filter if not positional_deletes else None, - columns=_hack_names([col.name for col in file_project_schema.columns], True), + columns=[col.name for col in file_project_schema.columns], ) if positional_deletes: diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 7174c91425..51b496ce1a 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -1547,3 +1547,4 @@ def check_results(location: str, expected_schema: str, expected_netloc: str, exp def test_make_compatible_name() -> None: assert make_compatible_name("label/abc") == "label_x2Fabc" assert make_compatible_name("label?abc") == "label_x3Fabc" + assert make_compatible_name("label:abc") == "label_x3Fabc" From 67e5a036864d15aab1d6e517c0aa6e7b290d178d Mon Sep 17 00:00:00 2001 From: Marquis Chamberlain Date: Fri, 27 Oct 2023 21:54:48 -0400 Subject: [PATCH 4/5] adding static condition and defaulting logic --- pyiceberg/schema.py | 7 ++++++- tests/io/test_pyarrow.py | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pyiceberg/schema.py b/pyiceberg/schema.py index b61e4678b9..a699d9dd52 100644 --- a/pyiceberg/schema.py +++ b/pyiceberg/schema.py @@ -1310,7 +1310,12 @@ def _sanitize_name(name: str) -> str: def _sanitize_char(character: str) -> str: - return "_" + character if character.isdigit() else "_x" + hex(ord(character))[2:].upper() + special_chars = {":": "_x3"} + def _static_or_magic_suffix(c : str, d : dict) -> str: + if c in d: + return d[c] + return "_x" + hex(ord(character))[2:].upper() + return "_" + character if character.isdigit() else _static_or_magic_suffix(character, special_chars) def sanitize_column_names(schema: Schema) -> Schema: diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 51b496ce1a..b04ba9e7ca 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -1547,4 +1547,4 @@ def check_results(location: str, expected_schema: str, expected_netloc: str, exp def test_make_compatible_name() -> None: assert make_compatible_name("label/abc") == "label_x2Fabc" assert make_compatible_name("label?abc") == "label_x3Fabc" - assert make_compatible_name("label:abc") == "label_x3Fabc" + assert make_compatible_name("label:foo") == "label_x3foo" From 84fa0753ffdb83ba9d380e56d474113a21272b1d Mon Sep 17 00:00:00 2001 From: Marquis Chamberlain Date: Sat, 28 Oct 2023 03:58:33 -0400 Subject: [PATCH 5/5] complete re-vert, please test --- pyiceberg/schema.py | 7 +------ tests/io/test_pyarrow.py | 2 +- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/pyiceberg/schema.py b/pyiceberg/schema.py index a699d9dd52..b61e4678b9 100644 --- a/pyiceberg/schema.py +++ b/pyiceberg/schema.py @@ -1310,12 +1310,7 @@ def _sanitize_name(name: str) -> str: def _sanitize_char(character: str) -> str: - special_chars = {":": "_x3"} - def _static_or_magic_suffix(c : str, d : dict) -> str: - if c in d: - return d[c] - return "_x" + hex(ord(character))[2:].upper() - return "_" + character if character.isdigit() else _static_or_magic_suffix(character, special_chars) + return "_" + character if character.isdigit() else "_x" + hex(ord(character))[2:].upper() def sanitize_column_names(schema: Schema) -> Schema: diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index b04ba9e7ca..486e6685a8 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -1547,4 +1547,4 @@ def check_results(location: str, expected_schema: str, expected_netloc: str, exp def 
test_make_compatible_name() -> None: assert make_compatible_name("label/abc") == "label_x2Fabc" assert make_compatible_name("label?abc") == "label_x3Fabc" - assert make_compatible_name("label:foo") == "label_x3foo" + assert make_compatible_name("label:foo") == "label_x3Afoo"
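
Note: the behavior the final test locks in is the generic "_x" + uppercase-hex escaping already performed by _sanitize_char in pyiceberg/schema.py (":" is code point 0x3A, so "label:foo" becomes "label_x3Afoo"), rather than the special-case mapping attempted in PATCH 4. A minimal, self-contained sketch of that escaping rule follows; it is illustrative only and not the pyiceberg implementation — the leading-digit "_" prefix handled by _sanitize_char is omitted, and the helper name is made up for this note.

    # Illustrative sketch of the "_x" + uppercase-hex escaping, not pyiceberg code.
    def escape_special_chars(name: str) -> str:
        def escape(c: str) -> str:
            # Letters, digits and "_" pass through; anything else becomes "_x" + hex code point.
            return c if c.isalnum() or c == "_" else "_x" + hex(ord(c))[2:].upper()
        return "".join(escape(c) for c in name)

    assert escape_special_chars("label/abc") == "label_x2Fabc"  # "/" is 0x2F
    assert escape_special_chars("label?abc") == "label_x3Fabc"  # "?" is 0x3F
    assert escape_special_chars("label:foo") == "label_x3Afoo"  # ":" is 0x3A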