in modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java [120:163]
/**
 * Polls the meta topic and registers every received binary metadata and type mapping,
 * repeating until a poll returns no records.
 * <p>
 * The whole polling cycle is skipped when the current end offsets of {@code parts} are non-empty
 * and equal to the value of the {@code offsets} field.
 *
 * @throws IgniteException If the {@code err} field is set when the method is entered.
 * @throws IllegalArgumentException If a polled record deserializes to something other than
 *      {@code BinaryMetadata} or {@code TypeMapping}.
 */
public synchronized void updateMetadata() {
    // NOTE(review): 'err' is presumably set by the asynchronous commit callback - confirm against the callback code.
    if (err != null)
        throw new IgniteException(err);

    // If there are no new records in topic, method KafkaConsumer#poll blocks up to the specified timeout.
    // In order to eliminate this, we compare current offsets with the offsets from the last metadata update
    // (stored in 'offsets' field). If there are no offsets changes, polling cycle is skipped.
    Map<TopicPartition, Long> offsets0 = cnsmr.endOffsets(parts, Duration.ofMillis(kafkaReqTimeout));

    if (!F.isEmpty(offsets0) && F.eqNotOrdered(offsets, offsets0)) {
        if (log.isDebugEnabled())
            log.debug("Offsets unchanged, poll skipped");

        return;
    }

    while (true) {
        ConsumerRecords<Void, byte[]> recs = cnsmr.poll(Duration.ofMillis(kafkaReqTimeout));

        int cnt = recs.count();

        // An empty poll means the topic is drained for now: stop the cycle.
        if (cnt == 0) {
            if (log.isDebugEnabled())
                log.debug("Empty poll from meta topic");

            return;
        }

        // The counter must be updated regardless of the log level, otherwise the running total
        // silently depends on whether INFO logging happens to be enabled.
        long total = rcvdEvts.addAndGet(cnt);

        if (log.isInfoEnabled())
            log.info("Polled from meta topic [rcvdEvts=" + total + ']');

        for (ConsumerRecord<Void, byte[]> rec : recs) {
            Object data = IgniteUtils.fromBytes(rec.value());

            if (data instanceof BinaryMetadata)
                registerBinaryMeta(ctx, log, (BinaryMetadata)data);
            else if (data instanceof TypeMapping)
                registerMapping(ctx, log, (TypeMapping)data);
            else
                throw new IllegalArgumentException("Unknown meta type[type=" + data + ']');
        }

        // Offsets updated only after commit.
        // This instance is passed as the commit callback; the callback itself is not visible here.
        cnsmr.commitAsync(this);
    }
}