in spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java [383:434]
private Future<StreamLoadResponse> buildReqAndExec(String host, Integer port, CloseableHttpClient client) {
HttpPut httpPut = new HttpPut(URLs.streamLoad(host, port, database, table, isHttpsEnabled));
try {
handleStreamLoadProperties(httpPut);
} catch (OptionRequiredException e) {
throw new RuntimeException("stream load handle properties failed", e);
}
PipedInputStream pipedInputStream = new PipedInputStream(pipeSize);
try {
output = new PipedOutputStream(pipedInputStream);
} catch (IOException e) {
throw new RuntimeException("stream load create output failed", e);
}
HttpEntity entity = new InputStreamEntity(pipedInputStream);
if (isGzipCompressionEnabled) {
entity = new GzipCompressingEntity(entity);
}
httpPut.setEntity(entity);
Thread currentThread = Thread.currentThread();
logger.info("table {}.{} stream load started for {} on host {}:{}", database, table,
currentLabel != null ? currentLabel : "group commit", host, port);
return getExecutors().submit(() -> {
StreamLoadResponse streamLoadResponse = null;
try (CloseableHttpResponse response = client.execute(httpPut)) {
// stream load http request finished unexpectedly
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
throw new StreamLoadException(
"stream load failed, status: " + response.getStatusLine().getStatusCode()
+ ", reason: " + response.getStatusLine().getReasonPhrase());
}
String entityStr = EntityUtils.toString(response.getEntity());
logger.info("stream load response: " + entityStr);
streamLoadResponse = MAPPER.readValue(entityStr, StreamLoadResponse.class);
if (streamLoadResponse == null
|| streamLoadResponse.getLabel() == null
|| streamLoadResponse.getMessage() == null) {
throw new StreamLoadException("stream load failed, response error : " + entityStr);
} else if (!streamLoadResponse.isSuccess()) {
throw new StreamLoadException(
"stream load failed, txnId: " + streamLoadResponse.getTxnId()
+ ", status: " + streamLoadResponse.getStatus()
+ ", msg: " + streamLoadResponse.getMessage());
}
} catch (Exception e) {
logger.error("stream load exception", e);
unexpectedException = e;
currentThread.interrupt();
}
return streamLoadResponse;
});
}