// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package sql

import (
	"context"
	"database/sql"
	"fmt"
	"time"

	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/persistence"
	persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils"
	"github.com/uber/cadence/common/persistence/serialization"
	"github.com/uber/cadence/common/persistence/sql/sqlplugin"
	"github.com/uber/cadence/common/types"
)

const (
	_defaultHistoryNodeDeleteBatch = 1000
)

type sqlHistoryStore struct {
	sqlStore
}
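
// historyTreePageToken is the gob-encoded cursor used by GetAllHistoryTreeBranches.
// A minimal round-trip sketch using this package's gob helpers (the field
// value here is hypothetical):
//
//	token, err := gobSerialize(&historyTreePageToken{ShardID: 3})
//	var page historyTreePageToken
//	err = gobDeserialize(token, &page)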
type historyTreePageToken struct {
	ShardID  int
	TreeID   serialization.UUID
	BranchID serialization.UUID
}

// NewHistoryV2Persistence creates an instance of HistoryStore
func NewHistoryV2Persistence(
db sqlplugin.DB,
logger log.Logger,
parser serialization.Parser,
) (persistence.HistoryStore, error) {
return &sqlHistoryStore{
sqlStore: sqlStore{
db: db,
logger: logger,
parser: parser,
},
}, nil
}
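
// A minimal wiring sketch (assumes a connected sqlplugin.DB, a log.Logger and
// a serialization.Parser have already been constructed elsewhere):
//
//	store, err := NewHistoryV2Persistence(db, logger, parser)
//	if err != nil {
//		return err
//	}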

// AppendHistoryNodes adds (or overwrites) a node in a history branch
func (m *sqlHistoryStore) AppendHistoryNodes(
ctx context.Context,
request *persistence.InternalAppendHistoryNodesRequest,
) error {
branchInfo := request.BranchInfo
beginNodeID := persistenceutils.GetBeginNodeID(branchInfo)
if request.NodeID < beginNodeID {
return &persistence.InvalidPersistenceRequestError{
Msg: "cannot append to ancestors' nodes",
}
}
nodeRow := &sqlplugin.HistoryNodeRow{
TreeID: serialization.MustParseUUID(branchInfo.TreeID),
BranchID: serialization.MustParseUUID(branchInfo.BranchID),
NodeID: request.NodeID,
TxnID: &request.TransactionID,
Data: request.Events.Data,
DataEncoding: string(request.Events.Encoding),
ShardID: request.ShardID,
}
if request.IsNewBranch {
var ancestors []*types.HistoryBranchRange
ancestors = append(ancestors, branchInfo.Ancestors...)
treeInfo := &serialization.HistoryTreeInfo{
Ancestors: ancestors,
Info: request.Info,
CreatedTimestamp: time.Now(),
}
blob, err := m.parser.HistoryTreeInfoToBlob(treeInfo)
if err != nil {
return err
}
treeRow := &sqlplugin.HistoryTreeRow{
ShardID: request.ShardID,
TreeID: serialization.MustParseUUID(branchInfo.TreeID),
BranchID: serialization.MustParseUUID(branchInfo.BranchID),
Data: blob.Data,
DataEncoding: string(blob.Encoding),
}
treeUUID := serialization.MustParseUUID(branchInfo.TreeID)
dbShardID := sqlplugin.GetDBShardIDFromTreeID(treeUUID, m.db.GetTotalNumDBShards())
return m.txExecute(ctx, dbShardID, "AppendHistoryNodes", func(tx sqlplugin.Tx) error {
result, err := tx.InsertIntoHistoryNode(ctx, nodeRow)
if err != nil {
return err
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return err
}
if rowsAffected != 1 {
return fmt.Errorf("expected 1 row to be affected for node table, got %v", rowsAffected)
}
result, err = tx.InsertIntoHistoryTree(ctx, treeRow)
if err != nil {
return err
}
rowsAffected, err = result.RowsAffected()
if err != nil {
return err
}
if rowsAffected != 1 {
return fmt.Errorf("expected 1 row to be affected for tree table, got %v", rowsAffected)
}
return nil
})
}
_, err := m.db.InsertIntoHistoryNode(ctx, nodeRow)
if err != nil {
if m.db.IsDupEntryError(err) {
			return &persistence.ConditionFailedError{Msg: fmt.Sprintf("AppendHistoryNodes: row already exists: %v", err)}
		}
		return convertCommonErrors(m.db, "AppendHistoryNodes", "", err)
}
return nil
}
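
// exampleAppendFirstBatch is an illustrative sketch (a hypothetical caller,
// not part of the store): appending the very first event batch of a brand-new
// branch. The UUIDs and the encoding are placeholder assumptions; the first
// batch of a fresh branch starts at nodeID 1 (the first eventID of the batch).
func exampleAppendFirstBatch(ctx context.Context, store persistence.HistoryStore, batch []byte, shardID int) error {
	return store.AppendHistoryNodes(ctx, &persistence.InternalAppendHistoryNodesRequest{
		IsNewBranch: true, // also inserts the history tree row, transactionally
		Info:        "example: first batch",
		BranchInfo: types.HistoryBranch{
			TreeID:   "11111111-1111-1111-1111-111111111111",
			BranchID: "22222222-2222-2222-2222-222222222222",
		},
		NodeID:        1,
		TransactionID: 1,
		Events:        &persistence.DataBlob{Data: batch, Encoding: common.EncodingTypeThriftRW},
		ShardID:       shardID,
	})
}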

// ReadHistoryBranch returns history node data for a branch
func (m *sqlHistoryStore) ReadHistoryBranch(
ctx context.Context,
request *persistence.InternalReadHistoryBranchRequest,
) (*persistence.InternalReadHistoryBranchResponse, error) {
minNodeID := request.MinNodeID
maxNodeID := request.MaxNodeID
lastNodeID := request.LastNodeID
lastTxnID := request.LastTransactionID
	if len(request.NextPageToken) > 0 {
var lastNodeID int64
var err error
// TODO the inner pagination token can be replaced by a dummy token
// since lastNodeID & lastTxnID are both provided
if lastNodeID, err = deserializePageToken(request.NextPageToken); err != nil {
return nil, &types.InternalServiceError{
Message: fmt.Sprintf("invalid next page token %v", request.NextPageToken)}
}
minNodeID = lastNodeID + 1
}
filter := &sqlplugin.HistoryNodeFilter{
TreeID: serialization.MustParseUUID(request.TreeID),
BranchID: serialization.MustParseUUID(request.BranchID),
MinNodeID: &minNodeID,
MaxNodeID: &maxNodeID,
PageSize: request.PageSize,
ShardID: request.ShardID,
}
rows, err := m.db.SelectFromHistoryNode(ctx, filter)
if err == sql.ErrNoRows || (err == nil && len(rows) == 0) {
return &persistence.InternalReadHistoryBranchResponse{}, nil
}
if err != nil {
return nil, convertCommonErrors(m.db, "ReadHistoryBranch", "", err)
}
history := make([]*persistence.DataBlob, 0, int(request.PageSize))
eventBlob := &persistence.DataBlob{}
for _, row := range rows {
eventBlob.Data = row.Data
eventBlob.Encoding = common.EncodingType(row.DataEncoding)
if *row.TxnID < lastTxnID {
			// Assuming the business logic layer is correct, transaction IDs only
			// increase, so a valid event batch always arrives with an increasing
			// transaction ID:
			// event batches with a smaller node ID
			//  -> not possible, since rows are already sorted by node ID
			// event batches with the same node ID
			//  -> the batch with the higher transaction ID is the valid one
			// event batches with a larger node ID
			//  -> a batch with a lower transaction ID is invalid (happens before)
			//  -> a batch with a higher transaction ID is valid
if row.NodeID < lastNodeID {
return nil, &types.InternalDataInconsistencyError{
Message: "corrupted data, nodeID cannot decrease",
}
} else if row.NodeID > lastNodeID {
				// update lastNodeID so that pagination can make progress in the
				// corner case where the page contains only rows with smaller txnIDs,
				// because the next page always starts at minNodeID = lastNodeID+1
lastNodeID = row.NodeID
}
continue
}
switch {
case row.NodeID < lastNodeID:
return nil, &types.InternalDataInconsistencyError{
Message: "corrupted data, nodeID cannot decrease",
}
case row.NodeID == lastNodeID:
return nil, &types.InternalDataInconsistencyError{
Message: "corrupted data, same nodeID must have smaller txnID",
}
		default: // row.NodeID > lastNodeID
			// NOTE: when row.NodeID > lastNodeID, we expect the row with the largest txnID to come first
lastTxnID = *row.TxnID
lastNodeID = row.NodeID
history = append(history, eventBlob)
eventBlob = &persistence.DataBlob{}
}
}
var pagingToken []byte
if len(rows) >= request.PageSize {
pagingToken = serializePageToken(lastNodeID)
}
return &persistence.InternalReadHistoryBranchResponse{
History: history,
NextPageToken: pagingToken,
LastNodeID: lastNodeID,
LastTransactionID: lastTxnID,
}, nil
}
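
// exampleReadFullBranch is an illustrative pagination sketch (a hypothetical
// helper, not part of the store): draining a branch page by page. The
// LastNodeID/LastTransactionID watermarks from each response are fed back into
// the next request so the stale-transaction filtering above keeps working
// across pages.
func exampleReadFullBranch(
	ctx context.Context,
	store persistence.HistoryStore,
	req *persistence.InternalReadHistoryBranchRequest,
) ([]*persistence.DataBlob, error) {
	var all []*persistence.DataBlob
	for {
		resp, err := store.ReadHistoryBranch(ctx, req)
		if err != nil {
			return nil, err
		}
		all = append(all, resp.History...)
		if len(resp.NextPageToken) == 0 {
			return all, nil
		}
		req.NextPageToken = resp.NextPageToken
		req.LastNodeID = resp.LastNodeID
		req.LastTransactionID = resp.LastTransactionID
	}
}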

// ForkHistoryBranch forks a new branch from an existing branch.
// Note that the application must provide a valid forking nodeID: it must be a valid nodeID in that branch.
// A valid forking nodeID can also come from an ancestor of the existing branch.
// For example, we have branch B1 with three nodes (1[1,2], 3[3,4,5] and 6[6,7,8]), where 1, 3 and 6 are nodeIDs (the first eventID of each batch).
// So B1 looks like this:
//
// 1[1,2]
// /
// 3[3,4,5]
// /
// 6[6,7,8]
//
// Assume we have a branch B2 which contains one ancestor, B1, stopping at 6 (exclusive). So B2 inherits nodeIDs 1 and 3 from B1, and has its own nodeIDs 6 and 8.
// Branch B2 looks like this:
//
// 1[1,2]
// /
// 3[3,4,5]
// \
// 6[6,7]
// \
// 8[8]
//
// Now we want to fork a new branch B3 from B2.
// The only valid forking nodeIDs are 3, 6 or 8.
// 1 is not valid because we can't fork from the first node.
// 2/4/5 are NOT valid either because they are inside a batch.
//
// Case #1: if we fork from nodeID 6, then B3 will have one ancestor, B1, which stops at 6 (exclusive).
// After we append a batch of events [6,7,8,9] to B3, it will look like:
//
// 1[1,2]
// /
// 3[3,4,5]
// \
// 6[6,7,8,9]
//
// Case #2: if we fork from node 8, then B3 will have two ancestors: B1 stopping at 6 (exclusive) and B2 stopping at 8 (exclusive).
// After we append a batch of events [8,9] to B3, it will look like:
//
// 1[1,2]
// /
// 3[3,4,5]
// /
// 6[6,7]
// \
// 8[8,9]
func (m *sqlHistoryStore) ForkHistoryBranch(
ctx context.Context,
request *persistence.InternalForkHistoryBranchRequest,
) (*persistence.InternalForkHistoryBranchResponse, error) {
forkB := request.ForkBranchInfo
treeID := forkB.TreeID
newAncestors := make([]*types.HistoryBranchRange, 0, len(forkB.Ancestors)+1)
beginNodeID := persistenceutils.GetBeginNodeID(forkB)
if beginNodeID >= request.ForkNodeID {
		// this is the case where the new branch's ancestors do not include the forking branch
for _, br := range forkB.Ancestors {
if br.EndNodeID >= request.ForkNodeID {
newAncestors = append(newAncestors, &types.HistoryBranchRange{
BranchID: br.BranchID,
BeginNodeID: br.BeginNodeID,
EndNodeID: request.ForkNodeID,
})
break
} else {
newAncestors = append(newAncestors, br)
}
}
} else {
		// this is the case where the new branch inherits all ancestors of the forking branch
newAncestors = forkB.Ancestors
newAncestors = append(newAncestors, &types.HistoryBranchRange{
BranchID: forkB.BranchID,
BeginNodeID: beginNodeID,
EndNodeID: request.ForkNodeID,
})
}
resp := &persistence.InternalForkHistoryBranchResponse{
NewBranchInfo: types.HistoryBranch{
TreeID: treeID,
BranchID: request.NewBranchID,
Ancestors: newAncestors,
}}
treeInfo := &serialization.HistoryTreeInfo{
Ancestors: newAncestors,
Info: request.Info,
CreatedTimestamp: time.Now(),
}
blob, err := m.parser.HistoryTreeInfoToBlob(treeInfo)
if err != nil {
return nil, err
}
row := &sqlplugin.HistoryTreeRow{
ShardID: request.ShardID,
TreeID: serialization.MustParseUUID(treeID),
BranchID: serialization.MustParseUUID(request.NewBranchID),
Data: blob.Data,
DataEncoding: string(blob.Encoding),
}
result, err := m.db.InsertIntoHistoryTree(ctx, row)
if err != nil {
return nil, convertCommonErrors(m.db, "ForkHistoryBranch", "", err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return nil, err
}
if rowsAffected != 1 {
		return nil, &types.InternalServiceError{Message: fmt.Sprintf("expected 1 row to be affected for tree table, got %v", rowsAffected)}
}
return resp, nil
}
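
// exampleFork is an illustrative sketch (a hypothetical helper, not part of
// the store), matching Case #2 above: forking B2 at nodeID 8. ForkHistoryBranch
// only writes the new tree row; events are added to the new branch by later
// AppendHistoryNodes calls with IsNewBranch=false.
func exampleFork(
	ctx context.Context,
	store persistence.HistoryStore,
	forkFrom types.HistoryBranch,
	newBranchID string,
	shardID int,
) (types.HistoryBranch, error) {
	resp, err := store.ForkHistoryBranch(ctx, &persistence.InternalForkHistoryBranchRequest{
		ForkBranchInfo: forkFrom,
		ForkNodeID:     8, // per Case #2 above; any valid non-first nodeID works
		NewBranchID:    newBranchID,
		Info:           "example fork",
		ShardID:        shardID,
	})
	if err != nil {
		return types.HistoryBranch{}, err
	}
	return resp.NewBranchInfo, nil
}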

// DeleteHistoryBranch removes a branch
func (m *sqlHistoryStore) DeleteHistoryBranch(
ctx context.Context,
request *persistence.InternalDeleteHistoryBranchRequest,
) error {
branch := request.BranchInfo
treeID := branch.TreeID
brsToDelete := branch.Ancestors
beginNodeID := persistenceutils.GetBeginNodeID(branch)
brsToDelete = append(brsToDelete, &types.HistoryBranchRange{
BranchID: branch.BranchID,
BeginNodeID: beginNodeID,
})
rsp, err := m.GetHistoryTree(ctx, &persistence.InternalGetHistoryTreeRequest{
TreeID: treeID,
ShardID: common.IntPtr(request.ShardID),
})
if err != nil {
return err
}
	// validBRsMaxEndNode records, for each branch range still in use, the max nodeID referred to by any other valid branch
validBRsMaxEndNode := persistenceutils.GetBranchesMaxReferredNodeIDs(rsp.Branches)
treeUUID := serialization.MustParseUUID(treeID)
dbShardID := sqlplugin.GetDBShardIDFromTreeID(treeUUID, m.db.GetTotalNumDBShards())
return m.txExecute(ctx, dbShardID, "DeleteHistoryBranch", func(tx sqlplugin.Tx) error {
branchID := serialization.MustParseUUID(branch.BranchID)
treeFilter := &sqlplugin.HistoryTreeFilter{
TreeID: treeUUID,
BranchID: &branchID,
ShardID: request.ShardID,
}
_, err = tx.DeleteFromHistoryTree(ctx, treeFilter)
if err != nil {
return err
}
done := false
		// for each branch range to delete, iterate from the bottom up and delete up to the point given by validBRsMaxEndNode
for i := len(brsToDelete) - 1; i >= 0; i-- {
br := brsToDelete[i]
maxReferredEndNodeID, ok := validBRsMaxEndNode[br.BranchID]
nodeFilter := &sqlplugin.HistoryNodeFilter{
TreeID: serialization.MustParseUUID(treeID),
BranchID: serialization.MustParseUUID(br.BranchID),
ShardID: request.ShardID,
PageSize: _defaultHistoryNodeDeleteBatch,
}
			if ok {
				// some other branch still references nodes below maxReferredEndNodeID,
				// so we can only delete from maxReferredEndNodeID onward, and must stop after this range
				nodeFilter.MinNodeID = &maxReferredEndNodeID
				done = true
			} else {
				// no other branch is using this range, so we can delete all of it
				nodeFilter.MinNodeID = &br.BeginNodeID
			}
for {
result, err := tx.DeleteFromHistoryNode(ctx, nodeFilter)
if err != nil {
return err
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return err
}
				// anything other than a full batch (including UnknownNumRowsAffected)
				// means this range has no more rows to delete
				if rowsAffected != _defaultHistoryNodeDeleteBatch {
					break
				}
}
if done {
break
}
}
return nil
})
}
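
// exampleDeleteBranch is an illustrative sketch (a hypothetical helper, not
// part of the store). Only BranchInfo and ShardID are required; ancestor node
// ranges still referenced by sibling branches survive the delete, per the
// validBRsMaxEndNode bookkeeping above.
func exampleDeleteBranch(ctx context.Context, store persistence.HistoryStore, branch types.HistoryBranch, shardID int) error {
	return store.DeleteHistoryBranch(ctx, &persistence.InternalDeleteHistoryBranchRequest{
		BranchInfo: branch,
		ShardID:    shardID,
	})
}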

// TODO: Limit the underlying query to a specific shard at a time. See https://github.com/uber/cadence/issues/4064
func (m *sqlHistoryStore) GetAllHistoryTreeBranches(
ctx context.Context,
request *persistence.GetAllHistoryTreeBranchesRequest,
) (*persistence.GetAllHistoryTreeBranchesResponse, error) {
page := historyTreePageToken{}
if request.NextPageToken != nil {
if err := gobDeserialize(request.NextPageToken, &page); err != nil {
			return nil, fmt.Errorf("unable to decode next page token: %v", err)
}
} else {
page = historyTreePageToken{
			ShardID: 0, // the first page starts at shardID 0; see the multi-shard TODO at the end of this function
TreeID: serialization.UUID{},
BranchID: serialization.UUID{},
}
}
filter := sqlplugin.HistoryTreeFilter{
ShardID: page.ShardID,
TreeID: page.TreeID,
BranchID: &page.BranchID,
PageSize: &request.PageSize,
}
rows, err := m.db.GetAllHistoryTreeBranches(ctx, &filter)
if err == sql.ErrNoRows || (err == nil && len(rows) == 0) {
return &persistence.GetAllHistoryTreeBranchesResponse{}, nil
}
if err != nil {
return nil, convertCommonErrors(m.db, "GetAllHistoryTreeBranches", "", err)
}
resp := &persistence.GetAllHistoryTreeBranchesResponse{}
resp.Branches = make([]persistence.HistoryBranchDetail, len(rows))
for i, row := range rows {
treeInfo, err := m.parser.HistoryTreeInfoFromBlob(row.Data, row.DataEncoding)
if err != nil {
return nil, err
}
resp.Branches[i].TreeID = row.TreeID.String()
resp.Branches[i].BranchID = row.BranchID.String()
resp.Branches[i].ForkTime = treeInfo.GetCreatedTimestamp()
resp.Branches[i].Info = treeInfo.GetInfo()
}
if len(rows) >= request.PageSize {
// there could be more
lastRow := &rows[request.PageSize-1]
resp.NextPageToken, err = gobSerialize(&historyTreePageToken{
ShardID: lastRow.ShardID,
TreeID: lastRow.TreeID,
BranchID: lastRow.BranchID,
})
if err != nil {
return nil, &types.InternalServiceError{Message: fmt.Sprintf("error serializing nextPageToken:%v", err)}
}
}
	// TODO: this is broken for multi-sharding: the shardID should advance whenever a shard returns
	// fewer rows than the requested pageSize, until all shards have been read
return resp, nil
}
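
// exampleListAllBranches is an illustrative sketch (a hypothetical helper, not
// part of the store): walking every page of GetAllHistoryTreeBranches by
// feeding the gob-encoded token back in. Note the multi-shard caveat in the
// TODO comments above.
func exampleListAllBranches(ctx context.Context, store persistence.HistoryStore, pageSize int) ([]persistence.HistoryBranchDetail, error) {
	var all []persistence.HistoryBranchDetail
	req := &persistence.GetAllHistoryTreeBranchesRequest{PageSize: pageSize}
	for {
		resp, err := store.GetAllHistoryTreeBranches(ctx, req)
		if err != nil {
			return nil, err
		}
		all = append(all, resp.Branches...)
		if len(resp.NextPageToken) == 0 {
			return all, nil
		}
		req.NextPageToken = resp.NextPageToken
	}
}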

// GetHistoryTree returns all branch information of a tree
func (m *sqlHistoryStore) GetHistoryTree(
ctx context.Context,
request *persistence.InternalGetHistoryTreeRequest,
) (*persistence.InternalGetHistoryTreeResponse, error) {
treeID := serialization.MustParseUUID(request.TreeID)
branches := make([]*types.HistoryBranch, 0)
treeFilter := &sqlplugin.HistoryTreeFilter{
TreeID: treeID,
ShardID: *request.ShardID,
}
rows, err := m.db.SelectFromHistoryTree(ctx, treeFilter)
if err == sql.ErrNoRows || (err == nil && len(rows) == 0) {
return &persistence.InternalGetHistoryTreeResponse{}, nil
}
if err != nil {
return nil, convertCommonErrors(m.db, "GetHistoryTree", "", err)
}
for _, row := range rows {
treeInfo, err := m.parser.HistoryTreeInfoFromBlob(row.Data, row.DataEncoding)
if err != nil {
return nil, err
}
br := &types.HistoryBranch{
TreeID: request.TreeID,
BranchID: row.BranchID.String(),
Ancestors: treeInfo.Ancestors,
}
branches = append(branches, br)
}
return &persistence.InternalGetHistoryTreeResponse{
Branches: branches,
}, nil
}
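
// exampleCountBranches is an illustrative sketch (a hypothetical helper, not
// part of the store): DeleteHistoryBranch uses the same call internally to
// find which ancestor ranges are still referenced by live branches.
func exampleCountBranches(ctx context.Context, store persistence.HistoryStore, treeID string, shardID int) (int, error) {
	resp, err := store.GetHistoryTree(ctx, &persistence.InternalGetHistoryTreeRequest{
		TreeID:  treeID,
		ShardID: common.IntPtr(shardID),
	})
	if err != nil {
		return 0, err
	}
	return len(resp.Branches), nil
}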