in pkg/monitor/sqsevent/sqs-monitor.go [139:190]
// processInterruptionEvents triages the interruption events parsed out of a
// single SQS message.
//
// Each wrapper is classified as one of:
//   - droppable: the node is no longer running, the event is nil, the node is
//     unmanaged while m.CheckIfManaged is set, or the event kind is not
//     SQSTerminateKind;
//   - failed: the wrapper carries any other error;
//   - actionable: an SQSTerminateKind event, which is sent on m.InterruptionChan.
//
// When every wrapper is droppable (including the case of an empty slice) the
// message is deleted from the queue; a deletion failure is counted as a
// failure. A non-nil error is returned when at least one failure was recorded,
// in which case the message has not been successfully deleted by this function.
func (m SQSMonitor) processInterruptionEvents(interruptionEventWrappers []InterruptionEventWrapper, message *sqs.Message) error {
	dropMessageSuggestionCount := 0
	failedInterruptionEventsCount := 0

	for _, eventWrapper := range interruptionEventWrappers {
		switch {
		case errors.Is(eventWrapper.Err, ErrNodeStateNotRunning):
			// If the node is no longer running, just log and delete the message
			log.Warn().Err(eventWrapper.Err).Msg("dropping interruption event for an already terminated node")
			dropMessageSuggestionCount++

		case eventWrapper.Err != nil:
			// Log the error and record the event as failed. A failed event keeps
			// the drop count below the total, so the message is not deleted
			// below, and makes this function return an error.
			log.Err(eventWrapper.Err).Msg("ignoring interruption event due to error")
			failedInterruptionEventsCount++

		case eventWrapper.InterruptionEvent == nil:
			log.Debug().Msg("dropping non-actionable interruption event")
			dropMessageSuggestionCount++

		case m.CheckIfManaged && !eventWrapper.InterruptionEvent.IsManaged:
			// This event isn't for an instance that is managed by this process
			log.Debug().Str("instance-id", eventWrapper.InterruptionEvent.InstanceID).Msg("dropping interruption event for unmanaged node")
			dropMessageSuggestionCount++

		case eventWrapper.InterruptionEvent.Kind == SQSTerminateKind:
			// Successfully processed SQS message into a SQSTerminateKind interruption event
			log.Debug().Msgf("Sending %s interruption event to the interruption channel", SQSTerminateKind)
			m.InterruptionChan <- *eventWrapper.InterruptionEvent

		default:
			// The marshalling error is deliberately ignored: the JSON is only
			// used to enrich the log line, and the event is dropped either way.
			eventJSON, _ := json.MarshalIndent(eventWrapper.InterruptionEvent, " ", " ")
			log.Warn().Msgf("dropping interruption event of an unrecognized kind: %s", eventJSON)
			dropMessageSuggestionCount++
		}
	}

	if dropMessageSuggestionCount == len(interruptionEventWrappers) {
		// None of the interruption events were actionable, so just delete the
		// message. If message deletion fails, count it as an error.
		errs := m.deleteMessages([]*sqs.Message{message})
		if len(errs) > 0 {
			log.Err(errs[0]).Msg("Error deleting message from SQS")
			failedInterruptionEventsCount++
		}
	}

	if failedInterruptionEventsCount != 0 {
		// MessageId is a *string; dereference it (nil-safely) so the error
		// carries the actual ID rather than a formatted pointer address.
		messageID := "<unknown>"
		if message != nil && message.MessageId != nil {
			messageID = *message.MessageId
		}
		return fmt.Errorf("some interruption events for message Id %s could not be processed", messageID)
	}

	return nil
}