in rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java [822:913]
/**
 * Creates a worker task for every task configuration in {@code newTasks} and submits it
 * to {@code taskExecutor}.
 *
 * <p>For each configuration the method:
 * <ol>
 *   <li>hands the configuration to {@code createDirectTask} when its task type is
 *       {@code DIRECT}, and moves on to the next one;</li>
 *   <li>otherwise switches the thread context class loader to the connector's plugin
 *       loader, instantiates the task class and its key/value converters;</li>
 *   <li>wraps the task in a {@code WorkerSourceTask} or {@code WorkerSinkTask}, submits
 *       it, and records it in {@code taskToFutureMap} and {@code pendingTasks}.</li>
 * </ol>
 *
 * <p>An exception raised while building or submitting a single non-direct task is logged
 * and does not prevent the remaining tasks from being started. The previous class loader
 * is always restored before the next configuration is processed.
 *
 * @param newTasks task configurations to start, keyed by connector name
 * @throws Exception if reading the task id, creating the error metrics group, or creating
 *                   a direct task fails (these steps run outside the per-task error handling)
 */
private void startTask(Map<String, List<ConnectKeyValue>> newTasks) throws Exception {
    for (Map.Entry<String, List<ConnectKeyValue>> entry : newTasks.entrySet()) {
        String connectorName = entry.getKey();
        for (ConnectKeyValue keyValue : entry.getValue()) {
            int taskId = keyValue.getInt(ConnectorConfig.TASK_ID);
            ConnectorTaskId id = new ConnectorTaskId(connectorName, taskId);
            ErrorMetricsGroup errorMetricsGroup = new ErrorMetricsGroup(id, this.connectMetrics);

            String taskType = keyValue.getString(ConnectorConfig.TASK_TYPE);
            if (TaskType.DIRECT.name().equalsIgnoreCase(taskType)) {
                createDirectTask(id, keyValue);
                continue;
            }

            ClassLoader savedLoader = plugin.currentThreadLoader();
            try {
                String connType = keyValue.getString(ConnectorConfig.CONNECTOR_CLASS);
                ClassLoader connectorLoader = plugin.delegatingLoader().connectorLoader(connType);
                // From here on the connector's loader is current; savedLoader holds the one to restore.
                savedLoader = Plugin.compareAndSwapLoaders(connectorLoader);

                // Instantiate the task through the connector's class loader.
                final Class<? extends Task> taskClass = plugin.currentThreadLoader()
                    .loadClass(keyValue.getString(ConnectorConfig.TASK_CLASS))
                    .asSubclass(Task.class);
                final Task task = plugin.newTask(taskClass);

                // Create key/value converters: first try the current (connector) class loader ...
                RecordConverter valueConverter = plugin.newConverter(keyValue, false, ConnectorConfig.VALUE_CONVERTER,
                    workerConfig.getValueConverter(), Plugin.ClassLoaderUsage.CURRENT_CLASSLOADER);
                RecordConverter keyConverter = plugin.newConverter(keyValue, true, ConnectorConfig.KEY_CONVERTER,
                    workerConfig.getKeyConverter(), Plugin.ClassLoaderUsage.CURRENT_CLASSLOADER);

                // ... then fall back to the plugin loaders. The key fallback must use the worker's
                // KEY converter setting and the value fallback the worker's VALUE converter setting,
                // mirroring the first attempts above.
                if (keyConverter == null) {
                    keyConverter = plugin.newConverter(keyValue, true, ConnectorConfig.KEY_CONVERTER,
                        workerConfig.getKeyConverter(), Plugin.ClassLoaderUsage.PLUGINS);
                    log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), id);
                } else {
                    log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), id);
                }
                if (valueConverter == null) {
                    valueConverter = plugin.newConverter(keyValue, false, ConnectorConfig.VALUE_CONVERTER,
                        workerConfig.getValueConverter(), Plugin.ClassLoaderUsage.PLUGINS);
                    log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), id);
                } else {
                    log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), id);
                }

                if (task instanceof SourceTask) {
                    DefaultMQProducer producer = ConnectUtil.initDefaultMQProducer(workerConfig);
                    TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
                    // create retry operator
                    RetryWithToleranceOperator retryWithToleranceOperator =
                        ReporterManagerUtil.createRetryWithToleranceOperator(keyValue, errorMetricsGroup);
                    retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters(id, keyValue, errorMetricsGroup));

                    // NOTE(review): the task receives savedLoader (the loader that was current before the
                    // swap), not connectorLoader - confirm this is what WorkerSourceTask expects.
                    WorkerSourceTask workerSourceTask = new WorkerSourceTask(workerConfig, id,
                        (SourceTask) task, savedLoader, keyValue, positionManagementService, keyConverter, valueConverter,
                        producer, workerState, connectStatsManager, connectStatsService, transformChain,
                        retryWithToleranceOperator, statusListener, this.connectMetrics);

                    Future<?> future = taskExecutor.submit(workerSourceTask);
                    // schedule offset committer
                    sourceTaskOffsetCommitter.ifPresent(committer -> committer.schedule(id, workerSourceTask));
                    taskToFutureMap.put(workerSourceTask, future);
                    this.pendingTasks.put(workerSourceTask, System.currentTimeMillis());
                } else if (task instanceof SinkTask) {
                    log.info("sink task config keyValue is {}", keyValue.getProperties());
                    DefaultLitePullConsumer consumer = ConnectUtil.initDefaultLitePullConsumer(workerConfig, false);

                    // Use the configured consumer group, or derive one from the connector name.
                    String groupId = keyValue.getString(SinkConnectorConfig.TASK_GROUP_ID);
                    if (StringUtils.isBlank(groupId)) {
                        groupId = ConnectUtil.SYS_TASK_CG_PREFIX + id.connector();
                    }
                    consumer.setConsumerGroup(groupId);

                    // Create the subscription group when it is not among the known consumer groups.
                    Set<String> consumerGroupSet = ConnectUtil.fetchAllConsumerGroupList(workerConfig);
                    if (!consumerGroupSet.contains(consumer.getConsumerGroup())) {
                        ConnectUtil.createSubGroup(workerConfig, consumer.getConsumerGroup());
                    }

                    TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
                    // create retry operator
                    RetryWithToleranceOperator retryWithToleranceOperator =
                        ReporterManagerUtil.createRetryWithToleranceOperator(keyValue, errorMetricsGroup);
                    retryWithToleranceOperator.reporters(
                        ReporterManagerUtil.sinkTaskReporters(id, keyValue, workerConfig, errorMetricsGroup));

                    WorkerSinkTask workerSinkTask = new WorkerSinkTask(workerConfig, id,
                        (SinkTask) task, savedLoader, keyValue, keyConverter, valueConverter, consumer, workerState,
                        connectStatsManager, connectStatsService, transformChain, retryWithToleranceOperator,
                        ReporterManagerUtil.createWorkerErrorRecordReporter(keyValue, retryWithToleranceOperator, valueConverter),
                        statusListener, this.connectMetrics);

                    Future<?> future = taskExecutor.submit(workerSinkTask);
                    taskToFutureMap.put(workerSinkTask, future);
                    this.pendingTasks.put(workerSinkTask, System.currentTimeMillis());
                }
            } catch (Exception e) {
                // Pass the config as a log parameter so the "{}" placeholder is actually filled;
                // the trailing throwable is logged with its stack trace.
                log.error("start worker task exception. config {}", JSON.toJSONString(keyValue), e);
            } finally {
                // Always restore the caller's class loader, including when an Error escapes.
                Plugin.compareAndSwapLoaders(savedLoader);
            }
        }
    }
}