pkg/scheduler/scheduler.go
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"reflect"
"time"
"go.uber.org/zap"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/handler"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/plugins"
"github.com/apache/yunikorn-core/pkg/rmproxy/rmevent"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
// Scheduler is the main scheduler service that starts the needed sub-services.
type Scheduler struct {
clusterContext *ClusterContext // main context
pendingEvents chan interface{} // queue for events
activityPending chan bool // buffered (size 1) signal that scheduling-relevant activity occurred
stop chan struct{} // channel to signal stop request
healthChecker *HealthChecker
nodesMonitor *nodesResourceUsageMonitor
}
// NewScheduler creates a new scheduler with an empty cluster context.
func NewScheduler() *Scheduler {
m := &Scheduler{}
m.clusterContext = newClusterContext()
m.pendingEvents = make(chan interface{}, 1024*1024)
m.activityPending = make(chan bool, 1)
m.stop = make(chan struct{})
return m
}
// StartService starts the scheduler and the required background services.
func (s *Scheduler) StartService(handlers handler.EventHandlers, manualSchedule bool) {
// set the proxy handler in the context
s.clusterContext.setEventHandler(handlers.RMProxyEventHandler)
// Start event handlers
go s.handleRMEvent()
// Start the nodes resource usage monitor if necessary (mainly used for testing)
s.nodesMonitor = newNodesResourceUsageMonitor(s.clusterContext)
s.nodesMonitor.start()
// Start health check periodically
s.healthChecker = NewHealthChecker(s.clusterContext)
s.healthChecker.Start()
if !manualSchedule {
go s.internalSchedule()
go s.internalInspectOutstandingRequests()
}
}
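// A minimal startup sketch, assuming rmProxyHandler is a value satisfying the handler
// type expected for handler.EventHandlers.RMProxyEventHandler (rmProxyHandler itself
// is hypothetical and not defined in this file):
//
//	sched := NewScheduler()
//	sched.StartService(handler.EventHandlers{RMProxyEventHandler: rmProxyHandler}, false)
//	// ... serve until shutdown is requested ...
//	sched.Stop()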
// internalSchedule is the scheduling loop: it runs a scheduling cycle whenever activity is registered, or at the latest every 100ms.
func (s *Scheduler) internalSchedule() {
for {
select {
case <-s.stop:
return
case <-s.activityPending:
// activity pending
case <-time.After(100 * time.Millisecond):
// timeout, run scheduler anyway
}
if s.clusterContext.schedule() {
s.registerActivity()
}
}
}
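// The loop above follows a common Go pattern: block until a stop signal, a pending
// activity notification, or a 100ms timeout fires, then run one scheduling pass.
// A self-contained sketch of the same pattern (doOneSchedulingPass is a hypothetical
// stand-in for clusterContext.schedule):
//
//	stop := make(chan struct{})
//	activity := make(chan bool, 1)
//	go func() {
//		for {
//			select {
//			case <-stop:
//				return
//			case <-activity:
//				// woken up early because new work arrived
//			case <-time.After(100 * time.Millisecond):
//				// periodic fallback pass
//			}
//			doOneSchedulingPass()
//		}
//	}()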
// internalInspectOutstandingRequests periodically checks all partitions for outstanding
// requests that could trigger autoscaling.
func (s *Scheduler) internalInspectOutstandingRequests() {
for {
select {
case <-s.stop:
return
case <-time.After(time.Second):
if noRequests, totalResources := s.inspectOutstandingRequests(); noRequests > 0 {
log.Log(log.Scheduler).Info("Found outstanding requests that will trigger autoscaling",
zap.Int("number of requests", noRequests),
zap.Stringer("total resources", totalResources))
}
}
}
}
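// time.After allocates a fresh timer on every iteration of the loop above; a
// time.Ticker based variant is an equivalent, slightly cheaper alternative. A sketch
// under that assumption (inspectOnce is a hypothetical stand-in for the inspection
// and logging done above):
//
//	ticker := time.NewTicker(time.Second)
//	defer ticker.Stop()
//	for {
//		select {
//		case <-s.stop:
//			return
//		case <-ticker.C:
//			inspectOnce()
//		}
//	}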
// HandleEvent enqueues an RM event for asynchronous processing by the scheduler.
func (s *Scheduler) HandleEvent(ev interface{}) {
enqueueAndCheckFull(s.pendingEvents, ev)
}
// enqueueAndCheckFull performs a non-blocking enqueue of the event and logs a DPanic when the queue is full.
func enqueueAndCheckFull(queue chan interface{}, ev interface{}) {
select {
case queue <- ev:
log.Log(log.Scheduler).Debug("enqueued event",
zap.Stringer("eventType", reflect.TypeOf(ev)),
zap.Any("event", ev),
zap.Int("currentQueueSize", len(queue)))
default:
log.Log(log.Scheduler).DPanic("failed to enqueue event",
zap.Stringer("event", reflect.TypeOf(ev)))
}
}
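// enqueueAndCheckFull uses the non-blocking send idiom: a select with a default branch
// either hands the event to the channel or falls through immediately when the buffer
// is full. A self-contained sketch of the idiom (queue and ev are illustrative only):
//
//	queue := make(chan interface{}, 8)
//	var ev interface{} = "event"
//	select {
//	case queue <- ev:
//		// accepted, the consumer will pick it up
//	default:
//		// buffer full: drop, log, or apply back-pressure here
//	}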
// handleRMEvent is the event loop: it processes queued RM events one at a time until the scheduler is stopped.
func (s *Scheduler) handleRMEvent() {
for {
select {
case ev := <-s.pendingEvents:
switch v := ev.(type) {
case *rmevent.RMUpdateAllocationEvent:
s.clusterContext.handleRMUpdateAllocationEvent(v)
case *rmevent.RMUpdateApplicationEvent:
s.clusterContext.handleRMUpdateApplicationEvent(v)
case *rmevent.RMUpdateNodeEvent:
s.clusterContext.handleRMUpdateNodeEvent(v)
case *rmevent.RMPartitionsRemoveEvent:
s.clusterContext.removePartitionsByRMID(v)
case *rmevent.RMRegistrationEvent:
s.clusterContext.processRMRegistrationEvent(v)
case *rmevent.RMConfigUpdateEvent:
s.clusterContext.processRMConfigUpdateEvent(v)
default:
log.Log(log.Scheduler).Error("Received type is not an acceptable type for RM event.",
zap.Stringer("received type", reflect.TypeOf(v)))
}
s.registerActivity()
case <-s.stop:
return
}
}
}
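// Events enter this loop through HandleEvent, which only enqueues; all handling runs
// on this single goroutine, so RM events are processed strictly one at a time. A
// hedged sketch of posting an event from the proxy side (the event fields are defined
// in the rmevent package and are left out here):
//
//	var ev *rmevent.RMUpdateNodeEvent // built by the RM proxy
//	sched.HandleEvent(ev)             // sched is a *Scheduler started via StartService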
// registerActivity is used to notify the scheduler that some activity that may impact scheduling results has occurred.
func (s *Scheduler) registerActivity() {
select {
case s.activityPending <- true:
// activity registered
default:
// buffer is full, activity will be processed at the next available opportunity
}
}
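// Because activityPending has a buffer of one, repeated registerActivity calls while
// a wake-up is already queued collapse into a single signal. A self-contained sketch
// of this coalescing pattern (notify and signal are illustrative names):
//
//	notify := make(chan bool, 1)
//	signal := func() {
//		select {
//		case notify <- true:
//		default: // a wake-up is already pending, nothing to add
//		}
//	}
//	signal()
//	signal()  // coalesced with the first call
//	<-notify  // the worker drains a single pending notification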
// inspectOutstandingRequests inspects the outstanding requests of each queue and,
// if needed, updates the request state on the shim.
// It collects all outstanding requests that are being skipped due to insufficient
// cluster resources and reports them through the ContainerSchedulingStateUpdaterPlugin
// in order to trigger auto-scaling.
func (s *Scheduler) inspectOutstandingRequests() (int, *resources.Resource) {
log.Log(log.Scheduler).Debug("inspect outstanding requests")
// schedule each partition defined in the cluster
total := resources.NewResource()
noRequests := 0
for _, psc := range s.clusterContext.GetPartitionMapClone() {
requests := psc.calculateOutstandingRequests()
// total up the outstanding requests over all partitions
noRequests += len(requests)
if len(requests) > 0 {
for _, ask := range requests {
log.Log(log.Scheduler).Debug("outstanding request",
zap.String("appID", ask.GetApplicationID()),
zap.String("allocationKey", ask.GetAllocationKey()))
// these asks are outstanding requests at the queue level:
// they fit within the queue's maximum headroom but are pending due to a lack of partition resources
if updater := plugins.GetResourceManagerCallbackPlugin(); updater != nil {
updater.UpdateContainerSchedulingState(&si.UpdateContainerSchedulingStateRequest{
ApplicationID: ask.GetApplicationID(),
AllocationKey: ask.GetAllocationKey(),
State: si.UpdateContainerSchedulingStateRequest_FAILED,
Reason: "request is waiting for cluster resources become available",
})
}
total.AddTo(ask.GetAllocatedResource())
ask.SetScaleUpTriggered(true)
}
}
}
return noRequests, total
}
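// On the shim side, the registered resource manager callback plugin receives these
// updates through an UpdateContainerSchedulingState method. A hedged sketch of a
// receiver (autoscalerCallback is hypothetical; only the request type and the FAILED
// state are taken from the call above):
//
//	type autoscalerCallback struct{}
//
//	func (c *autoscalerCallback) UpdateContainerSchedulingState(request *si.UpdateContainerSchedulingStateRequest) {
//		if request.State == si.UpdateContainerSchedulingStateRequest_FAILED {
//			// surface the pending ask to the cluster autoscaler, e.g. as a pod event
//		}
//	}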
// GetClusterContext returns the cluster context; visible for testing.
func (s *Scheduler) GetClusterContext() *ClusterContext {
return s.clusterContext
}
// MultiStepSchedule runs the normal scheduling routine nAlloc times.
// Visible for testing.
func (s *Scheduler) MultiStepSchedule(nAlloc int) {
for i := 0; i < nAlloc; i++ {
log.Log(log.Scheduler).Debug("Scheduler manual stepping",
zap.Int("count", i))
s.clusterContext.schedule()
// sometimes the smoke tests fail because they are competing for CPU resources.
// sleep for a short time (100ms) after each scheduling cycle so that, even when CPU
// usage is high, the main thread yields some time to other goroutines, such as the
// event handling routines.
// Note: this sleep only takes effect in tests.
time.Sleep(100 * time.Millisecond)
}
}
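// A sketch of how a test might drive the scheduler manually: start the service with
// manualSchedule set to true so the background scheduling goroutine is not launched,
// then step it explicitly (rmProxyHandler is hypothetical, as above):
//
//	sched := NewScheduler()
//	sched.StartService(handler.EventHandlers{RMProxyEventHandler: rmProxyHandler}, true)
//	sched.MultiStepSchedule(5) // run five scheduling cycles back to back
//	sched.Stop()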
// Stop stops the scheduler and all background services.
func (s *Scheduler) Stop() {
log.Log(log.Scheduler).Info("Stopping scheduler & background services")
s.healthChecker.Stop()
s.nodesMonitor.stop()
s.clusterContext.Stop()
close(s.stop)
}