public Subscriber call()

in mantis-runtime-autoscaler-api/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.java [264:418]


        public Subscriber<? super MetricData> call(final Subscriber<? super Object> child) {
            child.add(Schedulers.computation().createWorker().schedulePeriodically(
                    new Action0() {
                        @Override
                        public void call() {

                            List<Map<String, GaugeData>> listofAggregates = new ArrayList<>();

                            synchronized (workersMap) {
                                for (Map.Entry<Integer, WorkerMetrics> entry : workersMap.entrySet()) {
                                    // get the aggregate metric values by metric group per worker
                                    listofAggregates.add(metricAggregator.getAggregates(entry.getValue().getGaugesByMetricGrp()));
                                }
                            }
                            final int numWorkers = numStageWorkersFn.call(stage);
                            // get the aggregate metric values by metric group for all workers in stage
                            Map<String, GaugeData> allWorkerAggregates = getAggregates(listofAggregates);
                            logger.info("Job stage {} avgResUsage from {} workers: {}", stage, workersMap.size(), allWorkerAggregates.toString());
                            maybeEmitAutoscalerManagerEvent(numWorkers);

                            for (Map.Entry<String, Set<String>> userDefinedMetric : autoScaleMetricsConfig.getUserDefinedMetrics().entrySet()) {
                                final String metricGrp = userDefinedMetric.getKey();
                                for (String metric : userDefinedMetric.getValue()) {
                                    if (!allWorkerAggregates.containsKey(metricGrp) || !allWorkerAggregates.get(metricGrp).getGauges().containsKey(metric)) {
                                        logger.debug("no gauge data found for UserDefined (metric={})", userDefinedMetric);
                                    } else {
                                        jobAutoScaleObserver.onNext(
                                            new JobAutoScaler.Event(
                                                StageScalingPolicy.ScalingReason.UserDefined, stage,
                                                allWorkerAggregates.get(metricGrp).getGauges().get(metric),
                                                allWorkerAggregates.get(metricGrp).getGauges().get(metric),
                                                numWorkers));
                                    }
                                }
                            }
                            if (allWorkerAggregates.containsKey(KAFKA_CONSUMER_FETCH_MGR_METRIC_GROUP)) {
                                final Map<String, Double> gauges = allWorkerAggregates.get(KAFKA_CONSUMER_FETCH_MGR_METRIC_GROUP).getGauges();
                                if (gauges.containsKey(KAFKA_LAG)) {
                                    jobAutoScaleObserver.onNext(
                                            new JobAutoScaler.Event(
                                                StageScalingPolicy.ScalingReason.KafkaLag,
                                                stage,
                                                gauges.get(KAFKA_LAG),
                                                gauges.get(KAFKA_LAG),
                                                numWorkers));
                                }
                                if (gauges.containsKey(KAFKA_PROCESSED)) {
                                    jobAutoScaleObserver.onNext(
                                            new JobAutoScaler.Event(
                                                StageScalingPolicy.ScalingReason.KafkaProcessed,
                                                stage,
                                                gauges.get(KAFKA_PROCESSED),
                                                gauges.get(KAFKA_PROCESSED),
                                                numWorkers));
                                }
                            }
                            if (allWorkerAggregates.containsKey(RESOURCE_USAGE_METRIC_GROUP)) {
                                // cpuPctUsageCurr is Published as (cpuUsageCurr * 100.0) from ResourceUsagePayloadSetter, reverse transform to retrieve curr cpu usage
                                double cpuUsageCurr = allWorkerAggregates.get(RESOURCE_USAGE_METRIC_GROUP).getGauges().get(MetricStringConstants.CPU_PCT_USAGE_CURR) / 100.0;
                                double cpuUsageLimit = allWorkerAggregates.get(RESOURCE_USAGE_METRIC_GROUP).getGauges().get(MetricStringConstants.CPU_PCT_LIMIT) / 100.0;
                                double cpuUsageEffectiveValue = 100.0 * cpuUsageCurr / cpuUsageLimit;
                                jobAutoScaleObserver.onNext(
                                    new JobAutoScaler.Event(
                                        StageScalingPolicy.ScalingReason.CPU,
                                        stage,
                                        cpuUsageCurr,
                                        cpuUsageEffectiveValue,
                                        numWorkers));

                                double nwBytesUsageCurr = allWorkerAggregates.get(RESOURCE_USAGE_METRIC_GROUP).getGauges().get(MetricStringConstants.NW_BYTES_USAGE_CURR);
                                double nwBytesLimit = allWorkerAggregates.get(RESOURCE_USAGE_METRIC_GROUP).getGauges().get(MetricStringConstants.NW_BYTES_LIMIT);
                                double nwBytesEffectiveValue = 100.0 * nwBytesUsageCurr / nwBytesLimit;
                                jobAutoScaleObserver.onNext(
                                        new JobAutoScaler.Event(
                                            StageScalingPolicy.ScalingReason.Network,
                                            stage,
                                            nwBytesUsageCurr,
                                            nwBytesEffectiveValue,
                                            numWorkers));
                                // Divide by 1024 * 1024 to account for bytes to MB conversion.
                                // Making memory usage metric interchangeable with jvm memory usage metric since memory usage is not suitable for autoscaling in a JVM based system.
                                double memoryUsageInMB = allWorkerAggregates.get(RESOURCE_USAGE_METRIC_GROUP).getGauges().get("jvmMemoryUsedBytes") / (1024 * 1024);
                                double memoryLimitInMB = allWorkerAggregates.get(RESOURCE_USAGE_METRIC_GROUP).getGauges().get(MetricStringConstants.MEM_LIMIT);
                                double effectiveValue = 100.0 * memoryUsageInMB / memoryLimitInMB;
                                jobAutoScaleObserver.onNext(
                                    new JobAutoScaler.Event(
                                        StageScalingPolicy.ScalingReason.Memory,
                                        stage,
                                        memoryUsageInMB,
                                        effectiveValue,
                                        numWorkers));
                                jobAutoScaleObserver.onNext(
                                    new JobAutoScaler.Event(
                                        StageScalingPolicy.ScalingReason.JVMMemory,
                                        stage,
                                        memoryUsageInMB,
                                        effectiveValue,
                                        numWorkers));
                            }

                            if (allWorkerAggregates.containsKey(DATA_DROP_METRIC_GROUP)) {
                                final GaugeData gaugeData = allWorkerAggregates.get(DATA_DROP_METRIC_GROUP);
                                final Map<String, Double> gauges = gaugeData.getGauges();
                                if (gauges.containsKey(DROP_PERCENT)) {
                                    jobAutoScaleObserver.onNext(
                                        new JobAutoScaler.Event(
                                            StageScalingPolicy.ScalingReason.DataDrop, stage,
                                            gauges.get(DROP_PERCENT),
                                            gauges.get(DROP_PERCENT),
                                            numWorkers));
                                }
                            }

                            if (allWorkerAggregates.containsKey(WORKER_STAGE_INNER_INPUT)) {
                                final GaugeData gaugeData = allWorkerAggregates.get(WORKER_STAGE_INNER_INPUT);
                                final Map<String, Double> gauges = gaugeData.getGauges();
                                if (gauges.containsKey(ON_NEXT_GAUGE)) {
                                    // Divide by 6 to account for 6 second reset by Atlas on counter metric.
                                    jobAutoScaleObserver.onNext(
                                        new JobAutoScaler.Event(
                                            StageScalingPolicy.ScalingReason.RPS,
                                            stage,
                                            gauges.get(ON_NEXT_GAUGE) / 6.0,
                                            gauges.get(ON_NEXT_GAUGE) / 6.0,
                                            numWorkers));
                                }
                            }

                            addScalerEventForSourceJobDrops(numWorkers);
                        }
                    }, metricsIntervalSeconds, metricsIntervalSeconds, TimeUnit.SECONDS
            ));
            return new Subscriber<MetricData>() {
                @Override
                public void onCompleted() {
                    child.unsubscribe();
                }

                @Override
                public void onError(Throwable e) {
                    logger.error("Unexpected error: " + e.getMessage(), e);
                }

                @Override
                public void onNext(MetricData metricData) {
                    logger.debug("Got metric metricData for job " + jobId + " stage " + stage +
                            ", worker " + metricData.getWorkerNumber() + ": " + metricData);
                    if (jobId.equals(metricData.getJobId())) {
                        addDataPoint(metricData);
                    } else {
                        addSourceJobDataPoint(metricData);
                    }
                }
            };
        }