internal/satellite/module/gatherer/fetcher_gatherer.go (115 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package gatherer import ( "context" "errors" "sync" "time" "github.com/apache/skywalking-satellite/internal/pkg/log" "github.com/apache/skywalking-satellite/internal/satellite/event" module "github.com/apache/skywalking-satellite/internal/satellite/module/api" "github.com/apache/skywalking-satellite/internal/satellite/module/gatherer/api" processor "github.com/apache/skywalking-satellite/internal/satellite/module/processor/api" "github.com/apache/skywalking-satellite/internal/satellite/telemetry" fetcher "github.com/apache/skywalking-satellite/plugins/fetcher/api" queue "github.com/apache/skywalking-satellite/plugins/queue/api" "github.com/apache/skywalking-satellite/plugins/queue/partition" ) type FetcherGatherer struct { // config config *api.GathererConfig // dependency plugins runningFetcher fetcher.Fetcher runningQueue *partition.PartitionedQueue // self components outputChannel []chan *queue.SequenceEvent // metrics fetchCounter telemetry.Counter queueOutputCounter telemetry.Counter // sync invoker processor processor.Processor } func (f *FetcherGatherer) Prepare() error { log.Logger.WithField("pipe", f.config.PipeName).Info("fetcher gatherer module is preparing...") if err := f.runningQueue.Initialize(); err != nil { log.Logger.WithField("pipe", f.config.PipeName).Infof("the %s queue failed when initializing", f.runningQueue.Name()) return err } f.outputChannel = make([]chan *queue.SequenceEvent, f.runningQueue.TotalPartitionCount()) for p := 0; p < f.runningQueue.TotalPartitionCount(); p++ { f.outputChannel[p] = make(chan *queue.SequenceEvent) } f.fetchCounter = telemetry.NewCounter("gatherer_fetch_count", "Total number of the receiving count in the Gatherer.", "pipe", "status") f.queueOutputCounter = telemetry.NewCounter("queue_output_count", "Total number of the output count in the Queue of Gatherer.", "pipe", "status") return nil } func (f *FetcherGatherer) Boot(ctx context.Context) { log.Logger.WithField("pipe", f.config.PipeName).Info("fetch_gatherer module is starting...") var wg sync.WaitGroup wg.Add(f.PartitionCount() + 1) go func() { defer wg.Done() childCtx, cancel := context.WithCancel(ctx) f.runningFetcher.Fetch(childCtx) for { select { case e := <-f.runningFetcher.Channel(): err := f.runningQueue.Enqueue(e) f.fetchCounter.Inc(f.config.PipeName, "all") if err != nil { f.fetchCounter.Inc(f.config.PipeName, "abandoned") log.Logger.Errorf("cannot put event into queue in %s namespace, %v", f.config.PipeName, err) } case <-childCtx.Done(): cancel() return } } }() for p := 0; p < f.PartitionCount(); p++ { f.consumeQueue(ctx, p, &wg) } wg.Wait() } func (f *FetcherGatherer) consumeQueue(ctx context.Context, p int, wg *sync.WaitGroup) { go func() { defer wg.Done() childCtx, cancel := context.WithCancel(ctx) for { select { case <-childCtx.Done(): cancel() f.Shutdown() return default: if e, err := f.runningQueue.Dequeue(p); err == nil { f.outputChannel[p] <- e f.queueOutputCounter.Inc(f.config.PipeName, "success") } else if err == queue.ErrEmpty { time.Sleep(time.Second) } else { f.queueOutputCounter.Inc(f.config.PipeName, "error") log.Logger.Errorf("error in popping from the queue: %v", err) } } } }() wg.Wait() } func (f *FetcherGatherer) Shutdown() { log.Logger.Infof("fetcher gatherer module of %s namespace is closing", f.config.PipeName) time.Sleep(module.ShutdownHookTime) if err := f.runningQueue.Close(); err != nil { log.Logger.Errorf("failure occurs when closing %s queue in %s namespace :%v", f.runningQueue.Name(), f.config.PipeName, err) } } func (f *FetcherGatherer) PartitionCount() int { return len(f.outputChannel) } func (f *FetcherGatherer) OutputDataChannel(index int) <-chan *queue.SequenceEvent { return f.outputChannel[index] } func (f *FetcherGatherer) Ack(lastOffset *event.Offset) { f.runningQueue.Ack(lastOffset) } func (f *FetcherGatherer) SetProcessor(m module.Module) error { if p, ok := m.(processor.Processor); ok { f.processor = p return nil } return errors.New("set processor only supports to inject processor module") }