service/history/execution/timer_sequence.go (349 lines of code) (raw):
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination timer_sequence_mock.go -self_package github.com/uber/cadence/service/history/execution
package execution
import (
"fmt"
"sort"
"time"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
)
// TimerType indicates timer type. It is an int32 whose values are the
// TimerType* constants declared in this file.
type TimerType int32
// Timer types. Each constant is the matching types.TimeoutType value converted
// to TimerType, so the numeric values are whatever the types package defines.
const (
// TimerTypeStartToClose is the timer type for activity startToClose timer.
// getUserTimerTimeout also assigns this type to every user timer.
TimerTypeStartToClose = TimerType(types.TimeoutTypeStartToClose)
// TimerTypeScheduleToStart is the timer type for activity scheduleToStart timer
TimerTypeScheduleToStart = TimerType(types.TimeoutTypeScheduleToStart)
// TimerTypeScheduleToClose is the timer type for activity scheduleToClose timer
TimerTypeScheduleToClose = TimerType(types.TimeoutTypeScheduleToClose)
// TimerTypeHeartbeat is the timer type for activity heartbeat timer
TimerTypeHeartbeat = TimerType(types.TimeoutTypeHeartbeat)
)
// User timer task status values. CreateNextUserTimer stores
// TimerTaskStatusCreated in TimerInfo.TaskStatus and getUserTimerTimeout
// compares against it. The values come from iota, so they depend on the
// declaration order below.
const (
// TimerTaskStatusNone indicates activity / user timer task has not been created.
// Value: 0.
TimerTaskStatusNone = iota
// TimerTaskStatusCreated indicates user timer task has been created.
// Value: 1.
TimerTaskStatusCreated
)
// Activity timer task status flags. Each constant is a distinct bit
// (1 << iota), so several can be OR-ed together in ActivityInfo.TimerTaskStatus;
// TimerTypeToTimerMask maps a TimerType to its flag. The bit positions depend
// on the declaration order below.
const (
// TimerTaskStatusCreatedStartToClose indicates activity startToClose timer has been created.
// Value: 1.
TimerTaskStatusCreatedStartToClose = 1 << iota
// TimerTaskStatusCreatedScheduleToStart indicates activity scheduleToStart timer has been created.
// Value: 2.
TimerTaskStatusCreatedScheduleToStart
// TimerTaskStatusCreatedScheduleToClose indicates activity scheduleToClose timer has been created.
// Value: 4.
TimerTaskStatusCreatedScheduleToClose
// TimerTaskStatusCreatedHeartbeat indicates activity heartbeat timer has been created.
// Value: 8.
TimerTaskStatusCreatedHeartbeat
)
// TimerSequenceID describes user / activity timer and defines an order among timers
type TimerSequenceID struct {
	// EventID is the timer's StartedID for user timers and the activity's
	// ScheduleID for activity timers.
	EventID int64
	// Timestamp is the time at which the timer fires.
	Timestamp time.Time
	// TimerType is the kind of timeout this timer represents.
	TimerType TimerType
	// TimerCreated reports whether a timer task has already been generated
	// for this timer.
	TimerCreated bool
	// Attempt is the activity attempt the timer belongs to; user timers use 0.
	Attempt int32
}

// TimerSequenceIDs is a list of TimerSequenceID
type TimerSequenceIDs []TimerSequenceID

// TimerSequence manages user / activity timer
type TimerSequence interface {
	// IsExpired reports whether the given timer has fired as of referenceTime
	// and, when it has, how long ago.
	IsExpired(referenceTime time.Time, TimerSequenceID TimerSequenceID) (time.Duration, bool)
	// CreateNextUserTimer generates the timer task for the earliest pending
	// user timer; the bool reports whether a task was added.
	CreateNextUserTimer() (bool, error)
	// CreateNextActivityTimer generates the timer task for the earliest
	// pending activity timer; the bool reports whether a task was added.
	CreateNextActivityTimer() (bool, error)
	// LoadAndSortUserTimers returns all pending user timers in firing order.
	LoadAndSortUserTimers() []TimerSequenceID
	// LoadAndSortActivityTimers returns all pending activity timers in firing order.
	LoadAndSortActivityTimers() []TimerSequenceID
}

