in spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java [198:243]
public int load(String value, Boolean enable2PC) throws StreamLoadException {
String label = generateLoadLabel();
LoadResponse loadResponse;
int responseHttpStatus = -1;
try (CloseableHttpClient httpClient = getHttpClient()) {
String loadUrlStr = String.format(loadUrlPattern, getBackend(), db, tbl);
LOG.debug("Stream load Request:{} ,Body:{}", loadUrlStr, value);
// only to record the BE node in case of an exception
this.loadUrlStr = loadUrlStr;
HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC);
httpPut.setEntity(new StringEntity(value, StandardCharsets.UTF_8));
HttpResponse httpResponse = httpClient.execute(httpPut);
responseHttpStatus = httpResponse.getStatusLine().getStatusCode();
String respMsg = httpResponse.getStatusLine().getReasonPhrase();
String response = EntityUtils.toString(new BufferedHttpEntity(httpResponse.getEntity()), StandardCharsets.UTF_8);
loadResponse = new LoadResponse(responseHttpStatus, respMsg, response);
} catch (IOException e) {
e.printStackTrace();
String err = "http request exception,load url : " + loadUrlStr + ",failed to execute spark stream load with label: " + label;
LOG.warn(err, e);
loadResponse = new LoadResponse(responseHttpStatus, e.getMessage(), err);
}
if (loadResponse.status != HttpStatus.SC_OK) {
LOG.info("Stream load Response HTTP Status Error:{}", loadResponse);
// throw new StreamLoadException("stream load error: " + loadResponse.respContent);
throw new StreamLoadException("stream load error");
} else {
ObjectMapper obj = new ObjectMapper();
try {
RespContent respContent = obj.readValue(loadResponse.respContent, RespContent.class);
if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
LOG.error("Stream load Response RES STATUS Error:{}", loadResponse);
throw new StreamLoadException("stream load error");
}
LOG.info("Stream load Response:{}", loadResponse);
return respContent.getTxnId();
} catch (IOException e) {
throw new StreamLoadException(e);
}
}
}