# app/main.py

"""(c) 2025, Elastic Co.
Author: Adhish Thite <adhish.thite@elastic.co>

Script Flow:
------------
1. Set up the project root and update the Python path.
2. Import necessary modules and configure logging.
3. Initialize the output directory.
4. Query KB and News articles from BigQuery.
5. Obtain an Elasticsearch client.
6. Create the raw data index and store KB and News articles.
7. Delete and recreate the vector index for embedding storage.
8. Process KB documents in batches:
   - Embed each document batch.
   - Bulk insert embedded documents into Elasticsearch.
   - Collect any errors and enforce a delay to avoid rate limits.
9. Process News documents similarly in batches.
10. Log the total processed chunks and any errors.
11. Conclude the application run.
"""

import sys
from pathlib import Path

# Add the project root directory to the Python path to allow local imports
project_root = str(Path(__file__).parent.parent)
sys.path.insert(0, project_root)
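# NOTE: the path tweak above must run before the `app.*` imports below so they
# resolve when this script is executed directly (e.g. `python app/main.py`).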

import time
from typing import List, Dict, Any

from tqdm import tqdm  # Progress bar for monitoring batch processing
from elasticsearch.helpers import bulk  # Bulk helper for Elasticsearch insertion

# Import project-specific configurations and utility functions
from app.config.logging_config import setup_logger
from app.config.settings import (
    OUTPUT_DIR,
    ES_INDEX_NAME,
    ES_VECTOR_INDEX_NAME,
    BATCH_SIZE,
)
from app.utils.helpers import init, batch_documents
from app.services.bigquery import query_bigquery, query_news_articles
from app.services.elasticsearch import (
    get_elasticsearch_client,
    create_elastic_index,
    insert_dataframe_to_elasticsearch,
    create_vector_index,
)
from app.services.embeddings import process_batch, EMBEDDING_MODEL

# Set up the logger using the project's logging configuration
logger = setup_logger(__name__)

if __name__ == "__main__":
    # Log the start of the application
    logger.info("🚀 Starting main application")

    # Initialize the output directory (e.g., create necessary folders)
    init(OUTPUT_DIR)

    # -------------------------------
    # Querying Data from BigQuery
    # -------------------------------
    # Process Knowledge Base (KB) Articles
    logger.info("📚 Processing KB Articles...")
    kb_results = query_bigquery()

    # Process News Articles
    logger.info("📰 Processing News Articles...")
    news_results = query_news_articles()
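    # Both query helpers return pandas DataFrames (hence the .to_dict("records")
    # conversions further below before embedding).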

    # -------------------------------
    # Elasticsearch Setup
    # -------------------------------
    # Get the Elasticsearch client instance
    es_client = get_elasticsearch_client()

    # Create a regular index to store raw data and insert queried documents
    create_elastic_index(es_client, ES_INDEX_NAME)
    insert_dataframe_to_elasticsearch(es_client, ES_INDEX_NAME, kb_results, doc_type="kb")
    insert_dataframe_to_elasticsearch(es_client, ES_INDEX_NAME, news_results, doc_type="news")

    logger.info("🔄 Processing and embedding documents...")

    # -------------------------------
    # Vector Index Management
    # -------------------------------
    # Delete the existing vector index if it exists to ensure a clean slate
    if es_client.indices.exists(index=ES_VECTOR_INDEX_NAME):
        es_client.indices.delete(index=ES_VECTOR_INDEX_NAME)
        logger.info(f"🗑️ Deleted existing vector index: {ES_VECTOR_INDEX_NAME}")

    # Create a new vector index to store embedded documents
    create_vector_index(es_client)
    logger.info(f"✨ Created new vector index: {ES_VECTOR_INDEX_NAME}")

    # Initialize error collection and chunk counter
    all_error_chunks: List[Dict[str, Any]] = []
    total_chunks = 0

    # -------------------------------
    # Processing KB Documents
    # -------------------------------
    # Convert the KB articles from DataFrame to list of dictionary records
    kb_documents = kb_results.to_dict("records")
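    # Ceiling division: the number of full batches plus one final partial batch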
    total_kb_batches = (len(kb_documents) + BATCH_SIZE - 1) // BATCH_SIZE
    logger.info(f"📚 Processing {len(kb_documents)} KB articles...")

    # Process documents in batches using a progress bar for visibility
    for batch in tqdm(
        list(batch_documents(kb_documents, BATCH_SIZE)),
        total=total_kb_batches,
        unit="batch"
    ):
        # Process each batch: generate embeddings and capture errors
        batch_embedded_docs, error_chunks, chunks_count = process_batch(
            batch, EMBEDDING_MODEL, es_client, source_type="kb"
        )
        total_chunks += chunks_count

        # If there are embedded documents, bulk insert them into Elasticsearch
        if batch_embedded_docs:
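            # raise_on_error=False makes helpers.bulk collect failed actions and
            # return them as a list instead of raising on the first error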
            success, failed = bulk(
                es_client,
                batch_embedded_docs,
                chunk_size=500,
                request_timeout=60,
                raise_on_error=False,
            )
            logger.info(f"📦 KB Bulk insert: {success} succeeded, {len(failed)} failed")

        # Collect any error chunks from processing
        if error_chunks:
            all_error_chunks.extend(error_chunks)

        # Pause for 5 seconds to avoid rate limiting and potential overload
        time.sleep(5)
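
    # The News pipeline below mirrors the KB loop above; only the source
    # documents and the source_type label differ.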

    # -------------------------------
    # Processing News Documents
    # -------------------------------
    # Convert the News articles from DataFrame to list of dictionary records
    news_documents = news_results.to_dict("records")
    total_news_batches = (len(news_documents) + BATCH_SIZE - 1) // BATCH_SIZE
    logger.info(f"📰 Processing {len(news_documents)} News articles...")

    # Process News documents in batches
    for batch in tqdm(
        list(batch_documents(news_documents, BATCH_SIZE)),
        total=total_news_batches,
        unit="batch"
    ):
        # Process each batch: generate embeddings and capture errors
        batch_embedded_docs, error_chunks, chunks_count = process_batch(
            batch, EMBEDDING_MODEL, es_client, source_type="news"
        )
        total_chunks += chunks_count

        # Bulk insert embedded documents if available
        if batch_embedded_docs:
            success, failed = bulk(
                es_client,
                batch_embedded_docs,
                chunk_size=500,
                request_timeout=60,
                raise_on_error=False,
            )
            logger.info(f"📦 News Bulk insert: {success} succeeded, {len(failed)} failed")

        # Collect any error chunks from processing
        if error_chunks:
            all_error_chunks.extend(error_chunks)

        # Pause for 5 seconds to avoid rate limits and overload
        time.sleep(5)

    # -------------------------------
    # Final Logging and Completion
    # -------------------------------
    # Log the total number of document chunks processed and any errors encountered
    logger.info(f"📊 Total chunks processed: {total_chunks}")
    logger.info(f"⚠️ Total errors: {len(all_error_chunks)}")

    # Indicate that the application has completed successfully
    logger.info("✅ Application completed successfully")