def execute()

in api-connector/src/handlers/bigquery.py [0:0]


    def execute(request: Any) -> Tuple[Any, int]:
        logger.debug("BigQuery Routine request received.")

        calls = Utils.get_property(request, "calls", required=True)
        replies = []

        for bq_args in calls:
            logger.debug(bq_args)

            # The following arguments are received from the BigQuery Routine, in order:
            expected_args = {
                "workflow_id": None,
                "request_config": None,
                "auth": None,
                "headers": None,
                "query_string": None,
                "body": None,
                "result_table": None,
                "queue_name": None,
            }

            # This is what the CLOUD_TASK routine expects
            payload = {
                "workflow_id": None,
                "request_config": None,
                "headers": None,
                "query_string": None,
                "auth": None,
                "body": None,
                "result_table": None,
                "source": "CLOUD_TASK",
            }

            try:
                for i, arg in enumerate(expected_args.keys()):
                    expected_args[arg] = bq_args[i]
                    if arg in payload.keys():
                        try:
                            payload[arg] = json.loads(bq_args[i])
                        except:
                            payload[arg] = bq_args[i]
            except KeyError as ke:
                log_info = {
                    "error": f"Unable to parse BigQuery Routine arguments. Are there missing parameters? {ke}"
                }
                logger.error(log_info)
                replies.append(log_info)
                continue

            # Fix query strings
            payload["query_string"] = urlencode(payload["query_string"])  # type: ignore

            # Get log table
            log_table = expected_args["result_table"].replace("tbl_result", "tbl_process_log")  # type: ignore

            try:
                taskClient = tasks_v2.CloudTasksClient()

                function_uri = f"https://{config.REGION}-{config.PROJECT_ID}.cloudfunctions.net/{config.FUNCTION_NAME}"

                task_descriptor = tasks_v2.Task(
                    http_request=tasks_v2.HttpRequest(
                        http_method=tasks_v2.HttpMethod.POST,
                        url=function_uri,
                        headers={"Content-type": "application/json"},
                        body=json.dumps(payload).encode() if payload else None,
                    )
                )

                logger.debug(
                    {"parent": expected_args["queue_name"], "task": task_descriptor}
                )

                task = taskClient.create_task(
                    request={
                        "parent": expected_args["queue_name"],
                        "task": task_descriptor,
                    }
                )
                logger.info(f"Created task: {task.name}")

                log_info = {"response": "Request added to the queue."}
                Utils.save_bigquery(
                    log_table,
                    {
                        "query_string": expected_args["query_string"],
                        "headers": expected_args["headers"],
                        "body": expected_args["body"],
                        "result": json.dumps(log_info),
                        "exec_time": f"{datetime.now().isoformat()}",
                    },
                )
                replies.append(log_info)

            except Exception as e:
                logger.error(e)
                log_info = {"error": f"Error adding request to the queue: {e}"}

                Utils.save_bigquery(
                    log_table,
                    {
                        "query_string": expected_args["query_string"],
                        "headers": expected_args["headers"],
                        "body": expected_args["body"],
                        "result": json.dumps(log_info),
                        "exec_time": f"{datetime.now().isoformat()}",
                    },
                )

                replies.append(log_info)
        return json.dumps({"replies": replies}), 200