in modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java [132:192]
protected void runAppliers() {
AtomicBoolean stopped = new AtomicBoolean();
Set<Integer> caches = null;
if (!F.isEmpty(streamerCfg.getCaches())) {
checkCaches(streamerCfg.getCaches());
caches = streamerCfg.getCaches().stream()
.map(CU::cacheId).collect(Collectors.toSet());
}
KafkaToIgniteMetadataUpdater metaUpdr = new KafkaToIgniteMetadataUpdater(
binaryContext(),
log,
kafkaProps,
streamerCfg
);
int kafkaPartsFrom = streamerCfg.getKafkaPartsFrom();
int kafkaParts = streamerCfg.getKafkaPartsTo() - kafkaPartsFrom;
int threadCnt = streamerCfg.getThreadCount();
int partPerApplier = kafkaParts / threadCnt;
for (int i = 0; i < threadCnt; i++) {
int from = i * partPerApplier;
int to = (i + 1) * partPerApplier;
if (i == threadCnt - 1)
to = kafkaParts;
KafkaToIgniteCdcStreamerApplier applier = new KafkaToIgniteCdcStreamerApplier(
() -> eventsApplier(),
log,
kafkaProps,
streamerCfg.getTopic(),
kafkaPartsFrom + from,
kafkaPartsFrom + to,
caches,
streamerCfg.getMaxBatchSize(),
streamerCfg.getKafkaRequestTimeout(),
metaUpdr,
stopped
);
addAndStart("applier-thread-" + i, applier);
}
try {
for (Thread run: runners)
run.join();
}
catch (InterruptedException e) {
stopped.set(true);
appliers.forEach(U::closeQuiet);
log.warning("Kafka to Ignite streamer interrupted", e);
}
}