in ark-demo/pipelines/nfhl/nfhl_pipeline.py [0:0]
def run(pipeline_args, gcs_url, layer=None, dataset=None, truncate=False):
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
    from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery
    from geobeam.io import GeodatabaseSource
    from geobeam.fn import make_valid, filter_invalid, format_record

    # parse_gcs_url, get_schemas and format_gdb_datetime are module-level helpers
    # assumed to be defined elsewhere in this file.
    release_date, gdb_name = parse_gcs_url(gcs_url)
    layer_schemas = get_schemas()
    # Process only the requested layer, or every layer in the schema map.
    if layer is not None:
        nfhl_layers = [layer]
    else:
        nfhl_layers = layer_schemas.keys()
    pipeline_options = PipelineOptions(
        pipeline_args,
        experiments=['use_runner_v2'],
        temp_location='gs://gsd-pipeline-temp',
        sdk_container_image='gcr.io/dataflow-geobeam/base',
        project='geo-solution-demos',
        region='us-central1',
        number_of_worker_harness_threads=1
    )
    # Dataflow Prime right fitting honors the min_ram resource hints set on the steps below.
    pipeline_options.view_as(GoogleCloudOptions).dataflow_service_options = ['enable_prime']
    # Append to the target tables by default; overwrite them when truncate is set.
    write_method = beam.io.BigQueryDisposition.WRITE_APPEND
    if truncate:
        write_method = beam.io.BigQueryDisposition.WRITE_TRUNCATE
    with beam.Pipeline(options=pipeline_options) as p:
        for layer in nfhl_layers:
            layer_schema = layer_schemas[layer]

            # Read each layer from the geodatabase, repair and drop invalid geometries,
            # normalize geodatabase datetime fields and record layout, then load the
            # rows into the existing BigQuery table for that layer.
            (p
             | 'Read ' + layer >> beam.io.Read(GeodatabaseSource(
                   gcs_url,
                   layer_name=layer,
                   gdb_name=gdb_name)).with_resource_hints(min_ram="30GB")
             | 'MakeValid ' + layer >> beam.Map(make_valid).with_resource_hints(min_ram="30GB")
             | 'FilterInvalid ' + layer >> beam.Filter(filter_invalid).with_resource_hints(min_ram="30GB")
             | 'FormatGDBDatetimes ' + layer >> beam.Map(format_gdb_datetime, layer_schema).with_resource_hints(min_ram="30GB")
             | 'FormatRecord ' + layer >> beam.Map(format_record).with_resource_hints(min_ram="30GB")
             | 'WriteToBigQuery ' + layer >> beam.io.WriteToBigQuery(
                   beam_bigquery.TableReference(
                       projectId='geo-solution-demos', datasetId=dataset, tableId=layer),
                   method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
                   write_disposition=write_method,
                   create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER).with_resource_hints(min_ram="30GB")
            )
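
# For context, a minimal sketch of how run() might be wired to a command line,
# assuming an argparse-based entry point. The flag names (--gcs_url, --layer,
# --dataset, --truncate) are illustrative assumptions, not taken from this file.
if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--gcs_url', required=True,
                        help='gs:// URL of the NFHL geodatabase archive')
    parser.add_argument('--layer', default=None,
                        help='process only this layer instead of all known layers')
    parser.add_argument('--dataset', required=True,
                        help='target BigQuery dataset')
    parser.add_argument('--truncate', action='store_true',
                        help='overwrite the target tables instead of appending')
    known_args, pipeline_args = parser.parse_known_args()

    run(pipeline_args,
        known_args.gcs_url,
        layer=known_args.layer,
        dataset=known_args.dataset,
        truncate=known_args.truncate)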