service/worker/archiver/client.go (261 lines of code) (raw):

// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package archiver import ( "context" "errors" "fmt" "math/rand" "time" "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" cclient "go.uber.org/cadence/client" "github.com/uber/cadence/common" carchiver "github.com/uber/cadence/common/archiver" "github.com/uber/cadence/common/archiver/provider" "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/quotas" "github.com/uber/cadence/common/types" ) type ( // ClientRequest is the archive request sent to the archiver client ClientRequest struct { ArchiveRequest *ArchiveRequest CallerService string AttemptArchiveInline bool } // ClientResponse is the archive response returned from the archiver client ClientResponse struct { HistoryArchivedInline bool } // ArchiveRequest is the request signal sent to the archival workflow ArchiveRequest struct { DomainID string DomainName string WorkflowID string RunID string // history archival ShardID int BranchToken []byte NextEventID int64 CloseFailoverVersion int64 URI string // should be historyURI, but keep the existing name for backward compatibility // visibility archival WorkflowTypeName string StartTimestamp int64 ExecutionTimestamp int64 CloseTimestamp int64 CloseStatus types.WorkflowExecutionCloseStatus HistoryLength int64 Memo *types.Memo SearchAttributes map[string][]byte VisibilityURI string // archival targets: history and/or visibility Targets []ArchivalTarget } // Client is used to archive workflow histories Client interface { Archive(context.Context, *ClientRequest) (*ClientResponse, error) } client struct { metricsScope metrics.Scope logger log.Logger cadenceClient cclient.Client numWorkflows dynamicconfig.IntPropertyFn rateLimiter quotas.Limiter inlineHistoryRateLimiter quotas.Limiter inlineVisibilityRateLimiter quotas.Limiter archiverProvider provider.ArchiverProvider archivingIncompleteHistory dynamicconfig.BoolPropertyFn } // ArchivalTarget is either history or visibility ArchivalTarget int ) const ( signalTimeout = 300 * time.Millisecond tooManyRequestsErrMsg = "too many requests to archival workflow" ) const ( // ArchiveTargetHistory is the archive target for workflow history ArchiveTargetHistory ArchivalTarget = iota // ArchiveTargetVisibility is the archive target for workflow visibility record ArchiveTargetVisibility ) var ( errInlineArchivalThrottled = errors.New("inline archival throttled") ) // NewClient creates a new Client func NewClient( metricsClient metrics.Client, logger log.Logger, publicClient workflowserviceclient.Interface, numWorkflows dynamicconfig.IntPropertyFn, requestRateLimiter quotas.Limiter, inlineHistoryRateLimiter quotas.Limiter, inlineVisibilityRateLimiter quotas.Limiter, archiverProvider provider.ArchiverProvider, archivingIncompleteHistory dynamicconfig.BoolPropertyFn, ) Client { return &client{ metricsScope: metricsClient.Scope(metrics.ArchiverClientScope), logger: logger, cadenceClient: cclient.NewClient(publicClient, common.SystemLocalDomainName, &cclient.Options{}), numWorkflows: numWorkflows, rateLimiter: requestRateLimiter, inlineHistoryRateLimiter: inlineHistoryRateLimiter, inlineVisibilityRateLimiter: inlineVisibilityRateLimiter, archiverProvider: archiverProvider, archivingIncompleteHistory: archivingIncompleteHistory, } } // Archive starts an archival task func (c *client) Archive(ctx context.Context, request *ClientRequest) (*ClientResponse, error) { scopeWithDomainTag := c.metricsScope.Tagged(metrics.DomainTag(request.ArchiveRequest.DomainName)) for _, target := range request.ArchiveRequest.Targets { switch target { case ArchiveTargetHistory: scopeWithDomainTag.IncCounter(metrics.ArchiverClientHistoryRequestCountPerDomain) case ArchiveTargetVisibility: scopeWithDomainTag.IncCounter(metrics.ArchiverClientVisibilityRequestCountPerDomain) } } logger := c.logger.WithTags( tag.ArchivalCallerServiceName(request.CallerService), tag.ArchivalArchiveAttemptedInline(request.AttemptArchiveInline), ) resp := &ClientResponse{ HistoryArchivedInline: false, } if request.AttemptArchiveInline { results := []chan error{} for _, target := range request.ArchiveRequest.Targets { ch := make(chan error) results = append(results, ch) switch target { case ArchiveTargetHistory: go c.archiveHistoryInline(ctx, request, logger, ch) case ArchiveTargetVisibility: go c.archiveVisibilityInline(ctx, request, logger, ch) default: close(ch) } } targets := []ArchivalTarget{} for i, target := range request.ArchiveRequest.Targets { if <-results[i] != nil { targets = append(targets, target) } else if target == ArchiveTargetHistory { resp.HistoryArchivedInline = true } } request.ArchiveRequest.Targets = targets } if len(request.ArchiveRequest.Targets) != 0 { if err := c.sendArchiveSignal(ctx, request.ArchiveRequest, logger); err != nil { return nil, err } } return resp, nil } func (c *client) archiveHistoryInline(ctx context.Context, request *ClientRequest, logger log.Logger, errCh chan error) { logger = tagLoggerWithHistoryRequest(logger, request.ArchiveRequest) scopeWithDomainTag := c.metricsScope.Tagged(metrics.DomainTag(request.ArchiveRequest.DomainName)) if !c.inlineHistoryRateLimiter.Allow() { scopeWithDomainTag.IncCounter(metrics.ArchiverClientHistoryInlineArchiveThrottledCountPerDomain) logger.Debug("inline history archival throttled") errCh <- errInlineArchivalThrottled return } var err error defer func() { if err != nil { scopeWithDomainTag.IncCounter(metrics.ArchiverClientHistoryInlineArchiveFailureCountPerDomain) logger.Info("failed to perform workflow history archival inline", tag.Error(err)) } errCh <- err }() scopeWithDomainTag.IncCounter(metrics.ArchiverClientHistoryInlineArchiveAttemptCountPerDomain) URI, err := carchiver.NewURI(request.ArchiveRequest.URI) if err != nil { return } historyArchiver, err := c.archiverProvider.GetHistoryArchiver(URI.Scheme(), request.CallerService) if err != nil { return } allowArchivingIncompleteHistoryOpt := carchiver.GetArchivingIncompleteHistoryOption(c.archivingIncompleteHistory) err = historyArchiver.Archive(ctx, URI, &carchiver.ArchiveHistoryRequest{ ShardID: request.ArchiveRequest.ShardID, DomainID: request.ArchiveRequest.DomainID, DomainName: request.ArchiveRequest.DomainName, WorkflowID: request.ArchiveRequest.WorkflowID, RunID: request.ArchiveRequest.RunID, BranchToken: request.ArchiveRequest.BranchToken, NextEventID: request.ArchiveRequest.NextEventID, CloseFailoverVersion: request.ArchiveRequest.CloseFailoverVersion, }, allowArchivingIncompleteHistoryOpt) } func (c *client) archiveVisibilityInline(ctx context.Context, request *ClientRequest, logger log.Logger, errCh chan error) { logger = tagLoggerWithVisibilityRequest(logger, request.ArchiveRequest) scopeWithDomainTag := c.metricsScope.Tagged(metrics.DomainTag(request.ArchiveRequest.DomainName)) if !c.inlineVisibilityRateLimiter.Allow() { scopeWithDomainTag.IncCounter(metrics.ArchiverClientVisibilityInlineArchiveThrottledCountPerDomain) logger.Debug("inline visibility archival throttled") errCh <- errInlineArchivalThrottled return } var err error defer func() { if err != nil { scopeWithDomainTag.IncCounter(metrics.ArchiverClientVisibilityInlineArchiveFailureCountPerDomain) logger.Info("failed to perform visibility archival inline", tag.Error(err)) } errCh <- err }() scopeWithDomainTag.IncCounter(metrics.ArchiverClientVisibilityInlineArchiveAttemptCountPerDomain) URI, err := carchiver.NewURI(request.ArchiveRequest.VisibilityURI) if err != nil { return } visibilityArchiver, err := c.archiverProvider.GetVisibilityArchiver(URI.Scheme(), request.CallerService) if err != nil { return } err = visibilityArchiver.Archive(ctx, URI, &carchiver.ArchiveVisibilityRequest{ DomainID: request.ArchiveRequest.DomainID, DomainName: request.ArchiveRequest.DomainName, WorkflowID: request.ArchiveRequest.WorkflowID, RunID: request.ArchiveRequest.RunID, WorkflowTypeName: request.ArchiveRequest.WorkflowTypeName, StartTimestamp: request.ArchiveRequest.StartTimestamp, ExecutionTimestamp: request.ArchiveRequest.ExecutionTimestamp, CloseTimestamp: request.ArchiveRequest.CloseTimestamp, CloseStatus: request.ArchiveRequest.CloseStatus, HistoryLength: request.ArchiveRequest.HistoryLength, Memo: request.ArchiveRequest.Memo, SearchAttributes: convertSearchAttributesToString(request.ArchiveRequest.SearchAttributes), HistoryArchivalURI: request.ArchiveRequest.URI, }) } func (c *client) sendArchiveSignal(ctx context.Context, request *ArchiveRequest, taggedLogger log.Logger) error { scopeWithDomainTag := c.metricsScope.Tagged(metrics.DomainTag(request.DomainName)) scopeWithDomainTag.IncCounter(metrics.ArchiverClientSendSignalCountPerDomain) if ok := c.rateLimiter.Allow(); !ok { c.logger.Error(tooManyRequestsErrMsg) scopeWithDomainTag.IncCounter(metrics.CadenceErrServiceBusyCounter) return errors.New(tooManyRequestsErrMsg) } workflowID := fmt.Sprintf("%v-%v", workflowIDPrefix, rand.Intn(c.numWorkflows())) workflowOptions := cclient.StartWorkflowOptions{ ID: workflowID, TaskList: decisionTaskList, ExecutionStartToCloseTimeout: workflowStartToCloseTimeout, DecisionTaskStartToCloseTimeout: workflowTaskStartToCloseTimeout, WorkflowIDReusePolicy: cclient.WorkflowIDReusePolicyAllowDuplicate, } signalCtx, cancel := context.WithTimeout(context.Background(), signalTimeout) defer cancel() _, err := c.cadenceClient.SignalWithStartWorkflow(signalCtx, workflowID, signalName, *request, workflowOptions, archivalWorkflowFnName, nil) if err != nil { taggedLogger = taggedLogger.WithTags( tag.ArchivalRequestDomainID(request.DomainID), tag.ArchivalRequestDomainName(request.DomainName), tag.ArchivalRequestWorkflowID(request.WorkflowID), tag.ArchivalRequestRunID(request.RunID), tag.WorkflowID(workflowID), tag.Error(err), ) taggedLogger.Error("failed to send signal to archival system workflow") scopeWithDomainTag.IncCounter(metrics.ArchiverClientSendSignalFailureCountPerDomain) return err } return nil }