func()

in internal/actor/container_actor.go [269:310]


func (a *containerActor) startContainer(actorCtx actor.Context, req *schedulerx.MasterStartContainerRequest) (string, error) {
	uniqueId := utils.GetUniqueId(req.GetJobId(), req.GetJobInstanceId(), req.GetTaskId())
	logger.Debugf("startContainer, uniqueId=%v, req=%+v, cost=%vms", uniqueId, req, time.Now().UnixMilli()-req.GetScheduleTime())

	jobCtx, err := convertMasterStartContainerRequest2JobContext(req)
	if err != nil {
		return "", err
	}
	container, err := container.NewThreadContainer(jobCtx, actorCtx, container.GetThreadContainerPool())
	if err != nil {
		return "", err
	}
	if container != nil {
		a.lock.Lock()
		defer a.lock.Unlock()
		a.containerPool.Put(uniqueId, container)
		// Whether to share containerPool. If shared, statusReqBatchHandlerPool has only one handler with key=0.
		statusReqBatchHandlerKey := int64(0)
		if !a.enableShareContainerPool {
			statusReqBatchHandlerKey = req.GetJobInstanceId()
		}
		if !a.statusReqBatchHandlerPool.Contains(statusReqBatchHandlerKey) {
			// support 1.5 million requests
			reqQueue := batch.NewReqQueue(config.GetWorkerConfig().QueueSize())
			a.statusReqBatchHandlerPool.Start(
				statusReqBatchHandlerKey,
				batch.NewContainerStatusReqHandler(statusReqBatchHandlerKey, 1, 1,
					a.batchSize, reqQueue, req.GetInstanceMasterAkkaPath()),
			)
		}
		consumerNum := int32(constants.ConsumerNumDefault)
		if req.GetConsumerNum() > 0 {
			consumerNum = req.GetConsumerNum()
		}
		if err = a.containerPool.Submit(req.GetJobId(), req.GetJobInstanceId(), req.GetTaskId(), container, consumerNum); err != nil {
			return "", err
		}
	} else {
		logger.Warnf("Container is null, uniqueId=%d", uniqueId)
	}
	return uniqueId, nil
}