{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"id": "ur8xi4C7S06n"
},
"outputs": [],
"source": [
"# Copyright 2023 Google LLC\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# https://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "fsv4jGuU89rX"
},
"source": [
"# FraudFinder - Environment Setup\n",
"\n",
"<table align=\"left\">\n",
" <td>\n",
" <a href=\"https://console.cloud.google.com/ai-platform/notebooks/deploy-notebook?download_url=https://github.com/GoogleCloudPlatform/fraudfinder/raw/main/00_environment_setup.ipynb\">\n",
" <img src=\"https://www.gstatic.com/cloud/images/navigation/vertex-ai.svg\" alt=\"Google Cloud Notebooks\">Open in Cloud Notebook\n",
" </a>\n",
" </td> \n",
" <td>\n",
" <a href=\"https://colab.research.google.com/github/GoogleCloudPlatform/fraudfinder/blob/main/00_environment_setup.ipynb\">\n",
" <img src=\"https://cloud.google.com/ml-engine/images/colab-logo-32px.png\" alt=\"Colab logo\"> Open in Colab\n",
" </a>\n",
" </td>\n",
" <td>\n",
" <a href=\"https://github.com/GoogleCloudPlatform/fraudfinder/blob/main/00_environment_setup.ipynb\">\n",
" <img src=\"https://cloud.google.com/ml-engine/images/github-logo-32px.png\" alt=\"GitHub logo\">\n",
" View on GitHub\n",
" </a>\n",
" </td>\n",
"</table>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "827c41ab1a12"
},
"source": [
"## Overview\n",
"\n",
"[FraudFinder](https://github.com/googlecloudplatform/fraudfinder) is a series of labs on how to build a real-time fraud detection system on Google Cloud. Throughout the FraudFinder labs, you will learn how to read historical bank transaction data stored in data warehouse, read from a live stream of new transactions, perform exploratory data analysis (EDA), do feature engineering, ingest features into a feature store, train a model using feature store, register your model in a model registry, evaluate your model, deploy your model to an endpoint, do real-time inference on your model with feature store, and monitor your model."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "45f6e923dc75"
},
"source": [
"### Objective\n",
"\n",
"Before you run this notebook, make sure that you have completed the steps in [README](README.md).\n",
"\n",
"In this notebook, you will setup your environment for Fraudfinder to be used in subsequent labs.\n",
"\n",
"This lab uses the following Google Cloud services and resources:\n",
"\n",
"- [Vertex AI](https://cloud.google.com/vertex-ai/)\n",
"- [BigQuery](https://cloud.google.com/bigquery/)\n",
"- [Google Cloud Storage](https://cloud.google.com/storage)\n",
"- [Pub/Sub](https://cloud.google.com/pubsub/)\n",
"\n",
"Steps performed in this notebook:\n",
"\n",
"- Setup your environment.\n",
"- Load historical bank transactions into BigQuery.\n",
"- Read data from BigQuery tables.\n",
"- Read data from Pub/Sub topics, which contain a live stream of new transactions."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "8b5e2e2a7bdb"
},
"source": [
"### Costs"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "04c1dae4ca17"
},
"source": [
"This tutorial uses billable components of Google Cloud:\n",
"\n",
"* Vertex AI\n",
"* Cloud Storage\n",
"* Pub/Sub\n",
"* BigQuery\n",
"\n",
"Learn about [Vertex AI\n",
"pricing](https://cloud.google.com/vertex-ai/pricing), [Cloud Storage\n",
"pricing](https://cloud.google.com/storage/pricing), [Pub/Sub pricing](https://cloud.google.com/pubsub/pricing), [BigQuery pricing](https://cloud.google.com/bigquery/pricing) and use the [Pricing\n",
"Calculator](https://cloud.google.com/products/calculator/)\n",
"to generate a cost estimate based on your projected usage."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "773901ca47fd"
},
"source": [
"### Install additional packages\n",
"\n",
"Install the following packages required to execute this notebook."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "b7c7ce6bbf03",
"tags": []
},
"outputs": [],
"source": [
"! pip install --upgrade -r 'requirements.txt'"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "d07214a67580"
},
"source": [
"After you install the additional packages, you need to restart the notebook kernel so it can find the packages."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "18c113700b6f",
"tags": []
},
"outputs": [],
"source": [
"# Automatically restart kernel after installs\n",
"import os\n",
"\n",
"if not os.getenv(\"IS_TESTING\"):\n",
" import IPython\n",
"\n",
" app = IPython.Application.instance()\n",
" app.kernel.do_shutdown(True)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "f31ae3fed8ab"
},
"source": [
"### Setup your environment\n",
"\n",
"Run the next cells to import libraries used in this notebook and configure some options."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "7d61a362443d"
},
"source": [
"Run the next cell to set your project ID and some of the other constants used in the lab. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"id": "wxiE6dEWOFm3",
"tags": []
},
"outputs": [],
"source": [
"import random\n",
"import string\n",
"from typing import Union\n",
"\n",
"import pandas as pd\n",
"from google.cloud import bigquery\n",
"\n",
"# Generate unique ID to help w/ unique naming of certain pieces\n",
"ID = \"\".join(random.choices(string.ascii_lowercase + string.digits, k=5))\n",
"\n",
"GCP_PROJECTS = !gcloud config get-value project\n",
"PROJECT_ID = GCP_PROJECTS[0]\n",
"BUCKET_NAME = f\"{PROJECT_ID}-fraudfinder\"\n",
"REGION = \"us-central1\"\n",
"TRAINING_DS_SIZE = 1000"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "bd738fc1e201"
},
"source": [
"### Create a Google Cloud Storage bucket and save the config data.\n",
"\n",
"Next, we will create a Google Cloud Storage bucket and will save the config data in this bucket. After the cell operation finishes, you can navigate to [Google Cloud Storage](https://console.cloud.google.com/storage/) to see the GCS bucket. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "7d3556c598a6",
"tags": []
},
"outputs": [],
"source": [
"config = f\"\"\"\n",
"BUCKET_NAME = \\\"{BUCKET_NAME}\\\"\n",
"PROJECT = \\\"{PROJECT_ID}\\\"\n",
"REGION = \\\"{REGION}\\\"\n",
"ID = \\\"{ID}\\\"\n",
"FEATURESTORE_ID = \\\"fraudfinder_{ID}\\\"\n",
"MODEL_NAME = \\\"ff_model\\\"\n",
"ENDPOINT_NAME = \\\"ff_model_endpoint\\\"\n",
"TRAINING_DS_SIZE = \\\"{TRAINING_DS_SIZE}\\\"\n",
"\"\"\"\n",
"\n",
"!gsutil mb -l {REGION} gs://{BUCKET_NAME}\n",
"\n",
"!echo '{config}' | gsutil cp - gs://{BUCKET_NAME}/config/notebook_env.py"
]
},
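{
"cell_type": "markdown",
"metadata": {},
"source": [
"The subsequent labs need the same constants, so it is useful to check that the config file can be read back from the bucket. The cell below is a minimal sketch of one way to reload it (downloading the file and executing it); later notebooks may use a different mechanism:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# Download the config file written above and execute it to restore the constants.\n",
"# This is a sketch of one possible approach, not the only one.\n",
"!gsutil cp gs://{BUCKET_NAME}/config/notebook_env.py /tmp/notebook_env.py\n",
"\n",
"with open(\"/tmp/notebook_env.py\") as f:\n",
"    exec(f.read())\n",
"\n",
"print(BUCKET_NAME, PROJECT, REGION, ID)"
]
},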
{
"cell_type": "markdown",
"metadata": {
"id": "dc2dff7ba2e0"
},
"source": [
"### Copy the historical transaction data into BigQuery tables\n",
"\n",
"Now we will copy the historical transaction data and ingest it into BigQuery tables. For this, we will need to run `copy_bigquery_data.py`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "4ac6e0bc33b1",
"tags": []
},
"outputs": [],
"source": [
"!python3 scripts/copy_bigquery_data.py $BUCKET_NAME"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "29dbf432339c"
},
"source": [
"### Check data in BigQuery\n",
"\n",
"After ingesting our data into BigQuery, it's time to run some queries against the tables to inspect the data. You can also go to the [BigQuery console](https://console.cloud.google.com/bigquery) to see the data."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "e12ec3dae852"
},
"source": [
"#### Initialize BigQuery SDK for Python "
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ace8667cc99e"
},
"source": [
"Use a helper function for sending queries to BigQuery."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "f7afa36c6090",
"tags": []
},
"outputs": [],
"source": [
"# Wrapper to use BigQuery client to run query/job, return job ID or result as DF\n",
"def run_bq_query(sql: str) -> Union[str, pd.DataFrame]:\n",
" \"\"\"\n",
" Run a BigQuery query and return the job ID or result as a DataFrame\n",
" Args:\n",
" sql: SQL query, as a string, to execute in BigQuery\n",
" Returns:\n",
" df: DataFrame of results from query, or error, if any\n",
" \"\"\"\n",
"\n",
" bq_client = bigquery.Client()\n",
"\n",
" # Try dry run before executing query to catch any errors\n",
" job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)\n",
" bq_client.query(sql, job_config=job_config)\n",
"\n",
" # If dry run succeeds without errors, proceed to run query\n",
" job_config = bigquery.QueryJobConfig()\n",
" client_result = bq_client.query(sql, job_config=job_config)\n",
"\n",
" job_id = client_result.job_id\n",
"\n",
" # Wait for query/job to finish running. then get & return data frame\n",
" df = client_result.result().to_arrow().to_pandas()\n",
" print(f\"Finished job_id: {job_id}\")\n",
" return df"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "20875916c5d4"
},
"source": [
"#### tx.tx\n",
"The `tx.tx` table contains the basic information about each transaction:\n",
"- `TX_ID` is a unique ID per transaction\n",
"- `TX_TS` is the timestamp of the transaction, in UTC\n",
"- `CUSTOMER_ID` is a unique 16-digit string ID per customer\n",
"- `TERMINAL_ID` is a unique 16-digit string ID per point-of-sale terminal\n",
"- `TX_AMOUNT` is the amount of money spent by the customer at a terminal, in dollars"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "cc0e50b158d2",
"tags": []
},
"outputs": [],
"source": [
"run_bq_query(\n",
" \"\"\"\n",
"SELECT\n",
" *\n",
"FROM\n",
" tx.tx\n",
"LIMIT 5\n",
"\"\"\"\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "5e0ab0d56773"
},
"source": [
"#### tx.txlabels\n",
"The `tx.txlabels` table contains information on whether each transation was fraud or not:\n",
"- `TX_ID` is a unique ID per transaction\n",
"- `TX_FRAUD` is 1 if the transaction was fraud, and 0 if the transaction was not fraudulent"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "c128a6c78e82",
"tags": []
},
"outputs": [],
"source": [
"run_bq_query(\n",
" \"\"\"\n",
"SELECT\n",
" *\n",
"FROM\n",
" tx.txlabels\n",
"LIMIT 5\n",
"\"\"\"\n",
")"
]
},
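{
"cell_type": "markdown",
"metadata": {},
"source": [
"Because `tx.tx` and `tx.txlabels` share the `TX_ID` key, you can join the two tables to get labeled transactions. The following sanity check is a sketch that computes the number of labeled transactions and the overall fraud rate; the exact values depend on the generated data:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# Join the transactions with their labels and compute the overall fraud rate.\n",
"run_bq_query(\n",
"    \"\"\"\n",
"SELECT\n",
"  COUNT(*) AS num_labeled_transactions,\n",
"  AVG(CAST(txl.TX_FRAUD AS FLOAT64)) AS fraud_rate\n",
"FROM\n",
"  tx.tx AS tx\n",
"JOIN\n",
"  tx.txlabels AS txl\n",
"ON\n",
"  tx.TX_ID = txl.TX_ID\n",
"\"\"\"\n",
")"
]
},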
{
"cell_type": "markdown",
"metadata": {
"id": "ffdfcfed70bd"
},
"source": [
"### Check live streaming transactions via public Pub/Sub topics\n",
"\n",
"As part of the [README](README.md), you've created [subscriptions](https://console.cloud.google.com/cloudpubsub/subscription/) to public Pub/Sub topics, where there is a constant flow of new transactions. This means you have, in your own Google Cloud project, subscriptions to the public Pub/Sub topics. You will receive a Pub/Sub message in your subscription every time a new transaction is streamed into the Pub/Sub topic.\n",
"\n",
"There are two public Pub/Sub topics where there is a constant stream of live transactions occurring.\n",
"\n",
"The following Pub/Sub topics are used for transactions:\n",
"```\n",
"projects/cymbal-fraudfinder/topics/ff-tx\n",
"projects/cymbal-fraudfinder/topics/ff-txlabels\n",
"```\n",
"\n",
"Note: If you haven't completed the steps in the README, please make sure that you complete them first before continuing this notebook, otherwise you may not have Pub/Sub subscriptions."
]
},
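{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before pulling any messages, you can confirm that the subscriptions from the README exist in your project. This quick check assumes the subscription names `ff-tx-sub` and `ff-txlabels-sub` used throughout this notebook:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# List the Pub/Sub subscriptions in your project; you should see\n",
"# ff-tx-sub and ff-txlabels-sub among them.\n",
"!gcloud pubsub subscriptions list --format=\"value(name)\""
]
},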
{
"cell_type": "markdown",
"metadata": {
"id": "6bd15fba9b30"
},
"source": [
"### Reading messages from the Pub/Sub topics"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "544309c7c12f",
"tags": []
},
"outputs": [],
"source": [
"def read_from_sub(project_id, subscription_name, messages=10):\n",
" \"\"\"\n",
" Read messages from a Pub/Sub subscription\n",
" Args:\n",
" project_id: project ID\n",
" subscription_name: the name of a Pub/Sub subscription in your project\n",
" messages: number of messages to read\n",
" Returns:\n",
" msg_data: list of messages in your Pub/Sub subscription as a Python dictionary\n",
" \"\"\"\n",
" \n",
" import ast\n",
"\n",
" from google.api_core import retry\n",
" from google.cloud import pubsub_v1\n",
"\n",
" subscriber = pubsub_v1.SubscriberClient()\n",
" subscription_path = subscriber.subscription_path(project_id, subscription_name)\n",
"\n",
" # Wrap the subscriber in a 'with' block to automatically call close() to\n",
" # close the underlying gRPC channel when done.\n",
" with subscriber:\n",
" # The subscriber pulls a specific number of messages. The actual\n",
" # number of messages pulled may be smaller than max_messages.\n",
" response = subscriber.pull(\n",
" subscription=subscription_path,\n",
" max_messages=messages,\n",
" retry=retry.Retry(deadline=300),\n",
" )\n",
"\n",
" if len(response.received_messages) == 0:\n",
" print(\"no messages\")\n",
" return\n",
"\n",
" ack_ids = []\n",
" msg_data = []\n",
" for received_message in response.received_messages:\n",
" msg = ast.literal_eval(received_message.message.data.decode(\"utf-8\"))\n",
" msg_data.append(msg)\n",
" ack_ids.append(received_message.ack_id)\n",
"\n",
" # Acknowledges the received messages so they will not be sent again.\n",
" subscriber.acknowledge(subscription=subscription_path, ack_ids=ack_ids)\n",
"\n",
" print(\n",
" f\"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}.\"\n",
" )\n",
"\n",
" return msg_data"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "583454c80a46"
},
"source": [
"#### Reading from the `ff-tx-sub` subscription\n",
"\n",
"Now let's read from the `ff-tx-sub` subscription. You should see some recent transactions, with timestamps in UTC."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "40e3b8abc1cc",
"tags": []
},
"outputs": [],
"source": [
"messages_tx = read_from_sub(\n",
" project_id=PROJECT_ID, subscription_name=\"ff-tx-sub\", messages=2\n",
")\n",
"\n",
"messages_tx"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "7b5f23c94328"
},
"source": [
"#### Reading from the `ff-txlabels-sub` subscription\n",
"\n",
"We will do the same with `ff-txlabels-sub` subscription, which receives the same stream of transactions as `ff-tx-sub`, but also contain the ground-truth label, `TX_FRAUD`, if the transaction is fraudulent (1) or not (0)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "ccd79c9037b9",
"tags": []
},
"outputs": [],
"source": [
"messages_txlabels = read_from_sub(\n",
" project_id=PROJECT_ID, subscription_name=\"ff-txlabels-sub\", messages=2\n",
")\n",
"\n",
"messages_txlabels"
]
},
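{
"cell_type": "markdown",
"metadata": {},
"source": [
"The pulled messages are plain Python dictionaries, so you can load them into a pandas DataFrame for a quick look. This sketch assumes the pull above ran; if it returned no messages, the DataFrame is simply empty:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# Convert the pulled label messages into a DataFrame for inspection.\n",
"pd.DataFrame(messages_txlabels or [])"
]
},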
{
"cell_type": "markdown",
"metadata": {
"id": "de7be6182813"
},
"source": [
"### END\n",
"\n",
"Now you can go to the next notebook `01_exploratory_data_analysis.ipynb`"
]
}
],
"metadata": {
"colab": {
"collapsed_sections": [],
"name": "00_environment_setup.ipynb",
"toc_visible": true
},
"environment": {
"kernel": "python3",
"name": "common-cpu.m115",
"type": "gcloud",
"uri": "gcr.io/deeplearning-platform-release/base-cpu:m115"
},
"kernelspec": {
"display_name": "Python 3 (Local)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.13"
}
},
"nbformat": 4,
"nbformat_minor": 4
}