func()

in client.go [309:337]


func (c *Connection) streamSend(req *acpb.StreamAgentMessagesRequest, streamClosed, streamSendLock chan struct{}, stream acpb.AgentCommunication_StreamAgentMessagesClient) error {
	select {
	case <-streamClosed:
		return errors.New("stream closed")
	case streamSendLock <- struct{}{}:
		defer func() { <-streamSendLock }()
	}
	if err := stream.Send(req); err != nil {
		if err != io.EOF && !errors.Is(err, io.EOF) {
			// Something is very broken, just close the stream here.
			loggerPrintf("Unexpected send error, closing connection: %v", err)
			c.close(err)
			return err
		}
		// EOF error means the stream is closed, this should be picked up by recv, but that could be
		// blocked, close our sends for now and just allow the caller handle it, SendMessage will wait
		// for response which will never come and auto retry. acknowledgeMessage will fail and prevent
		// the message from being passed on to message handlers, allowing recv to handle the stream
		// close error.
		loggerPrintf("Error sending message, stream closed.")
		select {
		case <-streamClosed:
		default:
			close(streamClosed)
		}
		return ErrConnectionClosed
	}
	return nil
}