in mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobClusterActor.java [720:854]
/**
 * Handles a request to initialize this job cluster actor.
 *
 * <p>Builds the cluster metadata from the request, creates the SLA enforcer and loads any
 * persisted scaler rules (falling back to the current rules if the load fails). After that:
 * <ul>
 *   <li>If the cluster is disabled: initializes the job manager, marks up to 50 non-terminal
 *       jobs from the request as completed and archives them, replies with SUCCESS and
 *       switches the actor to {@code disabledBehavior}.</li>
 *   <li>Otherwise: optionally persists the new cluster (when {@code initReq.createInStore} is
 *       set), creates the cron manager, hands off to {@code initRunningJobs} and initializes
 *       the job manager.</li>
 * </ul>
 *
 * <p>Every failure path that replies to the sender with an error returns immediately, so the
 * sender does not also receive a SUCCESS reply from this method for the same request.
 *
 * @param initReq the initialization request carrying the cluster definition, disabled flag,
 *                last job number, known jobs and requestor details
 */
public void onJobClusterInitialize(JobClusterProto.InitializeJobClusterRequest initReq) {
    // Capture the sender once so every reply below goes to the same actor.
    ActorRef sender = getSender();
    logger.info("In onJobClusterInitialize {}", this.name);
    if (logger.isDebugEnabled()) {
        logger.debug("Init Request {}", initReq);
    }
    jobClusterMetadata = new JobClusterMetadataImpl.Builder()
        .withLastJobCount(initReq.lastJobNumber)
        .withIsDisabled(initReq.isDisabled)
        .withJobClusterDefinition(initReq.jobClusterDefinition)
        .build();
    // create sla enforcer
    slaEnforcer = new SLAEnforcer(jobClusterMetadata.getJobClusterDefinition().getSLA());
    // Only used for the informational log lines below.
    long expireFrequency = ConfigurationProvider.getConfig().getCompletedJobPurgeFrequencySeqs();
    String jobClusterName = jobClusterMetadata.getJobClusterDefinition().getName();

    // init scaler rules; a load failure is logged and the existing rules are kept
    try {
        IJobClusterScalerRuleData scalerRuleData = jobStore.getJobClusterScalerData(jobClusterName);
        if (scalerRuleData != null) {
            this.jobClusterScalerRuleData = scalerRuleData;
        }
    } catch (IOException iex) {
        logger.error("Failed to load job cluster: {} scaler rules. Fall back to empty rules", this.name, iex);
    }

    // If cluster is disabled
    if (jobClusterMetadata.isDisabled()) {
        logger.info("Cluster {} initialized but is Disabled", jobClusterName);
        try {
            jobManager.initialize();
        } catch (Exception e) {
            logger.error("Job manager initialization failed for disabled cluster {}", jobClusterName, e);
            sender.tell(new JobClusterProto.InitializeJobClusterResponse(initReq.requestId, CLIENT_ERROR,
                String.format("JobCluster %s initialization failed",
                    initReq.jobClusterDefinition.getName()),
                initReq.jobClusterDefinition.getName(), initReq.requestor), getSelf());
            // Stop here: falling through would send a second (SUCCESS) response for the
            // same request and switch to disabledBehavior despite the failure.
            return;
        }

        // Upper bound on how many lingering jobs are cleaned up during a single init.
        int count = 50;
        if (!initReq.jobList.isEmpty()) {
            logger.info("Cluster {} is disabled however it has {} active/accepted jobs",
                jobClusterName, initReq.jobList.size());
            for (IMantisJobMetadata jobMeta : initReq.jobList) {
                try {
                    if (count == 0) {
                        logger.info("Max cleanup limit of 50 reached abort");
                        break;
                    }
                    if (!JobState.isTerminalState(jobMeta.getState())) {
                        logger.info("Job {} is in non terminal state {} for disabled cluster {}."
                            + "Marking it complete", jobMeta.getJobId(), jobMeta.getState(),
                            jobClusterName);
                        // The budget is consumed before the attempt, so failed attempts count too.
                        count--;
                        jobManager.markCompleted(jobMeta);
                        jobStore.archiveJob(jobMeta);
                    }
                } catch (Exception e) {
                    // Best effort: a failure on one job must not stop cleanup of the others.
                    logger.error("Exception {} archiving job {} during init ", e.getMessage(), jobMeta.getJobId(), e);
                }
            }
        }
        sender.tell(new JobClusterProto.InitializeJobClusterResponse(initReq.requestId, SUCCESS,
            String.format("JobCluster %s initialized successfully. But is currently disabled",
                initReq.jobClusterDefinition.getName()), initReq.jobClusterDefinition.getName(),
            initReq.requestor), getSelf());
        logger.info("Job expiry check frequency set to {}", expireFrequency);
        getContext().become(disabledBehavior);
        return;
    }

    // new cluster initialization
    if (initReq.createInStore) {
        try {
            jobStore.createJobCluster(jobClusterMetadata);
            // TODO (p1): support scale rules creation during job cluster creation.
            eventPublisher.publishAuditEvent(
                new LifecycleEventsProto.AuditEvent(
                    LifecycleEventsProto.AuditEvent.AuditEventType.JOB_CLUSTER_CREATE,
                    jobClusterName,
                    "saved job cluster " + name)
            );
            logger.info("successfully saved job cluster {}", name);
            numJobClustersInitialized.increment();
        } catch (final JobClusterAlreadyExistsException exists) {
            numJobClusterInitializeFailures.increment();
            logger.error("job cluster not created");
            sender.tell(new JobClusterProto.InitializeJobClusterResponse(initReq.requestId, CLIENT_ERROR,
                String.format("JobCluster %s already exists",
                    initReq.jobClusterDefinition.getName()),
                initReq.jobClusterDefinition.getName(), initReq.requestor), getSelf());
            // TODO: handle case when job cluster exists in store but Job cluster actor is not running
            return;
        } catch (final Exception e) {
            numJobClusterInitializeFailures.increment();
            logger.error("job cluster not created due to {}", e.getMessage(), e);
            sender.tell(new JobClusterProto.InitializeJobClusterResponse(initReq.requestId,
                SERVER_ERROR, String.format("JobCluster %s not created due to %s",
                    initReq.jobClusterDefinition.getName(), Throwables.getStackTraceAsString(e)),
                initReq.jobClusterDefinition.getName(), initReq.requestor), getSelf());
            // TODO: send PoisonPill to self if job cluster was not created ? Return'ing for now,
            //  so we don't send back 2 InitJobClusterResponses
            return;
        }
    }

    try {
        cronManager = new CronManager(name, getSelf(), jobClusterMetadata.getJobClusterDefinition().getSLA());
    } catch (Exception e) {
        logger.warn("Exception initializing cron", e);
        // NOTE(review): this path replies with a CreateJobClusterResponse while every other
        // path replies with an InitializeJobClusterResponse - confirm the sender handles both.
        sender.tell(new JobClusterManagerProto.CreateJobClusterResponse(
            initReq.requestId, e instanceof SchedulerException ? CLIENT_ERROR : SERVER_ERROR,
            "Job Cluster " + jobClusterName + " could not be created due to cron initialization error" + e.getMessage(),
            jobClusterName), getSelf());
        return;
    }

    // NOTE(review): initRunningJobs receives the sender and presumably replies to it once
    // the running jobs are initialized - confirm against its implementation.
    initRunningJobs(initReq, sender);
    logger.info("Job expiry check frequency set to {}", expireFrequency);
    try {
        jobManager.initialize();
    } catch (Exception e) {
        logger.error("Job manager initialization failed for cluster {}", jobClusterName, e);
        sender.tell(new JobClusterProto.InitializeJobClusterResponse(initReq.requestId, CLIENT_ERROR,
            String.format("JobCluster %s initialization failed",
                initReq.jobClusterDefinition.getName()),
            initReq.jobClusterDefinition.getName(), initReq.requestor), getSelf());
    }
}