public synchronized void start()

in core/src/main/java/org/apache/rocketmq/streams/core/RocketMQStream.java [53:77]


    public synchronized void start() {
        String jobId = topologyBuilder.getJobId();
        if (started.get()) {
            logger.info("RocketMQStream has been started, jobId=[{}].", jobId);
            return;
        }

        this.started.compareAndSet(false, true);

        //启动线程
        try {
            int threadNum = StreamConfig.STREAMS_PARALLEL_THREAD_NUM;
            for (int i = 0; i < threadNum; i++) {
                String threadName = String.join("_", Constant.WORKER_THREAD_NAME, jobId, String.valueOf(i));

                WorkerThread thread = new WorkerThread(threadName, topologyBuilder, this.properties, executor);

                thread.start();
                workerThreads.add(thread);
            }
        } catch (Throwable t) {
            logger.error("start RocketMQStream error, jobId=[{}].", jobId, t);
            throw new RStreamsException(t);
        }
    }