cloud-composer/dags/sample-datastream-private-ip-deploy.py

####################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################################################################################

# Author:  Adam Paternostro
# Summary: Creates a Postgres Cloud SQL instance
#          Creates a database
#          Creates a table with some data
#          Creates a Datastream job from Cloud SQL to BigQuery

# [START dag]
from google.cloud import storage
from datetime import datetime, timedelta
import requests
import sys
import os
import logging
import airflow
from airflow.operators import bash_operator
from airflow.utils import trigger_rule
from airflow.operators.python_operator import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
import json
from pathlib import Path
import psycopg2

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': None,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    'dagrun_timeout': timedelta(minutes=60),
}

project_id        = os.environ['ENV_PROJECT_ID']
project_number    = os.environ['ENV_PROJECT_NUMBER']
root_password     = os.environ['ENV_RANDOM_EXTENSION']
cloud_sql_region  = os.environ['ENV_CLOUD_SQL_REGION']
datastream_region = os.environ['ENV_DATASTREAM_REGION']
bigquery_region   = os.environ['ENV_BIGQUERY_REGION']
code_bucket_name  = os.environ['ENV_CODE_BUCKET']
cloud_sql_zone    = os.environ['ENV_CLOUD_SQL_ZONE']

# Passed to the BashOperators below as Jinja params
params_list = {
    'project_id':        project_id,
    'root_password':     root_password,
    'cloud_sql_region':  cloud_sql_region,
    'datastream_region': datastream_region,
    'bigquery_region':   bigquery_region,
    'project_number':    project_number,
    'code_bucket_name':  code_bucket_name,
    'cloud_sql_zone':    cloud_sql_zone,
}


# Create the table
# Set the Datastream replication items
def run_postgres_sql(database_password):
    print("start run_postgres_sql")

    # The Postgres private IP address is written to the Composer "data" folder by the deploy script
    ipAddress = Path('/home/airflow/gcs/data/postgres_private_ip_address.txt').read_text()
    ipAddress = ipAddress.replace("\n", "")
    print("ipAddress:", ipAddress)

    database_name = "demodb"
    postgres_user = "postgres"

    conn = psycopg2.connect(
        host=ipAddress,
        database=database_name,
        user=postgres_user,
        password=database_password)

    # Errors Datastream raises when the replication setup below is missing:
    #   "Datastream failed to read from the PostgreSQL replication slot datastream_replication_slot.
    #    Make sure that the slot exists and that Datastream has the necessary permissions to access it."
    #   "Datastream failed to find the publication datastream_publication.
    #    Make sure that the publication exists and that Datastream has the necessary permissions to access it."
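    # A quick way to verify the replication objects created below (a minimal
    # troubleshooting sketch, assuming the default slot/publication names):
    # run these against the database from any psql session using the standard
    # PostgreSQL catalogs:
    #   SELECT slot_name, plugin, active FROM pg_replication_slots;
    #   SELECT pubname FROM pg_publication;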
    with open('/home/airflow/gcs/data/postgres_create_schema.sql', 'r') as file:
        table_commands = file.readlines()

    with open('/home/airflow/gcs/data/postgres_create_datastream_replication.sql', 'r') as file:
        replication_commands = file.readlines()

    """
    table_commands = (
        "CREATE TABLE IF NOT EXISTS entries (guestName VARCHAR(255), content VARCHAR(255), entryID SERIAL PRIMARY KEY);",
        "INSERT INTO entries (guestName, content) values ('first guest', 'I got here!');",
        "INSERT INTO entries (guestName, content) values ('second guest', 'Me too!');",
    )

    replication_commands = (
        "CREATE PUBLICATION datastream_publication FOR ALL TABLES;",
        "ALTER USER " + postgres_user + " with replication;",
        "SELECT PG_CREATE_LOGICAL_REPLICATION_SLOT('datastream_replication_slot', 'pgoutput');",
        "CREATE USER datastream_user WITH REPLICATION IN ROLE cloudsqlsuperuser LOGIN PASSWORD '" + database_password + "';",
        "GRANT SELECT ON ALL TABLES IN SCHEMA public TO datastream_user;",
        "GRANT USAGE ON SCHEMA public TO datastream_user;",
        "ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO datastream_user;"
    )
    """

    # You can get these through the Datastream UI.  Click through and press the button for the SQL to run.
    # CREATE PUBLICATION [MY_PUBLICATION] FOR ALL TABLES;
    # ALTER USER <curr_user> WITH REPLICATION;
    # SELECT PG_CREATE_LOGICAL_REPLICATION_SLOT('[REPLICATION_SLOT_NAME]', 'pgoutput');
    # CREATE USER [MY_USER] WITH REPLICATION IN ROLE cloudsqlsuperuser LOGIN PASSWORD '[MY_PASSWORD]';
    # GRANT SELECT ON ALL TABLES IN SCHEMA [MY_SCHEMA] TO [MY_USER];
    # GRANT USAGE ON SCHEMA [MY_SCHEMA] TO [MY_USER];
    # ALTER DEFAULT PRIVILEGES IN SCHEMA [MY_SCHEMA] GRANT SELECT ON TABLES TO [MY_USER];

    try:
        # Create the table first in order to avoid
        # "cannot create logical replication slot in transaction that has performed writes"
        table_cursor = conn.cursor()
        for sql in table_commands:
            print("SQL: ", sql)
            if sql.startswith("--"):
                continue
            if sql.strip() == "":
                continue
            table_cursor.execute(sql)
        table_cursor.close()
        conn.commit()

        # Run the commands Datastream requires (these change by database type)
        replication_cur = conn.cursor()
        for command in replication_commands:
            sql = command
            print("SQL: ", sql)
            if sql.startswith("--"):
                continue
            if sql.strip() == "":
                continue
            sql = sql.replace("<<POSTGRES_USER>>", postgres_user)
            sql = sql.replace("<<DATABASE_PASSWORD>>", database_password)
            replication_cur.execute(sql)
            conn.commit()
        replication_cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print("ERROR: ", error)
    finally:
        if conn is not None:
            conn.close()


with airflow.DAG('sample-datastream-private-ip-deploy',
                 default_args=default_args,
                 start_date=datetime(2021, 1, 1),
                 # Add the Composer "Data" directory which will hold the SQL/Bash scripts for deployment
                 template_searchpath=['/home/airflow/gcs/data'],
                 # Not scheduled, trigger only
                 schedule_interval=None) as dag:

    # NOTE: The service account of the Composer worker node must have access to run these commands

    # Create the Postgres Cloud SQL instance and database
    create_datastream_postgres_database_task = bash_operator.BashOperator(
        task_id='create_datastream_postgres_database_task',
        bash_command='sample_datastream_private_ip_deploy_postgres.sh',
        params=params_list,
        dag=dag
    )

    # Create the demo table and the Datastream replication objects
    run_postgres_sql_task = PythonOperator(
        task_id='run_postgres_sql_task',
        python_callable=run_postgres_sql,
        op_kwargs={"database_password": root_password},
        dag=dag
    )
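    # The .sh scripts referenced above and below are located via
    # template_searchpath and rendered as Jinja templates before they run, so
    # they can reference the values in params_list. A minimal sketch of what a
    # line inside such a script might look like (the actual scripts live in the
    # Composer "data" folder and are not shown in this file):
    #   gcloud sql instances list --project="{{ params.project_id }}"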
    # Configure Datastream
    bash_create_datastream_task = bash_operator.BashOperator(
        task_id='bash_create_datastream_task',
        bash_command='sample_datastream_private_ip_deploy_datastream.sh',
        params=params_list,
        dag=dag
    )

    # DAG Graph
    create_datastream_postgres_database_task >> run_postgres_sql_task >> bash_create_datastream_task

# [END dag]
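# A minimal usage sketch, assuming an Airflow 2-based Composer environment and
# that this file has been uploaded to the environment's dags/ folder
# (ENVIRONMENT and LOCATION are placeholders for your environment name/region):
#   gcloud composer environments run ENVIRONMENT --location LOCATION \
#       dags trigger -- sample-datastream-private-ip-deploy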