app/services/bigquery.py (86 lines of code) (raw):

"""(c) 2025, Elastic Co. Author: Adhish Thite <adhish.thite@elastic.co> """ from typing import List from google.cloud import bigquery import bigframes.pandas as bf from datetime import datetime, timedelta from config.logging_config import setup_logger from config.settings import ( GBQ_PROJECT_ID, GBQ_DATASET, GBQ_TABLE, GBQ_NEWS_TABLE, GBQ_MAX_RESULTS, GBQ_LOCATION, KB_KNOWLEDGE_BASE_VALUES, ) logger = setup_logger(__name__) def connect_to_bigquery() -> bigquery.Client: """ Establish a connection to BigQuery. Returns: bigquery.Client: A BigQuery client object. """ client: bigquery.Client = bigquery.Client() logger.info("šŸ”Œ Connected to BigQuery") return client def query_bigquery() -> bf.DataFrame: """ Query BigQuery to retrieve data. Returns: bf.DataFrame: A BigFrames DataFrame containing the query results. """ logger.info("šŸ” Starting BigQuery query process") client: bigquery.Client = connect_to_bigquery() bf.options.bigquery.client = client bf.options.bigquery.location = GBQ_LOCATION bf.options.bigquery.project = GBQ_PROJECT_ID columns: List[str] = [ "active", "article_id", "article_type", "flagged", "meta", "meta_description", "number", "published", "short_description", "text", "topic", "version_link", "workflow_state", "sys_updated_on", "sys_created_on", "sys_id", "kb_knowledge_base_value", "can_read_user_criteria", ] # Check if required values exist before constructing the query if not all([GBQ_PROJECT_ID, GBQ_DATASET, GBQ_TABLE, GBQ_MAX_RESULTS]): raise ValueError("Missing required BigQuery configuration values") # Get knowledge base values from settings kb_values = [] if KB_KNOWLEDGE_BASE_VALUES: kb_values = [val.strip() for val in KB_KNOWLEDGE_BASE_VALUES.split(',') if val.strip()] # Construct the knowledge base filter query: str = f""" SELECT {', '.join(columns)} FROM `{GBQ_PROJECT_ID}.{GBQ_DATASET}.{GBQ_TABLE}` WHERE workflow_state = 'published' AND ( (kb_knowledge_base_value = 'a7e8a78bff0221009b20ffffffffff17') OR (kb_knowledge_base_value = 'bb0370019f22120047a2d126c42e7073' AND (can_read_user_criteria IS NULL OR can_read_user_criteria = '')) ) LIMIT {GBQ_MAX_RESULTS} """ logger.info(f"\nšŸ“ Query: {query}\n") df: bf.DataFrame = bf.read_gbq(query) logger.info(f"✨ Query executed, retrieved {len(df)} rows") return df def query_news_articles() -> bf.DataFrame: """ Query BigQuery to retrieve news articles from the last 6 months. Returns: bf.DataFrame: A BigFrames DataFrame containing the news articles. """ logger.info("šŸ” Starting BigQuery news articles query process") client: bigquery.Client = connect_to_bigquery() bf.options.bigquery.client = client bf.options.bigquery.location = GBQ_LOCATION bf.options.bigquery.project = GBQ_PROJECT_ID columns: List[str] = [ "sys_id", "sys_updated_on", "rich_content_html", "sys_created_on", "content_template_value", "content_template_link", "subheadline", "news_end_date", "news_start_date", "headline", "_fivetran_synced", "_fivetran_deleted", "thumbnail", "rich_content_components", "rich_content_css" ] # Calculate date 6 months ago six_months_ago = (datetime.now() - timedelta(days=180)).strftime('%Y-%m-%d') # Check if required values exist before constructing the query if not all([GBQ_PROJECT_ID, GBQ_DATASET, GBQ_NEWS_TABLE]): raise ValueError("Missing required BigQuery configuration values") query: str = f""" SELECT {', '.join(columns)} FROM `{GBQ_PROJECT_ID}.{GBQ_DATASET}.{GBQ_NEWS_TABLE}` WHERE news_start_date >= '{six_months_ago}' AND (_fivetran_deleted IS NULL OR _fivetran_deleted = FALSE) LIMIT {GBQ_MAX_RESULTS} """ logger.info(f"\nšŸ“ News Query: {query}\n") df: bf.DataFrame = bf.read_gbq(query) logger.info(f"✨ News query executed, retrieved {len(df)} rows") return df