# src/lookoutequipment/dataset.py
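# Module-level imports used by this function (assumed to live at the top of
# dataset.py; they are not shown in this excerpt):
import time
from datetime import datetime, timedelta

import boto3
import pandas as pd
import awswrangler as wr
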
def generate_replay_data(dataset_name,
replay_start_timestamp,
upload_frequency,
replay_days=1,
inference_timezone='UTC'):
"""
Generates inference input data from the training data to test a scheduler
configured for a model trained with this dataset. The data is written to an
S3 location next to your training data.
Parameters:
dataset_name (string):
Name of the Lookout for Equipment dataset containing the training data
to generate the replay data from.
replay_start_timestamp (string):
Point in time in the training data from which to begin generating
replay data.
Example: `"2020-10-01 00:00:00"`
upload_frequency (string):
How often replay data is uploaded to the S3 bucket for the inference
input data. Valid Values are `PT5M`, `PT10M`, `PT15M`, `PT30M`,
or `PT1H`.
replay_days (integer):
Duration of the replay data in days (default: 1)
inference_timezone (string):
Indicates the timezone for the inference replay dataset.
(default: 'UTC')
Returns:
(boolean or dict)
True if all replay sequences could be generated. Otherwise, a dict
mapping each component name to the list of sequence files that could
not be generated (missing sequences will trigger failed scheduler
executions).
"""
replay_start_timestamp = datetime.strptime(
replay_start_timestamp, '%Y-%m-%d %H:%M:%S'
)
replay_end_timestamp = datetime.strftime(
replay_start_timestamp + timedelta(days=replay_days), '%Y-%m-%d %H:%M:%S'
)
# Wait for an ingestion to be done if one is in progress:
client = boto3.client('lookoutequipment')
dataset_description = client.describe_dataset(DatasetName=dataset_name)
status = dataset_description['Status']
while status == 'INGESTION_IN_PROGRESS':
print(f'Ingestion job is still in progress for dataset: {dataset_name}.')
time.sleep(10)
dataset_description = client.describe_dataset(DatasetName=dataset_name)
status = dataset_description['Status']
# Locate the S3 path of the training data from `dataset_name`:
dataset_config = dataset_description['IngestionInputConfiguration']['S3InputConfiguration']
bucket = dataset_config['Bucket']
train_prefix = dataset_config['Prefix']
train_path = f's3://{bucket}/{train_prefix}'
# Build the inference path: we position the inference input data at the same
# level on S3 as the training data location:
inference_prefix = '/'.join(train_prefix.split('/')[:-2]) + '/inference-data/input'
INFERENCE_INPUT_DATA = f's3://{bucket}/{inference_prefix}'
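# Map the scheduler upload frequency (an ISO 8601 duration) to the
# equivalent pandas resampling frequency: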
upload_freq_dict = {
'PT5M' : '5T',
'PT10M': '10T',
'PT15M': '15T',
'PT30M': '30T',
'PT1H' : '1H'
}
upload_frequency = upload_freq_dict[upload_frequency]
# Build the list of components found under `train_path` (one subfolder per component):
component_params = [
{'component_name': path.split('/')[-2], 'train_path': path}
for path in wr.s3.list_objects(train_path)
]
empty_sequences = dict()
for component_param in component_params:
component_name = component_param['component_name']
empty_sequences[component_name] = []
print('Generating data for component:', component_name)
# Load training data for this component:
component_df = wr.s3.read_csv(component_param['train_path'],
parse_dates=True,
index_col='Timestamp')
# Subsample the replay data from the training data (a copy, so the index
# can be reassigned below without touching the original dataframe):
replay_df = component_df.loc[replay_start_timestamp:replay_end_timestamp, :].copy()
# get inference start time in UTC
current_timestamp = datetime.strftime(
datetime.utcnow() + timedelta(minutes=1), '%Y-%m-%d %H:%M:%S'
)
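# Round up to the next upload boundary so the first replay file lines up
# with the scheduler wake-up times: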
inference_start_timestamp_utc = str(pd.to_datetime(current_timestamp).ceil(upload_frequency))
# generate new time index for inference replay data
time_delta = pd.to_datetime(inference_start_timestamp_utc) - pd.to_datetime(replay_start_timestamp)
replay_df.index = replay_df.index.to_series() + time_delta
replay_df.index.name = 'Timestamp'
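# Build the upload window boundaries by resampling the shifted replay index
# at the upload frequency: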
ts_splits = replay_df.resample(upload_frequency).ffill().index.tolist()
# Loop through each time split:
for split in zip(ts_splits[:-1], ts_splits[1:]):
# subselect split data from replay data
split_start = split[0]
split_end = split[1]
split_df = replay_df.loc[split_start:split_end,:].reset_index()
# convert split_start from UTC to `inference_timezone`
split_start_tz = split_start.tz_localize('UTC').tz_convert(inference_timezone)
split_start_str = split_start_tz.strftime('%Y%m%d%H%M%S')
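# Name each file `<component>_<yyyyMMddHHmmss>.csv`: the pattern expected by
# a scheduler configured with an underscore delimiter and a yyyyMMddHHmmss
# timestamp format: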
f_name = f'{component_name}_{split_start_str}.csv'
# Save current sequence data to inference path:
if split_df.shape[0] > 0:
    wr.s3.to_csv(split_df,
                 f'{INFERENCE_INPUT_DATA}/{f_name}',
                 index=False)
# Otherwise, record the missing sequence in the error list for this
# component (note that list.append() mutates in place and returns None,
# so its result must not be assigned back into the dict):
else:
    empty_sequences[component_name].append(f_name)
# Return True when every sequence could be generated; otherwise return the
# dict of missing sequences per component:
if all(len(sequences) == 0 for sequences in empty_sequences.values()):
    return True
else:
    return empty_sequences
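
# A minimal usage sketch, assuming a dataset ingested as described above. The
# dataset name and start timestamp below are placeholders, not values taken
# from this repository:
if __name__ == '__main__':
    result = generate_replay_data(
        dataset_name='my-dataset',                      # hypothetical name
        replay_start_timestamp='2020-10-01 00:00:00',
        upload_frequency='PT5M',
        replay_days=1,
        inference_timezone='UTC'
    )
    if result is True:
        print('All replay sequences were generated.')
    else:
        # `result` maps each component name to the list of sequences that
        # could not be generated:
        print('Missing sequences:', result)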