public static void process()

in GettingStartedTable/src/main/java/com/amazonaws/services/kinesisanalytics/StreamingJob.java [132:177]


        public static void process(StreamExecutionEnvironment env, String kafkaTopic, String s3Path, Properties kafkaProperties)  {

            org.apache.flink.table.api.bridge.java.StreamTableEnvironment streamTableEnvironment = org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(
                    env, EnvironmentSettings.newInstance().useBlinkPlanner().build());

            final String createTableStmt = "CREATE TABLE StockRecord " +
                    "(" +
                    "event_time TIMESTAMP," +
                    "ticker STRING," +
                    "price DOUBLE" +
                    ")" +
                    " WITH (" +
                    " 'connector' = 'kafka'," +
                    " 'topic' = '" + kafkaTopic + "'," +
                    " 'properties.bootstrap.servers' = '" + kafkaProperties.get("bootstrap.servers")
                    + "'," +
                    " 'properties.group.id' = 'testGroup'," +
                    " 'format' = 'json'," +
                    " 'scan.startup.mode' = 'earliest-offset'" +
                    ")";


            final String s3Sink = "CREATE TABLE sink_table (" +
                    "event_time TIMESTAMP," +
                    "ticker STRING," +
                    "price DOUBLE," +
                    "dt STRING," +
                    "hr STRING" +
                    ")" +
                    " PARTITIONED BY (ticker,dt,hr)" +
                    " WITH" +
                    "(" +
                    " 'connector' = 'filesystem'," +
                    " 'path' = '" + s3Path + "'," +
                    " 'format' = 'json'" +
                    ") ";


            streamTableEnvironment.executeSql(createTableStmt);
            streamTableEnvironment.executeSql(s3Sink);

            final String insertSql = "INSERT INTO sink_table SELECT event_time,ticker,price,DATE_FORMAT(event_time, 'yyyy-MM-dd') as dt, " +
                    "DATE_FORMAT(event_time, 'HH') as hh FROM StockRecord WHERE price > 50";
            streamTableEnvironment.executeSql(insertSql);

        }