in pkg/monitor/sqsevent/sqs-monitor.go [84:118]
func (m SQSMonitor) Monitor() error {
log.Debug().Msg("Checking for queue messages")
messages, err := m.receiveQueueMessages(m.QueueURL)
if err != nil {
return err
}
failedEventBridgeEvents := 0
for _, message := range messages {
eventBridgeEvent, err := m.processSQSMessage(message)
if err != nil {
var s skip
if errors.As(err, &s) {
log.Warn().Err(s).Msg("skip processing SQS message")
} else {
log.Err(err).Msg("error processing SQS message")
failedEventBridgeEvents++
}
continue
}
interruptionEventWrappers := m.processEventBridgeEvent(eventBridgeEvent, message)
if err = m.processInterruptionEvents(interruptionEventWrappers, message); err != nil {
log.Err(err).Msg("error processing interruption events")
failedEventBridgeEvents++
}
}
if len(messages) > 0 && failedEventBridgeEvents == len(messages) {
return fmt.Errorf("none of the waiting queue events could be processed")
}
return nil
}