in flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java [597:647]
public static List<PartitionDefinition> findPartitions(
DorisOptions options, DorisReadOptions readOptions, Logger logger)
throws DorisException {
String[] tableIdentifiers = parseIdentifier(options.getTableIdentifier(), logger);
Preconditions.checkArgument(
tableIdentifiers.length == 2, "table identifier is illegal, should be db.table");
String readFields =
StringUtils.isBlank(readOptions.getReadFields())
? "*"
: readOptions.getReadFields();
String sql =
"select "
+ readFields
+ " from `"
+ tableIdentifiers[0]
+ "`.`"
+ tableIdentifiers[1]
+ "`";
if (!StringUtils.isEmpty(readOptions.getFilterQuery())) {
sql += " where " + readOptions.getFilterQuery();
}
logger.info("Query SQL Sending to Doris FE is: '{}'.", sql);
String[] tableIdentifier = parseIdentifier(options.getTableIdentifier(), logger);
String queryPlanUri =
String.format(
QUERY_PLAN_API,
randomEndpoint(options.getFenodes(), logger),
tableIdentifier[0],
tableIdentifier[1]);
HttpPost httpPost = new HttpPost(queryPlanUri);
String entity = "{\"sql\": \"" + sql + "\"}";
logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8);
stringEntity.setContentEncoding("UTF-8");
stringEntity.setContentType("application/json");
httpPost.setEntity(stringEntity);
String resStr = send(options, readOptions, httpPost, logger);
logger.debug("Find partition response is '{}'.", resStr);
QueryPlan queryPlan = getQueryPlan(resStr, logger);
Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan, logger);
return tabletsMapToPartition(
options,
readOptions,
be2Tablets,
queryPlan.getOpaquedQueryPlan(),
tableIdentifiers[0],
tableIdentifiers[1],
logger);
}