in flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java [110:184]
public boolean scale(FlinkResourceContext<?> ctx) {
var conf = ctx.getObserveConfig();
var resource = ctx.getResource();
var resourceId = ResourceID.fromResource(resource);
var flinkMetrics = getOrInitAutoscalerFlinkMetrics(ctx, resourceId);
Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics = null;
try {
if (resource.getSpec().getJob() == null || !conf.getBoolean(AUTOSCALER_ENABLED)) {
LOG.debug("Job autoscaler is disabled");
return false;
}
// Initialize metrics only if autoscaler is enabled
var status = resource.getStatus();
if (status.getLifecycleState() != ResourceLifecycleState.STABLE
|| !status.getJobStatus().getState().equals(JobStatus.RUNNING.name())) {
LOG.info("Job autoscaler is waiting for RUNNING job state");
lastEvaluatedMetrics.remove(resourceId);
return false;
}
var autoScalerInfo = infoManager.getOrCreateInfo(resource);
var collectedMetrics =
metricsCollector.updateMetrics(
resource, autoScalerInfo, ctx.getFlinkService(), conf);
if (collectedMetrics.getMetricHistory().isEmpty()) {
autoScalerInfo.replaceInKubernetes(kubernetesClient);
return false;
}
LOG.debug("Collected metrics: {}", collectedMetrics);
evaluatedMetrics = evaluator.evaluate(conf, collectedMetrics);
LOG.debug("Evaluated metrics: {}", evaluatedMetrics);
initRecommendedParallelism(evaluatedMetrics);
if (!collectedMetrics.isFullyCollected()) {
// We have done an upfront evaluation, but we are not ready for scaling.
resetRecommendedParallelism(evaluatedMetrics);
autoScalerInfo.replaceInKubernetes(kubernetesClient);
return false;
}
var specAdjusted =
scalingExecutor.scaleResource(resource, autoScalerInfo, conf, evaluatedMetrics);
if (specAdjusted) {
flinkMetrics.numScalings.inc();
} else {
flinkMetrics.numBalanced.inc();
}
autoScalerInfo.replaceInKubernetes(kubernetesClient);
return specAdjusted;
} catch (Throwable e) {
LOG.error("Error while scaling resource", e);
flinkMetrics.numErrors.inc();
eventRecorder.triggerEvent(
resource,
EventRecorder.Type.Warning,
EventRecorder.Reason.AutoscalerError,
EventRecorder.Component.Operator,
e.getMessage());
return false;
} finally {
if (evaluatedMetrics != null) {
lastEvaluatedMetrics.put(resourceId, evaluatedMetrics);
flinkMetrics.registerScalingMetrics(() -> lastEvaluatedMetrics.get(resourceId));
}
}
}