// timerSequenceImpl is the TimerSequence implementation backed by a MutableState.
type timerSequenceImpl struct {
	mutableState MutableState
}
// compile-time check that *timerSequenceImpl satisfies TimerSequence
var _ TimerSequence = (*timerSequenceImpl)(nil)

// NewTimerSequence creates a new timer sequence that reads and updates timers
// through the given mutable state.
func NewTimerSequence(
	mutableState MutableState,
) TimerSequence {
	sequence := new(timerSequenceImpl)
	sequence.mutableState = mutableState
	return sequence
}
// IsExpired reports whether the timer described by TimerSequenceID has fired
// as of referenceTime. When it has, the first result is the elapsed time since
// the fire time, in whole seconds; otherwise the results are (0, false).
func (t *timerSequenceImpl) IsExpired(
	referenceTime time.Time,
	TimerSequenceID TimerSequenceID,
) (time.Duration, bool) {
	// Cassandra timestamp resolution is in millisecond,
	// so the comparison is done at second resolution.
	fireSec, refSec := TimerSequenceID.Timestamp.Unix(), referenceTime.Unix()
	if fireSec > refSec {
		return 0, false
	}

	elapsed := time.Duration(refSec-fireSec) * time.Second
	return elapsed, true
}
// CreateNextUserTimer generates the timer task for the earliest pending user
// timer, if it does not have one yet.
//
// It returns (true, nil) when a UserTimerTask was added to the mutable state,
// (false, nil) when there is no pending user timer or the earliest one already
// has its task, and (false, err) when the timer info cannot be loaded or the
// update of the timer info fails.
func (t *timerSequenceImpl) CreateNextUserTimer() (bool, error) {
	sequenceIDs := t.LoadAndSortUserTimers()
	if len(sequenceIDs) == 0 {
		return false, nil
	}

	firstTimerTask := sequenceIDs[0]
	// timer has already been created
	if firstTimerTask.TimerCreated {
		return false, nil
	}

	timerInfo, ok := t.mutableState.GetUserTimerInfoByEventID(firstTimerTask.EventID)
	if !ok {
		// this is the user timer path, so report the missing timer info
		return false, &types.InternalServiceError{
			Message: fmt.Sprintf("unable to load timer info %v", firstTimerTask.EventID),
		}
	}

	// mark the timer info so later calls see that its timer task is generated;
	// TaskStatus acts as the "timer created" flag read by getUserTimerTimeout
	timerInfo.TaskStatus = TimerTaskStatusCreated
	if err := t.mutableState.UpdateUserTimer(timerInfo); err != nil {
		return false, err
	}

	t.mutableState.AddTimerTasks(&persistence.UserTimerTask{
		TaskData: persistence.TaskData{
			// TaskID is set by shard
			VisibilityTimestamp: firstTimerTask.Timestamp,
			Version:             t.mutableState.GetCurrentVersion(),
		},
		EventID: firstTimerTask.EventID,
	})
	return true, nil
}
// CreateNextActivityTimer generates the timeout task for the earliest pending
// activity timer, if it does not have one yet.
//
// It returns (true, nil) when an ActivityTimeoutTask was added to the mutable
// state, (false, nil) when there is no pending activity timer or the earliest
// one already has its task, and (false, err) when the activity info cannot be
// loaded or the update of the activity info fails.
func (t *timerSequenceImpl) CreateNextActivityTimer() (bool, error) {
	sorted := t.LoadAndSortActivityTimers()
	// nothing pending, or the earliest timer already has its task
	if len(sorted) == 0 || sorted[0].TimerCreated {
		return false, nil
	}
	next := sorted[0]

	info, found := t.mutableState.GetActivityInfo(next.EventID)
	if !found {
		return false, &types.InternalServiceError{
			Message: fmt.Sprintf("unable to load activity info %v", next.EventID),
		}
	}

	// set the flag bit for this timer type to record that its task is generated
	info.TimerTaskStatus |= TimerTypeToTimerMask(next.TimerType)
	if next.TimerType == TimerTypeHeartbeat {
		info.LastHeartbeatTimeoutVisibilityInSeconds = next.Timestamp.Unix()
	}
	if err := t.mutableState.UpdateActivity(info); err != nil {
		return false, err
	}

	timeoutTask := &persistence.ActivityTimeoutTask{
		TaskData: persistence.TaskData{
			// TaskID is set by shard
			VisibilityTimestamp: next.Timestamp,
			Version:             t.mutableState.GetCurrentVersion(),
		},
		TimeoutType: int(next.TimerType),
		EventID:     next.EventID,
		Attempt:     int64(next.Attempt),
	}
	t.mutableState.AddTimerTasks(timeoutTask)
	return true, nil
}
// LoadAndSortUserTimers builds a TimerSequenceID for every pending user timer
// in the mutable state and returns them sorted with TimerSequenceIDs' ordering.
func (t *timerSequenceImpl) LoadAndSortUserTimers() []TimerSequenceID {
	pending := t.mutableState.GetPendingTimerInfos()

	result := make(TimerSequenceIDs, 0, len(pending))
	for _, info := range pending {
		id := t.getUserTimerTimeout(info)
		if id == nil {
			continue
		}
		result = append(result, *id)
	}

	sort.Sort(result)
	return result
}
// LoadAndSortActivityTimers collects the applicable timeouts of every pending
// activity in the mutable state and returns them sorted with
// TimerSequenceIDs' ordering.
func (t *timerSequenceImpl) LoadAndSortActivityTimers() []TimerSequenceID {
	pending := t.mutableState.GetPendingActivityInfos()

	// there can be 4 timer per activity, see TimerType
	result := make(TimerSequenceIDs, 0, len(pending)*4)
	for _, info := range pending {
		// a nil entry means that timeout does not currently apply to the activity
		candidates := [...]*TimerSequenceID{
			t.getActivityScheduleToCloseTimeout(info),
			t.getActivityScheduleToStartTimeout(info),
			t.getActivityStartToCloseTimeout(info),
			t.getActivityHeartbeatTimeout(info),
		}
		for _, id := range candidates {
			if id != nil {
				result = append(result, *id)
			}
		}
	}

	sort.Sort(result)
	return result
}
// getUserTimerTimeout converts a user timer info into its TimerSequenceID.
// User timers are keyed by StartedID, fire at ExpiryTime, always use
// TimerTypeStartToClose and attempt 0. The result is never nil.
func (t *timerSequenceImpl) getUserTimerTimeout(
	timerInfo *persistence.TimerInfo,
) *TimerSequenceID {
	alreadyCreated := timerInfo.TaskStatus == TimerTaskStatusCreated

	sequenceID := TimerSequenceID{
		EventID:      timerInfo.StartedID,
		Timestamp:    timerInfo.ExpiryTime,
		TimerType:    TimerTypeStartToClose,
		TimerCreated: alreadyCreated,
		Attempt:      0,
	}
	return &sequenceID
}
// getActivityScheduleToStartTimeout returns the scheduleToStart timer of the
// activity, firing ScheduleToStartTimeout seconds after ScheduledTime. It
// returns nil when the activity is not scheduled yet or has already started.
func (t *timerSequenceImpl) getActivityScheduleToStartTimeout(
	activityInfo *persistence.ActivityInfo,
) *TimerSequenceID {
	switch {
	case activityInfo.ScheduleID == common.EmptyEventID:
		// activity is not scheduled yet, probably due to retry & backoff
		return nil
	case activityInfo.StartedID != common.EmptyEventID:
		// activity is already started
		return nil
	}

	timeout := time.Duration(activityInfo.ScheduleToStartTimeout) * time.Second
	created := activityInfo.TimerTaskStatus&TimerTaskStatusCreatedScheduleToStart != 0

	return &TimerSequenceID{
		EventID:      activityInfo.ScheduleID,
		Timestamp:    activityInfo.ScheduledTime.Add(timeout),
		TimerType:    TimerTypeScheduleToStart,
		TimerCreated: created,
		Attempt:      activityInfo.Attempt,
	}
}
// getActivityScheduleToCloseTimeout returns the scheduleToClose timer of the
// activity, firing ScheduleToCloseTimeout seconds after ScheduledTime. It
// returns nil when the activity is not scheduled yet.
func (t *timerSequenceImpl) getActivityScheduleToCloseTimeout(
	activityInfo *persistence.ActivityInfo,
) *TimerSequenceID {
	if activityInfo.ScheduleID == common.EmptyEventID {
		// activity is not scheduled yet, probably due to retry & backoff
		return nil
	}

	timeout := time.Duration(activityInfo.ScheduleToCloseTimeout) * time.Second
	created := activityInfo.TimerTaskStatus&TimerTaskStatusCreatedScheduleToClose != 0

	return &TimerSequenceID{
		EventID:      activityInfo.ScheduleID,
		Timestamp:    activityInfo.ScheduledTime.Add(timeout),
		TimerType:    TimerTypeScheduleToClose,
		TimerCreated: created,
		Attempt:      activityInfo.Attempt,
	}
}
// getActivityStartToCloseTimeout returns the startToClose timer of the
// activity, firing StartToCloseTimeout seconds after StartedTime. It returns
// nil when the activity is not scheduled yet or has not started.
func (t *timerSequenceImpl) getActivityStartToCloseTimeout(
	activityInfo *persistence.ActivityInfo,
) *TimerSequenceID {
	switch {
	case activityInfo.ScheduleID == common.EmptyEventID:
		// activity is not scheduled yet, probably due to retry & backoff
		return nil
	case activityInfo.StartedID == common.EmptyEventID:
		// activity is not started yet
		return nil
	}

	timeout := time.Duration(activityInfo.StartToCloseTimeout) * time.Second
	created := activityInfo.TimerTaskStatus&TimerTaskStatusCreatedStartToClose != 0

	return &TimerSequenceID{
		EventID:      activityInfo.ScheduleID,
		Timestamp:    activityInfo.StartedTime.Add(timeout),
		TimerType:    TimerTypeStartToClose,
		TimerCreated: created,
		Attempt:      activityInfo.Attempt,
	}
}
// getActivityHeartbeatTimeout returns the heartbeat timer of the activity,
// firing HeartbeatTimeout seconds after the later of StartedTime and
// LastHeartBeatUpdatedTime. It returns nil when the activity is not scheduled
// yet, has not started, or has no heartbeat timeout configured.
func (t *timerSequenceImpl) getActivityHeartbeatTimeout(
	activityInfo *persistence.ActivityInfo,
) *TimerSequenceID {
	switch {
	case activityInfo.ScheduleID == common.EmptyEventID:
		// activity is not scheduled yet, probably due to retry & backoff
		return nil
	case activityInfo.StartedID == common.EmptyEventID:
		// activity is not started yet
		return nil
	case activityInfo.HeartbeatTimeout <= 0:
		// no heartbeat timeout configured
		return nil
	}

	// use the latest time as last heartbeat time
	base := activityInfo.LastHeartBeatUpdatedTime
	if !base.After(activityInfo.StartedTime) {
		base = activityInfo.StartedTime
	}

	timeout := time.Duration(activityInfo.HeartbeatTimeout) * time.Second
	created := activityInfo.TimerTaskStatus&TimerTaskStatusCreatedHeartbeat != 0

	return &TimerSequenceID{
		EventID:      activityInfo.ScheduleID,
		Timestamp:    base.Add(timeout),
		TimerType:    TimerTypeHeartbeat,
		TimerCreated: created,
		Attempt:      activityInfo.Attempt,
	}
}
// TimerTypeToTimerMask converts TimerType into the TimerTaskStatus flag, i.e.
// the TimerTaskStatusCreated* bit that records that the timer task for that
// timer type has been created.
//
// It panics when TimerType is not one of the four known timer types; the panic
// message includes the offending value, matching TimerTypeToInternal and
// TimerTypeFromInternal.
func TimerTypeToTimerMask(
	TimerType TimerType,
) int32 {
	switch TimerType {
	case TimerTypeStartToClose:
		return TimerTaskStatusCreatedStartToClose
	case TimerTypeScheduleToStart:
		return TimerTaskStatusCreatedScheduleToStart
	case TimerTypeScheduleToClose:
		return TimerTaskStatusCreatedScheduleToClose
	case TimerTypeHeartbeat:
		return TimerTaskStatusCreatedHeartbeat
	default:
		panic(fmt.Sprintf("invalid timeout type: %v", TimerType))
	}
}
// TimerTypeToInternal converts TimerType to its internal representation, the
// matching types.TimeoutType value. It panics on an unknown timer type.
func TimerTypeToInternal(
	TimerType TimerType,
) types.TimeoutType {
	var internal types.TimeoutType

	switch TimerType {
	case TimerTypeHeartbeat:
		internal = types.TimeoutTypeHeartbeat
	case TimerTypeScheduleToClose:
		internal = types.TimeoutTypeScheduleToClose
	case TimerTypeScheduleToStart:
		internal = types.TimeoutTypeScheduleToStart
	case TimerTypeStartToClose:
		internal = types.TimeoutTypeStartToClose
	default:
		panic(fmt.Sprintf("invalid timer type: %v", TimerType))
	}

	return internal
}
// TimerTypeFromInternal gets TimerType from internal type
func TimerTypeFromInternal(
TimerType types.TimeoutType,
) TimerType {
switch TimerType {
case types.TimeoutTypeStartToClose:
return TimerTypeStartToClose
case types.TimeoutTypeScheduleToStart:
return TimerTypeScheduleToStart
case types.TimeoutTypeScheduleToClose:
return TimerTypeScheduleToClose
case types.TimeoutTypeHeartbeat:
return TimerTypeHeartbeat
default:
panic(fmt.Sprintf("invalid timeout type: %v", TimerType))
}
}
// TimerTypeToReason creates timeout reason based on the TimerType: the fixed
// prefix "cadenceInternal:Timeout " followed by the internal timeout type
// formatted with %v. It panics on an unknown timer type, because
// TimerTypeToInternal does.
func TimerTypeToReason(
	timerType TimerType,
) string {
	internalType := TimerTypeToInternal(timerType)
	return fmt.Sprintf("cadenceInternal:Timeout %v", internalType)
}
// Len implements sort.Interface; it reports the number of timers in the list.
func (s TimerSequenceIDs) Len() int {
	count := len(s)
	return count
}
// Swap implements sort.Interface; it exchanges the timers at the two indexes.
func (s TimerSequenceIDs) Swap(
	this int,
	that int,
) {
	held := s[this]
	s[this] = s[that]
	s[that] = held
}
// Less implements sort.Interface. Timers are ordered by timeout time, then
// event ID, then timeout type.
//
// Two timers that agree on all three keys are equal, so Less reports false
// for them in both directions (and for an element compared with itself), as
// the strict ordering required by sort.Interface demands.
func (s TimerSequenceIDs) Less(
	this int,
	that int,
) bool {
	lhs, rhs := s[this], s[that]

	switch {
	case lhs.Timestamp.Before(rhs.Timestamp):
		return true
	case lhs.Timestamp.After(rhs.Timestamp):
		return false
	case lhs.EventID != rhs.EventID:
		// timeout time are the same
		return lhs.EventID < rhs.EventID
	default:
		// timeout time & event ID are the same; identical timer types
		// compare as "not less"
		return lhs.TimerType < rhs.TimerType
	}
}