func()

in banyand/liaison/grpc/stream.go [58:111]


// Write serves the client-streaming write RPC.
//
// For each request received from the client it:
//  1. validates the element timestamp with timestamp.CheckPb,
//  2. resolves the entity, tag values and shard via s.navigate,
//  3. records the request in the ingestion access log when one is configured,
//  4. publishes an InternalWriteRequest on data.TopicStreamWrite, and
//  5. sends one empty WriteResponse back to the client.
//
// A request that fails validation or navigation is logged through the sampled
// logger, answered with an empty response and skipped; the stream stays open.
// A publish failure is logged and the request is still acknowledged.
//
// Write returns nil once the client half-closes the stream (io.EOF), ctx.Err()
// when the stream context is done, and the receive error for any other failure
// of stream.Recv.
func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
	// reply acknowledges the current request. A failed Send is only logged; if
	// the stream is actually broken the next Recv reports it and ends the loop.
	reply := func() {
		if errResp := stream.Send(&streamv1.WriteResponse{}); errResp != nil {
			s.sampled.Err(errResp).Msg("failed to send response")
		}
	}
	ctx := stream.Context()
	for {
		// Non-blocking cancellation check before waiting on the next message.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		writeEntity, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			// A non-EOF receive error means the stream has been aborted, so
			// every further Recv would fail the same way. Stop here instead of
			// looping. The request is not logged: it carries no data on error.
			s.sampled.Error().Err(err).Msg("failed to receive message")
			return err
		}
		// Use the nil-safe getter chain: a request without an element must be
		// rejected as invalid rather than dereferencing a nil element.
		if errTime := timestamp.CheckPb(writeEntity.GetElement().GetTimestamp()); errTime != nil {
			s.sampled.Error().Stringer("written", writeEntity).Err(errTime).Msg("the element time is invalid")
			reply()
			continue
		}
		entity, tagValues, shardID, err := s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies())
		if err != nil {
			s.sampled.Error().Err(err).RawJSON("written", logger.Proto(writeEntity)).Msg("failed to navigate to the write target")
			reply()
			continue
		}
		// The access log is best-effort: a failure is logged but does not
		// prevent the write from being published.
		if s.ingestionAccessLog != nil {
			if errAccessLog := s.ingestionAccessLog.Write(writeEntity); errAccessLog != nil {
				s.sampled.Error().Err(errAccessLog).Msg("failed to write ingestion access log")
			}
		}
		iwr := &streamv1.InternalWriteRequest{
			Request:    writeEntity,
			ShardId:    uint32(shardID),
			SeriesHash: tsdb.HashEntity(entity),
		}
		// Encoded entity values are attached only when debug logging is enabled.
		if s.log.Debug().Enabled() {
			iwr.EntityValues = tagValues.Encode()
		}
		message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), iwr)
		if _, errWritePub := s.pipeline.Publish(data.TopicStreamWrite, message); errWritePub != nil {
			s.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeEntity)).Msg("failed to send a message")
		}
		reply()
	}
}