in mantis-runtime-autoscaler-api/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.java [264:418]
public Subscriber<? super MetricData> call(final Subscriber<? super Object> child) {
    // Schedule a periodic task that aggregates the gauges reported by every worker of
    // this stage and translates them into scaling events on jobAutoScaleObserver.
    // The returned Subscriber feeds incoming MetricData into the per-worker maps that
    // the periodic task reads.
    child.add(Schedulers.computation().createWorker().schedulePeriodically(
            new Action0() {
                @Override
                public void call() {
                    // Snapshot the per-worker aggregates under the lock; the heavier
                    // cross-worker aggregation below runs outside the synchronized block.
                    final List<Map<String, GaugeData>> perWorkerAggregates = new ArrayList<>();
                    synchronized (workersMap) {
                        for (Map.Entry<Integer, WorkerMetrics> entry : workersMap.entrySet()) {
                            // aggregate metric values by metric group for each worker
                            perWorkerAggregates.add(metricAggregator.getAggregates(entry.getValue().getGaugesByMetricGrp()));
                        }
                    }
                    final int numWorkers = numStageWorkersFn.call(stage);
                    // aggregate metric values by metric group across all workers of the stage
                    final Map<String, GaugeData> allWorkerAggregates = getAggregates(perWorkerAggregates);
                    logger.info("Job stage {} avgResUsage from {} workers: {}", stage, workersMap.size(), allWorkerAggregates);
                    maybeEmitAutoscalerManagerEvent(numWorkers);

                    // User-defined metrics: emit one event per configured (group, metric) pair
                    // that has gauge data.
                    for (Map.Entry<String, Set<String>> userDefinedMetric : autoScaleMetricsConfig.getUserDefinedMetrics().entrySet()) {
                        final String metricGrp = userDefinedMetric.getKey();
                        final GaugeData groupData = allWorkerAggregates.get(metricGrp);
                        for (String metric : userDefinedMetric.getValue()) {
                            final Double value = (groupData == null) ? null : groupData.getGauges().get(metric);
                            if (value == null) {
                                logger.debug("no gauge data found for UserDefined (metric={})", userDefinedMetric);
                            } else {
                                jobAutoScaleObserver.onNext(
                                        new JobAutoScaler.Event(
                                                StageScalingPolicy.ScalingReason.UserDefined, stage,
                                                value,
                                                value,
                                                numWorkers));
                            }
                        }
                    }

                    // Kafka consumer metrics (lag / processed rate), when present.
                    if (allWorkerAggregates.containsKey(KAFKA_CONSUMER_FETCH_MGR_METRIC_GROUP)) {
                        final Map<String, Double> gauges = allWorkerAggregates.get(KAFKA_CONSUMER_FETCH_MGR_METRIC_GROUP).getGauges();
                        if (gauges.containsKey(KAFKA_LAG)) {
                            jobAutoScaleObserver.onNext(
                                    new JobAutoScaler.Event(
                                            StageScalingPolicy.ScalingReason.KafkaLag,
                                            stage,
                                            gauges.get(KAFKA_LAG),
                                            gauges.get(KAFKA_LAG),
                                            numWorkers));
                        }
                        if (gauges.containsKey(KAFKA_PROCESSED)) {
                            jobAutoScaleObserver.onNext(
                                    new JobAutoScaler.Event(
                                            StageScalingPolicy.ScalingReason.KafkaProcessed,
                                            stage,
                                            gauges.get(KAFKA_PROCESSED),
                                            gauges.get(KAFKA_PROCESSED),
                                            numWorkers));
                        }
                    }

                    // Resource usage (CPU / network / memory). Guard each gauge individually:
                    // a missing key would otherwise NPE on auto-unboxing and abort this run
                    // of the periodic task.
                    if (allWorkerAggregates.containsKey(RESOURCE_USAGE_METRIC_GROUP)) {
                        final Map<String, Double> resUsage = allWorkerAggregates.get(RESOURCE_USAGE_METRIC_GROUP).getGauges();

                        final Double cpuPctUsageCurr = resUsage.get(MetricStringConstants.CPU_PCT_USAGE_CURR);
                        final Double cpuPctLimit = resUsage.get(MetricStringConstants.CPU_PCT_LIMIT);
                        if (cpuPctUsageCurr != null && cpuPctLimit != null) {
                            // cpuPctUsageCurr is published as (cpuUsageCurr * 100.0) by
                            // ResourceUsagePayloadSetter; reverse the transform to recover
                            // the raw cpu usage.
                            double cpuUsageCurr = cpuPctUsageCurr / 100.0;
                            double cpuUsageLimit = cpuPctLimit / 100.0;
                            double cpuUsageEffectiveValue = 100.0 * cpuUsageCurr / cpuUsageLimit;
                            jobAutoScaleObserver.onNext(
                                    new JobAutoScaler.Event(
                                            StageScalingPolicy.ScalingReason.CPU,
                                            stage,
                                            cpuUsageCurr,
                                            cpuUsageEffectiveValue,
                                            numWorkers));
                        }

                        final Double nwBytesUsageCurr = resUsage.get(MetricStringConstants.NW_BYTES_USAGE_CURR);
                        final Double nwBytesLimit = resUsage.get(MetricStringConstants.NW_BYTES_LIMIT);
                        if (nwBytesUsageCurr != null && nwBytesLimit != null) {
                            double nwBytesEffectiveValue = 100.0 * nwBytesUsageCurr / nwBytesLimit;
                            jobAutoScaleObserver.onNext(
                                    new JobAutoScaler.Event(
                                            StageScalingPolicy.ScalingReason.Network,
                                            stage,
                                            nwBytesUsageCurr,
                                            nwBytesEffectiveValue,
                                            numWorkers));
                        }

                        final Double jvmMemoryUsedBytes = resUsage.get("jvmMemoryUsedBytes");
                        final Double memoryLimitInMB = resUsage.get(MetricStringConstants.MEM_LIMIT);
                        if (jvmMemoryUsedBytes != null && memoryLimitInMB != null) {
                            // Divide by 1024 * 1024 to account for bytes to MB conversion.
                            // Memory usage is made interchangeable with jvm memory usage since
                            // raw memory usage is not suitable for autoscaling a JVM based
                            // system; both Memory and JVMMemory events carry the same values.
                            double memoryUsageInMB = jvmMemoryUsedBytes / (1024 * 1024);
                            double effectiveValue = 100.0 * memoryUsageInMB / memoryLimitInMB;
                            jobAutoScaleObserver.onNext(
                                    new JobAutoScaler.Event(
                                            StageScalingPolicy.ScalingReason.Memory,
                                            stage,
                                            memoryUsageInMB,
                                            effectiveValue,
                                            numWorkers));
                            jobAutoScaleObserver.onNext(
                                    new JobAutoScaler.Event(
                                            StageScalingPolicy.ScalingReason.JVMMemory,
                                            stage,
                                            memoryUsageInMB,
                                            effectiveValue,
                                            numWorkers));
                        }
                    }

                    // Data-drop percentage, when present.
                    if (allWorkerAggregates.containsKey(DATA_DROP_METRIC_GROUP)) {
                        final Map<String, Double> gauges = allWorkerAggregates.get(DATA_DROP_METRIC_GROUP).getGauges();
                        if (gauges.containsKey(DROP_PERCENT)) {
                            jobAutoScaleObserver.onNext(
                                    new JobAutoScaler.Event(
                                            StageScalingPolicy.ScalingReason.DataDrop, stage,
                                            gauges.get(DROP_PERCENT),
                                            gauges.get(DROP_PERCENT),
                                            numWorkers));
                        }
                    }

                    // Stage inner-input rate (RPS), when present.
                    if (allWorkerAggregates.containsKey(WORKER_STAGE_INNER_INPUT)) {
                        final Map<String, Double> gauges = allWorkerAggregates.get(WORKER_STAGE_INNER_INPUT).getGauges();
                        if (gauges.containsKey(ON_NEXT_GAUGE)) {
                            // Divide by 6 to account for 6 second reset by Atlas on counter metric.
                            jobAutoScaleObserver.onNext(
                                    new JobAutoScaler.Event(
                                            StageScalingPolicy.ScalingReason.RPS,
                                            stage,
                                            gauges.get(ON_NEXT_GAUGE) / 6.0,
                                            gauges.get(ON_NEXT_GAUGE) / 6.0,
                                            numWorkers));
                        }
                    }
                    addScalerEventForSourceJobDrops(numWorkers);
                }
            }, metricsIntervalSeconds, metricsIntervalSeconds, TimeUnit.SECONDS
    ));
    return new Subscriber<MetricData>() {
        @Override
        public void onCompleted() {
            child.unsubscribe();
        }

        @Override
        public void onError(Throwable e) {
            // NOTE(review): errors are logged but not propagated to child, so the
            // downstream subscription stays active — confirm this is intentional.
            logger.error("Unexpected error: {}", e.getMessage(), e);
        }

        @Override
        public void onNext(MetricData metricData) {
            logger.debug("Got metric metricData for job {} stage {}, worker {}: {}",
                    jobId, stage, metricData.getWorkerNumber(), metricData);
            // Metrics tagged with our own jobId belong to this job's workers; anything
            // else is treated as source-job metric data.
            if (jobId.equals(metricData.getJobId())) {
                addDataPoint(metricData);
            } else {
                addSourceJobDataPoint(metricData);
            }
        }
    };
}