in src/main/java/org/opensearch/performanceanalyzer/collectors/MasterServiceEventMetrics.java [98:181]
/**
 * Samples the master service's current queue and, when the task at the head of the resulting
 * pending list differs from the one seen on the previous sample, emits a start-event metric
 * entry for it.
 *
 * <p>Flow, as visible in this method:
 *
 * <ul>
 *   <li>Returns immediately if the cluster service or its master service is not available.
 *   <li>If the current queue is {@code null} or empty, or the pending list built from it is
 *       empty, only {@code generateFinishMetrics(startTime)} is invoked.
 *   <li>Otherwise, if the head entry's insertion order differs from {@code
 *       lastTaskInsertionOrder}, {@code generateFinishMetrics(startTime)} is invoked first and
 *       then a start record (priority, start time, type, metadata, queue time) is written via
 *       {@code saveMetricValues}.
 * </ul>
 *
 * <p>Any exception is swallowed here: it bumps the {@code MASTER_METRICS_ERROR} writer stat and
 * is logged at debug level, so a failed sample never propagates to the caller.
 *
 * @param startTime timestamp of the current sampling round; the reported task start time is
 *     derived from it (see the inline note below)
 */
public void collectMetrics(long startTime) {
    try {
        final boolean masterServiceAvailable =
                OpenSearchResources.INSTANCE.getClusterService() != null
                        && OpenSearchResources.INSTANCE.getClusterService().getMasterService()
                                != null;
        if (!masterServiceAvailable) {
            return;
        }

        // Start every sample from an empty buffer.
        value.setLength(0);

        final Queue<Runnable> runningTasks = getMasterServiceCurrentQueue();
        if (runningTasks == null || runningTasks.isEmpty()) {
            generateFinishMetrics(startTime);
            return;
        }

        // Populate pendingTasks reflectively from a snapshot copy of the queue.
        // NOTE(review): the third argument is presumably the executor's "executing" flag;
        // confirm against the reflected method's signature.
        final List<PrioritizedOpenSearchThreadPoolExecutor.Pending> pendingTasks =
                new ArrayList<>();
        final Object[] addPendingArgs = new Object[TPEXECUTOR_ADD_PENDING_PARAM_COUNT];
        addPendingArgs[0] = new ArrayList<>(runningTasks);
        addPendingArgs[1] = pendingTasks;
        addPendingArgs[2] = true;
        getPrioritizedTPExecutorAddPendingMethod()
                .invoke(prioritizedOpenSearchThreadPoolExecutor, addPendingArgs);

        if (pendingTasks.isEmpty()) {
            generateFinishMetrics(startTime);
        } else {
            final PrioritizedOpenSearchThreadPoolExecutor.Pending head = pendingTasks.get(0);
            // Same insertion order as the last sample: nothing new to report.
            if (lastTaskInsertionOrder != head.insertionOrder) {
                generateFinishMetrics(startTime);

                final SourcePrioritizedRunnable headTask =
                        (SourcePrioritizedRunnable) head.task;
                lastTaskInsertionOrder = head.insertionOrder;

                // The source string is split at its first space: the prefix is reported as
                // the task type and the remainder (leading space included) as its metadata.
                final String source = headTask.source();
                final int splitAt = source.indexOf(" ");
                final String taskType;
                final String taskMetadata;
                if (splitAt == -1) {
                    taskType = source;
                    taskMetadata = "";
                } else {
                    taskType = source.substring(0, splitAt);
                    taskMetadata = source.substring(splitAt);
                }

                value.append(PerformanceAnalyzerMetrics.getCurrentTimeMetric());
                PerformanceAnalyzerMetrics.addMetricEntry(
                        value,
                        MasterMetricDimensions.MASTER_TASK_PRIORITY.toString(),
                        head.priority.toString());
                // Because this is sampling, the exact start time of the current task is
                // unknown; the midpoint of the previous time bucket is recorded instead.
                PerformanceAnalyzerMetrics.addMetricEntry(
                        value,
                        MasterMetricValues.START_TIME.toString(),
                        startTime - SAMPLING_TIME_INTERVAL / 2);
                PerformanceAnalyzerMetrics.addMetricEntry(
                        value, MasterMetricDimensions.MASTER_TASK_TYPE.toString(), taskType);
                PerformanceAnalyzerMetrics.addMetricEntry(
                        value,
                        MasterMetricDimensions.MASTER_TASK_METADATA.toString(),
                        taskMetadata);
                PerformanceAnalyzerMetrics.addMetricEntry(
                        value,
                        MasterMetricDimensions.MASTER_TASK_QUEUE_TIME.toString(),
                        headTask.getAgeInMillis());

                saveMetricValues(
                        value.toString(),
                        startTime,
                        String.valueOf(getMasterThreadId()),
                        String.valueOf(lastTaskInsertionOrder),
                        PerformanceAnalyzerMetrics.START_FILE_NAME);
                value.setLength(0);
            }
        }

        LOG.debug(() -> "Successfully collected Master Event Metrics.");
    } catch (Exception e) {
        // Best-effort collection: count the failure and log it, but never rethrow.
        PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
                WriterMetrics.MASTER_METRICS_ERROR, "", 1);
        LOG.debug(
                "Exception in Collecting Master Metrics: {} for startTime {} with ExceptionCode: {}",
                () -> e.toString(),
                () -> startTime,
                () -> StatExceptionCode.MASTER_METRICS_ERROR.toString());
    }
}