callouts/go/extproc/internal/server/callout_server.go (189 lines of code) (raw):

// Copyright 2024 Google LLC. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package server import ( "crypto/tls" "log" "net" "net/http" extproc "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/reflection" ) // Config holds the server configuration parameters. type Config struct { Address string InsecureAddress string HealthCheckAddress string CertFile string KeyFile string EnableInsecureServer bool } // loadConfig loads the server configuration from environment variables or uses defaults. func loadConfig() Config { return Config{ Address: "0.0.0.0:8443", InsecureAddress: "0.0.0.0:8181", HealthCheckAddress: "0.0.0.0:8000", CertFile: "extproc/ssl_creds/localhost.crt", KeyFile: "extproc/ssl_creds/localhost.key", EnableInsecureServer: true, } } // CalloutServer represents a server that handles callouts. type CalloutServer struct { Config Config Cert tls.Certificate } // NewCalloutServer creates a new CalloutServer with the given configuration. func NewCalloutServer(config Config) *CalloutServer { var cert tls.Certificate var err error if config.CertFile != "" && config.KeyFile != "" { cert, err = tls.LoadX509KeyPair(config.CertFile, config.KeyFile) if err != nil { log.Fatalf("Failed to load server certificate: %v", err) } } return &CalloutServer{ Config: config, Cert: cert, } } // StartGRPC starts the gRPC server with the specified service. func (s *CalloutServer) StartGRPC(service extproc.ExternalProcessorServer) { lis, err := net.Listen("tcp", s.Config.Address) if err != nil { log.Fatalf("Failed to listen: %v", err) } creds := credentials.NewServerTLSFromCert(&s.Cert) grpcServer := grpc.NewServer(grpc.Creds(creds)) extproc.RegisterExternalProcessorServer(grpcServer, service) reflection.Register(grpcServer) if err := grpcServer.Serve(lis); err != nil { log.Fatalf("Failed to serve gRPC: %v", err) } } // StartHealthCheck starts a health check server. func (s *CalloutServer) StartHealthCheck() { mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) }) server := &http.Server{ Addr: s.Config.HealthCheckAddress, Handler: mux, } log.Fatal(server.ListenAndServe()) } // StartInsecureGRPC starts the gRPC server without TLS. func (s *CalloutServer) StartInsecureGRPC(service extproc.ExternalProcessorServer) { if !s.Config.EnableInsecureServer { return } lis, err := net.Listen("tcp", s.Config.InsecureAddress) if err != nil { log.Fatalf("Failed to listen on insecure port: %v", err) } grpcServer := grpc.NewServer() extproc.RegisterExternalProcessorServer(grpcServer, service) reflection.Register(grpcServer) if err := grpcServer.Serve(lis); err != nil { log.Fatalf("Failed to serve gRPC on insecure port: %v", err) } } // RequestHeadersHandler handles request headers. type RequestHeadersHandler func(*extproc.HttpHeaders) (*extproc.ProcessingResponse, error) // ResponseHeadersHandler handles response headers. type ResponseHeadersHandler func(*extproc.HttpHeaders) (*extproc.ProcessingResponse, error) // RequestBodyHandler handles request bodies. type RequestBodyHandler func(*extproc.HttpBody) (*extproc.ProcessingResponse, error) // ResponseBodyHandler handles response bodies. type ResponseBodyHandler func(*extproc.HttpBody) (*extproc.ProcessingResponse, error) // RequestTrailersHandler handles request trailers. type RequestTrailersHandler func(*extproc.HttpTrailers) (*extproc.ProcessingResponse, error) // ResponseTrailersHandler handles response trailers. type ResponseTrailersHandler func(*extproc.HttpTrailers) (*extproc.ProcessingResponse, error) // HandlerRegistry registers various handlers. type HandlerRegistry struct { RequestHeadersHandler RequestHeadersHandler ResponseHeadersHandler ResponseHeadersHandler RequestBodyHandler RequestBodyHandler ResponseBodyHandler ResponseBodyHandler RequestTrailersHandler RequestTrailersHandler ResponseTrailersHandler ResponseTrailersHandler } // GRPCCalloutService implements the gRPC ExternalProcessorServer. type GRPCCalloutService struct { extproc.UnimplementedExternalProcessorServer Handlers HandlerRegistry } // Process processes incoming gRPC streams. func (s *GRPCCalloutService) Process(stream extproc.ExternalProcessor_ProcessServer) error { for { req, err := stream.Recv() if err != nil { return err } var response *extproc.ProcessingResponse switch { case req.GetRequestHeaders() != nil: if s.Handlers.RequestHeadersHandler != nil { response, err = s.Handlers.RequestHeadersHandler(req.GetRequestHeaders()) } case req.GetResponseHeaders() != nil: if s.Handlers.ResponseHeadersHandler != nil { response, err = s.Handlers.ResponseHeadersHandler(req.GetResponseHeaders()) } case req.GetRequestBody() != nil: if s.Handlers.RequestBodyHandler != nil { response, err = s.Handlers.RequestBodyHandler(req.GetRequestBody()) } case req.GetResponseBody() != nil: if s.Handlers.ResponseBodyHandler != nil { response, err = s.Handlers.ResponseBodyHandler(req.GetResponseBody()) } case req.GetRequestTrailers() != nil: if s.Handlers.RequestTrailersHandler != nil { response, err = s.Handlers.RequestTrailersHandler(req.GetRequestTrailers()) } case req.GetResponseTrailers() != nil: if s.Handlers.ResponseTrailersHandler != nil { response, err = s.Handlers.ResponseTrailersHandler(req.GetResponseTrailers()) } } if err != nil { return err } if response != nil { if err := stream.Send(response); err != nil { return err } } } } // HandleRequestHeaders handles request headers. func (s *GRPCCalloutService) HandleRequestHeaders(headers *extproc.HttpHeaders) (*extproc.ProcessingResponse, error) { return &extproc.ProcessingResponse{ Response: &extproc.ProcessingResponse_RequestHeaders{ RequestHeaders: &extproc.HeadersResponse{}, }, }, nil } // HandleResponseHeaders handles response headers. func (s *GRPCCalloutService) HandleResponseHeaders(headers *extproc.HttpHeaders) (*extproc.ProcessingResponse, error) { return &extproc.ProcessingResponse{ Response: &extproc.ProcessingResponse_ResponseHeaders{ ResponseHeaders: &extproc.HeadersResponse{}, }, }, nil } // HandleRequestBody handles request bodies. func (s *GRPCCalloutService) HandleRequestBody(body *extproc.HttpBody) (*extproc.ProcessingResponse, error) { return &extproc.ProcessingResponse{ Response: &extproc.ProcessingResponse_RequestBody{ RequestBody: &extproc.BodyResponse{}, }, }, nil } // HandleResponseBody handles response bodies. func (s *GRPCCalloutService) HandleResponseBody(body *extproc.HttpBody) (*extproc.ProcessingResponse, error) { return &extproc.ProcessingResponse{ Response: &extproc.ProcessingResponse_ResponseBody{ ResponseBody: &extproc.BodyResponse{}, }, }, nil } // HandleRequestTrailers handles request trailers. func (s *GRPCCalloutService) HandleRequestTrailers(trailers *extproc.HttpTrailers) (*extproc.ProcessingResponse, error) { return &extproc.ProcessingResponse{ Response: &extproc.ProcessingResponse_RequestTrailers{ RequestTrailers: &extproc.TrailersResponse{}, }, }, nil } // HandleResponseTrailers handles response trailers. func (s *GRPCCalloutService) HandleResponseTrailers(trailers *extproc.HttpTrailers) (*extproc.ProcessingResponse, error) { return &extproc.ProcessingResponse{ Response: &extproc.ProcessingResponse_ResponseTrailers{ ResponseTrailers: &extproc.TrailersResponse{}, }, }, nil }