in pkg/exporter/skywalking.go [144:209]
func (exporter *SkyWalking) Export(ctx context.Context, events chan *k8score.Event) {
logger.Log.Debugf("exporting events into %+v", exporter.Name())
stream, err := exporter.client.Collect(ctx)
for err != nil {
select {
case <-ctx.Done():
logger.Log.Debugf("stopping exporter %+v", exporter.Name())
if err = stream.CloseSend(); err != nil {
logger.Log.Warnf("failed to close stream. %+v", err)
}
return
default:
logger.Log.Errorf("failed to connect to SkyWalking server. %+v", err)
time.Sleep(3 * time.Second)
stream, err = exporter.client.Collect(ctx)
}
}
for {
select {
case <-ctx.Done():
logger.Log.Debugf("stopping exporter %+v", exporter.Name())
return
case kEvent := <-events:
if logger.Log.IsLevelEnabled(logrus.DebugLevel) {
if bytes, err := json.Marshal(kEvent); err == nil {
logger.Log.Debugf("exporting event to %v: %v", exporter.Name(), string(bytes))
}
}
t := sw.Type_Normal
if kEvent.Type == k8score.EventTypeWarning {
t = sw.Type_Error
}
swEvent := &sw.Event{
Uuid: string(kEvent.UID),
Source: &sw.Source{},
Name: kEvent.Reason,
Type: t,
Message: kEvent.Message,
StartTime: kEvent.FirstTimestamp.UnixNano() / 1000000,
EndTime: kEvent.LastTimestamp.UnixNano() / 1000000,
Layer: k8sLayerName,
}
if exporter.config.Template != nil {
go func() {
renderCtx, cancel := context.WithTimeout(ctx, time.Minute)
done := exporter.config.Template.render(renderCtx, swEvent, kEvent)
select {
case <-done:
logger.Log.Debugf("done: rendered event is: %+v", swEvent)
exporter.export(stream, swEvent)
case <-renderCtx.Done():
logger.Log.Debugf("canceled: rendered event is: %+v", swEvent)
exporter.export(stream, swEvent)
}
cancel()
}()
} else {
exporter.export(stream, swEvent)
}
}
}
}