in src/sagemaker_xgboost_container/data_utils.py [0:0]
def _get_parquet_dmatrix_pipe_mode(pipe_path):
"""Get Data Matrix from parquet data in pipe mode.
:param pipe_path: SageMaker pipe path where parquet formatted training data is piped
:return: xgb.DMatrix or None
"""
try:
examples = []
pipes_path = pipe_path if isinstance(pipe_path, list) else [pipe_path]
for path in pipes_path:
f = mlio.SageMakerPipe(path)
with f.open_read() as strm:
reader = mlio.ParquetRecordReader(strm)
for record in reader:
table = pq.read_table(as_arrow_file(record))
array = table.to_pandas()
if type(array) is pd.DataFrame:
array = array.to_numpy()
examples.append(array)
if examples:
data = np.vstack(examples)
del examples
dmatrix = xgb.DMatrix(data[:, 1:], label=data[:, 0])
return dmatrix
else:
return None
except Exception as e:
raise exc.UserError("Failed to load parquet data with exception:\n{}".format(e))