in pkg/batcher/describeinstances.go [66:132]
// execDescribeInstancesBatch returns a BatchExecutor that serves a batch of
// DescribeInstances requests with one aggregated, paginated call.
//
// The instance IDs of every input are merged into a single request that reuses
// the remaining fields (e.g. Filters) of the first input. Each instance in the
// response is handed back, wrapped in its own single-reservation output, to
// every request whose first instance ID matches it. IDs that the aggregated
// call did not return are then described one at a time, concurrently, and the
// output and error of that individual call become the result of the matching
// requests.
//
// The returned slice is index-aligned with inputs. Requests are matched on
// InstanceIds[0] only; a request without any instance ID keeps a zero Result.
// The caller's inputs are never modified.
//
// NOTE(review): this assumes all inputs in one batch share the same Filters,
// since only the first input's Filters are sent — confirm against the batcher's
// request hasher.
func execDescribeInstancesBatch(ec2api sdk.EC2API) BatchExecutor[ec2.DescribeInstancesInput, ec2.DescribeInstancesOutput] {
	return func(ctx context.Context, inputs []*ec2.DescribeInstancesInput) []Result[ec2.DescribeInstancesOutput] {
		results := make([]Result[ec2.DescribeInstancesOutput], len(inputs))
		if len(inputs) == 0 {
			return results
		}

		// Index the requests by the instance ID they are waiting on so that each
		// described instance can be routed without rescanning every input.
		requestsByInstanceID := make(map[string][]int, len(inputs))
		totalIDs := 0
		for reqID, input := range inputs {
			totalIDs += len(input.InstanceIds)
			if len(input.InstanceIds) == 0 {
				continue
			}
			id := input.InstanceIds[0]
			requestsByInstanceID[id] = append(requestsByInstanceID[id], reqID)
		}

		// Aggregate instanceIDs into 1 input. Work on a shallow copy with a fresh
		// InstanceIds slice: appending to the first input directly would mutate
		// the caller's request and could write into a backing array it shares.
		batchInput := *inputs[0]
		batchInput.InstanceIds = make([]string, 0, totalIDs)
		for _, input := range inputs {
			batchInput.InstanceIds = append(batchInput.InstanceIds, input.InstanceIds...)
		}

		missingInstanceIDs := sets.NewString(batchInput.InstanceIds...)
		paginator := ec2.NewDescribeInstancesPaginator(ec2api, &batchInput)
		for paginator.HasMorePages() {
			output, err := paginator.NextPage(ctx)
			if err != nil {
				// The error is intentionally not returned here: every ID that has
				// not been seen yet stays in missingInstanceIDs and is retried
				// individually below, where its own error is reported.
				break
			}
			for _, r := range output.Reservations {
				for _, instance := range r.Instances {
					// FromPtr guards against a nil InstanceId in the response.
					instanceID := lo.FromPtr(instance.InstanceId)
					missingInstanceIDs.Delete(instanceID)
					// Populate every request that asked for this instance with a
					// reservation that contains only this instance.
					for _, reqID := range requestsByInstanceID[instanceID] {
						results[reqID] = Result[ec2.DescribeInstancesOutput]{Output: &ec2.DescribeInstancesOutput{
							Reservations: []ec2types.Reservation{{
								OwnerId:       r.OwnerId,
								RequesterId:   r.RequesterId,
								ReservationId: r.ReservationId,
								Instances:     []ec2types.Instance{instance},
							}},
							ResultMetadata: output.ResultMetadata,
						}}
					}
				}
			}
		}

		// If we have any missing instanceIDs, we need to describe them individually.
		// Some or all instances may have failed to be described due to eventual consistency or a transient zonal issue.
		// A single instance lookup failure can result in all of an availability zone's instances failing to describe.
		// So we try to describe them individually now. This should be rare and only results in a handful of extra calls
		// per batch compared to not batching.
		var wg sync.WaitGroup
		for instanceID := range missingInstanceIDs {
			wg.Add(1)
			go func(instanceID string) {
				defer wg.Done()
				// Try to execute separately.
				out, err := ec2api.DescribeInstances(ctx, &ec2.DescribeInstancesInput{
					Filters:     batchInput.Filters,
					InstanceIds: []string{instanceID},
				})
				// Each request index is listed under exactly one instance ID, so the
				// goroutines write to disjoint elements of results; the index map is
				// only read here.
				for _, reqID := range requestsByInstanceID[instanceID] {
					results[reqID] = Result[ec2.DescribeInstancesOutput]{Output: out, Err: err}
				}
			}(instanceID)
		}
		wg.Wait()
		return results
	}
}