func()

in banyand/liaison/grpc/measure.go [58:109]


// Write serves the streaming measure write RPC.
//
// It reads WriteRequest messages from the stream until the client closes it
// (io.EOF, returns nil), the stream context is done (returns ctx.Err()), or
// receiving fails (returns the receive error). For every received request it
// validates the data point timestamp, resolves the write target through
// ms.navigate, optionally records the request in the ingestion access log,
// publishes an InternalWriteRequest on data.TopicMeasureWrite, and then sends
// an empty WriteResponse back to the client.
//
// A request that fails validation or navigation is logged and acknowledged
// with an empty response, and processing moves on to the next request.
// Access-log and publish failures are logged only; the request is still
// acknowledged.
func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) error {
	// reply acknowledges a single request. A failed send is logged, not returned.
	reply := func(stream measurev1.MeasureService_WriteServer, l *logger.Logger) {
		if errResp := stream.Send(&measurev1.WriteResponse{}); errResp != nil {
			l.Err(errResp).Msg("failed to send response")
		}
	}
	ctx := measure.Context()
	for {
		// Non-blocking check so a cancelled stream is noticed before the next Recv.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		writeRequest, err := measure.Recv()
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			// A receive error means the stream is no longer usable: retrying
			// Recv would fail again and spin this loop forever. Abort the RPC
			// instead. No request was decoded, so there is nothing to attach
			// to the log entry or to acknowledge.
			ms.sampled.Error().Err(err).Msg("failed to receive message")
			return err
		}
		// Use the nil-safe getters: a request without a data point must be
		// rejected here rather than crash the handler with a nil dereference.
		// NOTE(review): relies on timestamp.CheckPb returning an error for a nil
		// timestamp — confirm against its implementation.
		if errTime := timestamp.CheckPb(writeRequest.GetDataPoint().GetTimestamp()); errTime != nil {
			ms.sampled.Error().Err(errTime).Stringer("written", writeRequest).Msg("the data point time is invalid")
			reply(measure, ms.sampled)
			continue
		}
		entity, tagValues, shardID, err := ms.navigate(writeRequest.GetMetadata(), writeRequest.GetDataPoint().GetTagFamilies())
		if err != nil {
			ms.sampled.Error().Err(err).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to navigate to the write target")
			reply(measure, ms.sampled)
			continue
		}
		// The access log is optional; a failure to record does not block the write.
		if ms.ingestionAccessLog != nil {
			if errAccessLog := ms.ingestionAccessLog.Write(writeRequest); errAccessLog != nil {
				ms.sampled.Error().Err(errAccessLog).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to write access log")
			}
		}
		iwr := &measurev1.InternalWriteRequest{
			Request:      writeRequest,
			ShardId:      uint32(shardID),
			SeriesHash:   tsdb.HashEntity(entity),
			EntityValues: tagValues.Encode(),
		}
		// The message ID is the current wall-clock time in nanoseconds.
		message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), iwr)
		if _, errWritePub := ms.pipeline.Publish(data.TopicMeasureWrite, message); errWritePub != nil {
			ms.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to send a message")
		}
		reply(measure, ms.sampled)
	}
}