in src/main/java/com/amazonaws/kda/flink/starterkit/SessionProcessor.java [50:104]
/**
 * Entry point: builds and runs the session-window streaming job.
 *
 * <p>Steps performed, in order:
 * <ol>
 * <li>Load runtime parameters: from the command-line arguments when running in a
 * {@code LocalStreamEnvironment}, otherwise from the Kinesis Data Analytics
 * application property group {@code FlinkAppProperties}.</li>
 * <li>Validate the parameters with {@code validateRuntimeProperties}.</li>
 * <li>Create the Kinesis source, deserialize each record into an {@code Event}
 * and key the stream by session id.</li>
 * <li>Group each key into session windows whose gap is
 * {@code session_time_out_in_minutes} minutes and aggregate them with
 * {@code Aggregator}.</li>
 * <li>Attach the S3 sink and execute the job.</li>
 * </ol>
 *
 * @param args command-line arguments; only read when running locally
 * @throws RuntimeException if the {@code FlinkAppProperties} group is missing or
 *                          the runtime properties fail validation
 * @throws Exception        if loading the application properties or executing the
 *                          job fails
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    ParameterTool parameter;
    if (env instanceof LocalStreamEnvironment) {
        // Local run: parameters come straight from the command line.
        parameter = ParameterTool.fromArgs(args);
    } else {
        // read properties from Kinesis Data Analytics environment
        Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
        Properties flinkProperties = applicationProperties.get("FlinkAppProperties");
        if (flinkProperties == null) {
            throw new RuntimeException("Unable to load properties from Group ID FlinkAppProperties.");
        }
        parameter = ParameterToolUtils.fromApplicationProperties(flinkProperties);
    }

    // Fail fast, before any part of the pipeline is constructed.
    if (!validateRuntimeProperties(parameter)) {
        throw new RuntimeException(
                "Runtime properties are invalid. Will not proceed to start Kinesis Analytics Application");
    }

    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    env.registerType(Event.class);

    DataStream<String> stream = createKinesisSource(env, parameter);
    log.info("Kinesis stream created.");

    ObjectMapper objectMapper = new ObjectMapper();
    // Records that cannot be parsed are mapped to null and then dropped by the
    // filter that follows, so one malformed record does not fail the job.
    KeyedStream<Event, String> keyedStream = stream.map(record -> {
        try {
            return objectMapper.readValue(record, Event.class);
        } catch (Exception e) {
            // Pass the exception as the throwable argument so the parse failure
            // (type, message, stack trace) is logged and not silently discarded.
            log.error("Exception in parsing the input records to Event POJO. "
                    + "Please make sure the input record structure is compatible with the POJO. Input record: "
                    + record, e);
            return null;
        }
    }).filter(Objects::nonNull).keyBy(Event::getSession_id);

    /*
     * Time semantics background:
     *
     * Event time - The timestamp when the event occurred. This is also sometimes
     * called the client-side time.
     *
     * Ingest time - The timestamp of when the record was added to the streaming
     * source. Amazon Kinesis Data Streams includes a field called
     * APPROXIMATE_ARRIVAL_TIME in every record that provides this timestamp. This
     * is also sometimes referred to as the server-side time.
     *
     * Source:
     * https://docs.aws.amazon.com/kinesisanalytics/latest/dev/timestamps-rowtime-concepts.html
     *
     * NOTE(review): the environment is configured for IngestionTime above, but the
     * window assigner below is ProcessingTimeSessionWindows. Confirm which of the
     * two time semantics is actually intended for session boundaries.
     */
    long timeout = Long.parseLong(parameter.get("session_time_out_in_minutes"));
    DataStream<String> sessionStream = keyedStream
            .window(ProcessingTimeSessionWindows.withGap(Time.minutes(timeout)))
            .aggregate(new Aggregator())
            .name("session_stream");

    sessionStream.addSink(createS3Sink(parameter)).name("session_processor_sink");
    log.info("S3 Sink added.");

    env.execute("Kinesis Data Analytics Flink Application with Session Window and Aggregate Function");
}