func()

in pkg/controllers/interruption/controller.go [96:137]


// Reconcile performs one polling pass over the interruption queue.
//
// On each call it:
//  1. Builds the SQS provider from c.sqsAPI if c.sqsProvider has not been set
//     yet, and stores it on the controller for later calls.
//  2. Adds the queue name to the context logger under the "queue" key.
//  3. Fetches messages from the queue and, if there are any, parses, handles
//     and deletes each one via workqueue.ParallelizeUntil.
//
// It returns a result with RequeueAfter set to singleton.RequeueImmediately
// when the queue was empty or when no per-message error was recorded. If
// provider creation, message retrieval, or any per-message step fails, it
// returns an empty result together with the wrapped or combined error.
func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
	ctx = injection.WithControllerName(ctx, "interruption")

	// Lazily construct the provider; a failure here leaves c.sqsProvider nil so
	// the next call tries again.
	if c.sqsProvider == nil {
		provider, initErr := sqs.NewSQSProvider(ctx, c.sqsAPI)
		if initErr != nil {
			log.FromContext(ctx).Error(initErr, "failed to create valid sqs provider")
			return reconcile.Result{}, fmt.Errorf("creating sqs provider, %w", initErr)
		}
		c.sqsProvider = provider
	}

	queueLogger := log.FromContext(ctx).WithValues("queue", c.sqsProvider.Name())
	ctx = log.IntoContext(ctx, queueLogger)
	// Only emit the "watching" line when c.cm reports a change for this queue name.
	if c.cm.HasChanged(c.sqsProvider.Name(), nil) {
		log.FromContext(ctx).V(1).Info("watching interruption queue")
	}

	messages, fetchErr := c.sqsProvider.GetSQSMessages(ctx)
	if fetchErr != nil {
		return reconcile.Result{}, fmt.Errorf("getting messages from queue, %w", fetchErr)
	}
	if len(messages) == 0 {
		return reconcile.Result{RequeueAfter: singleton.RequeueImmediately}, nil
	}

	// One slot per message: each invocation of process writes only its own
	// index, so the slice needs no additional locking.
	outcomes := make([]error, len(messages))
	process := func(idx int) {
		raw := messages[idx]

		parsed, parseErr := c.parseMessage(raw)
		if parseErr != nil {
			// An unparseable message is logged and then deleted from the queue;
			// only a failure of the delete itself is recorded as an error.
			log.FromContext(ctx).Error(parseErr, "failed parsing interruption message")
			outcomes[idx] = c.deleteMessage(ctx, raw)
			return
		}

		// A message whose handling fails is not deleted here.
		// NOTE(review): presumably it is left on the queue for redelivery — confirm.
		if handleErr := c.handleMessage(ctx, parsed); handleErr != nil {
			outcomes[idx] = fmt.Errorf("handling message, %w", handleErr)
			return
		}

		outcomes[idx] = c.deleteMessage(ctx, raw)
	}
	// NOTE(review): the 10 is passed as the worker count — presumably up to 10
	// messages are processed concurrently; confirm against workqueue docs.
	workqueue.ParallelizeUntil(ctx, 10, len(messages), process)

	if combined := multierr.Combine(outcomes...); combined != nil {
		return reconcile.Result{}, combined
	}
	return reconcile.Result{RequeueAfter: singleton.RequeueImmediately}, nil
}