in PythonKafkaSink/main.py [0:0]
def main():
INPUT_PROPERTY_GROUP_KEY = "producer.config.0"
CONSUMER_PROPERTY_GROUP_KEY = "consumer.config.0"
INPUT_TOPIC_KEY = "input.topic.name"
OUTPUT_TOPIC_KEY = "output.topic.name"
OUTPUT_BUCKET_KEY = "output.s3.bucket"
BROKER_KEY = "bootstrap.servers"
props = app_properties()
input_property_map = property_map(props, INPUT_PROPERTY_GROUP_KEY)
output_property_map = property_map(props, CONSUMER_PROPERTY_GROUP_KEY)
input_stream = input_property_map[INPUT_TOPIC_KEY]
broker = input_property_map[BROKER_KEY]
output_stream_sns = output_property_map[OUTPUT_TOPIC_KEY]
output_s3_bucket = output_property_map[OUTPUT_BUCKET_KEY]
input_table = "input_table"
output_table_sns = "output_table_sns"
output_table_s3 = "output_table_s3"
table_env.execute_sql(create_table_input(input_table, input_stream, broker))
table_env.execute_sql(create_table_output_kafka(output_table_sns, output_stream_sns, broker))
table_env.execute_sql(create_table_output_s3(output_table_s3, output_s3_bucket))
statement_set.add_insert_sql(insert_stream_sns(input_table, output_table_sns))
statement_set.add_insert_sql(insert_stream_s3(input_table, output_table_s3))
statement_set.execute()