# src/pipeline/preprocess.py
import numpy as np
import pandas as pd
import s3fs


def preprocess(s3_in_url,
s3_out_bucket,
s3_out_prefix,
delimiter=","):
"""Preprocesses data based on business logic
- Reads delimited file passed as s3_url and preprocess data by filtering
long tail in the customer ratings data i.e. keep customers who have rated 5
or more videos, and videos that have been rated by 9+ customers
- Preprocessed data is then written to output
Args:
s3_in_url:
s3 url to the delimited file to be processed
e.g. s3://amazon-reviews-pds/tsv/reviews.tsv.gz
s3_out_bucket:
s3 bucket where preprocessed data will be staged
e.g. mybucket
s3_out_prefix:
s3 url prefix to stage preprocessed data to use later in the pipeline
e.g. amazon-reviews-pds/preprocess/
delimiter:
delimiter to be used for parsing the file. Defaults to "," if none
provided
    Returns:
        The string "SUCCESS" on successful completion
Raises:
IOError: An error occurred accessing the s3 file
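    Example:
        Illustrative call; the bucket and prefix are placeholders, and a
        tab-separated input would also need delimiter="\\t":

        >>> preprocess("s3://amazon-reviews-pds/tsv/reviews.tsv.gz",
        ...            "mybucket", "amazon-reviews-pds/preprocess/")
        'SUCCESS'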
"""
try:
print("preprocessing data from {}".format(s3_in_url))
        # read the s3 url into a pandas dataframe
        # pandas internally uses s3fs to read the s3 file directly
        # `on_bad_lines` replaces the deprecated `error_bad_lines` flag
        df = pd.read_csv(s3_in_url, sep=delimiter, on_bad_lines="skip")
        # keep customer_id, product_id, star_rating, and product_title;
        # `product_title` will be useful for validating recommendations
        df = df[['customer_id', 'product_id', 'star_rating', 'product_title']]
# clean out the long tail because most people haven't seen most videos,
# and people rate fewer videos than they actually watch
customers = df['customer_id'].value_counts()
products = df['product_id'].value_counts()
        # based on data exploration, only about 5% of customers have rated 5
        # or more videos, and only 25% of videos have been rated by 10 or
        # more customers
        customers = customers[customers >= 5]
        products = products[products >= 10]
print("# of rows before the long tail = {:10d}".format(df.shape[0]))
reduced_df = df \
.merge(pd.DataFrame({'customer_id': customers.index})) \
.merge(pd.DataFrame({'product_id': products.index}))
print("# of rows after the long tail = {:10d}".format(
reduced_df.shape[0]))
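        # a customer may rate the same product more than once; keep a single
        # rating per (customer_id, product_id) pair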
reduced_df = reduced_df.drop_duplicates(['customer_id', 'product_id'])
print("# of rows after removing duplicates = {:10d}".format(
reduced_df.shape[0]))
        # recreate the customer and product lists since there are customers
        # with 5 or more reviews whose reviews all fall on products rated by
        # fewer than 10 customers (and vice versa)
customers = reduced_df['customer_id'].value_counts()
products = reduced_df['product_id'].value_counts()
        # sequentially index each customer and product so the indices can
        # serve as row and column coordinates in a sparse ratings matrix
customer_index = pd.DataFrame({
'customer_id': customers.index,
'customer': np.arange(customers.shape[0])})
product_index = pd.DataFrame({
'product_id': products.index,
'product': np.arange(products.shape[0])})
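        # merging on customer_id/product_id attaches the dense `customer`
        # and `product` indices to every rating row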
reduced_df = reduced_df \
.merge(customer_index) \
.merge(product_index)
nb_customer = reduced_df['customer'].max() + 1
nb_products = reduced_df['product'].max() + 1
feature_dim = nb_customer + nb_products
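        # feature_dim is the width of the combined one-hot encoding (one
        # column per customer plus one per product), e.g. as input to a
        # factorization-machine style model later in the pipeline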
        print("# of customers: {}, # of products: {}, feature dim: {}".format(
            nb_customer, nb_products, feature_dim))
product_df = reduced_df[['customer', 'product', 'star_rating']]
# split into train, validation and test data sets
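        # sample(frac=1) shuffles all rows; np.split then cuts the shuffled
        # frame at the 60% and 80% marks, yielding a 60/20/20 split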
train_df, validate_df, test_df = np.split(
product_df.sample(frac=1),
[int(.6*len(product_df)), int(.8*len(product_df))]
)
print("# of rows train data set = {:10d}".format(
train_df.shape[0]))
print("# of rows validation data set = {:10d}".format(
validate_df.shape[0]))
print("# of rows test data set = {:10d}".format(
test_df.shape[0]))
# select columns required for training the model
# excluding columns "customer_id", "product_id", "product_title" to
# keep files small
cols = ["customer", "product", "star_rating"]
train_df = train_df[cols]
validate_df = validate_df[cols]
test_df = test_df[cols]
# write output to s3 as delimited file
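        # anon=False makes s3fs sign requests with the caller's AWS
        # credentials rather than accessing s3 anonymously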
fs = s3fs.S3FileSystem(anon=False)
        s3_out_prefix = s3_out_prefix.rstrip("/")
s3_out_train = "s3://{}/{}/{}".format(
s3_out_bucket, s3_out_prefix, "train/train.csv")
print("writing training data to {}".format(s3_out_train))
with fs.open(s3_out_train, "w") as f:
            train_df.to_csv(f, sep=",", index=False)
s3_out_validate = "s3://{}/{}/{}".format(
s3_out_bucket, s3_out_prefix, "validate/validate.csv")
print("writing test data to {}".format(s3_out_validate))
with fs.open(s3_out_validate, "w") as f:
            validate_df.to_csv(f, sep=",", index=False)
s3_out_test = "s3://{}/{}/{}".format(
s3_out_bucket, s3_out_prefix, "test/test.csv")
print("writing test data to {}".format(s3_out_test))
with fs.open(s3_out_test, "w") as f:
            test_df.to_csv(f, sep=",", index=False)
print("preprocessing completed")
return "SUCCESS"
    except Exception:
        # use a bare raise to preserve the original traceback
        raise
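

if __name__ == "__main__":
    # minimal usage sketch: the input url mirrors the docstring example and
    # the bucket/prefix are placeholders to replace with writable locations
    preprocess(
        "s3://amazon-reviews-pds/tsv/reviews.tsv.gz",
        "mybucket",
        "amazon-reviews-pds/preprocess/",
        delimiter="\t")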