common/persistence/wrappers/sampled/visibility_manager.go (238 lines of code) (raw):

// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package sampled import ( "context" "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" ) const ( // To sample visibility request, open has only 1 bucket, closed has 2 numOfPriorityForOpen = 1 numOfPriorityForClosed = 2 numOfPriorityForList = 1 ) // errPersistenceLimitExceededForList is the error indicating QPS limit reached for list visibility. var errPersistenceLimitExceededForList = &types.ServiceBusyError{Message: "Persistence Max QPS Reached for List Operations."} type visibilityManager struct { rateLimitersForOpen RateLimiterFactory rateLimitersForClosed RateLimiterFactory rateLimitersForList RateLimiterFactory persistence persistence.VisibilityManager metricClient metrics.Client logger log.Logger } type ( // Config is config for visibility Config struct { VisibilityOpenMaxQPS dynamicconfig.IntPropertyFnWithDomainFilter `yaml:"-" json:"-"` // VisibilityClosedMaxQPS max QPS for record closed workflows VisibilityClosedMaxQPS dynamicconfig.IntPropertyFnWithDomainFilter `yaml:"-" json:"-"` // VisibilityListMaxQPS max QPS for list workflow VisibilityListMaxQPS dynamicconfig.IntPropertyFnWithDomainFilter `yaml:"-" json:"-"` } ) type Params struct { Config *Config MetricClient metrics.Client Logger log.Logger TimeSource clock.TimeSource RateLimiterFactoryFunc RateLimiterFactoryFunc } // NewVisibilityManager creates a client to manage visibility with sampling // For write requests, it will do sampling which will lose some records // For read requests, it will do sampling which will return service busy errors. // Note that this is different from NewVisibilityPersistenceRateLimitedClient which is overlapping with the read processing. func NewVisibilityManager(persistence persistence.VisibilityManager, p Params) persistence.VisibilityManager { return &visibilityManager{ persistence: persistence, rateLimitersForOpen: p.RateLimiterFactoryFunc(p.TimeSource, numOfPriorityForOpen, p.Config.VisibilityOpenMaxQPS), rateLimitersForClosed: p.RateLimiterFactoryFunc(p.TimeSource, numOfPriorityForClosed, p.Config.VisibilityClosedMaxQPS), rateLimitersForList: p.RateLimiterFactoryFunc(p.TimeSource, numOfPriorityForList, p.Config.VisibilityListMaxQPS), metricClient: p.MetricClient, logger: p.Logger, } } func (p *visibilityManager) RecordWorkflowExecutionStarted( ctx context.Context, request *persistence.RecordWorkflowExecutionStartedRequest, ) error { domain := request.Domain domainID := request.DomainUUID rateLimiter := p.rateLimitersForOpen.GetRateLimiter(domain) if ok, _ := rateLimiter.GetToken(0, 1); ok { return p.persistence.RecordWorkflowExecutionStarted(ctx, request) } p.logger.Info("Request for open workflow is sampled", tag.WorkflowDomainID(domainID), tag.WorkflowDomainName(domain), tag.WorkflowType(request.WorkflowTypeName), tag.WorkflowID(request.Execution.GetWorkflowID()), tag.WorkflowRunID(request.Execution.GetRunID()), ) p.metricClient.IncCounter(metrics.PersistenceRecordWorkflowExecutionStartedScope, metrics.PersistenceSampledCounter) return nil } func (p *visibilityManager) RecordWorkflowExecutionClosed( ctx context.Context, request *persistence.RecordWorkflowExecutionClosedRequest, ) error { domain := request.Domain domainID := request.DomainUUID priority := getRequestPriority(request) rateLimiter := p.rateLimitersForClosed.GetRateLimiter(domain) if ok, _ := rateLimiter.GetToken(priority, 1); ok { return p.persistence.RecordWorkflowExecutionClosed(ctx, request) } p.logger.Info("Request for closed workflow is sampled", tag.WorkflowDomainID(domainID), tag.WorkflowDomainName(domain), tag.WorkflowType(request.WorkflowTypeName), tag.WorkflowID(request.Execution.GetWorkflowID()), tag.WorkflowRunID(request.Execution.GetRunID()), ) p.metricClient.IncCounter(metrics.PersistenceRecordWorkflowExecutionClosedScope, metrics.PersistenceSampledCounter) return nil } func (p *visibilityManager) UpsertWorkflowExecution( ctx context.Context, request *persistence.UpsertWorkflowExecutionRequest, ) error { domain := request.Domain domainID := request.DomainUUID rateLimiter := p.rateLimitersForClosed.GetRateLimiter(domain) if ok, _ := rateLimiter.GetToken(0, 1); ok { return p.persistence.UpsertWorkflowExecution(ctx, request) } p.logger.Info("Request for upsert workflow is sampled", tag.WorkflowDomainID(domainID), tag.WorkflowDomainName(domain), tag.WorkflowType(request.WorkflowTypeName), tag.WorkflowID(request.Execution.GetWorkflowID()), tag.WorkflowRunID(request.Execution.GetRunID()), ) p.metricClient.IncCounter(metrics.PersistenceUpsertWorkflowExecutionScope, metrics.PersistenceSampledCounter) return nil } func (p *visibilityManager) ListOpenWorkflowExecutions( ctx context.Context, request *persistence.ListWorkflowExecutionsRequest, ) (*persistence.ListWorkflowExecutionsResponse, error) { if err := p.tryConsumeListToken(request.Domain, "ListOpenWorkflowExecutions"); err != nil { return nil, err } return p.persistence.ListOpenWorkflowExecutions(ctx, request) } func (p *visibilityManager) ListClosedWorkflowExecutions( ctx context.Context, request *persistence.ListWorkflowExecutionsRequest, ) (*persistence.ListWorkflowExecutionsResponse, error) { if err := p.tryConsumeListToken(request.Domain, "ListClosedWorkflowExecutions"); err != nil { return nil, err } return p.persistence.ListClosedWorkflowExecutions(ctx, request) } func (p *visibilityManager) ListOpenWorkflowExecutionsByType( ctx context.Context, request *persistence.ListWorkflowExecutionsByTypeRequest, ) (*persistence.ListWorkflowExecutionsResponse, error) { if err := p.tryConsumeListToken(request.Domain, "ListOpenWorkflowExecutionsByType"); err != nil { return nil, err } return p.persistence.ListOpenWorkflowExecutionsByType(ctx, request) } func (p *visibilityManager) ListClosedWorkflowExecutionsByType( ctx context.Context, request *persistence.ListWorkflowExecutionsByTypeRequest, ) (*persistence.ListWorkflowExecutionsResponse, error) { if err := p.tryConsumeListToken(request.Domain, "ListClosedWorkflowExecutionsByType"); err != nil { return nil, err } return p.persistence.ListClosedWorkflowExecutionsByType(ctx, request) } func (p *visibilityManager) ListOpenWorkflowExecutionsByWorkflowID( ctx context.Context, request *persistence.ListWorkflowExecutionsByWorkflowIDRequest, ) (*persistence.ListWorkflowExecutionsResponse, error) { if err := p.tryConsumeListToken(request.Domain, "ListOpenWorkflowExecutionsByWorkflowID"); err != nil { return nil, err } return p.persistence.ListOpenWorkflowExecutionsByWorkflowID(ctx, request) } func (p *visibilityManager) ListClosedWorkflowExecutionsByWorkflowID( ctx context.Context, request *persistence.ListWorkflowExecutionsByWorkflowIDRequest, ) (*persistence.ListWorkflowExecutionsResponse, error) { if err := p.tryConsumeListToken(request.Domain, "ListClosedWorkflowExecutionsByWorkflowID"); err != nil { return nil, err } return p.persistence.ListClosedWorkflowExecutionsByWorkflowID(ctx, request) } func (p *visibilityManager) ListClosedWorkflowExecutionsByStatus( ctx context.Context, request *persistence.ListClosedWorkflowExecutionsByStatusRequest, ) (*persistence.ListWorkflowExecutionsResponse, error) { if err := p.tryConsumeListToken(request.Domain, "ListClosedWorkflowExecutionsByStatus"); err != nil { return nil, err } return p.persistence.ListClosedWorkflowExecutionsByStatus(ctx, request) } func (p *visibilityManager) RecordWorkflowExecutionUninitialized( ctx context.Context, request *persistence.RecordWorkflowExecutionUninitializedRequest, ) error { return p.persistence.RecordWorkflowExecutionUninitialized(ctx, request) } func (p *visibilityManager) GetClosedWorkflowExecution( ctx context.Context, request *persistence.GetClosedWorkflowExecutionRequest, ) (*persistence.GetClosedWorkflowExecutionResponse, error) { return p.persistence.GetClosedWorkflowExecution(ctx, request) } func (p *visibilityManager) DeleteWorkflowExecution( ctx context.Context, request *persistence.VisibilityDeleteWorkflowExecutionRequest, ) error { return p.persistence.DeleteWorkflowExecution(ctx, request) } func (p *visibilityManager) DeleteUninitializedWorkflowExecution( ctx context.Context, request *persistence.VisibilityDeleteWorkflowExecutionRequest, ) error { return p.persistence.DeleteUninitializedWorkflowExecution(ctx, request) } func (p *visibilityManager) ListWorkflowExecutions( ctx context.Context, request *persistence.ListWorkflowExecutionsByQueryRequest, ) (*persistence.ListWorkflowExecutionsResponse, error) { return p.persistence.ListWorkflowExecutions(ctx, request) } func (p *visibilityManager) ScanWorkflowExecutions( ctx context.Context, request *persistence.ListWorkflowExecutionsByQueryRequest, ) (*persistence.ListWorkflowExecutionsResponse, error) { return p.persistence.ScanWorkflowExecutions(ctx, request) } func (p *visibilityManager) CountWorkflowExecutions( ctx context.Context, request *persistence.CountWorkflowExecutionsRequest, ) (*persistence.CountWorkflowExecutionsResponse, error) { return p.persistence.CountWorkflowExecutions(ctx, request) } func (p *visibilityManager) Close() { p.persistence.Close() } func (p *visibilityManager) GetName() string { return p.persistence.GetName() } func getRequestPriority(request *persistence.RecordWorkflowExecutionClosedRequest) int { priority := 0 if request.Status == types.WorkflowExecutionCloseStatusCompleted { priority = 1 // low priority for completed workflows } return priority } func (p *visibilityManager) tryConsumeListToken(domain, method string) error { rateLimiter := p.rateLimitersForList.GetRateLimiter(domain) ok, _ := rateLimiter.GetToken(0, 1) if ok { p.logger.Debug("List API request consumed QPS token", tag.WorkflowDomainName(domain), tag.Name(method)) return nil } p.logger.Debug("List API request is being sampled", tag.WorkflowDomainName(domain), tag.Name(method)) return errPersistenceLimitExceededForList }