in streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java [372:529]
public static StreamThread create(final TopologyMetadata topologyMetadata,
final StreamsConfig config,
final KafkaClientSupplier clientSupplier,
final Admin adminClient,
final UUID processId,
final String clientId,
final StreamsMetricsImpl streamsMetrics,
final Time time,
final StreamsMetadataState streamsMetadataState,
final long cacheSizeBytes,
final StateDirectory stateDirectory,
final StateRestoreListener userStateRestoreListener,
final StandbyUpdateListener userStandbyUpdateListener,
final int threadIdx,
final Runnable shutdownErrorHook,
final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler) {
final boolean stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals());
final String threadId = clientId + THREAD_ID_SUBSTRING + threadIdx;
final String stateUpdaterId = threadId.replace(THREAD_ID_SUBSTRING, STATE_UPDATER_ID_SUBSTRING);
final String restorationThreadId = stateUpdaterEnabled ? stateUpdaterId : threadId;
final String logPrefix = String.format("stream-thread [%s] ", threadId);
final LogContext logContext = new LogContext(logPrefix);
final LogContext restorationLogContext = stateUpdaterEnabled ? new LogContext(String.format("state-updater [%s] ", restorationThreadId)) : logContext;
final Logger log = logContext.logger(StreamThread.class);
final ReferenceContainer referenceContainer = new ReferenceContainer();
referenceContainer.adminClient = adminClient;
referenceContainer.streamsMetadataState = streamsMetadataState;
referenceContainer.time = time;
referenceContainer.clientTags = config.getClientTags();
log.info("Creating restore consumer client");
final Map<String, Object> restoreConsumerConfigs = config.getRestoreConsumerConfigs(restoreConsumerClientId(restorationThreadId));
final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
final StoreChangelogReader changelogReader = new StoreChangelogReader(
time,
config,
restorationLogContext,
adminClient,
restoreConsumer,
userStateRestoreListener,
userStandbyUpdateListener
);
final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
final boolean proceessingThreadsEnabled = InternalConfig.processingThreadsEnabled(config.originals());
final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator(
topologyMetadata,
config,
streamsMetrics,
stateDirectory,
changelogReader,
cache,
time,
clientSupplier,
threadId,
threadIdx,
processId,
log,
stateUpdaterEnabled,
proceessingThreadsEnabled
);
final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(
topologyMetadata,
config,
streamsMetrics,
stateDirectory,
changelogReader,
threadId,
log,
stateUpdaterEnabled);
final Tasks tasks = new Tasks(new LogContext(logPrefix));
final boolean processingThreadsEnabled =
InternalConfig.processingThreadsEnabled(config.originals());
final DefaultTaskManager schedulingTaskManager =
maybeCreateSchedulingTaskManager(processingThreadsEnabled, stateUpdaterEnabled, topologyMetadata, time, threadId, tasks);
final StateUpdater stateUpdater =
maybeCreateAndStartStateUpdater(
stateUpdaterEnabled,
streamsMetrics,
config,
restoreConsumer,
changelogReader,
topologyMetadata,
time,
clientId,
threadIdx
);
final TaskManager taskManager = new TaskManager(
time,
changelogReader,
new ProcessId(processId),
logPrefix,
activeTaskCreator,
standbyTaskCreator,
tasks,
topologyMetadata,
adminClient,
stateDirectory,
stateUpdater,
schedulingTaskManager
);
referenceContainer.taskManager = taskManager;
log.info("Creating consumer client");
final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs(applicationId, consumerClientId(threadId), threadIdx);
consumerConfigs.put(StreamsConfig.InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer);
final String originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
// If there are any overrides, we never fall through to the consumer, but only handle offset management ourselves.
if (topologyMetadata.hasOffsetResetOverrides()) {
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
}
final MainConsumerSetup mainConsumerSetup = setupMainConsumer(topologyMetadata, config, clientSupplier, processId, log, consumerConfigs);
taskManager.setMainConsumer(mainConsumerSetup.mainConsumer);
referenceContainer.mainConsumer = mainConsumerSetup.mainConsumer;
final StreamsThreadMetricsDelegatingReporter reporter = new StreamsThreadMetricsDelegatingReporter(mainConsumerSetup.mainConsumer, threadId, Optional.of(stateUpdaterId));
streamsMetrics.metricsRegistry().addReporter(reporter);
final StreamThread streamThread = new StreamThread(
time,
config,
adminClient,
mainConsumerSetup.mainConsumer,
restoreConsumer,
changelogReader,
originalReset,
taskManager,
stateUpdater,
streamsMetrics,
topologyMetadata,
processId,
threadId,
logContext,
referenceContainer.assignmentErrorCode,
referenceContainer.nextScheduledRebalanceMs,
referenceContainer.nonFatalExceptionsToHandle,
shutdownErrorHook,
streamsUncaughtExceptionHandler,
cache::resize,
mainConsumerSetup.streamsRebalanceData,
streamsMetadataState
);
return streamThread.updateThreadMetadata(adminClientId(clientId));
}