func getBackendAddrsFromGrpclb()

in dp_check/dp_check.go [378:471]


func getBackendAddrsFromGrpclb(lbAddr string, balancerHostname string, srvQueriesSucceeded bool) ([]string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
	defer cancel()
	altsCreds := alts.NewClientCreds(alts.DefaultClientOptions())
	altsCreds.OverrideServerName(balancerHostname)
	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(altsCreds),
		grpc.WithBlock(),
	}
	userAgentLogString := fmt.Sprintf(". Note that we are not overriding the user agent, so the grpc-go library will use the default user agent based on the grpc-go library version: |%v|...", grpc.Version)
	if len(*userAgent) > 0 {
		opts = append(opts, grpc.WithUserAgent(*userAgent))
		userAgentLogString = fmt.Sprintf(" and grpc.WithUserAgent(\"%v\")...", *userAgent)
	}
	infoLog.Printf("Attempt to dial: %v using ALTS and grpc.WithBlock()%v", lbAddr, userAgentLogString)
	conn, err := grpc.DialContext(
		ctx,
		lbAddr,
		opts...,
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create grpc connection to balancer: %v", err)
	}
	infoLog.Printf("Successfully dialed balancer. Now send initial grpc request...")
	lbClient := lbgrpc.NewLoadBalancerClient(conn)
	stream, err := lbClient.BalanceLoad(ctx)
	initReq := &lbpb.LoadBalanceRequest{
		LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
			InitialRequest: &lbpb.InitialLoadBalanceRequest{
				Name: *service,
			},
		},
	}
	if err != nil {
		return nil, fmt.Errorf("failed to open stream to the balancer: %v", err)
	}
	if err := stream.Send(initReq); err != nil {
		return nil, fmt.Errorf("failed to send initial grpc request to balancer: %v", err)
	}
	infoLog.Printf("Successfully sent initial grpc request to balancer: |%v|. Now wait for initial response...", initReq)
	reply, err := stream.Recv()
	if status.Code(err) == codes.InvalidArgument {
		return nil, fmt.Errorf("the BalanceLoad RPC failed with status code %v, error: %v, which indicates that %s is not a DirectPath-enabled service", status.Code(err), err, *service)
	}
	// TODO(apolcyn): remove this check for permission denied once denied requests always result in FallbackResponse messages
	if status.Code(err) == codes.PermissionDenied {
		if srvQueriesSucceeded {
			return nil, fmt.Errorf(`the BalanceLoad stream failed with status code: %v, error: %v. Because the earlier SRV record query for _grpclb._tcp.%s succeeded, this most likely indicates that %s is a DirectPath-enabled service, but that the service is preventing DirectPath access from gRPC clients which send the user agent header that we sent in the BalanceLoad RPC; see logs above for a hint about what the user-agent header is that we just sent. Consider running this tool again but with the --user_agent flag set to a new value, to try a different user agent header`, status.Code(err), err, *service, *service)
		}
		return nil, fmt.Errorf(`the BalanceLoad stream failed with status code: %v, error: %v. Because the earlier SRV record query for _grpclb._tcp.%s failed, this most likely indicates that %s is a DirectPath-enabled service, but that some attribute(s) of this specific VM (for example the VPC network project number of this VM's primary network interface, the VM project number, or the current region or zone we're running in), are causing this VM to be prevented DirectPath access to %s`, status.Code(err), err, *service, *service, *service)
	}
	if err != nil {
		return nil, fmt.Errorf("failed to recv initial grpc response from balancer: %v", err)
	}
	initResp := reply.GetInitialResponse()
	if initResp == nil {
		return nil, fmt.Errorf("gRPC reply from balancer did not include initial response: %v", err)
	}
	infoLog.Printf("Successfully received initial grpc response from balancer: |%v|. Now wait for a serverlist...", initResp)
	// Just wait for the first non-empty server list
	for {
		reply, err = stream.Recv()
		if err != nil {
			return nil, fmt.Errorf("grpc balancer stream Recv error:%v", err)
		}
		if reply.GetFallbackResponse() != nil {
			if srvQueriesSucceeded {
				return nil, fmt.Errorf(`received a FallbackResponse on the BalanceLoad stream. Because the earlier SRV record query for _grpclb._tcp.%s succeeded, this most likely indicates that %s is a DirectPath-enabled service, but that the service is preventing DirectPath access from gRPC clients which send the user agent header that we sent in the BalanceLoad RPC; see logs above for a hint about what the user-agent header is that we just sent. Consider running this tool again but with the --user_agent flag set to a new value, to try a different user agent header`, *service, *service)
			}
			return nil, fmt.Errorf(`received a FallbackResponse on the BalanceLoad stream. Because the earlier SRV record query for _grpclb._tcp.%s failed, this most likely indicates that %s is a DirectPath-enabled service, but that some attribute(s) of this specific VM (for example the VPC network project number of this VM's primary network interface, the VM project number, or the current region or zone we're running in), are causing this VM to be prevented DirectPath access to %s`, *service, *service, *service)
		}
		if serverList := reply.GetServerList(); serverList != nil {
			var out []string
			for _, s := range serverList.Servers {
				if s.Drop {
					continue
				}
				ip := net.IP(s.IpAddress)
				var addrStr string
				if ip.To4() != nil {
					addrStr = fmt.Sprintf("%s:%v", ip.String(), s.Port)
				} else if ip.To16() != nil {
					addrStr = fmt.Sprintf("[%s]:%v", ip.String(), s.Port)
				} else {
					return nil, fmt.Errorf("resolved backend ip:|%v|, which was not recgnoized as a valid IPv4 or IPv6 address", s.IpAddress)
				}
				out = append(out, addrStr)
			}
			if len(out) > 0 {
				return out, nil
			}
		}
	}
}