# Ingesting data with BigQuery

This notebook demonstrates how to consume data contained in BigQuery and index it into Elasticsearch. This notebook is based on the article [Ingesting data with BigQuery](https://www.elastic.co/search-labs/blog/ingesting-data-with-big-query).

## Installing dependencies and importing packages

In [None]:
!pip install google-cloud-bigquery elasticsearch==8.16 google-auth

In [None]:
from elasticsearch import Elasticsearch, exceptions
from google.cloud import bigquery
from google.colab import auth
from getpass import getpass

import json

## Declaring variables

The following cell prompts you for your Elasticsearch endpoint and API key, and defines the Google Cloud project and BigQuery dataset to read from.
Here you can learn how to retrieve your Elasticsearch credentials: [Finding Your Cloud ID](https://www.elastic.co/search-labs/tutorials/install-elasticsearch/elastic-cloud#finding-your-cloud-id).

In [None]:
# Prompt for the Elasticsearch connection settings; getpass does not echo the
# typed values, so they stay out of the notebook output.
ELASTICSEARCH_ENDPOINT = getpass("Elasticsearch endpoint: ")
ELASTIC_API_KEY = getpass("Elastic Api Key: ")

# Google Cloud project name used for the BigQuery client.
PROJECT_ID = "elasticsearch-bigquery"
# Fully qualified dataset id: <your-project-name>.<your-dataset-name>
DATASET_ID = PROJECT_ID + ".server_logs"

## Instantiating an Elasticsearch client

In [None]:
# Authenticate the current Colab user (google.colab.auth).
# NOTE(review): presumably this supplies the Google credentials the BigQuery
# client picks up later — confirm.
auth.authenticate_user()

# Elasticsearch client built from the endpoint and API key collected earlier.
es_client = Elasticsearch(ELASTICSEARCH_ENDPOINT, api_key=ELASTIC_API_KEY)

## Creating mappings

In [None]:
# Explicit field mappings for the log documents stored in "bigquery-logs".
index_mappings = {
    "properties": {
        "status_code_description": {"type": "match_only_text"},
        "status_code": {"type": "keyword"},
        "@timestamp": {"type": "date"},
        "ip_address": {"type": "ip"},
        "http_method": {"type": "keyword"},
        "endpoint": {"type": "keyword"},
        "response_time": {"type": "integer"},
    }
}

try:
    es_client.indices.create(
        index="bigquery-logs",
        body={"mappings": index_mappings},
    )
except exceptions.RequestError as e:
    # Only an "index already exists" error is tolerated, so the cell can be
    # re-run; every other request error is propagated.
    if e.error != "resource_already_exists_exception":
        raise e
    print("Index already exists.")

## Getting data from BigQuery

In [None]:
# BigQuery client bound to the configured project.
client = bigquery.Client(project=PROJECT_ID)

# Rows of every table in the dataset, keyed by table name.
data = {}

for table in client.list_tables(DATASET_ID):
    table_name = table.table_id
    # Fully qualified table id: DATASET_ID already carries the project, so
    # this is <project>.<dataset>.<table>
    table_id = f"{DATASET_ID}.{table_name}"

    print(f"Processing table: {table_name}")

    # Select every column of every row in the table
    query = f"""
        SELECT *
        FROM `{table_id}`
    """
    results = client.query(query).result()

    print(f"Results for table: {table_name}:")

    # Start from an empty list so re-running the cell replaces earlier rows
    table_rows = data[table_name] = []
    for row in results:
        # Store each row as a plain dict and echo it for inspection
        table_rows.append(dict(row))
        print(row)

In [None]:
# Rows collected for the table named "logs" (list of dicts, one per row)
logs_data = data["logs"]


# Print the rows so they can be inspected before indexing
print(logs_data)

## Indexing to Elasticsearch

In [None]:
# Flat list of alternating action-metadata / document entries, which is the
# layout handed to the bulk API afterwards.
bulk_data = []

# Fields copied unchanged from each row, placed after "@timestamp".
passthrough_fields = (
    "http_method",
    "endpoint",
    "response_time",
    "status_code_description",
)

for log_entry in logs_data:
    # ISO 8601 string form of the row's "_timestamp" value
    timestamp_iso8601 = log_entry["_timestamp"].isoformat()

    # The id is derived from IP + timestamp, so indexing the same row again
    # targets the same document id.
    doc_id = f"{log_entry['ip_address']}-{timestamp_iso8601}"

    document = {
        "ip_address": log_entry["ip_address"],
        "status_code": log_entry["status_code"],
        "@timestamp": timestamp_iso8601,
    }
    document.update((field, log_entry[field]) for field in passthrough_fields)

    # Action line first, then the document it applies to
    bulk_data.extend(
        (
            {"index": {"_index": "bigquery-logs", "_id": doc_id}},
            document,
        )
    )

print(bulk_data)

In [None]:
try:
    # Send all actions and documents in a single bulk request
    response = es_client.bulk(body=bulk_data)

    if not response["errors"]:
        print("Documents indexed successfully.")
    else:
        # The request went through but at least one item was rejected;
        # report the error of each failed item.
        print("Errors while indexing:")
        item_errors = (
            item["index"]["error"]
            for item in response["items"]
            if "error" in item["index"]
        )
        for item_error in item_errors:
            print(item_error)
except Exception as e:
    # Notebook boundary: report the failure instead of aborting the cell
    print(f"Bulk indexing failed: {e}")

## Searching data

In [None]:
# Match "error" in the status description, sort newest first, and aggregate
# the matching documents by IP address (terms aggregation, size 10).
search_body = {
    "query": {"match": {"status_code_description": "error"}},
    "sort": [{"@timestamp": {"order": "desc"}}],
    "aggs": {"by_ip": {"terms": {"field": "ip_address", "size": 10}}},
}
response = es_client.search(index="bigquery-logs", body=search_body)

# Pretty-print the response body as indented JSON
formatted_json = json.dumps(response.body, indent=4)
print(formatted_json)

## Deleting

Finally, we can delete the index created in this notebook so that it no longer consumes cluster resources.

In [None]:
# Cleanup - delete the index created by this notebook.
# Ignored status codes are a transport option and are set through
# `Elasticsearch.options()`; passing `ignore=` directly to an API method is
# deprecated in the 8.x client. 400 and 404 are ignored so the cell can be
# re-run when the index no longer exists.
es_client.options(ignore_status=[400, 404]).indices.delete(index="bigquery-logs")