def get_recordio_protobuf_dmatrix()

in src/sagemaker_xgboost_container/data_utils.py [0:0]


def get_recordio_protobuf_dmatrix(path, is_pipe=False):
    """Get Data Matrix from recordio-protobuf data.

    Reads every example from the source in batches of ``BATCH_SIZE``, stacks the
    feature tensors and label values, and builds a single DMatrix from the result.

    :param path: Path where recordio-protobuf formatted training data resides, either directory, file, or SageMaker pipe.
                 In pipe mode a list of pipe paths is also accepted.
    :param is_pipe: Boolean to indicate if data is being read in pipe mode
    :return: xgb.DMatrix, or None when the reader has no example to offer
    :raises exc.UserError: if any step of listing, reading or converting the data raises
    """
    try:
        if is_pipe:
            pipe_paths = path if isinstance(path, list) else [path]
            dataset = [mlio.SageMakerPipe(pipe_path) for pipe_path in pipe_paths]
        else:
            dataset = mlio.list_files(path)

        reader_params = mlio.DataReaderParams(dataset=dataset, batch_size=BATCH_SIZE)
        reader = mlio.RecordIOProtobufReader(reader_params)

        # Peek a single time: the first example answers both "is there any data?"
        # and "which tensor layout do the features use?".
        first_example = reader.peek_example()
        if first_example is None:
            return None

        # recordio-protobuf tensor may be dense (use numpy) or sparse (use scipy)
        is_dense_tensor = isinstance(first_example["values"], mlio.DenseTensor)
        convert_features = as_numpy if is_dense_tensor else to_coo_matrix

        all_features = []
        all_labels = []
        for example in reader:
            all_features.append(convert_features(example["values"]))
            all_labels.append(as_numpy(example["label_values"]))

        if is_dense_tensor:
            all_features = np.vstack(all_features)
        else:
            # Stack the per-batch COO matrices, then convert to CSR before handing to DMatrix.
            all_features = scipy_vstack(all_features).tocsr()

        # axis=None flattens every label batch into one 1-D array.
        all_labels = np.concatenate(all_labels, axis=None)
        return xgb.DMatrix(all_features, label=all_labels)

    except Exception as e:
        # Surface any failure as a UserError while keeping the original exception as the explicit cause.
        raise exc.UserError("Failed to load recordio-protobuf data with exception:\n{}".format(e)) from e