service/history/execution/mutable_state_task_generator.go (898 lines of code) (raw):

// Copyright (c) 2020 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. //go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination mutable_state_task_generator_mock.go -self_package github.com/uber/cadence/service/history/execution package execution import ( "errors" "fmt" "math" "math/rand" "time" "github.com/uber/cadence/common" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" ) type ( // MutableStateTaskGenerator generates workflow transfer and timer tasks MutableStateTaskGenerator interface { // for workflow reset startTime should be the reset time instead of // the startEvent time, so that workflow timeout timestamp can be // re-calculated. GenerateWorkflowStartTasks( startTime time.Time, startEvent *types.HistoryEvent, ) error GenerateWorkflowCloseTasks( closeEvent *types.HistoryEvent, workflowDeletionTaskJitterRange int, ) error GenerateRecordWorkflowStartedTasks( startEvent *types.HistoryEvent, ) error GenerateDelayedDecisionTasks( startEvent *types.HistoryEvent, ) error GenerateDecisionScheduleTasks( decisionScheduleID int64, ) error GenerateDecisionStartTasks( decisionScheduleID int64, ) error GenerateActivityTransferTasks( event *types.HistoryEvent, ) error GenerateActivityRetryTasks( activityScheduleID int64, ) error GenerateChildWorkflowTasks( event *types.HistoryEvent, ) error GenerateRequestCancelExternalTasks( event *types.HistoryEvent, ) error GenerateSignalExternalTasks( event *types.HistoryEvent, ) error GenerateWorkflowSearchAttrTasks() error GenerateWorkflowResetTasks() error GenerateFromTransferTask( transferTask *persistence.TransferTaskInfo, targetCluster string, ) error // NOTE: CloseExecution task may generate both RecordChildCompletion // and ApplyParentPolicy tasks. That's why we currently have separate // functions for generating tasks from CloseExecution task GenerateCrossClusterRecordChildCompletedTask( transferTask *persistence.TransferTaskInfo, targetCluster string, parentInfo *types.ParentExecutionInfo, ) error GenerateCrossClusterApplyParentClosePolicyTask( transferTask *persistence.TransferTaskInfo, targetCluster string, childDomainIDs map[string]struct{}, ) error GenerateFromCrossClusterTask( crossClusterTask *persistence.CrossClusterTaskInfo, ) error // these 2 APIs should only be called when mutable state transaction is being closed GenerateActivityTimerTasks() error GenerateUserTimerTasks() error } mutableStateTaskGeneratorImpl struct { clusterMetadata cluster.Metadata domainCache cache.DomainCache mutableState MutableState } ) const ( defaultWorkflowRetentionInDays int32 = 1 defaultInitIntervalForDecisionRetry = 1 * time.Minute defaultMaxIntervalForDecisionRetry = 5 * time.Minute defaultJitterCoefficient = 0.2 ) var _ MutableStateTaskGenerator = (*mutableStateTaskGeneratorImpl)(nil) // NewMutableStateTaskGenerator creates a new task generator for mutable state func NewMutableStateTaskGenerator( clusterMetadata cluster.Metadata, domainCache cache.DomainCache, mutableState MutableState, ) MutableStateTaskGenerator { return &mutableStateTaskGeneratorImpl{ clusterMetadata: clusterMetadata, domainCache: domainCache, mutableState: mutableState, } } func (r *mutableStateTaskGeneratorImpl) GenerateWorkflowStartTasks( startTime time.Time, startEvent *types.HistoryEvent, ) error { attr := startEvent.WorkflowExecutionStartedEventAttributes firstDecisionDelayDuration := time.Duration(attr.GetFirstDecisionTaskBackoffSeconds()) * time.Second executionInfo := r.mutableState.GetExecutionInfo() startVersion := startEvent.Version workflowTimeoutDuration := time.Duration(executionInfo.WorkflowTimeout) * time.Second workflowTimeoutTimestamp := startTime.Add(workflowTimeoutDuration + firstDecisionDelayDuration) // ensure that the first attempt does not time out early based on retry policy timeout if attr.Attempt > 0 && !executionInfo.ExpirationTime.IsZero() && workflowTimeoutTimestamp.After(executionInfo.ExpirationTime) { workflowTimeoutTimestamp = executionInfo.ExpirationTime } r.mutableState.AddTimerTasks(&persistence.WorkflowTimeoutTask{ TaskData: persistence.TaskData{ // TaskID is set by shard VisibilityTimestamp: workflowTimeoutTimestamp, Version: startVersion, }, }) return nil } func (r *mutableStateTaskGeneratorImpl) GenerateWorkflowCloseTasks( closeEvent *types.HistoryEvent, workflowDeletionTaskJitterRange int, ) error { executionInfo := r.mutableState.GetExecutionInfo() transferTasks := []persistence.Task{} crossClusterTasks := []persistence.Task{} _, isActive, err := getTargetCluster(executionInfo.DomainID, r.domainCache, r.clusterMetadata) if err != nil { return err } if !isActive { // source domain passive, generate only one close execution task transferTasks = append(transferTasks, &persistence.CloseExecutionTask{ TaskData: persistence.TaskData{ // TaskID and VisibilityTimestamp are set by shard context Version: closeEvent.Version, }, }) } else { // 1. check if parent is cross cluster parentTargetCluster, isActive, err := getParentCluster(r.mutableState, r.domainCache, r.clusterMetadata) if err != nil { return err } if parentTargetCluster != "" { recordChildCompletionTask := &persistence.RecordChildExecutionCompletedTask{ TaskData: persistence.TaskData{ Version: closeEvent.Version, }, TargetDomainID: executionInfo.ParentDomainID, TargetWorkflowID: executionInfo.ParentWorkflowID, TargetRunID: executionInfo.ParentRunID, } if !isActive { crossClusterTasks = append(crossClusterTasks, &persistence.CrossClusterRecordChildExecutionCompletedTask{ TargetCluster: parentTargetCluster, RecordChildExecutionCompletedTask: *recordChildCompletionTask, }) } else { transferTasks = append(transferTasks, recordChildCompletionTask) } } // 2. check if child domains are cross cluster parentCloseTransferTask, parentCloseCrossClusterTask, err := r.generateApplyParentCloseTasks(nil, closeEvent.Version, time.Time{}, false) if err != nil { return err } transferTasks = append(transferTasks, parentCloseTransferTask...) crossClusterTasks = append(crossClusterTasks, parentCloseCrossClusterTask...) // 3. add record workflow closed task if len(crossClusterTasks) != 0 { transferTasks = append(transferTasks, &persistence.RecordWorkflowClosedTask{ TaskData: persistence.TaskData{ Version: closeEvent.Version, }, }) } else { transferTasks = []persistence.Task{ &persistence.CloseExecutionTask{ TaskData: persistence.TaskData{ Version: closeEvent.Version, }, }, } } } r.mutableState.AddTransferTasks(transferTasks...) r.mutableState.AddCrossClusterTasks(crossClusterTasks...) retentionInDays := defaultWorkflowRetentionInDays domainEntry, err := r.domainCache.GetDomainByID(executionInfo.DomainID) switch err.(type) { case nil: retentionInDays = domainEntry.GetRetentionDays(executionInfo.WorkflowID) case *types.EntityNotExistsError: // domain is not accessible, use default value above default: return err } closeTimestamp := time.Unix(0, closeEvent.GetTimestamp()) retentionDuration := (time.Duration(retentionInDays) * time.Hour * 24) if workflowDeletionTaskJitterRange > 1 { retentionDuration += time.Duration(rand.Intn(workflowDeletionTaskJitterRange*60)) * time.Second } r.mutableState.AddTimerTasks(&persistence.DeleteHistoryEventTask{ TaskData: persistence.TaskData{ // TaskID is set by shard VisibilityTimestamp: closeTimestamp.Add(retentionDuration), Version: closeEvent.Version, }, }) return nil } func (r *mutableStateTaskGeneratorImpl) GenerateDelayedDecisionTasks( startEvent *types.HistoryEvent, ) error { startVersion := startEvent.Version startTimestamp := time.Unix(0, startEvent.GetTimestamp()) startAttr := startEvent.WorkflowExecutionStartedEventAttributes decisionBackoffDuration := time.Duration(startAttr.GetFirstDecisionTaskBackoffSeconds()) * time.Second executionTimestamp := startTimestamp.Add(decisionBackoffDuration) // noParentWorkflow case firstDecisionDelayType := persistence.WorkflowBackoffTimeoutTypeCron // continue as new case if startAttr.Initiator != nil { switch startAttr.GetInitiator() { case types.ContinueAsNewInitiatorRetryPolicy: firstDecisionDelayType = persistence.WorkflowBackoffTimeoutTypeRetry case types.ContinueAsNewInitiatorCronSchedule: firstDecisionDelayType = persistence.WorkflowBackoffTimeoutTypeCron case types.ContinueAsNewInitiatorDecider: return &types.InternalServiceError{ Message: "encounter continue as new iterator & first decision delay not 0", } default: return &types.InternalServiceError{ Message: fmt.Sprintf("unknown iterator retry policy: %v", startAttr.GetInitiator()), } } } r.mutableState.AddTimerTasks(&persistence.WorkflowBackoffTimerTask{ TaskData: persistence.TaskData{ // TaskID is set by shard VisibilityTimestamp: executionTimestamp, Version: startVersion, }, // TODO EventID seems not used at all TimeoutType: firstDecisionDelayType, }) return nil } func (r *mutableStateTaskGeneratorImpl) GenerateRecordWorkflowStartedTasks( startEvent *types.HistoryEvent, ) error { startVersion := startEvent.Version r.mutableState.AddTransferTasks(&persistence.RecordWorkflowStartedTask{ TaskData: persistence.TaskData{ // TaskID and VisibilityTimestamp are set by shard context Version: startVersion, }, }) return nil } func (r *mutableStateTaskGeneratorImpl) GenerateDecisionScheduleTasks( decisionScheduleID int64, ) error { executionInfo := r.mutableState.GetExecutionInfo() decision, ok := r.mutableState.GetDecisionInfo( decisionScheduleID, ) if !ok { return &types.InternalServiceError{ Message: fmt.Sprintf("it could be a bug, cannot get pending decision: %v", decisionScheduleID), } } r.mutableState.AddTransferTasks(&persistence.DecisionTask{ TaskData: persistence.TaskData{ // TaskID and VisibilityTimestamp are set by shard context Version: decision.Version, }, DomainID: executionInfo.DomainID, TaskList: decision.TaskList, ScheduleID: decision.ScheduleID, }) if scheduleToStartTimeout := r.mutableState.GetDecisionScheduleToStartTimeout(); scheduleToStartTimeout != 0 { scheduledTime := time.Unix(0, decision.ScheduledTimestamp) r.mutableState.AddTimerTasks(&persistence.DecisionTimeoutTask{ TaskData: persistence.TaskData{ // TaskID is set by shard VisibilityTimestamp: scheduledTime.Add(scheduleToStartTimeout), Version: decision.Version, }, TimeoutType: int(TimerTypeScheduleToStart), EventID: decision.ScheduleID, ScheduleAttempt: decision.Attempt, }) } return nil } func (r *mutableStateTaskGeneratorImpl) GenerateDecisionStartTasks( decisionScheduleID int64, ) error { decision, ok := r.mutableState.GetDecisionInfo( decisionScheduleID, ) if !ok { return &types.InternalServiceError{ Message: fmt.Sprintf("it could be a bug, cannot get pending decision: %v", decisionScheduleID), } } startedTime := time.Unix(0, decision.StartedTimestamp) startToCloseTimeout := time.Duration( decision.DecisionTimeout, ) * time.Second // schedule timer exponentially if decision keeps failing if decision.Attempt > 1 { defaultStartToCloseTimeout := r.mutableState.GetExecutionInfo().DecisionStartToCloseTimeout startToCloseTimeout = getNextDecisionTimeout(decision.Attempt, time.Duration(defaultStartToCloseTimeout)*time.Second) decision.DecisionTimeout = int32(startToCloseTimeout.Seconds()) // override decision timeout r.mutableState.UpdateDecision(decision) } r.mutableState.AddTimerTasks(&persistence.DecisionTimeoutTask{ TaskData: persistence.TaskData{ // TaskID is set by shard VisibilityTimestamp: startedTime.Add(startToCloseTimeout), Version: decision.Version, }, TimeoutType: int(TimerTypeStartToClose), EventID: decision.ScheduleID, ScheduleAttempt: decision.Attempt, }) return nil } func (r *mutableStateTaskGeneratorImpl) GenerateActivityTransferTasks( event *types.HistoryEvent, ) error { attr := event.ActivityTaskScheduledEventAttributes activityScheduleID := event.ID activityInfo, ok := r.mutableState.GetActivityInfo(activityScheduleID) if !ok { return &types.InternalServiceError{ Message: fmt.Sprintf("it could be a bug, cannot get pending activity: %v", activityScheduleID), } } var targetDomainID string var err error if activityInfo.DomainID != "" { targetDomainID = activityInfo.DomainID } else { // TODO remove this block after Mar, 1th, 2020 // previously, DomainID in activity info is not used, so need to get // schedule event from DB checking whether activity to be scheduled // belongs to this domain targetDomainID, err = r.getTargetDomainID(attr.GetDomain()) if err != nil { return err } } r.mutableState.AddTransferTasks(&persistence.ActivityTask{ TaskData: persistence.TaskData{ // TaskID and VisibilityTimestamp are set by shard context Version: activityInfo.Version, }, DomainID: targetDomainID, TaskList: activityInfo.TaskList, ScheduleID: activityInfo.ScheduleID, }) return nil } func (r *mutableStateTaskGeneratorImpl) GenerateActivityRetryTasks( activityScheduleID int64, ) error { ai, ok := r.mutableState.GetActivityInfo(activityScheduleID) if !ok { return &types.InternalServiceError{ Message: fmt.Sprintf("it could be a bug, cannot get pending activity: %v", activityScheduleID), } } r.mutableState.AddTimerTasks(&persistence.ActivityRetryTimerTask{ TaskData: persistence.TaskData{ // TaskID is set by shard Version: ai.Version, VisibilityTimestamp: ai.ScheduledTime, }, EventID: ai.ScheduleID, Attempt: ai.Attempt, }) return nil } func (r *mutableStateTaskGeneratorImpl) GenerateChildWorkflowTasks( event *types.HistoryEvent, ) error { attr := event.StartChildWorkflowExecutionInitiatedEventAttributes childWorkflowScheduleID := event.ID childWorkflowTargetDomain := attr.GetDomain() childWorkflowInfo, ok := r.mutableState.GetChildExecutionInfo(childWorkflowScheduleID) if !ok { return &types.InternalServiceError{ Message: fmt.Sprintf("it could be a bug, cannot get pending child workflow: %v", childWorkflowScheduleID), } } targetDomainID := childWorkflowInfo.DomainID if targetDomainID == "" { var err error targetDomainID, err = r.getTargetDomainID(childWorkflowTargetDomain) if err != nil { return err } } targetCluster, isCrossClusterTask, err := r.isCrossClusterTask(targetDomainID) if err != nil { return err } startChildExecutionTask := &persistence.StartChildExecutionTask{ TaskData: persistence.TaskData{ // TaskID and VisibilityTimestamp are set by shard context Version: childWorkflowInfo.Version, }, TargetDomainID: targetDomainID, TargetWorkflowID: childWorkflowInfo.StartedWorkflowID, InitiatedID: childWorkflowInfo.InitiatedID, } if !isCrossClusterTask { r.mutableState.AddTransferTasks(startChildExecutionTask) } else { r.mutableState.AddCrossClusterTasks(&persistence.CrossClusterStartChildExecutionTask{ TargetCluster: targetCluster, StartChildExecutionTask: *startChildExecutionTask, }) } return nil } func (r *mutableStateTaskGeneratorImpl) GenerateRequestCancelExternalTasks( event *types.HistoryEvent, ) error { attr := event.RequestCancelExternalWorkflowExecutionInitiatedEventAttributes scheduleID := event.ID version := event.Version targetDomainName := attr.GetDomain() targetWorkflowID := attr.GetWorkflowExecution().GetWorkflowID() targetRunID := attr.GetWorkflowExecution().GetRunID() targetChildOnly := attr.GetChildWorkflowOnly() _, ok := r.mutableState.GetRequestCancelInfo(scheduleID) if !ok { return &types.InternalServiceError{ Message: fmt.Sprintf("it could be a bug, cannot get pending request cancel external workflow: %v", scheduleID), } } targetDomainID, err := r.getTargetDomainID(targetDomainName) if err != nil { return err } targetCluster, isCrossClusterTask, err := r.isCrossClusterTask(targetDomainID) if err != nil { return err } cancelExecutionTask := &persistence.CancelExecutionTask{ TaskData: persistence.TaskData{ // TaskID and VisibilityTimestamp are set by shard context Version: version, }, TargetDomainID: targetDomainID, TargetWorkflowID: targetWorkflowID, TargetRunID: targetRunID, TargetChildWorkflowOnly: targetChildOnly, InitiatedID: scheduleID, } if !isCrossClusterTask { r.mutableState.AddTransferTasks(cancelExecutionTask) } else { r.mutableState.AddCrossClusterTasks(&persistence.CrossClusterCancelExecutionTask{ TargetCluster: targetCluster, CancelExecutionTask: *cancelExecutionTask, }) } return nil } func (r *mutableStateTaskGeneratorImpl) GenerateSignalExternalTasks( event *types.HistoryEvent, ) error { attr := event.SignalExternalWorkflowExecutionInitiatedEventAttributes scheduleID := event.ID version := event.Version targetDomainName := attr.GetDomain() targetWorkflowID := attr.GetWorkflowExecution().GetWorkflowID() targetRunID := attr.GetWorkflowExecution().GetRunID() targetChildOnly := attr.GetChildWorkflowOnly() _, ok := r.mutableState.GetSignalInfo(scheduleID) if !ok { return &types.InternalServiceError{ Message: fmt.Sprintf("it could be a bug, cannot get pending signal external workflow: %v", scheduleID), } } targetDomainID, err := r.getTargetDomainID(targetDomainName) if err != nil { return err } targetCluster, isCrossClusterTask, err := r.isCrossClusterTask(targetDomainID) if err != nil { return err } signalExecutionTask := &persistence.SignalExecutionTask{ TaskData: persistence.TaskData{ // TaskID and VisibilityTimestamp are set by shard context Version: version, }, TargetDomainID: targetDomainID, TargetWorkflowID: targetWorkflowID, TargetRunID: targetRunID, TargetChildWorkflowOnly: targetChildOnly, InitiatedID: scheduleID, } if !isCrossClusterTask { r.mutableState.AddTransferTasks(signalExecutionTask) } else { r.mutableState.AddCrossClusterTasks(&persistence.CrossClusterSignalExecutionTask{ TargetCluster: targetCluster, SignalExecutionTask: *signalExecutionTask, }) } return nil } func (r *mutableStateTaskGeneratorImpl) GenerateWorkflowSearchAttrTasks() error { currentVersion := r.mutableState.GetCurrentVersion() r.mutableState.AddTransferTasks(&persistence.UpsertWorkflowSearchAttributesTask{ TaskData: persistence.TaskData{ // TaskID and VisibilityTimestamp are set by shard context Version: currentVersion, // task processing does not check this version }, }) return nil } func (r *mutableStateTaskGeneratorImpl) GenerateWorkflowResetTasks() error { currentVersion := r.mutableState.GetCurrentVersion() r.mutableState.AddTransferTasks(&persistence.ResetWorkflowTask{ TaskData: persistence.TaskData{ // TaskID and VisibilityTimestamp are set by shard context Version: currentVersion, }, }) return nil } func (r *mutableStateTaskGeneratorImpl) GenerateCrossClusterRecordChildCompletedTask( task *persistence.TransferTaskInfo, targetCluster string, parentInfo *types.ParentExecutionInfo, ) error { if targetCluster == r.clusterMetadata.GetCurrentClusterName() { // this should not happen return errors.New("unable to create cross-cluster task for current cluster") } if parentInfo != nil { r.mutableState.AddCrossClusterTasks(&persistence.CrossClusterRecordChildExecutionCompletedTask{ TargetCluster: targetCluster, RecordChildExecutionCompletedTask: persistence.RecordChildExecutionCompletedTask{ TaskData: persistence.TaskData{ // TaskID is set by shard context VisibilityTimestamp: task.VisibilityTimestamp, Version: task.Version, }, TargetDomainID: parentInfo.DomainUUID, TargetWorkflowID: parentInfo.GetExecution().GetWorkflowID(), TargetRunID: parentInfo.GetExecution().GetRunID(), }, }) } return nil } func (r *mutableStateTaskGeneratorImpl) GenerateCrossClusterApplyParentClosePolicyTask( task *persistence.TransferTaskInfo, targetCluster string, childDomainIDs map[string]struct{}, ) error { if targetCluster == r.clusterMetadata.GetCurrentClusterName() { // this should not happen return errors.New("unable to create cross-cluster task for current cluster") } if len(childDomainIDs) != 0 { r.mutableState.AddCrossClusterTasks(&persistence.CrossClusterApplyParentClosePolicyTask{ TargetCluster: targetCluster, ApplyParentClosePolicyTask: persistence.ApplyParentClosePolicyTask{ TaskData: persistence.TaskData{ // TaskID is set by shard context Version: task.Version, VisibilityTimestamp: task.VisibilityTimestamp, }, TargetDomainIDs: childDomainIDs, // Domain, workflow and run ids will be collected from mutableState // when processing the apply parent policy tasks. }, }) } return nil } func (r *mutableStateTaskGeneratorImpl) GenerateFromTransferTask( task *persistence.TransferTaskInfo, targetCluster string, ) error { if targetCluster == r.clusterMetadata.GetCurrentClusterName() { // this should not happen return errors.New("unable to create cross-cluster task for current cluster") } var crossClusterTask persistence.Task switch task.TaskType { case persistence.TransferTaskTypeCancelExecution: crossClusterTask = &persistence.CrossClusterCancelExecutionTask{ TargetCluster: targetCluster, CancelExecutionTask: persistence.CancelExecutionTask{ TaskData: persistence.TaskData{ // TaskID is set by shard context Version: task.Version, }, TargetDomainID: task.TargetDomainID, TargetWorkflowID: task.TargetWorkflowID, TargetRunID: task.TargetRunID, TargetChildWorkflowOnly: task.TargetChildWorkflowOnly, InitiatedID: task.ScheduleID, }, } case persistence.TransferTaskTypeSignalExecution: crossClusterTask = &persistence.CrossClusterSignalExecutionTask{ TargetCluster: targetCluster, SignalExecutionTask: persistence.SignalExecutionTask{ TaskData: persistence.TaskData{ // TaskID is set by shard context Version: task.Version, }, TargetDomainID: task.TargetDomainID, TargetWorkflowID: task.TargetWorkflowID, TargetRunID: task.TargetRunID, TargetChildWorkflowOnly: task.TargetChildWorkflowOnly, InitiatedID: task.ScheduleID, }, } case persistence.TransferTaskTypeStartChildExecution: crossClusterTask = &persistence.CrossClusterStartChildExecutionTask{ TargetCluster: targetCluster, StartChildExecutionTask: persistence.StartChildExecutionTask{ TaskData: persistence.TaskData{ // TaskID is set by shard context Version: task.Version, }, TargetDomainID: task.TargetDomainID, TargetWorkflowID: task.TargetWorkflowID, InitiatedID: task.ScheduleID, }, } // TransferTaskTypeCloseExecution, // TransferTaskTypeRecordChildExecutionCompleted, // TransferTaskTypeApplyParentClosePolicy are handled by GenerateCrossClusterRecordChildCompletedTask default: return fmt.Errorf("unable to convert transfer task of type %v to cross-cluster task", task.TaskType) } // set visibility timestamp here so we the metric for task latency // can include the latency for the original transfer task. crossClusterTask.SetVisibilityTimestamp(task.VisibilityTimestamp) r.mutableState.AddCrossClusterTasks(crossClusterTask) return nil } func (r *mutableStateTaskGeneratorImpl) generateApplyParentCloseTasks( childDomainIDs map[string]struct{}, version int64, visibilityTimestamp time.Time, isPassive bool, ) ([]persistence.Task, []persistence.Task, error) { transferTasks := []persistence.Task{} crossClusterTasks := []persistence.Task{} if isPassive { transferTasks = []persistence.Task{ &persistence.ApplyParentClosePolicyTask{ TaskData: persistence.TaskData{ // TaskID is set by shard context VisibilityTimestamp: visibilityTimestamp, Version: version, }, TargetDomainIDs: childDomainIDs, }, } return transferTasks, crossClusterTasks, nil } sameClusterDomainIDs, remoteClusterDomainIDs, err := getChildrenClusters(childDomainIDs, r.mutableState, r.domainCache, r.clusterMetadata) if err != nil { return nil, nil, err } if len(sameClusterDomainIDs) != 0 { transferTasks = append(transferTasks, &persistence.ApplyParentClosePolicyTask{ TaskData: persistence.TaskData{ // TaskID is set by shard context VisibilityTimestamp: visibilityTimestamp, Version: version, }, TargetDomainIDs: sameClusterDomainIDs, }) } for remoteCluster, domainIDs := range remoteClusterDomainIDs { crossClusterTasks = append(crossClusterTasks, &persistence.CrossClusterApplyParentClosePolicyTask{ TargetCluster: remoteCluster, ApplyParentClosePolicyTask: persistence.ApplyParentClosePolicyTask{ TaskData: persistence.TaskData{ // TaskID is set by shard context VisibilityTimestamp: visibilityTimestamp, Version: version, }, TargetDomainIDs: domainIDs, }, }) } return transferTasks, crossClusterTasks, nil } func (r *mutableStateTaskGeneratorImpl) GenerateFromCrossClusterTask( task *persistence.CrossClusterTaskInfo, ) error { generateTransferTask := false var targetCluster string sourceDomainEntry := r.mutableState.GetDomainEntry() if isActive, _ := sourceDomainEntry.IsActiveIn(r.clusterMetadata.GetCurrentClusterName()); !isActive && !sourceDomainEntry.IsDomainPendingActive() { // domain is passive, generate (passive) transfer task generateTransferTask = true } // ApplyParentClosePolicy is different than the others because it doesn't have a single // target domain, workflow or run id. if task.GetTaskType() == persistence.CrossClusterTaskTypeApplyParentClosePolicy { transferTasks, crossClusterTasks, err := r.generateApplyParentCloseTasks(task.TargetDomainIDs, task.GetVersion(), task.GetVisibilityTimestamp(), generateTransferTask) if err != nil { return err } r.mutableState.AddTransferTasks(transferTasks...) r.mutableState.AddCrossClusterTasks(crossClusterTasks...) return nil } if !generateTransferTask { targetDomainEntry, err := r.domainCache.GetDomainByID(task.TargetDomainID) if err != nil { return err } targetCluster = targetDomainEntry.GetReplicationConfig().ActiveClusterName if targetCluster == r.clusterMetadata.GetCurrentClusterName() { generateTransferTask = true } } var newTask persistence.Task switch task.GetTaskType() { case persistence.CrossClusterTaskTypeCancelExecution: cancelExecutionTask := &persistence.CancelExecutionTask{ TaskData: persistence.TaskData{ // TaskID is set by shard context Version: task.Version, }, TargetDomainID: task.TargetDomainID, TargetWorkflowID: task.TargetWorkflowID, TargetRunID: task.TargetRunID, TargetChildWorkflowOnly: task.TargetChildWorkflowOnly, InitiatedID: task.ScheduleID, } if generateTransferTask { newTask = cancelExecutionTask } else { newTask = &persistence.CrossClusterCancelExecutionTask{ TargetCluster: targetCluster, CancelExecutionTask: *cancelExecutionTask, } } case persistence.CrossClusterTaskTypeSignalExecution: signalExecutionTask := &persistence.SignalExecutionTask{ TaskData: persistence.TaskData{ // TaskID is set by shard context Version: task.Version, }, TargetDomainID: task.TargetDomainID, TargetWorkflowID: task.TargetWorkflowID, TargetRunID: task.TargetRunID, TargetChildWorkflowOnly: task.TargetChildWorkflowOnly, InitiatedID: task.ScheduleID, } if generateTransferTask { newTask = signalExecutionTask } else { newTask = &persistence.CrossClusterSignalExecutionTask{ TargetCluster: targetCluster, SignalExecutionTask: *signalExecutionTask, } } case persistence.CrossClusterTaskTypeStartChildExecution: startChildExecutionTask := &persistence.StartChildExecutionTask{ TaskData: persistence.TaskData{ // TaskID is set by shard context Version: task.Version, }, TargetDomainID: task.TargetDomainID, TargetWorkflowID: task.TargetWorkflowID, InitiatedID: task.ScheduleID, } if generateTransferTask { newTask = startChildExecutionTask } else { newTask = &persistence.CrossClusterStartChildExecutionTask{ TargetCluster: targetCluster, StartChildExecutionTask: *startChildExecutionTask, } } case persistence.CrossClusterTaskTypeRecordChildExeuctionCompleted: recordChildExecutionCompletedTask := &persistence.RecordChildExecutionCompletedTask{ TaskData: persistence.TaskData{ // TaskID is set by shard context Version: task.Version, }, TargetDomainID: task.TargetDomainID, TargetWorkflowID: task.TargetWorkflowID, TargetRunID: task.TargetRunID, } if generateTransferTask { newTask = recordChildExecutionCompletedTask } else { newTask = &persistence.CrossClusterRecordChildExecutionCompletedTask{ TargetCluster: targetCluster, RecordChildExecutionCompletedTask: *recordChildExecutionCompletedTask, } } // persistence.CrossClusterTaskTypeApplyParentPolicy is handled by generateFromApplyParentCloseCrossClusterTask above default: return fmt.Errorf("unable to convert cross-cluster task of type %v", task.TaskType) } // set visibility timestamp here so we the metric for task latency // can include the latency for the original transfer task. newTask.SetVisibilityTimestamp(task.VisibilityTimestamp) if generateTransferTask { r.mutableState.AddTransferTasks(newTask) } else { r.mutableState.AddCrossClusterTasks(newTask) } return nil } func (r *mutableStateTaskGeneratorImpl) GenerateActivityTimerTasks() error { _, err := NewTimerSequence(r.mutableState).CreateNextActivityTimer() return err } func (r *mutableStateTaskGeneratorImpl) GenerateUserTimerTasks() error { _, err := NewTimerSequence(r.mutableState).CreateNextUserTimer() return err } func (r *mutableStateTaskGeneratorImpl) getTargetDomainID( targetDomainName string, ) (string, error) { if targetDomainName != "" { return r.domainCache.GetDomainID(targetDomainName) } return r.mutableState.GetExecutionInfo().DomainID, nil } // isCrossClusterTask determines if the task belongs to the cross-cluster queue // this is only an best effort check // even if the task ended up in the wrong queue, the actual processing logic // will detect it and create a new task in the right queue. func (r *mutableStateTaskGeneratorImpl) isCrossClusterTask( targetDomainID string, ) (string, bool, error) { sourceDomainID := r.mutableState.GetExecutionInfo().DomainID // case 1: not cross domain task if sourceDomainID == targetDomainID { return "", false, nil } sourceDomainEntry, err := r.domainCache.GetDomainByID(sourceDomainID) if err != nil { return "", false, err } // case 2: source domain is not active in the current cluster if isActive, _ := sourceDomainEntry.IsActiveIn(r.clusterMetadata.GetCurrentClusterName()); !isActive { return "", false, nil } targetDomainEntry, err := r.domainCache.GetDomainByID(targetDomainID) if err != nil { return "", false, err } targetCluster := targetDomainEntry.GetReplicationConfig().ActiveClusterName // case 3: target cluster is the same as source domain active cluster // which is current cluster since source domain is active if targetCluster == r.clusterMetadata.GetCurrentClusterName() { return "", false, nil } return targetCluster, true, nil } func getTargetCluster( domainID string, domainCache cache.DomainCache, clusterMetadata cluster.Metadata, ) (string, bool, error) { domainEntry, err := domainCache.GetDomainByID(domainID) if err != nil { return "", false, err } isActive, _ := domainEntry.IsActiveIn(clusterMetadata.GetCurrentClusterName()) if !isActive { // treat pending active as active isActive = domainEntry.IsDomainPendingActive() } activeCluster := domainEntry.GetReplicationConfig().ActiveClusterName return activeCluster, isActive, nil } func getParentCluster( mutableState MutableState, domainCache cache.DomainCache, clusterMetadata cluster.Metadata, ) (string, bool, error) { executionInfo := mutableState.GetExecutionInfo() if !mutableState.HasParentExecution() || executionInfo.CloseStatus == persistence.WorkflowCloseStatusContinuedAsNew { // we don't need to reply to parent return "", false, nil } return getTargetCluster(executionInfo.ParentDomainID, domainCache, clusterMetadata) } func getChildrenClusters( childDomainIDs map[string]struct{}, mutableState MutableState, domainCache cache.DomainCache, clusterMetadata cluster.Metadata, ) (map[string]struct{}, map[string]map[string]struct{}, error) { if len(childDomainIDs) == 0 { childDomainIDs = make(map[string]struct{}) children := mutableState.GetPendingChildExecutionInfos() for _, childInfo := range children { if childInfo.ParentClosePolicy == types.ParentClosePolicyAbandon { continue } childDomainID, err := GetChildExecutionDomainID(childInfo, domainCache, mutableState.GetDomainEntry()) if err != nil { if common.IsEntityNotExistsError(err) { continue // ignore deleted domain } return nil, nil, err } childDomainIDs[childDomainID] = struct{}{} } } sameClusterDomainIDs := make(map[string]struct{}) remoteClusterDomainIDs := make(map[string]map[string]struct{}) for childDomainID := range childDomainIDs { childCluster, isActive, err := getTargetCluster(childDomainID, domainCache, clusterMetadata) if err != nil { return nil, nil, err } if isActive { sameClusterDomainIDs[childDomainID] = struct{}{} } else { if _, ok := remoteClusterDomainIDs[childCluster]; !ok { remoteClusterDomainIDs[childCluster] = make(map[string]struct{}) } remoteClusterDomainIDs[childCluster][childDomainID] = struct{}{} } } return sameClusterDomainIDs, remoteClusterDomainIDs, nil } func getNextDecisionTimeout(attempt int64, defaultStartToCloseTimeout time.Duration) time.Duration { if attempt <= 1 { return defaultStartToCloseTimeout } nextInterval := float64(defaultInitIntervalForDecisionRetry) * math.Pow(2, float64(attempt-2)) nextInterval = math.Min(nextInterval, float64(defaultMaxIntervalForDecisionRetry)) jitterPortion := int(defaultJitterCoefficient * nextInterval) if jitterPortion < 1 { jitterPortion = 1 } nextInterval = nextInterval*(1-defaultJitterCoefficient) + float64(rand.Intn(jitterPortion)) return time.Duration(nextInterval) }