service/worker/scanner/scanner.go (201 lines of code) (raw):
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package scanner
import (
"context"
"fmt"
"time"
"github.com/uber-go/tally"
"go.uber.org/cadence"
"go.uber.org/cadence/activity"
"go.uber.org/cadence/client"
"go.uber.org/cadence/worker"
"go.uber.org/zap"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/resource"
"github.com/uber/cadence/service/worker/scanner/shardscanner"
"github.com/uber/cadence/service/worker/scanner/tasklist"
"github.com/uber/cadence/service/worker/workercommon"
)
// scannerStartUpDelay is the start-up delay handed to
// workercommon.StartWorkflowWithRetry for every scanner workflow; it exists
// to let services warm up.
const scannerStartUpDelay = 4 * time.Second
// contextKey is the private key type under which a scannerContext is stored
// in a context.Context. The key value is the workflow type name.
type contextKey string

// Config defines the configuration for scanner.
type Config struct {
	// ScannerPersistenceMaxQPS is the max rate of calls to persistence.
	// Right now it is used by historyScanner to determine the rate of
	// persistence API calls.
	ScannerPersistenceMaxQPS dynamicconfig.IntPropertyFn
	// TaskListScannerEnabled indicates whether the taskList scanner should be
	// started as part of scanner.
	TaskListScannerEnabled dynamicconfig.BoolPropertyFn
	// TaskListScannerOptions contains options for TaskListScanner.
	TaskListScannerOptions tasklist.Options
	// Persistence contains the persistence configuration.
	Persistence *config.Persistence
	// ClusterMetadata contains the metadata for this cluster.
	ClusterMetadata cluster.Metadata
	// HistoryScannerEnabled indicates whether the history scanner should be
	// started as part of scanner.
	HistoryScannerEnabled dynamicconfig.BoolPropertyFn
	// ShardScanners is a list of shard scanner configs.
	ShardScanners []*shardscanner.ScannerConfig
	// MaxWorkflowRetentionInDays is not referenced in this file; presumably an
	// upper bound on workflow retention, in days. TODO confirm against its
	// consumers.
	MaxWorkflowRetentionInDays dynamicconfig.IntPropertyFn
}

// BootstrapParams contains the set of params needed to bootstrap the scanner
// sub-system.
type BootstrapParams struct {
	// Config contains the configuration for scanner.
	Config Config
	// TallyScope is an instance of tally metrics scope.
	TallyScope tally.Scope
}

// scannerContext is the context object that gets passed around within the
// scanner workflows / activities.
type scannerContext struct {
	resource resource.Resource
	cfg      Config
}

