in functions/orchestration-helpers/intermediate/main.py [0:0]
def log_step_bigquery(request_json, status):
"""
Logs a new entry in workflows bigquery table on finished or started step, ether it failed of succeed
Args:
status: status of the execution
request_json: event object containing info to log
"""
target_function_url = request_json['function_url_to_call']
current_datetime = datetime.now().isoformat()
status_to_error_code = {
'success': '0',
'started': '0',
'failed_start': '1',
'failed': '2'
}
data = {
'workflow_execution_id': request_json['execution_id'],
'workflow_name': request_json['workflow_name'],
'job_name': request_json['job_name'],
'job_status': status,
'timestamp': current_datetime,
'error_code': status_to_error_code.get(status, '2'),
'job_params': str(request_json),
'log_path': get_cloud_logging_url(target_function_url),
'retry_count': 0 # TODO
}
workflows_control_table = bq_client.dataset(WORKFLOW_CONTROL_DATASET_ID).table(WORKFLOW_CONTROL_TABLE_ID)
errors = bq_client.insert_rows_json(workflows_control_table, [data])
if not errors:
print("New row has been added.")
else:
raise Exception("Encountered errors while inserting row: {}".format(errors))