in pyiceberg/io/pyarrow.py [0:0]
def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
"""Scan the Iceberg table and return a pa.Table.
Returns a pa.Table with data from the Iceberg table by resolving the
right columns that match the current table schema. Only data that
matches the provided row_filter expression is returned.
Args:
tasks: FileScanTasks representing the data files and delete files to read from.
Returns:
A PyArrow table. Total number of rows will be capped if specified.
Raises:
ResolveError: When a required field cannot be found in the file
ValueError: When a field type in the file cannot be projected to the schema type
"""
deletes_per_file = _read_all_delete_files(self._io, tasks)
executor = ExecutorFactory.get_or_create()
def _table_from_scan_task(task: FileScanTask) -> pa.Table:
batches = list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
if len(batches) > 0:
return pa.Table.from_batches(batches)
else:
return None
futures = [
executor.submit(
_table_from_scan_task,
task,
)
for task in tasks
]
total_row_count = 0
# for consistent ordering, we need to maintain future order
futures_index = {f: i for i, f in enumerate(futures)}
completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
for future in concurrent.futures.as_completed(futures):
completed_futures.add(future)
if table_result := future.result():
total_row_count += len(table_result)
# stop early if limit is satisfied
if self._limit is not None and total_row_count >= self._limit:
break
# by now, we've either completed all tasks or satisfied the limit
if self._limit is not None:
_ = [f.cancel() for f in futures if not f.done()]
tables = [f.result() for f in completed_futures if f.result()]
arrow_schema = schema_to_pyarrow(self._projected_schema, include_field_ids=False)
if len(tables) < 1:
return pa.Table.from_batches([], schema=arrow_schema)
result = pa.concat_tables(tables, promote_options="permissive")
if property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, False):
deprecation_message(
deprecated_in="0.10.0",
removed_in="0.11.0",
help_message=f"Property `{PYARROW_USE_LARGE_TYPES_ON_READ}` will be removed.",
)
result = result.cast(arrow_schema)
if self._limit is not None:
return result.slice(0, self._limit)
return result