public static void process()

in GettingStartedTable/src/main/java/com/amazonaws/services/kinesisanalytics/StreamingJob.java [83:126]


        public static void process(StreamExecutionEnvironment env, String kafkaTopic, String s3Path, Properties kafkaProperties)  {

            org.apache.flink.table.api.bridge.java.StreamTableEnvironment streamTableEnvironment = org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(
                    env, EnvironmentSettings.newInstance().useBlinkPlanner().build());

            //create the table
            final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties);
            consumer.setStartFromEarliest();
            //Obtain stream
            DataStream<StockRecord> events = env.addSource(consumer);

            Table table = streamTableEnvironment.fromDataStream(events);


            final Table filteredTable = table.
                    select(
                            $("event_time"), $("ticker"), $("price"),
                            dateFormat($("event_time"), "yyyy-MM-dd").as("dt"),
                            dateFormat($("event_time"), "HH").as("hr")
                    ).
                    where($("price").isGreater(50));


            final String s3Sink = "CREATE TABLE sink_table (" +
                    "event_time TIMESTAMP," +
                    "ticker STRING," +
                    "price DOUBLE," +
                    "dt STRING," +
                    "hr STRING" +
                    ")" +
                    " PARTITIONED BY (ticker,dt,hr)" +
                    " WITH" +
                    "(" +
                    " 'connector' = 'filesystem'," +
                    " 'path' = '" + s3Path + "'," +
                    " 'format' = 'json'" +
                    ") ";

            //send to s3
            streamTableEnvironment.executeSql(s3Sink);
            filteredTable.executeInsert("sink_table");


        }