in pkg/monitor/sqsevent/sqs-monitor.go [242:293]
// processInterruptionEvents triages the interruption events that were parsed
// out of a single SQS message and decides whether that message can be deleted.
//
// Each wrapper is handled by the first rule that matches:
//   - Err matches the skip error type: logged at warn level, counted as dropped.
//   - Err is any other non-nil error: logged and counted as a failure; the
//     message is deliberately left undeleted so it can be retried.
//   - InterruptionEvent is nil: counted as dropped (non-actionable).
//   - m.CheckIfManaged is set and the event is not marked IsManaged: counted
//     as dropped.
//   - the event's Monitor equals SQSMonitorKind: the event is sent on
//     m.InterruptionChan.
//   - anything else: logged as an unrecognized kind and counted as dropped.
//
// When every wrapper was dropped (which is also true for an empty slice), the
// message is deleted via m.deleteMessages; a deletion failure is counted as a
// failure.
//
// It returns a non-nil error if at least one failure was counted, and nil
// otherwise.
func (m SQSMonitor) processInterruptionEvents(interruptionEventWrappers []InterruptionEventWrapper, message *sqs.Message) error {
	dropMessageSuggestionCount := 0
	failedInterruptionEventsCount := 0
	var skipErr skip

	for _, eventWrapper := range interruptionEventWrappers {
		switch {
		case errors.As(eventWrapper.Err, &skipErr):
			log.Warn().Err(skipErr).Msg("dropping event")
			dropMessageSuggestionCount++

		case eventWrapper.Err != nil:
			// Log errors and record as failed events. Don't delete the message in order to allow retries
			log.Err(eventWrapper.Err).Msg("ignoring interruption event due to error")
			failedInterruptionEventsCount++

		case eventWrapper.InterruptionEvent == nil:
			log.Debug().Msg("dropping non-actionable interruption event")
			dropMessageSuggestionCount++

		case m.CheckIfManaged && !eventWrapper.InterruptionEvent.IsManaged:
			// This event is for an instance that is not managed by this process
			log.Debug().Str("instance-id", eventWrapper.InterruptionEvent.InstanceID).Msg("dropping interruption event for unmanaged node")
			dropMessageSuggestionCount++

		case eventWrapper.InterruptionEvent.Monitor == SQSMonitorKind:
			// The SQS message produced an interruption event of this monitor's
			// kind: announce it and hand it to the interruption channel.
			logging.VersionedMsgs.SendingInterruptionEventToChannel(eventWrapper.InterruptionEvent.Kind)
			m.InterruptionChan <- *eventWrapper.InterruptionEvent

		default:
			// Surface a serialization failure instead of silently logging an
			// empty payload. Err adds no field when marshalErr is nil, so the
			// log line is unchanged on success.
			eventJSON, marshalErr := json.MarshalIndent(eventWrapper.InterruptionEvent, " ", " ")
			log.Warn().Err(marshalErr).Msgf("dropping interruption event of an unrecognized kind: %s", eventJSON)
			dropMessageSuggestionCount++
		}
	}

	if dropMessageSuggestionCount == len(interruptionEventWrappers) {
		// All interruption events weren't actionable, just delete the message. If message deletion fails, count it as an error
		errs := m.deleteMessages([]*sqs.Message{message})
		if len(errs) > 0 {
			log.Err(errs[0]).Msg("Error deleting message from SQS")
			failedInterruptionEventsCount++
		}
	}

	if failedInterruptionEventsCount != 0 {
		// MessageId is a *string; guard the dereference so that building the
		// error cannot itself panic on a nil message or a nil ID.
		messageID := "<unknown>"
		if message != nil && message.MessageId != nil {
			messageID = *message.MessageId
		}
		return fmt.Errorf("some interruption events for message Id %s could not be processed", messageID)
	}

	return nil
}