# ark-demo/pipelines/nhc_forecast/cone_pipeline.py

# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Load NHC forecast cone into BigQuery."""

import logging
import os

from apache_beam.options.pipeline_options import PipelineOptions

# Each NHC 5-day cone archive carries three shapefile layers:
# the forecast track line (lin), cone polygon (pgn), and track points (pts).
layer_suffixes = ['lin', 'pgn', 'pts']

# Destination BigQuery table for each layer suffix.
tables = {
    'lin': 'nhc_5day_lin',
    'pgn': 'nhc_5day_pgn',
    'pts': 'nhc_5day_pts'
}


def get_layername(gcs_url, suffix):
    """Derive a layer name inside the archive from the archive filename.

    An archive named <date>_<dur>_<incr>.zip contains layers named
    <date>-<incr>_<dur>_<suffix>.
    """
    filename, ext = os.path.splitext(gcs_url.split('/')[-1])
    [date, dur, incr] = filename.split('_')
    return '{}-{}_{}_{}'.format(date, incr, dur, suffix)


def run(pipeline_args, gcs_url, dataset):
    # Imports are deferred so they resolve inside the Dataflow worker
    # container image rather than only on the launching machine.
    import apache_beam as beam
    from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery
    from geobeam.io import ShapefileSource
    from geobeam.fn import make_valid, filter_invalid, format_record

    pipeline_options = PipelineOptions(
        pipeline_args,
        experiments=['use_runner_v2'],
        temp_location='gs://gsd-pipeline-temp',
        sdk_container_image='gcr.io/dataflow-geobeam/base',
        project='geo-solution-demos',
        region='us-central1',
        max_num_workers=1,
        number_of_worker_harness_threads=1)

    with beam.Pipeline(options=pipeline_options) as p:
        # Load each layer of the archive into its own BigQuery table.
        for suffix in layer_suffixes:
            layer = get_layername(gcs_url, suffix)

            (p
             | 'Read ' + layer >> beam.io.Read(
                   ShapefileSource(gcs_url, layer_name=layer, in_epsg=4326))
             | 'MakeValid ' + layer >> beam.Map(make_valid)
             | 'FilterInvalid ' + layer >> beam.Filter(filter_invalid)
             | 'FormatRecord ' + layer >> beam.Map(format_record,
                                                   output_type='geojson')
             | 'WriteToBigQuery ' + layer >> beam.io.WriteToBigQuery(
                   beam_bigquery.TableReference(
                       projectId='geo-solution-demos',
                       datasetId=dataset,
                       tableId=tables[suffix]),
                   method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
                   write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                   create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER))


if __name__ == '__main__':
    import argparse

    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument('--gcs_url', type=str)
    parser.add_argument('--dataset', type=str, default='nhc')

    known_args, pipeline_args = parser.parse_known_args()

    run(pipeline_args, known_args.gcs_url, known_args.dataset)
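
# Usage sketch (the bucket and archive name below are hypothetical
# placeholders, not values shipped with this pipeline). Given an NHC cone
# archive named like al092022_5day_025.zip, get_layername() above yields
# layer names such as al092022-025_5day_pgn. Unrecognized flags (e.g.
# --runner) pass through parse_known_args() into the Beam pipeline options:
#
#   python cone_pipeline.py \
#       --gcs_url gs://your-bucket/al092022_5day_025.zip \
#       --dataset nhc \
#       --runner DataflowRunner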