pkg/mock/preemption_predicate_plugin.go (111 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mock
import (
"errors"
"fmt"
"github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
// PreemptionPredicatePlugin is a mock plugin whose predicate answers are driven entirely
// by the reservations, allocations and preemption scenarios it was configured with.
type PreemptionPredicatePlugin struct {
	// NOTE(review): ResourceManagerCallback is declared elsewhere in this package;
	// presumably it supplies the callback methods not overridden here - confirm.
	ResourceManagerCallback
	// allocation key -> node ID for which Predicates succeeds when args.Allocate is false
	reservations map[string]string
	// allocation key -> node ID for which Predicates succeeds when args.Allocate is true
	allocations map[string]string
	// scenarios searched, in order, by PreemptionPredicates
	preemptions []Preemption
	// outcome of the most recent PreemptionPredicates call, read via GetPredicateError
	errHolder *errHolder
	// lock taken by the methods of this type around access to the fields above
	locking.RWMutex
}
// Preemption describes one preemption predicate scenario: the arguments the plugin
// expects to receive and the response values it hands back when they all match.
type Preemption struct {
	// must equal args.AllocationKey for this scenario to be selected
	expectedAllocationKey string
	// must equal args.NodeID for this scenario to be selected
	expectedNodeID string
	// compared element by element, in order, against args.PreemptAllocationKeys
	expectedAllocationKeys []string
	// must equal args.StartIndex, otherwise the check is recorded as failed
	expectedStartIndex int32
	// copied into the response Success field when every check passes
	success bool
	// copied into the response Index field when every check passes
	index int32
}
// errHolder wraps the error recorded by PreemptionPredicates; err stays nil while
// no failure has been recorded.
type errHolder struct {
	err error
}
// Predicates answers a predicate check from the configured lookup tables.
// When args.Allocate is set the allocations table is consulted, otherwise the
// reservations table. The check passes (nil error) only if the allocation key is
// present in the selected table and is mapped to the node ID given in args.
func (m *PreemptionPredicatePlugin) Predicates(args *si.PredicatesArgs) error {
	m.RLock()
	defer m.RUnlock()

	// both kinds of check follow the same rules, only the table differs
	table := m.reservations
	if args.Allocate {
		table = m.allocations
	}

	expectedNode, found := table[args.AllocationKey]
	switch {
	case !found:
		return errors.New("no allocation found")
	case expectedNode != args.NodeID:
		return errors.New("wrong node")
	default:
		return nil
	}
}
// PreemptionPredicates checks the arguments against the configured preemption scenarios.
// The first scenario whose allocation key and node ID both match args is selected. Its
// expected start index and expected list of preempted allocation keys (length and order)
// are then compared with args:
//   - on any mismatch the failure is recorded, retrievable via GetPredicateError, and a
//     response with Success=false and Index=-1 is returned;
//   - when everything matches the recorded error is cleared and the scenario's configured
//     success and index values are returned.
//
// If no scenario matches the allocation key and node ID, a "no match found" error is
// recorded and the Success=false, Index=-1 response is returned.
func (m *PreemptionPredicatePlugin) PreemptionPredicates(args *si.PreemptionPredicatesArgs) *si.PreemptionPredicatesResponse {
	m.Lock()
	defer m.Unlock()
	result := &si.PreemptionPredicatesResponse{
		Success: false,
		Index:   -1,
	}
	for _, preemption := range m.preemptions {
		// scenarios are keyed on allocation key + node ID, skip anything else
		if preemption.expectedAllocationKey != args.AllocationKey {
			continue
		}
		if preemption.expectedNodeID != args.NodeID {
			continue
		}
		if preemption.expectedStartIndex != args.StartIndex {
			m.errHolder.err = fmt.Errorf("unexpected start index expected=%d, actual=%d, allocationKey=%s",
				preemption.expectedStartIndex, args.StartIndex, args.AllocationKey)
			return result
		}
		// the length check also guarantees the indexed access below stays in range
		if len(preemption.expectedAllocationKeys) != len(args.PreemptAllocationKeys) {
			m.errHolder.err = fmt.Errorf("unexpected alloc key length expected=%d, actual=%d, allocationKey=%s",
				len(preemption.expectedAllocationKeys), len(args.PreemptAllocationKeys), args.AllocationKey)
			return result
		}
		for idx, key := range preemption.expectedAllocationKeys {
			if actual := args.PreemptAllocationKeys[idx]; actual != key {
				// key is the configured (expected) value, actual is what the caller passed in
				m.errHolder.err = fmt.Errorf("unexpected preempt alloc key expected=%s, actual=%s, index=%d, allocationKey=%s",
					key, actual, idx, args.AllocationKey)
				return result
			}
		}
		// full match: drop any error left behind by an earlier failed call
		m.errHolder = &errHolder{}
		result.Success = preemption.success
		result.Index = preemption.index
		return result
	}
	m.errHolder.err = fmt.Errorf("no match found, allocationKey=%s, nodeID=%s", args.AllocationKey, args.NodeID)
	return result
}
// GetPredicateError reports the outcome of the most recent preemption predicate check.
// The result is nil when that check succeeded or when no failure has been recorded,
// otherwise it is the error describing why the check failed.
func (m *PreemptionPredicatePlugin) GetPredicateError() error {
	m.RLock()
	defer m.RUnlock()

	holder := m.errHolder
	return holder.err
}
// NewPreemptionPredicatePlugin builds a mock plugin that can serve multiple predicate scenarios.
// reservations: allocation key to node ID pairs for which the reservation predicate succeeds
// allocs: allocation key to node ID pairs for which the allocation predicate succeeds
// preempt: the preemption scenarios the plugin checks preemption predicate calls against
// The returned plugin starts without a recorded predicate error.
func NewPreemptionPredicatePlugin(reservations, allocs map[string]string, preempt []Preemption) *PreemptionPredicatePlugin {
	plugin := &PreemptionPredicatePlugin{
		errHolder: &errHolder{},
	}
	plugin.reservations = reservations
	plugin.allocations = allocs
	plugin.preemptions = preempt
	return plugin
}
// NewPreemption builds a single preemption scenario for the mock plugin.
// success: the success state reported when the scenario matches
// allocKey: allocation key the scenario applies to
// nodeID: node the scenario applies to
// expectedPreemptions: the allocation keys expected in the preemption list, in order
// start: the start index the preemption check is expected to be called with
// index: the index reported back when the scenario matches
func NewPreemption(success bool, allocKey, nodeID string, expectedPreemptions []string, start, index int32) Preemption {
	var scenario Preemption

	// what the plugin expects to be called with
	scenario.expectedAllocationKey = allocKey
	scenario.expectedNodeID = nodeID
	scenario.expectedAllocationKeys = expectedPreemptions
	scenario.expectedStartIndex = start

	// what the plugin answers when the call matches
	scenario.success = success
	scenario.index = index

	return scenario
}