func()

in pkg/monitor/sqsevent/sqs-monitor.go [242:293]


func (m SQSMonitor) processInterruptionEvents(interruptionEventWrappers []InterruptionEventWrapper, message *sqs.Message) error {
	dropMessageSuggestionCount := 0
	failedInterruptionEventsCount := 0
	var skipErr skip

	for _, eventWrapper := range interruptionEventWrappers {
		switch {
		case errors.As(eventWrapper.Err, &skipErr):
			log.Warn().Err(skipErr).Msg("dropping event")
			dropMessageSuggestionCount++

		case eventWrapper.Err != nil:
			// Log errors and record as failed events. Don't delete the message in order to allow retries
			log.Err(eventWrapper.Err).Msg("ignoring interruption event due to error")
			failedInterruptionEventsCount++

		case eventWrapper.InterruptionEvent == nil:
			log.Debug().Msg("dropping non-actionable interruption event")
			dropMessageSuggestionCount++

		case m.CheckIfManaged && !eventWrapper.InterruptionEvent.IsManaged:
			// This event is for an instance that is not managed by this process
			log.Debug().Str("instance-id", eventWrapper.InterruptionEvent.InstanceID).Msg("dropping interruption event for unmanaged node")
			dropMessageSuggestionCount++

		case eventWrapper.InterruptionEvent.Monitor == SQSMonitorKind:
			// Successfully processed SQS message into a eventWrapper.InterruptionEvent.Kind interruption event
			logging.VersionedMsgs.SendingInterruptionEventToChannel(eventWrapper.InterruptionEvent.Kind)
			m.InterruptionChan <- *eventWrapper.InterruptionEvent

		default:
			eventJSON, _ := json.MarshalIndent(eventWrapper.InterruptionEvent, " ", "    ")
			log.Warn().Msgf("dropping interruption event of an unrecognized kind: %s", eventJSON)
			dropMessageSuggestionCount++
		}
	}

	if dropMessageSuggestionCount == len(interruptionEventWrappers) {
		// All interruption events weren't actionable, just delete the message. If message deletion fails, count it as an error
		errs := m.deleteMessages([]*sqs.Message{message})
		if len(errs) > 0 {
			log.Err(errs[0]).Msg("Error deleting message from SQS")
			failedInterruptionEventsCount++
		}
	}

	if failedInterruptionEventsCount != 0 {
		return fmt.Errorf("some interruption events for message Id %s could not be processed", *message.MessageId)
	}

	return nil
}