internal/kubelib/logs.go (224 lines of code) (raw):

package kubelib import ( "bufio" "context" "fmt" "io" "os" "sync" k8scorev1 "k8s.io/api/core/v1" k8smetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ) // FollowLogsParams is the parameters for following logs. type FollowLogsParams struct { // Namespace is the namespace of the pods. Namespace string // Selector is the selector of the pods. Selector labels.Selector // Container is the container name of the pods. Container string // MaxConcurrency is the maximum number of concurrent log streams to follow. MaxConcurrency int // AddPrefix specifies whether to add the prefix of the pod name to the logs. AddPrefix bool // Output is the writer to write the logs to. Output io.Writer } func (p *FollowLogsParams) defaults() error { if p.Namespace == "" { return fmt.Errorf(".Namespace is required") } if p.Selector == nil { return fmt.Errorf(".Selector is required") } if p.Container == "" { return fmt.Errorf(".Container is required") } if p.MaxConcurrency == 0 { p.MaxConcurrency = 5 } if p.Output == nil { p.Output = os.Stderr } return nil } type podLog struct { target k8scorev1.ObjectReference request rest.ResponseWrapper } type logsFollower struct { client kubernetes.Interface namespace string selector labels.Selector container string maxConcurrency int addPrefix bool out io.Writer wg *sync.WaitGroup podLogsChan chan podLog handledTargets map[k8scorev1.ObjectReference]struct{} handledTargetsMu *sync.Mutex } func (f *logsFollower) Wait() { f.wg.Wait() } func (f *logsFollower) Start(ctx context.Context) { f.wg.Add(1) go f.discoverPods(ctx) f.wg.Add(f.maxConcurrency) for i := 0; i < f.maxConcurrency; i++ { go f.followPodLogs(ctx) } } func (f *logsFollower) isNewTarget(target k8scorev1.ObjectReference) bool { f.handledTargetsMu.Lock() defer f.handledTargetsMu.Unlock() _, exists := f.handledTargets[target] if exists { // already handled return false } f.handledTargets[target] = struct{}{} return true } var logsAblePodPhase = map[k8scorev1.PodPhase]struct{}{ k8scorev1.PodRunning: {}, k8scorev1.PodFailed: {}, k8scorev1.PodSucceeded: {}, } func (f *logsFollower) discoverPods(ctx context.Context) { defer f.wg.Done() shouldHandlePod := func(pod *k8scorev1.Pod) bool { matchContainer := false for _, container := range pod.Spec.Containers { if container.Name == f.container { matchContainer = true break } } if !matchContainer { return false } if _, ok := logsAblePodPhase[pod.Status.Phase]; !ok { return false } return true } // TODO: log error _ = func() error { podsClient := f.client.CoreV1().Pods(f.namespace) watch, err := podsClient.Watch(ctx, k8smetav1.ListOptions{ LabelSelector: f.selector.String(), }) if err != nil { return err } defer watch.Stop() for { select { case <-ctx.Done(): return nil case event, ok := <-watch.ResultChan(): if !ok { return nil } pod, ok := event.Object.(*k8scorev1.Pod) if !ok { continue } if !shouldHandlePod(pod) { continue } objectRef := k8scorev1.ObjectReference{ Kind: "Pod", Namespace: pod.Namespace, Name: pod.Name, } if !f.isNewTarget(objectRef) { continue } f.podLogsChan <- podLog{ target: objectRef, request: podsClient.GetLogs(pod.Name, &k8scorev1.PodLogOptions{ Container: f.container, Follow: true, }), } } } }() } func (f *logsFollower) followPodLogs(ctx context.Context) { defer f.wg.Done() for { select { case <-ctx.Done(): return case p := <-f.podLogsChan: // TODO: log error _ = f.followPodLog(ctx, p) } } } type prefixWriter struct { prefix []byte w io.Writer } func (w *prefixWriter) Write(p []byte) (n int, err error) { return w.w.Write(append(w.prefix, p...)) } func (f *logsFollower) followPodLog(ctx context.Context, podLog podLog) error { if _, err := fmt.Fprintf(f.out, "Following logs of %s/%s...\n", podLog.target.Namespace, podLog.target.Name); err != nil { return err } logsStream, err := podLog.request.Stream(ctx) if err != nil { return err } defer func() { _ = logsStream.Close() }() r := bufio.NewReader(logsStream) out := f.out if f.addPrefix { out = &prefixWriter{ prefix: []byte(fmt.Sprintf("[%s/%s] ", podLog.target.Namespace, podLog.target.Name)), w: out, } } for { line, err := r.ReadBytes('\n') if _, writeErr := out.Write(line); writeErr != nil { return writeErr } if err != nil { if err == io.EOF { return nil } return err } } } // FollowLogs follows the logs of the pods that match the selector. func FollowLogs( ctx context.Context, client kubernetes.Interface, params *FollowLogsParams, ) error { if err := params.defaults(); err != nil { return err } pr, pw := io.Pipe() follower := &logsFollower{ client: client, namespace: params.Namespace, selector: params.Selector, container: params.Container, maxConcurrency: params.MaxConcurrency, addPrefix: params.AddPrefix, out: pw, wg: new(sync.WaitGroup), podLogsChan: make(chan podLog), handledTargets: map[k8scorev1.ObjectReference]struct{}{}, handledTargetsMu: new(sync.Mutex), } go func() { follower.Start(ctx) follower.Wait() _ = pw.Close() }() _, err := io.Copy(params.Output, pr) return err }