in kinesis-taxi-stream-producer/src/main/java/com/amazonaws/flink/refarch/StreamPopulator.java [129:217]
/**
 * Replays the events supplied by {@code taxiEventReader} into the Kinesis stream {@code streamName}.
 *
 * <p>Events are paced so that the event time elapsed since the first event follows the wall-clock
 * time elapsed since invocation, scaled by {@code speedupFactor}. While replaying, a watermark is
 * emitted every {@code WATERMARK_MILLIS} ms or {@code WATERMARK_EVENT_COUNT} events (unless
 * {@code noWatermark} is set), and throughput statistics are logged once per
 * {@code statisticsFrequencyMillies} time slot.
 *
 * <p>The method returns when the reader has no more events, or when the calling thread is
 * interrupted while waiting for the next event to become due. In both cases the producer is
 * flushed and destroyed before returning; after an interruption the thread's interrupt status is
 * restored so that callers can observe it.
 */
private void populate() {
    long lastWatermark = 0;
    long lastWatermarkSentTime = 0;
    long watermarkBatchEventCount = 0;
    long statisticsBatchEventCount = 0;
    long statisticsLastOutputTimeslot = 0;
    boolean interrupted = false;

    TripEvent nextEvent = taxiEventReader.next();

    final long timeZeroSystem = System.currentTimeMillis();
    final long timeZeroLog = nextEvent.timestamp;

    LOG.info("starting to populate stream {}", streamName);

    while (true) {
        //determine system time, ie, how much (sped-up) time has passed since program invocation
        double timeDeltaSystem = (System.currentTimeMillis() - timeZeroSystem) * speedupFactor;
        //determine event time, ie, how much time has passed according to the events that have been ingested to the Kinesis stream
        long timeDeltaLog = nextEvent.timestamp - timeZeroLog;
        double replayTimeGap = timeDeltaSystem - timeDeltaLog;

        if (replayTimeGap < 0) {
            //the next event is not due yet: wait until system time has caught up with its event time
            try {
                long sleepTime = (long) Math.max(-replayTimeGap / speedupFactor, MIN_SLEEP_MILLIS);
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                //an interrupt is a request to stop: remember it and leave the loop instead of
                //silently continuing, which would make the replay impossible to cancel
                LOG.warn("interrupted while populating stream {}, stopping replay", streamName, e);
                interrupted = true;
                break;
            }
        } else {
            //adapt the time of the event before ingestion into the stream
            nextEvent = TripEvent.adaptTime(nextEvent, adaptTimeOptionOption);

            //queue the next event for ingestion to the Kinesis stream through the KPL
            ListenableFuture<UserRecordResult> f = kinesisProducer.addUserRecord(
                streamName, Integer.toString(nextEvent.hashCode()), nextEvent.toByteBuffer());

            //monitor if the event has actually been sent and adapt the largest possible watermark value accordingly
            watermarkTracker.trackTimestamp(f, nextEvent);

            //block if too many events are buffered locally
            backpressureSemaphore.acquire(f);

            watermarkBatchEventCount++;
            statisticsBatchEventCount++;

            LOG.trace("sent event {}", nextEvent);

            if (taxiEventReader.hasNext()) {
                //pre-fetch next event
                nextEvent = taxiEventReader.next();
            } else {
                //terminate if there are no more events to replay
                break;
            }
        }

        //emit a watermark to every shard of the Kinesis stream every WATERMARK_MILLIS ms or WATERMARK_EVENT_COUNT events, whatever comes first
        if (System.currentTimeMillis() - lastWatermarkSentTime >= WATERMARK_MILLIS || watermarkBatchEventCount >= WATERMARK_EVENT_COUNT) {
            if (!noWatermark) {
                watermarkTracker.sentWatermark();
            }

            //the bookkeeping is updated even when watermarks are disabled, so statistics keep reporting progress
            watermarkBatchEventCount = 0;
            lastWatermark = watermarkTracker.getCurrentWatermark();
            lastWatermarkSentTime = System.currentTimeMillis();
        }

        //output statistics every statisticsFrequencyMillies ms, ie, whenever the current time slot differs from the last reported one
        if ((System.currentTimeMillis() - timeZeroSystem) / statisticsFrequencyMillies != statisticsLastOutputTimeslot) {
            double statisticsBatchEventRate = Math.round(1000.0 * statisticsBatchEventCount / statisticsFrequencyMillies);
            long replayLag = Math.round(replayTimeGap / speedupFactor / 1000);

            LOG.info("all events with dropoff time before {} have been sent ({} events/sec, {} sec replay lag)",
                new DateTime(lastWatermark + 1), statisticsBatchEventRate, replayLag);

            statisticsBatchEventCount = 0;
            statisticsLastOutputTimeslot = (System.currentTimeMillis() - timeZeroSystem) / statisticsFrequencyMillies;
        }
    }

    if (!interrupted) {
        LOG.info("all events have been sent");
    }

    //hand over everything that is still buffered before shutting the producer down
    kinesisProducer.flushSync();
    kinesisProducer.destroy();

    if (interrupted) {
        //restore the interrupt status last, so the flush above is not cut short and callers still see the interrupt
        Thread.currentThread().interrupt();
    }
}