func()

in pkg/exporter/skywalking.go [144:209]


func (exporter *SkyWalking) Export(ctx context.Context, events chan *k8score.Event) {
	logger.Log.Debugf("exporting events into %+v", exporter.Name())

	stream, err := exporter.client.Collect(ctx)

	for err != nil {
		select {
		case <-ctx.Done():
			logger.Log.Debugf("stopping exporter %+v", exporter.Name())
			if err = stream.CloseSend(); err != nil {
				logger.Log.Warnf("failed to close stream. %+v", err)
			}
			return
		default:
			logger.Log.Errorf("failed to connect to SkyWalking server. %+v", err)
			time.Sleep(3 * time.Second)
			stream, err = exporter.client.Collect(ctx)
		}
	}

	for {
		select {
		case <-ctx.Done():
			logger.Log.Debugf("stopping exporter %+v", exporter.Name())
			return
		case kEvent := <-events:
			if logger.Log.IsLevelEnabled(logrus.DebugLevel) {
				if bytes, err := json.Marshal(kEvent); err == nil {
					logger.Log.Debugf("exporting event to %v: %v", exporter.Name(), string(bytes))
				}
			}

			t := sw.Type_Normal
			if kEvent.Type == k8score.EventTypeWarning {
				t = sw.Type_Error
			}
			swEvent := &sw.Event{
				Uuid:      string(kEvent.UID),
				Source:    &sw.Source{},
				Name:      kEvent.Reason,
				Type:      t,
				Message:   kEvent.Message,
				StartTime: kEvent.FirstTimestamp.UnixNano() / 1000000,
				EndTime:   kEvent.LastTimestamp.UnixNano() / 1000000,
				Layer:     k8sLayerName,
			}
			if exporter.config.Template != nil {
				go func() {
					renderCtx, cancel := context.WithTimeout(ctx, time.Minute)
					done := exporter.config.Template.render(renderCtx, swEvent, kEvent)
					select {
					case <-done:
						logger.Log.Debugf("done: rendered event is: %+v", swEvent)
						exporter.export(stream, swEvent)
					case <-renderCtx.Done():
						logger.Log.Debugf("canceled: rendered event is: %+v", swEvent)
						exporter.export(stream, swEvent)
					}
					cancel()
				}()
			} else {
				exporter.export(stream, swEvent)
			}
		}
	}
}