func()

in internal/acs/testserver/testserver.go [93:145]


func (s *acsImplementation) StreamAgentMessages(stream acpb.AgentCommunication_StreamAgentMessagesServer) error {
	closed := make(chan struct{})
	defer close(closed)

	go func() {
		for {
			rec, err := stream.Recv()

			select {
			case <-closed:
				return
			default:
			}

			if err != nil {
				if errors.Is(err, io.EOF) {
					s.recvErr <- nil
					return
				}
				s.recvErr <- err
				return
			}

			switch rec.GetType().(type) {
			case *acpb.StreamAgentMessagesRequest_MessageResponse:
				// Ignore ack's for test messages generated to send on Watch().
				continue
			case *acpb.StreamAgentMessagesRequest_MessageBody:
				// Collect all messages sent by agent.
				s.add(rec)
			}

			// Acks as if service will ack on receiving msg from agent Send().
			if err := stream.Send(&acpb.StreamAgentMessagesResponse{MessageId: rec.GetMessageId(), Type: &acpb.StreamAgentMessagesResponse_MessageResponse{}}); err != nil {
				s.recvErr <- err
				return
			}
		}
	}()

	for {
		select {
		case msg := <-s.toSend:
			if err := stream.Send(msg); err != nil {
				galog.Errorf("[TestACSServer] error sending message [%+v]: %v", msg, err)
				return err
			}
		case err := <-s.recvErr:
			galog.Errorf("[TestACSServer] received error on error stream: %v", err)
			return err
		}
	}
}