in CloudWatchSink/src/main/java/com/amazonaws/services/kinesisanalytics/sink/CloudWatchLogSink.java [49:62]
public void invoke(String message, Context context) {
logEvents.add(new InputLogEvent().withMessage(message).withTimestamp(System.currentTimeMillis()));
if (logEvents.size() >= MAX_BATCH_SIZE || lastFlushTimeMillis + MAX_BUFFER_TIME_MILLIS <= System.currentTimeMillis()) {
// flush the messages
PutLogEventsRequest putLogEventsRequest = new PutLogEventsRequest()
.withLogEvents(logEvents)
.withLogGroupName(logGroupName)
.withLogStreamName(logStreamName)
.withSequenceToken(getUploadSequenceToken());
awsLogsClient.putLogEvents(putLogEventsRequest);
lastFlushTimeMillis = System.currentTimeMillis();
logEvents.clear();
}
}