common/persistence/data_store_interfaces.go (738 lines of code) (raw):

// Copyright (c) 2017-2020 Uber Technologies, Inc.
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package persistence

import (
	"context"
	"fmt"
	"time"

	workflow "github.com/uber/cadence/.gen/go/shared"
	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/checksum"
	"github.com/uber/cadence/common/types"
)

//go:generate mockgen -package $GOPACKAGE -destination data_store_interfaces_mock.go -self_package github.com/uber/cadence/common/persistence github.com/uber/cadence/common/persistence ExecutionStore
//go:generate mockgen -package $GOPACKAGE -destination visibility_store_mock.go -self_package github.com/uber/cadence/common/persistence github.com/uber/cadence/common/persistence VisibilityStore

type (
	// ////////////////////////////////////////////////////////////////////
	// Persistence interface is a lower layer of dataInterface.
	// The intention is to let different persistence implementations (SQL, Cassandra, etc.) share some common logic.
	// Right now the only common part is serialization/deserialization, and only ExecutionManager/HistoryManager need it.
	// TaskManager are the same.
	// ////////////////////////////////////////////////////////////////////

	// ShardStore is the lower level of ShardManager.
	ShardStore interface {
		Closeable
		GetName() string
		CreateShard(ctx context.Context, request *InternalCreateShardRequest) error
		GetShard(ctx context.Context, request *InternalGetShardRequest) (*InternalGetShardResponse, error)
		UpdateShard(ctx context.Context, request *InternalUpdateShardRequest) error
	}

	// TaskStore is a lower level of TaskManager.
	TaskStore interface {
		Closeable
		GetName() string
		LeaseTaskList(ctx context.Context, request *LeaseTaskListRequest) (*LeaseTaskListResponse, error)
		UpdateTaskList(ctx context.Context, request *UpdateTaskListRequest) (*UpdateTaskListResponse, error)
		ListTaskList(ctx context.Context, request *ListTaskListRequest) (*ListTaskListResponse, error)
		DeleteTaskList(ctx context.Context, request *DeleteTaskListRequest) error
		GetTaskListSize(ctx context.Context, request *GetTaskListSizeRequest) (*GetTaskListSizeResponse, error)
		CreateTasks(ctx context.Context, request *InternalCreateTasksRequest) (*CreateTasksResponse, error)
		GetTasks(ctx context.Context, request *GetTasksRequest) (*InternalGetTasksResponse, error)
		CompleteTask(ctx context.Context, request *CompleteTaskRequest) error
		// CompleteTasksLessThan completes tasks less than or equal to the given task id.
		// This API takes a limit parameter which specifies the count of maxRows that
		// can be deleted. This parameter may be ignored by the underlying storage, but
		// it is mandatory to specify it. If the underlying storage doesn't support
		// "limit", all rows less than or equal to taskID will be deleted.
		// On success, this method returns:
		//  - number of rows actually deleted, if limit is honored
		//  - UnknownNumRowsDeleted, when all rows below value are deleted
		CompleteTasksLessThan(ctx context.Context, request *CompleteTasksLessThanRequest) (*CompleteTasksLessThanResponse, error)
		// GetOrphanTasks returns tasks that exist as records in the database but are part of task lists which
		// _do not_ exist in the database. They are therefore unreachable and no longer represent valid items
		// that can be legitimately acted upon.
		GetOrphanTasks(ctx context.Context, request *GetOrphanTasksRequest) (*GetOrphanTasksResponse, error)
	}

	// DomainStore is a lower level of DomainManager.
	DomainStore interface {
		Closeable
		GetName() string
		CreateDomain(ctx context.Context, request *InternalCreateDomainRequest) (*CreateDomainResponse, error)
		GetDomain(ctx context.Context, request *GetDomainRequest) (*InternalGetDomainResponse, error)
		UpdateDomain(ctx context.Context, request *InternalUpdateDomainRequest) error
		DeleteDomain(ctx context.Context, request *DeleteDomainRequest) error
		DeleteDomainByName(ctx context.Context, request *DeleteDomainByNameRequest) error
		ListDomains(ctx context.Context, request *ListDomainsRequest) (*InternalListDomainsResponse, error)
		GetMetadata(ctx context.Context) (*GetMetadataResponse, error)
	}

	// ExecutionStore is used to manage workflow executions for the Persistence layer.
	ExecutionStore interface {
		Closeable
		GetName() string
		GetShardID() int
		// The below three APIs are related to serialization/deserialization
		GetWorkflowExecution(ctx context.Context, request *InternalGetWorkflowExecutionRequest) (*InternalGetWorkflowExecutionResponse, error)
		UpdateWorkflowExecution(ctx context.Context, request *InternalUpdateWorkflowExecutionRequest) error
		ConflictResolveWorkflowExecution(ctx context.Context, request *InternalConflictResolveWorkflowExecutionRequest) error

		CreateWorkflowExecution(ctx context.Context, request *InternalCreateWorkflowExecutionRequest) (*CreateWorkflowExecutionResponse, error)
		DeleteWorkflowExecution(ctx context.Context, request *DeleteWorkflowExecutionRequest) error
		DeleteCurrentWorkflowExecution(ctx context.Context, request *DeleteCurrentWorkflowExecutionRequest) error
		GetCurrentExecution(ctx context.Context, request *GetCurrentExecutionRequest) (*GetCurrentExecutionResponse, error)
		IsWorkflowExecutionExists(ctx context.Context, request *IsWorkflowExecutionExistsRequest) (*IsWorkflowExecutionExistsResponse, error)

		// Transfer task related methods
		GetTransferTasks(ctx context.Context, request *GetTransferTasksRequest) (*GetTransferTasksResponse, error)
		CompleteTransferTask(ctx context.Context, request *CompleteTransferTaskRequest) error
		RangeCompleteTransferTask(ctx context.Context, request *RangeCompleteTransferTaskRequest) (*RangeCompleteTransferTaskResponse, error)

		// Cross-cluster task related methods
		GetCrossClusterTasks(ctx context.Context, request *GetCrossClusterTasksRequest) (*GetCrossClusterTasksResponse, error)
		CompleteCrossClusterTask(ctx context.Context, request *CompleteCrossClusterTaskRequest) error
		RangeCompleteCrossClusterTask(ctx context.Context, request *RangeCompleteCrossClusterTaskRequest) (*RangeCompleteCrossClusterTaskResponse, error)

		// Replication task related methods
		GetReplicationTasks(ctx context.Context, request *GetReplicationTasksRequest) (*InternalGetReplicationTasksResponse, error)
		CompleteReplicationTask(ctx context.Context, request *CompleteReplicationTaskRequest) error
		RangeCompleteReplicationTask(ctx context.Context, request *RangeCompleteReplicationTaskRequest) (*RangeCompleteReplicationTaskResponse, error)
		PutReplicationTaskToDLQ(ctx context.Context, request *InternalPutReplicationTaskToDLQRequest) error
		GetReplicationTasksFromDLQ(ctx context.Context, request *GetReplicationTasksFromDLQRequest) (*InternalGetReplicationTasksFromDLQResponse, error)
		GetReplicationDLQSize(ctx context.Context, request *GetReplicationDLQSizeRequest) (*GetReplicationDLQSizeResponse, error)
		DeleteReplicationTaskFromDLQ(ctx context.Context, request *DeleteReplicationTaskFromDLQRequest) error
		RangeDeleteReplicationTaskFromDLQ(ctx context.Context, request *RangeDeleteReplicationTaskFromDLQRequest) (*RangeDeleteReplicationTaskFromDLQResponse, error)
		CreateFailoverMarkerTasks(ctx context.Context, request *CreateFailoverMarkersRequest) error

		// Timer related methods.
		GetTimerIndexTasks(ctx context.Context, request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error)
		CompleteTimerTask(ctx context.Context, request *CompleteTimerTaskRequest) error
		RangeCompleteTimerTask(ctx context.Context, request *RangeCompleteTimerTaskRequest) (*RangeCompleteTimerTaskResponse, error)

		// Scan related methods
		ListConcreteExecutions(ctx context.Context, request *ListConcreteExecutionsRequest) (*InternalListConcreteExecutionsResponse, error)
		ListCurrentExecutions(ctx context.Context, request *ListCurrentExecutionsRequest) (*ListCurrentExecutionsResponse, error)
	}

	// HistoryStore is used to manage workflow history events.
	HistoryStore interface {
		Closeable
		GetName() string

		// The below are history V2 APIs
		// V2 regards history events growing as a tree, decoupled from workflow concepts

		// AppendHistoryNodes adds (or overrides) a node in a history branch
		AppendHistoryNodes(ctx context.Context, request *InternalAppendHistoryNodesRequest) error
		// ReadHistoryBranch returns history node data for a branch
		ReadHistoryBranch(ctx context.Context, request *InternalReadHistoryBranchRequest) (*InternalReadHistoryBranchResponse, error)
		// ForkHistoryBranch forks a new branch from an old branch
		ForkHistoryBranch(ctx context.Context, request *InternalForkHistoryBranchRequest) (*InternalForkHistoryBranchResponse, error)
		// DeleteHistoryBranch removes a branch
		DeleteHistoryBranch(ctx context.Context, request *InternalDeleteHistoryBranchRequest) error
		// GetHistoryTree returns all branch information of a tree
		GetHistoryTree(ctx context.Context, request *InternalGetHistoryTreeRequest) (*InternalGetHistoryTreeResponse, error)
		// GetAllHistoryTreeBranches returns all branches of all trees
		GetAllHistoryTreeBranches(ctx context.Context, request *GetAllHistoryTreeBranchesRequest) (*GetAllHistoryTreeBranchesResponse, error)
	}

	// VisibilityStore is the store interface for visibility.
	VisibilityStore interface {
		Closeable
		GetName() string
		RecordWorkflowExecutionStarted(ctx context.Context, request *InternalRecordWorkflowExecutionStartedRequest) error
		RecordWorkflowExecutionClosed(ctx context.Context, request *InternalRecordWorkflowExecutionClosedRequest) error
		RecordWorkflowExecutionUninitialized(ctx context.Context, request *InternalRecordWorkflowExecutionUninitializedRequest) error
		UpsertWorkflowExecution(ctx context.Context, request *InternalUpsertWorkflowExecutionRequest) error
		ListOpenWorkflowExecutions(ctx context.Context, request *InternalListWorkflowExecutionsRequest) (*InternalListWorkflowExecutionsResponse, error)
		ListClosedWorkflowExecutions(ctx context.Context, request *InternalListWorkflowExecutionsRequest) (*InternalListWorkflowExecutionsResponse, error)
		ListOpenWorkflowExecutionsByType(ctx context.Context, request *InternalListWorkflowExecutionsByTypeRequest) (*InternalListWorkflowExecutionsResponse, error)
		ListClosedWorkflowExecutionsByType(ctx context.Context, request *InternalListWorkflowExecutionsByTypeRequest) (*InternalListWorkflowExecutionsResponse, error)
		ListOpenWorkflowExecutionsByWorkflowID(ctx context.Context, request *InternalListWorkflowExecutionsByWorkflowIDRequest) (*InternalListWorkflowExecutionsResponse, error)
		ListClosedWorkflowExecutionsByWorkflowID(ctx context.Context, request *InternalListWorkflowExecutionsByWorkflowIDRequest) (*InternalListWorkflowExecutionsResponse, error)
		ListClosedWorkflowExecutionsByStatus(ctx context.Context, request *InternalListClosedWorkflowExecutionsByStatusRequest) (*InternalListWorkflowExecutionsResponse, error)
		GetClosedWorkflowExecution(ctx context.Context, request *InternalGetClosedWorkflowExecutionRequest) (*InternalGetClosedWorkflowExecutionResponse, error)
		DeleteWorkflowExecution(ctx context.Context, request *VisibilityDeleteWorkflowExecutionRequest) error
		ListWorkflowExecutions(ctx context.Context, request *ListWorkflowExecutionsByQueryRequest) (*InternalListWorkflowExecutionsResponse, error)
		ScanWorkflowExecutions(ctx context.Context, request *ListWorkflowExecutionsByQueryRequest) (*InternalListWorkflowExecutionsResponse, error)
		CountWorkflowExecutions(ctx context.Context, request *CountWorkflowExecutionsRequest) (*CountWorkflowExecutionsResponse, error)
		DeleteUninitializedWorkflowExecution(ctx context.Context, request *VisibilityDeleteWorkflowExecutionRequest) error
	}

	// ConfigStore is the store interface exposing FetchConfig and UpdateConfig
	// for InternalConfigStoreEntry values.
	ConfigStore interface {
		Closeable
		FetchConfig(ctx context.Context, configType ConfigType) (*InternalConfigStoreEntry, error)
		UpdateConfig(ctx context.Context, value *InternalConfigStoreEntry) error
	}

	// InternalConfigStoreEntry is the entry returned by ConfigStore.FetchConfig
	// and accepted by ConfigStore.UpdateConfig.
	InternalConfigStoreEntry struct {
		RowType   int
		Version   int64
		Timestamp time.Time
		Values    *DataBlob
	}

	// Queue is a store to enqueue and get messages.
	Queue interface {
		Closeable
		EnqueueMessage(ctx context.Context, messagePayload []byte) error
		ReadMessages(ctx context.Context, lastMessageID int64, maxCount int) ([]*InternalQueueMessage, error)
		DeleteMessagesBefore(ctx context.Context, messageID int64) error
		UpdateAckLevel(ctx context.Context, messageID int64, clusterName string) error
		GetAckLevels(ctx context.Context) (map[string]int64, error)
		EnqueueMessageToDLQ(ctx context.Context, messagePayload []byte) error
		ReadMessagesFromDLQ(ctx context.Context, firstMessageID int64, lastMessageID int64, pageSize int, pageToken []byte) ([]*InternalQueueMessage, []byte, error)
		DeleteMessageFromDLQ(ctx context.Context, messageID int64) error
		RangeDeleteMessagesFromDLQ(ctx context.Context, firstMessageID int64, lastMessageID int64) error
		UpdateDLQAckLevel(ctx context.Context, messageID int64, clusterName string) error
		GetDLQAckLevels(ctx context.Context) (map[string]int64, error)
		GetDLQSize(ctx context.Context) (int64, error)
	}

	// InternalQueueMessage is the message that is stored in the queue.
	InternalQueueMessage struct {
		ID        int64     `json:"message_id"`
		QueueType QueueType `json:"queue_type"`
		Payload   []byte    `json:"message_payload"`
	}

	// DataBlob represents a blob for any binary data.
	// It contains raw data, and metadata (right now only encoding) in the other field.
	// Note that it should only be used for the Persistence layer, below dataInterface and application (historyEngine/etc).
	DataBlob struct {
		Encoding common.EncodingType
		Data     []byte
	}

	// InternalCreateWorkflowExecutionRequest is used to write a new workflow execution.
	InternalCreateWorkflowExecutionRequest struct {
		RangeID int64

		Mode CreateWorkflowMode

		PreviousRunID            string
		PreviousLastWriteVersion int64

		NewWorkflowSnapshot InternalWorkflowSnapshot
	}

	// InternalGetReplicationTasksResponse is the response to GetReplicationTasks.
	InternalGetReplicationTasksResponse struct {
		Tasks         []*InternalReplicationTaskInfo
		NextPageToken []byte
	}

	// InternalPutReplicationTaskToDLQRequest is used to put a replication task to the DLQ.
	InternalPutReplicationTaskToDLQRequest struct {
		SourceClusterName string
		TaskInfo          *InternalReplicationTaskInfo
	}

	// InternalGetReplicationTasksFromDLQResponse is the response for GetReplicationTasksFromDLQ.
	// It is an alias of InternalGetReplicationTasksResponse.
	InternalGetReplicationTasksFromDLQResponse = InternalGetReplicationTasksResponse

	// InternalReplicationTaskInfo describes the replication task created for replication of history events.
	InternalReplicationTaskInfo struct {
		DomainID          string
		WorkflowID        string
		RunID             string
		TaskID            int64
		TaskType          int
		FirstEventID      int64
		NextEventID       int64
		Version           int64
		ScheduledID       int64
		BranchToken       []byte
		NewRunBranchToken []byte
		CreationTime      time.Time
	}

	// InternalWorkflowExecutionInfo describes a workflow execution for the Persistence Interface.
	InternalWorkflowExecutionInfo struct {
		DomainID                           string
		WorkflowID                         string
		RunID                              string
		FirstExecutionRunID                string
		ParentDomainID                     string
		ParentWorkflowID                   string
		ParentRunID                        string
		InitiatedID                        int64
		CompletionEventBatchID             int64
		CompletionEvent                    *DataBlob
		TaskList                           string
		WorkflowTypeName                   string
		WorkflowTimeout                    time.Duration
		DecisionStartToCloseTimeout        time.Duration
		ExecutionContext                   []byte
		State                              int
		CloseStatus                        int
		LastFirstEventID                   int64
		LastEventTaskID                    int64
		NextEventID                        int64
		LastProcessedEvent                 int64
		StartTimestamp                     time.Time
		LastUpdatedTimestamp               time.Time
		CreateRequestID                    string
		SignalCount                        int32
		DecisionVersion                    int64
		DecisionScheduleID                 int64
		DecisionStartedID                  int64
		DecisionRequestID                  string
		DecisionTimeout                    time.Duration
		DecisionAttempt                    int64
		DecisionStartedTimestamp           time.Time
		DecisionScheduledTimestamp         time.Time
		DecisionOriginalScheduledTimestamp time.Time
		CancelRequested                    bool
		CancelRequestID                    string
		StickyTaskList                     string
		StickyScheduleToStartTimeout       time.Duration
		ClientLibraryVersion               string
		ClientFeatureVersion               string
		ClientImpl                         string
		AutoResetPoints                    *DataBlob
		// for retry
		Attempt            int32
		HasRetryPolicy     bool
		InitialInterval    time.Duration
		BackoffCoefficient float64
		MaximumInterval    time.Duration
		ExpirationTime     time.Time
		MaximumAttempts    int32
		NonRetriableErrors []string
		BranchToken        []byte
		CronSchedule       string
		ExpirationInterval time.Duration
		Memo               map[string][]byte
		SearchAttributes   map[string][]byte
		PartitionConfig    map[string]string

		// attributes which are not related to mutable state at all
		HistorySize int64
		IsCron      bool
	}

	// InternalWorkflowMutableState indicates workflow related state for the Persistence Interface.
	InternalWorkflowMutableState struct {
		ExecutionInfo    *InternalWorkflowExecutionInfo
		VersionHistories *DataBlob
		ReplicationState *ReplicationState // TODO: remove this after all 2DC workflows complete
		ActivityInfos    map[int64]*InternalActivityInfo

		TimerInfos          map[string]*TimerInfo
		ChildExecutionInfos map[int64]*InternalChildExecutionInfo
		RequestCancelInfos  map[int64]*RequestCancelInfo
		SignalInfos         map[int64]*SignalInfo
		SignalRequestedIDs  map[string]struct{}
		BufferedEvents      []*DataBlob

		// Checksum field is used by Cassandra storage
		// ChecksumData is used by all SQL storage
		Checksum     checksum.Checksum
		ChecksumData *DataBlob
	}

	// InternalActivityInfo holds activity details for the Persistence Interface.
	InternalActivityInfo struct {
		Version                  int64
		ScheduleID               int64
		ScheduledEventBatchID    int64
		ScheduledEvent           *DataBlob
		ScheduledTime            time.Time
		StartedID                int64
		StartedEvent             *DataBlob
		StartedTime              time.Time
		ActivityID               string
		RequestID                string
		Details                  []byte
		ScheduleToStartTimeout   time.Duration
		ScheduleToCloseTimeout   time.Duration
		StartToCloseTimeout      time.Duration
		HeartbeatTimeout         time.Duration
		CancelRequested          bool
		CancelRequestID          int64
		LastHeartBeatUpdatedTime time.Time
		TimerTaskStatus          int32
		// For retry
		Attempt            int32
		DomainID           string
		StartedIdentity    string
		TaskList           string
		HasRetryPolicy     bool
		InitialInterval    time.Duration
		BackoffCoefficient float64
		MaximumInterval    time.Duration
		ExpirationTime     time.Time
		MaximumAttempts    int32
		NonRetriableErrors []string
		LastFailureReason  string
		LastWorkerIdentity string
		LastFailureDetails []byte
		// Not written to database - This is used only for deduping heartbeat timer creation
		LastHeartbeatTimeoutVisibilityInSeconds int64
	}

	// InternalChildExecutionInfo has details for pending child executions for the Persistence Interface.
	InternalChildExecutionInfo struct {
		Version               int64
		InitiatedID           int64
		InitiatedEventBatchID int64
		InitiatedEvent        *DataBlob
		StartedID             int64
		StartedWorkflowID     string
		StartedRunID          string
		StartedEvent          *DataBlob
		CreateRequestID       string
		DomainID              string
		DomainNameDEPRECATED  string // deprecated: use DomainID field
		WorkflowTypeName      string
		ParentClosePolicy     types.ParentClosePolicy
	}

	// InternalUpdateWorkflowExecutionRequest is used to update a workflow execution for the Persistence Interface.
	InternalUpdateWorkflowExecutionRequest struct {
		RangeID int64

		Mode UpdateWorkflowMode

		UpdateWorkflowMutation InternalWorkflowMutation

		NewWorkflowSnapshot *InternalWorkflowSnapshot
	}

	// InternalConflictResolveWorkflowExecutionRequest is used to reset workflow execution state for the Persistence Interface.
	InternalConflictResolveWorkflowExecutionRequest struct {
		RangeID int64

		Mode ConflictResolveWorkflowMode

		// workflow to be reset
		ResetWorkflowSnapshot InternalWorkflowSnapshot

		// maybe new workflow
		NewWorkflowSnapshot *InternalWorkflowSnapshot

		// current workflow
		CurrentWorkflowMutation *InternalWorkflowMutation
	}

	// InternalWorkflowMutation is used as generic workflow execution state mutation for the Persistence Interface.
	InternalWorkflowMutation struct {
		ExecutionInfo    *InternalWorkflowExecutionInfo
		VersionHistories *DataBlob
		StartVersion     int64
		LastWriteVersion int64

		UpsertActivityInfos       []*InternalActivityInfo
		DeleteActivityInfos       []int64
		UpsertTimerInfos          []*TimerInfo
		DeleteTimerInfos          []string
		UpsertChildExecutionInfos []*InternalChildExecutionInfo
		DeleteChildExecutionInfos []int64
		UpsertRequestCancelInfos  []*RequestCancelInfo
		DeleteRequestCancelInfos  []int64
		UpsertSignalInfos         []*SignalInfo
		DeleteSignalInfos         []int64
		UpsertSignalRequestedIDs  []string
		DeleteSignalRequestedIDs  []string
		NewBufferedEvents         *DataBlob
		ClearBufferedEvents       bool

		TransferTasks     []Task
		CrossClusterTasks []Task
		TimerTasks        []Task
		ReplicationTasks  []Task

		Condition int64

		Checksum     checksum.Checksum
		ChecksumData *DataBlob
	}

	// InternalWorkflowSnapshot is used as generic workflow execution state snapshot for the Persistence Interface.
	InternalWorkflowSnapshot struct {
		ExecutionInfo    *InternalWorkflowExecutionInfo
		VersionHistories *DataBlob
		StartVersion     int64
		LastWriteVersion int64

		ActivityInfos       []*InternalActivityInfo
		TimerInfos          []*TimerInfo
		ChildExecutionInfos []*InternalChildExecutionInfo
		RequestCancelInfos  []*RequestCancelInfo
		SignalInfos         []*SignalInfo
		SignalRequestedIDs  []string

		TransferTasks     []Task
		CrossClusterTasks []Task
		TimerTasks        []Task
		ReplicationTasks  []Task

		Condition int64

		Checksum     checksum.Checksum
		ChecksumData *DataBlob
	}

	// InternalAppendHistoryEventsRequest is used to append new events to workflow execution history for the Persistence Interface.
	InternalAppendHistoryEventsRequest struct {
		DomainID          string
		Execution         workflow.WorkflowExecution
		FirstEventID      int64
		EventBatchVersion int64
		RangeID           int64
		TransactionID     int64
		Events            *DataBlob
		Overwrite         bool
	}

	// InternalAppendHistoryNodesRequest is used to append a batch of history nodes.
	InternalAppendHistoryNodesRequest struct {
		// True if it is the first append request to the branch
		IsNewBranch bool
		// The info for clean up data in background
		Info string
		// The branch to be appended
		BranchInfo types.HistoryBranch
		// The first eventID becomes the nodeID to be appended
		NodeID int64
		// The events to be appended
		Events *DataBlob
		// Requested TransactionID for conditional update
		TransactionID int64
		// Used in sharded data stores to identify which shard to use
		ShardID int
	}

	// InternalGetWorkflowExecutionRequest is used to retrieve the info of a workflow execution.
	InternalGetWorkflowExecutionRequest struct {
		DomainID  string
		Execution types.WorkflowExecution
		RangeID   int64
	}

	// InternalGetWorkflowExecutionResponse is the response to GetWorkflowExecution for the Persistence Interface.
	InternalGetWorkflowExecutionResponse struct {
		State *InternalWorkflowMutableState
	}

	// InternalListConcreteExecutionsResponse is the response to ListConcreteExecutions for the Persistence Interface.
	InternalListConcreteExecutionsResponse struct {
		Executions    []*InternalListConcreteExecutionsEntity
		NextPageToken []byte
	}

	// InternalListConcreteExecutionsEntity is a single entity in InternalListConcreteExecutionsResponse.
	InternalListConcreteExecutionsEntity struct {
		ExecutionInfo    *InternalWorkflowExecutionInfo
		VersionHistories *DataBlob
	}

	// InternalForkHistoryBranchRequest is used to fork a history branch.
	InternalForkHistoryBranchRequest struct {
		// The base branch to fork from
		ForkBranchInfo types.HistoryBranch
		// The nodeID to fork from; the new branch will start from it (inclusive), the base branch will stop at it (exclusive)
		ForkNodeID int64
		// branchID of the new branch
		NewBranchID string
		// the info for clean up data in background
		Info string
		// Used in sharded data stores to identify which shard to use
		ShardID int
	}

	// InternalForkHistoryBranchResponse is the response to ForkHistoryBranchRequest.
	InternalForkHistoryBranchResponse struct {
		// branchInfo to represent the new branch
		NewBranchInfo types.HistoryBranch
	}

	// InternalDeleteHistoryBranchRequest is used to remove a history branch.
	InternalDeleteHistoryBranchRequest struct {
		// branch to be deleted
		BranchInfo types.HistoryBranch
		// Used in sharded data stores to identify which shard to use
		ShardID int
	}

	// InternalReadHistoryBranchRequest is used to read a history branch.
	InternalReadHistoryBranchRequest struct {
		// The tree of branch range to be read
		TreeID string
		// The branch range to be read
		BranchID string
		// Get the history nodes from MinNodeID. Inclusive.
		MinNodeID int64
		// Get the history nodes up to MaxNodeID. Exclusive.
		MaxNodeID int64
		// passing through for pagination
		PageSize int
		// Pagination token
		NextPageToken []byte
		// LastNodeID is the last known node ID attached to a history node
		LastNodeID int64
		// LastTransactionID is the last known transaction ID attached to a history node
		LastTransactionID int64
		// Used in sharded data stores to identify which shard to use
		ShardID int
	}

	// InternalCompleteForkBranchRequest is used to update some tree/branch metadata for forking.
	InternalCompleteForkBranchRequest struct {
		// branch to be updated
		BranchInfo workflow.HistoryBranch
		// whether fork is successful
		Success bool
		// Used in sharded data stores to identify which shard to use
		ShardID int
	}

	// InternalReadHistoryBranchResponse is the response to ReadHistoryBranchRequest.
	InternalReadHistoryBranchResponse struct {
		// History events
		History []*DataBlob
		// Pagination token
		NextPageToken []byte
		// LastNodeID is the last known node ID attached to a history node
		LastNodeID int64
		// LastTransactionID is the last known transaction ID attached to a history node
		LastTransactionID int64
	}

	// InternalGetHistoryTreeRequest is used to get a history tree.
	InternalGetHistoryTreeRequest struct {
		// A UUID of a tree
		TreeID string
		// Get data from this shard
		ShardID *int
		// optional: can provide treeID via branchToken if treeID is empty
		BranchToken []byte
	}

	// InternalGetHistoryTreeResponse is the response to GetHistoryTree.
	InternalGetHistoryTreeResponse struct {
		// all branches of a tree
		Branches []*types.HistoryBranch
	}

	// InternalVisibilityWorkflowExecutionInfo is visibility info for internal response.
	InternalVisibilityWorkflowExecutionInfo struct {
		DomainID         string
		WorkflowType     string
		WorkflowID       string
		RunID            string
		TypeName         string
		StartTime        time.Time
		ExecutionTime    time.Time
		CloseTime        time.Time
		Status           *types.WorkflowExecutionCloseStatus
		HistoryLength    int64
		Memo             *DataBlob
		TaskList         string
		IsCron           bool
		NumClusters      int16
		UpdateTime       time.Time
		SearchAttributes map[string]interface{}
		ShardID          int16
	}

	// InternalListWorkflowExecutionsResponse is the response from ListWorkflowExecutions.
	InternalListWorkflowExecutionsResponse struct {
		Executions []*InternalVisibilityWorkflowExecutionInfo
		// Token to read next page if there are more workflow executions beyond page size.
		// Use this to set NextPageToken on ListWorkflowExecutionsRequest to read the next page.
		NextPageToken []byte
	}

	// InternalGetClosedWorkflowExecutionRequest is used to retrieve the record for a specific execution.
	InternalGetClosedWorkflowExecutionRequest struct {
		DomainUUID string
		Domain     string // domain name is not persisted, but used as config filter key
		Execution  types.WorkflowExecution
	}

	// InternalListClosedWorkflowExecutionsByStatusRequest is used to list executions that have a specific close status.
	InternalListClosedWorkflowExecutionsByStatusRequest struct {
		InternalListWorkflowExecutionsRequest
		Status types.WorkflowExecutionCloseStatus
	}

	// InternalListWorkflowExecutionsByWorkflowIDRequest is used to list executions that have a specific WorkflowID in a domain.
	InternalListWorkflowExecutionsByWorkflowIDRequest struct {
		InternalListWorkflowExecutionsRequest
		WorkflowID string
	}

	// InternalListWorkflowExecutionsByTypeRequest is used to list executions of a specific type in a domain.
	InternalListWorkflowExecutionsByTypeRequest struct {
		InternalListWorkflowExecutionsRequest
		WorkflowTypeName string
	}

	// InternalGetClosedWorkflowExecutionResponse is the response from GetClosedWorkflowExecution.
	InternalGetClosedWorkflowExecutionResponse struct {
		Execution *InternalVisibilityWorkflowExecutionInfo
	}

	// InternalRecordWorkflowExecutionStartedRequest is the request to RecordWorkflowExecutionStarted.
	InternalRecordWorkflowExecutionStartedRequest struct {
		DomainUUID         string
		WorkflowID         string
		RunID              string
		WorkflowTypeName   string
		StartTimestamp     time.Time
		ExecutionTimestamp time.Time
		WorkflowTimeout    time.Duration
		TaskID             int64
		Memo               *DataBlob
		TaskList           string
		IsCron             bool
		NumClusters        int16
		UpdateTimestamp    time.Time
		SearchAttributes   map[string][]byte
		ShardID            int16
	}

	// InternalRecordWorkflowExecutionClosedRequest is the request to RecordWorkflowExecutionClosed.
	InternalRecordWorkflowExecutionClosedRequest struct {
		DomainUUID         string
		WorkflowID         string
		RunID              string
		WorkflowTypeName   string
		StartTimestamp     time.Time
		ExecutionTimestamp time.Time
		TaskID             int64
		Memo               *DataBlob
		TaskList           string
		SearchAttributes   map[string][]byte
		CloseTimestamp     time.Time
		Status             types.WorkflowExecutionCloseStatus
		HistoryLength      int64
		RetentionPeriod    time.Duration
		IsCron             bool
		NumClusters        int16
		UpdateTimestamp    time.Time
		ShardID            int16
	}

	// InternalRecordWorkflowExecutionUninitializedRequest is used to add a record of a newly uninitialized execution.
	// NOTE(review): ShardID is int64 here but int16 in the Started/Closed requests — confirm this is intentional.
	InternalRecordWorkflowExecutionUninitializedRequest struct {
		DomainUUID       string
		WorkflowID       string
		RunID            string
		WorkflowTypeName string
		UpdateTimestamp  time.Time
		ShardID          int64
	}

	// InternalUpsertWorkflowExecutionRequest is the request to UpsertWorkflowExecution.
	// NOTE(review): ShardID is int64 here but int16 in the Started/Closed requests — confirm this is intentional.
	InternalUpsertWorkflowExecutionRequest struct {
		DomainUUID         string
		WorkflowID         string
		RunID              string
		WorkflowTypeName   string
		StartTimestamp     time.Time
		ExecutionTimestamp time.Time
		WorkflowTimeout    time.Duration
		TaskID             int64
		Memo               *DataBlob
		TaskList           string
		IsCron             bool
		NumClusters        int16
		UpdateTimestamp    time.Time
		SearchAttributes   map[string][]byte
		ShardID            int64
	}

	// InternalListWorkflowExecutionsRequest is used to list executions in a domain.
	InternalListWorkflowExecutionsRequest struct {
		DomainUUID string
		Domain     string // domain name is not persisted, but used as config filter key
		// The earliest end of the time range
		EarliestTime time.Time
		// The latest end of the time range
		LatestTime time.Time
		// Maximum number of workflow executions per page
		PageSize int
		// Token to continue reading next page of workflow executions.
		// Pass in empty slice for first page.
		NextPageToken []byte
	}

	// InternalDomainConfig describes the domain configuration.
	InternalDomainConfig struct {
		Retention                time.Duration
		EmitMetric               bool                 // deprecated
		ArchivalBucket           string               // deprecated
		ArchivalStatus           types.ArchivalStatus // deprecated
		HistoryArchivalStatus    types.ArchivalStatus
		HistoryArchivalURI       string
		VisibilityArchivalStatus types.ArchivalStatus
		VisibilityArchivalURI    string
		BadBinaries              *DataBlob
		IsolationGroups          *DataBlob
		AsyncWorkflowsConfig     *DataBlob
	}

	// InternalCreateDomainRequest is used to create the domain.
	InternalCreateDomainRequest struct {
		Info              *DomainInfo
		Config            *InternalDomainConfig
		ReplicationConfig *DomainReplicationConfig
		IsGlobalDomain    bool
		ConfigVersion     int64
		FailoverVersion   int64
		LastUpdatedTime   time.Time
	}

	// InternalGetDomainResponse is the response for GetDomain.
	InternalGetDomainResponse struct {
		Info                        *DomainInfo
		Config                      *InternalDomainConfig
		ReplicationConfig           *DomainReplicationConfig
		IsGlobalDomain              bool
		ConfigVersion               int64
		FailoverVersion             int64
		FailoverNotificationVersion int64
		PreviousFailoverVersion     int64
		FailoverEndTime             *time.Time
		LastUpdatedTime             time.Time
		NotificationVersion         int64
	}

	// InternalUpdateDomainRequest is used to update the domain.
	InternalUpdateDomainRequest struct {
		Info                        *DomainInfo
		Config                      *InternalDomainConfig
		ReplicationConfig           *DomainReplicationConfig
		ConfigVersion               int64
		FailoverVersion             int64
		FailoverNotificationVersion int64
		PreviousFailoverVersion     int64
		FailoverEndTime             *time.Time
		LastUpdatedTime             time.Time
		NotificationVersion         int64
	}

	// InternalListDomainsResponse is the response for ListDomains.
	InternalListDomainsResponse struct {
		Domains       []*InternalGetDomainResponse
		NextPageToken []byte
	}

	// InternalShardInfo describes a shard.
	InternalShardInfo struct {
		ShardID                           int                  `json:"shard_id"`
		Owner                             string               `json:"owner"`
		RangeID                           int64                `json:"range_id"`
		StolenSinceRenew                  int                  `json:"stolen_since_renew"`
		UpdatedAt                         time.Time            `json:"updated_at"`
		ReplicationAckLevel               int64                `json:"replication_ack_level"`
		ReplicationDLQAckLevel            map[string]int64     `json:"replication_dlq_ack_level"`
		TransferAckLevel                  int64                `json:"transfer_ack_level"`
		TimerAckLevel                     time.Time            `json:"timer_ack_level"`
		ClusterTransferAckLevel           map[string]int64     `json:"cluster_transfer_ack_level"`
		ClusterTimerAckLevel              map[string]time.Time `json:"cluster_timer_ack_level"`
		TransferProcessingQueueStates     *DataBlob            `json:"transfer_processing_queue_states"`
		CrossClusterProcessingQueueStates *DataBlob            `json:"cross_cluster_processing_queue_states"`
		TimerProcessingQueueStates        *DataBlob            `json:"timer_processing_queue_states"`
		ClusterReplicationLevel           map[string]int64     `json:"cluster_replication_level"`
		DomainNotificationVersion         int64                `json:"domain_notification_version"`
		PendingFailoverMarkers            *DataBlob            `json:"pending_failover_markers"`
	}

	// InternalCreateShardRequest is the request to CreateShard.
	InternalCreateShardRequest struct {
		ShardInfo *InternalShardInfo
	}

	// InternalGetShardRequest is used to get shard information.
	InternalGetShardRequest struct {
		ShardID int
	}

	// InternalUpdateShardRequest is used to update shard information.
	InternalUpdateShardRequest struct {
		ShardInfo       *InternalShardInfo
		PreviousRangeID int64
	}

	// InternalGetShardResponse is the response to GetShard.
	InternalGetShardResponse struct {
		ShardInfo *InternalShardInfo
	}

	// InternalTaskInfo describes a Task.
	InternalTaskInfo struct {
		DomainID               string
		WorkflowID             string
		RunID                  string
		TaskID                 int64
		ScheduleID             int64
		ScheduleToStartTimeout time.Duration
		Expiry                 time.Time
		CreatedTime            time.Time
		PartitionConfig        map[string]string
	}

	// InternalCreateTasksInfo describes a task to be created in InternalCreateTasksRequest.
	InternalCreateTasksInfo struct {
		Execution types.WorkflowExecution
		Data      *InternalTaskInfo
		TaskID    int64
	}

	// InternalCreateTasksRequest is the request to CreateTasks.
	InternalCreateTasksRequest struct {
		TaskListInfo *TaskListInfo
		Tasks        []*InternalCreateTasksInfo
	}

	// InternalGetTasksResponse is the response from GetTasks.
	InternalGetTasksResponse struct {
		Tasks []*InternalTaskInfo
	}
)

