in bigquery_etl/public_data/publish_json.py [0:0]
def _gcp_convert_ndjson_to_json(self, gcs_path):
    """Convert ndjson files on GCS to JSON files."""
    blobs = self.storage_client.list_blobs(
        self.target_bucket, prefix=self.stage_gcs_path
    )
    bucket = self.storage_client.bucket(self.target_bucket)

    # number of bytes written to the current JSON output file
    output_size = 0
    # counter used to derive the name of the current output file
    output_file_counter = 0
    # True while the first JSON object of a file is being processed
    first_line = True
    # handle of the current output file
    output_file = None

    for blob in blobs:
        blob_path = f"gs://{self.target_bucket}/{blob.name}"

        # stream the ndjson file from GCS line by line
        with smart_open.open(blob_path) as fin:
            for line in fin:
                # start a new output file if none is open or the size limit is reached
                if output_file is None or output_size >= MAX_JSON_SIZE:
                    if output_file is not None:
                        output_file.write("]")
                        output_file.close()

                    if output_file_counter >= MAX_FILE_COUNT:
                        logging.error("Maximum number of JSON output files reached.")
                        sys.exit(1)

                    tmp_blob_name = "/".join(blob.name.split("/")[:-1])
                    tmp_blob_name += (
                        "/"
                        + str(output_file_counter).zfill(MAX_JSON_NAME_LENGTH)
                        + ".json.tmp.gz"
                    )
                    tmp_blob_path = f"gs://{self.target_bucket}/{tmp_blob_name}"
                    logging.info(f"Write {blob_path} to {tmp_blob_path}")

                    output_file = smart_open.open(tmp_blob_path, "w")
                    # the opening bracket counts towards the file size
                    output_file.write("[")
                    first_line = True
                    output_file_counter += 1
                    output_size = 1

                # the first object of a file is not preceded by a separator
                if not first_line:
                    output_file.write(",")

                output_file.write(line.replace("\n", ""))
                output_size += len(line)
                first_line = False

    # close the last output file; it is None if no input data was read
    if output_file is not None:
        output_file.write("]")
        output_file.close()

    # move all files from the stage directory to the target directory
    tmp_blobs = self.storage_client.list_blobs(
        self.target_bucket, prefix=self.stage_gcs_path
    )

    for tmp_blob in tmp_blobs:
        # only copy gzipped files to the target directory
        if "tmp.gz" in tmp_blob.name:
            # strip the temporary suffix; the final object name ends in .json
            file_name = tmp_blob.name.split("/")[-1].replace(".tmp.gz", "")
            logging.info(f"Move {tmp_blob.name} to {gcs_path + file_name}")
            bucket.rename_blob(tmp_blob, gcs_path + file_name)

            # set Content-Type to JSON and Content-Encoding to gzip
            blob = bucket.get_blob(gcs_path + file_name)
            blob.content_type = "application/json"
            blob.content_encoding = "gzip"
            blob.patch()
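
# The chunking step above is independent of GCS: ndjson lines are wrapped into one or
# more JSON array files whose size is capped. Below is a minimal local sketch of that
# logic for illustration only; it uses plain local files instead of smart_open/GCS
# paths, and the tiny limits are hypothetical stand-ins for the module's constants.
import json

MAX_JSON_SIZE_DEMO = 64        # hypothetical bytes per output file (tiny, to force splitting)
NAME_PAD_DEMO = 12             # hypothetical zero-padding width for output file names


def ndjson_to_json_arrays(lines, out_dir):
    """Write ndjson lines as size-capped JSON array files and return their paths."""
    output_file = None
    output_size = 0
    file_counter = 0
    first_line = True
    names = []

    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue
        # open a new output file when none is open or the size cap is hit
        if output_file is None or output_size >= MAX_JSON_SIZE_DEMO:
            if output_file is not None:
                output_file.write("]")
                output_file.close()
            name = f"{out_dir}/{str(file_counter).zfill(NAME_PAD_DEMO)}.json"
            names.append(name)
            output_file = open(name, "w")
            output_file.write("[")
            first_line = True
            file_counter += 1
            output_size = 1
        if not first_line:
            output_file.write(",")
        output_file.write(line)
        output_size += len(line)
        first_line = False

    if output_file is not None:
        output_file.write("]")
        output_file.close()
    return names


if __name__ == "__main__":
    import tempfile

    ndjson = [f'{{"id": {i}, "value": "row {i}"}}\n' for i in range(10)]
    with tempfile.TemporaryDirectory() as tmp:
        for name in ndjson_to_json_arrays(ndjson, tmp):
            with open(name) as f:
                data = json.load(f)  # each output file is a valid JSON array
                print(name, len(data), "objects")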