in core/src/main/java/org/apache/rocketmq/streams/core/RocketMQStream.java [53:77]
/**
 * Starts this stream by launching its worker threads.
 *
 * <p>When the {@code started} flag is already set, the call only logs that fact and returns.
 * Otherwise the flag is set and {@code StreamConfig.STREAMS_PARALLEL_THREAD_NUM} workers are
 * created, started and recorded in {@code workerThreads}, one after another.
 *
 * <p>NOTE(review): the {@code started} flag is not reset, and workers launched before a failure
 * are not stopped here, when start-up fails part-way through — confirm that the caller is
 * expected to clean up in that situation.
 *
 * @throws RStreamsException wrapping any {@link Throwable} raised while a worker is being
 *                           created or started
 */
public synchronized void start() {
    final String jobId = topologyBuilder.getJobId();

    // Guard: nothing to do when the stream has already been started.
    if (started.get()) {
        logger.info("RocketMQStream has been started, jobId=[{}].", jobId);
        return;
    }
    this.started.compareAndSet(false, true);

    // Start the worker threads.
    try {
        final int parallelism = StreamConfig.STREAMS_PARALLEL_THREAD_NUM;
        int index = 0;
        while (index < parallelism) {
            // Worker name layout: <worker-thread-name>_<jobId>_<index>
            final String workerName = Constant.WORKER_THREAD_NAME + "_" + jobId + "_" + index;
            final WorkerThread worker = new WorkerThread(workerName, topologyBuilder, this.properties, executor);
            worker.start();
            // Recorded only after start() returned without throwing.
            workerThreads.add(worker);
            index++;
        }
    } catch (Throwable cause) {
        logger.error("start RocketMQStream error, jobId=[{}].", jobId, cause);
        throw new RStreamsException(cause);
    }
}