def _row_level_time_travel()

in utilities/feature_store_helper.py [0:0]


    def _row_level_time_travel(self, events_df: pd.DataFrame, fg_name: str, 
                               events_table_name: str, events_timestamp_col: str, 
                               features: List[str], fg_table_name: str, verbose: bool=False) -> pd.DataFrame:
        _fg_desc = self.describe_feature_group(fg_name)
        if verbose:
            print(f'** in _row_level_time_travel, fg name: {fg_name}, desc: {_fg_desc}')

        _id_feature_name = _fg_desc['RecordIdentifierFeatureName']
        _time_feature_name = _fg_desc['EventTimeFeatureName']
        _feat_defs = _fg_desc['FeatureDefinitions']

        _other_cols = self._get_other_cols(events_df, events_timestamp_col, _fg_desc, features)
        
        
        if verbose:
            print(f'fg: {fg_name}, features: {features}, _other_cols: {_other_cols}')

        _f_cols = self._qualify_cols('f', _other_cols)
        _t_cols = self._qualify_cols('t', _other_cols)
        
        _sub_query = f'select f.{_id_feature_name}, {_f_cols}, ' +\
                     f'{events_timestamp_col}, {_time_feature_name}, e.{TEMP_TIME_TRAVEL_ID_NAME}, is_deleted, write_time, row_number() ' +\
                     f'over (partition by f.{_id_feature_name}, {events_timestamp_col} ' +\
                     f'order by {_time_feature_name} desc, write_time desc) as row_number ' +\
                     f'from "{fg_table_name}" f, "{events_table_name}" e ' +\
                     f'where {_time_feature_name} < {events_timestamp_col} ' +\
                     f'and f.{_id_feature_name} = e.{_id_feature_name}'

        _travel_string = f'with temp as (select {_id_feature_name}, {_other_cols}, {events_timestamp_col},' +\
                         f' {_time_feature_name}, write_time ' +\
                         f'from ({_sub_query}) ' +\
                         'where row_number = 1 and NOT is_deleted ' +\
                         f'order by {_id_feature_name} desc, {events_timestamp_col} desc, {_time_feature_name} desc) ' +\
                         f'select e.{events_timestamp_col}, e.{_id_feature_name}, {_t_cols}, e.{TEMP_TIME_TRAVEL_ID_NAME} ' +\
                         f'from "{events_table_name}" e left outer join temp t ' +\
                         f'on e.{events_timestamp_col} = t.{events_timestamp_col} and ' +\
                         f'e.{_id_feature_name} = t.{_id_feature_name} ' +\
                         f'order by e.{TEMP_TIME_TRAVEL_ID_NAME} asc'

        _database = 'sagemaker_featurestore'
        _s3_uri = f's3://{self._default_bucket}/offline-store'
        _tmp_uri = f'{_s3_uri}/query_results/'

        df = self._run_query(_travel_string, _tmp_uri, _database, verbose=verbose)
        if verbose:
            print(f'== Completed row-level travel on {fg_table_name}, result shape: {0 if df is None else df.shape}')
        return df