in streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobSnapshot.scala [44:82]
/**
 * Builds a [[JobSnapshot]] from the given identifiers and the optional status inputs.
 *
 * The snapshot's `evalState` is computed by `evalFinalJobState` from `crStatus` and
 * `jobStatus`; every other field is passed through unchanged.
 *
 * @param appId     application id stored on the snapshot
 * @param clusterNs cluster namespace stored on the snapshot
 * @param clusterId cluster id stored on the snapshot
 * @param crStatus  optional CR status, used for state evaluation and stored as-is
 * @param jobStatus optional job status, used for state evaluation and stored as-is
 * @return the assembled snapshot
 */
def eval(
    appId: Long,
    clusterNs: String,
    clusterId: String,
    crStatus: Option[FlinkCRStatus],
    jobStatus: Option[JobStatus]): JobSnapshot = {
  val finalState = evalFinalJobState(crStatus, jobStatus)
  JobSnapshot(
    appId = appId,
    clusterNs = clusterNs,
    clusterId = clusterId,
    evalState = finalState,
    crStatus = crStatus,
    jobStatus = jobStatus
  )
}
/**
 * Derives the final evaluated job state from the optional CR status and optional job status.
 *
 * Resolution rules:
 *  - neither status present: `LOST`;
 *  - only the job status present: `EvalJobState.of(job.state)`;
 *  - only the CR status present: mapped from `cr.evalState`
 *    (DEPLOYING / READY => INITIALIZING, FAILED => FAILED, SUSPENDED => SUSPENDED,
 *    DELETED => TERMINATED);
 *  - both present: the job status decides when `job.updatedTs >= cr.updatedTs`; otherwise
 *    the CR state decides, except that READY defers to the job state and DEPLOYING defers
 *    to the job state only when it is contained in `JobState.maybeDeploying`.
 *
 * @param crStatus  optional CR status
 * @param jobStatus optional job status
 * @return the evaluated job state
 */
private def evalFinalJobState(crStatus: Option[FlinkCRStatus], jobStatus: Option[JobStatus]): EvalJobState =
  (crStatus, jobStatus) match {
    case (None, None) =>
      EvalJobState.LOST

    case (None, Some(job)) =>
      EvalJobState.of(job.state)

    case (Some(cr), None) =>
      // No job status at all: the CR state alone determines the result.
      cr.evalState match {
        case EvalState.FAILED                      => EvalJobState.FAILED
        case EvalState.SUSPENDED                   => EvalJobState.SUSPENDED
        case EvalState.DELETED                     => EvalJobState.TERMINATED
        case EvalState.DEPLOYING | EvalState.READY => EvalJobState.INITIALIZING
      }

    // The job status timestamp is not behind the CR status timestamp: trust the job state.
    case (Some(cr), Some(job)) if job.updatedTs >= cr.updatedTs =>
      EvalJobState.of(job.state)

    case (Some(cr), Some(job)) =>
      // The CR status carries the larger timestamp, so it takes priority over the job state.
      cr.evalState match {
        case EvalState.READY =>
          EvalJobState.of(job.state)
        case EvalState.DEPLOYING if JobState.maybeDeploying.contains(job.state) =>
          EvalJobState.of(job.state)
        case EvalState.DEPLOYING =>
          EvalJobState.INITIALIZING
        case EvalState.FAILED =>
          EvalJobState.FAILED
        case EvalState.SUSPENDED =>
          EvalJobState.SUSPENDED
        case EvalState.DELETED =>
          EvalJobState.TERMINATED
      }
  }