# Setting Up Environment for Streaming and Cloudera Migration Demo

## Imports & APIs Enablement

In [None]:
import os
import time

In [None]:
# Enable the Managed Kafka API (managedkafka.googleapis.com) in the project
# named by the GOOGLE_CLOUD_PROJECT environment variable.
!(gcloud services enable managedkafka.googleapis.com --project "${GOOGLE_CLOUD_PROJECT}")

## Variables

In [None]:
# Project ID is read from the environment; raises KeyError if
# GOOGLE_CLOUD_PROJECT is not set.
PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
# Region used when building the Kafka cluster and subnetwork resource paths.
REGION = "us-central1"
# Cluster ID of the Kafka cluster that is looked up and, if missing, created.
kafka_cluster_name = "ti-kafka-cluster-01-eri"
# NOTE(review): `network` is not referenced by the cells visible here —
# presumably the VPC that owns `subnet`; confirm before removing.
network="colab-network"
# Subnetwork name placed in the Kafka cluster's network configuration.
subnet = "colab-subnetwork"

In [None]:
# Name of the GCS bucket that receives the warehouse data copy; derived from
# the project ID.
DATA_BUCKET_NAME_DW = f"dw-{PROJECT_ID}"

## Functions

### Credentials Management

#### Rest API Helper

In [None]:
def restAPIHelper(url: str, http_verb: str, request_body: "dict | None" = None,
                  timeout: float = 60.0) -> dict:
  """Calls a Google Cloud REST API passing in the current user's credentials.

  Args:
    url: Fully-qualified REST endpoint to call.
    http_verb: One of "GET", "POST", "PUT", "PATCH" or "DELETE".
    request_body: JSON-serializable payload. Only sent for POST, PUT and
      PATCH; ignored for GET and DELETE.
    timeout: Seconds to wait for the HTTP call before `requests` gives up,
      so a stalled connection cannot hang the notebook forever.

  Returns:
    The decoded JSON response, or an empty dict when the response has no body.

  Raises:
    RuntimeError: If the verb is unknown or the response status is not 2xx.
  """
  verbs_with_body = ("POST", "PUT", "PATCH")
  verbs_without_body = ("GET", "DELETE")

  # Reject bad verbs before touching credentials or the network.
  if http_verb not in verbs_with_body + verbs_without_body:
    raise RuntimeError(f"Unknown HTTP verb: {http_verb}")

  import requests
  import google.auth
  # `import google.auth` alone does not load the transport.requests submodule,
  # so import it explicitly instead of relying on another library having done so.
  import google.auth.transport.requests

  # Get an access token based upon the current user
  creds, _ = google.auth.default()
  creds.refresh(google.auth.transport.requests.Request())

  headers = {
    "Content-Type" : "application/json",
    "Authorization" : "Bearer " + creds.token
  }

  request_kwargs = {"headers": headers, "timeout": timeout}
  if http_verb in verbs_with_body:
    request_kwargs["json"] = request_body

  response = requests.request(http_verb, url, **request_kwargs)

  # Any 2xx status is a success, not only 200.
  if not 200 <= response.status_code < 300:
    error = f"Error restAPIHelper -> Status: '{response.status_code}' Text: '{response.text}'"
    raise RuntimeError(error)

  # Some successful calls return no body; there is nothing to decode then.
  if not response.content:
    return {}
  return response.json()

### Apache Kafka

In [None]:
def createApacheKafkaForBigQueryCluster():
  """Creates the Apache Kafka For BigQuery Cluster unless it already exists.

  Uses the notebook-level PROJECT_ID, REGION, kafka_cluster_name and subnet.

  Returns:
    The cluster's resource name,
    projects/<project>/locations/<region>/clusters/<cluster id>, both when the
    cluster already existed and when its creation was just requested. This is
    the cluster name, not the name of the long-running create operation.

  Raises:
    RuntimeError: Propagated from restAPIHelper when a REST call fails.
  """
  parent = f"projects/{PROJECT_ID}/locations/{REGION}"
  cluster_resource_name = f"{parent}/clusters/{kafka_cluster_name}"
  clusters_url = f"https://managedkafka.googleapis.com/v1/{parent}/clusters"

  # First find the cluster if it exists
  # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters/list
  json_result = restAPIHelper(clusters_url, "GET", None)
  print(f"createApacheKafkaForBigQueryCluster (GET) json_result: {json_result}")

  # The "clusters" key is absent when the location has no clusters.
  for item in json_result.get("clusters", []):
    print(f"Apache Kafka for BigQuery: {item['name']}")
    if item["name"] == cluster_resource_name:
      print("Apache Kafka for BigQuery already exists")
      return cluster_resource_name

  # Create Apache Kafka For BigQuery Cluster
  # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters/create
  print("Creating Apache Kafka For BigQuery Cluster")

  # Small demo-sized cluster: 3 vCPUs and 3 GiB (3221225472 bytes) of memory.
  # A larger configuration noted by the original author for reference:
  #   vcpuCount: 32 -> probably more than needed since the CPUs were mainly idle
  #   memoryBytes: 34359738368 -> RAM was at 50% when doing 11,000 customers
  request_body = {
      "capacityConfig": {
        "vcpuCount": "3",
        "memoryBytes": "3221225472"
      },
      "gcpConfig": {
          "accessConfig": {
              # networkConfigs is a repeated field in the API, so send a list.
              "networkConfigs": [
                  {
                      "subnet": f"projects/{PROJECT_ID}/regions/{REGION}/subnetworks/{subnet}"
                  }
              ]
            }
        }
    }

  create_url = f"{clusters_url}?clusterId={kafka_cluster_name}"
  json_result = restAPIHelper(create_url, "POST", request_body)

  # The POST response describes the create operation; only its name is logged.
  # Callers poll the cluster resource itself, so the cluster name is returned.
  print("Apache Kafka for BigQuery created: ", json_result.get("name"))
  return cluster_resource_name

