in internal/deployers/eksapi/k8s.go [52:96]
// waitForReadyNodes blocks until at least nodeCount nodes are Ready, or the
// timeout elapses. It counts nodes that are already Ready when called, plus
// any that become Ready while watching, deduplicated by node name.
func (k *k8sClient) waitForReadyNodes(nodeCount int, timeout time.Duration) error {
	klog.Infof("waiting up to %v for %d node(s) to be ready...", timeout, nodeCount)
	// Start the watch before listing so that nodes becoming ready between
	// the list and the start of the watch are not missed.
	watcher, err := k.clientset.CoreV1().Nodes().Watch(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return fmt.Errorf("failed to create node watcher: %v", err)
	}
	defer watcher.Stop()
	initialReadyNodes, err := k.getReadyNodes()
	if err != nil {
		return fmt.Errorf("failed to get ready nodes: %v", err)
	}
	// Seed the set with already-ready nodes so watch events add to (rather
	// than reset) the running count; the set also dedupes a node that was
	// ready at list time and then emits a watch event.
	readyNodes := sets.NewString()
	for _, node := range initialReadyNodes {
		readyNodes.Insert(node.Name)
	}
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	// Release the context's timer even when we return before the deadline.
	defer cancel()
	// Checking the condition before the first receive also handles the case
	// where enough nodes are already ready and no watch event ever arrives.
	for readyNodes.Len() < nodeCount {
		select {
		case event, ok := <-watcher.ResultChan():
			if !ok {
				return fmt.Errorf("the watcher channel for the nodes was closed by Kubernetes due to an unknown error")
			}
			if event.Type == watch.Error {
				msg := "unexpected error event type from node watcher"
				if statusErr, ok := event.Object.(*metav1.Status); ok {
					return fmt.Errorf("%s: %s", msg, statusErr.String())
				}
				return fmt.Errorf("%s: %+v", msg, event.Object)
			}
			if event.Object != nil && event.Type != watch.Deleted {
				if node, ok := event.Object.(*corev1.Node); ok {
					if isNodeReady(node) {
						readyNodes.Insert(node.Name)
					}
				}
			}
		case <-ctx.Done():
			return fmt.Errorf("timed out waiting for %d nodes to be ready: %w", nodeCount, ctx.Err())
		}
	}
	klog.Infof("%d node(s) are ready: %v", readyNodes.Len(), readyNodes)
	return nil
}