in spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java [96:147]
/**
 * Builds the initial frontend list from the comma-separated {@code DORIS_FENODES} setting.
 *
 * <p>Each entry is read as {@code host[:httpPort]}; surrounding whitespace is ignored and a
 * missing port is represented as {@code -1}.
 *
 * <p>When {@code DORIS_FE_AUTO_FETCH} is enabled, the configured nodes are asked one after
 * another for the frontend list, and the first non-empty answer is returned. A node that
 * fails (bad port, request error, unparsable response) is logged and skipped.
 *
 * <p>When auto fetch is disabled, one {@code Frontend} is created per configured entry, using
 * the query port and Flight SQL port from the configuration, or {@code -1} for each one that
 * is not set.
 *
 * @param config connector configuration to read the frontend settings from
 * @return the discovered frontends (auto fetch) or the configured frontends (no auto fetch)
 * @throws DorisException if auto fetch is enabled and no configured node returned a
 *         non-empty frontend list; the last failure, if any, is attached as the cause
 * @throws NumberFormatException if auto fetch is disabled and a configured port is not an
 *         integer
 * @throws Exception declared by the original signature and kept for callers
 */
private List<Frontend> initFrontends(DorisConfig config) throws Exception {
    String frontendNodes = config.getValue(DorisOptions.DORIS_FENODES);
    String[] frontendNodeArray = frontendNodes.split(",");
    if (config.getValue(DorisOptions.DORIS_FE_AUTO_FETCH)) {
        Exception ex = null;
        List<Frontend> frontendList = null;
        for (String rawNode : frontendNodeArray) {
            // Tolerate "fe1:8030, fe2:8030" style values.
            String frontendNode = rawNode.trim();
            String[] nodeDetails = frontendNode.split(":");
            try {
                List<Frontend> seed = Collections.singletonList(new Frontend(nodeDetails[0],
                        nodeDetails.length > 1 ? Integer.parseInt(nodeDetails[1]) : -1));
                List<Frontend> fetched = requestFrontends(seed, (frontend, client) -> {
                    String url = URLs.getFrontEndNodes(frontend.getHost(), frontend.getHttpPort(),
                            isHttpsEnabled);
                    HttpGet httpGet = new HttpGet(url);
                    HttpUtils.setAuth(httpGet, username, password);
                    JsonNode dataNode;
                    try {
                        HttpResponse response = client.execute(httpGet);
                        dataNode = extractDataFromResponse(response, url);
                    } catch (IOException e) {
                        throw new RuntimeException("fetch fe failed", e);
                    }
                    ArrayNode columnNames = (ArrayNode) dataNode.get("columnNames");
                    ArrayNode rows = (ArrayNode) dataNode.get("rows");
                    return parseFrontends(columnNames, rows);
                });
                if (fetched != null && !fetched.isEmpty()) {
                    // One successful answer is enough: stop here so the remaining nodes are
                    // not queried needlessly and cannot replace this result with an empty one.
                    frontendList = fetched;
                    break;
                }
            } catch (Exception e) {
                LOG.warn("fetch fe request on {} failed, err: {}", frontendNode, e.getMessage());
                // Remember the most recent failure so it can be reported as the cause below.
                ex = e;
            }
        }
        if (frontendList == null || frontendList.isEmpty()) {
            if (ex == null) {
                throw new DorisException("frontend init fetch failed, empty frontend list");
            }
            throw new DorisException("frontend init fetch failed", ex);
        }
        return frontendList;
    } else {
        // -1 marks a port that was not configured.
        int queryPort = config.contains(DorisOptions.DORIS_QUERY_PORT) ?
                config.getValue(DorisOptions.DORIS_QUERY_PORT) : -1;
        int flightSqlPort = config.contains(DorisOptions.DORIS_READ_FLIGHT_SQL_PORT) ?
                config.getValue(DorisOptions.DORIS_READ_FLIGHT_SQL_PORT) : -1;
        return Arrays.stream(frontendNodeArray)
                .map(node -> {
                    String[] nodeParts = node.trim().split(":");
                    int httpPort = nodeParts.length > 1 ? Integer.parseInt(nodeParts[1]) : -1;
                    return new Frontend(nodeParts[0], httpPort, queryPort, flightSqlPort);
                })
                .collect(Collectors.toList());
    }
}