private void populate()

in kinesis-taxi-stream-producer/src/main/java/com/amazonaws/flink/refarch/StreamPopulator.java [129:217]


  /**
   * Replays the trip events provided by {@code taxiEventReader} into the Kinesis stream
   * {@code streamName}.
   *
   * <p>Ingestion is paced so that the event time of the replayed events advances
   * {@code speedupFactor} times faster than wall-clock time. While replaying, a watermark is
   * emitted (unless {@code noWatermark} is set) every {@code WATERMARK_MILLIS} ms or every
   * {@code WATERMARK_EVENT_COUNT} events, whichever comes first, and throughput/lag statistics
   * are logged whenever a new {@code statisticsFrequencyMillies}-long timeslot starts.
   *
   * <p>The method returns when the reader is exhausted, when the reader is empty to begin with,
   * or when the thread is interrupted while waiting for the next event to become due. After a
   * completed or interrupted replay the records already queued in the producer are flushed. The
   * producer is destroyed on every exit path, including when an exception escapes. If the replay
   * was interrupted, the thread's interrupt status is restored before returning.
   */
  private void populate() {
    long lastWatermark = 0;
    long lastWatermarkSentTime = 0;
    long watermarkBatchEventCount = 0;
    long statisticsBatchEventCount = 0;
    long statisticsLastOutputTimeslot = 0;
    boolean interrupted = false;

    try {
      if (!taxiEventReader.hasNext()) {
        LOG.info("no events to replay to stream {}", streamName);
        return;
      }

      TripEvent nextEvent = taxiEventReader.next();

      final long timeZeroSystem = System.currentTimeMillis();
      final long timeZeroLog = nextEvent.timestamp;

      LOG.info("starting to populate stream {}", streamName);

      while (true) {
        //determine system time, ie, how much (speedup-scaled) time has passed since the replay started
        double timeDeltaSystem = (System.currentTimeMillis() - timeZeroSystem) * speedupFactor;

        //determine event time, ie, how much time has passed according to the events that have been ingested to the Kinesis stream
        long timeDeltaLog = nextEvent.timestamp - timeZeroLog;

        //positive: the replay is behind schedule; negative: the next event is not due yet
        double replayTimeGap = timeDeltaSystem - timeDeltaLog;

        if (replayTimeGap < 0) {
          //wait until the (scaled) system time has caught up with the event time of the next event
          try {
            long sleepTime = (long) Math.max(-replayTimeGap / speedupFactor, MIN_SLEEP_MILLIS);

            Thread.sleep(sleepTime);
          } catch (InterruptedException e) {
            //treat the interrupt as a request to stop; sleep() cleared the interrupt status, so it is
            //restored in the finally block once the queued records have been flushed
            LOG.error("interrupted while waiting for the next event to become due, stopping replay", e);
            interrupted = true;
            break;
          }
        } else {
          //adapt the time of the event before ingestion into the stream
          nextEvent = TripEvent.adaptTime(nextEvent, adaptTimeOptionOption);

          //queue the next event for ingestion to the Kinesis stream through the KPL
          ListenableFuture<UserRecordResult> f = kinesisProducer.addUserRecord(
              streamName, Integer.toString(nextEvent.hashCode()), nextEvent.toByteBuffer());

          //monitor if the event has actually been sent and adapt the largest possible watermark value accordingly
          watermarkTracker.trackTimestamp(f, nextEvent);

          //block if too many events are buffered locally
          backpressureSemaphore.acquire(f);

          watermarkBatchEventCount++;
          statisticsBatchEventCount++;

          LOG.trace("sent event {}", nextEvent);

          if (taxiEventReader.hasNext()) {
            //pre-fetch next event
            nextEvent = taxiEventReader.next();
          } else {
            //terminate if there are no more events to replay
            break;
          }
        }

        //emit a watermark to every shard of the Kinesis stream every WATERMARK_MILLIS ms or WATERMARK_EVENT_COUNT events, whichever comes first
        if (System.currentTimeMillis() - lastWatermarkSentTime >= WATERMARK_MILLIS
            || watermarkBatchEventCount >= WATERMARK_EVENT_COUNT) {
          if (!noWatermark) {
            watermarkTracker.sentWatermark();
          }

          watermarkBatchEventCount = 0;
          lastWatermark = watermarkTracker.getCurrentWatermark();
          lastWatermarkSentTime = System.currentTimeMillis();
        }

        //output statistics whenever a new statisticsFrequencyMillies-long timeslot has started;
        //the timeslot is computed once so the compared and the stored value agree
        long currentTimeslot = (System.currentTimeMillis() - timeZeroSystem) / statisticsFrequencyMillies;
        if (currentTimeslot != statisticsLastOutputTimeslot) {
          long statisticsBatchEventRate = Math.round(1000.0 * statisticsBatchEventCount / statisticsFrequencyMillies);
          long replayLag = Math.round(replayTimeGap / speedupFactor / 1000);

          LOG.info("all events with dropoff time before {} have been sent ({} events/sec, {} sec replay lag)",
              new DateTime(lastWatermark + 1), statisticsBatchEventRate, replayLag);

          statisticsBatchEventCount = 0;
          statisticsLastOutputTimeslot = currentTimeslot;
        }
      }

      if (interrupted) {
        LOG.warn("replay was interrupted before all events have been sent");
      } else {
        LOG.info("all events have been sent");
      }

      kinesisProducer.flushSync();
    } finally {
      //release the producer on every exit path, including exceptions escaping the replay loop
      kinesisProducer.destroy();

      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }