in internal/actor/container_actor.go [269:310]
// startContainer creates a thread container for the task described by req,
// registers it in the actor's container pool, makes sure a status-request
// batch handler is running for it, and submits it to the pool.
//
// It returns the unique id derived from the request's job id, job instance id
// and task id. An error is returned when the request cannot be converted to a
// job context, when the container cannot be created, or when submitting it to
// the container pool fails. If container creation yields a nil container
// without an error, a warning is logged and the unique id is still returned
// with a nil error.
func (a *containerActor) startContainer(actorCtx actor.Context, req *schedulerx.MasterStartContainerRequest) (string, error) {
	uniqueId := utils.GetUniqueId(req.GetJobId(), req.GetJobInstanceId(), req.GetTaskId())
	logger.Debugf("startContainer, uniqueId=%v, req=%+v, cost=%vms", uniqueId, req, time.Now().UnixMilli()-req.GetScheduleTime())

	jobCtx, err := convertMasterStartContainerRequest2JobContext(req)
	if err != nil {
		return "", err
	}

	// Named threadContainer (not container) so the imported container package
	// is not shadowed for the rest of the function.
	threadContainer, err := container.NewThreadContainer(jobCtx, actorCtx, container.GetThreadContainerPool())
	if err != nil {
		return "", err
	}
	if threadContainer == nil {
		// uniqueId is a string, so it must be formatted with %v rather than %d.
		logger.Warnf("Container is null, uniqueId=%v", uniqueId)
		return uniqueId, nil
	}

	// The lock is held until return, covering the pool registration, the
	// handler check-then-start below, and the submit.
	a.lock.Lock()
	defer a.lock.Unlock()

	a.containerPool.Put(uniqueId, threadContainer)

	// Whether to share containerPool. If shared, statusReqBatchHandlerPool has
	// only one handler with key=0; otherwise handlers are keyed by job instance id.
	statusReqBatchHandlerKey := int64(0)
	if !a.enableShareContainerPool {
		statusReqBatchHandlerKey = req.GetJobInstanceId()
	}
	if !a.statusReqBatchHandlerPool.Contains(statusReqBatchHandlerKey) {
		// Queue capacity comes from the worker config (original note: support
		// 1.5 million requests).
		reqQueue := batch.NewReqQueue(config.GetWorkerConfig().QueueSize())
		a.statusReqBatchHandlerPool.Start(
			statusReqBatchHandlerKey,
			batch.NewContainerStatusReqHandler(statusReqBatchHandlerKey, 1, 1,
				a.batchSize, reqQueue, req.GetInstanceMasterAkkaPath()),
		)
	}

	// Use the consumer count from the request only when it is positive.
	consumerNum := int32(constants.ConsumerNumDefault)
	if req.GetConsumerNum() > 0 {
		consumerNum = req.GetConsumerNum()
	}

	// NOTE(review): on Submit failure the container registered by Put above
	// stays in the pool — confirm whether it should be removed here.
	if err = a.containerPool.Submit(req.GetJobId(), req.GetJobInstanceId(), req.GetTaskId(), threadContainer, consumerNum); err != nil {
		return "", err
	}
	return uniqueId, nil
}