in scripts/cloud_run_model_inference/main.py [0:0]
def index():
    """Handle a Pub/Sub push request and score the transaction it carries.

    The request body is expected to be a JSON envelope with a ``"message"``
    entry. When that message is a dict holding base64-encoded ``"data"``, the
    data is decoded as UTF-8 JSON, enriched with customer and terminal
    features obtained through ``features_lookup``, run through ``preprocess``
    and sent to ``endpoint_obj.predict``. The prediction is only logged.

    Returns:
        ``("", 204)`` when the request was accepted (including the case where
        the message carries no ``"data"``), or ``("Bad Request: <reason>", 400)``
        when the envelope or the transaction payload is malformed.

    Raises:
        Errors from the feature lookup, preprocessing or prediction calls are
        not caught here and propagate to the web framework.
    """
    envelope = request.get_json()
    if not envelope:
        return _bad_request("no Pub/Sub message received")
    if not isinstance(envelope, dict) or "message" not in envelope:
        return _bad_request("invalid Pub/Sub message format")

    pubsub_message = envelope["message"]
    if isinstance(pubsub_message, dict) and "data" in pubsub_message:
        # binascii.Error and UnicodeDecodeError are both ValueError
        # subclasses; TypeError covers a non-string "data" value.
        try:
            payload_input = (
                base64.b64decode(pubsub_message["data"]).decode("utf-8").strip()
            )
        except (TypeError, ValueError) as exc:
            return _bad_request(f"undecodable Pub/Sub message data ({exc})")
        print(f" >> payload input {payload_input}!")

        # parse payload string into JSON object
        try:
            payload_json = json.loads(payload_input)
        except ValueError as exc:
            return _bad_request(f"transaction payload is not valid JSON ({exc})")
        if not isinstance(payload_json, dict):
            return _bad_request("transaction payload must be a JSON object")

        # Reject incomplete transactions up front, in the same style as the
        # envelope checks, instead of failing later with a bare KeyError.
        missing = [key for key in _REQUIRED_TX_FIELDS if key not in payload_json]
        if missing:
            return _bad_request(
                f"transaction payload is missing fields: {', '.join(missing)}"
            )

        payload = {"tx_amount": payload_json["TX_AMOUNT"]}

        # look up the customer features from Vertex AI Feature Store
        customer_features = features_lookup(
            ff_feature_store, "customer", [payload_json["CUSTOMER_ID"]]
        )
        for feature_name in _CUSTOMER_FEATURE_NAMES:
            payload[feature_name] = customer_features[feature_name]

        # look up the terminal features from Vertex AI Feature Store
        terminal_features = features_lookup(
            ff_feature_store, "terminal", [payload_json["TERMINAL_ID"]]
        )
        for feature_name in _TERMINAL_FEATURE_NAMES:
            payload[feature_name] = terminal_features[feature_name]

        payload = preprocess(payload)
        print("-------------------------------------------------------")
        print(f"[Pre-processed payload to be sent to Vertex AI endpoint]: {payload}")
        result = endpoint_obj.predict(instances=[payload])
        print(f"[Prediction result]: {result}")

    return ("", 204)


# Fields every decoded transaction must provide before it can be scored.
_REQUIRED_TX_FIELDS = ("TX_AMOUNT", "CUSTOMER_ID", "TERMINAL_ID")

# Customer features copied into the prediction payload, in payload order.
_CUSTOMER_FEATURE_NAMES = (
    "customer_id_nb_tx_1day_window",
    "customer_id_nb_tx_7day_window",
    "customer_id_nb_tx_14day_window",
    "customer_id_avg_amount_1day_window",
    "customer_id_avg_amount_7day_window",
    "customer_id_avg_amount_14day_window",
    "customer_id_nb_tx_15min_window",
    "customer_id_avg_amount_15min_window",
    "customer_id_nb_tx_30min_window",
    "customer_id_avg_amount_30min_window",
    "customer_id_nb_tx_60min_window",
    "customer_id_avg_amount_60min_window",
)

# Terminal features copied into the prediction payload, in payload order.
_TERMINAL_FEATURE_NAMES = (
    "terminal_id_nb_tx_1day_window",
    "terminal_id_nb_tx_7day_window",
    "terminal_id_nb_tx_14day_window",
    "terminal_id_risk_1day_window",
    "terminal_id_risk_7day_window",
    "terminal_id_risk_14day_window",
    "terminal_id_nb_tx_15min_window",
    "terminal_id_avg_amount_15min_window",
    "terminal_id_nb_tx_30min_window",
    "terminal_id_avg_amount_30min_window",
    "terminal_id_nb_tx_60min_window",
    "terminal_id_avg_amount_60min_window",
)


def _bad_request(msg):
    """Log *msg* and return the matching ``(body, 400)`` response tuple."""
    print(f"error: {msg}")
    return f"Bad Request: {msg}", 400