From bc17eacea5febefd064c2395daa0fa6946b2f719 Mon Sep 17 00:00:00 2001 From: HonahX Date: Fri, 5 Apr 2024 21:37:05 -0700 Subject: [PATCH 1/5] add TimestamptzType to storageDescriptor construction --- pyiceberg/catalog/glue.py | 2 +- pyiceberg/catalog/hive.py | 18 +-------- tests/catalog/test_hive.py | 79 +++++++++++++++++++++++++++++++++----- tests/conftest.py | 54 ++++++++++++++++++++++++++ 4 files changed, 127 insertions(+), 26 deletions(-) diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index a32a79a905..6f93fab91d 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -150,7 +150,7 @@ def primitive(self, primitive: PrimitiveType) -> str: if isinstance(primitive, DecimalType): return f"decimal({primitive.precision},{primitive.scale})" if (primitive_type := type(primitive)) not in GLUE_PRIMITIVE_TYPES: - return str(primitive_type.root) + return str(primitive) return GLUE_PRIMITIVE_TYPES[primitive_type] diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index b383063e3d..fb1f94f323 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -95,6 +95,7 @@ StringType, StructType, TimestampType, + TimestamptzType, TimeType, UUIDType, ) @@ -103,22 +104,6 @@ import pyarrow as pa -# Replace by visitor -hive_types = { - BooleanType: "boolean", - IntegerType: "int", - LongType: "bigint", - FloatType: "float", - DoubleType: "double", - DateType: "date", - TimeType: "string", - TimestampType: "timestamp", - StringType: "string", - UUIDType: "string", - BinaryType: "binary", - FixedType: "binary", -} - COMMENT = "comment" OWNER = "owner" @@ -195,6 +180,7 @@ def _annotate_namespace(database: HiveDatabase, properties: Properties) -> HiveD DateType: "date", TimeType: "string", TimestampType: "timestamp", + TimestamptzType: "timestamp", StringType: "string", UUIDType: "string", BinaryType: "binary", diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py index dc2689e0d8..6a5e886f74 100644 --- a/tests/catalog/test_hive.py +++ b/tests/catalog/test_hive.py @@ -61,11 +61,24 @@ from pyiceberg.transforms import BucketTransform, IdentityTransform from pyiceberg.typedef import UTF8 from pyiceberg.types import ( + BinaryType, BooleanType, + DateType, + DecimalType, + DoubleType, + FixedType, + FloatType, IntegerType, + ListType, LongType, + MapType, NestedField, StringType, + StructType, + TimestampType, + TimestamptzType, + TimeType, + UUIDType, ) HIVE_CATALOG_NAME = "hive" @@ -182,14 +195,14 @@ def test_check_number_of_namespaces(table_schema_simple: Schema) -> None: @patch("time.time", MagicMock(return_value=12345)) -def test_create_table(table_schema_simple: Schema, hive_database: HiveDatabase, hive_table: HiveTable) -> None: +def test_create_table(table_schema_with_all_types: Schema, hive_database: HiveDatabase, hive_table: HiveTable) -> None: catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL) catalog._client = MagicMock() catalog._client.__enter__().create_table.return_value = None catalog._client.__enter__().get_table.return_value = hive_table catalog._client.__enter__().get_database.return_value = hive_database - catalog.create_table(("default", "table"), schema=table_schema_simple, properties={"owner": "javaberg"}) + catalog.create_table(("default", "table"), schema=table_schema_with_all_types, properties={"owner": "javaberg"}) called_hive_table: HiveTable = catalog._client.__enter__().create_table.call_args[0][0] # This one is generated within the function itself, so we need to extract @@ -207,9 
+220,23 @@ def test_create_table(table_schema_simple: Schema, hive_database: HiveDatabase, retention=None, sd=StorageDescriptor( cols=[ - FieldSchema(name="foo", type="string", comment=None), - FieldSchema(name="bar", type="int", comment=None), - FieldSchema(name="baz", type="boolean", comment=None), + FieldSchema(name='boolean', type='boolean', comment=None), + FieldSchema(name='integer', type='int', comment=None), + FieldSchema(name='long', type='bigint', comment=None), + FieldSchema(name='float', type='float', comment=None), + FieldSchema(name='double', type='double', comment=None), + FieldSchema(name='decimal', type='decimal(32,3)', comment=None), + FieldSchema(name='date', type='date', comment=None), + FieldSchema(name='time', type='string', comment=None), + FieldSchema(name='timestamp', type='timestamp', comment=None), + FieldSchema(name='timestamptz', type='timestamp', comment=None), + FieldSchema(name='string', type='string', comment=None), + FieldSchema(name='uuid', type='string', comment=None), + FieldSchema(name='fixed', type='binary', comment=None), + FieldSchema(name='binary', type='binary', comment=None), + FieldSchema(name='list', type='array<string>', comment=None), + FieldSchema(name='map', type='map<string,int>', comment=None), + FieldSchema(name='struct', type='struct<inner_string:string,inner_int:int>', comment=None), ], location=f"{hive_database.locationUri}/table", inputFormat="org.apache.hadoop.mapred.FileInputFormat", @@ -266,12 +293,46 @@ def test_create_table(table_schema_simple: Schema, hive_database: HiveDatabase, location=metadata.location, table_uuid=metadata.table_uuid, last_updated_ms=metadata.last_updated_ms, - last_column_id=3, + last_column_id=22, schemas=[ Schema( - NestedField(field_id=1, name="foo", field_type=StringType(), required=False), - NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True), - NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False), + NestedField(field_id=1, name='boolean', field_type=BooleanType(), required=True), + NestedField(field_id=2, name='integer', field_type=IntegerType(), required=True), + NestedField(field_id=3, name='long', field_type=LongType(), required=True), + NestedField(field_id=4, name='float', field_type=FloatType(), required=True), + NestedField(field_id=5, name='double', field_type=DoubleType(), required=True), + NestedField(field_id=6, name='decimal', field_type=DecimalType(precision=32, scale=3), required=True), + NestedField(field_id=7, name='date', field_type=DateType(), required=True), + NestedField(field_id=8, name='time', field_type=TimeType(), required=True), + NestedField(field_id=9, name='timestamp', field_type=TimestampType(), required=True), + NestedField(field_id=10, name='timestamptz', field_type=TimestamptzType(), required=True), + NestedField(field_id=11, name='string', field_type=StringType(), required=True), + NestedField(field_id=12, name='uuid', field_type=UUIDType(), required=True), + NestedField(field_id=13, name='fixed', field_type=FixedType(length=12), required=True), + NestedField(field_id=14, name='binary', field_type=BinaryType(), required=True), + NestedField( + field_id=15, + name='list', + field_type=ListType(type='list', element_id=18, element_type=StringType(), element_required=True), + required=True, + ), + NestedField( + field_id=16, + name='map', + field_type=MapType( + type='map', key_id=19, key_type=StringType(), value_id=20, value_type=IntegerType(), value_required=True + ), + required=True, + ), + NestedField( + field_id=17, + name='struct', + field_type=StructType( 
NestedField(field_id=21, name='inner_string', field_type=StringType(), required=False), + NestedField(field_id=22, name='inner_int', field_type=IntegerType(), required=True), + ), + required=True, + ), schema_id=0, identifier_field_ids=[2], ) diff --git a/tests/conftest.py b/tests/conftest.py index 62444b457a..862f0bb021 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -72,7 +72,9 @@ BinaryType, BooleanType, DateType, + DecimalType, DoubleType, + FixedType, FloatType, IntegerType, ListType, @@ -81,6 +83,10 @@ NestedField, StringType, StructType, + TimestampType, + TimestamptzType, + TimeType, + UUIDType, ) from pyiceberg.utils.datetime import datetime_to_millis @@ -267,6 +273,54 @@ def table_schema_nested_with_struct_key_map() -> Schema: ) +@pytest.fixture(scope="session") +def table_schema_with_all_types() -> Schema: + return schema.Schema( + NestedField(field_id=1, name="boolean", field_type=BooleanType(), required=True), + NestedField(field_id=2, name="integer", field_type=IntegerType(), required=True), + NestedField(field_id=3, name="long", field_type=LongType(), required=True), + NestedField(field_id=4, name="float", field_type=FloatType(), required=True), + NestedField(field_id=5, name="double", field_type=DoubleType(), required=True), + NestedField(field_id=6, name="decimal", field_type=DecimalType(32, 3), required=True), + NestedField(field_id=7, name="date", field_type=DateType(), required=True), + NestedField(field_id=8, name="time", field_type=TimeType(), required=True), + NestedField(field_id=9, name="timestamp", field_type=TimestampType(), required=True), + NestedField(field_id=10, name="timestamptz", field_type=TimestamptzType(), required=True), + NestedField(field_id=11, name="string", field_type=StringType(), required=True), + NestedField(field_id=12, name="uuid", field_type=UUIDType(), required=True), + NestedField(field_id=14, name="fixed", field_type=FixedType(12), required=True), + NestedField(field_id=13, name="binary", field_type=BinaryType(), required=True), + NestedField( + field_id=15, + name="list", + field_type=ListType(element_id=16, element_type=StringType(), element_required=True), + required=True, + ), + NestedField( + field_id=17, + name="map", + field_type=MapType( + key_id=18, + key_type=StringType(), + value_id=19, + value_type=IntegerType(), + value_required=True, + ), + required=True, + ), + NestedField( + field_id=20, + name="struct", + field_type=StructType( + NestedField(field_id=21, name="inner_string", field_type=StringType(), required=False), + NestedField(field_id=22, name="inner_int", field_type=IntegerType(), required=True), + ), + ), + schema_id=1, + identifier_field_ids=[2], + ) + + @pytest.fixture(scope="session") def pyarrow_schema_simple_without_ids() -> "pa.Schema": import pyarrow as pa From 15df46099c9eab54d6f296a977e2db13f4cb2312 Mon Sep 17 00:00:00 2001 From: Honah J Date: Mon, 8 Apr 2024 00:11:34 -0700 Subject: [PATCH 2/5] [Bug Fix] Allow HiveCatalog to create table with TimestamptzType (#585) --- mkdocs/docs/configuration.md | 9 +++++++++ pyiceberg/catalog/glue.py | 4 ++-- pyiceberg/catalog/hive.py | 29 ++++++++++++++++++++++++----- pyiceberg/table/__init__.py | 6 ++++++ tests/catalog/test_hive.py | 13 +++++++++++-- tests/integration/test_writes.py | 20 ++++++++++++++++++++ 6 files changed, 72 insertions(+), 9 deletions(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 3684082a2e..a761e005b0 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -215,6 +215,15 @@ 
catalog: s3.secret-access-key: password ``` +When using Hive 2.x, make sure to set the compatibility flag: + +```yaml +catalog: + default: +... + hive.hive2-compatible: true +``` + ## Glue Catalog Your AWS credentials can be passed directly through the Python API. diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index 6f93fab91d..97393881d1 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -62,7 +62,7 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema, SchemaVisitor, visit from pyiceberg.serializers import FromInputFile -from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata +from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata, PropertyUtil from pyiceberg.table.metadata import TableMetadata, new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties @@ -313,7 +313,7 @@ def _update_glue_table(self, database_name: str, table_name: str, table_input: T self.glue.update_table( DatabaseName=database_name, TableInput=table_input, - SkipArchive=self.properties.get(GLUE_SKIP_ARCHIVE, GLUE_SKIP_ARCHIVE_DEFAULT), + SkipArchive=PropertyUtil.property_as_bool(self.properties, GLUE_SKIP_ARCHIVE, GLUE_SKIP_ARCHIVE_DEFAULT), VersionId=version_id, ) except self.glue.exceptions.EntityNotFoundException as e: diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index fb1f94f323..d97ddd804d 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -74,7 +74,7 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema, SchemaVisitor, visit from pyiceberg.serializers import FromInputFile -from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, TableProperties, update_table_metadata +from pyiceberg.table import CommitTableRequest, CommitTableResponse, PropertyUtil, Table, TableProperties, update_table_metadata from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties @@ -107,6 +107,10 @@ COMMENT = "comment" OWNER = "owner" +# If set to true, HiveCatalog will operate in Hive2 compatibility mode +HIVE2_COMPATIBLE = "hive.hive2-compatible" +HIVE2_COMPATIBLE_DEFAULT = False + class _HiveClient: """Helper class to nicely open and close the transport.""" @@ -132,10 +136,15 @@ def __exit__( self._transport.close() -def _construct_hive_storage_descriptor(schema: Schema, location: Optional[str]) -> StorageDescriptor: +def _construct_hive_storage_descriptor( + schema: Schema, location: Optional[str], hive2_compatible: bool = False +) -> StorageDescriptor: ser_de_info = SerDeInfo(serializationLib="org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") return StorageDescriptor( - [FieldSchema(field.name, visit(field.field_type, SchemaToHiveConverter()), field.doc) for field in schema.fields], + [ + FieldSchema(field.name, visit(field.field_type, SchemaToHiveConverter(hive2_compatible)), field.doc) + for field in schema.fields + ], location, "org.apache.hadoop.mapred.FileInputFormat", "org.apache.hadoop.mapred.FileOutputFormat", @@ -180,7 +189,7 @@ def _annotate_namespace(database: HiveDatabase, properties: Properties) -> HiveD DateType: "date", TimeType: "string", TimestampType: "timestamp", - 
TimestamptzType: "timestamp", + TimestamptzType: "timestamp with local time zone", StringType: "string", UUIDType: "string", BinaryType: "binary", @@ -189,6 +198,11 @@ def _annotate_namespace(database: HiveDatabase, properties: Properties) -> HiveD class SchemaToHiveConverter(SchemaVisitor[str]): + hive2_compatible: bool + + def __init__(self, hive2_compatible: bool): + self.hive2_compatible = hive2_compatible + def schema(self, schema: Schema, struct_result: str) -> str: return struct_result @@ -208,6 +222,9 @@ def map(self, map_type: MapType, key_result: str, value_result: str) -> str: def primitive(self, primitive: PrimitiveType) -> str: if isinstance(primitive, DecimalType): return f"decimal({primitive.precision},{primitive.scale})" + elif self.hive2_compatible and isinstance(primitive, TimestamptzType): + # Hive2 doesn't support timestamp with local time zone + return "timestamp" else: return HIVE_PRIMITIVE_TYPES[type(primitive)] @@ -296,7 +313,9 @@ def create_table( owner=properties[OWNER] if properties and OWNER in properties else getpass.getuser(), createTime=current_time_millis // 1000, lastAccessTime=current_time_millis // 1000, - sd=_construct_hive_storage_descriptor(schema, location), + sd=_construct_hive_storage_descriptor( + schema, location, PropertyUtil.property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT) + ), tableType=EXTERNAL_TABLE, parameters=_construct_parameters(metadata_location), ) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 0f4e334d8d..c2d5409b4c 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -223,6 +223,12 @@ def property_as_int(properties: Dict[str, str], property_name: str, default: Opt else: return default + @staticmethod + def property_as_bool(properties: Dict[str, str], property_name: str, default: bool) -> bool: + if value := properties.get(property_name): + return value.lower() == "true" + return default + class Transaction: _table: Table diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py index 6a5e886f74..396b762cbc 100644 --- a/tests/catalog/test_hive.py +++ b/tests/catalog/test_hive.py @@ -194,9 +194,14 @@ def test_check_number_of_namespaces(table_schema_simple: Schema) -> None: catalog.create_table("table", schema=table_schema_simple) +@pytest.mark.parametrize("hive2_compatible", [True, False]) @patch("time.time", MagicMock(return_value=12345)) -def test_create_table(table_schema_with_all_types: Schema, hive_database: HiveDatabase, hive_table: HiveTable) -> None: +def test_create_table( + table_schema_with_all_types: Schema, hive_database: HiveDatabase, hive_table: HiveTable, hive2_compatible: bool +) -> None: catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL) + if hive2_compatible: + catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL, **{"hive.hive2-compatible": "true"}) catalog._client = MagicMock() catalog._client.__enter__().create_table.return_value = None @@ -229,7 +234,11 @@ def test_create_table(table_schema_with_all_types: Schema, hive_database: HiveDa FieldSchema(name='date', type='date', comment=None), FieldSchema(name='time', type='string', comment=None), FieldSchema(name='timestamp', type='timestamp', comment=None), - FieldSchema(name='timestamptz', type='timestamp', comment=None), + FieldSchema( + name='timestamptz', + type='timestamp' if hive2_compatible else 'timestamp with local time zone', + comment=None, + ), FieldSchema(name='string', type='string', comment=None), FieldSchema(name='uuid', 
type='string', comment=None), FieldSchema(name='fixed', type='binary', comment=None), diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py index 887a519437..11e1553bd6 100644 --- a/tests/integration/test_writes.py +++ b/tests/integration/test_writes.py @@ -32,6 +32,8 @@ from pytest_mock.plugin import MockerFixture from pyiceberg.catalog import Catalog, load_catalog +from pyiceberg.catalog import Catalog +from pyiceberg.catalog.hive import HiveCatalog from pyiceberg.catalog.sql import SqlCatalog from pyiceberg.exceptions import NamespaceAlreadyExistsError, NoSuchTableError from pyiceberg.schema import Schema @@ -635,3 +637,21 @@ def test_duckdb_url_import(warehouse: Path, arrow_table_with_null: pa.Table) -> b'\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11', ), ] + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_hive_catalog_storage_descriptor( + session_catalog_hive: HiveCatalog, + pa_schema: pa.Schema, + arrow_table_with_null: pa.Table, + spark: SparkSession, + format_version: int, +) -> None: + tbl = _create_table( + session_catalog_hive, "default.test_storage_descriptor", {"format-version": format_version}, [arrow_table_with_null] + ) + + # check if pyiceberg can read the table + assert len(tbl.scan().to_arrow()) == 3 + # check if spark can read the table + assert spark.sql("SELECT * FROM hive.default.test_storage_descriptor").count() == 3 From e9183ec4ed863d97eae6abd7b60c9233b0e14d67 Mon Sep 17 00:00:00 2001 From: HonahX Date: Sat, 13 Apr 2024 15:07:46 -0700 Subject: [PATCH 3/5] fix lint issue --- pyiceberg/catalog/glue.py | 2 +- tests/integration/test_writes.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index 97393881d1..291c0667e2 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -62,7 +62,7 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema, SchemaVisitor, visit from pyiceberg.serializers import FromInputFile -from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata, PropertyUtil +from pyiceberg.table import CommitTableRequest, CommitTableResponse, PropertyUtil, Table, update_table_metadata from pyiceberg.table.metadata import TableMetadata, new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py index 11e1553bd6..b72612cad9 100644 --- a/tests/integration/test_writes.py +++ b/tests/integration/test_writes.py @@ -32,7 +32,6 @@ from pytest_mock.plugin import MockerFixture from pyiceberg.catalog import Catalog, load_catalog -from pyiceberg.catalog import Catalog from pyiceberg.catalog.hive import HiveCatalog from pyiceberg.catalog.sql import SqlCatalog from pyiceberg.exceptions import NamespaceAlreadyExistsError, NoSuchTableError @@ -638,6 +637,7 @@ def test_duckdb_url_import(warehouse: Path, arrow_table_with_null: pa.Table) -> ), ] + @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) def test_hive_catalog_storage_descriptor( @@ -648,7 +648,7 @@ def test_hive_catalog_storage_descriptor( format_version: int, ) -> None: tbl = _create_table( - session_catalog_hive, "default.test_storage_descriptor", {"format-version": format_version}, [arrow_table_with_null] + session_catalog_hive, 
"default.test_storage_descriptor", {"format-version": str(format_version)}, [arrow_table_with_null] ) # check if pyiceberg can read the table From 3361bc94635c31cc97180d50eef0ac34f6f5c993 Mon Sep 17 00:00:00 2001 From: HonahX Date: Sat, 13 Apr 2024 15:37:12 -0700 Subject: [PATCH 4/5] fix integration tests --- tests/integration/test_writes.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py index b72612cad9..a19d56148f 100644 --- a/tests/integration/test_writes.py +++ b/tests/integration/test_writes.py @@ -131,6 +131,20 @@ def session_catalog() -> Catalog: ) +@pytest.fixture(scope="session") +def session_catalog_hive() -> Catalog: + return load_catalog( + "local", + **{ + "type": "hive", + "uri": "http://localhost:9083", + "s3.endpoint": "http://localhost:9000", + "s3.access-key-id": "admin", + "s3.secret-access-key": "password", + }, + ) + + @pytest.fixture(scope="session") def pa_schema() -> pa.Schema: return pa.schema([ @@ -282,6 +296,13 @@ def spark() -> SparkSession: .config("spark.sql.catalog.integration.s3.endpoint", "http://localhost:9000") .config("spark.sql.catalog.integration.s3.path-style-access", "true") .config("spark.sql.defaultCatalog", "integration") + .config("spark.sql.catalog.hive", "org.apache.iceberg.spark.SparkCatalog") + .config("spark.sql.catalog.hive.type", "hive") + .config("spark.sql.catalog.hive.uri", "http://localhost:9083") + .config("spark.sql.catalog.hive.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") + .config("spark.sql.catalog.hive.warehouse", "s3://warehouse/hive/") + .config("spark.sql.catalog.hive.s3.endpoint", "http://localhost:9000") + .config("spark.sql.catalog.hive.s3.path-style-access", "true") .getOrCreate() ) From f19a1364fed378084c08ee1a230c23cf8df1a216 Mon Sep 17 00:00:00 2001 From: HonahX Date: Sat, 13 Apr 2024 16:06:08 -0700 Subject: [PATCH 5/5] remove the new public method --- pyiceberg/catalog/__init__.py | 6 ++++++ pyiceberg/catalog/glue.py | 5 +++-- pyiceberg/catalog/hive.py | 5 +++-- pyiceberg/table/__init__.py | 6 ------ 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index e14b4c94e9..d12f8be478 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -257,6 +257,12 @@ def delete_data_files(io: FileIO, manifests_to_delete: List[ManifestFile]) -> No deleted_files[path] = True +def _property_as_bool(properties: Dict[str, str], property_name: str, default: bool) -> bool: + if value := properties.get(property_name): + return value.lower() == "true" + return default + + @dataclass class PropertiesUpdateSummary: removed: List[str] diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index 291c0667e2..2d30333b42 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -47,6 +47,7 @@ TABLE_TYPE, Catalog, PropertiesUpdateSummary, + _property_as_bool, ) from pyiceberg.exceptions import ( CommitFailedException, @@ -62,7 +63,7 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema, SchemaVisitor, visit from pyiceberg.serializers import FromInputFile -from pyiceberg.table import CommitTableRequest, CommitTableResponse, PropertyUtil, Table, update_table_metadata +from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata from pyiceberg.table.metadata import TableMetadata, new_table_metadata from pyiceberg.table.sorting import 
UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties @@ -313,7 +314,7 @@ def _update_glue_table(self, database_name: str, table_name: str, table_input: T self.glue.update_table( DatabaseName=database_name, TableInput=table_input, - SkipArchive=PropertyUtil.property_as_bool(self.properties, GLUE_SKIP_ARCHIVE, GLUE_SKIP_ARCHIVE_DEFAULT), + SkipArchive=_property_as_bool(self.properties, GLUE_SKIP_ARCHIVE, GLUE_SKIP_ARCHIVE_DEFAULT), VersionId=version_id, ) except self.glue.exceptions.EntityNotFoundException as e: diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index d97ddd804d..f543626424 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -60,6 +60,7 @@ TABLE_TYPE, Catalog, PropertiesUpdateSummary, + _property_as_bool, ) from pyiceberg.exceptions import ( CommitFailedException, @@ -74,7 +75,7 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema, SchemaVisitor, visit from pyiceberg.serializers import FromInputFile -from pyiceberg.table import CommitTableRequest, CommitTableResponse, PropertyUtil, Table, TableProperties, update_table_metadata +from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, TableProperties, update_table_metadata from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties @@ -314,7 +315,7 @@ def create_table( createTime=current_time_millis // 1000, lastAccessTime=current_time_millis // 1000, sd=_construct_hive_storage_descriptor( - schema, location, PropertyUtil.property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT) + schema, location, _property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT) ), tableType=EXTERNAL_TABLE, parameters=_construct_parameters(metadata_location), diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index c2d5409b4c..0f4e334d8d 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -223,12 +223,6 @@ def property_as_int(properties: Dict[str, str], property_name: str, default: Opt else: return default - @staticmethod - def property_as_bool(properties: Dict[str, str], property_name: str, default: bool) -> bool: - if value := properties.get(property_name): - return value.lower() == "true" - return default - class Transaction: _table: Table
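
Note on the type mapping implemented above, not part of the patches themselves: a minimal sketch of the converter behavior in its final state after patch 2, using only names defined in this series. Hive 3 has a dedicated type for zoned timestamps, while Hive 2.x does not, which is what the compatibility flag works around.

```python
from pyiceberg.catalog.hive import SchemaToHiveConverter
from pyiceberg.schema import visit
from pyiceberg.types import TimestampType, TimestamptzType

# Default mode targets Hive 3: timestamptz gets its own Hive type string.
assert visit(TimestamptzType(), SchemaToHiveConverter(hive2_compatible=False)) == "timestamp with local time zone"
assert visit(TimestampType(), SchemaToHiveConverter(hive2_compatible=False)) == "timestamp"

# Hive2 compatibility mode degrades timestamptz to plain "timestamp",
# since Hive 2.x has no "timestamp with local time zone" type.
assert visit(TimestamptzType(), SchemaToHiveConverter(hive2_compatible=True)) == "timestamp"
```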
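The Python-API equivalent of the documented YAML flag, mirroring the parametrized unit test in patch 2; the metastore URI below is a placeholder, not part of the patches.

```python
from pyiceberg.catalog.hive import HiveCatalog

# Catalog properties are plain strings, so the flag is passed as "true";
# _property_as_bool treats any case variant of "true" as enabled and
# falls back to the default (False) for anything else.
catalog = HiveCatalog(
    "hive",
    uri="thrift://localhost:9083",  # placeholder metastore endpoint
    **{"hive.hive2-compatible": "true"},
)
```

Patch 5 keeps this parsing helper module-private (`_property_as_bool` in `pyiceberg.catalog`) rather than exposing it as a `PropertyUtil` static method, so the series adds no new public API surface.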