in docker/data-generator/src/main/java/org/apache/flink/playground/datagen/Producer.java [47:72]
public void run() {
KafkaProducer<Long, Transaction> producer = new KafkaProducer<>(getProperties());
Throttler throttler = new Throttler(100);
TransactionSupplier transactions = new TransactionSupplier();
while (isRunning) {
Transaction transaction = transactions.get();
long millis = transaction.timestamp.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
ProducerRecord<Long, Transaction> record =
new ProducerRecord<>(topic, null, millis, transaction.accountId, transaction);
producer.send(record);
try {
throttler.throttle();
} catch (InterruptedException e) {
isRunning = false;
}
}
producer.close();
}