pkg/hostmgr/hostsvc/handler.go (295 lines of code) (raw):

// Copyright (c) 2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package hostsvc import ( "context" hpb "github.com/uber/peloton/.gen/peloton/api/v0/host" host_svc "github.com/uber/peloton/.gen/peloton/api/v0/host/svc" "github.com/uber/peloton/pkg/common" "github.com/uber/peloton/pkg/common/stringset" "github.com/uber/peloton/pkg/hostmgr/host" "github.com/uber/peloton/pkg/hostmgr/host/drainer" "github.com/uber/peloton/pkg/hostmgr/hostpool/hostmover" hostpool_mgr "github.com/uber/peloton/pkg/hostmgr/hostpool/manager" log "github.com/sirupsen/logrus" "github.com/uber-go/tally" "go.uber.org/multierr" "go.uber.org/yarpc" "go.uber.org/yarpc/yarpcerrors" ) // serviceHandler implements peloton.api.host.svc.HostService type serviceHandler struct { metrics *Metrics drainer drainer.Drainer hostPoolManager hostpool_mgr.HostPoolManager hostMover hostmover.HostMover } // InitServiceHandler initializes the HostService func InitServiceHandler( d *yarpc.Dispatcher, parent tally.Scope, drainer drainer.Drainer, hostPoolManager hostpool_mgr.HostPoolManager, hostMover hostmover.HostMover) { handler := &serviceHandler{ metrics: NewMetrics(parent.SubScope("hostsvc")), drainer: drainer, hostPoolManager: hostPoolManager, hostMover: hostMover, } d.Register(host_svc.BuildHostServiceYARPCProcedures(handler)) log.Info("Hostsvc handler initialized") } // QueryHosts returns the hosts which are in one of the specified states. // A host, at any given time, will be in one of the following states // 1.HostState_HOST_STATE_UP - The host is up and running // 2.HostState_HOST_STATE_DRAINING - The tasks running on the host are being rescheduled and // there will be no further placement of tasks on the host // 3.HostState_HOST_STATE_DRAINED - There are no tasks running on this host and it is ready to be 'DOWN'ed // 4.HostState_HOST_STATE_DOWN - The host is in maintenance. func (m *serviceHandler) QueryHosts( ctx context.Context, request *host_svc.QueryHostsRequest, ) (*host_svc.QueryHostsResponse, error) { m.metrics.QueryHostsAPI.Inc(1) // Add request.HostStates to a set to remove duplicates hostStateSet := stringset.New() for _, state := range request.GetHostStates() { hostStateSet.Add(state.String()) } if request.HostStates == nil || len(request.HostStates) == 0 { for _, state := range hpb.HostState_name { hostStateSet.Add(state) } } var hostInfos []*hpb.HostInfo drainingHostsInfo, err := m.drainer.GetAllDrainingHostInfos() if err != nil { return nil, err } drainedHostsInfo, err := m.drainer.GetAllDrainedHostInfos() if err != nil { return nil, err } downHostsInfo, err := m.drainer.GetAllDownHostInfos() if err != nil { return nil, err } for _, hostState := range hostStateSet.ToSlice() { switch hostState { case hpb.HostState_HOST_STATE_UP.String(): upHosts, err := host.BuildHostInfoForRegisteredAgents() if err != nil { m.metrics.QueryHostsFail.Inc(1) return nil, yarpcerrors.InternalErrorf(err.Error()) } // Remove draining / drained / down hosts from the result. // This is needed because AgentMap is updated every 15s // and might not have the up to date information. for _, hostInfo := range drainingHostsInfo { delete(upHosts, hostInfo.GetHostname()) } for _, hostInfo := range drainedHostsInfo { delete(upHosts, hostInfo.GetHostname()) } for _, hostInfo := range downHostsInfo { delete(upHosts, hostInfo.GetHostname()) } for _, hostInfo := range upHosts { hostInfos = append(hostInfos, hostInfo) } case hpb.HostState_HOST_STATE_DRAINING.String(): for _, hostInfo := range drainingHostsInfo { hostInfos = append(hostInfos, hostInfo) } case hpb.HostState_HOST_STATE_DRAINED.String(): for _, hostInfo := range drainedHostsInfo { hostInfos = append(hostInfos, hostInfo) } case hpb.HostState_HOST_STATE_DOWN.String(): for _, hostInfo := range downHostsInfo { hostInfos = append(hostInfos, hostInfo) } } } if m.hostPoolManager != nil { for _, h := range hostInfos { p, err := m.hostPoolManager.GetPoolByHostname(h.GetHostname()) if err == nil { h.CurrentPool = p.ID() } d, err := m.hostPoolManager.GetDesiredPool(h.GetHostname()) if err == nil { h.DesiredPool = d } } } m.metrics.QueryHostsSuccess.Inc(1) return &host_svc.QueryHostsResponse{ HostInfos: hostInfos, }, nil } // StartMaintenance puts the host(s) into DRAINING state by posting a maintenance // schedule to Mesos Master. Inverse offers are sent out and all future offers // from the(se) host(s) are tagged with unavailability (Please check Mesos // Maintenance Primitives for more info). The hosts are first drained of tasks // before they are put into maintenance by posting to /machine/down endpoint of // Mesos Master. The hosts transition from UP to DRAINING and finally to DOWN. func (m *serviceHandler) StartMaintenance( ctx context.Context, request *host_svc.StartMaintenanceRequest, ) (*host_svc.StartMaintenanceResponse, error) { // StartMaintenanceRequest using deprecated field `hostnames` var errs error if len(request.GetHostnames()) != 0 { for _, hostname := range request.GetHostnames() { if err := m.startMaintenance(ctx, hostname); err != nil { // Not error out on 1rst error, continue and aggregate errors errs = multierr.Append(errs, err) } } if errs != nil { return nil, yarpcerrors.InternalErrorf(errs.Error()) } return &host_svc.StartMaintenanceResponse{}, nil } // StartMaintenanceRequest using prefered field `hostname` if err := m.startMaintenance(ctx, request.GetHostname()); err != nil { if yarpcerrors.IsStatus(err) { // Allow YARPC NotFound error to be returned as such return nil, err } return nil, yarpcerrors.InternalErrorf(err.Error()) } return &host_svc.StartMaintenanceResponse{ Hostname: request.GetHostname(), }, nil } func (m *serviceHandler) startMaintenance( ctx context.Context, hostname string, ) error { m.metrics.StartMaintenanceAPI.Inc(1) if err := m.drainer.StartMaintenance(ctx, hostname); err != nil { m.metrics.StartMaintenanceFail.Inc(1) return err } m.metrics.StartMaintenanceSuccess.Inc(1) return nil } // CompleteMaintenance completes maintenance on the specified hosts. It brings // UP a host which is in maintenance by posting to /machine/up endpoint of // Mesos Master i.e. the machine transitions from DOWN to UP state // (Please check Mesos Maintenance Primitives for more info) func (m *serviceHandler) CompleteMaintenance( ctx context.Context, request *host_svc.CompleteMaintenanceRequest, ) (*host_svc.CompleteMaintenanceResponse, error) { // CompleteMaintenanceRequest using deprecated field `hostnames` var errs error if len(request.GetHostnames()) != 0 { for _, hostname := range request.GetHostnames() { if err := m.completeMaintenance(ctx, hostname); err != nil { // Not error out on 1rst error, continue and aggregate errors errs = multierr.Append(errs, err) } } if errs != nil { return nil, yarpcerrors.InternalErrorf(errs.Error()) } return &host_svc.CompleteMaintenanceResponse{}, nil } // CompleteMaintenanceRequest using prefered field `hostname` if err := m.completeMaintenance(ctx, request.GetHostname()); err != nil { if yarpcerrors.IsYARPCError(err) { // Allow YARPC NotFound error to be returned as such return nil, err } return nil, yarpcerrors.InternalErrorf(err.Error()) } return &host_svc.CompleteMaintenanceResponse{ Hostname: request.GetHostname(), }, nil } func (m *serviceHandler) completeMaintenance( ctx context.Context, hostname string, ) error { m.metrics.CompleteMaintenanceAPI.Inc(1) if err := m.drainer.CompleteMaintenance(ctx, hostname); err != nil { m.metrics.CompleteMaintenanceFail.Inc(1) return err } m.metrics.CompleteMaintenanceSuccess.Inc(1) return nil } // List all host pools func (m *serviceHandler) ListHostPools( ctx context.Context, request *host_svc.ListHostPoolsRequest, ) (response *host_svc.ListHostPoolsResponse, err error) { if m.hostPoolManager == nil { err = yarpcerrors.UnimplementedErrorf("host pools not enabled") return } allPools := m.hostPoolManager.Pools() response = &host_svc.ListHostPoolsResponse{ Pools: make([]*hpb.HostPoolInfo, 0, len(allPools)), } for _, pool := range allPools { poolHosts := pool.Hosts() info := &hpb.HostPoolInfo{ Name: pool.ID(), Hosts: make([]string, 0, len(poolHosts)), } for h := range poolHosts { info.Hosts = append(info.Hosts, h) } response.Pools = append(response.Pools, info) } return } // Create a host pool func (m *serviceHandler) CreateHostPool( ctx context.Context, request *host_svc.CreateHostPoolRequest, ) (response *host_svc.CreateHostPoolResponse, err error) { if m.hostPoolManager == nil { err = yarpcerrors.UnimplementedErrorf("host pools not enabled") return } name := request.GetName() if name == "" { err = yarpcerrors.InvalidArgumentErrorf("name") return } if _, err1 := m.hostPoolManager.GetPool(name); err1 != nil { m.hostPoolManager.RegisterPool(name) response = &host_svc.CreateHostPoolResponse{} } else { err = yarpcerrors.AlreadyExistsErrorf("") } return } // Delete a host pool func (m *serviceHandler) DeleteHostPool( ctx context.Context, request *host_svc.DeleteHostPoolRequest, ) (response *host_svc.DeleteHostPoolResponse, err error) { if m.hostPoolManager == nil { err = yarpcerrors.UnimplementedErrorf("host pools not enabled") return } name := request.GetName() if name == common.DefaultHostPoolID { err = yarpcerrors.InvalidArgumentErrorf("default pool") } else if _, err1 := m.hostPoolManager.GetPool(name); err1 != nil { err = yarpcerrors.NotFoundErrorf("") } else { m.hostPoolManager.DeregisterPool(name) response = &host_svc.DeleteHostPoolResponse{} } return } // Change the pool of a host func (m *serviceHandler) ChangeHostPool( ctx context.Context, request *host_svc.ChangeHostPoolRequest, ) (response *host_svc.ChangeHostPoolResponse, err error) { if m.hostPoolManager == nil { err = yarpcerrors.UnimplementedErrorf("host pools not enabled") return } err = m.hostPoolManager.ChangeHostPool( request.GetHostname(), request.GetSourcePool(), request.GetDestinationPool(), ) if err == nil { response = &host_svc.ChangeHostPoolResponse{} } return } // MoveHosts from source pool to destination func (m *serviceHandler) MoveHosts( ctx context.Context, request *host_svc.MoveHostsRequest, ) (response *host_svc.MoveHostsResponse, err error) { if m.hostPoolManager == nil { err = yarpcerrors.UnimplementedErrorf("host pools not enabled") return } err = m.hostMover.MoveHosts( ctx, request.GetSourcePool(), request.GetSrcPoolDesiredHosts(), request.GetDestinationPool(), request.GetDestPoolDesiredHosts(), ) if err == nil { response = &host_svc.MoveHostsResponse{} } return }