func()

in pkg/controllers/provisioning/provisioner.go [84:126]


func (p *Provisioner) provision(ctx context.Context) (err error) {
	// Batch pods
	logging.FromContext(ctx).Infof("Waiting for unschedulable pods")
	items, window := p.batcher.Wait()
	defer p.batcher.Flush()
	logging.FromContext(ctx).Infof("Batched %d pods in %s", len(items), window)
	// Filter pods
	pods := []*v1.Pod{}
	for _, item := range items {
		provisionable, err := p.isProvisionable(ctx, item.(*v1.Pod))
		if err != nil {
			return err
		}
		if provisionable {
			pods = append(pods, item.(*v1.Pod))
		}
	}
	// Separate pods by scheduling constraints
	schedules, err := p.scheduler.Solve(ctx, p.Provisioner, pods)
	if err != nil {
		return fmt.Errorf("solving scheduling constraints, %w", err)
	}
	// Get instance type options
	instanceTypes, err := p.cloudProvider.GetInstanceTypes(ctx, p.Spec.Provider)
	if err != nil {
		return fmt.Errorf("getting instance types, %w", err)
	}
	// Launch capacity and bind pods
	workqueue.ParallelizeUntil(ctx, len(schedules), len(schedules), func(i int) {
		packings, err := p.packer.Pack(ctx, schedules[i].Constraints, schedules[i].Pods, instanceTypes)
		if err != nil {
			logging.FromContext(ctx).Errorf("Could not pack pods, %s", err.Error())
			return
		}
		workqueue.ParallelizeUntil(ctx, len(packings), len(packings), func(j int) {
			if err := p.launch(ctx, schedules[i].Constraints, packings[j]); err != nil {
				logging.FromContext(ctx).Errorf("Could not launch node, %s", err.Error())
				return
			}
		})
	})
	return nil
}