
Commit eec13a6

sungwy and kevinjqliu authored
Refactor PyArrow DataFiles Projection functions (#1043)
* refactoring
* refactor more
* docstring
* #1042
* adopt review feedback
* thanks Kevin!

Co-authored-by: Kevin Liu <[email protected]>

---------

Co-authored-by: Kevin Liu <[email protected]>
1 parent 166e7bb commit eec13a6

File tree: 3 files changed, +219 -21 lines changed


pyiceberg/io/__init__.py

Lines changed: 13 additions & 0 deletions
@@ -27,6 +27,7 @@
 import importlib
 import logging
+import os
 import warnings
 from abc import ABC, abstractmethod
 from io import SEEK_SET
@@ -36,6 +37,7 @@
     List,
     Optional,
     Protocol,
+    Tuple,
     Type,
     Union,
     runtime_checkable,
@@ -356,3 +358,14 @@ def load_file_io(properties: Properties = EMPTY_DICT, location: Optional[str] =
         raise ModuleNotFoundError(
             'Could not load a FileIO, please consider installing one: pip3 install "pyiceberg[pyarrow]", for more options refer to the docs.'
         ) from e
+
+
+def _parse_location(location: str) -> Tuple[str, str, str]:
+    """Return the path without the scheme."""
+    uri = urlparse(location)
+    if not uri.scheme:
+        return "file", uri.netloc, os.path.abspath(location)
+    elif uri.scheme in ("hdfs", "viewfs"):
+        return uri.scheme, uri.netloc, uri.path
+    else:
+        return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
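For orientation, a minimal sketch of what the new helper returns for a few common locations, traced by hand from the function body above (note that it is a private helper, so the import path may change):

    from pyiceberg.io import _parse_location

    # No scheme: fall back to the local filesystem and make the path absolute.
    _parse_location("/tmp/warehouse/data.parquet")
    # -> ("file", "", "/tmp/warehouse/data.parquet")

    # hdfs/viewfs: keep the netloc (host:port) separate from the path.
    _parse_location("hdfs://namenode:8020/warehouse/data.parquet")
    # -> ("hdfs", "namenode:8020", "/warehouse/data.parquet")

    # Everything else (e.g. object stores): fold the bucket back into the returned path.
    _parse_location("s3://bucket/warehouse/data.parquet")
    # -> ("s3", "bucket", "bucket/warehouse/data.parquet")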

pyiceberg/io/pyarrow.py

Lines changed: 196 additions & 1 deletion
@@ -108,6 +108,7 @@
     InputStream,
     OutputFile,
     OutputStream,
+    _parse_location,
 )
 from pyiceberg.manifest import (
     DataFile,
@@ -1195,7 +1196,7 @@ def _task_to_record_batches(
     name_mapping: Optional[NameMapping] = None,
     use_large_types: bool = True,
 ) -> Iterator[pa.RecordBatch]:
-    _, _, path = PyArrowFileIO.parse_location(task.file.file_path)
+    _, _, path = _parse_location(task.file.file_path)
     arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
     with fs.open_input_file(path) as fin:
         fragment = arrow_format.make_fragment(fin)
@@ -1304,6 +1305,195 @@ def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dic
     return deletes_per_file


+def _fs_from_file_path(file_path: str, io: FileIO) -> FileSystem:
+    scheme, netloc, _ = _parse_location(file_path)
+    if isinstance(io, PyArrowFileIO):
+        return io.fs_by_scheme(scheme, netloc)
+    else:
+        try:
+            from pyiceberg.io.fsspec import FsspecFileIO
+
+            if isinstance(io, FsspecFileIO):
+                from pyarrow.fs import PyFileSystem
+
+                return PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
+            else:
+                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}")
+        except ModuleNotFoundError as e:
+            # When FsSpec is not installed
+            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e
+
+
+class ArrowScan:
+    _table_metadata: TableMetadata
+    _io: FileIO
+    _fs: FileSystem
+    _projected_schema: Schema
+    _bound_row_filter: BooleanExpression
+    _case_sensitive: bool
+    _limit: Optional[int]
+    """Scan the Iceberg Table and create an Arrow construct.
+
+    Attributes:
+        _table_metadata: Current table metadata of the Iceberg table
+        _io: PyIceberg FileIO implementation from which to fetch the io properties
+        _fs: PyArrow FileSystem to use to read the files
+        _projected_schema: Iceberg Schema to project onto the data files
+        _bound_row_filter: Schema bound row expression to filter the data with
+        _case_sensitive: Case sensitivity when looking up column names
+        _limit: Limit the number of records.
+    """
+
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        projected_schema: Schema,
+        row_filter: BooleanExpression,
+        case_sensitive: bool = True,
+        limit: Optional[int] = None,
+    ) -> None:
+        self._table_metadata = table_metadata
+        self._io = io
+        self._fs = _fs_from_file_path(table_metadata.location, io)  # TODO: use different FileSystem per file
+        self._projected_schema = projected_schema
+        self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
+        self._case_sensitive = case_sensitive
+        self._limit = limit
+
+    @property
+    def _use_large_types(self) -> bool:
+        """Whether to represent data as large arrow types.
+
+        Defaults to True.
+        """
+        return property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
+
+    @property
+    def _projected_field_ids(self) -> Set[int]:
+        """Set of field IDs that should be projected from the data files."""
+        return {
+            id
+            for id in self._projected_schema.field_ids
+            if not isinstance(self._projected_schema.find_type(id), (MapType, ListType))
+        }.union(extract_field_ids(self._bound_row_filter))
+
+    def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
+        """Scan the Iceberg table and return a pa.Table.
+
+        Returns a pa.Table with data from the Iceberg table by resolving the
+        right columns that match the current table schema. Only data that
+        matches the provided row_filter expression is returned.
+
+        Args:
+            tasks: FileScanTasks representing the data files and delete files to read from.
+
+        Returns:
+            A PyArrow table. Total number of rows will be capped if specified.
+
+        Raises:
+            ResolveError: When a required field cannot be found in the file
+            ValueError: When a field type in the file cannot be projected to the schema type
+        """
+        deletes_per_file = _read_all_delete_files(self._fs, tasks)
+        executor = ExecutorFactory.get_or_create()
+
+        def _table_from_scan_task(task: FileScanTask) -> pa.Table:
+            batches = list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
+            if len(batches) > 0:
+                return pa.Table.from_batches(batches)
+            else:
+                return None
+
+        futures = [
+            executor.submit(
+                _table_from_scan_task,
+                task,
+            )
+            for task in tasks
+        ]
+        total_row_count = 0
+        # for consistent ordering, we need to maintain future order
+        futures_index = {f: i for i, f in enumerate(futures)}
+        completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
+        for future in concurrent.futures.as_completed(futures):
+            completed_futures.add(future)
+            if table_result := future.result():
+                total_row_count += len(table_result)
+            # stop early if limit is satisfied
+            if self._limit is not None and total_row_count >= self._limit:
+                break
+
+        # by now, we've either completed all tasks or satisfied the limit
+        if self._limit is not None:
+            _ = [f.cancel() for f in futures if not f.done()]
+
+        tables = [f.result() for f in completed_futures if f.result()]
+
+        if len(tables) < 1:
+            return pa.Table.from_batches([], schema=schema_to_pyarrow(self._projected_schema, include_field_ids=False))
+
+        result = pa.concat_tables(tables, promote_options="permissive")
+
+        if self._limit is not None:
+            return result.slice(0, self._limit)
+
+        return result
+
+    def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
+        """Scan the Iceberg table and return an Iterator[pa.RecordBatch].
+
+        Returns an Iterator of pa.RecordBatch with data from the Iceberg table
+        by resolving the right columns that match the current table schema.
+        Only data that matches the provided row_filter expression is returned.
+
+        Args:
+            tasks: FileScanTasks representing the data files and delete files to read from.
+
+        Returns:
+            An Iterator of PyArrow RecordBatches.
+            Total number of rows will be capped if specified.
+
+        Raises:
+            ResolveError: When a required field cannot be found in the file
+            ValueError: When a field type in the file cannot be projected to the schema type
+        """
+        deletes_per_file = _read_all_delete_files(self._fs, tasks)
+        return self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file)
+
+    def _record_batches_from_scan_tasks_and_deletes(
+        self, tasks: Iterable[FileScanTask], deletes_per_file: Dict[str, List[ChunkedArray]]
+    ) -> Iterator[pa.RecordBatch]:
+        total_row_count = 0
+        for task in tasks:
+            if self._limit is not None and total_row_count >= self._limit:
+                break
+            batches = _task_to_record_batches(
+                self._fs,
+                task,
+                self._bound_row_filter,
+                self._projected_schema,
+                self._projected_field_ids,
+                deletes_per_file.get(task.file.file_path),
+                self._case_sensitive,
+                self._table_metadata.name_mapping(),
+                self._use_large_types,
+            )
+            for batch in batches:
+                if self._limit is not None:
+                    if total_row_count >= self._limit:
+                        break
+                    elif total_row_count + len(batch) >= self._limit:
+                        batch = batch.slice(0, self._limit - total_row_count)
+                yield batch
+                total_row_count += len(batch)
+
+
+@deprecated(
+    deprecated_in="0.8.0",
+    removed_in="0.9.0",
+    help_message="project_table is deprecated. Use ArrowScan.to_table instead.",
+)
 def project_table(
     tasks: Iterable[FileScanTask],
     table_metadata: TableMetadata,
@@ -1398,6 +1588,11 @@ def project_table(
     return result


+@deprecated(
+    deprecated_in="0.8.0",
+    removed_in="0.9.0",
+    help_message="project_table is deprecated. Use ArrowScan.to_record_batches instead.",
+)
 def project_batches(
     tasks: Iterable[FileScanTask],
     table_metadata: TableMetadata,
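Taken together, ArrowScan is the entry point that the now-deprecated project_table and project_batches delegate their callers to. A rough usage sketch, assuming `tbl` is a PyIceberg Table already loaded from a catalog (argument names follow the __init__ signature shown above):

    from pyiceberg.expressions import AlwaysTrue
    from pyiceberg.io.pyarrow import ArrowScan

    scan = ArrowScan(
        table_metadata=tbl.metadata,
        io=tbl.io,
        projected_schema=tbl.schema(),
        row_filter=AlwaysTrue(),
        case_sensitive=True,
        limit=100,
    )
    tasks = tbl.scan(limit=100).plan_files()

    arrow_table = scan.to_table(tasks)       # one pa.Table, capped at `limit`
    batches = scan.to_record_batches(tasks)   # or a lazy Iterator[pa.RecordBatch]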

pyiceberg/table/__init__.py

Lines changed: 10 additions & 20 deletions
@@ -2031,35 +2031,25 @@ def plan_files(self) -> Iterable[FileScanTask]:
         ]

     def to_arrow(self) -> pa.Table:
-        from pyiceberg.io.pyarrow import project_table
+        from pyiceberg.io.pyarrow import ArrowScan

-        return project_table(
-            self.plan_files(),
-            self.table_metadata,
-            self.io,
-            self.row_filter,
-            self.projection(),
-            case_sensitive=self.case_sensitive,
-            limit=self.limit,
-        )
+        return ArrowScan(
+            self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
+        ).to_table(self.plan_files())

     def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
         import pyarrow as pa

-        from pyiceberg.io.pyarrow import project_batches, schema_to_pyarrow
+        from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow

         target_schema = schema_to_pyarrow(self.projection())
+        batches = ArrowScan(
+            self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
+        ).to_record_batches(self.plan_files())
+
         return pa.RecordBatchReader.from_batches(
             target_schema,
-            project_batches(
-                self.plan_files(),
-                self.table_metadata,
-                self.io,
-                self.row_filter,
-                self.projection(),
-                case_sensitive=self.case_sensitive,
-                limit=self.limit,
-            ),
+            batches,
         )

     def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
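The user-facing scan API is untouched by this refactor; a typical read still goes through Table.scan, roughly as below (the catalog name, table identifier, and filter column are placeholders for illustration):

    from pyiceberg.catalog import load_catalog

    catalog = load_catalog("default")        # placeholder catalog name
    tbl = catalog.load_table("db.events")    # placeholder table identifier

    # to_arrow() now routes through ArrowScan.to_table instead of project_table.
    arrow_table = tbl.scan(row_filter="event_ts >= '2024-01-01'", limit=1000).to_arrow()

    # to_arrow_batch_reader() likewise wraps ArrowScan.to_record_batches.
    reader = tbl.scan(limit=1000).to_arrow_batch_reader()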
