in internal/acs/testserver/testserver.go [93:145]
// StreamAgentMessages handles one bidirectional agent stream for the test ACS
// server.
//
// A background goroutine reads everything the agent sends: MessageResponse
// requests (acks for server-initiated messages) are ignored, MessageBody
// requests are recorded via s.add, and every non-ack request is acknowledged
// back to the agent. The handler loop forwards messages queued on s.toSend and
// returns when a send fails or when a receive result arrives on s.recvErr
// (nil when the agent closed the stream with io.EOF).
func (s *acsImplementation) StreamAgentMessages(stream acpb.AgentCommunication_StreamAgentMessagesServer) error {
	// closed is closed when this handler returns; it tells the receive
	// goroutine to stop instead of blocking on a channel nobody reads anymore.
	closed := make(chan struct{})
	defer close(closed)

	// acks carries acknowledgements from the receive goroutine to the handler
	// loop so that stream.Send is only ever called from a single goroutine.
	// gRPC does not allow concurrent sends on the same stream.
	acks := make(chan *acpb.StreamAgentMessagesResponse)

	go func() {
		for {
			rec, err := stream.Recv()
			// The handler may have returned while Recv was blocked; in that
			// case the result is irrelevant and must not be reported.
			select {
			case <-closed:
				return
			default:
			}
			if err != nil {
				if errors.Is(err, io.EOF) {
					// Clean shutdown by the agent is reported as a nil error.
					err = nil
				}
				// Guard the hand-off: if the handler exits concurrently the
				// send must not block forever or leave a stale error behind
				// in the shared channel.
				select {
				case s.recvErr <- err:
				case <-closed:
				}
				return
			}
			switch rec.GetType().(type) {
			case *acpb.StreamAgentMessagesRequest_MessageResponse:
				// Ignore acks for test messages generated to send on Watch().
				continue
			case *acpb.StreamAgentMessagesRequest_MessageBody:
				// Collect all messages sent by the agent.
				s.add(rec)
			}
			// Ack as the real service would on receiving a message from the
			// agent's Send(). The actual write happens in the handler loop.
			ack := &acpb.StreamAgentMessagesResponse{MessageId: rec.GetMessageId(), Type: &acpb.StreamAgentMessagesResponse_MessageResponse{}}
			select {
			case acks <- ack:
			case <-closed:
				return
			}
		}
	}()

	for {
		select {
		case msg := <-s.toSend:
			if err := stream.Send(msg); err != nil {
				galog.Errorf("[TestACSServer] error sending message [%+v]: %v", msg, err)
				return err
			}
		case ack := <-acks:
			if err := stream.Send(ack); err != nil {
				galog.Errorf("[TestACSServer] error sending ack [%+v]: %v", ack, err)
				return err
			}
		case err := <-s.recvErr:
			galog.Errorf("[TestACSServer] received error on error stream: %v", err)
			return err
		}
	}
}