in banyand/liaison/grpc/stream.go [58:111]
func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
reply := func(stream streamv1.StreamService_WriteServer, logger *logger.Logger) {
if errResp := stream.Send(&streamv1.WriteResponse{}); errResp != nil {
logger.Err(errResp).Msg("failed to send response")
}
}
ctx := stream.Context()
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
writeEntity, err := stream.Recv()
if errors.Is(err, io.EOF) {
return nil
}
if err != nil {
s.sampled.Error().Stringer("written", writeEntity).Err(err).Msg("failed to receive message")
reply(stream, s.sampled)
continue
}
if errTime := timestamp.CheckPb(writeEntity.GetElement().Timestamp); errTime != nil {
s.sampled.Error().Stringer("written", writeEntity).Err(errTime).Msg("the element time is invalid")
reply(stream, s.sampled)
continue
}
entity, tagValues, shardID, err := s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies())
if err != nil {
s.sampled.Error().Err(err).RawJSON("written", logger.Proto(writeEntity)).Msg("failed to navigate to the write target")
reply(stream, s.sampled)
continue
}
if s.ingestionAccessLog != nil {
if errAccessLog := s.ingestionAccessLog.Write(writeEntity); errAccessLog != nil {
s.sampled.Error().Err(errAccessLog).Msg("failed to write ingestion access log")
}
}
iwr := &streamv1.InternalWriteRequest{
Request: writeEntity,
ShardId: uint32(shardID),
SeriesHash: tsdb.HashEntity(entity),
}
if s.log.Debug().Enabled() {
iwr.EntityValues = tagValues.Encode()
}
message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), iwr)
_, errWritePub := s.pipeline.Publish(data.TopicStreamWrite, message)
if errWritePub != nil {
s.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeEntity)).Msg("failed to send a message")
}
reply(stream, s.sampled)
}
}