Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ class HiveCatalog(MetastoreCatalog):

def __init__(self, name: str, **properties: str):
super().__init__(name, **properties)
self._client = _HiveClient(properties["uri"], properties.get("ugi"))
self._client = self._create_hive_client(properties)

self._lock_check_min_wait_time = PropertyUtil.property_as_float(
properties, LOCK_CHECK_MIN_WAIT_TIME, DEFAULT_LOCK_CHECK_MIN_WAIT_TIME
Expand All @@ -271,6 +271,19 @@ def __init__(self, name: str, **properties: str):
DEFAULT_LOCK_CHECK_RETRIES,
)

@staticmethod
def _create_hive_client(properties: Dict[str, str]) -> _HiveClient:
last_exception = None
for uri in properties["uri"].split(","):
try:
return _HiveClient(uri, properties.get("ugi"))
except BaseException as e:
last_exception = e
if last_exception is not None:
raise last_exception
else:
raise ValueError(f"Unable to connect to hive using uri: {properties['uri']}")

def _convert_hive_into_iceberg(self, table: HiveTable) -> Table:
properties: Dict[str, str] = table.parameters
if TABLE_TYPE not in properties:
Expand Down
30 changes: 30 additions & 0 deletions tests/catalog/test_hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -1195,3 +1195,33 @@ def test_hive_wait_for_lock() -> None:
with pytest.raises(WaitingForLockException):
catalog._wait_for_lock("db", "tbl", lockid, catalog._client)
assert catalog._client.check_lock.call_count == 5


def test_create_hive_client_success() -> None:
properties = {"uri": "thrift://localhost:10000", "ugi": "user"}

with patch("pyiceberg.catalog.hive._HiveClient", return_value=MagicMock()) as mock_hive_client:
client = HiveCatalog._create_hive_client(properties)
mock_hive_client.assert_called_once_with("thrift://localhost:10000", "user")
assert client is not None


def test_create_hive_client_multiple_uris() -> None:
properties = {"uri": "thrift://localhost:10000,thrift://localhost:10001", "ugi": "user"}

with patch("pyiceberg.catalog.hive._HiveClient") as mock_hive_client:
mock_hive_client.side_effect = [Exception("Connection failed"), MagicMock()]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this mean the first connection will fail?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your comments. Yes this means the first connections will fail.

Scenario 2 uris; both passes will never be a valid case, if the first uri passes the second is never tried. So the only additional test would be to add one covering (optional) 2 uris; 1 pass, second one not tried

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks! i think that should cover all the cases


client = HiveCatalog._create_hive_client(properties)
assert mock_hive_client.call_count == 2
mock_hive_client.assert_has_calls([call("thrift://localhost:10000", "user"), call("thrift://localhost:10001", "user")])
assert client is not None


def test_create_hive_client_failure() -> None:
properties = {"uri": "thrift://localhost:10000,thrift://localhost:10001", "ugi": "user"}

with patch("pyiceberg.catalog.hive._HiveClient", side_effect=Exception("Connection failed")) as mock_hive_client:
with pytest.raises(Exception, match="Connection failed"):
HiveCatalog._create_hive_client(properties)
assert mock_hive_client.call_count == 2