in client.go [309:337]
func (c *Connection) streamSend(req *acpb.StreamAgentMessagesRequest, streamClosed, streamSendLock chan struct{}, stream acpb.AgentCommunication_StreamAgentMessagesClient) error {
select {
case <-streamClosed:
return errors.New("stream closed")
case streamSendLock <- struct{}{}:
defer func() { <-streamSendLock }()
}
if err := stream.Send(req); err != nil {
if err != io.EOF && !errors.Is(err, io.EOF) {
// Something is very broken, just close the stream here.
loggerPrintf("Unexpected send error, closing connection: %v", err)
c.close(err)
return err
}
// EOF error means the stream is closed, this should be picked up by recv, but that could be
// blocked, close our sends for now and just allow the caller handle it, SendMessage will wait
// for response which will never come and auto retry. acknowledgeMessage will fail and prevent
// the message from being passed on to message handlers, allowing recv to handle the stream
// close error.
loggerPrintf("Error sending message, stream closed.")
select {
case <-streamClosed:
default:
close(streamClosed)
}
return ErrConnectionClosed
}
return nil
}