bigquery_etl/public_data/publish_json.py

"""Machinery for exporting query results as JSON to Cloud storage.""" import datetime import json import logging import random import re import string import sys from argparse import ArgumentParser import smart_open from google.cloud import storage # type: ignore from google.cloud import bigquery from bigquery_etl.config import ConfigLoader from bigquery_etl.metadata.parse_metadata import Metadata from bigquery_etl.metadata.validate_metadata import validate_public_data SUBMISSION_DATE_RE = re.compile(r"^submission_date:DATE:(\d\d\d\d-\d\d-\d\d)$") QUERY_FILE_RE = re.compile(r"^.*/([a-zA-Z0-9_]+)/([a-zA-Z0-9_]+)_(v[0-9]+)/query\.sql$") MAX_JSON_SIZE = 1 * 1024 * 1024 * 1024 # 1 GB as max. size of exported JSON files # maximum number of JSON output files, output file names are only up to 12 characters MAX_FILE_COUNT = 10_000 # exported file name format: 000000000000.json, 000000000001.json, ... MAX_JSON_NAME_LENGTH = 12 logging.basicConfig( level=logging.DEBUG, format="%(asctime)s: %(levelname)s: %(message)s" ) class JsonPublisher: """Publishes query results as JSON.""" def __init__( self, client, storage_client, project_id, query_file, api_version, target_bucket, parameter=None, gcs_path="", ): """Init JsonPublisher.""" self.project_id = project_id self.query_file = query_file self.api_version = api_version self.target_bucket = target_bucket self.gcs_path = gcs_path self.parameter = parameter self.client = client self.storage_client = storage_client self.temp_table = None self.date = None self.stage_gcs_path = ( self.gcs_path + "stage/json/" + "".join(random.choice(string.ascii_lowercase) for i in range(12)) + "/" ) self.metadata = Metadata.of_query_file(self.query_file) # only for incremental exports files are written into separate directories # for each date, ignore date parameters for non-incremental exports if self.metadata.is_incremental_export() and self.parameter: for p in self.parameter: date_search = re.search(SUBMISSION_DATE_RE, p) if date_search: self.date = date_search.group(1) query_file_re = re.search(QUERY_FILE_RE, self.query_file) if query_file_re: self.dataset = query_file_re.group(1) self.table = query_file_re.group(2) self.version = query_file_re.group(3) else: logging.error("Invalid file naming format: %s", self.query_file) sys.exit(1) def _clear_stage_directory(self): """Delete files in stage directory.""" tmp_blobs = self.storage_client.list_blobs( self.target_bucket, prefix=self.stage_gcs_path ) for tmp_blob in tmp_blobs: tmp_blob.delete() def publish_json(self): """Publish query results as JSON to GCP Storage bucket.""" self.last_updated = datetime.datetime.utcnow() try: if self.metadata.is_incremental_export(): if self.date is None: logging.error( "Cannot publish JSON. submission_date missing in parameter." 
) sys.exit(1) # if it is an incremental query, then the query result needs to be # written to a temporary table to get exported as JSON self._write_results_to_temp_table() self._publish_table_as_json(self.temp_table) else: # for non-incremental queries, the entire destination table is exported result_table = f"{self.dataset}.{self.table}_{self.version}" self._publish_table_as_json(result_table) self._publish_last_updated() finally: # delete temporary artifacts if self.temp_table: self.client.delete_table(self.temp_table) self._clear_stage_directory() def _publish_table_as_json(self, result_table): """Export the `result_table` data as JSON to Cloud Storage.""" prefix = ( f"{self.gcs_path}api/{self.api_version}/tables/{self.dataset}/" f"{self.table}/{self.version}/files/" ) if self.date is not None: # if date exists, then query is incremental and newest results are exported prefix += f"{self.date}/" logging.info(f"""Export JSON for {result_table} to {self.stage_gcs_path}""") table_ref = self.client.get_table(result_table) job_config = bigquery.ExtractJobConfig() job_config.destination_format = "NEWLINE_DELIMITED_JSON" # "*" makes sure that files larger than 1GB get split up into JSON files # files are written to a stage directory first destination_uri = f"gs://{self.target_bucket}/{self.stage_gcs_path}*.ndjson" extract_job = self.client.extract_table( table_ref, destination_uri, location="US", job_config=job_config ) extract_job.result() self._gcp_convert_ndjson_to_json(prefix) def _gcp_convert_ndjson_to_json(self, gcs_path): """Convert ndjson files on GCP to json files.""" blobs = self.storage_client.list_blobs( self.target_bucket, prefix=self.stage_gcs_path ) bucket = self.storage_client.bucket(self.target_bucket) # keeps track of the number of bytes written to the JSON output file output_size = 0 # tracks the current output file name output_file_counter = 0 # track if the first JSON object is currently processed first_line = True # output file handler output_file = None for blob in blobs: blob_path = f"gs://{self.target_bucket}/{blob.name}" # stream from GCS with smart_open.open(blob_path) as fin: for line in fin: # check if JSON output file reached limit if output_file is None or output_size >= MAX_JSON_SIZE: if output_file is not None: output_file.write("]") output_file.close() if output_file_counter >= MAX_FILE_COUNT: logging.error( "Maximum number of JSON output files reached." 
) sys.exit(1) tmp_blob_name = "/".join(blob.name.split("/")[:-1]) tmp_blob_name += ( "/" + str(output_file_counter).zfill(MAX_JSON_NAME_LENGTH) + ".json.tmp.gz" ) tmp_blob_path = f"gs://{self.target_bucket}/{tmp_blob_name}" logging.info(f"""Write {blob_path} to {tmp_blob_path}""") output_file = smart_open.open(tmp_blob_path, "w") output_file.write("[") first_line = True output_file_counter += 1 output_size = 1 # skip the first line, it has no preceding json object if not first_line: output_file.write(",") output_file.write(line.replace("\n", "")) output_size += len(line) first_line = False output_file.write("]") output_file.close() # move all files from stage directory to target directory tmp_blobs = self.storage_client.list_blobs( self.target_bucket, prefix=self.stage_gcs_path ) for tmp_blob in tmp_blobs: # only copy gzipped files to target directory if "tmp.gz" in tmp_blob.name: # remove .tmp from the final file name file_name = tmp_blob.name.split("/")[-1].replace(".tmp.gz", "") logging.info(f"""Move {tmp_blob.name} to {gcs_path + file_name}""") bucket.rename_blob(tmp_blob, gcs_path + file_name) # set Content-Type to json and encoding to gzip blob = self.storage_client.get_bucket(self.target_bucket).get_blob( gcs_path + file_name ) blob.content_type = "application/json" blob.content_encoding = "gzip" blob.patch() def _write_results_to_temp_table(self): """Write the query results to a temporary table and return the table name.""" table_date = self.date.replace("-", "") + "".join( random.choice(string.ascii_lowercase) for i in range(12) ) self.temp_table = ( f"{self.project_id}.tmp.{self.table}_{self.version}_{table_date}_temp" ) with open(self.query_file) as query_stream: sql = query_stream.read() query_parameters = [] for p in self.parameter: [name, parameter_type, value] = p.split(":") query_parameters.append( bigquery.ScalarQueryParameter(name, parameter_type, value) ) job_config = bigquery.QueryJobConfig( destination=self.temp_table, query_parameters=query_parameters ) query_job = self.client.query(sql, job_config=job_config) query_job.result() def _publish_last_updated(self): """Write the timestamp when file of the dataset were last modified to GCS.""" last_updated_path = ( f"{self.gcs_path}api/{self.api_version}/tables/{self.dataset}/" f"{self.table}/{self.version}/last_updated" ) output_file = f"gs://{self.target_bucket}/{last_updated_path}" logging.info(f"Write last_updated to {output_file}") with smart_open.open(output_file, "w") as fout: last_updated = self.last_updated.strftime("%Y-%m-%d %H:%M:%S") fout.write(json.dumps(last_updated)) # set Content-Type to json so that timestamp is displayed in the browser blob = self.storage_client.get_bucket(self.target_bucket).get_blob( last_updated_path ) blob.content_type = "application/json" blob.patch() parser = ArgumentParser(description=__doc__) parser.add_argument( "--target-bucket", "--target_bucket", default=ConfigLoader.get( "public_data", "bucket", fallback="mozilla-public-data-http" ), help="GCP bucket JSON data is exported to", ) parser.add_argument( "--public_project_id", "--public-project-id", default="mozilla-public-data", help="Run query in the target project", ) parser.add_argument( "--api_version", "--api-version", default=ConfigLoader.get("public_data", "api_version", fallback="v1"), help="API version data is published under in the storage bucket", ) parser.add_argument( "--parameter", action="append", help="Query parameters, such as submission_date" ) parser.add_argument( "--query-file", "--query_file", help="File path 
to query to be executed" ) parser.add_argument( "--gcs-path", "--gcs_path", default="", help="GCS path data is exported to" ) def main(): """Publish query data as JSON to GCS.""" args, query_arguments = parser.parse_known_args() try: metadata = Metadata.of_query_file(args.query_file) except FileNotFoundError: print("No metadata file for: {}".format(args.query_file)) return # check if the data should be published as JSON if not metadata.is_public_json(): return if not validate_public_data(metadata, args.query_file): sys.exit(1) storage_client = storage.Client() client = bigquery.Client(args.public_project_id) publisher = JsonPublisher( client, storage_client, args.public_project_id, args.query_file, args.api_version, args.target_bucket, args.parameter, args.gcs_path, ) publisher.publish_json() if __name__ == "__main__": main()
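
For reference, a minimal sketch of driving JsonPublisher programmatically, mirroring what main() does after argument parsing. The project, bucket, query file path, and parameter value below are illustrative placeholders, not values from this module: the query file path must match QUERY_FILE_RE, the parameter must match SUBMISSION_DATE_RE, and a metadata file marking the query as public JSON must exist next to the query file.

    from google.cloud import bigquery, storage

    from bigquery_etl.public_data.publish_json import JsonPublisher

    # illustrative placeholders; the project and bucket names mirror the
    # argparse defaults defined above
    storage_client = storage.Client()
    client = bigquery.Client("mozilla-public-data")

    publisher = JsonPublisher(
        client,
        storage_client,
        "mozilla-public-data",  # project_id
        "sql/mozilla-public-data/some_dataset/some_table_v1/query.sql",  # hypothetical path matching QUERY_FILE_RE
        "v1",  # api_version
        "mozilla-public-data-http",  # target_bucket
        parameter=["submission_date:DATE:2023-01-01"],  # matches SUBMISSION_DATE_RE
    )
    publisher.publish_json()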