00-Setup.ipynb (361 lines of code) (raw):
{
"nbformat": 4,
"nbformat_minor": 0,
"metadata": {
"colab": {
"name": "00-Setup",
"cell_execution_strategy": "setup",
"provenance": [],
"toc_visible": true,
"collapsed_sections": [
"sOLrHLG1z9_b",
"uSi6vUFs3xdy",
"MNCOblK30G8S",
"KMvz64ON5_FH",
"zn2FOUW59EAw"
]
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3"
},
"language_info": {
"name": "python"
}
},
"cells": [
{
"cell_type": "markdown",
"source": [
"# Setting Up Environment for Streaming and Cloudera Migration Demo"
],
"metadata": {
"id": "Y0dXFpSp4sgl"
}
},
{
"cell_type": "markdown",
"source": [
"## Imports & APIs Enablement"
],
"metadata": {
"id": "sOLrHLG1z9_b"
}
},
{
"cell_type": "code",
"source": [
"import os\n",
"import time"
],
"metadata": {
"id": "jMVddC9P0azs"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"#enable kafka in the project\n",
"!(gcloud services enable managedkafka.googleapis.com --project \"${GOOGLE_CLOUD_PROJECT}\")"
],
"metadata": {
"id": "2pTrO3XSnzNf"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"## Variables"
],
"metadata": {
"id": "uSi6vUFs3xdy"
}
},
{
"cell_type": "code",
"source": [
"PROJECT_ID = os.environ[\"GOOGLE_CLOUD_PROJECT\"]\n",
"REGION = \"us-central1\"\n",
"kafka_cluster_name = \"ti-kafka-cluster-01-eri\"\n",
"network=\"colab-network\"\n",
"subnet = \"colab-subnetwork\""
],
"metadata": {
"id": "y0CpFI1437Hr"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"DATA_BUCKET_NAME_DW = f\"dw-{PROJECT_ID}\""
],
"metadata": {
"id": "xHjBxXdR8ppN"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"## Functions"
],
"metadata": {
"id": "MNCOblK30G8S"
}
},
{
"cell_type": "markdown",
"source": [
"### Credentials Managament"
],
"metadata": {
"id": "tUYwh7cm522O"
}
},
{
"cell_type": "markdown",
"source": [
"#### Rest API Helper"
],
"metadata": {
"id": "KMvz64ON5_FH"
}
},
{
"cell_type": "code",
"source": [
"def restAPIHelper(url: str, http_verb: str, request_body: str) -> str:\n",
" \"\"\"Calls the Google Cloud REST API passing in the current users credentials\"\"\"\n",
"\n",
" import requests\n",
" import google.auth\n",
" import json\n",
"\n",
" # Get an access token based upon the current user\n",
" creds, project = google.auth.default()\n",
" auth_req = google.auth.transport.requests.Request()\n",
" creds.refresh(auth_req)\n",
" access_token=creds.token\n",
"\n",
" headers = {\n",
" \"Content-Type\" : \"application/json\",\n",
" \"Authorization\" : \"Bearer \" + access_token\n",
" }\n",
"\n",
" if http_verb == \"GET\":\n",
" response = requests.get(url, headers=headers)\n",
" elif http_verb == \"POST\":\n",
" response = requests.post(url, json=request_body, headers=headers)\n",
" elif http_verb == \"PUT\":\n",
" response = requests.put(url, json=request_body, headers=headers)\n",
" elif http_verb == \"PATCH\":\n",
" response = requests.patch(url, json=request_body, headers=headers)\n",
" elif http_verb == \"DELETE\":\n",
" response = requests.delete(url, headers=headers)\n",
" else:\n",
" raise RuntimeError(f\"Unknown HTTP verb: {http_verb}\")\n",
"\n",
" if response.status_code == 200:\n",
" return json.loads(response.content)\n",
" #image_data = json.loads(response.content)[\"predictions\"][0][\"bytesBase64Encoded\"]\n",
" else:\n",
" error = f\"Error restAPIHelper -> ' Status: '{response.status_code}' Text: '{response.text}'\"\n",
" raise RuntimeError(error)"
],
"metadata": {
"id": "zq11xygv6BM2"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"### Apache Kafka"
],
"metadata": {
"id": "bTdOcZd-4a50"
}
},
{
"cell_type": "code",
"source": [
"def createApacheKafkaForBigQueryCluster():\n",
" \"\"\"Creates a Apache Kafka For BigQuery Cluster.\"\"\"\n",
"\n",
" # First find the cluster if it exists\n",
" # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters/list\n",
"\n",
" url = f\"https://managedkafka.googleapis.com/v1/projects/{PROJECT_ID}/locations/{REGION}/clusters\"\n",
"\n",
" # Gather existing clusters\n",
" json_result = restAPIHelper(url, \"GET\", None)\n",
" print(f\"createApacheKafkaForBigQueryCluster (GET) json_result: {json_result}\")\n",
"\n",
" # Test to see if cluster exists, if so return\n",
" if \"clusters\" in json_result:\n",
" for item in json_result[\"clusters\"]:\n",
" print(f\"Apache Kafka for BigQuery: {item['name']}\")\n",
" # \"projects/${project_id}/locations/us-central1/clusters/kafka-cluster\"\n",
" if item[\"name\"] == f\"projects/{PROJECT_ID}/locations/{REGION}/clusters/{kafka_cluster_name}\":\n",
" print(\"Apache Kafka for BigQuery already exists\")\n",
" return f\"projects/{PROJECT_ID}/locations/{REGION}/clusters/{kafka_cluster_name}\"\n",
"\n",
" # Create Apache Kafka For BigQuery Cluster\n",
" # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters/create\n",
" print(\"Creating Apache Kafka For BigQuery Cluster\")\n",
"\n",
" url = f\"https://managedkafka.googleapis.com/v1/projects/{PROJECT_ID}/locations/{REGION}/clusters?clusterId={kafka_cluster_name}\"\n",
"\n",
" # Larger Apache Kafka Cluster\n",
" # vcpuCount: 32 -> You can probably use less CPUs since they are mainly ideal\n",
" # memoryBytes: 34359738368 -> RAM was at 50% when doing 11,000 customers\n",
"\n",
" request_body = {\n",
" \"capacityConfig\": {\n",
" \"vcpuCount\": \"3\",\n",
" \"memoryBytes\": \"3221225472\"\n",
" },\n",
" \"gcpConfig\": {\n",
" \"accessConfig\": {\n",
" \"networkConfigs\": {\n",
" \"subnet\": f\"projects/{PROJECT_ID}/regions/{REGION}/subnetworks/{subnet}\"\n",
" }\n",
" }\n",
" }\n",
" }\n",
"\n",
" json_result = restAPIHelper(url, \"POST\", request_body)\n",
"\n",
" name = json_result[\"name\"]\n",
" done = json_result[\"done\"]\n",
" print(\"Apache Kafka for BigQuery created: \", name)\n",
" return f\"projects/{PROJECT_ID}/locations/{REGION}/clusters/{kafka_cluster_name}\""
],
"metadata": {
"id": "UnRhyYyD3IWJ"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"def waitForApacheKafkaForBigQueryCluster(operation):\n",
" \"\"\"\n",
" Waits for an Apache Kafka For BigQuery Cluster to be Created.\n",
"\n",
" opertion:\n",
" projects/${project_id}/locations/us-central1/operations/operation-1723064212031-61f1e264889a9-9e3a863b-90613855\n",
" \"\"\"\n",
"\n",
" url = f\"https://managedkafka.googleapis.com/v1/{operation}\"\n",
" max_retries = 100\n",
" attempt = 0\n",
"\n",
" while True:\n",
" # Gather existing connections\n",
" json_result = restAPIHelper(url, \"GET\", None)\n",
" print(f\"waitForApacheKafkaForBigQueryCluster (GET) json_result: {json_result}\")\n",
"\n",
" # Test to see if connection exists, if so return\n",
" if \"state\" in json_result:\n",
" if json_result[\"state\"] == \"ACTIVE\":\n",
" print(\"Apache Kafka for BigQuery Cluster created\")\n",
" return None\n",
"\n",
" # Wait for 10 seconds\n",
" attempt += 1\n",
" if attempt > max_retries:\n",
" raise RuntimeError(\"Apache Kafka for BigQuery Cluster not created\")\n",
" time.sleep(30)\n"
],
"metadata": {
"id": "DRi8OBFY_URC"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"### GCS"
],
"metadata": {
"id": "zn2FOUW59EAw"
}
},
{
"cell_type": "code",
"source": [
"!(gcloud storage buckets create gs://{DATA_BUCKET_NAME_DW} \\\n",
" --project=\"{PROJECT_ID}\")"
],
"metadata": {
"id": "3q7Kzzu89HKE"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"# Copy our data (CSV files). We want the files in our local bucket with local location.\n",
"source_path = \"gs://data-analytics-golden-demo/warehouse/*\"\n",
"dest_path = f\"gs://{DATA_BUCKET_NAME_DW}/warehouse/\"\n",
"print(f\"Copying data from {source_path} to {dest_path}\")\n",
"print(\"This may take a few minutes...\")\n",
"!gsutil -m -q cp -r {source_path} {dest_path}\n",
"print(\"Copy [data] is complete\")"
],
"metadata": {
"id": "zgBmZgGP9tLd"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"# Creating Objects"
],
"metadata": {
"id": "YPQBnPEn45XQ"
}
},
{
"cell_type": "markdown",
"source": [
"## Apache Kafka"
],
"metadata": {
"id": "N7FwaSrm5HVZ"
}
},
{
"cell_type": "code",
"source": [
"# To see your clusters: https://console.cloud.google.com/managedkafka/clusterList\n",
"\n",
"# NOTE: If you get a subnet error, please re-run this cell\n",
"\n",
"opertion = createApacheKafkaForBigQueryCluster()\n",
"\n",
"if opertion is not None:\n",
" waitForApacheKafkaForBigQueryCluster(opertion)"
],
"metadata": {
"id": "ZB9C89uS5Kgv"
},
"execution_count": null,
"outputs": []
}
]
}