def run_postgres_sql()

in cloud-composer/dags/sample-datastream-private-ip-generate-data.py [0:0]

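Replays the DML statements staged in the BigQuery table taxi_dataset.datastream_cdc_data against the private-IP PostgreSQL database (demodb) for up to 4 hours, generating change data for Datastream to capture. Progress is checkpointed to a JSON file in the Composer bucket so a subsequent run can resume where the last one stopped.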

import json
import os
from datetime import datetime
from pathlib import Path

import psycopg2
from google.cloud import bigquery


def run_postgres_sql(database_password):
    print("start run_postgres_sql")

    # The Postgres private IP was written to the Composer bucket by an earlier step
    ip_address = Path('/home/airflow/gcs/data/postgres_private_ip_address.txt').read_text().strip()
    print("ip_address:", ip_address)

    database_name = "demodb"
    postgres_user = "postgres"

    conn = psycopg2.connect(
        host=ip_address,
        database=database_name,
        user=postgres_user,
        password=database_password)
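    # psycopg2 starts a transaction implicitly; the replayed statements only
    # become durable (and visible to CDC) when conn.commit() runs below.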

    # create default checkpoint state
    config_path = '/home/airflow/gcs/data/datastream-private-ip-generate-data.json'
    config_data = {
        "iteration_count": 0,
        "min_index": 1,
        "max_index": 10010000,
        "current_index": 1
    }

    if os.path.exists(config_path):
        # resume from the saved checkpoint
        datastream_cdc_config_json = Path(config_path).read_text()
        config_data = json.loads(datastream_cdc_config_json)
    else:
        # write the default checkpoint
        with open(config_path, 'w') as f:
            json.dump(config_data, f)
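
    # After a partial run the checkpoint file looks something like this
    # (values are illustrative):
    #   {"iteration_count": 1, "min_index": 1, "max_index": 10010000, "current_index": 45001}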

    try:
        cur = conn.cursor()
        client = bigquery.Client()

        # Loop for up to 4 hours (14400 seconds)
        start_time = datetime.now()
        loop_count = 0
        while (datetime.now() - start_time).total_seconds() < 14400:
            print("iteration_count:", config_data["iteration_count"])
            print("min_index:", config_data["min_index"])
            print("max_index:", config_data["max_index"])
            print("current_index:", config_data["current_index"])

            # Pull the replay statements from BigQuery in batches of 50
            for index in range(config_data["current_index"], config_data["max_index"], 50):
                loop_count = loop_count + 1
                bigquery_sql = ("SELECT sql_statement, table_name "
                                "FROM taxi_dataset.datastream_cdc_data "
                                "WHERE execution_order BETWEEN {} AND {};".format(index, index + 49))
                # print("bigquery_sql: ", bigquery_sql)
                query_job = client.query(bigquery_sql)
                results = query_job.result()  # Waits for job to complete.

                for row in results:
                    # print("row.table_name:", row.table_name)
                    # print("row.sql_statement:", row.sql_statement)

                    if config_data["iteration_count"] == 0:
                        # first pass: okay to execute everything
                        cur.execute(row.sql_statement)
                    else:
                        if row.table_name != "driver":
                            # okay to execute (we do not want to create duplicate drivers)
                            cur.execute(row.sql_statement)

                conn.commit()

                if index + 49 >= config_data["max_index"]:
                    # finished a full pass; the next pass restarts from the beginning
                    config_data["iteration_count"] = config_data["iteration_count"] + 1
                    config_data["current_index"] = 1
                else:
                    # the next batch starts 50 rows further on
                    config_data["current_index"] = index + 50
                # Checkpoint the state every so often
                if loop_count % 100 == 0:
                    print("bigquery_sql: ", bigquery_sql)
                    print("loop_count: ", loop_count)
                    print("config_data[current_index]: ", config_data["current_index"])
                    with open(config_path, 'w') as f:
                        json.dump(config_data, f)
                
                # Stop mid-pass once the 4-hour budget is exhausted;
                # the while condition above then ends the run
                if (datetime.now() - start_time).total_seconds() > 14400:
                    break

        cur.close()
        conn.commit()

        # Save our state
        with open(config_path, 'w') as f:
            json.dump(config_data, f)

    except (Exception, psycopg2.DatabaseError) as error:
        # NOTE: errors are only printed, so the Airflow task still succeeds
        print(error)
    finally:
        if conn is not None:
            conn.close()
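
For context, this is how a function like this is typically invoked from the DAG file in cloud-composer/dags. A minimal sketch, assuming Airflow 2 on Composer and an Airflow Variable named database_password (both assumptions; the real wiring is not part of this excerpt):

from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator

with DAG(dag_id="sample-datastream-private-ip-generate-data",
         start_date=datetime(2022, 1, 1),
         schedule_interval=None,
         catchup=False) as dag:

    generate_data_task = PythonOperator(
        task_id="run_postgres_sql",
        python_callable=run_postgres_sql,
        # "database_password" is an assumed Variable name; fetch the real
        # secret however the project actually stores it
        op_kwargs={"database_password": Variable.get("database_password")},
    )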