def main()

in tools/cloud_functions/gcs_event_based_ingest/backfill.py

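For context, a sketch of the module-level imports this function depends on, inferred from usage (`CLIENT_INFO` and `find_blobs_with_suffix` are helpers defined elsewhere in backfill.py):

import argparse
import concurrent.futures
import logging
import os
import pprint
from typing import Dict

from google.cloud import storage

import gcs_ocn_bq_ingest.main
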

def main(args: argparse.Namespace):
    """main entry point for backfill CLI."""
    gcs_client: storage.Client = storage.Client(client_info=CLIENT_INFO)
    pubsub_client = None
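    # only blobs whose name ends with the success filename are backfilled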
    suffix = args.success_filename
    if args.destination_regex:
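        # export so in-process (LOCAL mode) runs of the ingest
        # function can pick the regex up from the environment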
        os.environ["DESTINATION_REGEX"] = args.destination_regex
    if args.mode == "NOTIFICATIONS":
        if not args.pubsub_topic:
            raise ValueError("when passing mode=NOTIFICATIONS "
                             "you must also pass pubsub_topic.")
        # import is here because this utility can be used without
        # google-cloud-pubsub dependency in LOCAL mode.
        # pylint: disable=import-outside-toplevel
        from google.cloud import pubsub
        pubsub_client = pubsub.PublisherClient()

    # These are all I/O-bound tasks, so use a thread pool for concurrency.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_to_gsurl = {}
        for blob in find_blobs_with_suffix(gcs_client, args.gcs_path, suffix):
            if pubsub_client:
                # kwargs are message attributes
                # https://googleapis.dev/python/pubsub/latest/publisher/index.html#publish-a-message
                logging.info("sending pubsub message for: %s",
                             f"gs://{blob.bucket.name}/{blob.name}")
                future_to_gsurl[executor.submit(
                    pubsub_client.publish,
                    args.pubsub_topic,
                    b'',  # cloud function ignores message body
                    bucketId=blob.bucket.name,
                    objectId=blob.name,
                    _metaInfo="this message was submitted with "
                    "gcs_ocn_bq_ingest backfill.py utility"
                )] = f"gs://{blob.bucket.name}/{blob.name}"
            else:
                logging.info("running  cloud function locally for: %s",
                             f"gs://{blob.bucket.name}/{blob.name}")
                future_to_gsurl[executor.submit(
                    gcs_ocn_bq_ingest.main.main,
                    {
                        "attributes": {
                            "bucketId": blob.bucket.name,
                            "objectId": blob.name
                        }
                    },
                    None,
                )] = f"gs://{blob.bucket.name}/{blob.name}"
        exceptions: Dict[str, Exception] = dict()
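        # Drain futures as they complete, collecting failures rather than
        # failing fast so every blob gets attempted.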
        for future in concurrent.futures.as_completed(future_to_gsurl):
            gsurl = future_to_gsurl[future]
            try:
                future.result()
            except Exception as err:  # pylint: disable=broad-except
                logging.error("Error processing %s: %s", gsurl, err)
                exceptions[gsurl] = err
        if exceptions:
            raise RuntimeError("The following errors were encountered:\n" +
                               pprint.pformat(exceptions))
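
A usage sketch, driving main() directly with a hand-built Namespace. The attribute names mirror the `args.*` reads above; the actual argparse parser lives elsewhere in backfill.py, and "_SUCCESS" as the success filename is an assumption, not a confirmed default:

import argparse

# run the ingest function in-process for every success blob under the prefix
main(argparse.Namespace(
    gcs_path="gs://my-bucket/data/",  # prefix to scan for success blobs
    success_filename="_SUCCESS",      # assumed marker name
    destination_regex=None,           # keep the ingest function's default
    mode="LOCAL",                     # no google-cloud-pubsub dependency needed
    pubsub_topic=None,
))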