in pyflink-examples/StreamingFileSink/streaming-file-sink.py [0:0]
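# The body of main() below references table_env, is_local, and several helper
# functions defined elsewhere in this file. What follows is a minimal sketch of
# that surrounding module-level setup, assuming the conventional
# amazon-kinesis-data-analytics-examples layout; names and paths are illustrative.
import json
import os

from pyflink.table import EnvironmentSettings, TableEnvironment

# 1. Creates a Table Environment in streaming mode
table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# The managed Flink runtime mounts the application properties at this path;
# when running locally, IS_LOCAL is set and a local properties file is read instead.
APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json"
is_local = bool(os.environ.get("IS_LOCAL"))
if is_local:
    APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"


def get_application_properties():
    # reads the JSON document of property groups provided by the runtime
    if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
        with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
            return json.loads(file.read())
    print('A file at "{}" was not found'.format(APPLICATION_PROPERTIES_FILE_PATH))
    return []


def property_map(props, property_group_id):
    # returns the PropertyMap dict for the matching PropertyGroupId
    for prop in props:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]
    return {}
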
def main():
    # Application property keys
    input_property_group_key = "consumer.config.0"
    sink_property_group_key = "sink.config.0"
    input_stream_key = "input.stream.name"
    input_region_key = "aws.region"
    input_starting_position_key = "flink.stream.initpos"
    output_sink_key = "output.bucket.name"

    # Table names
    input_table_name = "input_table"
    output_table_name = "output_table"

    # Get the application properties and resolve each property group
    props = get_application_properties()
    input_property_map = property_map(props, input_property_group_key)
    output_property_map = property_map(props, sink_property_group_key)

    input_stream = input_property_map[input_stream_key]
    input_region = input_property_map[input_region_key]
    stream_initpos = input_property_map[input_starting_position_key]
    output_bucket_name = output_property_map[output_sink_key]
    # 2. Creates a source table from a Kinesis Data Stream
    table_env.execute_sql(
        create_source_table(
            input_table_name, input_stream, input_region, stream_initpos
        )
    )
    # 3. Creates a sink table writing to an S3 bucket
    table_env.execute_sql(
        create_sink_table(output_table_name, output_bucket_name)
    )
    # 4. Queries the source table and creates a 10-second tumbling window to
    #    calculate the total price over each window
    tumbling_window_table = perform_tumbling_window_aggregation(input_table_name)
    table_env.create_temporary_view("tumbling_window_table", tumbling_window_table)
    # 5. The tumbling window results are inserted into the sink table
    table_result = table_env.execute_sql(
        "INSERT INTO {0} SELECT * FROM tumbling_window_table".format(output_table_name)
    )
    if is_local:
        # Block until the job finishes so the local process does not exit
        # while the pipeline is still running
        table_result.wait()
    else:
        # On the managed service the job keeps running; surface its status
        # through the TableResult (get_job_status() returns a future)
        print(table_result.get_job_client().get_job_status())
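

# Minimal sketches of the remaining helpers referenced above, assuming a simple
# ticker/price/event_time schema; the exact DDL (schema, formats, rolling
# policies) is defined elsewhere in this file and may differ.
from pyflink.table.expressions import col, lit
from pyflink.table.window import Tumble


def create_source_table(table_name, stream_name, region, stream_initpos):
    # Kinesis source table with an event-time watermark for windowing
    return """CREATE TABLE {0} (
        ticker VARCHAR(6),
        price DOUBLE,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kinesis',
        'stream' = '{1}',
        'aws.region' = '{2}',
        'scan.stream.initpos' = '{3}',
        'format' = 'json',
        'json.timestamp-format.standard' = 'ISO-8601'
    )""".format(table_name, stream_name, region, stream_initpos)


def create_sink_table(table_name, bucket_name):
    # filesystem sink writing JSON files to the S3 bucket
    return """CREATE TABLE {0} (
        ticker VARCHAR(6),
        price DOUBLE,
        event_time TIMESTAMP(3)
    ) WITH (
        'connector' = 'filesystem',
        'path' = 's3a://{1}/',
        'format' = 'json',
        'sink.rolling-policy.rollover-interval' = '1 min'
    )""".format(table_name, bucket_name)


def perform_tumbling_window_aggregation(input_table_name):
    # groups rows into 10-second tumbling windows keyed by ticker and sums the
    # price per window (the total price from step 4)
    input_table = table_env.from_path(input_table_name)
    return (
        input_table.window(
            Tumble.over(lit(10).seconds).on(col("event_time")).alias("ten_second_window")
        )
        .group_by(col("ticker"), col("ten_second_window"))
        .select(
            col("ticker"),
            col("price").sum.alias("price"),
            col("ten_second_window").end.alias("event_time"),
        )
    )


if __name__ == "__main__":
    main()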