public void onCompletion()

in connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java [448:633]


        public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
            if (error != null) {
                log.error("Unexpected in consumer callback for KafkaConfigBackingStore: ", error);
                return;
            }

            final SchemaAndValue value;
            try {
                value = converter.toConnectData(topic, record.value());
            } catch (DataException e) {
                log.error("Failed to convert config data to Kafka Connect format: ", e);
                return;
            }
            // Make the recorded offset match the API used for positions in the consumer -- return the offset of the
            // *next record*, not the last one consumed.
            offset = record.offset() + 1;

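            // The record key prefix determines the kind of update: a connector target state, a connector
            // config, a task config, or a task-commit marker for a connector.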
            if (record.key().startsWith(TARGET_STATE_PREFIX)) {
                String connectorName = record.key().substring(TARGET_STATE_PREFIX.length());
                boolean removed = false;
                synchronized (lock) {
                    if (value.value() == null) {
                        // When connector configs are removed, we also write tombstones for the target state.
                        log.debug("Removed target state for connector {} due to null value in topic.", connectorName);
                        connectorTargetStates.remove(connectorName);
                        removed = true;

                        // If for some reason we still have configs for the connector, add back the default
                        // STARTED state to ensure each connector always has a valid target state.
                        if (connectorConfigs.containsKey(connectorName))
                            connectorTargetStates.put(connectorName, TargetState.STARTED);
                    } else {
                        if (!(value.value() instanceof Map)) {
                            log.error("Found target state ({}) in wrong format: {}",  record.key(), value.value().getClass());
                            return;
                        }
                        Object targetState = ((Map<String, Object>) value.value()).get("state");
                        if (!(targetState instanceof String)) {
                            log.error("Invalid data for target state for connector ({}): 'state' field should be a Map but is {}",
                                    connectorName, targetState == null ? null : targetState.getClass());
                            return;
                        }

                        try {
                            TargetState state = TargetState.valueOf((String) targetState);
                            log.debug("Setting target state for connector {} to {}", connectorName, targetState);
                            connectorTargetStates.put(connectorName, state);
                        } catch (IllegalArgumentException e) {
                            log.error("Invalid target state for connector ({}): {}", connectorName, targetState);
                            return;
                        }
                    }
                }

                // Note that we do not notify the update listener if the target state has been removed.
                // Instead we depend on the removal callback of the connector config itself to notify the worker.
                if (started && !removed)
                    updateListener.onConnectorTargetStateChange(connectorName);

            } else if (record.key().startsWith(CONNECTOR_PREFIX)) {
                String connectorName = record.key().substring(CONNECTOR_PREFIX.length());
                boolean removed = false;
                synchronized (lock) {
                    if (value.value() == null) {
                        // Connector deletion will be written as a null value
                        log.info("Removed connector " + connectorName + " due to null configuration. This is usually intentional and does not indicate an issue.");
                        connectorConfigs.remove(connectorName);
                        removed = true;
                    } else {
                        // Connector configs can be applied and callbacks invoked immediately
                        if (!(value.value() instanceof Map)) {
                            log.error("Found connector configuration (" + record.key() + ") in wrong format: " + value.value().getClass());
                            return;
                        }
                        Object newConnectorConfig = ((Map<String, Object>) value.value()).get("properties");
                        if (!(newConnectorConfig instanceof Map)) {
                            log.error("Invalid data for connector config ({}): properties field should be a Map but is {}", connectorName,
                                    newConnectorConfig == null ? null : newConnectorConfig.getClass());
                            return;
                        }
                        log.debug("Updating configuration for connector " + connectorName + " configuration: " + newConnectorConfig);
                        connectorConfigs.put(connectorName, (Map<String, String>) newConnectorConfig);

                        // Set the initial state of the connector to STARTED, which ensures that any connectors
                        // which were created with 0.9 Connect will be initialized in the STARTED state.
                        if (!connectorTargetStates.containsKey(connectorName))
                            connectorTargetStates.put(connectorName, TargetState.STARTED);
                    }
                }
                if (started) {
                    if (removed)
                        updateListener.onConnectorConfigRemove(connectorName);
                    else
                        updateListener.onConnectorConfigUpdate(connectorName);
                }
            } else if (record.key().startsWith(TASK_PREFIX)) {
                synchronized (lock) {
                    ConnectorTaskId taskId = parseTaskId(record.key());
                    if (taskId == null) {
                        log.error("Ignoring task configuration because " + record.key() + " couldn't be parsed as a task config key");
                        return;
                    }
                    if (!(value.value() instanceof Map)) {
                        log.error("Ignoring task configuration for task " + taskId + " because it is in the wrong format: " + value.value());
                        return;
                    }

                    Object newTaskConfig = ((Map<String, Object>) value.value()).get("properties");
                    if (!(newTaskConfig instanceof Map)) {
                        log.error("Invalid data for task config (" + taskId + "): properties filed should be a Map but is " + newTaskConfig.getClass());
                        return;
                    }

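                    // Stage the task config under its connector; it is only applied to taskConfigs once a
                    // commit record (COMMIT_TASKS_PREFIX) for that connector is read below.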
                    Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(taskId.connector());
                    if (deferred == null) {
                        deferred = new HashMap<>();
                        deferredTaskUpdates.put(taskId.connector(), deferred);
                    }
                    log.debug("Storing new config for task " + taskId + " this will wait for a commit message before the new config will take effect. New config: " + newTaskConfig);
                    deferred.put(taskId, (Map<String, String>) newTaskConfig);
                }
            } else if (record.key().startsWith(COMMIT_TASKS_PREFIX)) {
                String connectorName = record.key().substring(COMMIT_TASKS_PREFIX.length());
                List<ConnectorTaskId> updatedTasks = new ArrayList<>();
                synchronized (lock) {
                    // Apply any outstanding deferred task updates for the given connector. Note that just because we
                    // encounter a commit message does not mean it will result in consistent output. In particular due to
                    // compaction, there may be cases where a commit marker is read without a complete set of task configs. For
                    // example, if we have the following sequence of writes:
                    //
                    // 1. Write connector "foo"'s config
                    // 2. Write connector "foo", task 1's config <-- compacted
                    // 3. Write connector "foo", task 2's config
                    // 4. Write connector "foo" task commit message
                    // 5. Write connector "foo", task 1's config
                    // 6. Write connector "foo", task 2's config
                    // 7. Write connector "foo" task commit message
                    //
                    // then when a new worker starts up, if message 2 had been compacted, then when message 4 is applied
                    // "foo" will not have a complete set of configs. Only when message 7 is applied will the complete
                    // configuration be available. Worse, if the leader died while writing messages 5, 6, and 7 such that
                    // only 5 was written, then there may be nothing that will finish writing the configs and get the
                    // log back into a consistent state.
                    //
                    // It is expected that the user of this class (i.e., the Herder) will take the necessary action to
                    // resolve this (i.e., get the connector to recommit its configuration). This inconsistent state is
                    // exposed in the snapshots provided via ClusterConfigState so they are easy to handle.
                    if (!(value.value() instanceof Map)) { // Schema-less, so we get maps instead of structs
                        log.error("Ignoring connector tasks configuration commit for connector " + connectorName + " because it is in the wrong format: " + value.value());
                        return;
                    }
                    Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connectorName);

                    int newTaskCount = intValue(((Map<String, Object>) value.value()).get("tasks"));

                    // Validate the configs we're supposed to update to ensure we're getting a complete configuration
                    // update of all tasks that are expected based on the number of tasks in the commit message.
                    Set<Integer> taskIdSet = taskIds(connectorName, deferred);
                    if (!completeTaskIdSet(taskIdSet, newTaskCount)) {
                        // Given the logic for writing commit messages, we should only hit this condition due to compacted
                        // historical data, in which case we would not have applied any updates yet and there will be no
                        // task config data already committed for the connector, so we shouldn't have to clear any data
                        // out. All we need to do is add the flag marking it inconsistent.
                        log.debug("We have an incomplete set of task configs for connector " + connectorName + " probably due to compaction. So we are not doing anything with the new configuration.");
                        inconsistent.add(connectorName);
                    } else {
                        if (deferred != null) {
                            taskConfigs.putAll(deferred);
                            updatedTasks.addAll(taskConfigs.keySet());
                        }
                        inconsistent.remove(connectorName);
                    }
                    // Always clear the deferred entries, even if we didn't apply them. If they represented an inconsistent
                    // update, then we need to see a completely fresh set of configs after this commit message, so we don't
                    // want any of these outdated configs
                    if (deferred != null)
                        deferred.clear();

                    connectorTaskCounts.put(connectorName, newTaskCount);
                }

                if (started)
                    updateListener.onTaskConfigUpdate(updatedTasks);
            } else {
                log.error("Discarding config update record with invalid key: " + record.key());
            }
        }
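
For reference, here is a minimal sketch of the schemaless payload shapes this callback expects, built around the "properties", "state", and "tasks" fields it reads above. The exact record key layouts and the standalone class below are illustrative assumptions for this excerpt, not part of KafkaConfigBackingStore.

// Illustrative sketch only: the record keys assume the prefix constants referenced above
// (CONNECTOR_PREFIX, TASK_PREFIX, COMMIT_TASKS_PREFIX, TARGET_STATE_PREFIX) concatenated
// with the connector name; the field names match what onCompletion() reads.
import java.util.HashMap;
import java.util.Map;

public class ConfigRecordShapes {
    public static void main(String[] args) {
        // Connector config record: key = CONNECTOR_PREFIX + name, value = { "properties": Map<String, String> }
        Map<String, String> props = new HashMap<>();
        props.put("connector.class", "org.example.MySourceConnector"); // hypothetical connector class
        props.put("tasks.max", "2");
        Map<String, Object> connectorValue = new HashMap<>();
        connectorValue.put("properties", props);

        // Task config record: key assumed to be TASK_PREFIX + name + "-" + taskId,
        // value = { "properties": Map<String, String> }
        Map<String, Object> taskValue = new HashMap<>();
        taskValue.put("properties", new HashMap<String, String>(props));

        // Target state record: key = TARGET_STATE_PREFIX + name, value = { "state": "STARTED" or "PAUSED" }
        Map<String, Object> targetStateValue = new HashMap<>();
        targetStateValue.put("state", "PAUSED");

        // Commit marker: key = COMMIT_TASKS_PREFIX + name, value = { "tasks": <number of task configs written> }
        Map<String, Object> commitValue = new HashMap<>();
        commitValue.put("tasks", 2);

        // onCompletion() guards every read with the same kind of instanceof check before casting, e.g.:
        Object state = targetStateValue.get("state");
        if (state instanceof String)
            System.out.println("Target state: " + state);
        System.out.println("Committed task count: " + commitValue.get("tasks")
                + " for " + connectorValue + " / " + taskValue);
    }
}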