pkg/monitor/sqsevent/asg-lifecycle-event.go (224 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.
package sqsevent
import (
"encoding/json"
"errors"
"fmt"
"time"
"github.com/aws/aws-node-termination-handler/pkg/monitor"
"github.com/aws/aws-node-termination-handler/pkg/node"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/rs/zerolog/log"
)
/* Example SQS ASG Lifecycle Termination Event Message:
{
"version": "0",
"id": "782d5b4c-0f6f-1fd6-9d62-ecf6aed0a470",
"detail-type": "EC2 Instance-terminate Lifecycle Action",
"source": "aws.autoscaling",
"account": "123456789012",
"time": "2020-07-01T22:19:58Z",
"region": "us-east-1",
"resources": [
"arn:aws:autoscaling:us-east-1:123456789012:autoScalingGroup:26e7234b-03a4-47fb-b0a9-2b241662774e:autoScalingGroupName/testt1.demo-0a20f32c.kops.sh"
],
"detail": {
"LifecycleActionToken": "0befcbdb-6ecd-498a-9ff7-ae9b54447cd6",
"AutoScalingGroupName": "testt1.demo-0a20f32c.kops.sh",
"LifecycleHookName": "cluster-termination-handler",
"EC2InstanceId": "i-0633ac2b0d9769723",
"LifecycleTransition": "autoscaling:EC2_INSTANCE_TERMINATING"
}
}
*/
// TEST_NOTIFICATION is the marker value that identifies an ASG test
// notification. The event converters in this file compare it against both the
// Event and the LifecycleTransition fields of a decoded LifecycleDetail and
// skip the message when either one matches.
const TEST_NOTIFICATION = "autoscaling:TEST_NOTIFICATION"
// LifecycleDetailMessage captures the "Message" field of a JSON document
// without imposing a schema on it; the value is decoded into whatever generic
// Go type encoding/json selects for the payload.
//
// NOTE(review): not referenced in this part of the file; presumably used for
// lifecycle details that arrive wrapped in an outer envelope — confirm against
// the callers.
type LifecycleDetailMessage struct {
Message interface{} `json:"Message"`
}
// LifecycleDetail provides the ASG lifecycle event details.
// It is decoded from the "detail" object of an EventBridge ASG lifecycle event
// (see the example message at the top of this file).
type LifecycleDetail struct {
// LifecycleActionToken is sent back to ASG when recording a heartbeat or
// completing the lifecycle action.
LifecycleActionToken string `json:"LifecycleActionToken"`
// AutoScalingGroupName is the name of the ASG that emitted the event.
AutoScalingGroupName string `json:"AutoScalingGroupName"`
// LifecycleHookName is the name of the lifecycle hook the event belongs to.
LifecycleHookName string `json:"LifecycleHookName"`
// EC2InstanceID is the instance the lifecycle action applies to; it is used
// to look up the node information for the event.
EC2InstanceID string `json:"EC2InstanceId"`
// LifecycleTransition is the transition reported by ASG, for example
// "autoscaling:EC2_INSTANCE_TERMINATING". It is compared with
// TEST_NOTIFICATION to detect test messages.
LifecycleTransition string `json:"LifecycleTransition"`
// Event is compared with TEST_NOTIFICATION to detect test messages.
Event string `json:"Event"`
// RequestID is decoded from "RequestId"; it is not read in this part of the file.
RequestID string `json:"RequestId"`
// Time is decoded from "Time" as a raw string; it is not read in this part of the file.
Time string `json:"Time"`
}
// asgTerminationToInterruptionEvent builds a monitor.InterruptionEvent from an
// ASG instance-terminate lifecycle EventBridge event.
//
// event.Detail is decoded into a LifecycleDetail and the affected node is
// looked up with getNodeInfo using the EC2 instance ID. The returned event
// carries two tasks:
//   - PreDrainTask starts the heartbeat goroutines (only when neither
//     HeartbeatInterval nor HeartbeatUntil is -1) and calls
//     TaintASGLifecycleTermination; a tainting failure is logged, not returned.
//   - PostDrainTask completes the lifecycle action with CONTINUE, signals the
//     heartbeat goroutine to stop, and deletes the SQS message.
//
// It returns an error when event is nil, when the detail cannot be decoded or
// when the node lookup fails, and a skip error for ASG test notifications.
func (m SQSMonitor) asgTerminationToInterruptionEvent(event *EventBridgeEvent, message *sqs.Message) (*monitor.InterruptionEvent, error) {
	// Mirror the guard used by createAsgInstanceLaunchEvent: event is
	// dereferenced immediately below.
	if event == nil {
		return nil, fmt.Errorf("event is nil")
	}

	lifecycleDetail := &LifecycleDetail{}
	if err := json.Unmarshal(event.Detail, lifecycleDetail); err != nil {
		return nil, fmt.Errorf("unmarshaling ASG termination lifecycle event detail: %w", err)
	}

	if lifecycleDetail.Event == TEST_NOTIFICATION || lifecycleDetail.LifecycleTransition == TEST_NOTIFICATION {
		return nil, skip{fmt.Errorf("message is an ASG test notification")}
	}

	nodeInfo, err := m.getNodeInfo(lifecycleDetail.EC2InstanceID)
	if err != nil {
		return nil, err
	}

	// Resolve the event time once so StartTime and Description always agree.
	startTime := event.getTime()

	interruptionEvent := monitor.InterruptionEvent{
		EventID:              fmt.Sprintf("asg-lifecycle-term-%x", event.ID),
		Kind:                 monitor.ASGLifecycleKind,
		Monitor:              SQSMonitorKind,
		AutoScalingGroupName: lifecycleDetail.AutoScalingGroupName,
		StartTime:            startTime,
		NodeName:             nodeInfo.Name,
		IsManaged:            nodeInfo.IsManaged,
		InstanceID:           lifecycleDetail.EC2InstanceID,
		ProviderID:           nodeInfo.ProviderID,
		InstanceType:         nodeInfo.InstanceType,
		Description:          fmt.Sprintf("ASG Lifecycle Termination event received. Instance will be interrupted at %s \n", startTime),
	}

	// Closed by PostDrainTask to tell SendHeartbeats to stop.
	stopHeartbeatCh := make(chan struct{})

	interruptionEvent.PostDrainTask = func(_ monitor.InterruptionEvent, _ node.Node) error {
		// Use a task-local error instead of writing to the enclosing
		// function's err variable.
		if _, err := m.continueLifecycleAction(lifecycleDetail); err != nil {
			return fmt.Errorf("continuing ASG termination lifecycle: %w", err)
		}
		log.Info().Str("lifecycleHookName", lifecycleDetail.LifecycleHookName).Str("instanceID", lifecycleDetail.EC2InstanceID).Msg("Completed ASG Lifecycle Hook")

		// Closing an already-closed channel panics, so only close it the first
		// time. This protects a repeated (sequential) invocation of the task,
		// e.g. a retry after deleteMessage failed.
		select {
		case <-stopHeartbeatCh:
		default:
			close(stopHeartbeatCh)
		}
		return m.deleteMessage(message)
	}

	interruptionEvent.PreDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error {
		nthConfig := n.GetNthConfig()
		// If only HeartbeatInterval is set, HeartbeatUntil will default to 172800.
		if nthConfig.HeartbeatInterval != -1 && nthConfig.HeartbeatUntil != -1 {
			go m.checkHeartbeatTimeout(nthConfig.HeartbeatInterval, lifecycleDetail)
			go m.SendHeartbeats(nthConfig.HeartbeatInterval, nthConfig.HeartbeatUntil, lifecycleDetail, stopHeartbeatCh)
		}

		// Tainting is best effort: a failure is logged and the drain proceeds.
		if taintErr := n.TaintASGLifecycleTermination(interruptionEvent.NodeName, interruptionEvent.EventID); taintErr != nil {
			log.Err(taintErr).Msgf("unable to taint node with taint %s:%s", node.ASGLifecycleTerminationTaint, interruptionEvent.EventID)
		}
		return nil
	}

	return &interruptionEvent, nil
}
// checkHeartbeatTimeout compares heartbeatInterval (in seconds) with the
// heartbeat timeout of the lifecycle hook named in lifecycleDetail and logs a
// warning when heartbeatInterval >= heartbeat timeout.
//
// The hook is fetched with DescribeLifecycleHooks. The check is purely
// advisory: any lookup failure, missing hook, or hook without a heartbeat
// timeout is logged and the function returns without further action.
func (m SQSMonitor) checkHeartbeatTimeout(heartbeatInterval int, lifecycleDetail *LifecycleDetail) {
	input := &autoscaling.DescribeLifecycleHooksInput{
		AutoScalingGroupName: aws.String(lifecycleDetail.AutoScalingGroupName),
		LifecycleHookNames:   []*string{aws.String(lifecycleDetail.LifecycleHookName)},
	}

	output, err := m.ASG.DescribeLifecycleHooks(input)
	if err != nil {
		log.Err(err).Msg("failed to describe lifecycle hook")
		return
	}

	if output == nil || len(output.LifecycleHooks) == 0 {
		log.Warn().
			Str("asgName", lifecycleDetail.AutoScalingGroupName).
			Str("lifecycleHookName", lifecycleDetail.LifecycleHookName).
			Msg("Tried to check heartbeat timeout, but no lifecycle hook found from ASG")
		return
	}

	// The SDK models every field as a pointer; guard against nil before
	// dereferencing so this advisory check can never panic.
	hook := output.LifecycleHooks[0]
	if hook == nil || hook.HeartbeatTimeout == nil {
		log.Warn().
			Str("asgName", lifecycleDetail.AutoScalingGroupName).
			Str("lifecycleHookName", lifecycleDetail.LifecycleHookName).
			Msg("Tried to check heartbeat timeout, but the lifecycle hook has no heartbeat timeout")
		return
	}

	heartbeatTimeout := int(*hook.HeartbeatTimeout)
	if heartbeatInterval >= heartbeatTimeout {
		log.Warn().Msgf(
			"Heartbeat interval (%d seconds) is equal to or greater than "+
				"the heartbeat timeout (%d seconds) for the lifecycle hook %s attached to ASG %s. "+
				"The node would likely be terminated before the heartbeat is sent",
			heartbeatInterval,
			heartbeatTimeout,
			aws.StringValue(hook.LifecycleHookName),
			aws.StringValue(hook.AutoScalingGroupName),
		)
	}
}
// SendHeartbeats issues lifecycle heartbeats to reset the heartbeat timeout
// timer in ASG.
//
// A heartbeat is recorded every heartbeatInterval seconds until one of the
// following happens:
//   - stopCh is closed or receives a value,
//   - heartbeatUntil seconds have elapsed since the call started,
//   - recordLifecycleActionHeartbeat returns an error (invalid heartbeat target).
//
// The call blocks until it stops, so callers normally run it in its own
// goroutine. A heartbeatInterval that is not greater than zero is rejected with
// a warning, because time.NewTicker panics for non-positive durations.
func (m SQSMonitor) SendHeartbeats(heartbeatInterval int, heartbeatUntil int, lifecycleDetail *LifecycleDetail, stopCh <-chan struct{}) {
	if heartbeatInterval <= 0 {
		log.Warn().
			Int("heartbeatInterval", heartbeatInterval).
			Msg("Heartbeat interval must be greater than zero, not sending heartbeats")
		return
	}

	ticker := time.NewTicker(time.Duration(heartbeatInterval) * time.Second)
	defer ticker.Stop()

	// Overall deadline after which heartbeats are no longer sent.
	timeout := time.After(time.Duration(heartbeatUntil) * time.Second)

	for {
		select {
		case <-stopCh:
			log.Info().Str("asgName", lifecycleDetail.AutoScalingGroupName).
				Str("lifecycleHookName", lifecycleDetail.LifecycleHookName).
				Str("lifecycleActionToken", lifecycleDetail.LifecycleActionToken).
				Str("instanceID", lifecycleDetail.EC2InstanceID).
				Msg("Successfully cordoned and drained the node, stopping heartbeat")
			return

		case <-ticker.C:
			// Only errors that mark the target as invalid are returned here;
			// other failures are logged by the callee and the loop goes on.
			if err := m.recordLifecycleActionHeartbeat(lifecycleDetail); err != nil {
				log.Err(err).Msg("invalid heartbeat target, stopping heartbeat")
				return
			}

		case <-timeout:
			log.Info().Str("asgName", lifecycleDetail.AutoScalingGroupName).
				Str("lifecycleHookName", lifecycleDetail.LifecycleHookName).
				Str("lifecycleActionToken", lifecycleDetail.LifecycleActionToken).
				Str("instanceID", lifecycleDetail.EC2InstanceID).
				Msg("Heartbeat deadline exceeded, stopping heartbeat")
			return
		}
	}
}
// recordLifecycleActionHeartbeat sends a single lifecycle heartbeat to ASG for
// the action described by lifecycleDetail.
//
// The outcome is always logged. An error is returned only when the call fails
// with an AWS error whose code is "ValidationError", which tells the caller the
// heartbeat target is invalid and heartbeating should stop. Any other failure
// is logged as a warning and reported as nil so the caller keeps going.
func (m SQSMonitor) recordLifecycleActionHeartbeat(lifecycleDetail *LifecycleDetail) error {
	_, heartbeatErr := m.ASG.RecordLifecycleActionHeartbeat(&autoscaling.RecordLifecycleActionHeartbeatInput{
		AutoScalingGroupName: aws.String(lifecycleDetail.AutoScalingGroupName),
		LifecycleHookName:    aws.String(lifecycleDetail.LifecycleHookName),
		LifecycleActionToken: aws.String(lifecycleDetail.LifecycleActionToken),
		InstanceId:           aws.String(lifecycleDetail.EC2InstanceID),
	})

	// Happy path first: nothing more to do than report success.
	if heartbeatErr == nil {
		log.Info().
			Str("asgName", lifecycleDetail.AutoScalingGroupName).
			Str("lifecycleHookName", lifecycleDetail.LifecycleHookName).
			Str("lifecycleActionToken", lifecycleDetail.LifecycleActionToken).
			Str("instanceID", lifecycleDetail.EC2InstanceID).
			Msg("Successfully sent lifecycle heartbeat")
		return nil
	}

	log.Warn().
		Str("asgName", lifecycleDetail.AutoScalingGroupName).
		Str("lifecycleHookName", lifecycleDetail.LifecycleHookName).
		Str("lifecycleActionToken", lifecycleDetail.LifecycleActionToken).
		Str("instanceID", lifecycleDetail.EC2InstanceID).
		Err(heartbeatErr).
		Msg("Failed to send lifecycle heartbeat")

	// Only a validation failure means the target is invalid; everything else
	// is swallowed so the heartbeat loop continues.
	var apiErr awserr.Error
	if !errors.As(heartbeatErr, &apiErr) || apiErr.Code() != "ValidationError" {
		return nil
	}
	return heartbeatErr
}
// deleteMessage deletes a single SQS message by delegating to deleteMessages.
// It returns the first error reported for the deletion, or nil when none was
// reported.
func (m SQSMonitor) deleteMessage(message *sqs.Message) error {
	errs := m.deleteMessages([]*sqs.Message{message})
	// Check the length rather than nil-ness: indexing a non-nil but empty
	// result would panic.
	if len(errs) > 0 {
		return errs[0]
	}
	return nil
}
// continueLifecycleAction completes the lifecycle action identified by
// lifecycleDetail with the result "CONTINUE", thereby indicating that a
// successful action occurred. The output and error of
// completeLifecycleAction are returned unchanged.
func (m SQSMonitor) continueLifecycleAction(lifecycleDetail *LifecycleDetail) (*autoscaling.CompleteLifecycleActionOutput, error) {
	const actionResult = "CONTINUE"

	// The identifying fields are passed by address straight from lifecycleDetail.
	request := autoscaling.CompleteLifecycleActionInput{
		InstanceId:            &lifecycleDetail.EC2InstanceID,
		AutoScalingGroupName:  &lifecycleDetail.AutoScalingGroupName,
		LifecycleHookName:     &lifecycleDetail.LifecycleHookName,
		LifecycleActionToken:  &lifecycleDetail.LifecycleActionToken,
		LifecycleActionResult: aws.String(actionResult),
	}
	return m.completeLifecycleAction(&request)
}
// createAsgInstanceLaunchEvent builds a monitor.InterruptionEvent of kind
// ASGLaunchLifecycleKind from an ASG instance-launch lifecycle EventBridge
// event.
//
// event.Detail is decoded into a LifecycleDetail and the node is looked up
// with getNodeInfo using the EC2 instance ID. The returned event's
// PostDrainTask completes the launch lifecycle hook with CONTINUE and then
// deletes the SQS message.
//
// It returns an error when event or message is nil, when the detail cannot be
// decoded or when the node lookup fails, and a skip error for ASG test
// notifications.
//
// NOTE(review): deciding when the new instance is Ready in the cluster (and
// therefore when PostDrainTask runs) is not done here — presumably handled by
// the caller; confirm.
func (m SQSMonitor) createAsgInstanceLaunchEvent(event *EventBridgeEvent, message *sqs.Message) (*monitor.InterruptionEvent, error) {
	if event == nil {
		return nil, fmt.Errorf("event is nil")
	}
	if message == nil {
		return nil, fmt.Errorf("message is nil")
	}

	lifecycleDetail := &LifecycleDetail{}
	if err := json.Unmarshal(event.Detail, lifecycleDetail); err != nil {
		// MessageId is a *string; StringValue avoids a nil dereference while
		// building the error.
		return nil, fmt.Errorf("unmarshaling message, %s, from ASG launch lifecycle event: %w", aws.StringValue(message.MessageId), err)
	}

	if lifecycleDetail.Event == TEST_NOTIFICATION || lifecycleDetail.LifecycleTransition == TEST_NOTIFICATION {
		return nil, skip{fmt.Errorf("message is an ASG test notification")}
	}

	nodeInfo, err := m.getNodeInfo(lifecycleDetail.EC2InstanceID)
	if err != nil {
		return nil, err
	}

	// Resolve the event time once so StartTime and Description always agree.
	startTime := event.getTime()

	interruptionEvent := monitor.InterruptionEvent{
		// NOTE(review): the "asg-lifecycle-term-" prefix matches the one used
		// for termination events; kept as is because event IDs may be relied
		// on elsewhere — confirm whether a launch-specific prefix is wanted.
		EventID:              fmt.Sprintf("asg-lifecycle-term-%x", event.ID),
		Kind:                 monitor.ASGLaunchLifecycleKind,
		Monitor:              SQSMonitorKind,
		AutoScalingGroupName: lifecycleDetail.AutoScalingGroupName,
		StartTime:            startTime,
		NodeName:             nodeInfo.Name,
		IsManaged:            nodeInfo.IsManaged,
		InstanceID:           lifecycleDetail.EC2InstanceID,
		ProviderID:           nodeInfo.ProviderID,
		Description:          fmt.Sprintf("ASG Lifecycle Launch event received. Instance was started at %s \n", startTime),
	}

	interruptionEvent.PostDrainTask = func(_ monitor.InterruptionEvent, _ node.Node) error {
		// Use a task-local error instead of writing to the enclosing
		// function's err variable.
		if _, err := m.continueLifecycleAction(lifecycleDetail); err != nil {
			return fmt.Errorf("continuing ASG launch lifecycle: %w", err)
		}
		log.Info().Str("lifecycleHookName", lifecycleDetail.LifecycleHookName).Str("instanceID", lifecycleDetail.EC2InstanceID).Msg("Completed ASG Lifecycle Hook")
		return m.deleteMessage(message)
	}

	return &interruptionEvent, nil
}