cloud-composer/dags/sample-bigquery-start-spanner.py
####################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################################################################################
# Author: Adam Paternostro
# Summary: Exports BigQuery public data to Cloud Storage.
#          Creates a Spanner manifest file and uploads it to Cloud Storage.
#          Starts a Dataflow job that ingests the data from Cloud Storage into Spanner.
#          Creates a BigQuery federated connection so BigQuery can query Spanner directly.
# NOTE: BigQuery can query the public data directly; this DAG demonstrates a federated query using public data (which just happens to live in BigQuery)
# [START dag]
from google.cloud import storage
from datetime import datetime, timedelta
import requests
import sys
import os
import logging
import airflow
from airflow.operators import bash_operator
from airflow.utils import trigger_rule
from airflow.operators.python_operator import PythonOperator
import google.auth
import google.auth.transport.requests
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
import json
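# Default arguments applied to every task in this DAG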
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': None,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'dagrun_timeout' : timedelta(minutes=60),
}
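# Deployment-specific settings are supplied as environment variables on the Composer environment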
project_id = os.environ['ENV_PROJECT_ID']
spanner_region = os.environ['ENV_SPANNER_REGION']
dataflow_region = os.environ['ENV_DATAFLOW_REGION']
bigquery_region = os.environ['ENV_BIGQUERY_REGION']
spanner_instance_id = os.environ['ENV_SPANNER_INSTANCE_ID']
processed_bucket_name = os.environ['ENV_PROCESSED_BUCKET']
raw_bucket_name = os.environ['ENV_RAW_BUCKET']
# Spanner instance configuration, e.g. a regional config such as "regional-us-east1" or a multi-region config such as "nam8"
spanner_config = os.environ['ENV_SPANNER_CONFIG']
params_list = {
"project_id" : project_id,
"spanner_region": spanner_region,
"dataflow_region": dataflow_region,
"bigquery_region" : bigquery_region,
"processed_bucket_name" : processed_bucket_name,
"raw_bucket_name" : raw_bucket_name,
"spanner_instance_id" : spanner_instance_id,
"spanner_config" : spanner_config
}
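# gcloud command to create a small Spanner instance (100 processing units) using the configured instance config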
gcloud_create_spanner_instance = \
"gcloud spanner instances create \"" + spanner_instance_id + "\" " + \
"--project=\"" + project_id + "\" " + \
"--description=\"main-instance\" " + \
"--config=" + spanner_config + " " + \
"--processing-units=100"
gcloud_create_spanner_database = \
"gcloud spanner databases create weather " + \
"--project=\"" + project_id + "\" " + \
"--instance=\"" + spanner_instance_id + "\" " + \
"--ddl='CREATE TABLE weather (station_id STRING(100), station_date DATE, snow_mm_amt FLOAT64, precipitation_tenth_mm_amt FLOAT64, min_celsius_temp FLOAT64, max_celsius_temp FLOAT64) PRIMARY KEY(station_date,station_id)' " + \
""
sql="""
EXPORT DATA
OPTIONS(
uri='gs://{processed_bucket_name}/spanner/weather/*.csv',
format='CSV',
overwrite=true,
header=false,
field_delimiter=',')
AS
WITH JustRainSnowMinMaxTempData AS
(
SELECT id, date, element, MAX(value) AS value
FROM `bigquery-public-data.ghcn_d.ghcnd_2022`
WHERE id = 'USW00094728' -- NEW YORK CNTRL PK TWR
AND element IN ('SNOW','PRCP','TMIN','TMAX')
GROUP BY id, date, element
UNION ALL
SELECT id, date, element, MAX(value) AS value
FROM `bigquery-public-data.ghcn_d.ghcnd_2021`
WHERE id = 'USW00094728' -- NEW YORK CNTRL PK TWR
AND element IN ('SNOW','PRCP','TMIN','TMAX')
GROUP BY id, date, element
UNION ALL
SELECT id, date, element, MAX(value) AS value
FROM `bigquery-public-data.ghcn_d.ghcnd_2020`
WHERE id = 'USW00094728' -- NEW YORK CNTRL PK TWR
AND element IN ('SNOW','PRCP','TMIN','TMAX')
GROUP BY id, date, element
UNION ALL
SELECT id, date, element, MAX(value) AS value
FROM `bigquery-public-data.ghcn_d.ghcnd_2019`
WHERE id = 'USW00094728' -- NEW YORK CNTRL PK TWR
AND element IN ('SNOW','PRCP','TMIN','TMAX')
GROUP BY id, date, element
UNION ALL
SELECT id, date, element, MAX(value) AS value
FROM `bigquery-public-data.ghcn_d.ghcnd_2018`
WHERE id = 'USW00094728' -- NEW YORK CNTRL PK TWR
AND element IN ('SNOW','PRCP','TMIN','TMAX')
GROUP BY id, date, element
UNION ALL
SELECT id, date, element, MAX(value) AS value
FROM `bigquery-public-data.ghcn_d.ghcnd_2017`
WHERE id = 'USW00094728' -- NEW YORK CNTRL PK TWR
AND element IN ('SNOW','PRCP','TMIN','TMAX')
GROUP BY id, date, element
UNION ALL
SELECT id, date, element, MAX(value) AS value
FROM `bigquery-public-data.ghcn_d.ghcnd_2016`
WHERE id = 'USW00094728' -- NEW YORK CNTRL PK TWR
AND element IN ('SNOW','PRCP','TMIN','TMAX')
GROUP BY id, date, element
UNION ALL
SELECT id, date, element, MAX(value) AS value
FROM `bigquery-public-data.ghcn_d.ghcnd_2015`
WHERE id = 'USW00094728' -- NEW YORK CNTRL PK TWR
AND element IN ('SNOW','PRCP','TMIN','TMAX')
GROUP BY id, date, element
)
SELECT id AS station_id,
date AS station_date,
SNOW AS snow_mm_amt,
PRCP AS precipitation_tenth_mm_amt,
TMIN AS min_celsius_temp,
TMAX AS max_celsius_temp
FROM JustRainSnowMinMaxTempData
PIVOT(MAX(value) FOR element IN ('SNOW','PRCP','TMIN','TMAX'))
ORDER BY date;
""".format(processed_bucket_name=processed_bucket_name)
LOCAL_PATH_SPANNER_MANIFEST_FILE = "/home/airflow/gcs/data/spanner-manifest.json"
BUCKET_RELATIVE_PATH="spanner/weather/"
# Delete the table before loading
gcloud_truncate_table=("gcloud spanner databases execute-sql weather " + \
"--instance={spanner_instance_id} " + \
"--sql='DELETE FROM weather WHERE true;'").format(\
spanner_instance_id=spanner_instance_id)
# Dataflow template command to run a Spanner import from CSV files
gcloud_load_weather_table=("gcloud dataflow jobs run importspannerweatherdata " + \
"--gcs-location gs://dataflow-templates-{dataflow_region}/latest/GCS_Text_to_Cloud_Spanner " + \
"--region {dataflow_region} " + \
"--max-workers 1 " + \
"--num-workers 1 " + \
"--service-account-email \"dataflow-service-account@{project_id}.iam.gserviceaccount.com\" " + \
"--worker-machine-type \"n1-standard-4\" " + \
"--staging-location gs://{raw_bucket_name} " + \
"--network vpc-main " + \
"--subnetwork regions/{dataflow_region}/subnetworks/dataflow-subnet " + \
"--disable-public-ips " + \
"--parameters " + \
"instanceId={spanner_instance_id}," + \
"databaseId=weather," + \
"spannerProjectId={project_id}," + \
"importManifest=gs://{processed_bucket_name}/spanner/weather/spanner-manifest.json").format(\
dataflow_region=dataflow_region,
processed_bucket_name=processed_bucket_name,
raw_bucket_name=raw_bucket_name,
project_id=project_id,
spanner_instance_id=spanner_instance_id)
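# Records the Spanner instance id and run timestamp to a JSON file in the Composer data folder
# (e.g. so a later cleanup/teardown process can locate the instance)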
def write_spanner_run_datetime(spanner_instance_id):
run_datetime = datetime.now()
print("spanner_instance_id: ", spanner_instance_id)
data = {
"run_datetime" : run_datetime.strftime("%m/%d/%Y %H:%M:%S"),
"spanner_instance_id" : spanner_instance_id
}
with open('/home/airflow/gcs/data/write_spanner_run_datetime.json', 'w') as f:
json.dump(data, f)
# Creates a JSON file that is required for the Dataflow job that loads Spanner
def write_spanner_manifest(processed_bucket_name,file_path):
spanner_template_json={
"tables": [
{
"table_name": "weather",
"file_patterns": ["gs://" + processed_bucket_name + "/spanner/weather/*.csv"],
"columns": [
{"column_name": "station_id", "type_name": "STRING"},
{"column_name": "station_date", "type_name": "DATE"},
{"column_name": "snow_mm_amt", "type_name": "FLOAT64"},
{"column_name": "precipitation_tenth_mm_amt", "type_name": "FLOAT64"},
{"column_name": "min_celsius_temp", "type_name": "FLOAT64"},
{"column_name": "max_celsius_temp", "type_name": "FLOAT64"}
]
}
]
}
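# Log the manifest for troubleshooting, then write it to the local file path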
print(spanner_template_json)
try:
with open(file_path, 'w') as f:
json.dump(spanner_template_json, f)
except Exception as e:
print(e)
raise e
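# DAG definition: triggered manually (schedule_interval=None); tasks are chained sequentially in the graph at the bottom of this file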
with airflow.DAG('sample-bigquery-start-spanner',
default_args=default_args,
start_date=datetime(2021, 1, 1),
# Add the Composer "Data" directory which will hold the SQL/Bash scripts for deployment
template_searchpath=['/home/airflow/gcs/data'],
# Not scheduled, trigger only
schedule_interval=None) as dag:
# NOTE: The service account of the Composer worker node must have access to run these commands:
#       creating Spanner instances/databases, running BigQuery jobs, launching Dataflow jobs,
#       and creating BigQuery connections
# Create Spanner Instance
gcloud_create_spanner_instance = bash_operator.BashOperator(
task_id="gcloud_create_spanner_instance",
bash_command=gcloud_create_spanner_instance,
)
# Create Spanner Database
gcloud_create_spanner_database = bash_operator.BashOperator(
task_id="gcloud_create_spanner_database",
bash_command=gcloud_create_spanner_database,
)
# Run a BigQuery stored procedure that exports data to a storage bucket
# Hardcoded to location="us" since this public dataset is only available in the US multi-region
export_public_weather_data = BigQueryInsertJobOperator(
task_id="export_public_weather_data",
location="us",
configuration={
"query": {
"query": sql,
"useLegacySql": False,
}
})
# Save the manifest file locally and then upload it to GCS (the Dataflow import job needs this)
write_spanner_manifest_file = PythonOperator(
task_id='write_spanner_manifest_file',
python_callable= write_spanner_manifest,
op_kwargs = { "processed_bucket_name" : processed_bucket_name,
"file_path" : LOCAL_PATH_SPANNER_MANIFEST_FILE},
dag=dag,
)
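# Upload the generated manifest JSON to the processed bucket where the Dataflow template expects to find it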
upload_spanner_manifest_file = LocalFilesystemToGCSOperator(
task_id="upload_spanner_manifest_file",
src=LOCAL_PATH_SPANNER_MANIFEST_FILE,
dst=BUCKET_RELATIVE_PATH,
bucket=processed_bucket_name,
)
# Delete all records from the Spanner table (to avoid duplicates on re-runs)
truncate_spanner_table = bash_operator.BashOperator(
task_id="truncate_spanner_table",
bash_command=gcloud_truncate_table,
)
# Run a Dataflow job that will import the data to spanner (this can take a few minutes to run)
dataflow_load_spanner_table = bash_operator.BashOperator(
task_id="dataflow_load_spanner_table",
bash_command=gcloud_load_weather_table,
)
# Setup a BigQuery federated query connection so we can query BQ and Spanner using a single SQL command
create_spanner_connection = bash_operator.BashOperator(
task_id='create_spanner_connection',
bash_command='bash_create_spanner_connection.sh',
params=params_list,
dag=dag
)
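# Record the Spanner instance id and run start time (e.g. for later cleanup)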
write_spanner_run_datetime = PythonOperator(
task_id='write_spanner_run_datetime',
python_callable= write_spanner_run_datetime,
op_kwargs = { "spanner_instance_id" : spanner_instance_id },
execution_timeout=timedelta(minutes=1),
dag=dag,
)
# DAG Graph
gcloud_create_spanner_instance >> gcloud_create_spanner_database >> \
export_public_weather_data >> \
write_spanner_manifest_file >> upload_spanner_manifest_file >> \
truncate_spanner_table >> dataflow_load_spanner_table >> \
create_spanner_connection >> \
write_spanner_run_datetime
# [END dag]