in src/sagemaker_xgboost_container/data_utils.py [0:0]
def get_dmatrix(data_path, content_type, csv_weights=0, is_pipe=False):
    """Create a Data Matrix from CSV, LIBSVM, Parquet, or RecordIO-protobuf input.

    Assumes that sanity validation for content type has been done.

    :param data_path: Either directory or file, or a list of them. When a list is
        given, all inputs are combined into a single DMatrix (see comments below).
    :param content_type: One of 'csv', 'libsvm', 'parquet', or
        'recordio-protobuf' (case-insensitive).
    :param csv_weights: Only used if content type is 'csv'.
        1 if the instance weights are in the second column of csv file; otherwise, 0
    :param is_pipe: Boolean to indicate if data is being read in pipe mode.
    :return: xgb.DMatrix, or None if any input path does not exist.
    :raises exc.UserError: if the parsed data has no labels, or if content_type
        is not one of the supported types.
    """
    # To get best results from cross validation, we should merge train_dmatrix
    # and val_dmatrix for bigger data. However, DMatrix doesn't support concat
    # operation and it cannot be exported to other formats (e.g. numpy).
    # It is possible to write it to a file in binary format matrix.save("data.buffer").
    # However, xgb doesn't support read multiple buffer files.
    #
    # So the only way to combine the data is to read them in one shot.
    # Fortunately, mlio can read multiple pipes together. So we extend
    # the parameter data_path to support list. If data_path is string as usual,
    # get_dmatrix will work as before. When it is a list, we work as follows.
    # For pipe mode, it leverages mlio directly by creating a list of SageMakerPipe.
    # In file mode, we create a temp directory with symlink to all input files or
    # directories to meet XGB's assumption that all files are in the same directory.
    if is_pipe:
        files_path = data_path if isinstance(data_path, list) else [data_path]
        # SageMaker pipe channels materialize as '<name>_0', '<name>_1', ...;
        # checking '<name>_0' verifies the pipe has been set up. Check every
        # entry here: the previous code concatenated '_0' onto data_path itself,
        # which raised TypeError whenever data_path was a list.
        for pipe_path in files_path:
            if not os.path.exists(pipe_path + '_0'):
                logging.info('Pipe path %s does not exist!', pipe_path)
                return None
    else:
        if not isinstance(data_path, list):
            if not os.path.exists(data_path):
                logging.info('File path %s does not exist!', data_path)
                return None
            files_path = get_files_path(data_path)
        else:
            # XGBoost assumes all input files live in a single directory, so
            # build a scratch directory of symlinks to every input file.
            files_path = "/tmp/sagemaker_xgboost_input_data"
            shutil.rmtree(files_path, ignore_errors=True)
            os.mkdir(files_path)
            for path in data_path:
                if not os.path.exists(path):
                    # Log before bailing out, consistent with the branches above.
                    logging.info('File path %s does not exist!', path)
                    return None
                if os.path.isfile(path):
                    os.symlink(path, os.path.join(files_path, os.path.basename(path)))
                else:
                    for entry in os.scandir(path):
                        os.symlink(entry.path, os.path.join(files_path, entry.name))

    content_type = content_type.lower()  # hoisted: compared up to four times below
    if content_type == CSV:
        dmatrix = get_csv_dmatrix(files_path, csv_weights, is_pipe)
    elif content_type == LIBSVM:
        dmatrix = get_libsvm_dmatrix(files_path, is_pipe)
    elif content_type == PARQUET:
        dmatrix = get_parquet_dmatrix(files_path, is_pipe)
    elif content_type == RECORDIO_PROTOBUF:
        dmatrix = get_recordio_protobuf_dmatrix(files_path, is_pipe)
    else:
        # Previously an unrecognized content type crashed with UnboundLocalError
        # on 'dmatrix'; fail with an explicit, actionable error instead.
        raise exc.UserError("Unsupported content type: {}".format(content_type))

    if dmatrix and dmatrix.get_label().size == 0:
        raise exc.UserError(
            "Got input data without labels. Please check the input data set. "
            "If training job is running on multiple instances, please switch "
            "to using single instance if number of records in the data set "
            "is less than number of workers (16 * number of instance) in the cluster.")
    return dmatrix