func()

in internal/container/thread_container.go [68:144]


func (c *ThreadContainer) Start() {
	taskMasterPool := masterpool.GetTaskMasterPool()
	uniqueId := utils.GetUniqueId(c.jobCtx.JobId(), c.jobCtx.JobInstanceId(), c.jobCtx.TaskId())
	c.containerPool.SetContext(c.jobCtx)

	startTime := time.Now().UnixMilli()
	logger.Debugf("start run container, uniqueId=%v, cost=%vms, jobContext=%+v", uniqueId, startTime-c.jobCtx.ScheduleTime().UnixMilli(), c.jobCtx)

	defer func() {
		// clean containerPool
		c.containerPool.Remove(uniqueId)
		c.containerPool.RemoveContext()

		if e := recover(); e != nil {
			logger.Errorf("Start run container panic, error=%v, stack=%s", e, debug.Stack())
			errMsg := fmt.Sprintf("Process task panic, error=%v, stack=%s", e, debug.Stack())
			result := processor.NewProcessResult(processor.WithFailed(), processor.WithResult(errMsg))
			workerAddr := c.actorCtx.ActorSystem().Address()
			c.reportTaskStatus(result, workerAddr)
		}
	}()

	result := processor.NewProcessResult(processor.WithFailed())
	workerAddr := c.actorCtx.ActorSystem().Address()
	var err error
	if c.jobCtx.TaskAttempt() == 0 {
		c.reportTaskStatus(processor.NewProcessResult(processor.WithStatus(processor.InstanceStatusRunning)), workerAddr)
	}

	jobName := gjson.Get(c.jobCtx.Content(), "jobName").String()
	// Compatible with the existing Java language configuration mechanism
	if c.jobCtx.JobType() == "java" {
		jobName = gjson.Get(c.jobCtx.Content(), "className").String()
	}
	task, ok := taskMasterPool.Tasks().Find(jobName)
	if !ok {
		retMsg := fmt.Sprintf("jobName=%s not found, maybe forgot to register it by the client", c.jobCtx.JobName())
		result = processor.NewProcessResult(processor.WithFailed(), processor.WithResult(retMsg))
		c.reportTaskStatus(result, workerAddr)
		logger.Errorf("Process task=%s failed, because it's unregistered. ", jobName)
		return
	}

	result, err = task.Process(c.jobCtx)
	if err != nil {
		fixedErrMsg := err.Error()
		if errMsg := err.Error(); len(errMsg) > constants.InstanceResultSizeMax {
			fixedErrMsg = errMsg[:constants.InstanceResultSizeMax]
		}
		result = processor.NewProcessResult(processor.WithFailed(), processor.WithResult(fixedErrMsg))
		c.reportTaskStatus(result, workerAddr)
		logger.Errorf("Process task=%s failed, uniqueId=%v, serialNum=%v, err=%s ", c.jobCtx.TaskName(), uniqueId, c.jobCtx.SerialNum(), err.Error())
		return
	}

	endTime := time.Now().UnixMilli()
	logger.Debugf("container run finished, uniqueId=%v, cost=%dms", uniqueId, endTime-startTime)

	if result == nil {
		result = processor.NewProcessResult(processor.WithFailed(), processor.WithResult("result can't be null"))
	}

	// If the execution of the map model subtask (non-root task) fails, would be retried
	if c.jobCtx.MaxAttempt() > 0 && c.jobCtx.TaskId() > 0 && result.Status() == processor.InstanceStatusFailed {
		if taskAttempt := c.jobCtx.TaskAttempt(); taskAttempt < c.jobCtx.MaxAttempt() {
			taskAttempt++
			time.Sleep(time.Duration(c.jobCtx.TaskAttemptInterval()) * time.Second)
			c.jobCtx.SetTaskAttempt(taskAttempt)
			c.Start()

			// No need to return the current result status when retrying
			return
		}
	}

	c.reportTaskStatus(result, workerAddr)
}