in client.go [477:535]
// createStreamLoop opens a StreamAgentMessages stream on client and registers
// the connection identified by resourceID and channelID, retrying when the
// registration attempt fails with a retryable gRPC status.
//
// Retry policy (applied to the error returned by the first Recv):
//   - codes.ResourceExhausted: linear backoff of 1s, 2s, ... capped at 10s,
//     retried while the number of previous retries is <= 20.
//   - codes.Unavailable: backoff of 200ms * previous retries (the first retry
//     is immediate), retried while the number of previous retries is <= 5,
//     i.e. up to 6 retries and 3s of total sleep.
//   - any other error is returned immediately.
//
// Backoff waits are interrupted when ctx is done, in which case an error
// wrapping ctx.Err() is returned.
//
// On success the registered stream is returned. On failure the returned stream
// is nil and the error describes the step that failed.
func createStreamLoop(ctx context.Context, client *agentcommunication.Client, resourceID string, channelID string) (acpb.AgentCommunication_StreamAgentMessagesClient, error) {
	const (
		// A ResourceExhausted failure gives up once the retry counter exceeds this.
		resourceExhaustedRetryLimit = 20
		// Upper bound for a single ResourceExhausted backoff sleep.
		resourceExhaustedMaxSleep = 10 * time.Second
		// An Unavailable failure gives up once the retry counter exceeds this.
		unavailableRetryLimit = 5
		// Per-retry increment of the Unavailable backoff sleep.
		unavailableBackoffStep = 200 * time.Millisecond
	)

	// pause blocks for d, or returns ctx.Err() early if ctx is done first, so
	// that a cancelled caller is not held up by a pending backoff.
	pause := func(d time.Duration) error {
		if d <= 0 {
			return ctx.Err()
		}
		timer := time.NewTimer(d)
		defer timer.Stop()
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-timer.C:
			return nil
		}
	}

	resourceExhaustedRetries := 0
	unavailableRetries := 0
	for {
		stream, err := client.StreamAgentMessages(ctx)
		if err != nil {
			return nil, fmt.Errorf("error creating stream: %w", err)
		}

		// RegisterConnection is a special message that must be sent before any other messages.
		req := &acpb.StreamAgentMessagesRequest{
			MessageId: uuid.New().String(),
			Type: &acpb.StreamAgentMessagesRequest_RegisterConnection{
				RegisterConnection: &acpb.RegisterConnection{ResourceId: resourceID, ChannelId: channelID},
			},
		}
		if err := stream.Send(req); err != nil {
			return nil, fmt.Errorf("error sending register connection: %w", err)
		}

		// We expect the first message to be a MessageResponse.
		resp, err := stream.Recv()
		if err == nil {
			switch resp.GetType().(type) {
			case *acpb.StreamAgentMessagesResponse_MessageResponse:
				if resp.GetMessageResponse().GetStatus().GetCode() != int32(codes.OK) {
					return nil, fmt.Errorf("unexpected register response: %+v", resp.GetMessageResponse().GetStatus())
				}
			}
			// Stream is connected. Any other first-message type is accepted as-is.
			return stream, nil
		}

		st, ok := status.FromError(err)
		switch {
		case ok && st.Code() == codes.ResourceExhausted:
			if resourceExhaustedRetries > resourceExhaustedRetryLimit {
				loggerPrintf("Stream returned ResourceExhausted, exceeded max number of reconnects, closing connection: %v", err)
				// Give up instead of looping forever once the limit is exceeded.
				return nil, err
			}
			loggerPrintf("Resource exhausted, sleeping before reconnect: %v", err)
			// Linear backoff: 1s, 2s, ... capped at resourceExhaustedMaxSleep.
			sleep := time.Duration(resourceExhaustedRetries+1) * time.Second
			if sleep > resourceExhaustedMaxSleep {
				sleep = resourceExhaustedMaxSleep
			}
			if werr := pause(sleep); werr != nil {
				return nil, fmt.Errorf("waiting to reconnect stream: %w", werr)
			}
			resourceExhaustedRetries++
			continue

		case ok && st.Code() == codes.Unavailable:
			if unavailableRetries > unavailableRetryLimit {
				loggerPrintf("Stream returned Unavailable, exceeded max number of reconnects, closing connection: %v", err)
				return nil, err
			}
			loggerPrintf("Stream returned Unavailable, will reconnect: %v", err)
			// Sleep for 200ms * num of unavailableRetries, first retry is immediate.
			if werr := pause(time.Duration(unavailableRetries) * unavailableBackoffStep); werr != nil {
				return nil, fmt.Errorf("waiting to reconnect stream: %w", werr)
			}
			unavailableRetries++
			continue
		}

		// Not a retryable status: surface the Recv error unchanged.
		return nil, err
	}
}