in kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java [124:170]
private YamlMigrationJobConfiguration buildYamlJobConfiguration(final PipelineContextKey contextKey, final MigrateTableStatement param) {
    YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
    result.setTargetDatabaseName(param.getTargetDatabaseName());
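    // Load the source storage units persisted under the MIGRATION job type; these are the registered migration source candidates.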
    Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, "MIGRATION");
    Map<String, List<DataNode>> sourceDataNodes = new LinkedHashMap<>();
    Map<String, YamlPipelineDataSourceConfiguration> configSources = new LinkedHashMap<>();
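    // De-duplicate the entries, then sort by target table name (with the formatted source data node as tie-breaker) so the job configuration is deterministic.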
    List<SourceTargetEntry> sourceTargetEntries = new ArrayList<>(new HashSet<>(param.getSourceTargetEntries())).stream().sorted(Comparator.comparing(SourceTargetEntry::getTargetTableName)
            .thenComparing(each -> each.getSource().format())).collect(Collectors.toList());
    YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
    for (SourceTargetEntry each : sourceTargetEntries) {
        sourceDataNodes.computeIfAbsent(each.getTargetTableName(), key -> new LinkedList<>()).add(each.getSource());
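        // Each target table may be fed by exactly one source table; fail fast as soon as a second source appears.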
        ShardingSpherePreconditions.checkState(1 == sourceDataNodes.get(each.getTargetTableName()).size(),
                () -> new PipelineInvalidParameterException("more than one source table for " + each.getTargetTableName()));
        String dataSourceName = each.getSource().getDataSourceName();
        if (configSources.containsKey(dataSourceName)) {
            continue;
        }
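        // The referenced storage unit must have been registered as a migration source before the migration is submitted.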
        ShardingSpherePreconditions.checkContainsKey(metaDataDataSource, dataSourceName,
                () -> new PipelineInvalidParameterException(dataSourceName + " doesn't exist. Run `SHOW MIGRATION SOURCE STORAGE UNITS;` to verify it."));
        Map<String, Object> sourceDataSourcePoolProps = dataSourceConfigSwapper.swapToMap(metaDataDataSource.get(dataSourceName));
        StandardPipelineDataSourceConfiguration sourceDataSourceConfig = new StandardPipelineDataSourceConfiguration(sourceDataSourcePoolProps);
        configSources.put(dataSourceName, buildYamlPipelineDataSourceConfiguration(sourceDataSourceConfig.getType(), sourceDataSourceConfig.getParameter()));
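        // For dialects with schema support (e.g. PostgreSQL), fill in the default schema when none was specified in the source data node.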
        DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(sourceDataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData();
        if (null == each.getSource().getSchemaName() && dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable()) {
            each.getSource().setSchemaName(PipelineSchemaUtils.getDefaultSchema(sourceDataSourceConfig));
        }
        DatabaseType sourceDatabaseType = sourceDataSourceConfig.getDatabaseType();
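        // All source storage units of one job must share a single database type.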
        if (null == result.getSourceDatabaseType()) {
            result.setSourceDatabaseType(sourceDatabaseType.getType());
        } else if (!result.getSourceDatabaseType().equals(sourceDatabaseType.getType())) {
            throw new PipelineInvalidParameterException("Source storage units have different database types");
        }
    }
    result.setSources(configSources);
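    // Resolve the target database from the proxy's metadata and convert it into a pipeline data source configuration.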
    ShardingSphereDatabase targetDatabase = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getTargetDatabaseName());
    PipelineDataSourceConfiguration targetPipelineDataSourceConfig = buildTargetPipelineDataSourceConfiguration(targetDatabase);
    result.setTarget(buildYamlPipelineDataSourceConfiguration(targetPipelineDataSourceConfig.getType(), targetPipelineDataSourceConfig.getParameter()));
    result.setTargetDatabaseType(targetPipelineDataSourceConfig.getDatabaseType().getType());
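    // Build the per-table data node summary; the uniqueness check above guarantees each table's node list holds exactly one entry.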
    List<JobDataNodeEntry> tablesFirstDataNodes = sourceDataNodes.entrySet().stream()
            .map(entry -> new JobDataNodeEntry(entry.getKey(), entry.getValue().subList(0, 1))).collect(Collectors.toList());
    result.setTargetTableNames(new ArrayList<>(sourceDataNodes.keySet()).stream().sorted().collect(Collectors.toList()));
    result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes));
    result.setTablesFirstDataNodes(new JobDataNodeLine(tablesFirstDataNodes).marshal());
    result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
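    // Derive the job id from the context key and the marshaled sharding data nodes.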
    result.setJobId(PipelineJobIdUtils.marshal(new MigrationJobId(contextKey, result.getJobShardingDataNodes())));
    return result;
}