public static void main()

in src/main/java/com/amazonaws/services/kinesisanalytics/S3StreamingSinkJob.java [51:79]


    /**
     * Builds and runs the Flink streaming job.
     *
     * <p>The job reads JSON strings from the source created by
     * {@code createSourceFromStaticConfig}, extracts the {@code TICKER} and {@code PRICE}
     * fields, computes the maximum price per ticker over a sliding window (10 seconds long,
     * advancing every 5 seconds) and writes one comma-separated line per result to the sink
     * created by {@code createS3SinkFromStaticConfig}.
     *
     * <p>Command-line parameters (parsed with {@code ParameterTool.fromArgs}):
     * <ul>
     *   <li>{@code --checkpoint-dir} (required): location handed to the checkpoint storage.</li>
     *   <li>{@code --region}, {@code --s3SinkPath}, {@code --inputStreamName}: stored in the
     *       static fields of the same names; presumably read by the source/sink factory
     *       methods, which are not visible from here.</li>
     * </ul>
     *
     * @param args command-line arguments in {@code --key value} form
     * @throws Exception if the job cannot be built or its execution fails; a missing
     *                   {@code --checkpoint-dir} surfaces as a {@link RuntimeException}
     *                   naming the key
     */
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final ParameterTool params = ParameterTool.fromArgs(args);

        // checkpoint-dir is passed straight to setCheckpointStorage below, which cannot work
        // with null. Fail fast with a message that names the missing key instead.
        checkpointDir = params.getRequired("checkpoint-dir");
        region = params.get("region");
        s3SinkPath = params.get("s3SinkPath");
        inputStreamName = params.get("inputStreamName");

        env.enableCheckpointing(10000); // Every 10 sec (interval is given in milliseconds)
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        DataStream<String> input = createSourceFromStaticConfig(env);

        ObjectMapper jsonParser = new ObjectMapper();

        input.map(value -> {
            // Parse the JSON record and keep only (ticker, price).
            JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
            return new Tuple2<>(jsonNode.get("TICKER").asText(), jsonNode.get("PRICE").asDouble());
        }).returns(Types.TUPLE(Types.STRING, Types.DOUBLE)) // Lambda loses the tuple's generic types
                .keyBy(0) // Logically partition the stream per stock symbol
                .timeWindow(Time.seconds(10), Time.seconds(5))  // Sliding window: size 10 s, slide 5 s
                .max(1) // Calculate maximum price per stock over the window
                .setParallelism(8) // Set parallelism for the max operator
                // NOTE(review): the price (f1) is written twice per line; looks like a
                // copy-paste slip, but kept as-is because the output format may be relied
                // upon downstream -- confirm before changing.
                .map(value -> value.f0 + "," + value.f1 + "," + value.f1.toString() + "\n")
                .addSink(createS3SinkFromStaticConfig()).name("S3_sink");
        env.execute("Flink S3 Streaming Sink Job");
    }