in GettingStartedTable/src/main/java/com/amazonaws/services/kinesisanalytics/StreamingJob.java [83:126]
public static void process(StreamExecutionEnvironment env, String kafkaTopic, String s3Path, Properties kafkaProperties) {
    org.apache.flink.table.api.bridge.java.StreamTableEnvironment streamTableEnvironment =
            org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(
                    env, EnvironmentSettings.newInstance().useBlinkPlanner().build());
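    // Note: useBlinkPlanner() is only needed on Flink 1.13 and earlier; from
    // Flink 1.14 onwards the Blink planner is the default and the call can be dropped.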
    // Create the Kafka consumer, reading the topic from the earliest offset
    final FlinkKafkaConsumer<StockRecord> consumer =
            new FlinkKafkaConsumer<>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties);
    consumer.setStartFromEarliest();
    // Obtain a DataStream from the consumer and register it as a Table
    DataStream<StockRecord> events = env.addSource(consumer);
    Table table = streamTableEnvironment.fromDataStream(events);
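    // Derive dt (day) and hr (hour) columns from event_time and keep only rows with price > 50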
    final Table filteredTable = table
            .select(
                    $("event_time"), $("ticker"), $("price"),
                    dateFormat($("event_time"), "yyyy-MM-dd").as("dt"),
                    dateFormat($("event_time"), "HH").as("hr"))
            .where($("price").isGreater(50));
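    // DDL for the sink: a partitioned filesystem table writing JSON files to the S3 path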
    final String s3Sink = "CREATE TABLE sink_table (" +
            "  event_time TIMESTAMP," +
            "  ticker STRING," +
            "  price DOUBLE," +
            "  dt STRING," +
            "  hr STRING" +
            ") PARTITIONED BY (ticker, dt, hr) WITH (" +
            "  'connector' = 'filesystem'," +
            "  'path' = '" + s3Path + "'," +
            "  'format' = 'json'" +
            ")";
    // Register the sink table and stream the filtered rows into S3
    streamTableEnvironment.executeSql(s3Sink);
    filteredTable.executeInsert("sink_table");
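    // executeInsert() submits the insert job immediately and returns a TableResult;
    // no separate env.execute() call is needed for this pipeline.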
}