in spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractCopyIntoProcessor.java [219:257]
/**
 * Builds a COPY INTO statement for {@code fileName} and submits it to the given frontend over HTTP.
 *
 * <p>The statement produced by {@link CopySQLBuilder} is wrapped in a JSON object under the
 * {@code "sql"} key and POSTed to the URL returned by {@code URLs.copyIntoQuery}. The call is
 * treated as successful only when the HTTP status is 200 and the parsed {@code RespContent}
 * reports {@code isCopyIntoSuccess()}.
 *
 * <p>The HTTP response is always closed before this method returns or throws, so the underlying
 * connection is released back to {@code httpClient} on every path.
 *
 * @param frontend   frontend whose host and HTTP port the request is sent to
 * @param httpClient client used to execute the request; it is not closed by this method
 * @param fileName   file name passed to {@link CopySQLBuilder} when building the statement
 * @throws OptionRequiredException presumably when the table identifier option is missing from
 *                                 the config (raised by {@code config.getValue}) - confirm against Config
 * @throws IOException             if sending the request or reading/closing the response fails
 * @throws CopyIntoException       if the HTTP status is not 200, the response body cannot be parsed,
 *                                 or the parsed response reports a non-successful copy
 */
private void executeCopyInto(Frontend frontend, CloseableHttpClient httpClient, String fileName) throws OptionRequiredException, IOException, CopyIntoException {
    String tableIdentifier = config.getValue(DorisOptions.DORIS_TABLE_IDENTIFIER);
    Properties props = new Properties();
    properties.forEach(props::setProperty);
    CopySQLBuilder copySQLBuilder = new CopySQLBuilder(format, props, tableIdentifier, fileName);
    String copySql = copySQLBuilder.buildCopySQL();
    LOG.info("build copy sql is {}", copySql);

    ObjectNode objectNode = MAPPER.createObjectNode();
    objectNode.put("sql", copySql);
    String queryUrl = URLs.copyIntoQuery(frontend.getHost(), frontend.getHttpPort(), false);
    HttpPost queryReq = new HttpPostBuilder().setUrl(queryUrl)
            .baseAuth(getAuthEncoded())
            .setEntity(new StringEntity(MAPPER.writeValueAsString(objectNode))).build();

    CopyIntoResponse loadResponse;
    // try-with-resources guarantees the response (and its pooled connection) is released even
    // when reading the body fails; everything needed later is copied out before it closes.
    try (CloseableHttpResponse queryRes = httpClient.execute(queryReq)) {
        int statusCode = queryRes.getStatusLine().getStatusCode();
        String statusMessage = queryRes.getStatusLine().getReasonPhrase();
        String responseContent = EntityUtils.toString(new BufferedHttpEntity(queryRes.getEntity()), StandardCharsets.UTF_8);
        loadResponse = new CopyIntoResponse(statusCode, statusMessage, responseContent);
    }

    if (loadResponse.getCode() != HttpStatus.SC_OK) {
        LOG.error(String.format("Execute copy sql status is not OK, status: %d, response: %s",
                loadResponse.getCode(), loadResponse));
        throw new CopyIntoException(String.format("Execute copy sql, http status:%d, response:%s",
                loadResponse.getCode(), loadResponse));
    } else {
        try {
            RespContent responseContentObject = MAPPER.readValue(loadResponse.getContent(), RespContent.class);
            if (!responseContentObject.isCopyIntoSuccess()) {
                LOG.error(String.format("Execute copy sql status is not success, status: %s, response: %s",
                        responseContentObject.getStatus(), loadResponse));
                throw new CopyIntoException(String.format("Execute copy sql error, load status:%s, response:%s",
                        responseContentObject.getStatus(), loadResponse));
            }
            LOG.info("Execute copy sql Response: {}", loadResponse);
        } catch (IOException e) {
            // A malformed response body is reported as a copy failure rather than a transport error.
            throw new CopyIntoException(e);
        }
    }
}