ark-demo/pipelines/nfhl/nfhl_pipeline.py

# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Load NFHL into BigQuery
"""

import datetime
import logging

from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions


def parse_gcs_url(gcs_url):
    [full_path, suffix] = gcs_url.split('.')
    basename = full_path.split('/')[-1]
    [prefix, fips, release] = basename.split('_')
    gdb_name = '{}.gdb'.format(basename)
    release_date = datetime.datetime.strptime(release, '%Y%m%d')

    return release_date, gdb_name


"""
Fix GDB datetime fields
see https://desktop.arcgis.com/en/arcmap/latest/manage-data/tables/fundamentals-of-date-fields.htm
"""
def format_gdb_datetime(element, schema):
    from datetime import datetime

    props, geom = element
    dt_fields = []

    for field in schema:
        if field['type'] == 'DATETIME':
            dt_fields.append(field['name'])

    for field in dt_fields:
        if props[field] is not None:
            dt_in = datetime.strptime(props[field], '%Y-%m-%dT%H:%M:%S%z')
            props[field] = dt_in.replace(tzinfo=None).strftime('%Y-%m-%d %H:%M:%S')

    return props, geom


def get_schemas():
    from google.cloud import storage
    import json

    schemas = {}
    client = storage.Client()
    bucket = client.bucket('geo-demos')
    schema_ls = client.list_blobs('geo-demos', prefix='ark-demo/schemas/', delimiter='/')

    for schema_file in schema_ls:
        if not schema_file.name.endswith('.json'):
            continue

        layer_name = schema_file.name.split('/')[-1].split('.json')[0]
        schema_json = json.loads(bucket.blob(schema_file.name).download_as_string())
        schemas[layer_name] = schema_json

    return schemas


def run(pipeline_args, gcs_url, layer=None, dataset=None):
    import apache_beam as beam
    from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery
    from geobeam.io import GeodatabaseSource
    from geobeam.fn import make_valid, filter_invalid, format_record

    release_date, gdb_name = parse_gcs_url(gcs_url)
    layer_schemas = get_schemas()

    if layer is not None:
        nfhl_layers = [layer]
    else:
        nfhl_layers = layer_schemas.keys()

    pipeline_options = PipelineOptions(
        pipeline_args,
        experiments=['use_runner_v2'],
        temp_location='gs://gsd-pipeline-temp',
        sdk_container_image='gcr.io/dataflow-geobeam/base',
        project='geo-solution-demos',
        region='us-central1',
        number_of_worker_harness_threads=1
    )
    pipeline_options.view_as(GoogleCloudOptions).dataflow_service_options = ['enable_prime']

    write_method = beam.io.BigQueryDisposition.WRITE_APPEND
    if known_args.truncate:
        write_method = beam.io.BigQueryDisposition.WRITE_TRUNCATE

    with beam.Pipeline(options=pipeline_options) as p:
        for layer in nfhl_layers:
            layer_schema = layer_schemas[layer]

            (p
             | 'Read ' + layer >> beam.io.Read(
                   GeodatabaseSource(gcs_url,
                       layer_name=layer,
                       gdb_name=gdb_name)).with_resource_hints(min_ram="30GB")
             | 'MakeValid ' + layer >> beam.Map(make_valid).with_resource_hints(min_ram="30GB")
             | 'FilterInvalid ' + layer >> beam.Filter(filter_invalid).with_resource_hints(min_ram="30GB")
             | 'FormatGDBDatetimes ' + layer >> beam.Map(format_gdb_datetime, layer_schema).with_resource_hints(min_ram="30GB")
             | 'FormatRecord ' + layer >> beam.Map(format_record).with_resource_hints(min_ram="30GB")
             | 'WriteToBigQuery ' + layer >> beam.io.WriteToBigQuery(
                   beam_bigquery.TableReference(
                       projectId='geo-solution-demos',
                       datasetId=dataset,
                       tableId=layer),
                   method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
                   write_disposition=write_method,
                   create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER).with_resource_hints(min_ram="30GB")
            )

    """
    if known_args.safe_mode:
        from google.cloud import bigquery

        client = bigquery.Client()

        for layer in nfhl_layers:
            sql = (
                'insert into `geo-solution-demos.nfhl.' + layer + '` '
                'select * except(geom), st_geogfromgeojson(geom, make_valid => true) as geom '
                'from `geo-solution-demos.' + dataset + '.' + layer + '` '
            )
            client.query(sql)
    """


if __name__ == '__main__':
    import argparse

    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument('--gcs_url', type=str)
    parser.add_argument('--layer', type=str, default=None)
    parser.add_argument('--dataset', type=str, default='nfhl_staging')
    parser.add_argument('--safe_mode', type=bool, default=True)
    parser.add_argument('--truncate', type=bool, default=True)
    known_args, pipeline_args = parser.parse_known_args()

    run(pipeline_args, known_args.gcs_url, known_args.layer, known_args.dataset)
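
# Example invocation (a sketch, not part of the original file): parse_gcs_url() expects the
# object's basename to look like <PREFIX>_<FIPS>_<YYYYMMDD> with a single '.' before the
# extension, so a name such as NFHL_06037C_20220103.zip parses cleanly. The bucket path and
# layer name below are assumptions for illustration; any flags not consumed by argparse
# (e.g. --runner) are passed through to Beam as pipeline_args via parse_known_args().
#
#   python nfhl_pipeline.py \
#       --gcs_url gs://geo-demos/ark-demo/sources/NFHL_06037C_20220103.zip \
#       --layer S_Fld_Haz_Ar \
#       --dataset nfhl_staging \
#       --runner DataflowRunner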