in python/S3Sink/streaming-file-sink.py [0:0]
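# The excerpt below assumes the following module-level setup, which lives
# elsewhere in the full file. This is a minimal sketch, not the original code:
# the property helpers follow the pattern documented for Managed Service for
# Apache Flink, where runtime properties are mounted as a JSON file.
import json
import os

from pyflink.table import EnvironmentSettings, TableEnvironment

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json"


def get_application_properties():
    # Reads the application properties JSON provided by the runtime.
    if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
        with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
            return json.loads(file.read())
    return []


def property_map(props, property_group_id):
    # Returns the PropertyMap for the requested PropertyGroupId.
    for prop in props:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]
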
def main():
    # Application property keys
    input_property_group_key = "consumer.config.0"
    sink_property_group_key = "sink.config.0"
    input_stream_key = "input.stream.name"
    input_region_key = "aws.region"
    input_starting_position_key = "flink.stream.initpos"
    output_sink_key = "output.bucket.name"
    # Table names
    input_table_name = "input_table"
    output_table_name = "output_table"
    # 1. Gets the application properties and resolves the source and sink configuration
    props = get_application_properties()
    input_property_map = property_map(props, input_property_group_key)
    output_property_map = property_map(props, sink_property_group_key)
    input_stream = input_property_map[input_stream_key]
    input_region = input_property_map[input_region_key]
    stream_initpos = input_property_map[input_starting_position_key]
    output_bucket_name = output_property_map[output_sink_key]
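    # For reference, the runtime properties are expected to look roughly like
    # this (a hedged example; actual values are configured on the application):
    #   [
    #     {"PropertyGroupId": "consumer.config.0",
    #      "PropertyMap": {"input.stream.name": "...",
    #                      "aws.region": "...",
    #                      "flink.stream.initpos": "LATEST"}},
    #     {"PropertyGroupId": "sink.config.0",
    #      "PropertyMap": {"output.bucket.name": "..."}}
    #   ]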
    # 2. Creates a source table from a Kinesis data stream
    table_env.execute_sql(
        create_source_table(
            input_table_name, input_stream, input_region, stream_initpos
        )
    )
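    # create_source_table() is defined elsewhere in the file; a hedged sketch of
    # the DDL it builds, assuming the Flink Kinesis SQL connector (the column
    # names here are assumptions, not taken from the original file):
    #   CREATE TABLE input_table (
    #     word VARCHAR(32),
    #     event_time TIMESTAMP(3),
    #     WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    #   ) WITH (
    #     'connector' = 'kinesis',
    #     'stream' = '<input_stream>',
    #     'aws.region' = '<input_region>',
    #     'scan.stream.initpos' = '<stream_initpos>',
    #     'format' = 'json'
    #   )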
    # 3. Creates a sink table writing to an S3 bucket
    create_sink = create_sink_table(
        output_table_name, output_bucket_name
    )
    table_env.execute_sql(create_sink)
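    # create_sink_table() is defined elsewhere in the file; a hedged sketch of
    # the DDL it builds, assuming Flink's filesystem connector with an s3a://
    # path and a rolling policy so files are committed periodically (the column
    # schema mirrors the source sketch above and is likewise an assumption):
    #   CREATE TABLE output_table (
    #     word VARCHAR(32),
    #     word_count BIGINT,
    #     window_start TIMESTAMP(3)
    #   ) WITH (
    #     'connector' = 'filesystem',
    #     'path' = 's3a://<output_bucket_name>/',
    #     'format' = 'json',
    #     'sink.rolling-policy.rollover-interval' = '60s',
    #     'sink.rolling-policy.check-interval' = '30s'
    #   )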
    # 4. Queries the source table and creates a 1-minute tumbling window to
    # count occurrences of each word within the window
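    # count_by_word() is defined elsewhere in the file; a hedged sketch of the
    # group-window query it issues (column names match the source sketch above):
    #   SELECT word,
    #          COUNT(word) AS word_count,
    #          TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start
    #   FROM input_table
    #   GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE), word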
    tumbling_window_table = count_by_word(input_table_name)
    # 5. Inserts the tumbling window results into the sink table (S3) and waits
    # for the insert job to finish; execute_insert() submits the job, so no
    # separate statement set is needed
    tumbling_window_table.execute_insert(output_table_name).wait()
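
# The full file presumably ends by invoking main() under the standard guard:
if __name__ == "__main__":
    main()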