pkg/cli/preview/streaming_client.go (259 lines of code) (raw):
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package preview
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"k8s.io/klog/v2"
)
// Object is the fields we need from a Kubernetes object.
type Object interface {
GetNamespace() string
GetName() string
}
// ClientOptions are the options for creating a StreamingClient.
type ClientOptions struct {
HTTPClient *http.Client
BaseURL *url.URL
}
// StreamingClient is a client for streaming Kubernetes API responses.
type StreamingClient struct {
httpClient *http.Client
baseURL url.URL
}
// GroupVersionResource is a group, version, and resource.
type GroupVersionResource struct {
Group string
Version string
Resource string
}
// NewStreamingClient creates a new StreamingClient.
func NewStreamingClient(opt ClientOptions) *StreamingClient {
return &StreamingClient{
httpClient: opt.HTTPClient,
baseURL: *opt.BaseURL,
}
}
// ListMetadata is the metadata for a list operation.
type ListMetadata struct {
APIVersion string
Kind string
ResourceVersion string
}
// ListListener is a listener for list operations.
type ListListener interface {
OnListBegin(metadata ListMetadata)
OnListObject(obj Object) error
OnListEnd()
}
// userAgent returns the user agent for the StreamingClient.
func (c *StreamingClient) userAgent() string {
return "StreamingClient" // TODO
}
// setHeaders sets the headers for the StreamingClient.
func (c *StreamingClient) setHeaders(request *http.Request) error {
request.Header.Set("User-Agent", c.userAgent())
return nil
}
// Get gets the requested object
func (c *StreamingClient) Get(ctx context.Context, typeInfo *typeInfo, namespace, name string, dest Object) error {
log := klog.FromContext(ctx)
u := c.resourceURL(typeInfo.gvr, namespace, name)
request, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
if err != nil {
return fmt.Errorf("building http request: %w", err)
}
if err := c.setHeaders(request); err != nil {
return err
}
request.Header.Set("Accept", "application/json")
log.Info("doing http request", "method", request.Method, "url", request.URL)
response, err := c.httpClient.Do(request)
if err != nil {
return fmt.Errorf("sending http request: %w", err)
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status from %v: %v", u, response.Status)
}
b, err := io.ReadAll(response.Body)
if err != nil {
return fmt.Errorf("reading response body from %v: %w", u, err)
}
if err := json.Unmarshal(b, dest); err != nil {
return fmt.Errorf("decoding %T from %v: %w", dest, u, err)
}
return nil
}
// List lists the objects for the given type.
func (c *StreamingClient) List(ctx context.Context, typeInfo *typeInfo, listener ListListener) error {
log := klog.FromContext(ctx)
u := c.resourceURL(typeInfo.gvr, "", "")
request, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
if err != nil {
return fmt.Errorf("building http request: %w", err)
}
if err := c.setHeaders(request); err != nil {
return err
}
request.Header.Set("Accept", "application/json")
log.Info("doing http request", "method", request.Method, "url", request.URL)
response, err := c.httpClient.Do(request)
if err != nil {
return fmt.Errorf("sending http request: %w", err)
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status from %v: %v", u, response.Status)
}
// TODO: Implement true streaming parsing (likely with https://github.com/golang/go/issues/71497)
b, err := io.ReadAll(response.Body)
if err != nil {
return fmt.Errorf("reading response body from %v: %w", u, err)
}
type listT struct {
APIVersion string `json:"apiVersion"`
Kind string `json:"kind"`
Metadata struct {
ResourceVersion string `json:"resourceVersion"`
} `json:"metadata"`
Items []json.RawMessage `json:"items"`
}
var list listT
if err := json.Unmarshal(b, &list); err != nil {
return fmt.Errorf("decoding response body from %v: %w", u, err)
}
listener.OnListBegin(ListMetadata{
APIVersion: list.APIVersion,
Kind: list.Kind,
ResourceVersion: list.Metadata.ResourceVersion,
})
for _, item := range list.Items {
t := typeInfo.factory()
if err := json.Unmarshal(item, &t); err != nil {
return fmt.Errorf("decoding %T from %v: %w", t, u, err)
}
if err := listener.OnListObject(t); err != nil {
return err
}
}
listener.OnListEnd()
return nil
}
// WatchOptions are the options for a watch operation.
type WatchOptions struct {
ResourceVersion string
AllowWatchBookmarks bool
}
// WatchListener is a listener for watch operations.
type WatchListener interface {
OnWatchEvent(eventType string, object Object) error
}
// Watch watches the given type.
func (c *StreamingClient) Watch(ctx context.Context, typeInfo *typeInfo, watchOptions WatchOptions, listener WatchListener) error {
log := klog.FromContext(ctx)
u := c.resourceURL(typeInfo.gvr, "", "")
q := u.Query()
q.Set("watch", "true")
if watchOptions.ResourceVersion != "" {
q.Set("resourceVersion", watchOptions.ResourceVersion)
}
if watchOptions.AllowWatchBookmarks {
q.Set("allowWatchBookmarks", "true")
}
u.RawQuery = q.Encode()
request, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
if err != nil {
return fmt.Errorf("building http request: %w", err)
}
if err := c.setHeaders(request); err != nil {
return err
}
request.Header.Set("Accept", "application/json")
log.Info("doing http request", "method", request.Method, "url", request.URL)
response, err := c.httpClient.Do(request)
if err != nil {
return fmt.Errorf("sending http request: %w", err)
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status from %v: %v", u, response.Status)
}
type eventT struct {
Type string `json:"type"`
Object json.RawMessage `json:"object"`
}
lineReader := &lineSplitReader{inner: response.Body}
for {
var event eventT
jsonDecoder := json.NewDecoder(lineReader)
if err := jsonDecoder.Decode(&event); err != nil {
return fmt.Errorf("decoding event from %v: %w", u, err)
}
var object Object
if len(event.Object) != 0 {
object = typeInfo.factory()
if err := json.Unmarshal(event.Object, object); err != nil {
return fmt.Errorf("decoding %T from %v: %w", object, u, err)
}
}
if err := listener.OnWatchEvent(event.Type, object); err != nil {
return err
}
// reset lineEOF to read next line
lineReader.lineEOF = false
}
}
// lineSplitReader is a reader that splits lines from an inner reader.
// It is similar to io.LimitedReader, but it stops at a newline.
type lineSplitReader struct {
inner io.Reader
buffer []byte
innerEOF bool
lineEOF bool
}
// lineReaderBufferSize is the size of the buffer for the lineSplitReader.
const lineReaderBufferSize = 8192
// Read reads from the inner reader.
var _ io.Reader = &lineSplitReader{}
// Read reads a line from the inner reader.
// It is similar to io.LimitedReader, but it stops at a newline.
func (r *lineSplitReader) Read(p []byte) (int, error) {
// Send io.EOF until we are reset
if r.lineEOF {
return 0, io.EOF
}
if len(r.buffer) == 0 {
if r.buffer == nil {
r.buffer = make([]byte, lineReaderBufferSize)
}
n, err := r.inner.Read(r.buffer[0:cap(r.buffer)])
if err != nil {
if err == io.EOF {
r.innerEOF = true
} else {
return 0, err
}
}
if n == 0 {
if r.innerEOF {
return 0, io.EOF
}
}
r.buffer = r.buffer[:n]
klog.Infof("buffer is %v", string(r.buffer))
}
// Only return up to the newline
ret := r.buffer
nl := bytes.IndexByte(r.buffer, '\n')
if nl != -1 {
ret = r.buffer[:nl+1]
}
// Return what fits into the target buffer
n := copy(p, ret)
r.buffer = r.buffer[n:]
if len(r.buffer) == 0 {
// release the buffer (maybe a sync pool would be faster)
r.buffer = nil
}
klog.Infof("returning %v", string(p[:n]))
// If we sent a newline, pretend this is EOF
if nl != -1 && n == nl+1 {
r.lineEOF = true
return n, io.EOF
}
return n, nil
}
// resourceURL constructs a URL for the given GroupVersionResource.
func (c *StreamingClient) resourceURL(gvr GroupVersionResource, namespace, name string) *url.URL {
u := &c.baseURL
if gvr.Group != "" {
u = u.JoinPath("apis", gvr.Group, gvr.Version)
} else {
u = u.JoinPath("api", gvr.Version)
}
if namespace != "" {
u = u.JoinPath("namespaces", namespace)
}
if name != "" {
u = u.JoinPath(gvr.Resource, name)
} else {
u = u.JoinPath(gvr.Resource)
}
return u
}