private KafkaStreams()

in streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java [654:806]


    /**
     * Wires up a Kafka Streams client from an internal topology builder.
     *
     * <p>In order, this constructor: resolves the client id, sets up logging and metrics, rewrites the
     * topology according to the config and builds it once as a fail-fast check, sizes the record cache,
     * creates the state directory, creates the global stream thread (only if a global topology exists)
     * and the stream threads, registers a shared state listener on all of them, and finally sets up the
     * queryable store provider, the state directory cleaner and the RocksDB metrics recording service.
     *
     * @param internalTopologyBuilder builder describing the topology; it is rewritten in place via
     *                                {@code rewriteTopology(config)} before any topology is built
     * @param config                  the streams configuration all settings are read from
     * @param clientSupplier          supplies the global consumer and the admin client, and is handed to
     *                                every stream thread that is created
     * @param time                    time source passed to the metrics, the state directory and the threads
     * @throws TopologyException if there would be neither a stream thread nor a global thread
     * @throws StreamsException  if creating the state directory fails with a {@code ProcessorStateException}
     */
    private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                         final StreamsConfig config,
                         final KafkaClientSupplier clientSupplier,
                         final Time time) throws StreamsException {
        this.config = config;
        this.time = time;

        final UUID processId = UUID.randomUUID();
        final String userClientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
        // The application ID is a required config and hence should always have value
        final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
        // without a user-provided client id, derive one from the application id and this instance's process id
        clientId = userClientId.isEmpty() ? applicationId + "-" + processId : userClientId;

        final LogContext logContext = new LogContext(String.format("stream-client [%s] ", clientId));
        this.log = logContext.logger(getClass());

        final MetricConfig metricConfig = new MetricConfig()
            .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
            .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
            .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
        final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
                MetricsReporter.class,
                Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId));
        // a JMX reporter is always added on top of the user-configured reporters
        final JmxReporter jmxReporter = new JmxReporter();
        jmxReporter.configure(config.originals());
        reporters.add(jmxReporter);
        final MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
                config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
        metrics = new Metrics(metricConfig, reporters, time, metricsContext);
        streamsMetrics = new StreamsMetricsImpl(
            metrics,
            clientId,
            config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
            time
        );
        ClientMetrics.addVersionMetric(streamsMetrics);
        ClientMetrics.addCommitIdMetric(streamsMetrics);
        ClientMetrics.addApplicationIdMetric(streamsMetrics, applicationId);
        ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, internalTopologyBuilder.describe().toString());
        ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state);
        log.info("Kafka Streams version: {}", ClientMetrics.version());
        log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());

        // re-write the physical topology according to the config
        internalTopologyBuilder.rewriteTopology(config);

        // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
        final ProcessorTopology taskTopology = internalTopologyBuilder.buildTopology();

        streamsMetadataState = new StreamsMetadataState(
                internalTopologyBuilder,
                parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));

        final int numStreamThreads;
        if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
            log.info("Overriding number of StreamThreads to zero for global-only topology");
            numStreamThreads = 0;
        } else {
            numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
        }

        // create the stream thread, global update thread, and cleanup thread
        threads = new StreamThread[numStreamThreads];

        final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
        final boolean hasGlobalTopology = globalTaskTopology != null;

        if (numStreamThreads == 0 && !hasGlobalTopology) {
            log.error("Topology with no input topics will create no stream threads and no global thread.");
            throw new TopologyException("Topology has no stream threads and no global threads, " +
                "must subscribe to at least one source topic or global table.");
        }

        long totalCacheSize = config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
        if (totalCacheSize < 0) {
            totalCacheSize = 0;
            log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
        }
        // the cache is split evenly across all stream threads plus the global thread (if any);
        // the divisor cannot be zero because the check above rejects "no stream threads and no global topology"
        final long cacheSizePerThread = totalCacheSize / (threads.length + (hasGlobalTopology ? 1 : 0));
        final boolean hasPersistentStores = taskTopology.hasPersistentLocalStore() ||
                (hasGlobalTopology && globalTaskTopology.hasPersistentGlobalStore());

        try {
            stateDirectory = new StateDirectory(config, time, hasPersistentStores);
        } catch (final ProcessorStateException fatal) {
            throw new StreamsException(fatal);
        }

        final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener();
        // stays null when there is no global topology; passed to the state listener as-is
        GlobalStreamThread.State globalThreadState = null;
        if (hasGlobalTopology) {
            final String globalThreadId = clientId + "-GlobalStreamThread";
            globalStreamThread = new GlobalStreamThread(
                globalTaskTopology,
                config,
                clientSupplier.getGlobalConsumer(config.getGlobalConsumerConfigs(clientId)),
                stateDirectory,
                cacheSizePerThread,
                streamsMetrics,
                time,
                globalThreadId,
                delegatingStateRestoreListener
            );
            globalThreadState = globalStreamThread.state();
        }

        // use client id instead of thread client id since this admin client may be shared among threads
        adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));

        final Map<Long, StreamThread.State> threadState = new HashMap<>(threads.length);
        final ArrayList<StreamThreadStateStoreProvider> storeProviders = new ArrayList<>(threads.length);
        for (int i = 0; i < threads.length; i++) {
            // the last argument is the thread index, numbered from 1
            threads[i] = StreamThread.create(
                internalTopologyBuilder,
                config,
                clientSupplier,
                adminClient,
                processId,
                clientId,
                streamsMetrics,
                time,
                streamsMetadataState,
                cacheSizePerThread,
                stateDirectory,
                delegatingStateRestoreListener,
                i + 1);
            threadState.put(threads[i].getId(), threads[i].state());
            storeProviders.add(new StreamThreadStateStoreProvider(threads[i], internalTopologyBuilder));
        }

        ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
            Math.toIntExact(Arrays.stream(threads).filter(thread -> thread.state().isAlive()).count()));

        // one listener instance is shared by the global thread and every stream thread
        final StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState);
        if (hasGlobalTopology) {
            globalStreamThread.setStateListener(streamStateListener);
        }
        for (final StreamThread thread : threads) {
            thread.setStateListener(streamStateListener);
        }

        final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(internalTopologyBuilder.globalStateStores());
        queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);

        stateDirCleaner = setupStateDirCleaner();

        maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
        rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
    }