in internal/container/thread_container.go [68:144]
// Start runs the task bound to this container and reports the outcome through
// reportTaskStatus.
//
// Flow:
//  1. Register the job context on the container pool; a deferred cleanup removes
//     the pool entry and the context again when Start returns.
//  2. On the first attempt (TaskAttempt() == 0) report a running status.
//  3. Resolve the task in the task master pool using the "jobName" field of the
//     job content ("className" for jobs whose type is "java").
//  4. Call the task's Process method and report its result. An unregistered
//     task, a Process error, a nil result, or a panic is reported as failed.
//  5. A failed result of a task with TaskId() > 0 and MaxAttempt() > 0 is retried
//     by calling Start again after sleeping TaskAttemptInterval() seconds, until
//     TaskAttempt() reaches MaxAttempt().
//
// Result messages built from errors or panics are limited to
// constants.InstanceResultSizeMax bytes.
func (c *ThreadContainer) Start() {
	taskMasterPool := masterpool.GetTaskMasterPool()
	uniqueId := utils.GetUniqueId(c.jobCtx.JobId(), c.jobCtx.JobInstanceId(), c.jobCtx.TaskId())
	c.containerPool.SetContext(c.jobCtx)
	startTime := time.Now().UnixMilli()
	logger.Debugf("start run container, uniqueId=%v, cost=%vms, jobContext=%+v", uniqueId, startTime-c.jobCtx.ScheduleTime().UnixMilli(), c.jobCtx)

	// clip limits msg to constants.InstanceResultSizeMax bytes. When the cut
	// would land on a UTF-8 continuation byte it backs up (at most three bytes,
	// the longest possible tail of one encoded rune) so that a multi-byte
	// character is dropped whole instead of being split.
	clip := func(msg string) string {
		limit := constants.InstanceResultSizeMax
		if len(msg) <= limit {
			return msg
		}
		cut := limit
		for back := 0; back < 3 && cut > 0 && msg[cut]&0xC0 == 0x80; back++ {
			cut--
		}
		return msg[:cut]
	}

	defer func() {
		// clean containerPool
		c.containerPool.Remove(uniqueId)
		c.containerPool.RemoveContext()
		if e := recover(); e != nil {
			// Capture the stack once and reuse it for both the log and the report.
			stack := debug.Stack()
			logger.Errorf("Start run container panic, error=%v, stack=%s", e, stack)
			// The full stack is in the log above; the reported result is size
			// limited in the same way as the error path below.
			errMsg := clip(fmt.Sprintf("Process task panic, error=%v, stack=%s", e, stack))
			result := processor.NewProcessResult(processor.WithFailed(), processor.WithResult(errMsg))
			// Resolved here rather than reusing the outer variable, because the
			// panic may have happened before the outer one was assigned.
			workerAddr := c.actorCtx.ActorSystem().Address()
			c.reportTaskStatus(result, workerAddr)
		}
	}()

	result := processor.NewProcessResult(processor.WithFailed())
	workerAddr := c.actorCtx.ActorSystem().Address()
	var err error
	if c.jobCtx.TaskAttempt() == 0 {
		c.reportTaskStatus(processor.NewProcessResult(processor.WithStatus(processor.InstanceStatusRunning)), workerAddr)
	}

	jobName := gjson.Get(c.jobCtx.Content(), "jobName").String()
	// Compatible with the existing Java language configuration mechanism
	if c.jobCtx.JobType() == "java" {
		jobName = gjson.Get(c.jobCtx.Content(), "className").String()
	}

	task, ok := taskMasterPool.Tasks().Find(jobName)
	if !ok {
		// Report the name that was actually looked up, so the result shown to
		// the user matches the log line and the key that must be registered.
		retMsg := fmt.Sprintf("jobName=%s not found, maybe forgot to register it by the client", jobName)
		result = processor.NewProcessResult(processor.WithFailed(), processor.WithResult(retMsg))
		c.reportTaskStatus(result, workerAddr)
		logger.Errorf("Process task=%s failed, because it's unregistered. ", jobName)
		return
	}

	result, err = task.Process(c.jobCtx)
	if err != nil {
		result = processor.NewProcessResult(processor.WithFailed(), processor.WithResult(clip(err.Error())))
		c.reportTaskStatus(result, workerAddr)
		logger.Errorf("Process task=%s failed, uniqueId=%v, serialNum=%v, err=%s ", c.jobCtx.TaskName(), uniqueId, c.jobCtx.SerialNum(), err.Error())
		return
	}

	endTime := time.Now().UnixMilli()
	logger.Debugf("container run finished, uniqueId=%v, cost=%dms", uniqueId, endTime-startTime)
	if result == nil {
		result = processor.NewProcessResult(processor.WithFailed(), processor.WithResult("result can't be null"))
	}

	// If the execution of the map model subtask (non-root task) fails, would be retried
	if c.jobCtx.MaxAttempt() > 0 && c.jobCtx.TaskId() > 0 && result.Status() == processor.InstanceStatusFailed {
		if taskAttempt := c.jobCtx.TaskAttempt(); taskAttempt < c.jobCtx.MaxAttempt() {
			taskAttempt++
			time.Sleep(time.Duration(c.jobCtx.TaskAttemptInterval()) * time.Second)
			c.jobCtx.SetTaskAttempt(taskAttempt)
			// The retry is a nested call: it registers the context again and runs
			// its own deferred cleanup before this frame's cleanup runs.
			c.Start()
			// No need to return the current result status when retrying
			return
		}
	}
	c.reportTaskStatus(result, workerAddr)
}