in adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java [219:252]
/**
 * Refreshes the per-runner entries held in this context for a single target runner.
 *
 * <p>For {@code ADD} and {@code UPDATE} the runner config, transform chain and sink task
 * registered under the runner's name are (re)built from {@code targetRunnerConfig}, and a
 * pusher executor is created only if none is registered under that name yet. For
 * {@code DELETE} the config, transform chain and sink task entries are removed. Any other
 * refresh type is ignored.
 *
 * @param targetRunnerConfig config of the runner being refreshed; its name is the map key and
 *                           its last component is used to build the sink task
 * @param refreshTypeEnum    kind of refresh to apply
 */
private void refreshRunnerContext(TargetRunnerConfig targetRunnerConfig, RefreshTypeEnum refreshTypeEnum) {
    String runnerName = targetRunnerConfig.getName();
    switch (refreshTypeEnum) {
        case ADD:
        case UPDATE:
            // Build every derived object first (same construction order as before) so that a
            // failure here -- e.g. an empty component list making get(-1) throw, or the sink
            // task failing to initialise -- leaves the previously registered context untouched
            // instead of a half-refreshed runner.
            TransformEngine<ConnectRecord> transformChain = new TransformEngine<>(targetRunnerConfig.getComponents(), plugin);
            // The last component of the runner is the one handed to the sink task.
            int endIndex = targetRunnerConfig.getComponents().size() - 1;
            TargetKeyValue targetKeyValue = new TargetKeyValue(targetRunnerConfig.getComponents().get(endIndex));
            SinkTask sinkTask = initTargetSinkTask(targetKeyValue);

            // Everything was built successfully: publish the new context.
            runnerConfigMap.put(runnerName, targetRunnerConfig);
            taskTransformMap.put(runnerName, transformChain);
            // NOTE(review): on UPDATE the previously registered sink task is simply replaced;
            // confirm whether it needs to be stopped/closed by someone.
            pusherTaskMap.put(runnerName, sinkTask);
            // An executor already registered for this runner is kept as-is.
            if (!pusherExecutorMap.containsKey(runnerName)) {
                pusherExecutorMap.put(runnerName, initDefaultThreadPoolExecutor(runnerName));
            }
            logger.info("runnerName -{}- refresh context by refresh type -{}- succeed", runnerName, refreshTypeEnum.name());
            break;
        case DELETE:
            runnerConfigMap.remove(runnerName);
            taskTransformMap.remove(runnerName);
            pusherTaskMap.remove(runnerName);
            // NOTE(review): the pusherExecutorMap entry for this runner is not removed here;
            // confirm whether the executor is intentionally retained for reuse or should be
            // shut down and removed.
            logger.info("runnerName -{}- remove context succeed", runnerName);
            break;
        default:
            break;
    }
}