func()

in banyand/liaison/grpc/measure.go [67:182]


// Write serves the measure write stream: it receives write requests from the
// stream in a loop, validates each one, resolves its target shard and node,
// and publishes it through a batch publisher.
//
// Responses are sent on the same stream in two phases:
//   - a request that fails validation, routing, or publishing is answered
//     immediately with the corresponding failure status;
//   - a request that was published successfully is recorded and answered only
//     when the publisher is closed on return, so the per-node errors reported
//     by Close can be reflected in its final status.
//
// Write returns nil when the client closes the stream (io.EOF), the context
// error when the stream context is done, and the receive error otherwise.
func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) error {
	// reply sends one WriteResponse and updates the stream metrics. metadata
	// may be nil for a malformed request, so it is only read via nil-safe getters.
	reply := func(metadata *commonv1.Metadata, status modelv1.Status, messageID uint64, stream measurev1.MeasureService_WriteServer, l *logger.Logger) {
		group := metadata.GetGroup()
		if status != modelv1.Status_STATUS_SUCCEED {
			ms.metrics.totalStreamMsgReceivedErr.Inc(1, group, "measure", "write")
		}
		ms.metrics.totalStreamMsgSent.Inc(1, group, "measure", "write")
		if errResp := stream.Send(&measurev1.WriteResponse{Metadata: metadata, Status: status.String(), MessageId: messageID}); errResp != nil {
			l.Debug().Err(errResp).Msg("failed to send measure write response")
			ms.metrics.totalStreamMsgSentErr.Inc(1, group, "measure", "write")
		}
	}
	ctx := measure.Context()
	publisher := ms.pipeline.NewBatchPublisher(ms.writeTimeout)
	ms.metrics.totalStreamStarted.Inc(1, "measure", "write")
	start := time.Now()
	// succeedSent collects the requests whose publish call succeeded; their
	// responses are deferred until the publisher has been closed.
	var succeedSent []succeedSentMessage
	defer func() {
		cee, err := publisher.Close()
		for _, s := range succeedSent {
			code := modelv1.Status_STATUS_SUCCEED
			// A close error reported for the node this message was sent to
			// overrides the optimistic success status. Looking up a nil map
			// is safe, so no separate nil check is needed.
			if ce, ok := cee[s.node]; ok {
				code = ce.Status()
			}
			reply(s.metadata, code, s.messageID, measure, ms.sampled)
		}
		if err != nil {
			ms.sampled.Error().Err(err).Msg("failed to close the publisher")
		}
		ms.metrics.totalStreamFinished.Inc(1, "measure", "write")
		ms.metrics.totalStreamLatency.Inc(time.Since(start).Seconds(), "measure", "write")
	}()
	for {
		// Stop promptly once the stream context is done instead of blocking in Recv.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		writeRequest, err := measure.Recv()
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			// Cancellation and deadline errors are not logged; no request
			// was received on this path, so there is nothing to attach.
			if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
				ms.sampled.Error().Err(err).Msg("failed to receive message")
			}
			return err
		}
		metadata := writeRequest.GetMetadata()
		dataPoint := writeRequest.GetDataPoint()
		ms.metrics.totalStreamMsgReceived.Inc(1, metadata.GetGroup(), "measure", "write")
		// Both sub-messages are client-supplied and may be absent; reject the
		// request here rather than dereferencing a nil pointer further down.
		if metadata == nil {
			ms.sampled.Error().Stringer("written", writeRequest).Msg("the write request has no metadata")
			reply(metadata, modelv1.Status_STATUS_NOT_FOUND, writeRequest.GetMessageId(), measure, ms.sampled)
			continue
		}
		if dataPoint == nil {
			ms.sampled.Error().Stringer("written", writeRequest).Msg("the write request has no data point")
			reply(metadata, modelv1.Status_STATUS_INVALID_TIMESTAMP, writeRequest.GetMessageId(), measure, ms.sampled)
			continue
		}
		if errTime := timestamp.CheckPb(dataPoint.GetTimestamp()); errTime != nil {
			ms.sampled.Error().Err(errTime).Stringer("written", writeRequest).Msg("the data point time is invalid")
			reply(metadata, modelv1.Status_STATUS_INVALID_TIMESTAMP, writeRequest.GetMessageId(), measure, ms.sampled)
			continue
		}
		// A positive ModRevision asks the server to verify that the client's
		// schema revision matches the cached one before accepting the write.
		if metadata.GetModRevision() > 0 {
			measureCache, existed := ms.entityRepo.getLocator(getID(metadata))
			if !existed {
				ms.sampled.Error().Stringer("written", writeRequest).Msg("failed to measure schema not found")
				reply(metadata, modelv1.Status_STATUS_NOT_FOUND, writeRequest.GetMessageId(), measure, ms.sampled)
				continue
			}
			if metadata.GetModRevision() != measureCache.ModRevision {
				ms.sampled.Error().Stringer("written", writeRequest).Msg("the measure schema is expired")
				reply(metadata, modelv1.Status_STATUS_EXPIRED_SCHEMA, writeRequest.GetMessageId(), measure, ms.sampled)
				continue
			}
		}
		entity, tagValues, shardID, err := ms.navigate(metadata, dataPoint.GetTagFamilies())
		if err != nil {
			ms.sampled.Error().Err(err).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to navigate to the write target")
			reply(metadata, modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure, ms.sampled)
			continue
		}
		// An unset version defaults to the message ID, which itself defaults
		// to the current time in nanoseconds when the client did not set it.
		if dataPoint.Version == 0 {
			if writeRequest.MessageId == 0 {
				writeRequest.MessageId = uint64(time.Now().UnixNano())
			}
			dataPoint.Version = int64(writeRequest.MessageId)
		}
		if ms.ingestionAccessLog != nil {
			// Access logging is best-effort: a failure is logged and the write continues.
			if errAccessLog := ms.ingestionAccessLog.Write(writeRequest); errAccessLog != nil {
				ms.sampled.Error().Err(errAccessLog).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to write access log")
			}
		}
		iwr := &measurev1.InternalWriteRequest{
			Request:      writeRequest,
			ShardId:      uint32(shardID),
			SeriesHash:   pbv1.HashEntity(entity),
			EntityValues: tagValues[1:].Encode(),
		}
		nodeID, errPickNode := ms.nodeRegistry.Locate(metadata.GetGroup(), metadata.GetName(), uint32(shardID))
		if errPickNode != nil {
			ms.sampled.Error().Err(errPickNode).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to pick an available node")
			reply(metadata, modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure, ms.sampled)
			continue
		}
		message := bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
		if _, errWritePub := publisher.Publish(ctx, data.TopicMeasureWrite, message); errWritePub != nil {
			ms.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeRequest)).Str("nodeID", nodeID).Msg("failed to send a message")
			// Prefer the status carried by a typed error; fall back to a
			// generic internal error for anything else.
			status := modelv1.Status_STATUS_INTERNAL_ERROR
			var ce *common.Error
			if errors.As(errWritePub, &ce) {
				status = ce.Status()
			}
			reply(metadata, status, writeRequest.GetMessageId(), measure, ms.sampled)
			continue
		}
		succeedSent = append(succeedSent, succeedSentMessage{
			metadata:  metadata,
			messageID: writeRequest.GetMessageId(),
			node:      nodeID,
		})
	}
}