service/history/task/transfer_active_task_executor.go (1,745 lines of code) (raw):

// Copyright (c) 2021 Uber Technologies, Inc. // Portions of the Software are attributed to Copyright (c) 2021 Temporal Technologies Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package task import ( "context" "errors" "fmt" "time" "github.com/pborman/uuid" "github.com/uber/cadence/client/history" "github.com/uber/cadence/common" "github.com/uber/cadence/common/backoff" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/history/config" "github.com/uber/cadence/service/history/execution" "github.com/uber/cadence/service/history/reset" "github.com/uber/cadence/service/history/shard" "github.com/uber/cadence/service/history/workflowcache" "github.com/uber/cadence/service/worker/archiver" "github.com/uber/cadence/service/worker/parentclosepolicy" ) const ( resetWorkflowTimeout = 30 * time.Second ) var ( // ErrMissingRequestCancelInfo indicates missing request cancel info ErrMissingRequestCancelInfo = &types.InternalServiceError{Message: "unable to get request cancel info"} // ErrMissingSignalInfo indicates missing signal external ErrMissingSignalInfo = &types.InternalServiceError{Message: "unable to get signal info"} ) var ( errUnknownTransferTask = errors.New("unknown transfer task") errWorkflowBusy = errors.New("unable to get workflow execution lock within specified timeout") errTargetDomainNotActive = errors.New("target domain not active") errWorkflowRateLimited = errors.New("workflow is being rate limited for making too many requests") ) type ( transferActiveTaskExecutor struct { *transferTaskExecutorBase historyClient history.Client parentClosePolicyClient parentclosepolicy.Client workflowResetter reset.WorkflowResetter wfIDCache workflowcache.WFCache ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter } generatorF = func(taskGenerator execution.MutableStateTaskGenerator) error ) // NewTransferActiveTaskExecutor creates a new task executor for active transfer task func NewTransferActiveTaskExecutor( shard shard.Context, archiverClient archiver.Client, executionCache *execution.Cache, workflowResetter reset.WorkflowResetter, logger log.Logger, config *config.Config, wfIDCache workflowcache.WFCache, ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter, ) Executor { return &transferActiveTaskExecutor{ transferTaskExecutorBase: newTransferTaskExecutorBase( shard, archiverClient, executionCache, logger, config, ), historyClient: shard.GetService().GetHistoryClient(), parentClosePolicyClient: parentclosepolicy.NewClient( shard.GetMetricsClient(), shard.GetLogger(), shard.GetService().GetSDKClient(), config.NumParentClosePolicySystemWorkflows(), ), workflowResetter: workflowResetter, wfIDCache: wfIDCache, ratelimitInternalPerWorkflowID: ratelimitInternalPerWorkflowID, } } func (t *transferActiveTaskExecutor) Execute( task Task, shouldProcessTask bool, ) error { transferTask, ok := task.GetInfo().(*persistence.TransferTaskInfo) if !ok { return errUnexpectedTask } if !shouldProcessTask { return nil } ctx, cancel := context.WithTimeout(context.Background(), taskDefaultTimeout) defer cancel() switch transferTask.TaskType { case persistence.TransferTaskTypeActivityTask: return t.processActivityTask(ctx, transferTask) case persistence.TransferTaskTypeDecisionTask: return t.processDecisionTask(ctx, transferTask) case persistence.TransferTaskTypeCloseExecution: return t.processCloseExecution(ctx, transferTask) case persistence.TransferTaskTypeRecordWorkflowClosed: return t.processRecordWorkflowClosed(ctx, transferTask) case persistence.TransferTaskTypeRecordChildExecutionCompleted: return t.processRecordChildExecutionCompleted(ctx, transferTask) case persistence.TransferTaskTypeApplyParentClosePolicy: return t.processApplyParentClosePolicy(ctx, transferTask) case persistence.TransferTaskTypeCancelExecution: return t.processCancelExecution(ctx, transferTask) case persistence.TransferTaskTypeSignalExecution: return t.processSignalExecution(ctx, transferTask) case persistence.TransferTaskTypeStartChildExecution: return t.processStartChildExecution(ctx, transferTask) case persistence.TransferTaskTypeRecordWorkflowStarted: return t.processRecordWorkflowStarted(ctx, transferTask) case persistence.TransferTaskTypeResetWorkflow: return t.processResetWorkflow(ctx, transferTask) case persistence.TransferTaskTypeUpsertWorkflowSearchAttributes: return t.processUpsertWorkflowSearchAttributes(ctx, transferTask) default: return errUnknownTransferTask } } func (t *transferActiveTaskExecutor) processActivityTask( ctx context.Context, task *persistence.TransferTaskInfo, ) (retError error) { wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout( task.DomainID, getWorkflowExecution(task), taskGetExecutionContextTimeout, ) if err != nil { if err == context.DeadlineExceeded { return errWorkflowBusy } return err } defer func() { release(retError) }() mutableState, err := loadMutableStateForTransferTask(ctx, wfContext, task, t.metricsClient, t.logger) if err != nil { return err } if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() { return nil } ai, ok := mutableState.GetActivityInfo(task.ScheduleID) if !ok { t.logger.Debug("Potentially duplicate ", tag.TaskID(task.TaskID), tag.WorkflowScheduleID(task.ScheduleID), tag.TaskType(persistence.TransferTaskTypeActivityTask)) return nil } ok, err = verifyTaskVersion(t.shard, t.logger, task.DomainID, ai.Version, task.Version, task) if err != nil || !ok { return err } timeout := common.MinInt32(ai.ScheduleToStartTimeout, common.MaxTaskTimeout) // release the context lock since we no longer need mutable state builder and // the rest of logic is making RPC call, which takes time. release(nil) // Rate limiting task processing requests if !t.allowTask(task) { return errWorkflowRateLimited } return t.pushActivity(ctx, task, timeout, mutableState.GetExecutionInfo().PartitionConfig) } func (t *transferActiveTaskExecutor) processDecisionTask( ctx context.Context, task *persistence.TransferTaskInfo, ) (retError error) { wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout( task.DomainID, getWorkflowExecution(task), taskGetExecutionContextTimeout, ) if err != nil { if err == context.DeadlineExceeded { return errWorkflowBusy } return err } defer func() { release(retError) }() mutableState, err := loadMutableStateForTransferTask(ctx, wfContext, task, t.metricsClient, t.logger) if err != nil { return err } if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() { return nil } decision, found := mutableState.GetDecisionInfo(task.ScheduleID) if !found { t.logger.Debug("Potentially duplicate ", tag.TaskID(task.TaskID), tag.WorkflowScheduleID(task.ScheduleID), tag.TaskType(persistence.TransferTaskTypeDecisionTask)) return nil } ok, err := verifyTaskVersion(t.shard, t.logger, task.DomainID, decision.Version, task.Version, task) if err != nil || !ok { return err } executionInfo := mutableState.GetExecutionInfo() workflowTimeout := executionInfo.WorkflowTimeout decisionTimeout := common.MinInt32(workflowTimeout, common.MaxTaskTimeout) // NOTE: previously this section check whether mutable state has enabled // sticky decision, if so convert the decision to a sticky decision. // that logic has a bug which timer task for that sticky decision is not generated // the correct logic should check whether the decision task is a sticky decision // task or not. taskList := &types.TaskList{ Name: task.TaskList, } if mutableState.GetExecutionInfo().TaskList != task.TaskList { // this decision is an sticky decision // there shall already be an timer set taskList.Kind = types.TaskListKindSticky.Ptr() decisionTimeout = executionInfo.StickyScheduleToStartTimeout } // TODO: for normal decision, we don't know if there's a scheduleToStart // timeout timer task associated with the decision since it's determined // when creating the decision and the result is not persisted in mutable // state. // If we calculated the timeout again here, the timeout may be different, // or even lost the decision if there's originally no timeout timer task // for the decision. Using MaxTaskTimeout here for now so at least no // decision will be lost. // release the context lock since we no longer need mutable state builder and // the rest of logic is making RPC call, which takes time. release(nil) // Rate limiting task processing requests if !t.allowTask(task) { return errWorkflowRateLimited } err = t.pushDecision(ctx, task, taskList, decisionTimeout, mutableState.GetExecutionInfo().PartitionConfig) if _, ok := err.(*types.StickyWorkerUnavailableError); ok { // sticky worker is unavailable, switch to non-sticky task list taskList = &types.TaskList{ Name: mutableState.GetExecutionInfo().TaskList, } // Continue to use sticky schedule_to_start timeout as TTL for the matching task. Because the schedule_to_start // timeout timer task is already created which will timeout this task if no worker pick it up in 5s anyway. // There is no need to reset sticky, because if this task is picked by new worker, the new worker will reset // the sticky queue to a new one. However, if worker is completely down, that schedule_to_start timeout task // will re-create a new non-sticky task and reset sticky. err = t.pushDecision(ctx, task, taskList, decisionTimeout, mutableState.GetExecutionInfo().PartitionConfig) } return err } func (t *transferActiveTaskExecutor) allowTask(task *persistence.TransferTaskInfo) bool { domainName, err := t.shard.GetDomainCache().GetDomainName(task.DomainID) if err != nil { t.logger.Error("Error when getting domain name", tag.WorkflowDomainID(task.DomainID), tag.WorkflowID(task.WorkflowID), tag.WorkflowRunID(task.RunID), tag.Error(err)) // Fail open return true } enabled := t.ratelimitInternalPerWorkflowID(domainName) allow := t.wfIDCache.AllowInternal(task.DomainID, task.WorkflowID) return allow || !enabled } func (t *transferActiveTaskExecutor) processCloseExecution( ctx context.Context, task *persistence.TransferTaskInfo, ) error { return t.processCloseExecutionTaskHelper(ctx, task, true, true, true) } func (t *transferActiveTaskExecutor) processRecordWorkflowClosed( ctx context.Context, task *persistence.TransferTaskInfo, ) error { return t.processCloseExecutionTaskHelper(ctx, task, true, false, false) } func (t *transferActiveTaskExecutor) processRecordChildExecutionCompleted( ctx context.Context, task *persistence.TransferTaskInfo, ) error { return t.processCloseExecutionTaskHelper(ctx, task, false, true, false) } func (t *transferActiveTaskExecutor) processApplyParentClosePolicy( ctx context.Context, task *persistence.TransferTaskInfo, ) error { return t.processCloseExecutionTaskHelper(ctx, task, false, false, true) } // TODO: this helper function performs three operations: // 1. publish workflow closed visibility record // 2. if has parent workflow, reply to the parent workflow // 3. if has child workflow(s), apply parent close policy // ideally we should separate them into 3 functions, but it is complicated // but the fact that we want to release mutable state lock as early as possible // we should see if there's a better way to organize the code func (t *transferActiveTaskExecutor) processCloseExecutionTaskHelper( ctx context.Context, task *persistence.TransferTaskInfo, recordWorkflowClosed bool, replyToParentWorkflow bool, applyParentClosePolicy bool, ) (retError error) { wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout( task.DomainID, getWorkflowExecution(task), taskGetExecutionContextTimeout, ) if err != nil { if err == context.DeadlineExceeded { return errWorkflowBusy } return err } defer func() { release(retError) }() mutableState, err := loadMutableStateForTransferTask(ctx, wfContext, task, t.metricsClient, t.logger) if err != nil { return err } if mutableState == nil || mutableState.IsWorkflowExecutionRunning() { return nil } lastWriteVersion, err := mutableState.GetLastWriteVersion() if err != nil { return err } ok, err := verifyTaskVersion(t.shard, t.logger, task.DomainID, lastWriteVersion, task.Version, task) if err != nil || !ok { return err } domainEntry, err := t.shard.GetDomainCache().GetDomainByID(task.DomainID) if err != nil { return err } executionInfo := mutableState.GetExecutionInfo() completionEvent, err := mutableState.GetCompletionEvent(ctx) if err != nil { return err } wfCloseTime := completionEvent.GetTimestamp() parentDomainID := executionInfo.ParentDomainID parentWorkflowID := executionInfo.ParentWorkflowID parentRunID := executionInfo.ParentRunID initiatedID := executionInfo.InitiatedID workflowTypeName := executionInfo.WorkflowTypeName workflowCloseTimestamp := wfCloseTime workflowCloseStatus := persistence.ToInternalWorkflowExecutionCloseStatus(executionInfo.CloseStatus) workflowHistoryLength := mutableState.GetNextEventID() - 1 isCron := len(executionInfo.CronSchedule) > 0 numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters)) updateTimestamp := t.shard.GetTimeSource().Now() startEvent, err := mutableState.GetStartEvent(ctx) if err != nil { return err } workflowStartTimestamp := startEvent.GetTimestamp() workflowExecutionTimestamp := getWorkflowExecutionTimestamp(mutableState, startEvent) visibilityMemo := getWorkflowMemo(executionInfo.Memo) searchAttr := executionInfo.SearchAttributes domainName := mutableState.GetDomainEntry().GetInfo().Name children, err := filterPendingChildExecutions( task.TargetDomainIDs, mutableState.GetPendingChildExecutionInfos(), t.shard.GetDomainCache(), domainEntry, ) if err != nil { return err } // generate cross cluster task for applying parent close policy var crossClusterTaskGenerators []generatorF var sameClusterChildDomainIDs map[int64]string var signalParentClosePolicyWorker bool if applyParentClosePolicy { crossClusterTaskGenerators, sameClusterChildDomainIDs, signalParentClosePolicyWorker, err = t.applyParentClosePolicyDomainActiveCheck(task, domainName, children, domainEntry) if err != nil { return err } } // generate cross cluster task for record child execution completed replyToParentWorkflow = replyToParentWorkflow && mutableState.HasParentExecution() && executionInfo.CloseStatus != persistence.WorkflowCloseStatusContinuedAsNew if replyToParentWorkflow { // generate cross cluster task for recording child completion targetDomainEntry, err := t.shard.GetDomainCache().GetDomainByID(parentDomainID) if err != nil { return err } if targetCluster, isCrossCluster := t.isCrossClusterTask(task.DomainID, targetDomainEntry); isCrossCluster { parentInfo := &types.ParentExecutionInfo{ DomainUUID: parentDomainID, Domain: targetDomainEntry.GetInfo().Name, Execution: &types.WorkflowExecution{ WorkflowID: parentWorkflowID, RunID: parentRunID, }, InitiatedID: initiatedID, } crossClusterTaskGenerators = append(crossClusterTaskGenerators, func(taskGenerator execution.MutableStateTaskGenerator) error { return taskGenerator.GenerateCrossClusterRecordChildCompletedTask(task, targetCluster, parentInfo) }) replyToParentWorkflow = false } } // update workflow execute to persist generated cross cluster tasks if len(crossClusterTaskGenerators) > 0 { if err := t.generateCrossClusterTasks( ctx, wfContext, mutableState, task, crossClusterTaskGenerators, ); err != nil { return err } } // we've gathered all necessary information from mutable state and // generated/persisted necessary cross cluster tasks. // release the context lock since we no longer need mutable state builder and // the rest of logic is making RPC call, which takes time. release(nil) // publish workflow closed visibility records if recordWorkflowClosed { if err := t.recordWorkflowClosed( ctx, task.DomainID, task.WorkflowID, task.RunID, workflowTypeName, workflowStartTimestamp, workflowExecutionTimestamp.UnixNano(), workflowCloseTimestamp, *workflowCloseStatus, workflowHistoryLength, task.GetTaskID(), visibilityMemo, executionInfo.TaskList, isCron, numClusters, updateTimestamp.UnixNano(), searchAttr, ); err != nil { return err } } // Communicate the result to parent execution if this is Child Workflow execution // and parent domain is in the same cluster if replyToParentWorkflow { recordChildCompletionCtx, cancel := context.WithTimeout(ctx, taskRPCCallTimeout) defer cancel() err := t.historyClient.RecordChildExecutionCompleted(recordChildCompletionCtx, &types.RecordChildExecutionCompletedRequest{ DomainUUID: parentDomainID, WorkflowExecution: &types.WorkflowExecution{ WorkflowID: parentWorkflowID, RunID: parentRunID, }, InitiatedID: initiatedID, CompletedExecution: &types.WorkflowExecution{ WorkflowID: task.WorkflowID, RunID: task.RunID, }, CompletionEvent: completionEvent, }) // Check to see if the error is non-transient, in which case reset the error and continue with processing switch err.(type) { case *types.EntityNotExistsError, *types.WorkflowExecutionAlreadyCompletedError: err = nil case *types.DomainNotActiveError: err = errTargetDomainNotActive } if err != nil { return err } } if applyParentClosePolicy { if err := t.processParentClosePolicy( ctx, wfContext, task, children, sameClusterChildDomainIDs, signalParentClosePolicyWorker, domainEntry, ); err != nil { return err } } return nil } func (t *transferActiveTaskExecutor) processCancelExecution( ctx context.Context, task *persistence.TransferTaskInfo, ) (retError error) { wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout( task.DomainID, getWorkflowExecution(task), taskGetExecutionContextTimeout, ) if err != nil { if err == context.DeadlineExceeded { return errWorkflowBusy } return err } defer func() { release(retError) }() mutableState, err := loadMutableStateForTransferTask(ctx, wfContext, task, t.metricsClient, t.logger) if err != nil { return err } if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() { return nil } initiatedEventID := task.ScheduleID requestCancelInfo, ok := mutableState.GetRequestCancelInfo(initiatedEventID) if !ok { return nil } ok, err = verifyTaskVersion(t.shard, t.logger, task.DomainID, requestCancelInfo.Version, task.Version, task) if err != nil || !ok { return err } targetDomainEntry, err := t.shard.GetDomainCache().GetDomainByID(task.TargetDomainID) if err != nil { // TODO: handle the case where target domain does not exist return err } if targetCluster, isCrossCluster := t.isCrossClusterTask(task.DomainID, targetDomainEntry); isCrossCluster { return t.generateCrossClusterTaskFromTransferTask(ctx, wfContext, mutableState, task, targetCluster) } targetDomainName := targetDomainEntry.GetInfo().Name // handle workflow cancel itself if task.DomainID == task.TargetDomainID && task.WorkflowID == task.TargetWorkflowID { // it does not matter if the run ID is a mismatch err = requestCancelExternalExecutionFailed( ctx, task, wfContext, targetDomainName, task.TargetWorkflowID, task.TargetRunID, t.shard.GetTimeSource().Now(), ) return err } if err = requestCancelExternalExecutionWithRetry( ctx, t.historyClient, task, targetDomainName, requestCancelInfo.CancelRequestID, ); err != nil { t.logger.Error("Failed to cancel external workflow execution", tag.WorkflowDomainID(task.DomainID), tag.WorkflowID(task.WorkflowID), tag.WorkflowRunID(task.RunID), tag.TargetWorkflowDomainID(task.TargetDomainID), tag.TargetWorkflowID(task.TargetWorkflowID), tag.TargetWorkflowRunID(task.TargetRunID), tag.Error(err)) // Check to see if the error is non-transient, in which case add RequestCancelFailed // event and complete transfer task by setting the err = nil if common.IsServiceTransientError(err) || common.IsContextTimeoutError(err) { // for retryable error just return return err } return requestCancelExternalExecutionFailed( ctx, task, wfContext, targetDomainName, task.TargetWorkflowID, task.TargetRunID, t.shard.GetTimeSource().Now(), ) } t.logger.Debug("RequestCancel successfully recorded to external workflow execution", tag.WorkflowDomainID(task.DomainID), tag.WorkflowID(task.WorkflowID), tag.WorkflowRunID(task.RunID), tag.TargetWorkflowDomainID(task.TargetDomainID), tag.TargetWorkflowID(task.TargetWorkflowID), tag.TargetWorkflowRunID(task.TargetRunID)) // Record ExternalWorkflowExecutionCancelRequested in source execution return requestCancelExternalExecutionCompleted( ctx, task, wfContext, targetDomainName, task.TargetWorkflowID, task.TargetRunID, t.shard.GetTimeSource().Now(), ) } func (t *transferActiveTaskExecutor) processSignalExecution( ctx context.Context, task *persistence.TransferTaskInfo, ) (retError error) { wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout( task.DomainID, getWorkflowExecution(task), taskGetExecutionContextTimeout, ) if err != nil { if err == context.DeadlineExceeded { return errWorkflowBusy } return err } defer func() { release(retError) }() mutableState, err := loadMutableStateForTransferTask(ctx, wfContext, task, t.metricsClient, t.logger) if err != nil { return err } if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() { return nil } initiatedEventID := task.ScheduleID signalInfo, ok := mutableState.GetSignalInfo(initiatedEventID) if !ok { // TODO: here we should also RemoveSignalMutableState from target workflow // Otherwise, target SignalRequestID still can leak if shard restart after signalExternalExecutionCompleted // To do that, probably need to add the SignalRequestID in transfer return nil } ok, err = verifyTaskVersion(t.shard, t.logger, task.DomainID, signalInfo.Version, task.Version, task) if err != nil || !ok { return err } targetDomainEntry, err := t.shard.GetDomainCache().GetDomainByID(task.TargetDomainID) if err != nil { // TODO: handle the case where target domain does not exist return err } if targetCluster, isCrossCluster := t.isCrossClusterTask(task.DomainID, targetDomainEntry); isCrossCluster { return t.generateCrossClusterTaskFromTransferTask(ctx, wfContext, mutableState, task, targetCluster) } targetDomainName := targetDomainEntry.GetInfo().Name // handle workflow signal itself if task.DomainID == task.TargetDomainID && task.WorkflowID == task.TargetWorkflowID { // it does not matter if the run ID is a mismatch return signalExternalExecutionFailed( ctx, task, wfContext, targetDomainName, task.TargetWorkflowID, task.TargetRunID, signalInfo.Control, t.shard.GetTimeSource().Now(), ) } if err = signalExternalExecutionWithRetry( ctx, t.historyClient, task, targetDomainName, signalInfo, ); err != nil { t.logger.Error("Failed to signal external workflow execution", tag.WorkflowDomainID(task.DomainID), tag.WorkflowID(task.WorkflowID), tag.WorkflowRunID(task.RunID), tag.TargetWorkflowDomainID(task.TargetDomainID), tag.TargetWorkflowID(task.TargetWorkflowID), tag.TargetWorkflowRunID(task.TargetRunID), tag.Error(err)) // Check to see if the error is non-transient, in which case add SignalFailed // event and complete transfer task by setting the err = nil if common.IsServiceTransientError(err) || common.IsContextTimeoutError(err) { // for retryable error just return return err } return signalExternalExecutionFailed( ctx, task, wfContext, targetDomainName, task.TargetWorkflowID, task.TargetRunID, signalInfo.Control, t.shard.GetTimeSource().Now(), ) } t.logger.Debug("Signal successfully recorded to external workflow execution", tag.WorkflowDomainID(task.DomainID), tag.WorkflowID(task.WorkflowID), tag.WorkflowRunID(task.RunID), tag.TargetWorkflowDomainID(task.TargetDomainID), tag.TargetWorkflowID(task.TargetWorkflowID), tag.TargetWorkflowRunID(task.TargetRunID)) err = signalExternalExecutionCompleted( ctx, task, wfContext, targetDomainName, task.TargetWorkflowID, task.TargetRunID, signalInfo.Control, t.shard.GetTimeSource().Now(), ) if err != nil { return err } // release the context lock since we no longer need mutable state builder and // the rest of logic is making RPC call, which takes time. release(retError) // remove signalRequestedID from target workflow, after Signal detail is removed from source workflow return removeSignalMutableStateWithRetry(ctx, t.historyClient, task, signalInfo.SignalRequestID) } func (t *transferActiveTaskExecutor) processStartChildExecution( ctx context.Context, task *persistence.TransferTaskInfo, ) (retError error) { wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout( task.DomainID, getWorkflowExecution(task), taskGetExecutionContextTimeout, ) if err != nil { if err == context.DeadlineExceeded { return errWorkflowBusy } return err } defer func() { release(retError) }() mutableState, err := loadMutableStateForTransferTask(ctx, wfContext, task, t.metricsClient, t.logger) if err != nil { return err } if mutableState == nil { return nil } initiatedEventID := task.ScheduleID childInfo, ok := mutableState.GetChildExecutionInfo(initiatedEventID) if !ok { return nil } ok, err = verifyTaskVersion(t.shard, t.logger, task.DomainID, childInfo.Version, task.Version, task) if err != nil || !ok { return err } workflowRunning := mutableState.IsWorkflowExecutionRunning() childStarted := childInfo.StartedID != common.EmptyEventID if !workflowRunning && (!childStarted || childInfo.ParentClosePolicy != types.ParentClosePolicyAbandon) { // three cases here: // case 1: workflow not running, child started, close policy is not abandon // case 2 & 3: workflow not running, child not started, close policy is or is not abandon return nil } // Get target domain name var targetDomainName string var targetDomainEntry *cache.DomainCacheEntry if targetDomainEntry, err = t.shard.GetDomainCache().GetDomainByID(task.TargetDomainID); err != nil { if _, ok := err.(*types.EntityNotExistsError); !ok { return err } // TODO: handle the case where target domain does not exist // it is possible that the domain got deleted. Use domainID instead as this is only needed for the history event targetDomainName = task.TargetDomainID } else { if targetCluster, isCrossCluster := t.isCrossClusterTask(task.DomainID, targetDomainEntry); isCrossCluster { return t.generateCrossClusterTaskFromTransferTask(ctx, wfContext, mutableState, task, targetCluster) } targetDomainName = targetDomainEntry.GetInfo().Name } // ChildExecution already started, just create DecisionTask and complete transfer task // if parent already closed, since child workflow started event already written to history, // still schedule the decision if the parent close policy is Abandon. // If parent close policy cancel, a decision will be scheduled when processing that close policy. if childStarted { // NOTE: do not access anything related mutable state after this lock release // release the context lock since we no longer need mutable state builder and // the rest of logic is making RPC call, which takes time. release(nil) return createFirstDecisionTask( ctx, t.historyClient, task.TargetDomainID, &types.WorkflowExecution{ WorkflowID: childInfo.StartedWorkflowID, RunID: childInfo.StartedRunID, }) } // remaining 2 cases: // workflow running, child not started, close policy is or is not abandon initiatedEvent, err := mutableState.GetChildExecutionInitiatedEvent(ctx, initiatedEventID) if err != nil { return err } attributes := initiatedEvent.StartChildWorkflowExecutionInitiatedEventAttributes childRunID, err := startWorkflowWithRetry( ctx, t.historyClient, t.shard.GetTimeSource(), t.shard.GetDomainCache(), task, targetDomainName, childInfo.CreateRequestID, attributes, mutableState.GetExecutionInfo().PartitionConfig, ) if err != nil { t.logger.Error("Failed to start child workflow execution", tag.WorkflowDomainID(task.DomainID), tag.WorkflowID(task.WorkflowID), tag.WorkflowRunID(task.RunID), tag.TargetWorkflowDomainID(task.TargetDomainID), tag.TargetWorkflowID(attributes.WorkflowID), tag.Error(err)) // Check to see if the error is non-transient, in which case add StartChildWorkflowExecutionFailed // event and complete transfer task by setting the err = nil switch err.(type) { // TODO: we should also handle domain not exist error here // but we probably need to introduce a new error type for DomainNotExists, // for now when getting an EntityNotExists error, we can't tell if it's domain or workflow. case *types.WorkflowExecutionAlreadyStartedError: err = recordStartChildExecutionFailed(ctx, task, wfContext, attributes, t.shard.GetTimeSource().Now()) } return err } t.logger.Debug("Child Execution started successfully", tag.WorkflowDomainID(task.DomainID), tag.WorkflowID(task.WorkflowID), tag.WorkflowRunID(task.RunID), tag.TargetWorkflowDomainID(task.TargetDomainID), tag.TargetWorkflowID(attributes.WorkflowID), tag.TargetWorkflowRunID(childRunID)) // Child execution is successfully started, record ChildExecutionStartedEvent in parent execution err = recordChildExecutionStarted(ctx, task, wfContext, attributes, childRunID, t.shard.GetTimeSource().Now()) if err != nil { return err } // NOTE: do not access anything related mutable state after this lock release // release the context lock since we no longer need mutable state builder and // the rest of logic is making RPC call, which takes time. release(nil) // Finally create first decision task for Child execution so it is really started // entity not exist error is checked and ignored in HandleErr() method in task.go return createFirstDecisionTask( ctx, t.historyClient, task.TargetDomainID, &types.WorkflowExecution{ WorkflowID: task.TargetWorkflowID, RunID: childRunID, }) } func (t *transferActiveTaskExecutor) processRecordWorkflowStarted( ctx context.Context, task *persistence.TransferTaskInfo, ) (retError error) { return t.processRecordWorkflowStartedOrUpsertHelper(ctx, task, true) } func (t *transferActiveTaskExecutor) processUpsertWorkflowSearchAttributes( ctx context.Context, task *persistence.TransferTaskInfo, ) (retError error) { return t.processRecordWorkflowStartedOrUpsertHelper(ctx, task, false) } func (t *transferActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHelper( ctx context.Context, task *persistence.TransferTaskInfo, recordStart bool, ) (retError error) { workflowStartedScope := getOrCreateDomainTaggedScope(t.shard, metrics.TransferActiveTaskRecordWorkflowStartedScope, task.DomainID, t.logger) wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout( task.DomainID, getWorkflowExecution(task), taskGetExecutionContextTimeout, ) if err != nil { if err == context.DeadlineExceeded { return errWorkflowBusy } return err } defer func() { release(retError) }() mutableState, err := loadMutableStateForTransferTask(ctx, wfContext, task, t.metricsClient, t.logger) if err != nil { return err } if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() { return nil } // verify task version for RecordWorkflowStarted. // upsert doesn't require verifyTask, because it is just a sync of mutableState. if recordStart { startVersion, err := mutableState.GetStartVersion() if err != nil { return err } ok, err := verifyTaskVersion(t.shard, t.logger, task.DomainID, startVersion, task.Version, task) if err != nil || !ok { return err } } domainEntry, err := t.shard.GetDomainCache().GetDomainByID(task.DomainID) if err != nil { return err } executionInfo := mutableState.GetExecutionInfo() workflowTimeout := executionInfo.WorkflowTimeout wfTypeName := executionInfo.WorkflowTypeName startEvent, err := mutableState.GetStartEvent(ctx) if err != nil { return err } startTimestamp := startEvent.GetTimestamp() executionTimestamp := getWorkflowExecutionTimestamp(mutableState, startEvent) visibilityMemo := getWorkflowMemo(executionInfo.Memo) searchAttr := copySearchAttributes(executionInfo.SearchAttributes) isCron := len(executionInfo.CronSchedule) > 0 numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters)) updateTimestamp := t.shard.GetTimeSource().Now() // release the context lock since we no longer need mutable state builder and // the rest of logic is making RPC call, which takes time. release(nil) if recordStart { workflowStartedScope.IncCounter(metrics.WorkflowStartedCount) return t.recordWorkflowStarted( ctx, task.DomainID, task.WorkflowID, task.RunID, wfTypeName, startTimestamp, executionTimestamp.UnixNano(), workflowTimeout, task.GetTaskID(), executionInfo.TaskList, isCron, numClusters, visibilityMemo, updateTimestamp.UnixNano(), searchAttr, ) } return t.upsertWorkflowExecution( ctx, task.DomainID, task.WorkflowID, task.RunID, wfTypeName, startTimestamp, executionTimestamp.UnixNano(), workflowTimeout, task.GetTaskID(), executionInfo.TaskList, visibilityMemo, isCron, numClusters, updateTimestamp.UnixNano(), searchAttr, ) } func (t *transferActiveTaskExecutor) processResetWorkflow( ctx context.Context, task *persistence.TransferTaskInfo, ) (retError error) { currentContext, currentRelease, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout( task.DomainID, getWorkflowExecution(task), taskGetExecutionContextTimeout, ) if err != nil { if err == context.DeadlineExceeded { return errWorkflowBusy } return err } defer func() { currentRelease(retError) }() currentMutableState, err := loadMutableStateForTransferTask(ctx, currentContext, task, t.metricsClient, t.logger) if err != nil { return err } if currentMutableState == nil { return nil } logger := t.logger.WithTags( tag.WorkflowDomainID(task.DomainID), tag.WorkflowID(task.WorkflowID), tag.WorkflowRunID(task.RunID), ) domainName, err := t.shard.GetDomainCache().GetDomainName(task.DomainID) if err != nil { return err } if !currentMutableState.IsWorkflowExecutionRunning() { // it means this this might not be current anymore, we need to check var resp *persistence.GetCurrentExecutionResponse resp, err = t.shard.GetExecutionManager().GetCurrentExecution(ctx, &persistence.GetCurrentExecutionRequest{ DomainID: task.DomainID, WorkflowID: task.WorkflowID, DomainName: domainName, }) if err != nil { return err } if resp.RunID != task.RunID { logger.Warn("Auto-Reset is skipped, because current run is stale.") return nil } } // TODO: current reset doesn't allow childWFs, in the future we will release this restriction if len(currentMutableState.GetPendingChildExecutionInfos()) > 0 { logger.Warn("Auto-Reset is skipped, because current run has pending child executions.") return nil } currentStartVersion, err := currentMutableState.GetStartVersion() if err != nil { return err } ok, err := verifyTaskVersion(t.shard, t.logger, task.DomainID, currentStartVersion, task.Version, task) if err != nil || !ok { return err } executionInfo := currentMutableState.GetExecutionInfo() domainEntry, err := t.shard.GetDomainCache().GetDomainByID(executionInfo.DomainID) if err != nil { return err } logger = logger.WithTags(tag.WorkflowDomainName(domainEntry.GetInfo().Name)) reason, resetPoint := execution.FindAutoResetPoint(t.shard.GetTimeSource(), &domainEntry.GetConfig().BadBinaries, executionInfo.AutoResetPoints) if resetPoint == nil { logger.Warn("Auto-Reset is skipped, because reset point is not found.") return nil } logger = logger.WithTags( tag.WorkflowResetBaseRunID(resetPoint.GetRunID()), tag.WorkflowBinaryChecksum(resetPoint.GetBinaryChecksum()), tag.WorkflowEventID(resetPoint.GetFirstDecisionCompletedID()), ) var baseContext execution.Context var baseMutableState execution.MutableState var baseRelease execution.ReleaseFunc if resetPoint.GetRunID() == executionInfo.RunID { baseContext = currentContext baseMutableState = currentMutableState baseRelease = currentRelease } else { baseExecution := types.WorkflowExecution{ WorkflowID: task.WorkflowID, RunID: resetPoint.GetRunID(), } baseContext, baseRelease, err = t.executionCache.GetOrCreateWorkflowExecutionWithTimeout( task.DomainID, baseExecution, taskGetExecutionContextTimeout, ) if err != nil { return err } defer func() { baseRelease(retError) }() baseMutableState, err = loadMutableStateForTransferTask(ctx, baseContext, task, t.metricsClient, t.logger) if err != nil { return err } if baseMutableState == nil { return nil } } // reset workflow needs to go through the history so it may take a long time. // as a result it's not subject to the taskDefaultTimeout. Otherwise the task // may got stuck if the workflow history is large. return t.resetWorkflow( task, domainEntry.GetInfo().Name, reason, resetPoint, baseContext, baseMutableState, currentContext, currentMutableState, logger, ) } func recordChildExecutionStarted( ctx context.Context, task *persistence.TransferTaskInfo, wfContext execution.Context, initiatedAttributes *types.StartChildWorkflowExecutionInitiatedEventAttributes, runID string, now time.Time, ) error { return updateWorkflowExecution(ctx, wfContext, true, func(ctx context.Context, mutableState execution.MutableState) error { if !mutableState.IsWorkflowExecutionRunning() { return &types.EntityNotExistsError{Message: "Workflow execution already completed."} } domain := initiatedAttributes.Domain initiatedEventID := task.ScheduleID ci, ok := mutableState.GetChildExecutionInfo(initiatedEventID) if !ok || ci.StartedID != common.EmptyEventID { return &types.EntityNotExistsError{Message: "Pending child execution not found."} } _, err := mutableState.AddChildWorkflowExecutionStartedEvent( domain, &types.WorkflowExecution{ WorkflowID: task.TargetWorkflowID, RunID: runID, }, initiatedAttributes.WorkflowType, initiatedEventID, initiatedAttributes.Header, ) return err }, now, ) } func recordStartChildExecutionFailed( ctx context.Context, task *persistence.TransferTaskInfo, wfContext execution.Context, initiatedAttributes *types.StartChildWorkflowExecutionInitiatedEventAttributes, now time.Time, ) error { return updateWorkflowExecution(ctx, wfContext, true, func(ctx context.Context, mutableState execution.MutableState) error { if !mutableState.IsWorkflowExecutionRunning() { return &types.EntityNotExistsError{Message: "Workflow execution already completed."} } initiatedEventID := task.ScheduleID ci, ok := mutableState.GetChildExecutionInfo(initiatedEventID) if !ok || ci.StartedID != common.EmptyEventID { return &types.EntityNotExistsError{Message: "Pending child execution not found."} } _, err := mutableState.AddStartChildWorkflowExecutionFailedEvent(initiatedEventID, types.ChildWorkflowExecutionFailedCauseWorkflowAlreadyRunning, initiatedAttributes) return err }, now, ) } // createFirstDecisionTask is used by StartChildExecution transfer task to create the first decision task for // child execution. func createFirstDecisionTask( ctx context.Context, historyClient history.Client, domainID string, execution *types.WorkflowExecution, ) error { scheduleDecisionCtx, cancel := context.WithTimeout(ctx, taskRPCCallTimeout) defer cancel() err := historyClient.ScheduleDecisionTask(scheduleDecisionCtx, &types.ScheduleDecisionTaskRequest{ DomainUUID: domainID, WorkflowExecution: execution, IsFirstDecision: true, }) if err != nil { switch err.(type) { // Maybe child workflow execution already timedout or terminated // Safe to discard the error and complete this transfer task // cross cluster task need to catch entity not exist error // as the target domain may failover before first decision is scheduled. case *types.WorkflowExecutionAlreadyCompletedError: return nil case *types.DomainNotActiveError: err = errTargetDomainNotActive } } return err } func requestCancelExternalExecutionCompleted( ctx context.Context, task *persistence.TransferTaskInfo, wfContext execution.Context, targetDomain string, targetWorkflowID string, targetRunID string, now time.Time, ) error { err := updateWorkflowExecution(ctx, wfContext, true, func(ctx context.Context, mutableState execution.MutableState) error { if !mutableState.IsWorkflowExecutionRunning() { return &types.WorkflowExecutionAlreadyCompletedError{Message: "Workflow execution already completed."} } initiatedEventID := task.ScheduleID _, ok := mutableState.GetRequestCancelInfo(initiatedEventID) if !ok { return ErrMissingRequestCancelInfo } _, err := mutableState.AddExternalWorkflowExecutionCancelRequested( initiatedEventID, targetDomain, targetWorkflowID, targetRunID, ) return err }, now, ) switch err.(type) { // this could happen if this is a duplicate processing of the task, // or the execution has already completed. case *types.EntityNotExistsError, *types.WorkflowExecutionAlreadyCompletedError: return nil } return err } func signalExternalExecutionCompleted( ctx context.Context, task *persistence.TransferTaskInfo, wfContext execution.Context, targetDomain string, targetWorkflowID string, targetRunID string, control []byte, now time.Time, ) error { err := updateWorkflowExecution(ctx, wfContext, true, func(ctx context.Context, mutableState execution.MutableState) error { if !mutableState.IsWorkflowExecutionRunning() { return &types.WorkflowExecutionAlreadyCompletedError{Message: "Workflow execution already completed."} } initiatedEventID := task.ScheduleID _, ok := mutableState.GetSignalInfo(initiatedEventID) if !ok { return ErrMissingSignalInfo } _, err := mutableState.AddExternalWorkflowExecutionSignaled( initiatedEventID, targetDomain, targetWorkflowID, targetRunID, control, ) return err }, now, ) switch err.(type) { // this could happen if this is a duplicate processing of the task, // or the execution has already completed. case *types.EntityNotExistsError, *types.WorkflowExecutionAlreadyCompletedError: return nil } return err } func requestCancelExternalExecutionFailed( ctx context.Context, task *persistence.TransferTaskInfo, wfContext execution.Context, targetDomain string, targetWorkflowID string, targetRunID string, now time.Time, ) error { err := updateWorkflowExecution(ctx, wfContext, true, func(ctx context.Context, mutableState execution.MutableState) error { if !mutableState.IsWorkflowExecutionRunning() { return &types.WorkflowExecutionAlreadyCompletedError{Message: "Workflow execution already completed."} } initiatedEventID := task.ScheduleID _, ok := mutableState.GetRequestCancelInfo(initiatedEventID) if !ok { return ErrMissingRequestCancelInfo } _, err := mutableState.AddRequestCancelExternalWorkflowExecutionFailedEvent( common.EmptyEventID, initiatedEventID, targetDomain, targetWorkflowID, targetRunID, types.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution, ) return err }, now, ) switch err.(type) { // this could happen if this is a duplicate processing of the task, // or the execution has already completed. case *types.EntityNotExistsError, *types.WorkflowExecutionAlreadyCompletedError: return nil } return err } func signalExternalExecutionFailed( ctx context.Context, task *persistence.TransferTaskInfo, wfContext execution.Context, targetDomain string, targetWorkflowID string, targetRunID string, control []byte, now time.Time, ) error { err := updateWorkflowExecution(ctx, wfContext, true, func(ctx context.Context, mutableState execution.MutableState) error { if !mutableState.IsWorkflowExecutionRunning() { return &types.WorkflowExecutionAlreadyCompletedError{Message: "Workflow execution already completed."} } initiatedEventID := task.ScheduleID _, ok := mutableState.GetSignalInfo(initiatedEventID) if !ok { return ErrMissingSignalInfo } _, err := mutableState.AddSignalExternalWorkflowExecutionFailedEvent( common.EmptyEventID, initiatedEventID, targetDomain, targetWorkflowID, targetRunID, control, types.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution, ) return err }, now, ) switch err.(type) { // this could happen if this is a duplicate processing of the task, // or the execution has already completed. case *types.EntityNotExistsError, *types.WorkflowExecutionAlreadyCompletedError: return nil } return err } func (t *transferActiveTaskExecutor) isCrossClusterTask( sourceDomainID string, targetDomainEntry *cache.DomainCacheEntry, ) (string, bool) { if sourceDomainID == targetDomainEntry.GetInfo().ID { return "", false } targetCluster := targetDomainEntry.GetReplicationConfig().ActiveClusterName if targetCluster != t.shard.GetClusterMetadata().GetCurrentClusterName() { return targetCluster, true } return "", false } func (t *transferActiveTaskExecutor) generateCrossClusterTasks( ctx context.Context, wfContext execution.Context, mutableState execution.MutableState, task *persistence.TransferTaskInfo, generators []generatorF, ) error { // TODO: we don't have to do this check here, as we hold the mutable state lock // during the update and we already checked the workflow status after acquiring the lock. if task.TaskType == persistence.TransferTaskTypeCloseExecution { // IsWorkflowCompleted only returns true when the workflow is completed, // !IsWorkflowExecutionRunning below returns true when the wf is zombie or corrupted too if !mutableState.IsWorkflowCompleted() { t.logger.Error("generateCrossClusterTasks", tag.Error(types.BadRequestError{ Message: "Workflow has an invalid state for recordChildClose cross cluster task", })) // Returning nil to avoid infinite retry loop return nil } } else if task.TaskType != persistence.TransferTaskTypeStartChildExecution && !mutableState.IsWorkflowExecutionRunning() { return &types.WorkflowExecutionAlreadyCompletedError{Message: "Workflow execution already completed."} } taskGenerator := execution.NewMutableStateTaskGenerator( t.shard.GetClusterMetadata(), t.shard.GetDomainCache(), mutableState, ) for _, generator := range generators { err := generator(taskGenerator) if err != nil { return err } } return wfContext.UpdateWorkflowExecutionTasks(ctx, t.shard.GetTimeSource().Now()) } func (t *transferActiveTaskExecutor) generateCrossClusterTaskFromTransferTask( ctx context.Context, wfContext execution.Context, mutableState execution.MutableState, task *persistence.TransferTaskInfo, targetCluster string, ) error { return t.generateCrossClusterTasks( ctx, wfContext, mutableState, task, []generatorF{ func(taskGenerator execution.MutableStateTaskGenerator) error { return taskGenerator.GenerateFromTransferTask(task, targetCluster) }, }, ) } func updateWorkflowExecution( ctx context.Context, wfContext execution.Context, createDecisionTask bool, action func(ctx context.Context, builder execution.MutableState) error, now time.Time, ) error { mutableState, err := wfContext.LoadWorkflowExecution(ctx) if err != nil { return err } if err := action(ctx, mutableState); err != nil { return err } if createDecisionTask { // Create a transfer task to schedule a decision task err := execution.ScheduleDecision(mutableState) if err != nil { return err } } return wfContext.UpdateWorkflowExecutionAsActive(ctx, now) } func requestCancelExternalExecutionWithRetry( ctx context.Context, historyClient history.Client, task *persistence.TransferTaskInfo, targetDomain string, cancelRequestID string, ) error { request := &types.HistoryRequestCancelWorkflowExecutionRequest{ DomainUUID: task.TargetDomainID, CancelRequest: &types.RequestCancelWorkflowExecutionRequest{ Domain: targetDomain, WorkflowExecution: &types.WorkflowExecution{ WorkflowID: task.TargetWorkflowID, RunID: task.TargetRunID, }, Identity: execution.IdentityHistoryService, // Use the same request ID to dedupe RequestCancelWorkflowExecution calls RequestID: cancelRequestID, }, ExternalInitiatedEventID: common.Int64Ptr(task.ScheduleID), ExternalWorkflowExecution: &types.WorkflowExecution{ WorkflowID: task.WorkflowID, RunID: task.RunID, }, ChildWorkflowOnly: task.TargetChildWorkflowOnly, } requestCancelCtx, cancel := context.WithTimeout(ctx, taskRPCCallTimeout) defer cancel() op := func() error { return historyClient.RequestCancelWorkflowExecution(requestCancelCtx, request) } throttleRetry := backoff.NewThrottleRetry( backoff.WithRetryPolicy(taskRetryPolicy), backoff.WithRetryableError(common.IsServiceTransientError), ) err := throttleRetry.Do(context.Background(), op) switch err.(type) { case *types.CancellationAlreadyRequestedError: // err is CancellationAlreadyRequestedError // this could happen if target workflow cancellation is already requested // mark as success err = nil case *types.DomainNotActiveError: err = errTargetDomainNotActive } return err } func signalExternalExecutionWithRetry( ctx context.Context, historyClient history.Client, task *persistence.TransferTaskInfo, targetDomain string, signalInfo *persistence.SignalInfo, ) error { request := &types.HistorySignalWorkflowExecutionRequest{ DomainUUID: task.TargetDomainID, SignalRequest: &types.SignalWorkflowExecutionRequest{ Domain: targetDomain, WorkflowExecution: &types.WorkflowExecution{ WorkflowID: task.TargetWorkflowID, RunID: task.TargetRunID, }, Identity: execution.IdentityHistoryService, SignalName: signalInfo.SignalName, Input: signalInfo.Input, // Use same request ID to deduplicate SignalWorkflowExecution calls RequestID: signalInfo.SignalRequestID, Control: signalInfo.Control, }, ExternalWorkflowExecution: &types.WorkflowExecution{ WorkflowID: task.WorkflowID, RunID: task.RunID, }, ChildWorkflowOnly: task.TargetChildWorkflowOnly, } signalCtx, cancel := context.WithTimeout(ctx, taskRPCCallTimeout) defer cancel() op := func() error { return historyClient.SignalWorkflowExecution(signalCtx, request) } throttleRetry := backoff.NewThrottleRetry( backoff.WithRetryPolicy(taskRetryPolicy), backoff.WithRetryableError(common.IsServiceTransientError), ) err := throttleRetry.Do(context.Background(), op) if _, ok := err.(*types.DomainNotActiveError); ok { err = errTargetDomainNotActive } return err } func removeSignalMutableStateWithRetry( ctx context.Context, historyClient history.Client, task *persistence.TransferTaskInfo, signalRequestID string, ) error { ctx, cancel := context.WithTimeout(ctx, taskRPCCallTimeout) defer cancel() removeSignalRequest := &types.RemoveSignalMutableStateRequest{ DomainUUID: task.TargetDomainID, WorkflowExecution: &types.WorkflowExecution{ WorkflowID: task.TargetWorkflowID, RunID: task.TargetRunID, }, RequestID: signalRequestID, } op := func() error { return historyClient.RemoveSignalMutableState(ctx, removeSignalRequest) } throttleRetry := backoff.NewThrottleRetry( backoff.WithRetryPolicy(taskRetryPolicy), backoff.WithRetryableError(common.IsServiceTransientError), ) err := throttleRetry.Do(context.Background(), op) switch err.(type) { case *types.EntityNotExistsError: // it's safe to discard entity not exists error here // as there's nothing to remove. // for cross cluster task, we don't have to return the error to the source cluster return nil case *types.DomainNotActiveError: err = errTargetDomainNotActive } return err } func startWorkflowWithRetry( ctx context.Context, historyClient history.Client, timeSource clock.TimeSource, domainCache cache.DomainCache, task *persistence.TransferTaskInfo, targetDomain string, requestID string, attributes *types.StartChildWorkflowExecutionInitiatedEventAttributes, partitionConfig map[string]string, ) (string, error) { // Get parent domain name domainName, err := domainCache.GetDomainName(task.DomainID) if err != nil { if _, ok := err.(*types.EntityNotExistsError); !ok { return "", err } // it is possible that the domain got deleted. Use domainID instead as this is only needed for the history event domainName = task.DomainID } frontendStartReq := &types.StartWorkflowExecutionRequest{ Domain: targetDomain, WorkflowID: attributes.WorkflowID, WorkflowType: attributes.WorkflowType, TaskList: attributes.TaskList, Input: attributes.Input, Header: attributes.Header, ExecutionStartToCloseTimeoutSeconds: attributes.ExecutionStartToCloseTimeoutSeconds, TaskStartToCloseTimeoutSeconds: attributes.TaskStartToCloseTimeoutSeconds, // Use the same request ID to dedupe StartWorkflowExecution calls RequestID: requestID, WorkflowIDReusePolicy: attributes.WorkflowIDReusePolicy, RetryPolicy: attributes.RetryPolicy, CronSchedule: attributes.CronSchedule, Memo: attributes.Memo, SearchAttributes: attributes.SearchAttributes, DelayStartSeconds: attributes.DelayStartSeconds, JitterStartSeconds: attributes.JitterStartSeconds, } historyStartReq, err := common.CreateHistoryStartWorkflowRequest(task.TargetDomainID, frontendStartReq, timeSource.Now(), partitionConfig) if err != nil { return "", err } historyStartReq.ParentExecutionInfo = &types.ParentExecutionInfo{ DomainUUID: task.DomainID, Domain: domainName, Execution: &types.WorkflowExecution{ WorkflowID: task.WorkflowID, RunID: task.RunID, }, InitiatedID: task.ScheduleID, } startWorkflowCtx, cancel := context.WithTimeout(ctx, taskRPCCallTimeout) defer cancel() var response *types.StartWorkflowExecutionResponse op := func() error { response, err = historyClient.StartWorkflowExecution(startWorkflowCtx, historyStartReq) return err } throttleRetry := backoff.NewThrottleRetry( backoff.WithRetryPolicy(taskRetryPolicy), backoff.WithRetryableError(common.IsServiceTransientError), ) if err := throttleRetry.Do(context.Background(), op); err != nil { if _, ok := err.(*types.DomainNotActiveError); ok { err = errTargetDomainNotActive } return "", err } return response.GetRunID(), nil } func (t *transferActiveTaskExecutor) resetWorkflow( task *persistence.TransferTaskInfo, domain string, reason string, resetPoint *types.ResetPointInfo, baseContext execution.Context, baseMutableState execution.MutableState, currentContext execution.Context, currentMutableState execution.MutableState, logger log.Logger, ) error { var err error resetCtx, cancel := context.WithTimeout(context.Background(), resetWorkflowTimeout) defer cancel() domainID := task.DomainID WorkflowID := task.WorkflowID baseRunID := baseMutableState.GetExecutionInfo().RunID resetRunID := uuid.New() baseRebuildLastEventID := resetPoint.GetFirstDecisionCompletedID() - 1 baseVersionHistories := baseMutableState.GetVersionHistories() if baseVersionHistories == nil { return execution.ErrMissingVersionHistories } baseCurrentVersionHistory, err := baseVersionHistories.GetCurrentVersionHistory() if err != nil { return err } baseRebuildLastEventVersion, err := baseCurrentVersionHistory.GetEventVersion(baseRebuildLastEventID) if err != nil { return err } baseCurrentBranchToken := baseCurrentVersionHistory.GetBranchToken() baseNextEventID := baseMutableState.GetNextEventID() err = t.workflowResetter.ResetWorkflow( resetCtx, domainID, WorkflowID, baseRunID, baseCurrentBranchToken, baseRebuildLastEventID, baseRebuildLastEventVersion, baseNextEventID, resetRunID, uuid.New(), execution.NewWorkflow( resetCtx, t.shard.GetClusterMetadata(), currentContext, currentMutableState, execution.NoopReleaseFn, // this is fine since caller will defer on release ), reason, nil, false, ) switch err.(type) { case nil: return nil case *types.BadRequestError: // This means the reset point is corrupted and not retry able. // There must be a bug in our system that we must fix.(for example, history is not the same in active/passive) t.metricsClient.IncCounter(metrics.TransferQueueProcessorScope, metrics.AutoResetPointCorruptionCounter) logger.Error("Auto-Reset workflow failed and not retryable. The reset point is corrupted.", tag.Error(err)) return nil default: // log this error and retry logger.Error("Auto-Reset workflow failed", tag.Error(err)) return err } } // applyParentClosePolicyDomainActiveCheck determines how parent close policy should be applied // this function returns four results // 1. cross cluster task generator function // 2. a map of child workflow whose domain is active in the current cluster. // Map is from child init eventID to child domainID. Child // This result so what when actually processing parent close policy // we have a consistent view of whether child domain is active or not // otherwise if child domain did a failover in the middle, we may skip some child domains. // 3. signal parent policy worker: Whether we should signal the parent close policy workflow instead of // handling it within this transfer task. If true, the previous two results should not be used. // 4. error if there's any func (t *transferActiveTaskExecutor) applyParentClosePolicyDomainActiveCheck( task *persistence.TransferTaskInfo, domainName string, childInfos map[int64]*persistence.ChildExecutionInfo, parentDomainEntry *cache.DomainCacheEntry, ) ([]generatorF, map[int64]string, bool, error) { sameClusterChildDomainIDs := make(map[int64]string) // child init eventID -> child domainID remoteClusters := make(map[string]map[string]struct{}) parentClosePolicyWorkerEnabled := t.shard.GetConfig().EnableParentClosePolicyWorker() if parentClosePolicyWorkerEnabled && len(childInfos) >= t.shard.GetConfig().ParentClosePolicyThreshold(domainName) { // only signal parent close policy workflow when # of child workflow exceeds threshold // the system workflow can handle the case where child domain is active in a different cluster return nil, nil, true, nil } for initiatedID, childInfo := range childInfos { if childInfo.ParentClosePolicy == types.ParentClosePolicyAbandon { continue } targetDomainEntry, err := execution.GetChildExecutionDomainEntry( childInfo, t.shard.GetDomainCache(), parentDomainEntry, ) if err != nil { if common.IsEntityNotExistsError(err) { // if domain no longer exists, ignore the child // don't return error here, otherwise the entire close execution task will get skipped. continue } return nil, nil, false, err } targetCluster, isCrossCluster := t.isCrossClusterTask(task.DomainID, targetDomainEntry) if isCrossCluster { if _, ok := remoteClusters[targetCluster]; !ok { remoteClusters[targetCluster] = map[string]struct{}{} } remoteClusters[targetCluster][targetDomainEntry.GetInfo().ID] = struct{}{} } else { sameClusterChildDomainIDs[initiatedID] = targetDomainEntry.GetInfo().ID } } generators := []generatorF{} for remoteCluster, targetDomainIDs := range remoteClusters { generators = append( generators, func(taskGenerator execution.MutableStateTaskGenerator) error { return taskGenerator.GenerateCrossClusterApplyParentClosePolicyTask(task, remoteCluster, targetDomainIDs) }) } return generators, sameClusterChildDomainIDs, false, nil } func (t *transferActiveTaskExecutor) processParentClosePolicy( ctx context.Context, wfContext execution.Context, task *persistence.TransferTaskInfo, childInfos map[int64]*persistence.ChildExecutionInfo, sameClusterChildDomainIDs map[int64]string, // child init ID -> child domainID signalParentClosePolicyWorkflow bool, parentDomainEntry *cache.DomainCacheEntry, ) error { if len(childInfos) == 0 { return nil } scope := t.metricsClient.Scope(metrics.TransferActiveTaskCloseExecutionScope) if signalParentClosePolicyWorkflow { executions := make([]parentclosepolicy.RequestDetail, 0, len(childInfos)) for _, childInfo := range childInfos { if childInfo.ParentClosePolicy == types.ParentClosePolicyAbandon { continue } domainEntry, err := execution.GetChildExecutionDomainEntry(childInfo, t.shard.GetDomainCache(), parentDomainEntry) if common.IsEntityNotExistsError(err) { continue } if err != nil { return err } executions = append(executions, parentclosepolicy.RequestDetail{ DomainID: domainEntry.GetInfo().ID, DomainName: domainEntry.GetInfo().Name, WorkflowID: childInfo.StartedWorkflowID, RunID: childInfo.StartedRunID, Policy: childInfo.ParentClosePolicy, }) } if len(executions) == 0 { return nil } request := parentclosepolicy.Request{ ParentExecution: types.WorkflowExecution{ WorkflowID: task.WorkflowID, RunID: task.RunID, }, Executions: executions, } // Cross cluster requests will be handled via auto-forwarding, no need to treat them differently here return t.parentClosePolicyClient.SendParentClosePolicyRequest(ctx, request) } for initiatedID, childDomainID := range sameClusterChildDomainIDs { childDomainName, err := t.shard.GetDomainCache().GetDomainName(childDomainID) if err != nil { // domainName is not actually used when applyParentClosePolicy is calling // history service for terminating or cancelling workflow childDomainName = childDomainID } childInfo := childInfos[initiatedID] if err := applyParentClosePolicy( ctx, t.historyClient, &types.WorkflowExecution{ WorkflowID: task.WorkflowID, RunID: task.RunID, }, childDomainID, childDomainName, childInfo.StartedWorkflowID, childInfo.StartedRunID, childInfo.ParentClosePolicy, ); err != nil { switch err.(type) { case *types.EntityNotExistsError, *types.WorkflowExecutionAlreadyCompletedError, *types.CancellationAlreadyRequestedError: // expected error, no-op break default: scope.IncCounter(metrics.ParentClosePolicyProcessorFailures) return err } } scope.IncCounter(metrics.ParentClosePolicyProcessorSuccess) } return nil } func applyParentClosePolicy( ctx context.Context, historyClient history.Client, parentWorkflowExecution *types.WorkflowExecution, childDomainID string, childDomainName string, childStartedWorkflowID string, childStartedRunID string, parentClosePolicy types.ParentClosePolicy, ) error { ctx, cancel := context.WithTimeout(ctx, taskRPCCallTimeout) defer cancel() var err error switch parentClosePolicy { case types.ParentClosePolicyAbandon: // noop err = nil case types.ParentClosePolicyTerminate: err = historyClient.TerminateWorkflowExecution(ctx, &types.HistoryTerminateWorkflowExecutionRequest{ DomainUUID: childDomainID, TerminateRequest: &types.TerminateWorkflowExecutionRequest{ Domain: childDomainName, WorkflowExecution: &types.WorkflowExecution{ WorkflowID: childStartedWorkflowID, }, Reason: "by parent close policy", Identity: execution.IdentityHistoryService, // Include StartedRunID as FirstExecutionRunID on the request to allow child to be terminated across runs. // If the child does continue as new it still propagates the RunID of first execution. FirstExecutionRunID: childStartedRunID, }, ExternalWorkflowExecution: parentWorkflowExecution, ChildWorkflowOnly: true, }) case types.ParentClosePolicyRequestCancel: err = historyClient.RequestCancelWorkflowExecution(ctx, &types.HistoryRequestCancelWorkflowExecutionRequest{ DomainUUID: childDomainID, CancelRequest: &types.RequestCancelWorkflowExecutionRequest{ Domain: childDomainName, WorkflowExecution: &types.WorkflowExecution{ WorkflowID: childStartedWorkflowID, }, Identity: execution.IdentityHistoryService, // Include StartedRunID as FirstExecutionRunID on the request to allow child to be canceled across runs. // If the child does continue as new it still propagates the RunID of first execution. FirstExecutionRunID: childStartedRunID, }, ExternalWorkflowExecution: parentWorkflowExecution, ChildWorkflowOnly: true, }) default: err = &types.InternalServiceError{ Message: fmt.Sprintf("unknown parent close policy: %v", parentClosePolicy), } } if _, ok := err.(*types.DomainNotActiveError); ok { err = errTargetDomainNotActive } return err } func filterPendingChildExecutions( targetDomainIDs map[string]struct{}, children map[int64]*persistence.ChildExecutionInfo, domainCache cache.DomainCache, parentDomainEntry *cache.DomainCacheEntry, ) (map[int64]*persistence.ChildExecutionInfo, error) { if len(targetDomainIDs) == 0 { return children, nil } filteredChildren := make(map[int64]*persistence.ChildExecutionInfo, len(children)) for initiatedID, child := range children { domainID, err := execution.GetChildExecutionDomainID(child, domainCache, parentDomainEntry) if err != nil { if common.IsEntityNotExistsError(err) { // target domain deleted, ignore the child continue } return nil, err } if _, ok := targetDomainIDs[domainID]; ok { filteredChildren[initiatedID] = child } } return filteredChildren, nil }