bigquery_etl/public_data/publish_gcs_metadata.py

"""Generate and upload JSON metadata files for public datasets on GCS.""" import json import logging import os import re from argparse import ArgumentParser from itertools import groupby import smart_open from google.cloud import storage # type: ignore from bigquery_etl.config import ConfigLoader from bigquery_etl.metadata.parse_metadata import Metadata from bigquery_etl.util import standard_args from bigquery_etl.util.common import project_dirs GCS_FILE_PATH_RE = re.compile( r"api/(?P<api_version>.+)/tables/(?P<dataset>.+)/(?P<table>.+)/(?P<version>.+)/" r"files/(?:(?P<date>.+)/)?(?P<filename>.+\.json(\.gz)?)" ) parser = ArgumentParser(description=__doc__) parser.add_argument( "--project_id", "--project-id", default="mozilla-public-data", help="Target project" ) parser.add_argument( "--target-bucket", "--target_bucket", default=ConfigLoader.get( "public_data", "bucket", fallback="mozilla-public-data-http" ), help="GCP bucket JSON data is exported to", ) parser.add_argument( "--endpoint", default=ConfigLoader.get( "public_data", "endpoint", fallback="https://public-data.telemetry.mozilla.org/" ), help="The URL to access the HTTP endpoint", ) parser.add_argument( "--api_version", "--api-version", default=ConfigLoader.get("public_data", "api_version", fallback="v1"), help="Endpoint API version", ) standard_args.add_log_level(parser) class GcsTableMetadata: """Metadata associated with table data stored on GCS.""" def __init__(self, blobs, endpoint, target_dir): """Initialize container for metadata of a table published on GCS.""" assert len(blobs) > 0 self.blobs = blobs self.endpoint = endpoint self.files_path = self.blobs[0].name.split("files")[0] + "files" self.files_uri = endpoint + self.files_path (self.dataset, self.table, self.version) = dataset_table_version_from_gcs_blob( self.blobs[0] ) self.metadata = Metadata.of_table( self.dataset, self.table, self.version, target_dir ) self.last_updated_path = self.blobs[0].name.split("files")[0] + "last_updated" self.last_updated_uri = endpoint + self.last_updated_path def table_metadata_to_json(self): """Return a JSON object of the table metadata for GCS.""" metadata_json = {} metadata_json["friendly_name"] = self.metadata.friendly_name metadata_json["description"] = self.metadata.description metadata_json["incremental"] = self.metadata.is_incremental() metadata_json["incremental_export"] = self.metadata.is_incremental_export() if self.metadata.review_bugs() is not None: review_links = list( map( lambda bug: ConfigLoader.get( "public_data", "review_link", fallback="https://bugzilla.mozilla.org/show_bug.cgi?id=", ) + str(bug), self.metadata.review_bugs(), ) ) metadata_json["review_links"] = review_links metadata_json["files_uri"] = self.files_uri metadata_json["last_updated"] = self.last_updated_uri return metadata_json def files_metadata_to_json(self): """Return a JSON object containing metadata of the files on GCS.""" if self.metadata.is_incremental_export(): metadata_json = {} for blob in self.blobs: match = GCS_FILE_PATH_RE.match(blob.name) date = match.group("date") if date is not None: if date in metadata_json: metadata_json[date].append(self.endpoint + blob.name) else: metadata_json[date] = [self.endpoint + blob.name] return metadata_json else: return [self.endpoint + blob.name for blob in self.blobs] def dataset_table_version_from_gcs_blob(gcs_blob): """Extract the dataset, table and version from the provided GCS blob path.""" match = GCS_FILE_PATH_RE.match(gcs_blob.name) if match is not None: return (match.group("dataset"), 
match.group("table"), match.group("version")) else: return None def get_public_gcs_table_metadata( storage_client, bucket, api_version, endpoint, target_dir ): """Return a list of metadata of public tables and their locations on GCS.""" prefix = f"api/{api_version}" blobs = storage_client.list_blobs(bucket, prefix=prefix) return [ GcsTableMetadata(list(blobs), endpoint, target_dir) for table, blobs in groupby(blobs, dataset_table_version_from_gcs_blob) if table is not None and table[0] in os.listdir(target_dir) and f"{table[1]}_{table[2]}" in os.listdir(os.path.join(target_dir, table[0])) ] def publish_all_datasets_metadata(table_metadata, output_file): """Write metadata about all available public datasets to GCS.""" metadata_json = {} for metadata in table_metadata: if metadata.dataset not in metadata_json: metadata_json[metadata.dataset] = {} dataset = metadata_json[metadata.dataset] if metadata.table not in dataset: dataset[metadata.table] = {} table = dataset[metadata.table] if metadata.version not in table: table[metadata.version] = {} table[metadata.version] = metadata.table_metadata_to_json() logging.info(f"Write metadata to {output_file}") with smart_open.open(output_file, "w") as fout: fout.write(json.dumps(metadata_json, indent=4)) def publish_table_metadata(storage_client, table_metadata, bucket): """Write metadata for each public table to GCS.""" for metadata in table_metadata: output_file = f"gs://{bucket}/{metadata.files_path}" logging.info(f"Write metadata to {output_file}") with smart_open.open(output_file, "w") as fout: fout.write(json.dumps(metadata.files_metadata_to_json(), indent=4)) set_content_type( storage_client, bucket, f"{metadata.files_path}", "application/json" ) def set_content_type(storage_client, bucket, file, content_type): """Set the file content type.""" blob = storage_client.get_bucket(bucket).get_blob(file) blob.content_type = content_type blob.patch() def main(): """Generate and upload GCS metadata.""" args = parser.parse_args() storage_client = storage.Client(args.project_id) # set log level try: logging.basicConfig(level=args.log_level, format="%(levelname)s %(message)s") except ValueError as e: parser.error(f"argument --log-level: {e}") projects = project_dirs() all_metadata = [] for target in projects: if os.path.isdir(target): gcs_table_metadata = get_public_gcs_table_metadata( storage_client, args.target_bucket, args.api_version, args.endpoint, target, ) all_metadata += gcs_table_metadata publish_table_metadata( storage_client, gcs_table_metadata, args.target_bucket ) else: print( f"Invalid target: {target}, target must be a directory with" "structure <project>/<dataset>/<table>/metadata.yaml." ) output_file = f"gs://{args.target_bucket}/all-datasets.json" publish_all_datasets_metadata(all_metadata, output_file) set_content_type( storage_client, args.target_bucket, "all-datasets.json", "application/json", ) if __name__ == "__main__": main()