public static void main()

in src/main/java/com/amazonaws/kda/flink/starterkit/SessionProcessor.java [50:104]


	/**
	 * Builds and runs the session-window streaming job.
	 * <p>
	 * Runtime parameters come from the command line when the execution environment
	 * is a {@code LocalStreamEnvironment}; otherwise they are read from the
	 * {@code FlinkAppProperties} property group exposed by
	 * {@code KinesisAnalyticsRuntime}. The job reads JSON strings from the Kinesis
	 * source, parses them into {@code Event} objects, keys them by session id,
	 * aggregates each session window and writes the result to the S3 sink.
	 *
	 * @param args command-line arguments; only used when running in a local
	 *             environment
	 * @throws RuntimeException if the {@code FlinkAppProperties} group is missing
	 *                          or {@code validateRuntimeProperties} rejects the
	 *                          parameters
	 * @throws Exception        propagated from property loading or job execution
	 */
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		ParameterTool parameter;
		if (env instanceof LocalStreamEnvironment) {
			parameter = ParameterTool.fromArgs(args);
		} else {
			// read properties from Kinesis Data Analytics environment
			Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
			Properties flinkProperties = applicationProperties.get("FlinkAppProperties");
			if (flinkProperties == null) {
				throw new RuntimeException("Unable to load properties from Group ID FlinkAppProperties.");
			}
			parameter = ParameterToolUtils.fromApplicationProperties(flinkProperties);
		}

		// Fail fast before any part of the pipeline is built.
		if (!validateRuntimeProperties(parameter)) {
			throw new RuntimeException(
					"Runtime properties are invalid. Will not proceed to start Kinesis Analytics Application");
		}
		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
		env.registerType(Event.class);
		DataStream<String> stream = createKinesisSource(env, parameter);
		log.info("Kinesis stream created.");

		ObjectMapper objectMapper = new ObjectMapper();
		KeyedStream<Event, String> keyedStream = stream.map(record -> {
			try {
				return objectMapper.readValue(record, Event.class);
			} catch (Exception e) {
				// Pass the exception along so the parse failure cause and stack trace are
				// logged, not just the offending record.
				log.error("Exception in parsing the input records to Event POJO. "
						+ "Please make sure the input record structure is compatible with the POJO. Input record: "
						+ record, e);
				// null marks an unparseable record; it is dropped by the filter below.
				return null;
			}
		}).filter(Objects::nonNull).keyBy(Event::getSession_id);

		/*
		 * Background on timestamps:
		 *
		 * Event time - the timestamp when the event occurred. This is also sometimes
		 * called the client-side time.
		 *
		 * Ingest time - the timestamp of when the record was added to the streaming
		 * source. Amazon Kinesis Data Streams includes a field called
		 * APPROXIMATE_ARRIVAL_TIME in every record that provides this timestamp. This
		 * is also sometimes referred to as the server-side time.
		 *
		 * Source:
		 * https://docs.aws.amazon.com/kinesisanalytics/latest/dev/timestamps-rowtime-concepts.html
		 *
		 * NOTE(review): the environment is set to IngestionTime above, but the window
		 * assigner below is ProcessingTimeSessionWindows - confirm which time semantics
		 * are intended.
		 */
		long timeout = Long.parseLong(parameter.get("session_time_out_in_minutes"));
		DataStream<String> sessionStream = keyedStream
				.window(ProcessingTimeSessionWindows.withGap(Time.minutes(timeout))).aggregate(new Aggregator())
				.name("session_stream");
		sessionStream.addSink(createS3Sink(parameter)).name("session_processor_sink");
		log.info("S3 Sink added.");
		env.execute("Kinesis Data Analytics Flink Application with  Session Window and Aggregate Function");
	}