pkg/pipe/pipe.go (80 lines of code) (raw):
/*
* Licensed to Apache Software Foundation (ASF) under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Apache Software Foundation (ASF) licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package pipe
import (
"context"
"fmt"
"time"
v1 "k8s.io/api/core/v1"
"github.com/apache/skywalking-kubernetes-event-exporter/configs"
"github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger"
exp "github.com/apache/skywalking-kubernetes-event-exporter/pkg/exporter"
"github.com/apache/skywalking-kubernetes-event-exporter/pkg/k8s"
)
type workflow struct {
filter *configs.FilterConfig
exporter exp.Exporter
events chan *v1.Event
}
type Pipe struct {
Watcher *k8s.EventWatcher
workflows []workflow
}
func (p *Pipe) Init(ctx context.Context) error {
logger.Log.Debugf("initializing pipe")
p.workflows = []workflow{}
initialized := map[string]bool{}
for _, filter := range configs.GlobalConfig.Filters {
filter.Init()
for _, name := range filter.Exporters {
if _, ok := configs.GlobalConfig.Exporters[name]; !ok {
return fmt.Errorf("exporter %v is not defined", filter.Exporters)
}
exporter := exp.GetExporter(name)
if exporter == nil {
return fmt.Errorf("exporter %v is not defined", filter.Exporters)
}
if initialized[name] {
logger.Log.Debugf("exporter %+v has been initialized, skip", name)
continue
}
if err := exporter.Init(ctx); err != nil {
return err
}
initialized[name] = true
events := make(chan *v1.Event)
p.workflows = append(p.workflows, workflow{
filter: filter,
exporter: exporter,
events: events,
})
}
}
if err := k8s.Registry.Init(); err != nil {
return err
}
logger.Log.Debugf("pipe has been initialized")
return nil
}
func (p *Pipe) Start(ctx context.Context) error {
p.Watcher.Start(ctx)
k8s.Registry.Start(ctx)
for _, wkfl := range p.workflows {
go wkfl.exporter.Export(ctx, wkfl.events)
}
for {
select {
case <-ctx.Done():
logger.Log.Debugf("stopping pipe")
return nil
case e := <-p.Watcher.Events:
for _, wkfl := range p.workflows {
go func(w workflow) {
fCtx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
if !w.filter.Filter(fCtx, e) {
w.events <- e
}
}(wkfl)
}
}
}
}