pkg/cli/client.go (143 lines of code) (raw):
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cli
import (
"context"
"fmt"
"time"
"go.uber.org/yarpc"
"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/transport/grpc"
hostsvc "github.com/uber/peloton/.gen/peloton/api/v0/host/svc"
"github.com/uber/peloton/.gen/peloton/api/v0/job"
"github.com/uber/peloton/.gen/peloton/api/v0/respool"
"github.com/uber/peloton/.gen/peloton/api/v0/task"
updatesvc "github.com/uber/peloton/.gen/peloton/api/v0/update/svc"
volume_svc "github.com/uber/peloton/.gen/peloton/api/v0/volume/svc"
adminsvc "github.com/uber/peloton/.gen/peloton/api/v1alpha/admin/svc"
statelesssvc "github.com/uber/peloton/.gen/peloton/api/v1alpha/job/stateless/svc"
podsvc "github.com/uber/peloton/.gen/peloton/api/v1alpha/pod/svc"
watchsvc "github.com/uber/peloton/.gen/peloton/api/v1alpha/watch/svc"
hostmgr_svc "github.com/uber/peloton/.gen/peloton/private/hostmgr/hostsvc"
hostmgr_svc_v1 "github.com/uber/peloton/.gen/peloton/private/hostmgr/v1alpha/svc"
"github.com/uber/peloton/.gen/peloton/private/jobmgrsvc"
"github.com/uber/peloton/.gen/peloton/private/resmgrsvc"
"github.com/uber/peloton/pkg/cli/middleware"
"github.com/uber/peloton/pkg/common"
"github.com/uber/peloton/pkg/common/leader"
)
// Client groups the YARPC service clients for the Peloton services together
// with the dispatcher they are created from and a timeout-bound context.
//
// Every unexported field is populated by New; the fields are grouped below by
// the dispatcher outbound each service client is created against.
type Client struct {
	// Debug is whether debug output is enabled
	Debug bool

	// Service clients created against the common.PelotonJobManager outbound.
	jobClient       job.JobManagerYARPCClient
	taskClient      task.TaskManagerYARPCClient
	updateClient    updatesvc.UpdateServiceYARPCClient
	volumeClient    volume_svc.VolumeServiceYARPCClient
	podClient       podsvc.PodServiceYARPCClient
	statelessClient statelesssvc.JobServiceYARPCClient
	watchClient     watchsvc.WatchServiceYARPCClient
	jobmgrClient    jobmgrsvc.JobManagerServiceYARPCClient
	adminClient     adminsvc.AdminServiceYARPCClient

	// Service clients created against the common.PelotonResourceManager
	// outbound.
	resClient    respool.ResourceManagerYARPCClient
	resMgrClient resmgrsvc.ResourceManagerServiceYARPCClient

	// Service clients created against the common.PelotonHostManager outbound.
	hostMgrClient   hostmgr_svc.InternalHostServiceYARPCClient
	hostMgrClientV1 hostmgr_svc_v1.HostManagerServiceYARPCClient
	hostClient      hostsvc.HostServiceYARPCClient

	// dispatcher is started by New and stopped by Cleanup.
	dispatcher *yarpc.Dispatcher
	// ctx carries the timeout handed to New; cancelFunc releases it and is
	// invoked by Cleanup.
	ctx        context.Context
	cancelFunc context.CancelFunc
}
// New builds a Client whose service clients talk to the Peloton job manager,
// resource manager and host manager.
//
// The three service addresses are resolved through discovery, in that order;
// the first lookup error is returned unchanged. A YARPC dispatcher named
// common.PelotonCLI is then configured on a gRPC transport with one outbound
// set per service, the basic-auth outbound middleware built from authConfig is
// attached to unary, oneway and stream calls, and the dispatcher is started.
// A failure to start is reported as an error.
//
// The context stored on the returned client is derived from
// context.Background with the given timeout, so its deadline is fixed at the
// time New runs. Call Cleanup on the client to stop the dispatcher and cancel
// that context. debug is copied to Client.Debug.
func New(
	discovery leader.Discovery,
	timeout time.Duration,
	authConfig *middleware.BasicAuthConfig,
	debug bool,
) (*Client, error) {
	// Resolve the address of each backing service up front.
	jobMgrAddr, err := discovery.GetAppURL(common.JobManagerRole)
	if err != nil {
		return nil, err
	}
	resMgrAddr, err := discovery.GetAppURL(common.ResourceManagerRole)
	if err != nil {
		return nil, err
	}
	hostMgrAddr, err := discovery.GetAppURL(common.HostManagerRole)
	if err != nil {
		return nil, err
	}

	grpcTransport := grpc.NewTransport()

	// The resource manager is only given a unary outbound; the job manager
	// and host manager get both unary and stream outbounds.
	outbounds := yarpc.Outbounds{
		common.PelotonJobManager: transport.Outbounds{
			Unary:  grpcTransport.NewSingleOutbound(jobMgrAddr.Host),
			Stream: grpcTransport.NewSingleOutbound(jobMgrAddr.Host),
		},
		common.PelotonResourceManager: transport.Outbounds{
			Unary: grpcTransport.NewSingleOutbound(resMgrAddr.Host),
		},
		common.PelotonHostManager: transport.Outbounds{
			Unary:  grpcTransport.NewSingleOutbound(hostMgrAddr.Host),
			Stream: grpcTransport.NewSingleOutbound(hostMgrAddr.Host),
		},
	}

	// The same middleware instance is applied to every outbound call type.
	auth := middleware.NewBasicAuthOutboundMiddleware(authConfig)
	outboundMiddleware := yarpc.OutboundMiddleware{
		Unary:  auth,
		Oneway: auth,
		Stream: auth,
	}

	dispatcher := yarpc.NewDispatcher(yarpc.Config{
		Name:               common.PelotonCLI,
		Outbounds:          outbounds,
		OutboundMiddleware: outboundMiddleware,
	})
	if startErr := dispatcher.Start(); startErr != nil {
		return nil, fmt.Errorf("Unable to start dispatcher: %v", startErr)
	}

	ctx, cancelFunc := context.WithTimeout(context.Background(), timeout)

	// One client configuration per outbound, shared by every service client
	// that is served by that outbound.
	jobMgrCfg := dispatcher.ClientConfig(common.PelotonJobManager)
	resMgrCfg := dispatcher.ClientConfig(common.PelotonResourceManager)
	hostMgrCfg := dispatcher.ClientConfig(common.PelotonHostManager)

	return &Client{
		Debug: debug,

		jobClient:       job.NewJobManagerYARPCClient(jobMgrCfg),
		taskClient:      task.NewTaskManagerYARPCClient(jobMgrCfg),
		updateClient:    updatesvc.NewUpdateServiceYARPCClient(jobMgrCfg),
		volumeClient:    volume_svc.NewVolumeServiceYARPCClient(jobMgrCfg),
		podClient:       podsvc.NewPodServiceYARPCClient(jobMgrCfg),
		statelessClient: statelesssvc.NewJobServiceYARPCClient(jobMgrCfg),
		watchClient:     watchsvc.NewWatchServiceYARPCClient(jobMgrCfg),
		jobmgrClient:    jobmgrsvc.NewJobManagerServiceYARPCClient(jobMgrCfg),
		adminClient:     adminsvc.NewAdminServiceYARPCClient(jobMgrCfg),

		resClient:    respool.NewResourceManagerYARPCClient(resMgrCfg),
		resMgrClient: resmgrsvc.NewResourceManagerServiceYARPCClient(resMgrCfg),

		hostMgrClient:   hostmgr_svc.NewInternalHostServiceYARPCClient(hostMgrCfg),
		hostMgrClientV1: hostmgr_svc_v1.NewHostManagerServiceYARPCClient(hostMgrCfg),
		hostClient:      hostsvc.NewHostServiceYARPCClient(hostMgrCfg),

		dispatcher: dispatcher,
		ctx:        ctx,
		cancelFunc: cancelFunc,
	}, nil
}
// Cleanup stops the client's YARPC dispatcher and then cancels the context
// held by the client. The cancellation is deferred so that it still runs if
// stopping the dispatcher panics.
func (c *Client) Cleanup() {
	defer func() {
		c.cancelFunc()
	}()

	// Cleanup has no return value, so the result of Stop is discarded.
	_ = c.dispatcher.Stop()
}