in src/main/java/com/amazonaws/mskdatagen/producer/ApplyGenerator.java [68:92]
/**
 * Sleeps the current thread until the earliest topic is due again, but only
 * when {@code throttledAttempts} is a positive multiple of the number of
 * configured generators.
 *
 * <p>For every entry in the timestamps configuration, the next due time is the
 * recorded timestamp plus the throttle (in nanoseconds) of the first generator
 * whose topic equals the entry key; if no generator matches, the recorded
 * timestamp is used as-is. The thread then sleeps for the time remaining until
 * the smallest of those due times, measured against {@link System#nanoTime()}
 * and truncated to whole milliseconds. A due time in the past yields no delay.
 *
 * <p>The method returns without sleeping when there are no generators or no
 * recorded timestamps. If the sleep is interrupted, the error is logged and the
 * thread's interrupt flag is restored.
 *
 * @param throttledAttempts number of throttled attempts counted by the caller;
 *                          values less than or equal to zero never back off
 */
private void maybeBackoff(int throttledAttempts) {
    if (throttledAttempts <= 0) {
        return;
    }

    long count = context.getGenerators().count();
    // With no generators the modulo below would throw ArithmeticException.
    if (count == 0 || throttledAttempts % count != 0) {
        return;
    }

    Map<String, Long> timestamps = context.getTimestampsConfig().getTimestamps();
    // Nothing recorded yet means there is no due time to wait for.
    if (timestamps.isEmpty()) {
        return;
    }

    long earliestTs = Long.MAX_VALUE;
    for (Map.Entry<String, Long> timestamp : timestamps.entrySet()) {
        long lastTs = timestamp.getValue();
        long nextTs = context.getGenerators()
                .filter(t -> t.getTopic().equals(timestamp.getKey()))
                .findFirst()
                .map(GeneratorsConfigs::getThrottleNs)
                .map(throttleNs -> throttleNs + lastTs)
                .orElse(lastTs);
        earliestTs = Math.min(earliestTs, nextTs);
    }

    // Clamp to zero for due times already in the past, then convert ns -> ms
    // with exact integer arithmetic (truncating, like the previous cast did).
    long remainingNs = Math.max(0L, earliestTs - System.nanoTime());
    long deltaMs = remainingNs / 1_000_000L;

    try {
        Thread.sleep(deltaMs);
    } catch (InterruptedException e) {
        log.error("Throttled Interrupted Exception.", e);
        // Restore interrupted state so callers further up can observe it.
        Thread.currentThread().interrupt();
    }
}