def log_step_bigquery()

in functions/orchestration-helpers/intermediate/main.py [0:0]


def log_step_bigquery(request_json, status):
    """
    Insert one audit row into the workflow-control BigQuery table.

    Called when a workflow step starts or finishes, whether it failed or
    succeeded. The row records the execution id, workflow/job names, the
    given status (mapped to an error code), the current timestamp, the raw
    event payload, and a Cloud Logging URL for the target function.

    Args:
        request_json: event object with the info to log; must contain
            'function_url_to_call', 'execution_id', 'workflow_name' and
            'job_name' keys.
        status: status of the execution ('success', 'started',
            'failed_start' or 'failed'; anything else maps to code '2').

    Raises:
        Exception: if BigQuery reports errors for the inserted row.
    """
    # Map each known status to its error code; unknown statuses fall back
    # to '2' (generic failure) below.
    error_codes = {
        'success': '0',
        'started': '0',
        'failed_start': '1',
        'failed': '2',
    }

    callee_url = request_json['function_url_to_call']

    row = {
        'workflow_execution_id': request_json['execution_id'],
        'workflow_name': request_json['workflow_name'],
        'job_name': request_json['job_name'],
        'job_status': status,
        'timestamp': datetime.now().isoformat(),
        'error_code': error_codes.get(status, '2'),
        'job_params': str(request_json),
        'log_path': get_cloud_logging_url(callee_url),
        'retry_count': 0,  # TODO
    }

    table_ref = bq_client.dataset(WORKFLOW_CONTROL_DATASET_ID).table(WORKFLOW_CONTROL_TABLE_ID)
    insert_errors = bq_client.insert_rows_json(table_ref, [row])
    if insert_errors:
        raise Exception("Encountered errors while inserting row: {}".format(insert_errors))
    print("New row has been added.")