public void run()

in docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Producer.java [47:72]


  public void run() {
    KafkaProducer<Long, Transaction> producer = new KafkaProducer<>(getProperties());

    Throttler throttler = new Throttler(100);

    TransactionSupplier transactions = new TransactionSupplier();

    while (isRunning) {

      Transaction transaction = transactions.get();

      long millis = transaction.timestamp.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();

      ProducerRecord<Long, Transaction> record =
          new ProducerRecord<>(topic, null, millis, transaction.accountId, transaction);
      producer.send(record);

      try {
        throttler.throttle();
      } catch (InterruptedException e) {
        isRunning = false;
      }
    }

    producer.close();
  }