in utilities/feature_store_helper.py [0:0]
def _row_level_time_travel(self, events_df: pd.DataFrame, fg_name: str,
                           events_table_name: str, events_timestamp_col: str,
                           features: List[str], fg_table_name: str,
                           verbose: bool = False) -> pd.DataFrame:
    """Run a row-level point-in-time lookup of one feature group against an events table.

    For every row of the events table the generated query selects the most
    recent feature record (latest event time, ties broken by latest
    ``write_time``) whose event time is strictly earlier than the event's
    timestamp and whose record identifier matches. A selected record that is
    flagged ``is_deleted`` is discarded. The result is left-joined back onto
    the events table, so events without a usable feature record are still
    returned, and is ordered by the ``TEMP_TIME_TRAVEL_ID_NAME`` column of the
    events table.

    Args:
        events_df: Events data; only forwarded to ``self._get_other_cols``.
        fg_name: Feature group name passed to ``self.describe_feature_group``.
        events_table_name: Name of the table holding the events. It must
            contain the feature group's record identifier column,
            ``events_timestamp_col`` and ``TEMP_TIME_TRAVEL_ID_NAME``.
        events_timestamp_col: Events-table column holding the event timestamp.
        features: Requested features; only forwarded to
            ``self._get_other_cols``.
        fg_table_name: Name of the table backing the feature group.
        verbose: When true, print progress/debug information.

    Returns:
        Whatever ``self._run_query`` returns for the generated query. The
        verbose branch tolerates ``None``, so callers should too.

    NOTE(review): table and column names are interpolated into the SQL text
    without escaping; they are presumably trusted, internally produced
    identifiers -- confirm against callers.
    """
    fg_desc = self.describe_feature_group(fg_name)
    if verbose:
        print(f'** in _row_level_time_travel, fg name: {fg_name}, desc: {fg_desc}')

    id_col = fg_desc['RecordIdentifierFeatureName']
    time_col = fg_desc['EventTimeFeatureName']

    # Interpolated verbatim into the SELECT lists below, so this is presumably
    # a comma-separated column list -- confirm against _get_other_cols.
    other_cols = self._get_other_cols(events_df, events_timestamp_col, fg_desc, features)
    if verbose:
        print(f'fg: {fg_name}, features: {features}, _other_cols: {other_cols}')

    # The same columns, prefixed for the feature group alias (f) and for the
    # CTE alias (t) respectively.
    fg_qualified_cols = self._qualify_cols('f', other_cols)
    temp_qualified_cols = self._qualify_cols('t', other_cols)

    # Rank the candidate feature records of each (record id, event timestamp)
    # pair, newest first; only records strictly older than the event qualify.
    ranked_query = (
        f'select f.{id_col}, {fg_qualified_cols}, '
        f'{events_timestamp_col}, {time_col}, e.{TEMP_TIME_TRAVEL_ID_NAME}, is_deleted, write_time, row_number() '
        f'over (partition by f.{id_col}, {events_timestamp_col} '
        f'order by {time_col} desc, write_time desc) as row_number '
        f'from "{fg_table_name}" f, "{events_table_name}" e '
        f'where {time_col} < {events_timestamp_col} '
        f'and f.{id_col} = e.{id_col}'
    )

    # Keep only the top-ranked record and drop it when it is a deletion
    # marker. The left outer join then restores every event row, including
    # those that ended up without a matching feature record.
    travel_query = (
        f'with temp as (select {id_col}, {other_cols}, {events_timestamp_col},'
        f' {time_col}, write_time '
        f'from ({ranked_query}) '
        'where row_number = 1 and NOT is_deleted '
        f'order by {id_col} desc, {events_timestamp_col} desc, {time_col} desc) '
        f'select e.{events_timestamp_col}, e.{id_col}, {temp_qualified_cols}, e.{TEMP_TIME_TRAVEL_ID_NAME} '
        f'from "{events_table_name}" e left outer join temp t '
        f'on e.{events_timestamp_col} = t.{events_timestamp_col} and '
        f'e.{id_col} = t.{id_col} '
        f'order by e.{TEMP_TIME_TRAVEL_ID_NAME} asc'
    )

    database = 'sagemaker_featurestore'
    offline_store_uri = f's3://{self._default_bucket}/offline-store'
    query_results_uri = f'{offline_store_uri}/query_results/'

    df = self._run_query(travel_query, query_results_uri, database, verbose=verbose)
    if verbose:
        print(f'== Completed row-level travel on {fg_table_name}, result shape: {0 if df is None else df.shape}')
    return df