internal/satellite/module/gatherer/receiver_gatherer.go (148 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package gatherer
import (
"context"
"errors"
"sync"
"time"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"github.com/apache/skywalking-satellite/internal/pkg/log"
"github.com/apache/skywalking-satellite/internal/satellite/event"
module "github.com/apache/skywalking-satellite/internal/satellite/module/api"
"github.com/apache/skywalking-satellite/internal/satellite/module/gatherer/api"
processor "github.com/apache/skywalking-satellite/internal/satellite/module/processor/api"
"github.com/apache/skywalking-satellite/internal/satellite/telemetry"
queue "github.com/apache/skywalking-satellite/plugins/queue/api"
"github.com/apache/skywalking-satellite/plugins/queue/partition"
receiver "github.com/apache/skywalking-satellite/plugins/receiver/api"
server "github.com/apache/skywalking-satellite/plugins/server/api"
v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
)
// enqueueErrorPrintInterval is the minimum time that must pass between two
// flushes of the aggregated enqueue-error log lines in recordEnqueueError.
var enqueueErrorPrintInterval = time.Second * 20
// ReceiverGatherer is a gatherer module that reads events from a receiver
// plugin, buffers them in a partitioned queue, and exposes one output channel
// per queue partition for the downstream processor.
type ReceiverGatherer struct {
// config
config *api.GathererConfig
// dependency plugins
runningReceiver receiver.Receiver
runningQueue *partition.PartitionedQueue
runningServer server.Server
// self components: one output channel per queue partition, allocated in Prepare
outputChannel []chan *queue.SequenceEvent
// metrics
receiveCounter telemetry.Counter
queueOutputCounter telemetry.Counter
// sync invoker: the processor that SyncInvoke forwards to, injected via SetProcessor
processor processor.Processor
// enqueue error log throttling: errors are counted per message in
// enqueueErrorCounter and flushed to the log at most once per
// enqueueErrorPrintInterval, measured from lastEnqueueErrorPrintTime.
lastEnqueueErrorPrintTime time.Time
enqueueErrorCounter map[string]int
}
// Prepare registers the receiver's handlers on the server, initializes the
// partitioned queue, allocates one unbuffered output channel per queue
// partition, and creates the gatherer's telemetry counters.
//
// It returns the queue's initialization error unchanged if Initialize fails.
func (r *ReceiverGatherer) Prepare() error {
	logger := log.Logger.WithField("pipe", r.config.PipeName)
	logger.Info("receiver gatherer module is preparing...")
	r.runningReceiver.RegisterHandler(r.runningServer.GetServer())
	if err := r.runningQueue.Initialize(); err != nil {
		// A failed initialization is an error condition; logging it at Info
		// level would hide it under typical log filtering.
		logger.Errorf("the %s queue failed when initializing", r.runningQueue.Name())
		return err
	}
	// Query the partition count once instead of on every loop iteration.
	partitions := r.runningQueue.TotalPartitionCount()
	r.outputChannel = make([]chan *queue.SequenceEvent, partitions)
	for p := range r.outputChannel {
		r.outputChannel[p] = make(chan *queue.SequenceEvent)
	}
	r.receiveCounter = telemetry.NewCounter("gatherer_receive_count", "Total number of the receiving count in the Gatherer.", "pipe", "status")
	r.queueOutputCounter = telemetry.NewCounter("queue_output_count", "Total number of the output count in the Queue of Gatherer.", "pipe", "status")
	r.lastEnqueueErrorPrintTime = time.Now()
	r.enqueueErrorCounter = make(map[string]int)
	return nil
}
// Boot starts the gatherer and blocks until ctx is cancelled.
//
// One goroutine moves events from the receiver channel into the partitioned
// queue. One goroutine per partition drains that partition into its output
// channel. After every goroutine has returned, Boot calls Shutdown exactly
// once. The queue is therefore closed a single time, and only when nothing
// can still Enqueue or Dequeue on it.
func (r *ReceiverGatherer) Boot(ctx context.Context) {
	r.runningReceiver.RegisterSyncInvoker(r)
	var wg sync.WaitGroup
	wg.Add(r.PartitionCount() + 1)
	log.Logger.WithField("pipe", r.config.PipeName).Info("receive_gatherer module is starting...")
	go func() {
		defer wg.Done()
		for {
			select {
			case e := <-r.runningReceiver.Channel():
				r.receiveCounter.Inc(r.config.PipeName, "all")
				if err := r.runningQueue.Enqueue(e); err != nil {
					r.recordEnqueueError(err)
				}
			case <-ctx.Done():
				return
			}
		}
	}()
	for p := 0; p < r.PartitionCount(); p++ {
		r.consumeQueue(ctx, p, &wg)
	}
	wg.Wait()
	// Previously every partition goroutine called Shutdown, closing the queue
	// once per partition. Shutting down here guarantees a single close.
	r.Shutdown()
}

// recordEnqueueError counts an abandoned event and aggregates err by message.
// The accumulated counts are flushed to the log, and then reset, at most once
// per enqueueErrorPrintInterval.
//
// In this file it is only called from Boot's single producer goroutine, which
// is why the counter map is accessed without locking.
func (r *ReceiverGatherer) recordEnqueueError(err error) {
	r.receiveCounter.Inc(r.config.PipeName, "abandoned")
	r.enqueueErrorCounter[err.Error()]++
	if time.Since(r.lastEnqueueErrorPrintTime) <= enqueueErrorPrintInterval {
		return
	}
	for e, count := range r.enqueueErrorCounter {
		log.Logger.WithFields(logrus.Fields{
			"pipe":  r.config.PipeName,
			"queue": r.runningQueue.Name(),
			"count": count,
		}).Errorf("error in enqueue: %v", e)
	}
	r.lastEnqueueErrorPrintTime = time.Now()
	r.enqueueErrorCounter = make(map[string]int)
}

// consumeQueue starts a goroutine that drains partition p of the queue into
// r.outputChannel[p] until ctx is cancelled, then calls wg.Done.
//
// Every blocking point (the send to the output channel and the back-off on
// an empty partition) also watches ctx. Cancellation therefore cannot be
// stalled by a reader that has stopped consuming.
func (r *ReceiverGatherer) consumeQueue(ctx context.Context, p int, wg *sync.WaitGroup) {
	go func() {
		defer wg.Done()
		for ctx.Err() == nil {
			e, err := r.runningQueue.Dequeue(p)
			switch {
			case err == nil:
				select {
				case r.outputChannel[p] <- e:
					r.queueOutputCounter.Inc(r.config.PipeName, "success")
				case <-ctx.Done():
					// Nobody will read this event any more. It was dequeued but
					// never acked; whether the queue redelivers it depends on
					// the queue implementation (NOTE(review): confirm).
					return
				}
			case errors.Is(err, queue.ErrEmpty):
				// Back off on an empty partition, but wake immediately on cancellation.
				select {
				case <-time.After(time.Second):
				case <-ctx.Done():
				}
			default:
				r.queueOutputCounter.Inc(r.config.PipeName, "error")
				log.Logger.WithFields(logrus.Fields{
					"pipe":  r.config.PipeName,
					"queue": r.runningQueue.Name(),
				}).Errorf("error in dequeue: %v", err)
			}
		}
	}()
}
// Shutdown waits for module.ShutdownHookTime and then closes the running
// queue. A close failure is logged rather than returned.
func (r *ReceiverGatherer) Shutdown() {
	log.Logger.WithField("pipe", r.config.PipeName).Infof("receiver gatherer module is closing...")
	time.Sleep(module.ShutdownHookTime)
	closeErr := r.runningQueue.Close()
	if closeErr == nil {
		return
	}
	fields := logrus.Fields{
		"queue": r.runningQueue.Name(),
		"pipe":  r.config.PipeName,
	}
	log.Logger.WithFields(fields).Errorf("error in closing: %v", closeErr)
}
// PartitionCount reports how many per-partition output channels the gatherer holds.
func (r *ReceiverGatherer) PartitionCount() int {
	channels := r.outputChannel
	return len(channels)
}
// OutputDataChannel returns the receive-only view of the output channel for
// partition index. It panics if index is outside [0, PartitionCount()).
func (r *ReceiverGatherer) OutputDataChannel(index int) <-chan *queue.SequenceEvent {
	var out <-chan *queue.SequenceEvent = r.outputChannel[index]
	return out
}
// Ack forwards lastOffset to the running queue's Ack.
func (r *ReceiverGatherer) Ack(lastOffset *event.Offset) {
	q := r.runningQueue
	q.Ack(lastOffset)
}
// SyncInvoke forwards d to the injected processor's SyncInvoke and returns its
// results.
//
// Boot registers the gatherer as the receiver's sync invoker. A call can
// therefore arrive before SetProcessor has injected a processor. In that case
// an error is returned instead of panicking on a nil interface.
func (r *ReceiverGatherer) SyncInvoke(d *v1.SniffData) (*v1.SniffData, grpc.ClientStream, error) {
	if r.processor == nil {
		return nil, nil, errors.New("receiver gatherer has no processor injected for sync invoke")
	}
	return r.processor.SyncInvoke(d)
}
// SetProcessor injects the processor module used by SyncInvoke. It returns an
// error, and leaves the current processor untouched, when m does not
// implement processor.Processor.
func (r *ReceiverGatherer) SetProcessor(m module.Module) error {
	proc, isProcessor := m.(processor.Processor)
	if !isProcessor {
		return errors.New("set processor only supports to inject processor module")
	}
	r.processor = proc
	return nil
}