####################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# 
#     https://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################################################################################

# Author:  Adam Paternostro
# Summary: Creates a Postgres Cloud SQL instance
#          Creates a database
#          Creates a table with some data
#          Creates a Datastream stream from Cloud SQL to BigQuery

# [START dag]
from google.cloud import storage
from datetime import datetime, timedelta
import requests
import sys
import os
import logging
import airflow
from airflow.operators import bash_operator
from airflow.utils import trigger_rule
from airflow.operators.python_operator import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
import json
from pathlib import Path
import psycopg2


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': None,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    }

project_id         = os.environ['ENV_PROJECT_ID'] 
project_number     = os.environ['ENV_PROJECT_NUMBER'] 
root_password      = os.environ['ENV_RANDOM_EXTENSION'] 
cloud_sql_region   = os.environ['ENV_CLOUD_SQL_REGION']
datastream_region  = os.environ['ENV_DATASTREAM_REGION']
bigquery_region    = os.environ['ENV_BIGQUERY_REGION'] 
code_bucket_name   = os.environ['ENV_CODE_BUCKET']
cloud_sql_zone     = os.environ['ENV_CLOUD_SQL_ZONE'] 

params_list = { 
    'project_id'        : project_id,
    'root_password'     : root_password, 
    'cloud_sql_region'  : cloud_sql_region, 
    'datastream_region' : datastream_region,
    'bigquery_region'   : bigquery_region,
    'project_number'    : project_number,
    'code_bucket_name'  : code_bucket_name,
    'cloud_sql_zone'    : cloud_sql_zone
    }
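
# The BashOperator tasks below hand these values to the templated deployment scripts
# through Jinja. Illustrative only (the real scripts live in the Composer "data"
# directory and are not shown here); a script could reference a value like this:
#   gcloud sql instances list --project="{{ params.project_id }}"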


# Creates the demo table with some data and then runs the SQL that Datastream requires
# on the source database (publication, replication slot, replication user)
def run_postgres_sql(database_password):
    print("start run_postgres_sql")
    ipAddress = Path('/home/airflow/gcs/data/postgres_private_ip_address.txt').read_text().strip()
    print("ipAddress:", ipAddress)

    database_name = "demodb"
    postgres_user = "postgres"

    conn = psycopg2.connect(
        host=ipAddress,
        database=database_name,
        user=postgres_user,
        password=database_password)

    # If the replication objects below are not created, Datastream fails with errors such as:
    #   "Datastream failed to read from the PostgreSQL replication slot datastream_replication_slot. Make sure that the slot exists and that Datastream has the necessary permissions to access it."
    #   "Datastream failed to find the publication datastream_publication. Make sure that the publication exists and that Datastream has the necessary permissions to access it."
    with open('/home/airflow/gcs/data/postgres_create_schema.sql', 'r') as file:
        table_commands = file.readlines()

    with open('/home/airflow/gcs/data/postgres_create_datastream_replication.sql', 'r') as file:
        replication_commands = file.readlines()

    """    table_commands = (
            "CREATE TABLE IF NOT EXISTS entries (guestName VARCHAR(255), content VARCHAR(255), entryID SERIAL PRIMARY KEY);",
            "INSERT INTO entries (guestName, content) values ('first guest', 'I got here!');",
            "INSERT INTO entries (guestName, content) values ('second guest', 'Me too!');",
            )

        replication_commands = (
            "CREATE PUBLICATION datastream_publication FOR ALL TABLES;",
            "ALTER USER " + postgres_user + " with replication;",
            "SELECT PG_CREATE_LOGICAL_REPLICATION_SLOT('datastream_replication_slot', 'pgoutput');",
            "CREATE USER datastream_user WITH REPLICATION IN ROLE cloudsqlsuperuser LOGIN PASSWORD '" + database_password + "';",
            "GRANT SELECT ON ALL TABLES IN SCHEMA public TO datastream_user;",
            "GRANT USAGE ON SCHEMA public TO datastream_user;",
            "ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO datastream_user;"
            )
    """
    
    # You can also get these statements from the Datastream UI, which shows the SQL to run
    # on the source database as you configure the stream:
    # CREATE PUBLICATION [MY_PUBLICATION] FOR ALL TABLES;
    # alter user <curr_user> with replication;
    # SELECT PG_CREATE_LOGICAL_REPLICATION_SLOT ('[REPLICATION_SLOT_NAME]', 'pgoutput');
    # CREATE USER [MY_USER] WITH REPLICATION IN ROLE cloudsqlsuperuser LOGIN PASSWORD '[MY_PASSWORD]';
    # GRANT SELECT ON ALL TABLES IN SCHEMA [MY_SCHEMA] TO [MY_USER];
    # GRANT USAGE ON SCHEMA [MY_SCHEMA] TO [MY_USER];
    # ALTER DEFAULT PRIVILEGES IN SCHEMA [MY_SCHEMA] GRANT SELECT ON TABLES TO [MY_USER];    

    try:
        # Create table first in order to avoid "cannot create logical replication slot in transaction that has performed writes"
        table_cursor = conn.cursor()
        for sql in table_commands:
            print("SQL: ", sql)
            if sql.startswith("--"):
                continue
            if sql.strip() == "":
                continue
            table_cursor.execute(sql)
        table_cursor.close()
        conn.commit()

        # Run Datastream necessary commands (these change by database type)
        replication_cur = conn.cursor()
        for command in replication_commands:
            sql = command
            print("SQL: ", sql)
            if sql.startswith("--"):
                continue
            if sql.strip() == "":
                continue
            sql = sql.replace("<<POSTGRES_USER>>",postgres_user);
            sql = sql.replace("<<DATABASE_PASSWORD>>",database_password);
            replication_cur.execute(sql)
            conn.commit()
        replication_cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print("ERROR: ", error)
        # Re-raise so the Airflow task is marked as failed rather than silently succeeding
        raise
    finally:
        if conn is not None:
            conn.close()
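

# Optional sanity check (an illustrative sketch, not wired into the DAG below): after the
# replication commands run, the publication and replication slot that Datastream expects can
# be verified through the standard Postgres catalog views. The names 'datastream_publication'
# and 'datastream_replication_slot' are assumed to match postgres_create_datastream_replication.sql.
def verify_datastream_prerequisites(host, database_password, database_name="demodb", postgres_user="postgres"):
    conn = psycopg2.connect(
        host=host,
        database=database_name,
        user=postgres_user,
        password=database_password)
    try:
        with conn.cursor() as cur:
            # pg_publication lists the logical replication publications in this database
            cur.execute("SELECT pubname FROM pg_publication;")
            publications = [row[0] for row in cur.fetchall()]
            print("Publications:", publications)

            # pg_replication_slots lists the replication slots on the instance
            cur.execute("SELECT slot_name FROM pg_replication_slots;")
            slots = [row[0] for row in cur.fetchall()]
            print("Replication slots:", slots)

            return ("datastream_publication" in publications and
                    "datastream_replication_slot" in slots)
    finally:
        conn.close()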


with airflow.DAG('sample-datastream-private-ip-deploy',
                 default_args=default_args,
                 start_date=datetime(2021, 1, 1),
                 # Fail the run if the deployment takes longer than 60 minutes
                 dagrun_timeout=timedelta(minutes=60),
                 # Add the Composer "Data" directory which will hold the SQL/Bash scripts for deployment
                 template_searchpath=['/home/airflow/gcs/data'],
                 # Not scheduled, trigger only
                 schedule_interval=None) as dag:

    # NOTE: The service account of the Composer worker node must have access to run these commands

    # Create the Postgres Instance and Database
    create_datastream_postgres_database_task = bash_operator.BashOperator(
          task_id='create_datastream_postgres_database_task',
          bash_command='sample_datastream_private_ip_deploy_postgres.sh',
          params=params_list,
          dag=dag
          )

    run_postgres_sql_task = PythonOperator(
        task_id='run_postgres_sql_task',
        python_callable=run_postgres_sql,
        op_kwargs={"database_password": root_password},
        dag=dag
        )

    # Configure datastream
    bash_create_datastream_task = bash_operator.BashOperator(
          task_id='bash_create_datastream_task',
          bash_command='sample_datastream_private_ip_deploy_datastream.sh',
          params=params_list,
          dag=dag
        )

    # DAG Graph
    create_datastream_postgres_database_task >> run_postgres_sql_task >> bash_create_datastream_task 

# [END dag]
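
# To run this deployment DAG, trigger it manually once it has been uploaded to the
# Composer environment, for example (illustrative; the Airflow sub-command shape depends
# on the Composer/Airflow version):
#   gcloud composer environments run <ENVIRONMENT_NAME> --location <LOCATION> dags trigger -- sample-datastream-private-ip-deploy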