00-Setup.ipynb (361 lines of code) (raw):

{ "nbformat": 4, "nbformat_minor": 0, "metadata": { "colab": { "name": "00-Setup", "cell_execution_strategy": "setup", "provenance": [], "toc_visible": true, "collapsed_sections": [ "sOLrHLG1z9_b", "uSi6vUFs3xdy", "MNCOblK30G8S", "KMvz64ON5_FH", "zn2FOUW59EAw" ] }, "kernelspec": { "name": "python3", "display_name": "Python 3" }, "language_info": { "name": "python" } }, "cells": [ { "cell_type": "markdown", "source": [ "# Setting Up Environment for Streaming and Cloudera Migration Demo" ], "metadata": { "id": "Y0dXFpSp4sgl" } }, { "cell_type": "markdown", "source": [ "## Imports & APIs Enablement" ], "metadata": { "id": "sOLrHLG1z9_b" } }, { "cell_type": "code", "source": [ "import os\n", "import time" ], "metadata": { "id": "jMVddC9P0azs" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "#enable kafka in the project\n", "!(gcloud services enable managedkafka.googleapis.com --project \"${GOOGLE_CLOUD_PROJECT}\")" ], "metadata": { "id": "2pTrO3XSnzNf" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "## Variables" ], "metadata": { "id": "uSi6vUFs3xdy" } }, { "cell_type": "code", "source": [ "PROJECT_ID = os.environ[\"GOOGLE_CLOUD_PROJECT\"]\n", "REGION = \"us-central1\"\n", "kafka_cluster_name = \"ti-kafka-cluster-01-eri\"\n", "network=\"colab-network\"\n", "subnet = \"colab-subnetwork\"" ], "metadata": { "id": "y0CpFI1437Hr" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "DATA_BUCKET_NAME_DW = f\"dw-{PROJECT_ID}\"" ], "metadata": { "id": "xHjBxXdR8ppN" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "## Functions" ], "metadata": { "id": "MNCOblK30G8S" } }, { "cell_type": "markdown", "source": [ "### Credentials Managament" ], "metadata": { "id": "tUYwh7cm522O" } }, { "cell_type": "markdown", "source": [ "#### Rest API Helper" ], "metadata": { "id": "KMvz64ON5_FH" } }, { "cell_type": "code", "source": [ "def restAPIHelper(url: str, http_verb: str, request_body: str) -> str:\n", " \"\"\"Calls the Google Cloud REST API passing in the current users credentials\"\"\"\n", "\n", " import requests\n", " import google.auth\n", " import json\n", "\n", " # Get an access token based upon the current user\n", " creds, project = google.auth.default()\n", " auth_req = google.auth.transport.requests.Request()\n", " creds.refresh(auth_req)\n", " access_token=creds.token\n", "\n", " headers = {\n", " \"Content-Type\" : \"application/json\",\n", " \"Authorization\" : \"Bearer \" + access_token\n", " }\n", "\n", " if http_verb == \"GET\":\n", " response = requests.get(url, headers=headers)\n", " elif http_verb == \"POST\":\n", " response = requests.post(url, json=request_body, headers=headers)\n", " elif http_verb == \"PUT\":\n", " response = requests.put(url, json=request_body, headers=headers)\n", " elif http_verb == \"PATCH\":\n", " response = requests.patch(url, json=request_body, headers=headers)\n", " elif http_verb == \"DELETE\":\n", " response = requests.delete(url, headers=headers)\n", " else:\n", " raise RuntimeError(f\"Unknown HTTP verb: {http_verb}\")\n", "\n", " if response.status_code == 200:\n", " return json.loads(response.content)\n", " #image_data = json.loads(response.content)[\"predictions\"][0][\"bytesBase64Encoded\"]\n", " else:\n", " error = f\"Error restAPIHelper -> ' Status: '{response.status_code}' Text: '{response.text}'\"\n", " raise RuntimeError(error)" ], "metadata": { "id": "zq11xygv6BM2" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "### Apache Kafka" ], "metadata": { "id": "bTdOcZd-4a50" } }, { "cell_type": "code", "source": [ "def createApacheKafkaForBigQueryCluster():\n", " \"\"\"Creates a Apache Kafka For BigQuery Cluster.\"\"\"\n", "\n", " # First find the cluster if it exists\n", " # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters/list\n", "\n", " url = f\"https://managedkafka.googleapis.com/v1/projects/{PROJECT_ID}/locations/{REGION}/clusters\"\n", "\n", " # Gather existing clusters\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " print(f\"createApacheKafkaForBigQueryCluster (GET) json_result: {json_result}\")\n", "\n", " # Test to see if cluster exists, if so return\n", " if \"clusters\" in json_result:\n", " for item in json_result[\"clusters\"]:\n", " print(f\"Apache Kafka for BigQuery: {item['name']}\")\n", " # \"projects/${project_id}/locations/us-central1/clusters/kafka-cluster\"\n", " if item[\"name\"] == f\"projects/{PROJECT_ID}/locations/{REGION}/clusters/{kafka_cluster_name}\":\n", " print(\"Apache Kafka for BigQuery already exists\")\n", " return f\"projects/{PROJECT_ID}/locations/{REGION}/clusters/{kafka_cluster_name}\"\n", "\n", " # Create Apache Kafka For BigQuery Cluster\n", " # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters/create\n", " print(\"Creating Apache Kafka For BigQuery Cluster\")\n", "\n", " url = f\"https://managedkafka.googleapis.com/v1/projects/{PROJECT_ID}/locations/{REGION}/clusters?clusterId={kafka_cluster_name}\"\n", "\n", " # Larger Apache Kafka Cluster\n", " # vcpuCount: 32 -> You can probably use less CPUs since they are mainly ideal\n", " # memoryBytes: 34359738368 -> RAM was at 50% when doing 11,000 customers\n", "\n", " request_body = {\n", " \"capacityConfig\": {\n", " \"vcpuCount\": \"3\",\n", " \"memoryBytes\": \"3221225472\"\n", " },\n", " \"gcpConfig\": {\n", " \"accessConfig\": {\n", " \"networkConfigs\": {\n", " \"subnet\": f\"projects/{PROJECT_ID}/regions/{REGION}/subnetworks/{subnet}\"\n", " }\n", " }\n", " }\n", " }\n", "\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", "\n", " name = json_result[\"name\"]\n", " done = json_result[\"done\"]\n", " print(\"Apache Kafka for BigQuery created: \", name)\n", " return f\"projects/{PROJECT_ID}/locations/{REGION}/clusters/{kafka_cluster_name}\"" ], "metadata": { "id": "UnRhyYyD3IWJ" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "def waitForApacheKafkaForBigQueryCluster(operation):\n", " \"\"\"\n", " Waits for an Apache Kafka For BigQuery Cluster to be Created.\n", "\n", " opertion:\n", " projects/${project_id}/locations/us-central1/operations/operation-1723064212031-61f1e264889a9-9e3a863b-90613855\n", " \"\"\"\n", "\n", " url = f\"https://managedkafka.googleapis.com/v1/{operation}\"\n", " max_retries = 100\n", " attempt = 0\n", "\n", " while True:\n", " # Gather existing connections\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " print(f\"waitForApacheKafkaForBigQueryCluster (GET) json_result: {json_result}\")\n", "\n", " # Test to see if connection exists, if so return\n", " if \"state\" in json_result:\n", " if json_result[\"state\"] == \"ACTIVE\":\n", " print(\"Apache Kafka for BigQuery Cluster created\")\n", " return None\n", "\n", " # Wait for 10 seconds\n", " attempt += 1\n", " if attempt > max_retries:\n", " raise RuntimeError(\"Apache Kafka for BigQuery Cluster not created\")\n", " time.sleep(30)\n" ], "metadata": { "id": "DRi8OBFY_URC" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "### GCS" ], "metadata": { "id": "zn2FOUW59EAw" } }, { "cell_type": "code", "source": [ "!(gcloud storage buckets create gs://{DATA_BUCKET_NAME_DW} \\\n", " --project=\"{PROJECT_ID}\")" ], "metadata": { "id": "3q7Kzzu89HKE" }, "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "# Copy our data (CSV files). We want the files in our local bucket with local location.\n", "source_path = \"gs://data-analytics-golden-demo/warehouse/*\"\n", "dest_path = f\"gs://{DATA_BUCKET_NAME_DW}/warehouse/\"\n", "print(f\"Copying data from {source_path} to {dest_path}\")\n", "print(\"This may take a few minutes...\")\n", "!gsutil -m -q cp -r {source_path} {dest_path}\n", "print(\"Copy [data] is complete\")" ], "metadata": { "id": "zgBmZgGP9tLd" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "# Creating Objects" ], "metadata": { "id": "YPQBnPEn45XQ" } }, { "cell_type": "markdown", "source": [ "## Apache Kafka" ], "metadata": { "id": "N7FwaSrm5HVZ" } }, { "cell_type": "code", "source": [ "# To see your clusters: https://console.cloud.google.com/managedkafka/clusterList\n", "\n", "# NOTE: If you get a subnet error, please re-run this cell\n", "\n", "opertion = createApacheKafkaForBigQueryCluster()\n", "\n", "if opertion is not None:\n", " waitForApacheKafkaForBigQueryCluster(opertion)" ], "metadata": { "id": "ZB9C89uS5Kgv" }, "execution_count": null, "outputs": [] } ] }