common/persistence/sql/sqlplugin/interfaces.go (558 lines of code) (raw):

// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. //go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interface_mock.go -self_package github.com/uber/cadence/common/persistence/sql/sqlplugin package sqlplugin import ( "context" "database/sql" "errors" "time" "github.com/uber/cadence/common/config" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/persistence/serialization" ) var ( // ErrTTLNotSupported indicates the sql plugin does not support ttl ErrTTLNotSupported = errors.New("plugin implementation does not support ttl") ) type ( // Plugin defines the interface for any SQL database that needs to implement Plugin interface { CreateDB(cfg *config.SQL) (DB, error) CreateAdminDB(cfg *config.SQL) (AdminDB, error) } // DomainRow represents a row in domain table DomainRow struct { ID serialization.UUID Name string Data []byte DataEncoding string IsGlobal bool } // DomainFilter contains the column names within domain table that // can be used to filter results through a WHERE clause. When ID is not // nil, it will be used for WHERE condition. If ID is nil and Name is non-nil, // Name will be used for WHERE condition. When both ID and Name are nil, // no WHERE clause will be used DomainFilter struct { ID *serialization.UUID Name *string GreaterThanID *serialization.UUID PageSize *int } // DomainMetadataRow represents a row in domain_metadata table DomainMetadataRow struct { NotificationVersion int64 } // ShardsRow represents a row in shards table ShardsRow struct { ShardID int64 RangeID int64 Data []byte DataEncoding string } // ShardsFilter contains the column names within shards table that // can be used to filter results through a WHERE clause ShardsFilter struct { ShardID int64 } // TransferTasksRow represents a row in transfer_tasks table TransferTasksRow struct { ShardID int TaskID int64 Data []byte DataEncoding string } // CrossClusterTasksRow represents a row in cross_cluster_tasks table CrossClusterTasksRow struct { TargetCluster string ShardID int TaskID int64 Data []byte DataEncoding string } // TransferTasksFilter contains the column names within transfer_tasks table that // can be used to filter results through a WHERE clause TransferTasksFilter struct { ShardID int TaskID int64 MinTaskID int64 MaxTaskID int64 PageSize int } // CrossClusterTasksFilter contains the column names within cross_cluster_tasks table that // can be used to filter results through a WHERE clause CrossClusterTasksFilter struct { TargetCluster string ShardID int TaskID int64 MinTaskID int64 MaxTaskID int64 PageSize int } // ExecutionsRow represents a row in executions table ExecutionsRow struct { ShardID int DomainID serialization.UUID WorkflowID string RunID serialization.UUID NextEventID int64 LastWriteVersion int64 Data []byte DataEncoding string VersionHistories []byte VersionHistoriesEncoding string } // ExecutionsFilter contains the column names within executions table that // can be used to filter results through a WHERE clause // To get single row, it requires ShardID, DomainID, WorkflowID, RunID // To get a list of rows, it requires ShardID, Size. // The WorkflowID and RunID are optional for listing rows. They work as the start boundary for pagination. ExecutionsFilter struct { ShardID int DomainID serialization.UUID WorkflowID string RunID serialization.UUID Size int } // CurrentExecutionsRow represents a row in current_executions table CurrentExecutionsRow struct { ShardID int64 DomainID serialization.UUID WorkflowID string RunID serialization.UUID CreateRequestID string State int CloseStatus int LastWriteVersion int64 StartVersion int64 } // CurrentExecutionsFilter contains the column names within current_executions table that // can be used to filter results through a WHERE clause CurrentExecutionsFilter struct { ShardID int64 DomainID serialization.UUID WorkflowID string RunID serialization.UUID } // BufferedEventsRow represents a row in buffered_events table BufferedEventsRow struct { ShardID int DomainID serialization.UUID WorkflowID string RunID serialization.UUID Data []byte DataEncoding string } // BufferedEventsFilter contains the column names within buffered_events table that // can be used to filter results through a WHERE clause BufferedEventsFilter struct { ShardID int DomainID serialization.UUID WorkflowID string RunID serialization.UUID } // TasksRow represents a row in tasks table TasksRow struct { ShardID int // this is DBShardID, not historyShardID (TODO: maybe rename it for clarification) DomainID serialization.UUID TaskType int64 TaskID int64 TaskListName string Data []byte DataEncoding string } // TaskKeyRow represents a result row giving task keys TaskKeyRow struct { DomainID serialization.UUID TaskListName string TaskType int64 TaskID int64 } // TasksRowWithTTL represents a row in tasks table with a ttl TasksRowWithTTL struct { TasksRow TasksRow // TTL is optional because InsertIntoTasksWithTTL operates over a slice of TasksRowWithTTL. // Some items in the slice may have a TTL while others do not. It is the responsibility // of the plugin implementation to handle items with TTL set and items with TTL not set. TTL *time.Duration } // TasksFilter contains the column names within tasks table that // can be used to filter results through a WHERE clause TasksFilter struct { ShardID int // this is DBShardID, not historyShardID (TODO: maybe rename it for clarification) DomainID serialization.UUID TaskListName string TaskType int64 TaskID *int64 MinTaskID *int64 MaxTaskID *int64 TaskIDLessThanEquals *int64 Limit *int PageSize *int } // OrphanTasksFilter contains the parameters controlling orphan deletion OrphanTasksFilter struct { Limit *int } // TaskListsRow represents a row in task_lists table TaskListsRow struct { ShardID int // this is DBShardID, not historyShardID (TODO: maybe rename it for clarification) DomainID serialization.UUID Name string TaskType int64 RangeID int64 Data []byte DataEncoding string } // TaskListsRowWithTTL represents a row in task_lists table with a ttl TaskListsRowWithTTL struct { TaskListsRow TaskListsRow TTL time.Duration } // TaskListsFilter contains the column names within task_lists table that // can be used to filter results through a WHERE clause TaskListsFilter struct { ShardID int // this is DBShardID, not historyShardID (TODO: maybe rename it for clarification) DomainID *serialization.UUID Name *string TaskType *int64 DomainIDGreaterThan *serialization.UUID NameGreaterThan *string TaskTypeGreaterThan *int64 RangeID *int64 PageSize *int } // ReplicationTasksRow represents a row in replication_tasks table ReplicationTasksRow struct { ShardID int TaskID int64 Data []byte DataEncoding string } // ReplicationTaskDLQRow represents a row in replication_tasks_dlq table ReplicationTaskDLQRow struct { SourceClusterName string ShardID int TaskID int64 Data []byte DataEncoding string } // ReplicationTasksFilter contains the column names within replication_tasks table that // can be used to filter results through a WHERE clause ReplicationTasksFilter struct { ShardID int TaskID int64 InclusiveEndTaskID int64 MinTaskID int64 MaxTaskID int64 PageSize int } // ReplicationTasksDLQFilter contains the column names within replication_tasks_dlq table that // can be used to filter results through a WHERE clause ReplicationTasksDLQFilter struct { ReplicationTasksFilter SourceClusterName string } // ReplicationTaskDLQFilter contains the column names within replication_tasks_dlq table that // can be used to filter results through a WHERE clause ReplicationTaskDLQFilter struct { SourceClusterName string ShardID int } // TimerTasksRow represents a row in timer_tasks table TimerTasksRow struct { ShardID int VisibilityTimestamp time.Time TaskID int64 Data []byte DataEncoding string } // TimerTasksFilter contains the column names within timer_tasks table that // can be used to filter results through a WHERE clause TimerTasksFilter struct { ShardID int TaskID int64 VisibilityTimestamp time.Time MinVisibilityTimestamp time.Time MaxVisibilityTimestamp time.Time PageSize int } // EventsRow represents a row in events table EventsRow struct { DomainID serialization.UUID WorkflowID string RunID serialization.UUID FirstEventID int64 BatchVersion int64 RangeID int64 TxID int64 Data []byte DataEncoding string } // EventsFilter contains the column names within events table that // can be used to filter results through a WHERE clause EventsFilter struct { DomainID serialization.UUID WorkflowID string RunID serialization.UUID FirstEventID *int64 NextEventID *int64 PageSize *int } // HistoryNodeRow represents a row in history_node table HistoryNodeRow struct { ShardID int TreeID serialization.UUID BranchID serialization.UUID NodeID int64 // use pointer so that it's easier to multiple by -1 TxnID *int64 Data []byte DataEncoding string } // HistoryNodeFilter contains the column names within history_node table that // can be used to filter results through a WHERE clause HistoryNodeFilter struct { ShardID int TreeID serialization.UUID BranchID serialization.UUID // Inclusive MinNodeID *int64 // Exclusive MaxNodeID *int64 PageSize int } // HistoryTreeRow represents a row in history_tree table HistoryTreeRow struct { ShardID int TreeID serialization.UUID BranchID serialization.UUID Data []byte DataEncoding string } // HistoryTreeFilter contains the column names within history_tree table that // can be used to filter results through a WHERE clause HistoryTreeFilter struct { ShardID int TreeID serialization.UUID BranchID *serialization.UUID PageSize *int } // ActivityInfoMapsRow represents a row in activity_info_maps table ActivityInfoMapsRow struct { ShardID int64 DomainID serialization.UUID WorkflowID string RunID serialization.UUID ScheduleID int64 Data []byte DataEncoding string LastHeartbeatDetails []byte LastHeartbeatUpdatedTime time.Time } // ActivityInfoMapsFilter contains the column names within activity_info_maps table that // can be used to filter results through a WHERE clause ActivityInfoMapsFilter struct { ShardID int64 DomainID serialization.UUID WorkflowID string RunID serialization.UUID ScheduleIDs []int64 } // TimerInfoMapsRow represents a row in timer_info_maps table TimerInfoMapsRow struct { ShardID int64 DomainID serialization.UUID WorkflowID string RunID serialization.UUID TimerID string Data []byte DataEncoding string } // TimerInfoMapsFilter contains the column names within timer_info_maps table that // can be used to filter results through a WHERE clause TimerInfoMapsFilter struct { ShardID int64 DomainID serialization.UUID WorkflowID string RunID serialization.UUID TimerIDs []string } // ChildExecutionInfoMapsRow represents a row in child_execution_info_maps table ChildExecutionInfoMapsRow struct { ShardID int64 DomainID serialization.UUID WorkflowID string RunID serialization.UUID InitiatedID int64 Data []byte DataEncoding string } // ChildExecutionInfoMapsFilter contains the column names within child_execution_info_maps table that // can be used to filter results through a WHERE clause ChildExecutionInfoMapsFilter struct { ShardID int64 DomainID serialization.UUID WorkflowID string RunID serialization.UUID InitiatedIDs []int64 } // RequestCancelInfoMapsRow represents a row in request_cancel_info_maps table RequestCancelInfoMapsRow struct { ShardID int64 DomainID serialization.UUID WorkflowID string RunID serialization.UUID InitiatedID int64 Data []byte DataEncoding string } // RequestCancelInfoMapsFilter contains the column names within request_cancel_info_maps table that // can be used to filter results through a WHERE clause RequestCancelInfoMapsFilter struct { ShardID int64 DomainID serialization.UUID WorkflowID string RunID serialization.UUID InitiatedIDs []int64 } // SignalInfoMapsRow represents a row in signal_info_maps table SignalInfoMapsRow struct { ShardID int64 DomainID serialization.UUID WorkflowID string RunID serialization.UUID InitiatedID int64 Data []byte DataEncoding string } // SignalInfoMapsFilter contains the column names within signal_info_maps table that // can be used to filter results through a WHERE clause SignalInfoMapsFilter struct { ShardID int64 DomainID serialization.UUID WorkflowID string RunID serialization.UUID InitiatedIDs []int64 } // SignalsRequestedSetsRow represents a row in signals_requested_sets table SignalsRequestedSetsRow struct { ShardID int64 DomainID serialization.UUID WorkflowID string RunID serialization.UUID SignalID string } // SignalsRequestedSetsFilter contains the column names within signals_requested_sets table that // can be used to filter results through a WHERE clause SignalsRequestedSetsFilter struct { ShardID int64 DomainID serialization.UUID WorkflowID string RunID serialization.UUID SignalIDs []string } // VisibilityRow represents a row in executions_visibility table VisibilityRow struct { DomainID string RunID string WorkflowTypeName string WorkflowID string StartTime time.Time ExecutionTime time.Time CloseStatus *int32 CloseTime *time.Time HistoryLength *int64 Memo []byte Encoding string IsCron bool NumClusters int16 UpdateTime time.Time ShardID int16 } // VisibilityFilter contains the column names within executions_visibility table that // can be used to filter results through a WHERE clause VisibilityFilter struct { DomainID string Closed bool RunID *string WorkflowID *string WorkflowTypeName *string CloseStatus *int32 MinStartTime *time.Time MaxStartTime *time.Time PageSize *int } // QueueRow represents a row in queue table QueueRow struct { QueueType persistence.QueueType MessageID int64 MessagePayload []byte } // QueueMetadataRow represents a row in queue_metadata table QueueMetadataRow struct { QueueType persistence.QueueType Data []byte } // ClusterConfigRow represents a row in cluster_config table ClusterConfigRow struct { RowType int Version int64 Timestamp time.Time Data []byte DataEncoding string } // tableCRUD defines the API for interacting with the database tables tableCRUD interface { InsertIntoDomain(ctx context.Context, rows *DomainRow) (sql.Result, error) UpdateDomain(ctx context.Context, row *DomainRow) (sql.Result, error) // SelectFromDomain returns domains that match filter criteria. Either ID or // Name can be specified to filter results. If both are not specified, all rows // will be returned SelectFromDomain(ctx context.Context, filter *DomainFilter) ([]DomainRow, error) // DeleteDomain deletes a single row. One of ID or Name MUST be specified DeleteFromDomain(ctx context.Context, filter *DomainFilter) (sql.Result, error) LockDomainMetadata(ctx context.Context) error UpdateDomainMetadata(ctx context.Context, row *DomainMetadataRow) (sql.Result, error) SelectFromDomainMetadata(ctx context.Context) (*DomainMetadataRow, error) InsertIntoShards(ctx context.Context, rows *ShardsRow) (sql.Result, error) UpdateShards(ctx context.Context, row *ShardsRow) (sql.Result, error) SelectFromShards(ctx context.Context, filter *ShardsFilter) (*ShardsRow, error) ReadLockShards(ctx context.Context, filter *ShardsFilter) (int, error) WriteLockShards(ctx context.Context, filter *ShardsFilter) (int, error) InsertIntoTasks(ctx context.Context, rows []TasksRow) (sql.Result, error) InsertIntoTasksWithTTL(ctx context.Context, rows []TasksRowWithTTL) (sql.Result, error) // SelectFromTasks retrieves one or more rows from the tasks table // Required filter params - {domainID, tasklistName, taskType, minTaskID, maxTaskID, pageSize} SelectFromTasks(ctx context.Context, filter *TasksFilter) ([]TasksRow, error) // DeleteFromTasks deletes a row from tasks table // Required filter params: // to delete single row // - {domainID, tasklistName, taskType, taskID} // to delete multiple rows // - {domainID, tasklistName, taskType, taskIDLessThanEquals, limit } // - this will delete up to limit number of tasks less than or equal to the given task id DeleteFromTasks(ctx context.Context, filter *TasksFilter) (sql.Result, error) GetTasksCount(ctx context.Context, filter *TasksFilter) (int64, error) GetOrphanTasks(ctx context.Context, filter *OrphanTasksFilter) ([]TaskKeyRow, error) InsertIntoTaskLists(ctx context.Context, row *TaskListsRow) (sql.Result, error) InsertIntoTaskListsWithTTL(ctx context.Context, row *TaskListsRowWithTTL) (sql.Result, error) UpdateTaskLists(ctx context.Context, row *TaskListsRow) (sql.Result, error) UpdateTaskListsWithTTL(ctx context.Context, row *TaskListsRowWithTTL) (sql.Result, error) // SelectFromTaskLists returns one or more rows from task_lists table // Required Filter params: // to read a single row: {shardID, domainID, name, taskType} // to range read multiple rows: {shardID, domainIDGreaterThan, nameGreaterThan, taskTypeGreaterThan, pageSize} SelectFromTaskLists(ctx context.Context, filter *TaskListsFilter) ([]TaskListsRow, error) DeleteFromTaskLists(ctx context.Context, filter *TaskListsFilter) (sql.Result, error) LockTaskLists(ctx context.Context, filter *TaskListsFilter) (int64, error) // eventsV2 InsertIntoHistoryNode(ctx context.Context, row *HistoryNodeRow) (sql.Result, error) SelectFromHistoryNode(ctx context.Context, filter *HistoryNodeFilter) ([]HistoryNodeRow, error) DeleteFromHistoryNode(ctx context.Context, filter *HistoryNodeFilter) (sql.Result, error) InsertIntoHistoryTree(ctx context.Context, row *HistoryTreeRow) (sql.Result, error) SelectFromHistoryTree(ctx context.Context, filter *HistoryTreeFilter) ([]HistoryTreeRow, error) DeleteFromHistoryTree(ctx context.Context, filter *HistoryTreeFilter) (sql.Result, error) GetAllHistoryTreeBranches(ctx context.Context, filter *HistoryTreeFilter) ([]HistoryTreeRow, error) InsertIntoExecutions(ctx context.Context, row *ExecutionsRow) (sql.Result, error) UpdateExecutions(ctx context.Context, row *ExecutionsRow) (sql.Result, error) SelectFromExecutions(ctx context.Context, filter *ExecutionsFilter) ([]ExecutionsRow, error) DeleteFromExecutions(ctx context.Context, filter *ExecutionsFilter) (sql.Result, error) ReadLockExecutions(ctx context.Context, filter *ExecutionsFilter) (int, error) WriteLockExecutions(ctx context.Context, filter *ExecutionsFilter) (int, error) LockCurrentExecutionsJoinExecutions(ctx context.Context, filter *CurrentExecutionsFilter) ([]CurrentExecutionsRow, error) InsertIntoCurrentExecutions(ctx context.Context, row *CurrentExecutionsRow) (sql.Result, error) UpdateCurrentExecutions(ctx context.Context, row *CurrentExecutionsRow) (sql.Result, error) // SelectFromCurrentExecutions returns one or more rows from current_executions table // Required params - {shardID, domainID, workflowID} SelectFromCurrentExecutions(ctx context.Context, filter *CurrentExecutionsFilter) (*CurrentExecutionsRow, error) // DeleteFromCurrentExecutions deletes a single row that matches the filter criteria // If a row exist, that row will be deleted and this method will return success // If there is no row matching the filter criteria, this method will still return success // Callers can check the output of Result.RowsAffected() to see if a row was deleted or not // Required params - {shardID, domainID, workflowID, runID} DeleteFromCurrentExecutions(ctx context.Context, filter *CurrentExecutionsFilter) (sql.Result, error) LockCurrentExecutions(ctx context.Context, filter *CurrentExecutionsFilter) (*CurrentExecutionsRow, error) InsertIntoTransferTasks(ctx context.Context, rows []TransferTasksRow) (sql.Result, error) // SelectFromTransferTasks returns rows that match filter criteria from transfer_tasks table. // Required filter params - {shardID, minTaskID, maxTaskID} SelectFromTransferTasks(ctx context.Context, filter *TransferTasksFilter) ([]TransferTasksRow, error) // DeleteFromTransferTasks deletes one or more rows from transfer_tasks table. // Required filter params - {shardID, taskID} DeleteFromTransferTasks(ctx context.Context, filter *TransferTasksFilter) (sql.Result, error) // RangeDeleteFromTransferTasks deletes one or more rows from transfer_tasks table. // Required filter params - {shardID, minTaskID, maxTaskID} RangeDeleteFromTransferTasks(ctx context.Context, filter *TransferTasksFilter) (sql.Result, error) // TODO: add cross-cluster tasks methods // InsertIntoCrossClusterTasks adds a new row to the cross_cluster_tasks table InsertIntoCrossClusterTasks(ctx context.Context, rows []CrossClusterTasksRow) (sql.Result, error) // SelectFromCrossClusterTasks returns rows that match filter criteria from cross_cluster_tasks table. // Required filter params - {shardID, minTaskID, maxTaskID} SelectFromCrossClusterTasks(ctx context.Context, filter *CrossClusterTasksFilter) ([]CrossClusterTasksRow, error) // DeleteFromCrossClusterTasks deletes one or more rows from cross_cluster_tasks table. // Required filter params - {shardID, taskID} DeleteFromCrossClusterTasks(ctx context.Context, filter *CrossClusterTasksFilter) (sql.Result, error) // RangeDeleteFromCrossClusterTasks deletes one or more rows from cross_cluster_tasks table. // Required filter params - {shardID, minTaskID, maxTaskID} RangeDeleteFromCrossClusterTasks(ctx context.Context, filter *CrossClusterTasksFilter) (sql.Result, error) InsertIntoTimerTasks(ctx context.Context, rows []TimerTasksRow) (sql.Result, error) // SelectFromTimerTasks returns one or more rows from timer_tasks table // Required filter Params - {shardID, taskID, minVisibilityTimestamp, maxVisibilityTimestamp, pageSize} SelectFromTimerTasks(ctx context.Context, filter *TimerTasksFilter) ([]TimerTasksRow, error) // DeleteFromTimerTasks deletes one or more rows from timer_tasks table // Required filter Params: {shardID, visibilityTimestamp, taskID} DeleteFromTimerTasks(ctx context.Context, filter *TimerTasksFilter) (sql.Result, error) // RangeDeleteFromTimerTasks deletes one or more rows from timer_tasks table // Required filter Params: {shardID, minVisibilityTimestamp, maxVisibilityTimestamp} RangeDeleteFromTimerTasks(ctx context.Context, filter *TimerTasksFilter) (sql.Result, error) InsertIntoBufferedEvents(ctx context.Context, rows []BufferedEventsRow) (sql.Result, error) SelectFromBufferedEvents(ctx context.Context, filter *BufferedEventsFilter) ([]BufferedEventsRow, error) DeleteFromBufferedEvents(ctx context.Context, filter *BufferedEventsFilter) (sql.Result, error) InsertIntoReplicationTasks(ctx context.Context, rows []ReplicationTasksRow) (sql.Result, error) // SelectFromReplicationTasks returns one or more rows from replication_tasks table // Required filter params - {shardID, minTaskID, maxTaskID, pageSize} SelectFromReplicationTasks(ctx context.Context, filter *ReplicationTasksFilter) ([]ReplicationTasksRow, error) // DeleteFromReplicationTasks deletes a row from replication_tasks table // Required filter params - {shardID, taskID} DeleteFromReplicationTasks(ctx context.Context, filter *ReplicationTasksFilter) (sql.Result, error) // DeleteFromReplicationTasks deletes multi rows from replication_tasks table // Required filter params - {shardID, inclusiveEndTaskID} RangeDeleteFromReplicationTasks(ctx context.Context, filter *ReplicationTasksFilter) (sql.Result, error) // InsertIntoReplicationTasksDLQ puts the replication task into DLQ InsertIntoReplicationTasksDLQ(ctx context.Context, row *ReplicationTaskDLQRow) (sql.Result, error) // SelectFromReplicationTasksDLQ returns one or more rows from replication_tasks_dlq table // Required filter params - {sourceClusterName, shardID, minTaskID, pageSize} SelectFromReplicationTasksDLQ(ctx context.Context, filter *ReplicationTasksDLQFilter) ([]ReplicationTasksRow, error) // SelectFromReplicationDLQ returns one row from replication_tasks_dlq table // Required filter params - {sourceClusterName} SelectFromReplicationDLQ(ctx context.Context, filter *ReplicationTaskDLQFilter) (int64, error) // DeleteMessageFromReplicationTasksDLQ deletes one row from replication_tasks_dlq table // Required filter params - {sourceClusterName, shardID, taskID} DeleteMessageFromReplicationTasksDLQ(ctx context.Context, filter *ReplicationTasksDLQFilter) (sql.Result, error) // RangeDeleteMessageFromReplicationTasksDLQ deletes one or more rows from replication_tasks_dlq table // Required filter params - {sourceClusterName, shardID, taskID, inclusiveTaskID} RangeDeleteMessageFromReplicationTasksDLQ(ctx context.Context, filter *ReplicationTasksDLQFilter) (sql.Result, error) ReplaceIntoActivityInfoMaps(ctx context.Context, rows []ActivityInfoMapsRow) (sql.Result, error) // SelectFromActivityInfoMaps returns one or more rows from activity_info_maps // Required filter params - {shardID, domainID, workflowID, runID} SelectFromActivityInfoMaps(ctx context.Context, filter *ActivityInfoMapsFilter) ([]ActivityInfoMapsRow, error) // DeleteFromActivityInfoMaps deletes a row from activity_info_maps table // Required filter params // - one or multiple rows delete - {shardID, domainID, workflowID, runID, scheduleIDs} // - range delete - {shardID, domainID, workflowID, runID} DeleteFromActivityInfoMaps(ctx context.Context, filter *ActivityInfoMapsFilter) (sql.Result, error) ReplaceIntoTimerInfoMaps(ctx context.Context, rows []TimerInfoMapsRow) (sql.Result, error) // SelectFromTimerInfoMaps returns one or more rows form timer_info_maps table // Required filter params - {shardID, domainID, workflowID, runID} SelectFromTimerInfoMaps(ctx context.Context, filter *TimerInfoMapsFilter) ([]TimerInfoMapsRow, error) // DeleteFromTimerInfoMaps deletes one or more rows from timer_info_maps // Required filter params // - one or multiple rows delete- {shardID, domainID, workflowID, runID, timerIDs} // - range delete - {shardID, domainID, workflowID, runID} DeleteFromTimerInfoMaps(ctx context.Context, filter *TimerInfoMapsFilter) (sql.Result, error) ReplaceIntoChildExecutionInfoMaps(ctx context.Context, rows []ChildExecutionInfoMapsRow) (sql.Result, error) // SelectFromChildExecutionInfoMaps returns one or more rows form child_execution_info_maps table // Required filter params - {shardID, domainID, workflowID, runID} SelectFromChildExecutionInfoMaps(ctx context.Context, filter *ChildExecutionInfoMapsFilter) ([]ChildExecutionInfoMapsRow, error) // DeleteFromChildExecutionInfoMaps deletes one or more rows from child_execution_info_maps // Required filter params // - onne or multiple rows delete - {shardID, domainID, workflowID, runID, initiatedIDs} // - range delete - {shardID, domainID, workflowID, runID} DeleteFromChildExecutionInfoMaps(ctx context.Context, filter *ChildExecutionInfoMapsFilter) (sql.Result, error) ReplaceIntoRequestCancelInfoMaps(ctx context.Context, rows []RequestCancelInfoMapsRow) (sql.Result, error) // SelectFromRequestCancelInfoMaps returns one or more rows form request_cancel_info_maps table // Required filter params - {shardID, domainID, workflowID, runID} SelectFromRequestCancelInfoMaps(ctx context.Context, filter *RequestCancelInfoMapsFilter) ([]RequestCancelInfoMapsRow, error) // DeleteFromRequestCancelInfoMaps deletes one or more rows from request_cancel_info_maps // Required filter params // - one or multiple rows delete - {shardID, domainID, workflowID, runID, initiatedIDs} // - range delete - {shardID, domainID, workflowID, runID} DeleteFromRequestCancelInfoMaps(ctx context.Context, filter *RequestCancelInfoMapsFilter) (sql.Result, error) ReplaceIntoSignalInfoMaps(ctx context.Context, rows []SignalInfoMapsRow) (sql.Result, error) // SelectFromSignalInfoMaps returns one or more rows form signal_info_maps table // Required filter params - {shardID, domainID, workflowID, runID} SelectFromSignalInfoMaps(ctx context.Context, filter *SignalInfoMapsFilter) ([]SignalInfoMapsRow, error) // DeleteFromSignalInfoMaps deletes one or more rows from signal_info_maps table // Required filter params // - one or multiple rows delete - {shardID, domainID, workflowID, runID, initiatedIDs} // - range delete - {shardID, domainID, workflowID, runID} DeleteFromSignalInfoMaps(ctx context.Context, filter *SignalInfoMapsFilter) (sql.Result, error) InsertIntoSignalsRequestedSets(ctx context.Context, rows []SignalsRequestedSetsRow) (sql.Result, error) // SelectFromSignalInfoMaps returns one or more rows form singals_requested_sets table // Required filter params - {shardID, domainID, workflowID, runID} SelectFromSignalsRequestedSets(ctx context.Context, filter *SignalsRequestedSetsFilter) ([]SignalsRequestedSetsRow, error) // DeleteFromSignalsRequestedSets deletes one or more rows from signals_requested_sets // Required filter params // - one or multiple rows delete - {shardID, domainID, workflowID, runID, signalIDs} // - range delete - {shardID, domainID, workflowID, runID} DeleteFromSignalsRequestedSets(ctx context.Context, filter *SignalsRequestedSetsFilter) (sql.Result, error) // InsertIntoVisibility inserts a row into visibility table. If a row already exist, // no changes will be made by this API InsertIntoVisibility(ctx context.Context, row *VisibilityRow) (sql.Result, error) // ReplaceIntoVisibility deletes old row (if it exist) and inserts new row into visibility table ReplaceIntoVisibility(ctx context.Context, row *VisibilityRow) (sql.Result, error) // SelectFromVisibility returns one or more rows from visibility table // Required filter params: // - getClosedWorkflowExecution - retrieves single row - {domainID, runID, closed=true} // - All other queries retrieve multiple rows (range): // - MUST specify following required params: // - domainID, minStartTime, maxStartTime, runID and pageSize where some or all of these may come from previous page token // - OPTIONALLY specify one of following params // - workflowID, workflowTypeName, closeStatus (along with closed=true) SelectFromVisibility(ctx context.Context, filter *VisibilityFilter) ([]VisibilityRow, error) DeleteFromVisibility(ctx context.Context, filter *VisibilityFilter) (sql.Result, error) InsertIntoQueue(ctx context.Context, row *QueueRow) (sql.Result, error) GetLastEnqueuedMessageIDForUpdate(ctx context.Context, queueType persistence.QueueType) (int64, error) GetMessagesFromQueue(ctx context.Context, queueType persistence.QueueType, lastMessageID int64, maxRows int) ([]QueueRow, error) GetMessagesBetween(ctx context.Context, queueType persistence.QueueType, firstMessageID int64, lastMessageID int64, maxRows int) ([]QueueRow, error) DeleteMessagesBefore(ctx context.Context, queueType persistence.QueueType, messageID int64) (sql.Result, error) RangeDeleteMessages(ctx context.Context, queueType persistence.QueueType, exclusiveBeginMessageID int64, inclusiveEndMessageID int64) (sql.Result, error) DeleteMessage(ctx context.Context, queueType persistence.QueueType, messageID int64) (sql.Result, error) InsertAckLevel(ctx context.Context, queueType persistence.QueueType, messageID int64, clusterName string) error UpdateAckLevels(ctx context.Context, queueType persistence.QueueType, clusterAckLevels map[string]int64) error GetAckLevels(ctx context.Context, queueType persistence.QueueType, forUpdate bool) (map[string]int64, error) GetQueueSize(ctx context.Context, queueType persistence.QueueType) (int64, error) // InsertConfig insert a config entry with version. Return nosqlplugin.NewConditionFailure if the same version of the row_type is existing InsertConfig(ctx context.Context, row *persistence.InternalConfigStoreEntry) error // SelectLatestConfig returns the config entry of the row_type with the largest(latest) version value SelectLatestConfig(ctx context.Context, rowType int) (*persistence.InternalConfigStoreEntry, error) // The follow provide information about the underlying sql crud implementation SupportsTTL() bool MaxAllowedTTL() (*time.Duration, error) SupportsAsyncTransaction() bool } // adminCRUD defines admin operations for CLI and test suites adminCRUD interface { CreateSchemaVersionTables() error ReadSchemaVersion(database string) (string, error) UpdateSchemaVersion(database string, newVersion string, minCompatibleVersion string) error WriteSchemaUpdateLog(oldVersion string, newVersion string, manifestMD5 string, desc string) error ListTables(database string) ([]string, error) DropTable(table string) error DropAllTables(database string) error CreateDatabase(database string) error DropDatabase(database string) error // ExecSchemaOperationQuery allows passing in any query, but it must be schema operation (DDL) ExecSchemaOperationQuery(ctx context.Context, stmt string, args ...interface{}) error } // Tx defines the API for a SQL transaction Tx interface { tableCRUD ErrorChecker Commit() error Rollback() error } // DB defines the API for regular SQL operations of a Cadence server DB interface { tableCRUD ErrorChecker GetTotalNumDBShards() int BeginTx(ctx context.Context, dbShardID int) (Tx, error) PluginName() string Close() error } // AdminDB defines the API for admin SQL operations for CLI and testing suites AdminDB interface { adminCRUD PluginName() string Close() error } ErrorChecker interface { IsDupEntryError(err error) bool IsNotFoundError(err error) bool IsTimeoutError(err error) bool IsThrottlingError(err error) bool } )