in internal/deployers/eksapi/k8s.go [98:149]
// waitForNodeDeletion blocks until the tracked set of node names is empty or
// until timeout elapses, whichever comes first.
//
// The watch is opened before the initial list so that a deletion happening
// between the two calls is still delivered as an event. The timeout bounds the
// whole operation, including the initial Watch and List requests.
//
// It returns nil once no tracked nodes remain. It returns an error if the
// watcher cannot be created, the nodes cannot be listed, the watch reports an
// error or closes its channel, an event carries a non-Node object, or the
// timeout expires first.
func (k *k8sClient) waitForNodeDeletion(timeout time.Duration) error {
	klog.Infof("waiting up to %v for node(s) to be deleted...", timeout)

	// Create the deadline up front so that a stalled API server cannot make
	// the Watch/List calls outlive the advertised timeout.
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	watcher, err := k.clientset.CoreV1().Nodes().Watch(ctx, metav1.ListOptions{})
	if err != nil {
		return fmt.Errorf("failed to create node watcher: %w", err)
	}
	defer watcher.Stop()

	// NOTE(review): v1 and metav1 presumably alias the same meta/v1 package;
	// confirm against the import block and unify the alias if so.
	initialNodes, err := k.clientset.CoreV1().Nodes().List(ctx, v1.ListOptions{})
	if err != nil {
		return fmt.Errorf("failed to list nodes: %w", err)
	}
	nodes := sets.NewString()
	for _, node := range initialNodes.Items {
		nodes.Insert(node.Name)
	}

	// Test for emptiness BEFORE blocking: when there is nothing left to wait
	// for (e.g. the list came back empty) no further event may ever arrive,
	// and waiting for one would turn a success into a spurious timeout.
	for nodes.Len() > 0 {
		select {
		case event, ok := <-watcher.ResultChan():
			// The watch shares ctx, so an expired deadline may show up here
			// (closed channel or error event) rather than on ctx.Done();
			// report it as the timeout it is.
			if ctx.Err() != nil {
				return fmt.Errorf("timed out waiting for nodes to be deleted: %w", ctx.Err())
			}
			if !ok {
				return fmt.Errorf("the watcher channel for the nodes was closed by Kubernetes due to an unknown error")
			}
			if event.Type == watch.Error {
				msg := "unexpected error event type from node watcher"
				if statusErr, ok := event.Object.(*metav1.Status); ok {
					return fmt.Errorf("%s: %s", msg, statusErr.String())
				}
				return fmt.Errorf("%s: %+v", msg, event.Object)
			}
			if event.Object == nil {
				continue
			}
			node, ok := event.Object.(*corev1.Node)
			if !ok {
				return fmt.Errorf("node watcher received an object that isn't a Node: %+v", event.Object)
			}
			// Only additions and deletions change the tracked set; other
			// event types (e.g. modifications) are ignored.
			switch event.Type {
			case watch.Added:
				nodes.Insert(node.Name)
			case watch.Deleted:
				nodes.Delete(node.Name)
			}
		case <-ctx.Done():
			return fmt.Errorf("timed out waiting for nodes to be deleted: %w", ctx.Err())
		}
	}

	klog.Info("all nodes deleted!")
	return nil
}