common/persistence/data_manager_interfaces.go (1,329 lines of code) (raw):
// Copyright (c) 2017-2020 Uber Technologies, Inc.
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
// Geneate rate limiter wrappers.
//go:generate mockgen -package $GOPACKAGE -destination dataManagerInterfaces_mock.go -self_package github.com/uber/cadence/common/persistence github.com/uber/cadence/common/persistence Task,ShardManager,ExecutionManager,ExecutionManagerFactory,TaskManager,HistoryManager,DomainManager,QueueManager,ConfigStoreManager
//go:generate gowrap gen -g -p . -i ConfigStoreManager -t ./wrappers/templates/ratelimited.tmpl -o wrappers/ratelimited/configstore_generated.go
//go:generate gowrap gen -g -p . -i DomainManager -t ./wrappers/templates/ratelimited.tmpl -o wrappers/ratelimited/domain_generated.go
//go:generate gowrap gen -g -p . -i HistoryManager -t ./wrappers/templates/ratelimited.tmpl -o wrappers/ratelimited/history_generated.go
//go:generate gowrap gen -g -p . -i ExecutionManager -t ./wrappers/templates/ratelimited.tmpl -o wrappers/ratelimited/execution_generated.go
//go:generate gowrap gen -g -p . -i QueueManager -t ./wrappers/templates/ratelimited.tmpl -o wrappers/ratelimited/queue_generated.go
//go:generate gowrap gen -g -p . -i TaskManager -t ./wrappers/templates/ratelimited.tmpl -o wrappers/ratelimited/task_generated.go
//go:generate gowrap gen -g -p . -i ShardManager -t ./wrappers/templates/ratelimited.tmpl -o wrappers/ratelimited/shard_generated.go
// Geneate error injector wrappers.
//go:generate gowrap gen -g -p . -i ConfigStoreManager -t ./wrappers/templates/errorinjector.tmpl -o wrappers/errorinjectors/configstore_generated.go
//go:generate gowrap gen -g -p . -i ShardManager -t ./wrappers/templates/errorinjector.tmpl -o wrappers/errorinjectors/shard_generated.go
//go:generate gowrap gen -g -p . -i ExecutionManager -t ./wrappers/templates/errorinjector.tmpl -o wrappers/errorinjectors/execution_generated.go
//go:generate gowrap gen -g -p . -i TaskManager -t ./wrappers/templates/errorinjector.tmpl -o wrappers/errorinjectors/task_generated.go
//go:generate gowrap gen -g -p . -i HistoryManager -t ./wrappers/templates/errorinjector.tmpl -o wrappers/errorinjectors/history_generated.go
//go:generate gowrap gen -g -p . -i DomainManager -t ./wrappers/templates/errorinjector.tmpl -o wrappers/errorinjectors/domain_generated.go
//go:generate gowrap gen -g -p . -i QueueManager -t ./wrappers/templates/errorinjector.tmpl -o wrappers/errorinjectors/queue_generated.go
// Generate metered wrappers.
//go:generate gowrap gen -g -p . -i ConfigStoreManager -t ./wrappers/templates/metered.tmpl -o wrappers/metered/configstore_generated.go
//go:generate gowrap gen -g -p . -i ShardManager -t ./wrappers/templates/metered.tmpl -o wrappers/metered/shard_generated.go
//go:generate gowrap gen -g -p . -i TaskManager -t ./wrappers/templates/metered.tmpl -o wrappers/metered/task_generated.go
//go:generate gowrap gen -g -p . -i HistoryManager -t ./wrappers/templates/metered.tmpl -o wrappers/metered/history_generated.go
//go:generate gowrap gen -g -p . -i DomainManager -t ./wrappers/templates/metered.tmpl -o wrappers/metered/domain_generated.go
//go:generate gowrap gen -g -p . -i QueueManager -t ./wrappers/templates/metered.tmpl -o wrappers/metered/queue_generated.go
// execution metered wrapper is special
//go:generate gowrap gen -g -p . -i ExecutionManager -t ./wrappers/templates/metered_execution.tmpl -o wrappers/metered/execution_generated.go
package persistence
import (
"context"
"fmt"
"strings"
"time"
"github.com/pborman/uuid"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/checksum"
"github.com/uber/cadence/common/codec"
"github.com/uber/cadence/common/types"
)
// Domain status
const (
DomainStatusRegistered = iota
DomainStatusDeprecated
DomainStatusDeleted
)
const (
// EventStoreVersion is already deprecated, this is used for forward
// compatibility (so that rollback is possible).
// TODO we can remove it after fixing all the query templates and when
// we decide the compatibility is no longer needed.
EventStoreVersion = 2
)
// CreateWorkflowMode workflow creation mode
type CreateWorkflowMode int
// QueueType is an enum that represents various queue types in persistence
type QueueType int
// Queue types used in queue table
// Use positive numbers for queue type
// Negative numbers are reserved for DLQ
const (
DomainReplicationQueueType QueueType = iota + 1
)
// Create Workflow Execution Mode
const (
// Fail if current record exists
// Only applicable for CreateWorkflowExecution
CreateWorkflowModeBrandNew CreateWorkflowMode = iota
// Update current record only if workflow is closed
// Only applicable for CreateWorkflowExecution
CreateWorkflowModeWorkflowIDReuse
// Update current record only if workflow is open
// Only applicable for UpdateWorkflowExecution
CreateWorkflowModeContinueAsNew
// Do not update current record since workflow to
// applicable for CreateWorkflowExecution, UpdateWorkflowExecution
CreateWorkflowModeZombie
)
// UpdateWorkflowMode update mode
type UpdateWorkflowMode int
// Update Workflow Execution Mode
const (
// Update workflow, including current record
// NOTE: update on current record is a condition update
UpdateWorkflowModeUpdateCurrent UpdateWorkflowMode = iota
// Update workflow, without current record
// NOTE: current record CANNOT point to the workflow to be updated
UpdateWorkflowModeBypassCurrent
// Update workflow, ignoring current record
// NOTE: current record may or may not point to the workflow
// this mode should only be used for (re-)generating workflow tasks
// and there's no other changes to the workflow
UpdateWorkflowModeIgnoreCurrent
)
// ConflictResolveWorkflowMode conflict resolve mode
type ConflictResolveWorkflowMode int
// Conflict Resolve Workflow Mode
const (
// Conflict resolve workflow, including current record
// NOTE: update on current record is a condition update
ConflictResolveWorkflowModeUpdateCurrent ConflictResolveWorkflowMode = iota
// Conflict resolve workflow, without current record
// NOTE: current record CANNOT point to the workflow to be updated
ConflictResolveWorkflowModeBypassCurrent
)
// Workflow execution states
const (
WorkflowStateCreated = iota
WorkflowStateRunning
WorkflowStateCompleted
WorkflowStateZombie
WorkflowStateVoid
WorkflowStateCorrupted
)
// Workflow execution close status
const (
WorkflowCloseStatusNone = iota
WorkflowCloseStatusCompleted
WorkflowCloseStatusFailed
WorkflowCloseStatusCanceled
WorkflowCloseStatusTerminated
WorkflowCloseStatusContinuedAsNew
WorkflowCloseStatusTimedOut
)
// Types of task lists
const (
TaskListTypeDecision = iota
TaskListTypeActivity
)
// Kinds of task lists
const (
TaskListKindNormal = iota
TaskListKindSticky
)
// Transfer task types
const (
TransferTaskTypeDecisionTask = iota
TransferTaskTypeActivityTask
TransferTaskTypeCloseExecution
TransferTaskTypeCancelExecution
TransferTaskTypeStartChildExecution
TransferTaskTypeSignalExecution
TransferTaskTypeRecordWorkflowStarted
TransferTaskTypeResetWorkflow
TransferTaskTypeUpsertWorkflowSearchAttributes
TransferTaskTypeRecordWorkflowClosed
TransferTaskTypeRecordChildExecutionCompleted
TransferTaskTypeApplyParentClosePolicy
)
// Types of cross-cluster tasks
const (
CrossClusterTaskTypeStartChildExecution = iota + 1
CrossClusterTaskTypeCancelExecution
CrossClusterTaskTypeSignalExecution
CrossClusterTaskTypeRecordChildExeuctionCompleted
CrossClusterTaskTypeApplyParentClosePolicy
)
// Types of replication tasks
const (
ReplicationTaskTypeHistory = iota
ReplicationTaskTypeSyncActivity
ReplicationTaskTypeFailoverMarker
)
// Types of timers
const (
TaskTypeDecisionTimeout = iota
TaskTypeActivityTimeout
TaskTypeUserTimer
TaskTypeWorkflowTimeout
TaskTypeDeleteHistoryEvent
TaskTypeActivityRetryTimer
TaskTypeWorkflowBackoffTimer
)
// UnknownNumRowsAffected is returned when the number of rows that an API affected cannot be determined
const UnknownNumRowsAffected = -1
// Types of workflow backoff timeout
const (
WorkflowBackoffTimeoutTypeRetry = iota
WorkflowBackoffTimeoutTypeCron
)
const (
// InitialFailoverNotificationVersion is the initial failover version for a domain
InitialFailoverNotificationVersion int64 = 0
// TransferTaskTransferTargetWorkflowID is the the dummy workflow ID for transfer tasks of types
// that do not have a target workflow
TransferTaskTransferTargetWorkflowID = "20000000-0000-f000-f000-000000000001"
// TransferTaskTransferTargetRunID is the the dummy run ID for transfer tasks of types
// that do not have a target workflow
TransferTaskTransferTargetRunID = "30000000-0000-f000-f000-000000000002"
// CrossClusterTaskDefaultTargetRunID is the the dummy run ID for cross-cluster tasks of types
// that do not have a target workflow
CrossClusterTaskDefaultTargetRunID = TransferTaskTransferTargetRunID
// indicate invalid workflow state transition
invalidStateTransitionMsg = "unable to change workflow state from %v to %v, close status %v"
)
const numItemsInGarbageInfo = 3
type ConfigType int
const (
DynamicConfig ConfigType = iota
GlobalIsolationGroupConfig
)
type (
// ShardInfo describes a shard
ShardInfo struct {
ShardID int `json:"shard_id"`
Owner string `json:"owner"`
RangeID int64 `json:"range_id"`
StolenSinceRenew int `json:"stolen_since_renew"`
UpdatedAt time.Time `json:"updated_at"`
ReplicationAckLevel int64 `json:"replication_ack_level"`
ReplicationDLQAckLevel map[string]int64 `json:"replication_dlq_ack_level"`
TransferAckLevel int64 `json:"transfer_ack_level"`
TimerAckLevel time.Time `json:"timer_ack_level"`
ClusterTransferAckLevel map[string]int64 `json:"cluster_transfer_ack_level"`
ClusterTimerAckLevel map[string]time.Time `json:"cluster_timer_ack_level"`
TransferProcessingQueueStates *types.ProcessingQueueStates `json:"transfer_processing_queue_states"`
CrossClusterProcessingQueueStates *types.ProcessingQueueStates `json:"cross_cluster_queue_states"`
TimerProcessingQueueStates *types.ProcessingQueueStates `json:"timer_processing_queue_states"`
ClusterReplicationLevel map[string]int64 `json:"cluster_replication_level"`
DomainNotificationVersion int64 `json:"domain_notification_version"`
PendingFailoverMarkers []*types.FailoverMarkerAttributes `json:"pending_failover_markers"`
}
// WorkflowExecutionInfo describes a workflow execution
WorkflowExecutionInfo struct {
DomainID string
WorkflowID string
RunID string
FirstExecutionRunID string
ParentDomainID string
ParentWorkflowID string
ParentRunID string
InitiatedID int64
CompletionEventBatchID int64
CompletionEvent *types.HistoryEvent
TaskList string
WorkflowTypeName string
WorkflowTimeout int32
DecisionStartToCloseTimeout int32
ExecutionContext []byte
State int
CloseStatus int
LastFirstEventID int64
LastEventTaskID int64
NextEventID int64
LastProcessedEvent int64
StartTimestamp time.Time
LastUpdatedTimestamp time.Time
CreateRequestID string
SignalCount int32
DecisionVersion int64
DecisionScheduleID int64
DecisionStartedID int64
DecisionRequestID string
DecisionTimeout int32
DecisionAttempt int64
DecisionStartedTimestamp int64
DecisionScheduledTimestamp int64
DecisionOriginalScheduledTimestamp int64
CancelRequested bool
CancelRequestID string
StickyTaskList string
StickyScheduleToStartTimeout int32
ClientLibraryVersion string
ClientFeatureVersion string
ClientImpl string
AutoResetPoints *types.ResetPoints
Memo map[string][]byte
SearchAttributes map[string][]byte
PartitionConfig map[string]string
// for retry
Attempt int32
HasRetryPolicy bool
InitialInterval int32
BackoffCoefficient float64
MaximumInterval int32
ExpirationTime time.Time
MaximumAttempts int32
NonRetriableErrors []string
BranchToken []byte
// Cron
CronSchedule string
IsCron bool
ExpirationSeconds int32 // TODO: is this field useful?
}
// ExecutionStats is the statistics about workflow execution
ExecutionStats struct {
HistorySize int64
}
// ReplicationState represents mutable state information for global domains.
// This information is used by replication protocol when applying events from remote clusters
// TODO: remove this struct after all 2DC workflows complete
ReplicationState struct {
CurrentVersion int64
StartVersion int64
LastWriteVersion int64
LastWriteEventID int64
LastReplicationInfo map[string]*ReplicationInfo
}
// CurrentWorkflowExecution describes a current execution record
CurrentWorkflowExecution struct {
DomainID string
WorkflowID string
RunID string
State int
CurrentRunID string
}
// TransferTaskInfo describes a transfer task
TransferTaskInfo struct {
DomainID string
WorkflowID string
RunID string
VisibilityTimestamp time.Time
TaskID int64
TargetDomainID string
TargetDomainIDs map[string]struct{} // used for ApplyParentPolicy request
TargetWorkflowID string
TargetRunID string
TargetChildWorkflowOnly bool
TaskList string
TaskType int
ScheduleID int64
Version int64
RecordVisibility bool
}
// CrossClusterTaskInfo describes a cross-cluster task
// Cross cluster tasks are exactly like transfer tasks so
// instead of creating another struct and duplicating the same
// logic everywhere. We reuse TransferTaskInfo
CrossClusterTaskInfo = TransferTaskInfo
// ReplicationTaskInfo describes the replication task created for replication of history events
ReplicationTaskInfo struct {
DomainID string
WorkflowID string
RunID string
TaskID int64
TaskType int
FirstEventID int64
NextEventID int64
Version int64
ScheduledID int64
BranchToken []byte
NewRunBranchToken []byte
CreationTime int64
}
// TimerTaskInfo describes a timer task.
TimerTaskInfo struct {
DomainID string
WorkflowID string
RunID string
VisibilityTimestamp time.Time
TaskID int64
TaskType int
TimeoutType int
EventID int64
ScheduleAttempt int64
Version int64
}
// TaskListInfo describes a state of a task list implementation.
TaskListInfo struct {
DomainID string
Name string
TaskType int
RangeID int64
AckLevel int64
Kind int
Expiry time.Time
LastUpdated time.Time
}
// TaskInfo describes either activity or decision task
TaskInfo struct {
DomainID string
WorkflowID string
RunID string
TaskID int64
ScheduleID int64
ScheduleToStartTimeout int32
Expiry time.Time
CreatedTime time.Time
PartitionConfig map[string]string
}
// TaskKey gives primary key info for a specific task
TaskKey struct {
DomainID string
TaskListName string
TaskType int
TaskID int64
}
// ReplicationInfo represents the information stored for last replication event details per cluster
ReplicationInfo struct {
Version int64
LastEventID int64
}
// VersionHistoryItem contains the event id and the associated version
VersionHistoryItem struct {
EventID int64
Version int64
}
// VersionHistory provides operations on version history
VersionHistory struct {
BranchToken []byte
Items []*VersionHistoryItem
}
// VersionHistories contains a set of VersionHistory
VersionHistories struct {
CurrentVersionHistoryIndex int
Histories []*VersionHistory
}
// WorkflowMutableState indicates workflow related state
WorkflowMutableState struct {
ActivityInfos map[int64]*ActivityInfo
TimerInfos map[string]*TimerInfo
ChildExecutionInfos map[int64]*ChildExecutionInfo
RequestCancelInfos map[int64]*RequestCancelInfo
SignalInfos map[int64]*SignalInfo
SignalRequestedIDs map[string]struct{}
ExecutionInfo *WorkflowExecutionInfo
ExecutionStats *ExecutionStats
BufferedEvents []*types.HistoryEvent
VersionHistories *VersionHistories
ReplicationState *ReplicationState // TODO: remove this after all 2DC workflows complete
Checksum checksum.Checksum
}
// ActivityInfo details.
ActivityInfo struct {
Version int64
ScheduleID int64
ScheduledEventBatchID int64
ScheduledEvent *types.HistoryEvent
ScheduledTime time.Time
StartedID int64
StartedEvent *types.HistoryEvent
StartedTime time.Time
DomainID string
ActivityID string
RequestID string
Details []byte
ScheduleToStartTimeout int32
ScheduleToCloseTimeout int32
StartToCloseTimeout int32
HeartbeatTimeout int32
CancelRequested bool
CancelRequestID int64
LastHeartBeatUpdatedTime time.Time
TimerTaskStatus int32
// For retry
Attempt int32
StartedIdentity string
TaskList string
HasRetryPolicy bool
InitialInterval int32
BackoffCoefficient float64
MaximumInterval int32
ExpirationTime time.Time
MaximumAttempts int32
NonRetriableErrors []string
LastFailureReason string
LastWorkerIdentity string
LastFailureDetails []byte
// Not written to database - This is used only for deduping heartbeat timer creation
LastHeartbeatTimeoutVisibilityInSeconds int64
}
// TimerInfo details - metadata about user timer info.
TimerInfo struct {
Version int64
TimerID string
StartedID int64
ExpiryTime time.Time
TaskStatus int64
}
// ChildExecutionInfo has details for pending child executions.
ChildExecutionInfo struct {
Version int64
InitiatedID int64
InitiatedEventBatchID int64
InitiatedEvent *types.HistoryEvent
StartedID int64
StartedWorkflowID string
StartedRunID string
StartedEvent *types.HistoryEvent
CreateRequestID string
DomainID string
DomainNameDEPRECATED string // deprecated: please use DomainID field instead
WorkflowTypeName string
ParentClosePolicy types.ParentClosePolicy
}
// RequestCancelInfo has details for pending external workflow cancellations
RequestCancelInfo struct {
Version int64
InitiatedEventBatchID int64
InitiatedID int64
CancelRequestID string
}
// SignalInfo has details for pending external workflow signal
SignalInfo struct {
Version int64
InitiatedEventBatchID int64
InitiatedID int64
SignalRequestID string
SignalName string
Input []byte
Control []byte
}
// CreateShardRequest is used to create a shard in executions table
CreateShardRequest struct {
ShardInfo *ShardInfo
}
// GetShardRequest is used to get shard information
GetShardRequest struct {
ShardID int
}
// GetShardResponse is the response to GetShard
GetShardResponse struct {
ShardInfo *ShardInfo
}
// UpdateShardRequest is used to update shard information
UpdateShardRequest struct {
ShardInfo *ShardInfo
PreviousRangeID int64
}
// CreateWorkflowExecutionRequest is used to write a new workflow execution
CreateWorkflowExecutionRequest struct {
RangeID int64
Mode CreateWorkflowMode
PreviousRunID string
PreviousLastWriteVersion int64
NewWorkflowSnapshot WorkflowSnapshot
DomainName string
}
// CreateWorkflowExecutionResponse is the response to CreateWorkflowExecutionRequest
CreateWorkflowExecutionResponse struct {
MutableStateUpdateSessionStats *MutableStateUpdateSessionStats
}
// GetWorkflowExecutionRequest is used to retrieve the info of a workflow execution
GetWorkflowExecutionRequest struct {
DomainID string
Execution types.WorkflowExecution
DomainName string
RangeID int64
}
// GetWorkflowExecutionResponse is the response to GetworkflowExecutionRequest
GetWorkflowExecutionResponse struct {
State *WorkflowMutableState
MutableStateStats *MutableStateStats
}
// GetCurrentExecutionRequest is used to retrieve the current RunId for an execution
GetCurrentExecutionRequest struct {
DomainID string
WorkflowID string
DomainName string
}
// ListCurrentExecutionsRequest is request to ListCurrentExecutions
ListCurrentExecutionsRequest struct {
PageSize int
PageToken []byte
}
// ListCurrentExecutionsResponse is the response to ListCurrentExecutionsRequest
ListCurrentExecutionsResponse struct {
Executions []*CurrentWorkflowExecution
PageToken []byte
}
// IsWorkflowExecutionExistsRequest is used to check if the concrete execution exists
IsWorkflowExecutionExistsRequest struct {
DomainID string
DomainName string
WorkflowID string
RunID string
}
// ListConcreteExecutionsRequest is request to ListConcreteExecutions
ListConcreteExecutionsRequest struct {
PageSize int
PageToken []byte
}
// ListConcreteExecutionsResponse is response to ListConcreteExecutions
ListConcreteExecutionsResponse struct {
Executions []*ListConcreteExecutionsEntity
PageToken []byte
}
// ListConcreteExecutionsEntity is a single entity in ListConcreteExecutionsResponse
ListConcreteExecutionsEntity struct {
ExecutionInfo *WorkflowExecutionInfo
VersionHistories *VersionHistories
}
// GetCurrentExecutionResponse is the response to GetCurrentExecution
GetCurrentExecutionResponse struct {
StartRequestID string
RunID string
State int
CloseStatus int
LastWriteVersion int64
}
// IsWorkflowExecutionExistsResponse is the response to IsWorkflowExecutionExists
IsWorkflowExecutionExistsResponse struct {
Exists bool
}
// UpdateWorkflowExecutionRequest is used to update a workflow execution
UpdateWorkflowExecutionRequest struct {
RangeID int64
Mode UpdateWorkflowMode
UpdateWorkflowMutation WorkflowMutation
NewWorkflowSnapshot *WorkflowSnapshot
Encoding common.EncodingType // optional binary encoding type
DomainName string
}
// ConflictResolveWorkflowExecutionRequest is used to reset workflow execution state for a single run
ConflictResolveWorkflowExecutionRequest struct {
RangeID int64
Mode ConflictResolveWorkflowMode
// workflow to be resetted
ResetWorkflowSnapshot WorkflowSnapshot
// maybe new workflow
NewWorkflowSnapshot *WorkflowSnapshot
// current workflow
CurrentWorkflowMutation *WorkflowMutation
Encoding common.EncodingType // optional binary encoding type
DomainName string
}
// WorkflowEvents is used as generic workflow history events transaction container
WorkflowEvents struct {
DomainID string
WorkflowID string
RunID string
BranchToken []byte
Events []*types.HistoryEvent
}
// WorkflowMutation is used as generic workflow execution state mutation
WorkflowMutation struct {
ExecutionInfo *WorkflowExecutionInfo
ExecutionStats *ExecutionStats
VersionHistories *VersionHistories
UpsertActivityInfos []*ActivityInfo
DeleteActivityInfos []int64
UpsertTimerInfos []*TimerInfo
DeleteTimerInfos []string
UpsertChildExecutionInfos []*ChildExecutionInfo
DeleteChildExecutionInfos []int64
UpsertRequestCancelInfos []*RequestCancelInfo
DeleteRequestCancelInfos []int64
UpsertSignalInfos []*SignalInfo
DeleteSignalInfos []int64
UpsertSignalRequestedIDs []string
DeleteSignalRequestedIDs []string
NewBufferedEvents []*types.HistoryEvent
ClearBufferedEvents bool
TransferTasks []Task
CrossClusterTasks []Task
ReplicationTasks []Task
TimerTasks []Task
Condition int64
Checksum checksum.Checksum
}
// WorkflowSnapshot is used as generic workflow execution state snapshot
WorkflowSnapshot struct {
ExecutionInfo *WorkflowExecutionInfo
ExecutionStats *ExecutionStats
VersionHistories *VersionHistories
ActivityInfos []*ActivityInfo
TimerInfos []*TimerInfo
ChildExecutionInfos []*ChildExecutionInfo
RequestCancelInfos []*RequestCancelInfo
SignalInfos []*SignalInfo
SignalRequestedIDs []string
TransferTasks []Task
CrossClusterTasks []Task
ReplicationTasks []Task
TimerTasks []Task
Condition int64
Checksum checksum.Checksum
}
// DeleteWorkflowExecutionRequest is used to delete a workflow execution
DeleteWorkflowExecutionRequest struct {
DomainID string
WorkflowID string
RunID string
DomainName string
}
// DeleteCurrentWorkflowExecutionRequest is used to delete the current workflow execution
DeleteCurrentWorkflowExecutionRequest struct {
DomainID string
WorkflowID string
RunID string
DomainName string
}
// GetTransferTasksRequest is used to read tasks from the transfer task queue
GetTransferTasksRequest struct {
ReadLevel int64
MaxReadLevel int64
BatchSize int
NextPageToken []byte
}
// GetTransferTasksResponse is the response to GetTransferTasksRequest
GetTransferTasksResponse struct {
Tasks []*TransferTaskInfo
NextPageToken []byte
}
// GetCrossClusterTasksRequest is used to read tasks from the cross-cluster task queue
GetCrossClusterTasksRequest struct {
TargetCluster string
ReadLevel int64
MaxReadLevel int64
BatchSize int
NextPageToken []byte
}
// GetCrossClusterTasksResponse is the response to GetCrossClusterTasksRequest
GetCrossClusterTasksResponse struct {
Tasks []*CrossClusterTaskInfo
NextPageToken []byte
}
// GetReplicationTasksRequest is used to read tasks from the replication task queue
GetReplicationTasksRequest struct {
ReadLevel int64
MaxReadLevel int64
BatchSize int
NextPageToken []byte
}
// GetReplicationTasksResponse is the response to GetReplicationTask
GetReplicationTasksResponse struct {
Tasks []*ReplicationTaskInfo
NextPageToken []byte
}
// CompleteTransferTaskRequest is used to complete a task in the transfer task queue
CompleteTransferTaskRequest struct {
TaskID int64
}
// RangeCompleteTransferTaskRequest is used to complete a range of tasks in the transfer task queue
RangeCompleteTransferTaskRequest struct {
ExclusiveBeginTaskID int64
InclusiveEndTaskID int64
PageSize int
}
// RangeCompleteTransferTaskResponse is the response of RangeCompleteTransferTask
RangeCompleteTransferTaskResponse struct {
TasksCompleted int
}
// CompleteCrossClusterTaskRequest is used to complete a task in the cross-cluster task queue
CompleteCrossClusterTaskRequest struct {
TargetCluster string
TaskID int64
}
// RangeCompleteCrossClusterTaskRequest is used to complete a range of tasks in the cross-cluster task queue
RangeCompleteCrossClusterTaskRequest struct {
TargetCluster string
ExclusiveBeginTaskID int64
InclusiveEndTaskID int64
PageSize int
}
// RangeCompleteCrossClusterTaskResponse is the response of RangeCompleteCrossClusterTask
RangeCompleteCrossClusterTaskResponse struct {
TasksCompleted int
}
// CompleteReplicationTaskRequest is used to complete a task in the replication task queue
CompleteReplicationTaskRequest struct {
TaskID int64
}
// RangeCompleteReplicationTaskRequest is used to complete a range of task in the replication task queue
RangeCompleteReplicationTaskRequest struct {
InclusiveEndTaskID int64
PageSize int
}
// RangeCompleteReplicationTaskResponse is the response of RangeCompleteReplicationTask
RangeCompleteReplicationTaskResponse struct {
TasksCompleted int
}
// PutReplicationTaskToDLQRequest is used to put a replication task to dlq
PutReplicationTaskToDLQRequest struct {
SourceClusterName string
TaskInfo *ReplicationTaskInfo
DomainName string
}
// GetReplicationTasksFromDLQRequest is used to get replication tasks from dlq
GetReplicationTasksFromDLQRequest struct {
SourceClusterName string
GetReplicationTasksRequest
}
// GetReplicationDLQSizeRequest is used to get one replication task from dlq
GetReplicationDLQSizeRequest struct {
SourceClusterName string
}
// DeleteReplicationTaskFromDLQRequest is used to delete replication task from DLQ
DeleteReplicationTaskFromDLQRequest struct {
SourceClusterName string
TaskID int64
}
// RangeDeleteReplicationTaskFromDLQRequest is used to delete replication tasks from DLQ
RangeDeleteReplicationTaskFromDLQRequest struct {
SourceClusterName string
ExclusiveBeginTaskID int64
InclusiveEndTaskID int64
PageSize int
}
// RangeDeleteReplicationTaskFromDLQResponse is the response of RangeDeleteReplicationTaskFromDLQ
RangeDeleteReplicationTaskFromDLQResponse struct {
TasksCompleted int
}
// GetReplicationTasksFromDLQResponse is the response for GetReplicationTasksFromDLQ
GetReplicationTasksFromDLQResponse = GetReplicationTasksResponse
// GetReplicationDLQSizeResponse is the response for GetReplicationDLQSize
GetReplicationDLQSizeResponse struct {
Size int64
}
// RangeCompleteTimerTaskRequest is used to complete a range of tasks in the timer task queue
RangeCompleteTimerTaskRequest struct {
InclusiveBeginTimestamp time.Time
ExclusiveEndTimestamp time.Time
PageSize int
}
// RangeCompleteTimerTaskResponse is the response of RangeCompleteTimerTask
RangeCompleteTimerTaskResponse struct {
TasksCompleted int
}
// CompleteTimerTaskRequest is used to complete a task in the timer task queue
CompleteTimerTaskRequest struct {
VisibilityTimestamp time.Time
TaskID int64
}
// LeaseTaskListRequest is used to request lease of a task list
LeaseTaskListRequest struct {
DomainID string
DomainName string
TaskList string
TaskType int
TaskListKind int
RangeID int64
}
// LeaseTaskListResponse is response to LeaseTaskListRequest
LeaseTaskListResponse struct {
TaskListInfo *TaskListInfo
}
// UpdateTaskListRequest is used to update task list implementation information
UpdateTaskListRequest struct {
TaskListInfo *TaskListInfo
DomainName string
}
// UpdateTaskListResponse is the response to UpdateTaskList
UpdateTaskListResponse struct {
}
// ListTaskListRequest contains the request params needed to invoke ListTaskList API
ListTaskListRequest struct {
PageSize int
PageToken []byte
}
// ListTaskListResponse is the response from ListTaskList API
ListTaskListResponse struct {
Items []TaskListInfo
NextPageToken []byte
}
// DeleteTaskListRequest contains the request params needed to invoke DeleteTaskList API
DeleteTaskListRequest struct {
DomainID string
DomainName string
TaskListName string
TaskListType int
RangeID int64
}
GetTaskListSizeRequest struct {
DomainID string
DomainName string
TaskListName string
TaskListType int
AckLevel int64
}
GetTaskListSizeResponse struct {
Size int64
}
// CreateTasksRequest is used to create a new task for a workflow exectution
CreateTasksRequest struct {
TaskListInfo *TaskListInfo
Tasks []*CreateTaskInfo
DomainName string
}
// CreateTaskInfo describes a task to be created in CreateTasksRequest
CreateTaskInfo struct {
Execution types.WorkflowExecution
Data *TaskInfo
TaskID int64
}
// CreateTasksResponse is the response to CreateTasksRequest
CreateTasksResponse struct {
}
// GetTasksRequest is used to retrieve tasks of a task list
GetTasksRequest struct {
DomainID string
TaskList string
TaskType int
ReadLevel int64 // range exclusive
MaxReadLevel *int64 // optional: range inclusive when specified
BatchSize int
DomainName string
}
// GetTasksResponse is the response to GetTasksRequests
GetTasksResponse struct {
Tasks []*TaskInfo
}
// CompleteTaskRequest is used to complete a task
CompleteTaskRequest struct {
TaskList *TaskListInfo
TaskID int64
DomainName string
}
// CompleteTasksLessThanRequest contains the request params needed to invoke CompleteTasksLessThan API
CompleteTasksLessThanRequest struct {
DomainID string
TaskListName string
TaskType int
TaskID int64 // Tasks less than or equal to this ID will be completed
Limit int // Limit on the max number of tasks that can be completed. Required param
DomainName string
}
// CompleteTasksLessThanResponse is the response of CompleteTasksLessThan
CompleteTasksLessThanResponse struct {
TasksCompleted int
}
// GetOrphanTasksRequest contains the request params need to invoke the GetOrphanTasks API
GetOrphanTasksRequest struct {
Limit int
}
// GetOrphanTasksResponse is the response to GetOrphanTasksRequests
GetOrphanTasksResponse struct {
Tasks []*TaskKey
}
// GetTimerIndexTasksRequest is the request for GetTimerIndexTasks
// TODO: replace this with an iterator that can configure min and max index.
GetTimerIndexTasksRequest struct {
MinTimestamp time.Time
MaxTimestamp time.Time
BatchSize int
NextPageToken []byte
}
// GetTimerIndexTasksResponse is the response for GetTimerIndexTasks
GetTimerIndexTasksResponse struct {
Timers []*TimerTaskInfo
NextPageToken []byte
}
// DomainInfo describes the domain entity
DomainInfo struct {
ID string
Name string
Status int
Description string
OwnerEmail string
Data map[string]string
}
// DomainConfig describes the domain configuration
DomainConfig struct {
// NOTE: this retention is in days, not in seconds
Retention int32
EmitMetric bool
HistoryArchivalStatus types.ArchivalStatus
HistoryArchivalURI string
VisibilityArchivalStatus types.ArchivalStatus
VisibilityArchivalURI string
BadBinaries types.BadBinaries
IsolationGroups types.IsolationGroupConfiguration
AsyncWorkflowConfig types.AsyncWorkflowConfiguration
}
// DomainReplicationConfig describes the cross DC domain replication configuration
DomainReplicationConfig struct {
ActiveClusterName string
Clusters []*ClusterReplicationConfig
}
// ClusterReplicationConfig describes the cross DC cluster replication configuration
ClusterReplicationConfig struct {
ClusterName string
// Note: if adding new properties of non-primitive types, remember to update GetCopy()
}
// CreateDomainRequest is used to create the domain
CreateDomainRequest struct {
Info *DomainInfo
Config *DomainConfig
ReplicationConfig *DomainReplicationConfig
IsGlobalDomain bool
ConfigVersion int64
FailoverVersion int64
LastUpdatedTime int64
}
// CreateDomainResponse is the response for CreateDomain
CreateDomainResponse struct {
ID string
}
// GetDomainRequest is used to read domain
GetDomainRequest struct {
ID string
Name string
}
// GetDomainResponse is the response for GetDomain
GetDomainResponse struct {
Info *DomainInfo
Config *DomainConfig
ReplicationConfig *DomainReplicationConfig
IsGlobalDomain bool
ConfigVersion int64
FailoverVersion int64
FailoverNotificationVersion int64
PreviousFailoverVersion int64
FailoverEndTime *int64
LastUpdatedTime int64
NotificationVersion int64
}
// UpdateDomainRequest is used to update domain
UpdateDomainRequest struct {
Info *DomainInfo
Config *DomainConfig
ReplicationConfig *DomainReplicationConfig
ConfigVersion int64
FailoverVersion int64
FailoverNotificationVersion int64
PreviousFailoverVersion int64
FailoverEndTime *int64
LastUpdatedTime int64
NotificationVersion int64
}
// DeleteDomainRequest is used to delete domain entry from domains table
DeleteDomainRequest struct {
ID string
}
// DeleteDomainByNameRequest is used to delete domain entry from domains_by_name table
DeleteDomainByNameRequest struct {
Name string
}
// ListDomainsRequest is used to list domains
ListDomainsRequest struct {
PageSize int
NextPageToken []byte
}
// ListDomainsResponse is the response for GetDomain
ListDomainsResponse struct {
Domains []*GetDomainResponse
NextPageToken []byte
}
// GetMetadataResponse is the response for GetMetadata
GetMetadataResponse struct {
NotificationVersion int64
}
// MutableStateStats is the size stats for MutableState
MutableStateStats struct {
// Total size of mutable state
MutableStateSize int
// Breakdown of size into more granular stats
ExecutionInfoSize int
ActivityInfoSize int
TimerInfoSize int
ChildInfoSize int
SignalInfoSize int
BufferedEventsSize int
// Item count for various information captured within mutable state
ActivityInfoCount int
TimerInfoCount int
ChildInfoCount int
SignalInfoCount int
RequestCancelInfoCount int
BufferedEventsCount int
}
// MutableStateUpdateSessionStats is size stats for mutableState updating session
MutableStateUpdateSessionStats struct {
MutableStateSize int // Total size of mutable state update
// Breakdown of mutable state size update for more granular stats
ExecutionInfoSize int
ActivityInfoSize int
TimerInfoSize int
ChildInfoSize int
SignalInfoSize int
BufferedEventsSize int
// Item counts in this session update
ActivityInfoCount int
TimerInfoCount int
ChildInfoCount int
SignalInfoCount int
RequestCancelInfoCount int
// Deleted item counts in this session update
DeleteActivityInfoCount int
DeleteTimerInfoCount int
DeleteChildInfoCount int
DeleteSignalInfoCount int
DeleteRequestCancelInfoCount int
TransferTasksCount int
CrossClusterTaskCount int
TimerTasksCount int
ReplicationTasksCount int
}
// UpdateWorkflowExecutionResponse is response for UpdateWorkflowExecutionRequest
UpdateWorkflowExecutionResponse struct {
MutableStateUpdateSessionStats *MutableStateUpdateSessionStats
}
// ConflictResolveWorkflowExecutionResponse is response for ConflictResolveWorkflowExecutionRequest
ConflictResolveWorkflowExecutionResponse struct {
MutableStateUpdateSessionStats *MutableStateUpdateSessionStats
}
// AppendHistoryNodesRequest is used to append a batch of history nodes
AppendHistoryNodesRequest struct {
// true if this is the first append request to the branch
IsNewBranch bool
// the info for clean up data in background
Info string
// The branch to be appended
BranchToken []byte
// The batch of events to be appended. The first eventID will become the nodeID of this batch
Events []*types.HistoryEvent
// requested TransactionID for this write operation. For the same eventID, the node with larger TransactionID always wins
TransactionID int64
// optional binary encoding type
Encoding common.EncodingType
// The shard to get history node data
ShardID *int
// DomainName to get metrics created with the domain
DomainName string
}
// AppendHistoryNodesResponse is a response to AppendHistoryNodesRequest
AppendHistoryNodesResponse struct {
// The data blob that was persisted to database
DataBlob DataBlob
}
// ReadHistoryBranchRequest is used to read a history branch
ReadHistoryBranchRequest struct {
// The branch to be read
BranchToken []byte
// Get the history nodes from MinEventID. Inclusive.
MinEventID int64
// Get the history nodes upto MaxEventID. Exclusive.
MaxEventID int64
// Maximum number of batches of events per page. Not that number of events in a batch >=1, it is not number of events per page.
// However for a single page, it is also possible that the returned events is less than PageSize (event zero events) due to stale events.
PageSize int
// Token to continue reading next page of history append transactions. Pass in empty slice for first page
NextPageToken []byte
// The shard to get history branch data
ShardID *int
DomainName string
}
// ReadHistoryBranchResponse is the response to ReadHistoryBranchRequest
ReadHistoryBranchResponse struct {
// History events
HistoryEvents []*types.HistoryEvent
// Token to read next page if there are more events beyond page size.
// Use this to set NextPageToken on ReadHistoryBranchRequest to read the next page.
// Empty means we have reached the last page, not need to continue
NextPageToken []byte
// Size of history read from store
Size int
// the first_event_id of last loaded batch
LastFirstEventID int64
}
// ReadHistoryBranchByBatchResponse is the response to ReadHistoryBranchRequest
ReadHistoryBranchByBatchResponse struct {
// History events by batch
History []*types.History
// Token to read next page if there are more events beyond page size.
// Use this to set NextPageToken on ReadHistoryBranchRequest to read the next page.
// Empty means we have reached the last page, not need to continue
NextPageToken []byte
// Size of history read from store
Size int
// the first_event_id of last loaded batch
LastFirstEventID int64
}
// ReadRawHistoryBranchResponse is the response to ReadHistoryBranchRequest
ReadRawHistoryBranchResponse struct {
// HistoryEventBlobs history event blobs
HistoryEventBlobs []*DataBlob
// Token to read next page if there are more events beyond page size.
// Use this to set NextPageToken on ReadHistoryBranchRequest to read the next page.
// Empty means we have reached the last page, not need to continue
NextPageToken []byte
// Size of history read from store
Size int
}
// ForkHistoryBranchRequest is used to fork a history branch
ForkHistoryBranchRequest struct {
// The base branch to fork from
ForkBranchToken []byte
// The nodeID to fork from, the new branch will start from ( inclusive ), the base branch will stop at(exclusive)
// Application must provide a void forking nodeID, it must be a valid nodeID in that branch. A valid nodeID is the firstEventID of a valid batch of events.
// And ForkNodeID > 1 because forking from 1 doesn't make any sense.
ForkNodeID int64
// the info for clean up data in background
Info string
// The shard to get history branch data
ShardID *int
// DomainName to create metrics for Domain Cost Attribution
DomainName string
}
// ForkHistoryBranchResponse is the response to ForkHistoryBranchRequest
ForkHistoryBranchResponse struct {
// branchToken to represent the new branch
NewBranchToken []byte
}
// CompleteForkBranchRequest is used to complete forking
CompleteForkBranchRequest struct {
// the new branch returned from ForkHistoryBranchRequest
BranchToken []byte
// true means the fork is success, will update the flag, otherwise will delete the new branch
Success bool
// The shard to update history branch data
ShardID *int
}
// DeleteHistoryBranchRequest is used to remove a history branch
DeleteHistoryBranchRequest struct {
// branch to be deleted
BranchToken []byte
// The shard to delete history branch data
ShardID *int
// DomainName to generate metrics for Domain Cost Attribution
DomainName string
}
// GetHistoryTreeRequest is used to retrieve branch info of a history tree
GetHistoryTreeRequest struct {
// A UUID of a tree
TreeID string
// Get data from this shard
ShardID *int
// optional: can provide treeID via branchToken if treeID is empty
BranchToken []byte
// DomainName to create metrics
DomainName string
}
// HistoryBranchDetail contains detailed information of a branch
HistoryBranchDetail struct {
TreeID string
BranchID string
ForkTime time.Time
Info string
}
// GetHistoryTreeResponse is a response to GetHistoryTreeRequest
GetHistoryTreeResponse struct {
// all branches of a tree
Branches []*workflow.HistoryBranch
}
// GetAllHistoryTreeBranchesRequest is a request of GetAllHistoryTreeBranches
GetAllHistoryTreeBranchesRequest struct {
// pagination token
NextPageToken []byte
// maximum number of branches returned per page
PageSize int
}
// GetAllHistoryTreeBranchesResponse is a response to GetAllHistoryTreeBranches
GetAllHistoryTreeBranchesResponse struct {
// pagination token
NextPageToken []byte
// all branches of all trees
Branches []HistoryBranchDetail
}
// CreateFailoverMarkersRequest is request to create failover markers
CreateFailoverMarkersRequest struct {
RangeID int64
Markers []*FailoverMarkerTask
}
// FetchDynamicConfigResponse is a response to FetchDynamicConfigResponse
FetchDynamicConfigResponse struct {
Snapshot *DynamicConfigSnapshot
}
// UpdateDynamicConfigRequest is a request to update dynamic config with snapshot
UpdateDynamicConfigRequest struct {
Snapshot *DynamicConfigSnapshot
}
DynamicConfigSnapshot struct {
Version int64
Values *types.DynamicConfigBlob
}
// Closeable is an interface for any entity that supports a close operation to release resources
Closeable interface {
Close()
}
// ShardManager is used to manage all shards
ShardManager interface {
Closeable
GetName() string
CreateShard(ctx context.Context, request *CreateShardRequest) error
GetShard(ctx context.Context, request *GetShardRequest) (*GetShardResponse, error)
UpdateShard(ctx context.Context, request *UpdateShardRequest) error
}
// ExecutionManager is used to manage workflow executions
ExecutionManager interface {
Closeable
GetName() string
GetShardID() int
CreateWorkflowExecution(ctx context.Context, request *CreateWorkflowExecutionRequest) (*CreateWorkflowExecutionResponse, error)
GetWorkflowExecution(ctx context.Context, request *GetWorkflowExecutionRequest) (*GetWorkflowExecutionResponse, error)
UpdateWorkflowExecution(ctx context.Context, request *UpdateWorkflowExecutionRequest) (*UpdateWorkflowExecutionResponse, error)
ConflictResolveWorkflowExecution(ctx context.Context, request *ConflictResolveWorkflowExecutionRequest) (*ConflictResolveWorkflowExecutionResponse, error)
DeleteWorkflowExecution(ctx context.Context, request *DeleteWorkflowExecutionRequest) error
DeleteCurrentWorkflowExecution(ctx context.Context, request *DeleteCurrentWorkflowExecutionRequest) error
GetCurrentExecution(ctx context.Context, request *GetCurrentExecutionRequest) (*GetCurrentExecutionResponse, error)
IsWorkflowExecutionExists(ctx context.Context, request *IsWorkflowExecutionExistsRequest) (*IsWorkflowExecutionExistsResponse, error)
// Transfer task related methods
GetTransferTasks(ctx context.Context, request *GetTransferTasksRequest) (*GetTransferTasksResponse, error)
CompleteTransferTask(ctx context.Context, request *CompleteTransferTaskRequest) error
RangeCompleteTransferTask(ctx context.Context, request *RangeCompleteTransferTaskRequest) (*RangeCompleteTransferTaskResponse, error)
// Cross-cluster related methods
GetCrossClusterTasks(ctx context.Context, request *GetCrossClusterTasksRequest) (*GetCrossClusterTasksResponse, error)
CompleteCrossClusterTask(ctx context.Context, request *CompleteCrossClusterTaskRequest) error
RangeCompleteCrossClusterTask(ctx context.Context, request *RangeCompleteCrossClusterTaskRequest) (*RangeCompleteCrossClusterTaskResponse, error)
// Replication task related methods
GetReplicationTasks(ctx context.Context, request *GetReplicationTasksRequest) (*GetReplicationTasksResponse, error)
CompleteReplicationTask(ctx context.Context, request *CompleteReplicationTaskRequest) error
RangeCompleteReplicationTask(ctx context.Context, request *RangeCompleteReplicationTaskRequest) (*RangeCompleteReplicationTaskResponse, error)
PutReplicationTaskToDLQ(ctx context.Context, request *PutReplicationTaskToDLQRequest) error
GetReplicationTasksFromDLQ(ctx context.Context, request *GetReplicationTasksFromDLQRequest) (*GetReplicationTasksFromDLQResponse, error)
GetReplicationDLQSize(ctx context.Context, request *GetReplicationDLQSizeRequest) (*GetReplicationDLQSizeResponse, error)
DeleteReplicationTaskFromDLQ(ctx context.Context, request *DeleteReplicationTaskFromDLQRequest) error
RangeDeleteReplicationTaskFromDLQ(ctx context.Context, request *RangeDeleteReplicationTaskFromDLQRequest) (*RangeDeleteReplicationTaskFromDLQResponse, error)
CreateFailoverMarkerTasks(ctx context.Context, request *CreateFailoverMarkersRequest) error
// Timer related methods.
GetTimerIndexTasks(ctx context.Context, request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error)
CompleteTimerTask(ctx context.Context, request *CompleteTimerTaskRequest) error
RangeCompleteTimerTask(ctx context.Context, request *RangeCompleteTimerTaskRequest) (*RangeCompleteTimerTaskResponse, error)
// Scan operations
ListConcreteExecutions(ctx context.Context, request *ListConcreteExecutionsRequest) (*ListConcreteExecutionsResponse, error)
ListCurrentExecutions(ctx context.Context, request *ListCurrentExecutionsRequest) (*ListCurrentExecutionsResponse, error)
}
// ExecutionManagerFactory creates an instance of ExecutionManager for a given shard
ExecutionManagerFactory interface {
Closeable
NewExecutionManager(shardID int) (ExecutionManager, error)
}
// TaskManager is used to manage tasks
TaskManager interface {
Closeable
GetName() string
LeaseTaskList(ctx context.Context, request *LeaseTaskListRequest) (*LeaseTaskListResponse, error)
UpdateTaskList(ctx context.Context, request *UpdateTaskListRequest) (*UpdateTaskListResponse, error)
ListTaskList(ctx context.Context, request *ListTaskListRequest) (*ListTaskListResponse, error)
DeleteTaskList(ctx context.Context, request *DeleteTaskListRequest) error
GetTaskListSize(ctx context.Context, request *GetTaskListSizeRequest) (*GetTaskListSizeResponse, error)
CreateTasks(ctx context.Context, request *CreateTasksRequest) (*CreateTasksResponse, error)
GetTasks(ctx context.Context, request *GetTasksRequest) (*GetTasksResponse, error)
CompleteTask(ctx context.Context, request *CompleteTaskRequest) error
CompleteTasksLessThan(ctx context.Context, request *CompleteTasksLessThanRequest) (*CompleteTasksLessThanResponse, error)
GetOrphanTasks(ctx context.Context, request *GetOrphanTasksRequest) (*GetOrphanTasksResponse, error)
}
// HistoryManager is used to manager workflow history events
HistoryManager interface {
Closeable
GetName() string
// The below are history V2 APIs
// V2 regards history events growing as a tree, decoupled from workflow concepts
// For Cadence, treeID is new runID, except for fork(reset), treeID will be the runID that it forks from.
// AppendHistoryNodes add(or override) a batch of nodes to a history branch
AppendHistoryNodes(ctx context.Context, request *AppendHistoryNodesRequest) (*AppendHistoryNodesResponse, error)
// ReadHistoryBranch returns history node data for a branch
ReadHistoryBranch(ctx context.Context, request *ReadHistoryBranchRequest) (*ReadHistoryBranchResponse, error)
// ReadHistoryBranchByBatch returns history node data for a branch ByBatch
ReadHistoryBranchByBatch(ctx context.Context, request *ReadHistoryBranchRequest) (*ReadHistoryBranchByBatchResponse, error)
// ReadRawHistoryBranch returns history node raw data for a branch ByBatch
// NOTE: this API should only be used by 3+DC
ReadRawHistoryBranch(ctx context.Context, request *ReadHistoryBranchRequest) (*ReadRawHistoryBranchResponse, error)
// ForkHistoryBranch forks a new branch from a old branch
ForkHistoryBranch(ctx context.Context, request *ForkHistoryBranchRequest) (*ForkHistoryBranchResponse, error)
// DeleteHistoryBranch removes a branch
// If this is the last branch to delete, it will also remove the root node
DeleteHistoryBranch(ctx context.Context, request *DeleteHistoryBranchRequest) error
// GetHistoryTree returns all branch information of a tree
GetHistoryTree(ctx context.Context, request *GetHistoryTreeRequest) (*GetHistoryTreeResponse, error)
// GetAllHistoryTreeBranches returns all branches of all trees
GetAllHistoryTreeBranches(ctx context.Context, request *GetAllHistoryTreeBranchesRequest) (*GetAllHistoryTreeBranchesResponse, error)
}
// DomainManager is used to manage metadata CRUD for domain entities
DomainManager interface {
Closeable
GetName() string
CreateDomain(ctx context.Context, request *CreateDomainRequest) (*CreateDomainResponse, error)
GetDomain(ctx context.Context, request *GetDomainRequest) (*GetDomainResponse, error)
UpdateDomain(ctx context.Context, request *UpdateDomainRequest) error
DeleteDomain(ctx context.Context, request *DeleteDomainRequest) error
DeleteDomainByName(ctx context.Context, request *DeleteDomainByNameRequest) error
ListDomains(ctx context.Context, request *ListDomainsRequest) (*ListDomainsResponse, error)
GetMetadata(ctx context.Context) (*GetMetadataResponse, error)
}
// QueueManager is used to manage queue store
QueueManager interface {
Closeable
EnqueueMessage(ctx context.Context, messagePayload []byte) error
ReadMessages(ctx context.Context, lastMessageID int64, maxCount int) (QueueMessageList, error)
DeleteMessagesBefore(ctx context.Context, messageID int64) error
UpdateAckLevel(ctx context.Context, messageID int64, clusterName string) error
GetAckLevels(ctx context.Context) (map[string]int64, error)
EnqueueMessageToDLQ(ctx context.Context, messagePayload []byte) error
ReadMessagesFromDLQ(ctx context.Context, firstMessageID int64, lastMessageID int64, pageSize int, pageToken []byte) ([]*QueueMessage, []byte, error)
DeleteMessageFromDLQ(ctx context.Context, messageID int64) error
RangeDeleteMessagesFromDLQ(ctx context.Context, firstMessageID int64, lastMessageID int64) error
UpdateDLQAckLevel(ctx context.Context, messageID int64, clusterName string) error
GetDLQAckLevels(ctx context.Context) (map[string]int64, error)
GetDLQSize(ctx context.Context) (int64, error)
}
// QueueMessage is the message that stores in the queue
QueueMessage struct {
ID int64 `json:"message_id"`
QueueType QueueType `json:"queue_type"`
Payload []byte `json:"message_payload"`
}
QueueMessageList []*QueueMessage
ConfigStoreManager interface {
Closeable
FetchDynamicConfig(ctx context.Context, cfgType ConfigType) (*FetchDynamicConfigResponse, error)
UpdateDynamicConfig(ctx context.Context, request *UpdateDynamicConfigRequest, cfgType ConfigType) error
// can add functions for config types other than dynamic config
}
)
// IsTimeoutError check whether error is TimeoutError
func IsTimeoutError(err error) bool {
_, ok := err.(*TimeoutError)
return ok
}
// GetTaskID returns the task ID for transfer task
func (t *TransferTaskInfo) GetTaskID() int64 {
return t.TaskID
}
// GetVersion returns the task version for transfer task
func (t *TransferTaskInfo) GetVersion() int64 {
return t.Version
}
// GetTaskType returns the task type for transfer task
func (t *TransferTaskInfo) GetTaskType() int {
return t.TaskType
}
// GetVisibilityTimestamp returns the task type for transfer task
func (t *TransferTaskInfo) GetVisibilityTimestamp() time.Time {
return t.VisibilityTimestamp
}
// GetWorkflowID returns the workflow ID for transfer task
func (t *TransferTaskInfo) GetWorkflowID() string {
return t.WorkflowID
}
// GetRunID returns the run ID for transfer task
func (t *TransferTaskInfo) GetRunID() string {
return t.RunID
}
// GetTargetDomainIDs returns the targetDomainIDs for applyParentPolicy
func (t *TransferTaskInfo) GetTargetDomainIDs() map[string]struct{} {
return t.TargetDomainIDs
}
// GetDomainID returns the domain ID for transfer task
func (t *TransferTaskInfo) GetDomainID() string {
return t.DomainID
}
// String returns a string representation for transfer task
func (t *TransferTaskInfo) String() string {
return fmt.Sprintf("%#v", t)
}
// GetTaskID returns the task ID for replication task
func (t *ReplicationTaskInfo) GetTaskID() int64 {
return t.TaskID
}
// GetVersion returns the task version for replication task
func (t *ReplicationTaskInfo) GetVersion() int64 {
return t.Version
}
// GetTaskType returns the task type for replication task
func (t *ReplicationTaskInfo) GetTaskType() int {
return t.TaskType
}
// GetVisibilityTimestamp returns the task type for replication task
func (t *ReplicationTaskInfo) GetVisibilityTimestamp() time.Time {
return time.Time{}
}
// GetWorkflowID returns the workflow ID for replication task
func (t *ReplicationTaskInfo) GetWorkflowID() string {
return t.WorkflowID
}
// GetRunID returns the run ID for replication task
func (t *ReplicationTaskInfo) GetRunID() string {
return t.RunID
}
// GetDomainID returns the domain ID for replication task
func (t *ReplicationTaskInfo) GetDomainID() string {
return t.DomainID
}
// GetTaskID returns the task ID for timer task
func (t *TimerTaskInfo) GetTaskID() int64 {
return t.TaskID
}
// GetVersion returns the task version for timer task
func (t *TimerTaskInfo) GetVersion() int64 {
return t.Version
}
// GetTaskType returns the task type for timer task
func (t *TimerTaskInfo) GetTaskType() int {
return t.TaskType
}
// GetVisibilityTimestamp returns the task type for timer task
func (t *TimerTaskInfo) GetVisibilityTimestamp() time.Time {
return t.VisibilityTimestamp
}
// GetWorkflowID returns the workflow ID for timer task
func (t *TimerTaskInfo) GetWorkflowID() string {
return t.WorkflowID
}
// GetRunID returns the run ID for timer task
func (t *TimerTaskInfo) GetRunID() string {
return t.RunID
}
// GetDomainID returns the domain ID for timer task
func (t *TimerTaskInfo) GetDomainID() string {
return t.DomainID
}
// String returns a string representation for timer task
func (t *TimerTaskInfo) String() string {
return fmt.Sprintf(
"{DomainID: %v, WorkflowID: %v, RunID: %v, VisibilityTimestamp: %v, TaskID: %v, TaskType: %v, TimeoutType: %v, EventID: %v, ScheduleAttempt: %v, Version: %v.}",
t.DomainID, t.WorkflowID, t.RunID, t.VisibilityTimestamp, t.TaskID, t.TaskType, t.TimeoutType, t.EventID, t.ScheduleAttempt, t.Version,
)
}
// Copy returns a shallow copy of shardInfo
func (s *ShardInfo) Copy() *ShardInfo {
// TODO: do we really need to deep copy those fields?
clusterTransferAckLevel := make(map[string]int64)
for k, v := range s.ClusterTransferAckLevel {
clusterTransferAckLevel[k] = v
}
clusterTimerAckLevel := make(map[string]time.Time)
for k, v := range s.ClusterTimerAckLevel {
clusterTimerAckLevel[k] = v
}
clusterReplicationLevel := make(map[string]int64)
for k, v := range s.ClusterReplicationLevel {
clusterReplicationLevel[k] = v
}
replicationDLQAckLevel := make(map[string]int64)
for k, v := range s.ReplicationDLQAckLevel {
replicationDLQAckLevel[k] = v
}
return &ShardInfo{
ShardID: s.ShardID,
Owner: s.Owner,
RangeID: s.RangeID,
StolenSinceRenew: s.StolenSinceRenew,
ReplicationAckLevel: s.ReplicationAckLevel,
TransferAckLevel: s.TransferAckLevel,
TimerAckLevel: s.TimerAckLevel,
ClusterTransferAckLevel: clusterTransferAckLevel,
ClusterTimerAckLevel: clusterTimerAckLevel,
TransferProcessingQueueStates: s.TransferProcessingQueueStates,
CrossClusterProcessingQueueStates: s.CrossClusterProcessingQueueStates,
TimerProcessingQueueStates: s.TimerProcessingQueueStates,
DomainNotificationVersion: s.DomainNotificationVersion,
ClusterReplicationLevel: clusterReplicationLevel,
ReplicationDLQAckLevel: replicationDLQAckLevel,
PendingFailoverMarkers: s.PendingFailoverMarkers,
UpdatedAt: s.UpdatedAt,
}
}
// SerializeClusterConfigs makes an array of *ClusterReplicationConfig serializable
// by flattening them into map[string]interface{}
func SerializeClusterConfigs(replicationConfigs []*ClusterReplicationConfig) []map[string]interface{} {
seriaizedReplicationConfigs := []map[string]interface{}{}
for index := range replicationConfigs {
seriaizedReplicationConfigs = append(seriaizedReplicationConfigs, replicationConfigs[index].serialize())
}
return seriaizedReplicationConfigs
}
// DeserializeClusterConfigs creates an array of ClusterReplicationConfigs from an array of map representations
func DeserializeClusterConfigs(replicationConfigs []map[string]interface{}) []*ClusterReplicationConfig {
deseriaizedReplicationConfigs := []*ClusterReplicationConfig{}
for index := range replicationConfigs {
deseriaizedReplicationConfig := &ClusterReplicationConfig{}
deseriaizedReplicationConfig.deserialize(replicationConfigs[index])
deseriaizedReplicationConfigs = append(deseriaizedReplicationConfigs, deseriaizedReplicationConfig)
}
return deseriaizedReplicationConfigs
}
func (config *ClusterReplicationConfig) serialize() map[string]interface{} {
output := make(map[string]interface{})
output["cluster_name"] = config.ClusterName
return output
}
func (config *ClusterReplicationConfig) deserialize(input map[string]interface{}) {
config.ClusterName = input["cluster_name"].(string)
}
// GetCopy return a copy of ClusterReplicationConfig
func (config *ClusterReplicationConfig) GetCopy() *ClusterReplicationConfig {
res := *config
return &res
}
// DBTimestampToUnixNano converts Milliseconds timestamp to UnixNano
func DBTimestampToUnixNano(milliseconds int64) int64 {
return milliseconds * 1000 * 1000 // Milliseconds are 10⁻³, nanoseconds are 10⁻⁹, (-3) - (-9) = 6, so multiply by 10⁶
}
// UnixNanoToDBTimestamp converts UnixNano to Milliseconds timestamp
func UnixNanoToDBTimestamp(timestamp int64) int64 {
return timestamp / (1000 * 1000) // Milliseconds are 10⁻³, nanoseconds are 10⁻⁹, (-9) - (-3) = -6, so divide by 10⁶
}
var internalThriftEncoder = codec.NewThriftRWEncoder()
// NewHistoryBranchToken return a new branch token
func NewHistoryBranchToken(treeID string) ([]byte, error) {
branchID := uuid.New()
bi := &workflow.HistoryBranch{
TreeID: &treeID,
BranchID: &branchID,
Ancestors: []*workflow.HistoryBranchRange{},
}
token, err := internalThriftEncoder.Encode(bi)
if err != nil {
return nil, err
}
return token, nil
}
// NewHistoryBranchTokenByBranchID return a new branch token with treeID/branchID
func NewHistoryBranchTokenByBranchID(treeID, branchID string) ([]byte, error) {
bi := &workflow.HistoryBranch{
TreeID: &treeID,
BranchID: &branchID,
Ancestors: []*workflow.HistoryBranchRange{},
}
token, err := internalThriftEncoder.Encode(bi)
if err != nil {
return nil, err
}
return token, nil
}
// NewHistoryBranchTokenFromAnother make up a branchToken
func NewHistoryBranchTokenFromAnother(branchID string, anotherToken []byte) ([]byte, error) {
var branch workflow.HistoryBranch
err := internalThriftEncoder.Decode(anotherToken, &branch)
if err != nil {
return nil, err
}
bi := &workflow.HistoryBranch{
TreeID: branch.TreeID,
BranchID: &branchID,
Ancestors: []*workflow.HistoryBranchRange{},
}
token, err := internalThriftEncoder.Encode(bi)
if err != nil {
return nil, err
}
return token, nil
}
// BuildHistoryGarbageCleanupInfo combine the workflow identity information into a string
func BuildHistoryGarbageCleanupInfo(domainID, workflowID, runID string) string {
return fmt.Sprintf("%v:%v:%v", domainID, workflowID, runID)
}
// SplitHistoryGarbageCleanupInfo returns workflow identity information
func SplitHistoryGarbageCleanupInfo(info string) (domainID, workflowID, runID string, err error) {
ss := strings.Split(info, ":")
// workflowID can contain ":" so len(ss) can be greater than 3
if len(ss) < numItemsInGarbageInfo {
return "", "", "", fmt.Errorf("not able to split info for %s", info)
}
domainID = ss[0]
runID = ss[len(ss)-1]
workflowEnd := len(info) - len(runID) - 1
workflowID = info[len(domainID)+1 : workflowEnd]
return
}
// NewGetReplicationTasksFromDLQRequest creates a new GetReplicationTasksFromDLQRequest
func NewGetReplicationTasksFromDLQRequest(
sourceClusterName string,
readLevel int64,
maxReadLevel int64,
batchSize int,
nextPageToken []byte,
) *GetReplicationTasksFromDLQRequest {
return &GetReplicationTasksFromDLQRequest{
SourceClusterName: sourceClusterName,
GetReplicationTasksRequest: GetReplicationTasksRequest{
ReadLevel: readLevel,
MaxReadLevel: maxReadLevel,
BatchSize: batchSize,
NextPageToken: nextPageToken,
},
}
}
// IsTransientError checks if the error is a transient persistence error
func IsTransientError(err error) bool {
switch err.(type) {
case *types.InternalServiceError, *types.ServiceBusyError, *TimeoutError:
return true
}
return false
}
// IsBackgroundTransientError checks if the error is a transient error on background jobs
func IsBackgroundTransientError(err error) bool {
switch err.(type) {
case *types.InternalServiceError, *TimeoutError:
return true
}
return false
}
// HasMoreRowsToDelete checks if there is more data need to be deleted
func HasMoreRowsToDelete(rowsDeleted, batchSize int) bool {
if rowsDeleted < batchSize || // all target tasks are deleted
rowsDeleted == UnknownNumRowsAffected || // underlying database does not support rows affected, so pageSize is not honored and all target tasks are deleted
rowsDeleted > batchSize { // pageSize is not honored and all tasks are deleted
return false
}
return true
}