pkg/cli/preview/intercepting_gcp_client.go (116 lines of code) (raw):

// Copyright 2025 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package preview import ( "context" "encoding/json" "errors" "fmt" "io" "net/http" "strings" "golang.org/x/oauth2" "google.golang.org/grpc" "k8s.io/klog/v2" ) // BlockedGCPError is an error that occurs when a GCP API call is blocked. type BlockedGCPError struct { Method string URL string Body string } var _ error = &BlockedGCPError{} // Error implements the error interface. func (e BlockedGCPError) Error() string { // encode in json so that we can unwrap even if this is (incorrectly) wrapped as a string (terraform) j, _ := json.Marshal(e) return fmt.Sprintf("call to GCP blocked (method=%v, url=%v) [jsonstart:BlockedGCPError:%v:jsonend]", e.Method, e.URL, string(j)) } // ExtractBlockedGCPError will unwrap a BlockedGCPError. // To tolerate terraform using string-wrapping of error messages, we also parse a json-encoded form. func ExtractBlockedGCPError(err error) (*BlockedGCPError, bool) { var e *BlockedGCPError if errors.As(err, &e) { return e, true } // Look for a string-wrapped message s := err.Error() for { _, tail, ok := strings.Cut(s, " [jsonstart:BlockedGCPError:") if !ok { break } body, tail, ok := strings.Cut(tail, ":jsonend]") if !ok { break } s = tail var e BlockedGCPError if err := json.Unmarshal([]byte(body), &e); err != nil { klog.Warningf("error parsing json-encoded error %q: %v", body, err) continue } else { return &e, true } } return nil, false } // interceptingGCPClient is a GCP client that intercepts GCP API calls. // It forwards read-only operations "upstream" to real GCP. // It returns a BlockedGCPError on any write operations. type interceptingGCPClient struct { upstreamGCPClient *http.Client authorization oauth2.TokenSource } // newInterceptingGCPClient creates a new interceptingGCPClient. func newInterceptingGCPClient(upstreamGCPClient *http.Client, authorization oauth2.TokenSource) *interceptingGCPClient { return &interceptingGCPClient{ upstreamGCPClient: upstreamGCPClient, authorization: authorization, } } // HTTPClient is the HTTP client that should be used for GCP API calls. func (c *interceptingGCPClient) HTTPClient() *http.Client { return &http.Client{ Transport: c.HTTPRoundTripper(), } } // HTTPRoundTripper is the round tripper that should be used for GCP API calls. func (c *interceptingGCPClient) HTTPRoundTripper() http.RoundTripper { return c } // blockedHTTPMethod is called when a write operation is attempted. // It returns a BlockedGCPError. func (c *interceptingGCPClient) blockedHTTPMethod(req *http.Request) (*http.Response, error) { ctx := req.Context() log := klog.FromContext(ctx) var body []byte if req.Body != nil { b, err := io.ReadAll(req.Body) if err != nil { return nil, fmt.Errorf("error reading body: %w", err) } else { body = b } } log.Info("blockedHTTPMethod", "req.method", req.Method, "req.url", req.URL.String()) return nil, BlockedGCPError{ Method: req.Method, URL: req.URL.String(), Body: string(body), } } // RoundTrip is the round tripper that implements the http.RoundTripper interface. func (c *interceptingGCPClient) RoundTrip(req *http.Request) (*http.Response, error) { ctx := req.Context() log := klog.FromContext(ctx) requestIsAllowed := false if req.Method == "GET" { requestIsAllowed = true } if requestIsAllowed { if c.authorization != nil { token, err := c.authorization.Token() if err != nil { return nil, err } req.Header.Set("Authorization", "Bearer "+token.AccessToken) } response, err := c.upstreamGCPClient.Do(req) if response != nil { log.Info("forwarded request", "req.method", req.Method, "req.url", req.URL, "response.status", response.Status) } else if err != nil { log.Error(err, "error forwarding request", "req.method", req.Method, "req.url", req.URL) } return response, err } return c.blockedHTTPMethod(req) } // GRPCUnaryClientInterceptor intercepts GCP GRPC API calls. func (c *interceptingGCPClient) GRPCUnaryClientInterceptor() grpc.UnaryClientInterceptor { return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { // TODO: Add this back in to support GRPC resources (e.g. bigtable) // entry := &test.LogEntry{} // entry.Request.URL = method // entry.Request.Method = "GRPC" // if req != nil { // requestBytes, _ := protojson.Marshal(req.(proto.Message)) // entry.Request.Body = string(requestBytes) // } // if mockCloudGRPCClientConnection != nil { // cc = mockCloudGRPCClientConnection // } // err := invoker(ctx, method, req, reply, cc, opts...) // if reply != nil { // replyBytes, _ := protojson.Marshal(reply.(proto.Message)) // entry.Response.Body = string(replyBytes) // } // if err != nil { // entry.Response.Status = fmt.Sprintf("error: %v", err) // } else { // entry.Response.Status = "OK" // } // for _, eventSink := range eventSinks { // eventSink.AddHTTPEvent(ctx, entry) // } // return err return fmt.Errorf("GRPC method blocked by InterceptingGCPClient") } }