def get_dmatrix()

in src/sagemaker_xgboost_container/data_utils.py [0:0]


def get_dmatrix(data_path, content_type, csv_weights=0, is_pipe=False):
    """Create Data Matrix from CSV, LIBSVM, Parquet or RecordIO-protobuf input.

    Assumes that sanity validation for content type has been done.

    :param data_path: Either a directory or file, or a list of directories/files
                      (or of pipe paths when is_pipe is True) to be read together.
    :param content_type: Content type of the data; compared case-insensitively
                         against CSV, LIBSVM, PARQUET and RECORDIO_PROTOBUF.
    :param csv_weights: Only used if content_type is CSV.
                        1 if the instance weights are in the second column of csv file; otherwise, 0
    :param is_pipe: Boolean to indicate if data is being read in pipe mode
    :return: xgb.DMatrix or None. None is returned when an input path does not exist.
    :raises exc.UserError: if content_type is not one of the supported types, or if
                           the loaded data has no labels.
    """

    # To get best results from cross validation, we should merge train_dmatrix
    # and val_dmatrix for bigger data. However, DMatrix doesn't support concat
    # operation and it cannot be exported to other formats (e.g. numpy).
    # It is possible to write it to a file in binary format matrix.save("data.buffer").
    # However, xgb doesn't support reading multiple buffer files.
    #
    # So the only way to combine the data is to read them in one shot.
    # Fortunately, mlio can read multiple pipes together. So we extend
    # the parameter data_path to support list. If data_path is a string as usual,
    # get_dmatrix will work as before. When it is a list, we work as follows.
    # For pipe mode, it leverages mlio directly by creating a list of SageMakerPipe.
    # In file mode, we create a temp directory with symlinks to all input files or
    # directories to meet XGB's assumption that all files are in the same directory.
    if is_pipe:
        if isinstance(data_path, list):
            # A list of pipes is handed to the reader as-is (no existence probe).
            files_path = data_path
        else:
            # A single pipe is probed through its first epoch, '<data_path>_0'.
            if not os.path.exists(data_path + '_0'):
                logging.info('Pipe path {} does not exist!'.format(data_path))
                return None
            files_path = [data_path]
    elif isinstance(data_path, list):
        # Create a fresh directory with symlinks to input files.
        files_path = "/tmp/sagemaker_xgboost_input_data"
        shutil.rmtree(files_path, ignore_errors=True)
        os.mkdir(files_path)
        if not _symlink_inputs(data_path, files_path):
            return None
    else:
        if not os.path.exists(data_path):
            logging.info('File path {} does not exist!'.format(data_path))
            return None
        files_path = get_files_path(data_path)

    content_type = content_type.lower()
    if content_type == CSV:
        dmatrix = get_csv_dmatrix(files_path, csv_weights, is_pipe)
    elif content_type == LIBSVM:
        dmatrix = get_libsvm_dmatrix(files_path, is_pipe)
    elif content_type == PARQUET:
        dmatrix = get_parquet_dmatrix(files_path, is_pipe)
    elif content_type == RECORDIO_PROTOBUF:
        dmatrix = get_recordio_protobuf_dmatrix(files_path, is_pipe)
    else:
        # Fail with an explicit message instead of an UnboundLocalError on 'dmatrix'.
        raise exc.UserError("Content type '{}' is not supported.".format(content_type))

    if dmatrix is not None and dmatrix.get_label().size == 0:
        raise exc.UserError(
            "Got input data without labels. Please check the input data set. "
            "If training job is running on multiple instances, please switch "
            "to using single instance if number of records in the data set "
            "is less than number of workers (16 * number of instance) in the cluster.")

    return dmatrix


def _symlink_inputs(data_paths, link_dir):
    """Fill link_dir with one symlink per input file; return False if an input is missing.

    :param data_paths: list of file and/or directory paths. For a directory, every
                       entry directly inside it is linked.
    :param link_dir: existing directory that receives the symlinks
    :return: True when every path was linked, False as soon as one path does not exist
    """
    # Symlink targets are resolved relative to the directory holding the link, so a
    # relative input path must be anchored to the current working directory first.
    # os.path.join leaves already-absolute paths untouched.
    cwd = os.getcwd()
    for path in data_paths:
        if not os.path.exists(path):
            logging.info('File path {} does not exist!'.format(path))
            return False
        if os.path.isfile(path):
            sources = [(os.path.basename(path), path)]
        else:
            sources = [(entry.name, entry.path) for entry in os.scandir(path)]
        for name, source in sources:
            link = os.path.join(link_dir, name)
            # Inputs from different directories may share a file name; prefix a
            # counter (keeping the extension intact) rather than fail on the clash.
            attempt = 0
            while os.path.lexists(link):
                attempt += 1
                link = os.path.join(link_dir, '{}_{}'.format(attempt, name))
            os.symlink(os.path.join(cwd, source), link)
    return True