banyand/liaison/grpc/measure.go (256 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package grpc import ( "context" "io" "time" "github.com/pkg/errors" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/api/data" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/accesslog" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/query" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) type measureService struct { measurev1.UnimplementedMeasureServiceServer ingestionAccessLog accesslog.Log pipeline queue.Client broadcaster queue.Client *discoveryService sampled *logger.Logger metrics *metrics writeTimeout time.Duration } func (ms *measureService) setLogger(log *logger.Logger) { ms.sampled = log.Sampled(10) } func (ms *measureService) activeIngestionAccessLog(root string) (err error) { if ms.ingestionAccessLog, err = accesslog. NewFileLog(root, "measure-ingest-%s", 10*time.Minute, ms.log); err != nil { return err } return nil } func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) error { reply := func(metadata *commonv1.Metadata, status modelv1.Status, messageId uint64, measure measurev1.MeasureService_WriteServer, logger *logger.Logger) { if status != modelv1.Status_STATUS_SUCCEED { ms.metrics.totalStreamMsgReceivedErr.Inc(1, metadata.Group, "measure", "write") } ms.metrics.totalStreamMsgSent.Inc(1, metadata.Group, "measure", "write") if errResp := measure.Send(&measurev1.WriteResponse{Metadata: metadata, Status: status.String(), MessageId: messageId}); errResp != nil { logger.Debug().Err(errResp).Msg("failed to send measure write response") ms.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group, "measure", "write") } } ctx := measure.Context() publisher := ms.pipeline.NewBatchPublisher(ms.writeTimeout) ms.metrics.totalStreamStarted.Inc(1, "measure", "write") start := time.Now() var succeedSent []succeedSentMessage defer func() { cee, err := publisher.Close() for _, s := range succeedSent { code := modelv1.Status_STATUS_SUCCEED if cee != nil { if ce, ok := cee[s.node]; ok { code = ce.Status() } } reply(s.metadata, code, s.messageID, measure, ms.sampled) } if err != nil { ms.sampled.Error().Err(err).Msg("failed to close the publisher") } ms.metrics.totalStreamFinished.Inc(1, "measure", "write") ms.metrics.totalStreamLatency.Inc(time.Since(start).Seconds(), "measure", "write") }() for { select { case <-ctx.Done(): return ctx.Err() default: } writeRequest, err := measure.Recv() if errors.Is(err, io.EOF) { return nil } if err != nil { if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { ms.sampled.Error().Err(err).Stringer("written", writeRequest).Msg("failed to receive message") } return err } ms.metrics.totalStreamMsgReceived.Inc(1, writeRequest.Metadata.Group, "measure", "write") if errTime := timestamp.CheckPb(writeRequest.DataPoint.Timestamp); errTime != nil { ms.sampled.Error().Err(errTime).Stringer("written", writeRequest).Msg("the data point time is invalid") reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_INVALID_TIMESTAMP, writeRequest.GetMessageId(), measure, ms.sampled) continue } if writeRequest.Metadata.ModRevision > 0 { measureCache, existed := ms.entityRepo.getLocator(getID(writeRequest.GetMetadata())) if !existed { ms.sampled.Error().Err(err).Stringer("written", writeRequest).Msg("failed to measure schema not found") reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_NOT_FOUND, writeRequest.GetMessageId(), measure, ms.sampled) continue } if writeRequest.Metadata.ModRevision != measureCache.ModRevision { ms.sampled.Error().Stringer("written", writeRequest).Msg("the measure schema is expired") reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_EXPIRED_SCHEMA, writeRequest.GetMessageId(), measure, ms.sampled) continue } } entity, tagValues, shardID, err := ms.navigate(writeRequest.GetMetadata(), writeRequest.GetDataPoint().GetTagFamilies()) if err != nil { ms.sampled.Error().Err(err).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to navigate to the write target") reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure, ms.sampled) continue } if writeRequest.DataPoint.Version == 0 { if writeRequest.MessageId == 0 { writeRequest.MessageId = uint64(time.Now().UnixNano()) } writeRequest.DataPoint.Version = int64(writeRequest.MessageId) } if ms.ingestionAccessLog != nil { if errAccessLog := ms.ingestionAccessLog.Write(writeRequest); errAccessLog != nil { ms.sampled.Error().Err(errAccessLog).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to write access log") } } iwr := &measurev1.InternalWriteRequest{ Request: writeRequest, ShardId: uint32(shardID), SeriesHash: pbv1.HashEntity(entity), EntityValues: tagValues[1:].Encode(), } nodeID, errPickNode := ms.nodeRegistry.Locate(writeRequest.GetMetadata().GetGroup(), writeRequest.GetMetadata().GetName(), uint32(shardID)) if errPickNode != nil { ms.sampled.Error().Err(errPickNode).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to pick an available node") reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure, ms.sampled) continue } message := bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr) _, errWritePub := publisher.Publish(ctx, data.TopicMeasureWrite, message) if errWritePub != nil { ms.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeRequest)).Str("nodeID", nodeID).Msg("failed to send a message") var ce *common.Error if errors.As(errWritePub, &ce) { reply(writeRequest.GetMetadata(), ce.Status(), writeRequest.GetMessageId(), measure, ms.sampled) continue } reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure, ms.sampled) continue } succeedSent = append(succeedSent, succeedSentMessage{ metadata: writeRequest.GetMetadata(), messageID: writeRequest.GetMessageId(), node: nodeID, }) } } var emptyMeasureQueryResponse = &measurev1.QueryResponse{DataPoints: make([]*measurev1.DataPoint, 0)} func (ms *measureService) Query(ctx context.Context, req *measurev1.QueryRequest) (resp *measurev1.QueryResponse, err error) { for _, g := range req.Groups { ms.metrics.totalStarted.Inc(1, g, "measure", "query") } start := time.Now() defer func() { for _, g := range req.Groups { ms.metrics.totalFinished.Inc(1, g, "measure", "query") if err != nil { ms.metrics.totalErr.Inc(1, g, "measure", "query") } ms.metrics.totalLatency.Inc(time.Since(start).Seconds(), g, "measure", "query") } }() if err = timestamp.CheckTimeRange(req.GetTimeRange()); err != nil { return nil, status.Errorf(codes.InvalidArgument, "%v is invalid :%s", req.GetTimeRange(), err) } now := time.Now() if req.Trace { tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano)) span, _ := tracer.StartSpan(ctx, "measure-grpc") span.Tag("request", convert.BytesToString(logger.Proto(req))) defer func() { if err != nil { span.Error(err) } else { span.AddSubTrace(resp.Trace) resp.Trace = tracer.ToProto() } span.Stop() }() } feat, err := ms.broadcaster.Publish(ctx, data.TopicMeasureQuery, bus.NewMessage(bus.MessageID(now.UnixNano()), req)) if err != nil { return nil, err } msg, err := feat.Get() if err != nil { if errors.Is(err, io.EOF) { return emptyMeasureQueryResponse, nil } return nil, err } data := msg.Data() switch d := data.(type) { case *measurev1.QueryResponse: return d, nil case *common.Error: return nil, errors.WithMessage(errQueryMsg, d.Error()) } return nil, nil } func (ms *measureService) TopN(ctx context.Context, topNRequest *measurev1.TopNRequest) (resp *measurev1.TopNResponse, err error) { if err = timestamp.CheckTimeRange(topNRequest.GetTimeRange()); err != nil { return nil, status.Errorf(codes.InvalidArgument, "%v is invalid :%s", topNRequest.GetTimeRange(), err) } now := time.Now() if topNRequest.Trace { tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano)) span, _ := tracer.StartSpan(ctx, "topn-grpc") span.Tag("request", convert.BytesToString(logger.Proto(topNRequest))) defer func() { if err != nil { span.Error(err) } else { span.AddSubTrace(resp.Trace) resp.Trace = tracer.ToProto() } span.Stop() }() } message := bus.NewMessage(bus.MessageID(now.UnixNano()), topNRequest) feat, errQuery := ms.broadcaster.Publish(ctx, data.TopicTopNQuery, message) if errQuery != nil { return nil, errQuery } msg, errFeat := feat.Get() if errFeat != nil { return nil, errFeat } data := msg.Data() switch d := data.(type) { case *measurev1.TopNResponse: return d, nil case *common.Error: return nil, errors.WithMessage(errQueryMsg, d.Error()) } return nil, nil } func (ms *measureService) Close() error { return ms.ingestionAccessLog.Close() } type succeedSentMessage struct { metadata *commonv1.Metadata node string messageID uint64 }