in src/lookoutequipment/dataset.py [0:0]
import os
import urllib.request
import zipfile

import boto3
import pandas as pd
import pyarrow.parquet as pq
from tqdm import tqdm


def load_dataset(dataset_name, target_dir):
    """
    This function can be used to download an example dataset to run Amazon
    Lookout for Equipment on.

    Parameters:
        dataset_name (string):
            Can only be 'expander' at this stage
        target_dir (string):
            Location where the data will be downloaded: this location must
            be readable and writable

    Returns:
        data (dict): dictionary with the data dataframe, the labels
        dataframe, the training start and end datetimes, the evaluation
        start and end datetimes, and the tags description dataframe
    """
    if dataset_name == 'expander':
        REGION_NAME = boto3.session.Session().region_name
        BUCKET = f'lookoutforequipmentbucket-{REGION_NAME}'
        PREFIX = 'datasets/demo'
        FILES = ['timeseries.zip',
                 'labels.csv',
                 'tags_description.csv',
                 'timeranges.txt']
        TRAIN_DATA = os.path.join(target_dir, 'training-data')

        # Prepare the local directory structure:
        os.makedirs(target_dir, exist_ok=True)
        os.makedirs(TRAIN_DATA, exist_ok=True)
        os.makedirs(os.path.join(target_dir, 'label-data'), exist_ok=True)
        os.makedirs(os.path.join(target_dir, 'inference-data'), exist_ok=True)

        # Download each file from S3 over HTTPS:
        root_url = f'https://{BUCKET}.s3.{REGION_NAME}.amazonaws.com/{PREFIX}'
        for f in FILES:
            target_file = os.path.join(target_dir, f)
            url_file = root_url + '/' + f
            urllib.request.urlretrieve(url_file, target_file)
        # Load the time series data:
        timeseries_zip_file = os.path.join(target_dir, 'timeseries.zip')
        with zipfile.ZipFile(timeseries_zip_file, 'r') as zip_ref:
            zip_ref.extractall(target_dir)
        os.remove(timeseries_zip_file)

        all_tags_fname = os.path.join(target_dir, 'expander.parquet')
        table = pq.read_table(all_tags_fname)
        all_tags_df = table.to_pandas()
        del table
        # Load the labels data:
        labels_fname = os.path.join(target_dir, 'labels.csv')
        labels_df = pd.read_csv(labels_fname, header=None)
        labels_df[0] = pd.to_datetime(labels_df[0])
        labels_df[1] = pd.to_datetime(labels_df[1])
        labels_df.columns = ['start', 'end']
        # Load the analysis time ranges (one datetime per line):
        timeranges_fname = os.path.join(target_dir, 'timeranges.txt')
        with open(timeranges_fname, 'r') as f:
            timeranges = f.readlines()
        training_start = pd.to_datetime(timeranges[0].strip())
        training_end = pd.to_datetime(timeranges[1].strip())
        evaluation_start = pd.to_datetime(timeranges[2].strip())
        evaluation_end = pd.to_datetime(timeranges[3].strip())

        # Load the tags description:
        tags_description_fname = os.path.join(target_dir, 'tags_description.csv')
        tags_description_df = pd.read_csv(tags_description_fname)
        # Create the training data by processing each subsystem one by one:
        # one CSV file per component, each stored in its own subfolder of the
        # training data directory.
        components = list(tags_description_df['Subsystem'].unique())
        progress_bar = tqdm(components)
        for component in progress_bar:
            progress_bar.set_description(f'Component {component}')
            progress_bar.refresh()

            # Check if the CSV file already exists and do not overwrite it:
            component_tags_fname = os.path.join(TRAIN_DATA,
                                                f'{component}',
                                                f'{component}.csv')
            if not os.path.exists(component_tags_fname):
                # Build the dataframe with all the signal time series
                # for the current subsystem:
                component_tags_list = list(
                    tags_description_df.loc[
                        tags_description_df['Subsystem'] == component, 'Tag'
                    ]
                )
                component_tags_df = all_tags_df[component_tags_list]
                component_tags_df = component_tags_df.reset_index()
                component_tags_df['Timestamp'] = (
                    component_tags_df['Timestamp']
                    .dt.strftime('%Y-%m-%dT%H:%M:%S.%f')
                )

                # Save to disk:
                os.makedirs(os.path.join(TRAIN_DATA, f'{component}'), exist_ok=True)
                component_tags_df.to_csv(component_tags_fname, index=None)
        # Build a dictionary with all the data:
        return {
            'data': all_tags_df,
            'labels': labels_df,
            'tags_description': tags_description_df,
            'training_start': training_start,
            'training_end': training_end,
            'evaluation_start': evaluation_start,
            'evaluation_end': evaluation_end
        }

    else:
        raise ValueError('Dataset name must be one of ["expander"]')