In [None]:
def waitForApacheKafkaForBigQueryCluster(operation, max_retries=100, poll_interval_seconds=30):
  """
  Waits for an Apache Kafka For BigQuery Cluster to be Created.

  Polls https://managedkafka.googleapis.com/v1/{operation} until the resource
  reports success, sleeping between polls.

  Args:
    operation: Resource name to poll. Either a cluster resource name
      (projects/<project>/locations/<region>/clusters/<cluster id>), which is
      finished when its "state" is "ACTIVE", or a long-running operation name
      (projects/<project>/locations/<region>/operations/<operation id>), which
      is finished when its "done" field is true.
    max_retries: Number of unsuccessful polls tolerated before giving up.
    poll_interval_seconds: Seconds to sleep between polls.

  Returns:
    None once the cluster is ready.

  Raises:
    RuntimeError: If a polled operation finishes with an error, if the
      resource is still not ready after max_retries polls, or (propagated
      from restAPIHelper) if a REST call fails.
  """

  url = f"https://managedkafka.googleapis.com/v1/{operation}"
  attempt = 0

  while True:
    # Fetch the current status of the polled resource
    json_result = restAPIHelper(url, "GET", None)
    print(f"waitForApacheKafkaForBigQueryCluster (GET) json_result: {json_result}")

    # Cluster resource: ready once it reports the ACTIVE state
    if json_result.get("state") == "ACTIVE":
      print("Apache Kafka for BigQuery Cluster created")
      return None

    # Long-running operation: finished once "done" is true; "error" is set on failure
    if json_result.get("done"):
      if "error" in json_result:
        raise RuntimeError(f"Apache Kafka for BigQuery Cluster creation failed: {json_result['error']}")
      print("Apache Kafka for BigQuery Cluster created")
      return None

    # Not ready yet: give up after max_retries polls, otherwise wait and retry
    attempt += 1
    if attempt > max_retries:
      raise RuntimeError("Apache Kafka for BigQuery Cluster not created")
    time.sleep(poll_interval_seconds)


### GCS

In [None]:
# Create the bucket gs://{DATA_BUCKET_NAME_DW} in project PROJECT_ID.
# NOTE(review): no --location flag is passed, so the bucket is not pinned to
# REGION — confirm that the default location is what is wanted here.
!(gcloud storage buckets create gs://{DATA_BUCKET_NAME_DW} \
    --project="{PROJECT_ID}")

In [None]:
# Copy the demo data (CSV files) from the data-analytics-golden-demo bucket
# into the warehouse/ folder of this project's own bucket.
source_path = "gs://data-analytics-golden-demo/warehouse/*"
dest_path = f"gs://{DATA_BUCKET_NAME_DW}/warehouse/"
print(f"Copying data from {source_path} to {dest_path}")
print("This may take a few minutes...")
# gsutil flags: -m copies in parallel, -q suppresses progress output,
# -r copies recursively.
!gsutil -m -q cp -r {source_path} {dest_path}
print("Copy [data] is complete")

# Creating Objects

## Apache Kafka

In [None]:
# To see your clusters: https://console.cloud.google.com/managedkafka/clusterList

# NOTE: If you get a subnet error, please re-run this cell

# createApacheKafkaForBigQueryCluster() returns the cluster's resource name
# (projects/.../locations/.../clusters/<cluster id>), whether the cluster
# already existed or was just requested.
opertion = createApacheKafkaForBigQueryCluster()

# Poll that resource until the cluster reports the ACTIVE state.
if opertion is not None:
  waitForApacheKafkaForBigQueryCluster(opertion)