in flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java [154:199]
/**
 * Builds the {@link DorisSink} used to write records into a single Doris table.
 *
 * <p>Connection settings (FE nodes, username, password) and the label prefix are read from
 * {@code sinkConfig}. Stream load properties default to line-delimited JSON and are then
 * overlaid with the properties returned by {@code DorisConfigOptions.getStreamLoadProp}.
 * Optional execution settings are applied only when they are present in {@code sinkConfig}.
 *
 * @param table name of the target table; combined with the {@code database} field to form the
 *     {@code database.table} identifier and included in the stream load label prefix
 * @return a sink configured with a {@code JsonDebeziumSchemaSerializer}
 */
public DorisSink<String> buildDorisSink(String table) {
    // Connection settings; the password falls back to an empty string when it is not configured.
    String tableIdentifier = database + "." + table;
    DorisOptions.Builder connectionBuilder = DorisOptions.builder();
    connectionBuilder.setFenodes(sinkConfig.getString(DorisConfigOptions.FENODES))
            .setTableIdentifier(tableIdentifier)
            .setUsername(sinkConfig.getString(DorisConfigOptions.USERNAME))
            .setPassword(sinkConfig.getString(DorisConfigOptions.PASSWORD, ""));

    // Stream load properties: JSON defaults go in first, then the properties taken from the
    // sink configuration are copied on top so that they replace a default with the same key.
    Properties loadProps = new Properties();
    loadProps.setProperty("format", "json");
    loadProps.setProperty("read_json_by_line", "true");
    loadProps.putAll(DorisConfigOptions.getStreamLoadProp(sinkConfig.toMap()));

    // The label prefix is "<configured prefix>-<database>-<table>".
    String tableLabelPrefix = String.join(
            "-", sinkConfig.getString(DorisConfigOptions.SINK_LABEL_PREFIX), database, table);
    DorisExecutionOptions.Builder execBuilder = DorisExecutionOptions.builder()
            .setLabelPrefix(tableLabelPrefix)
            .setStreamLoadProp(loadProps);

    // Each optional setting is forwarded only when the configuration provides a value for it.
    sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_DELETE)
            .ifPresent(execBuilder::setDeletable);
    sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_COUNT)
            .ifPresent(execBuilder::setBufferCount);
    sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_SIZE)
            .ifPresent(execBuilder::setBufferSize);
    sinkConfig.getOptional(DorisConfigOptions.SINK_CHECK_INTERVAL)
            .ifPresent(execBuilder::setCheckInterval);
    sinkConfig.getOptional(DorisConfigOptions.SINK_MAX_RETRIES)
            .ifPresent(execBuilder::setMaxRetries);
    sinkConfig.getOptional(DorisConfigOptions.SINK_IGNORE_UPDATE_BEFORE)
            .ifPresent(execBuilder::setIgnoreUpdateBefore);

    // Two-phase commit is switched off explicitly only when the configuration disables it.
    if (!sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_2PC)) {
        execBuilder.disable2PC();
    }
    DorisExecutionOptions execOptions = execBuilder.build();

    DorisReadOptions readOptions = DorisReadOptions.builder().build();

    // The serializer and the sink each receive their own DorisOptions instance built from the
    // same connection builder.
    JsonDebeziumSchemaSerializer serializer = JsonDebeziumSchemaSerializer.builder()
            .setDorisOptions(connectionBuilder.build())
            .setNewSchemaChange(newSchemaChange)
            .setExecutionOptions(execOptions)
            .build();

    DorisSink.Builder<String> sinkBuilder = DorisSink.builder();
    sinkBuilder.setDorisReadOptions(readOptions)
            .setDorisExecutionOptions(execOptions)
            .setSerializer(serializer)
            .setDorisOptions(connectionBuilder.build());
    return sinkBuilder.build();
}