in src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java [130:461]
public void construct() {
    // Shared JVM/CPU metric leaves. They feed the heap RCAs, the node-config collector
    // and the admission-control decider below.
    Metric heapUsed = new Heap_Used(EVALUATION_INTERVAL_SECONDS);
    Metric gcEvent = new GC_Collection_Event(EVALUATION_INTERVAL_SECONDS);
    Heap_Max heapMax = new Heap_Max(EVALUATION_INTERVAL_SECONDS);
    Metric gc_Collection_Time = new GC_Collection_Time(EVALUATION_INTERVAL_SECONDS);
    GC_Type gcType = new GC_Type(EVALUATION_INTERVAL_SECONDS);
    // NOTE(review): this aggregate is created with an evaluation interval of 1 rather than
    // EVALUATION_INTERVAL_SECONDS like the other metrics in this graph -- confirm this is
    // intentional.
    Metric cpuUtilizationGroupByOperation =
            new AggregateMetric(
                    1,
                    CPU_Utilization.NAME,
                    AggregateMetric.AggregateFunction.SUM,
                    MetricsDB.AVG,
                    AllMetrics.CommonDimension.OPERATION.toString());
    heapUsed.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS,
            RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
    gcEvent.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS,
            RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
    gcType.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS,
            RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
    heapMax.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS,
            RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
    gc_Collection_Time.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS,
            RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
    cpuUtilizationGroupByOperation.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS,
            RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
    addLeaf(heapUsed);
    addLeaf(gcEvent);
    addLeaf(gcType);
    addLeaf(heapMax);
    addLeaf(gc_Collection_Time);
    addLeaf(cpuUtilizationGroupByOperation);

    // The sections below run in the same order as when this method was monolithic.
    // Keep that order unless it has been confirmed that leaf/upstream registration
    // order does not matter to the graph.
    HighHeapUsageClusterRca highHeapUsageClusterRca =
            buildHighHeapUsageClusterRca(
                    heapUsed,
                    gcEvent,
                    heapMax,
                    gc_Collection_Time,
                    gcType,
                    cpuUtilizationGroupByOperation);
    HeapHealthDecider heapHealthDecider =
            buildHeapHealthDecider(heapUsed, gcEvent, heapMax, gcType, highHeapUsageClusterRca);
    QueueHealthDecider queueHealthDecider = buildQueueHealthDecider(highHeapUsageClusterRca);
    buildNodeConfigCollectors(heapMax);
    CacheHealthDecider cacheHealthDecider = buildCacheHealthDecider(highHeapUsageClusterRca);
    AdmissionControlDecider admissionControlDecider =
            buildAdmissionControlDecider(heapUsed, heapMax);
    constructShardResourceUsageGraph();
    constructResourceHeatMapGraph();

    // Collator - Collects actions from all deciders and aligns impact vectors
    Collator collator =
            new Collator(
                    queueHealthDecider,
                    cacheHealthDecider,
                    heapHealthDecider,
                    admissionControlDecider);
    collator.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_MASTER_NODE);
    collator.addAllUpstreams(
            Arrays.asList(
                    queueHealthDecider,
                    cacheHealthDecider,
                    heapHealthDecider,
                    admissionControlDecider));
    // Publisher - Executes decisions output from collator
    Publisher publisher = new Publisher(EVALUATION_INTERVAL_SECONDS, collator);
    publisher.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_MASTER_NODE);
    publisher.addAllUpstreams(Collections.singletonList(collator));
    // TODO: Refactor using DI to move out of construct method
    PluginControllerConfig pluginControllerConfig = new PluginControllerConfig();
    PluginController pluginController = new PluginController(pluginControllerConfig, publisher);
    pluginController.initPlugins();
}

/**
 * Builds the node-level heap RCAs (old gen and young gen) and the CPU RCA.
 *
 * <p>It then combines those three into the hot-JVM node RCA and builds the two cluster-level
 * RCAs on top of it: {@link HighHeapUsageClusterRca} and {@link HotNodeClusterRca}. It also
 * fetches the node-stats metrics via {@code constructNodeStatsMetrics()}; the old-gen RCA
 * uses them as additional upstreams.
 *
 * @return the heap-usage cluster RCA; the heap, queue and cache health deciders all consume it
 */
