in mantis-jm-akka/src/main/java/io/mantisrx/server/worker/jobmaster/akka/rules/ScalerControllerActor.java [147:255]
/**
 * Activates the rule carried by {@code activateScalerRequest}, replacing the currently active rule.
 *
 * <p>{@code activeRule} and {@code activeJobAutoScalerService} are updated synchronously on the calling
 * thread (presumably the actor's message handler). A {@link JobAutoScalerService} is created only when at
 * least one stage config of the new rule has a scaling policy. A rule that only specifies desire sizes
 * gets no service.
 *
 * <p>The slow work runs asynchronously on {@code executionContext}, in this order:
 * <ol>
 *   <li>Shut down the previous service, if any. A shutdown failure aborts the activation.</li>
 *   <li>Scale every stage that has a non-negative desire size. This is best effort: failures are
 *       logged and skipped.</li>
 *   <li>Start the new service, if one was created.</li>
 * </ol>
 * The asynchronous task only uses values captured before it is submitted; it never reads or writes
 * actor fields.
 *
 * @param activateScalerRequest request carrying the rule to activate; its rule is dereferenced without
 *                              a null check and so must be non-null
 */
private void startScalerService(CoordinatorActor.ActivateRuleRequest activateScalerRequest) {
    log.info("[EVENT ACTIVATE RULE] req is higher ranking from current: {}, activating rule: {}",
        Optional.ofNullable(this.activeRule).map(JobScalingRule::getRuleId).orElse("null"),
        activateScalerRequest.getRule().getRuleId());
    final JobAutoScalerService currentService = this.activeJobAutoScalerService;
    final JobScalingRule currentRule = this.activeRule;
    final String currentRuleId =
        Optional.ofNullable(currentRule).map(JobScalingRule::getRuleId).orElse("null");
    this.activeRule = activateScalerRequest.getRule();
    if (this.activeRule.getScalerConfig().getStageConfigMap().entrySet().stream()
        .anyMatch(entry ->
            entry.getValue() != null && entry.getValue().getScalingPolicy() != null)) {
        log.info("Creating Job Auto Scaler service for rule: {}", activateScalerRequest.getRule().getRuleId());
        this.activeJobAutoScalerService = this.jobScalerContext.getJobAutoScalerServiceFactory()
            .apply(this.jobScalerContext, activateScalerRequest.getRule());
    } else {
        // The rule only requests desire sizes without any scaling policy, so no service is needed.
        log.info("No Job Auto Scaler service required for rule: {}", activateScalerRequest.getRule().getRuleId());
        this.activeJobAutoScalerService = null;
    }
    final JobAutoScalerService newService = this.activeJobAutoScalerService;
    final JobScalingRule newRule = this.activeRule;
    final String newRuleId = newRule.getRuleId();
    log.info("closing current service {} for rule: {}, starting new service {} for rule {}",
        currentService,
        currentRuleId,
        newService,
        newRuleId);
    // Run the start sequence on the scaler executor. Everything the task needs is captured in the
    // final locals above. DO NOT read or mutate actor fields (activeRule, activeJobAutoScalerService)
    // inside this task, because the actor may change them concurrently while handling new messages.
    Future<String> startServiceFuture = future(() -> {
        // Shut down the previous service first. A shutdown failure is rethrown, which fails this
        // future and prevents the new service from being started.
        if (currentService != null) {
            log.info("Stopping current Job Auto Scaler service for rule: {}", currentRuleId);
            try {
                currentService.shutdown();
            } catch (Exception ex) {
                log.error("failed to stop current job auto scaler service", ex);
                throw new RuntimeException(ex);
            }
        }
        // Apply each stage's explicit desire size before starting the (optional) auto scaler.
        for (Map.Entry<String, JobScalingRule.StageScalerConfig> kv :
            newRule.getScalerConfig().getStageConfigMap().entrySet()) {
            final JobScalingRule.StageScalerConfig stageConfig = kv.getValue();
            if (stageConfig == null || stageConfig.getDesireSize() == null || stageConfig.getDesireSize() < 0) {
                log.info("No valid desire size for stage: {}, ignore", kv.getKey());
                continue;
            }
            final Integer desireSize = stageConfig.getDesireSize();
            log.info("Start scaling stage {} to desire size {}", kv.getKey(), desireSize);
            this.jobScalerContext.getMasterClientApi().scaleJobStage(
                    this.jobScalerContext.getJobId(),
                    Integer.parseInt(kv.getKey()),
                    desireSize,
                    "Desire size from scaling ruleID: " + newRuleId)
                .retryWhen(RuleUtils.LimitTenRetryLogic)
                .doOnCompleted(() -> log.info("Scaled stage {} to desire size {}", kv.getKey(), desireSize))
                .onErrorResumeNext(throwable -> {
                    // Best effort: log the failure with its cause and continue with the next stage.
                    log.error("{} Failed to scale stage {} to desire size {}",
                        this.jobScalerContext.getJobId(), kv.getKey(), desireSize, throwable);
                    return Observable.empty();
                })
                .toBlocking()
                // The error handler resumes with an empty stream. first() would then throw
                // NoSuchElementException and defeat the best-effort handling, so use firstOrDefault.
                .firstOrDefault(null);
            log.info("Finish scaling stage {} to desire size {}", kv.getKey(), desireSize);
        }
        if (newService == null) {
            log.info("[No Scaler Required] Job Auto Scaler service is null for rule: {}", newRuleId);
        } else {
            log.info("start activeJobAutoScalerService for {}", newRuleId);
            newService.start();
        }
        return newRuleId;
    }, executionContext);
    // Handle the outcome of the start sequence.
    // NOTE(review): this callback runs on the actor's dispatcher but not inside the actor's
    // message-processing context. The field writes below can therefore race with concurrent
    // message handling. The RuntimeException thrown here is presumably only reported via the
    // ExecutionContext, not to the actor's supervisor (verify). Piping the result to self()
    // and handling it as a message would make this safe, but that needs a new message type.
    startServiceFuture.onComplete(result -> {
        if (result.isSuccess()) {
            log.info("Job Auto Scaler started successfully for ruleID: {}", result.get());
        } else {
            final Throwable failure = result.failed().get();
            log.error("failed to setup job auto scaler service in rule: {}",
                newRule, failure);
            // Only reset if the failed rule is still the active one. The active rule may already
            // have been replaced or cleared (null) since this activation started.
            final JobScalingRule active = this.activeRule;
            if (active != null && active.getRuleId().equals(newRuleId)) {
                log.error("reset controller actor due to failed rule: {}", active.getRuleId());
                this.activeRule = null;
                this.activeJobAutoScalerService = null;
                throw new RuntimeException("failed to start job scaler", failure);
            } else {
                log.warn("Ignore non-active rule service start failure: {}, current rule: {}",
                    newRuleId,
                    Optional.ofNullable(active).map(JobScalingRule::getRuleId).orElse("null"));
            }
        }
        return null;
    }, getContext().dispatcher());
    this.activateCount.increment();
    try {
        this.activeRuleGauge.set(Long.parseLong(activateScalerRequest.getRule().getRuleId()));
    } catch (NumberFormatException e) {
        log.error("Unexpected non-number rule id: {}", activateScalerRequest.getRule().getRuleId(), e);
    }
    log.info("Activated scaler rule: {}", activateScalerRequest.getRule().getRuleId());
}