func()

in client.go [409:475]


func (c *Connection) recv(ctx context.Context, streamClosed, streamSendLock chan struct{}, stream acpb.AgentCommunication_StreamAgentMessagesClient) {
	loggerPrintf("Receiving messages")
	for {
		resp, err := stream.Recv()
		if err != nil {
			select {
			case <-streamClosed:
			default:
				// Causes the send goroutine to exit, c.sends will block.
				close(streamClosed)
			}
			select {
			case <-c.closed:
				// Connection is closed, return now.
				loggerPrintf("Connection closed, recv returning")
				return
			default:
			}
			st, ok := status.FromError(err)
			if ok && st.Code() == codes.ResourceExhausted {
				loggerPrintf("Connection closed due to resource exhausted: %v", err)
			} else if ok && st.Code() == codes.Unavailable {
				loggerPrintf("Stream returned Unavailable, will reconnect: %v", err)
			} else if err != io.EOF && !errors.Is(err, io.EOF) && (ok && st.Code() != codes.Canceled) && (ok && st.Code() != codes.DeadlineExceeded) {
				// EOF is a normal stream close, Canceled will be set by the server when stream timeout is
				// reached, DeadlineExceeded would be because of the client side deadline we set.
				loggerPrintf("Unexpected error, closing connection: %v", err)
				c.close(err)
				return
			}
			// A new stream is created if:
			// 1. Resource exhausted is returned but we have not exceeded the max number of retries.
			// 2. Unavailable is returned but we have not exceeded the max number of retries.
			// 3. A known "normal disconnect" error is returned.
			loggerPrintf("Creating new stream")
			if err := c.createStream(ctx); err != nil {
				loggerPrintf("Error creating new stream: %v", err)
				c.close(err)
			}
			// Always return here, createStream launches a new recv goroutine.
			return
		}
		switch resp.GetType().(type) {
		case *acpb.StreamAgentMessagesResponse_MessageBody:
			// Acknowledge message first, if this ack fails dont forward the message on to the handling
			// logic since that indicates a stream disconnect.
			if err := c.acknowledgeMessage(resp.GetMessageId(), streamClosed, streamSendLock, stream); err != nil {
				loggerPrintf("Error acknowledging message %q: %v", resp.GetMessageId(), err)
				continue
			}
			c.messages <- resp.GetMessageBody()
		case *acpb.StreamAgentMessagesResponse_MessageResponse:
			st := resp.GetMessageResponse().GetStatus()
			c.responseMx.Lock()
			for key, sub := range c.responseSubs {
				if key != resp.GetMessageId() {
					continue
				}
				select {
				case sub <- status.FromProto(st):
				default:
				}
			}
			c.responseMx.Unlock()
		}
	}
}