in helpers/bq-remote-functions/get-policy-tags/main.py [0:0]
def process_request(request):
    """Handle one batched BigQuery remote-function request.

    The request body is expected to be a JSON object with a ``calls`` array;
    each element of that array is one record-level invocation from BigQuery
    SQL, and its first item is a table spec of the form
    ``project.dataset.table`` (surrounding backticks are stripped).

    For every call the table's columns and policy tags are looked up and
    combined with the policy tags' display names. One reply is appended per
    call, in the same order as ``calls``:

    * success: ``{"columns_and_policy_tags": [...]}``
    * lookup error: ``{"columns_and_policy_tags": [], "error": "<message>"}``

    Args:
        request: The incoming HTTP request object; only ``get_json()`` is
            used here.

    Returns:
        A ``(response, status)`` tuple: ``({"replies": [...]}, 200)`` on
        success, or ``({"errorMessage": "<message>"}, 400)`` if anything
        raises while the batch is being processed (including a malformed
        body or table spec).
    """
    datastore_cache_db_name = os.environ.get("DATASTORE_CACHE_DB_NAME")

    # Instantiate a Cloud Logging client and attach its handler to the Python
    # logging module. By default this captures all logs at INFO level and
    # higher.
    # NOTE(review): this runs on every request; consider doing it once at
    # module import time - confirm against the rest of the file.
    logging_client = google.cloud.logging.Client()
    logging_client.setup_logging()

    try:
        request_json = request.get_json()
        # Fail with a readable message instead of an opaque TypeError/KeyError
        # when the body is missing, not a JSON object, or has no "calls".
        if not isinstance(request_json, dict) or "calls" not in request_json:
            raise ValueError(
                "Request body must be a JSON object containing a 'calls' array."
            )

        # The function receives a batch of calls; each element in the calls
        # array is 1 record-level invocation in BQ SQL.
        calls = request_json["calls"]
        logging.info(f"Received {len(calls)} calls from BQ.")

        # Cache for policy tag display names, shared by every call in the batch.
        cache = DatastoreCache(database_name=datastore_cache_db_name)

        replies = []
        for call in calls:
            logging.info(f"Will process bq call {call}")

            table_spec = call[0].strip('`')
            table_spec_splits = table_spec.split(".")
            if len(table_spec_splits) < 3:
                # Previously surfaced as "list index out of range".
                raise ValueError(
                    f"Invalid table spec '{table_spec}'. "
                    "Expected format: project.dataset.table"
                )
            # Only the first three components are used, as before.
            table_project, table_dataset, table_name = table_spec_splits[:3]

            cols_and_policy_tags, error = get_columns_and_policy_tags(
                table_project, table_dataset, table_name
            )

            if not error:
                policy_tags_names = get_policy_tag_display_names(cols_and_policy_tags, cache)
                final_result_list = combine_policy_tags(cols_and_policy_tags, policy_tags_names)
                logging.info(f"No error path; table {table_spec}; cols_and_policy_tags len {len(cols_and_policy_tags)}; policy_tags_names len {len(policy_tags_names)}; final results len {len(final_result_list)}")
                call_result = {"columns_and_policy_tags": final_result_list}
            else:
                # A per-table lookup failure is reported inside that call's
                # reply so the rest of the batch can still be answered.
                logging.error(f"Failed to get schema for table {table_spec}. Error: {error}")
                call_result = {"columns_and_policy_tags": [], "error": str(error)}

            replies.append(call_result)

        return_json = jsonify({"replies": replies})
        logging.info(f"Function call ending. Replies count {len(replies)}")
        return return_json, 200
    except Exception as e:
        # Top-level boundary: log with traceback and report the failure to
        # the caller as a 400 with an "errorMessage" payload.
        logging.exception(f"Error while processing request {str(e)}")
        return jsonify({"errorMessage": str(e)}), 400