func execDescribeInstancesBatch()

in pkg/batcher/describeinstances.go [66:132]


// execDescribeInstancesBatch returns a BatchExecutor that coalesces many
// DescribeInstances requests into a single paginated DescribeInstances call.
//
// The batched call reuses the Filters (and every other field) of inputs[0] and
// asks for the union of all requests' InstanceIds. The caller-supplied inputs
// are never modified. Each request is routed its result by the first entry of
// its InstanceIds. Presumably the batcher only groups single-instance requests
// with identical filters — TODO confirm against the hasher.
//
// Instance IDs the batched call did not return are described one at a time, in
// parallel. An ID can be missing because a page failed, or because the instance
// is not yet visible. Each affected request receives the output and error of
// that individual call. Requests with no instance IDs are left with a zero Result.
func execDescribeInstancesBatch(ec2api sdk.EC2API) BatchExecutor[ec2.DescribeInstancesInput, ec2.DescribeInstancesOutput] {
	return func(ctx context.Context, inputs []*ec2.DescribeInstancesInput) []Result[ec2.DescribeInstancesOutput] {
		results := make([]Result[ec2.DescribeInstancesOutput], len(inputs))
		if len(inputs) == 0 {
			return results
		}

		// Index requests by the instance ID they are waiting on, so a described
		// instance can be routed to every matching request without rescanning
		// inputs. Each request index lands under exactly one key. This map is
		// only read after this loop, so the goroutines below may share it.
		requestsByID := make(map[string][]int, len(inputs))
		for reqID, input := range inputs {
			if len(input.InstanceIds) == 0 {
				continue
			}
			id := input.InstanceIds[0]
			requestsByID[id] = append(requestsByID[id], reqID)
		}

		// Build the batched request from a copy of inputs[0] with a freshly
		// allocated ID slice. Appending to inputs[0].InstanceIds in place would
		// leak the whole batch into the first caller's request.
		batchedInput := *inputs[0]
		batchedInput.InstanceIds = lo.FlatMap(inputs, func(input *ec2.DescribeInstancesInput, _ int) []string {
			return input.InstanceIds
		})
		missingInstanceIDs := sets.NewString(batchedInput.InstanceIds...)

		paginator := ec2.NewDescribeInstancesPaginator(ec2api, &batchedInput)
		for paginator.HasMorePages() {
			output, err := paginator.NextPage(ctx)
			if err != nil {
				// Deliberately best-effort: every ID not seen yet is retried
				// individually below, and those calls report their own errors.
				break
			}

			for _, r := range output.Reservations {
				for _, instance := range r.Instances {
					if instance.InstanceId == nil {
						continue
					}
					instanceID := *instance.InstanceId
					missingInstanceIDs.Delete(instanceID)
					for _, reqID := range requestsByID[instanceID] {
						results[reqID] = Result[ec2.DescribeInstancesOutput]{Output: &ec2.DescribeInstancesOutput{
							Reservations: []ec2types.Reservation{{
								OwnerId:       r.OwnerId,
								RequesterId:   r.RequesterId,
								ReservationId: r.ReservationId,
								Instances:     []ec2types.Instance{instance},
							}},
							ResultMetadata: output.ResultMetadata,
						}}
					}
				}
			}
		}

		// Some or all instances may have failed to describe because of eventual
		// consistency or a transient zonal issue. A single failed instance lookup
		// can cause all of an availability zone's instances to fail to describe.
		// So describe each remaining instance on its own. This should be rare, and
		// costs only a handful of extra calls per batch compared with not batching.
		var wg sync.WaitGroup
		for instanceID := range missingInstanceIDs {
			wg.Add(1)
			go func(instanceID string) {
				defer wg.Done()
				out, err := ec2api.DescribeInstances(ctx, &ec2.DescribeInstancesInput{
					Filters:     batchedInput.Filters,
					InstanceIds: []string{instanceID},
				})
				// Request indexes are partitioned by instance ID, so concurrent
				// goroutines write disjoint elements of results.
				for _, reqID := range requestsByID[instanceID] {
					results[reqID] = Result[ec2.DescribeInstancesOutput]{Output: out, Err: err}
				}
			}(instanceID)
		}
		wg.Wait()
		return results
	}
}