in seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSinkConfig.java [93:252]
/**
 * Builds the sink configuration from the user supplied {@code config} and the row type of the
 * incoming data.
 *
 * <p>The constructor reads the optional sink options, validates the combination of transaction
 * and partition settings, and resolves the positions of the sink columns and partition fields
 * inside the row. Field names are matched case-insensitively when positions are resolved.
 *
 * @param config the sink options
 * @param seaTunnelRowTypeInfo the row type of the data to write; must declare at least one field
 * @throws FileConnectorException if transactions are enabled but the file name expression lacks
 *     the transaction placeholder, if a partition field is not one of the sink columns, if the
 *     partition fields cannot be removed from the sink columns, if no sink column is left, or
 *     if the file format is XML and {@code xml_use_attr_format} is not defined
 */
public FileSinkConfig(@NonNull Config config, @NonNull SeaTunnelRowType seaTunnelRowTypeInfo) {
    super(config);
    String[] fieldNames = seaTunnelRowTypeInfo.getFieldNames();
    checkArgument(!CollectionUtils.isEmpty(Arrays.asList(fieldNames)));

    // ---- sink columns ----
    if (config.hasPath(FileBaseSinkOptions.SINK_COLUMNS.key())
            && !CollectionUtils.isEmpty(
                    config.getStringList(FileBaseSinkOptions.SINK_COLUMNS.key()))) {
        this.sinkColumnList = config.getStringList(FileBaseSinkOptions.SINK_COLUMNS.key());
    }
    // if the config sink_columns is empty, all fields in SeaTunnelRowTypeInfo will be written
    if (CollectionUtils.isEmpty(this.sinkColumnList)) {
        // construct a new ArrayList, because the list generated by `Arrays.asList` does not
        // support remove and add operations.
        this.sinkColumnList = new ArrayList<>(Arrays.asList(fieldNames));
    }

    // ---- partition options ----
    if (config.hasPath(FileBaseSinkOptions.PARTITION_BY.key())) {
        this.partitionFieldList = config.getStringList(FileBaseSinkOptions.PARTITION_BY.key());
    } else {
        this.partitionFieldList = Collections.emptyList();
    }
    if (config.hasPath(FileBaseSinkOptions.PARTITION_DIR_EXPRESSION.key())
            && !StringUtils.isBlank(
                    config.getString(FileBaseSinkOptions.PARTITION_DIR_EXPRESSION.key()))) {
        this.partitionDirExpression =
                config.getString(FileBaseSinkOptions.PARTITION_DIR_EXPRESSION.key());
    }
    if (config.hasPath(FileBaseSinkOptions.IS_PARTITION_FIELD_WRITE_IN_FILE.key())) {
        this.isPartitionFieldWriteInFile =
                config.getBoolean(FileBaseSinkOptions.IS_PARTITION_FIELD_WRITE_IN_FILE.key());
    }

    // ---- path, file name, transaction and encoding options ----
    if (config.hasPath(FileBaseSinkOptions.TMP_PATH.key())
            && !StringUtils.isBlank(config.getString(FileBaseSinkOptions.TMP_PATH.key()))) {
        this.tmpPath = config.getString(FileBaseSinkOptions.TMP_PATH.key());
    }
    if (config.hasPath(FileBaseSinkOptions.FILENAME_TIME_FORMAT.key())
            && !StringUtils.isBlank(
                    config.getString(FileBaseSinkOptions.FILENAME_TIME_FORMAT.key()))) {
        this.fileNameTimeFormat =
                config.getString(FileBaseSinkOptions.FILENAME_TIME_FORMAT.key());
    }
    if (config.hasPath(FileBaseSinkOptions.IS_ENABLE_TRANSACTION.key())) {
        this.isEnableTransaction =
                config.getBoolean(FileBaseSinkOptions.IS_ENABLE_TRANSACTION.key());
    }
    if (config.hasPath(FileBaseSinkOptions.ENCODING.key())) {
        this.encoding = config.getString(FileBaseSinkOptions.ENCODING.key());
    }

    // ---- validation ----
    if (this.isEnableTransaction
            && !this.fileNameExpression.contains(FileBaseSinkOptions.TRANSACTION_EXPRESSION)) {
        throw new FileConnectorException(
                CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
                "file_name_expression must contains "
                        + FileBaseSinkOptions.TRANSACTION_EXPRESSION
                        + " when is_enable_transaction is true");
    }
    // every partition field must be one of the sink columns
    if (!CollectionUtils.isEmpty(this.partitionFieldList)
            && (CollectionUtils.isEmpty(this.sinkColumnList)
                    || !new HashSet<>(this.sinkColumnList)
                            .containsAll(this.partitionFieldList))) {
        throw new FileConnectorException(
                CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
                "partition fields must in sink columns");
    }
    // when partition fields are not written into the file, drop them from the sink columns
    if (!CollectionUtils.isEmpty(this.partitionFieldList) && !isPartitionFieldWriteInFile) {
        if (!this.sinkColumnList.removeAll(this.partitionFieldList)) {
            throw new FileConnectorException(
                    CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
                    "remove partition field from sink columns error");
        }
    }
    if (CollectionUtils.isEmpty(this.sinkColumnList)) {
        throw new FileConnectorException(
                CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "sink columns can not be empty");
    }

    // ---- column index resolution ----
    // lower-cased field name -> position of the field in the row
    Map<String, Integer> columnsMap = new HashMap<>(fieldNames.length);
    for (int i = 0; i < fieldNames.length; i++) {
        columnsMap.put(fieldNames[i].toLowerCase(), i);
    }
    // init sink column index and partition field index, we will use the column index to find
    // the data in SeaTunnelRow; sink columns that are unknown to the row type are skipped
    this.sinkColumnsIndexInRow =
            this.sinkColumnList.stream()
                    .map(column -> columnsMap.get(column.toLowerCase()))
                    .filter(e -> e != null)
                    .collect(Collectors.toList());
    if (!CollectionUtils.isEmpty(this.partitionFieldList)) {
        // The map keys are lower-cased, so the partition field has to be lower-cased as well;
        // looking it up with its raw name yields null for any name with upper-case letters.
        this.partitionFieldsIndexInRow =
                this.partitionFieldList.stream()
                        .map(partitionField -> columnsMap.get(partitionField.toLowerCase()))
                        .collect(Collectors.toList());
    }

    // ---- format specific options ----
    if (config.hasPath(FileBaseSinkOptions.MAX_ROWS_IN_MEMORY.key())) {
        this.maxRowsInMemory = config.getInt(FileBaseSinkOptions.MAX_ROWS_IN_MEMORY.key());
    }
    if (config.hasPath(FileBaseSinkOptions.SHEET_NAME.key())) {
        this.sheetName = config.getString(FileBaseSinkOptions.SHEET_NAME.key());
    }
    if (FileFormat.XML.equals(this.fileFormat)) {
        // xml_use_attr_format is mandatory for XML output, the tags are optional
        if (!config.hasPath(FileBaseSinkOptions.XML_USE_ATTR_FORMAT.key())) {
            throw new FileConnectorException(
                    CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
                    "User must define xml_use_attr_format when file_format_type is xml");
        }
        this.xmlUseAttrFormat =
                config.getBoolean(FileBaseSinkOptions.XML_USE_ATTR_FORMAT.key());
        if (config.hasPath(FileBaseSinkOptions.XML_ROOT_TAG.key())) {
            this.xmlRootTag = config.getString(FileBaseSinkOptions.XML_ROOT_TAG.key());
        }
        if (config.hasPath(FileBaseSinkOptions.XML_ROW_TAG.key())) {
            this.xmlRowTag = config.getString(FileBaseSinkOptions.XML_ROW_TAG.key());
        }
    }
    if (FileFormat.PARQUET.equals(this.fileFormat)) {
        if (config.hasPath(FileBaseSinkOptions.PARQUET_AVRO_WRITE_TIMESTAMP_AS_INT96.key())) {
            this.parquetWriteTimestampAsInt96 =
                    config.getBoolean(
                            FileBaseSinkOptions.PARQUET_AVRO_WRITE_TIMESTAMP_AS_INT96.key());
        }
        if (config.hasPath(FileBaseSinkOptions.PARQUET_AVRO_WRITE_FIXED_AS_INT96.key())) {
            this.parquetAvroWriteFixedAsInt96 =
                    config.getStringList(
                            FileBaseSinkOptions.PARQUET_AVRO_WRITE_FIXED_AS_INT96.key());
        }
    }
    if (FileFormat.CSV.equals(this.fileFormat)) {
        if (config.hasPath(FileBaseSinkOptions.CSV_STRING_QUOTE_MODE.key())) {
            this.csvStringQuoteMode =
                    CsvStringQuoteMode.valueOf(
                            config.getString(FileBaseSinkOptions.CSV_STRING_QUOTE_MODE.key()));
        }
    }
}