in pkg/controllers/provisioning/provisioner.go [84:126]
func (p *Provisioner) provision(ctx context.Context) (err error) {
// Batch pods
logging.FromContext(ctx).Infof("Waiting for unschedulable pods")
items, window := p.batcher.Wait()
defer p.batcher.Flush()
logging.FromContext(ctx).Infof("Batched %d pods in %s", len(items), window)
// Filter pods
pods := []*v1.Pod{}
for _, item := range items {
provisionable, err := p.isProvisionable(ctx, item.(*v1.Pod))
if err != nil {
return err
}
if provisionable {
pods = append(pods, item.(*v1.Pod))
}
}
// Separate pods by scheduling constraints
schedules, err := p.scheduler.Solve(ctx, p.Provisioner, pods)
if err != nil {
return fmt.Errorf("solving scheduling constraints, %w", err)
}
// Get instance type options
instanceTypes, err := p.cloudProvider.GetInstanceTypes(ctx, p.Spec.Provider)
if err != nil {
return fmt.Errorf("getting instance types, %w", err)
}
// Launch capacity and bind pods
workqueue.ParallelizeUntil(ctx, len(schedules), len(schedules), func(i int) {
packings, err := p.packer.Pack(ctx, schedules[i].Constraints, schedules[i].Pods, instanceTypes)
if err != nil {
logging.FromContext(ctx).Errorf("Could not pack pods, %s", err.Error())
return
}
workqueue.ParallelizeUntil(ctx, len(packings), len(packings), func(j int) {
if err := p.launch(ctx, schedules[i].Constraints, packings[j]); err != nil {
logging.FromContext(ctx).Errorf("Could not launch node, %s", err.Error())
return
}
})
})
return nil
}