# cost-optimization/hpa-config-recommender/src/hpaconfigrecommender/run_workload_simulation.py

import logging

import pandas as pd
from google.api_core.client_info import ClientInfo
from google.cloud import bigquery

# NOTE: the module paths for these local imports are assumed from the package
# layout; adjust to wherever WorkloadRecommendation and USER_AGENT actually live.
from hpaconfigrecommender.utils.models import WorkloadRecommendation
from hpaconfigrecommender.utils.config import USER_AGENT

logger = logging.getLogger(__name__)


def write_to_bigquery(
    analysis_df: pd.DataFrame,
    rec: WorkloadRecommendation,
    bq_project_id: str,
    bq_dataset_id: str,
    bq_table_id: str,
) -> None:
"""
Writes the given DataFrame to BigQuery, ensuring only the required columns are sent.
Args:
analysis_df (pd.DataFrame): DataFrame containing the time series data.
rec: Object containing workload details and additional attributes.
"""
    if analysis_df.empty:
        logger.info("No data to write to BigQuery.")
        return
    # Define additional workload details to be added to the DataFrame
    required_columns = {
        "project_id": rec.workload_details.project_id,
        "cluster_name": rec.workload_details.cluster_name,
        "location": rec.workload_details.location,
        "namespace": rec.workload_details.namespace,
        "controller_name": rec.workload_details.controller_name,
        "container_name": rec.workload_details.container_name,
        "analysis_period_start": rec.analysis_period_start,
        "analysis_period_end": rec.analysis_period_end,
        "recommended_cpu_request": rec.plan.recommended_cpu_request,
        "recommended_mem_request_and_limits_mi": (
            rec.plan.recommended_mem_request_and_limits_mi
        ),
        "recommended_cpu_limit_or_unbounded": (
            rec.plan.recommended_cpu_limit_or_unbounded
        ),
        "recommended_min_replicas": rec.plan.recommended_min_replicas,
        "recommended_max_replicas": rec.plan.recommended_max_replicas,
        "recommended_hpa_target_cpu": rec.plan.recommended_hpa_target_cpu,
        "max_usage_slope_up_ratio": rec.plan.max_usage_slope_up_ratio,
        "workload_e2e_startup_latency_rows": (
            rec.plan.workload_e2e_startup_latency_rows
        ),
        "method": rec.plan.method,
        "forecast_mem_saving_mi": rec.forecast_mem_saving_mi,
        "forecast_cpu_saving": rec.forecast_cpu_saving,
    }
    # Work on a copy so the added columns do not leak into the caller's
    # DataFrame.
    analysis_df = analysis_df.copy()
    # Add workload details to the DataFrame
    for col, value in required_columns.items():
        analysis_df[col] = value
    # Define only the required columns to be written to BigQuery
    required_bq_columns = [
        "window_begin",
        "num_replicas_at_usage_window",
        "sum_containers_cpu_request",
        "sum_containers_cpu_usage",
        "forecast_sum_cpu_up_and_running",
        "sum_containers_mem_request_mi",
        "sum_containers_mem_usage_mi",
        "forecast_sum_mem_up_and_running",
        "forecast_replicas_up_and_running",
    ] + list(required_columns.keys())  # Append workload details to required columns
    # Filter the DataFrame to contain only the required columns
    analysis_df = analysis_df[required_bq_columns]
    full_table_id = f"{bq_project_id}.{bq_dataset_id}.{bq_table_id}"
    # Configure the BigQuery client with a custom user agent
    client_info = ClientInfo(user_agent=USER_AGENT)
    bq_client = bigquery.Client(project=bq_project_id, client_info=client_info)
    try:
        # Load the DataFrame into BigQuery, appending to any existing rows
        job = bq_client.load_table_from_dataframe(
            analysis_df,
            full_table_id,
            job_config=bigquery.LoadJobConfig(write_disposition="WRITE_APPEND"),
        )
        job.result()  # Wait for the job to complete
        logger.info(
            f"Successfully wrote {len(analysis_df)} records to BigQuery table "
            f"{full_table_id}")
    except Exception as e:
        logger.error(f"Failed to write to BigQuery: {e}")
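

# Minimal usage sketch (hypothetical project/dataset/table names; in the real
# pipeline, `analysis_df` and `rec` come from the workload simulation step):
#
#     write_to_bigquery(
#         analysis_df=analysis_df,
#         rec=rec,
#         bq_project_id="my-gcp-project",
#         bq_dataset_id="hpa_recommender",
#         bq_table_id="workload_simulations",
#     )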