def process_request()

in helpers/bq-remote-functions/get-policy-tags/main.py [0:0]


def _parse_table_spec(raw_table_spec):
    """Split a BigQuery table reference into its components.

    Accepts ``project.dataset.table``, optionally wrapped in backticks. The
    split is done from the right, so a project ID that itself contains a dot
    (domain-scoped projects such as ``example.com:my-project``) stays intact
    in the project component.

    Args:
        raw_table_spec: The first argument of a single BigQuery call.

    Returns:
        A tuple ``(table_spec, project, dataset, table)`` where ``table_spec``
        is the input with surrounding backticks removed.

    Raises:
        ValueError: If the value is not a string (e.g. a SQL NULL arriving as
            ``None``) or does not consist of three non-empty dot-separated
            components.
    """
    if not isinstance(raw_table_spec, str):
        raise ValueError(f"Expected a table spec string, got {raw_table_spec!r}")

    table_spec = raw_table_spec.strip('`')
    parts = table_spec.rsplit(".", 2)
    if len(parts) != 3 or not all(parts):
        raise ValueError(
            f"Invalid table spec '{table_spec}'. Expected format project.dataset.table"
        )
    table_project, table_dataset, table_name = parts
    return table_spec, table_project, table_dataset, table_name


def process_request(request):
    """Handle a batched BigQuery remote-function request for policy tags.

    The request body is expected to be JSON with a ``calls`` array. Each
    element is one record-level invocation from BigQuery SQL, and its first
    element is a table reference (``project.dataset.table``, optionally
    backtick-quoted). Each call yields one entry in ``replies``, in call order:

    * on success: ``{"columns_and_policy_tags": [...]}``
    * on a per-call failure (unparseable table spec, or an error from
      ``get_columns_and_policy_tags``):
      ``{"columns_and_policy_tags": [], "error": "<message>"}``

    Policy tag display names are resolved through a ``DatastoreCache``. Its
    database name comes from the ``DATASTORE_CACHE_DB_NAME`` environment
    variable.

    Args:
        request: The incoming HTTP request object; only ``get_json()`` is used.

    Returns:
        ``(jsonify({"replies": [...]}), 200)`` when the batch is processed, or
        ``(jsonify({"errorMessage": ...}), 400)`` if any unexpected exception
        escapes the batch processing.
    """
    datastore_cache_db_name = os.environ.get("DATASTORE_CACHE_DB_NAME")

    # Instantiates a client
    # NOTE(review): a new logging client is created and setup_logging() is
    # re-run on every request; depending on the runtime this may attach
    # duplicate handlers. Consider doing this once at module level.
    logging_client = google.cloud.logging.Client()

    # Retrieves a Cloud Logging handler based on the environment
    # you're running in and integrates the handler with the
    # Python logging module. By default this captures all logs
    # at INFO level and higher
    logging_client.setup_logging()

    try:
        request_json = request.get_json()

        # The function receives a batch of calls. Each element in the calls
        # array is one record-level invocation in BQ SQL.
        calls = request_json['calls']

        calls_count = len(calls)
        logging.info(f"Received {calls_count} calls from BQ.")

        # create a cache for policy tags display names
        cache = DatastoreCache(database_name=datastore_cache_db_name)

        replies = []
        for call in calls:
            logging.info(f"Will process bq call {call}")

            # A malformed or NULL table spec is reported on this call's reply
            # only, instead of failing the whole batch with an HTTP 400.
            try:
                table_spec, table_project, table_dataset, table_name = _parse_table_spec(
                    call[0] if call else None
                )
            except ValueError as parse_error:
                logging.error(f"Invalid table spec in bq call {call}. Error: {parse_error}")
                replies.append({"columns_and_policy_tags": [], "error": str(parse_error)})
                continue

            cols_and_policy_tags, error = get_columns_and_policy_tags(table_project, table_dataset, table_name)
            if not error:
                policy_tags_names = get_policy_tag_display_names(cols_and_policy_tags, cache)
                final_result_list = combine_policy_tags(cols_and_policy_tags, policy_tags_names)
                logging.info(f"No error path; table {table_spec}; cols_and_policy_tags len {len(cols_and_policy_tags)}; policy_tags_names len {len(policy_tags_names)}; final results len {len(final_result_list)}")
                call_result = {"columns_and_policy_tags": final_result_list}
                replies.append(call_result)
            else:
                logging.error(f"Failed to get schema for table {table_spec}. Error: {error}")
                call_result = {"columns_and_policy_tags": [], "error": str(error)}
                replies.append(call_result)

        return_json = jsonify({"replies": replies})
        logging.info(f"Function call ending. Replies count {len(replies)}")
        return return_json, 200

    except Exception as e:
        # Top-level boundary: log with traceback and report to BigQuery.
        logging.exception(f"Error while processing request {str(e)}")
        return jsonify({"errorMessage": str(e)}), 400