def _read_with_shuffle_row_drop()

in petastorm/arrow_reader_worker.py [0:0]


    def _read_with_shuffle_row_drop(self, piece, pq_file, column_names, shuffle_row_drop_partition):
        partition_names = self._dataset.partitions.partition_names if self._dataset.partitions else set()

        # pyarrow would fail if we request a column names that the dataset is partitioned by
        table = piece.read(columns=column_names - partition_names, partitions=self._dataset.partitions)

        # Handle row shuffling based on shuffle_rows setting
        if self._shuffle_rows:
            if self._random_seed is not None and self._random_seed != 0:
                # Deterministic randomization: use provided seed
                indices = self._rng.permutation(table.num_rows)
            else:
                # Non-deterministic randomization: use np.random directly
                indices = np.random.permutation(table.num_rows)
        else:
            # Deterministic natural order: shuffle_rows=False
            indices = np.arange(table.num_rows)
        table = table.take(indices)

        # Drop columns we did not explicitly request. This may happen when a table is partitioned. Besides columns
        # requested, pyarrow will also return partition values. Having these unexpected fields will break some
        # downstream code.

        loaded_column_names = set(table.column_names)
        unasked_for_columns = loaded_column_names - column_names
        if unasked_for_columns:
            table = table.drop(unasked_for_columns)

        num_rows = len(table)
        num_partitions = shuffle_row_drop_partition[1]
        this_partition = shuffle_row_drop_partition[0]

        if num_partitions > 1:
            data_frame_pandas = table.to_pandas()
            partition_indexes = np.floor(np.arange(num_rows) / (float(num_rows) / min(num_rows, num_partitions)))

            table = pa.Table.from_pandas(data_frame_pandas.loc[partition_indexes == this_partition],
                                         preserve_index=False)

        return table