private void checkLableState()

in streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisStreamLoader.java [117:181]


  /**
   * Polls the Doris load-status endpoint until the given label reports a final state.
   *
   * <p>Before every query the method sleeps {@code min(attempt, 5)} seconds, and it queries the
   * endpoint at most 10 times. It returns normally only when the reported state is {@code
   * LAEBL_STATE_VISIBLE} or {@code LAEBL_STATE_COMMITTED}; every other outcome (including running
   * out of tries or being interrupted) is reported as an exception so the caller never mistakes an
   * unconfirmed load for a successful one.
   *
   * @param host host substituted into {@code GET_LOAD_STATUS_URL}
   * @param database database substituted into {@code GET_LOAD_STATUS_URL}
   * @param label label whose load state is checked
   * @throws IOException if the HTTP call or JSON parsing fails, or if the waiting thread is
   *     interrupted (the interrupt flag is restored before throwing)
   * @throws LoadStatusFailedException if the endpoint does not answer with HTTP 200, the response
   *     carries no usable {@code state}, the state is aborted/unknown/unrecognized, or the label is
   *     still not final after the last try
   */
  private void checkLableState(String host, String database, String label) throws IOException {
    final int maxTries = 10;
    final int maxBackoffSeconds = 5;
    // The URL does not change between polls, so build it once.
    final String statusUrl = String.format(GET_LOAD_STATUS_URL, host, database, label);

    // One client serves every poll; it is closed when the method exits by any path.
    try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
      for (int tries = 1; tries <= maxTries; tries++) {
        try {
          // Linear back-off capped at maxBackoffSeconds: 1s, 2s, ... 5s, 5s, ...
          TimeUnit.SECONDS.sleep(Math.min(tries, maxBackoffSeconds));
        } catch (InterruptedException e) {
          // Keep the interrupt visible to the caller and do not pretend the label is final.
          Thread.currentThread().interrupt();
          throw new IOException(
              String.format("Interrupted while checking the state of label[%s]", label), e);
        }

        HttpGet httpGet = new HttpGet(statusUrl);
        httpGet.setHeader(
            HttpHeaders.AUTHORIZATION,
            getBasicAuthHeader(dorisConfig.user(), dorisConfig.password()));
        httpGet.setHeader("Connection", "close");

        try (CloseableHttpResponse response = httpclient.execute(httpGet)) {
          final int statusCode = response.getStatusLine().getStatusCode();
          // Fall back to an empty JSON object when the response has no body.
          String loadResult = "{}";
          if (response.getEntity() != null) {
            loadResult = EntityUtils.toString(response.getEntity());
          }
          if (statusCode != 200) {
            throw new LoadStatusFailedException(
                String.format(
                    "Failed to flush data to doris, Error "
                        + "could not get the final state of label[%s].%n",
                    label),
                null);
          }

          Map<String, Object> result = OBJECT_MAPPER.readValue(loadResult, HashMap.class);
          // A JSON "null" body yields a null map, and "state" may be absent or not a string;
          // all of these mean the state could not be determined.
          Object state = result == null ? null : result.get("state");
          if (!(state instanceof String)) {
            throw new LoadStatusFailedException(
                String.format(
                    "Failed to flush data to doris, Error "
                        + "could not get the final state of label[%s]. response[%s]%n",
                    label, loadResult),
                null);
          }
          String labelState = (String) state;

          LOG.info(String.format("Checking label[%s] state[%s]%n", label, labelState));
          switch (labelState) {
            case LAEBL_STATE_VISIBLE:
            case LAEBL_STATE_COMMITTED:
              // Final, successful states.
              return;
            case RESULT_LABEL_PREPARE:
              // Not final yet: poll again (the response is closed by try-with-resources).
              continue;
            case RESULT_LABEL_ABORTED:
              // NOTE(review): the trailing 'true' presumably tells the caller the label must be
              // re-created - confirm against LoadStatusFailedException.
              throw new LoadStatusFailedException(
                  String.format(
                      "Failed to flush data to doris, Error " + "label[%s] state[%s]%n",
                      label, labelState),
                  null,
                  true);
            case RESULT_LABEL_UNKNOWN:
            default:
              throw new LoadStatusFailedException(
                  String.format(
                      "Failed to flush data to doris, Error " + "label[%s] state[%s]%n",
                      label, labelState),
                  null);
          }
        }
      }
    }

    // Every try saw a non-final state: report it instead of silently returning as success.
    throw new LoadStatusFailedException(
        String.format(
            "Failed to flush data to doris, Error "
                + "could not get the final state of label[%s] after %d tries.%n",
            label, maxTries),
        null);
  }