def call_custom_function()

in functions/orchestration-helpers/intermediate/main.py [0:0]


def call_custom_function(request_json, async_job_id):
    """
    Calls the executor function whose URL is given in the request and maps its reply to a step status.

    Args:
        request_json: json input object with parameters. The keys 'workflow_name', 'job_name',
            'function_url_to_call' and 'query_variables' (holding 'start_date' and 'end_date')
            are required; 'workflow_properties' and 'step_properties' are optional.
        async_job_id: if filled, it is sent as 'job_id' so the function asks for the execution status.
            if not (None or empty), the execution is launched for the first time.

    Returns:
        str: one of
            - the decoded reply itself, on a first launch whose reply passes is_valid_step_id
            - "success" when the reply is found in JobStatus.SUCCESS.value
            - "running" when the reply is found in JobStatus.RUNNING.value
            - otherwise a message starting with "Exception calling target function" that
              embeds the original reply (the failure is returned, not raised;
              presumably the caller inspects the text -- TODO confirm)

    Raises:
        Exception: if the HTTP call to the target function fails with urllib.error.HTTPError
    """
    workflow_name = request_json['workflow_name']
    job_name = request_json['job_name']
    # Workflow-level and step-level properties are combined by join_properties.
    workflow_properties = join_properties(
        request_json.get('workflow_properties'),
        request_json.get('step_properties'),
    )

    params = {
        "workflow_properties": workflow_properties,
        "workflow_name": workflow_name,
        "job_name": job_name,
        # Dates are forwarded wrapped in single quotes
        # (presumably so they can be substituted into a query as literals -- TODO confirm).
        "query_variables": {
            "start_date": "'" + request_json['query_variables']['start_date'] + "'",
            "end_date": "'" + request_json['query_variables']['end_date'] + "'"
        }
    }

    # One single definition of "first launch", used both for building the payload and for
    # interpreting the reply, so a falsy-but-not-None id (e.g. "") is handled consistently.
    is_first_launch = not async_job_id
    if not is_first_launch:
        params['job_id'] = async_job_id

    target_function_url = request_json['function_url_to_call']
    try:
        req = urllib.request.Request(target_function_url, data=json.dumps(params).encode("utf-8"))

        # The target function is called with an identity token whose audience is its own URL.
        auth_req = google.auth.transport.requests.Request()
        id_token = google.oauth2.id_token.fetch_id_token(auth_req, target_function_url)

        req.add_header("Authorization", f"Bearer {id_token}")
        req.add_header("Content-Type", "application/json")
        # The context manager guarantees the HTTP response is closed even if read() fails.
        with urllib.request.urlopen(req) as http_response:
            response = http_response.read()

        print('response: ' + str(response))
        # Handle the response
        decoded_response = response.decode("utf-8")
        if is_first_launch and is_valid_step_id(decoded_response):
            final_response = decoded_response
        elif decoded_response in JobStatus.SUCCESS.value:
            final_response = "success"
            log_step_bigquery(request_json, final_response)
        elif decoded_response in JobStatus.RUNNING.value:
            final_response = "running"
        else:  # FAILURE
            final_response = f"Exception calling target function {target_function_url.split('/')[-1]}:{decoded_response}"
            log_step_bigquery(request_json, "failed")
        print("final response: " + final_response)
        return final_response
    except urllib.error.HTTPError as e:
        print('Exception: ' + repr(e))
        # Chain the original error so the HTTP status/traceback is not lost.
        raise Exception(
            "Unexpected error in custom function: " + target_function_url.split('/')[-1] + ":" + repr(e)) from e