public FileSinkConfig()

in seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSinkConfig.java [93:252]


    /**
     * Builds the file sink configuration from the user supplied options and the row type of the
     * incoming data.
     *
     * <p>Besides copying the optional settings that are present in {@code config}, the constructor
     * resolves which columns are written ({@code sinkColumnList}), which columns are used for
     * partitioning ({@code partitionFieldList}) and the position of each of them inside a row
     * ({@code sinkColumnsIndexInRow} / {@code partitionFieldsIndexInRow}). Column positions are
     * resolved case-insensitively against the field names of {@code seaTunnelRowTypeInfo}.
     *
     * <p>An option that is absent from {@code config} leaves the corresponding field at the value it
     * already had when this constructor body started.
     *
     * @param config sink options
     * @param seaTunnelRowTypeInfo row type of the incoming rows; must declare at least one field
     * @throws FileConnectorException if transactions are enabled but the file name expression does
     *     not contain the transaction placeholder, if a partition field is not one of the sink
     *     columns, if no sink column is left to write, or if the file format is XML and
     *     {@code xml_use_attr_format} is not defined
     */
    public FileSinkConfig(@NonNull Config config, @NonNull SeaTunnelRowType seaTunnelRowTypeInfo) {
        super(config);
        final String[] fieldNames = seaTunnelRowTypeInfo.getFieldNames();
        checkArgument(!CollectionUtils.isEmpty(Arrays.asList(fieldNames)));

        if (config.hasPath(FileBaseSinkOptions.SINK_COLUMNS.key())) {
            List<String> configuredColumns =
                    config.getStringList(FileBaseSinkOptions.SINK_COLUMNS.key());
            if (!CollectionUtils.isEmpty(configuredColumns)) {
                // Copy into an ArrayList: partition fields may be removed from this list below,
                // so it must support removal regardless of what the config library returns.
                this.sinkColumnList = new ArrayList<>(configuredColumns);
            }
        }

        // if the config sink_columns is empty, all fields in SeaTunnelRowTypeInfo will be written
        if (CollectionUtils.isEmpty(this.sinkColumnList)) {
            // construct a new ArrayList, because the `list` generated by `Arrays.asList` does not
            // support remove and add operations.
            this.sinkColumnList = new ArrayList<>(Arrays.asList(fieldNames));
        }

        if (config.hasPath(FileBaseSinkOptions.PARTITION_BY.key())) {
            this.partitionFieldList = config.getStringList(FileBaseSinkOptions.PARTITION_BY.key());
        } else {
            this.partitionFieldList = Collections.emptyList();
        }

        if (config.hasPath(FileBaseSinkOptions.PARTITION_DIR_EXPRESSION.key())
                && !StringUtils.isBlank(
                        config.getString(FileBaseSinkOptions.PARTITION_DIR_EXPRESSION.key()))) {
            this.partitionDirExpression =
                    config.getString(FileBaseSinkOptions.PARTITION_DIR_EXPRESSION.key());
        }

        if (config.hasPath(FileBaseSinkOptions.IS_PARTITION_FIELD_WRITE_IN_FILE.key())) {
            this.isPartitionFieldWriteInFile =
                    config.getBoolean(FileBaseSinkOptions.IS_PARTITION_FIELD_WRITE_IN_FILE.key());
        }

        if (config.hasPath(FileBaseSinkOptions.TMP_PATH.key())
                && !StringUtils.isBlank(config.getString(FileBaseSinkOptions.TMP_PATH.key()))) {
            this.tmpPath = config.getString(FileBaseSinkOptions.TMP_PATH.key());
        }

        if (config.hasPath(FileBaseSinkOptions.FILENAME_TIME_FORMAT.key())
                && !StringUtils.isBlank(
                        config.getString(FileBaseSinkOptions.FILENAME_TIME_FORMAT.key()))) {
            this.fileNameTimeFormat =
                    config.getString(FileBaseSinkOptions.FILENAME_TIME_FORMAT.key());
        }

        if (config.hasPath(FileBaseSinkOptions.IS_ENABLE_TRANSACTION.key())) {
            this.isEnableTransaction =
                    config.getBoolean(FileBaseSinkOptions.IS_ENABLE_TRANSACTION.key());
        }

        if (config.hasPath(FileBaseSinkOptions.ENCODING.key())) {
            this.encoding = config.getString(FileBaseSinkOptions.ENCODING.key());
        }

        // with transactions enabled the file name must carry the transaction placeholder
        if (this.isEnableTransaction
                && !this.fileNameExpression.contains(FileBaseSinkOptions.TRANSACTION_EXPRESSION)) {
            throw new FileConnectorException(
                    CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
                    "file_name_expression must contains "
                            + FileBaseSinkOptions.TRANSACTION_EXPRESSION
                            + " when is_enable_transaction is true");
        }

        // every partition field must be one of the sink columns (exact, case-sensitive match)
        if (!CollectionUtils.isEmpty(this.partitionFieldList)
                && (CollectionUtils.isEmpty(this.sinkColumnList)
                        || !new HashSet<>(this.sinkColumnList)
                                .containsAll(this.partitionFieldList))) {
            throw new FileConnectorException(
                    CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
                    "partition fields must in sink columns");
        }

        // when partition fields are not written into the file, drop them from the sink columns
        if (!CollectionUtils.isEmpty(this.partitionFieldList) && !isPartitionFieldWriteInFile) {
            if (!this.sinkColumnList.removeAll(this.partitionFieldList)) {
                throw new FileConnectorException(
                        CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
                        "remove partition field from sink columns error");
            }
        }

        if (CollectionUtils.isEmpty(this.sinkColumnList)) {
            throw new FileConnectorException(
                    CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "sink columns can not be empty");
        }

        // lower-cased field name -> position of that field in the row
        Map<String, Integer> columnsMap = new HashMap<>(fieldNames.length);
        for (int i = 0; i < fieldNames.length; i++) {
            columnsMap.put(fieldNames[i].toLowerCase(), i);
        }

        // init sink column index and partition field index, we will use the column index to find
        // the data in SeaTunnelRow. Sink columns that are not part of the row type are skipped.
        this.sinkColumnsIndexInRow =
                this.sinkColumnList.stream()
                        .map(column -> columnsMap.get(column.toLowerCase()))
                        .filter(e -> e != null)
                        .collect(Collectors.toList());

        if (!CollectionUtils.isEmpty(this.partitionFieldList)) {
            // The map keys are lower-cased, so the lookup key must be lower-cased as well;
            // otherwise a partition field containing upper-case letters resolves to null.
            this.partitionFieldsIndexInRow =
                    this.partitionFieldList.stream()
                            .map(partitionField -> columnsMap.get(partitionField.toLowerCase()))
                            .collect(Collectors.toList());
        }

        if (config.hasPath(FileBaseSinkOptions.MAX_ROWS_IN_MEMORY.key())) {
            this.maxRowsInMemory = config.getInt(FileBaseSinkOptions.MAX_ROWS_IN_MEMORY.key());
        }

        if (config.hasPath(FileBaseSinkOptions.SHEET_NAME.key())) {
            this.sheetName = config.getString(FileBaseSinkOptions.SHEET_NAME.key());
        }

        if (FileFormat.XML.equals(this.fileFormat)) {
            // xml_use_attr_format is mandatory for XML output; the tag names are optional
            if (!config.hasPath(FileBaseSinkOptions.XML_USE_ATTR_FORMAT.key())) {
                throw new FileConnectorException(
                        CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
                        "User must define xml_use_attr_format when file_format_type is xml");
            }

            this.xmlUseAttrFormat =
                    config.getBoolean(FileBaseSinkOptions.XML_USE_ATTR_FORMAT.key());

            if (config.hasPath(FileBaseSinkOptions.XML_ROOT_TAG.key())) {
                this.xmlRootTag = config.getString(FileBaseSinkOptions.XML_ROOT_TAG.key());
            }

            if (config.hasPath(FileBaseSinkOptions.XML_ROW_TAG.key())) {
                this.xmlRowTag = config.getString(FileBaseSinkOptions.XML_ROW_TAG.key());
            }
        }

        if (FileFormat.PARQUET.equals(this.fileFormat)) {
            if (config.hasPath(FileBaseSinkOptions.PARQUET_AVRO_WRITE_TIMESTAMP_AS_INT96.key())) {
                this.parquetWriteTimestampAsInt96 =
                        config.getBoolean(
                                FileBaseSinkOptions.PARQUET_AVRO_WRITE_TIMESTAMP_AS_INT96.key());
            }
            if (config.hasPath(FileBaseSinkOptions.PARQUET_AVRO_WRITE_FIXED_AS_INT96.key())) {
                this.parquetAvroWriteFixedAsInt96 =
                        config.getStringList(
                                FileBaseSinkOptions.PARQUET_AVRO_WRITE_FIXED_AS_INT96.key());
            }
        }

        if (FileFormat.CSV.equals(this.fileFormat)) {
            if (config.hasPath(FileBaseSinkOptions.CSV_STRING_QUOTE_MODE.key())) {
                // the configured value must match a CsvStringQuoteMode constant name exactly
                this.csvStringQuoteMode =
                        CsvStringQuoteMode.valueOf(
                                config.getString(FileBaseSinkOptions.CSV_STRING_QUOTE_MODE.key()));
            }
        }
    }