private HighHeapUsageClusterRca buildHighHeapUsageClusterRca(
        Metric heapUsed,
        Metric gcEvent,
        Heap_Max heapMax,
        Metric gc_Collection_Time,
        GC_Type gcType,
        Metric cpuUtilizationGroupByOperation) {
    // add node stats metrics
    List<Metric> nodeStatsMetrics = constructNodeStatsMetrics();
    Rca<ResourceFlowUnit<HotResourceSummary>> highHeapUsageOldGenRca =
            new HighHeapUsageOldGenRca(
                    RCA_PERIOD, heapUsed, gcEvent, heapMax, nodeStatsMetrics);
    highHeapUsageOldGenRca.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS,
            RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
    List<Node<?>> upstream = new ArrayList<>(Arrays.asList(heapUsed, gcEvent, heapMax));
    upstream.addAll(nodeStatsMetrics);
    highHeapUsageOldGenRca.addAllUpstreams(upstream);
    Rca<ResourceFlowUnit<HotResourceSummary>> highHeapUsageYoungGenRca =
            new HighHeapUsageYoungGenRca(
                    RCA_PERIOD, heapUsed, gc_Collection_Time, gcEvent, gcType);
    highHeapUsageYoungGenRca.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS,
            RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
    highHeapUsageYoungGenRca.addAllUpstreams(
            Arrays.asList(heapUsed, gc_Collection_Time, gcEvent, gcType));
    Rca<ResourceFlowUnit<HotResourceSummary>> highCpuRca =
            new HighCpuRca(RCA_PERIOD, cpuUtilizationGroupByOperation);
    highCpuRca.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS,
            RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
    highCpuRca.addAllUpstreams(Collections.singletonList(cpuUtilizationGroupByOperation));
    Rca<ResourceFlowUnit<HotNodeSummary>> hotJVMNodeRca =
            new HotNodeRca(
                    RCA_PERIOD, highHeapUsageOldGenRca, highHeapUsageYoungGenRca, highCpuRca);
    hotJVMNodeRca.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS,
            RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
    hotJVMNodeRca.addAllUpstreams(
            Arrays.asList(highHeapUsageOldGenRca, highHeapUsageYoungGenRca, highCpuRca));
    HighHeapUsageClusterRca highHeapUsageClusterRca =
            new HighHeapUsageClusterRca(RCA_PERIOD, hotJVMNodeRca);
    highHeapUsageClusterRca.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_MASTER_NODE);
    highHeapUsageClusterRca.addAllUpstreams(Collections.singletonList(hotJVMNodeRca));
    highHeapUsageClusterRca.addTag(
            RcaConsts.RcaTagConstants.TAG_AGGREGATE_UPSTREAM,
            RcaConsts.RcaTagConstants.LOCUS_DATA_NODE);
    // NOTE(review): unlike every other cluster-level RCA built in this graph, no
    // TAG_AGGREGATE_UPSTREAM tag is set on this node -- confirm this is intentional.
    HotNodeClusterRca hotNodeClusterRca = new HotNodeClusterRca(RCA_PERIOD, hotJVMNodeRca);
    hotNodeClusterRca.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_MASTER_NODE);
    hotNodeClusterRca.addAllUpstreams(Collections.singletonList(hotJVMNodeRca));
    return highHeapUsageClusterRca;
}

/**
 * Builds the old-gen RCA chain and the {@link HeapHealthDecider}.
 *
 * <p>The chain runs from occupancy and reclamation, through contention, to
 * {@link LargeHeapClusterRca}. The decider consumes both the heap-usage cluster RCA and
 * the large-heap cluster RCA.
 *
 * @return the heap health decider, which is wired into the collator
 */
private HeapHealthDecider buildHeapHealthDecider(
        Metric heapUsed,
        Metric gcEvent,
        Heap_Max heapMax,
        GC_Type gcType,
        HighHeapUsageClusterRca highHeapUsageClusterRca) {
    final HighOldGenOccupancyRca oldGenOccupancyRca =
            new HighOldGenOccupancyRca(heapMax, heapUsed, gcType);
    oldGenOccupancyRca.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS,
            RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
    oldGenOccupancyRca.addAllUpstreams(Arrays.asList(heapMax, heapUsed, gcType));
    final OldGenReclamationRca oldGenReclamationRca =
            new OldGenReclamationRca(heapUsed, heapMax, gcEvent, gcType);
    oldGenReclamationRca.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS,
            RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
    oldGenReclamationRca.addAllUpstreams(Arrays.asList(heapUsed, heapMax, gcEvent, gcType));
    final OldGenContendedRca oldGenContendedRca =
            new OldGenContendedRca(oldGenOccupancyRca, oldGenReclamationRca);
    oldGenContendedRca.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS,
            RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
    oldGenContendedRca.addAllUpstreams(Arrays.asList(oldGenOccupancyRca, oldGenReclamationRca));
    final LargeHeapClusterRca largeHeapClusterRca = new LargeHeapClusterRca(oldGenContendedRca);
    largeHeapClusterRca.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_MASTER_NODE);
    largeHeapClusterRca.addAllUpstreams(Collections.singletonList(oldGenContendedRca));
    largeHeapClusterRca.addTag(
            RcaConsts.RcaTagConstants.TAG_AGGREGATE_UPSTREAM,
            RcaConsts.RcaTagConstants.LOCUS_DATA_NODE);
    // Heap Health Decider
    HeapHealthDecider heapHealthDecider =
            new HeapHealthDecider(RCA_PERIOD, highHeapUsageClusterRca, largeHeapClusterRca);
    heapHealthDecider.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_MASTER_NODE);
    heapHealthDecider.addAllUpstreams(
            Arrays.asList(highHeapUsageClusterRca, largeHeapClusterRca));
    return heapHealthDecider;
}

