in pkg/controllers/interruption/controller.go [96:137]
func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, "interruption")
if c.sqsProvider == nil {
prov, err := sqs.NewSQSProvider(ctx, c.sqsAPI)
if err != nil {
log.FromContext(ctx).Error(err, "failed to create valid sqs provider")
return reconcile.Result{}, fmt.Errorf("creating sqs provider, %w", err)
}
c.sqsProvider = prov
}
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("queue", c.sqsProvider.Name()))
if c.cm.HasChanged(c.sqsProvider.Name(), nil) {
log.FromContext(ctx).V(1).Info("watching interruption queue")
}
sqsMessages, err := c.sqsProvider.GetSQSMessages(ctx)
if err != nil {
return reconcile.Result{}, fmt.Errorf("getting messages from queue, %w", err)
}
if len(sqsMessages) == 0 {
return reconcile.Result{RequeueAfter: singleton.RequeueImmediately}, nil
}
errs := make([]error, len(sqsMessages))
workqueue.ParallelizeUntil(ctx, 10, len(sqsMessages), func(i int) {
msg, e := c.parseMessage(sqsMessages[i])
if e != nil {
// If we fail to parse, then we should delete the message but still log the error
log.FromContext(ctx).Error(e, "failed parsing interruption message")
errs[i] = c.deleteMessage(ctx, sqsMessages[i])
return
}
if e = c.handleMessage(ctx, msg); e != nil {
errs[i] = fmt.Errorf("handling message, %w", e)
return
}
errs[i] = c.deleteMessage(ctx, sqsMessages[i])
})
if err = multierr.Combine(errs...); err != nil {
return reconcile.Result{}, err
}
return reconcile.Result{RequeueAfter: singleton.RequeueImmediately}, nil
}