protected void runAppliers()

in modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java [132:192]


    protected void runAppliers() {
        AtomicBoolean stopped = new AtomicBoolean();

        Set<Integer> caches = null;

        if (!F.isEmpty(streamerCfg.getCaches())) {
            checkCaches(streamerCfg.getCaches());

            caches = streamerCfg.getCaches().stream()
                .map(CU::cacheId).collect(Collectors.toSet());
        }

        KafkaToIgniteMetadataUpdater metaUpdr = new KafkaToIgniteMetadataUpdater(
            binaryContext(),
            log,
            kafkaProps,
            streamerCfg
        );

        int kafkaPartsFrom = streamerCfg.getKafkaPartsFrom();
        int kafkaParts = streamerCfg.getKafkaPartsTo() - kafkaPartsFrom;
        int threadCnt = streamerCfg.getThreadCount();

        int partPerApplier = kafkaParts / threadCnt;

        for (int i = 0; i < threadCnt; i++) {
            int from = i * partPerApplier;
            int to = (i + 1) * partPerApplier;

            if (i == threadCnt - 1)
                to = kafkaParts;

            KafkaToIgniteCdcStreamerApplier applier = new KafkaToIgniteCdcStreamerApplier(
                () -> eventsApplier(),
                log,
                kafkaProps,
                streamerCfg.getTopic(),
                kafkaPartsFrom + from,
                kafkaPartsFrom + to,
                caches,
                streamerCfg.getMaxBatchSize(),
                streamerCfg.getKafkaRequestTimeout(),
                metaUpdr,
                stopped
            );

            addAndStart("applier-thread-" + i, applier);
        }

        try {
            for (Thread run: runners)
                run.join();
        }
        catch (InterruptedException e) {
            stopped.set(true);

            appliers.forEach(U::closeQuiet);

            log.warning("Kafka to Ignite streamer interrupted", e);
        }
    }