in src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java [56:106]
/**
 * Stores the node services and schedules the periodic circuit-breaker maintenance task.
 *
 * <p>The task runs with a fixed delay of {@code CB_TIME_INTERVAL} seconds on the
 * {@code GENERIC} thread pool and does two independent checks:
 * <ol>
 *   <li>On a data node whose native memory cache has the "capacity reached" flag set, it clears
 *       that local flag once the cache size (in KB) is at or below the configured unset
 *       percentage of the circuit breaker limit.</li>
 *   <li>On the elected master, while the k-NN circuit breaker is triggered, it requests the
 *       {@code StatNames.CACHE_CAPACITY_REACHED} stat from the nodes. It unsets the cluster-wide
 *       circuit breaker only when no responding node reports reaching capacity.</li>
 * </ol>
 *
 * @param threadPool     pool used to schedule the periodic task
 * @param clusterService used to inspect the local node and the cluster state
 * @param client         client used to execute the k-NN stats action
 */
public void initialize(ThreadPool threadPool, ClusterService clusterService, Client client) {
    this.threadPool = threadPool;
    this.clusterService = clusterService;
    this.client = client;
    NativeMemoryCacheManager nativeMemoryCacheManager = NativeMemoryCacheManager.getInstance();
    Runnable runnable = () -> {
        if (nativeMemoryCacheManager.isCacheCapacityReached() && clusterService.localNode().isDataNode()) {
            long currentSizeKiloBytes = nativeMemoryCacheManager.getCacheSizeInKilobytes();
            long circuitBreakerLimitSizeKiloBytes = KNNSettings.getCircuitBreakerLimit().getKb();
            // Divide by 100.0 (not 100) so the ratio stays fractional even if the setting is integral.
            double unsetPercentage = KNNSettings.getCircuitBreakerUnsetPercentage();
            long circuitBreakerUnsetSizeKiloBytes = (long) ((unsetPercentage / 100.0) * circuitBreakerLimitSizeKiloBytes);
            // Clear the local capacityReached flag once the cache has shrunk to the unset threshold.
            if (currentSizeKiloBytes <= circuitBreakerUnsetSizeKiloBytes) {
                nativeMemoryCacheManager.setCacheCapacityReached(false);
            }
        }
        // Leader node untriggers CB if all nodes have not reached their max capacity
        if (KNNSettings.isCircuitBreakerTriggered() && clusterService.state().nodes().isLocalNodeElectedMaster()) {
            String cacheCapacityReachedStat = StatNames.CACHE_CAPACITY_REACHED.getName();
            KNNStatsRequest knnStatsRequest = new KNNStatsRequest(KNNStatsConfig.KNN_STATS.keySet());
            knnStatsRequest.addStat(cacheCapacityReachedStat);
            knnStatsRequest.timeout(TimeValue.timeValueSeconds(10));
            try {
                KNNStatsResponse knnStatsResponse = client.execute(KNNStatsAction.INSTANCE, knnStatsRequest).get();
                // NOTE(review): presumably nodes that failed to respond are absent from getNodes();
                // consider checking the response's failures before unsetting the breaker.
                List<String> nodesAtMaxCapacity = new ArrayList<>();
                for (KNNStatsNodeResponse nodeResponse : knnStatsResponse.getNodes()) {
                    Object capacityReached = nodeResponse.getStatsMap().get(cacheCapacityReachedStat);
                    // Anything other than an explicit FALSE (including a missing value) counts as
                    // "at capacity", so the breaker is never released on incomplete data.
                    if (!Boolean.FALSE.equals(capacityReached)) {
                        nodesAtMaxCapacity.add(nodeResponse.getNode().getId());
                    }
                }
                if (!nodesAtMaxCapacity.isEmpty()) {
                    logger.info("[KNN] knn.circuit_breaker.triggered stays set. Nodes at max cache capacity: "
                        + String.join(",", nodesAtMaxCapacity) + ".");
                } else {
                    logger.info("[KNN] Cache capacity below " + KNNSettings.getCircuitBreakerUnsetPercentage()
                        + "% of the circuit breaker limit for all nodes."
                        + " Unsetting knn.circuit_breaker.triggered flag.");
                    KNNSettings.state().updateCircuitBreakerSettings(false);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt status so the owning pool can observe it.
                Thread.currentThread().interrupt();
                logger.error("[KNN] Interrupted while getting stats", e);
            } catch (Exception e) {
                logger.error("[KNN] Exception getting stats", e);
            }
        }
    };
    this.threadPool.scheduleWithFixedDelay(runnable, TimeValue.timeValueSeconds(CB_TIME_INTERVAL), ThreadPool.Names.GENERIC);
}