in src/lookoutequipment/dataset.py [0:0]
import os
from datetime import datetime, timedelta

import pandas as pd


def prepare_inference_data(root_dir,
                           sample_data_dict,
                           bucket,
                           prefix,
                           num_sequences=3,
                           frequency=5,
                           start_date=None):
"""
This function prepares sequence of data suitable as input for an inference
scheduler.
Parameters:
root_dir (string):
Location where the inference data will be written
sample_data_dict (dict):
A dictionnary with the sample data as output by `load_dataset()`
method
bucket (string):
Amazon S3 bucket name
prefix (string):
Prefix to a directory on Amazon S3 where to upload the data. This
prefix *MUST* end with a trailing slash "/"
num_sequences (integer):
Number of short time series sequences to extract: each sequence
will be used once by a scheduler. Defaults to 3: a scheduler will
run 3 times before failing (unless you provide additional suitable
files in the input location)
frequency (integer):
The scheduling frequency in minutes: this **MUST** match the
resampling rate used to train the model (defaults to 5 minutes)
start_date (string or datetime):
The datetime to start the extraction from. Default is None: in this
case this method will start looking at date located at the beginning
of the evaluation period associated to this sample
"""
    tags_df = sample_data_dict['data']
    tags_description_df = sample_data_dict['tags_description']
    components = tags_description_df['Subsystem'].unique()
    os.makedirs(os.path.join(root_dir, 'inference-data', 'input'), exist_ok=True)

    # If no start date is provided, we take
    # the first one in the evaluation data:
    if start_date is None:
        start = sample_data_dict['evaluation_start']
    elif isinstance(start_date, str):
        start = pd.to_datetime(start_date)
    else:
        start = start_date
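
    # `start` marks the beginning of the first extraction window; it is
    # shifted forward by `frequency` minutes at the end of each sequence.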
    # Loop through each sequence to extract:
    for i in range(num_sequences):
        end = start + timedelta(minutes=frequency - 1)

        # Round the current time down to the previous `frequency` minutes,
        # then shift it by `i` scheduler periods:
        tm = datetime.now()
        tm = tm - timedelta(
            minutes=tm.minute % frequency,
            seconds=tm.second,
            microseconds=tm.microsecond
        )
        tm = tm + timedelta(minutes=frequency * i)
        current_timestamp = tm.strftime('%Y%m%d%H%M%S')
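
        # For example, with frequency=5, a current time of 14:23:17 is
        # rounded down to 14:20:00 for sequence i=0, then becomes 14:25:00
        # for i=1, 14:30:00 for i=2, and so on: each generated file thus
        # carries a timestamp aligned with one scheduler period.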
        # For each sequence, we need to loop through all components:
        print(f'Extracting data from {start} to {end}')
        new_index = None
        for component in components:
            # Extract the dataframe for this component
            # and this particular time range:
            signals = list(tags_description_df.loc[
                tags_description_df['Subsystem'] == component, 'Tag'
            ])
            signals_df = tags_df.loc[start:end, signals]
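
            # Note that label-based `.loc[start:end]` slicing on a
            # DatetimeIndex includes both endpoints, hence the
            # `frequency - 1` offset used to compute `end` above.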
            # We need to reset the index to match the time
            # at which the scheduler will run inference:
            if new_index is None:
                new_index = pd.date_range(
                    start=tm,
                    periods=signals_df.shape[0],
                    freq=f'{frequency}min'
                )
            signals_df.index = new_index
            signals_df.index.name = 'Timestamp'
            signals_df = signals_df.reset_index()
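
            # signals_df now contains a 'Timestamp' column followed by
            # one column per tag of this component: this flat layout is
            # what gets written to CSV below.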
            # Export this file in CSV format:
            component_fname = os.path.join(
                root_dir, 'inference-data', 'input',
                f'{component}_{current_timestamp}.csv'
            )
            signals_df.to_csv(component_fname, index=False)

            # Upload this file to S3:
            upload_file_to_s3(
                component_fname,
                bucket,
                f'{prefix}inference-data/input/{component}_{current_timestamp}.csv'
            )

        # Shift the extraction window forward for the next sequence:
        start = start + timedelta(minutes=frequency)
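
# Example usage (a minimal sketch: the bucket and prefix below are
# hypothetical, and `load_dataset()` is the companion method from this
# module mentioned in the docstring above):
#
#     data = load_dataset(...)
#     prepare_inference_data(
#         root_dir='sample-data',
#         sample_data_dict=data,
#         bucket='my-sample-bucket',
#         prefix='my-project/',      # the trailing slash is mandatory
#         num_sequences=3,
#         frequency=5
#     )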