/**
 * Builds the queue-rejection sub-graph and the {@link QueueHealthDecider}.
 *
 * <p>It registers the thread-pool rejected-requests leaf, then builds the node-level and
 * cluster-level queue rejection RCAs. The decider consumes the queue-rejection cluster RCA
 * together with the heap-usage cluster RCA.
 *
 * @return the queue health decider, which is wired into the collator
 */
private QueueHealthDecider buildQueueHealthDecider(
        HighHeapUsageClusterRca highHeapUsageClusterRca) {
    Metric threadpool_RejectedReqs = new ThreadPool_RejectedReqs(EVALUATION_INTERVAL_SECONDS);
    threadpool_RejectedReqs.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS,
            RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
    addLeaf(threadpool_RejectedReqs);
    // Node level queue rejection RCA
    QueueRejectionRca queueRejectionNodeRca =
            new QueueRejectionRca(RCA_PERIOD, threadpool_RejectedReqs);
    queueRejectionNodeRca.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS,
            RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
    queueRejectionNodeRca.addAllUpstreams(Collections.singletonList(threadpool_RejectedReqs));
    // Cluster level queue rejection RCA
    QueueRejectionClusterRca queueRejectionClusterRca =
            new QueueRejectionClusterRca(RCA_PERIOD, queueRejectionNodeRca);
    queueRejectionClusterRca.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_MASTER_NODE);
    queueRejectionClusterRca.addAllUpstreams(Collections.singletonList(queueRejectionNodeRca));
    queueRejectionClusterRca.addTag(
            RcaConsts.RcaTagConstants.TAG_AGGREGATE_UPSTREAM,
            RcaConsts.RcaTagConstants.LOCUS_DATA_NODE);
    // Queue Health Decider
    // NOTE(review): the literal 12 is also passed to CacheHealthDecider. Consider naming it
    // once its meaning is confirmed against the decider constructor.
    QueueHealthDecider queueHealthDecider =
            new QueueHealthDecider(
                    EVALUATION_INTERVAL_SECONDS,
                    12,
                    queueRejectionClusterRca,
                    highHeapUsageClusterRca);
    queueHealthDecider.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_MASTER_NODE);
    queueHealthDecider.addAllUpstreams(
            Arrays.asList(queueRejectionClusterRca, highHeapUsageClusterRca));
    return queueHealthDecider;
}

/**
 * Builds the node-config collectors.
 *
 * <p>It registers the thread-pool queue-capacity and cache-max-size leaves. It then builds
 * the node-level {@link NodeConfigCollector} over those leaves and the heap-max metric, plus
 * the cluster-level {@link NodeConfigClusterCollector} on top of it. Neither collector is
 * handed to any other node built in {@code construct()}.
 */
private void buildNodeConfigCollectors(Heap_Max heapMax) {
    ThreadPool_QueueCapacity queueCapacity = new ThreadPool_QueueCapacity();
    queueCapacity.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS,
            RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
    addLeaf(queueCapacity);
    Cache_Max_Size cacheMaxSize = new Cache_Max_Size(EVALUATION_INTERVAL_SECONDS);
    cacheMaxSize.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS,
            RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
    addLeaf(cacheMaxSize);
    NodeConfigCollector nodeConfigCollector =
            new NodeConfigCollector(RCA_PERIOD, queueCapacity, cacheMaxSize, heapMax);
    nodeConfigCollector.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS,
            RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
    nodeConfigCollector.addAllUpstreams(Arrays.asList(queueCapacity, cacheMaxSize, heapMax));
    NodeConfigClusterCollector nodeConfigClusterCollector =
            new NodeConfigClusterCollector(nodeConfigCollector);
    nodeConfigClusterCollector.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_MASTER_NODE);
    nodeConfigClusterCollector.addAllUpstreams(Collections.singletonList(nodeConfigCollector));
    nodeConfigClusterCollector.addTag(
            RcaConsts.RcaTagConstants.TAG_AGGREGATE_UPSTREAM,
            RcaConsts.RcaTagConstants.LOCUS_DATA_NODE);
}

/**
 * Builds the field-data cache and shard-request cache sub-graphs and the
 * {@link CacheHealthDecider}.
 *
 * <p>For each cache it registers the metric leaves and builds a node-level RCA and a
 * cluster-level RCA. The decider consumes both cache cluster RCAs together with the
 * heap-usage cluster RCA.
 *
 * @return the cache health decider, which is wired into the collator
 */
