def run()

in ark-demo/pipelines/nfhl/nfhl_pipeline.py


def run(pipeline_args, gcs_url, layer=None, dataset=None, truncate=False):
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
    from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery

    from geobeam.io import GeodatabaseSource
    from geobeam.fn import make_valid, filter_invalid, format_record

    # Derive the NFHL release date and geodatabase name from the GCS URL.
    release_date, gdb_name = parse_gcs_url(gcs_url)

    # BigQuery schemas for each supported NFHL layer, keyed by layer name.
    layer_schemas = get_schemas()

    # Load a single layer if one was requested, otherwise all known layers.
    if layer is not None:
        nfhl_layers = [layer]
    else:
        nfhl_layers = list(layer_schemas.keys())

    pipeline_options = PipelineOptions(
        pipeline_args,
        experiments=['use_runner_v2'],
        temp_location='gs://gsd-pipeline-temp',
        sdk_container_image='gcr.io/dataflow-geobeam/base',
        project='geo-solution-demos',
        region='us-central1',
        number_of_worker_harness_threads=1
    )
    
    # Run on Dataflow Prime so the per-step resource hints below take effect.
    pipeline_options.view_as(GoogleCloudOptions).dataflow_service_options = ['enable_prime']

    # Append to the destination tables by default; truncate when requested.
    write_method = beam.io.BigQueryDisposition.WRITE_APPEND
    if truncate:
        write_method = beam.io.BigQueryDisposition.WRITE_TRUNCATE

    with beam.Pipeline(options=pipeline_options) as p:
        # Build one branch per layer: read features from the geodatabase,
        # repair and filter geometries, format records, and load them into
        # the layer's BigQuery table.
        for layer in nfhl_layers:
            layer_schema = layer_schemas[layer]
            (p
             | 'Read ' + layer >> beam.io.Read(GeodatabaseSource(gcs_url,
                 layer_name=layer,
                 gdb_name=gdb_name)).with_resource_hints(min_ram="30GB")
             | 'MakeValid ' + layer >> beam.Map(make_valid).with_resource_hints(min_ram="30GB")
             | 'FilterInvalid ' + layer >> beam.Filter(filter_invalid).with_resource_hints(min_ram="30GB")
             | 'FormatGDBDatetimes ' + layer >> beam.Map(format_gdb_datetime, layer_schema).with_resource_hints(min_ram="30GB")
             | 'FormatRecord ' + layer >> beam.Map(format_record).with_resource_hints(min_ram="30GB")
             | 'WriteToBigQuery ' + layer >> beam.io.WriteToBigQuery(
                   beam_bigquery.TableReference(projectId='geo-solution-demos', datasetId=dataset, tableId=layer),
                   method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
                   write_disposition=write_method,
                   create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER).with_resource_hints(min_ram="30GB")
            )
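

The module presumably parses its command-line flags with argparse and forwards the unrecognized flags to Dataflow as pipeline_args. Below is a minimal sketch of such an entry point; the flag names (--gcs_url, --layer, --dataset, --truncate) are assumptions for illustration and are not confirmed by the source.

if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--gcs_url', required=True,
                        help='GCS URL of the NFHL geodatabase archive')
    parser.add_argument('--layer', default=None,
                        help='load a single NFHL layer instead of every layer')
    parser.add_argument('--dataset', default=None,
                        help='destination BigQuery dataset')
    parser.add_argument('--truncate', action='store_true',
                        help='truncate the destination tables instead of appending')
    # Flags not recognized above (e.g. --runner, --num_workers) are passed
    # through to the Beam pipeline unchanged.
    known_args, pipeline_args = parser.parse_known_args()

    run(pipeline_args, known_args.gcs_url,
        layer=known_args.layer,
        dataset=known_args.dataset,
        truncate=known_args.truncate)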