cloud-composer/dags/sample-datastream-private-ip-generate-data.py
####################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################################################################################
# Author: Adam Paternostro
# Summary: Generates a steady stream of changes in the Cloud SQL Postgres database so that
#          the Datastream job has CDC data to replicate to BigQuery.
#          Replays pre-generated SQL statements from the BigQuery table
#          taxi_dataset.datastream_cdc_data against Postgres for up to 4 hours,
#          checkpointing progress to a JSON file in the Composer "data" folder.
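# Trigger: schedule_interval is None, so this DAG runs only when triggered manually from the
#          Airflow UI or, as a rough sketch (environment name and location are placeholders,
#          not values defined in this repo):
#          gcloud composer environments run YOUR_COMPOSER_ENV --location YOUR_REGION \
#              dags trigger -- sample-datastream-private-ip-generate-data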
# [START dag]
from datetime import datetime, timedelta
import os
import json
from pathlib import Path
import psycopg2
import airflow
from airflow.operators.python_operator import PythonOperator
from google.cloud import bigquery
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': None,
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
'dagrun_timeout' : timedelta(minutes=60),
}
project_id = os.environ['ENV_PROJECT_ID']
root_password = os.environ['ENV_RANDOM_EXTENSION']
cloud_sql_region = os.environ['ENV_CLOUD_SQL_REGION']
datastream_region = os.environ['ENV_DATASTREAM_REGION']
params_list = {
'project_id' : project_id,
'root_password' : root_password,
'cloud_sql_region' : cloud_sql_region,
'datastream_region' : datastream_region,
}
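# NOTE: params_list is not referenced by the task below; it appears to simply mirror the
#       environment settings shared with the companion Datastream DAGs.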
# Replays the pre-generated CDC SQL statements (read from BigQuery) against the
# Cloud SQL Postgres instance so Datastream has a continuous stream of changes to replicate
def run_postgres_sql(database_password):
print("start run_postgres_sql")
ipAddress = Path('/home/airflow/gcs/data/postgres_private_ip_address.txt').read_text()
ipAddress = ipAddress.replace("\n", "")
print("ipAddress:", ipAddress)
database_name = "demodb"
postgres_user = "postgres"
conn = psycopg2.connect(
host=ipAddress,
database=database_name,
user=postgres_user,
password=database_password)
    # Default replay state (checkpoint): number of completed passes over the data
    # and the execution_order index to resume from
config_data = {
"interation_count" : 0,
"min_index" : 1,
"max_index" : 10010000,
"current_index": 1
}
if os.path.exists('/home/airflow/gcs/data/datastream-private-ip-generate-data.json'):
# read the data
datastream_cdc_config_json = Path('/home/airflow/gcs/data/datastream-private-ip-generate-data.json').read_text()
config_data = json.loads(datastream_cdc_config_json)
else:
# write the default config
with open('/home/airflow/gcs/data/datastream-private-ip-generate-data.json', 'w') as f:
json.dump(config_data, f)
try:
# This runs for 4 hours (14400 seconds)
cur = conn.cursor()
client = bigquery.Client()
# Loop for 4 hours
start_time = datetime.now()
loop_count = 0
        while (datetime.now() - start_time).total_seconds() < 14400:
print("interation_count:",config_data["interation_count"])
print("min_index:",config_data["min_index"])
print("max_index:",config_data["max_index"])
print("current_index:",config_data["current_index"])
for index in range(config_data["current_index"], config_data["max_index"], 50):
loop_count = loop_count + 1
bigquery_sql = "SELECT sql_statement, table_name " + \
" FROM taxi_dataset.datastream_cdc_data " + \
"WHERE execution_order BETWEEN {} AND {};".format(index, index+49)
# print("bigquery_sql: ", bigquery_sql)
query_job = client.query(bigquery_sql)
results = query_job.result() # Waits for job to complete.
for row in results:
# print("row.table_name:", row.table_name)
# print("row.sql_statement:", row.sql_statement)
if config_data["interation_count"] == 0:
# okay to execute
cur.execute(row.sql_statement)
else:
if row.table_name != "driver":
# okay to execute (we do not want to create duplicate drivers)
cur.execute(row.sql_statement)
                if index + 49 >= config_data["max_index"]:
                    # Completed a full pass over the data; wrap around to the beginning
                    config_data["interation_count"] = config_data["interation_count"] + 1
                    config_data["current_index"] = 1
                else:
                    # This batch covered index .. index + 49, so resume at index + 50
                    config_data["current_index"] = index + 50
                conn.commit()
                # Persist the checkpoint every so often
if loop_count % 100 == 0:
print("bigquery_sql: ", bigquery_sql)
print("loop_count: ", loop_count)
print("config_data[current_index]: ", config_data["current_index"])
with open('/home/airflow/gcs/data/datastream-private-ip-generate-data.json', 'w') as f:
json.dump(config_data, f)
if (datetime.now() - start_time).total_seconds() > 14400:
break
# Save
with open('/home/airflow/gcs/data/datastream-private-ip-generate-data.json', 'w') as f:
json.dump(config_data, f)
cur.close()
conn.commit()
# Save our state
with open('/home/airflow/gcs/data/datastream-private-ip-generate-data.json', 'w') as f:
json.dump(config_data, f)
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
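# NOTE: Re-triggering this DAG resumes from the checkpoint stored in
# /home/airflow/gcs/data/datastream-private-ip-generate-data.json. Deleting that file restarts
# the replay from execution_order 1 with a fresh iteration count, which re-runs the "driver"
# insert statements.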
with airflow.DAG('sample-datastream-private-ip-generate-data',
default_args=default_args,
start_date=datetime(2021, 1, 1),
# Add the Composer "Data" directory which will hold the SQL/Bash scripts for deployment
template_searchpath=['/home/airflow/gcs/data'],
# Not scheduled, trigger only
schedule_interval=None) as dag:
run_postgres_sql_task = PythonOperator(
task_id='run_postgres_sql_task',
python_callable=run_postgres_sql,
op_kwargs = { "database_password" : root_password },
dag=dag
)
# DAG Graph
run_postgres_sql_task
# [END dag]