private void startTask()

in rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java [822:913]


    private void startTask(Map<String, List<ConnectKeyValue>> newTasks) throws Exception {
        for (String connectorName : newTasks.keySet()) {
            for (ConnectKeyValue keyValue : newTasks.get(connectorName)) {
                int taskId = keyValue.getInt(ConnectorConfig.TASK_ID);
                ConnectorTaskId id = new ConnectorTaskId(connectorName, taskId);

                ErrorMetricsGroup errorMetricsGroup = new ErrorMetricsGroup(id, this.connectMetrics);

                String taskType = keyValue.getString(ConnectorConfig.TASK_TYPE);
                if (TaskType.DIRECT.name().equalsIgnoreCase(taskType)) {
                    createDirectTask(id, keyValue);
                    continue;
                }

                ClassLoader savedLoader = plugin.currentThreadLoader();
                try {
                    String connType = keyValue.getString(ConnectorConfig.CONNECTOR_CLASS);
                    ClassLoader connectorLoader = plugin.delegatingLoader().connectorLoader(connType);
                    savedLoader = Plugin.compareAndSwapLoaders(connectorLoader);
                    // new task
                    final Class<? extends Task> taskClass = plugin.currentThreadLoader().loadClass(keyValue.getString(ConnectorConfig.TASK_CLASS)).asSubclass(Task.class);
                    final Task task = plugin.newTask(taskClass);

                    /**
                     * create key/value converter
                     */
                    RecordConverter valueConverter = plugin.newConverter(keyValue, false, ConnectorConfig.VALUE_CONVERTER, workerConfig.getValueConverter(), Plugin.ClassLoaderUsage.CURRENT_CLASSLOADER);
                    RecordConverter keyConverter = plugin.newConverter(keyValue, true, ConnectorConfig.KEY_CONVERTER, workerConfig.getKeyConverter(), Plugin.ClassLoaderUsage.CURRENT_CLASSLOADER);

                    if (keyConverter == null) {
                        keyConverter = plugin.newConverter(keyValue, true, ConnectorConfig.KEY_CONVERTER, workerConfig.getValueConverter(), Plugin.ClassLoaderUsage.PLUGINS);
                        log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), id);
                    } else {
                        log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), id);
                    }
                    if (valueConverter == null) {
                        valueConverter = plugin.newConverter(keyValue, false, ConnectorConfig.VALUE_CONVERTER, workerConfig.getKeyConverter(), Plugin.ClassLoaderUsage.PLUGINS);
                        log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), id);
                    } else {
                        log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), id);
                    }

                    if (task instanceof SourceTask) {
                        DefaultMQProducer producer = ConnectUtil.initDefaultMQProducer(workerConfig);
                        TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
                        // create retry operator
                        RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(keyValue, errorMetricsGroup);
                        retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters(id, keyValue, errorMetricsGroup));

                        WorkerSourceTask workerSourceTask = new WorkerSourceTask(workerConfig, id,
                                (SourceTask) task, savedLoader, keyValue, positionManagementService, keyConverter, valueConverter, producer, workerState, connectStatsManager, connectStatsService, transformChain, retryWithToleranceOperator, statusListener, this.connectMetrics);

                        Future future = taskExecutor.submit(workerSourceTask);
                        // schedule offset committer
                        sourceTaskOffsetCommitter.ifPresent(committer -> committer.schedule(id, workerSourceTask));

                        taskToFutureMap.put(workerSourceTask, future);
                        this.pendingTasks.put(workerSourceTask, System.currentTimeMillis());

                    } else if (task instanceof SinkTask) {
                        log.info("sink task config keyValue is {}", keyValue.getProperties());
                        DefaultLitePullConsumer consumer = ConnectUtil.initDefaultLitePullConsumer(workerConfig, false);
                        // set consumer groupId
                        String groupId = keyValue.getString(SinkConnectorConfig.TASK_GROUP_ID);
                        if (StringUtils.isBlank(groupId)) {
                            groupId = ConnectUtil.SYS_TASK_CG_PREFIX + id.connector();
                        }
                        consumer.setConsumerGroup(groupId);
                        Set<String> consumerGroupSet = ConnectUtil.fetchAllConsumerGroupList(workerConfig);
                        if (!consumerGroupSet.contains(consumer.getConsumerGroup())) {
                            ConnectUtil.createSubGroup(workerConfig, consumer.getConsumerGroup());
                        }
                        TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
                        // create retry operator
                        RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(keyValue, errorMetricsGroup);
                        retryWithToleranceOperator.reporters(ReporterManagerUtil.sinkTaskReporters(id, keyValue, workerConfig, errorMetricsGroup));

                        WorkerSinkTask workerSinkTask = new WorkerSinkTask(workerConfig, id,
                                (SinkTask) task, savedLoader, keyValue, keyConverter, valueConverter, consumer, workerState, connectStatsManager, connectStatsService, transformChain,
                                retryWithToleranceOperator, ReporterManagerUtil.createWorkerErrorRecordReporter(keyValue, retryWithToleranceOperator, valueConverter), statusListener, this.connectMetrics);
                        Future future = taskExecutor.submit(workerSinkTask);
                        taskToFutureMap.put(workerSinkTask, future);
                        this.pendingTasks.put(workerSinkTask, System.currentTimeMillis());
                    }
                    Plugin.compareAndSwapLoaders(savedLoader);
                } catch (Exception e) {
                    log.error("start worker task exception. config {}" + JSON.toJSONString(keyValue), e);
                    Plugin.compareAndSwapLoaders(savedLoader);
                }
            }
        }
    }