private void advanceStep()

in src/main/java/com/amazonaws/mskdatagen/producer/ApplyGenerator.java [94:138]


    private void advanceStep() {
        TopicSeqConfig topicSeqConfig = context.getTopicSeq().findFirst().orElse(null);
        if (topicSeqConfig != null) {
            Optional<GeneratorsConfigs> generatorsConfigs = context.getGenerators().filter(t -> t.getTopic().equals(topicSeqConfig.getTopic()))
                    .findFirst();
            Long lastTs = context.getTimestampsConfig().getTimestamps().get(topicSeqConfig.getTopic());
            Long throttle = generatorsConfigs.map(GeneratorsConfigs::getThrottleNs).orElse(0L);
            long nextTs = lastTs + throttle;
            long now = System.nanoTime();

            if (now >= nextTs) {
                List<String> dependencies = generatorsConfigs.map(GeneratorsConfigs::getDependencies).orElse(Collections.emptyList());
                Function<DepsParameters, ResultGen> keyGenF = generatorsConfigs.map(GeneratorsConfigs::getKeyF)
                        .orElse(input -> new ResultGen(true));
                Function<DepsParameters, ResultGen> valGenF = generatorsConfigs.map(GeneratorsConfigs::getValueF)
                        .orElse(input -> new ResultGen(true));

                Map<ResultGen, ResultGen> resultGenResultGenMap = invokeGenerator(context, dependencies, keyGenF, valGenF);
                Optional<Map.Entry<ResultGen, ResultGen>> successResult = resultGenResultGenMap.entrySet().stream()
                        .filter(result -> result.getKey().isSuccess() && result.getValue().isSuccess()).findFirst();

                if (successResult.isPresent()) {
                    successResult(context, successResult.get(), topicSeqConfig.getTopic());
                } else {
                    resultGenResultGenMap.forEach((key, value) -> {
                        String message = String.format("Failed topic: %s { key:%s, value: %s }", topicSeqConfig.getTopic(), key.isSuccess(), value.isSuccess());
                        log.error(message);
                    });

                    updateGeneratedState(context, "failed");
                    context.getContextMap().get(ConfigType.TOPIC_SEQ_CONFIG).remove(0);
                }
            } else {
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Throttled topic: %s", topicSeqConfig.getTopic()));
                }

                updateGeneratedState(context, "throttled");
                context.getContextMap().get(ConfigType.TOPIC_SEQ_CONFIG).remove(0);
            }
        } else {
            log.error("Can't find topic.");
            updateGeneratedState(context, "drained");
        }
    }