autopilot/mlops/timeseries/aws-automl-ts-cdk/glue/preprocess.py
#!/usr/bin/env python
# coding: utf-8
import zipfile
import boto3
import pandas as pd
import sys
import os
from awsglue.utils import getResolvedOptions
s3 = boto3.client('s3')
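
# This Glue job stages AutoML time-series input data in three steps:
#   1. download_and_extract: find data.zip or tts.csv in S3 and pull it locally
#   2. preprocess: merge the extracted CSVs (ZIP input only) into training_data.csv
#   3. save_to_s3: upload training_data.csv back to the bucket under input/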
def download_and_extract(bucket, prefix, csv_dir):
    # List objects in the given bucket under the provided prefix
    s3_objects = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)['Contents']
    # Accepted input filenames, matched case-insensitively
    possible_filenames = {"data.zip", "tts.csv"}
    fileuri = None
    for obj in s3_objects:
        filename = os.path.basename(obj['Key'])
        if filename.lower() in possible_filenames:
            fileuri = obj['Key']
            break
    # If none of the filenames matched, stop the job with an error
    if not fileuri:
        sys.exit(f"No supported input file found under s3://{bucket}/{prefix}")
    file_name = os.path.join('/tmp', os.path.basename(fileuri))
    print(f"File Name is: {file_name}")
    os.makedirs(os.path.dirname(file_name), exist_ok=True)
    s3.download_file(bucket, fileuri, file_name)
    if fileuri.lower().endswith('.zip'):
        try:
            with zipfile.ZipFile(file_name, 'r') as zip_ref:
                zip_ref.extractall(csv_dir)
            return "zip"
        except (FileNotFoundError, zipfile.BadZipFile) as err:
            sys.exit(f"Could not extract {file_name}: {err}")
    elif fileuri.lower().endswith('.csv'):
        # A standalone CSV needs no merging; stage it directly as the training file.
        destination = os.path.join(csv_dir, 'training_data.csv')
        os.makedirs(csv_dir, exist_ok=True)
        os.rename(file_name, destination)
        return "csv"
    else:
        sys.exit(f"Unsupported file type for {file_name}")
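
# Expected contents of data.zip: TTS.csv (target time series, required), plus
# optionally RTS.csv (related time series) and/or metadata.csv (static item
# attributes). Only the combinations handled below are accepted.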
def preprocess(csv_dir):
    print("Data merge for ZIP started.")
    # Check which files are present
    tts_present = os.path.exists(os.path.join(csv_dir, 'TTS.csv'))
    rts_present = os.path.exists(os.path.join(csv_dir, 'RTS.csv'))
    metadata_present = os.path.exists(os.path.join(csv_dir, 'metadata.csv'))
    # Load whichever files are available
    if tts_present:
        tts_df = pd.read_csv(os.path.join(csv_dir, 'TTS.csv'))
    if rts_present:
        rts_df = pd.read_csv(os.path.join(csv_dir, 'RTS.csv'))
    if metadata_present:
        metadata_df = pd.read_csv(os.path.join(csv_dir, 'metadata.csv'))
    # Scenario 1: only TTS.csv is present
    if tts_present and not rts_present and not metadata_present:
        final_data = tts_df
    # Scenario 2: TTS.csv is present along with either RTS.csv or metadata.csv
    elif tts_present and rts_present and not metadata_present:
        final_data = pd.merge(tts_df, rts_df, how='right', on=['product_code', 'location_code', 'timestamp'])  # Change the merge columns and type of merge based on your dataset.
    elif tts_present and not rts_present and metadata_present:
        final_data = pd.merge(tts_df, metadata_df, how='right', on=['product_code'])  # Change the merge columns and type of merge based on your dataset.
    # Scenario 3: all three files are present
    elif tts_present and rts_present and metadata_present:
        merged_data = pd.merge(tts_df, rts_df, how='right', on=['product_code', 'location_code', 'timestamp'])  # Change the merge columns and type of merge based on your dataset.
        final_data = pd.merge(merged_data, metadata_df, how='right', on=['product_code'])  # Change the merge columns and type of merge based on your dataset.
    # Error out if no recognized combination is present
    else:
        sys.exit("Unrecognized file combination in directory.")
    output_path = os.path.join(csv_dir, 'training_data.csv')
    final_data.to_csv(output_path, index=False)
    print(f"Final data merged into: {output_path}")
def save_to_s3(bucket, csv_dir):
    single_csv = 'training_data.csv'
    # S3 keys always use forward slashes, so build the key directly.
    object_key = f"input/{single_csv}"
    s3.upload_file(os.path.join(csv_dir, single_csv), bucket, object_key)
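
# getResolvedOptions reads Glue job parameters from sys.argv, so the job
# must be started with --bucket and --prefix arguments.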
# Main Execution
args = getResolvedOptions(sys.argv, ['bucket', 'prefix'])
bucket = args['bucket']
prefix = args['prefix']
csv_dir = 'input/'
file_type = download_and_extract(bucket, prefix, csv_dir)
# A ZIP may contain multiple CSVs that must be merged first; a single CSV
# has already been staged as training_data.csv by download_and_extract.
if file_type == "zip":
    preprocess(csv_dir)
save_to_s3(bucket, csv_dir)