service/history/replication/task_ack_manager.go (94 lines of code) (raw):
// The MIT License (MIT)
//
// Copyright (c) 2017-2022 Uber Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package replication
import (
"context"
"strconv"
"time"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
)
type (
// TaskAckManager is the ack manager for replication tasks
TaskAckManager struct {
ackLevels ackLevelStore
scope metrics.Scope
logger log.Logger
reader taskReader
store *TaskStore
}
ackLevelStore interface {
GetTransferMaxReadLevel() int64
GetClusterReplicationLevel(cluster string) int64
UpdateClusterReplicationLevel(cluster string, lastTaskID int64) error
}
taskReader interface {
Read(ctx context.Context, readLevel int64, maxReadLevel int64) ([]*persistence.ReplicationTaskInfo, bool, error)
}
)
// NewTaskAckManager initializes a new replication task ack manager
func NewTaskAckManager(
shardID int,
ackLevels ackLevelStore,
metricsClient metrics.Client,
logger log.Logger,
reader taskReader,
store *TaskStore,
) TaskAckManager {
return TaskAckManager{
ackLevels: ackLevels,
scope: metricsClient.Scope(
metrics.ReplicatorQueueProcessorScope,
metrics.InstanceTag(strconv.Itoa(shardID)),
),
logger: logger.WithTags(tag.ComponentReplicationAckManager),
reader: reader,
store: store,
}
}
func (t *TaskAckManager) GetTasks(ctx context.Context, pollingCluster string, lastReadTaskID int64) (*types.ReplicationMessages, error) {
if lastReadTaskID == common.EmptyMessageID {
lastReadTaskID = t.ackLevels.GetClusterReplicationLevel(pollingCluster)
}
taskGeneratedTimer := t.scope.StartTimer(metrics.TaskLatency)
tasks, hasMore, err := t.reader.Read(ctx, lastReadTaskID, t.ackLevels.GetTransferMaxReadLevel())
if err != nil {
return nil, err
}
var replicationTasks []*types.ReplicationTask
readLevel := lastReadTaskID
TaskInfoLoop:
for _, task := range tasks {
replicationTask, err := t.store.Get(ctx, pollingCluster, *task)
switch err.(type) {
case nil:
// No action
case *types.BadRequestError, *types.InternalDataInconsistencyError, *types.EntityNotExistsError:
t.logger.Warn("Failed to get replication task.", tag.Error(err))
default:
t.logger.Error("Failed to get replication task. Return what we have so far.", tag.Error(err))
hasMore = true
break TaskInfoLoop
}
readLevel = task.TaskID
if replicationTask != nil {
replicationTasks = append(replicationTasks, replicationTask)
}
}
taskGeneratedTimer.Stop()
t.scope.RecordTimer(metrics.ReplicationTasksLag, time.Duration(t.ackLevels.GetTransferMaxReadLevel()-readLevel))
t.scope.RecordTimer(metrics.ReplicationTasksFetched, time.Duration(len(tasks)))
t.scope.RecordTimer(metrics.ReplicationTasksReturned, time.Duration(len(replicationTasks)))
t.scope.RecordTimer(metrics.ReplicationTasksReturnedDiff, time.Duration(len(tasks)-len(replicationTasks)))
if err := t.ackLevels.UpdateClusterReplicationLevel(pollingCluster, lastReadTaskID); err != nil {
t.logger.Error("error updating replication level for shard", tag.Error(err), tag.OperationFailed)
}
if err := t.store.Ack(pollingCluster, lastReadTaskID); err != nil {
t.logger.Error("error updating replication level for hydrated task store", tag.Error(err), tag.OperationFailed)
}
t.logger.Debug("Get replication tasks", tag.SourceCluster(pollingCluster), tag.ShardReplicationAck(lastReadTaskID), tag.ReadLevel(readLevel))
return &types.ReplicationMessages{
ReplicationTasks: replicationTasks,
HasMore: hasMore,
LastRetrievedMessageID: readLevel,
}, nil
}