in ark-demo/pipelines/nhc_forecast/cone_pipeline.py [0:0]
def run(pipeline_args, gcs_url, dataset):
    """Launch a Dataflow pipeline loading NHC forecast-cone shapefile layers into BigQuery.

    For each layer suffix, reads the named layer from the shapefile at
    ``gcs_url``, validates/filters geometries, formats records as GeoJSON,
    and truncate-loads them into the matching BigQuery table.

    Args:
        pipeline_args: Extra command-line arguments forwarded to PipelineOptions.
        gcs_url: GCS URL of the shapefile to read.
        dataset: BigQuery dataset id that receives the per-layer tables.

    Side effects: runs an Apache Beam pipeline; each target table is
    truncated (WRITE_TRUNCATE) and must already exist (CREATE_NEVER).
    """
    # Imports are function-local so the Beam/geobeam stack is only needed
    # when the pipeline actually runs. PipelineOptions is imported here too —
    # it was used below without any visible import, which would raise
    # NameError unless the module happened to import it at top level.
    import apache_beam as beam
    from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery
    from apache_beam.options.pipeline_options import PipelineOptions
    from geobeam.io import ShapefileSource
    from geobeam.fn import make_valid, filter_invalid, format_record

    pipeline_options = PipelineOptions(
        pipeline_args,
        experiments=['use_runner_v2'],
        temp_location='gs://gsd-pipeline-temp',
        # geobeam base image bundles GDAL/shapefile support for the workers.
        sdk_container_image='gcr.io/dataflow-geobeam/base',
        project='geo-solution-demos',
        region='us-central1',
        # Single worker / single harness thread keeps shapefile reads serialized.
        max_num_workers=1,
        number_of_worker_harness_threads=1
    )

    with beam.Pipeline(options=pipeline_options) as p:
        # NOTE(review): layer_suffixes, get_layername and tables are assumed
        # to be defined elsewhere in this module — confirm.
        for suffix in layer_suffixes:
            layer = get_layername(gcs_url, suffix)
            # One independent branch per layer; step labels embed the layer
            # name so each transform is unique within the pipeline graph.
            (p
             | 'Read ' + layer >> beam.io.Read(
                 ShapefileSource(gcs_url, layer_name=layer, in_epsg=4326))
             | 'MakeValid ' + layer >> beam.Map(make_valid)
             | 'FilterInvalid ' + layer >> beam.Filter(filter_invalid)
             | 'FormatRecord ' + layer >> beam.Map(format_record, output_type='geojson')
             | 'WriteToBigQuery ' + layer >> beam.io.WriteToBigQuery(
                 beam_bigquery.TableReference(
                     projectId='geo-solution-demos',
                     datasetId=dataset,
                     tableId=tables[suffix]),
                 method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
                 write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                 create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)
             )