func createStreamLoop()

in client.go [477:535]


// createStreamLoop opens a StreamAgentMessages stream, registers the
// connection for resourceID/channelID on it, and returns the stream once the
// first response has been received.
//
// An attempt consists of creating the stream, sending a RegisterConnection
// request and waiting for the first response. If that Recv fails with
// ResourceExhausted or Unavailable the whole attempt is repeated after a
// backoff; any other failure is returned immediately.
//
// Retry policy:
//   - ResourceExhausted: linear backoff of 1s, 2s, ... capped at 10s; gives up
//     once more than 20 retries have been made.
//   - Unavailable: backoff of 0ms, 200ms, 400ms, ... 1s (the first retry is
//     immediate); gives up once more than 5 retries have been made.
//
// Backoff waits are interrupted when ctx is done, in which case the context's
// error is returned (wrapped).
//
// It returns the registered stream, or a nil stream and a non-nil error.
func createStreamLoop(ctx context.Context, client *agentcommunication.Client, resourceID string, channelID string) (acpb.AgentCommunication_StreamAgentMessagesClient, error) {
	const (
		// Retries are allowed while the retry counter is <= these limits.
		maxResourceExhaustedRetries = 20
		maxUnavailableRetries       = 5

		resourceExhaustedBackoffStep = time.Second
		resourceExhaustedBackoffCap  = 10 * time.Second
		unavailableBackoffStep       = 200 * time.Millisecond
	)

	// pause waits for d but returns early with ctx's error if ctx is done, so
	// that a cancelled caller is not held up by a backoff sleep.
	pause := func(d time.Duration) error {
		if d <= 0 {
			return ctx.Err()
		}
		timer := time.NewTimer(d)
		defer timer.Stop()
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-timer.C:
			return nil
		}
	}

	resourceExhaustedRetries := 0
	unavailableRetries := 0
	for {
		stream, err := client.StreamAgentMessages(ctx)
		if err != nil {
			return nil, fmt.Errorf("error creating stream: %w", err)
		}

		// RegisterConnection is a special message that must be sent before any other messages.
		req := &acpb.StreamAgentMessagesRequest{
			MessageId: uuid.New().String(),
			Type: &acpb.StreamAgentMessagesRequest_RegisterConnection{
				RegisterConnection: &acpb.RegisterConnection{ResourceId: resourceID, ChannelId: channelID}}}

		if err := stream.Send(req); err != nil {
			return nil, fmt.Errorf("error sending register connection: %w", err)
		}

		// We expect the first message to be a MessageResponse.
		resp, err := stream.Recv()
		if err == nil {
			// Only a MessageResponse carries a registration status; any other
			// first message type is accepted as-is.
			if _, isMsgResp := resp.GetType().(*acpb.StreamAgentMessagesResponse_MessageResponse); isMsgResp {
				if regStatus := resp.GetMessageResponse().GetStatus(); regStatus.GetCode() != int32(codes.OK) {
					// NOTE(review): the stream is not torn down on this path;
					// presumably the caller cancels ctx on error — confirm.
					return nil, fmt.Errorf("unexpected register response: %+v", regStatus)
				}
			}
			// Stream is connected.
			return stream, nil
		}

		st, ok := status.FromError(err)
		if !ok {
			return nil, err
		}

		switch st.Code() {
		case codes.ResourceExhausted:
			if resourceExhaustedRetries > maxResourceExhaustedRetries {
				// Give up instead of retrying forever.
				loggerPrintf("Stream returned ResourceExhausted, exceeded max number of reconnects, closing connection: %v", err)
				return nil, err
			}
			loggerPrintf("Resource exhausted, sleeping before reconnect: %v", err)
			backoff := time.Duration(resourceExhaustedRetries+1) * resourceExhaustedBackoffStep
			if backoff > resourceExhaustedBackoffCap {
				backoff = resourceExhaustedBackoffCap
			}
			if waitErr := pause(backoff); waitErr != nil {
				return nil, fmt.Errorf("waiting to reconnect after ResourceExhausted: %w", waitErr)
			}
			resourceExhaustedRetries++

		case codes.Unavailable:
			if unavailableRetries > maxUnavailableRetries {
				loggerPrintf("Stream returned Unavailable, exceeded max number of reconnects, closing connection: %v", err)
				return nil, err
			}
			loggerPrintf("Stream returned Unavailable, will reconnect: %v", err)
			// Backoff grows by one step per retry; the first retry is immediate.
			backoff := time.Duration(unavailableRetries) * unavailableBackoffStep
			if waitErr := pause(backoff); waitErr != nil {
				return nil, fmt.Errorf("waiting to reconnect after Unavailable: %w", waitErr)
			}
			unavailableRetries++

		default:
			return nil, err
		}
	}
}