private CacheHealthDecider buildCacheHealthDecider(
        HighHeapUsageClusterRca highHeapUsageClusterRca) {
    // Field Data Cache RCA
    Metric fieldDataCacheEvictions = new Cache_FieldData_Eviction(EVALUATION_INTERVAL_SECONDS);
    fieldDataCacheEvictions.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS,
            RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
    addLeaf(fieldDataCacheEvictions);
    Metric fieldDataCacheSizeGroupByOperation =
            new AggregateMetric(
                    EVALUATION_INTERVAL_SECONDS,
                    Cache_FieldData_Size.NAME,
                    AggregateMetric.AggregateFunction.SUM,
                    MetricsDB.MAX,
                    AllMetrics.ShardStatsDerivedDimension.INDEX_NAME.toString());
    fieldDataCacheSizeGroupByOperation.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS,
            RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
    addLeaf(fieldDataCacheSizeGroupByOperation);
    FieldDataCacheRca fieldDataCacheNodeRca =
            new FieldDataCacheRca(
                    RCA_PERIOD, fieldDataCacheEvictions, fieldDataCacheSizeGroupByOperation);
    fieldDataCacheNodeRca.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS,
            RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
    fieldDataCacheNodeRca.addAllUpstreams(
            Arrays.asList(fieldDataCacheEvictions, fieldDataCacheSizeGroupByOperation));
    FieldDataCacheClusterRca fieldDataCacheClusterRca =
            new FieldDataCacheClusterRca(RCA_PERIOD, fieldDataCacheNodeRca);
    fieldDataCacheClusterRca.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_MASTER_NODE);
    fieldDataCacheClusterRca.addAllUpstreams(Collections.singletonList(fieldDataCacheNodeRca));
    fieldDataCacheClusterRca.addTag(
            RcaConsts.RcaTagConstants.TAG_AGGREGATE_UPSTREAM,
            RcaConsts.RcaTagConstants.LOCUS_DATA_NODE);
    // Shard Request Cache RCA
    Metric shardRequestCacheEvictions = new Cache_Request_Eviction(EVALUATION_INTERVAL_SECONDS);
    shardRequestCacheEvictions.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS,
            RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
    addLeaf(shardRequestCacheEvictions);
    Metric shardRequestHits = new Cache_Request_Hit(EVALUATION_INTERVAL_SECONDS);
    shardRequestHits.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS,
            RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
    addLeaf(shardRequestHits);
    Metric shardRequestCacheSizeGroupByOperation =
            new AggregateMetric(
                    EVALUATION_INTERVAL_SECONDS,
                    Cache_Request_Size.NAME,
                    AggregateMetric.AggregateFunction.SUM,
                    MetricsDB.MAX,
                    AllMetrics.ShardStatsDerivedDimension.INDEX_NAME.toString());
    shardRequestCacheSizeGroupByOperation.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS,
            RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
    addLeaf(shardRequestCacheSizeGroupByOperation);
    ShardRequestCacheRca shardRequestCacheNodeRca =
            new ShardRequestCacheRca(
                    RCA_PERIOD,
                    shardRequestCacheEvictions,
                    shardRequestHits,
                    shardRequestCacheSizeGroupByOperation);
    shardRequestCacheNodeRca.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS,
            RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
    shardRequestCacheNodeRca.addAllUpstreams(
            Arrays.asList(
                    shardRequestCacheEvictions,
                    shardRequestHits,
                    shardRequestCacheSizeGroupByOperation));
    ShardRequestCacheClusterRca shardRequestCacheClusterRca =
            new ShardRequestCacheClusterRca(RCA_PERIOD, shardRequestCacheNodeRca);
    shardRequestCacheClusterRca.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_MASTER_NODE);
    shardRequestCacheClusterRca.addAllUpstreams(
            Collections.singletonList(shardRequestCacheNodeRca));
    shardRequestCacheClusterRca.addTag(
            RcaConsts.RcaTagConstants.TAG_AGGREGATE_UPSTREAM,
            RcaConsts.RcaTagConstants.LOCUS_DATA_NODE);
    // Cache Health Decider
    CacheHealthDecider cacheHealthDecider =
            new CacheHealthDecider(
                    EVALUATION_INTERVAL_SECONDS,
                    12,
                    fieldDataCacheClusterRca,
                    shardRequestCacheClusterRca,
                    highHeapUsageClusterRca);
    cacheHealthDecider.addTag(
            RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_MASTER_NODE);
    cacheHealthDecider.addAllUpstreams(
            Arrays.asList(
                    fieldDataCacheClusterRca,
                    shardRequestCacheClusterRca,
                    highHeapUsageClusterRca));
    return cacheHealthDecider;
}