in src/main/java/com/amazonaws/mskdatagen/producer/ApplyGenerator.java [94:138]
private void advanceStep() {
TopicSeqConfig topicSeqConfig = context.getTopicSeq().findFirst().orElse(null);
if (topicSeqConfig != null) {
Optional<GeneratorsConfigs> generatorsConfigs = context.getGenerators().filter(t -> t.getTopic().equals(topicSeqConfig.getTopic()))
.findFirst();
Long lastTs = context.getTimestampsConfig().getTimestamps().get(topicSeqConfig.getTopic());
Long throttle = generatorsConfigs.map(GeneratorsConfigs::getThrottleNs).orElse(0L);
long nextTs = lastTs + throttle;
long now = System.nanoTime();
if (now >= nextTs) {
List<String> dependencies = generatorsConfigs.map(GeneratorsConfigs::getDependencies).orElse(Collections.emptyList());
Function<DepsParameters, ResultGen> keyGenF = generatorsConfigs.map(GeneratorsConfigs::getKeyF)
.orElse(input -> new ResultGen(true));
Function<DepsParameters, ResultGen> valGenF = generatorsConfigs.map(GeneratorsConfigs::getValueF)
.orElse(input -> new ResultGen(true));
Map<ResultGen, ResultGen> resultGenResultGenMap = invokeGenerator(context, dependencies, keyGenF, valGenF);
Optional<Map.Entry<ResultGen, ResultGen>> successResult = resultGenResultGenMap.entrySet().stream()
.filter(result -> result.getKey().isSuccess() && result.getValue().isSuccess()).findFirst();
if (successResult.isPresent()) {
successResult(context, successResult.get(), topicSeqConfig.getTopic());
} else {
resultGenResultGenMap.forEach((key, value) -> {
String message = String.format("Failed topic: %s { key:%s, value: %s }", topicSeqConfig.getTopic(), key.isSuccess(), value.isSuccess());
log.error(message);
});
updateGeneratedState(context, "failed");
context.getContextMap().get(ConfigType.TOPIC_SEQ_CONFIG).remove(0);
}
} else {
if (log.isDebugEnabled()) {
log.debug(String.format("Throttled topic: %s", topicSeqConfig.getTopic()));
}
updateGeneratedState(context, "throttled");
context.getContextMap().get(ConfigType.TOPIC_SEQ_CONFIG).remove(0);
}
} else {
log.error("Can't find topic.");
updateGeneratedState(context, "drained");
}
}