common/persistence/tasks.go (276 lines of code) (raw):
// The MIT License (MIT)
// Copyright (c) 2017-2020 Uber Technologies Inc.
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package persistence
import "time"
// Task is the generic interface for workflow tasks
type Task interface {
GetType() int
GetVersion() int64
SetVersion(version int64)
GetTaskID() int64
SetTaskID(id int64)
GetVisibilityTimestamp() time.Time
SetVisibilityTimestamp(timestamp time.Time)
}
type (
// TaskData is common attributes for all tasks.
TaskData struct {
Version int64
TaskID int64
VisibilityTimestamp time.Time
}
// ActivityTask identifies a transfer task for activity
ActivityTask struct {
TaskData
DomainID string
TaskList string
ScheduleID int64
}
// DecisionTask identifies a transfer task for decision
DecisionTask struct {
TaskData
DomainID string
TaskList string
ScheduleID int64
RecordVisibility bool
}
// RecordWorkflowStartedTask identifites a transfer task for writing visibility open execution record
RecordWorkflowStartedTask struct {
TaskData
}
// ResetWorkflowTask identifites a transfer task to reset workflow
ResetWorkflowTask struct {
TaskData
}
// CloseExecutionTask identifies a transfer task for deletion of execution
CloseExecutionTask struct {
TaskData
}
// DeleteHistoryEventTask identifies a timer task for deletion of history events of completed execution.
DeleteHistoryEventTask struct {
TaskData
}
// DecisionTimeoutTask identifies a timeout task.
DecisionTimeoutTask struct {
TaskData
EventID int64
ScheduleAttempt int64
TimeoutType int
}
// WorkflowTimeoutTask identifies a timeout task.
WorkflowTimeoutTask struct {
TaskData
}
// CancelExecutionTask identifies a transfer task for cancel of execution
CancelExecutionTask struct {
TaskData
TargetDomainID string
TargetWorkflowID string
TargetRunID string
TargetChildWorkflowOnly bool
InitiatedID int64
}
// SignalExecutionTask identifies a transfer task for signal execution
SignalExecutionTask struct {
TaskData
TargetDomainID string
TargetWorkflowID string
TargetRunID string
TargetChildWorkflowOnly bool
InitiatedID int64
}
// UpsertWorkflowSearchAttributesTask identifies a transfer task for upsert search attributes
UpsertWorkflowSearchAttributesTask struct {
TaskData
}
// StartChildExecutionTask identifies a transfer task for starting child execution
StartChildExecutionTask struct {
TaskData
TargetDomainID string
TargetWorkflowID string
InitiatedID int64
}
// RecordWorkflowClosedTask identifies a transfer task for writing visibility close execution record
RecordWorkflowClosedTask struct {
TaskData
}
// RecordChildExecutionCompletedTask identifies a task for recording the competion of a child workflow
RecordChildExecutionCompletedTask struct {
TaskData
TargetDomainID string
TargetWorkflowID string
TargetRunID string
}
// ApplyParentClosePolicyTask identifies a task for applying parent close policy
ApplyParentClosePolicyTask struct {
TaskData
TargetDomainIDs map[string]struct{}
}
// CrossClusterStartChildExecutionTask is the cross-cluster version of StartChildExecutionTask
CrossClusterStartChildExecutionTask struct {
StartChildExecutionTask
TargetCluster string
}
// CrossClusterCancelExecutionTask is the cross-cluster version of CancelExecutionTask
CrossClusterCancelExecutionTask struct {
CancelExecutionTask
TargetCluster string
}
// CrossClusterSignalExecutionTask is the cross-cluster version of SignalExecutionTask
CrossClusterSignalExecutionTask struct {
SignalExecutionTask
TargetCluster string
}
// CrossClusterRecordChildExecutionCompletedTask is the cross-cluster version of RecordChildExecutionCompletedTask
CrossClusterRecordChildExecutionCompletedTask struct {
RecordChildExecutionCompletedTask
TargetCluster string
}
// CrossClusterApplyParentClosePolicyTask is the cross-cluster version of ApplyParentClosePolicyTask
CrossClusterApplyParentClosePolicyTask struct {
ApplyParentClosePolicyTask
TargetCluster string
}
// ActivityTimeoutTask identifies a timeout task.
ActivityTimeoutTask struct {
TaskData
TimeoutType int
EventID int64
Attempt int64
}
// UserTimerTask identifies a timeout task.
UserTimerTask struct {
TaskData
EventID int64
}
// ActivityRetryTimerTask to schedule a retry task for activity
ActivityRetryTimerTask struct {
TaskData
EventID int64
Attempt int32
}
// WorkflowBackoffTimerTask to schedule first decision task for retried workflow
WorkflowBackoffTimerTask struct {
TaskData
EventID int64 // TODO this attribute is not used?
TimeoutType int // 0 for retry, 1 for cron.
}
// HistoryReplicationTask is the replication task created for shipping history replication events to other clusters
HistoryReplicationTask struct {
TaskData
FirstEventID int64
NextEventID int64
BranchToken []byte
NewRunBranchToken []byte
}
// SyncActivityTask is the replication task created for shipping activity info to other clusters
SyncActivityTask struct {
TaskData
ScheduledID int64
}
// FailoverMarkerTask is the marker for graceful failover
FailoverMarkerTask struct {
TaskData
DomainID string
}
)
// assert all task types implements Task interface
var (
_ Task = (*ActivityTask)(nil)
_ Task = (*DecisionTask)(nil)
_ Task = (*RecordWorkflowStartedTask)(nil)
_ Task = (*ResetWorkflowTask)(nil)
_ Task = (*CloseExecutionTask)(nil)
_ Task = (*DeleteHistoryEventTask)(nil)
_ Task = (*DecisionTimeoutTask)(nil)
_ Task = (*WorkflowTimeoutTask)(nil)
_ Task = (*CancelExecutionTask)(nil)
_ Task = (*SignalExecutionTask)(nil)
_ Task = (*RecordChildExecutionCompletedTask)(nil)
_ Task = (*ApplyParentClosePolicyTask)(nil)
_ Task = (*UpsertWorkflowSearchAttributesTask)(nil)
_ Task = (*StartChildExecutionTask)(nil)
_ Task = (*RecordWorkflowClosedTask)(nil)
_ Task = (*CrossClusterStartChildExecutionTask)(nil)
_ Task = (*CrossClusterCancelExecutionTask)(nil)
_ Task = (*CrossClusterSignalExecutionTask)(nil)
_ Task = (*CrossClusterRecordChildExecutionCompletedTask)(nil)
_ Task = (*CrossClusterApplyParentClosePolicyTask)(nil)
_ Task = (*ActivityTimeoutTask)(nil)
_ Task = (*UserTimerTask)(nil)
_ Task = (*ActivityRetryTimerTask)(nil)
_ Task = (*WorkflowBackoffTimerTask)(nil)
_ Task = (*HistoryReplicationTask)(nil)
_ Task = (*SyncActivityTask)(nil)
_ Task = (*FailoverMarkerTask)(nil)
)
// GetVersion returns the version of the task
func (a *TaskData) GetVersion() int64 {
return a.Version
}
// SetVersion sets the version of the task
func (a *TaskData) SetVersion(version int64) {
a.Version = version
}
// GetTaskID returns the sequence ID of the task
func (a *TaskData) GetTaskID() int64 {
return a.TaskID
}
// SetTaskID sets the sequence ID of the task
func (a *TaskData) SetTaskID(id int64) {
a.TaskID = id
}
// GetVisibilityTimestamp get the visibility timestamp
func (a *TaskData) GetVisibilityTimestamp() time.Time {
return a.VisibilityTimestamp
}
// SetVisibilityTimestamp set the visibility timestamp
func (a *TaskData) SetVisibilityTimestamp(timestamp time.Time) {
a.VisibilityTimestamp = timestamp
}
// GetType returns the type of the activity task
func (a *ActivityTask) GetType() int {
return TransferTaskTypeActivityTask
}
// GetType returns the type of the decision task
func (d *DecisionTask) GetType() int {
return TransferTaskTypeDecisionTask
}
// GetVersion returns the version of the decision task
func (d *DecisionTask) GetVersion() int64 {
return d.Version
}
// GetType returns the type of the record workflow started task
func (a *RecordWorkflowStartedTask) GetType() int {
return TransferTaskTypeRecordWorkflowStarted
}
// GetType returns the type of the ResetWorkflowTask
func (a *ResetWorkflowTask) GetType() int {
return TransferTaskTypeResetWorkflow
}
// GetType returns the type of the close execution task
func (a *CloseExecutionTask) GetType() int {
return TransferTaskTypeCloseExecution
}
// GetType returns the type of the delete execution task
func (a *DeleteHistoryEventTask) GetType() int {
return TaskTypeDeleteHistoryEvent
}
// GetType returns the type of the timer task
func (d *DecisionTimeoutTask) GetType() int {
return TaskTypeDecisionTimeout
}
// GetType returns the type of the timer task
func (a *ActivityTimeoutTask) GetType() int {
return TaskTypeActivityTimeout
}
// GetType returns the type of the timer task
func (u *UserTimerTask) GetType() int {
return TaskTypeUserTimer
}
// GetType returns the type of the retry timer task
func (r *ActivityRetryTimerTask) GetType() int {
return TaskTypeActivityRetryTimer
}
// GetType returns the type of the retry timer task
func (r *WorkflowBackoffTimerTask) GetType() int {
return TaskTypeWorkflowBackoffTimer
}
// GetType returns the type of the timeout task.
func (u *WorkflowTimeoutTask) GetType() int {
return TaskTypeWorkflowTimeout
}
// GetType returns the type of the cancel transfer task
func (u *CancelExecutionTask) GetType() int {
return TransferTaskTypeCancelExecution
}
// GetType returns the type of the signal transfer task
func (u *SignalExecutionTask) GetType() int {
return TransferTaskTypeSignalExecution
}
// GetType returns the type of the record child execution completed task
func (u *RecordChildExecutionCompletedTask) GetType() int {
return TransferTaskTypeRecordChildExecutionCompleted
}
// GetType returns the type of the apply parent close policy task
func (u *ApplyParentClosePolicyTask) GetType() int {
return TransferTaskTypeApplyParentClosePolicy
}
// GetType returns the type of the upsert search attributes transfer task
func (u *UpsertWorkflowSearchAttributesTask) GetType() int {
return TransferTaskTypeUpsertWorkflowSearchAttributes
}
// GetType returns the type of the start child transfer task
func (u *StartChildExecutionTask) GetType() int {
return TransferTaskTypeStartChildExecution
}
// GetType returns the type of the record workflow closed task
func (u *RecordWorkflowClosedTask) GetType() int {
return TransferTaskTypeRecordWorkflowClosed
}
// GetType returns of type of the cross-cluster start child task
func (c *CrossClusterStartChildExecutionTask) GetType() int {
return CrossClusterTaskTypeStartChildExecution
}
// GetType returns of type of the cross-cluster cancel task
func (c *CrossClusterCancelExecutionTask) GetType() int {
return CrossClusterTaskTypeCancelExecution
}
// GetType returns of type of the cross-cluster signal task
func (c *CrossClusterSignalExecutionTask) GetType() int {
return CrossClusterTaskTypeSignalExecution
}
// GetType returns of type of the cross-cluster record child workflow completion task
func (c *CrossClusterRecordChildExecutionCompletedTask) GetType() int {
return CrossClusterTaskTypeRecordChildExeuctionCompleted
}
// GetType returns of type of the cross-cluster cancel task
func (c *CrossClusterApplyParentClosePolicyTask) GetType() int {
return CrossClusterTaskTypeApplyParentClosePolicy
}
// GetType returns the type of the history replication task
func (a *HistoryReplicationTask) GetType() int {
return ReplicationTaskTypeHistory
}
// GetType returns the type of the history replication task
func (a *SyncActivityTask) GetType() int {
return ReplicationTaskTypeSyncActivity
}
// GetType returns the type of the history replication task
func (a *FailoverMarkerTask) GetType() int {
return ReplicationTaskTypeFailoverMarker
}