in connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java [448:633]
// Callback invoked with either a read error or a record consumed from the config topic.
//
// On error, the error is logged and nothing else happens. Otherwise the record value is converted with
// the configured converter, the tracked offset is advanced past the record, and the record is applied
// according to its key prefix:
//   - TARGET_STATE_PREFIX: sets or removes the connector's target state
//   - CONNECTOR_PREFIX:    sets or removes the connector's configuration
//   - TASK_PREFIX:         stashes the task config as a deferred update until a commit record arrives
//   - COMMIT_TASKS_PREFIX: applies (or discards) the deferred task configs for the connector
// Records with any other key are logged and discarded. All in-memory state is mutated while holding
// `lock`; the update listener is only notified after the lock is released, and only if `started` is set.
//
// error:  the failure reported by the consumer, or null if `record` was read successfully
// record: the consumed record; only inspected when `error` is null
public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
    if (error != null) {
        log.error("Unexpected in consumer callback for KafkaConfigBackingStore: ", error);
        return;
    }

    final SchemaAndValue value;
    try {
        value = converter.toConnectData(topic, record.value());
    } catch (DataException e) {
        log.error("Failed to convert config data to Kafka Connect format: ", e);
        return;
    }
    // Make the recorded offset match the API used for positions in the consumer -- return the offset of the
    // *next record*, not the last one consumed.
    offset = record.offset() + 1;

    if (record.key().startsWith(TARGET_STATE_PREFIX)) {
        String connectorName = record.key().substring(TARGET_STATE_PREFIX.length());
        boolean removed = false;
        synchronized (lock) {
            if (value.value() == null) {
                // When connector configs are removed, we also write tombstones for the target state.
                log.debug("Removed target state for connector {} due to null value in topic.", connectorName);
                connectorTargetStates.remove(connectorName);
                removed = true;

                // If for some reason we still have configs for the connector, add back the default
                // STARTED state to ensure each connector always has a valid target state.
                if (connectorConfigs.containsKey(connectorName))
                    connectorTargetStates.put(connectorName, TargetState.STARTED);
            } else {
                if (!(value.value() instanceof Map)) {
                    log.error("Found target state ({}) in wrong format: {}", record.key(), value.value().getClass());
                    return;
                }
                Object targetState = ((Map<String, Object>) value.value()).get("state");
                if (!(targetState instanceof String)) {
                    // The check above requires a String, so the message must say String (it used to say Map).
                    log.error("Invalid data for target state for connector ({}): 'state' field should be a String but is {}",
                            connectorName, targetState == null ? null : targetState.getClass());
                    return;
                }

                try {
                    TargetState state = TargetState.valueOf((String) targetState);
                    log.debug("Setting target state for connector {} to {}", connectorName, targetState);
                    connectorTargetStates.put(connectorName, state);
                } catch (IllegalArgumentException e) {
                    // valueOf rejects names that are not TargetState constants
                    log.error("Invalid target state for connector ({}): {}", connectorName, targetState);
                    return;
                }
            }
        }

        // Note that we do not notify the update listener if the target state has been removed.
        // Instead we depend on the removal callback of the connector config itself to notify the worker.
        if (started && !removed)
            updateListener.onConnectorTargetStateChange(connectorName);
    } else if (record.key().startsWith(CONNECTOR_PREFIX)) {
        String connectorName = record.key().substring(CONNECTOR_PREFIX.length());
        boolean removed = false;
        synchronized (lock) {
            if (value.value() == null) {
                // Connector deletion will be written as a null value
                log.info("Removed connector {} due to null configuration. This is usually intentional and does not indicate an issue.",
                        connectorName);
                connectorConfigs.remove(connectorName);
                removed = true;
            } else {
                // Connector configs can be applied and callbacks invoked immediately
                if (!(value.value() instanceof Map)) {
                    log.error("Found connector configuration ({}) in wrong format: {}", record.key(), value.value().getClass());
                    return;
                }
                Object newConnectorConfig = ((Map<String, Object>) value.value()).get("properties");
                if (!(newConnectorConfig instanceof Map)) {
                    log.error("Invalid data for connector config ({}): properties field should be a Map but is {}", connectorName,
                            newConnectorConfig == null ? null : newConnectorConfig.getClass());
                    return;
                }
                log.debug("Updating configuration for connector {} configuration: {}", connectorName, newConnectorConfig);
                connectorConfigs.put(connectorName, (Map<String, String>) newConnectorConfig);

                // Set the initial state of the connector to STARTED, which ensures that any connectors
                // which were created with 0.9 Connect will be initialized in the STARTED state.
                if (!connectorTargetStates.containsKey(connectorName))
                    connectorTargetStates.put(connectorName, TargetState.STARTED);
            }
        }
        if (started) {
            if (removed)
                updateListener.onConnectorConfigRemove(connectorName);
            else
                updateListener.onConnectorConfigUpdate(connectorName);
        }
    } else if (record.key().startsWith(TASK_PREFIX)) {
        synchronized (lock) {
            ConnectorTaskId taskId = parseTaskId(record.key());
            if (taskId == null) {
                log.error("Ignoring task configuration because {} couldn't be parsed as a task config key", record.key());
                return;
            }
            if (!(value.value() instanceof Map)) {
                log.error("Ignoring task configuration for task {} because it is in the wrong format: {}", taskId, value.value());
                return;
            }

            Object newTaskConfig = ((Map<String, Object>) value.value()).get("properties");
            if (!(newTaskConfig instanceof Map)) {
                // newTaskConfig is null when the "properties" field is absent, so guard before getClass()
                log.error("Invalid data for task config ({}): properties field should be a Map but is {}", taskId,
                        newTaskConfig == null ? null : newTaskConfig.getClass());
                return;
            }

            // Task configs are not applied directly; they are collected per connector until a commit record
            // for that connector is processed.
            Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(taskId.connector());
            if (deferred == null) {
                deferred = new HashMap<>();
                deferredTaskUpdates.put(taskId.connector(), deferred);
            }
            log.debug("Storing new config for task {} this will wait for a commit message before the new config will take effect. New config: {}",
                    taskId, newTaskConfig);
            deferred.put(taskId, (Map<String, String>) newTaskConfig);
        }
    } else if (record.key().startsWith(COMMIT_TASKS_PREFIX)) {
        String connectorName = record.key().substring(COMMIT_TASKS_PREFIX.length());
        List<ConnectorTaskId> updatedTasks = new ArrayList<>();
        synchronized (lock) {
            // Apply any outstanding deferred task updates for the given connector. Note that just because we
            // encounter a commit message does not mean it will result in consistent output. In particular due to
            // compaction, there may be cases where the task configs preceding a commit message are incomplete.
            // For example if we have the following sequence of writes:
            //
            // 1. Write connector "foo"'s config
            // 2. Write connector "foo", task 1's config <-- compacted
            // 3. Write connector "foo", task 2's config
            // 4. Write connector "foo" task commit message
            // 5. Write connector "foo", task 1's config
            // 6. Write connector "foo", task 2's config
            // 7. Write connector "foo" task commit message
            //
            // then when a new worker starts up, if message 2 had been compacted, then when message 4 is applied
            // "foo" will not have a complete set of configs. Only when message 7 is applied will the complete
            // configuration be available. Worse, if the leader died while writing messages 5, 6, and 7 such that
            // only 5 was written, then there may be nothing that will finish writing the configs and get the
            // log back into a consistent state.
            //
            // It is expected that the user of this class (i.e., the Herder) will take the necessary action to
            // resolve this (i.e., get the connector to recommit its configuration). This inconsistent state is
            // exposed in the snapshots provided via ClusterConfigState so they are easy to handle.
            if (!(value.value() instanceof Map)) { // Schema-less, so we get maps instead of structs
                log.error("Ignoring connector tasks configuration commit for connector {} because it is in the wrong format: {}",
                        connectorName, value.value());
                return;
            }

            Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connectorName);
            int newTaskCount = intValue(((Map<String, Object>) value.value()).get("tasks"));

            // Validate the configs we're supposed to update to ensure we're getting a complete configuration
            // update of all tasks that are expected based on the number of tasks in the commit message.
            Set<Integer> taskIdSet = taskIds(connectorName, deferred);
            if (!completeTaskIdSet(taskIdSet, newTaskCount)) {
                // Given the logic for writing commit messages, we should only hit this condition due to compacted
                // historical data, in which case we would not have applied any updates yet and there will be no
                // task config data already committed for the connector, so we shouldn't have to clear any data
                // out. All we need to do is add the flag marking it inconsistent.
                log.debug("We have an incomplete set of task configs for connector {} probably due to compaction. So we are not doing anything with the new configuration.",
                        connectorName);
                inconsistent.add(connectorName);
            } else {
                if (deferred != null) {
                    taskConfigs.putAll(deferred);
                    // Report only the tasks whose configs were just applied. Using taskConfigs.keySet() here
                    // would report every task of every connector as updated.
                    updatedTasks.addAll(deferred.keySet());
                }
                inconsistent.remove(connectorName);
            }
            // Always clear the deferred entries, even if we didn't apply them. If they represented an inconsistent
            // update, then we need to see a completely fresh set of configs after this commit message, so we don't
            // want any of these outdated configs
            if (deferred != null)
                deferred.clear();

            connectorTaskCounts.put(connectorName, newTaskCount);
        }

        if (started)
            updateListener.onTaskConfigUpdate(updatedTasks);
    } else {
        log.error("Discarding config update record with invalid key: {}", record.key());
    }
}