00_environment_setup.ipynb

{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "id": "ur8xi4C7S06n" }, "outputs": [], "source": [ "# Copyright 2023 Google LLC\n", "#\n", "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "cell_type": "markdown", "metadata": { "id": "fsv4jGuU89rX" }, "source": [ "# FraudFinder - Environment Setup\n", "\n", "<table align=\"left\">\n", " <td>\n", " <a href=\"https://console.cloud.google.com/ai-platform/notebooks/deploy-notebook?download_url=https://github.com/GoogleCloudPlatform/fraudfinder/raw/main/00_environment_setup.ipynb\">\n", " <img src=\"https://www.gstatic.com/cloud/images/navigation/vertex-ai.svg\" alt=\"Google Cloud Notebooks\">Open in Cloud Notebook\n", " </a>\n", " </td> \n", " <td>\n", " <a href=\"https://colab.research.google.com/github/GoogleCloudPlatform/fraudfinder/blob/main/00_environment_setup.ipynb\">\n", " <img src=\"https://cloud.google.com/ml-engine/images/colab-logo-32px.png\" alt=\"Colab logo\"> Open in Colab\n", " </a>\n", " </td>\n", " <td>\n", " <a href=\"https://github.com/GoogleCloudPlatform/fraudfinder/blob/main/00_environment_setup.ipynb\">\n", " <img src=\"https://cloud.google.com/ml-engine/images/github-logo-32px.png\" alt=\"GitHub logo\">\n", " View on GitHub\n", " </a>\n", " </td>\n", "</table>" ] }, { "cell_type": "markdown", "metadata": { "id": "827c41ab1a12" }, "source": [ "## Overview\n", "\n", "[FraudFinder](https://github.com/googlecloudplatform/fraudfinder) is a series of labs on how to build a real-time fraud detection system on Google Cloud. Throughout the FraudFinder labs, you will learn how to read historical bank transaction data stored in data warehouse, read from a live stream of new transactions, perform exploratory data analysis (EDA), do feature engineering, ingest features into a feature store, train a model using feature store, register your model in a model registry, evaluate your model, deploy your model to an endpoint, do real-time inference on your model with feature store, and monitor your model." ] }, { "cell_type": "markdown", "metadata": { "id": "45f6e923dc75" }, "source": [ "### Objective\n", "\n", "Before you run this notebook, make sure that you have completed the steps in [README](README.md).\n", "\n", "In this notebook, you will setup your environment for Fraudfinder to be used in subsequent labs.\n", "\n", "This lab uses the following Google Cloud services and resources:\n", "\n", "- [Vertex AI](https://cloud.google.com/vertex-ai/)\n", "- [BigQuery](https://cloud.google.com/bigquery/)\n", "- [Google Cloud Storage](https://cloud.google.com/storage)\n", "- [Pub/Sub](https://cloud.google.com/pubsub/)\n", "\n", "Steps performed in this notebook:\n", "\n", "- Setup your environment.\n", "- Load historical bank transactions into BigQuery.\n", "- Read data from BigQuery tables.\n", "- Read data from Pub/Sub topics, which contain a live stream of new transactions." 
] }, { "cell_type": "markdown", "metadata": { "id": "8b5e2e2a7bdb" }, "source": [ "### Costs" ] }, { "cell_type": "markdown", "metadata": { "id": "04c1dae4ca17" }, "source": [ "This tutorial uses billable components of Google Cloud:\n", "\n", "* Vertex AI\n", "* Cloud Storage\n", "* Pub/Sub\n", "* BigQuery\n", "\n", "Learn about [Vertex AI\n", "pricing](https://cloud.google.com/vertex-ai/pricing), [Cloud Storage\n", "pricing](https://cloud.google.com/storage/pricing), [Pub/Sub pricing](https://cloud.google.com/pubsub/pricing), [BigQuery pricing](https://cloud.google.com/bigquery/pricing) and use the [Pricing\n", "Calculator](https://cloud.google.com/products/calculator/)\n", "to generate a cost estimate based on your projected usage." ] }, { "cell_type": "markdown", "metadata": { "id": "773901ca47fd" }, "source": [ "### Install additional packages\n", "\n", "Install the following packages required to execute this notebook." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "b7c7ce6bbf03", "tags": [] }, "outputs": [], "source": [ "! pip install --upgrade -r 'requirements.txt'" ] }, { "cell_type": "markdown", "metadata": { "id": "d07214a67580" }, "source": [ "After you install the additional packages, you need to restart the notebook kernel so it can find the packages." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "18c113700b6f", "tags": [] }, "outputs": [], "source": [ "# Automatically restart kernel after installs\n", "import os\n", "\n", "if not os.getenv(\"IS_TESTING\"):\n", " import IPython\n", "\n", " app = IPython.Application.instance()\n", " app.kernel.do_shutdown(True)" ] }, { "cell_type": "markdown", "metadata": { "id": "f31ae3fed8ab" }, "source": [ "### Setup your environment\n", "\n", "Run the next cells to import libraries used in this notebook and configure some options." ] }, { "cell_type": "markdown", "metadata": { "id": "7d61a362443d" }, "source": [ "Run the next cell to set your project ID and some of the other constants used in the lab. " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "id": "wxiE6dEWOFm3", "tags": [] }, "outputs": [], "source": [ "import random\n", "import string\n", "from typing import Union\n", "\n", "import pandas as pd\n", "from google.cloud import bigquery\n", "\n", "# Generate unique ID to help w/ unique naming of certain pieces\n", "ID = \"\".join(random.choices(string.ascii_lowercase + string.digits, k=5))\n", "\n", "GCP_PROJECTS = !gcloud config get-value project\n", "PROJECT_ID = GCP_PROJECTS[0]\n", "BUCKET_NAME = f\"{PROJECT_ID}-fraudfinder\"\n", "REGION = \"us-central1\"\n", "TRAINING_DS_SIZE = 1000" ] }, { "cell_type": "markdown", "metadata": { "id": "bd738fc1e201" }, "source": [ "### Create a Google Cloud Storage bucket and save the config data.\n", "\n", "Next, we will create a Google Cloud Storage bucket and will save the config data in this bucket. After the cell operation finishes, you can navigate to [Google Cloud Storage](https://console.cloud.google.com/storage/) to see the GCS bucket. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "7d3556c598a6", "tags": [] }, "outputs": [], "source": [ "config = f\"\"\"\n", "BUCKET_NAME = \\\"{BUCKET_NAME}\\\"\n", "PROJECT = \\\"{PROJECT_ID}\\\"\n", "REGION = \\\"{REGION}\\\"\n", "ID = \\\"{ID}\\\"\n", "FEATURESTORE_ID = \\\"fraudfinder_{ID}\\\"\n", "MODEL_NAME = \\\"ff_model\\\"\n", "ENDPOINT_NAME = \\\"ff_model_endpoint\\\"\n", "TRAINING_DS_SIZE = \\\"{TRAINING_DS_SIZE}\\\"\n", "\"\"\"\n", "\n", "!gsutil mb -l {REGION} gs://{BUCKET_NAME}\n", "\n", "!echo '{config}' | gsutil cp - gs://{BUCKET_NAME}/config/notebook_env.py" ] }, { "cell_type": "markdown", "metadata": { "id": "dc2dff7ba2e0" }, "source": [ "### Copy the historical transaction data into BigQuery tables\n", "\n", "Now we will copy the historical transaction data and ingest it into BigQuery tables. For this, we will need to run `copy_bigquery_data.py`." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "4ac6e0bc33b1", "tags": [] }, "outputs": [], "source": [ "!python3 scripts/copy_bigquery_data.py $BUCKET_NAME" ] }, { "cell_type": "markdown", "metadata": { "id": "29dbf432339c" }, "source": [ "### Check data in BigQuery\n", "\n", "After ingesting our data into BigQuery, it's time to run some queries against the tables to inspect the data. You can also go to the [BigQuery console](https://console.cloud.google.com/bigquery) to see the data." ] }, { "cell_type": "markdown", "metadata": { "id": "e12ec3dae852" }, "source": [ "#### Initialize BigQuery SDK for Python " ] }, { "cell_type": "markdown", "metadata": { "id": "ace8667cc99e" }, "source": [ "Use a helper function for sending queries to BigQuery." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "f7afa36c6090", "tags": [] }, "outputs": [], "source": [ "# Wrapper to use BigQuery client to run query/job, return job ID or result as DF\n", "def run_bq_query(sql: str) -> Union[str, pd.DataFrame]:\n", " \"\"\"\n", " Run a BigQuery query and return the job ID or result as a DataFrame\n", " Args:\n", " sql: SQL query, as a string, to execute in BigQuery\n", " Returns:\n", " df: DataFrame of results from query, or error, if any\n", " \"\"\"\n", "\n", " bq_client = bigquery.Client()\n", "\n", " # Try dry run before executing query to catch any errors\n", " job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)\n", " bq_client.query(sql, job_config=job_config)\n", "\n", " # If dry run succeeds without errors, proceed to run query\n", " job_config = bigquery.QueryJobConfig()\n", " client_result = bq_client.query(sql, job_config=job_config)\n", "\n", " job_id = client_result.job_id\n", "\n", " # Wait for query/job to finish running. 
then get & return data frame\n", " df = client_result.result().to_arrow().to_pandas()\n", " print(f\"Finished job_id: {job_id}\")\n", " return df" ] }, { "cell_type": "markdown", "metadata": { "id": "20875916c5d4" }, "source": [ "#### tx.tx\n", "The `tx.tx` table contains the basic information about each transaction:\n", "- `TX_ID` is a unique ID per transaction\n", "- `TX_TS` is the timestamp of the transaction, in UTC\n", "- `CUSTOMER_ID` is a unique 16-digit string ID per customer\n", "- `TERMINAL_ID` is a unique 16-digit string ID per point-of-sale terminal\n", "- `TX_AMOUNT` is the amount of money spent by the customer at a terminal, in dollars" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "cc0e50b158d2", "tags": [] }, "outputs": [], "source": [ "run_bq_query(\n", " \"\"\"\n", "SELECT\n", " *\n", "FROM\n", " tx.tx\n", "LIMIT 5\n", "\"\"\"\n", ")" ] }, { "cell_type": "markdown", "metadata": { "id": "5e0ab0d56773" }, "source": [ "#### tx.txlabels\n", "The `tx.txlabels` table contains information on whether each transation was fraud or not:\n", "- `TX_ID` is a unique ID per transaction\n", "- `TX_FRAUD` is 1 if the transaction was fraud, and 0 if the transaction was not fraudulent" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "c128a6c78e82", "tags": [] }, "outputs": [], "source": [ "run_bq_query(\n", " \"\"\"\n", "SELECT\n", " *\n", "FROM\n", " tx.txlabels\n", "LIMIT 5\n", "\"\"\"\n", ")" ] }, { "cell_type": "markdown", "metadata": { "id": "ffdfcfed70bd" }, "source": [ "### Check live streaming transactions via public Pub/Sub topics\n", "\n", "As part of the [README](README.md), you've created [subscriptions](https://console.cloud.google.com/cloudpubsub/subscription/) to public Pub/Sub topics, where there is a constant flow of new transactions. This means you have, in your own Google Cloud project, subscriptions to the public Pub/Sub topics. You will receive a Pub/Sub message in your subscription every time a new transaction is streamed into the Pub/Sub topic.\n", "\n", "There are two public Pub/Sub topics where there is a constant stream of live transactions occurring.\n", "\n", "The following Pub/Sub topics are used for transactions:\n", "```\n", "projects/cymbal-fraudfinder/topics/ff-tx\n", "projects/cymbal-fraudfinder/topics/ff-txlabels\n", "```\n", "\n", "Note: If you haven't completed the steps in the README, please make sure that you complete them first before continuing this notebook, otherwise you may not have Pub/Sub subscriptions." 
] }, { "cell_type": "markdown", "metadata": { "id": "6bd15fba9b30" }, "source": [ "### Reading messages from the Pub/Sub topics" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "544309c7c12f", "tags": [] }, "outputs": [], "source": [ "def read_from_sub(project_id, subscription_name, messages=10):\n", " \"\"\"\n", " Read messages from a Pub/Sub subscription\n", " Args:\n", " project_id: project ID\n", " subscription_name: the name of a Pub/Sub subscription in your project\n", " messages: number of messages to read\n", " Returns:\n", " msg_data: list of messages in your Pub/Sub subscription as a Python dictionary\n", " \"\"\"\n", " \n", " import ast\n", "\n", " from google.api_core import retry\n", " from google.cloud import pubsub_v1\n", "\n", " subscriber = pubsub_v1.SubscriberClient()\n", " subscription_path = subscriber.subscription_path(project_id, subscription_name)\n", "\n", " # Wrap the subscriber in a 'with' block to automatically call close() to\n", " # close the underlying gRPC channel when done.\n", " with subscriber:\n", " # The subscriber pulls a specific number of messages. The actual\n", " # number of messages pulled may be smaller than max_messages.\n", " response = subscriber.pull(\n", " subscription=subscription_path,\n", " max_messages=messages,\n", " retry=retry.Retry(deadline=300),\n", " )\n", "\n", " if len(response.received_messages) == 0:\n", " print(\"no messages\")\n", " return\n", "\n", " ack_ids = []\n", " msg_data = []\n", " for received_message in response.received_messages:\n", " msg = ast.literal_eval(received_message.message.data.decode(\"utf-8\"))\n", " msg_data.append(msg)\n", " ack_ids.append(received_message.ack_id)\n", "\n", " # Acknowledges the received messages so they will not be sent again.\n", " subscriber.acknowledge(subscription=subscription_path, ack_ids=ack_ids)\n", "\n", " print(\n", " f\"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}.\"\n", " )\n", "\n", " return msg_data" ] }, { "cell_type": "markdown", "metadata": { "id": "583454c80a46" }, "source": [ "#### Reading from the `ff-tx-sub` subscription\n", "\n", "Now let's read from the `ff-tx-sub` subscription. You should see some recent transactions (in UTC timezone)." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "40e3b8abc1cc", "tags": [] }, "outputs": [], "source": [ "messages_tx = read_from_sub(\n", " project_id=PROJECT_ID, subscription_name=\"ff-tx-sub\", messages=2\n", ")\n", "\n", "messages_tx" ] }, { "cell_type": "markdown", "metadata": { "id": "7b5f23c94328" }, "source": [ "#### Reading from the `ff-txlabels-sub` subscription\n", "\n", "We will do the same with `ff-txlabels-sub` subscription, which receives the same stream of transactions as `ff-tx-sub`, but also contain the ground-truth label, `TX_FRAUD`, if the transaction is fraudulent (1) or not (0)." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "ccd79c9037b9", "tags": [] }, "outputs": [], "source": [ "messages_txlabels = read_from_sub(\n", " project_id=PROJECT_ID, subscription_name=\"ff-txlabels-sub\", messages=2\n", ")\n", "\n", "messages_txlabels" ] }, { "cell_type": "markdown", "metadata": { "id": "de7be6182813" }, "source": [ "### END\n", "\n", "Now you can go to the next notebook `01_exploratory_data_analysis.ipynb`" ] } ], "metadata": { "colab": { "collapsed_sections": [], "name": "00_environment_setup.ipynb", "toc_visible": true }, "environment": { "kernel": "python3", "name": "common-cpu.m115", "type": "gcloud", "uri": "gcr.io/deeplearning-platform-release/base-cpu:m115" }, "kernelspec": { "display_name": "Python 3 (Local)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.13" } }, "nbformat": 4, "nbformat_minor": 4 }