in spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java [352:391]
/**
 * Asks a Doris FE for the query plan of the configured read and converts it
 * into a list of partition definitions.
 *
 * <p>The query is {@code select <DORIS_READ_FIELD or *> from `db`.`table`},
 * with {@code where <DORIS_FILTER_QUERY>} appended when that property is not
 * empty. It is POSTed as a JSON document to the {@code QUERY_PLAN} endpoint of
 * each FE node returned by {@code allEndpoints}, in order, until one of them
 * answers. A node that fails with {@link ConnectedFailedException} is logged
 * and skipped.
 *
 * @param cfg    connector settings; read for the table identifier, read
 *               fields, filter query and FE node list
 * @param logger logger used for debug/info/error output and passed on to the helpers
 * @return the partitions built by {@code tabletsMapToPartition} from the first
 *         FE response that could be obtained
 * @throws DorisException if every FE node failed with a connection error;
 *                        other {@code DorisException}s raised by the helpers
 *                        called here are not caught and propagate to the caller
 */
public static List<PartitionDefinition> findPartitions(Settings cfg, Logger logger) throws DorisException {
    String[] tableIdentifiers = parseIdentifier(cfg.getProperty(DORIS_TABLE_IDENTIFIER), logger);
    String sql = "select " + cfg.getProperty(DORIS_READ_FIELD, "*") +
            " from `" + tableIdentifiers[0] + "`.`" + tableIdentifiers[1] + "`";
    String filterQuery = cfg.getProperty(DORIS_FILTER_QUERY);
    if (!StringUtils.isEmpty(filterQuery)) {
        sql += " where " + filterQuery;
    }
    logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);

    // The request body is the same for every FE node, so build it once.
    // The SQL contains user-supplied text (read fields, filter query) and must be
    // escaped; otherwise a double quote or backslash in it produces invalid JSON.
    String entity = "{\"sql\": \"" + escapeJsonString(sql) + "\"}";

    List<String> feNodeList = allEndpoints(cfg.getProperty(DORIS_FENODES), logger);
    for (String feNode : feNodeList) {
        try {
            HttpPost httpPost = new HttpPost(getUriStr(feNode, cfg, logger) + QUERY_PLAN);
            logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
            StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8);
            stringEntity.setContentEncoding("UTF-8");
            stringEntity.setContentType("application/json");
            httpPost.setEntity(stringEntity);

            String resStr = send(cfg, httpPost, logger);
            logger.debug("Find partition response is '{}'.", resStr);
            QueryPlan queryPlan = getQueryPlan(resStr, logger);
            Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan, logger);
            return tabletsMapToPartition(
                    cfg,
                    be2Tablets,
                    queryPlan.getOpaqued_query_plan(),
                    tableIdentifiers[0],
                    tableIdentifiers[1],
                    logger);
        } catch (ConnectedFailedException e) {
            // Deliberate best-effort: a connection failure on one FE is not fatal,
            // fall through to the next node in the list.
            logger.info("Doris FE node {} is unavailable: {}, Request the next Doris FE node", feNode, e.getMessage());
        }
    }

    String errMsg = "No Doris FE is available, please check configuration";
    logger.error(errMsg);
    throw new DorisException(errMsg);
}

/**
 * Escapes a string so it can be placed between double quotes in a JSON document.
 *
 * <p>Double quotes, backslashes and all characters below U+0020 are escaped;
 * every other character is copied unchanged.
 *
 * @param value the raw text, must not be null
 * @return the text with JSON string escapes applied
 */
private static String escapeJsonString(String value) {
    StringBuilder escaped = new StringBuilder(value.length() + 16);
    for (int i = 0; i < value.length(); i++) {
        char c = value.charAt(i);
        switch (c) {
            case '"':
                escaped.append("\\\"");
                break;
            case '\\':
                escaped.append("\\\\");
                break;
            case '\b':
                escaped.append("\\b");
                break;
            case '\f':
                escaped.append("\\f");
                break;
            case '\n':
                escaped.append("\\n");
                break;
            case '\r':
                escaped.append("\\r");
                break;
            case '\t':
                escaped.append("\\t");
                break;
            default:
                if (c < 0x20) {
                    // Remaining control characters have no short form.
                    escaped.append(String.format("\\u%04x", (int) c));
                } else {
                    escaped.append(c);
                }
        }
    }
    return escaped.toString();
}