pkg/exporter/skywalking.go (167 lines of code) (raw):

/* * Licensed to Apache Software Foundation (ASF) under one or more contributor * license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright * ownership. Apache Software Foundation (ASF) licenses this file to you under * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package exporter import ( "context" "crypto/tls" "crypto/x509" "encoding/json" "fmt" "os" "time" "github.com/sirupsen/logrus" "google.golang.org/grpc" "google.golang.org/grpc/credentials" k8score "k8s.io/api/core/v1" sw "skywalking.apache.org/repo/goapi/collect/event/v3" "github.com/apache/skywalking-kubernetes-event-exporter/configs" "github.com/apache/skywalking-kubernetes-event-exporter/internal/pkg/logger" ) // k8sLayerName is the name of the layer that represents the k8s event, // which is defined at https://github.com/apache/skywalking/blob/master/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Layer.java const k8sLayerName = "K8S" // SkyWalking Exporter exports the events into Apache SkyWalking OAP server. type SkyWalking struct { config SkyWalkingConfig client sw.EventServiceClient } type SkyWalkingConfig struct { Address string `mapstructure:"address"` Template *EventTemplate `mapstructure:"template"` EnableTLS bool `mapstructure:"enableTLS"` ClientCertPath string `mapstructure:"clientCertPath"` ClientKeyPath string `mapstructure:"clientKeyPath"` TrustedCertPath string `mapstructure:"trustedCertPath"` InsecureSkipVerify bool `mapstructure:"insecureSkipVerify"` } func init() { s := &SkyWalking{} RegisterExporter(s.Name(), s) } func (exporter *SkyWalking) Init(ctx context.Context) error { config := SkyWalkingConfig{} if c := configs.GlobalConfig.Exporters[exporter.Name()]; c == nil { return fmt.Errorf("configs of %+v exporter cannot be empty", exporter.Name()) } else if marshal, err := json.Marshal(c); err != nil { return err } else if err := json.Unmarshal(marshal, &config); err != nil { return err } if err := config.Template.Init(); err != nil { return err } var dialOption grpc.DialOption if config.EnableTLS { if isFileExisted(config.ClientCertPath) && isFileExisted(config.ClientKeyPath) { clientCert, err := tls.LoadX509KeyPair(config.ClientCertPath, config.ClientKeyPath) if err != nil { return err } trustedCert, err := os.ReadFile(config.TrustedCertPath) if err != nil { return err } certPool := x509.NewCertPool() certPool.AppendCertsFromPEM(trustedCert) tlsConfig := &tls.Config{ Certificates: []tls.Certificate{clientCert}, RootCAs: certPool, MinVersion: tls.VersionTLS13, MaxVersion: tls.VersionTLS13, } tlsConfig.InsecureSkipVerify = config.InsecureSkipVerify dialOption = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)) } else { cred, _ := credentials.NewClientTLSFromFile(config.TrustedCertPath, "") dialOption = grpc.WithTransportCredentials(cred) } } else { dialOption = grpc.WithInsecure() } conn, err := grpc.Dial(config.Address, dialOption) if err != nil { return err } exporter.config = config exporter.client = sw.NewEventServiceClient(conn) go func() { <-ctx.Done() if err := conn.Close(); err != nil { logger.Log.Errorf("failed to close connection. %+v", err) } }() return nil } // checkTLSFile checks the TLS files. func isFileExisted(path string) bool { file, err := os.Open(path) if err != nil { return false } _, err = file.Stat() return err == nil } func (exporter *SkyWalking) Name() string { return "skywalking" } func (exporter *SkyWalking) Export(ctx context.Context, events chan *k8score.Event) { logger.Log.Debugf("exporting events into %+v", exporter.Name()) stream, err := exporter.client.Collect(ctx) for err != nil { select { case <-ctx.Done(): logger.Log.Debugf("stopping exporter %+v", exporter.Name()) if err = stream.CloseSend(); err != nil { logger.Log.Warnf("failed to close stream. %+v", err) } return default: logger.Log.Errorf("failed to connect to SkyWalking server. %+v", err) time.Sleep(3 * time.Second) stream, err = exporter.client.Collect(ctx) } } for { select { case <-ctx.Done(): logger.Log.Debugf("stopping exporter %+v", exporter.Name()) return case kEvent := <-events: if logger.Log.IsLevelEnabled(logrus.DebugLevel) { if bytes, err := json.Marshal(kEvent); err == nil { logger.Log.Debugf("exporting event to %v: %v", exporter.Name(), string(bytes)) } } t := sw.Type_Normal if kEvent.Type == k8score.EventTypeWarning { t = sw.Type_Error } swEvent := &sw.Event{ Uuid: string(kEvent.UID), Source: &sw.Source{}, Name: kEvent.Reason, Type: t, Message: kEvent.Message, StartTime: kEvent.FirstTimestamp.UnixNano() / 1000000, EndTime: kEvent.LastTimestamp.UnixNano() / 1000000, Layer: k8sLayerName, } if exporter.config.Template != nil { go func() { renderCtx, cancel := context.WithTimeout(ctx, time.Minute) done := exporter.config.Template.render(renderCtx, swEvent, kEvent) select { case <-done: logger.Log.Debugf("done: rendered event is: %+v", swEvent) exporter.export(stream, swEvent) case <-renderCtx.Done(): logger.Log.Debugf("canceled: rendered event is: %+v", swEvent) exporter.export(stream, swEvent) } cancel() }() } else { exporter.export(stream, swEvent) } } } } func (exporter *SkyWalking) export(stream sw.EventService_CollectClient, swEvent *sw.Event) { if err := stream.Send(swEvent); err != nil { logger.Log.Errorf("failed to send event to %+v. %+v", exporter.Name(), err) } }