tools/cli/factory.go (201 lines of code) (raw):
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package cli
import (
"context"
"crypto/tls"
"crypto/x509"
"io/ioutil"
"time"
"github.com/olivere/elastic"
adminv1 "github.com/uber/cadence-idl/go/proto/admin/v1"
apiv1 "github.com/uber/cadence-idl/go/proto/api/v1"
"github.com/urfave/cli"
"go.uber.org/yarpc"
"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/peer"
"go.uber.org/yarpc/peer/hostport"
"go.uber.org/yarpc/transport/grpc"
"go.uber.org/yarpc/transport/tchannel"
"go.uber.org/zap"
"google.golang.org/grpc/credentials"
serverAdmin "github.com/uber/cadence/.gen/go/admin/adminserviceclient"
serverFrontend "github.com/uber/cadence/.gen/go/cadence/workflowserviceclient"
"github.com/uber/cadence/client/admin"
"github.com/uber/cadence/client/frontend"
grpcClient "github.com/uber/cadence/client/wrappers/grpc"
"github.com/uber/cadence/client/wrappers/thrift"
"github.com/uber/cadence/common"
cc "github.com/uber/cadence/common/client"
"github.com/uber/cadence/common/config"
)
const (
cadenceClientName = "cadence-client"
cadenceFrontendService = "cadence-frontend"
)
// ContextKey is an alias for string, used as context key
type ContextKey string
const (
// CtxKeyJWT is the name of the context key for the JWT
CtxKeyJWT = ContextKey("ctxKeyJWT")
)
// ClientFactory is used to construct rpc clients
type ClientFactory interface {
ServerFrontendClient(c *cli.Context) frontend.Client
ServerAdminClient(c *cli.Context) admin.Client
// ServerFrontendClientForMigration frontend client of the migration destination
ServerFrontendClientForMigration(c *cli.Context) frontend.Client
// ServerAdminClientForMigration admin client of the migration destination
ServerAdminClientForMigration(c *cli.Context) admin.Client
ElasticSearchClient(c *cli.Context) *elastic.Client
ServerConfig(c *cli.Context) (*config.Config, error)
}
type clientFactory struct {
dispatcher *yarpc.Dispatcher
dispatcherMigration *yarpc.Dispatcher
logger *zap.Logger
}
// NewClientFactory creates a new ClientFactory
func NewClientFactory() ClientFactory {
logger, err := zap.NewDevelopment()
if err != nil {
panic(err)
}
return &clientFactory{
logger: logger,
}
}
// ServerConfig returns Cadence server configs.
// Use in some CLI admin operations (e.g. accessing DB directly)
func (b *clientFactory) ServerConfig(c *cli.Context) (*config.Config, error) {
env := c.String(FlagServiceEnv)
zone := c.String(FlagServiceZone)
configDir := c.String(FlagServiceConfigDir)
var cfg config.Config
err := config.Load(env, configDir, zone, &cfg)
return &cfg, err
}
// ServerFrontendClient builds a frontend client (based on server side thrift interface)
func (b *clientFactory) ServerFrontendClient(c *cli.Context) frontend.Client {
b.ensureDispatcher(c)
clientConfig := b.dispatcher.ClientConfig(cadenceFrontendService)
if c.GlobalString(FlagTransport) == grpcTransport {
return grpcClient.NewFrontendClient(
apiv1.NewDomainAPIYARPCClient(clientConfig),
apiv1.NewWorkflowAPIYARPCClient(clientConfig),
apiv1.NewWorkerAPIYARPCClient(clientConfig),
apiv1.NewVisibilityAPIYARPCClient(clientConfig),
)
}
return thrift.NewFrontendClient(serverFrontend.New(clientConfig))
}
// ServerAdminClient builds an admin client (based on server side thrift interface)
func (b *clientFactory) ServerAdminClient(c *cli.Context) admin.Client {
b.ensureDispatcher(c)
clientConfig := b.dispatcher.ClientConfig(cadenceFrontendService)
if c.GlobalString(FlagTransport) == grpcTransport {
return grpcClient.NewAdminClient(adminv1.NewAdminAPIYARPCClient(clientConfig))
}
return thrift.NewAdminClient(serverAdmin.New(clientConfig))
}
// ServerFrontendClientForMigration builds a frontend client (based on server side thrift interface)
func (b *clientFactory) ServerFrontendClientForMigration(c *cli.Context) frontend.Client {
b.ensureDispatcherForMigration(c)
clientConfig := b.dispatcherMigration.ClientConfig(cadenceFrontendService)
if c.GlobalString(FlagTransport) == grpcTransport {
return grpcClient.NewFrontendClient(
apiv1.NewDomainAPIYARPCClient(clientConfig),
apiv1.NewWorkflowAPIYARPCClient(clientConfig),
apiv1.NewWorkerAPIYARPCClient(clientConfig),
apiv1.NewVisibilityAPIYARPCClient(clientConfig),
)
}
return thrift.NewFrontendClient(serverFrontend.New(clientConfig))
}
// ServerAdminClientForMigration builds an admin client (based on server side thrift interface)
func (b *clientFactory) ServerAdminClientForMigration(c *cli.Context) admin.Client {
b.ensureDispatcherForMigration(c)
clientConfig := b.dispatcherMigration.ClientConfig(cadenceFrontendService)
if c.GlobalString(FlagTransport) == grpcTransport {
return grpcClient.NewAdminClient(adminv1.NewAdminAPIYARPCClient(clientConfig))
}
return thrift.NewAdminClient(serverAdmin.New(clientConfig))
}
// ElasticSearchClient builds an ElasticSearch client
func (b *clientFactory) ElasticSearchClient(c *cli.Context) *elastic.Client {
url := getRequiredOption(c, FlagURL)
retrier := elastic.NewBackoffRetrier(elastic.NewExponentialBackoff(128*time.Millisecond, 513*time.Millisecond))
client, err := elastic.NewClient(
elastic.SetURL(url),
elastic.SetRetrier(retrier),
)
if err != nil {
b.logger.Fatal("Unable to create ElasticSearch client", zap.Error(err))
}
return client
}
func (b *clientFactory) ensureDispatcher(c *cli.Context) {
if b.dispatcher != nil {
return
}
b.dispatcher = b.newClientDispatcher(c, c.GlobalString(FlagAddress))
}
func (b *clientFactory) ensureDispatcherForMigration(c *cli.Context) {
if b.dispatcherMigration != nil {
return
}
b.dispatcherMigration = b.newClientDispatcher(c, c.String(FlagDestinationAddress))
}
func (b *clientFactory) newClientDispatcher(c *cli.Context, hostPortOverride string) *yarpc.Dispatcher {
shouldUseGrpc := c.GlobalString(FlagTransport) == grpcTransport
hostPort := tchannelPort
if shouldUseGrpc {
hostPort = grpcPort
}
if hostPortOverride != "" {
hostPort = hostPortOverride
}
var outbounds transport.Outbounds
if shouldUseGrpc {
grpcTransport := grpc.NewTransport()
outbounds = transport.Outbounds{Unary: grpc.NewTransport().NewSingleOutbound(hostPort)}
tlsCertificatePath := c.GlobalString(FlagTLSCertPath)
if tlsCertificatePath != "" {
caCert, err := ioutil.ReadFile(tlsCertificatePath)
if err != nil {
b.logger.Fatal("Failed to load server CA certificate", zap.Error(err))
}
caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(caCert) {
b.logger.Fatal("Failed to add server CA certificate", zap.Error(err))
}
tlsConfig := tls.Config{
RootCAs: caCertPool,
}
tlsCreds := credentials.NewTLS(&tlsConfig)
tlsChooser := peer.NewSingle(hostport.Identify(hostPort), grpcTransport.NewDialer(grpc.DialerCredentials(tlsCreds)))
outbounds = transport.Outbounds{Unary: grpc.NewTransport().NewOutbound(tlsChooser)}
}
} else {
ch, err := tchannel.NewChannelTransport(tchannel.ServiceName(cadenceClientName), tchannel.ListenAddr("127.0.0.1:0"))
if err != nil {
b.logger.Fatal("Failed to create transport channel", zap.Error(err))
}
outbounds = transport.Outbounds{Unary: ch.NewSingleOutbound(hostPort)}
}
dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: cadenceClientName,
Outbounds: yarpc.Outbounds{cadenceFrontendService: outbounds},
OutboundMiddleware: yarpc.OutboundMiddleware{
Unary: &versionMiddleware{},
},
})
if err := dispatcher.Start(); err != nil {
dispatcher.Stop()
b.logger.Fatal("Failed to create outbound transport channel: %v", zap.Error(err))
}
return dispatcher
}
type versionMiddleware struct {
}
func (vm *versionMiddleware) Call(ctx context.Context, request *transport.Request, out transport.UnaryOutbound) (*transport.Response, error) {
request.Headers = request.Headers.
With(common.ClientImplHeaderName, cc.CLI).
With(common.FeatureVersionHeaderName, cc.SupportedCLIVersion).
With(common.ClientFeatureFlagsHeaderName, cc.FeatureFlagsHeader(cc.DefaultCLIFeatureFlags))
if jwtKey, ok := ctx.Value(CtxKeyJWT).(string); ok {
request.Headers = request.Headers.With(common.AuthorizationTokenHeaderName, jwtKey)
}
return out.Call(ctx, request)
}
func getJWT(c *cli.Context) string {
return c.GlobalString(FlagJWT)
}
func getJWTPrivateKey(c *cli.Context) string {
return c.GlobalString(FlagJWTPrivateKey)
}