func()

in internal/deployers/eksapi/k8s.go [52:96]


func (k *k8sClient) waitForReadyNodes(nodeCount int, timeout time.Duration) error {
	klog.Infof("waiting up to %v for %d node(s) to be ready...", timeout, nodeCount)
	readyNodes := sets.NewString()
	watcher, err := k.clientset.CoreV1().Nodes().Watch(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return fmt.Errorf("failed to create node watcher: %v", err)
	}
	defer watcher.Stop()
	initialReadyNodes, err := k.getReadyNodes()
	if err != nil {
		return fmt.Errorf("failed to get ready nodes: %v", err)
	}
	counter := len(initialReadyNodes)
	ctx, _ := context.WithTimeout(context.Background(), timeout)
	for {
		select {
		case event, ok := <-watcher.ResultChan():
			if !ok {
				return fmt.Errorf("the watcher channel for the nodes was closed by Kubernetes due to an unknown error")
			}
			if event.Type == watch.Error {
				msg := "unexpected error event type from node watcher"
				if statusErr, ok := event.Object.(*metav1.Status); ok {
					return fmt.Errorf("%s: %s", msg, statusErr.String())
				}
				return fmt.Errorf("%s: %+v", msg, event.Object)
			}
			if event.Object != nil && event.Type != watch.Deleted {
				if node, ok := event.Object.(*corev1.Node); ok {
					if isNodeReady(node) {
						readyNodes.Insert(node.Name)
						counter = readyNodes.Len()
					}
				}
			}
		case <-ctx.Done():
			return fmt.Errorf("timed out waiting for %d nodes to be ready: %w", nodeCount, ctx.Err())
		}
		if counter >= nodeCount {
			break
		}
	}
	klog.Infof("%d node(s) are ready: %v", readyNodes.Len(), readyNodes)
	return nil
}