In [None]:
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Fraudfinder - Model training and deployment using Vertex AI

<table align="left">
  <td>
    <a href="https://console.cloud.google.com/ai-platform/notebooks/deploy-notebook?download_url=https://github.com/GoogleCloudPlatform/fraudfinder/blob/main/vertex_ai/05_model_training_xgboost_formalization.ipynb">
       <img src="https://www.gstatic.com/cloud/images/navigation/vertex-ai.svg" alt="Google Cloud Notebooks">Open in Cloud Notebook
    </a>
  </td> 
  <td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/fraudfinder/blob/main/vertex_ai/05_model_training_xgboost_formalization.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Open in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/GoogleCloudPlatform/fraudfinder/blob/main/vertex_ai/05_model_training_xgboost_formalization.ipynb">
        <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
</table>

## Overview

[Fraudfinder](https://github.com/googlecloudplatform/fraudfinder) is a series of labs on how to build a real-time fraud detection system on Google Cloud. Throughout the Fraudfinder labs, you will learn how to read historical bank transaction data stored in a data warehouse, read from a live stream of new transactions, perform exploratory data analysis (EDA), do feature engineering, ingest features into a feature store, train a model using feature store, register your model in a model registry, evaluate your model, deploy your model to an endpoint, do real-time inference on your model with feature store, and monitor your model.

### Objective

In the following notebook, you will learn how to:

* Build a Vertex AI dataset
* Build a Docker container and train a custom XGBoost model using Vertex AI
* Evaluate the model locally
* Deploy the model to Vertex AI as an endpoint. 

This tutorial uses the following Google Cloud data analytics and services:

- [BigQuery](https://cloud.google.com/bigquery/)
- [Vertex AI](https://cloud.google.com/vertex-ai/)

### Costs 

This tutorial uses billable components of Google Cloud:

* BigQuery
* Vertex AI

Learn about [BigQuery Pricing](https://cloud.google.com/bigquery/pricing), [Vertex AI pricing](https://cloud.google.com/vertex-ai/pricing), and use the [Pricing Calculator](https://cloud.google.com/products/calculator/) to generate a cost estimate based on your projected usage.

### Load configuration settings from the setup notebook

Set the constants used in this notebook and load the config settings from the `00_environment_setup.ipynb` notebook.

In [None]:
# Resolve the active gcloud project; the captured output is a list of lines and
# the first line is the project ID.
GCP_PROJECTS = !gcloud config get-value project
PROJECT_ID = GCP_PROJECTS[0]
BUCKET_NAME = f"{PROJECT_ID}-fraudfinder"
# Fetch the settings file that 00_environment_setup.ipynb stored in the bucket.
config = !gsutil cat gs://{BUCKET_NAME}/config/notebook_env.py
# `.n` joins the captured lines with newlines.
print(config.n)
# Execute the settings so they become notebook globals. NOTE(review): presumably this
# defines ID, REGION, MODEL_NAME, ENDPOINT_NAME and FEATURESTORE_ID, which later cells
# use without assigning them — confirm against the setup notebook. exec() runs whatever
# the bucket object contains, so only use a bucket you control.
exec(config.n)

### Import libraries

In [None]:
# General
import os
import sys
from typing import Union, List
import json
from datetime import datetime, timedelta

# Data Preprocessing
import numpy as np
import pandas as pd

# Model Training with Vertex AI
from google.cloud import bigquery
from google.cloud import aiplatform as vertex_ai
from google.cloud.aiplatform_v1 import ModelServiceClient
from google.cloud.aiplatform_v1.types import ListModelEvaluationsRequest
from google.protobuf.json_format import MessageToDict
from google.cloud.aiplatform import gapic as aip
from google.cloud import storage

# Model Deployment and Evaluation
from sklearn.metrics import precision_recall_fscore_support
import xgboost as xgb


# Feature Store
from google.cloud import aiplatform as vertex_ai
from google.cloud.aiplatform import Featurestore, EntityType, Feature

### Define constants

In [None]:
# General
DATA_DIR = os.path.join(os.pardir, "data")
TRAIN_DATA_DIR = os.path.join(DATA_DIR, "train")
DATA_URI = f"gs://{BUCKET_NAME}/data"
TRAIN_DATA_URI = f"{DATA_URI}/train"

# Feature Store
# Training instances are taken from yesterday's transactions.
START_DATE_TRAIN = (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d")
CUSTOMER_ENTITY = "customer"
TERMINAL_ENTITY = "terminal"
# Serve every feature of both entity types.
SERVING_FEATURE_IDS = {CUSTOMER_ENTITY: ["*"], TERMINAL_ENTITY: ["*"]}
READ_INSTANCES_TABLE = f"ground_truth_{ID}"
READ_INSTANCES_URI = f"bq://{PROJECT_ID}.tx.{READ_INSTANCES_TABLE}"

# Training
EXPERIMENT_NAME = f"fraudfinder-xgb-experiment-{ID}"
TARGET = "tx_fraud"

## Custom Training
DATASET_NAME = f"sample_train-{ID}"
TRAIN_JOB_NAME = f"fraudfinder_xgb_train_frmlz-{ID}"
# MODEL_NAME and ENDPOINT_NAME come from the config loaded at the top of the notebook
# and get a per-notebook suffix here. Append it only once, so re-running this cell does
# not produce "<name>_xgb_frmlz_<ID>_xgb_frmlz_<ID>".
_NAME_SUFFIX = f"_xgb_frmlz_{ID}"
if not MODEL_NAME.endswith(_NAME_SUFFIX):
    MODEL_NAME = f"{MODEL_NAME}{_NAME_SUFFIX}"
if not ENDPOINT_NAME.endswith(_NAME_SUFFIX):
    ENDPOINT_NAME = f"{ENDPOINT_NAME}{_NAME_SUFFIX}"
MODEL_SERVING_IMAGE_URI = (
    "us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-1:latest"
)
IMAGE_REPOSITORY = f"fraudfinder-{ID}"
IMAGE_NAME = "dask-xgb-classificator"
IMAGE_TAG = "v1"
IMAGE_URI = f"us-central1-docker.pkg.dev/{PROJECT_ID}/{IMAGE_REPOSITORY}/{IMAGE_NAME}:{IMAGE_TAG}"
TRAIN_COMPUTE = "e2-standard-4"
DEPLOY_COMPUTE = "n1-standard-4"

### Initialize Vertex AI SDK and BigQuery Client for Python

In [None]:
# BigQuery client bound to this notebook's project and region.
bq_client = bigquery.Client(location=REGION, project=PROJECT_ID)

In [None]:
# Register defaults used by every later Vertex AI SDK call: project, region,
# staging bucket, and the experiment that runs are tracked under.
vertex_ai.init(
    experiment=EXPERIMENT_NAME,
    location=REGION,
    project=PROJECT_ID,
    staging_bucket=BUCKET_NAME,
)

### Helper Functions
Next, define some helper functions that you will use throughout the notebook.

In [None]:
def get_evaluation_metrics(client, model_resource_name):
    """Return the metrics of a Vertex AI model's evaluation as a dict.

    Lists the model evaluations whose parent is ``model_resource_name`` and
    returns the metrics of the last evaluation yielded by the listing.

    Args:
        client: A ``ModelServiceClient`` (or any object exposing
            ``list_model_evaluations(request=...)``).
        model_resource_name: Full model resource name, used as the ``parent``
            of the ``ListModelEvaluationsRequest``.

    Returns:
        dict: Metrics of the last listed evaluation, converted with
        ``MessageToDict``; an empty dict when the model has no evaluations.
    """
    model_evaluation_request = ListModelEvaluationsRequest(parent=model_resource_name)
    model_evaluation_list = client.list_model_evaluations(
        request=model_evaluation_request
    )
    # Start empty so a model without evaluations returns {} instead of raising
    # UnboundLocalError.
    metrics = {}
    for evaluation in model_evaluation_list:
        # Each iteration overwrites the previous one, so the last evaluation wins.
        metrics = MessageToDict(evaluation._pb.metrics)
    return metrics


def gcs_list(gcs_uri):
    obj_list = []
    storage_client = storage.Client()
    bucket, key = gcs_uri.replace("gs://", "").split("/", 1)
    for blob in storage_client.list_blobs(bucket, prefix=key):
        obj_list.append("gs://" + bucket + "/" + str(blob.name))
    return obj_list

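You will also use the following BigQuery helper function. It dry-runs a query to catch errors, then runs it and returns the result as a DataFrame.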

In [None]:
# Wrapper to use a BigQuery client to run a query/job and return the result as a DataFrame
def run_bq_query(sql: str, client=None) -> pd.DataFrame:
    """
    Run a BigQuery query and return its result as a DataFrame.

    A dry run is issued first, so invalid SQL or missing permissions surface
    before the real query is executed.

    Args:
        sql: SQL query, as a string, to execute in BigQuery.
        client: Optional ``bigquery.Client`` to use, for example the notebook's
            ``bq_client`` that is pinned to PROJECT_ID and REGION. When omitted,
            a client for the environment's default project is created.
    Returns:
        df: DataFrame of the query results. Errors from the dry run or the
        query are raised by the client, not returned.
    """
    if client is None:
        client = bigquery.Client()

    # Dry run before executing the query to catch any errors
    dry_run_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
    client.query(sql, job_config=dry_run_config)

    # If the dry run succeeds without errors, run the query for real
    query_job = client.query(sql, job_config=bigquery.QueryJobConfig())

    # Wait for the query/job to finish, then convert the result to a DataFrame
    df = query_job.result().to_arrow().to_pandas()
    print(f"Finished job_id: {query_job.job_id}")
    return df

## Fetching feature values for model training

To fetch training data, we have to specify the following inputs to batch serving:

- a file containing a "query", with the entities and timestamps for each label
- a list of feature values to fetch
- the destination location and format


### Read-instance list

In our case, we need a csv file with content formatted like the table below:

|customer                     |terminal|timestamp                                    |
|-----------------------------|--------|---------------------------------------------|
|xxx3859                         |xxx8811    |2021-07-07 00:01:10 UTC                      |
|xxx4165                         |xxx8810    |2021-07-07 00:01:55 UTC                      |
|xxx2289                         |xxx2081    |2021-07-07 00:02:12 UTC                      |
|xxx3227                         |xxx3011    |2021-07-07 00:03:23 UTC                      |
|xxx2819                         |xxx6263    |2021-07-07 00:05:30 UTC                      |

where the column names are the names of entities in Feature Store and the timestamps represent the time an event occurred.

In [None]:
# Build the read-instance table used by batch serving. It contains the transactions
# dated START_DATE_TRAIN, with their entity IDs, timestamp, and the fields that are passed
# through to the export (tx_amount, tx_fraud).
# NOTE(review): the LEFT JOIN leaves tx_fraud NULL for transactions without a label.
# LIMIT without ORDER BY keeps an arbitrary 50,000 rows, so the sample can differ
# between runs.
sql_query = f"""
CREATE OR REPLACE TABLE `{PROJECT_ID}.tx.{READ_INSTANCES_TABLE}` as (
    SELECT
        raw_tx.TX_TS AS timestamp,
        raw_tx.CUSTOMER_ID AS customer,
        raw_tx.TERMINAL_ID AS terminal,
        raw_tx.TX_AMOUNT AS tx_amount,
        raw_lb.TX_FRAUD AS tx_fraud,
    FROM 
        tx.tx as raw_tx
    LEFT JOIN 
        tx.txlabels as raw_lb
    ON raw_tx.TX_ID = raw_lb.TX_ID
    WHERE
        DATE(raw_tx.TX_TS) = "{START_DATE_TRAIN}"
    LIMIT 50000
);
"""

print(sql_query)

# Execute the CREATE OR REPLACE TABLE statement.
run_bq_query(sql_query)

### Get Feature Store ID
Initiate the feature store you created in the `02_feature_engineering_batch.ipynb` notebook.

In [None]:
# Load the feature store created in 02_feature_engineering_batch.ipynb. Later cells need
# ff_feature_store, so fail here with an actionable message instead of printing and
# continuing with the name undefined.
from google.api_core.exceptions import NotFound

try:
    ff_feature_store = Featurestore(FEATURESTORE_ID)
except NameError as err:
    # A name used above is undefined, so FEATURESTORE_ID cannot go in the message.
    raise RuntimeError(
        f"{err}. Run the configuration and import cells at the top of this notebook first."
    ) from err
except NotFound as err:
    raise RuntimeError(
        f"The feature store {FEATURESTORE_ID} does not exist! "
        "Create it with 02_feature_engineering_batch.ipynb first."
    ) from err

### Fetch a sample of data and dump it into a bucket 
In this section, we will use the batch serving of the Vertex AI Feature Store to prepare a dataset for training.

First, enable `uniformbucketlevelaccess` on the bucket. When you enable uniform bucket-level access on a bucket, Access Control Lists (ACLs) are disabled, and only bucket-level Identity and Access Management (IAM) permissions grant access to that bucket and the objects it contains. Toggling this setting is not a best practice for production workloads; we only do it to prevent issues when running the workshop. Read more about `uniformbucketlevelaccess` in our [documentation](https://cloud.google.com/storage/docs/uniform-bucket-level-access). 

In [None]:
# Temporarily enable uniform bucket-level access before batch serving writes to the
# bucket; it is switched off again after the export below.
!gsutil uniformbucketlevelaccess set on gs://{BUCKET_NAME}

Next fetch a batch of data from the Vertex AI Feature Store. 

In [None]:
ff_feature_store.batch_serve_to_gcs(
    gcs_destination_output_uri_prefix=TRAIN_DATA_URI,
    gcs_destination_type="csv",
    serving_feature_ids=SERVING_FEATURE_IDS,
    read_instances_uri=READ_INSTANCES_URI,
    pass_through_fields=["tx_amount", "tx_fraud"],
)

!gsutil uniformbucketlevelaccess set off gs://{BUCKET_NAME}

Now you will create a copy of the training data in your local notebook instance so that you can use it later for testing the model.

In [None]:
# Show the exported files in GCS, then copy them into the local data directory.
!gsutil ls $TRAIN_DATA_URI
# NOTE(review): sudo is presumably needed because ../data is not writable by the notebook user — confirm.
!sudo gsutil cp -r $TRAIN_DATA_URI $TRAIN_DATA_DIR

Exporting the features into cloud storage will generate a csv file. Let's list the local file:

In [None]:
# List the CSV file(s) copied from GCS in the previous cell.
!ls $TRAIN_DATA_DIR

## Building a fraud detection model using Vertex AI custom training

#### Building a Vertex AI dataset
In this section, you will create a managed [Vertex AI dataset](https://cloud.google.com/vertex-ai/docs/training/using-managed-datasets). Vertex AI datasets can be used to train AutoML models or custom-trained models.  

In [None]:
# retrieve list of local files
flist = !ls $TRAIN_DATA_DIR
obj_list = [f"gs://{PROJECT_ID}-fraudfinder/data/train/{fname}" for fname in flist]
obj_list

In [None]:
# retrieve list of local files
flist = !ls $TRAIN_DATA_DIR
obj_list = [f"gs://{PROJECT_ID}-fraudfinder/data/train/{fname}" for fname in flist]

# create Vertex AI managed dataset
dataset = vertex_ai.TabularDataset.create(
    display_name=DATASET_NAME,
    gcs_source=obj_list[0],
)

print("Dataset:", f"{dataset.display_name}")
print("Name: \t", f"{dataset.resource_name}")

### Train a custom model

In this section, you will need to train an XGBoost model on Vertex AI custom training. Custom training on Vertex AI requires a container, which contains all of the necessary code, files, and code dependencies needed to train the model.

#### Create the training job with XGBoost and Dask

To perform custom training, you can use either a pre-built container or build your own container. In this notebook you will use XGBoost with the Dask framework, so you need to build a custom container for XGBoost and use it to train a model with the Vertex AI custom training service.

You will use Dask. Dask is a parallel computing library built on Python. Dask allows easy management of distributed workers and excels at handling large distributed data science workflows. The implementation in XGBoost originates from dask-xgboost with some extended functionalities and a different interface. 

##### Vertex AI and containers
The first step is to write your training code. Then, you will need to write a Dockerfile and build a container image based on it. The following cell writes our code into `train_xgb.py`, the module for training an XGBClassifier. We will copy this code into our container to run through the Vertex AI training service.

A custom container is a Docker image that you create to run your training application. By running your machine learning (ML) training job in a custom container, you can use ML frameworks, non-ML dependencies, libraries, and binaries that are not otherwise supported on Vertex AI. You can read more in our [documentation](https://cloud.google.com/vertex-ai/docs/training/containers-overview). 

In [None]:
# create a folder for all container-related files
# create a folder for all container-related files (-p: no error if it already exists)
!mkdir -p -m 777 build_training

In [None]:
%%writefile build_training/train_xgb.py

"""
train_xgb.py is the module for training a XGBClassifier pipeline
"""

# Libraries --------------------------------------------------------------------------------------------------------------------------

import argparse
import numpy as np
import os
import json
import logging
from pathlib import Path
import dask.dataframe as dask_df
from dask.distributed import LocalCluster, Client
import xgboost as xgb
from sklearn.metrics import (roc_curve, confusion_matrix, average_precision_score, f1_score, 
                            log_loss, precision_score, recall_score)

# Variables --------------------------------------------------------------------------------------------------------------------------

## Read environmental variables
def gcs_path_to_local_path(old_path):
    """Rewrite every ``gs://`` in ``old_path`` as ``/gcs/``.

    NOTE(review): presumably ``/gcs/`` is the Cloud Storage FUSE mount available in
    Vertex AI training containers. Paths without ``gs://`` are returned unchanged.
    """
    return old_path.replace("gs://", "/gcs/")

# Data split and model locations provided by Vertex AI through AIP_* environment variables
TRAINING_DATA_PATH = gcs_path_to_local_path(os.environ["AIP_TRAINING_DATA_URI"])
TEST_DATA_PATH = gcs_path_to_local_path(os.environ["AIP_TEST_DATA_URI"])
MODEL_DIR = gcs_path_to_local_path(os.environ["AIP_MODEL_DIR"])
# os.path.join works whether or not AIP_MODEL_DIR ends with "/"; plain concatenation
# would produce ".../modelmodel.bst" without the trailing slash.
MODEL_PATH = os.path.join(MODEL_DIR, "model.bst")

## Training variables
LABEL_COLUMN = "tx_fraud"
# Columns that are not model features
UNUSED_COLUMNS = ["timestamp","entity_type_customer","entity_type_terminal"]
# dtypes for dask read_csv; "Int64" is pandas' nullable integer type
DATA_SCHEMA = {
    "timestamp" : "object",
    "tx_amount": "float64",
    "tx_fraud": "Int64",
    "entity_type_customer": "Int64",
    "customer_id_nb_tx_1day_window": "Int64",
    "customer_id_nb_tx_7day_window": "Int64",
    "customer_id_nb_tx_14day_window": "Int64",
    "customer_id_avg_amount_1day_window": "float64",
    "customer_id_avg_amount_7day_window": "float64",
    "customer_id_avg_amount_14day_window": "float64",
    "customer_id_nb_tx_15min_window": "Int64",
    "customer_id_avg_amount_15min_window": "float64",
    "customer_id_nb_tx_30min_window": "Int64",
    "customer_id_avg_amount_30min_window": "float64",
    "customer_id_nb_tx_60min_window": "Int64",
    "customer_id_avg_amount_60min_window": "float64",
    "entity_type_terminal": "Int64",
    "terminal_id_nb_tx_1day_window": "Int64",
    "terminal_id_nb_tx_7day_window": "Int64",
    "terminal_id_nb_tx_14day_window": "Int64",
    "terminal_id_risk_1day_window": "float64",
    "terminal_id_risk_7day_window": "float64",
    "terminal_id_risk_14day_window": "float64",
    "terminal_id_nb_tx_15min_window": "Int64",
    "terminal_id_avg_amount_15min_window": "float64",
    "terminal_id_nb_tx_30min_window": "Int64",
    "terminal_id_avg_amount_30min_window": "float64",
    "terminal_id_nb_tx_60min_window": "Int64",
    "terminal_id_avg_amount_60min_window": "float64"
}

# Helpers -----------------------------------------------------------------------------------------------------------------------------
def get_args():
    """Parse the training job's command-line arguments.

    Returns:
        argparse.Namespace with ``bucket`` (str, required), ``max_depth`` (int,
        default 6), ``eta`` (float, default 0.4) and ``gamma`` (float, default 0.0).
    """
    parser = argparse.ArgumentParser()

    # Data files arguments
    parser.add_argument("--bucket", dest="bucket", type=str,
                        required=True, help="Bucket uri")
    # XGBoost hyperparameters
    parser.add_argument("--max_depth", dest="max_depth",
                        default=6, type=int,
                        help="max_depth value.")
    parser.add_argument("--eta", dest="eta",
                        default=0.4, type=float,
                        help="eta (learning rate) value.")
    parser.add_argument("--gamma", dest="gamma",
                        default=0.0, type=float,
                        help="gamma value.")

    return parser.parse_args()

def resample(df, replace, frac=1, random_state=8):
    """Draw a seeded random sample of ``frac`` of the rows of ``df``.

    Args:
        df: DataFrame (Dask or pandas) to sample from.
        replace: Whether a row may be drawn more than once.
        frac: Fraction of rows to draw.
        random_state: Seed that makes the sample reproducible.
    """
    return df.sample(frac=frac, replace=replace, random_state=random_state)

def preprocess(df):
    """Drop unused columns and incomplete rows, then downcast float columns to float32.

    Only float32/float64 columns are cast; integer columns (including the nullable
    ``Int64`` ones declared in DATA_SCHEMA) keep their dtype.
    """
    cleaned = df.drop(columns=UNUSED_COLUMNS).dropna()
    float_columns = cleaned.select_dtypes(["float32", "float64"]).columns
    return cleaned.astype(dict.fromkeys(float_columns, "float32"))

def evaluate_model(model, x_true, y_true):
    """Compute evaluation metrics for a fitted Dask XGBoost classifier.

    Args:
        model: Fitted classifier exposing Dask ``predict`` / ``predict_proba``.
        x_true: Dask DataFrame of test features.
        y_true: Dask Series of 0/1 test labels.

    Returns:
        dict: ROC points (every 1000th fpr/tpr/threshold), confusion matrix,
        average precision, F1, log loss, precision and recall, rounded to 3 decimals.
    """
    y_true = y_true.compute()

    # Positive-class (fraud) probabilities, used by the threshold-free metrics.
    y_score = model.predict_proba(x_true)[:, 1].compute()
    # Hard 0/1 predictions, materialized once and reused by every metric below.
    y_pred = model.predict(x_true).compute()

    fpr, tpr, thr = roc_curve(y_true=y_true, y_score=y_score, pos_label=True)
    # Keep every 1000th ROC point so metrics.json stays small.
    fpr_list = fpr.tolist()[::1000]
    tpr_list = tpr.tolist()[::1000]
    thr_list = thr.tolist()[::1000]

    c_matrix = confusion_matrix(y_true, y_pred)

    metrics = {
        "fpr": [round(f, 3) for f in fpr_list],
        "tpr": [round(f, 3) for f in tpr_list],
        "thrs": [round(f, 3) for f in thr_list],
        "confusion_matrix": c_matrix.tolist(),
        "avg_precision_score": round(average_precision_score(y_true, y_score), 3),
        "f1_score": round(f1_score(y_true, y_pred), 3),
        # Log loss scores predicted probabilities; on hard 0/1 predictions it only
        # reflects clipped misclassification counts.
        "log_loss": round(log_loss(y_true, y_score), 3),
        "precision_score": round(precision_score(y_true, y_pred), 3),
        "recall_score": round(recall_score(y_true, y_pred), 3),
    }
    return metrics


def main():
    """Train the fraud classifier on the managed-dataset split and write artifacts.

    Reads the train/test CSVs provided by Vertex AI and downsamples the non-fraud class.
    Fits a DaskXGBClassifier with the command-line hyperparameters and saves the booster
    to MODEL_PATH. Writes evaluation metrics to <bucket>/deliverables/metrics.json.
    """
    args = get_args()

    # variables
    bucket = args.bucket
    if not bucket.startswith(("gs://", "/gcs/")):
        # A bare bucket name would otherwise become a path relative to the container's
        # working directory, and metrics.json would be lost when the job ends.
        bucket = f"gs://{bucket}"
    bucket = gcs_path_to_local_path(bucket)
    deliverable_uri = Path(bucket) / "deliverables"
    metrics_uri = deliverable_uri / "metrics.json"

    # read data
    train_df = dask_df.read_csv(TRAINING_DATA_PATH, dtype=DATA_SCHEMA)
    test_df = dask_df.read_csv(TEST_DATA_PATH, dtype=DATA_SCHEMA)

    # preprocessing
    preprocessed_train_df = preprocess(train_df)
    preprocessed_test_df = preprocess(test_df)

    # downsampling: keep every fraud row and draw roughly as many non-fraud rows.
    # The ratio is taken against the non-fraud rows being sampled (not the raw,
    # pre-dropna training frame), so the two classes end up approximately balanced.
    train_nfraud_df = preprocessed_train_df[preprocessed_train_df[LABEL_COLUMN] == 0]
    train_fraud_df = preprocessed_train_df[preprocessed_train_df[LABEL_COLUMN] == 1]
    n_nfraud = len(train_nfraud_df)
    downsample_frac = len(train_fraud_df) / n_nfraud if n_nfraud else 1.0
    train_nfraud_downsample = resample(train_nfraud_df,
                                       replace=True,
                                       frac=downsample_frac)
    ds_preprocessed_train_df = dask_df.multi.concat([train_nfraud_downsample, train_fraud_df])

    # target, features split (columns.difference returns the feature names sorted)
    x_train = ds_preprocessed_train_df[ds_preprocessed_train_df.columns.difference([LABEL_COLUMN])]
    y_train = ds_preprocessed_train_df.loc[:, LABEL_COLUMN].astype(int)
    x_true = preprocessed_test_df[preprocessed_test_df.columns.difference([LABEL_COLUMN])]
    y_true = preprocessed_test_df.loc[:, LABEL_COLUMN].astype(int)

    # train model and evaluate it; the context managers shut the local Dask cluster down
    with LocalCluster() as cluster, Client(cluster) as client:
        model = xgb.dask.DaskXGBClassifier(
            objective="reg:logistic",
            eval_metric="logloss",
            # Hyperparameters supplied by the training job's command-line arguments.
            max_depth=args.max_depth,
            learning_rate=args.eta,
            gamma=args.gamma,
        )
        model.client = client
        model.fit(x_train, y_train, eval_set=[(x_true, y_true)])
        Path(MODEL_DIR).mkdir(parents=True, exist_ok=True)
        model.save_model(MODEL_PATH)

        # generate metrics (Dask predictions need the cluster to still be running)
        metrics = evaluate_model(model, x_true, y_true)

    deliverable_uri.mkdir(parents=True, exist_ok=True)
    with open(metrics_uri, "w") as file:
        json.dump(metrics, file, sort_keys=True, indent=4)
    
# Train only when run as a script; the container ENTRYPOINT runs `python3 train_xgb.py`.
if __name__ == "__main__":
    main()

#### Define a custom image for Dask model training

Now you will build a custom container. By running your training job in a custom container, you can use any ML framework, non-ML dependencies, libraries, and binaries. Next you will package your training code into a Docker container image, push the container image to Artifact Registry, and create a custom job on Vertex AI, which will use the container image on Artifact Registry. As the evolution of Container Registry, Artifact Registry is a single place for your organization to manage container images and language packages. It's fully integrated with the Vertex AI platform. You can read more in our [documentation](https://cloud.google.com/artifact-registry). 

In [None]:
# Create image repository
!gcloud artifacts repositories create $IMAGE_REPOSITORY      --repository-format=docker      --location=us-central1      --description="FraudFinder Docker Image repository"

# List repositories under the project
!gcloud artifacts repositories list

# Get info on the repository
!gcloud artifacts repositories describe $IMAGE_REPOSITORY --location=us-central1

Run the following cell to allow this notebook to push images to Artifact Registry.

In [None]:
# Register gcloud as the Docker credential helper for the us-central1 Artifact Registry host (-q: no prompt).
!gcloud auth configure-docker us-central1-docker.pkg.dev -q

Next you need to write your Dockerfile in order to create your container. 

In [None]:
%%writefile build_training/Dockerfile
# Specifies base image and tag
# NOTE(review): python:3.7 appears to be end-of-life. With the unpinned `--upgrade`
# install below, the xgboost version in this image can drift from the one in
# MODEL_SERVING_IMAGE_URI (xgboost-cpu.1-1), which must be able to load model.bst.
# Consider pinning versions.
FROM python:3.7
WORKDIR /root

# Installs additional packages (unpinned; --upgrade pulls the newest versions pip resolves)
RUN pip install gcsfs numpy pandas scikit-learn dask distributed xgboost --upgrade

# Copies the trainer code to the docker image.
COPY ./train_xgb.py /root/train_xgb.py

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python3", "train_xgb.py"]

Next, build and push the Docker container. 

In [None]:
# Build and push Docker container
!docker build -t $IMAGE_URI ./build_training/
!docker push $IMAGE_URI

print("Done")

#### Start a custom training job on Vertex AI
Now that you have created your custom container, you will create a training job on Vertex AI. This will create a custom training job, load our dataset, and register the model to Vertex AI Model Registry after the training job is successfully completed. Learn more about the creation of custom jobs [here](https://cloud.google.com/vertex-ai/docs/training/create-custom-job).

In [None]:
# Custom training job that runs our container; the trained model is registered with the
# prebuilt XGBoost serving image.
job = vertex_ai.CustomContainerTrainingJob(
    display_name=TRAIN_JOB_NAME,
    container_uri=IMAGE_URI,
    model_serving_container_image_uri=MODEL_SERVING_IMAGE_URI,
)

# Hyperparameters forwarded to train_xgb.py as command-line arguments
parameters = {"MAX_DEPTH": 4, "ETA": 0.3, "GAMMA": 0.1}

CMDARGS = [
    # train_xgb.py maps gs:// URIs onto its /gcs/ mount. A bare bucket name would be
    # treated as a relative local path in the container, and metrics.json would be lost.
    f"--bucket=gs://{BUCKET_NAME}",
    "--max_depth=" + str(parameters["MAX_DEPTH"]),
    "--eta=" + str(parameters["ETA"]),
    "--gamma=" + str(parameters["GAMMA"]),
]

# Train on the managed dataset; train_xgb.py reads the resulting data splits from the
# AIP_TRAINING_DATA_URI / AIP_TEST_DATA_URI environment variables.
model = job.run(
    dataset=dataset,
    model_display_name=MODEL_NAME,
    args=CMDARGS,
    replica_count=1,
    machine_type=TRAIN_COMPUTE,
    accelerator_count=0,
)

While the model is training, you can visit the model URL, or go to the console page for [Vertex AI training jobs](https://console.cloud.google.com/vertex-ai/training/training-pipelines) to track its progress.

#### Evaluate the model locally

Before deploying the model to an endpoint, evaluate it locally. First, preprocess a sample of the exported data so that the model can make predictions on it.

In [None]:
LABEL_COLUMN = "tx_fraud"
# Columns that are not model features
UNUSED_COLUMNS = ["timestamp", "entity_type_customer", "entity_type_terminal"]
NA_VALUES = ["NA", "."]


def preprocess(df):
    """Prepare a raw pandas frame for local prediction.

    Drops the unused columns and any row with a missing value. Casts
    int32/float32/float64 columns to float32, then one-hot encodes columns of
    ``category`` dtype.

    Args:
      df: Pandas df with raw data

    Returns:
      A new DataFrame with the preprocessed data
    """
    cleaned = df.drop(columns=UNUSED_COLUMNS).dropna()

    # Cast numeric columns of these dtypes to float32 (int64 columns are left as-is)
    cast_columns = cleaned.select_dtypes(["int32", "float32", "float64"]).columns
    cleaned[cast_columns] = cleaned[cast_columns].astype("float32")

    categorical_columns = cleaned.dtypes[cleaned.dtypes == "category"].index.tolist()
    return pd.get_dummies(cleaned, columns=categorical_columns)


# Test set: the first CSV file exported by batch serving.
# NOTE(review): the managed dataset was built from these exported files, so this sample
# likely overlaps the training data and the metrics below are optimistic.
train_sample_path = os.path.join(TRAIN_DATA_DIR, "000000000000.csv")
df_test = pd.read_csv(train_sample_path)
preprocessed_test_Data = preprocess(df_test)

# train_xgb.py selects its features with `columns.difference([LABEL_COLUMN])`, which
# returns them in sorted order. The booster is fed a plain numpy array here (no feature
# names), so the columns must be in that same order or predictions are silently wrong.
feature_columns = preprocessed_test_Data.columns.difference([LABEL_COLUMN]).to_list()
x_test = preprocessed_test_Data[feature_columns].values
y_test = preprocessed_test_Data.loc[:, LABEL_COLUMN].astype(int)

Next, copy the model artifact to the local directory so you can evaluate the model locally before deploying it:

In [None]:
# Copy the trained model's artifact directory (model.uri) into the working directory.
# NOTE(review): the next cell loads ./model/model.bst, which assumes the artifact directory is named "model" — confirm.
!gsutil cp -r $model.uri .

Now it's time to test the model.

In [None]:
# Load the booster saved by train_xgb.py and score the local sample.
bst = xgb.Booster()
bst.load_model("./model/model.bst")
xgtest = xgb.DMatrix(x_test)
y_pred_prob = bst.predict(xgtest)
# Probabilities above 0.5 become class 1 (exactly 0.5 rounds to 0).
y_pred = y_pred_prob.round().astype(int)
# Only a cell's last expression is rendered, so print the sample explicitly.
print("First 10 predicted fraud probabilities:", y_pred_prob[:10])
precision_recall_fscore_support(y_test.values, y_pred, average="weighted")

#### Deploy the model
Before you use your model to make predictions, you need to deploy it to an Endpoint. You can do this by calling the deploy function on the Model resource. This will do two things:

- create an Endpoint resource
- deploy the Model resource to the Endpoint resource


In [None]:
# Compute instance type
DEPLOY_COMPUTE = "n1-standard-4"

# Percentage of traffic that the model will receive in the endpoint
TRAFFIC_SPLIT = {"0": 100}

# Parameters to configure the minimum and maximum nodes during autoscaling
MIN_NODES = 1
MAX_NODES = 1


endpoint = model.deploy(
    deployed_model_display_name=MODEL_NAME,
    traffic_split=TRAFFIC_SPLIT,
    machine_type=DEPLOY_COMPUTE,
    accelerator_count=0,
    min_replica_count=MIN_NODES,
    max_replica_count=MAX_NODES,
)

#### Test the deployed model (Make an online prediction request)
Send an online prediction request to your deployed model. To make sure your deployed model is working, test it out by sending a request to the endpoint.

First, prepare some test data.

In [None]:
# Request body built from the first two test rows. It is also written to
# predictions.json in case you want to test the endpoint in the console
# (json is imported in the imports cell).
payload = {"instances": x_test[:2].tolist()}
with open("predictions.json", mode="w", encoding="utf-8") as payload_file:
    json.dump(payload, payload_file, indent=4, ensure_ascii=False)

In [None]:
# Send the two sample rows to the deployed endpoint for an online prediction.
endpoint.predict(instances=payload["instances"])

Now that you have packaged your XGBoost training code in a container and run a custom training job on Vertex AI, you can take this ML workflow and formalize it into a Vertex AI Pipeline.

You can continue with the next Notebook: `06_formalization.ipynb`.