in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java [104:157]
public DorisStreamLoad(
String hostPort,
DorisOptions dorisOptions,
DorisExecutionOptions executionOptions,
LabelGenerator labelGenerator,
CloseableHttpClient httpClient) {
this.hostPort = hostPort;
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
this.db = tableInfo[0];
this.table = tableInfo[1];
this.user = dorisOptions.getUsername();
this.passwd = dorisOptions.getPassword();
this.labelGenerator = labelGenerator;
this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, db, table);
this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db);
this.enable2PC = executionOptions.enabled2PC();
this.streamLoadProp = executionOptions.getStreamLoadProp();
this.enableDelete = executionOptions.getDeletable();
this.httpClient = httpClient;
String threadName =
String.format(
"stream-load-upload-%s-%s",
labelGenerator.getSubtaskId(), labelGenerator.getTableIdentifier());
this.executorService =
new ThreadPoolExecutor(
1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new ExecutorThreadFactory(threadName));
this.recordStream =
new RecordStream(
executionOptions.getBufferSize(),
executionOptions.getBufferCount(),
executionOptions.isUseCache());
if (streamLoadProp.getProperty(FORMAT_KEY, CSV).equals(ARROW)) {
lineDelimiter = null;
} else {
lineDelimiter =
EscapeHandler.escapeString(
streamLoadProp.getProperty(
LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
.getBytes();
}
this.enableGroupCommit =
streamLoadProp.containsKey(GROUP_COMMIT)
&& !streamLoadProp
.getProperty(GROUP_COMMIT)
.equalsIgnoreCase(GROUP_COMMIT_OFF_MODE);
this.enableGzCompress =
streamLoadProp.getProperty(COMPRESS_TYPE, "").equals(COMPRESS_TYPE_GZ);
loadBatchFirstRecord = true;
}