in petastorm/arrow_reader_worker.py [0:0]
def _read_with_shuffle_row_drop(self, piece, pq_file, column_names, shuffle_row_drop_partition):
    """Read a parquet piece, optionally shuffle its rows, and keep only the requested
    shuffle-row-drop partition of the rows.

    :param piece: parquet piece to read (supports ``read(columns=..., partitions=...)``).
    :param pq_file: open parquet file handle (unused here; kept for interface compatibility).
    :param column_names: set of column names requested by the caller.
    :param shuffle_row_drop_partition: ``(this_partition, num_partitions)`` pair selecting
        which ``1/num_partitions`` contiguous slice of rows to keep.
    :return: a ``pyarrow.Table`` containing only the requested columns and selected rows.
    """
    partition_names = self._dataset.partitions.partition_names if self._dataset.partitions else set()

    # pyarrow would fail if we request a column name that the dataset is partitioned by
    table = piece.read(columns=column_names - partition_names, partitions=self._dataset.partitions)

    # Row shuffling. When shuffling is disabled we keep natural order and skip the
    # identity `take` the previous version performed (it forced a full-table copy).
    if self._shuffle_rows:
        if self._random_seed is not None and self._random_seed != 0:
            # Deterministic randomization: use the seeded generator.
            # NOTE(review): a seed of 0 falls through to the non-deterministic branch —
            # confirm this is the intended contract.
            indices = self._rng.permutation(table.num_rows)
        else:
            # Non-deterministic randomization: use np.random directly.
            indices = np.random.permutation(table.num_rows)
        table = table.take(indices)

    # Drop columns we did not explicitly request. This may happen when a table is partitioned.
    # Besides columns requested, pyarrow will also return partition values. Having these
    # unexpected fields will break some downstream code.
    unasked_for_columns = set(table.column_names) - column_names
    if unasked_for_columns:
        table = table.drop(unasked_for_columns)

    this_partition, num_partitions = shuffle_row_drop_partition
    num_rows = table.num_rows
    # Guard num_rows > 0: the partitioning expression below divides by zero on an empty
    # table, and an empty table is the correct result for every partition anyway.
    if num_partitions > 1 and num_rows > 0:
        data_frame_pandas = table.to_pandas()
        # Assign each row a partition index 0..num_partitions-1 by its position, then keep
        # only the rows that fall into this worker's partition.
        partition_indexes = np.floor(np.arange(num_rows) / (float(num_rows) / min(num_rows, num_partitions)))
        table = pa.Table.from_pandas(data_frame_pandas.loc[partition_indexes == this_partition],
                                     preserve_index=False)

    return table