public AdbpgOutputFormat()

in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sink/AdbpgOutputFormat.java [157:281]


    /**
     * Builds the output format from the sink schema and the connector options.
     *
     * <p>The constructor copies every connector option it needs out of {@code config},
     * resolves each primary-key name to its position and type in the sink schema, and
     * pre-computes the comma-joined column lists (plain, double-quoted, and
     * {@code excluded.}-prefixed) together with the row converters.
     *
     * @param fieldNum number of sink columns; used to size the per-column arrays and as the
     *     bound when splitting columns into key and non-key groups
     * @param fieldNamesStrs sink column names; read in parallel with {@code logicalTypes}
     * @param keyFields primary-key column names, or {@code null} when the sink has none
     * @param logicalTypes logical type of each sink column, indexed like {@code fieldNamesStrs}
     * @param config source of the connector options
     * @throws RuntimeException if a name in {@code keyFields} does not occur in
     *     {@code fieldNamesStrs}, or if write mode is 2 (upsert) and there is no primary key
     */
    public AdbpgOutputFormat(
            int fieldNum,
            String[] fieldNamesStrs,
            String[] keyFields,
            LogicalType[] logicalTypes,
            ReadableConfig config
    ) {
        // Connector options.
        this.config = config;
        this.url = config.get(URL);
        this.adbssHost = config.get(ADBSSHOST);
        this.adbssPort = config.get(ADBSSPORT);
        this.tableName = config.get(TABLE_NAME);
        this.userName = config.get(USERNAME);
        this.password = config.get(PASSWORD);
        this.batchWriteTimeout = config.get(BATCH_WRITE_TIMEOUT_MS);
        this.reserveMs = AdbpgOptions.isConfigOptionTrue(config, RESERVEMS);
        this.conflictMode = config.get(CONFLICT_MODE);
        this.useCopy = config.get(USE_COPY);
        this.maxRetryTime = config.get(MAX_RETRY_TIMES);
        this.replaceNullChar = config.get(REPLACE_NULL_CHAR);
        this.batchSize = config.get(BATCH_SIZE);
        this.targetSchema = config.get(TARGET_SCHEMA);
        this.exceptionMode = config.get(EXCEPTION_MODE);
        this.caseSensitive = AdbpgOptions.isConfigOptionTrue(config, CASE_SENSITIVE);
        this.writeMode = config.get(WRITE_MODE);
        this.delimiter = config.get(DELIMITER);
        this.replace_break = config.get(REPLACE_BREAK);
        this.copyFormat = config.get(COPY_FORMAT);
        this.copyQuote = config.get(COPY_QUOTE);
        // Only the first character of the configured quote string is used; '\0' when unset/empty.
        this.quoteChar = this.copyQuote != null && this.copyQuote.length() > 0 ? this.copyQuote.charAt(0) : '\0';
        this.verbose = config.get(VERBOSE);
        this.retryWaitTime = config.get(RETRY_WAIT_TIME);

        // Sink schema.
        this.fieldNum = fieldNum;
        this.logicalTypes = logicalTypes;
        this.rowDataSerializer = new RowDataSerializer(this.logicalTypes);
        Joiner joinerOnComma = Joiner.on(",").useForNull("null");
        this.fieldNamesStrs = fieldNamesStrs;

        if (keyFields != null) {
            this.pkTypes = new LogicalType[keyFields.length];
            for (int i = 0; i < keyFields.length; i++) {
                pkFields.add(keyFields[i]);
                // Find the first column carrying this key's name; a single scan yields both
                // the column position and (via logicalTypes) the key's type.
                int keyIdx = 0;
                while (keyIdx < fieldNamesStrs.length && !keyFields[i].equals(fieldNamesStrs[keyIdx])) {
                    keyIdx++;
                }
                if (keyIdx == fieldNamesStrs.length) {
                    throw new RuntimeException(
                            "Key '" + keyFields[i] + "' cannot be found in field names.");
                }
                pkIndex.add(keyIdx);
                this.pkTypes[i] = logicalTypes[keyIdx];
            }
            this.primaryKeys = new HashSet<>(pkFields);
            this.pkConverter = new JdbcRowConverter(pkTypes);
        } else {
            this.primaryKeys = null;
            this.pkTypes = null;
            this.pkConverter = null;
        }

        this.adbpgDialect = new AdbpgDialect(targetSchema, caseSensitive);

        if (primaryKeys == null || primaryKeys.isEmpty()) {
            existsPrimaryKeys = false;
            if (2 == this.writeMode) {
                throw new RuntimeException("primary key cannot be empty when setting write mode to 2:upsert.");
            }
            this.upsertConverter = null;
        } else {
            existsPrimaryKeys = true;
            int primaryCount = primaryKeys.size();
            int nonPrimaryCount = fieldNum - primaryCount;
            this.primaryFieldNamesStr = new String[primaryCount];
            this.nonPrimaryFieldNamesStr = new String[nonPrimaryCount];
            String[] primaryFieldNamesStrCaseSensitive = new String[primaryCount];
            String[] nonPrimaryFieldNamesStrCaseSensitive = new String[nonPrimaryCount];
            this.excludedNonPrimaryFieldNamesStr = new String[nonPrimaryCount];
            String[] excludedNonPrimaryFieldNamesStrCaseSensitive = new String[nonPrimaryCount];
            String[] fieldNamesStrCaseSensitive = new String[this.fieldNum];

            // Split the columns, in schema order, into key and non-key groups. Each name is
            // recorded both as-is and wrapped in double quotes (the "case sensitive" variants).
            int primaryIndex = 0;
            int excludedIndex = 0;
            for (int i = 0; i < fieldNum; i++) {
                String fieldName = fieldNamesStrs[i];
                String quotedFieldName = "\"" + fieldName + "\"";
                fieldNamesStrCaseSensitive[i] = quotedFieldName;
                if (primaryKeys.contains(fieldName)) {
                    primaryFieldNamesStr[primaryIndex] = fieldName;
                    primaryFieldNamesStrCaseSensitive[primaryIndex++] = quotedFieldName;
                } else {
                    nonPrimaryFieldNamesStr[excludedIndex] = fieldName;
                    nonPrimaryFieldNamesStrCaseSensitive[excludedIndex] = quotedFieldName;
                    excludedNonPrimaryFieldNamesStr[excludedIndex] = "excluded." + fieldName;
                    excludedNonPrimaryFieldNamesStrCaseSensitive[excludedIndex++] = "excluded." + quotedFieldName;
                }
            }
            this.primaryFieldNames = joinerOnComma.join(primaryFieldNamesStr);
            this.nonPrimaryFieldNames = joinerOnComma.join(nonPrimaryFieldNamesStr);
            this.primaryFieldNamesCaseSensitive = joinerOnComma.join(primaryFieldNamesStrCaseSensitive);
            this.nonPrimaryFieldNamesCaseSensitive = joinerOnComma.join(nonPrimaryFieldNamesStrCaseSensitive);
            this.excludedNonPrimaryFieldNames = joinerOnComma.join(excludedNonPrimaryFieldNamesStr);
            this.excludedNonPrimaryFieldNamesCaseSensitive = joinerOnComma.join(excludedNonPrimaryFieldNamesStrCaseSensitive);
            // NOTE(review): within this constructor fieldNamesCaseSensitive is assigned only on
            // this primary-key branch - confirm nothing reads it when the sink has no key.
            this.fieldNamesCaseSensitive = joinerOnComma.join((Object[]) fieldNamesStrCaseSensitive);

            // Update-statement layout: every non-key column in schema order, followed by the
            // key columns in the order they were given in keyFields.
            int updateFieldCount = nonPrimaryFieldNamesStr.length + primaryFieldNamesStr.length;
            this.updateStatementFieldTypes = new LogicalType[updateFieldCount];
            this.updateStatementFieldIndices = new int[updateFieldCount];
            int j = 0;
            for (int i = 0; i < logicalTypes.length; ++i) {
                // Same membership test as the split above, without rebuilding a list per column.
                if (primaryKeys.contains(fieldNamesStrs[i])) {
                    continue;
                }
                updateStatementFieldIndices[j] = i;
                updateStatementFieldTypes[j] = logicalTypes[i];
                j++;
            }
            for (int i = 0; i < primaryFieldNamesStr.length; ++i) {
                updateStatementFieldTypes[j] = pkTypes[i];
                updateStatementFieldIndices[j] = pkIndex.get(i);
                j++;
            }
            this.upsertConverter = new JdbcRowConverter(updateStatementFieldTypes);
        }
        this.rowConverter = new JdbcRowConverter(logicalTypes);
        this.streamingServerRowConverter = new StreamingServerRowConverter(logicalTypes);
        this.copyModeRowConverter = new StringFormatRowConverter(logicalTypes);
    }