in seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java [607:674]
/**
 * Re-parses the values of complex-typed options so they are stored in {@code config} as
 * structured values instead of their string form.
 *
 * <p>The declared type of every option is collected from both the optional and the required
 * options of {@code optionRule}. For each config entry whose key has a declared type accepted
 * by {@code isComplexType} and whose value is not rejected by {@code isEmptyValue}, the string
 * form of the unwrapped value is parsed again with {@code ConfigFactory.parseString}:
 *
 * <ul>
 *   <li>when the declared type name starts with {@code java.util.List} or {@code
 *       org.apache.seatunnel.api.configuration.Options}, the string is wrapped as {@code
 *       {key=...}} and read back as a list;
 *   <li>otherwise the string is parsed on its own and its root object is used.
 * </ul>
 *
 * @param pluginType plugin type, used only in the error message
 * @param connectorType connector type, used only in the error message
 * @param config configuration whose entries are inspected
 * @param optionRule rule supplying the option keys and their type references
 * @return {@code config} with every re-parsed entry replaced via {@code withValue}
 * @throws SeatunnelException with {@code SeatunnelErrorEnum.ERROR_CONFIG} when any exception is
 *     raised while inspecting or re-parsing an entry
 */
private Config parseConfigWithOptionRule(
        PluginType pluginType, String connectorType, Config config, OptionRule optionRule) {
    // Option key -> declared type, gathered from optional and required options alike.
    Map<String, TypeReference<?>> declaredTypes = new HashMap<>();
    optionRule
            .getOptionalOptions()
            .forEach(option -> declaredTypes.put(option.key(), option.typeReference()));
    optionRule
            .getRequiredOptions()
            .forEach(
                    group ->
                            group.getOptions()
                                    .forEach(
                                            option ->
                                                    declaredTypes.put(
                                                            option.key(),
                                                            option.typeReference())));

    // Replacements are collected first and applied after the scan is complete.
    Map<String, ConfigObject> objectReplacements = new HashMap<>();
    Map<String, ConfigValue> listReplacements = new HashMap<>();

    for (Map.Entry<String, ConfigValue> entry : config.entrySet()) {
        String key = entry.getKey();
        ConfigValue currentValue = entry.getValue();
        try {
            if (!declaredTypes.containsKey(key)) {
                continue;
            }
            TypeReference<?> declaredType = declaredTypes.get(key);
            if (!isComplexType(declaredType) || isEmptyValue(currentValue)) {
                continue;
            }

            String rawText = currentValue.unwrapped().toString();
            String typeName = declaredType.getType().getTypeName();
            boolean parseAsList =
                    typeName.startsWith("java.util.List")
                            || typeName.startsWith(
                                    "org.apache.seatunnel.api.configuration.Options");

            if (parseAsList) {
                // A bare list cannot be read back directly here, so it is wrapped under a
                // throwaway "key" entry and fetched from there.
                String wrappedText = "{key=" + rawText + "}";
                listReplacements.put(
                        key, ConfigFactory.parseString(wrappedText).getList("key"));
            } else {
                objectReplacements.put(key, ConfigFactory.parseString(rawText).root());
            }
        } catch (Exception e) {
            // NOTE(review): only e.getMessage() is carried over; the caught exception is not
            // passed along to the SeatunnelException.
            throw new SeatunnelException(
                    SeatunnelErrorEnum.ERROR_CONFIG,
                    String.format(
                            "Plugin Type: %s, Connector Type: %s, Key: %s, Error Info: %s",
                            pluginType, connectorType, key, e.getMessage()));
        }
    }

    // Object replacements are applied before list replacements.
    Config rewritten = config;
    for (Map.Entry<String, ConfigObject> replacement : objectReplacements.entrySet()) {
        rewritten = rewritten.withValue(replacement.getKey(), replacement.getValue());
    }
    for (Map.Entry<String, ConfigValue> replacement : listReplacements.entrySet()) {
        rewritten = rewritten.withValue(replacement.getKey(), replacement.getValue());
    }
    return rewritten;
}