in pkg/monitor/sqsevent/scheduled-change-event.go [70:126]
// scheduledEventToInterruptionEvents converts an AWS Health scheduled change
// event received through Amazon EventBridge into interruption events, one per
// affected entity listed in the event detail.
//
// The returned slice contains one wrapper per outcome:
//   - if event.Detail cannot be decoded, a single wrapper holding the JSON error;
//   - if the detail's Service is not "EC2" or its EventTypeCategory is not
//     "scheduledChange", a single wrapper holding a skip error;
//   - otherwise, for every affected entity, either a wrapper holding the
//     interruption event or a wrapper holding the node lookup error.
//
// All interruption events produced from one call share the same EventID
// (derived from event.ID) and the same SQS message. Each event's PostDrainTask
// deletes that message, and each PreDrainTask attempts to apply the scheduled
// maintenance taint; a tainting failure is logged but does not fail the task.
func (m SQSMonitor) scheduledEventToInterruptionEvents(event *EventBridgeEvent, message *sqs.Message) []InterruptionEventWrapper {
	scheduledChangeEventDetail := &ScheduledChangeEventDetail{}
	interruptionEventWrappers := []InterruptionEventWrapper{}

	if err := json.Unmarshal(event.Detail, scheduledChangeEventDetail); err != nil {
		return append(interruptionEventWrappers, InterruptionEventWrapper{nil, err})
	}

	if scheduledChangeEventDetail.Service != "EC2" {
		err := skip{fmt.Errorf("events from Amazon EventBridge for service (%s) are not supported", scheduledChangeEventDetail.Service)}
		return append(interruptionEventWrappers, InterruptionEventWrapper{nil, err})
	}

	if scheduledChangeEventDetail.EventTypeCategory != "scheduledChange" {
		err := skip{fmt.Errorf("events from Amazon EventBridge with EventTypeCategory (%s) are not supported", scheduledChangeEventDetail.EventTypeCategory)}
		return append(interruptionEventWrappers, InterruptionEventWrapper{nil, err})
	}

	// The event ID and both drain tasks depend only on the incoming event and
	// message, not on the individual affected entity, so build them once.
	eventID := fmt.Sprintf("aws-health-scheduled-change-event-%x", event.ID)

	postDrainTask := func(interruptionEvent monitor.InterruptionEvent, n node.Node) error {
		// Check the length rather than nil-ness so that an empty, non-nil
		// error slice cannot cause an out-of-range panic on errs[0].
		if errs := m.deleteMessages([]*sqs.Message{message}); len(errs) > 0 {
			return errs[0]
		}
		return nil
	}

	preDrainTask := func(interruptionEvent monitor.InterruptionEvent, n node.Node) error {
		// Tainting is best-effort: log the failure and let the drain proceed.
		if err := n.TaintScheduledMaintenance(interruptionEvent.NodeName, interruptionEvent.EventID); err != nil {
			log.Err(err).Msgf("Unable to taint node with taint %s:%s", node.ScheduledMaintenanceTaint, interruptionEvent.EventID)
		}
		return nil
	}

	for _, affectedEntity := range scheduledChangeEventDetail.AffectedEntities {
		nodeInfo, err := m.getNodeInfo(affectedEntity.EntityValue)
		if err != nil {
			// Report the failure for this entity and keep processing the rest.
			interruptionEventWrappers = append(interruptionEventWrappers, InterruptionEventWrapper{nil, err})
			continue
		}

		// Begin drain immediately for scheduled change events to avoid disruptions in cases such as degraded hardware
		interruptionEvent := monitor.InterruptionEvent{
			EventID:              eventID,
			Kind:                 monitor.ScheduledEventKind,
			Monitor:              SQSMonitorKind,
			AutoScalingGroupName: nodeInfo.AsgName,
			StartTime:            time.Now(),
			NodeName:             nodeInfo.Name,
			InstanceID:           nodeInfo.InstanceID,
			ProviderID:           nodeInfo.ProviderID,
			InstanceType:         nodeInfo.InstanceType,
			IsManaged:            nodeInfo.IsManaged,
			Description:          fmt.Sprintf("AWS Health scheduled change event received. Instance %s will be interrupted at %s \n", nodeInfo.InstanceID, event.getTime()),
		}
		interruptionEvent.PostDrainTask = postDrainTask
		interruptionEvent.PreDrainTask = preDrainTask

		interruptionEventWrappers = append(interruptionEventWrappers, InterruptionEventWrapper{&interruptionEvent, nil})
	}

	return interruptionEventWrappers
}