public static void main()

in SlidingWindow/src/main/java/com/amazonaws/services/kinesisanalytics/SlidingWindowStreamingJobWithParallelism.java [63:82]


    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> input = createSourceFromStaticConfig(env);
        ObjectMapper jsonParser = new ObjectMapper();
        input.map(value -> { // Parse the JSON
            JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
            return new Tuple2<>(jsonNode.get("TICKER").toString(), jsonNode.get("PRICE").asDouble());
        }).returns(Types.TUPLE(Types.STRING, Types.DOUBLE))
                .keyBy(0) // Logically partition the stream per stock symbol
                //.timeWindow(Time.seconds(10), Time.seconds(5)) // Sliding window definition (Flink 1.11)
		.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) //Flink 1.13
                .min(1) // Calculate minimum price per stock over the window
                .setParallelism(3) // Set parallelism for the min operator
                .map(value -> value.f0 + String.format(",%.2f", value.f1) + "\n")
                .addSink(createSinkFromStaticConfig());

        env.execute("Min Stock Price");
    }