in utilities/feature_store_helper.py [0:0]
def get_features(self,
events_df: pd.DataFrame,
timestamp_col: str,
features: List[str],
parallel: bool=True,
verbose: bool=False) -> pd.DataFrame:
"""Performs row-level time travel, returning a point-in-time accurate dataset.
This convenience method lets the caller specify a "feature set" to retrieve, with feature values
that are free from feature leakage. The events_df dataframe provides the event timestamps and
corresponding record identifiers that drive the row-level time travel. The features parameter
identifies the feature set to retrieve: an ordered list of fully-qualified feature names, each
combining a feature group name and a feature name. The feature name can be a wildcard ('*') to
indicate that all features from that feature group should be returned. Since multiple feature groups
may be involved, the events dataframe must have a column for each unique record identifier name
across the feature set. The same identifier value is used for every feature group whose record
identifier feature has a matching name. For example, 'customer_id' may be the identifier for both a
'customer' feature group and a 'customer-demographics' feature group; in that case, a single
'customer_id' column in events_df drives the lookup for both.
Depending on the requested timestamp and the event times of the feature values found, the row-level
time travel returns null for feature values that would not yet have been available. For feature values
that are available, it returns the most recent value at or before the requested timestamp, never a
value newer than that timestamp.
Args:
events_df (pd.DataFrame): Dataframe with an event timestamp and one or more identifier columns.
timestamp_col (str): Name of the column in events_df that holds the desired event timestamp.
features (List[str]): List of named features to retrieve. Features are fully-qualified as 'fg-name:feature-name',
or 'fg-name:*' for all features.
parallel (bool): Whether to run the per-feature-group time travel queries in parallel. Parallel execution saves time, but can fail in some environments.
verbose (bool): Whether to print out details of the execution along the way.
Returns:
pd.DataFrame: Dataframe containing the event timestamp and record identifier columns, followed by the
features requested in the feature set.
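Example:
Illustrative usage only; 'fs' is an instance of this class, and the 'event_time' column and
'age_range' feature are hypothetical names. The call retrieves all 'customer' features plus one
'customer-demographics' feature, point-in-time correct as of each event timestamp:
>>> events = pd.DataFrame({'customer_id': ['C1', 'C2'],
...                        'event_time': ['2021-03-01T10:00:00Z', '2021-03-02T11:30:00Z']})
>>> fs.get_features(events, 'event_time',
...                 features=['customer:*', 'customer-demographics:age_range'])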
"""
# Copy the events dataframe to S3 as a CSV file with no header, making it suitable for
# an Athena temporary table.
_base_tmp_prefix = 'fs-query-input'
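# Derive a unique numeric suffix from the current epoch time (zero-padded to a fixed width,
# decimal point removed) to name the temporary S3 prefix and Athena table.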
_tmp_prefix = int(f'{time.time():<19}'.replace(' ', '0')[:18].replace('.', ''))
_obj_name = f'{_base_tmp_prefix}/{_tmp_prefix}/{_tmp_prefix}.csv'
_csv_uri = f's3://{self._default_bucket}/{_obj_name}'
if verbose:
print(f'\nUploading events as a csv file to be used as a temp table...\n{_csv_uri}')
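# Add a temporary monotonic row ID so each per-feature-group result can later be re-sorted
# back into the original event order. Work on a copy so the caller's dataframe is not modified.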
events_df = events_df.copy()
events_df[TEMP_TIME_TRAVEL_ID_NAME] = np.arange(len(events_df))
events_df.to_csv(_csv_uri, index=False, header=False)
if verbose:
print('...upload completed')
_csv_location = f's3://{self._default_bucket}/{_base_tmp_prefix}/{_tmp_prefix}/'
_events_table_name = f'tmp_events_{_tmp_prefix}'
# Create a temporary Athena table on top of the events CSV in S3
self._make_temp_table(_events_table_name, _csv_location,
events_df, verbose=verbose)
# Perform row-level time travel
# first parse out the names of the FG's and features
_features_df = self._feature_list_to_df(features)
# now establish the final column names in the resulting df; take care of lower case
_feature_columns = [c.lower() for c in _features_df['feature_name'].values]
_final_columns = events_df.columns.tolist()
_final_columns.extend(_feature_columns)
# de-duplicate the list, as a feature group wildcard may have pulled in some keys
# that are also in the event dataframe.
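# dict.fromkeys de-duplicates while preserving insertion order, keeping the user-specified column order.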
_final_columns = list(dict.fromkeys(_final_columns))
if verbose:
print(f' feature columns: {_feature_columns}')
print(f' event df columns: {events_df.columns.tolist()}')
print(f' final columns: {_final_columns}')
_gb = _features_df.groupby('fg_name')
_cumulative_features_df = None
_requests = []
# capture a list of offline row-level time travel requests, one for each feature group
for _g in _gb:
_curr_features = []
fg_name = _g[0]
for _f in _g[1].iterrows():
_curr_features.append(_f[1]['feature_name'])
if verbose:
print(f'fg name: {fg_name}, features: {_curr_features}')
_offlinestore_table, _database, _tmp_uri = self._get_offline_details(fg_name)
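# Each request is the full argument list for one feature group's time travel; the wrapper
# functions below receive the list as a single argument.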
_requests.append([events_df, fg_name, _events_table_name, timestamp_col,
_curr_features, _offlinestore_table, verbose])
if verbose:
print(f'\nTime travel requests list:\n{_requests}')
# perform row-level time travel for each feature group in parallel or in series
_return_data = []
if parallel:
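# Spawn one worker process per feature group request.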
_p = Pool(processes=len(_requests))
_return_data = _p.map(FeatureStore._mp_row_level_time_travel_wrapper, _requests)
_p.close()
_p.join()
else:
for _r in _requests:
_curr_return = self._row_level_time_travel_wrapper(_r)
_return_data.append(_curr_return)
# clean up the result columns to eliminate extras, concatenate subsets of columns
# returned from pieces of the time travel. column subsets are returned in the
# user-specified order, and they get appended in the right order. this helps with
# usability of the resulting dataframe, eliminating confusion for the caller.
_which_return = 0
for _curr_results_df in _return_data:
if verbose:
if _curr_results_df is None:
print(f'Got back None from row level time travel for query {_which_return}')
else:
print(f'\nGot back these results for query {_which_return}')
print(_curr_results_df.head())
# ensure each set of results is sorted exactly the same way, using the temp ID
# row number column we added to the events dataframe at the start
_curr_results_df.sort_values(by=[TEMP_TIME_TRAVEL_ID_NAME], ascending=True, inplace=True)
# drop the event time and ID columns, and the tmp ID, just focus on the features
_curr_results_df = _curr_results_df.drop([_curr_results_df.columns[0],
_curr_results_df.columns[1],
TEMP_TIME_TRAVEL_ID_NAME], axis=1)
if _cumulative_features_df is None:
_cumulative_features_df = _curr_results_df.copy()
else:
# now append the remaining current result columns at the end of the cumulative columns
_cumulative_features_df = pd.concat([_cumulative_features_df,
_curr_results_df], axis=1)
if verbose:
print('Cumulative results:')
print(_cumulative_features_df.head())
_which_return += 1
# Clean up the temporary Athena table
_drop_df = self._drop_temp_table(_events_table_name, verbose=verbose)
if verbose:
if _drop_df is None:
print('drop_temp returned None')
else:
print(f'drop_temp returned {_drop_df.head()}')
# Clean up the temporary CSV from S3
# TODO: Athena seems to leave a .txt file hanging around as well, should delete that
if verbose:
print(f'deleting the temp csv from s3: {_obj_name}')
_s3 = boto3.resource('s3')
_obj = _s3.Object(self._default_bucket, _obj_name)
_obj.delete()
# drop the tmp ID
events_df = events_df.drop(TEMP_TIME_TRAVEL_ID_NAME, axis=1)
# Ensure that the final dataframe columns and column order matches the request.
_final_df = pd.concat([events_df, _cumulative_features_df], axis=1)
_final_columns.remove(TEMP_TIME_TRAVEL_ID_NAME)
if verbose:
print(f' final df columns before reorder: {_final_df.columns.tolist()}')
print(f' final column names in order: {_final_columns}')
return _final_df[_final_columns]