# tools/cloud_functions/gcs_event_based_ingest/backfill.py
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Command Line utility for backfilling gcs_ocn_bq_ingest cloud function
"""
import argparse
import concurrent.futures
import logging
import os
import pprint
import sys
from typing import Dict, Iterator, List

import google.api_core.client_info
from google.cloud import storage

import gcs_ocn_bq_ingest.main  # pylint: disable=import-error

CLIENT_INFO = google.api_core.client_info.ClientInfo(
    user_agent="google-pso-tool/bq-serverless-loader-cli")
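# FUNCTION_NAME is normally injected by the Cloud Functions runtime; setting
# it here identifies this process as the backfill CLI to any downstream code
# that reads the variable (assumed usage, e.g. for logging).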
os.environ["FUNCTION_NAME"] = "backfill-cli"


def find_blobs_with_suffix(
    gcs_client: storage.Client,
    prefix: str,
    suffix: str = "_SUCCESS",
) -> Iterator[storage.Blob]:
"""
Find GCS blobs with a given suffix.
:param gcs_client: storage.Client
:param prefix: A GCS prefix to search i.e. gs://bucket/prefix/to/search
:param suffix: A suffix in blob name to match
:return: Iterable of blobs matching the suffix.
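
    Example (hypothetical bucket and prefix names; a sketch, not executed
    here)::

        client = storage.Client()
        for blob in find_blobs_with_suffix(client, "gs://example-bucket/raw/"):
            print(f"gs://{blob.bucket.name}/{blob.name}")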
"""
    prefix_blob: storage.Blob = storage.Blob.from_string(prefix)
    # filter() preserves the laziness / scalability advantages of the
    # iterator returned by list_blobs.
    return filter(
        lambda blob: blob.name.endswith(suffix),
        prefix_blob.bucket.list_blobs(client=gcs_client,
                                      prefix=prefix_blob.name))


def main(args: argparse.Namespace):
    """Main entry point for the backfill CLI."""
    gcs_client: storage.Client = storage.Client(client_info=CLIENT_INFO)
    pubsub_client = None
    suffix = args.success_filename
    if args.destination_regex:
        os.environ["DESTINATION_REGEX"] = args.destination_regex
    if args.mode == "NOTIFICATIONS":
        if not args.pubsub_topic:
            raise ValueError("when passing mode=NOTIFICATIONS "
                             "you must also pass pubsub_topic.")
        # import is here so this utility can be used without the
        # google-cloud-pubsub dependency in LOCAL mode.
        # pylint: disable=import-outside-toplevel
        from google.cloud import pubsub
        pubsub_client = pubsub.PublisherClient()
    # These are all I/O bound tasks, so use thread pool concurrency for speed.
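    # (With no max_workers argument, ThreadPoolExecutor defaults to
    # min(32, os.cpu_count() + 4) worker threads on Python 3.8+.)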
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_to_gsurl = {}
        for blob in find_blobs_with_suffix(gcs_client, args.gcs_path, suffix):
            if pubsub_client:
                # kwargs are message attributes
                # https://googleapis.dev/python/pubsub/latest/publisher/index.html#publish-a-message
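                # bucketId/objectId mirror the attributes of a GCS Pub/Sub
                # notification, which the deployed function expects. Note that
                # publish() itself returns a future, so the executor future
                # below resolves when the publish call is dispatched, not when
                # the Pub/Sub server acknowledges the message.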
                logging.info("sending pubsub message for: %s",
                             f"gs://{blob.bucket.name}/{blob.name}")
                future_to_gsurl[executor.submit(
                    pubsub_client.publish,
                    args.pubsub_topic,
                    b'',  # cloud function ignores message body
                    bucketId=blob.bucket.name,
                    objectId=blob.name,
                    _metaInfo="this message was submitted with "
                    "gcs_ocn_bq_ingest backfill.py utility"
                )] = f"gs://{blob.bucket.name}/{blob.name}"
            else:
                logging.info("running cloud function locally for: %s",
                             f"gs://{blob.bucket.name}/{blob.name}")
                future_to_gsurl[executor.submit(
                    gcs_ocn_bq_ingest.main.main,
                    {
                        "attributes": {
                            "bucketId": blob.bucket.name,
                            "objectId": blob.name
                        }
                    },
                    None,
                )] = f"gs://{blob.bucket.name}/{blob.name}"
        exceptions: Dict[str, Exception] = {}
        for future in concurrent.futures.as_completed(future_to_gsurl):
            gsurl = future_to_gsurl[future]
            try:
                future.result()
            except Exception as err:  # pylint: disable=broad-except
                logging.error("Error processing %s: %s", gsurl, err)
                exceptions[gsurl] = err
        if exceptions:
            raise RuntimeError("The following errors were encountered:\n" +
                               pprint.pformat(exceptions))


def parse_args(args: List[str]) -> argparse.Namespace:
    """Argument parser for the backfill CLI."""
    parser = argparse.ArgumentParser(
        description="Utility to backfill success file notifications "
        "or run the cloud function locally in concurrent threads.")
    parser.add_argument(
        "--gcs-path",
        "-p",
        help="GCS path (e.g. gs://bucket/prefix/to/search/) to search for "
        "existing _SUCCESS files",
        required=True,
    )
    parser.add_argument(
        "--mode",
        "-m",
        help="How to perform the backfill: LOCAL runs the cloud function"
        " main method locally (in concurrent threads); NOTIFICATIONS just"
        " pushes notifications to Pub/Sub for a deployed version of the"
        " cloud function to pick up. Default is NOTIFICATIONS.",
        required=False,
        type=str.upper,
        choices=["LOCAL", "NOTIFICATIONS"],
        default="NOTIFICATIONS",
    )
    parser.add_argument(
        "--pubsub-topic",
        "--topic",
        "-t",
        help="Pub/Sub topic to post notifications to, "
        "e.g. projects/{PROJECT_ID}/topics/{TOPIC_ID}. "
        "Required if using NOTIFICATIONS mode.",
        required=False,
        default=None,
    )
    parser.add_argument(
        "--success-filename",
        "-f",
        help="Override the default success filename '_SUCCESS'",
        required=False,
        default="_SUCCESS",
    )
    parser.add_argument(
        "--destination-regex",
        "-r",
        help="Override the default destination regex for determining the "
        "BigQuery destination based on information encoded in the GCS "
        "path of the success file",
        required=False,
        default=None,
    )
    return parser.parse_args(args)


if __name__ == "__main__":
    # Pass argv[1:] so the script name is not treated as an argument.
    main(parse_args(sys.argv[1:]))