public synchronized void updateMetadata()

in modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java [120:163]


    public synchronized void updateMetadata() {
        if (err != null)
            throw new IgniteException(err);

        // If there are no new records in topic, method KafkaConsumer#poll blocks up to the specified timeout.
        // In order to eliminate this, we compare current offsets with the offsets from the last metadata update
        // (stored in 'offsets' field). If there are no offsets changes, polling cycle is skipped.
        Map<TopicPartition, Long> offsets0 = cnsmr.endOffsets(parts, Duration.ofMillis(kafkaReqTimeout));

        if (!F.isEmpty(offsets0) && F.eqNotOrdered(offsets, offsets0)) {
            if (log.isDebugEnabled())
                log.debug("Offsets unchanged, poll skipped");

            return;
        }

        while (true) {
            ConsumerRecords<Void, byte[]> recs = cnsmr.poll(Duration.ofMillis(kafkaReqTimeout));

            if (recs.count() == 0) {
                if (log.isDebugEnabled())
                    log.debug("Empty poll from meta topic");

                return;
            }

            if (log.isInfoEnabled())
                log.info("Polled from meta topic [rcvdEvts=" + rcvdEvts.addAndGet(recs.count()) + ']');

            for (ConsumerRecord<Void, byte[]> rec : recs) {
                Object data = IgniteUtils.fromBytes(rec.value());

                if (data instanceof BinaryMetadata)
                    registerBinaryMeta(ctx, log, (BinaryMetadata)data);
                else if (data instanceof TypeMapping)
                    registerMapping(ctx, log, (TypeMapping)data);
                else
                    throw new IllegalArgumentException("Unknown meta type[type=" + data + ']');
            }

            // Offsets updated only after commit.
            cnsmr.commitAsync(this);
        }
    }