// service/history/task/cross_cluster_task.go

// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package task

import (
    "context"
    "errors"
    "fmt"
    "sync"
    "time"

    "github.com/uber/cadence/common"
    "github.com/uber/cadence/common/cache"
    "github.com/uber/cadence/common/clock"
    "github.com/uber/cadence/common/dynamicconfig"
    "github.com/uber/cadence/common/future"
    "github.com/uber/cadence/common/log"
    "github.com/uber/cadence/common/log/tag"
    "github.com/uber/cadence/common/metrics"
    "github.com/uber/cadence/common/persistence"
    ctask "github.com/uber/cadence/common/task"
    "github.com/uber/cadence/common/types"
    "github.com/uber/cadence/service/history/execution"
    "github.com/uber/cadence/service/history/shard"
)

// cross cluster task state
// a typical state transition will be:
// Initialized -> Reported -> Recorded -> Reported
// if the task has two stages (e.g. first start a child workflow and
// then schedule its first decision task)
// or
// Initialized -> Reported
// if the task has only one stage (e.g. cancel an external workflow)
//
// NOTE: DO NOT change the value for each state,
// new states must be added at the end to ensure backward compatibility
// across clusters.
const (
    // processingStateInitialized is the initial state after a task is loaded;
    // the task is available for poll in this state
    processingStateInitialized processingState = 1
    // processingStateResponseReported is the state entered when a response
    // for processing the task at the target cluster is received;
    // the task is NOT available for poll in this state.
    // the task can reach this state from either processingStateInitialized
    // or processingStateResponseRecorded
    processingStateResponseReported processingState = 2
    // processingStateResponseRecorded is the state entered after the target's
    // response is recorded in the source workflow's history;
    // the task is available for poll in this state
    processingStateResponseRecorded processingState = 3
    // processingStateInvalidated is the state for a cross-cluster task
    // that is no longer valid and should be either converted to a transfer
    // task or moved to the cross-cluster queue for another target cluster.
    // the task is NOT available for poll in this state
    processingStateInvalidated processingState = 4
)
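
// A minimal wire round-trip sketch (illustrative, not part of the original
// file): processingState travels between clusters as the int16 TaskState field
// of types.CrossClusterTaskInfo and is converted back on receipt, which is why
// existing values must never change:
//
//  // source cluster, building the poll response (see GetCrossClusterRequest):
//  request.TaskInfo.TaskState = int16(taskState)
//  // target cluster, reconstructing the state (see NewCrossClusterTargetTask):
//  state := processingState(taskRequest.TaskInfo.TaskState)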
var (
    _ CrossClusterTask = (*crossClusterSourceTask)(nil)
)

type (
    // processingState is a more detailed state description used while the
    // task is being processed and state is TaskStatePending
    processingState int

    crossClusterTargetTask struct {
        *crossClusterTaskBase

        request  *types.CrossClusterTaskRequest
        response *types.CrossClusterTaskResponse
        settable future.Settable
    }

    crossClusterSourceTask struct {
        *crossClusterTaskBase

        // targetCluster is the cluster name that the task's owning
        // queue processor is responsible for,
        // e.g. if targetCluster is C, the task is loaded by the queue
        // processor responsible for cluster C, and only cluster C is
        // able to poll the task.
        // targetCluster may not match the current active cluster of the
        // targetDomain if the targetDomain performed a failover after
        // the task was loaded.
        targetCluster  string
        executionCache *execution.Cache
        response       *types.CrossClusterTaskResponse
        readyForPollFn func(task CrossClusterTask)
    }

    crossClusterTaskBase struct {
        Info
        sync.Mutex

        // state is used by the general processing queue implementation (the ack manager)
        // to determine if the execution has finished for the task
        state ctask.State
        // processingState is used by the cross cluster task's specific task executor
        // to determine what should be done next;
        // the value of processingState only matters when state = TaskStatePending
        processingState processingState
        priority        int
        attempt         int

        shard         shard.Context
        timeSource    clock.TimeSource
        submitTime    time.Time
        logger        log.Logger
        eventLogger   eventLogger
        scope         metrics.Scope
        taskExecutor  Executor
        taskProcessor Processor
        redispatchFn  func(task Task)
        maxRetryCount int
    }
)

// NewCrossClusterSourceTask creates a cross cluster task
// for processing at the source cluster
func NewCrossClusterSourceTask(
    shard shard.Context,
    targetCluster string,
    executionCache *execution.Cache,
    taskInfo Info,
    taskExecutor Executor,
    taskProcessor Processor,
    logger log.Logger,
    redispatchFn func(task Task),
    readyForPollFn func(task CrossClusterTask),
    maxRetryCount dynamicconfig.IntPropertyFn,
) CrossClusterTask {
    return &crossClusterSourceTask{
        targetCluster:  targetCluster,
        executionCache: executionCache,
        readyForPollFn: readyForPollFn,
        crossClusterTaskBase: newCrossClusterTaskBase(
            shard,
            taskInfo,
            processingStateInitialized,
            taskExecutor,
            taskProcessor,
            getCrossClusterTaskMetricsScope(taskInfo.GetTaskType(), true),
            logger,
            shard.GetTimeSource(),
            redispatchFn,
            maxRetryCount,
        ),
    }
}
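
// A minimal construction sketch (illustrative only; "cluster-b" and the
// config/helper variables shown are hypothetical, not defined in this file):
//
//  task := NewCrossClusterSourceTask(
//      shard,
//      "cluster-b",    // target cluster whose queue processor loaded the task
//      executionCache,
//      taskInfo,
//      taskExecutor,
//      taskProcessor,
//      logger,
//      redispatchFn,   // invoked when the task must be retried later
//      readyForPollFn, // invoked when the task becomes available for poll again
//      config.CrossClusterTaskMaxRetryCount,
//  )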
// NewCrossClusterTargetTask is called at the target cluster
// to process the cross cluster task.
// The returned Future will be unblocked after the task is processed.
// The future value has type types.CrossClusterTaskResponse
// and there won't be any error returned for this future. All errors will
// be recorded in the FailedCause field of the response.
func NewCrossClusterTargetTask(
    shard shard.Context,
    taskRequest *types.CrossClusterTaskRequest,
    taskExecutor Executor,
    taskProcessor Processor,
    logger log.Logger,
    redispatchFn func(task Task),
    maxRetryCount dynamicconfig.IntPropertyFn,
) (Task, future.Future) {
    info := &persistence.CrossClusterTaskInfo{
        DomainID:            taskRequest.TaskInfo.DomainID,
        WorkflowID:          taskRequest.TaskInfo.WorkflowID,
        RunID:               taskRequest.TaskInfo.RunID,
        VisibilityTimestamp: time.Unix(0, taskRequest.TaskInfo.GetVisibilityTimestamp()),
        TaskID:              taskRequest.TaskInfo.TaskID,
        Version:             common.EmptyVersion, // we don't need version information at the target cluster
    }
    switch taskRequest.TaskInfo.GetTaskType() {
    case types.CrossClusterTaskTypeStartChildExecution:
        info.TaskType = persistence.CrossClusterTaskTypeStartChildExecution
        info.TargetDomainID = taskRequest.StartChildExecutionAttributes.TargetDomainID
        info.TargetWorkflowID = taskRequest.StartChildExecutionAttributes.InitiatedEventAttributes.WorkflowID
        info.TargetRunID = taskRequest.StartChildExecutionAttributes.GetTargetRunID()
        info.ScheduleID = taskRequest.StartChildExecutionAttributes.InitiatedEventID
    case types.CrossClusterTaskTypeCancelExecution:
        info.TaskType = persistence.CrossClusterTaskTypeCancelExecution
        info.TargetDomainID = taskRequest.CancelExecutionAttributes.TargetDomainID
        info.TargetWorkflowID = taskRequest.CancelExecutionAttributes.TargetWorkflowID
        info.TargetRunID = taskRequest.CancelExecutionAttributes.TargetRunID
        info.ScheduleID = taskRequest.CancelExecutionAttributes.InitiatedEventID
        info.TargetChildWorkflowOnly = taskRequest.CancelExecutionAttributes.ChildWorkflowOnly
    case types.CrossClusterTaskTypeSignalExecution:
        info.TaskType = persistence.CrossClusterTaskTypeSignalExecution
        info.TargetDomainID = taskRequest.SignalExecutionAttributes.TargetDomainID
        info.TargetWorkflowID = taskRequest.SignalExecutionAttributes.TargetWorkflowID
        info.TargetRunID = taskRequest.SignalExecutionAttributes.TargetRunID
        info.ScheduleID = taskRequest.SignalExecutionAttributes.InitiatedEventID
        info.TargetChildWorkflowOnly = taskRequest.SignalExecutionAttributes.ChildWorkflowOnly
    case types.CrossClusterTaskTypeRecordChildWorkflowExeuctionComplete:
        info.TaskType = persistence.CrossClusterTaskTypeRecordChildExeuctionCompleted
        info.TargetDomainID = taskRequest.RecordChildWorkflowExecutionCompleteAttributes.TargetDomainID
        info.TargetWorkflowID = taskRequest.RecordChildWorkflowExecutionCompleteAttributes.TargetWorkflowID
        info.TargetRunID = taskRequest.RecordChildWorkflowExecutionCompleteAttributes.TargetRunID
        info.ScheduleID = taskRequest.RecordChildWorkflowExecutionCompleteAttributes.InitiatedEventID
    case types.CrossClusterTaskTypeApplyParentPolicy:
        info.TaskType = persistence.CrossClusterTaskTypeApplyParentClosePolicy
    default:
        panic(fmt.Sprintf("unknown cross cluster task type: %v", taskRequest.TaskInfo.GetTaskType()))
    }

    future, settable := future.NewFuture()
    return &crossClusterTargetTask{
        crossClusterTaskBase: newCrossClusterTaskBase(
            shard,
            info,
            processingState(taskRequest.TaskInfo.TaskState),
            taskExecutor,
            taskProcessor,
            getCrossClusterTaskMetricsScope(info.GetTaskType(), false),
            logger,
            shard.GetTimeSource(),
            redispatchFn,
            maxRetryCount,
        ),
        request:  taskRequest,
        settable: settable,
    }, future
}
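
// A minimal consumption sketch for the returned future (illustrative only;
// the surrounding variables and the exact future.Future.Get signature are
// assumptions, not confirmed by this file):
//
//  task, respFuture := NewCrossClusterTargetTask(
//      shard, taskRequest, taskExecutor, taskProcessor, logger, redispatchFn, maxRetryCount,
//  )
//  if submitted, err := taskProcessor.TrySubmit(task); err != nil || !submitted {
//      redispatchFn(task)
//  }
//  var resp types.CrossClusterTaskResponse
//  if err := respFuture.Get(ctx, &resp); err == nil {
//      // resp.FailedCause, not err, carries any processing failure
//  }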
func newCrossClusterTaskBase(
    shard shard.Context,
    taskInfo Info,
    processingState processingState,
    taskExecutor Executor,
    taskProcessor Processor,
    metricScopeIdx int,
    logger log.Logger,
    timeSource clock.TimeSource,
    redispatchFn func(task Task),
    maxRetryCount dynamicconfig.IntPropertyFn,
) *crossClusterTaskBase {
    var eventLogger eventLogger
    if shard.GetConfig().EnableDebugMode &&
        shard.GetConfig().EnableTaskInfoLogByDomainID(taskInfo.GetDomainID()) {
        eventLogger = newEventLogger(logger, timeSource, defaultTaskEventLoggerSize)
        eventLogger.AddEvent("Created task")
    }

    return &crossClusterTaskBase{
        Info:            taskInfo,
        shard:           shard,
        state:           ctask.TaskStatePending,
        processingState: processingState,
        priority:        noPriority,
        attempt:         0,
        timeSource:      timeSource,
        submitTime:      timeSource.Now(),
        logger:          logger,
        eventLogger:     eventLogger,
        scope: getOrCreateDomainTaggedScope(
            shard,
            metricScopeIdx,
            taskInfo.GetDomainID(),
            logger,
        ),
        taskExecutor:  taskExecutor,
        taskProcessor: taskProcessor,
        redispatchFn:  redispatchFn,
        maxRetryCount: maxRetryCount(),
    }
}

// cross cluster source task methods

func (t *crossClusterSourceTask) Execute() error {
    executionStartTime := t.timeSource.Now()
    defer func() {
        t.scope.IncCounter(metrics.TaskRequestsPerDomain)
        t.scope.RecordTimer(metrics.TaskProcessingLatencyPerDomain, time.Since(executionStartTime))
    }()

    logEvent(t.eventLogger, "Executing task")
    return t.taskExecutor.Execute(t, true)
}

func (t *crossClusterSourceTask) Ack() {
    // do not set t.state to Acked here, as that would prevent the task from
    // being fetched again; state and processingState will be updated by the
    // task executor
    t.Lock()
    state := t.state
    processingState := t.processingState
    t.Unlock()

    logEvent(t.eventLogger, "executed task", "processing state", processingState)
    if state == ctask.TaskStateAcked {
        t.scope.RecordTimer(metrics.TaskAttemptTimerPerDomain, time.Duration(t.attempt))
        t.scope.RecordTimer(metrics.TaskLatencyPerDomain, time.Since(t.submitTime))
        t.scope.RecordTimer(metrics.TaskQueueLatencyPerDomain, time.Since(t.GetVisibilityTimestamp()))

        if t.eventLogger != nil && t.attempt != 0 {
            // only dump events when the task has been retried
            t.eventLogger.FlushEvents("Task processing events")
        }
    } else {
        if !t.IsReadyForPoll() {
            panic(fmt.Sprintf("Unexpected task state in CrossClusterSourceTask Ack method, state: %v, processing state: %v", t.State(), t.ProcessingState()))
        }
        t.readyForPollFn(t)
    }
}
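
// A minimal processing-loop sketch (an assumption about how the task
// processor drives these methods, not code from this file):
//
//  if err := task.HandleErr(task.Execute()); err == nil {
//      task.Ack() // acked -> emit metrics; still pending -> readyForPollFn(task)
//  } else if task.RetryErr(err) {
//      task.Nack() // resubmit, or push onto the redispatch queue
//  }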
// Nack is called when we are unable to move to the next step of processing;
// it basically adds the task to the redispatch queue so it can be retried later.
// Whether the task state is set to Nacked or not is not important, since
// the task will be retried forever.
func (t *crossClusterSourceTask) Nack() {
    logEvent(t.eventLogger, "Nacked task")

    if t.GetAttempt() < activeTaskResubmitMaxAttempts {
        if submitted, _ := t.taskProcessor.TrySubmit(t); submitted {
            return
        }
    }
    t.redispatchFn(t)
}

func (t *crossClusterSourceTask) HandleErr(
    err error,
) (retErr error) {
    defer func() {
        if retErr != nil {
            logEvent(t.eventLogger, "Failed to handle error", retErr)

            t.Lock()
            t.attempt++
            attempt := t.attempt
            t.Unlock()

            if attempt > t.maxRetryCount {
                t.scope.RecordTimer(metrics.TaskAttemptTimerPerDomain, time.Duration(attempt))
                t.logger.Error("Critical error processing task, retrying.",
                    tag.Error(err),
                    tag.OperationCritical,
                    tag.TaskType(t.GetTaskType()),
                    tag.AttemptCount(attempt),
                )
            }
        }
    }()

    if err == nil {
        return nil
    }

    logEvent(t.eventLogger, "Handling task processing error", err)

    if _, ok := err.(*types.EntityNotExistsError); ok {
        return nil
    }
    if _, ok := err.(*types.WorkflowExecutionAlreadyCompletedError); ok {
        return nil
    }

    if err == errWorkflowBusy {
        t.scope.IncCounter(metrics.TaskWorkflowBusyPerDomain)
        return err
    }
    if err == ErrTaskPendingActive {
        t.scope.IncCounter(metrics.TaskPendingActiveCounterPerDomain)
        return err
    }

    // return the domain-not-active error here so that the cross-cluster task
    // can be converted to a (passive) transfer task

    t.scope.IncCounter(metrics.TaskFailuresPerDomain)

    if _, ok := err.(*persistence.CurrentWorkflowConditionFailedError); ok {
        t.logger.Error("More than 2 workflow are running.", tag.Error(err), tag.LifeCycleProcessingFailed)
        return nil
    }

    t.logger.Error("Fail to process task", tag.Error(err), tag.LifeCycleProcessingFailed)
    return err
}

func (t *crossClusterSourceTask) RetryErr(
    err error,
) bool {
    return err != errWorkflowBusy && err != ErrTaskPendingActive && !common.IsContextTimeoutError(err)
}

func (t *crossClusterSourceTask) IsReadyForPoll() bool {
    t.Lock()
    defer t.Unlock()

    return t.state == ctask.TaskStatePending &&
        (t.processingState == processingStateInitialized || t.processingState == processingStateResponseRecorded)
}
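
// A minimal fetch/record sketch showing how the polling APIs compose
// (illustrative only; sendToTargetCluster is a hypothetical helper, not part
// of this file):
//
//  if task.IsReadyForPoll() {
//      request, err := task.GetCrossClusterRequest()
//      if err != nil || request == nil {
//          return // retry later, or nothing left to do (task already acked)
//      }
//      response := sendToTargetCluster(request)
//      if err := task.RecordResponse(response); err == nil {
//          // submit the task for processing so the response gets recorded
//          // in the source workflow's history
//      }
//  }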
// GetCrossClusterRequest returns a CrossClusterTaskRequest and an error, if there's any.
// If the returned error is not nil:
// - there might be an error while loading the request, in which case the call can be retried
// - the task may be invalidated, in which case the caller should submit the task for
//   processing so that a new task can be created
//
// If both the returned error and request are nil:
// - there's nothing to be done for the task; the task is already acked and is
//   not available for polling again
//
// If the returned request is not nil:
// - the request can be returned to the target cluster
func (t *crossClusterSourceTask) GetCrossClusterRequest() (request *types.CrossClusterTaskRequest, retError error) {
    t.Lock()
    defer func() {
        if retError == nil && request == nil {
            t.state = ctask.TaskStateAcked
        }
        t.Unlock()
    }()

    if !t.isValidLocked() {
        return nil, errors.New("task invalidated")
    }

    ctx, cancel := context.WithTimeout(context.Background(), taskDefaultTimeout)
    defer cancel()

    taskInfo := t.GetInfo().(*persistence.CrossClusterTaskInfo)
    _, mutableState, release, err := loadWorkflowForCrossClusterTask(ctx, t.executionCache, taskInfo, t.shard.GetMetricsClient(), t.logger)
    if err != nil || mutableState == nil {
        return nil, err
    }
    defer func() { release(retError) }()

    request = &types.CrossClusterTaskRequest{
        TaskInfo: &types.CrossClusterTaskInfo{
            DomainID:            t.GetDomainID(),
            WorkflowID:          t.GetWorkflowID(),
            RunID:               t.GetRunID(),
            TaskID:              t.GetTaskID(),
            VisibilityTimestamp: common.Int64Ptr(t.GetVisibilityTimestamp().UnixNano()),
        },
    }

    var taskState processingState
    switch t.GetTaskType() {
    case persistence.CrossClusterTaskTypeStartChildExecution:
        var attributes *types.CrossClusterStartChildExecutionRequestAttributes
        attributes, taskState, err = t.getRequestForStartChildExecution(ctx, taskInfo, mutableState)
        if err != nil || attributes == nil {
            return nil, err
        }
        request.TaskInfo.TaskType = types.CrossClusterTaskTypeStartChildExecution.Ptr()
        request.StartChildExecutionAttributes = attributes
    case persistence.CrossClusterTaskTypeCancelExecution:
        var attributes *types.CrossClusterCancelExecutionRequestAttributes
        attributes, taskState, err = t.getRequestForCancelExecution(ctx, taskInfo, mutableState)
        if err != nil || attributes == nil {
            return nil, err
        }
        request.TaskInfo.TaskType = types.CrossClusterTaskTypeCancelExecution.Ptr()
        request.CancelExecutionAttributes = attributes
    case persistence.CrossClusterTaskTypeSignalExecution:
        var attributes *types.CrossClusterSignalExecutionRequestAttributes
        attributes, taskState, err = t.getRequestForSignalExecution(ctx, taskInfo, mutableState)
        if err != nil || attributes == nil {
            return nil, err
        }
        request.TaskInfo.TaskType = types.CrossClusterTaskTypeSignalExecution.Ptr()
        request.SignalExecutionAttributes = attributes
    case persistence.CrossClusterTaskTypeRecordChildExeuctionCompleted:
        var attributes *types.CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes
        attributes, taskState, err = t.getRequestForRecordChildWorkflowCompletion(ctx, taskInfo, mutableState)
        if err != nil || attributes == nil {
            return nil, err
        }
        request.TaskInfo.TaskType = types.CrossClusterTaskTypeRecordChildWorkflowExeuctionComplete.Ptr()
        request.RecordChildWorkflowExecutionCompleteAttributes = attributes
    case persistence.CrossClusterTaskTypeApplyParentClosePolicy:
        var attributes *types.CrossClusterApplyParentClosePolicyRequestAttributes
        attributes, taskState, err = t.getRequestForApplyParentPolicy(ctx, taskInfo, mutableState)
        if err != nil || attributes == nil {
            return nil, err
        }
        request.TaskInfo.TaskType = types.CrossClusterTaskTypeApplyParentPolicy.Ptr()
        request.ApplyParentClosePolicyAttributes = attributes
    default:
        return nil, errUnknownCrossClusterTask
    }
    request.TaskInfo.TaskState = int16(taskState)

    return request, nil
}
func (t *crossClusterSourceTask) VerifyLastWriteVersion(
    mutableState execution.MutableState,
    taskInfo *persistence.CrossClusterTaskInfo,
) (bool, error) {
    lastWriteVersion, err := mutableState.GetLastWriteVersion()
    if err != nil {
        return false, err
    }
    return verifyTaskVersion(t.shard, t.logger, taskInfo.DomainID, lastWriteVersion, taskInfo.Version, taskInfo)
}

func (t *crossClusterSourceTask) getRequestForApplyParentPolicy(
    ctx context.Context,
    taskInfo *persistence.CrossClusterTaskInfo,
    mutableState execution.MutableState,
) (*types.CrossClusterApplyParentClosePolicyRequestAttributes, processingState, error) {
    if mutableState.IsWorkflowExecutionRunning() {
        return nil, t.processingState, nil
    }

    // No need to check for target failovers; only the active cluster should poll tasks.
    // If the active cluster changes during polling, the target should return an error to the source.
    verified, err := t.VerifyLastWriteVersion(mutableState, taskInfo)
    if err != nil || !verified {
        return nil, t.processingState, err
    }

    domainEntry := mutableState.GetDomainEntry()
    attributes := &types.CrossClusterApplyParentClosePolicyRequestAttributes{}
    children, err := filterPendingChildExecutions(
        taskInfo.TargetDomainIDs,
        mutableState.GetPendingChildExecutionInfos(),
        t.GetShard().GetDomainCache(),
        domainEntry,
    )
    if err != nil {
        return nil, t.processingState, err
    }
    for _, childInfo := range children {
        // we already filtered the children so that the child domainID is in task.TargetDomainIDs;
        // don't check whether the child domain is active or not here,
        // we need to send the request even if the child domain is not active in the target cluster
        targetDomainID, err := execution.GetChildExecutionDomainID(childInfo, t.shard.GetDomainCache(), domainEntry)
        if err != nil {
            return nil, t.processingState, err
        }
        attributes.Children = append(
            attributes.Children,
            &types.ApplyParentClosePolicyRequest{
                Child: &types.ApplyParentClosePolicyAttributes{
                    ChildDomainID:     targetDomainID,
                    ChildWorkflowID:   childInfo.StartedWorkflowID,
                    ChildRunID:        childInfo.StartedRunID,
                    ParentClosePolicy: &childInfo.ParentClosePolicy,
                },
                Status: &types.ApplyParentClosePolicyStatus{
                    Completed: false,
                },
            },
        )
    }
    return attributes, t.processingState, nil
}

func (t *crossClusterSourceTask) getRequestForRecordChildWorkflowCompletion(
    ctx context.Context,
    taskInfo *persistence.CrossClusterTaskInfo,
    mutableState execution.MutableState,
) (*types.CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes, processingState, error) {
    if mutableState.IsWorkflowExecutionRunning() {
        return nil, t.processingState, nil
    }

    verified, err := t.VerifyLastWriteVersion(mutableState, taskInfo)
    if err != nil || !verified {
        return nil, t.processingState, err
    }

    executionInfo := mutableState.GetExecutionInfo()
    completionEvent, err := mutableState.GetCompletionEvent(ctx)
    if err != nil {
        return nil, t.processingState, err
    }

    attributes := &types.CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes{
        TargetDomainID:   executionInfo.ParentDomainID,
        TargetWorkflowID: executionInfo.ParentWorkflowID,
        TargetRunID:      executionInfo.ParentRunID,
        InitiatedEventID: executionInfo.InitiatedID,
        CompletionEvent:  completionEvent,
    }
    return attributes, t.processingState, nil
}
func (t *crossClusterSourceTask) getRequestForStartChildExecution(
    ctx context.Context,
    taskInfo *persistence.CrossClusterTaskInfo,
    mutableState execution.MutableState,
) (*types.CrossClusterStartChildExecutionRequestAttributes, processingState, error) {
    initiatedEventID := taskInfo.ScheduleID
    childInfo, ok := mutableState.GetChildExecutionInfo(initiatedEventID)
    if !ok {
        return nil, t.processingState, nil
    }
    ok, err := verifyTaskVersion(t.shard, t.logger, taskInfo.DomainID, childInfo.Version, taskInfo.Version, taskInfo)
    if err != nil || !ok {
        return nil, t.processingState, err
    }

    if !mutableState.IsWorkflowExecutionRunning() &&
        (childInfo.StartedID == common.EmptyEventID ||
            childInfo.ParentClosePolicy != types.ParentClosePolicyAbandon) {
        return nil, t.processingState, err
    }

    initiatedEvent, err := mutableState.GetChildExecutionInitiatedEvent(ctx, initiatedEventID)
    if err != nil {
        return nil, t.processingState, err
    }

    attributes := &types.CrossClusterStartChildExecutionRequestAttributes{
        TargetDomainID:           taskInfo.TargetDomainID,
        RequestID:                childInfo.CreateRequestID,
        InitiatedEventID:         initiatedEventID,
        InitiatedEventAttributes: initiatedEvent.StartChildWorkflowExecutionInitiatedEventAttributes,
        PartitionConfig:          mutableState.GetExecutionInfo().PartitionConfig,
    }
    if childInfo.StartedID != common.EmptyEventID {
        // childExecution already started, advance to next state
        t.processingState = processingStateResponseRecorded
        attributes.TargetRunID = common.StringPtr(childInfo.StartedRunID)
    }

    return attributes, t.processingState, nil
}

func (t *crossClusterSourceTask) getRequestForCancelExecution(
    ctx context.Context,
    taskInfo *persistence.CrossClusterTaskInfo,
    mutableState execution.MutableState,
) (*types.CrossClusterCancelExecutionRequestAttributes, processingState, error) {
    if !mutableState.IsWorkflowExecutionRunning() {
        return nil, t.processingState, nil
    }

    initiatedEventID := taskInfo.ScheduleID
    requestCancelInfo, ok := mutableState.GetRequestCancelInfo(initiatedEventID)
    if !ok {
        return nil, t.processingState, nil
    }
    ok, err := verifyTaskVersion(t.shard, t.logger, taskInfo.DomainID, requestCancelInfo.Version, taskInfo.Version, taskInfo)
    if err != nil || !ok {
        return nil, t.processingState, err
    }

    return &types.CrossClusterCancelExecutionRequestAttributes{
        TargetDomainID:    taskInfo.TargetDomainID,
        TargetWorkflowID:  taskInfo.TargetWorkflowID,
        TargetRunID:       taskInfo.TargetRunID,
        RequestID:         requestCancelInfo.CancelRequestID,
        InitiatedEventID:  initiatedEventID,
        ChildWorkflowOnly: taskInfo.TargetChildWorkflowOnly,
    }, t.processingState, nil
}

func (t *crossClusterSourceTask) getRequestForSignalExecution(
    ctx context.Context,
    taskInfo *persistence.CrossClusterTaskInfo,
    mutableState execution.MutableState,
) (*types.CrossClusterSignalExecutionRequestAttributes, processingState, error) {
    if !mutableState.IsWorkflowExecutionRunning() {
        return nil, t.processingState, nil
    }

    initiatedEventID := taskInfo.ScheduleID
    signalInfo, ok := mutableState.GetSignalInfo(initiatedEventID)
    if !ok {
        return nil, t.processingState, nil
    }
    ok, err := verifyTaskVersion(t.shard, t.logger, taskInfo.DomainID, signalInfo.Version, taskInfo.Version, taskInfo)
    if err != nil || !ok {
        return nil, t.processingState, err
    }

    return &types.CrossClusterSignalExecutionRequestAttributes{
        TargetDomainID:    taskInfo.TargetDomainID,
        TargetWorkflowID:  taskInfo.TargetWorkflowID,
        TargetRunID:       taskInfo.TargetRunID,
        RequestID:         signalInfo.SignalRequestID,
        InitiatedEventID:  initiatedEventID,
        ChildWorkflowOnly: taskInfo.TargetChildWorkflowOnly,
        SignalName:        signalInfo.SignalName,
        SignalInput:       signalInfo.Input,
        Control:           signalInfo.Control,
    }, t.processingState, nil
}

func (t *crossClusterSourceTask) IsValid() bool {
    t.Lock()
    defer t.Unlock()
    return t.isValidLocked()
}
func (t *crossClusterSourceTask) isValidLocked() bool {
    if t.processingState == processingStateInvalidated {
        return false
    }

    domainCache := t.shard.GetDomainCache()
    sourceEntry, err := domainCache.GetDomainByID(t.GetDomainID())
    if err != nil {
        // unable to tell, treat the task as valid
        t.logger.Error("Failed to load domain entry", tag.Error(err))
        return true
    }
    var targetEntry *cache.DomainCacheEntry
    // for apply parent close policy tasks, target workflow information is not
    // persisted with the task, so skip this check for the target workflow since
    // the check is best effort
    // TODO: we should check the TargetDomainIDs field
    if t.GetTaskType() != persistence.CrossClusterTaskTypeApplyParentClosePolicy {
        targetEntry, err = domainCache.GetDomainByID(t.Info.(*persistence.CrossClusterTaskInfo).TargetDomainID)
        if err != nil {
            return true
        }
    }

    // pending active state is treated as valid
    sourceInvalid := sourceEntry.GetReplicationConfig().ActiveClusterName !=
        t.shard.GetClusterMetadata().GetCurrentClusterName()
    targetInvalid := targetEntry != nil &&
        targetEntry.GetReplicationConfig().ActiveClusterName != t.targetCluster
    if sourceInvalid || targetInvalid {
        t.processingState = processingStateInvalidated
        return false
    }
    return true
}

// RecordResponse records the response from processing the cross cluster task at the target cluster.
// If an error is returned, the operation failed; the task state remains unchanged
// and the task is still available for polling.
// If no error is returned, the operation succeeded; the task won't be available for
// polling and the caller should submit the task for processing.
func (t *crossClusterSourceTask) RecordResponse(response *types.CrossClusterTaskResponse) error {
    t.Lock()
    defer t.Unlock()

    if t.state != ctask.TaskStatePending || t.processingState != processingState(response.TaskState) {
        // this might happen when:
        // 1. a duplicated response arrives (shard movement in the target cluster)
        // 2. shard movement in the source cluster causes the task to be re-processed
        // 3. the task got invalidated by the domain failover callback
        return fmt.Errorf("unexpected task state, expected: %v, actual: %v", t.processingState, response.TaskState)
    }

    if t.GetTaskID() != response.GetTaskID() {
        return fmt.Errorf("unexpected taskID, expected: %v, actual: %v", t.GetTaskID(), response.GetTaskID())
    }

    var emptyResponse bool
    var taskTypeMatch bool
    switch t.GetTaskType() {
    case persistence.CrossClusterTaskTypeStartChildExecution:
        taskTypeMatch = response.GetTaskType() == types.CrossClusterTaskTypeStartChildExecution
        emptyResponse = response.StartChildExecutionAttributes == nil
    case persistence.CrossClusterTaskTypeCancelExecution:
        taskTypeMatch = response.GetTaskType() == types.CrossClusterTaskTypeCancelExecution
        emptyResponse = response.CancelExecutionAttributes == nil
    case persistence.CrossClusterTaskTypeSignalExecution:
        taskTypeMatch = response.GetTaskType() == types.CrossClusterTaskTypeSignalExecution
        emptyResponse = response.SignalExecutionAttributes == nil
    case persistence.CrossClusterTaskTypeRecordChildExeuctionCompleted:
        taskTypeMatch = response.GetTaskType() == types.CrossClusterTaskTypeRecordChildWorkflowExeuctionComplete
        emptyResponse = response.RecordChildWorkflowExecutionCompleteAttributes == nil
    case persistence.CrossClusterTaskTypeApplyParentClosePolicy:
        taskTypeMatch = response.GetTaskType() == types.CrossClusterTaskTypeApplyParentPolicy
        emptyResponse = response.ApplyParentClosePolicyAttributes == nil
    default:
        return fmt.Errorf("unknown task type: %v", t.GetTaskType())
    }

    if !taskTypeMatch {
        return fmt.Errorf("unexpected task type, expected: %v, actual: %v", t.GetTaskType(), response.GetTaskType())
    }
    if emptyResponse && response.FailedCause == nil {
        return fmt.Errorf("empty cross cluster task response, task type: %v", t.GetTaskType())
    }

    if response.FailedCause != nil {
        unexpectedFailedCause := false
        switch response.GetFailedCause() {
        case types.CrossClusterTaskFailedCauseUncategorized:
            unexpectedFailedCause = true
        case types.CrossClusterTaskFailedCauseWorkflowAlreadyRunning:
            unexpectedFailedCause = (t.GetTaskType() == persistence.CrossClusterTaskTypeCancelExecution ||
                t.GetTaskType() == persistence.CrossClusterTaskTypeSignalExecution)
        }
        if unexpectedFailedCause {
            // nothing needs to be done for an unexpected failed cause;
            // leave the task state as is, so it's available for the next poll.
            // before it's fetched we will also verify whether the task still
            // needs to be processed; if so the task will be fetched and
            // executed again in the target cluster
            return fmt.Errorf("unexpected cross cluster task failed cause: %v", response.GetFailedCause())
        }
    }

    // set state to reported so that the task won't be pulled
    // while it's being processed at the source cluster
    t.processingState = processingStateResponseReported
    t.response = response
    return nil
}

// cross cluster target task methods

func (t *crossClusterTargetTask) Execute() error {
    executionStartTime := t.timeSource.Now()
    defer func() {
        t.scope.IncCounter(metrics.TaskRequestsPerDomain)
        t.scope.RecordTimer(metrics.TaskProcessingLatencyPerDomain, time.Since(executionStartTime))
    }()

    logEvent(t.eventLogger, "Executing task")
    return t.taskExecutor.Execute(t, true)
}

func (t *crossClusterTargetTask) Ack() {
    t.Lock()
    t.state = ctask.TaskStateAcked
    t.Unlock()

    logEvent(t.eventLogger, "Acked task")
    t.completeTask()
}

func (t *crossClusterTargetTask) Nack() {
    t.Lock()
    if t.attempt < t.maxRetryCount {
        t.redispatchFn(t)
        t.Unlock()
        return
    }

    t.state = ctask.TaskStateNacked
    t.Unlock()

    logEvent(t.eventLogger, "Nacked task")
    t.completeTask()
}

func (t *crossClusterTargetTask) HandleErr(
    err error,
) (retErr error) {
    logEvent(t.eventLogger, "Failed to handle error", retErr)

    t.Lock()
    t.attempt++
    t.Unlock()

    t.scope.IncCounter(metrics.TaskFailuresPerDomain)
    t.logger.Error("Fail to process task", tag.Error(err), tag.LifeCycleProcessingFailed)
    return err
}

func (t *crossClusterTargetTask) RetryErr(
    err error,
) bool {
    return err != ErrTaskPendingActive
}

func (t *crossClusterTargetTask) completeTask() {
    t.scope.RecordTimer(metrics.TaskAttemptTimerPerDomain, time.Duration(t.attempt))
    t.scope.RecordTimer(metrics.TaskLatencyPerDomain, time.Since(t.submitTime))
    t.scope.RecordTimer(metrics.TaskQueueLatencyPerDomain, time.Since(t.GetVisibilityTimestamp()))

    if t.eventLogger != nil && t.attempt != 0 {
        // only dump events when the task has been retried
        t.eventLogger.FlushEvents("Task processing events")
    }

    t.settable.Set(*t.response, nil)
}

// cross cluster task base methods shared by both source and target task impl

func (t *crossClusterTaskBase) GetInfo() Info {
    return t.Info
}

func (t *crossClusterTaskBase) State() ctask.State {
    t.Lock()
    defer t.Unlock()
    return t.state
}

func (t *crossClusterTaskBase) Priority() int {
    t.Lock()
    defer t.Unlock()
    return t.priority
}

func (t *crossClusterTaskBase) SetPriority(
    priority int,
) {
    t.Lock()
    defer t.Unlock()
    t.priority = priority
}

func (t *crossClusterTaskBase) GetShard() shard.Context {
    return t.shard
}

func (t *crossClusterTaskBase) GetAttempt() int {
    t.Lock()
    defer t.Unlock()
    return t.attempt
}

func (t *crossClusterTaskBase) GetQueueType() QueueType {
    return QueueTypeCrossCluster
}

func (t *crossClusterTaskBase) ProcessingState() processingState {
    t.Lock()
    defer t.Unlock()
    return t.processingState
}
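
// A minimal caller sketch for loadWorkflowForCrossClusterTask defined below
// (illustrative only): on the error and nil-mutable-state paths the function
// has already released the workflow lock, so the caller only defers release
// on success, passing the processing error so the execution cache can
// invalidate cached mutable state on failure:
//
//  wfContext, mutableState, release, err := loadWorkflowForCrossClusterTask(
//      ctx, executionCache, taskInfo, metricsClient, logger,
//  )
//  if err != nil || mutableState == nil {
//      return err // lock not acquired, or already released
//  }
//  defer func() { release(retError) }()
//  _ = wfContext // read/update workflow state while the lock is held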
func loadWorkflowForCrossClusterTask(
    ctx context.Context,
    executionCache *execution.Cache,
    taskInfo *persistence.CrossClusterTaskInfo,
    metricsClient metrics.Client,
    logger log.Logger,
) (execution.Context, execution.MutableState, execution.ReleaseFunc, error) {
    wfContext, release, err := executionCache.GetOrCreateWorkflowExecutionWithTimeout(
        taskInfo.GetDomainID(),
        getWorkflowExecution(taskInfo),
        taskGetExecutionContextTimeout,
    )
    if err != nil {
        if err == context.DeadlineExceeded {
            return nil, nil, nil, errWorkflowBusy
        }
        return nil, nil, nil, err
    }

    mutableState, err := loadMutableStateForCrossClusterTask(ctx, wfContext, taskInfo, metricsClient, logger)
    if err != nil {
        release(err)
        return nil, nil, nil, err
    }
    if mutableState == nil {
        release(nil)
        return nil, nil, nil, nil
    }
    return wfContext, mutableState, release, nil
}
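
// A minimal response-recording sketch for the RecordResponse contract defined
// above (illustrative only; the attribute values are assumptions about what a
// target cluster might return for a cancel task):
//
//  resp := &types.CrossClusterTaskResponse{
//      TaskID:                    task.GetTaskID(),
//      TaskType:                  types.CrossClusterTaskTypeCancelExecution.Ptr(),
//      TaskState:                 int16(processingStateInitialized),
//      CancelExecutionAttributes: &types.CrossClusterCancelExecutionResponseAttributes{},
//  }
//  if err := task.RecordResponse(resp); err != nil {
//      // stale or duplicated response: leave the task pollable and drop resp
//  }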