internal/container/thread_container.go
/*
* Copyright (c) 2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package container

import (
	"fmt"
	"runtime/debug"
	"time"

	"github.com/asynkron/protoactor-go/actor"
	"github.com/tidwall/gjson"
	"google.golang.org/protobuf/proto"

	"github.com/alibaba/schedulerx-worker-go/config"
actorcomm "github.com/alibaba/schedulerx-worker-go/internal/actor/common"
"github.com/alibaba/schedulerx-worker-go/internal/batch"
"github.com/alibaba/schedulerx-worker-go/internal/constants"
"github.com/alibaba/schedulerx-worker-go/internal/masterpool"
"github.com/alibaba/schedulerx-worker-go/internal/proto/schedulerx"
"github.com/alibaba/schedulerx-worker-go/internal/utils"
"github.com/alibaba/schedulerx-worker-go/logger"
"github.com/alibaba/schedulerx-worker-go/processor"
"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
"github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
)
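
// Compile-time assertion that ThreadContainer implements the Container interface.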
var _ Container = &ThreadContainer{}
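
// ThreadContainer executes a single task of a job instance and reports its
// status back to the task master through the actor system.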
type ThreadContainer struct {
jobCtx *jobcontext.JobContext
actorCtx actor.Context
jobProcessor processor.Processor
containerPool ContainerPool
masterPid *actor.PID
}
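
// NewThreadContainer builds a container bound to the task master resolved from
// jobCtx.InstanceMasterActorPath(); it fails if that path is empty. A minimal
// usage sketch (the surrounding pool and context wiring are assumed, not shown
// in this file):
//
//	container, err := NewThreadContainer(jobCtx, actorCtx, pool)
//	if err != nil {
//		return err
//	}
//	go container.Start()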
func NewThreadContainer(jobCtx *jobcontext.JobContext, actorCtx actor.Context, containerPool ContainerPool) (*ThreadContainer, error) {
	if jobCtx.InstanceMasterActorPath() == "" {
		return nil, fmt.Errorf("get taskMaster actor path failed, path is empty")
	}
workerAddr := actorcomm.GetRealWorkerAddr(jobCtx.InstanceMasterActorPath())
return &ThreadContainer{
jobCtx: jobCtx,
actorCtx: actorCtx,
containerPool: containerPool,
masterPid: actorcomm.GetMapMasterPid(workerAddr),
}, nil
}
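
// Run implements the Container interface by delegating to Start.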
func (c *ThreadContainer) Run() {
c.Start()
}
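
// Start looks up the processor registered under the job name (or class name
// for java-type jobs), runs it, and reports the final status to the task
// master. Failed map-model subtasks are retried in place until MaxAttempt is
// reached.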
func (c *ThreadContainer) Start() {
taskMasterPool := masterpool.GetTaskMasterPool()
uniqueId := utils.GetUniqueId(c.jobCtx.JobId(), c.jobCtx.JobInstanceId(), c.jobCtx.TaskId())
c.containerPool.SetContext(c.jobCtx)
startTime := time.Now().UnixMilli()
	logger.Debugf("start run container, uniqueId=%v, scheduleDelay=%vms, jobContext=%+v", uniqueId, startTime-c.jobCtx.ScheduleTime().UnixMilli(), c.jobCtx)
	defer func() {
		// Always remove this container and its context from the pool when the run ends.
		c.containerPool.Remove(uniqueId)
		c.containerPool.RemoveContext()
		if e := recover(); e != nil {
			// Capture the stack once, then report the panic as a failed task result.
			stack := debug.Stack()
			logger.Errorf("Start run container panic, error=%v, stack=%s", e, stack)
			errMsg := fmt.Sprintf("Process task panic, error=%v, stack=%s", e, stack)
			result := processor.NewProcessResult(processor.WithFailed(), processor.WithResult(errMsg))
			workerAddr := c.actorCtx.ActorSystem().Address()
			c.reportTaskStatus(result, workerAddr)
		}
	}()
result := processor.NewProcessResult(processor.WithFailed())
workerAddr := c.actorCtx.ActorSystem().Address()
var err error
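	// On the first attempt, report the running status before processing starts.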
if c.jobCtx.TaskAttempt() == 0 {
c.reportTaskStatus(processor.NewProcessResult(processor.WithStatus(processor.InstanceStatusRunning)), workerAddr)
}
jobName := gjson.Get(c.jobCtx.Content(), "jobName").String()
	// For compatibility with the existing Java SDK configuration, java-type jobs
	// are looked up by their class name instead of the job name.
if c.jobCtx.JobType() == "java" {
jobName = gjson.Get(c.jobCtx.Content(), "className").String()
}
task, ok := taskMasterPool.Tasks().Find(jobName)
if !ok {
		retMsg := fmt.Sprintf("jobName=%s not found, it may not have been registered by the client", jobName)
		result = processor.NewProcessResult(processor.WithFailed(), processor.WithResult(retMsg))
		c.reportTaskStatus(result, workerAddr)
		logger.Errorf("Process task=%s failed, because it's unregistered", jobName)
return
}
result, err = task.Process(c.jobCtx)
if err != nil {
		// Truncate long error messages so the reported result stays within the size limit.
		fixedErrMsg := err.Error()
		if len(fixedErrMsg) > constants.InstanceResultSizeMax {
			fixedErrMsg = fixedErrMsg[:constants.InstanceResultSizeMax]
		}
		result = processor.NewProcessResult(processor.WithFailed(), processor.WithResult(fixedErrMsg))
		c.reportTaskStatus(result, workerAddr)
		logger.Errorf("Process task=%s failed, uniqueId=%v, serialNum=%v, err=%s", c.jobCtx.TaskName(), uniqueId, c.jobCtx.SerialNum(), err.Error())
return
}
endTime := time.Now().UnixMilli()
logger.Debugf("container run finished, uniqueId=%v, cost=%dms", uniqueId, endTime-startTime)
	if result == nil {
		result = processor.NewProcessResult(processor.WithFailed(), processor.WithResult("result can't be nil"))
	}
	// If a map-model subtask (non-root task, taskId > 0) fails and attempts remain,
	// wait for the retry interval and run it again in place.
if c.jobCtx.MaxAttempt() > 0 && c.jobCtx.TaskId() > 0 && result.Status() == processor.InstanceStatusFailed {
if taskAttempt := c.jobCtx.TaskAttempt(); taskAttempt < c.jobCtx.MaxAttempt() {
taskAttempt++
time.Sleep(time.Duration(c.jobCtx.TaskAttemptInterval()) * time.Second)
c.jobCtx.SetTaskAttempt(taskAttempt)
c.Start()
			// The retried run reports its own status, so don't report the failed result here.
return
}
}
c.reportTaskStatus(result, workerAddr)
}
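
// Kill interrupts the processor if it implements KillProcessor, reports the
// task as failed with result "killed", and removes the container from the pool.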
func (c *ThreadContainer) Kill() {
logger.Infof("kill container, jobInstanceId=%v, content=%s", c.jobCtx.JobInstanceId(), c.jobCtx.Content())
jobName := gjson.Get(c.jobCtx.Content(), "jobName").String()
	if jobName != "" {
		task, ok := masterpool.GetTaskMasterPool().Tasks().Find(jobName)
		if !ok {
			logger.Warnf("Kill task=%s failed, because it's not found", jobName)
		} else if kt, ok := task.(processor.KillProcessor); ok {
			// Only processors implementing KillProcessor support being interrupted.
			kt.Kill(c.jobCtx)
		}
	}
workerAddr := c.actorCtx.ActorSystem().Address()
req := &schedulerx.ContainerReportTaskStatusRequest{
JobId: proto.Int64(c.jobCtx.JobId()),
JobInstanceId: proto.Int64(c.jobCtx.JobInstanceId()),
TaskId: proto.Int64(c.jobCtx.TaskId()),
TaskName: proto.String(c.jobCtx.TaskName()),
Status: proto.Int32(int32(taskstatus.TaskStatusFailed)),
WorkerId: proto.String(utils.GetWorkerId()),
WorkerAddr: proto.String(workerAddr),
Result: proto.String("killed"),
}
c.actorCtx.Send(c.masterPid, req)
uniqueId := utils.GetUniqueId(c.jobCtx.JobId(), c.jobCtx.JobInstanceId(), c.jobCtx.TaskId())
c.containerPool.Remove(uniqueId)
}
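
// reportTaskStatus submits the status request to the batched handler pool and
// falls back to messaging the task master directly if the submission fails.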
func (c *ThreadContainer) reportTaskStatus(result *processor.ProcessResult, workerAddr string) {
req := &schedulerx.ContainerReportTaskStatusRequest{
JobId: proto.Int64(c.jobCtx.JobId()),
JobInstanceId: proto.Int64(c.jobCtx.JobInstanceId()),
TaskId: proto.Int64(c.jobCtx.TaskId()),
Status: proto.Int32(int32(result.Status())),
WorkerAddr: proto.String(workerAddr),
WorkerId: proto.String(utils.GetWorkerId()),
SerialNum: proto.Int64(c.jobCtx.SerialNum()),
InstanceMasterActorPath: proto.String(c.jobCtx.InstanceMasterActorPath()),
}
if c.jobCtx.TaskName() != "" {
req.TaskName = proto.String(c.jobCtx.TaskName())
}
if result.Result() != "" {
req.Result = proto.String(result.Result())
}
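	// A shared container pool reports through the shared handler (keyed by 0);
	// otherwise each job instance reports through its own handler.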
	var submitResult bool
	if config.GetWorkerConfig().IsShareContainerPool() {
		submitResult = batch.GetContainerStatusReqHandlerPool().SubmitReq(0, req)
	} else {
		submitResult = batch.GetContainerStatusReqHandlerPool().SubmitReq(c.jobCtx.JobInstanceId(), req)
	}
logger.Debugf("reportTaskStatus instanceId=%v submitResult=%v, processResult=%v", utils.GetUniqueId(c.jobCtx.JobId(), c.jobCtx.JobInstanceId(), c.jobCtx.TaskId()), submitResult, result)
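	// Fallback: if the batch handler rejects the request, send it straight to the task master.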
if !submitResult {
c.actorCtx.Request(c.masterPid, req)
}
}
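
// GetContext returns the job context bound to this container.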
func (c *ThreadContainer) GetContext() *jobcontext.JobContext {
return c.jobCtx
}
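
// SetContext rebinds the container to the given job context.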
func (c *ThreadContainer) SetContext(context *jobcontext.JobContext) {
c.jobCtx = context
}
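
// Stop is not implemented yet.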
func (c *ThreadContainer) Stop() {
	// TODO: implement me
panic("implement me")
}