//
NewDataBlob returns a new DataBlob func NewDataBlob(data []byte, encodingType common.EncodingType) *DataBlob { if len(data) == 0 { return nil } if encodingType != common.EncodingTypeThriftRW && data[0] == 'Y' { // original reason for this is not written down, but maybe for handling data prior to an encoding type? panic(fmt.Sprintf("Invalid data blob encoding: \"%v\"", encodingType)) } return &DataBlob{ Data: data, Encoding: encodingType, } } // FromDataBlob decodes a datablob into a (payload, encodingType) tuple func FromDataBlob(blob *DataBlob) ([]byte, string) { if blob == nil || len(blob.Data) == 0 { return nil, "" } return blob.Data, string(blob.Encoding) } // Convert a *Datablob to safe that calling its method won't run into NPE func (d *DataBlob) ToNilSafeDataBlob() *DataBlob { if d != nil { return d } return &DataBlob{} } func (d *DataBlob) GetEncodingString() string { if d == nil { return "" } return string(d.Encoding) } // GetData is a safe way to get the byte array or nil func (d *DataBlob) GetData() []byte { if d == nil || d.Data == nil { return []byte{} } return d.Data } // GetEncoding returns encoding type func (d *DataBlob) GetEncoding() common.EncodingType { encodingStr := d.GetEncodingString() switch common.EncodingType(encodingStr) { case common.EncodingTypeGob: return common.EncodingTypeGob case common.EncodingTypeJSON: return common.EncodingTypeJSON case common.EncodingTypeThriftRW: return common.EncodingTypeThriftRW case common.EncodingTypeEmpty: return common.EncodingTypeEmpty default: return common.EncodingTypeUnknown } } // ToInternal convert data blob to internal representation func (d *DataBlob) ToInternal() *types.DataBlob { switch d.Encoding { case common.EncodingTypeJSON: return &types.DataBlob{ EncodingType: types.EncodingTypeJSON.Ptr(), Data: d.Data, } case common.EncodingTypeThriftRW: return &types.DataBlob{ EncodingType: types.EncodingTypeThriftRW.Ptr(), Data: d.Data, } default: panic(fmt.Sprintf("DataBlob.ToInternal() with 
unsupported encoding type: %v", d.Encoding)) } } // NewDataBlobFromInternal convert data blob from internal representation func NewDataBlobFromInternal(blob *types.DataBlob) *DataBlob { switch blob.GetEncodingType() { case types.EncodingTypeJSON: return &DataBlob{ Encoding: common.EncodingTypeJSON, Data: blob.Data, } case types.EncodingTypeThriftRW: return &DataBlob{ Encoding: common.EncodingTypeThriftRW, Data: blob.Data, } default: panic(fmt.Sprintf("NewDataBlobFromInternal with unsupported encoding type: %v", blob.GetEncodingType())) } }