def _gcp_convert_ndjson_to_json(self, gcs_path)

in bigquery_etl/public_data/publish_json.py


    def _gcp_convert_ndjson_to_json(self, gcs_path):
        """Convert ndjson files on GCP to json files."""
        blobs = self.storage_client.list_blobs(
            self.target_bucket, prefix=self.stage_gcs_path
        )
        bucket = self.storage_client.bucket(self.target_bucket)

        # number of bytes written to the current JSON output file
        output_size = 0
        # number of JSON output files written so far (used to name the files)
        output_file_counter = 0
        # whether the next object is the first in the current output file
        first_line = True
        # handle of the current output file
        output_file = None

        for blob in blobs:
            blob_path = f"gs://{self.target_bucket}/{blob.name}"

            # stream from GCS
            with smart_open.open(blob_path) as fin:
                for line in fin:
                    # open a new output file if none is open yet or the
                    # current one reached the size limit
                    if output_file is None or output_size >= MAX_JSON_SIZE:
                        if output_file is not None:
                            output_file.write("]")
                            output_file.close()

                        if output_file_counter >= MAX_FILE_COUNT:
                            logging.error(
                                "Maximum number of JSON output files reached."
                            )
                            sys.exit(1)

                        tmp_blob_name = "/".join(blob.name.split("/")[:-1])
                        tmp_blob_name += (
                            "/"
                            + str(output_file_counter).zfill(MAX_JSON_NAME_LENGTH)
                            + ".json.tmp.gz"
                        )
                        tmp_blob_path = f"gs://{self.target_bucket}/{tmp_blob_name}"

                        logging.info(f"Write {blob_path} to {tmp_blob_path}")

                        output_file = smart_open.open(tmp_blob_path, "w")
                        output_file.write("[")
                        first_line = True
                        output_file_counter += 1
                        output_size = 1  # account for the opening "["

                    # separate JSON objects with commas; the first object in
                    # each output file has no preceding comma
                    if not first_line:
                        output_file.write(",")

                    output_file.write(line.replace("\n", ""))
                    output_size += len(line)
                    first_line = False

        # close the last output file, if anything was written
        if output_file is not None:
            output_file.write("]")
            output_file.close()

        # move the finished JSON files from the stage directory to the target directory
        tmp_blobs = self.storage_client.list_blobs(
            self.target_bucket, prefix=self.stage_gcs_path
        )

        for tmp_blob in tmp_blobs:
            # only move the gzipped JSON chunk files to the target directory
            if tmp_blob.name.endswith(".tmp.gz"):
                # strip the .tmp.gz suffix from the final file name
                file_name = tmp_blob.name.split("/")[-1].replace(".tmp.gz", "")

                logging.info(f"Move {tmp_blob.name} to {gcs_path + file_name}")

                bucket.rename_blob(tmp_blob, gcs_path + file_name)

                # set Content-Type to json and encoding to gzip
                blob = self.storage_client.get_bucket(self.target_bucket).get_blob(
                    gcs_path + file_name
                )
                blob.content_type = "application/json"
                blob.content_encoding = "gzip"
                blob.patch()
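
The conversion itself does not depend on GCS: ndjson lines are concatenated into "[...]" arrays, and a new output file is started whenever the running byte count reaches the size limit. Below is a minimal local sketch of that chunking logic, assuming plain files in place of smart_open/GCS blobs; the function name, the output naming scheme, and the MAX_JSON_SIZE value are illustrative placeholders, not part of publish_json.py.

MAX_JSON_SIZE = 1_000_000  # assumed per-file size limit, in bytes


def convert_ndjson_to_json_chunks(ndjson_path, output_prefix):
    """Split an ndjson file into JSON array files of at most MAX_JSON_SIZE bytes."""
    output_file = None
    output_size = 0
    output_file_counter = 0
    first_line = True

    with open(ndjson_path) as fin:
        for line in fin:
            line = line.rstrip("\n")
            if not line:
                continue

            # start a new output file when none is open or the limit is reached
            if output_file is None or output_size >= MAX_JSON_SIZE:
                if output_file is not None:
                    output_file.write("]")
                    output_file.close()
                output_file = open(
                    f"{output_prefix}{output_file_counter:06d}.json", "w"
                )
                output_file.write("[")
                first_line = True
                output_file_counter += 1
                output_size = 1  # account for the opening "["

            # separate objects with commas, except before the first one
            if not first_line:
                output_file.write(",")

            output_file.write(line)
            output_size += len(line)
            first_line = False

    # close the last output file, if anything was written
    if output_file is not None:
        output_file.write("]")
        output_file.close()

For example, convert_ndjson_to_json_chunks("table.ndjson", "/tmp/table-") writes /tmp/table-000000.json, /tmp/table-000001.json, and so on, each of which is a valid JSON array.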