in app/services/bigquery.py [0:0]
def query_bigquery() -> bf.DataFrame:
    """
    Query BigQuery for published knowledge-base articles.

    Validates the BigQuery settings, configures the BigFrames session
    options, and runs a SELECT limited to ``GBQ_MAX_RESULTS`` rows against
    ``GBQ_PROJECT_ID.GBQ_DATASET.GBQ_TABLE``.

    Only rows with ``workflow_state = 'published'`` are returned, restricted
    to two hard-coded knowledge bases: every row of the first one, and rows
    of the second one whose ``can_read_user_criteria`` is NULL or empty.

    Returns:
        bf.DataFrame: A BigFrames DataFrame containing the query results.

    Raises:
        ValueError: If a required configuration value is missing, or if
            ``GBQ_MAX_RESULTS`` is not a positive integer.
    """
    logger.info("š Starting BigQuery query process")

    # Fail fast: validate configuration before opening a connection or
    # touching the global BigFrames options.
    if not all([GBQ_PROJECT_ID, GBQ_DATASET, GBQ_TABLE, GBQ_MAX_RESULTS]):
        raise ValueError("Missing required BigQuery configuration values")

    # The limit is interpolated into the SQL text, so make sure it really is
    # a positive integer rather than an arbitrary string.
    try:
        max_results = int(GBQ_MAX_RESULTS)
    except (TypeError, ValueError) as err:
        raise ValueError("GBQ_MAX_RESULTS must be a positive integer") from err
    if max_results <= 0:
        raise ValueError("GBQ_MAX_RESULTS must be a positive integer")

    client: bigquery.Client = connect_to_bigquery()
    # NOTE(review): confirm that BigFrames actually honours a `client`
    # option; `location` and `project` are the settings it documents.
    bf.options.bigquery.client = client
    bf.options.bigquery.location = GBQ_LOCATION
    bf.options.bigquery.project = GBQ_PROJECT_ID

    columns: List[str] = [
        "active",
        "article_id",
        "article_type",
        "flagged",
        "meta",
        "meta_description",
        "number",
        "published",
        "short_description",
        "text",
        "topic",
        "version_link",
        "workflow_state",
        "sys_updated_on",
        "sys_created_on",
        "sys_id",
        "kb_knowledge_base_value",
        "can_read_user_criteria",
    ]

    # NOTE(review): the knowledge-base filter below is hard-coded; the
    # KB_KNOWLEDGE_BASE_VALUES setting is not applied to the query.
    query: str = f"""
    SELECT {', '.join(columns)}
    FROM `{GBQ_PROJECT_ID}.{GBQ_DATASET}.{GBQ_TABLE}`
    WHERE workflow_state = 'published'
    AND (
        (kb_knowledge_base_value = 'a7e8a78bff0221009b20ffffffffff17')
        OR
        (kb_knowledge_base_value = 'bb0370019f22120047a2d126c42e7073' AND (can_read_user_criteria IS NULL OR can_read_user_criteria = ''))
    )
    LIMIT {max_results}
    """
    logger.info(f"\nš Query: {query}\n")

    df: bf.DataFrame = bf.read_gbq(query)
    # NOTE(review): len() on a BigFrames DataFrame presumably triggers its own
    # row-count job — confirm this extra cost is acceptable for a log line.
    logger.info(f"⨠Query executed, retrieved {len(df)} rows")
    return df