in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java [118:176]
/**
 * Creates a batch stream loader and starts its background load task.
 *
 * <p>The backend list is built from {@code dorisOptions.getBenodes()} when that value is
 * non-empty; otherwise it is fetched through {@code RestService.getBackendsV2}. Stream-load
 * behaviour (format, line delimiter, group commit, gzip compression) is derived from the
 * properties returned by {@code executionOptions.getStreamLoadProp()}.
 *
 * <p>The load task is submitted to the executor as the very last step, so that the worker
 * thread never observes a partially initialized instance.
 *
 * @param dorisOptions connection options: backend nodes, credentials and table identifier
 * @param dorisReadOptions read options, used for backend discovery and to build the HTTP client
 * @param executionOptions execution options: stream-load properties, flush queue size and
 *     maximum buffer bytes
 * @param labelGenerator generator stored for later use by this loader
 * @param subTaskId id of the subtask that owns this loader
 * @throws IllegalStateException if a non-blank table identifier is not of the form
 *     {@code database.table} (raised by {@code Preconditions.checkState})
 */
public DorisBatchStreamLoad(
        DorisOptions dorisOptions,
        DorisReadOptions dorisReadOptions,
        DorisExecutionOptions executionOptions,
        LabelGenerator labelGenerator,
        int subTaskId) {
    this.backendUtil =
            StringUtils.isNotEmpty(dorisOptions.getBenodes())
                    ? new BackendUtil(dorisOptions.getBenodes())
                    : new BackendUtil(
                            RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
    this.hostPort = backendUtil.getAvailableBackend();
    this.username = dorisOptions.getUsername();
    this.password = dorisOptions.getPassword();
    this.loadProps = executionOptions.getStreamLoadProp();
    this.labelGenerator = labelGenerator;
    if (loadProps.getProperty(FORMAT_KEY, CSV).equals(ARROW)) {
        // No line delimiter is configured for the arrow format.
        this.lineDelimiter = null;
    } else {
        // Encode explicitly as UTF-8 so the delimiter bytes do not depend on the
        // JVM's platform default charset.
        this.lineDelimiter =
                EscapeHandler.escapeString(
                                loadProps.getProperty(
                                        LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
                        .getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }
    // Group commit is enabled when the property is present and is not the "off" mode.
    this.enableGroupCommit =
            loadProps.containsKey(GROUP_COMMIT)
                    && !loadProps
                            .getProperty(GROUP_COMMIT)
                            .equalsIgnoreCase(GROUP_COMMIT_OFF_MODE);
    this.enableGzCompress = loadProps.getProperty(COMPRESS_TYPE, "").equals(COMPRESS_TYPE_GZ);
    this.executionOptions = executionOptions;
    this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize());
    // maxBlockedBytes ensures that a buffer can be written even if the queue is full
    this.maxBlockedBytes =
            (long) executionOptions.getBufferFlushMaxBytes()
                    * (executionOptions.getFlushQueueSize() + 1);
    if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) {
        String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
        Preconditions.checkState(
                tableInfo.length == 2,
                "tableIdentifier input error, the format is database.table");
        this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort, tableInfo[0], tableInfo[1]);
    }
    this.subTaskId = subTaskId;
    this.httpClientBuilder = new HttpUtil(dorisReadOptions).getHttpClientBuilderForBatch();
    this.loadAsyncExecutor = new LoadAsyncExecutor(executionOptions.getFlushQueueSize());
    // Single worker thread; the one-slot work queue with AbortPolicy rejects any further
    // submission beyond one running and one queued task.
    this.loadExecutorService =
            new ThreadPoolExecutor(
                    1,
                    1,
                    0L,
                    TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(1),
                    new DefaultThreadFactory("streamload-executor"),
                    new ThreadPoolExecutor.AbortPolicy());
    this.started = new AtomicBoolean(true);
    // Start the background task only after every field above has been assigned: the task
    // runs on another thread and must not see a half-constructed instance. This also
    // avoids leaving a running thread behind if an earlier initialization step throws.
    this.loadExecutorService.execute(loadAsyncExecutor);
}