banyand/queue/sub/helpers.go (66 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package sub import ( "time" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "github.com/apache/skywalking-banyandb/api/common" clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/logger" ) func (s *server) handleEOF(stream clusterv1.Service_SendServer, topic *bus.Topic, dataCollection []any, writeEntity *clusterv1.SendRequest) { if len(dataCollection) < 1 { return } listeners := s.getListeners(*topic) if len(listeners) == 0 { s.reply(stream, writeEntity, nil, "no listener found") return } if len(listeners) > 1 { logger.Panicf("multiple listeners found for topic %s", *topic) } listener := listeners[0] if le := listener.CheckHealth(); le != nil { s.reply(stream, writeEntity, le, "") return } message := listener.Rev(stream.Context(), bus.NewMessage(bus.MessageID(0), dataCollection)) var resp *clusterv1.SendResponse data := message.Data() if data != nil { switch d := data.(type) { case *common.Error: resp = &clusterv1.SendResponse{ MessageId: writeEntity.MessageId, Error: d.Error(), Status: d.Status(), } default: resp = &clusterv1.SendResponse{ MessageId: writeEntity.MessageId, } } } if errSend := stream.Send(resp); errSend != nil { s.log.Error().Stringer("written", writeEntity).Err(errSend).Msg("failed to send write response") if writeEntity != nil { s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic) } } } func (s *server) handleRecvError(err error) error { if status.Code(err) == codes.Canceled || status.Code(err) == codes.DeadlineExceeded { return nil } s.log.Error().Err(err).Msg("failed to receive message") return err } func (s *server) handleBatch(dataCollection *[]any, writeEntity *clusterv1.SendRequest, start *time.Time) { if len(*dataCollection) == 0 { s.metrics.totalStarted.Inc(1, writeEntity.Topic) *start = time.Now() } *dataCollection = append(*dataCollection, writeEntity.Body) s.metrics.totalMsgSent.Inc(1, writeEntity.Topic) }