in pkg/batcher/terminateinstances.go [57:126]
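This batch executor coalesces many single-instance TerminateInstances requests into one aggregated EC2 call, then retries individually any instance the aggregate response did not confirm as terminating.
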
func execTerminateInstancesBatch(ec2api sdk.EC2API) BatchExecutor[ec2.TerminateInstancesInput, ec2.TerminateInstancesOutput] {
return func(ctx context.Context, inputs []*ec2.TerminateInstancesInput) []Result[ec2.TerminateInstancesOutput] {
results := make([]Result[ec2.TerminateInstancesOutput], len(inputs))
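		// The batcher queues single-instance requests, so each input is expected to
		// carry exactly one instance ID (see the InstanceIds[0] checks below), and at
		// least one input is always present.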
firstInput := inputs[0]
		// Aggregate all instance IDs into the first input so we can issue a single request
for _, input := range inputs[1:] {
firstInput.InstanceIds = append(firstInput.InstanceIds, input.InstanceIds...)
}
		// Track the instance IDs that have not yet been confirmed as terminating
		stillRunning := sets.NewString(firstInput.InstanceIds...)
		// Execute the fully aggregated request.
		// We tolerate the error here since we'll break up the batch and retry individually upon any sort of failure
output, err := ec2api.TerminateInstances(ctx, firstInput)
if err != nil {
log.FromContext(ctx).Error(err, "failed terminating instances")
}
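		// A failed call can return a nil output; normalize it so the fulfillment check below can run unconditionally.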
if output == nil {
output = &ec2.TerminateInstancesOutput{}
}
		// Check for partial or zero fulfillment: any instance that is missing from the response or reported in a non-terminating state must be retried individually
for _, instanceStateChanges := range output.TerminatingInstances {
			// Remove instances that successfully reached a terminating state and carve out a distinct output for each original request
			if instanceStateChanges.CurrentState != nil && lo.Contains([]ec2types.InstanceStateName{ec2types.InstanceStateNameShuttingDown, ec2types.InstanceStateNameTerminated}, instanceStateChanges.CurrentState.Name) {
stillRunning.Delete(*instanceStateChanges.InstanceId)
				// Find all indexes where we are requesting this instance and populate them with the result
for reqID := range inputs {
if inputs[reqID].InstanceIds[0] == *instanceStateChanges.InstanceId {
results[reqID] = Result[ec2.TerminateInstancesOutput]{
Output: &ec2.TerminateInstancesOutput{
TerminatingInstances: []ec2types.InstanceStateChange{{
InstanceId: instanceStateChanges.InstanceId,
CurrentState: instanceStateChanges.CurrentState,
PreviousState: instanceStateChanges.PreviousState,
}},
ResultMetadata: output.ResultMetadata,
},
}
}
}
}
}
		// Some or all instances may have failed to terminate due to termination protection or some other error.
		// A single failing instance can cause every instance in its availability zone to fail to terminate,
		// so we now retry the remaining instances individually. This should be rare and adds only one extra
		// call per batch compared to not batching at all.
var wg sync.WaitGroup
for instanceID := range stillRunning {
wg.Add(1)
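			// Pass instanceID as an argument so each goroutine captures its own copy of the loop variable (required before Go 1.22)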
go func(instanceID string) {
defer wg.Done()
			// Retry termination for this instance individually
out, err := ec2api.TerminateInstances(ctx, &ec2.TerminateInstancesInput{InstanceIds: []string{instanceID}})
				// Find all indexes where we are requesting this instance and populate them with the result
for reqID := range inputs {
if inputs[reqID].InstanceIds[0] == instanceID {
results[reqID] = Result[ec2.TerminateInstancesOutput]{Output: out, Err: err}
}
}
}(instanceID)
}
wg.Wait()
return results
}
}
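
A minimal usage sketch, not part of the file: it drives the executor with a hypothetical fakeEC2 stub that satisfies only the TerminateInstances call made above. The stub, the demo function, and the canned shutting-down response are assumptions for illustration; the method signature is assumed to mirror the aws-sdk-go-v2 EC2 client.

// Hypothetical stub for illustration only: embedding sdk.EC2API satisfies the
// full interface while overriding the one method the executor actually calls.
type fakeEC2 struct{ sdk.EC2API }

func (fakeEC2) TerminateInstances(_ context.Context, in *ec2.TerminateInstancesInput, _ ...func(*ec2.Options)) (*ec2.TerminateInstancesOutput, error) {
	// Report every requested instance as shutting down, i.e. full fulfillment,
	// so the per-instance fallback path is never taken.
	return &ec2.TerminateInstancesOutput{
		TerminatingInstances: lo.Map(in.InstanceIds, func(id string, _ int) ec2types.InstanceStateChange {
			return ec2types.InstanceStateChange{
				InstanceId:   lo.ToPtr(id),
				CurrentState: &ec2types.InstanceState{Name: ec2types.InstanceStateNameShuttingDown},
			}
		}),
	}, nil
}

func demoTerminateBatch(ctx context.Context) {
	exec := execTerminateInstancesBatch(fakeEC2{})
	// Two single-instance inputs, as the batcher would queue them.
	results := exec(ctx, []*ec2.TerminateInstancesInput{
		{InstanceIds: []string{"i-aaa"}},
		{InstanceIds: []string{"i-bbb"}},
	})
	// results[0] carries a single-instance output for "i-aaa" and results[1]
	// for "i-bbb", both carved out of the one aggregated response.
	_ = results
}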