in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java [92:120]
/**
 * Creates a batch stream loader.
 *
 * <p>The constructor looks up the backends, picks one as the load target, builds the load URL
 * for the configured table, pre-fills the write queue with empty record buffers and submits the
 * asynchronous load task to a single-threaded executor.
 *
 * @param dorisOptions supplies the table identifier ({@code database.table}), username and
 *     password
 * @param dorisReadOptions passed to {@code RestService.getBackendsV2} for the backend lookup
 * @param executionOptions supplies the stream load properties, the flush queue size and the
 *     maximum buffer size in bytes
 * @param labelGenerator stored on this instance for later use
 * @throws IllegalArgumentException if the table identifier is {@code null} or does not contain
 *     at least a database and a table part separated by a dot
 */
public DorisBatchStreamLoad(DorisOptions dorisOptions,
        DorisReadOptions dorisReadOptions,
        DorisExecutionOptions executionOptions,
        LabelGenerator labelGenerator) {
    this.backendUtil = new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
    this.hostPort = backendUtil.getAvailableBackend();

    // Fail with a descriptive error instead of a bare NullPointerException /
    // ArrayIndexOutOfBoundsException when the identifier is not "database.table".
    String tableIdentifier = dorisOptions.getTableIdentifier();
    String[] tableInfo = tableIdentifier == null ? new String[0] : tableIdentifier.split("\\.");
    if (tableInfo.length < 2) {
        throw new IllegalArgumentException(
                "tableIdentifier input error, the format is database.table, but got: " + tableIdentifier);
    }
    this.db = tableInfo[0];
    this.table = tableInfo[1];

    this.username = dorisOptions.getUsername();
    this.password = dorisOptions.getPassword();
    this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort, db, table);
    this.loadProps = executionOptions.getStreamLoadProp();
    this.labelGenerator = labelGenerator;
    // Encode the delimiter with an explicit charset so the bytes do not depend on the JVM's
    // platform default encoding.
    this.lineDelimiter = EscapeHandler
            .escapeString(loadProps.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
            .getBytes(java.nio.charset.StandardCharsets.UTF_8);
    this.executionOptions = executionOptions;

    // Init queue: read the size once so capacity, log output and fill loop always agree.
    int queueSize = executionOptions.getFlushQueueSize();
    this.writeQueue = new ArrayBlockingQueue<>(queueSize);
    LOG.info("init RecordBuffer capacity {}, count {}", executionOptions.getBufferFlushMaxBytes(), queueSize);
    // Fill the write queue to capacity with empty buffers.
    for (int index = 0; index < queueSize; index++) {
        this.writeQueue.add(new BatchRecordBuffer(this.lineDelimiter, executionOptions.getBufferFlushMaxBytes()));
    }
    this.readQueue = new LinkedBlockingDeque<>();

    this.loadAsyncExecutor = new LoadAsyncExecutor();
    // One worker thread and a work queue of capacity 1; further submissions are rejected
    // (AbortPolicy) rather than queued without bound.
    this.loadExecutorService = new ThreadPoolExecutor(
            1,
            1,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(1),
            new DefaultThreadFactory("streamload-executor"),
            new ThreadPoolExecutor.AbortPolicy());
    // The flag is set before the task is submitted, as in the original ordering;
    // presumably the task polls it to decide whether to keep running - TODO confirm.
    this.started = new AtomicBoolean(true);
    // NOTE(review): the task starts running before this constructor returns. If it reads
    // fields of this instance, they must all be assigned above this line - confirm.
    this.loadExecutorService.execute(loadAsyncExecutor);
}