in src/main/java/org/opensearch/performanceanalyzer/rca/RcaController.java [197:283]
private void start() {
try {
Objects.requireNonNull(subscriptionManager);
Objects.requireNonNull(rcaConf);
if (dbProvider == null) {
return;
}
subscriptionManager.setCurrentLocus(rcaConf.getTagMap().get("locus"));
this.connectedComponents = getRcaGraphComponents(rcaConf);
// Mute the rca nodes after the graph creation and before the scheduler start
readAndUpdateMutedComponentsDuringStart();
ThresholdMain thresholdMain = new ThresholdMain(RcaConsts.THRESHOLDS_PATH, rcaConf);
persistenceProvider = PersistenceFactory.create(rcaConf);
networkThreadPoolReference.set(
RcaControllerHelper.buildNetworkThreadPool(rcaConf.getNetworkQueueLength()));
addRcaRequestHandler();
queryRcaRequestHandler.setPersistable(persistenceProvider);
addActionsRequestHandler();
queryActionRequestHandler.setPersistable(persistenceProvider);
receivedFlowUnitStore = new ReceivedFlowUnitStore(rcaConf.getPerVertexBufferLength());
WireHopper net =
new WireHopper(
nodeStateManager,
rcaNetClient,
subscriptionManager,
networkThreadPoolReference,
receivedFlowUnitStore,
appContext);
// RcaScheduler should be started with a snapshot of the AppContext as RcaController
// monitors it for stale state and always restarts the scheduler if it finds its state
// stale.
AppContext copyAppContext = new AppContext(this.appContext);
this.rcaScheduler =
new RCAScheduler(
connectedComponents,
dbProvider,
rcaConf,
thresholdMain,
persistenceProvider,
net,
copyAppContext);
rcaNetServer.setSendDataHandler(
new PublishRequestHandler(
nodeStateManager, receivedFlowUnitStore, networkThreadPoolReference));
rcaNetServer.setSubscribeHandler(
new SubscribeServerHandler(subscriptionManager, networkThreadPoolReference));
Thread rcaSchedulerThread =
threadProvider.createThreadForRunnable(
() -> rcaScheduler.start(),
PerformanceAnalyzerThreads.RCA_SCHEDULER,
copyAppContext.getMyInstanceDetails().getInstanceId().toString());
CountDownLatch schedulerStartLatch = new CountDownLatch(1);
rcaScheduler.setSchedulerTrackingLatch(schedulerStartLatch);
rcaSchedulerThread.start();
if (!schedulerStartLatch.await(WAIT_FOR_SCHED_START_SECS, TimeUnit.SECONDS)) {
LOG.error("Failed to start RcaScheduler.");
throw new IllegalStateException(
"Failed to start RcaScheduler within "
+ WAIT_FOR_SCHED_START_SECS
+ " seconds.");
}
if (rcaScheduler.getState() != RcaSchedulerState.STATE_STARTED) {
LOG.error(
"RCA scheduler didn't start within {} seconds", WAIT_FOR_SCHED_START_SECS);
}
} catch (ClassNotFoundException
| NoSuchMethodException
| InvocationTargetException
| InstantiationException
| IllegalAccessException
| MalformedConfig
| SQLException
| IOException e) {
LOG.error("Couldn't build connected components or persistable..", e);
} catch (Exception ex) {
LOG.error("Couldn't start RcaController", ex);
}
}