in banyand/liaison/grpc/measure.go [58:109]
func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) error {
reply := func(measure measurev1.MeasureService_WriteServer, logger *logger.Logger) {
if errResp := measure.Send(&measurev1.WriteResponse{}); errResp != nil {
logger.Err(errResp).Msg("failed to send response")
}
}
ctx := measure.Context()
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
writeRequest, err := measure.Recv()
if errors.Is(err, io.EOF) {
return nil
}
if err != nil {
ms.sampled.Error().Err(err).Stringer("written", writeRequest).Msg("failed to receive message")
reply(measure, ms.sampled)
continue
}
if errTime := timestamp.CheckPb(writeRequest.DataPoint.Timestamp); errTime != nil {
ms.sampled.Error().Err(errTime).Stringer("written", writeRequest).Msg("the data point time is invalid")
reply(measure, ms.sampled)
continue
}
entity, tagValues, shardID, err := ms.navigate(writeRequest.GetMetadata(), writeRequest.GetDataPoint().GetTagFamilies())
if err != nil {
ms.sampled.Error().Err(err).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to navigate to the write target")
reply(measure, ms.sampled)
continue
}
if ms.ingestionAccessLog != nil {
if errAccessLog := ms.ingestionAccessLog.Write(writeRequest); errAccessLog != nil {
ms.sampled.Error().Err(errAccessLog).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to write access log")
}
}
iwr := &measurev1.InternalWriteRequest{
Request: writeRequest,
ShardId: uint32(shardID),
SeriesHash: tsdb.HashEntity(entity),
EntityValues: tagValues.Encode(),
}
message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), iwr)
_, errWritePub := ms.pipeline.Publish(data.TopicMeasureWrite, message)
if errWritePub != nil {
ms.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to send a message")
}
reply(measure, ms.sampled)
}
}