service/frontend/api/producer_manager.go (80 lines of code) (raw):

// The MIT License (MIT) // Copyright (c) 2017-2020 Uber Technologies Inc. // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in all // copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. //go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination producer_manager_mock.go -self_package github.com/uber/cadence/service/frontend/api package api import ( "fmt" "time" "github.com/uber/cadence/common/asyncworkflow/queue" "github.com/uber/cadence/common/asyncworkflow/queue/provider" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" ) type ( // ProducerManager is used to create a producer for a domain. // Producer is used for Async APIs such as StartWorkflowExecutionAsync ProducerManager interface { GetProducerByDomain(domain string) (messaging.Producer, error) } producerManagerImpl struct { domainCache cache.DomainCache provider queue.Provider logger log.Logger metricsClient metrics.Client producerCache cache.Cache } ) func NewProducerManager( domainCache cache.DomainCache, provider queue.Provider, logger log.Logger, metricsClient metrics.Client, ) ProducerManager { return &producerManagerImpl{ domainCache: domainCache, provider: provider, logger: logger, metricsClient: metricsClient, producerCache: cache.New(&cache.Options{ TTL: time.Minute * 5, InitialCapacity: 5, MaxCount: 100, Pin: true, }), } } // GetProducerByDomain returns a producer for a domain func (q *producerManagerImpl) GetProducerByDomain( domain string, ) (messaging.Producer, error) { domainEntry, err := q.domainCache.GetDomain(domain) if err != nil { return nil, err } if !domainEntry.GetConfig().AsyncWorkflowConfig.Enabled { return nil, fmt.Errorf("async workflow is not enabled for domain %v", domain) } queueName := domainEntry.GetConfig().AsyncWorkflowConfig.PredefinedQueueName var queue provider.Queue if queueName != "" { queue, err = q.provider.GetPredefinedQueue(queueName) if err != nil { return nil, err } } else { queue, err = q.provider.GetQueue(domainEntry.GetConfig().AsyncWorkflowConfig.QueueType, domainEntry.GetConfig().AsyncWorkflowConfig.QueueConfig) if err != nil { return nil, err } } queueID := queue.ID() val := q.producerCache.Get(queueID) if val != nil { return val.(messaging.Producer), nil } producer, err := queue.CreateProducer(&provider.Params{Logger: q.logger, MetricsClient: q.metricsClient}) if err != nil { return nil, err } // PutIfNotExist is thread safe, and will either return the value that was already in the cache or the value we just created // another thread might have inserted a value between the Get and PutIfNotExist, but that is ok // it should never return an error as we do not use Pin val, err = q.producerCache.PutIfNotExist(queueID, producer) if err != nil { return nil, err } return val.(messaging.Producer), nil }