in GettingStartedTable/src/main/java/com/amazonaws/services/kinesisanalytics/StreamingJob.java [132:177]
/**
 * Builds a Flink SQL pipeline that reads stock records from a Kafka topic and
 * writes every record with {@code price > 50} to a partitioned filesystem (S3)
 * sink in JSON format, partitioned by ticker, date ({@code dt}) and hour ({@code hr}).
 *
 * @param env             streaming execution environment the table environment is created on
 * @param kafkaTopic      Kafka topic to consume stock records from
 * @param s3Path          destination path for the filesystem sink
 * @param kafkaProperties must contain a {@code "bootstrap.servers"} entry; other entries are ignored here
 */
public static void process(StreamExecutionEnvironment env, String kafkaTopic, String s3Path, Properties kafkaProperties) {
    final org.apache.flink.table.api.bridge.java.StreamTableEnvironment streamTableEnvironment =
            org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(
                    env, EnvironmentSettings.newInstance().useBlinkPlanner().build());

    // NOTE(review): topic, bootstrap servers, and path are concatenated straight into the DDL.
    // They appear to come from application configuration rather than end users, but if that
    // ever changes, validate/escape them upstream (SQL-injection risk).
    final String createTableStmt = "CREATE TABLE StockRecord " +
            "(" +
            "event_time TIMESTAMP," +
            "ticker STRING," +
            "price DOUBLE" +
            ")" +
            " WITH (" +
            " 'connector' = 'kafka'," +
            " 'topic' = '" + kafkaTopic + "'," +
            " 'properties.bootstrap.servers' = '" + kafkaProperties.get("bootstrap.servers") + "'," +
            " 'properties.group.id' = 'testGroup'," +
            " 'format' = 'json'," +
            " 'scan.startup.mode' = 'earliest-offset'" +
            ")";

    // Sink schema mirrors the source plus two derived partition columns (dt, hr).
    final String s3Sink = "CREATE TABLE sink_table (" +
            "event_time TIMESTAMP," +
            "ticker STRING," +
            "price DOUBLE," +
            "dt STRING," +
            "hr STRING" +
            ")" +
            " PARTITIONED BY (ticker,dt,hr)" +
            " WITH" +
            "(" +
            " 'connector' = 'filesystem'," +
            " 'path' = '" + s3Path + "'," +
            " 'format' = 'json'" +
            ") ";

    streamTableEnvironment.executeSql(createTableStmt);
    streamTableEnvironment.executeSql(s3Sink);

    // Fix: the hour expression was aliased "hh" while the sink declares the column "hr".
    // The INSERT is positional so it still ran, but the mismatched alias was misleading.
    final String insertSql = "INSERT INTO sink_table SELECT event_time,ticker,price,DATE_FORMAT(event_time, 'yyyy-MM-dd') as dt, " +
            "DATE_FORMAT(event_time, 'HH') as hr FROM StockRecord WHERE price > 50";
    streamTableEnvironment.executeSql(insertSql);
}