in client.go [409:475]
func (c *Connection) recv(ctx context.Context, streamClosed, streamSendLock chan struct{}, stream acpb.AgentCommunication_StreamAgentMessagesClient) {
loggerPrintf("Receiving messages")
for {
resp, err := stream.Recv()
if err != nil {
select {
case <-streamClosed:
default:
// Causes the send goroutine to exit, c.sends will block.
close(streamClosed)
}
select {
case <-c.closed:
// Connection is closed, return now.
loggerPrintf("Connection closed, recv returning")
return
default:
}
st, ok := status.FromError(err)
if ok && st.Code() == codes.ResourceExhausted {
loggerPrintf("Connection closed due to resource exhausted: %v", err)
} else if ok && st.Code() == codes.Unavailable {
loggerPrintf("Stream returned Unavailable, will reconnect: %v", err)
} else if err != io.EOF && !errors.Is(err, io.EOF) && (ok && st.Code() != codes.Canceled) && (ok && st.Code() != codes.DeadlineExceeded) {
// EOF is a normal stream close, Canceled will be set by the server when stream timeout is
// reached, DeadlineExceeded would be because of the client side deadline we set.
loggerPrintf("Unexpected error, closing connection: %v", err)
c.close(err)
return
}
// A new stream is created if:
// 1. Resource exhausted is returned but we have not exceeded the max number of retries.
// 2. Unavailable is returned but we have not exceeded the max number of retries.
// 3. A known "normal disconnect" error is returned.
loggerPrintf("Creating new stream")
if err := c.createStream(ctx); err != nil {
loggerPrintf("Error creating new stream: %v", err)
c.close(err)
}
// Always return here, createStream launches a new recv goroutine.
return
}
switch resp.GetType().(type) {
case *acpb.StreamAgentMessagesResponse_MessageBody:
// Acknowledge message first, if this ack fails dont forward the message on to the handling
// logic since that indicates a stream disconnect.
if err := c.acknowledgeMessage(resp.GetMessageId(), streamClosed, streamSendLock, stream); err != nil {
loggerPrintf("Error acknowledging message %q: %v", resp.GetMessageId(), err)
continue
}
c.messages <- resp.GetMessageBody()
case *acpb.StreamAgentMessagesResponse_MessageResponse:
st := resp.GetMessageResponse().GetStatus()
c.responseMx.Lock()
for key, sub := range c.responseSubs {
if key != resp.GetMessageId() {
continue
}
select {
case sub <- status.FromProto(st):
default:
}
}
c.responseMx.Unlock()
}
}
}