In [None]:
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# FraudFinder - Feature Engineering (streaming)

<table align="left">
  <td>
    <a href="https://console.cloud.google.com/ai-platform/notebooks/deploy-notebook?download_url=https://github.com/GoogleCloudPlatform/fraudfinder/raw/main/03_feature_engineering_streaming.ipynb">
       <img src="https://www.gstatic.com/cloud/images/navigation/vertex-ai.svg" alt="Google Cloud Notebooks">Open in Cloud Notebook
    </a>
  </td> 
  <td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/fraudfinder/blob/main/03_feature_engineering_streaming.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Open in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/GoogleCloudPlatform/fraudfinder/blob/main/03_feature_engineering_streaming.ipynb">
        <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
</table>

## Overview

[FraudFinder](https://github.com/googlecloudplatform/fraudfinder) is a series of labs on how to build a real-time fraud detection system on Google Cloud. Throughout the FraudFinder labs, you will learn how to read historical bank transaction data stored in a data warehouse, read from a live stream of new transactions, perform exploratory data analysis (EDA), do feature engineering, ingest features into a feature store, train a model using the feature store, register your model in a model registry, evaluate your model, deploy your model to an endpoint, do real-time inference on your model with the feature store, and monitor your model.

### Objective

As you engineer features for model training, it's important to consider how the features are computed when making predictions with new data. For online predictions, you may have features that can be pre-computed via _batch feature engineering_. You may also have features that need to be computed on-the-fly via _streaming-based feature engineering_. In these FraudFinder labs, features based on the last n _days_ are computed with _batch_ feature engineering in BigQuery, and features based on the last n _minutes_ are computed with _streaming-based_ feature engineering in Dataflow.

In order to calculate very recent customer and terminal activity (i.e. within the last hour), computation has to be done on real-time streaming data, rather than via batch-based feature engineering. This notebook shows a step-by-step guide to create real-time data pipelines to build features. You will learn to:

- Create features using window and aggregation functions in an Apache Beam pipeline
- Deploy the Apache Beam pipeline to Dataflow
- Ingest engineered features from Dataflow into Vertex AI Feature Store

This lab uses the following Google Cloud services and resources:

- [Pub/Sub](https://cloud.google.com/pubsub/)
- [Dataflow](https://cloud.google.com/dataflow/)
- [Vertex AI](https://cloud.google.com/vertex-ai/)

Steps performed in this notebook:

- calculate customer spending features (last 15-mins, 30-mins, and 60-mins)
- calculate terminal activity features (last 15-mins, 30-mins, and 60-mins)

by pulling the streaming data from a Pub/Sub topic using the Pub/Sub subscription that we created in `00_environment_setup.ipynb` and ingesting the streaming features directly into Vertex AI Feature Store using Dataflow. 
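
Each Pub/Sub message arrives as a UTF-8 encoded JSON byte string that has to be decoded before any feature can be computed. The following minimal sketch shows that decoding step in isolation. The field names match the ones used later in this notebook, but the values are made up for illustration.

```python
import json

# Hypothetical payload: field names follow the pipeline code in this notebook,
# the values are invented for illustration only.
raw_message = json.dumps({
    "TX_ID": "tx-0001",
    "TX_TS": "2023-01-01 12:34:56",
    "CUSTOMER_ID": "1234567890123456",
    "TERMINAL_ID": "12345678",
    "TX_AMOUNT": 42.5,
}).encode("utf-8")


def decode_message(data: bytes) -> dict:
    """Decode a UTF-8 JSON byte string into a Python dict."""
    return json.loads(data.decode("utf-8"))


transaction = decode_message(raw_message)
print(transaction["CUSTOMER_ID"], transaction["TX_AMOUNT"])
```

The pipeline performs the same decoding inside a `beam.Map` step right after reading from the subscription.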

![image](./misc/images/streaming-architecture.png)

### Load configuration settings from the setup notebook

Set the constants used in this notebook and load the config settings from the `00_environment_setup.ipynb` notebook.
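
The cell below loads the settings by passing the text of the config file to `exec`, which turns every `NAME = value` line into a notebook variable. As a hedged illustration of that mechanism, the sketch below executes a small config string into a separate dictionary so that the defined names can be inspected first. The config text and its values are hypothetical; the real file is written by the setup notebook.

```python
# Hypothetical config text; the real file is produced by 00_environment_setup.ipynb
# and its exact contents are not shown in this notebook.
config_text = '''
REGION = "us-central1"
FEATURESTORE_ID = "example_featurestore"
'''


def load_config(text: str) -> dict:
    """Execute simple `NAME = value` assignments and return the resulting names."""
    namespace: dict = {}
    # Builtins are emptied so that only plain literal assignments succeed.
    # Only run config text that you trust.
    exec(text, {"__builtins__": {}}, namespace)
    return namespace


settings = load_config(config_text)
print(settings["REGION"])
```

Loading into a dictionary first makes it easy to check which names the config defines before they are merged into the notebook's global namespace.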

In [None]:
GCP_PROJECTS = !gcloud config get-value project
PROJECT_ID = GCP_PROJECTS[0]
BUCKET_NAME = f"{PROJECT_ID}-fraudfinder"
config = !gsutil cat gs://{BUCKET_NAME}/config/notebook_env.py
print(config.n)
exec(config.n)

### Create folder

To keep the folder structure clean, we create a separate folder and place all the files that this notebook produces there.
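
The next cell uses shell commands to reset the folder. If you prefer to stay in Python, a rough equivalent can be written with `shutil` and `pathlib`. This is only a sketch: the helper name `recreate_folder` is hypothetical, and the demo runs inside a temporary directory so that no project files are touched.

```python
import shutil
import tempfile
from pathlib import Path


def recreate_folder(path: Path) -> Path:
    """Delete `path` if it exists, then create it again as an empty folder."""
    shutil.rmtree(path, ignore_errors=True)
    path.mkdir(parents=True)
    return path


# Demonstrated in a temporary directory so nothing in the project is modified.
with tempfile.TemporaryDirectory() as tmp:
    folder = recreate_folder(Path(tmp) / "beam_pipeline")
    (folder / "main.py").write_text("# placeholder\n")
    folder = recreate_folder(folder)  # the second call wipes the placeholder file
    remaining = sorted(p.name for p in folder.iterdir())

print(remaining)
```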

In [None]:
FOLDER = "./beam_pipeline"
PYTHON_SCRIPT = f"{FOLDER}/main.py"
REQUIREMENTS_FILE = f"{FOLDER}/requirements.txt"

# Create new folder for pipeline files
!rm -rf {FOLDER} || true
!mkdir {FOLDER}

## Before we begin

Because deploying Apache Beam pipelines to Dataflow works better when the job is submitted from a Python script, we will write the code into a Python script instead of running it directly in the notebook.

In the next cells, we write the cell contents to a Python script, `main.py`. We are NOT running the code directly; an additional invocation is required. The notebook is structured this way to make the demonstration easier to follow.


### Write import statements

Here we write the import statements for all the required libraries to the external Python script.

In [None]:
%%writefile {PYTHON_SCRIPT}

import json
import logging
import time

from typing import Tuple, Any, List

import apache_beam as beam

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.combiners import CountCombineFn, MeanCombineFn
    
from google.cloud import aiplatform
from google.cloud import aiplatform_v1beta1

### Defining an auxiliary magic function

The `writefile` magic function from Jupyter Notebook writes the cell content as is and cannot unpack Python variables. Hence, we need to create an auxiliary magic function that can unpack Python variables and write them to a file.
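
The magic defined in the next cell fills in variables with Python's `str.format`. One consequence worth knowing is that any literal curly brace in a templated cell must be doubled, otherwise `format` treats it as a placeholder. The sketch below illustrates this with a hypothetical template and value.

```python
# Hypothetical template: {project_id} is a placeholder, the doubled braces
# around the dictionary are literal braces that survive formatting.
template = 'PROJECT_ID = "{project_id}"\nLABELS = {{"team": "fraud"}}\n'

variables = {"project_id": "my-project"}  # hypothetical value

rendered = template.format(**variables)
print(rendered)

# Without doubling, the braces are parsed as a placeholder and formatting fails.
try:
    '{"team": "fraud"}'.format(**variables)
except (KeyError, IndexError, ValueError) as err:
    print("unescaped braces fail:", type(err).__name__)
```

This is why the templating magic is used only for the small block of project variables, while the rest of the script is written with the plain `writefile` magic.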

In [None]:
from IPython.core.magic import register_line_cell_magic

@register_line_cell_magic
def writetemplate(line, cell):
    with open(line, "a") as f:
        f.write(cell.format(**globals()))

### Write the variable values

Here we write the variable values to the external Python script using the new magic function.

In [None]:
# Adding additional variables to project_variables
project_variables = "\n".join(config[1:-1])
project_variables += f'\nPROJECT_ID = "{PROJECT_ID}"'
project_variables += f'\nBUCKET_NAME = "{BUCKET_NAME}"'
project_variables += f'\nREQUIREMENTS_FILE = "{REQUIREMENTS_FILE}"'

In [None]:
%%writetemplate {PYTHON_SCRIPT}

# Project variables
{project_variables}

### Write constant variables

Here we write constant variables to the external Python script.
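
The window constants in the next cell describe a one-hour window that slides forward every minute, so each transaction belongs to 60 overlapping windows. The sketch below reproduces that assignment in plain Python to make the numbers concrete. It is not Beam's implementation: the function name is hypothetical, and it assumes that window starts are aligned to multiples of the period.

```python
WINDOW_SIZE = 60 * 60   # 1 hour in seconds, as in the pipeline constants
WINDOW_PERIOD = 1 * 60  # a new window starts every minute


def sliding_windows(timestamp: int, size: int, period: int) -> list:
    """Return the (start, end) pairs of every window [start, end) containing timestamp.

    Assumes window starts are aligned to multiples of `period`.
    """
    last_start = timestamp - (timestamp % period)
    return [
        (start, start + size)
        for start in range(last_start, last_start - size, -period)
    ]


windows = sliding_windows(timestamp=7_230, size=WINDOW_SIZE, period=WINDOW_PERIOD)
print(len(windows))   # number of windows the event falls into
print(windows[0])     # most recently started window
print(windows[-1])    # oldest window that still contains the event
```

Because a fresh window closes every minute, the pipeline can emit updated 60-minute features once per minute instead of once per hour.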

In [None]:
%%writefile -a {PYTHON_SCRIPT}

# Pub/Sub variables
SUBSCRIPTION_NAME = "ff-tx-for-feat-eng-sub"
SUBSCRIPTION_PATH = f"projects/{PROJECT_ID}/subscriptions/{SUBSCRIPTION_NAME}"

# Dataflow variables
FIFTEEN_MIN_IN_SECS = 15 * 60
THIRTY_MIN_IN_SECS = 30 * 60
WINDOW_SIZE = 60 * 60 # 1 hour in secs
WINDOW_PERIOD = 1 * 60  # 1 min in secs

### Defining auxiliary functions and classes

Here we define auxiliary functions and classes that will be used in building our real-time feature engineering and ingestion pipeline.
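
One detail to keep in mind: the `to_unix_time` helper in the next cell relies on `time.mktime`, which interprets the parsed time as local time on the machine that runs it. If the transaction timestamps are meant to be UTC (an assumption, since this notebook does not state their time zone), a variant based on `calendar.timegm` gives the same result on every worker. The function name below is hypothetical.

```python
import calendar
import time


def to_unix_time_utc(time_str: str, time_format: str = "%Y-%m-%d %H:%M:%S") -> int:
    """Convert a time string to Unix time, treating the string as UTC."""
    time_tuple = time.strptime(time_str, time_format)
    # timegm is the UTC counterpart of mktime and ignores the local time zone
    return calendar.timegm(time_tuple)


print(to_unix_time_utc("1970-01-01 00:01:00"))  # 60 seconds after the epoch
```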

In [None]:
%%writefile -a {PYTHON_SCRIPT}

def to_unix_time(time_str: str, time_format='%Y-%m-%d %H:%M:%S') -> int:
    """
    Convert a time string to Unix time
    Args:
        time_str: time string
        time_format: time format
    Returns:
        unix_time: Unix time
    """
    import time
    # Converts a time string into Unix time
    time_tuple = time.strptime(time_str, time_format)
    return int(time.mktime(time_tuple))
    
    
class AddAddtionalInfo(beam.DoFn):
    
    # Add composite key and difference from window end timestamp to element
    def process(self, element: Tuple, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam) -> Tuple:
        """
        Add composite key and difference from window end timestamp to element
        Args:
            element: element to process
            timestamp: timestamp of element
            window: window of element
        Returns:
            element: element with composite key and difference from window end timestamp
        """
        window_end_dt = window.end.to_utc_datetime().strftime("%Y%m%d%H%M%S")
        new_element = {
            'TX_ID': element['TX_ID'],
            'TX_TS': element['TX_TS'],
            'CUSTOMER_ID': element['CUSTOMER_ID'],
            'TERMINAL_ID': element['TERMINAL_ID'],
            'TX_AMOUNT': element['TX_AMOUNT'],
            'CUSTOMER_ID_COMPOSITE_KEY': f"{element['CUSTOMER_ID']}_{window_end_dt}",
            'TERMINAL_ID_COMPOSITE_KEY': f"{element['TERMINAL_ID']}_{window_end_dt}",
            'TS_DIFF': window.end - timestamp
        }
        return (new_element,)


class WriteFeatures(beam.DoFn):
    def __init__(self, resource_name: str):
        self.resource_name = resource_name

    def populate_customer_payload(self, new_records, aggregated) -> List[Any]:
        """
        Prepare payloads for customer related features to be written
        at the Vertex AI Feature store. The values are required to be of FeatureValue type.
        Args:
            new_records: new records to write
            aggregated: aggregated records to write
        Returns:
            payloads: list of payloads to write
        """
        payloads = []
        for row in new_records:
            payload = aiplatform_v1beta1.WriteFeatureValuesPayload()
            payload.entity_id = row.CUSTOMER_ID
            payload.feature_values = {
                "customer_id_nb_tx_15min_window": aiplatform_v1beta1.FeatureValue(
                    int64_value=aggregated.CUSTOMER_ID_NB_TX_15MIN_WINDOW),
                "customer_id_nb_tx_30min_window": aiplatform_v1beta1.FeatureValue(
                    int64_value=aggregated.CUSTOMER_ID_NB_TX_30MIN_WINDOW),
                "customer_id_nb_tx_60min_window": aiplatform_v1beta1.FeatureValue(
                    int64_value=aggregated.CUSTOMER_ID_NB_TX_60MIN_WINDOW),
                "customer_id_avg_amount_15min_window": aiplatform_v1beta1.FeatureValue(
                    double_value=aggregated.CUSTOMER_ID_SUM_AMOUNT_15MIN_WINDOW / aggregated.CUSTOMER_ID_NB_TX_15MIN_WINDOW),
                "customer_id_avg_amount_30min_window": aiplatform_v1beta1.FeatureValue(
                    double_value=aggregated.CUSTOMER_ID_SUM_AMOUNT_30MIN_WINDOW / aggregated.CUSTOMER_ID_NB_TX_30MIN_WINDOW),
                "customer_id_avg_amount_60min_window": aiplatform_v1beta1.FeatureValue(
                    double_value=aggregated.CUSTOMER_ID_AVG_AMOUNT_60MIN_WINDOW),
            }
            payloads.append(payload)
        return payloads

    def populate_terminal_payload(self, new_records, aggregated) -> List[Any]:
        """
        Prepare payloads for terminal related features to be written
        at the Vertex AI Feature store. The values are required to be of FeatureValue type.
        Args:
            new_records: new records to write
            aggregated: aggregated records to write
        Returns:
            payloads: list of payloads to write
        """
        payloads = []
        for row in new_records:
            payload = aiplatform_v1beta1.WriteFeatureValuesPayload()
            payload.entity_id = row.TERMINAL_ID
            payload.feature_values = {
                "terminal_id_nb_tx_15min_window": aiplatform_v1beta1.FeatureValue(
                    int64_value=aggregated.TERMINAL_ID_NB_TX_15MIN_WINDOW),
                "terminal_id_nb_tx_30min_window": aiplatform_v1beta1.FeatureValue(
                    int64_value=aggregated.TERMINAL_ID_NB_TX_30MIN_WINDOW),
                "terminal_id_nb_tx_60min_window": aiplatform_v1beta1.FeatureValue(
                    int64_value=aggregated.TERMINAL_ID_NB_TX_60MIN_WINDOW),
                "terminal_id_avg_amount_15min_window": aiplatform_v1beta1.FeatureValue(
                    double_value=aggregated.TERMINAL_ID_SUM_AMOUNT_15MIN_WINDOW / aggregated.TERMINAL_ID_NB_TX_15MIN_WINDOW),
                "terminal_id_avg_amount_30min_window": aiplatform_v1beta1.FeatureValue(
                    double_value=aggregated.TERMINAL_ID_SUM_AMOUNT_30MIN_WINDOW / aggregated.TERMINAL_ID_NB_TX_30MIN_WINDOW),
                "terminal_id_avg_amount_60min_window": aiplatform_v1beta1.FeatureValue(
                    double_value=aggregated.TERMINAL_ID_AVG_AMOUNT_60MIN_WINDOW),
            }
            payloads.append(payload)
        return payloads
    
    def send_request_to_feature_store(self, resource_name: str, payloads: List[Any]):
        """
        Sends a write request to Vertex AI Feature Store by preparing
        a write feature values request using the provided resource name and payloads,
        and by making use of a feature store online serving service client
        Args:
            resource_name: resource name of the feature store entity type
            payloads: list of payloads to write
        Returns:
            response: response returned by the online serving service
        """
        # Prepare request
        request = aiplatform_v1beta1.WriteFeatureValuesRequest(
            entity_type=resource_name,
            payloads=payloads,
        )

        # Create feature store online serving service client
        client_options = {
            "api_endpoint": "us-central1-aiplatform.googleapis.com"
        }
        v1beta1_client = aiplatform_v1beta1.FeaturestoreOnlineServingServiceClient(client_options=client_options)

        # Send the request
        response = v1beta1_client.write_feature_values(request=request)
        return response

    def process(self, element: Tuple) -> Tuple:
        """
        Select entity using resource_name variable and 
        write the respective features to Vertex AI Feature store
        Args:
            element: tuple of new records and aggregated records
        Returns:
            element: tuple of new records and aggregated records
        """
        new_records = element[1]['new_records']
        aggregated = element[1]['aggregated'][0]

        entity = self.resource_name.split("/")[-1]
        payloads = []
        message = ""

        if entity == "customer":
            payloads = self.populate_customer_payload(new_records, aggregated)
            customer_ids = [x.entity_id for x in payloads]
            message = f"Inserted features for CUSTOMER IDs: {', '.join(customer_ids)}"

        elif entity == "terminal":
            payloads = self.populate_terminal_payload(new_records, aggregated)
            terminal_ids = [x.entity_id for x in payloads]
            message = f"Inserted features for TERMINAL IDs: {', '.join(terminal_ids)}"

        response = self.send_request_to_feature_store(self.resource_name, payloads)
        logging.info(message)

        yield (response,)

### Building the pipeline

Now we are ready to build the pipeline using the classes and functions defined above. Once the pipeline is ready, we will wrap everything into a main function and submit it to Dataflow.
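
The pipeline in the next cell derives the 15-, 30- and 60-minute features from a single 60-minute window: each record carries its distance in seconds to the window end, and the shorter windows only count records whose distance is small enough. The sketch below mimics that idea in plain Python. The transactions are hypothetical, the helper name is invented, and the zero-division guard is an addition that the pipeline itself does not have.

```python
FIFTEEN_MIN_IN_SECS = 15 * 60
THIRTY_MIN_IN_SECS = 30 * 60

# Hypothetical transactions of one customer inside a single 60-minute window.
# ts_diff is the number of seconds between the transaction and the window end.
transactions = [
    {"amount": 10.0, "ts_diff": 30},     # 30 seconds before the window end
    {"amount": 20.0, "ts_diff": 600},    # 10 minutes before
    {"amount": 30.0, "ts_diff": 1500},   # 25 minutes before
    {"amount": 40.0, "ts_diff": 3000},   # 50 minutes before
]


def aggregate(rows: list) -> dict:
    """Derive 15-, 30- and 60-minute counts and averages from one window."""
    features = {}
    for label, limit in (("15min", FIFTEEN_MIN_IN_SECS),
                         ("30min", THIRTY_MIN_IN_SECS),
                         ("60min", None)):
        amounts = [r["amount"] for r in rows
                   if limit is None or r["ts_diff"] <= limit]
        features[f"nb_tx_{label}"] = len(amounts)
        # Guard against an empty sub-window to avoid dividing by zero
        features[f"avg_amount_{label}"] = sum(amounts) / len(amounts) if amounts else 0.0
    return features


features = aggregate(transactions)
print(features)
```

Computing all three horizons from one window keeps the pipeline to a single windowing step per entity instead of three separate ones.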

In [None]:
%%writefile -a {PYTHON_SCRIPT}

def main():
    # Initialize Vertex AI client
    aiplatform.init(
        project=PROJECT_ID,
        location=REGION
    )

    # Get entity types for customer and terminal
    fs = aiplatform.featurestore.Featurestore(
        featurestore_name=FEATURESTORE_ID
    )
    customer_entity_type = fs.get_entity_type("customer")
    terminal_entity_type = fs.get_entity_type("terminal")
    
    # Setup pipeline options for deploying to dataflow
    pipeline_options = PipelineOptions(streaming=True, 
                                       save_main_session=True,
                                       runner="DataflowRunner",
                                       project=PROJECT_ID,
                                       region=REGION,
                                       temp_location=f"gs://{BUCKET_NAME}/dataflow/tmp",
                                       requirements_file=REQUIREMENTS_FILE,
                                       max_num_workers=2)
    
    # Build pipeline and transformation steps
    pipeline = beam.Pipeline(options=pipeline_options)
    
    source = (
        pipeline
        | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(subscription=SUBSCRIPTION_PATH)
        | 'Decode byte array to json dict' >> beam.Map(lambda row: json.loads(row.decode('utf-8')))
    )

    enriched_source = (
        source
        | 'Attach timestamps' >> beam.Map(lambda row: beam.window.TimestampedValue(row, to_unix_time(row['TX_TS'])))
        | 'Create sliding window' >> beam.WindowInto(beam.window.SlidingWindows(WINDOW_SIZE, WINDOW_PERIOD, offset=WINDOW_SIZE))
        | 'Add window info' >> beam.ParDo(AddAddtionalInfo())
        | 'Convert to namedtuple' >> beam.Map(lambda row: beam.Row(**row))
    )

    new_records = (
        enriched_source
        | 'Filter only new rows' >> beam.Filter(lambda row: row.TS_DIFF <= WINDOW_PERIOD)
    )

    # Build customer features
    new_records_customer_id = (
        new_records
        | 'Assign CUSTOMER_ID_COMPOSITE_KEY as key' >> beam.WithKeys(lambda row: row.CUSTOMER_ID_COMPOSITE_KEY)
    )

    aggregated_customer_id = (
        enriched_source
        | 'Group by customer id composite key column' >> beam.GroupBy(CUSTOMER_ID_COMPOSITE_KEY='CUSTOMER_ID_COMPOSITE_KEY')
            .aggregate_field(lambda row: 1 if row.TS_DIFF <= FIFTEEN_MIN_IN_SECS else 0, sum, 'CUSTOMER_ID_NB_TX_15MIN_WINDOW')
            .aggregate_field(lambda row: 1 if row.TS_DIFF <= THIRTY_MIN_IN_SECS else 0, sum, 'CUSTOMER_ID_NB_TX_30MIN_WINDOW')
            .aggregate_field('TX_ID', CountCombineFn(), 'CUSTOMER_ID_NB_TX_60MIN_WINDOW')
            .aggregate_field(lambda row: row.TX_AMOUNT if row.TS_DIFF <= FIFTEEN_MIN_IN_SECS else 0, sum,'CUSTOMER_ID_SUM_AMOUNT_15MIN_WINDOW')
            .aggregate_field(lambda row: row.TX_AMOUNT if row.TS_DIFF <= THIRTY_MIN_IN_SECS else 0, sum, 'CUSTOMER_ID_SUM_AMOUNT_30MIN_WINDOW')
            .aggregate_field('TX_AMOUNT', MeanCombineFn(), 'CUSTOMER_ID_AVG_AMOUNT_60MIN_WINDOW')
        | 'Assign key for aggregated results (customer id)' >> beam.WithKeys(lambda row: row.CUSTOMER_ID_COMPOSITE_KEY)
    )

    merged_customer_id = (
        ({
            'new_records': new_records_customer_id, 
            'aggregated': aggregated_customer_id
        })
        | 'Merge pcollections (customer id)' >> beam.CoGroupByKey()
        | 'Filter empty rows (customer id)' >> beam.Filter(lambda row: len(row[1]['new_records']) > 0)
        | 'Write to feature store (customer id)' >> beam.ParDo(WriteFeatures(customer_entity_type.resource_name))
    )

    # Build terminal features
    new_records_terminal_id = (
        new_records
        | 'Assign TERMINAL_ID_COMPOSITE_KEY as key' >> beam.WithKeys(lambda row: row.TERMINAL_ID_COMPOSITE_KEY)
    )

    aggregated_terminal_id = (
        enriched_source
        | 'Group by terminal id composite key column' >> beam.GroupBy(TERMINAL_ID_COMPOSITE_KEY='TERMINAL_ID_COMPOSITE_KEY')
            .aggregate_field(lambda row: 1 if row.TS_DIFF <= FIFTEEN_MIN_IN_SECS else 0, sum, 'TERMINAL_ID_NB_TX_15MIN_WINDOW')
            .aggregate_field(lambda row: 1 if row.TS_DIFF <= THIRTY_MIN_IN_SECS else 0, sum, 'TERMINAL_ID_NB_TX_30MIN_WINDOW')
            .aggregate_field('TX_ID', CountCombineFn(), 'TERMINAL_ID_NB_TX_60MIN_WINDOW')
            .aggregate_field(lambda row: row.TX_AMOUNT if row.TS_DIFF <= FIFTEEN_MIN_IN_SECS else 0, sum, 'TERMINAL_ID_SUM_AMOUNT_15MIN_WINDOW')
            .aggregate_field(lambda row: row.TX_AMOUNT if row.TS_DIFF <= THIRTY_MIN_IN_SECS else 0, sum, 'TERMINAL_ID_SUM_AMOUNT_30MIN_WINDOW')
            .aggregate_field('TX_AMOUNT', MeanCombineFn(), 'TERMINAL_ID_AVG_AMOUNT_60MIN_WINDOW')
        | 'Assign key for aggregated results (terminal id)' >> beam.WithKeys(lambda row: row.TERMINAL_ID_COMPOSITE_KEY)
    )

    merged_terminal_id = (
        ({
            'new_records': new_records_terminal_id, 
            'aggregated': aggregated_terminal_id
        })
        | 'Merge pcollections (terminal id)' >> beam.CoGroupByKey()
        | 'Filter empty rows (terminal id)' >> beam.Filter(lambda row: len(row[1]['new_records']) > 0)
        | 'Write to feature store (terminal id)' >> beam.ParDo(WriteFeatures(terminal_entity_type.resource_name))
    )
    
    # Run the pipeline (async)
    pipeline.run()

    
if __name__ == "__main__":
    main()

### Creating `requirements.txt` for Dataflow workers

Because the pipeline uses the `google-cloud-aiplatform` and `google-apitools` packages, we pass `requirements.txt` to the Dataflow workers so that they install these packages in their environment before running the job.

In [None]:
%%writefile {REQUIREMENTS_FILE}

google-cloud-aiplatform<=1.36.1
google-apitools==0.5.32

### Deploying the pipeline

Now we are ready to deploy this pipeline to Dataflow.

In [None]:
!python3 {PYTHON_SCRIPT}

Congrats! Now the job should be running on <a href="https://console.cloud.google.com/dataflow/jobs">Dataflow</a>!
    
If everything went well, you should see this pipeline diagram in the Dataflow console.
    
![image](./misc/images/streaming-dataflow-pipeline.png)

### Verifying the ingestion pipeline

Once the Dataflow pipeline is up and running, you should be able to see which feature entities are being ingested via the `Step Log` of the respective `Write to feature store` step.
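
The write step logs the ingested IDs as a comma-separated list after a fixed prefix. If copying IDs by hand becomes tedious, a small helper can turn a copied log line into a Python list. This is only a convenience sketch: the function name is hypothetical, and the sample line reuses the example IDs from this notebook.

```python
def parse_entity_ids(log_message: str) -> list:
    """Extract the comma-separated entity IDs that follow the last colon."""
    _, _, id_part = log_message.rpartition(":")
    return [entity_id.strip() for entity_id in id_part.split(",") if entity_id.strip()]


# Hypothetical log line in the format written by the WriteFeatures step
log_line = "Inserted features for CUSTOMER IDs: 5830444124423549, 5469689693941771"
ids = parse_entity_ids(log_line)
print(ids)
```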

To verify whether the data ingestion job is doing what it is supposed to be doing, copy a list of entity ids from the logs and use the following code.

In [None]:
from google.cloud import aiplatform as vertex_ai

vertex_ai.init(
    project=PROJECT_ID,
    location=REGION
)

fs = vertex_ai.featurestore.Featurestore(featurestore_name=FEATURESTORE_ID)

In [None]:
customer_entity_ids = ["5830444124423549", "5469689693941771", "1361459972478769"]  # copy customer ids from Write to feature store (customer id) step's log

customer_entity_type = fs.get_entity_type("customer")
customer_aggregated_features = customer_entity_type.read(
    entity_ids=customer_entity_ids
)

customer_aggregated_features

In [None]:
terminal_entity_ids = ["97802258", "48770968", "98391079"] # copy terminal ids from Write to feature store (terminal id) step's log

terminal_entity_type = fs.get_entity_type("terminal")
terminal_aggregated_features = terminal_entity_type.read(
    entity_ids=terminal_entity_ids
)

terminal_aggregated_features

### END

If you want to explore the BigQuery ML pipeline, continue with `bqml/04_model_training_and_prediction.ipynb`.

Alternatively, to explore the Vertex AI pipeline, continue with `vertex_ai/04_experimentation.ipynb`.