public static void main()

in CloudWatchSink/src/main/java/com/amazonaws/services/kinesisanalytics/CustomSinkStreamingJob.java [81:100]


    /**
     * Entry point of the streaming job: builds the pipeline and submits it under the
     * job name "Max Stock Price".
     *
     * <p>Pipeline stages, in order:
     * <ol>
     *   <li>read raw JSON strings from the source built by {@code createSourceFromStaticConfig};</li>
     *   <li>parse each string and keep the {@code TICKER} text and {@code PRICE} double
     *       as a {@code Tuple2<String, Double>};</li>
     *   <li>key by tuple field 0 (the ticker) and apply a sliding window of
     *       10 seconds that advances every 5 seconds;</li>
     *   <li>take the maximum of tuple field 1 (the price) per key and window;</li>
     *   <li>format each result as a text line and hand it to a {@code CloudWatchLogSink}.</li>
     * </ol>
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception if pipeline setup or job execution fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Runs before the pipeline is assembled.
        // NOTE(review): presumably ensures the log group/stream exist for the sink — confirm in the helper.
        setupLogGroupAndStream(region, CLOUD_WATCH_LOG_GROUP, CLOUD_WATCH_LOG_STREAM);

        DataStream<String> rawRecords = createSourceFromStaticConfig(env);
        ObjectMapper mapper = new ObjectMapper();

        // Stage 1: JSON text -> (ticker, price).
        // The explicit returns(...) supplies the tuple's type information, which the lambda alone
        // does not carry after erasure.
        // NOTE(review): a record lacking TICKER or PRICE makes get(...) return null and fails here.
        DataStream<Tuple2<String, Double>> tickerPrices = rawRecords
                .map(json -> {
                    JsonNode node = mapper.readValue(json, JsonNode.class);
                    return new Tuple2<>(node.get("TICKER").asText(), node.get("PRICE").asDouble());
                })
                .returns(Types.TUPLE(Types.STRING, Types.DOUBLE));

        // Stage 2: per-ticker maximum price over a 10 s window sliding every 5 s.
        DataStream<Tuple2<String, Double>> windowedMax = tickerPrices
                .keyBy(0)
                .timeWindow(Time.seconds(10), Time.seconds(5))
                .max(1);

        // Stage 3: render one newline-terminated text line per result.
        DataStream<String> logLines =
                windowedMax.map(best -> best.f0 + ":  max - " + best.f1.toString() + "\n");

        // Stage 4: ship the lines to CloudWatch Logs.
        logLines.addSink(new CloudWatchLogSink(region, CLOUD_WATCH_LOG_GROUP, CLOUD_WATCH_LOG_STREAM));

        env.execute("Max Stock Price");
    }