def run_postgres_sql()

in cloud-composer/dags/sample-datastream-public-ip-deploy.py


import psycopg2
from pathlib import Path


def run_postgres_sql(database_password):
    print("start run_postgres_sql")
    ip_address = Path('/home/airflow/gcs/data/postgres_public_ip_address.txt').read_text()
    ip_address = ip_address.replace("\n", "")
    print("ip_address:", ip_address)

    database_name = "demodb"
    postgres_user = "postgres"

    conn = psycopg2.connect(
        host=ip_address,
        database=database_name,
        user=postgres_user,
        password=database_password)

    # Without the publication and replication slot created below, Datastream fails with errors such as:
    #   "Datastream failed to read from the PostgreSQL replication slot datastream_replication_slot. Make sure that the slot exists and that Datastream has the necessary permissions to access it."
    #   "Datastream failed to find the publication datastream_publication. Make sure that the publication exists and that Datastream has the necessary permissions to access it."
    with open('/home/airflow/gcs/data/postgres_create_schema.sql', 'r') as file:
        table_commands = file.readlines()

    with open('/home/airflow/gcs/data/postgres_create_datastream_replication.sql', 'r') as file:
        replication_commands = file.readlines()

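    # Note: each .sql file is executed one line at a time (readlines() above), so it is
    # expected to contain one statement per line. The replication file may use the
    # <<POSTGRES_USER>> and <<DATABASE_PASSWORD>> placeholders, which are substituted
    # before execution below.
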
    """    table_commands = (
            "CREATE TABLE IF NOT EXISTS entries (guestName VARCHAR(255), content VARCHAR(255), entryID SERIAL PRIMARY KEY);",
            "INSERT INTO entries (guestName, content) values ('first guest', 'I got here!');",
            "INSERT INTO entries (guestName, content) values ('second guest', 'Me too!');",
            )

        replication_commands = (
            "CREATE PUBLICATION datastream_publication FOR ALL TABLES;",
            "ALTER USER " + postgres_user + " with replication;",
            "SELECT PG_CREATE_LOGICAL_REPLICATION_SLOT('datastream_replication_slot', 'pgoutput');",
            "CREATE USER datastream_user WITH REPLICATION IN ROLE cloudsqlsuperuser LOGIN PASSWORD '" + database_password + "';",
            "GRANT SELECT ON ALL TABLES IN SCHEMA public TO datastream_user;",
            "GRANT USAGE ON SCHEMA public TO datastream_user;",
            "ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO datastream_user;"
            )
    """
    
    # You can also get these commands from the Datastream UI: while configuring the PostgreSQL
    # source, click the button that displays the SQL to run on the database.
    # CREATE PUBLICATION [MY_PUBLICATION] FOR ALL TABLES;
    # ALTER USER [CURRENT_USER] WITH REPLICATION;
    # SELECT PG_CREATE_LOGICAL_REPLICATION_SLOT('[REPLICATION_SLOT_NAME]', 'pgoutput');
    # CREATE USER [MY_USER] WITH REPLICATION IN ROLE cloudsqlsuperuser LOGIN PASSWORD '[MY_PASSWORD]';
    # GRANT SELECT ON ALL TABLES IN SCHEMA [MY_SCHEMA] TO [MY_USER];
    # GRANT USAGE ON SCHEMA [MY_SCHEMA] TO [MY_USER];
    # ALTER DEFAULT PRIVILEGES IN SCHEMA [MY_SCHEMA] GRANT SELECT ON TABLES TO [MY_USER];

    try:
        # Create the tables first to avoid "cannot create logical replication slot in transaction that has performed writes"
        table_cursor = conn.cursor()
        for sql in table_commands:
            print("SQL: ", sql)
            if sql.strip() == "" or sql.strip().startswith("--"):
                continue
            table_cursor.execute(sql)
        table_cursor.close()
        conn.commit()

        # Run the commands Datastream needs (these vary by source database type)
        replication_cursor = conn.cursor()
        for sql in replication_commands:
            print("SQL: ", sql)
            if sql.strip() == "" or sql.strip().startswith("--"):
                continue
            sql = sql.replace("<<POSTGRES_USER>>", postgres_user)
            sql = sql.replace("<<DATABASE_PASSWORD>>", database_password)
            replication_cursor.execute(sql)
            conn.commit()
        replication_cursor.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print("ERROR: ", error)
    finally:
        if conn is not None:
            conn.close()
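
A minimal sketch of how the DAG might invoke this function. The task wiring below is an assumption for illustration (the actual operator setup in sample-datastream-public-ip-deploy.py is not shown here), and it presumes the database password is available as an Airflow Variable named "database_password":

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from datetime import datetime

with DAG(
    dag_id="sample-datastream-public-ip-deploy",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:

    # Run the PostgreSQL setup as a single task; pulling the password from an Airflow
    # Variable is purely an example (Secret Manager is another common choice).
    run_postgres_sql_task = PythonOperator(
        task_id="run_postgres_sql",
        python_callable=run_postgres_sql,
        op_kwargs={"database_password": Variable.get("database_password")},
    )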