in cloud-composer/dags/sample-datastream-public-ip-generate-data.py
import json
import os
from datetime import datetime
from pathlib import Path

import psycopg2
from google.cloud import bigquery

# Checkpoint file; /home/airflow/gcs/data is the Cloud Composer environment's
# GCS bucket mounted via gcsfuse, so the file survives worker restarts.
STATE_FILE = '/home/airflow/gcs/data/datastream-public-ip-generate-data.json'


def run_postgres_sql(database_password):
    """Replays CDC SQL statements from BigQuery against the demo Postgres instance for up to 4 hours."""
    print("start run_postgres_sql")
    ip_address = Path('/home/airflow/gcs/data/postgres_public_ip_address.txt').read_text()
    ip_address = ip_address.replace("\n", "")
    print("ip_address:", ip_address)
    database_name = "demodb"
    postgres_user = "postgres"
    conn = psycopg2.connect(
        host=ip_address,
        database=database_name,
        user=postgres_user,
        password=database_password)
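    # psycopg2 opens an implicit transaction on the first execute(); the replayed
    # statements only become visible to the database (and thus to Datastream's
    # replication stream) when conn.commit() runs below.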
    # Default checkpoint, used on the very first run.
    config_data = {
        "iteration_count": 0,
        "min_index": 1,
        "max_index": 10010000,
        "current_index": 1
    }
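    # On disk the checkpoint looks like:
    #   {"iteration_count": 0, "min_index": 1, "max_index": 10010000, "current_index": 1}
    # iteration_count counts complete passes over the statement set and
    # current_index is the next execution_order value to replay after a restart.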
    if os.path.exists(STATE_FILE):
        # Resume from the saved checkpoint.
        config_data = json.loads(Path(STATE_FILE).read_text())
    else:
        # First run: persist the default checkpoint.
        with open(STATE_FILE, 'w') as f:
            json.dump(config_data, f)
    try:
        # Replay statements in batches for up to 4 hours (14400 seconds).
        cur = conn.cursor()
        client = bigquery.Client()
        start_time = datetime.now()
        loop_count = 0
        while (datetime.now() - start_time).total_seconds() < 14400:
            print("iteration_count:", config_data["iteration_count"])
            print("min_index:", config_data["min_index"])
            print("max_index:", config_data["max_index"])
            print("current_index:", config_data["current_index"])
            # Fetch the statements in batches of 50, ordered by execution_order.
            for index in range(config_data["current_index"], config_data["max_index"], 50):
                loop_count = loop_count + 1
                # index and index + 49 are ints from the checkpoint, so the
                # .format() interpolation cannot inject anything here.
                bigquery_sql = ("SELECT sql_statement, table_name "
                                "  FROM taxi_dataset.datastream_cdc_data "
                                " WHERE execution_order BETWEEN {} AND {};".format(index, index + 49))
                # print("bigquery_sql: ", bigquery_sql)
                query_job = client.query(bigquery_sql)
                results = query_job.result()  # Waits for the job to complete.
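                # A parameterized sketch of the same query (not executed here, so
                # the control flow above is unchanged); QueryJobConfig and
                # ScalarQueryParameter are part of the google.cloud.bigquery API:
                #   job_config = bigquery.QueryJobConfig(query_parameters=[
                #       bigquery.ScalarQueryParameter("lo", "INT64", index),
                #       bigquery.ScalarQueryParameter("hi", "INT64", index + 49)])
                #   results = client.query(
                #       "SELECT sql_statement, table_name "
                #       "FROM taxi_dataset.datastream_cdc_data "
                #       "WHERE execution_order BETWEEN @lo AND @hi;",
                #       job_config=job_config).result()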
                for row in results:
                    # print("row.table_name:", row.table_name)
                    # print("row.sql_statement:", row.sql_statement)
                    if config_data["iteration_count"] == 0:
                        # First pass: execute everything.
                        cur.execute(row.sql_statement)
                    elif row.table_name != "driver":
                        # Later passes: skip driver statements so repeated passes
                        # do not create duplicate drivers.
                        cur.execute(row.sql_statement)
                if index + 49 >= config_data["max_index"]:
                    # Completed a full pass over the statement set: commit it and
                    # wrap the cursor back to the start for the next pass.
                    config_data["iteration_count"] = config_data["iteration_count"] + 1
                    config_data["current_index"] = 1
                    conn.commit()
                else:
                    config_data["current_index"] = index + 49
                # Checkpoint every 100 batches so a restarted task can resume.
                if loop_count % 100 == 0:
                    print("bigquery_sql: ", bigquery_sql)
                    print("loop_count: ", loop_count)
                    print("config_data[current_index]: ", config_data["current_index"])
                    # Commit before persisting the checkpoint so the saved
                    # current_index never runs ahead of the committed data.
                    conn.commit()
                    with open(STATE_FILE, 'w') as f:
                        json.dump(config_data, f)
                # Stop fetching batches once the 4 hours are up.
                if (datetime.now() - start_time).total_seconds() > 14400:
                    break
        # Commit any remaining work, then save the final checkpoint once.
        conn.commit()
        cur.close()
        with open(STATE_FILE, 'w') as f:
            json.dump(config_data, f)
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()
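

# A minimal sketch of how this callable could be wired into the DAG. The dag_id,
# schedule, and the Airflow Variable holding the password are illustrative
# assumptions, not necessarily this repository's actual wiring.
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable

with DAG(dag_id="sample-datastream-public-ip-generate-data",
         start_date=datetime(2022, 1, 1),
         schedule_interval=None,
         catchup=False) as dag:
    generate_data_task = PythonOperator(
        task_id="run_postgres_sql",
        python_callable=run_postgres_sql,
        op_kwargs={"database_password": Variable.get("postgres_password")})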