def run()

in ark-demo/pipelines/nhc_forecast/cone_pipeline.py [0:0]


def run(pipeline_args, gcs_url, dataset):
    """Build and run a Dataflow pipeline that loads NHC forecast-cone
    shapefile layers from GCS into BigQuery.

    For each layer suffix, the pipeline reads the named layer from the
    shapefile at ``gcs_url``, repairs/drops invalid geometries, formats
    records as GeoJSON, and truncate-loads them into the matching
    pre-existing BigQuery table.

    Args:
        pipeline_args: Extra command-line options forwarded verbatim to
            Beam's ``PipelineOptions`` (runner, staging location, etc.).
        gcs_url: GCS URL of the shapefile archive containing the layers.
        dataset: BigQuery dataset id holding the destination tables.

    NOTE(review): relies on ``layer_suffixes``, ``get_layername`` and
    ``tables`` being defined elsewhere in this module — confirm they are
    in scope at call time.
    """
    # Heavyweight Beam/geobeam imports are function-local so the module
    # can be imported without the full Dataflow stack installed.
    import apache_beam as beam
    from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery
    # Fix: PipelineOptions is used below but was never imported in this
    # function (nor visibly elsewhere); import it alongside its peers.
    from apache_beam.options.pipeline_options import PipelineOptions

    from geobeam.io import ShapefileSource
    from geobeam.fn import make_valid, filter_invalid, format_record

    pipeline_options = PipelineOptions(
        pipeline_args,
        experiments=['use_runner_v2'],
        temp_location='gs://gsd-pipeline-temp',
        sdk_container_image='gcr.io/dataflow-geobeam/base',
        project='geo-solution-demos',
        region='us-central1',
        # Single worker + single harness thread: the shapefile source is
        # read as one unit, so extra parallelism buys nothing here.
        max_num_workers=1,
        number_of_worker_harness_threads=1
    )

    with beam.Pipeline(options=pipeline_options) as p:
        # One independent branch per layer; the layer name is embedded in
        # every step label so labels stay unique in the pipeline graph.
        for suffix in layer_suffixes:
            layer = get_layername(gcs_url, suffix)
            (p
             | 'Read ' + layer >> beam.io.Read(ShapefileSource(gcs_url, layer_name=layer, in_epsg=4326))
             | 'MakeValid ' + layer >> beam.Map(make_valid)
             | 'FilterInvalid ' + layer >> beam.Filter(filter_invalid)
             | 'FormatRecord ' + layer >> beam.Map(format_record, output_type='geojson')
             | 'WriteToBigQuery ' + layer >> beam.io.WriteToBigQuery(
                   beam_bigquery.TableReference(projectId='geo-solution-demos', datasetId=dataset, tableId=tables[suffix]),
                   method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
                   # Tables must already exist; each run fully replaces them.
                   write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                   create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)
             )