def generate_replay_data()

in src/lookoutequipment/dataset.py

# Module-level imports this function relies on (awswrangler imported as `wr`):
import time
from datetime import datetime, timedelta

import boto3
import pandas as pd
import awswrangler as wr


def generate_replay_data(dataset_name, 
                         replay_start_timestamp, 
                         upload_frequency, 
                         replay_days=1, 
                         inference_timezone='UTC'):
    """
    Generates inference input data from the training data to test a scheduler
    configured for a model trained with this dataset. The generated data is
    written to an S3 location alongside your training data.

    Parameters:
        dataset_name (string):
            Lookout for Equipment `dataset_name` containing the training data 
            for replaying.

        replay_start_timestamp (string):
            Point in time in the training data from which to begin generating 
            replay data.
            Example: `"2020-10-01 00:00:00"`

        upload_frequency (string):
            How often replay data is uploaded to the S3 bucket for the inference 
            input data. Valid values are `PT5M`, `PT10M`, `PT15M`, `PT30M`, 
            or `PT1H`.

        replay_days (integer):
            Duration of the replay data in days (default: 1)

        inference_timezone (string):
            Indicates the timezone for the inference replay dataset.
            (default: 'UTC')
            
    Returns:
        (boolean or dict)
            True if no problem is detected; otherwise, a dictionary mapping
            each component name to the list of sequences that could not be
            generated (missing sequences will trigger failed scheduler
            executions)
    """
    replay_start_timestamp = datetime.strptime(
        replay_start_timestamp, '%Y-%m-%d %H:%M:%S'
    )
    replay_end_timestamp = replay_start_timestamp + timedelta(days=replay_days)
    
    # Wait for an ingestion to be done if one is in progress:
    client = boto3.client('lookoutequipment')
    dataset_description = client.describe_dataset(DatasetName=dataset_name)
    status = dataset_description['Status']
    while status == 'INGESTION_IN_PROGRESS':
        print(f'Ingestion job is still in progress for dataset: {dataset_name}.')
        time.sleep(10)
        dataset_description = client.describe_dataset(DatasetName=dataset_name)
        status = dataset_description['Status']
    
    # Locate the S3 path of the training data from `dataset_name`:
    dataset_config = dataset_description['IngestionInputConfiguration']['S3InputConfiguration']
    bucket = dataset_config['Bucket']
    train_prefix = dataset_config['Prefix']
    train_path = f's3://{bucket}/{train_prefix}'
    
    # Build inference path: we position the inference input data at the same
    # level on S3 as the training data
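    # (e.g. a train prefix of 'my-project/training-data/' yields an inference
    # prefix of 'my-project/inference-data/input'):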
    inference_prefix = '/'.join(train_prefix.split('/')[:-2]) + '/inference-data/input'
    INFERENCE_INPUT_DATA = f's3://{bucket}/{inference_prefix}'
    
    # Map scheduler upload frequencies (ISO 8601 durations) to their pandas
    # resampling frequency aliases:
    upload_freq_dict = {
        'PT5M' : '5T',
        'PT10M': '10T',
        'PT15M': '15T',
        'PT30M': '30T',
        'PT1H' : '1H'
    }
    upload_frequency = upload_freq_dict[upload_frequency]
    
    # Loop through each component in `train_path`:
    component_params = [
        {'component_name': path.split('/')[-2], 'train_path': path} 
        for path in wr.s3.list_objects(train_path)
    ]
    empty_sequences = dict()
    for component_param in component_params:
        component_name = component_param['component_name']
        empty_sequences[component_name] = []
        print('Generating data for component:', component_name)
    
        # Load training data for this component:
        component_df = wr.s3.read_csv(component_param['train_path'], 
                                      parse_dates=True, 
                                      index_col='Timestamp')
    
        # Subsample replay data from the training data (copy so that the
        # index shift below does not touch the original dataframe):
        replay_df = component_df.loc[replay_start_timestamp:replay_end_timestamp, :].copy()
    
        # Get the inference start time in UTC, rounded up to the next
        # upload frequency boundary:
        current_timestamp = datetime.strftime(
            datetime.now() + timedelta(minutes=1), '%Y-%m-%d %H:%M:%S'
        )
        inference_start_timestamp_utc = str(pd.to_datetime(current_timestamp).ceil(upload_frequency))
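        # (e.g. a 10:02 wall clock with an upload frequency of PT5M yields
        # an inference start time of 10:05 UTC)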
        
        # Generate the new time index for the inference replay data by
        # shifting the training timestamps to start at the inference start:
        time_delta = pd.to_datetime(inference_start_timestamp_utc) - pd.to_datetime(replay_start_timestamp)
        replay_df.index = replay_df.index.to_series() + time_delta
        replay_df.index.name = 'Timestamp'
        
        # Build the list of split boundaries, one per upload frequency period:
        ts_splits = replay_df.resample(upload_frequency).ffill().index.tolist()
    
        # Loop through each time split:
        for split in zip(ts_splits[:-1], ts_splits[1:]):
            # Select this split's data from the replay data:
            split_start = split[0]
            split_end = split[1]
            split_df = replay_df.loc[split_start:split_end, :].reset_index()
            
            # Convert split_start from UTC to `inference_timezone`:
            split_start_tz = split_start.tz_localize('UTC').tz_convert(inference_timezone)
            split_start_str = split_start_tz.strftime('%Y%m%d%H%M%S')
            f_name = f'{component_name}_{split_start_str}.csv'
    
            # Save current sequence data to inference path:
            if split_df.shape[0] > 0:
                wr.s3.to_csv(split_df, 
                             f'{INFERENCE_INPUT_DATA}/{f_name}', 
                             index=None)
    
            # Otherwise, add the missing sequence to the error list for
            # this component (list.append() mutates the list in place):
            else:
                empty_sequences[component_name].append(f_name)
                    
    # Return True only if every component generated all of its sequences;
    # otherwise, return the dictionary of missing sequences:
    if all(len(sequences) == 0 for sequences in empty_sequences.values()):
        return True
    else:
        return empty_sequences
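
Usage sketch (hypothetical values: assumes a dataset named 'my-dataset' has
already been ingested into Lookout for Equipment, and that AWS credentials
are configured for boto3 and awswrangler):

    from lookoutequipment.dataset import generate_replay_data

    result = generate_replay_data(
        dataset_name='my-dataset',
        replay_start_timestamp='2020-10-01 00:00:00',
        upload_frequency='PT5M',
        replay_days=1,
        inference_timezone='UTC'
    )

    if result is True:
        print('All replay sequences were generated.')
    else:
        # `result` maps each component name to its missing sequences:
        print('Missing sequences:', result)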