func()

in internal/master/task_master.go [357:419]


func (m *TaskMaster) convert2StartContainerRequest(jobInstanceInfo *common.JobInstanceInfo, taskId int64, taskName string, taskBody []byte, failover bool) (*schedulerx.MasterStartContainerRequest, error) {
	req := &schedulerx.MasterStartContainerRequest{
		JobId:                  proto.Int64(jobInstanceInfo.GetJobId()),
		JobInstanceId:          proto.Int64(jobInstanceInfo.GetJobInstanceId()),
		TaskId:                 proto.Int64(taskId),
		User:                   proto.String(jobInstanceInfo.GetUser()),
		JobType:                proto.String(jobInstanceInfo.GetJobType()),
		Content:                proto.String(jobInstanceInfo.GetContent()),
		ScheduleTime:           proto.Int64(jobInstanceInfo.GetScheduleTime().UnixMilli()),
		DataTime:               proto.Int64(jobInstanceInfo.GetDataTime().UnixMilli()),
		Parameters:             proto.String(jobInstanceInfo.GetParameters()),
		InstanceParameters:     proto.String(jobInstanceInfo.GetInstanceParameters()),
		GroupId:                proto.String(jobInstanceInfo.GetGroupId()),
		MaxAttempt:             proto.Int32(jobInstanceInfo.GetMaxAttempt()),
		Attempt:                proto.Int32(jobInstanceInfo.GetAttempt()),
		InstanceMasterAkkaPath: proto.String(m.GetLocalTaskRouterPath()),
	}

	if upstreamDatas := jobInstanceInfo.GetUpstreamData(); len(upstreamDatas) > 0 {
		req.UpstreamData = []*schedulerx.UpstreamData{}

		for _, jobInstanceData := range upstreamDatas {
			req.UpstreamData = append(req.UpstreamData, &schedulerx.UpstreamData{
				JobName: proto.String(jobInstanceData.GetJobName()),
				Data:    proto.String(jobInstanceData.GetData()),
			})
		}
	}

	if xattrs := jobInstanceInfo.GetXattrs(); len(xattrs) > 0 {
		mapTaskXAttrs := common.NewMapTaskXAttrs()
		if err := json.Unmarshal([]byte(xattrs), mapTaskXAttrs); err != nil {
			return nil, fmt.Errorf("Json unmarshal to mapTaskXAttrs failed, xattrs=%s, err=%s ", xattrs, err.Error())
		}
		req.ConsumerNum = proto.Int32(mapTaskXAttrs.GetConsumerSize())
		req.MaxAttempt = proto.Int32(mapTaskXAttrs.GetTaskMaxAttempt())
		req.TaskAttemptInterval = proto.Int32(mapTaskXAttrs.GetTaskAttemptInterval())
	}

	if taskName != "" {
		req.TaskName = proto.String(taskName)
	}
	if len(taskBody) > 0 {
		req.Task = taskBody
	}
	if failover {
		req.Failover = proto.Bool(true)
	}
	if jobInstanceInfo.GetWfInstanceId() >= 0 {
		req.WfInstanceId = proto.Int64(jobInstanceInfo.GetWfInstanceId())
	}

	req.SerialNum = proto.Int64(m.GetSerialNum())
	req.ExecuteMode = proto.String(jobInstanceInfo.GetExecuteMode())

	if len(jobInstanceInfo.GetJobName()) > 0 {
		req.JobName = proto.String(jobInstanceInfo.GetJobName())
	}
	req.TimeType = proto.Int32(jobInstanceInfo.GetTimeType())
	req.TimeExpression = proto.String(jobInstanceInfo.GetTimeExpression())

	return req, nil
}