in streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java [654:806]
private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                     final StreamsConfig config,
                     final KafkaClientSupplier clientSupplier,
                     final Time time) throws StreamsException {
    this.config = config;
    this.time = time;

    final UUID processId = UUID.randomUUID();
    final String userClientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
    // The application ID is a required config and hence always has a value
    final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
    if (userClientId.isEmpty()) {
        clientId = applicationId + "-" + processId;
    } else {
        clientId = userClientId;
    }
    final LogContext logContext = new LogContext(String.format("stream-client [%s] ", clientId));
    this.log = logContext.logger(getClass());
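
    // Set up metrics: sample count, recording level, and sample window come from the Streams config,
    // and a JMX reporter is registered alongside any user-configured metrics reporters.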
    final MetricConfig metricConfig = new MetricConfig()
        .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
        .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
        .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
    final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
                                                                          MetricsReporter.class,
                                                                          Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId));
    final JmxReporter jmxReporter = new JmxReporter();
    jmxReporter.configure(config.originals());
    reporters.add(jmxReporter);
    final MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
                                                                  config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
    metrics = new Metrics(metricConfig, reporters, time, metricsContext);
    streamsMetrics = new StreamsMetricsImpl(
        metrics,
        clientId,
        config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
        time
    );
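    // Register client-level gauges: version, commit id, application id, topology description, and current state.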
    ClientMetrics.addVersionMetric(streamsMetrics);
    ClientMetrics.addCommitIdMetric(streamsMetrics);
    ClientMetrics.addApplicationIdMetric(streamsMetrics, config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
    ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, internalTopologyBuilder.describe().toString());
    ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state);
    log.info("Kafka Streams version: {}", ClientMetrics.version());
    log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());

    // re-write the physical topology according to the config
    internalTopologyBuilder.rewriteTopology(config);

    // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
    final ProcessorTopology taskTopology = internalTopologyBuilder.buildTopology();
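
    // StreamsMetadataState tracks which application instance hosts which partitions and state stores,
    // backing the interactive-query metadata APIs.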
    streamsMetadataState = new StreamsMetadataState(
        internalTopologyBuilder,
        parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
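
    // A topology containing only global state stores needs no StreamThreads.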
    final int numStreamThreads;
    if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
        log.info("Overriding number of StreamThreads to zero for global-only topology");
        numStreamThreads = 0;
    } else {
        numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
    }

    // create the stream threads, global update thread, and cleanup thread
    threads = new StreamThread[numStreamThreads];
    final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
    final boolean hasGlobalTopology = globalTaskTopology != null;
    if (numStreamThreads == 0 && !hasGlobalTopology) {
        log.error("Topology with no input topics will create no stream threads and no global thread.");
        throw new TopologyException("Topology has no stream threads and no global threads, " +
            "must subscribe to at least one source topic or global table.");
    }
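
    // Split the configured record cache evenly across all stream threads, plus the global thread if one exists.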
    long totalCacheSize = config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
    if (totalCacheSize < 0) {
        totalCacheSize = 0;
        log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
    }
    final long cacheSizePerThread = totalCacheSize / (threads.length + (hasGlobalTopology ? 1 : 0));
    final boolean hasPersistentStores = taskTopology.hasPersistentLocalStore() ||
        (hasGlobalTopology && globalTaskTopology.hasPersistentGlobalStore());
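
    // The state directory manages on-disk task state; failure to create it is fatal and is rethrown as a StreamsException.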
    try {
        stateDirectory = new StateDirectory(config, time, hasPersistentStores);
    } catch (final ProcessorStateException fatal) {
        throw new StreamsException(fatal);
    }
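
    // The delegating restore listener forwards store-restoration callbacks to the user-registered global listener.
    // If the topology has a global part, create a dedicated GlobalStreamThread that maintains the global state stores.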
    final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener();
    GlobalStreamThread.State globalThreadState = null;
    if (hasGlobalTopology) {
        final String globalThreadId = clientId + "-GlobalStreamThread";
        globalStreamThread = new GlobalStreamThread(
            globalTaskTopology,
            config,
            clientSupplier.getGlobalConsumer(config.getGlobalConsumerConfigs(clientId)),
            stateDirectory,
            cacheSizePerThread,
            streamsMetrics,
            time,
            globalThreadId,
            delegatingStateRestoreListener
        );
        globalThreadState = globalStreamThread.state();
    }

    // use client id instead of thread client id since this admin client may be shared among threads
    adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
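
    // Create the configured number of StreamThreads, tracking each thread's state and exposing its
    // state stores through a per-thread store provider.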
    final Map<Long, StreamThread.State> threadState = new HashMap<>(threads.length);
    final ArrayList<StreamThreadStateStoreProvider> storeProviders = new ArrayList<>();
    for (int i = 0; i < threads.length; i++) {
        threads[i] = StreamThread.create(
            internalTopologyBuilder,
            config,
            clientSupplier,
            adminClient,
            processId,
            clientId,
            streamsMetrics,
            time,
            streamsMetadataState,
            cacheSizePerThread,
            stateDirectory,
            delegatingStateRestoreListener,
            i + 1);
        threadState.put(threads[i].getId(), threads[i].state());
        storeProviders.add(new StreamThreadStateStoreProvider(threads[i], internalTopologyBuilder));
    }

    ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
        Math.toIntExact(Arrays.stream(threads).filter(thread -> thread.state().isAlive()).count()));
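
    // A single StreamStateListener aggregates the state changes of all stream threads and the global thread
    // into the overall KafkaStreams client state.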
    final StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState);
    if (hasGlobalTopology) {
        globalStreamThread.setStateListener(streamStateListener);
    }
    for (final StreamThread thread : threads) {
        thread.setStateListener(streamStateListener);
    }
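
    // Combine the per-thread store providers with the global store provider so interactive queries can
    // look up any queryable store.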
    final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(internalTopologyBuilder.globalStateStores());
    queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);

    stateDirCleaner = setupStateDirCleaner();
    maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
    rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
}