func()

in pkg/monitor/sqsevent/scheduled-change-event.go [70:126]


// scheduledEventToInterruptionEvents converts an AWS Health scheduled change
// event delivered through EventBridge into interruption events, one per
// affected entity listed in the event detail.
//
// The returned slice is never nil. Each element carries either an interruption
// event or an error, never both:
//   - if event.Detail cannot be unmarshalled, a single wrapper holding the
//     JSON error is returned;
//   - if the detail's Service is not "EC2" or its EventTypeCategory is not
//     "scheduledChange", a single wrapper holding a skip error is returned;
//   - otherwise one wrapper is produced per affected entity, holding the node
//     lookup error when m.getNodeInfo fails for that entity, or the
//     interruption event built from the node info when it succeeds.
//
// Every interruption event produced from one call shares the same EventID
// (derived from event.ID) and its PostDrainTask deletes the same SQS message.
func (m SQSMonitor) scheduledEventToInterruptionEvents(event *EventBridgeEvent, message *sqs.Message) []InterruptionEventWrapper {
	scheduledChangeEventDetail := &ScheduledChangeEventDetail{}
	interruptionEventWrappers := []InterruptionEventWrapper{}

	if err := json.Unmarshal(event.Detail, scheduledChangeEventDetail); err != nil {
		return append(interruptionEventWrappers, InterruptionEventWrapper{nil, err})
	}

	// Only EC2 scheduled changes are handled; anything else is reported as a
	// skip error rather than a plain error.
	if scheduledChangeEventDetail.Service != "EC2" {
		err := skip{fmt.Errorf("events from Amazon EventBridge for service (%s) are not supported", scheduledChangeEventDetail.Service)}
		return append(interruptionEventWrappers, InterruptionEventWrapper{nil, err})
	}

	if scheduledChangeEventDetail.EventTypeCategory != "scheduledChange" {
		err := skip{fmt.Errorf("events from Amazon EventBridge with EventTypeCategory (%s) are not supported", scheduledChangeEventDetail.EventTypeCategory)}
		return append(interruptionEventWrappers, InterruptionEventWrapper{nil, err})
	}

	for _, affectedEntity := range scheduledChangeEventDetail.AffectedEntities {
		nodeInfo, err := m.getNodeInfo(affectedEntity.EntityValue)
		if err != nil {
			// A failed lookup for one entity must not prevent the remaining
			// entities from being processed.
			interruptionEventWrappers = append(interruptionEventWrappers, InterruptionEventWrapper{nil, err})
			continue
		}

		// Begin drain immediately for scheduled change events to avoid disruptions in cases such as degraded hardware
		interruptionEvent := monitor.InterruptionEvent{
			EventID:              fmt.Sprintf("aws-health-scheduled-change-event-%x", event.ID),
			Kind:                 monitor.ScheduledEventKind,
			Monitor:              SQSMonitorKind,
			AutoScalingGroupName: nodeInfo.AsgName,
			StartTime:            time.Now(),
			NodeName:             nodeInfo.Name,
			InstanceID:           nodeInfo.InstanceID,
			ProviderID:           nodeInfo.ProviderID,
			InstanceType:         nodeInfo.InstanceType,
			IsManaged:            nodeInfo.IsManaged,
			Description:          fmt.Sprintf("AWS Health scheduled change event received. Instance %s will be interrupted at %s \n", nodeInfo.InstanceID, event.getTime()),
		}
		// After the drain, remove the originating SQS message. Only the first
		// deletion error is surfaced to the caller.
		interruptionEvent.PostDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error {
			// Check the length rather than nil-ness so that a non-nil but
			// empty error slice cannot cause an index-out-of-range panic.
			if errs := m.deleteMessages([]*sqs.Message{message}); len(errs) > 0 {
				return errs[0]
			}
			return nil
		}
		// Tainting is best-effort: a failure is logged but does not stop the
		// drain, so this task always returns nil.
		interruptionEvent.PreDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error {
			if err := n.TaintScheduledMaintenance(interruptionEvent.NodeName, interruptionEvent.EventID); err != nil {
				log.Err(err).Msgf("Unable to taint node with taint %s:%s", node.ScheduledMaintenanceTaint, interruptionEvent.EventID)
			}
			return nil
		}

		interruptionEventWrappers = append(interruptionEventWrappers, InterruptionEventWrapper{&interruptionEvent, nil})
	}

	return interruptionEventWrappers
}