{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "id": "ur8xi4C7S06n" }, "outputs": [], "source": [ "# Copyright 2023 Google LLC\n", "#\n", "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "cell_type": "markdown", "metadata": { "id": "JAPoU8Sm5E6e" }, "source": [ "# FraudFinder - Feature Engineering (streaming)\n", "\n", "<table align=\"left\">\n", " <td>\n", " <a href=\"https://console.cloud.google.com/ai-platform/notebooks/deploy-notebook?download_url=https://github.com/GoogleCloudPlatform/fraudfinder/raw/main/03_feature_engineering_streaming.ipynb\">\n", " <img src=\"https://www.gstatic.com/cloud/images/navigation/vertex-ai.svg\" alt=\"Google Cloud Notebooks\">Open in Cloud Notebook\n", " </a>\n", " </td> \n", " <td>\n", " <a href=\"https://colab.research.google.com/github/GoogleCloudPlatform/fraudfinder/blob/main/03_feature_engineering_streaming.ipynb\">\n", " <img src=\"https://cloud.google.com/ml-engine/images/colab-logo-32px.png\" alt=\"Colab logo\"> Open in Colab\n", " </a>\n", " </td>\n", " <td>\n", " <a href=\"https://github.com/GoogleCloudPlatform/fraudfinder/blob/main/03_feature_engineering_streaming.ipynb\">\n", " <img src=\"https://cloud.google.com/ml-engine/images/github-logo-32px.png\" alt=\"GitHub logo\">\n", " View on GitHub\n", " </a>\n", " </td>\n", "</table>" ] }, { "cell_type": "markdown", "metadata": { "id": "tvgnzT1CKxrO" }, "source": [ "## Overview\n", "\n", 
"[FraudFinder](https://github.com/googlecloudplatform/fraudfinder) is a series of labs on how to build a real-time fraud detection system on Google Cloud. Throughout the FraudFinder labs, you will learn how to read historical bank transaction data stored in a data warehouse, read from a live stream of new transactions, perform exploratory data analysis (EDA), do feature engineering, ingest features into a feature store, train a model using the feature store, register your model in a model registry, evaluate your model, deploy your model to an endpoint, do real-time inference on your model with the feature store, and monitor your model." ] }, { "cell_type": "markdown", "metadata": { "id": "b3f6a603e433" }, "source": [ "### Objective\n", "\n", "As you engineer features for model training, it's important to consider how the features are computed when making predictions with new data. For online predictions, you may have features that can be pre-computed via _batch feature engineering_. You may also have features that need to be computed on-the-fly via _streaming-based feature engineering_. For these FraudFinder labs, for computing features based on the last n _days_, you will use _batch_ feature engineering in BigQuery; for computing features based on the last n _minutes_, you will use _streaming-based_ feature engineering using Dataflow.\n", "\n", "In order to calculate very recent customer and terminal activity (i.e. within the last hour), computation has to be done on real-time streaming data, rather than via batch-based feature engineering. This notebook shows a step-by-step guide to creating real-time data pipelines that build features. 
You will learn to:\n", "\n", "- Create features, using window and aggregation functions in an Apache Beam pipeline\n", "- Deploy the Apache Beam pipeline to Dataflow\n", "- Ingest engineered features from Dataflow into Vertex AI Feature Store\n", "\n", "This lab uses the following Google Cloud services and resources:\n", "\n", "- [Pub/Sub](https://cloud.google.com/pubsub/)\n", "- [Dataflow](https://cloud.google.com/dataflow/)\n", "- [Vertex AI](https://cloud.google.com/vertex-ai/)\n", "\n", "Steps performed in this notebook:\n", "\n", "- Calculate customer spending features (last 15, 30, and 60 minutes)\n", "- Calculate terminal activity features (last 15, 30, and 60 minutes)\n", "\n", "Both sets of features are computed by pulling the streaming data from a Pub/Sub topic, using the Pub/Sub subscription that we created in `00_environment_setup.ipynb`, and are ingested directly into Vertex AI Feature Store using Dataflow.\n", "\n", "![image](./misc/images/streaming-architecture.png)" ] }, { "cell_type": "markdown", "metadata": { "id": "535c9218e18d" }, "source": [ "### Load configuration settings from the setup notebook\n", "\n", "Set the constants used in this notebook and load the config settings from the `00_environment_setup.ipynb` notebook." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "94fe7f01ef5a", "tags": [] }, "outputs": [], "source": [ "GCP_PROJECTS = !gcloud config get-value project\n", "PROJECT_ID = GCP_PROJECTS[0]\n", "BUCKET_NAME = f\"{PROJECT_ID}-fraudfinder\"\n", "config = !gsutil cat gs://{BUCKET_NAME}/config/notebook_env.py\n", "print(config.n)\n", "exec(config.n)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create folder\n", "\n", "To keep the folder structure clean, we will create a separate folder and place all the files that we produce there." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "FOLDER = \"./beam_pipeline\"\n", "PYTHON_SCRIPT = f\"{FOLDER}/main.py\"\n", "REQUIREMENTS_FILE = f\"{FOLDER}/requirements.txt\"\n", "\n", "# Create new folder for pipeline files\n", "!rm -rf {FOLDER} || true\n", "!mkdir {FOLDER}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Before we begin\n", "\n", "Because deploying Apache Beam pipelines to Dataflow works better when the job is submitted from a Python script, we will write the code into a Python script instead of running it directly in the notebook.\n", "\n", "In the next cells, we write the cell contents to a Python script, `main.py`. We are NOT running the code directly; an additional invocation is required. The notebook is structured this way to make the demonstration easier.\n" ] }, { "cell_type": "markdown", "metadata": { "id": "XoEqT2Y4DJmf", "tags": [] }, "source": [ "### Write import statements\n", "\n", "Here we write the import statements for all the required libraries to the external Python script." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "a725b800a52b", "tags": [] }, "outputs": [], "source": [ "%%writefile {PYTHON_SCRIPT}\n", "\n", "import json\n", "import logging\n", "import time\n", "\n", "from typing import Tuple, Any, List\n", "\n", "import apache_beam as beam\n", "\n", "from apache_beam.options.pipeline_options import PipelineOptions\n", "from apache_beam.transforms.combiners import CountCombineFn, MeanCombineFn\n", "\n", "from google.cloud import aiplatform\n", "from google.cloud import aiplatform_v1beta1" ] }, { "cell_type": "markdown", "metadata": { "id": "5bd54c684051" }, "source": [ "### Defining an auxiliary magic function\n", "\n", "The magic function `writefile` from Jupyter Notebook can only write the cell as is and cannot unpack Python variables. 
Hence, we need to create an auxiliary magic function that can unpack Python variables and write them to a file." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "ad8dba046d05", "tags": [] }, "outputs": [], "source": [ "from IPython.core.magic import register_line_cell_magic\n", "\n", "@register_line_cell_magic\n", "def writetemplate(line, cell):\n", "    with open(line, \"a\") as f:\n", "        f.write(cell.format(**globals()))" ] }, { "cell_type": "markdown", "metadata": { "id": "a74807b98c34" }, "source": [ "### Write the variable values\n", "\n", "Here we write the variable values to the external Python script using the new magic function." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "3a4f9df411aa", "tags": [] }, "outputs": [], "source": [ "# Adding additional variables to project_variables\n", "project_variables = \"\\n\".join(config[1:-1])\n", "project_variables += f'\\nPROJECT_ID = \"{PROJECT}\"'\n", "project_variables += f'\\nBUCKET_NAME = \"{BUCKET_NAME}\"'\n", "project_variables += f'\\nREQUIREMENTS_FILE = \"{REQUIREMENTS_FILE}\"'" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "5055cc7ce79e", "tags": [] }, "outputs": [], "source": [ "%%writetemplate {PYTHON_SCRIPT}\n", "\n", "# Project variables\n", "{project_variables}" ] }, { "cell_type": "markdown", "metadata": { "id": "c82e89dd7f2c" }, "source": [ "### Write constant variables\n", "\n", "Here we write constant variables to the external Python script." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "a437ab07d805", "tags": [] }, "outputs": [], "source": [ "%%writefile -a {PYTHON_SCRIPT}\n", "\n", "# Pub/Sub variables\n", "SUBSCRIPTION_NAME = \"ff-tx-for-feat-eng-sub\"\n", "SUBSCRIPTION_PATH = f\"projects/{PROJECT_ID}/subscriptions/{SUBSCRIPTION_NAME}\"\n", "\n", "# Dataflow variables\n", "FIFTEEN_MIN_IN_SECS = 15 * 60\n", "THIRTY_MIN_IN_SECS = 30 * 60\n", "WINDOW_SIZE = 60 * 60 # 1 hour in secs\n", 
"WINDOW_PERIOD = 1 * 60 # 1 min in secs" ] }, { "cell_type": "markdown", "metadata": { "id": "11185856b722" }, "source": [ "### Defining auxiliary functions and classes\n", "\n", "Here we define auxiliary functions and classes that will be used in building our real-time feature engineering and ingestion pipeline." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "dabd10a0a18e", "tags": [] }, "outputs": [], "source": [ "%%writefile -a {PYTHON_SCRIPT}\n", "\n", "def to_unix_time(time_str: str, time_format='%Y-%m-%d %H:%M:%S') -> int:\n", "    \"\"\"\n", "    Convert a time string to Unix time\n", "    Args:\n", "        time_str: time string\n", "        time_format: time format\n", "    Returns:\n", "        unix_time: Unix time\n", "    \"\"\"\n", "    import time\n", "    # Converts a time string into Unix time\n", "    time_tuple = time.strptime(time_str, time_format)\n", "    return int(time.mktime(time_tuple))\n", "\n", "\n", "class AddAddtionalInfo(beam.DoFn):\n", "\n", "    # Add composite key and difference from window end timestamp to element\n", "    def process(self, element: Tuple, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam) -> Tuple:\n", "        \"\"\"\n", "        Add composite key and difference from window end timestamp to element\n", "        Args:\n", "            element: element to process\n", "            timestamp: timestamp of element\n", "            window: window of element\n", "        Returns:\n", "            element: element with composite key and difference from window end timestamp\n", "        \"\"\"\n", "        window_end_dt = window.end.to_utc_datetime().strftime(\"%Y%m%d%H%M%S\")\n", "        new_element = {\n", "            'TX_ID': element['TX_ID'],\n", "            'TX_TS': element['TX_TS'],\n", "            'CUSTOMER_ID': element['CUSTOMER_ID'],\n", "            'TERMINAL_ID': element['TERMINAL_ID'],\n", "            'TX_AMOUNT': element['TX_AMOUNT'],\n", "            'CUSTOMER_ID_COMPOSITE_KEY': f\"{element['CUSTOMER_ID']}_{window_end_dt}\",\n", "            'TERMINAL_ID_COMPOSITE_KEY': f\"{element['TERMINAL_ID']}_{window_end_dt}\",\n", "            'TS_DIFF': window.end - timestamp\n", "        }\n", "        return 
(new_element,)\n", "\n", "\n", "class WriteFeatures(beam.DoFn):\n", "    def __init__(self, resource_name: str):\n", "        self.resource_name = resource_name\n", "\n", "    def populate_customer_payload(self, new_records, aggregated) -> List[Any]:\n", "        \"\"\"\n", "        Prepare payloads for customer-related features to be written\n", "        to the Vertex AI Feature Store. The values are required to be of FeatureValue type.\n", "        Args:\n", "            new_records: new records to write\n", "            aggregated: aggregated records to write\n", "        Returns:\n", "            payloads: list of payloads to write\n", "        \"\"\"\n", "        payloads = []\n", "        for row in new_records:\n", "            payload = aiplatform_v1beta1.WriteFeatureValuesPayload()\n", "            payload.entity_id = row.CUSTOMER_ID\n", "            payload.feature_values = {\n", "                \"customer_id_nb_tx_15min_window\": aiplatform_v1beta1.FeatureValue(\n", "                    int64_value=aggregated.CUSTOMER_ID_NB_TX_15MIN_WINDOW),\n", "                \"customer_id_nb_tx_30min_window\": aiplatform_v1beta1.FeatureValue(\n", "                    int64_value=aggregated.CUSTOMER_ID_NB_TX_30MIN_WINDOW),\n", "                \"customer_id_nb_tx_60min_window\": aiplatform_v1beta1.FeatureValue(\n", "                    int64_value=aggregated.CUSTOMER_ID_NB_TX_60MIN_WINDOW),\n", "                \"customer_id_avg_amount_15min_window\": aiplatform_v1beta1.FeatureValue(\n", "                    double_value=aggregated.CUSTOMER_ID_SUM_AMOUNT_15MIN_WINDOW / aggregated.CUSTOMER_ID_NB_TX_15MIN_WINDOW),\n", "                \"customer_id_avg_amount_30min_window\": aiplatform_v1beta1.FeatureValue(\n", "                    double_value=aggregated.CUSTOMER_ID_SUM_AMOUNT_30MIN_WINDOW / aggregated.CUSTOMER_ID_NB_TX_30MIN_WINDOW),\n", "                \"customer_id_avg_amount_60min_window\": aiplatform_v1beta1.FeatureValue(\n", "                    double_value=aggregated.CUSTOMER_ID_AVG_AMOUNT_60MIN_WINDOW),\n", "            }\n", "            payloads.append(payload)\n", "        return payloads\n", "\n", "    def populate_terminal_payload(self, new_records, aggregated) -> List[Any]:\n", "        \"\"\"\n", "        Prepare payloads for terminal-related features to be written\n", "        to the Vertex AI Feature Store. 
The values are required to be of FeatureValue type.\n", "        Args:\n", "            new_records: new records to write\n", "            aggregated: aggregated records to write\n", "        Returns:\n", "            payloads: list of payloads to write\n", "        \"\"\"\n", "        payloads = []\n", "        for row in new_records:\n", "            payload = aiplatform_v1beta1.WriteFeatureValuesPayload()\n", "            payload.entity_id = row.TERMINAL_ID\n", "            payload.feature_values = {\n", "                \"terminal_id_nb_tx_15min_window\": aiplatform_v1beta1.FeatureValue(\n", "                    int64_value=aggregated.TERMINAL_ID_NB_TX_15MIN_WINDOW),\n", "                \"terminal_id_nb_tx_30min_window\": aiplatform_v1beta1.FeatureValue(\n", "                    int64_value=aggregated.TERMINAL_ID_NB_TX_30MIN_WINDOW),\n", "                \"terminal_id_nb_tx_60min_window\": aiplatform_v1beta1.FeatureValue(\n", "                    int64_value=aggregated.TERMINAL_ID_NB_TX_60MIN_WINDOW),\n", "                \"terminal_id_avg_amount_15min_window\": aiplatform_v1beta1.FeatureValue(\n", "                    double_value=aggregated.TERMINAL_ID_SUM_AMOUNT_15MIN_WINDOW / aggregated.TERMINAL_ID_NB_TX_15MIN_WINDOW),\n", "                \"terminal_id_avg_amount_30min_window\": aiplatform_v1beta1.FeatureValue(\n", "                    double_value=aggregated.TERMINAL_ID_SUM_AMOUNT_30MIN_WINDOW / aggregated.TERMINAL_ID_NB_TX_30MIN_WINDOW),\n", "                \"terminal_id_avg_amount_60min_window\": aiplatform_v1beta1.FeatureValue(\n", "                    double_value=aggregated.TERMINAL_ID_AVG_AMOUNT_60MIN_WINDOW),\n", "            }\n", "            payloads.append(payload)\n", "        return payloads\n", "\n", "    def send_request_to_feature_store(self, resource_name: str, payloads: List[Any]):\n", "        \"\"\"\n", "        Sends a write request to Vertex AI Feature Store by preparing\n", "        a write feature value request using the provided resource name and payloads, and\n", "        by making use of a feature store online serving service client\n", "        Args:\n", "            resource_name: resource name of the entity type to write to\n", "            payloads: list of payloads to write\n", "        Returns:\n", "            response: response returned by the write request\n", "        \"\"\"\n", "        # Prepare request\n", "        request = aiplatform_v1beta1.WriteFeatureValuesRequest(\n", 
"            entity_type=resource_name,\n", "            payloads=payloads,\n", "        )\n", "\n", "        # Create feature store online serving service client\n", "        client_options = {\n", "            \"api_endpoint\": \"us-central1-aiplatform.googleapis.com\"\n", "        }\n", "        v1beta1_client = aiplatform_v1beta1.FeaturestoreOnlineServingServiceClient(client_options=client_options)\n", "\n", "        # Send the request\n", "        response = v1beta1_client.write_feature_values(request=request)\n", "        return response\n", "\n", "    def process(self, element: Tuple) -> Tuple:\n", "        \"\"\"\n", "        Select entity using resource_name variable and\n", "        write the respective features to Vertex AI Feature Store\n", "        Args:\n", "            element: keyed tuple of new records and aggregated records\n", "        Yields:\n", "            response: one-element tuple containing the write response\n", "        \"\"\"\n", "        new_records = element[1]['new_records']\n", "        aggregated = element[1]['aggregated'][0]\n", "\n", "        entity = self.resource_name.split(\"/\")[-1]\n", "        payloads = []\n", "        message = \"\"\n", "\n", "        if entity == \"customer\":\n", "            payloads = self.populate_customer_payload(new_records, aggregated)\n", "            customer_ids = [x.entity_id for x in payloads]\n", "            message = f\"Inserted features for CUSTOMER IDs: {', '.join(customer_ids)}\"\n", "\n", "        elif entity == \"terminal\":\n", "            payloads = self.populate_terminal_payload(new_records, aggregated)\n", "            terminal_ids = [x.entity_id for x in payloads]\n", "            message = f\"Inserted features for TERMINAL IDs: {', '.join(terminal_ids)}\"\n", "\n", "        response = self.send_request_to_feature_store(self.resource_name, payloads)\n", "        logging.info(message)\n", "\n", "        yield (response,)" ] }, { "cell_type": "markdown", "metadata": { "id": "2ee7d34ce9b1" }, "source": [ "### Building the pipeline\n", "\n", "Now we are ready to build the pipeline using the classes and functions defined above. Once the pipeline is ready, we will wrap everything into a main function and submit it to Dataflow." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "1c77896262f8", "tags": [] }, "outputs": [], "source": [ "%%writefile -a {PYTHON_SCRIPT}\n", "\n", "def main():\n", "    # Initialize Vertex AI client\n", "    aiplatform.init(\n", "        project=PROJECT_ID,\n", "        location=REGION\n", "    )\n", "\n", "    # Get entity types for customer and terminal\n", "    fs = aiplatform.featurestore.Featurestore(\n", "        featurestore_name=FEATURESTORE_ID\n", "    )\n", "    customer_entity_type = fs.get_entity_type(\"customer\")\n", "    terminal_entity_type = fs.get_entity_type(\"terminal\")\n", "\n", "    # Setup pipeline options for deploying to dataflow\n", "    pipeline_options = PipelineOptions(\n", "        streaming=True,\n", "        save_main_session=True,\n", "        runner=\"DataflowRunner\",\n", "        project=PROJECT_ID,\n", "        region=REGION,\n", "        temp_location=f\"gs://{BUCKET_NAME}/dataflow/tmp\",\n", "        requirements_file=REQUIREMENTS_FILE,\n", "        max_num_workers=2,\n", "    )\n", "\n", "    # Build pipeline and transformation steps\n", "    pipeline = beam.Pipeline(options=pipeline_options)\n", "\n", "    source = (\n", "        pipeline\n", "        | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(subscription=SUBSCRIPTION_PATH)\n", "        | 'Decode byte array to json dict' >> beam.Map(lambda row: json.loads(row.decode('utf-8')))\n", "    )\n", "\n", "    enriched_source = (\n", "        source\n", "        | 'Attach timestamps' >> beam.Map(lambda row: beam.window.TimestampedValue(row, to_unix_time(row['TX_TS'])))\n", "        | 'Create sliding window' >> beam.WindowInto(beam.window.SlidingWindows(WINDOW_SIZE, WINDOW_PERIOD, offset=WINDOW_SIZE))\n", "        | 'Add window info' >> beam.ParDo(AddAddtionalInfo())\n", "        | 'Convert to namedtuple' >> beam.Map(lambda row: beam.Row(**row))\n", "    )\n", "\n", "    new_records = (\n", "        enriched_source\n", "        | 'Filter only new rows' >> beam.Filter(lambda row: row.TS_DIFF <= WINDOW_PERIOD)\n", "    )\n", "\n", "    # Build customer features\n", "    new_records_customer_id = (\n", "        new_records\n", "        | 'Assign CUSTOMER_ID_COMPOSITE_KEY as key' >> 
beam.WithKeys(lambda row: row.CUSTOMER_ID_COMPOSITE_KEY)\n", "    )\n", "\n", "    aggregated_customer_id = (\n", "        enriched_source\n", "        | 'Group by customer id composite key column' >> beam.GroupBy(CUSTOMER_ID_COMPOSITE_KEY='CUSTOMER_ID_COMPOSITE_KEY')\n", "            .aggregate_field(lambda row: 1 if row.TS_DIFF <= FIFTEEN_MIN_IN_SECS else 0, sum, 'CUSTOMER_ID_NB_TX_15MIN_WINDOW')\n", "            .aggregate_field(lambda row: 1 if row.TS_DIFF <= THIRTY_MIN_IN_SECS else 0, sum, 'CUSTOMER_ID_NB_TX_30MIN_WINDOW')\n", "            .aggregate_field('TX_ID', CountCombineFn(), 'CUSTOMER_ID_NB_TX_60MIN_WINDOW')\n", "            .aggregate_field(lambda row: row.TX_AMOUNT if row.TS_DIFF <= FIFTEEN_MIN_IN_SECS else 0, sum, 'CUSTOMER_ID_SUM_AMOUNT_15MIN_WINDOW')\n", "            .aggregate_field(lambda row: row.TX_AMOUNT if row.TS_DIFF <= THIRTY_MIN_IN_SECS else 0, sum, 'CUSTOMER_ID_SUM_AMOUNT_30MIN_WINDOW')\n", "            .aggregate_field('TX_AMOUNT', MeanCombineFn(), 'CUSTOMER_ID_AVG_AMOUNT_60MIN_WINDOW')\n", "        | 'Assign key for aggregated results (customer id)' >> beam.WithKeys(lambda row: row.CUSTOMER_ID_COMPOSITE_KEY)\n", "    )\n", "\n", "    merged_customer_id = (\n", "        ({\n", "            'new_records': new_records_customer_id,\n", "            'aggregated': aggregated_customer_id\n", "        })\n", "        | 'Merge pcollections (customer id)' >> beam.CoGroupByKey()\n", "        | 'Filter empty rows (customer id)' >> beam.Filter(lambda row: len(row[1]['new_records']) > 0)\n", "        | 'Write to feature store (customer id)' >> beam.ParDo(WriteFeatures(customer_entity_type.resource_name))\n", "    )\n", "\n", "    # Build terminal features\n", "    new_records_terminal_id = (\n", "        new_records\n", "        | 'Assign TERMINAL_ID_COMPOSITE_KEY as key' >> beam.WithKeys(lambda row: row.TERMINAL_ID_COMPOSITE_KEY)\n", "    )\n", "\n", "    aggregated_terminal_id = (\n", "        enriched_source\n", "        | 'Group by terminal id composite key column' >> beam.GroupBy(TERMINAL_ID_COMPOSITE_KEY='TERMINAL_ID_COMPOSITE_KEY')\n", "            .aggregate_field(lambda row: 1 if row.TS_DIFF <= FIFTEEN_MIN_IN_SECS else 0, sum, 
'TERMINAL_ID_NB_TX_15MIN_WINDOW')\n", "            .aggregate_field(lambda row: 1 if row.TS_DIFF <= THIRTY_MIN_IN_SECS else 0, sum, 'TERMINAL_ID_NB_TX_30MIN_WINDOW')\n", "            .aggregate_field('TX_ID', CountCombineFn(), 'TERMINAL_ID_NB_TX_60MIN_WINDOW')\n", "            .aggregate_field(lambda row: row.TX_AMOUNT if row.TS_DIFF <= FIFTEEN_MIN_IN_SECS else 0, sum, 'TERMINAL_ID_SUM_AMOUNT_15MIN_WINDOW')\n", "            .aggregate_field(lambda row: row.TX_AMOUNT if row.TS_DIFF <= THIRTY_MIN_IN_SECS else 0, sum, 'TERMINAL_ID_SUM_AMOUNT_30MIN_WINDOW')\n", "            .aggregate_field('TX_AMOUNT', MeanCombineFn(), 'TERMINAL_ID_AVG_AMOUNT_60MIN_WINDOW')\n", "        | 'Assign key for aggregated results (terminal id)' >> beam.WithKeys(lambda row: row.TERMINAL_ID_COMPOSITE_KEY)\n", "    )\n", "\n", "    merged_terminal_id = (\n", "        ({\n", "            'new_records': new_records_terminal_id,\n", "            'aggregated': aggregated_terminal_id\n", "        })\n", "        | 'Merge pcollections (terminal id)' >> beam.CoGroupByKey()\n", "        | 'Filter empty rows (terminal id)' >> beam.Filter(lambda row: len(row[1]['new_records']) > 0)\n", "        | 'Write to feature store (terminal id)' >> beam.ParDo(WriteFeatures(terminal_entity_type.resource_name))\n", "    )\n", "\n", "    # Run the pipeline (async)\n", "    pipeline.run()\n", "\n", "\n", "if __name__ == \"__main__\":\n", "    main()" ] }, { "cell_type": "markdown", "metadata": { "id": "b9133e7c54aa" }, "source": [ "### Creating `requirements.txt` for Dataflow workers\n", "\n", "Because we are using the `google-cloud-aiplatform` and `google-apitools` packages, we need to pass `requirements.txt` to the Dataflow workers so that they install the packages in their environment before running the job." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "0b58205547e1", "tags": [] }, "outputs": [], "source": [ "%%writefile {REQUIREMENTS_FILE}\n", "\n", "google-cloud-aiplatform<=1.36.1\n", "google-apitools==0.5.32" ] }, { "cell_type": "markdown", "metadata": { "id": "c9ede8f5aff0" }, "source": [ "### Deploying the pipeline\n", "\n", "Now we are ready to deploy this pipeline to Dataflow." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "0601ef9a1b20", "tags": [] }, "outputs": [], "source": [ "!python3 {PYTHON_SCRIPT}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Congrats! Now the job should be running on <a href=\"https://console.cloud.google.com/dataflow/jobs\">Dataflow</a>!\n", " \n", "If everything went well, you should see this Dataflow pipeline diagram in the Dataflow console.\n", " \n", "![image](./misc/images/streaming-dataflow-pipeline.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Verifying the ingestion pipeline\n", "\n", "Once the Dataflow pipeline is up and running, you should be able to see which feature entities are being ingested via the `Step Log` of the respective `Write to feature store` step.\n", "\n", "To verify that the data ingestion job is working as intended, copy a list of entity IDs from the logs and run the following code." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "from google.cloud import aiplatform as vertex_ai\n", "\n", "vertex_ai.init(\n", "    project=PROJECT_ID,\n", "    location=REGION\n", ")\n", "\n", "fs = vertex_ai.featurestore.Featurestore(featurestore_name=FEATURESTORE_ID)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "customer_entity_ids = [\"5830444124423549\", \"5469689693941771\", \"1361459972478769\"] # copy customer ids from Write to feature store (customer id) step's log\n", "\n", "customer_entity_type = fs.get_entity_type(\"customer\")\n", "customer_aggregated_features = customer_entity_type.read(\n", "    entity_ids=customer_entity_ids\n", ")\n", "\n", "customer_aggregated_features" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "terminal_entity_ids = [\"97802258\", \"48770968\", \"98391079\"] # copy terminal ids from Write to feature store (terminal id) step's log\n", "\n", "terminal_entity_type = fs.get_entity_type(\"terminal\")\n", "terminal_aggregated_features = terminal_entity_type.read(\n", "    entity_ids=terminal_entity_ids\n", ")\n", "\n", "terminal_aggregated_features" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### END\n", "\n", "If you want to explore the BigQuery ML pipeline, go to `bqml/04_model_training_and_prediction.ipynb`.\n", "\n", "Alternatively, if you want to explore the Vertex AI pipeline, go to `vertex_ai/04_experimentation.ipynb`." ] } ], "metadata": { "colab": { "collapsed_sections": [], "name": "03_feature_engineering_streaming.ipynb", "toc_visible": true }, "environment": { "kernel": "python3", "name": "common-cpu.m115", "type": "gcloud", "uri": "gcr.io/deeplearning-platform-release/base-cpu:m115" }, "kernelspec": { "display_name": "Python 3 (Local)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": 
"ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.13" } }, "nbformat": 4, "nbformat_minor": 4 }