in flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java [86:140]
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
Properties loadProperties = executionOptions.getStreamLoadProp();
boolean deletable =
executionOptions.getDeletable()
&& RestService.isUniqueKeyType(options, readOptions, LOG);
if (!loadProperties.containsKey(COLUMNS_KEY)) {
String[] fieldNames = tableSchema.getFieldNames();
Preconditions.checkState(fieldNames != null && fieldNames.length > 0);
String columns =
String.join(
",",
Arrays.stream(fieldNames)
.map(
item ->
String.format(
"`%s`", item.trim().replace("`", "")))
.collect(Collectors.toList()));
if (deletable) {
columns = String.format("%s,%s", columns, DORIS_DELETE_SIGN);
}
loadProperties.put(COLUMNS_KEY, columns);
}
RowDataSerializer.Builder serializerBuilder = RowDataSerializer.builder();
serializerBuilder
.setFieldNames(tableSchema.getFieldNames())
.setFieldType(tableSchema.getFieldDataTypes())
.setType(loadProperties.getProperty(FORMAT_KEY, CSV))
.enableDelete(deletable)
.setFieldDelimiter(
loadProperties.getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT));
DorisSink.Builder<RowData> dorisSinkBuilder = DorisSink.builder();
dorisSinkBuilder
.setDorisOptions(options)
.setDorisReadOptions(readOptions)
.setDorisExecutionOptions(executionOptions)
.setSerializer(serializerBuilder.build());
DorisSink<RowData> dorisSink = dorisSinkBuilder.build();
// for insert overwrite
if (overwrite) {
if (context.isBounded()) {
// execute jdbc query to truncate table
Preconditions.checkArgument(
options.getJdbcUrl() != null, "jdbc-url is required for Overwrite mode.");
// todo: should be written to a temporary table first,
// and then use GlobalCommitter to perform the rename.
truncateTable();
} else {
throw new IllegalStateException("Streaming mode not support overwrite.");
}
}
return SinkV2Provider.of(dorisSink, sinkParallelism);
}