in tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/main.py
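# Imports inferred from usage in this excerpt; the module paths for the local
# helpers (constants, exceptions, utils) are assumptions about the package
# layout.
import distutils.util
import os
import time
import traceback
from typing import Dict

from google.cloud import storage

import constants
import exceptions
import utils
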
def main(event: Dict, context):  # pylint: disable=unused-argument
    """Entry point for the background cloud function for event-driven GCS to
    BigQuery ingest."""
    try:
        function_start_time = time.monotonic()
        # pylint: disable=too-many-locals
        bucket_id, object_id = utils.parse_notification(event)
        basename_object_id = os.path.basename(object_id)

        # Exit eagerly if this is not a file to take action on
        # (e.g. a data, config, or lock file).
        if basename_object_id not in constants.ACTION_FILENAMES:
            # Copy the set so the module-level constant is not mutated below.
            action_filenames = set(constants.ACTION_FILENAMES)
            if constants.START_BACKFILL_FILENAME is None:
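                # ACTION_FILENAMES may contain a None placeholder when no
                # backfill start file is configured; drop it so the log
                # message below lists only real trigger filenames.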
                action_filenames.discard(None)
            print(f"No-op. This notification was not for a "
                  f"{action_filenames} file.")
            return
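
        # The lazy_* helpers are assumed to construct each client once and
        # reuse it across invocations on a warm instance.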
        gcs_client = lazy_gcs_client()
        bq_client = lazy_bq_client()
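
        # Ordering is enforced if it is enabled globally via ORDER_PER_TABLE
        # or if an ORDERME marker file exists in a parent prefix of this
        # object.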
        enforce_ordering = (constants.ORDER_PER_TABLE or
                            utils.look_for_config_in_parents(
                                gcs_client, f"gs://{bucket_id}/{object_id}",
                                "ORDERME") is not None)

        bkt: storage.Bucket = utils.cached_get_bucket(gcs_client, bucket_id)
        event_blob: storage.Blob = bkt.blob(object_id)
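
        # Route the event to the appropriate ingest action for this blob and
        # ordering mode.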
        triage_event(gcs_client, bq_client, event_blob, function_start_time,
                     enforce_ordering)

    # Unexpected exceptions will actually raise, which may cause a cold
    # restart.
    except exceptions.DuplicateNotificationException:
        print("Received duplicate notification. This was handled gracefully. "
              f"{traceback.format_exc()}")
    except exceptions.EXCEPTIONS_TO_REPORT as original_error:
        # We report rather than raise because we know these errors do not
        # require a cold restart of the cloud function.
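        # USE_ERROR_REPORTING_API defaults to "True"; strtobool accepts
        # values like "true"/"false"/"1"/"0" (and raises ValueError on
        # anything else).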
        if (distutils.util.strtobool(
                os.getenv("USE_ERROR_REPORTING_API", "True"))):
            try:
                lazy_error_reporting_client().report_exception()
            except Exception:  # pylint: disable=broad-except
                # This mostly handles the case where the Error Reporting API
                # is not enabled or IAM permissions did not allow us to
                # report errors with the Error Reporting API.
                raise original_error  # pylint: disable=raise-missing-from
        else:
            raise original_error
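
# Illustrative only: a minimal local invocation with a hand-built
# notification. The event shape (a dict with "bucket" and "name", as in a
# direct GCS trigger) is one of the shapes parse_notification is expected to
# handle; the bucket and object names here are hypothetical.
#
#   main(
#       {
#           "bucket": "example-ingest-bucket",
#           "name": "dataset/table/_SUCCESS",
#       },
#       context=None,
#   )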