in flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java [241:336]
public DorisSink<String> buildDorisSink(String tableIdentifier) {
String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES);
String benodes = sinkConfig.getString(DorisConfigOptions.BENODES);
String user = sinkConfig.getString(DorisConfigOptions.USERNAME);
String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, "");
String jdbcUrl = sinkConfig.getString(DorisConfigOptions.JDBC_URL);
DorisSink.Builder<String> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder
.setJdbcUrl(jdbcUrl)
.setFenodes(fenodes)
.setBenodes(benodes)
.setUsername(user)
.setPassword(passwd);
sinkConfig
.getOptional(DorisConfigOptions.AUTO_REDIRECT)
.ifPresent(dorisBuilder::setAutoRedirect);
// single sink not need table identifier
if (!singleSink && !StringUtils.isNullOrWhitespaceOnly(tableIdentifier)) {
dorisBuilder.setTableIdentifier(tableIdentifier);
}
Properties pro = new Properties();
// default json data format
pro.setProperty("format", "json");
pro.setProperty("read_json_by_line", "true");
// customer stream load properties
Properties streamLoadProp = DorisConfigOptions.getStreamLoadProp(sinkConfig.toMap());
pro.putAll(streamLoadProp);
DorisExecutionOptions.Builder executionBuilder =
DorisExecutionOptions.builder().setStreamLoadProp(pro);
sinkConfig
.getOptional(DorisConfigOptions.SINK_LABEL_PREFIX)
.ifPresent(executionBuilder::setLabelPrefix);
sinkConfig
.getOptional(DorisConfigOptions.SINK_ENABLE_DELETE)
.ifPresent(executionBuilder::setDeletable);
sinkConfig
.getOptional(DorisConfigOptions.SINK_BUFFER_COUNT)
.ifPresent(executionBuilder::setBufferCount);
sinkConfig
.getOptional(DorisConfigOptions.SINK_BUFFER_SIZE)
.ifPresent(v -> executionBuilder.setBufferSize((int) v.getBytes()));
sinkConfig
.getOptional(DorisConfigOptions.SINK_CHECK_INTERVAL)
.ifPresent(v -> executionBuilder.setCheckInterval((int) v.toMillis()));
sinkConfig
.getOptional(DorisConfigOptions.SINK_MAX_RETRIES)
.ifPresent(executionBuilder::setMaxRetries);
sinkConfig
.getOptional(DorisConfigOptions.SINK_IGNORE_UPDATE_BEFORE)
.ifPresent(executionBuilder::setIgnoreUpdateBefore);
if (!sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_2PC)) {
executionBuilder.disable2PC();
} else if (sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_2PC).isPresent()) {
// force open 2pc
executionBuilder.enable2PC();
}
sinkConfig
.getOptional(DorisConfigOptions.SINK_ENABLE_BATCH_MODE)
.ifPresent(executionBuilder::setBatchMode);
sinkConfig
.getOptional(DorisConfigOptions.SINK_FLUSH_QUEUE_SIZE)
.ifPresent(executionBuilder::setFlushQueueSize);
sinkConfig
.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_ROWS)
.ifPresent(executionBuilder::setBufferFlushMaxRows);
sinkConfig
.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_BYTES)
.ifPresent(v -> executionBuilder.setBufferFlushMaxBytes((int) v.getBytes()));
sinkConfig
.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_INTERVAL)
.ifPresent(v -> executionBuilder.setBufferFlushIntervalMs(v.toMillis()));
sinkConfig
.getOptional(DorisConfigOptions.SINK_USE_CACHE)
.ifPresent(executionBuilder::setUseCache);
sinkConfig
.getOptional(DorisConfigOptions.SINK_WRITE_MODE)
.ifPresent(v -> executionBuilder.setWriteMode(WriteMode.of(v)));
sinkConfig
.getOptional(DorisConfigOptions.SINK_IGNORE_COMMIT_ERROR)
.ifPresent(executionBuilder::setIgnoreCommitError);
DorisExecutionOptions executionOptions = executionBuilder.build();
builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionOptions)
.setSerializer(buildSchemaSerializer(dorisBuilder, executionOptions))
.setDorisOptions(dorisBuilder.build());
return builder.build();
}