# src/pipeline/prepare.py
import numpy as np
import pandas as pd


def prepare(s3_in_bucket,
            s3_in_prefix,
            s3_out_bucket,
            s3_out_prefix,
            delimiter=","):
    """Prepare data for training with SageMaker algorithms.

    Reads the preprocessed data and converts it to the RecordIO protobuf
    format expected by SageMaker's built-in algorithms.

    Args:
        s3_in_bucket:
            S3 bucket where the preprocessed files are staged,
            e.g. mybucket
        s3_in_prefix:
            S3 prefix to the files to be used for training,
            e.g. amazon-reviews-pds/preprocess/
            This prefix is expected to contain the train, validate, and
            test folders staged by the preprocessor.
        s3_out_bucket:
            S3 bucket where the prepared training, validation, and test
            files will be staged, e.g. mybucket
        s3_out_prefix:
            S3 prefix under which the prepared data used for training the
            model is staged, e.g. amazon-reviews-pds/prepare/
        delimiter:
            Delimiter used when parsing the input files. Defaults to ","
            if none is provided.

    Returns:
        The string "SUCCESS" once all datasets have been converted and
        staged to S3.

    Raises:
        Exception: Any error that occurs while reading the input files
            from S3 or writing the prepared data back to S3.
    """
    try:
        print("preparing data from {}".format(s3_in_prefix))
        # normalize the input prefix so the URLs below have no double slash
        if s3_in_prefix[-1] == "/":
            s3_in_prefix = s3_in_prefix[:-1]
        # read the training dataset
        # on_bad_lines="skip" replaces the deprecated error_bad_lines=False
        # (removed in pandas 2.0); sep now honors the delimiter argument
        s3_train_url = "s3://{}/{}/{}".format(
            s3_in_bucket, s3_in_prefix, 'train/train.csv')
        train_df = pd.read_csv(s3_train_url,
                               sep=delimiter, on_bad_lines="skip")
        # read the validation dataset
        s3_validate_url = "s3://{}/{}/{}".format(
            s3_in_bucket, s3_in_prefix, 'validate/validate.csv')
        validate_df = pd.read_csv(s3_validate_url,
                                  sep=delimiter, on_bad_lines="skip")
        # read the test dataset
        s3_test_url = "s3://{}/{}/{}".format(
            s3_in_bucket, s3_in_prefix, 'test/test.csv')
        test_df = pd.read_csv(s3_test_url,
                              sep=delimiter, on_bad_lines="skip")
        # feature dimension is the width of the one-hot encoding:
        # one column per customer plus one column per product
        all_df = pd.concat([train_df, validate_df, test_df])
        nb_customer = np.unique(all_df['customer'].values).shape[0]
        nb_products = np.unique(all_df['product'].values).shape[0]
        feature_dim = nb_customer + nb_products
        print("customers: {}, products: {}, feature dimension: {}".format(
            nb_customer, nb_products, feature_dim))
        # one-hot encode each dataset into sparse features X and labels Y
        train_X, train_Y = convert_sparse_matrix(
            train_df, train_df.shape[0], nb_customer, nb_products)
        validate_X, validate_Y = convert_sparse_matrix(
            validate_df, validate_df.shape[0], nb_customer, nb_products)
        test_X, test_Y = convert_sparse_matrix(
            test_df, test_df.shape[0], nb_customer, nb_products)
        # write train and validation sets in protobuf format to s3
        if s3_out_prefix[-1] == "/":
            s3_out_prefix = s3_out_prefix[:-1]
        train_data = save_as_protobuf(
            train_X, train_Y, s3_out_bucket,
            s3_out_prefix + "/" + "train/train.protobuf")
        print(train_data)
        validate_data = save_as_protobuf(
            validate_X, validate_Y, s3_out_bucket,
            s3_out_prefix + "/" + "validate/validate.protobuf")
        print(validate_data)
        # chunk the test set (10,000 rows per chunk) to avoid payload size
        # issues when running batch transform
        test_x_chunks = chunk(test_X, 10000)
        test_y_chunks = chunk(test_Y, 10000)
        for i in range(len(test_x_chunks)):
            test_data = save_as_protobuf(
                test_x_chunks[i],
                test_y_chunks[i],
                s3_out_bucket,
                s3_out_prefix + "/" + "test/test_" + str(i) + ".protobuf")
            print(test_data)
        return "SUCCESS"
    except Exception:
        # re-raise with the original traceback instead of `raise e`
        raise
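

# ---------------------------------------------------------------------------
# The helpers called above (convert_sparse_matrix, save_as_protobuf, chunk)
# are defined elsewhere in this module. The sketches below are illustrative
# reconstructions, not the repository's actual implementations. They assume
# the preprocessor emits 0-based integer ids in 'customer' and 'product'
# columns and a 'star_rating' label column (assumed names).
# ---------------------------------------------------------------------------

# Minimal sketch of convert_sparse_matrix: one-hot encodes each
# (customer, product) pair into one sparse row, the layout that
# factorization-machines-style algorithms expect, with the rating as label.
from scipy.sparse import lil_matrix


def convert_sparse_matrix(df, nb_rows, nb_customer, nb_products):
    """Sketch: build a sparse one-hot feature matrix X and label vector Y."""
    X = lil_matrix((nb_rows, nb_customer + nb_products), dtype='float32')
    Y = np.zeros(nb_rows, dtype='float32')
    for i, (_, row) in enumerate(df.iterrows()):
        X[i, int(row['customer'])] = 1.0               # customer block
        X[i, nb_customer + int(row['product'])] = 1.0  # product block
        Y[i] = row['star_rating']
    return X.tocsr(), Y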
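
# Minimal sketch of save_as_protobuf: serializes the sparse features and
# labels to RecordIO protobuf with the SageMaker SDK and uploads the buffer
# to S3, returning the object's URL (matching how prepare() prints it).
import io

import boto3
import sagemaker.amazon.common as smac


def save_as_protobuf(X, Y, bucket, key):
    """Sketch: stage X/Y as RecordIO protobuf at s3://bucket/key."""
    buf = io.BytesIO()
    smac.write_spmatrix_to_sparse_tensor(buf, X, Y)
    buf.seek(0)
    boto3.resource("s3").Object(bucket, key).upload_fileobj(buf)
    return "s3://{}/{}".format(bucket, key)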
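
# Minimal sketch of chunk: splits rows into consecutive slices of at most
# chunk_size rows; row slicing works for both scipy sparse matrices and
# numpy label arrays.
def chunk(matrix, chunk_size):
    """Sketch: return a list of row-wise chunks of at most chunk_size."""
    return [matrix[i:i + chunk_size]
            for i in range(0, matrix.shape[0], chunk_size)]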
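
# Example invocation (hypothetical bucket names, mirroring the docstring):
#
#   prepare("mybucket", "amazon-reviews-pds/preprocess/",
#           "mybucket", "amazon-reviews-pds/prepare/")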