From a473e1b0c239b1d846c4c0d6a3a1f241d36a4b5d Mon Sep 17 00:00:00 2001 From: awdavidson <54780428+awdavidson@users.noreply.github.com> Date: Mon, 20 May 2024 14:29:10 +0100 Subject: [PATCH 01/10] Support HA and kerberos --- pyiceberg/catalog/hive.py | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 708ae8c9d4..52537cd04c 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -139,11 +139,15 @@ class _HiveClient: _client: Client _ugi: Optional[List[str]] - def __init__(self, uri: str, ugi: Optional[str] = None): + def __init__(self, uri: str, ugi: Optional[str] = None, auth_mechanism: Optional[str] = None): url_parts = urlparse(uri) transport = TSocket.TSocket(url_parts.hostname, url_parts.port) - self._transport = TTransport.TBufferedTransport(transport) - protocol = TBinaryProtocol.TBinaryProtocol(transport) + if auth_mechanism == "GSSAPI": + self._transport = TTransport.TSaslClientTransport(socket, host=url_parts.hostname, service='hive', mechanism='GSSAPI') + protocol = TBinaryProtocol.TBinaryProtocol(self._transport) + else: + self._transport = TTransport.TBufferedTransport(transport) + protocol = TBinaryProtocol.TBinaryProtocol(transport) self._client = Client(protocol) self._ugi = ugi.split(':') if ugi else None @@ -258,7 +262,7 @@ class HiveCatalog(MetastoreCatalog): def __init__(self, name: str, **properties: str): super().__init__(name, **properties) - self._client = _HiveClient(properties["uri"], properties.get("ugi")) + self._client = self._create_hive_client(properties) self._lock_check_min_wait_time = PropertyUtil.property_as_float( properties, LOCK_CHECK_MIN_WAIT_TIME, DEFAULT_LOCK_CHECK_MIN_WAIT_TIME @@ -298,6 +302,18 @@ def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table: catalog=self, ) + @staticmethod + def _create_hive_client(**properties: str) -> _HiveClient: + uris = properties["uri"].split(",") + for idx, uri in enumerate(uris): + try: + return _HiveClient(uri, properties.get("ugi"), properties.get("auth_mechanism")) + except Exception as e: + if idx + 1 == len(uris): + raise e + else: + continue + def create_table( self, identifier: Union[str, Identifier], From 371617a90a15fd67c8c0bf8e90c34467c36f11fd Mon Sep 17 00:00:00 2001 From: awdavidson <54780428+awdavidson@users.noreply.github.com> Date: Mon, 20 May 2024 14:35:45 +0100 Subject: [PATCH 02/10] reformat --- pyiceberg/catalog/hive.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 52537cd04c..279c03a5c2 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -276,6 +276,18 @@ def __init__(self, name: str, **properties: str): DEFAULT_LOCK_CHECK_RETRIES, ) + @staticmethod + def _create_hive_client(properties: Dict[str, str]) -> _HiveClient: + uris = properties["uri"].split(",") + for idx, uri in enumerate(uris): + try: + return _HiveClient(uri, properties.get("ugi"), properties.get("auth_mechanism")) + except Exception as e: + if idx + 1 == len(uris): + raise e + else: + continue + def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table: properties: Dict[str, str] = table.parameters if TABLE_TYPE not in properties: @@ -302,18 +314,6 @@ def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table: catalog=self, ) - @staticmethod - def _create_hive_client(**properties: str) -> _HiveClient: - uris = properties["uri"].split(",") - for idx, uri in enumerate(uris): - try: - return _HiveClient(uri, properties.get("ugi"), properties.get("auth_mechanism")) - except Exception as e: - if idx + 1 == len(uris): - raise e - else: - continue - def create_table( self, identifier: Union[str, Identifier], From a1d73b5c84fbe8f5b787213f99a231d2d976fc4c Mon Sep 17 00:00:00 2001 From: awdavidson <54780428+awdavidson@users.noreply.github.com> Date: Thu, 27 Jun 2024 08:25:24 +0100 Subject: [PATCH 03/10] Remove kerberos auth --- pyiceberg/catalog/hive.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 279c03a5c2..b926210496 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -139,15 +139,11 @@ class _HiveClient: _client: Client _ugi: Optional[List[str]] - def __init__(self, uri: str, ugi: Optional[str] = None, auth_mechanism: Optional[str] = None): + def __init__(self, uri: str, ugi: Optional[str] = None): url_parts = urlparse(uri) transport = TSocket.TSocket(url_parts.hostname, url_parts.port) - if auth_mechanism == "GSSAPI": - self._transport = TTransport.TSaslClientTransport(socket, host=url_parts.hostname, service='hive', mechanism='GSSAPI') - protocol = TBinaryProtocol.TBinaryProtocol(self._transport) - else: - self._transport = TTransport.TBufferedTransport(transport) - protocol = TBinaryProtocol.TBinaryProtocol(transport) + self._transport = TTransport.TBufferedTransport(transport) + protocol = TBinaryProtocol.TBinaryProtocol(transport) self._client = Client(protocol) self._ugi = ugi.split(':') if ugi else None @@ -281,7 +277,7 @@ def _create_hive_client(properties: Dict[str, str]) -> _HiveClient: uris = properties["uri"].split(",") for idx, uri in enumerate(uris): try: - return _HiveClient(uri, properties.get("ugi"), properties.get("auth_mechanism")) + return _HiveClient(uri, properties.get("ugi")) except Exception as e: if idx + 1 == len(uris): raise e From 906a8e28118149b1f63e8bb6a75a7b08390ee81d Mon Sep 17 00:00:00 2001 From: awdavidson <54780428+awdavidson@users.noreply.github.com> Date: Thu, 27 Jun 2024 09:31:14 +0100 Subject: [PATCH 04/10] Capture all exceptions --- pyiceberg/catalog/hive.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index b926210496..6c8f0e1f9f 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -278,7 +278,7 @@ def _create_hive_client(properties: Dict[str, str]) -> _HiveClient: for idx, uri in enumerate(uris): try: return _HiveClient(uri, properties.get("ugi")) - except Exception as e: + except BaseException as e: if idx + 1 == len(uris): raise e else: From 53fa3fe47bb03f3187bfce7ac7082bc7a6ad7730 Mon Sep 17 00:00:00 2001 From: awdavidson <54780428+awdavidson@users.noreply.github.com> Date: Thu, 4 Jul 2024 14:54:14 +0100 Subject: [PATCH 05/10] Make more pythonic --- pyiceberg/catalog/hive.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 445da69291..d9efe1e08b 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -273,15 +273,16 @@ def __init__(self, name: str, **properties: str): @staticmethod def _create_hive_client(properties: Dict[str, str]) -> _HiveClient: - uris = properties["uri"].split(",") - for idx, uri in enumerate(uris): + last_exception = None + for uri in properties["uri"].split(","): try: return _HiveClient(uri, properties.get("ugi")) except BaseException as e: - if idx + 1 == len(uris): - raise e - else: - continue + last_exception = e + if last_exception not None: + raise e + else: + raise ValueError(f"Unable to connect to hive using uri: {properties["uri"]}") def _convert_hive_into_iceberg(self, table: HiveTable) -> Table: properties: Dict[str, str] = table.parameters From 86077b9753b8d0c8ad7b308ccea49460445db6bb Mon Sep 17 00:00:00 2001 From: awdavidson <54780428+awdavidson@users.noreply.github.com> Date: Thu, 4 Jul 2024 16:07:22 +0100 Subject: [PATCH 06/10] Add uts --- tests/catalog/test_hive.py | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py index 96e95815be..0ba0f3e536 100644 --- a/tests/catalog/test_hive.py +++ b/tests/catalog/test_hive.py @@ -1195,3 +1195,40 @@ def test_hive_wait_for_lock() -> None: with pytest.raises(WaitingForLockException): catalog._wait_for_lock("db", "tbl", lockid, catalog._client) assert catalog._client.check_lock.call_count == 5 + +def test_create_hive_client_success(): + properties = { + "uri": "thrift://localhost:10000", + "ugi": "user" + } + + with patch('pyiceberg.catalog.hive._HiveClient', return_value=MagicMock()) as mock_hive_client: + client = HiveCatalog._create_hive_client(properties) + mock_hive_client.assert_called_once_with("thrift://localhost:10000", "user") + assert client is not None + +def test_create_hive_client_multiple_uris(): + properties = { + "uri": "thrift://localhost:10000,thrift://localhost:10001", + "ugi": "user" + } + + with patch('pyiceberg.catalog.hive._HiveClient') as mock_hive_client: + mock_hive_client.side_effect = [Exception("Connection failed"), MagicMock()] + + client = HiveCatalog._create_hive_client(properties) + assert mock_hive_client.call_count == 2 + mock_hive_client.assert_any_call("thrift://localhost:10000", "user") + mock_hive_client.assert_any_call("thrift://localhost:10001", "user") + assert client is not None + +def test_create_hive_client_failure(): + properties = { + "uri": "thrift://localhost:10000,thrift://localhost:10001", + "ugi": "user" + } + + with patch('pyiceberg.catalog.hive._HiveClient', side_effect=Exception("Connection failed")) as mock_hive_client: + with pytest.raises(Exception, match="Connection failed"): + HiveCatalog._create_hive_client(properties) + assert mock_hive_client.call_count == 2 From 7a64498439afc70a75ec01aad14e2ec74a8341c3 Mon Sep 17 00:00:00 2001 From: awdavidson <54780428+awdavidson@users.noreply.github.com> Date: Tue, 24 Sep 2024 20:42:34 +0100 Subject: [PATCH 07/10] Update UT to use assert_called_once_with --- tests/catalog/test_hive.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py index 0ba0f3e536..57285aaf0f 100644 --- a/tests/catalog/test_hive.py +++ b/tests/catalog/test_hive.py @@ -1218,8 +1218,8 @@ def test_create_hive_client_multiple_uris(): client = HiveCatalog._create_hive_client(properties) assert mock_hive_client.call_count == 2 - mock_hive_client.assert_any_call("thrift://localhost:10000", "user") - mock_hive_client.assert_any_call("thrift://localhost:10001", "user") + mock_hive_client.assert_called_once_with("thrift://localhost:10000", "user") + mock_hive_client.assert_called_once_with("thrift://localhost:10001", "user") assert client is not None def test_create_hive_client_failure(): From a9d6ab60a62e59f7113e23365b6e21af0b4dddf2 Mon Sep 17 00:00:00 2001 From: awdavidson <54780428+awdavidson@users.noreply.github.com> Date: Tue, 24 Sep 2024 22:21:05 +0100 Subject: [PATCH 08/10] Fix for linting Co-authored-by: Kevin Liu --- pyiceberg/catalog/hive.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index d9efe1e08b..ee9cccac09 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -279,7 +279,7 @@ def _create_hive_client(properties: Dict[str, str]) -> _HiveClient: return _HiveClient(uri, properties.get("ugi")) except BaseException as e: last_exception = e - if last_exception not None: + if last_exception is not None: raise e else: raise ValueError(f"Unable to connect to hive using uri: {properties["uri"]}") From 52e96715d0de15125ec82492ba17c3442ff4ae19 Mon Sep 17 00:00:00 2001 From: awdavidson <54780428+awdavidson@users.noreply.github.com> Date: Tue, 24 Sep 2024 22:47:36 +0100 Subject: [PATCH 09/10] Fix f string --- pyiceberg/catalog/hive.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index ee9cccac09..5970c3c6b4 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -282,7 +282,7 @@ def _create_hive_client(properties: Dict[str, str]) -> _HiveClient: if last_exception is not None: raise e else: - raise ValueError(f"Unable to connect to hive using uri: {properties["uri"]}") + raise ValueError(f"Unable to connect to hive using uri: {properties['uri']}") def _convert_hive_into_iceberg(self, table: HiveTable) -> Table: properties: Dict[str, str] = table.parameters From afa5d6e9d7495db1e3f74d5f1dfb976643f13783 Mon Sep 17 00:00:00 2001 From: Alfred Davidson Date: Wed, 25 Sep 2024 16:54:36 +0100 Subject: [PATCH 10/10] fix formatting --- pyiceberg/catalog/hive.py | 4 ++-- tests/catalog/test_hive.py | 33 +++++++++++++-------------------- 2 files changed, 15 insertions(+), 22 deletions(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 5970c3c6b4..5c5760caa1 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -280,10 +280,10 @@ def _create_hive_client(properties: Dict[str, str]) -> _HiveClient: except BaseException as e: last_exception = e if last_exception is not None: - raise e + raise last_exception else: raise ValueError(f"Unable to connect to hive using uri: {properties['uri']}") - + def _convert_hive_into_iceberg(self, table: HiveTable) -> Table: properties: Dict[str, str] = table.parameters if TABLE_TYPE not in properties: diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py index 57285aaf0f..a51598acf8 100644 --- a/tests/catalog/test_hive.py +++ b/tests/catalog/test_hive.py @@ -1196,39 +1196,32 @@ def test_hive_wait_for_lock() -> None: catalog._wait_for_lock("db", "tbl", lockid, catalog._client) assert catalog._client.check_lock.call_count == 5 -def test_create_hive_client_success(): - properties = { - "uri": "thrift://localhost:10000", - "ugi": "user" - } - with patch('pyiceberg.catalog.hive._HiveClient', return_value=MagicMock()) as mock_hive_client: +def test_create_hive_client_success() -> None: + properties = {"uri": "thrift://localhost:10000", "ugi": "user"} + + with patch("pyiceberg.catalog.hive._HiveClient", return_value=MagicMock()) as mock_hive_client: client = HiveCatalog._create_hive_client(properties) mock_hive_client.assert_called_once_with("thrift://localhost:10000", "user") assert client is not None -def test_create_hive_client_multiple_uris(): - properties = { - "uri": "thrift://localhost:10000,thrift://localhost:10001", - "ugi": "user" - } - with patch('pyiceberg.catalog.hive._HiveClient') as mock_hive_client: +def test_create_hive_client_multiple_uris() -> None: + properties = {"uri": "thrift://localhost:10000,thrift://localhost:10001", "ugi": "user"} + + with patch("pyiceberg.catalog.hive._HiveClient") as mock_hive_client: mock_hive_client.side_effect = [Exception("Connection failed"), MagicMock()] client = HiveCatalog._create_hive_client(properties) assert mock_hive_client.call_count == 2 - mock_hive_client.assert_called_once_with("thrift://localhost:10000", "user") - mock_hive_client.assert_called_once_with("thrift://localhost:10001", "user") + mock_hive_client.assert_has_calls([call("thrift://localhost:10000", "user"), call("thrift://localhost:10001", "user")]) assert client is not None -def test_create_hive_client_failure(): - properties = { - "uri": "thrift://localhost:10000,thrift://localhost:10001", - "ugi": "user" - } - with patch('pyiceberg.catalog.hive._HiveClient', side_effect=Exception("Connection failed")) as mock_hive_client: +def test_create_hive_client_failure() -> None: + properties = {"uri": "thrift://localhost:10000,thrift://localhost:10001", "ugi": "user"} + + with patch("pyiceberg.catalog.hive._HiveClient", side_effect=Exception("Connection failed")) as mock_hive_client: with pytest.raises(Exception, match="Connection failed"): HiveCatalog._create_hive_client(properties) assert mock_hive_client.call_count == 2