public Map fetchLabel2Status()

in src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java [112:139]


    /**
     * Queries Doris for transaction labels and returns the status recorded for each one.
     *
     * <p>The statement text is produced by formatting {@code TRANSACTION_LABEL_PATTEN} with the
     * configured database name and the label prefix from the label generator. Every row of the
     * result contributes one entry, keyed by its {@code Label} column and valued by its {@code
     * TransactionStatus} column; if the same label appears more than once, the last row wins.
     *
     * @return a mutable map from label to transaction status; empty when the query yields no rows
     * @throws StreamLoadException if obtaining the connection, preparing or executing the
     *     statement, reading the rows, or closing the resources fails; the original exception is
     *     attached as the cause
     */
    public Map<String, String> fetchLabel2Status() {
        final String database = dorisOptions.getDatabase();
        final String labelPrefix = labelGenerator.buildLabelPrefix();
        final String sql = String.format(TRANSACTION_LABEL_PATTEN, database, labelPrefix);
        LOG.info("query doris offset by sql: {}", sql);

        final Map<String, String> statusByLabel = new HashMap<>();
        try (Connection conn = connectionProvider.getOrEstablishConnection();
                PreparedStatement statement = conn.prepareStatement(sql);
                ResultSet rows = statement.executeQuery()) {
            while (rows.next()) {
                // Column reads stay in the original order: Label first, then TransactionStatus.
                statusByLabel.put(rows.getString("Label"), rows.getString("TransactionStatus"));
            }
        } catch (Exception cause) {
            // The same text is used for both the warning and the rethrown exception.
            final String failure =
                    "Unable to obtain the label generated when importing data through stream load from doris, "
                            + "causing the doris kafka connector to not guarantee exactly once.";
            LOG.warn(failure, cause);
            throw new StreamLoadException(failure, cause);
        }
        return statusByLabel;
    }