in tools/cloud_functions/gcs_event_based_ingest/backfill.py [0:0]
def main(args: argparse.Namespace):
    """Main entry point for the backfill CLI.

    Iterates over the blobs yielded by ``find_blobs_with_suffix`` for
    ``args.gcs_path`` / ``args.success_filename`` and, for each one, either
    publishes a Pub/Sub message (``mode == "NOTIFICATIONS"``) or invokes
    ``gcs_ocn_bq_ingest.main.main`` in-process with an equivalent event
    payload (any other mode).

    Args:
        args: parsed CLI arguments. Attributes read here: ``mode``,
            ``pubsub_topic``, ``destination_regex``, ``success_filename``
            and ``gcs_path``.

    Raises:
        ValueError: if ``mode`` is ``NOTIFICATIONS`` but no ``pubsub_topic``
            was supplied.
        RuntimeError: if processing any blob failed; the message lists the
            errors keyed by ``gs://`` URL.
    """
    # Validate first so that bad arguments fail fast, before any client is
    # created or the process environment is modified.
    notifications_mode = args.mode == "NOTIFICATIONS"
    if notifications_mode and not args.pubsub_topic:
        raise ValueError("when passing mode=NOTIFICATIONS "
                         "you must also pass pubsub_topic.")

    if args.destination_regex:
        os.environ["DESTINATION_REGEX"] = args.destination_regex

    gcs_client: storage.Client = storage.Client(client_info=CLIENT_INFO)
    pubsub_client = None
    if notifications_mode:
        # import is here because this utility can be used without
        # google-cloud-pubsub dependency in LOCAL mode.
        # pylint: disable=import-outside-toplevel
        from google.cloud import pubsub
        pubsub_client = pubsub.PublisherClient()

    suffix = args.success_filename
    exceptions: Dict[str, Exception] = {}
    # These are all I/O bound tasks so use Thread Pool concurrency for speed.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_to_gsurl = {}
        for blob in find_blobs_with_suffix(gcs_client, args.gcs_path, suffix):
            gsurl = f"gs://{blob.bucket.name}/{blob.name}"
            if pubsub_client is not None:
                logging.info("sending pubsub message for: %s", gsurl)
                future = executor.submit(
                    _publish_and_wait,
                    pubsub_client,
                    args.pubsub_topic,
                    blob.bucket.name,
                    blob.name,
                )
            else:
                logging.info("running cloud function locally for: %s", gsurl)
                future = executor.submit(
                    gcs_ocn_bq_ingest.main.main,
                    {
                        "attributes": {
                            "bucketId": blob.bucket.name,
                            "objectId": blob.name,
                        }
                    },
                    None,
                )
            future_to_gsurl[future] = gsurl

        # Collect every failure rather than stopping at the first one so the
        # final error reports all blobs that could not be processed.
        for future in concurrent.futures.as_completed(future_to_gsurl):
            gsurl = future_to_gsurl[future]
            try:
                future.result()
            except Exception as err:  # pylint: disable=broad-except
                logging.error("Error processing %s: %s", gsurl, err)
                exceptions[gsurl] = err

    if exceptions:
        raise RuntimeError("The following errors were encountered:\n" +
                           pprint.pformat(exceptions))


def _publish_and_wait(pubsub_client, topic, bucket_name, object_name):
    """Publish one notification and block until Pub/Sub reports the outcome.

    ``publish`` only returns a future; waiting on it here makes a failed
    publish raise inside the worker thread so the caller can record it.
    """
    # kwargs are message attributes
    # https://googleapis.dev/python/pubsub/latest/publisher/index.html#publish-a-message
    publish_future = pubsub_client.publish(
        topic,
        b'',  # cloud function ignores message body
        bucketId=bucket_name,
        objectId=object_name,
        _metaInfo="this message was submitted with "
        "gcs_ocn_bq_ingest backfill.py utility")
    return publish_future.result()