func()

in internal/deployers/eksapi/k8s.go [98:149]


// waitForNodeDeletion blocks until no Node objects remain in the cluster, or
// until timeout elapses.
//
// It opens a watch on Nodes, seeds a set of node names from a List call, and
// then applies Added/Deleted watch events to that set until it is empty. The
// timeout bounds the whole operation, including the initial Watch and List
// requests.
//
// It returns nil once the set of known nodes is empty. It returns an error if
// the Watch or List request fails, the watcher delivers an error event or an
// object that is not a Node, the watch channel is closed early, or the
// timeout expires first.
func (k *k8sClient) waitForNodeDeletion(timeout time.Duration) error {
	klog.Infof("waiting up to %v for node(s) to be deleted...", timeout)

	// A single deadline covers the API requests below as well as the event loop.
	ctx, cancelFunc := context.WithTimeout(context.Background(), timeout)
	defer cancelFunc()

	// The watch is opened before the list, so changes made after the list
	// snapshot are still delivered as events.
	watcher, err := k.clientset.CoreV1().Nodes().Watch(ctx, metav1.ListOptions{})
	if err != nil {
		return fmt.Errorf("failed to create node watcher: %w", err)
	}
	defer watcher.Stop()

	initialNodes, err := k.clientset.CoreV1().Nodes().List(ctx, v1.ListOptions{})
	if err != nil {
		return fmt.Errorf("failed to list nodes: %w", err)
	}
	nodes := sets.NewString()
	for _, node := range initialNodes.Items {
		nodes.Insert(node.Name)
	}

	// The emptiness check runs before each blocking receive: when there are no
	// nodes to begin with, no event may ever arrive, and waiting on the watcher
	// would only end in a spurious timeout.
	for nodes.Len() > 0 {
		select {
		case event, ok := <-watcher.ResultChan():
			if !ok {
				// Expiry of ctx can also close the channel; report that as
				// the timeout it is rather than as an unknown failure.
				if ctxErr := ctx.Err(); ctxErr != nil {
					return fmt.Errorf("timed out waiting for nodes to be deleted: %w", ctxErr)
				}
				return fmt.Errorf("the watcher channel for the nodes was closed by Kubernetes due to an unknown error")
			}
			if event.Type == watch.Error {
				msg := "unexpected error event type from node watcher"
				if statusErr, ok := event.Object.(*metav1.Status); ok {
					return fmt.Errorf("%s: %s", msg, statusErr.String())
				}
				return fmt.Errorf("%s: %+v", msg, event.Object)
			}
			if event.Object == nil {
				continue
			}
			node, ok := event.Object.(*corev1.Node)
			if !ok {
				return fmt.Errorf("node watcher received an object that isn't a Node: %+v", event.Object)
			}
			// Only additions and deletions change the set of live nodes;
			// other event types are ignored.
			switch event.Type {
			case watch.Added:
				nodes.Insert(node.Name)
			case watch.Deleted:
				nodes.Delete(node.Name)
			}
		case <-ctx.Done():
			return fmt.Errorf("timed out waiting for nodes to be deleted: %w", ctx.Err())
		}
	}
	klog.Info("all nodes deleted!")
	return nil
}