in cloud-composer/dags/sample-datastream-private-ip-deploy.py [0:0]
# Assumed module-level imports for this helper (the full DAG file may already import these at the top).
import psycopg2
from pathlib import Path


def run_postgres_sql(database_password):
    print("start run_postgres_sql")

    # Read the Cloud SQL private IP address staged in the Composer data folder.
    ipAddress = Path('/home/airflow/gcs/data/postgres_private_ip_address.txt').read_text()
    ipAddress = ipAddress.replace("\n", "")
    print("ipAddress:", ipAddress)

    database_name = "demodb"
    postgres_user = "postgres"

    # Connect to the Cloud SQL for PostgreSQL instance over its private IP.
    conn = psycopg2.connect(
        host=ipAddress,
        database=database_name,
        user=postgres_user,
        password=database_password)
    # Without the publication and replication slot created below, Datastream fails with errors like:
    #   "Datastream failed to read from the PostgreSQL replication slot datastream_replication_slot. Make sure that the slot exists and that Datastream has the necessary permissions to access it."
    #   "Datastream failed to find the publication datastream_publication. Make sure that the publication exists and that Datastream has the necessary permissions to access it."
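    # If Datastream reports those errors after deployment, a quick diagnostic (not part of the
    # original deploy logic) is to check the source database catalogs from psql:
    #   SELECT slot_name, plugin FROM pg_replication_slots;
    #   SELECT pubname FROM pg_publication;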
    with open('/home/airflow/gcs/data/postgres_create_schema.sql', 'r') as file:
        table_commands = file.readlines()

    with open('/home/airflow/gcs/data/postgres_create_datastream_replication.sql', 'r') as file:
        replication_commands = file.readlines()
""" table_commands = (
"CREATE TABLE IF NOT EXISTS entries (guestName VARCHAR(255), content VARCHAR(255), entryID SERIAL PRIMARY KEY);",
"INSERT INTO entries (guestName, content) values ('first guest', 'I got here!');",
"INSERT INTO entries (guestName, content) values ('second guest', 'Me too!');",
)
replication_commands = (
"CREATE PUBLICATION datastream_publication FOR ALL TABLES;",
"ALTER USER " + postgres_user + " with replication;",
"SELECT PG_CREATE_LOGICAL_REPLICATION_SLOT('datastream_replication_slot', 'pgoutput');",
"CREATE USER datastream_user WITH REPLICATION IN ROLE cloudsqlsuperuser LOGIN PASSWORD '" + database_password + "';",
"GRANT SELECT ON ALL TABLES IN SCHEMA public TO datastream_user;",
"GRANT USAGE ON SCHEMA public TO datastream_user;",
"ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO datastream_user;"
)
"""
    # You can also get these commands from the Datastream UI: the source connection setup
    # displays the SQL to run on the PostgreSQL database.
    #   CREATE PUBLICATION [MY_PUBLICATION] FOR ALL TABLES;
    #   ALTER USER <curr_user> WITH REPLICATION;
    #   SELECT PG_CREATE_LOGICAL_REPLICATION_SLOT('[REPLICATION_SLOT_NAME]', 'pgoutput');
    #   CREATE USER [MY_USER] WITH REPLICATION IN ROLE cloudsqlsuperuser LOGIN PASSWORD '[MY_PASSWORD]';
    #   GRANT SELECT ON ALL TABLES IN SCHEMA [MY_SCHEMA] TO [MY_USER];
    #   GRANT USAGE ON SCHEMA [MY_SCHEMA] TO [MY_USER];
    #   ALTER DEFAULT PRIVILEGES IN SCHEMA [MY_SCHEMA] GRANT SELECT ON TABLES TO [MY_USER];
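    # In this deployment, those bracketed placeholders correspond to the concrete values
    # used by the commands above and below:
    #   [MY_PUBLICATION]        -> datastream_publication
    #   [REPLICATION_SLOT_NAME] -> datastream_replication_slot
    #   [MY_USER]               -> datastream_user
    #   [MY_SCHEMA]             -> public
    #   [MY_PASSWORD]           -> the <<DATABASE_PASSWORD>> placeholder, substituted at runtime below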
    try:
        # Create the tables first to avoid:
        #   "cannot create logical replication slot in transaction that has performed writes"
        table_cursor = conn.cursor()
        for sql in table_commands:
            print("SQL: ", sql)
            # Skip comment lines and blank lines in the .sql file
            if sql.startswith("--") or sql.strip() == "":
                continue
            table_cursor.execute(sql)
        table_cursor.close()
        conn.commit()
        # Run the commands Datastream needs on the source (these differ by database type).
        replication_cur = conn.cursor()
        for sql in replication_commands:
            print("SQL: ", sql)
            # Skip comment lines and blank lines in the .sql file
            if sql.startswith("--") or sql.strip() == "":
                continue
            # Substitute the placeholders used in the .sql file with the real credentials.
            sql = sql.replace("<<POSTGRES_USER>>", postgres_user)
            sql = sql.replace("<<DATABASE_PASSWORD>>", database_password)
            replication_cur.execute(sql)
            conn.commit()
        replication_cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print("ERROR: ", error)
    finally:
        if conn is not None:
            conn.close()
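

# A minimal sketch (not part of the original file) of how this helper could be wired into
# the DAG with a PythonOperator. The task id, the "dag" object, and sourcing the database
# password from an Airflow Variable are assumptions for illustration only; the import path
# varies by Airflow version.
#
#   from airflow.operators.python import PythonOperator
#   from airflow.models import Variable
#
#   run_postgres_sql_task = PythonOperator(
#       task_id="run_postgres_sql",
#       python_callable=run_postgres_sql,
#       op_kwargs={"database_password": Variable.get("postgres_database_password")},
#       dag=dag,
#   )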