in seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java [99:256]
public TableSink createSink(TableSinkFactoryContext context) {
ReadonlyConfig config = context.getOptions();
CatalogTable catalogTable = context.getCatalogTable();
ReadonlyConfig catalogOptions = getCatalogOptions(context);
Optional<String> optionalTable = config.getOptional(TABLE);
Optional<String> optionalDatabase = config.getOptional(DATABASE);
if (!optionalTable.isPresent()) {
optionalTable = Optional.of(REPLACE_TABLE_NAME_KEY);
}
// get source table relevant information
TableIdentifier tableId = catalogTable.getTableId();
String sourceDatabaseName = tableId.getDatabaseName();
String sourceSchemaName = tableId.getSchemaName();
String pluginInputIdentifier = tableId.getTableName();
// get sink table relevant information
String sinkDatabaseName = optionalDatabase.orElse(REPLACE_DATABASE_NAME_KEY);
String sinkTableNameBefore = optionalTable.get();
String[] sinkTableSplitArray = sinkTableNameBefore.split("\\.");
String sinkTableName = sinkTableSplitArray[sinkTableSplitArray.length - 1];
String sinkSchemaName;
if (sinkTableSplitArray.length > 1) {
sinkSchemaName = sinkTableSplitArray[sinkTableSplitArray.length - 2];
} else {
sinkSchemaName = null;
}
if (StringUtils.isNotBlank(catalogOptions.get(JdbcCatalogOptions.SCHEMA))) {
sinkSchemaName = catalogOptions.get(JdbcCatalogOptions.SCHEMA);
}
// to add tablePrefix and tableSuffix
String tempTableName;
String prefix = catalogOptions.get(JdbcCatalogOptions.TABLE_PREFIX);
String suffix = catalogOptions.get(JdbcCatalogOptions.TABLE_SUFFIX);
if (StringUtils.isNotEmpty(prefix) || StringUtils.isNotEmpty(suffix)) {
tempTableName = StringUtils.isNotEmpty(prefix) ? prefix + sinkTableName : sinkTableName;
tempTableName = StringUtils.isNotEmpty(suffix) ? tempTableName + suffix : tempTableName;
} else {
tempTableName = sinkTableName;
}
// to replace
String finalDatabaseName = sinkDatabaseName;
if (StringUtils.isNotEmpty(sourceDatabaseName)) {
finalDatabaseName =
sinkDatabaseName.replace(REPLACE_DATABASE_NAME_KEY, sourceDatabaseName);
}
String finalSchemaName;
if (sinkSchemaName != null) {
if (sourceSchemaName == null) {
finalSchemaName = sinkSchemaName;
} else {
finalSchemaName = sinkSchemaName.replace(REPLACE_SCHEMA_NAME_KEY, sourceSchemaName);
}
} else {
finalSchemaName = null;
}
String finalTableName = sinkTableName;
if (StringUtils.isNotEmpty(pluginInputIdentifier)) {
finalTableName = tempTableName.replace(REPLACE_TABLE_NAME_KEY, pluginInputIdentifier);
}
// rebuild TableIdentifier and catalogTable
TableIdentifier newTableId =
TableIdentifier.of(
tableId.getCatalogName(),
finalDatabaseName,
finalSchemaName,
finalTableName);
catalogTable =
CatalogTable.of(
newTableId,
catalogTable.getTableSchema(),
catalogTable.getOptions(),
catalogTable.getPartitionKeys(),
catalogTable.getComment(),
catalogTable.getCatalogName());
Map<String, String> map = config.toMap();
if (catalogTable.getTableId().getSchemaName() != null) {
map.put(
TABLE.key(),
catalogTable.getTableId().getSchemaName()
+ "."
+ catalogTable.getTableId().getTableName());
} else {
map.put(TABLE.key(), catalogTable.getTableId().getTableName());
}
map.put(DATABASE.key(), catalogTable.getTableId().getDatabaseName());
PrimaryKey primaryKey = catalogTable.getTableSchema().getPrimaryKey();
if (!config.getOptional(PRIMARY_KEYS).isPresent()) {
if (primaryKey != null && !CollectionUtils.isEmpty(primaryKey.getColumnNames())) {
map.put(PRIMARY_KEYS.key(), String.join(",", primaryKey.getColumnNames()));
} else {
Optional<ConstraintKey> keyOptional =
catalogTable.getTableSchema().getConstraintKeys().stream()
.filter(
key ->
ConstraintKey.ConstraintType.UNIQUE_KEY.equals(
key.getConstraintType()))
.findFirst();
if (keyOptional.isPresent()) {
map.put(
PRIMARY_KEYS.key(),
keyOptional.get().getColumnNames().stream()
.map(key -> key.getColumnName())
.collect(Collectors.joining(",")));
}
}
} else {
// replace primary key to config
PrimaryKey configPk =
PrimaryKey.of(
catalogTable.getTablePath().getTableName() + "_config_pk",
config.get(PRIMARY_KEYS));
TableSchema tableSchema = catalogTable.getTableSchema();
catalogTable =
CatalogTable.of(
catalogTable.getTableId(),
TableSchema.builder()
.primaryKey(configPk)
.constraintKey(tableSchema.getConstraintKeys())
.columns(tableSchema.getColumns())
.build(),
catalogTable.getOptions(),
catalogTable.getPartitionKeys(),
catalogTable.getComment(),
catalogTable.getCatalogName());
}
config = ReadonlyConfig.fromMap(new HashMap<>(map));
// always execute
final ReadonlyConfig options = config;
JdbcSinkConfig sinkConfig = JdbcSinkConfig.of(config);
FieldIdeEnum fieldIdeEnum = config.get(JdbcOptions.FIELD_IDE);
catalogTable
.getOptions()
.put("fieldIde", fieldIdeEnum == null ? null : fieldIdeEnum.getValue());
JdbcDialect dialect =
JdbcDialectLoader.load(
sinkConfig.getJdbcConnectionConfig().getUrl(),
sinkConfig.getJdbcConnectionConfig().getCompatibleMode(),
sinkConfig.getJdbcConnectionConfig().getDialect(),
fieldIdeEnum == null ? null : fieldIdeEnum.getValue());
dialect.connectionUrlParse(
sinkConfig.getJdbcConnectionConfig().getUrl(),
sinkConfig.getJdbcConnectionConfig().getProperties(),
dialect.defaultParameter());
CatalogTable finalCatalogTable = catalogTable;
// get saveMode
DataSaveMode dataSaveMode = config.get(DATA_SAVE_MODE);
SchemaSaveMode schemaSaveMode = config.get(SCHEMA_SAVE_MODE);
return () ->
new JdbcSink(
options,
sinkConfig,
dialect,
schemaSaveMode,
dataSaveMode,
finalCatalogTable);
}