func()

in pkg/monitor/sqsevent/sqs-monitor.go [84:118]


// Monitor performs one pass over the queue at m.QueueURL: it fetches the
// waiting messages with receiveQueueMessages and handles each one in turn.
//
// An error from receiveQueueMessages is returned unchanged. Per-message
// errors are logged rather than returned: an error matching the skip type is
// logged as a warning and is not counted as a failure, while any other error
// from processSQSMessage or processInterruptionEvents is logged and counted.
// Monitor returns an error only when at least one message was received and
// every received message was counted as a failure.
func (m SQSMonitor) Monitor() error {
	log.Debug().Msg("Checking for queue messages")
	messages, err := m.receiveQueueMessages(m.QueueURL)
	if err != nil {
		return err
	}

	failures := 0
	for _, msg := range messages {
		event, parseErr := m.processSQSMessage(msg)
		if parseErr != nil {
			// A skip is a deliberate "ignore this message" signal, so it is
			// reported but left out of the failure tally.
			var skipped skip
			if errors.As(parseErr, &skipped) {
				log.Warn().Err(skipped).Msg("skip processing SQS message")
				continue
			}
			log.Err(parseErr).Msg("error processing SQS message")
			failures++
			continue
		}

		wrappers := m.processEventBridgeEvent(event, msg)
		if handleErr := m.processInterruptionEvents(wrappers, msg); handleErr != nil {
			log.Err(handleErr).Msg("error processing interruption events")
			failures++
		}
	}

	// An empty batch, or a batch where at least one message was not counted
	// as failed, is a successful pass.
	if total := len(messages); total == 0 || failures != total {
		return nil
	}
	return fmt.Errorf("none of the waiting queue events could be processed")
}