common/persistence/nosql/nosql_history_store.go (308 lines of code) (raw):
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package nosql
import (
"context"
"time"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils"
"github.com/uber/cadence/common/types"
)
// nosqlHistoryStore implements persistence.HistoryStore on top of a set of
// sharded NoSQL databases. Each operation is routed to a store shard using the
// history shard ID carried in the request (via GetStoreShardByHistoryShard).
type nosqlHistoryStore struct {
	*shardedNosqlStore
}
// newNoSQLHistoryStore builds a HistoryStore implementation backed by a
// sharded NoSQL store created from the given configuration.
func newNoSQLHistoryStore(
	cfg config.ShardedNoSQL,
	logger log.Logger,
	dc *persistence.DynamicConfiguration,
) (persistence.HistoryStore, error) {
	store, err := newShardedNosqlStore(cfg, logger, dc)
	if err != nil {
		return nil, err
	}
	return &nosqlHistoryStore{shardedNosqlStore: store}, nil
}
// AppendHistoryNodes upserts a batch of events as a single node to a history branch.
// Appending above the branch's ancestors' nodes is not allowed, which means the
// requested nodeID must be >= the branch's begin node ID (ForkNodeID).
func (h *nosqlHistoryStore) AppendHistoryNodes(
	ctx context.Context,
	request *persistence.InternalAppendHistoryNodesRequest,
) error {
	branch := request.BranchInfo
	if request.NodeID < persistenceutils.GetBeginNodeID(branch) {
		return &persistence.InvalidPersistenceRequestError{
			Msg: "cannot append to ancestors' nodes",
		}
	}

	// Only a brand-new branch needs a row in the history tree table; for an
	// existing branch treeRow stays nil and only the node row is written.
	var treeRow *nosqlplugin.HistoryTreeRow
	if request.IsNewBranch {
		// Copy the ancestors so the tree row does not alias the request's slice.
		treeRow = &nosqlplugin.HistoryTreeRow{
			ShardID:         request.ShardID,
			TreeID:          branch.TreeID,
			BranchID:        branch.BranchID,
			Ancestors:       append([]*types.HistoryBranchRange(nil), branch.Ancestors...),
			CreateTimestamp: time.Now(),
			Info:            request.Info,
		}
	}

	nodeRow := &nosqlplugin.HistoryNodeRow{
		TreeID:       branch.TreeID,
		BranchID:     branch.BranchID,
		NodeID:       request.NodeID,
		TxnID:        &request.TransactionID,
		Data:         request.Events.Data,
		DataEncoding: string(request.Events.Encoding),
		ShardID:      request.ShardID,
	}

	storeShard, err := h.GetStoreShardByHistoryShard(request.ShardID)
	if err != nil {
		return err
	}
	if err := storeShard.db.InsertIntoHistoryTreeAndNode(ctx, treeRow, nodeRow); err != nil {
		return convertCommonErrors(storeShard.db, "AppendHistoryNodes", err)
	}
	return nil
}
// ReadHistoryBranch returns history node data for a branch.
// NOTE: For a branch that has ancestors, we need to query Cassandra multiple times,
// because it doesn't support OR/UNION operators.
//
// Rows come back sorted by (nodeID asc, txnID desc). Because a node may have been
// written more than once (e.g. retries), only the row with the highest transaction
// ID per node is valid; the loop below filters out the stale duplicates and
// detects data corruption.
func (h *nosqlHistoryStore) ReadHistoryBranch(
	ctx context.Context,
	request *persistence.InternalReadHistoryBranchRequest,
) (*persistence.InternalReadHistoryBranchResponse, error) {
	// Select nodes in [MinNodeID, MaxNodeID) for this branch, one page at a time.
	filter := &nosqlplugin.HistoryNodeFilter{
		ShardID:       request.ShardID,
		TreeID:        request.TreeID,
		BranchID:      request.BranchID,
		MinNodeID:     request.MinNodeID,
		MaxNodeID:     request.MaxNodeID,
		NextPageToken: request.NextPageToken,
		PageSize:      request.PageSize,
	}
	storeShard, err := h.GetStoreShardByHistoryShard(request.ShardID)
	if err != nil {
		return nil, err
	}
	rows, pagingToken, err := storeShard.db.SelectFromHistoryNode(ctx, filter)
	if err != nil {
		return nil, convertCommonErrors(storeShard.db, "SelectFromHistoryNode", err)
	}
	history := make([]*persistence.DataBlob, 0, int(request.PageSize))
	// eventBlob is reused as scratch for the current row; it is only kept (and a
	// fresh one allocated) when the row is accepted into history.
	eventBlob := &persistence.DataBlob{}
	nodeID := int64(0)
	txnID := int64(0)
	// Carry the last accepted node/transaction IDs across pages so filtering
	// remains consistent between successive calls.
	lastNodeID := request.LastNodeID
	lastTxnID := request.LastTransactionID
	for _, row := range rows {
		nodeID = row.NodeID
		txnID = *row.TxnID
		eventBlob.Data = row.Data
		eventBlob.Encoding = common.EncodingType(row.DataEncoding)
		if txnID < lastTxnID {
			// assuming that business logic layer is correct and transaction ID only increase
			// thus, valid event batch will come with increasing transaction ID
			// event batches with smaller node ID
			// -> should not be possible since records are already sorted
			// event batches with same node ID
			// -> batch with higher transaction ID is valid
			// event batches with larger node ID
			// -> batch with lower transaction ID is invalid (happens before)
			// -> batch with higher transaction ID is valid
			continue
		}
		switch {
		case nodeID < lastNodeID:
			return nil, &types.InternalDataInconsistencyError{
				Message: "corrupted data, nodeID cannot decrease",
			}
		case nodeID == lastNodeID:
			return nil, &types.InternalDataInconsistencyError{
				Message: "corrupted data, same nodeID must have smaller txnID",
			}
		default: // row.NodeID > lastNodeID:
			// NOTE: when row.nodeID > lastNodeID, we expect the one with largest txnID comes first
			lastTxnID = txnID
			lastNodeID = nodeID
			history = append(history, eventBlob)
			eventBlob = &persistence.DataBlob{}
		}
	}
	return &persistence.InternalReadHistoryBranchResponse{
		History:           history,
		NextPageToken:     pagingToken,
		LastNodeID:        lastNodeID,
		LastTransactionID: lastTxnID,
	}, nil
}
// ForkHistoryBranch forks a new branch from an existing branch
// Note that application must provide a void forking nodeID, it must be a valid nodeID in that branch.
// A valid forking nodeID can be an ancestor from the existing branch.
// For example, we have branch B1 with three nodes(1[1,2], 3[3,4,5] and 6[6,7,8]. 1, 3 and 6 are nodeIDs (first eventID of the batch).
// So B1 looks like this:
//
//	     1[1,2]
//	     /
//	   3[3,4,5]
//	  /
//	6[6,7,8]
//
// Assuming we have branch B2 which contains one ancestor B1 stopping at 6 (exclusive). So B2 inherit nodeID 1 and 3 from B1, and have its own nodeID 6 and 8.
// Branch B2 looks like this:
//
//	  1[1,2]
//	  /
//	3[3,4,5]
//	 \
//	  6[6,7]
//	   \
//	    8[8]
//
// Now we want to fork a new branch B3 from B2.
// The only valid forking nodeIDs are 3,6 or 8.
// 1 is not valid because we can't fork from first node.
// 2/4/5 is NOT valid either because they are inside a batch.
//
// Case #1: If we fork from nodeID 6, then B3 will have an ancestor B1 which stops at 6(exclusive).
// As we append a batch of events[6,7,8,9] to B3, it will look like :
//
//	  1[1,2]
//	  /
//	3[3,4,5]
//	 \
//	6[6,7,8,9]
//
// Case #2: If we fork from node 8, then B3 will have two ancestors: B1 stops at 6(exclusive) and ancestor B2 stops at 8(exclusive)
// As we append a batch of events[8,9] to B3, it will look like:
//
//	     1[1,2]
//	     /
//	   3[3,4,5]
//	  /
//	6[6,7]
//	 \
//	  8[8,9]
func (h *nosqlHistoryStore) ForkHistoryBranch(
	ctx context.Context,
	request *persistence.InternalForkHistoryBranchRequest,
) (*persistence.InternalForkHistoryBranchResponse, error) {
	forkB := request.ForkBranchInfo
	treeID := forkB.TreeID
	newAncestors := make([]*types.HistoryBranchRange, 0, len(forkB.Ancestors)+1)

	beginNodeID := persistenceutils.GetBeginNodeID(forkB)
	if beginNodeID >= request.ForkNodeID {
		// This is the case that the new branch's ancestors don't include the
		// forking branch itself: keep the forking branch's ancestors up to the
		// range containing ForkNodeID, truncating that range at ForkNodeID
		// (exclusive).
		for _, br := range forkB.Ancestors {
			if br.EndNodeID >= request.ForkNodeID {
				newAncestors = append(newAncestors, &types.HistoryBranchRange{
					BranchID:    br.BranchID,
					BeginNodeID: br.BeginNodeID,
					EndNodeID:   request.ForkNodeID,
				})
				break
			}
			newAncestors = append(newAncestors, br)
		}
	} else {
		// This is the case the new branch will inherit all ancestors from the
		// forking branch, plus the forking branch itself stopping at
		// ForkNodeID (exclusive).
		// BUGFIX: append into the freshly allocated slice instead of directly
		// onto forkB.Ancestors — appending to the request's slice could write
		// into its backing array (when spare capacity exists) and mutate the
		// caller's BranchInfo.
		newAncestors = append(newAncestors, forkB.Ancestors...)
		newAncestors = append(newAncestors, &types.HistoryBranchRange{
			BranchID:    forkB.BranchID,
			BeginNodeID: beginNodeID,
			EndNodeID:   request.ForkNodeID,
		})
	}

	resp := &persistence.InternalForkHistoryBranchResponse{
		NewBranchInfo: types.HistoryBranch{
			TreeID:    treeID,
			BranchID:  request.NewBranchID,
			Ancestors: newAncestors,
		}}

	// The tree row only needs BranchID and EndNodeID per ancestor; BeginNodeID
	// is intentionally omitted (it is derivable from the previous EndNodeID).
	var ancestors []*types.HistoryBranchRange
	for _, an := range newAncestors {
		ancestors = append(ancestors, &types.HistoryBranchRange{
			BranchID:  an.BranchID,
			EndNodeID: an.EndNodeID,
		})
	}
	treeRow := &nosqlplugin.HistoryTreeRow{
		ShardID:         request.ShardID,
		TreeID:          treeID,
		BranchID:        request.NewBranchID,
		Ancestors:       ancestors,
		CreateTimestamp: time.Now(),
		Info:            request.Info,
	}

	storeShard, err := h.GetStoreShardByHistoryShard(request.ShardID)
	if err != nil {
		return nil, err
	}
	// nil node row: forking only records the new branch in the tree table.
	if err := storeShard.db.InsertIntoHistoryTreeAndNode(ctx, treeRow, nil); err != nil {
		return nil, convertCommonErrors(storeShard.db, "ForkHistoryBranch", err)
	}
	return resp, nil
}
// DeleteHistoryBranch removes a branch.
// It deletes the branch's tree-table row and, walking the branch's ranges from
// the bottom up, deletes node rows that are not referenced by any other valid
// branch of the same tree.
func (h *nosqlHistoryStore) DeleteHistoryBranch(
	ctx context.Context,
	request *persistence.InternalDeleteHistoryBranchRequest,
) error {
	branch := request.BranchInfo
	treeID := branch.TreeID
	beginNodeID := persistenceutils.GetBeginNodeID(branch)

	// The ranges to delete are the branch's ancestors plus the branch's own range.
	// BUGFIX: copy into a fresh slice instead of appending to branch.Ancestors —
	// appending to the request's slice could write into its backing array (when
	// spare capacity exists) and mutate the caller's BranchInfo.
	brsToDelete := make([]*types.HistoryBranchRange, 0, len(branch.Ancestors)+1)
	brsToDelete = append(brsToDelete, branch.Ancestors...)
	brsToDelete = append(brsToDelete, &types.HistoryBranchRange{
		BranchID:    branch.BranchID,
		BeginNodeID: beginNodeID,
	})

	rsp, err := h.GetHistoryTree(ctx, &persistence.InternalGetHistoryTreeRequest{
		TreeID:  treeID,
		ShardID: &request.ShardID,
	})
	if err != nil {
		return err
	}

	treeFilter := &nosqlplugin.HistoryTreeFilter{
		ShardID:  request.ShardID,
		TreeID:   treeID,
		BranchID: &branch.BranchID,
	}

	var nodeFilters []*nosqlplugin.HistoryNodeFilter
	// validBRsMaxEndNode is to know each branch range that is being used,
	// we want to know what is the max nodeID referred by other valid branch
	validBRsMaxEndNode := persistenceutils.GetBranchesMaxReferredNodeIDs(rsp.Branches)

	// For each branch range to delete, iterate from bottom to top, and delete
	// up to the point allowed by validBRsMaxEndNode.
	for i := len(brsToDelete) - 1; i >= 0; i-- {
		br := brsToDelete[i]
		if maxReferredEndNodeID, ok := validBRsMaxEndNode[br.BranchID]; ok {
			// Another branch still refers to this range: we can only delete
			// from the max referred end node, and must stop walking here.
			nodeFilters = append(nodeFilters, &nosqlplugin.HistoryNodeFilter{
				ShardID:   request.ShardID,
				TreeID:    treeID,
				BranchID:  br.BranchID,
				MinNodeID: maxReferredEndNodeID,
			})
			break
		}
		// No branch is using this range; delete all of it.
		nodeFilters = append(nodeFilters, &nosqlplugin.HistoryNodeFilter{
			ShardID:   request.ShardID,
			TreeID:    treeID,
			BranchID:  br.BranchID,
			MinNodeID: br.BeginNodeID,
		})
	}

	storeShard, err := h.GetStoreShardByHistoryShard(request.ShardID)
	if err != nil {
		return err
	}
	if err := storeShard.db.DeleteFromHistoryTreeAndNode(ctx, treeFilter, nodeFilters); err != nil {
		return convertCommonErrors(storeShard.db, "DeleteHistoryBranch", err)
	}
	return nil
}
// GetAllHistoryTreeBranches returns one page of branch details across all
// history trees. It is only supported when history is not sharded across
// multiple NoSQL databases, because the scan runs against a single store.
func (h *nosqlHistoryStore) GetAllHistoryTreeBranches(
	ctx context.Context,
	request *persistence.GetAllHistoryTreeBranchesRequest,
) (*persistence.GetAllHistoryTreeBranchesResponse, error) {
	if h.shardingPolicy.hasShardedHistory {
		return nil, &types.InternalServiceError{
			Message: "SelectAllHistoryTrees is not supported on sharded nosql db",
		}
	}

	storeShard := h.GetDefaultShard()
	rows, nextPageToken, err := storeShard.db.SelectAllHistoryTrees(ctx, request.NextPageToken, request.PageSize)
	if err != nil {
		return nil, convertCommonErrors(storeShard.db, "SelectAllHistoryTrees", err)
	}

	details := make([]persistence.HistoryBranchDetail, 0, int(request.PageSize))
	for _, row := range rows {
		details = append(details, persistence.HistoryBranchDetail{
			TreeID:   row.TreeID,
			BranchID: row.BranchID,
			ForkTime: row.CreateTimestamp,
			Info:     row.Info,
		})
	}
	return &persistence.GetAllHistoryTreeBranchesResponse{
		Branches:      details,
		NextPageToken: nextPageToken,
	}, nil
}
// GetHistoryTree returns all branch information of a tree.
func (h *nosqlHistoryStore) GetHistoryTree(
	ctx context.Context,
	request *persistence.InternalGetHistoryTreeRequest,
) (*persistence.InternalGetHistoryTreeResponse, error) {
	storeShard, err := h.GetStoreShardByHistoryShard(*request.ShardID)
	if err != nil {
		return nil, err
	}

	filter := &nosqlplugin.HistoryTreeFilter{
		ShardID: *request.ShardID,
		TreeID:  request.TreeID,
	}
	rows, err := storeShard.db.SelectFromHistoryTree(ctx, filter)
	if err != nil {
		return nil, convertCommonErrors(storeShard.db, "SelectFromHistoryTree", err)
	}

	branches := make([]*types.HistoryBranch, 0, len(rows))
	for _, row := range rows {
		branches = append(branches, &types.HistoryBranch{
			TreeID:    request.TreeID,
			BranchID:  row.BranchID,
			Ancestors: row.Ancestors,
		})
	}
	return &persistence.InternalGetHistoryTreeResponse{
		Branches: branches,
	}, nil
}