service/history/replication/task_processor.go

// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package replication

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"math"
	"strconv"
	"sync/atomic"
	"time"

	"github.com/pborman/uuid"
	"go.uber.org/yarpc/yarpcerrors"

	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/backoff"
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/log/tag"
	"github.com/uber/cadence/common/metrics"
	"github.com/uber/cadence/common/persistence"
	"github.com/uber/cadence/common/quotas"
	"github.com/uber/cadence/common/reconciliation"
	"github.com/uber/cadence/common/reconciliation/entity"
	"github.com/uber/cadence/common/types"
	"github.com/uber/cadence/service/history/config"
	"github.com/uber/cadence/service/history/engine"
	"github.com/uber/cadence/service/history/execution"
	"github.com/uber/cadence/service/history/shard"
)

const (
	dropSyncShardTaskTimeThreshold = 10 * time.Minute
	replicationTimeout             = 30 * time.Second
	dlqErrorRetryWait              = time.Second
	dlqMetricsEmitTimerInterval    = 5 * time.Minute
	dlqMetricsEmitTimerCoefficient = 0.05
)

var (
	// ErrUnknownReplicationTask is the error to indicate unknown replication task type
	ErrUnknownReplicationTask = &types.BadRequestError{Message: "unknown replication task"}
)

type (
	// TaskProcessor is responsible for processing replication tasks for a shard.
	TaskProcessor interface {
		common.Daemon
	}

	// taskProcessorImpl is responsible for processing replication tasks for a shard.
	taskProcessorImpl struct {
		currentCluster    string
		sourceCluster     string
		status            int32
		shard             shard.Context
		historyEngine     engine.Engine
		historySerializer persistence.PayloadSerializer
		config            *config.Config
		metricsClient     metrics.Client
		logger            log.Logger
		taskExecutor      TaskExecutor
		hostRateLimiter   *quotas.DynamicRateLimiter
		shardRateLimiter  *quotas.DynamicRateLimiter

		taskRetryPolicy backoff.RetryPolicy
		dlqRetryPolicy  backoff.RetryPolicy
		noTaskRetrier   backoff.Retrier

		lastProcessedMessageID int64
		lastRetrievedMessageID int64

		requestChan   chan<- *request
		syncShardChan chan *types.SyncShardStatus
		done          chan struct{}
	}

	request struct {
		token    *types.ReplicationToken
		respChan chan<- *types.ReplicationMessages
	}
)

var _ TaskProcessor = (*taskProcessorImpl)(nil)

// NewTaskProcessor creates a new replication task processor.
func NewTaskProcessor(
	shard shard.Context,
	historyEngine engine.Engine,
	config *config.Config,
	metricsClient metrics.Client,
	taskFetcher TaskFetcher,
	taskExecutor TaskExecutor,
) TaskProcessor {
	shardID := shard.GetShardID()
	sourceCluster := taskFetcher.GetSourceCluster()

	firstRetryPolicy := backoff.NewExponentialRetryPolicy(config.ReplicationTaskProcessorErrorRetryWait(shardID))
	firstRetryPolicy.SetMaximumAttempts(config.ReplicationTaskProcessorErrorRetryMaxAttempts(shardID))
	secondRetryPolicy := backoff.NewExponentialRetryPolicy(config.ReplicationTaskProcessorErrorSecondRetryWait(shardID))
	secondRetryPolicy.SetMaximumInterval(config.ReplicationTaskProcessorErrorSecondRetryMaxWait(shardID))
	secondRetryPolicy.SetExpirationInterval(config.ReplicationTaskProcessorErrorSecondRetryExpiration(shardID))
	taskRetryPolicy := backoff.NewMultiPhasesRetryPolicy(firstRetryPolicy, secondRetryPolicy)

	dlqRetryPolicy := backoff.NewExponentialRetryPolicy(dlqErrorRetryWait)
	dlqRetryPolicy.SetExpirationInterval(backoff.NoInterval)

	noTaskBackoffPolicy := backoff.NewExponentialRetryPolicy(config.ReplicationTaskProcessorNoTaskRetryWait(shardID))
	noTaskBackoffPolicy.SetBackoffCoefficient(1)
	noTaskBackoffPolicy.SetExpirationInterval(backoff.NoInterval)
	noTaskRetrier := backoff.NewRetrier(noTaskBackoffPolicy, backoff.SystemClock)

	return &taskProcessorImpl{
		currentCluster:         shard.GetClusterMetadata().GetCurrentClusterName(),
		sourceCluster:          sourceCluster,
		status:                 common.DaemonStatusInitialized,
		shard:                  shard,
		historyEngine:          historyEngine,
		historySerializer:      persistence.NewPayloadSerializer(),
		config:                 config,
		metricsClient:          metricsClient,
		logger:                 shard.GetLogger().WithTags(tag.SourceCluster(sourceCluster), tag.ShardID(shardID)),
		taskExecutor:           taskExecutor,
		hostRateLimiter:        taskFetcher.GetRateLimiter(),
		shardRateLimiter:       quotas.NewDynamicRateLimiter(config.ReplicationTaskProcessorShardQPS.AsFloat64()),
		taskRetryPolicy:        taskRetryPolicy,
		dlqRetryPolicy:         dlqRetryPolicy,
		noTaskRetrier:          noTaskRetrier,
		requestChan:            taskFetcher.GetRequestChan(),
		syncShardChan:          make(chan *types.SyncShardStatus, 1),
		done:                   make(chan struct{}),
		lastProcessedMessageID: common.EmptyMessageID,
		lastRetrievedMessageID: common.EmptyMessageID,
	}
}

// Start starts the processor
func (p *taskProcessorImpl) Start() {
	if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
		return
	}

	go p.processorLoop()
	go p.syncShardStatusLoop()
	go p.cleanupReplicationTaskLoop()
	p.logger.Info("ReplicationTaskProcessor started.")
}

// Stop stops the processor
func (p *taskProcessorImpl) Stop() {
	if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
		return
	}

	p.logger.Debug("ReplicationTaskProcessor shutting down.")
	close(p.done)
}

func (p *taskProcessorImpl) processorLoop() {
	defer func() {
		p.logger.Debug("Closing replication task processor.", tag.ReadLevel(p.lastRetrievedMessageID))
	}()

Loop:
	for {
		// for each iteration, do close check first
		select {
		case <-p.done:
			p.logger.Debug("ReplicationTaskProcessor shutting down.")
			return
		default:
		}

		respChan := p.sendFetchMessageRequest()

		select {
		case response, ok := <-respChan:
			if !ok {
				p.logger.Debug("Fetch replication messages chan closed.")
				continue Loop
			}

			p.logger.Debug("Got fetch replication messages response.",
				tag.ReadLevel(response.GetLastRetrievedMessageID()),
				tag.Bool(response.GetHasMore()),
				tag.Counter(len(response.GetReplicationTasks())),
			)

			p.taskProcessingStartWait()
			p.processResponse(response)
		case <-p.done:
			return
		}
	}
}
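
// cleanupReplicationTaskLoop periodically deletes replication tasks that every
// remote cluster has already acknowledged, on a jittered timer, until the
// processor is stopped.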
func (p *taskProcessorImpl) cleanupReplicationTaskLoop() {
	shardID := p.shard.GetShardID()
	timer := time.NewTimer(backoff.JitDuration(
		p.config.ReplicationTaskProcessorCleanupInterval(shardID),
		p.config.ReplicationTaskProcessorCleanupJitterCoefficient(shardID),
	))
	for {
		select {
		case <-p.done:
			timer.Stop()
			return
		case <-timer.C:
			if err := p.cleanupAckedReplicationTasks(); err != nil {
				p.logger.Error("Failed to clean up replication messages.", tag.Error(err))
				p.metricsClient.Scope(metrics.ReplicationTaskCleanupScope).IncCounter(metrics.ReplicationTaskCleanupFailure)
			}
			timer.Reset(backoff.JitDuration(
				p.config.ReplicationTaskProcessorCleanupInterval(shardID),
				p.config.ReplicationTaskProcessorCleanupJitterCoefficient(shardID),
			))
		}
	}
}

// cleanupAckedReplicationTasks deletes replication tasks up to the minimum ack
// level across all remote clusters, removing rows in batches until none remain.
func (p *taskProcessorImpl) cleanupAckedReplicationTasks() error {
	minAckLevel := int64(math.MaxInt64)
	for clusterName := range p.shard.GetClusterMetadata().GetRemoteClusterInfo() {
		ackLevel := p.shard.GetClusterReplicationLevel(clusterName)
		if ackLevel < minAckLevel {
			minAckLevel = ackLevel
		}
	}

	p.logger.Debug("Cleaning up replication task queue.", tag.ReadLevel(minAckLevel))
	p.metricsClient.Scope(metrics.ReplicationTaskCleanupScope).IncCounter(metrics.ReplicationTaskCleanupCount)
	p.metricsClient.Scope(
		metrics.ReplicationTaskFetcherScope,
		metrics.TargetClusterTag(p.currentCluster),
	).RecordTimer(
		metrics.ReplicationTasksLag,
		time.Duration(p.shard.GetTransferMaxReadLevel()-minAckLevel),
	)

	for {
		pageSize := p.config.ReplicatorTaskDeleteBatchSize()
		resp, err := p.shard.GetExecutionManager().RangeCompleteReplicationTask(
			context.Background(),
			&persistence.RangeCompleteReplicationTaskRequest{
				InclusiveEndTaskID: minAckLevel,
				PageSize:           pageSize, // pageSize may or may not be honored
			},
		)
		if err != nil {
			return err
		}
		if !persistence.HasMoreRowsToDelete(resp.TasksCompleted, pageSize) {
			break
		}
	}
	return nil
}

func (p *taskProcessorImpl) sendFetchMessageRequest() <-chan *types.ReplicationMessages {
	respChan := make(chan *types.ReplicationMessages, 1)
	// TODO: when we support prefetching, LastRetrievedMessageID can be different than LastProcessedMessageID
	p.requestChan <- &request{
		token: &types.ReplicationToken{
			ShardID:                int32(p.shard.GetShardID()),
			LastRetrievedMessageID: p.lastRetrievedMessageID,
			LastProcessedMessageID: p.lastProcessedMessageID,
		},
		respChan: respChan,
	}
	return respChan
}

func (p *taskProcessorImpl) processResponse(response *types.ReplicationMessages) {
	select {
	case p.syncShardChan <- response.GetSyncShardStatus():
	default:
	}

	scope := p.metricsClient.Scope(metrics.ReplicationTaskFetcherScope, metrics.TargetClusterTag(p.sourceCluster))
	batchRequestStartTime := time.Now()
	ctx := context.Background()
	for _, replicationTask := range response.ReplicationTasks {
		// TODO: move to MultiStageRateLimiter
		_ = p.hostRateLimiter.Wait(ctx)
		_ = p.shardRateLimiter.Wait(ctx)
		err := p.processSingleTask(replicationTask)
		if err != nil {
			// Encountered an error; skip updating ack levels
			return
		}
	}

	// Note here we check replication tasks instead of hasMore. The expectation is that in a steady state
	// we will receive replication tasks but hasMore is false (meaning that we are always catching up).
	// So hasMore might not be a good indicator for additional wait.
	if len(response.ReplicationTasks) == 0 {
		backoffDuration := p.noTaskRetrier.NextBackOff()
		time.Sleep(backoffDuration)
	} else {
		scope.RecordTimer(metrics.ReplicationTasksAppliedLatency, time.Since(batchRequestStartTime))
	}

	p.lastProcessedMessageID = response.GetLastRetrievedMessageID()
	p.lastRetrievedMessageID = response.GetLastRetrievedMessageID()
	scope.UpdateGauge(metrics.LastRetrievedMessageID, float64(p.lastRetrievedMessageID))
	p.noTaskRetrier.Reset()
}

func (p *taskProcessorImpl) syncShardStatusLoop() {
	timer := time.NewTimer(backoff.JitDuration(
		p.config.ShardSyncMinInterval(),
		p.config.ShardSyncTimerJitterCoefficient(),
	))
	var syncShardTask *types.SyncShardStatus
	for {
		select {
		case syncShardRequest := <-p.syncShardChan:
			syncShardTask = syncShardRequest
		case <-timer.C:
			if err := p.handleSyncShardStatus(
				syncShardTask,
			); err != nil {
				p.logger.Error("failed to sync shard status", tag.Error(err))
				p.metricsClient.Scope(metrics.HistorySyncShardStatusScope).IncCounter(metrics.SyncShardFromRemoteFailure)
			}
			timer.Reset(backoff.JitDuration(
				p.config.ShardSyncMinInterval(),
				p.config.ShardSyncTimerJitterCoefficient(),
			))
		case <-p.done:
			timer.Stop()
			return
		}
	}
}

func (p *taskProcessorImpl) handleSyncShardStatus(
	status *types.SyncShardStatus,
) error {
	if status == nil ||
		p.shard.GetTimeSource().Now().Sub(
			time.Unix(0, status.GetTimestamp())) > dropSyncShardTaskTimeThreshold {
		return nil
	}
	p.metricsClient.Scope(metrics.HistorySyncShardStatusScope).IncCounter(metrics.SyncShardFromRemoteCounter)
	ctx, cancel := context.WithTimeout(context.Background(), replicationTimeout)
	defer cancel()
	return p.historyEngine.SyncShardStatus(ctx, &types.SyncShardStatusRequest{
		SourceCluster: p.sourceCluster,
		ShardID:       int64(p.shard.GetShardID()),
		Timestamp:     status.Timestamp,
	})
}

func (p *taskProcessorImpl) processSingleTask(replicationTask *types.ReplicationTask) error {
	retryTransientError := func() error {
		throttleRetry := backoff.NewThrottleRetry(
			backoff.WithRetryPolicy(p.taskRetryPolicy),
			backoff.WithRetryableError(isTransientRetryableError),
		)
		return throttleRetry.Do(context.Background(), func() error {
			select {
			case <-p.done:
				// if the processor is stopping, skip the task
				// the ack level will not update and the new shard owner will retry the task.
				return nil
			default:
				return p.processTaskOnce(replicationTask)
			}
		})
	}

	// Handle service busy error
	throttleRetry := backoff.NewThrottleRetry(
		backoff.WithRetryPolicy(common.CreateReplicationServiceBusyRetryPolicy()),
		backoff.WithRetryableError(common.IsServiceBusyError),
	)
	err := throttleRetry.Do(context.Background(), retryTransientError)
	switch {
	case err == nil:
		return nil
	case common.IsServiceBusyError(err):
		return err
	case err == execution.ErrMissingVersionHistories:
		// skip the workflow without version histories
		p.logger.Warn("Encountered a workflow without version histories")
		return nil
	default:
		// handle error
	}

	// handle error to DLQ
	select {
	case <-p.done:
		p.logger.Warn("Skip adding new messages to DLQ.", tag.Error(err))
		return err
	default:
		request, err := p.generateDLQRequest(replicationTask)
		if err != nil {
			p.logger.Error("Failed to generate DLQ replication task.", tag.Error(err))
			// We cannot deserialize the task. Dropping it.
			return nil
		}
		p.logger.Error("Failed to apply replication task after retry. Putting task into DLQ.",
			tag.WorkflowDomainID(request.TaskInfo.GetDomainID()),
			tag.WorkflowID(request.TaskInfo.GetWorkflowID()),
			tag.WorkflowRunID(request.TaskInfo.GetRunID()),
			tag.TaskID(request.TaskInfo.GetTaskID()),
			tag.TaskType(request.TaskInfo.GetTaskType()),
			tag.Error(err),
		)
		// TODO: uncomment this when the execution fixer workflow is ready
		// if err = p.triggerDataInconsistencyScan(replicationTask); err != nil {
		// 	p.logger.Warn("Failed to trigger data scan", tag.Error(err))
		// 	p.metricsClient.IncCounter(metrics.ReplicationDLQStatsScope, metrics.ReplicationDLQValidationFailed)
		// }
		return p.putReplicationTaskToDLQ(request)
	}
}

func (p *taskProcessorImpl) processTaskOnce(replicationTask *types.ReplicationTask) error {
	ts := p.shard.GetTimeSource()
	startTime := ts.Now()
	scope, err := p.taskExecutor.execute(
		replicationTask,
		false)

	if err != nil {
		p.updateFailureMetric(scope, err)
	} else {
		now := ts.Now()
		mScope := p.metricsClient.Scope(scope, metrics.TargetClusterTag(p.sourceCluster))
		domainID := replicationTask.HistoryTaskV2Attributes.GetDomainID()
		if domainID != "" {
			domainName, errorDomainName := p.shard.GetDomainCache().GetDomainName(domainID)
			if errorDomainName != nil {
				return errorDomainName
			}
			mScope = mScope.Tagged(metrics.DomainTag(domainName))
		}
		// emit the number of replication tasks
		mScope.IncCounter(metrics.ReplicationTasksAppliedPerDomain)
		// emit single task processing latency
		mScope.RecordTimer(metrics.TaskProcessingLatency, now.Sub(startTime))
		// emit latency from task generated to task received
		mScope.RecordTimer(
			metrics.ReplicationTaskLatency,
			now.Sub(time.Unix(0, replicationTask.GetCreationTime())),
		)
	}

	return err
}

func (p *taskProcessorImpl) putReplicationTaskToDLQ(request *persistence.PutReplicationTaskToDLQRequest) error {
	p.metricsClient.Scope(
		metrics.ReplicationDLQStatsScope,
		metrics.TargetClusterTag(p.sourceCluster),
		metrics.InstanceTag(strconv.Itoa(p.shard.GetShardID())),
	).UpdateGauge(
		metrics.ReplicationDLQMaxLevelGauge,
		float64(request.TaskInfo.GetTaskID()),
	)
	throttleRetry := backoff.NewThrottleRetry(
		backoff.WithRetryPolicy(p.dlqRetryPolicy),
		backoff.WithRetryableError(p.shouldRetryDLQ),
	)
	// The following is guaranteed to succeed, or it retries forever until the processor is shut down.
	return throttleRetry.Do(context.Background(), func() error {
		err := p.shard.GetExecutionManager().PutReplicationTaskToDLQ(context.Background(), request)
		if err != nil {
			p.logger.Error("Failed to put replication task to DLQ.", tag.Error(err))
			p.metricsClient.IncCounter(metrics.ReplicationTaskFetcherScope, metrics.ReplicationDLQFailed)
		}
		return err
	})
}

func (p *taskProcessorImpl) generateDLQRequest(
	replicationTask *types.ReplicationTask,
) (*persistence.PutReplicationTaskToDLQRequest, error) {
	switch *replicationTask.TaskType {
	case types.ReplicationTaskTypeSyncActivity:
		taskAttributes := replicationTask.GetSyncActivityTaskAttributes()
		domainName, err := p.shard.GetDomainCache().GetDomainName(taskAttributes.GetDomainID())
		if err != nil {
			return nil, err
		}
		return &persistence.PutReplicationTaskToDLQRequest{
			SourceClusterName: p.sourceCluster,
			TaskInfo: &persistence.ReplicationTaskInfo{
				DomainID:    taskAttributes.GetDomainID(),
				WorkflowID:  taskAttributes.GetWorkflowID(),
				RunID:       taskAttributes.GetRunID(),
				TaskID:      replicationTask.GetSourceTaskID(),
				TaskType:    persistence.ReplicationTaskTypeSyncActivity,
				ScheduledID: taskAttributes.GetScheduledID(),
			},
			DomainName: domainName,
		}, nil
	case types.ReplicationTaskTypeHistoryV2:
		taskAttributes := replicationTask.GetHistoryTaskV2Attributes()
		domainName, err := p.shard.GetDomainCache().GetDomainName(taskAttributes.GetDomainID())
		if err != nil {
			return nil, err
		}
		eventsDataBlob := persistence.NewDataBlobFromInternal(taskAttributes.GetEvents())
		events, err := p.historySerializer.DeserializeBatchEvents(eventsDataBlob)
		if err != nil {
			return nil, err
		}
		if len(events) == 0 {
			p.logger.Error("Empty events in a batch")
			return nil, fmt.Errorf("corrupted history event batch, empty events")
		}
		return &persistence.PutReplicationTaskToDLQRequest{
			SourceClusterName: p.sourceCluster,
			TaskInfo: &persistence.ReplicationTaskInfo{
				DomainID:     taskAttributes.GetDomainID(),
				WorkflowID:   taskAttributes.GetWorkflowID(),
				RunID:        taskAttributes.GetRunID(),
				TaskID:       replicationTask.GetSourceTaskID(),
				TaskType:     persistence.ReplicationTaskTypeHistory,
				FirstEventID: events[0].ID,
				NextEventID:  events[len(events)-1].ID + 1,
				Version:      events[0].Version,
			},
			DomainName: domainName,
		}, nil
	default:
		return nil, fmt.Errorf("unknown replication task type")
	}
}

func (p *taskProcessorImpl) triggerDataInconsistencyScan(replicationTask *types.ReplicationTask) error {
	var failoverVersion int64
	var domainID string
	var workflowID string
	var runID string

	switch {
	case replicationTask.GetHistoryTaskV2Attributes() != nil:
		attr := replicationTask.GetHistoryTaskV2Attributes()
		versionHistoryItems := attr.GetVersionHistoryItems()
		if len(versionHistoryItems) == 0 {
			return errors.New("failed to trigger data scan due to invalid version history")
		}
		// version history items in the same batch should be the same
		failoverVersion = versionHistoryItems[0].GetVersion()
		domainID = attr.GetDomainID()
		workflowID = attr.GetWorkflowID()
		runID = attr.GetRunID()
	case replicationTask.GetSyncActivityTaskAttributes() != nil:
		attr := replicationTask.GetSyncActivityTaskAttributes()
		failoverVersion = replicationTask.GetSyncActivityTaskAttributes().Version
		domainID = attr.GetDomainID()
		workflowID = attr.GetWorkflowID()
		runID = attr.GetRunID()
	default:
		return nil
	}

	clusterName, err := p.shard.GetClusterMetadata().ClusterNameForFailoverVersion(failoverVersion)
	if err != nil {
		return err
	}
	client := p.shard.GetService().GetClientBean().GetRemoteFrontendClient(clusterName)
	fixExecution := entity.Execution{
		DomainID:   domainID,
		WorkflowID: workflowID,
		RunID:      runID,
		ShardID:    p.shard.GetShardID(),
	}
	fixExecutionInput, err := json.Marshal(fixExecution)
	if err != nil {
		return err
	}

	// Assume the workflow is corrupted; rely on the invariant to validate it
	_, err = client.SignalWithStartWorkflowExecution(context.Background(), &types.SignalWithStartWorkflowExecutionRequest{
		Domain:                              common.SystemLocalDomainName,
		WorkflowID:                          reconciliation.CheckDataCorruptionWorkflowID,
		WorkflowType:                        &types.WorkflowType{Name: reconciliation.CheckDataCorruptionWorkflowType},
		TaskList:                            &types.TaskList{Name: reconciliation.CheckDataCorruptionWorkflowTaskList},
		ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(reconciliation.CheckDataCorruptionWorkflowTimeoutInSeconds),
		TaskStartToCloseTimeoutSeconds:      common.Int32Ptr(reconciliation.CheckDataCorruptionWorkflowTaskTimeoutInSeconds),
		Identity:                            "cadence-history-replication",
		RequestID:                           uuid.New(),
		WorkflowIDReusePolicy:               types.WorkflowIDReusePolicyAllowDuplicate.Ptr(),
		SignalName:                          reconciliation.CheckDataCorruptionWorkflowSignalName,
		SignalInput:                         fixExecutionInput,
	})
	return err
}

func isTransientRetryableError(err error) bool {
	switch err.(type) {
	case *types.BadRequestError:
		return false
	case *types.ServiceBusyError:
		return false
	default:
		return true
	}
}

func (p *taskProcessorImpl) shouldRetryDLQ(err error) bool {
	if err == nil {
		return false
	}

	select {
	case <-p.done:
		p.logger.Debug("ReplicationTaskProcessor shutting down.")
		return false
	default:
		return true
	}
}

func (p *taskProcessorImpl) updateFailureMetric(scope int, err error) {
	// Always update failure counter for all replicator errors
	p.metricsClient.IncCounter(scope, metrics.ReplicatorFailures)

	// Also update counter to distinguish between types of failures
	switch err := err.(type) {
	case *types.ShardOwnershipLostError:
		p.metricsClient.IncCounter(scope, metrics.CadenceErrShardOwnershipLostCounter)
	case *types.BadRequestError:
		p.metricsClient.IncCounter(scope, metrics.CadenceErrBadRequestCounter)
	case *types.DomainNotActiveError:
		p.metricsClient.IncCounter(scope, metrics.CadenceErrDomainNotActiveCounter)
	case *types.WorkflowExecutionAlreadyStartedError:
		p.metricsClient.IncCounter(scope, metrics.CadenceErrExecutionAlreadyStartedCounter)
	case *types.EntityNotExistsError:
		p.metricsClient.IncCounter(scope, metrics.CadenceErrEntityNotExistsCounter)
	case *types.WorkflowExecutionAlreadyCompletedError:
		p.metricsClient.IncCounter(scope, metrics.CadenceErrWorkflowExecutionAlreadyCompletedCounter)
	case *types.LimitExceededError:
		p.metricsClient.IncCounter(scope, metrics.CadenceErrLimitExceededCounter)
	case *yarpcerrors.Status:
		if err.Code() == yarpcerrors.CodeDeadlineExceeded {
			p.metricsClient.IncCounter(scope, metrics.CadenceErrContextTimeoutCounter)
		}
	}
}

func (p *taskProcessorImpl) taskProcessingStartWait() {
	shardID := p.shard.GetShardID()
	time.Sleep(backoff.JitDuration(
		p.config.ReplicationTaskProcessorStartWait(shardID),
		p.config.ReplicationTaskProcessorStartWaitJitterCoefficient(shardID),
	))
}
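
// Usage sketch: the processor is a Daemon created per shard for a given source
// cluster and driven entirely through Start/Stop. The variable names below
// (shardCtx, historyEngine, cfg, metricsClient, taskFetcher, taskExecutor) are
// hypothetical placeholders for the dependencies injected by the caller.
//
//	processor := NewTaskProcessor(shardCtx, historyEngine, cfg, metricsClient, taskFetcher, taskExecutor)
//	processor.Start()
//	defer processor.Stop()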