// Scanner is the background sub-system that does full scans of database
// tables to clean up resources, monitor anomalies and emit stats for
// analytics.
type Scanner struct {
	context    scannerContext
	tallyScope tally.Scope
	zapLogger  *zap.Logger
}
// New returns a new instance of the scanner daemon.
//
// Scanner is the background sub-system that does full scans of database
// tables in an attempt to clean up resources, monitor system anomalies and
// emit stats for analysis and alerting.
//
// A production zap logger is built for the scanner; if that fails, the failure
// is reported through resource.GetLogger().Fatal. The returned Scanner keeps
// the given resource, a copy of params.Config, params.TallyScope and the zap
// logger named "scanner".
func New(
	resource resource.Resource,
	params *BootstrapParams,
) *Scanner {
	baseLogger, err := zap.NewProduction()
	if err != nil {
		resource.GetLogger().Fatal("failed to initialize zap logger", tag.Error(err))
	}
	// Logged on the un-named logger, before the "scanner" name is attached.
	baseLogger.Info("Initializing new scanner")

	s := &Scanner{
		tallyScope: params.TallyScope,
		zapLogger:  baseLogger.Named("scanner"),
	}
	s.context.resource = resource
	s.context.cfg = params.Config
	return s
}
// Start starts the scanner.
//
// It first kicks off the start of every enabled scanner workflow, collecting
// the task lists that need a worker along the way:
//   - each configured shard scanner (and its fixer), via startShardScanner;
//   - the task list scanner, only when the default store type is SQL and
//     TaskListScannerEnabled returns true;
//   - the history scanner, when HistoryScannerEnabled returns true.
//
// The context accumulated from those calls is used as the workers'
// BackgroundActivityContext. One worker is then started per collected task
// list under common.SystemLocalDomainName.
//
// It returns the first error reported by a worker's Start. Workers already
// started by this call are stopped before the error is returned, so a partial
// failure does not leave running workers that the caller has no handle to.
// The workflow-start goroutines launched earlier are not cancelled.
func (s *Scanner) Start() error {
	ctx := context.Background()
	var workerTaskListNames []string

	for _, sc := range s.context.cfg.ShardScanners {
		var shardTaskLists []string
		ctx, shardTaskLists = s.startShardScanner(ctx, sc)
		workerTaskListNames = append(workerTaskListNames, shardTaskLists...)
	}

	// The task list scanner is only started for SQL-backed persistence.
	if s.context.cfg.Persistence.DefaultStoreType() == config.StoreTypeSQL &&
		s.context.cfg.TaskListScannerEnabled() {
		ctx = s.startScanner(
			ctx,
			tlScannerWFStartOptions,
			tlScannerWFTypeName)
		workerTaskListNames = append(workerTaskListNames, tlScannerTaskListName)
	}

	if s.context.cfg.HistoryScannerEnabled() {
		ctx = s.startScanner(
			ctx,
			historyScannerWFStartOptions,
			historyScannerWFTypeName)
		workerTaskListNames = append(workerTaskListNames, historyScannerTaskListName)
	}

	workerOpts := worker.Options{
		Logger:                                 s.zapLogger,
		MetricsScope:                           s.tallyScope,
		MaxConcurrentActivityExecutionSize:     maxConcurrentActivityExecutionSize,
		MaxConcurrentDecisionTaskExecutionSize: maxConcurrentDecisionTaskExecutionSize,
		BackgroundActivityContext:              ctx,
	}

	// Track the workers that came up so they can be torn down if a later one
	// fails to start.
	started := make([]worker.Worker, 0, len(workerTaskListNames))
	for _, tl := range workerTaskListNames {
		s.zapLogger.Info("Starting worker for task list", zap.String("TaskList", tl))
		w := worker.New(s.context.resource.GetSDKClient(), common.SystemLocalDomainName, tl, workerOpts)
		if err := w.Start(); err != nil {
			s.zapLogger.Error("Failed to start worker", zap.String("TaskList", tl), zap.Error(err))
			for _, running := range started {
				running.Stop()
			}
			return err
		}
		started = append(started, w)
	}

	s.zapLogger.Info("Scanner started successfully", zap.Strings("workerTaskListNames", workerTaskListNames))
	return nil
}
// startScanner launches, in a background goroutine, the start of the workflow
// named workflowName with the given options and no workflow argument. The
// actual start is delegated to workercommon.StartWorkflowWithRetry, which is
// given scannerStartUpDelay and a callback that invokes s.startWorkflow.
//
// It returns ctx extended with this scanner's scannerContext stored under
// workflowName, without waiting for the goroutine.
func (s *Scanner) startScanner(ctx context.Context, options client.StartWorkflowOptions, workflowName string) context.Context {
	launch := func(sdkClient client.Client) error {
		return s.startWorkflow(sdkClient, options, workflowName, nil)
	}
	go workercommon.StartWorkflowWithRetry(workflowName, scannerStartUpDelay, s.context.resource, launch)

	return NewScannerContext(ctx, workflowName, s.context)
}
// startShardScanner wires up one shard scanner configuration.
//
// When config.DynamicParams.ScannerEnabled() is true, it adds a shard scanner
// context to ctx under config.ScannerWFTypeName and launches, in a background
// goroutine, the start of the scanner workflow with a shard range of Min 0 and
// Max Persistence.NumHistoryShards. When config.DynamicParams.FixerEnabled()
// is true, it does the same for the fixer workflow, passing the scanner's
// workflow ID as ScannerWorkflowWorkflowID.
//
// It returns the (possibly extended) context and the task lists of the
// workflows that were enabled, scanner first, then fixer.
func (s *Scanner) startShardScanner(
	ctx context.Context,
	config *shardscanner.ScannerConfig,
) (context.Context, []string) {
	var taskLists []string
	res := s.context.resource

	if config.DynamicParams.ScannerEnabled() {
		scannerCtx := shardscanner.NewShardScannerContext(res, config)
		ctx = shardscanner.NewScannerContext(ctx, config.ScannerWFTypeName, scannerCtx)

		// The workflow params are built inside the callback, so config and
		// the shard count are read when the start is actually attempted.
		startScannerWF := func(sdkClient client.Client) error {
			params := shardscanner.ScannerWorkflowParams{
				Shards: shardscanner.Shards{
					Range: &shardscanner.ShardRange{
						Min: 0,
						Max: s.context.cfg.Persistence.NumHistoryShards,
					},
				},
			}
			return s.startWorkflow(sdkClient, config.StartWorkflowOptions, config.ScannerWFTypeName, params)
		}
		go workercommon.StartWorkflowWithRetry(config.ScannerWFTypeName, scannerStartUpDelay, res, startScannerWF)

		taskLists = append(taskLists, config.StartWorkflowOptions.TaskList)
	}

	if config.DynamicParams.FixerEnabled() {
		fixerCtx := shardscanner.NewShardFixerContext(res, config)
		ctx = shardscanner.NewFixerContext(ctx, config.FixerWFTypeName, fixerCtx)

		startFixerWF := func(sdkClient client.Client) error {
			params := shardscanner.FixerWorkflowParams{
				ScannerWorkflowWorkflowID: config.StartWorkflowOptions.ID,
			}
			return s.startWorkflow(sdkClient, config.StartFixerOptions, config.FixerWFTypeName, params)
		}
		go workercommon.StartWorkflowWithRetry(config.FixerWFTypeName, scannerStartUpDelay, res, startFixerWF)

		taskLists = append(taskLists, config.StartFixerOptions.TaskList)
	}

	return ctx, taskLists
}
// startWorkflow starts a workflow of the given type through client, using a
// fresh background context with a 5 minute timeout.
//
// workflowArg is forwarded as the single workflow argument when it is non-nil;
// otherwise the workflow is started without arguments.
//
// An "already started" error is treated as success: it is logged (at error
// level) and nil is returned. Any other error is logged and returned as-is.
func (s *Scanner) startWorkflow(
	client client.Client,
	options client.StartWorkflowOptions,
	workflowType string,
	workflowArg interface{},
) error {
	// A nil slice expands to "no arguments", matching a call without args.
	var args []interface{}
	if workflowArg != nil {
		args = append(args, workflowArg)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	_, err := client.StartWorkflow(ctx, options, workflowType, args...)
	cancel()

	wfTypeField := zap.String("workflowType", workflowType)
	switch {
	case cadence.IsWorkflowExecutionAlreadyStartedError(err):
		s.zapLogger.Error("Workflow had already started", wfTypeField, zap.Error(err))
		return nil
	case err != nil:
		s.zapLogger.Error("Failed to start workflow", wfTypeField, zap.Error(err))
		return err
	default:
		s.zapLogger.Info("Workflow started", wfTypeField)
		return nil
	}
}
// NewScannerContext returns a copy of ctx that carries scannerContext, for use
// as the workers' background activity context. The value is stored under a
// key of the private contextKey type built from workflowName, which reduces
// the access scope.
func NewScannerContext(ctx context.Context, workflowName string, scannerContext scannerContext) context.Context {
	key := contextKey(workflowName)
	return context.WithValue(ctx, key, scannerContext)
}
// getScannerContext extracts the scannerContext from an activity context.
//
// The lookup key is the private contextKey built from the workflow type name
// reported by activity.GetInfo(ctx). An error is returned when the activity
// info carries no workflow type, or when ctx holds no scannerContext under
// that key.
func getScannerContext(ctx context.Context) (scannerContext, error) {
	wfType := activity.GetInfo(ctx).WorkflowType
	if wfType == nil {
		return scannerContext{}, fmt.Errorf("workflowType is nil")
	}

	name := wfType.Name
	if sc, found := ctx.Value(contextKey(name)).(scannerContext); found {
		return sc, nil
	}
	return scannerContext{}, fmt.Errorf("context type is not %T for a key %q", scannerContext{}, name)
}