processor/jobcontext/jobcontext.go (242 lines of code) (raw):

/* * Copyright (c) 2023 Alibaba Group Holding Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package jobcontext import ( "context" "time" "github.com/alibaba/schedulerx-worker-go/internal/common" "github.com/alibaba/schedulerx-worker-go/processor/taskstatus" ) var _ context.Context = &JobContext{} type JobContext struct { context.Context // basic information jobId int64 jobInstanceId int64 wfInstanceId int64 taskId int64 jobName string scheduleTime time.Time dataTime time.Time executeMode string jobType string instanceMasterActorPath string taskName string task []byte groupId string content string user string // jobInstance maximum number of retries maxAttempt int32 // jobInstance current retry count attempt int32 // job custom parameters jobParameters string // custom parameters be passed every time when task is triggered, and can they only be passed through API triggers instanceParameters string // upstream Data for the workflow upstreamData []*common.JobInstanceData // results of all child tasks, map[int64]string taskResults map[int64]string // status of all child tasks, map[int64]TaskStatus taskStatuses map[int64]taskstatus.TaskStatus // task maximum retry count taskMaxAttempt int32 // task current retry count taskAttempt int32 // task retry interval (seconds) taskAttemptInterval int32 // second-level task loop times serialNum int64 // sharding id shardingId int64 // sharding parameter shardingParameter string // sharding count shardingNum int32 allWorkerAddrs []string workerAddr string timeType int32 timeExpression string } func (j *JobContext) UpstreamData() []*common.JobInstanceData { return j.upstreamData } func (j *JobContext) SetUpstreamData(upstreamData []*common.JobInstanceData) { j.upstreamData = upstreamData } func (j *JobContext) JobId() int64 { return j.jobId } func (j *JobContext) SetJobId(jobId int64) { j.jobId = jobId } func (j *JobContext) JobInstanceId() int64 { return j.jobInstanceId } func (j *JobContext) SetJobInstanceId(jobInstanceId int64) { j.jobInstanceId = jobInstanceId } func (j *JobContext) WfInstanceId() int64 { return j.wfInstanceId } func (j *JobContext) SetWfInstanceId(wfInstanceId int64) { j.wfInstanceId = wfInstanceId } func (j *JobContext) TaskId() int64 { return j.taskId } func (j *JobContext) SetTaskId(taskId int64) { j.taskId = taskId } func (j *JobContext) JobName() string { return j.jobName } func (j *JobContext) SetJobName(jobName string) { j.jobName = jobName } func (j *JobContext) ScheduleTime() time.Time { return j.scheduleTime } func (j *JobContext) SetScheduleTime(scheduleTime time.Time) { j.scheduleTime = scheduleTime } func (j *JobContext) DataTime() time.Time { return j.dataTime } func (j *JobContext) SetDataTime(dataTime time.Time) { j.dataTime = dataTime } func (j *JobContext) ExecuteMode() string { return j.executeMode } func (j *JobContext) SetExecuteMode(executeMode string) { j.executeMode = executeMode } func (j *JobContext) JobType() string { return j.jobType } func (j *JobContext) SetJobType(jobType string) { j.jobType = jobType } func (j *JobContext) InstanceMasterActorPath() string { return j.instanceMasterActorPath } func (j *JobContext) SetInstanceMasterActorPath(instanceMasterActorPath string) { j.instanceMasterActorPath = instanceMasterActorPath } func (j *JobContext) TaskName() string { return j.taskName } func (j *JobContext) SetTaskName(taskName string) { j.taskName = taskName } func (j *JobContext) Task() []byte { return j.task } func (j *JobContext) SetTask(task []byte) { j.task = task } func (j *JobContext) GroupId() string { return j.groupId } func (j *JobContext) SetGroupId(groupId string) { j.groupId = groupId } func (j *JobContext) Content() string { return j.content } func (j *JobContext) SetContent(content string) { j.content = content } func (j *JobContext) User() string { return j.user } func (j *JobContext) SetUser(user string) { j.user = user } func (j *JobContext) MaxAttempt() int32 { return j.maxAttempt } func (j *JobContext) SetMaxAttempt(maxAttempt int32) { j.maxAttempt = maxAttempt } func (j *JobContext) Attempt() int32 { return j.attempt } func (j *JobContext) SetAttempt(attempt int32) { j.attempt = attempt } func (j *JobContext) JobParameters() string { return j.jobParameters } func (j *JobContext) SetJobParameters(jobParameters string) { j.jobParameters = jobParameters } func (j *JobContext) InstanceParameters() string { return j.instanceParameters } func (j *JobContext) SetInstanceParameters(instanceParameters string) { j.instanceParameters = instanceParameters } func (j *JobContext) TaskResults() map[int64]string { return j.taskResults } func (j *JobContext) SetTaskResults(taskResults map[int64]string) { j.taskResults = taskResults } func (j *JobContext) TaskStatuses() map[int64]taskstatus.TaskStatus { return j.taskStatuses } func (j *JobContext) SetTaskStatuses(taskStatuses map[int64]taskstatus.TaskStatus) { j.taskStatuses = taskStatuses } func (j *JobContext) TaskMaxAttempt() int32 { return j.taskMaxAttempt } func (j *JobContext) SetTaskMaxAttempt(taskMaxAttempt int32) { j.taskMaxAttempt = taskMaxAttempt } func (j *JobContext) TaskAttempt() int32 { return j.taskAttempt } func (j *JobContext) SetTaskAttempt(taskAttempt int32) { j.taskAttempt = taskAttempt } func (j *JobContext) TaskAttemptInterval() int32 { return j.taskAttemptInterval } func (j *JobContext) SetTaskAttemptInterval(taskAttemptInterval int32) { j.taskAttemptInterval = taskAttemptInterval } func (j *JobContext) SerialNum() int64 { return j.serialNum } func (j *JobContext) SetSerialNum(serialNum int64) { j.serialNum = serialNum } func (j *JobContext) ShardingId() int64 { return j.shardingId } func (j *JobContext) SetShardingId(shardingId int64) { j.shardingId = shardingId } func (j *JobContext) ShardingParameter() string { return j.shardingParameter } func (j *JobContext) SetShardingParameter(shardingParameter string) { j.shardingParameter = shardingParameter } func (j *JobContext) ShardingNum() int32 { return j.shardingNum } func (j *JobContext) SetShardingNum(shardingNum int32) { j.shardingNum = shardingNum } func (j *JobContext) AllWorkerAddrs() []string { return j.allWorkerAddrs } func (j *JobContext) SetAllWorkerAddrs(allWorkerAddrs []string) { j.allWorkerAddrs = allWorkerAddrs } func (j *JobContext) WorkerAddr() string { return j.workerAddr } func (j *JobContext) SetWorkerAddr(workerAddr string) { j.workerAddr = workerAddr } func (j *JobContext) TimeType() int32 { return j.timeType } func (j *JobContext) SetTimeType(timeType int32) { j.timeType = timeType } func (j *JobContext) TimeExpression() string { return j.timeExpression } func (j *JobContext) SetTimeExpression(timeExpression string) { j.timeExpression = timeExpression }