In [None]:
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# FraudFinder - Environment Setup

<table align="left">
  <td>
    <a href="https://console.cloud.google.com/ai-platform/notebooks/deploy-notebook?download_url=https://github.com/GoogleCloudPlatform/fraudfinder/raw/main/00_environment_setup.ipynb">
       <img src="https://www.gstatic.com/cloud/images/navigation/vertex-ai.svg" alt="Google Cloud Notebooks">Open in Cloud Notebook
    </a>
  </td> 
  <td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/fraudfinder/blob/main/00_environment_setup.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Open in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/GoogleCloudPlatform/fraudfinder/blob/main/00_environment_setup.ipynb">
        <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
</table>

## Overview

[FraudFinder](https://github.com/googlecloudplatform/fraudfinder) is a series of labs on how to build a real-time fraud detection system on Google Cloud. Throughout the FraudFinder labs, you will learn how to read historical bank transaction data stored in data warehouse, read from a live stream of new transactions, perform exploratory data analysis (EDA), do feature engineering, ingest features into a feature store, train a model using feature store, register your model in a model registry, evaluate your model, deploy your model to an endpoint, do real-time inference on your model with feature store, and monitor your model.

### Objective

Before you run this notebook, make sure that you have completed the steps in [README](README.md).

In this notebook, you will setup your environment for Fraudfinder to be used in subsequent labs.

This lab uses the following Google Cloud services and resources:

- [Vertex AI](https://cloud.google.com/vertex-ai/)
- [BigQuery](https://cloud.google.com/bigquery/)
- [Google Cloud Storage](https://cloud.google.com/storage)
- [Pub/Sub](https://cloud.google.com/pubsub/)

Steps performed in this notebook:

- Setup your environment.
- Load historical bank transactions into BigQuery.
- Read data from BigQuery tables.
- Read data from Pub/Sub topics, which contain a live stream of new transactions.

### Costs

This tutorial uses billable components of Google Cloud:

* Vertex AI
* Cloud Storage
* Pub/Sub
* BigQuery

Learn about [Vertex AI
pricing](https://cloud.google.com/vertex-ai/pricing), [Cloud Storage
pricing](https://cloud.google.com/storage/pricing), [Pub/Sub pricing](https://cloud.google.com/pubsub/pricing), [BigQuery pricing](https://cloud.google.com/bigquery/pricing) and use the [Pricing
Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

### Install additional packages

Install the following packages required to execute this notebook.

In [None]:
! pip install --upgrade -r 'requirements.txt'

After you install the additional packages, you need to restart the notebook kernel so it can find the packages.

In [None]:
# Automatically restart kernel after installs
import os

if not os.getenv("IS_TESTING"):
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

### Setup your environment

Run the next cells to import libraries used in this notebook and configure some options.

Run the next cell to set your project ID and some of the other constants used in the lab.  

In [None]:
import random
import string
from typing import Union

import pandas as pd
from google.cloud import bigquery

# Generate unique ID to help w/ unique naming of certain pieces
ID = "".join(random.choices(string.ascii_lowercase + string.digits, k=5))

GCP_PROJECTS = !gcloud config get-value project
PROJECT_ID = GCP_PROJECTS[0]
BUCKET_NAME = f"{PROJECT_ID}-fraudfinder"
REGION = "us-central1"
TRAINING_DS_SIZE = 1000

### Create a Google Cloud Storage bucket and save the config data.

Next, we will create a Google Cloud Storage bucket and will save the config data in this bucket. After the cell operation finishes, you can navigate to [Google Cloud Storage](https://console.cloud.google.com/storage/) to see the GCS bucket. 

In [None]:
config = f"""
BUCKET_NAME          = \"{BUCKET_NAME}\"
PROJECT              = \"{PROJECT_ID}\"
REGION               = \"{REGION}\"
ID                   = \"{ID}\"
FEATURESTORE_ID      = \"fraudfinder_{ID}\"
MODEL_NAME           = \"ff_model\"
ENDPOINT_NAME        = \"ff_model_endpoint\"
TRAINING_DS_SIZE     = \"{TRAINING_DS_SIZE}\"
"""

!gsutil mb -l {REGION} gs://{BUCKET_NAME}

!echo '{config}' | gsutil cp - gs://{BUCKET_NAME}/config/notebook_env.py

### Copy the historical transaction data into BigQuery tables

Now we will copy the historical transaction data and ingest it into BigQuery tables. For this, we will need to run `copy_bigquery_data.py`.

In [None]:
!python3 scripts/copy_bigquery_data.py $BUCKET_NAME

### Check data in BigQuery

After ingesting our data into BigQuery, it's time to run some queries against the tables to inspect the data. You can also go to the [BigQuery console](https://console.cloud.google.com/bigquery) to see the data.

#### Initialize BigQuery SDK for Python 

Use a helper function for sending queries to BigQuery.

In [None]:
# Wrapper to use BigQuery client to run query/job, return job ID or result as DF
def run_bq_query(sql: str) -> Union[str, pd.DataFrame]:
    """
    Run a BigQuery query and return the job ID or result as a DataFrame
    Args:
        sql: SQL query, as a string, to execute in BigQuery
    Returns:
        df: DataFrame of results from query,  or error, if any
    """

    bq_client = bigquery.Client()

    # Try dry run before executing query to catch any errors
    job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
    bq_client.query(sql, job_config=job_config)

    # If dry run succeeds without errors, proceed to run query
    job_config = bigquery.QueryJobConfig()
    client_result = bq_client.query(sql, job_config=job_config)

    job_id = client_result.job_id

    # Wait for query/job to finish running. then get & return data frame
    df = client_result.result().to_arrow().to_pandas()
    print(f"Finished job_id: {job_id}")
    return df

#### tx.tx
The `tx.tx` table contains the basic information about each transaction:
- `TX_ID` is a unique ID per transaction
- `TX_TS` is the timestamp of the transaction, in UTC
- `CUSTOMER_ID` is a unique 16-digit string ID per customer
- `TERMINAL_ID` is a unique 16-digit string ID per point-of-sale terminal
- `TX_AMOUNT` is the amount of money spent by the customer at a terminal, in dollars

In [None]:
run_bq_query(
    """
SELECT
  *
FROM
  tx.tx
LIMIT 5
"""
)

#### tx.txlabels
The `tx.txlabels` table contains information on whether each transation was fraud or not:
- `TX_ID` is a unique ID per transaction
- `TX_FRAUD` is 1 if the transaction was fraud, and 0 if the transaction was not fraudulent

In [None]:
run_bq_query(
    """
SELECT
  *
FROM
  tx.txlabels
LIMIT 5
"""
)

### Check live streaming transactions via public Pub/Sub topics

As part of the [README](README.md), you've created [subscriptions](https://console.cloud.google.com/cloudpubsub/subscription/) to public Pub/Sub topics, where there is a constant flow of new transactions. This means you have, in your own Google Cloud project, subscriptions to the public Pub/Sub topics. You will receive a Pub/Sub message in your subscription every time a new transaction is streamed into the Pub/Sub topic.

There are two public Pub/Sub topics where there is a constant stream of live transactions occurring.

The following Pub/Sub topics are used for transactions:
```
projects/cymbal-fraudfinder/topics/ff-tx
projects/cymbal-fraudfinder/topics/ff-txlabels
```

Note: If you haven't completed the steps in the README, please make sure that you complete them first before continuing this notebook, otherwise you may not have Pub/Sub subscriptions.

### Reading messages from the Pub/Sub topics

In [None]:
def read_from_sub(project_id, subscription_name, messages=10):
    """
    Read messages from a Pub/Sub subscription
    Args:
        project_id: project ID
        subscription_name: the name of a Pub/Sub subscription in your project
        messages: number of messages to read
    Returns:
        msg_data: list of messages in your Pub/Sub subscription as a Python dictionary
    """
    
    import ast

    from google.api_core import retry
    from google.cloud import pubsub_v1

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_name)

    # Wrap the subscriber in a 'with' block to automatically call close() to
    # close the underlying gRPC channel when done.
    with subscriber:
        # The subscriber pulls a specific number of messages. The actual
        # number of messages pulled may be smaller than max_messages.
        response = subscriber.pull(
            subscription=subscription_path,
            max_messages=messages,
            retry=retry.Retry(deadline=300),
        )

        if len(response.received_messages) == 0:
            print("no messages")
            return

        ack_ids = []
        msg_data = []
        for received_message in response.received_messages:
            msg = ast.literal_eval(received_message.message.data.decode("utf-8"))
            msg_data.append(msg)
            ack_ids.append(received_message.ack_id)

        # Acknowledges the received messages so they will not be sent again.
        subscriber.acknowledge(subscription=subscription_path, ack_ids=ack_ids)

        print(
            f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
        )

        return msg_data

#### Reading from the `ff-tx-sub` subscription

Now let's read from the `ff-tx-sub` subscription. You should see some recent transactions (in UTC timezone).

In [None]:
messages_tx = read_from_sub(
    project_id=PROJECT_ID, subscription_name="ff-tx-sub", messages=2
)

messages_tx

#### Reading from the `ff-txlabels-sub` subscription

We will do the same with `ff-txlabels-sub` subscription, which receives the same stream of transactions as `ff-tx-sub`, but also contain the ground-truth label, `TX_FRAUD`, if the transaction is fraudulent (1) or not (0).

In [None]:
messages_txlabels = read_from_sub(
    project_id=PROJECT_ID, subscription_name="ff-txlabels-sub", messages=2
)

messages_txlabels

### END

Now you can go to the next notebook `01_exploratory_data_analysis.ipynb`