in pkg/controllers/interruption/controller.go [160:196]
func (c *Controller) handleMessage(ctx context.Context, msg messages.Message) (err error) {
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("messageKind", msg.Kind()))
ReceivedMessages.Inc(map[string]string{messageTypeLabel: string(msg.Kind())})
if msg.Kind() == messages.NoOpKind {
return nil
}
for _, instanceID := range msg.EC2InstanceIDs() {
nodeClaimList := &karpv1.NodeClaimList{}
if e := c.kubeClient.List(ctx, nodeClaimList, client.MatchingFields{"status.instanceID": instanceID}); e != nil {
err = multierr.Append(err, e)
continue
}
if len(nodeClaimList.Items) == 0 {
continue
}
for _, nodeClaim := range nodeClaimList.Items {
nodeList := &corev1.NodeList{}
if e := c.kubeClient.List(ctx, nodeList, client.MatchingFields{"spec.instanceID": instanceID}); e != nil {
err = multierr.Append(err, e)
continue
}
var node *corev1.Node
if len(nodeList.Items) > 0 {
node = &nodeList.Items[0]
}
if e := c.handleNodeClaim(ctx, msg, &nodeClaim, node); e != nil {
err = multierr.Append(err, e)
}
}
}
MessageLatency.Observe(time.Since(msg.StartTime()).Seconds(), nil)
if err != nil {
return fmt.Errorf("acting on NodeClaims, %w", err)
}
return nil
}