func()

in pkg/monitor/sqsevent/sqs-monitor.go [139:190]


// processInterruptionEvents walks the interruption events that were derived
// from a single SQS message and decides, per event, whether to forward it,
// drop it, or record it as a failure.
//
// Per-event handling:
//   - Err wraps ErrNodeStateNotRunning: logged and counted as droppable.
//   - Any other non-nil Err: logged and counted as a failure.
//   - Nil InterruptionEvent: counted as droppable (nothing to act on).
//   - m.CheckIfManaged is set and the event is not managed: counted as droppable.
//   - Kind == SQSTerminateKind: the event is sent on m.InterruptionChan.
//   - Any other kind: logged and counted as droppable.
//
// The message is passed to m.deleteMessages only when every event was counted
// as droppable (this includes an empty interruptionEventWrappers slice). A
// deletion error is counted as a failure as well.
//
// It returns a non-nil error if at least one failure was counted, and nil
// otherwise.
func (m SQSMonitor) processInterruptionEvents(interruptionEventWrappers []InterruptionEventWrapper, message *sqs.Message) error {
	dropMessageSuggestionCount := 0
	failedInterruptionEventsCount := 0

	for _, eventWrapper := range interruptionEventWrappers {
		switch {
		case errors.Is(eventWrapper.Err, ErrNodeStateNotRunning):
			// If the node is no longer running, just log and delete the message
			log.Warn().Err(eventWrapper.Err).Msg("dropping interruption event for an already terminated node")
			dropMessageSuggestionCount++

		case eventWrapper.Err != nil:
			// Log errors and record as failed events. Because this event is not
			// counted as droppable, the message is not deleted below, which
			// leaves it available for a retry.
			log.Err(eventWrapper.Err).Msg("ignoring interruption event due to error")
			failedInterruptionEventsCount++

		case eventWrapper.InterruptionEvent == nil:
			log.Debug().Msg("dropping non-actionable interruption event")
			dropMessageSuggestionCount++

		case m.CheckIfManaged && !eventWrapper.InterruptionEvent.IsManaged:
			// This event isn't for an instance that is managed by this process
			log.Debug().Str("instance-id", eventWrapper.InterruptionEvent.InstanceID).Msg("dropping interruption event for unmanaged node")
			dropMessageSuggestionCount++

		case eventWrapper.InterruptionEvent.Kind == SQSTerminateKind:
			// Successfully processed SQS message into a SQSTerminateKind interruption event
			log.Debug().Msgf("Sending %s interruption event to the interruption channel", SQSTerminateKind)
			m.InterruptionChan <- *eventWrapper.InterruptionEvent

		default:
			// The marshal error is deliberately ignored: the JSON is only used
			// to enrich the log line, and the event is dropped either way.
			eventJSON, _ := json.MarshalIndent(eventWrapper.InterruptionEvent, " ", "    ")
			log.Warn().Msgf("dropping interruption event of an unrecognized kind: %s", eventJSON)
			dropMessageSuggestionCount++
		}
	}

	if dropMessageSuggestionCount == len(interruptionEventWrappers) {
		// All interruption events weren't actionable, just delete the message. If message deletion fails, count it as an error
		errs := m.deleteMessages([]*sqs.Message{message})
		if len(errs) > 0 {
			log.Err(errs[0]).Msg("Error deleting message from SQS")
			failedInterruptionEventsCount++
		}
	}

	if failedInterruptionEventsCount != 0 {
		// MessageId is a pointer; dereference it (nil-safely) so the error
		// carries the actual ID rather than a formatted pointer address.
		messageID := "<unknown>"
		if message != nil && message.MessageId != nil {
			messageID = *message.MessageId
		}
		return fmt.Errorf("some interruption events for message Id %s could not be processed", messageID)
	}

	return nil
}