def _get_parquet_dmatrix_pipe_mode()

in src/sagemaker_xgboost_container/data_utils.py [0:0]


def _get_parquet_dmatrix_pipe_mode(pipe_path):
    """Get Data Matrix from parquet data in pipe mode.

    Every record read from each pipe is parsed as a parquet payload and
    converted to a NumPy array through pandas. The arrays are stacked
    row-wise; column 0 of the stacked array is used as the label and the
    remaining columns as the features.

    :param pipe_path: SageMaker pipe path (or list of pipe paths) where parquet
        formatted training data is piped
    :return: xgb.DMatrix, or None if no records were read from any pipe
    :raises exc.UserError: if reading, parsing, or building the DMatrix fails;
        the original exception is chained as ``__cause__``
    """
    try:
        examples = []

        # Accept either a single pipe path or a list of them.
        pipes_path = pipe_path if isinstance(pipe_path, list) else [pipe_path]
        for path in pipes_path:
            f = mlio.SageMakerPipe(path)
            with f.open_read() as strm:
                reader = mlio.ParquetRecordReader(strm)

                for record in reader:
                    # Each record is read as a standalone parquet file.
                    table = pq.read_table(as_arrow_file(record))
                    array = table.to_pandas()
                    if isinstance(array, pd.DataFrame):
                        array = array.to_numpy()
                    examples.append(array)

        if not examples:
            return None

        data = np.vstack(examples)
        # np.vstack copied the rows into `data`; drop the per-record arrays
        # before building the DMatrix.
        del examples

        # Column 0 is the label; the remaining columns are the features.
        return xgb.DMatrix(data[:, 1:], label=data[:, 0])

    except Exception as e:
        # Chain the original exception so its traceback survives for debugging.
        raise exc.UserError("Failed to load parquet data with exception:\n{}".format(e)) from e