in banyand/queue/sub/sub.go [37:147]
// Send serves one bidirectional stream: it keeps receiving requests until the
// client closes the stream (io.EOF), a receive error occurs, or the stream
// context is cancelled.
//
// The first request whose Topic is non-empty and present in data.TopicMap
// fixes the topic for the rest of the stream. Each request body is decoded
// into the type supplied by data.TopicRequestMap for that topic. Requests with
// BatchMod set are handed to s.handleBatch; every other request is dispatched
// to the single listener registered for the topic and the listener's result
// is sent back on the stream.
//
// Per-request problems are passed to s.reply and the loop moves on to the
// next request. The method returns nil on io.EOF, ctx.Err() on cancellation,
// or the result of s.handleRecvError for any other receive error.
func (s *server) Send(stream clusterv1.Service_SendServer) error {
	ctx := stream.Context()

	var (
		topic          *bus.Topic
		dataCollection []any
	)
	// start is also handed to handleBatch by pointer, so the deferred latency
	// metric below observes whatever value it holds when the stream ends.
	start := time.Now()

	defer func() {
		// No topic was ever resolved: there is no label to report under.
		if topic == nil {
			return
		}
		label := topic.String()
		s.metrics.totalFinished.Inc(1, label)
		s.metrics.totalLatency.Inc(time.Since(start).Seconds(), label)
	}()

	for {
		// Stop before blocking on Recv once the stream context is finished.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		entity, recvErr := stream.Recv()
		switch {
		case errors.Is(recvErr, io.EOF):
			s.handleEOF(stream, topic, dataCollection, entity)
			return nil
		case recvErr != nil:
			return s.handleRecvError(recvErr)
		}
		s.metrics.totalMsgReceived.Inc(1, entity.Topic)

		// Resolve the topic once; later requests reuse the first resolved one.
		if topic == nil && entity.Topic != "" {
			resolved, known := data.TopicMap[entity.Topic]
			if !known {
				s.reply(stream, entity, nil, "invalid topic")
				continue
			}
			topic = &resolved
		}
		if topic == nil {
			s.reply(stream, entity, nil, "topic is empty")
			continue
		}

		// Decode the body into the request type registered for the topic.
		newRequest, registered := data.TopicRequestMap[*topic]
		if !registered {
			s.reply(stream, entity, nil, "unknown topic")
			continue
		}
		payload := newRequest()
		if decodeErr := entity.Body.UnmarshalTo(payload); decodeErr != nil {
			s.reply(stream, entity, decodeErr, "failed to unmarshal message")
			continue
		}
		msg := bus.NewMessage(bus.MessageID(entity.MessageId), payload)

		if entity.BatchMod {
			s.handleBatch(&dataCollection, entity, &start)
			continue
		}

		s.metrics.totalStarted.Inc(1, entity.Topic)
		handlers := s.getListeners(*topic)
		switch count := len(handlers); {
		case count == 0:
			s.reply(stream, entity, nil, "no listener found")
			continue
		case count > 1:
			logger.Panicf("multiple listeners found for topic %s", *topic)
		}

		result := handlers[0].Rev(ctx, msg)
		body := result.Data()

		// A nil payload is acknowledged with a response carrying only the ID.
		if body == nil {
			ack := &clusterv1.SendResponse{MessageId: entity.MessageId}
			if sendErr := stream.Send(ack); sendErr != nil {
				s.log.Error().Stringer("request", entity).Err(sendErr).Msg("failed to send empty response")
				s.metrics.totalMsgSentErr.Inc(1, entity.Topic)
				continue
			}
			s.metrics.totalMsgSent.Inc(1, entity.Topic)
			continue
		}

		var outgoing proto.Message
		switch typed := body.(type) {
		case proto.Message:
			outgoing = typed
		case *common.Error:
			// If the context is already finished, count the failure and
			// return instead of replying.
			if ctxErr := ctx.Err(); ctxErr != nil {
				s.metrics.totalMsgReceivedErr.Inc(1, entity.Topic)
				return ctxErr
			}
			s.reply(stream, entity, nil, typed.Error())
			continue
		default:
			s.reply(stream, entity, nil, fmt.Sprintf("invalid response: %T", typed))
			continue
		}

		packed, packErr := anypb.New(outgoing)
		if packErr != nil {
			s.reply(stream, entity, packErr, "failed to marshal message")
			continue
		}
		resp := &clusterv1.SendResponse{
			MessageId: entity.MessageId,
			Body:      packed,
		}
		if sendErr := stream.Send(resp); sendErr != nil {
			s.log.Error().Stringer("request", entity).Dur("latency", time.Since(start)).Err(sendErr).Msg("failed to send query response")
			s.metrics.totalMsgSentErr.Inc(1, entity.Topic)
			continue
		}
		s.metrics.totalMsgSent.Inc(1, entity.Topic)
	}
}