in src/sagemaker_xgboost_container/data_utils.py [0:0]
def get_recordio_protobuf_dmatrix(path, is_pipe=False):
    """Get Data Matrix from recordio-protobuf data.

    Every example yielded by the reader is loaded and stacked into a single
    feature matrix and a flat label array before the DMatrix is built.

    :param path: Path where recordio-protobuf formatted training data resides, either directory, file, or
                 SageMaker pipe. In pipe mode a list of pipe paths is also accepted.
    :param is_pipe: Boolean to indicate if data is being read in pipe mode
    :return: xgb.DMatrix, or None when the reader has no example to offer
    :raises exc.UserError: if any step of reading or converting the data fails; the original
                           exception is attached as the cause
    """
    try:
        if is_pipe:
            # Normalise a single pipe path to a list so both forms share one code path.
            pipes_path = path if isinstance(path, list) else [path]
            dataset = [mlio.SageMakerPipe(pipe_path) for pipe_path in pipes_path]
        else:
            dataset = mlio.list_files(path)

        reader_params = mlio.DataReaderParams(dataset=dataset, batch_size=BATCH_SIZE)
        reader = mlio.RecordIOProtobufReader(reader_params)

        # Peek once and reuse the result both for the emptiness check and for
        # detecting the tensor layout.
        first_example = reader.peek_example()
        if first_example is None:
            return None

        # recordio-protobuf tensor may be dense (use numpy) or sparse (use scipy)
        is_dense_tensor = isinstance(first_example["values"], mlio.DenseTensor)

        all_features = []
        all_labels = []
        for example in reader:
            values = example["values"]
            all_features.append(as_numpy(values) if is_dense_tensor else to_coo_matrix(values))
            all_labels.append(as_numpy(example["label_values"]))

        if is_dense_tensor:
            all_features = np.vstack(all_features)
        else:
            all_features = scipy_vstack(all_features).tocsr()
        # axis=None flattens the per-batch label arrays into one 1-D array.
        all_labels = np.concatenate(all_labels, axis=None)

        return xgb.DMatrix(all_features, label=all_labels)
    except Exception as e:
        # Chain the original exception so its traceback is preserved for debugging.
        raise exc.UserError("Failed to load recordio-protobuf data with exception:\n{}".format(e)) from e