in src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java [72:132]
/**
 * Sends the buffered records to Doris with a single stream load HTTP PUT and records the
 * parsed response in {@code respContents}.
 *
 * <p>When group commit is enabled the request is sent without a label. The response is
 * accepted only if the HTTP status is 200, a body is present, the body parses into a
 * {@link KafkaRespContent} with a non-null message, and its status is one of
 * {@code DORIS_SUCCESS_STATUS}. Any other outcome is reported as a
 * {@link StreamLoadException}.
 *
 * @param label stream load label; ignored when group commit is enabled
 * @param buffer records to load; its last offset is stored on the recorded response
 * @throws IOException declared by the signature; NOTE(review): failures inside the HTTP
 *     exchange are wrapped in {@link StreamLoadException} - confirm whether the calls made
 *     before the request can still surface an {@code IOException}
 * @throws StreamLoadException if the request fails, the HTTP status is not 200, the response
 *     body is missing or unparsable, or Doris reports a non-success status
 */
public void load(String label, RecordBuffer buffer) throws IOException {
    if (enableGroupCommit) {
        // Group commit requests are sent without a label.
        label = null;
    }
    refreshLoadUrl(database, table);
    String data = buffer.getData();
    ByteArrayEntity entity = new ByteArrayEntity(data.getBytes(StandardCharsets.UTF_8));
    HttpPutBuilder putBuilder = new HttpPutBuilder();
    putBuilder
            .setUrl(loadUrl)
            .baseAuth(user, password)
            .setLabel(label)
            .addCommonHeader()
            .setEntity(entity)
            .addHiddenColumns(dorisOptions.isEnableDelete())
            .enable2PC(dorisOptions.enable2PC())
            .addProperties(dorisOptions.getStreamLoadProp());
    if (enableGroupCommit) {
        LOG.info("stream load started with group commit on host {}", hostPort);
    } else {
        LOG.info("stream load started for {} on host {}", label, hostPort);
    }
    try (CloseableHttpResponse response = httpClient.execute(putBuilder.build())) {
        int statusCode = response.getStatusLine().getStatusCode();
        if (statusCode != 200 || response.getEntity() == null) {
            // Previously this case fell through silently, so a failed load looked like a
            // successful one to the caller. Surface it instead.
            throw new StreamLoadException(
                    "stream load error: unexpected http response " + response.getStatusLine());
        }
        // Decode as UTF-8 when the response does not declare a charset; the library
        // default (ISO-8859-1) would garble non-ASCII text in the JSON result.
        String loadResult = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
        LOG.info("load Result {}", loadResult);
        KafkaRespContent respContent =
                OBJECT_MAPPER.readValue(loadResult, KafkaRespContent.class);
        if (respContent == null || respContent.getMessage() == null) {
            throw new StreamLoadException("response error : " + loadResult);
        }
        if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
            String errMsg =
                    String.format(
                            "stream load error: %s, see more in %s",
                            respContent.getMessage(), respContent.getErrorURL());
            throw new StreamLoadException(errMsg);
        }
        // Attach the load context to the response before recording it.
        respContent.setDatabase(database);
        respContent.setTable(table);
        respContent.setLastOffset(buffer.getLastOffset());
        respContent.setTopic(topic);
        respContents.add(respContent);
    } catch (Exception ex) {
        // Every failure, including the StreamLoadExceptions raised above, is logged and
        // rethrown with the label (or group commit) context; the original is kept as cause.
        String err;
        if (enableGroupCommit) {
            err = "failed to stream load data with group commit";
        } else {
            err = "failed to stream load data with label: " + label;
        }
        LOG.warn(err, ex);
        throw new StreamLoadException(err, ex);
    }
}