in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sink/AdbpgOutputFormat.java [157:281]
public AdbpgOutputFormat(
int fieldNum,
String[] fieldNamesStrs,
String[] keyFields,
LogicalType[] logicalTypes,
ReadableConfig config
) {
this.config = config;
this.url = config.get(URL);
this.adbssHost = config.get(ADBSSHOST);
this.adbssPort = config.get(ADBSSPORT);
this.tableName = config.get(TABLE_NAME);
this.userName = config.get(USERNAME);
this.password = config.get(PASSWORD);
this.batchWriteTimeout = config.get(BATCH_WRITE_TIMEOUT_MS);
this.reserveMs = AdbpgOptions.isConfigOptionTrue(config, RESERVEMS);
this.conflictMode = config.get(CONFLICT_MODE);
this.useCopy = config.get(USE_COPY);
this.maxRetryTime = config.get(MAX_RETRY_TIMES);
this.replaceNullChar = config.get(REPLACE_NULL_CHAR);
this.batchSize = config.get(BATCH_SIZE);
this.targetSchema = config.get(TARGET_SCHEMA);
this.exceptionMode = config.get(EXCEPTION_MODE);
this.caseSensitive = AdbpgOptions.isConfigOptionTrue(config, CASE_SENSITIVE);
this.writeMode = config.get(WRITE_MODE);
this.delimiter = config.get(DELIMITER);
this.replace_break = config.get(REPLACE_BREAK);
this.copyFormat = config.get(COPY_FORMAT);
this.copyQuote = config.get(COPY_QUOTE);
this.quoteChar = this.copyQuote != null && this.copyQuote.length() > 0 ? this.copyQuote.charAt(0) : '\0';
this.verbose = config.get(VERBOSE);
this.retryWaitTime = config.get(RETRY_WAIT_TIME);
this.fieldNum = fieldNum;
this.logicalTypes = logicalTypes;
this.rowDataSerializer = new RowDataSerializer(this.logicalTypes);
Joiner joinerOnComma = Joiner.on(",").useForNull("null");
this.fieldNamesStrs = fieldNamesStrs;
if (keyFields != null) {
this.pkTypes = new LogicalType[keyFields.length];
for (int i = 0; i < keyFields.length; i++) {
pkFields.add(keyFields[i]);
int j = 0;
for (; j < fieldNamesStrs.length; j++) {
if (keyFields[i].equals(fieldNamesStrs[j])) {
pkIndex.add(j);
break;
}
}
if (fieldNamesStrs.length == j) {
throw new RuntimeException("Key cannot found in filenames.");
}
int keyIdx = Arrays.asList(fieldNamesStrs).indexOf(keyFields[i]);
this.pkTypes[i] = logicalTypes[keyIdx];
}
this.primaryKeys = new HashSet<>(pkFields);
this.pkConverter = new JdbcRowConverter(pkTypes);
} else {
this.primaryKeys = null;
this.pkTypes = null;
this.pkConverter = null;
}
this.adbpgDialect = new AdbpgDialect(targetSchema, caseSensitive);
if (primaryKeys == null || primaryKeys.isEmpty()) {
existsPrimaryKeys = false;
if (2 == this.writeMode) {
throw new RuntimeException("primary key cannot be empty when setting write mode to 2:upsert.");
}
this.upsertConverter = null;
} else {
existsPrimaryKeys = true;
this.primaryFieldNamesStr = new String[primaryKeys.size()];
this.nonPrimaryFieldNamesStr = new String[fieldNum - primaryKeys.size()];
String[] primaryFieldNamesStrCaseSensitive = new String[primaryKeys.size()];
String[] nonPrimaryFieldNamesStrCaseSensitive = new String[fieldNum - primaryKeys.size()];
this.excludedNonPrimaryFieldNamesStr = new String[fieldNum - primaryKeys.size()];
String[] excludedNonPrimaryFieldNamesStrCaseSensitive = new String[fieldNum - primaryKeys.size()];
String[] fieldNamesStrCaseSensitive = new String[this.fieldNum];
int primaryIndex = 0;
int excludedIndex = 0;
for (int i = 0; i < fieldNum; i++) {
String fileName = fieldNamesStrs[i];
fieldNamesStrCaseSensitive[i] = "\"" + fileName + "\"";
if (primaryKeys.contains(fileName)) {
primaryFieldNamesStr[primaryIndex] = fileName;
primaryFieldNamesStrCaseSensitive[primaryIndex++] = "\"" + fileName + "\"";
} else {
nonPrimaryFieldNamesStr[excludedIndex] = fileName;
nonPrimaryFieldNamesStrCaseSensitive[excludedIndex] = "\"" + fileName + "\"";
excludedNonPrimaryFieldNamesStr[excludedIndex] = "excluded." + fileName;
excludedNonPrimaryFieldNamesStrCaseSensitive[excludedIndex++] = "excluded.\"" + fileName + "\"";
}
}
this.primaryFieldNames = joinerOnComma.join(primaryFieldNamesStr);
this.nonPrimaryFieldNames = joinerOnComma.join(nonPrimaryFieldNamesStr);
this.primaryFieldNamesCaseSensitive = joinerOnComma.join(primaryFieldNamesStrCaseSensitive);
this.nonPrimaryFieldNamesCaseSensitive = joinerOnComma.join(nonPrimaryFieldNamesStrCaseSensitive);
this.excludedNonPrimaryFieldNames = joinerOnComma.join(excludedNonPrimaryFieldNamesStr);
this.excludedNonPrimaryFieldNamesCaseSensitive = joinerOnComma.join(excludedNonPrimaryFieldNamesStrCaseSensitive);
this.fieldNamesCaseSensitive = joinerOnComma.join((Object[]) fieldNamesStrCaseSensitive);
this.updateStatementFieldTypes =
new LogicalType[nonPrimaryFieldNamesStr.length + primaryFieldNamesStr.length];
int j = 0;
this.updateStatementFieldIndices = new int[nonPrimaryFieldNamesStr.length + primaryFieldNamesStr.length];
for (int i = 0; i < logicalTypes.length; ++i) {
if (Arrays.asList(this.primaryFieldNamesStr).contains(fieldNamesStrs[i])) {
continue;
}
updateStatementFieldIndices[j] = i;
updateStatementFieldTypes[j] = logicalTypes[i];
j++;
}
for (int i = 0; i < primaryFieldNamesStr.length; ++i) {
updateStatementFieldTypes[j] = pkTypes[i];
updateStatementFieldIndices[j] = pkIndex.get(i);
j++;
}
this.upsertConverter = new JdbcRowConverter(updateStatementFieldTypes);
}
this.rowConverter = new JdbcRowConverter(logicalTypes);
this.streamingServerRowConverter = new StreamingServerRowConverter(logicalTypes);
this.copyModeRowConverter = new StringFormatRowConverter(logicalTypes);
}