service/history/execution/mutable_state_decision_task_manager.go (666 lines of code) (raw):

// The MIT License (MIT) // // Copyright (c) 2020 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in all // copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. //go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination mutable_state_decision_task_manager_mock.go -self_package github.com/uber/cadence/service/history/execution package execution import ( "fmt" "time" "github.com/uber/cadence/common" "github.com/uber/cadence/common/errors" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" ) type ( mutableStateDecisionTaskManager interface { ReplicateDecisionTaskScheduledEvent( version int64, scheduleID int64, taskList string, startToCloseTimeoutSeconds int32, attempt int64, scheduleTimestamp int64, originalScheduledTimestamp int64, bypassTaskGeneration bool, ) (*DecisionInfo, error) ReplicateTransientDecisionTaskScheduled() error ReplicateDecisionTaskStartedEvent( decision *DecisionInfo, version int64, scheduleID int64, startedID int64, requestID string, timestamp int64, ) (*DecisionInfo, error) ReplicateDecisionTaskCompletedEvent(event *types.HistoryEvent) error ReplicateDecisionTaskFailedEvent() error ReplicateDecisionTaskTimedOutEvent(timeoutType types.TimeoutType) error AddDecisionTaskScheduleToStartTimeoutEvent(scheduleEventID int64) (*types.HistoryEvent, error) AddDecisionTaskScheduledEventAsHeartbeat( bypassTaskGeneration bool, originalScheduledTimestamp int64, ) (*DecisionInfo, error) AddDecisionTaskScheduledEvent(bypassTaskGeneration bool) (*DecisionInfo, error) AddFirstDecisionTaskScheduled(startEvent *types.HistoryEvent) error AddDecisionTaskStartedEvent( scheduleEventID int64, requestID string, request *types.PollForDecisionTaskRequest, ) (*types.HistoryEvent, *DecisionInfo, error) AddDecisionTaskCompletedEvent( scheduleEventID int64, startedEventID int64, request *types.RespondDecisionTaskCompletedRequest, maxResetPoints int, ) (*types.HistoryEvent, error) AddDecisionTaskFailedEvent( scheduleEventID int64, startedEventID int64, cause types.DecisionTaskFailedCause, details []byte, identity string, reason string, binChecksum string, baseRunID string, newRunID string, forkEventVersion int64, resetRequestID string, ) (*types.HistoryEvent, error) AddDecisionTaskTimedOutEvent(scheduleEventID int64, startedEventID int64) (*types.HistoryEvent, error) AddDecisionTaskResetTimeoutEvent( scheduleEventID int64, baseRunID string, newRunID string, forkEventVersion int64, reason string, resetRequestID string, ) (*types.HistoryEvent, error) FailDecision(incrementAttempt bool) DeleteDecision() UpdateDecision(decision *DecisionInfo) HasPendingDecision() bool GetPendingDecision() (*DecisionInfo, bool) HasInFlightDecision() bool GetInFlightDecision() (*DecisionInfo, bool) HasProcessedOrPendingDecision() bool GetDecisionInfo(scheduleEventID int64) (*DecisionInfo, bool) GetDecisionScheduleToStartTimeout() time.Duration CreateTransientDecisionEvents(decision *DecisionInfo, identity string) (*types.HistoryEvent, *types.HistoryEvent) } mutableStateDecisionTaskManagerImpl struct { msb *mutableStateBuilder } ) func newMutableStateDecisionTaskManager(msb *mutableStateBuilder) mutableStateDecisionTaskManager { return &mutableStateDecisionTaskManagerImpl{ msb: msb, } } func (m *mutableStateDecisionTaskManagerImpl) ReplicateDecisionTaskScheduledEvent( version int64, scheduleID int64, taskList string, startToCloseTimeoutSeconds int32, attempt int64, scheduleTimestamp int64, originalScheduledTimestamp int64, bypassTaskGeneration bool, ) (*DecisionInfo, error) { // set workflow state to running, since decision is scheduled // NOTE: for zombie workflow, should not change the state state, _ := m.msb.GetWorkflowStateCloseStatus() if state != persistence.WorkflowStateZombie { if err := m.msb.UpdateWorkflowStateCloseStatus( persistence.WorkflowStateRunning, persistence.WorkflowCloseStatusNone, ); err != nil { return nil, err } } decision := &DecisionInfo{ Version: version, ScheduleID: scheduleID, StartedID: common.EmptyEventID, RequestID: common.EmptyUUID, DecisionTimeout: startToCloseTimeoutSeconds, TaskList: taskList, Attempt: attempt, ScheduledTimestamp: scheduleTimestamp, StartedTimestamp: 0, OriginalScheduledTimestamp: originalScheduledTimestamp, } m.UpdateDecision(decision) if !bypassTaskGeneration { if err := m.msb.taskGenerator.GenerateDecisionScheduleTasks(decision.ScheduleID); err != nil { return nil, err } } return decision, nil } func (m *mutableStateDecisionTaskManagerImpl) ReplicateTransientDecisionTaskScheduled() error { if m.HasPendingDecision() || m.msb.GetExecutionInfo().DecisionAttempt == 0 { return nil } // the schedule ID for this decision is guaranteed to be wrong // since the next event ID is assigned at the very end of when // all events are applied for replication. // this is OK // 1. if a failover happen just after this transient decision, // AddDecisionTaskStartedEvent will handle the correction of schedule ID // and set the attempt to 0 // 2. if no failover happen during the life time of this transient decision // then ReplicateDecisionTaskScheduledEvent will overwrite everything // including the decision schedule ID decision := &DecisionInfo{ Version: m.msb.GetCurrentVersion(), ScheduleID: m.msb.GetNextEventID(), StartedID: common.EmptyEventID, RequestID: common.EmptyUUID, DecisionTimeout: m.msb.GetExecutionInfo().DecisionStartToCloseTimeout, TaskList: m.msb.GetExecutionInfo().TaskList, Attempt: m.msb.GetExecutionInfo().DecisionAttempt, ScheduledTimestamp: m.msb.timeSource.Now().UnixNano(), StartedTimestamp: 0, } m.UpdateDecision(decision) return m.msb.taskGenerator.GenerateDecisionScheduleTasks(decision.ScheduleID) } func (m *mutableStateDecisionTaskManagerImpl) ReplicateDecisionTaskStartedEvent( decision *DecisionInfo, version int64, scheduleID int64, startedID int64, requestID string, timestamp int64, ) (*DecisionInfo, error) { // Replicator calls it with a nil decision info, and it is safe to always lookup the decision in this case as it // does not have to deal with transient decision case. var ok bool if decision == nil { decision, ok = m.GetDecisionInfo(scheduleID) if !ok { return nil, errors.NewInternalFailureError(fmt.Sprintf("unable to find decision: %v", scheduleID)) } // setting decision attempt to 0 for decision task replication // this mainly handles transient decision completion // for transient decision, active side will write 2 batch in a "transaction" // 1. decision task scheduled & decision task started // 2. decision task completed & other events // since we need to treat each individual event batch as one transaction // certain "magic" needs to be done, i.e. setting attempt to 0 so // if first batch is replicated, but not the second one, decision can be correctly timed out decision.Attempt = 0 } // Update mutable decision state decision = &DecisionInfo{ Version: version, ScheduleID: scheduleID, StartedID: startedID, RequestID: requestID, DecisionTimeout: decision.DecisionTimeout, Attempt: decision.Attempt, StartedTimestamp: timestamp, ScheduledTimestamp: decision.ScheduledTimestamp, TaskList: decision.TaskList, OriginalScheduledTimestamp: decision.OriginalScheduledTimestamp, } m.UpdateDecision(decision) return decision, m.msb.taskGenerator.GenerateDecisionStartTasks(scheduleID) } func (m *mutableStateDecisionTaskManagerImpl) ReplicateDecisionTaskCompletedEvent( event *types.HistoryEvent, ) error { m.beforeAddDecisionTaskCompletedEvent() maxResetPoints := common.DefaultHistoryMaxAutoResetPoints // use default when it is not set in the config if m.msb.GetDomainEntry() != nil && m.msb.GetDomainEntry().GetInfo() != nil && m.msb.config != nil { domainName := m.msb.GetDomainEntry().GetInfo().Name maxResetPoints = m.msb.config.MaxAutoResetPoints(domainName) } return m.afterAddDecisionTaskCompletedEvent(event, maxResetPoints) } func (m *mutableStateDecisionTaskManagerImpl) ReplicateDecisionTaskFailedEvent() error { m.FailDecision(true) return nil } func (m *mutableStateDecisionTaskManagerImpl) ReplicateDecisionTaskTimedOutEvent( timeoutType types.TimeoutType, ) error { incrementAttempt := true // Do not increment decision attempt in the case of sticky scheduleToStart timeout to // prevent creating next decision as transient // Note: this is just best effort, stickiness can be cleared before the timer fires, // and we can't tell is the decision that is having scheduleToStart timeout is sticky // or not. if timeoutType == types.TimeoutTypeScheduleToStart && m.msb.executionInfo.StickyTaskList != "" { incrementAttempt = false } m.FailDecision(incrementAttempt) return nil } func (m *mutableStateDecisionTaskManagerImpl) AddDecisionTaskScheduleToStartTimeoutEvent( scheduleEventID int64, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionDecisionTaskTimedOut if m.msb.executionInfo.DecisionScheduleID != scheduleEventID || m.msb.executionInfo.DecisionStartedID > 0 { m.msb.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(m.msb.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.WorkflowScheduleID(scheduleEventID), ) return nil, m.msb.createInternalServerError(opTag) } var event *types.HistoryEvent // stickyness will be cleared in ReplicateDecisionTaskTimedOutEvent // Avoid creating new history events when decisions are continuously timing out if m.msb.executionInfo.DecisionAttempt == 0 { event = m.msb.hBuilder.AddDecisionTaskTimedOutEvent( scheduleEventID, 0, types.TimeoutTypeScheduleToStart, "", "", common.EmptyVersion, "", types.DecisionTaskTimedOutCauseTimeout, "", ) } if err := m.ReplicateDecisionTaskTimedOutEvent(types.TimeoutTypeScheduleToStart); err != nil { return nil, err } return event, nil } func (m *mutableStateDecisionTaskManagerImpl) AddDecisionTaskResetTimeoutEvent( scheduleEventID int64, baseRunID string, newRunID string, forkEventVersion int64, reason string, resetRequestID string, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionDecisionTaskTimedOut if m.msb.executionInfo.DecisionScheduleID != scheduleEventID { m.msb.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(m.msb.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.WorkflowScheduleID(scheduleEventID), ) return nil, m.msb.createInternalServerError(opTag) } event := m.msb.hBuilder.AddDecisionTaskTimedOutEvent( scheduleEventID, 0, types.TimeoutTypeScheduleToStart, baseRunID, newRunID, forkEventVersion, reason, types.DecisionTaskTimedOutCauseReset, resetRequestID, ) if err := m.ReplicateDecisionTaskTimedOutEvent(types.TimeoutTypeScheduleToStart); err != nil { return nil, err } // always clear decision attempt for reset m.msb.executionInfo.DecisionAttempt = 0 return event, nil } // originalScheduledTimestamp is to record the first scheduled decision during decision heartbeat. func (m *mutableStateDecisionTaskManagerImpl) AddDecisionTaskScheduledEventAsHeartbeat( bypassTaskGeneration bool, originalScheduledTimestamp int64, ) (*DecisionInfo, error) { opTag := tag.WorkflowActionDecisionTaskScheduled if m.HasPendingDecision() { m.msb.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(m.msb.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.WorkflowScheduleID(m.msb.executionInfo.DecisionScheduleID)) return nil, m.msb.createInternalServerError(opTag) } // Tasklist and decision timeout should already be set from workflow execution started event taskList := m.msb.executionInfo.TaskList if m.msb.IsStickyTaskListEnabled() { taskList = m.msb.executionInfo.StickyTaskList } else { // It can be because stickyness has expired due to StickyTTL config // In that case we need to clear stickyness so that the LastUpdateTimestamp is not corrupted. // In other cases, clearing stickyness shouldn't hurt anything. // TODO: https://github.com/uber/cadence/issues/2357: // if we can use a new field(LastDecisionUpdateTimestamp), then we could get rid of it. m.msb.ClearStickyness() } startToCloseTimeoutSeconds := m.msb.executionInfo.DecisionStartToCloseTimeout // Flush any buffered events before creating the decision, otherwise it will result in invalid IDs for transient // decision and will cause in timeout processing to not work for transient decisions if m.msb.HasBufferedEvents() { // if creating a decision and in the mean time events are flushed from buffered events // than this decision cannot be a transient decision m.msb.executionInfo.DecisionAttempt = 0 if err := m.msb.FlushBufferedEvents(); err != nil { return nil, err } } var newDecisionEvent *types.HistoryEvent scheduleID := m.msb.GetNextEventID() // we will generate the schedule event later for repeatedly failing decisions // Avoid creating new history events when decisions are continuously failing scheduleTime := m.msb.timeSource.Now().UnixNano() useNonTransientDecision := m.shouldUpdateLastWriteVersion() if m.msb.executionInfo.DecisionAttempt == 0 || useNonTransientDecision { newDecisionEvent = m.msb.hBuilder.AddDecisionTaskScheduledEvent( taskList, startToCloseTimeoutSeconds, m.msb.executionInfo.DecisionAttempt) scheduleID = newDecisionEvent.ID scheduleTime = newDecisionEvent.GetTimestamp() m.msb.executionInfo.DecisionAttempt = 0 } return m.ReplicateDecisionTaskScheduledEvent( m.msb.GetCurrentVersion(), scheduleID, taskList, startToCloseTimeoutSeconds, m.msb.executionInfo.DecisionAttempt, scheduleTime, originalScheduledTimestamp, bypassTaskGeneration, ) } func (m *mutableStateDecisionTaskManagerImpl) AddDecisionTaskScheduledEvent( bypassTaskGeneration bool, ) (*DecisionInfo, error) { return m.AddDecisionTaskScheduledEventAsHeartbeat(bypassTaskGeneration, m.msb.timeSource.Now().UnixNano()) } func (m *mutableStateDecisionTaskManagerImpl) AddFirstDecisionTaskScheduled( startEvent *types.HistoryEvent, ) error { // handle first decision case, i.e. possible delayed decision // // below handles the following cases: // 1. if not continue as new & if workflow has no parent // -> schedule decision & schedule delayed decision // 2. if not continue as new & if workflow has parent // -> this function should not be called during workflow start, but should be called as // part of schedule decision in 2 phase commit // // if continue as new // 1. whether has parent workflow or not // -> schedule decision & schedule delayed decision // startAttr := startEvent.WorkflowExecutionStartedEventAttributes decisionBackoffDuration := time.Duration(startAttr.GetFirstDecisionTaskBackoffSeconds()) * time.Second var err error if decisionBackoffDuration != 0 { if err = m.msb.taskGenerator.GenerateDelayedDecisionTasks( startEvent, ); err != nil { return err } } else { if _, err = m.AddDecisionTaskScheduledEvent( false, ); err != nil { return err } } return nil } func (m *mutableStateDecisionTaskManagerImpl) AddDecisionTaskStartedEvent( scheduleEventID int64, requestID string, request *types.PollForDecisionTaskRequest, ) (*types.HistoryEvent, *DecisionInfo, error) { opTag := tag.WorkflowActionDecisionTaskStarted decision, ok := m.GetDecisionInfo(scheduleEventID) if !ok || decision.StartedID != common.EmptyEventID { m.msb.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(m.msb.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.WorkflowScheduleID(scheduleEventID)) return nil, nil, m.msb.createInternalServerError(opTag) } var event *types.HistoryEvent scheduleID := decision.ScheduleID startedID := scheduleID + 1 tasklist := request.TaskList.GetName() startTime := m.msb.timeSource.Now().UnixNano() useNonTransientDecision := m.shouldUpdateLastWriteVersion() // First check to see if new events came since transient decision was scheduled if decision.Attempt > 0 && (decision.ScheduleID != m.msb.GetNextEventID() || useNonTransientDecision) { // Also create a new DecisionTaskScheduledEvent since new events came in when it was scheduled scheduleEvent := m.msb.hBuilder.AddDecisionTaskScheduledEvent(tasklist, decision.DecisionTimeout, 0) scheduleID = scheduleEvent.ID decision.Attempt = 0 } // Avoid creating new history events when decisions are continuously failing if decision.Attempt == 0 { // Now create DecisionTaskStartedEvent event = m.msb.hBuilder.AddDecisionTaskStartedEvent(scheduleID, requestID, request.GetIdentity()) startedID = event.ID startTime = event.GetTimestamp() } decision, err := m.ReplicateDecisionTaskStartedEvent(decision, m.msb.GetCurrentVersion(), scheduleID, startedID, requestID, startTime) return event, decision, err } func (m *mutableStateDecisionTaskManagerImpl) AddDecisionTaskCompletedEvent( scheduleEventID int64, startedEventID int64, request *types.RespondDecisionTaskCompletedRequest, maxResetPoints int, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionDecisionTaskCompleted decision, ok := m.GetDecisionInfo(scheduleEventID) if !ok || decision.StartedID != startedEventID { m.msb.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(m.msb.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.WorkflowScheduleID(scheduleEventID), tag.WorkflowStartedID(startedEventID)) return nil, m.msb.createInternalServerError(opTag) } m.beforeAddDecisionTaskCompletedEvent() if decision.Attempt > 0 { // Create corresponding DecisionTaskSchedule and DecisionTaskStarted events for decisions we have been retrying scheduledEvent := m.msb.hBuilder.AddTransientDecisionTaskScheduledEvent(m.msb.executionInfo.TaskList, decision.DecisionTimeout, decision.Attempt, decision.ScheduledTimestamp) startedEvent := m.msb.hBuilder.AddTransientDecisionTaskStartedEvent(scheduledEvent.ID, decision.RequestID, request.GetIdentity(), decision.StartedTimestamp) startedEventID = startedEvent.ID } // Now write the completed event event := m.msb.hBuilder.AddDecisionTaskCompletedEvent(scheduleEventID, startedEventID, request) err := m.afterAddDecisionTaskCompletedEvent(event, maxResetPoints) if err != nil { return nil, err } return event, nil } func (m *mutableStateDecisionTaskManagerImpl) AddDecisionTaskFailedEvent( scheduleEventID int64, startedEventID int64, cause types.DecisionTaskFailedCause, details []byte, identity string, reason string, binChecksum string, baseRunID string, newRunID string, forkEventVersion int64, resetRequestID string, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionDecisionTaskFailed attr := types.DecisionTaskFailedEventAttributes{ ScheduledEventID: scheduleEventID, StartedEventID: startedEventID, Cause: cause.Ptr(), Details: details, Identity: identity, Reason: common.StringPtr(reason), BinaryChecksum: binChecksum, BaseRunID: baseRunID, NewRunID: newRunID, ForkEventVersion: forkEventVersion, RequestID: resetRequestID, } dt, ok := m.GetDecisionInfo(scheduleEventID) if !ok || dt.StartedID != startedEventID { m.msb.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(m.msb.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.WorkflowScheduleID(scheduleEventID), tag.WorkflowStartedID(startedEventID)) return nil, m.msb.createInternalServerError(opTag) } var event *types.HistoryEvent // Only emit DecisionTaskFailedEvent for the very first time if dt.Attempt == 0 { event = m.msb.hBuilder.AddDecisionTaskFailedEvent(attr) } if err := m.ReplicateDecisionTaskFailedEvent(); err != nil { return nil, err } // always clear decision attempt for reset if cause == types.DecisionTaskFailedCauseResetWorkflow || cause == types.DecisionTaskFailedCauseFailoverCloseDecision { m.msb.executionInfo.DecisionAttempt = 0 } return event, nil } func (m *mutableStateDecisionTaskManagerImpl) AddDecisionTaskTimedOutEvent( scheduleEventID int64, startedEventID int64, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionDecisionTaskTimedOut dt, ok := m.GetDecisionInfo(scheduleEventID) if !ok || dt.StartedID != startedEventID { m.msb.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(m.msb.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.WorkflowScheduleID(scheduleEventID), tag.WorkflowStartedID(startedEventID)) return nil, m.msb.createInternalServerError(opTag) } var event *types.HistoryEvent // Avoid creating new history events when decisions are continuously timing out if dt.Attempt == 0 { event = m.msb.hBuilder.AddDecisionTaskTimedOutEvent( scheduleEventID, startedEventID, types.TimeoutTypeStartToClose, "", "", common.EmptyVersion, "", types.DecisionTaskTimedOutCauseTimeout, "", ) } if err := m.ReplicateDecisionTaskTimedOutEvent(types.TimeoutTypeStartToClose); err != nil { return nil, err } return event, nil } func (m *mutableStateDecisionTaskManagerImpl) FailDecision( incrementAttempt bool, ) { // Clear stickiness whenever decision fails m.msb.ClearStickyness() failDecisionInfo := &DecisionInfo{ Version: common.EmptyVersion, ScheduleID: common.EmptyEventID, StartedID: common.EmptyEventID, RequestID: common.EmptyUUID, DecisionTimeout: 0, StartedTimestamp: 0, TaskList: "", OriginalScheduledTimestamp: 0, } if incrementAttempt { failDecisionInfo.Attempt = m.msb.executionInfo.DecisionAttempt + 1 failDecisionInfo.ScheduledTimestamp = m.msb.timeSource.Now().UnixNano() if failDecisionInfo.Attempt >= int64(m.msb.shard.GetConfig().DecisionRetryCriticalAttempts()) { domainName := m.msb.GetDomainEntry().GetInfo().Name domainTag := metrics.DomainTag(domainName) m.msb.metricsClient.Scope(metrics.WorkflowContextScope, domainTag).RecordTimer(metrics.DecisionAttemptTimer, time.Duration(failDecisionInfo.Attempt)) m.msb.logger.Warn("Critical error processing decision task, retrying.", tag.WorkflowDomainName(m.msb.GetDomainEntry().GetInfo().Name), tag.WorkflowID(m.msb.GetExecutionInfo().WorkflowID), tag.WorkflowRunID(m.msb.GetExecutionInfo().RunID), ) } } m.UpdateDecision(failDecisionInfo) } // DeleteDecision deletes a decision task. func (m *mutableStateDecisionTaskManagerImpl) DeleteDecision() { resetDecisionInfo := &DecisionInfo{ Version: common.EmptyVersion, ScheduleID: common.EmptyEventID, StartedID: common.EmptyEventID, RequestID: common.EmptyUUID, DecisionTimeout: 0, Attempt: 0, StartedTimestamp: 0, ScheduledTimestamp: 0, TaskList: "", // Keep the last original scheduled timestamp, so that AddDecisionAsHeartbeat can continue with it. OriginalScheduledTimestamp: m.getDecisionInfo().OriginalScheduledTimestamp, } m.UpdateDecision(resetDecisionInfo) } // UpdateDecision updates a decision task. func (m *mutableStateDecisionTaskManagerImpl) UpdateDecision( decision *DecisionInfo, ) { m.msb.executionInfo.DecisionVersion = decision.Version m.msb.executionInfo.DecisionScheduleID = decision.ScheduleID m.msb.executionInfo.DecisionStartedID = decision.StartedID m.msb.executionInfo.DecisionRequestID = decision.RequestID m.msb.executionInfo.DecisionTimeout = decision.DecisionTimeout m.msb.executionInfo.DecisionAttempt = decision.Attempt m.msb.executionInfo.DecisionStartedTimestamp = decision.StartedTimestamp m.msb.executionInfo.DecisionScheduledTimestamp = decision.ScheduledTimestamp m.msb.executionInfo.DecisionOriginalScheduledTimestamp = decision.OriginalScheduledTimestamp // NOTE: do not update tasklist in execution info m.msb.logger.Debug(fmt.Sprintf( "Decision Updated: {Schedule: %v, Started: %v, ID: %v, Timeout: %v, Attempt: %v, Timestamp: %v}", decision.ScheduleID, decision.StartedID, decision.RequestID, decision.DecisionTimeout, decision.Attempt, decision.StartedTimestamp, )) } func (m *mutableStateDecisionTaskManagerImpl) HasPendingDecision() bool { return m.msb.executionInfo.DecisionScheduleID != common.EmptyEventID } func (m *mutableStateDecisionTaskManagerImpl) GetPendingDecision() (*DecisionInfo, bool) { if m.msb.executionInfo.DecisionScheduleID == common.EmptyEventID { return nil, false } decision := m.getDecisionInfo() return decision, true } func (m *mutableStateDecisionTaskManagerImpl) HasInFlightDecision() bool { return m.msb.executionInfo.DecisionStartedID > 0 } func (m *mutableStateDecisionTaskManagerImpl) GetInFlightDecision() (*DecisionInfo, bool) { if m.msb.executionInfo.DecisionScheduleID == common.EmptyEventID || m.msb.executionInfo.DecisionStartedID == common.EmptyEventID { return nil, false } decision := m.getDecisionInfo() return decision, true } func (m *mutableStateDecisionTaskManagerImpl) HasProcessedOrPendingDecision() bool { return m.HasPendingDecision() || m.msb.GetPreviousStartedEventID() != common.EmptyEventID } // GetDecisionInfo returns details about the in-progress decision task func (m *mutableStateDecisionTaskManagerImpl) GetDecisionInfo( scheduleEventID int64, ) (*DecisionInfo, bool) { decision := m.getDecisionInfo() if scheduleEventID == decision.ScheduleID { return decision, true } return nil, false } func (m *mutableStateDecisionTaskManagerImpl) GetDecisionScheduleToStartTimeout() time.Duration { // we should not call IsStickyTaskListEnabled which may be false // if sticky TTL has expired // NOTE: this function is called in the same mutable state transaction as the one generating the decision task // we stickiness won't be cleared between creating the decision and getting the timeout if m.msb.executionInfo.StickyTaskList != "" { return time.Duration( m.msb.executionInfo.StickyScheduleToStartTimeout, ) * time.Second } domainName := m.msb.GetDomainEntry().GetInfo().Name if m.msb.executionInfo.DecisionAttempt < int64(m.msb.config.NormalDecisionScheduleToStartMaxAttempts(domainName)) { return m.msb.config.NormalDecisionScheduleToStartTimeout(domainName) } return 0 } func (m *mutableStateDecisionTaskManagerImpl) CreateTransientDecisionEvents( decision *DecisionInfo, identity string, ) (*types.HistoryEvent, *types.HistoryEvent) { tasklist := m.msb.executionInfo.TaskList scheduledEvent := newDecisionTaskScheduledEventWithInfo( decision.ScheduleID, decision.ScheduledTimestamp, tasklist, decision.DecisionTimeout, decision.Attempt, ) startedEvent := newDecisionTaskStartedEventWithInfo( decision.StartedID, decision.StartedTimestamp, decision.ScheduleID, decision.RequestID, identity, ) return scheduledEvent, startedEvent } func (m *mutableStateDecisionTaskManagerImpl) getDecisionInfo() *DecisionInfo { taskList := m.msb.executionInfo.TaskList if m.msb.executionInfo.StickyTaskList != "" { taskList = m.msb.executionInfo.StickyTaskList } return &DecisionInfo{ Version: m.msb.executionInfo.DecisionVersion, ScheduleID: m.msb.executionInfo.DecisionScheduleID, StartedID: m.msb.executionInfo.DecisionStartedID, RequestID: m.msb.executionInfo.DecisionRequestID, DecisionTimeout: m.msb.executionInfo.DecisionTimeout, Attempt: m.msb.executionInfo.DecisionAttempt, StartedTimestamp: m.msb.executionInfo.DecisionStartedTimestamp, ScheduledTimestamp: m.msb.executionInfo.DecisionScheduledTimestamp, TaskList: taskList, OriginalScheduledTimestamp: m.msb.executionInfo.DecisionOriginalScheduledTimestamp, } } func (m *mutableStateDecisionTaskManagerImpl) beforeAddDecisionTaskCompletedEvent() { // Make sure to delete decision before adding events. Otherwise they are buffered rather than getting appended m.DeleteDecision() } func (m *mutableStateDecisionTaskManagerImpl) afterAddDecisionTaskCompletedEvent( event *types.HistoryEvent, maxResetPoints int, ) error { m.msb.executionInfo.LastProcessedEvent = event.GetDecisionTaskCompletedEventAttributes().GetStartedEventID() return m.msb.addBinaryCheckSumIfNotExists(event, maxResetPoints) } func (m *mutableStateDecisionTaskManagerImpl) shouldUpdateLastWriteVersion() bool { currentVersion := m.msb.GetCurrentVersion() lastWriteVersion, err := m.msb.GetLastWriteVersion() if err != nil { // The error is version history has no item. This is expected for the first batch of a workflow. return false } return currentVersion != lastWriteVersion }