supporting-blog-content/ingesting-data-with-big-query/ingesting_data_with_big_query_notebook.ipynb (391 lines of code) (raw):

{ "cells": [ { "cell_type": "markdown", "metadata": { "id": "LlrEjmtJNpuX" }, "source": [ "# Ingesting data with BigQuery\n", "\n", "This notebook demonstrates how to consume data contained in BigQuery and index it into Elasticsearch. This notebook is based on the article [Ingesting data with BigQuery](https://www.elastic.co/search-labs/blog/ingesting-data-with-big-query)." ] }, { "cell_type": "markdown", "metadata": { "id": "GNaAN-GNO5qp" }, "source": [ "## Installing dependencies and importing packages" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "qgclZayCk1Ct", "outputId": "7da1e962-ead6-4016-b2e5-12a019885d86" }, "outputs": [], "source": [ "!pip install google-cloud-bigquery elasticsearch==8.16 google-auth" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "rAesontNXpLu" }, "outputs": [], "source": [ "from elasticsearch import Elasticsearch, exceptions\n", "from google.cloud import bigquery\n", "from google.colab import auth\n", "from getpass import getpass\n", "\n", "import json" ] }, { "cell_type": "markdown", "metadata": { "id": "NwOmnk99Pfh3" }, "source": [ "## Declaring variables" ] }, { "cell_type": "markdown", "metadata": { "id": "-4sV9fiXdBwj" }, "source": [ "This code will create inputs where you can enter your credentials.\n", "Here you can learn how to retrieve your Elasticsearch credentials: [Finding Your Cloud ID](https://www.elastic.co/search-labs/tutorials/install-elasticsearch/elastic-cloud#finding-your-cloud-id)." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "GVKJKfFpPWuj", "outputId": "21c6f4e3-8cb5-4a2c-8efe-0e45c3c6b1c4" }, "outputs": [], "source": [ "ELASTICSEARCH_ENDPOINT = getpass(\"Elasticsearch endpoint: \")\n", "ELASTIC_API_KEY = getpass(\"Elastic Api Key: \")\n", "\n", "\n", "# Google Cloud project name and BigQuery dataset name\n", "PROJECT_ID = \"elasticsearch-bigquery\"\n", "# dataset_id in format <your-project-name>.<your-dataset-name>\n", "DATASET_ID = f\"{PROJECT_ID}.server_logs\"" ] }, { "cell_type": "markdown", "metadata": { "id": "3O2HclcYHEsS" }, "source": [ "## Instance a Elasticsearch client" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "1LWiop8NYiQF" }, "outputs": [], "source": [ "auth.authenticate_user()\n", "\n", "# Elasticsearch client\n", "es_client = Elasticsearch(\n", " ELASTICSEARCH_ENDPOINT,\n", " api_key=ELASTIC_API_KEY,\n", ")" ] }, { "cell_type": "markdown", "metadata": { "id": "9lvPHaXjPlfu" }, "source": [ "## Creating mappings" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "tc88YzAYw31e" }, "outputs": [], "source": [ "try:\n", " es_client.indices.create(\n", " index=\"bigquery-logs\",\n", " body={\n", " \"mappings\": {\n", " \"properties\": {\n", " \"status_code_description\": {\"type\": \"match_only_text\"},\n", " \"status_code\": {\"type\": \"keyword\"},\n", " \"@timestamp\": {\"type\": \"date\"},\n", " \"ip_address\": {\"type\": \"ip\"},\n", " \"http_method\": {\"type\": \"keyword\"},\n", " \"endpoint\": {\"type\": \"keyword\"},\n", " \"response_time\": {\"type\": \"integer\"},\n", " }\n", " }\n", " },\n", " )\n", "except exceptions.RequestError as e:\n", " if e.error == \"resource_already_exists_exception\":\n", " print(\"Index already exists.\")\n", " else:\n", " raise e" ] }, { "cell_type": "markdown", "metadata": { "id": "AOtwAfPXP38Z" }, "source": [ "## Getting data from BigQuery" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "DyKtsjXRXB2S", "outputId": "7734ad7e-cb11-42cc-b97d-e9241cab6b17" }, "outputs": [], "source": [ "client = bigquery.Client(project=PROJECT_ID)\n", "# Getting tables from dataset\n", "tables = client.list_tables(DATASET_ID)\n", "\n", "data = {}\n", "\n", "for table in tables:\n", " # Table id must be in format <dataset_name>.<table_name>\n", " table_id = f\"{DATASET_ID}.{table.table_id}\"\n", "\n", " print(f\"Processing table: {table.table_id}\")\n", "\n", " # Query to retrieve BigQuery tables data\n", " query = f\"\"\"\n", " SELECT *\n", " FROM `{table_id}`\n", " \"\"\"\n", "\n", " query_job = client.query(query)\n", "\n", " results = query_job.result()\n", "\n", " print(f\"Results for table: {table.table_id}:\")\n", "\n", " data[table.table_id] = []\n", "\n", " for row in results:\n", " # Saving data with key=table_id\n", " data[table.table_id].append(dict(row))\n", " print(row)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "UAznwqXStJ39", "outputId": "508a3255-43f7-4828-87d5-997cd04ca427" }, "outputs": [], "source": [ "# variable with data\n", "logs_data = data[\"logs\"]\n", "\n", "\n", "print(logs_data)" ] }, { "cell_type": "markdown", "metadata": { "id": "2s4Tr6wBP773" }, "source": [ "## Indexing to Elasticsearch" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "C4goyH6ZbDJK", "outputId": "cfb4af41-1ef1-40da-a2da-6c0aa83633d3" }, "outputs": [], "source": [ "bulk_data = []\n", "\n", "for log_entry in logs_data:\n", " # Convert timestamp to ISO 8601 string\n", " timestamp_iso8601 = log_entry[\"_timestamp\"].isoformat()\n", "\n", " # Prepare action metadata\n", " action_metadata = {\n", " \"index\": {\n", " \"_index\": \"bigquery-logs\",\n", " \"_id\": f\"{log_entry['ip_address']}-{timestamp_iso8601}\",\n", " }\n", " }\n", "\n", " # Prepare document\n", " document = {\n", " \"ip_address\": log_entry[\"ip_address\"],\n", " \"status_code\": log_entry[\"status_code\"],\n", " \"@timestamp\": timestamp_iso8601,\n", " \"http_method\": log_entry[\"http_method\"],\n", " \"endpoint\": log_entry[\"endpoint\"],\n", " \"response_time\": log_entry[\"response_time\"],\n", " \"status_code_description\": log_entry[\"status_code_description\"],\n", " }\n", "\n", " # Append to bulk data\n", " bulk_data.append(action_metadata)\n", " bulk_data.append(document)\n", "\n", "print(bulk_data)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "WPEwsJrFbDHQ", "outputId": "ab5904f7-21c1-4596-fb06-55569cd9eb17" }, "outputs": [], "source": [ "try:\n", " # Indexing data\n", " response = es_client.bulk(body=bulk_data)\n", "\n", " if response[\"errors\"]:\n", " print(\"Errors while indexing:\")\n", " for item in response[\"items\"]:\n", " if \"error\" in item[\"index\"]:\n", " print(item[\"index\"][\"error\"])\n", " else:\n", " print(\"Documents indexed successfully.\")\n", "except Exception as e:\n", " print(f\"Bulk indexing failed: {e}\")" ] }, { "cell_type": "markdown", "metadata": { "id": "uxix_o8LQDup" }, "source": [ "# Searching data" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "285_MwI8yknk", "outputId": "67d43aca-b05d-43f7-c730-1fa3bc0c4662" }, "outputs": [], "source": [ "response = es_client.search(\n", " index=\"bigquery-logs\",\n", " body={\n", " \"query\": {\"match\": {\"status_code_description\": \"error\"}},\n", " \"sort\": [{\"@timestamp\": {\"order\": \"desc\"}}],\n", " \"aggs\": {\"by_ip\": {\"terms\": {\"field\": \"ip_address\", \"size\": 10}}},\n", " },\n", ")\n", "\n", "# Print results\n", "formatted_json = json.dumps(response.body, indent=4)\n", "print(formatted_json)" ] }, { "cell_type": "markdown", "metadata": { "id": "S6WZMJayyzxh" }, "source": [ "## Deleting\n", "\n", "Finally, we can delete the resources used to prevent them from consuming resources." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "9UcQwa41yy_x", "outputId": "02a33d89-b57c-4273-ba93-5b6441a4f91e" }, "outputs": [], "source": [ "# Cleanup - Delete Index\n", "es_client.indices.delete(index=\"bigquery-logs\", ignore=[400, 404])" ] } ], "metadata": { "colab": { "provenance": [] }, "kernelspec": { "display_name": "Python 3", "name": "python3" }, "language_info": { "name": "python" } }, "nbformat": 4, "nbformat_minor": 0 }