in streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisStreamLoader.java [117:181]
private void checkLableState(String host, String database, String label) throws IOException {
int tries = 0;
while (tries < 10) {
try {
TimeUnit.SECONDS.sleep(Math.min(++tries, 5));
} catch (InterruptedException e) {
return;
}
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
HttpGet httpGet = new HttpGet(String.format(GET_LOAD_STATUS_URL, host, database, label));
httpGet.setHeader(
HttpHeaders.AUTHORIZATION,
getBasicAuthHeader(dorisConfig.user(), dorisConfig.password()));
httpGet.setHeader("Connection", "close");
try (CloseableHttpResponse response = httpclient.execute(httpGet)) {
final int statusCode = response.getStatusLine().getStatusCode();
String loadResult = "{}";
if (response.getEntity() != null) {
loadResult = EntityUtils.toString(response.getEntity());
}
if (statusCode != 200) {
throw new LoadStatusFailedException(
String.format(
"Failed to flush data to doris, Error "
+ "could not get the final state of label[%s].\n",
label),
null);
}
Map<String, Object> result = OBJECT_MAPPER.readValue(loadResult, HashMap.class);
String labelState = (String) result.get("state");
if (null == labelState) {
throw new LoadStatusFailedException(
String.format(
"Failed to flush data to doris, Error "
+ "could not get the final state of label[%s]. response[%s]\n",
label, loadResult),
null);
}
LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState));
switch (labelState) {
case LAEBL_STATE_VISIBLE:
return;
case LAEBL_STATE_COMMITTED:
return;
case RESULT_LABEL_PREPARE:
continue;
case RESULT_LABEL_ABORTED:
throw new LoadStatusFailedException(
String.format(
"Failed to flush data to doris, Error " + "label[%s] state[%s]\n",
label, labelState),
null,
true);
case RESULT_LABEL_UNKNOWN:
default:
throw new LoadStatusFailedException(
String.format(
"Failed to flush data to doris, Error " + "label[%s] state[%s]\n",
label, labelState),
null);
}
}
}
}
}