pkg/resmgr/respool/respoolsvc/handler.go (472 lines of code) (raw):

// Copyright (c) 2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package respoolsvc import ( "context" "sync" "github.com/uber/peloton/.gen/peloton/api/v0/peloton" "github.com/uber/peloton/.gen/peloton/api/v0/respool" "github.com/uber/peloton/pkg/common" res "github.com/uber/peloton/pkg/resmgr/respool" "github.com/uber/peloton/pkg/resmgr/scalar" ormobjects "github.com/uber/peloton/pkg/storage/objects" "github.com/pborman/uuid" log "github.com/sirupsen/logrus" "github.com/uber-go/tally" "go.uber.org/yarpc" ) const ( resPoolNotFoundErrString = "resource pool not found" resPoolDeleteErrString = "resource pool could not be deleted" resPoolIsBusyErrString = "resource pool is busy" resPoolIsNotLeafErrString = "resource pool is not leaf" ) // ServiceHandler implements peloton.api.respool.ResourcePoolService type ServiceHandler struct { sync.Mutex // respool store resPoolOps ormobjects.ResPoolOps // The in-memory resource pool tree resPoolTree res.Tree // validator to validate the mutations resPoolConfigValidator res.Validator // handler metrics metrics *res.Metrics } // InitServiceHandler returns a new handler for ResourcePoolService. func InitServiceHandler( d *yarpc.Dispatcher, parent tally.Scope, tree res.Tree, resPoolOps ormobjects.ResPoolOps, ) *ServiceHandler { scope := parent.SubScope("respool") metrics := res.NewMetrics(scope) // Initialize Resource Pool Config Validator. resPoolConfigValidator, err := res.NewResourcePoolConfigValidator(tree) if err != nil { log.Fatalf( `Error initializing resource pool config validator: %v`, err, ) } handler := &ServiceHandler{ metrics: metrics, resPoolTree: tree, resPoolConfigValidator: resPoolConfigValidator, resPoolOps: resPoolOps, } d.Register(respool.BuildResourceManagerYARPCProcedures(handler)) return handler } // CreateResourcePool will create resource pool. func (h *ServiceHandler) CreateResourcePool( ctx context.Context, req *respool.CreateRequest) ( *respool.CreateResponse, error) { h.Lock() defer h.Unlock() h.metrics.APICreateResourcePool.Inc(1) log.WithField( "request", req, ).Info("CreateResourcePool called") resPoolConfig := req.GetConfig() resPoolID := &peloton.ResourcePoolID{ Value: uuid.New(), } resourcePoolConfigData := res.ResourcePoolConfigData{ ID: resPoolID, ResourcePoolConfig: resPoolConfig, } // perform validation on resource pool resPoolConfig. if err := h.resPoolConfigValidator.Validate( resourcePoolConfigData, ); err != nil { h.metrics.CreateResourcePoolFail.Inc(1) log.WithError( err, ).Infof( "Error validating respoolID: %s in store", resPoolID.Value, ) return &respool.CreateResponse{ Error: &respool.CreateResponse_Error{ InvalidResourcePoolConfig: &respool.InvalidResourcePoolConfig{ Id: resPoolID, Message: err.Error(), }, }, }, nil } // TODO Handle parent of the new_resource_pool_config // already has tasks added running, drain, distinguish? // insert persistent store. if err := h.resPoolOps.Create(ctx, resPoolID, resPoolConfig, "peloton"); err != nil { h.metrics.CreateResourcePoolFail.Inc(1) log.WithError(err).Infof( "Error creating respoolID: %s in store", resPoolID.Value, ) return &respool.CreateResponse{ Error: &respool.CreateResponse_Error{ // TODO differentiate between n/w errors vs other data errors AlreadyExists: &respool.ResourcePoolAlreadyExists{ Id: resPoolID, Message: err.Error(), }, }, }, nil } // update the in-memory data structure. if err := h.resPoolTree.Upsert(resPoolID, resPoolConfig); err != nil { // rollback i.e. delete the current version if any errors. h.metrics.CreateResourcePoolFail.Inc(1) log.WithError( err, ).Infof( "Error creating respoolID: %s in memory tree", resPoolID.Value, ) if err := h.resPoolOps.Delete(ctx, resPoolID); err != nil { log.WithError(err). WithField("respool_id", resPoolID.Value). Info("Error rolling back respoolID in store") h.metrics.CreateResourcePoolRollbackFail.Inc(1) return &respool.CreateResponse{}, err } } h.metrics.CreateResourcePoolSuccess.Inc(1) return &respool.CreateResponse{ Result: resPoolID, }, nil } // GetResourcePool will get resource pool. func (h *ServiceHandler) GetResourcePool( ctx context.Context, req *respool.GetRequest) (*respool.GetResponse, error) { h.metrics.APIGetResourcePool.Inc(1) log.WithField("request", req).Info("GetResourcePool called") resPoolID := req.GetId() includeChildPools := req.GetIncludeChildPools() if resPoolID == nil { //TODO temporary solution to unblock, // fix with new naming convention resPoolID = &peloton.ResourcePoolID{ Value: common.RootResPoolID, } } resPool, err := h.resPoolTree.Get(resPoolID) if err != nil { h.metrics.GetResourcePoolFail.Inc(1) return &respool.GetResponse{ Error: &respool.GetResponse_Error{ NotFound: &respool.ResourcePoolNotFound{ Id: resPoolID, Message: resPoolNotFoundErrString, }, }, }, nil } resPoolInfo := resPool.ToResourcePoolInfo() childPoolInfos := make([]*respool.ResourcePoolInfo, 0) if includeChildPools && resPoolInfo.Children != nil && len(resPoolInfo.Children) > 0 { for _, childID := range resPoolInfo.Children { childPool, err := h.resPoolTree.Get(childID) if err != nil { h.metrics.GetResourcePoolFail.Inc(1) return &respool.GetResponse{ Error: &respool.GetResponse_Error{ NotFound: &respool.ResourcePoolNotFound{ Id: childID, Message: resPoolNotFoundErrString, }, }, }, nil } childPoolInfos = append(childPoolInfos, childPool.ToResourcePoolInfo()) } } h.metrics.GetResourcePoolSuccess.Inc(1) log.WithFields(log.Fields{ "pool_info": resPool.ToResourcePoolInfo(), "child_pools": childPoolInfos, }).Debug("GetResourcePool Response") return &respool.GetResponse{ Poolinfo: resPool.ToResourcePoolInfo(), ChildPools: childPoolInfos, }, nil } // DeleteResourcePool will delete resource pool. func (h *ServiceHandler) DeleteResourcePool( ctx context.Context, req *respool.DeleteRequest) ( *respool.DeleteResponse, error) { h.Lock() defer h.Unlock() h.metrics.APIDeleteResourcePool.Inc(1) lookupReq := &respool.LookupRequest{ Path: &respool.ResourcePoolPath{ Value: req.Path.Value, }, } lookupRes, err := h.LookupResourcePoolID(ctx, lookupReq) if err != nil || lookupRes.GetError() != nil { h.metrics.DeleteResourcePoolFail.Inc(1) resp := h.getDeleteResponse() resp.GetError().NotFound = h.getResPoolNotFoundError(req.Path.Value) return resp, nil } resPoolID := lookupRes.GetId() if resPoolID == nil { h.metrics.DeleteResourcePoolFail.Inc(1) resp := h.getDeleteResponse() resp.GetError().NotFound = h.getResPoolNotFoundError(req.Path.Value) return resp, nil } resPool, err := h.resPoolTree.Get(resPoolID) if err != nil { h.metrics.DeleteResourcePoolFail.Inc(1) resp := h.getDeleteResponse() resp.GetError().NotFound = h.getResPoolNotFoundError(req.Path.Value) return resp, nil } // As if the resource pool is not leaf, Delete method should // not let this operation occur. As delete is only supported for // leaf resource pools if !resPool.IsLeaf() { h.metrics.DeleteResourcePoolFail.Inc(1) resp := h.getDeleteResponse() resp.GetError().IsNotLeaf = h.getResPoolNotLeafError(resPoolID) return resp, nil } // Get the allocation of the resource pool. allocation := resPool.GetTotalAllocatedResources() // Get the resource pool demand. demand := resPool.GetDemand() // We need to check if any tasks are running in the resource pool // by looking demand or allocation. if !(allocation.LessThanOrEqual(scalar.ZeroResource)) || !(demand.LessThanOrEqual(scalar.ZeroResource)) { h.metrics.DeleteResourcePoolFail.Inc(1) resp := h.getDeleteResponse() resp.GetError().IsBusy = h.getResPoolIsBusyError(resPoolID) return resp, nil } // Deleting the respool from In memory tree. if err := h.resPoolTree.Delete(resPoolID); err != nil { h.metrics.DeleteResourcePoolFail.Inc(1) // Logging and returning error as this is the API Failed // We need to log the context in resmgr log.WithError(err).WithField("respool", resPoolID).Error("delete Respool failed ") resp := h.getDeleteResponse() resp.GetError().NotDeleted = h.getResPoolNotDeletedError(resPoolID, err) return resp, nil } // Deleting the resource pool from the DB. if err := h.resPoolOps.Delete(ctx, resPoolID); err != nil { h.metrics.DeleteResourcePoolFail.Inc(1) // Logging and returning error as this is the API Failed // We need to log the context in resmgr log.WithError(err).WithField("respool", resPoolID).Error("delete Respool failed ") resp := h.getDeleteResponse() resp.GetError().NotDeleted = h.getResPoolNotDeletedError(resPoolID, err) return resp, nil } h.metrics.DeleteResourcePoolSuccess.Inc(1) return &respool.DeleteResponse{ Error: nil, }, nil } // getDeleteResponse returns the empty respool DeleteResponse func (h *ServiceHandler) getDeleteResponse() *respool.DeleteResponse { return &respool.DeleteResponse{ Error: &respool.DeleteResponse_Error{}, } } // getResPoolNotFoundError returns the repool not found error func (h *ServiceHandler) getResPoolNotFoundError(path string, ) *respool.ResourcePoolPathNotFound { return &respool.ResourcePoolPathNotFound{ Path: &respool.ResourcePoolPath{ Value: path, }, Message: resPoolNotFoundErrString, } } // getResPoolNotLeafError returns the ResPoolNotLeafError func (h *ServiceHandler) getResPoolNotLeafError(resPoolID *peloton.ResourcePoolID, ) *respool.ResourcePoolIsNotLeaf { return &respool.ResourcePoolIsNotLeaf{ Id: resPoolID, Message: resPoolIsNotLeafErrString, } } // getResPoolIsBusyError returns the ResPoolIsBusyError func (h *ServiceHandler) getResPoolIsBusyError(resPoolID *peloton.ResourcePoolID, ) *respool.ResourcePoolIsBusy { return &respool.ResourcePoolIsBusy{ Id: resPoolID, Message: resPoolIsBusyErrString, } } // getResPoolNotDeletedError returns ResPoolNotDeletedError func (h *ServiceHandler) getResPoolNotDeletedError(resPoolID *peloton.ResourcePoolID, err error) *respool.ResourcePoolNotDeleted { return &respool.ResourcePoolNotDeleted{ Id: resPoolID, Message: err.Error() + " " + resPoolDeleteErrString, } } // UpdateResourcePool will update resource pool. func (h *ServiceHandler) UpdateResourcePool( ctx context.Context, req *respool.UpdateRequest) ( *respool.UpdateResponse, error) { h.Lock() defer h.Unlock() h.metrics.APIUpdateResourcePool.Inc(1) log.WithField( "request", req, ).Info("UpdateResourcePool called") resPoolID := req.GetId() resPoolConfig := req.GetConfig() if !req.GetForce() { resourcePoolConfigData := res.ResourcePoolConfigData{ ID: resPoolID, ResourcePoolConfig: resPoolConfig, } // perform validation on resource pool resPoolConfig. err := h.resPoolConfigValidator.Validate(resourcePoolConfigData) if err != nil { h.metrics.UpdateResourcePoolFail.Inc(1) log.WithError( err, ).WithField( "respool_id", resPoolID.GetValue()). Info( "Error validating resource pool:") return &respool.UpdateResponse{ Error: &respool.UpdateResponse_Error{ InvalidResourcePoolConfig: &respool.InvalidResourcePoolConfig{ Id: resPoolID, Message: err.Error(), }, }, }, nil } } // needed for rollback. existingResPool, err := h.resPoolTree.Get(resPoolID) if err != nil { h.metrics.UpdateResourcePoolFail.Inc(1) log.WithError( err, ).Infof( "Error fetching respoolID: %s", resPoolID.Value, ) return &respool.UpdateResponse{ Error: &respool.UpdateResponse_Error{ // TODO differentiate between n/w errors vs other data errors NotFound: &respool.ResourcePoolNotFound{ Id: resPoolID, Message: err.Error(), }, }, }, nil } // update persistent store. if err := h.resPoolOps.Update(ctx, resPoolID, resPoolConfig); err != nil { h.metrics.UpdateResourcePoolFail.Inc(1) log.WithError( err, ).Infof( "Error updating respoolID: %s in store", resPoolID.Value, ) return &respool.UpdateResponse{ Error: &respool.UpdateResponse_Error{ // TODO differentiate between n/w errors // vs other data errors NotFound: &respool.ResourcePoolNotFound{ Id: resPoolID, Message: err.Error(), }, }, }, nil } // update the in-memory data structure. if err := h.resPoolTree.Upsert(resPoolID, resPoolConfig); err != nil { // rollback to a previous version if any errors. h.metrics.UpdateResourcePoolFail.Inc(1) log.WithError( err, ).Infof( "Error updating respoolID: %s in memory tree", resPoolID.Value, ) // update with existing. if err := h.resPoolOps.Update( ctx, resPoolID, existingResPool.ResourcePoolConfig(), ); err != nil { log.WithError(err). Infof("Error rolling back respoolID: %s in store", resPoolID.Value) h.metrics.UpdateResourcePoolRollbackFail.Inc(1) return &respool.UpdateResponse{}, err } } h.metrics.UpdateResourcePoolSuccess.Inc(1) return &respool.UpdateResponse{}, nil } // LookupResourcePoolID returns the resource pool ID for a given resource pool // path. func (h *ServiceHandler) LookupResourcePoolID(ctx context.Context, req *respool.LookupRequest) ( *respool.LookupResponse, error) { h.metrics.APILookupResourcePoolID.Inc(1) log.WithField( "request", req, ).Info("LookupResourcePoolID called") path := req.Path validator, err := res.NewResourcePoolConfigValidator(nil) if err != nil { log.Fatalf( `Error initializing resource pool config validator: %v`, err, ) } validator.Register([]res.ResourcePoolConfigValidatorFunc{res.ValidateResourcePoolPath}) resourcePoolConfigData := res.ResourcePoolConfigData{ Path: path, } err = validator.Validate(resourcePoolConfigData) if err != nil { log.WithField("path", path). WithError(err).Error("failed validating resource path") h.metrics.LookupResourcePoolIDFail.Inc(1) return &respool.LookupResponse{ Error: &respool.LookupResponse_Error{ InvalidPath: &respool.InvalidResourcePoolPath{ Path: path, Message: err.Error(), }, }, }, nil } resPool, err := h.resPoolTree.GetByPath(path) if err != nil { log.WithField("path", path). WithError(err).Error("failed finding resource path") h.metrics.LookupResourcePoolIDFail.Inc(1) return &respool.LookupResponse{ Error: &respool.LookupResponse_Error{ NotFound: &respool.ResourcePoolPathNotFound{ Path: path, Message: resPoolNotFoundErrString, }, }, }, nil } if resPool == nil { return &respool.LookupResponse{ Id: nil, }, nil } h.metrics.LookupResourcePoolIDSuccess.Inc(1) return &respool.LookupResponse{ Id: &peloton.ResourcePoolID{ Value: resPool.ID(), }, }, nil } // Query returns the matching resource pools by default returns all. func (h *ServiceHandler) Query( ctx context.Context, req *respool.QueryRequest) ( *respool.QueryResponse, error) { h.metrics.APIQueryResourcePools.Inc(1) log.WithField( "request", req, ).Info("Query called") var resourcePoolInfos []*respool.ResourcePoolInfo // TODO use query request to read filters nodeList := h.resPoolTree.GetAllNodes(false) if nodeList != nil { for n := nodeList.Front(); n != nil; n = n.Next() { resPoolNode, _ := n.Value.(res.ResPool) resourcePoolInfos = append( resourcePoolInfos, resPoolNode.ToResourcePoolInfo(), ) } } h.metrics.QueryResourcePoolsSuccess.Inc(1) resp := &respool.QueryResponse{ ResourcePools: resourcePoolInfos, } log.WithField("response", resp).Debug("Query returned") return resp, nil }