code/main.py

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

import functions_framework
from google.cloud import bigquery, storage


def load_csv_to_bq(bucket, object_name):
    """Loads a CSV object from Cloud Storage into BigQuery.

    The object path is expected to look like
    ``<dataset>/<table>/<file>.csv``; the first two path segments name
    the destination dataset and table.
    """
    # Construct a BigQuery client object.
    client = bigquery.Client()

    project_id = os.environ["DW_PROJECT_ID"]
    params = object_name.split("/")
    table_id = "{}.{}.{}".format(project_id, params[0], params[1])
    print(f"Table ID: {table_id}")

    job_config = bigquery.LoadJobConfig(
        autodetect=True,
        skip_leading_rows=1,
        # The source format defaults to CSV, so the line below is optional.
        source_format=bigquery.SourceFormat.CSV,
    )

    uri = "gs://{}/{}".format(bucket, object_name)
    load_job = client.load_table_from_uri(
        uri, table_id, job_config=job_config
    )  # Make an API request.
    load_job.result()  # Waits for the job to complete.

    destination_table = client.get_table(table_id)  # Make an API request.
    print("Table now has {} rows.".format(destination_table.num_rows))


def move_blob(bucket_name, blob_name):
    """Moves a blob from the source bucket to the archive bucket,
    keeping its name."""
    destination_bucket_name = os.environ["GCS_ARCHIVE_BUCKET"]

    storage_client = storage.Client()
    source_bucket = storage_client.bucket(bucket_name)
    src_blob = source_bucket.blob(blob_name)
    dst_bucket = storage_client.bucket(destination_bucket_name)

    # Cloud Storage has no atomic "move": copy to the archive bucket,
    # then delete the original.
    blob_copy = source_bucket.copy_blob(src_blob, dst_bucket, blob_name)
    source_bucket.delete_blob(blob_name)

    print(
        "Blob {} in bucket {} moved to blob {} in bucket {}.".format(
            src_blob.name,
            source_bucket.name,
            blob_copy.name,
            dst_bucket.name,
        )
    )


# Triggered by a change in a storage bucket.
@functions_framework.cloud_event
def trigger_gcs(cloud_event):
    data = cloud_event.data

    event_id = cloud_event["id"]
    event_type = cloud_event["type"]

    bucket = data["bucket"]
    name = data["name"]
    metageneration = data["metageneration"]
    timeCreated = data["timeCreated"]
    updated = data["updated"]

    print(f"Event ID: {event_id}")
    print(f"Event type: {event_type}")
    print(f"Bucket: {bucket}")
    print(f"File: {name}")
    print(f"Metageneration: {metageneration}")
    print(f"Created: {timeCreated}")
    print(f"Updated: {updated}")

    # Only process CSV objects; a bare substring check ('csv' in name)
    # would also match names like "mycsvnotes.txt".
    if name.lower().endswith(".csv"):
        load_csv_to_bq(bucket, name)
        move_blob(bucket, name)
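

# ---------------------------------------------------------------------------
# Local smoke test: a minimal sketch, assuming the `cloudevents` package is
# installed and that Google Cloud credentials plus the DW_PROJECT_ID and
# GCS_ARCHIVE_BUCKET environment variables are set. The bucket and object
# names below are hypothetical placeholders, not part of the deployed
# function. `@functions_framework.cloud_event` returns the function
# unchanged, so the handler can be invoked directly with a synthetic event.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from cloudevents.http import CloudEvent

    # Attributes mirror a Cloud Storage "object finalized" event; `id` and
    # `time` are filled in automatically by the CloudEvent constructor.
    attributes = {
        "type": "google.cloud.storage.object.v1.finalized",
        "source": "//storage.googleapis.com/projects/_/buckets/my-ingest-bucket",
    }
    data = {
        "bucket": "my-ingest-bucket",          # hypothetical source bucket
        "name": "sales/orders/part-0001.csv",  # <dataset>/<table>/<file>.csv
        "metageneration": "1",
        "timeCreated": "2023-01-01T00:00:00.000Z",
        "updated": "2023-01-01T00:00:00.000Z",
    }

    trigger_gcs(CloudEvent(attributes, data))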