in src/translation/dags/batch_sql_translation.py [0:0]
def _save_stats(ti):
translation_config = ti.xcom_pull(
key="config", task_ids="create_translation_workflow"
)
all_translated_files = ti.xcom_pull(
key="files", task_ids="get_all_translated_files"
)
successfully_translated_files = ti.xcom_pull(
key="files", task_ids="get_successfully_translated_files"
)
failed_files = ti.xcom_pull(key="files", task_ids="get_failed_files_from_csv")
translation_errors = ti.xcom_pull(
key="errors", task_ids="get_failed_files_from_csv"
)
workflow_info = ti.xcom_pull(key="workflow_info", task_ids="poll_workflow_state")
logging.info(f"workflow info: {workflow_info}")
stats = [
stats_utils.new_record(file.split("/")[-1], None, "OK")
for file in successfully_translated_files
]
stats += translation_errors
for record in stats:
record["unique_id"] = translation_config["unique_id"]
record["create_time"] = workflow_info["createTime"]
record["name"] = workflow_info["name"]
translation_run = (
translation_config[TRANSLATION_CONFIG_KEY]
if TRANSLATION_CONFIG_KEY in translation_config
else "ddl"
)
record["type"] = translation_run.casefold()
bq_client = bigquery.Client(
client_info=ClientInfo(user_agent=custom_user_agent.USER_AGENT)
)
if stats == []:
logging.info("Translation stats are empty. Please check translation csv file. ")
else:
bq_client.insert_rows_json(TRANSLATION_STATS_TABLE_ID, stats)
translation_summary_csv_path = f"https://console.cloud.google.com/storage/browser/_details/{translation_config['migrationTask']['translationConfigDetails']['gcsTargetPath'].split('//')[-1]}/batch_translation_report.csv;tab=live_object?{PROJECT_ID}"
rows_json_list = [
{
"unique_id": translation_config["unique_id"],
"total_files": len(all_translated_files),
"successful_files": len(successfully_translated_files),
"failed_files": len(failed_files),
"input_files_path": translation_config["migrationTask"][
"translationConfigDetails"
]["gcsSourcePath"],
"output_files_path": translation_config["migrationTask"][
"translationConfigDetails"
]["gcsTargetPath"],
"translation_summary_csv_path": translation_summary_csv_path,
}
]
if rows_json_list == []:
logging.info("Translation Aggregate Stats are empty. ")
else:
bq_client.insert_rows_json(TRANSLATION_AGG_RESULTS_TABLE_ID, rows_json_list)