in flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java [81:116]
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
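    // Stream Load properties supplied via the connector options. Row deletion is only
    // enabled when the user asked for it AND the target table uses the UNIQUE KEY model,
    // which is what the RestService check below verifies.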
    Properties loadProperties = executionOptions.getStreamLoadProp();
    boolean deletable =
            executionOptions.getDeletable() && RestService.isUniqueKeyType(options, readOptions, LOG);
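    // If the user did not set the Stream Load "columns" property, derive it from the table
    // schema: back-quote every field name and, when deletes are enabled, append the
    // DORIS_DELETE_SIGN column so deleted rows can be marked in the load.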
    if (!loadProperties.containsKey(COLUMNS_KEY)) {
        String[] fieldNames = tableSchema.getFieldNames();
        Preconditions.checkState(fieldNames != null && fieldNames.length > 0);
        String columns = String.join(",",
                Arrays.stream(fieldNames)
                        .map(item -> String.format("`%s`", item.trim().replace("`", "")))
                        .collect(Collectors.toList()));
        if (deletable) {
            columns = String.format("%s,%s", columns, DORIS_DELETE_SIGN);
        }
        loadProperties.put(COLUMNS_KEY, columns);
    }
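    // Serializer that converts Flink RowData into the Stream Load payload, using the
    // configured format (CSV by default), field delimiter, and the delete flag.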
    RowDataSerializer.Builder serializerBuilder = RowDataSerializer.builder();
    serializerBuilder.setFieldNames(tableSchema.getFieldNames())
            .setFieldType(tableSchema.getFieldDataTypes())
            .setType(loadProperties.getProperty(FORMAT_KEY, CSV))
            .enableDelete(deletable)
            .setFieldDelimiter(loadProperties.getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT));
    if (!executionOptions.enableBatchMode()) {
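        // Batch mode disabled: build the standard DorisSink and expose it through SinkProvider.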
        DorisSink.Builder<RowData> dorisSinkBuilder = DorisSink.builder();
        dorisSinkBuilder.setDorisOptions(options)
                .setDorisReadOptions(readOptions)
                .setDorisExecutionOptions(executionOptions)
                .setSerializer(serializerBuilder.build());
        return SinkProvider.of(dorisSinkBuilder.build(), sinkParallelism);
    } else {
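        // Batch mode enabled: build the DorisBatchSink and expose it as a Sink V2 provider.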
        DorisBatchSink.Builder<RowData> dorisBatchSinkBuilder = DorisBatchSink.builder();
        dorisBatchSinkBuilder.setDorisOptions(options)
                .setDorisReadOptions(readOptions)
                .setDorisExecutionOptions(executionOptions)
                .setSerializer(serializerBuilder.build());
        return SinkV2Provider.of(dorisBatchSinkBuilder.build(), sinkParallelism);
    }
}