def main()

in PythonKafkaSink/main.py [0:0]


def main():
    INPUT_PROPERTY_GROUP_KEY = "producer.config.0"
    CONSUMER_PROPERTY_GROUP_KEY = "consumer.config.0"

    INPUT_TOPIC_KEY = "input.topic.name"
    OUTPUT_TOPIC_KEY = "output.topic.name"
    OUTPUT_BUCKET_KEY = "output.s3.bucket"
    BROKER_KEY = "bootstrap.servers"

    props = app_properties()

    input_property_map = property_map(props, INPUT_PROPERTY_GROUP_KEY)
    output_property_map = property_map(props, CONSUMER_PROPERTY_GROUP_KEY)

    input_stream = input_property_map[INPUT_TOPIC_KEY]
    broker = input_property_map[BROKER_KEY]

    output_stream_sns = output_property_map[OUTPUT_TOPIC_KEY]
    output_s3_bucket = output_property_map[OUTPUT_BUCKET_KEY]

    input_table = "input_table"
    output_table_sns = "output_table_sns"
    output_table_s3 = "output_table_s3"

    table_env.execute_sql(create_table_input(input_table, input_stream, broker))
    table_env.execute_sql(create_table_output_kafka(output_table_sns, output_stream_sns, broker))
    table_env.execute_sql(create_table_output_s3(output_table_s3, output_s3_bucket))

    statement_set.add_insert_sql(insert_stream_sns(input_table, output_table_sns))
    statement_set.add_insert_sql(insert_stream_s3(input_table, output_table_s3))

    statement_set.execute()