in internal/master/task_master.go [357:419]
func (m *TaskMaster) convert2StartContainerRequest(jobInstanceInfo *common.JobInstanceInfo, taskId int64, taskName string, taskBody []byte, failover bool) (*schedulerx.MasterStartContainerRequest, error) {
req := &schedulerx.MasterStartContainerRequest{
JobId: proto.Int64(jobInstanceInfo.GetJobId()),
JobInstanceId: proto.Int64(jobInstanceInfo.GetJobInstanceId()),
TaskId: proto.Int64(taskId),
User: proto.String(jobInstanceInfo.GetUser()),
JobType: proto.String(jobInstanceInfo.GetJobType()),
Content: proto.String(jobInstanceInfo.GetContent()),
ScheduleTime: proto.Int64(jobInstanceInfo.GetScheduleTime().UnixMilli()),
DataTime: proto.Int64(jobInstanceInfo.GetDataTime().UnixMilli()),
Parameters: proto.String(jobInstanceInfo.GetParameters()),
InstanceParameters: proto.String(jobInstanceInfo.GetInstanceParameters()),
GroupId: proto.String(jobInstanceInfo.GetGroupId()),
MaxAttempt: proto.Int32(jobInstanceInfo.GetMaxAttempt()),
Attempt: proto.Int32(jobInstanceInfo.GetAttempt()),
InstanceMasterAkkaPath: proto.String(m.GetLocalTaskRouterPath()),
}
if upstreamDatas := jobInstanceInfo.GetUpstreamData(); len(upstreamDatas) > 0 {
req.UpstreamData = []*schedulerx.UpstreamData{}
for _, jobInstanceData := range upstreamDatas {
req.UpstreamData = append(req.UpstreamData, &schedulerx.UpstreamData{
JobName: proto.String(jobInstanceData.GetJobName()),
Data: proto.String(jobInstanceData.GetData()),
})
}
}
if xattrs := jobInstanceInfo.GetXattrs(); len(xattrs) > 0 {
mapTaskXAttrs := common.NewMapTaskXAttrs()
if err := json.Unmarshal([]byte(xattrs), mapTaskXAttrs); err != nil {
return nil, fmt.Errorf("Json unmarshal to mapTaskXAttrs failed, xattrs=%s, err=%s ", xattrs, err.Error())
}
req.ConsumerNum = proto.Int32(mapTaskXAttrs.GetConsumerSize())
req.MaxAttempt = proto.Int32(mapTaskXAttrs.GetTaskMaxAttempt())
req.TaskAttemptInterval = proto.Int32(mapTaskXAttrs.GetTaskAttemptInterval())
}
if taskName != "" {
req.TaskName = proto.String(taskName)
}
if len(taskBody) > 0 {
req.Task = taskBody
}
if failover {
req.Failover = proto.Bool(true)
}
if jobInstanceInfo.GetWfInstanceId() >= 0 {
req.WfInstanceId = proto.Int64(jobInstanceInfo.GetWfInstanceId())
}
req.SerialNum = proto.Int64(m.GetSerialNum())
req.ExecuteMode = proto.String(jobInstanceInfo.GetExecuteMode())
if len(jobInstanceInfo.GetJobName()) > 0 {
req.JobName = proto.String(jobInstanceInfo.GetJobName())
}
req.TimeType = proto.Int32(jobInstanceInfo.GetTimeType())
req.TimeExpression = proto.String(jobInstanceInfo.GetTimeExpression())
return req, nil
}