in banyand/liaison/grpc/stream.go [67:176]
func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
reply := func(metadata *commonv1.Metadata, status modelv1.Status, messageId uint64, stream streamv1.StreamService_WriteServer, logger *logger.Logger) {
if status != modelv1.Status_STATUS_SUCCEED {
s.metrics.totalStreamMsgReceivedErr.Inc(1, metadata.Group, "stream", "write")
}
s.metrics.totalStreamMsgReceived.Inc(1, metadata.Group, "stream", "write")
if errResp := stream.Send(&streamv1.WriteResponse{Metadata: metadata, Status: status.String(), MessageId: messageId}); errResp != nil {
logger.Debug().Err(errResp).Msg("failed to send stream write response")
s.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group, "stream", "write")
}
}
s.metrics.totalStreamStarted.Inc(1, "stream", "write")
publisher := s.pipeline.NewBatchPublisher(s.writeTimeout)
start := time.Now()
var succeedSent []succeedSentMessage
defer func() {
cee, err := publisher.Close()
for _, ssm := range succeedSent {
code := modelv1.Status_STATUS_SUCCEED
if cee != nil {
if ce, ok := cee[ssm.node]; ok {
code = ce.Status()
}
}
reply(ssm.metadata, code, ssm.messageID, stream, s.sampled)
}
if err != nil {
s.sampled.Error().Err(err).Msg("failed to close the publisher")
}
s.metrics.totalStreamFinished.Inc(1, "stream", "write")
s.metrics.totalStreamLatency.Inc(time.Since(start).Seconds(), "stream", "write")
}()
ctx := stream.Context()
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
writeEntity, err := stream.Recv()
if errors.Is(err, io.EOF) {
return nil
}
if err != nil {
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
s.sampled.Error().Stringer("written", writeEntity).Err(err).Msg("failed to receive message")
}
return err
}
s.metrics.totalStreamMsgReceived.Inc(1, writeEntity.Metadata.Group, "stream", "write")
if errTime := timestamp.CheckPb(writeEntity.GetElement().Timestamp); errTime != nil {
s.sampled.Error().Stringer("written", writeEntity).Err(errTime).Msg("the element time is invalid")
reply(nil, modelv1.Status_STATUS_INVALID_TIMESTAMP, writeEntity.GetMessageId(), stream, s.sampled)
continue
}
if writeEntity.Metadata.ModRevision > 0 {
streamCache, existed := s.entityRepo.getLocator(getID(writeEntity.GetMetadata()))
if !existed {
s.sampled.Error().Err(err).Stringer("written", writeEntity).Msg("failed to stream schema not found")
reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_NOT_FOUND, writeEntity.GetMessageId(), stream, s.sampled)
continue
}
if writeEntity.Metadata.ModRevision != streamCache.ModRevision {
s.sampled.Error().Stringer("written", writeEntity).Msg("the stream schema is expired")
reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream, s.sampled)
continue
}
}
entity, tagValues, shardID, err := s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies())
if err != nil {
s.sampled.Error().Err(err).RawJSON("written", logger.Proto(writeEntity)).Msg("failed to navigate to the write target")
reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.sampled)
continue
}
if s.ingestionAccessLog != nil {
if errAccessLog := s.ingestionAccessLog.Write(writeEntity); errAccessLog != nil {
s.sampled.Error().Err(errAccessLog).Msg("failed to write ingestion access log")
}
}
iwr := &streamv1.InternalWriteRequest{
Request: writeEntity,
ShardId: uint32(shardID),
SeriesHash: pbv1.HashEntity(entity),
EntityValues: tagValues[1:].Encode(),
}
nodeID, errPickNode := s.nodeRegistry.Locate(writeEntity.GetMetadata().GetGroup(), writeEntity.GetMetadata().GetName(), uint32(shardID))
if errPickNode != nil {
s.sampled.Error().Err(errPickNode).RawJSON("written", logger.Proto(writeEntity)).Msg("failed to pick an available node")
reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.sampled)
continue
}
message := bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
_, errWritePub := publisher.Publish(ctx, data.TopicStreamWrite, message)
if errWritePub != nil {
var ce *common.Error
if errors.As(errWritePub, &ce) {
reply(writeEntity.GetMetadata(), ce.Status(), writeEntity.GetMessageId(), stream, s.sampled)
continue
}
s.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeEntity)).Str("nodeID", nodeID).Msg("failed to send a message")
reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.sampled)
continue
}
succeedSent = append(succeedSent, succeedSentMessage{
metadata: writeEntity.GetMetadata(),
messageID: writeEntity.GetMessageId(),
node: nodeID,
})
}
}