tools/integration_tests/emulator_tests/proxy_server/main.go (166 lines of code) (raw):

// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package main import ( "context" "flag" "fmt" "io" "log" "net" "net/http" "net/url" "os" "os/signal" "strconv" "syscall" "time" ) const PortAndProxyProcessIdInfoLogFormat = "Listening Proxy Server On Port [%s] with Process ID [%d]" var ( // Flag to accept config-file path. fConfigPath = flag.String("config-path", "configs/config.yaml", "Path to the file") // Flag to turn on/off fDebug logs. fDebug = flag.Bool("debug", true, "Enable proxy server fDebug logs.") // Log file to write proxy server logs. fLogFilePath = flag.String("log-file", "", "Path to the log file") // Initialized before the server gets started. gConfig *Config gOpManager *OperationManager // Port number assigned to listener. gPort string ) type ProxyHandler struct { http.Handler } // logRequestAndType is used for logging the request on proxy server. // More fields can be added or removed as per requirement for debugging purpose. func logRequestAndType(req *http.Request, r RequestType) { // Print empty lines to separate each request in log. log.Println("") log.Println("") log.Printf("RequestType: %s\n", r) log.Printf("URL: %s\n", req.URL.String()) log.Printf("Content-Length: %s\n", req.Header.Get("Content-Length")) log.Printf("Content-Range: %s\n", req.Header.Get("Content-Range")) } // AddRetryID creates mock error behavior on the target host for specific request types. // It retrieves the corresponding operation from the operation manager based on the provided RequestTypeAndInstruction. // If a matching operation is found, it creates a retry test with the target host and instruction, // and attaches the generated test ID to the HTTP request header "x-retry-test-id". // // This function is used to simulate error scenarios for testing retry mechanisms. func AddRetryID(req *http.Request, r RequestTypeAndInstruction) error { plantOp := gOpManager.retrieveOperation(r.RequestType) if *fDebug { logRequestAndType(req, r.RequestType) if plantOp != "" { log.Println("Planting operation: ", plantOp) } } if plantOp != "" { testID, err := CreateRetryTest(gConfig.TargetHost, map[string][]string{r.Instruction: {plantOp}}) if err != nil { return fmt.Errorf("CreateRetryTest: %v", err) } req.Header.Set("x-retry-test-id", testID) } return nil } // ServeHTTP handles incoming HTTP requests. It acts as a proxy, forwarding requests // to a target server specified in the configuration and then relaying the // response back to the original client. func (ph ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { targetURL := fmt.Sprintf("%s%s", gConfig.TargetHost, r.RequestURI) req, err := http.NewRequest(r.Method, targetURL, r.Body) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } for name, values := range r.Header { for _, value := range values { req.Header.Add(name, value) } } // Determine the request type and instruction (e.g., read, write, metadata) based on the incoming request. reqTypeAndInstruction := deduceRequestTypeAndInstruction(r) // Add a unique retry ID to the request headers, associating it with the // deduced request type and instruction. This is used for adding custom failures on requests. err = AddRetryID(req, reqTypeAndInstruction) if err != nil { log.Printf("AddRetryID: %v", err) } // Send the request to the target server client := &http.Client{} start := time.Now() resp, err := client.Do(req) elapsed := time.Since(start) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } respURL, err := resp.Location() // Change the response URL host to the proxy server host. // This is necessary because, from the client's perspective, the proxy server is the endpoint. // Therefore, the response must appear to originate from the proxy host. if err == nil { // Parse the original URL. u, err := url.Parse(respURL.String()) if err != nil { log.Println("Error parsing URL:", err) return } u.Host = "localhost:" + gPort resp.Header.Set("Location", u.String()) } defer resp.Body.Close() // Copy headers from the target server's response for name, values := range resp.Header { for _, value := range values { w.Header().Add(name, value) } } // Copy the response body w.WriteHeader(resp.StatusCode) _, err = io.Copy(w, resp.Body) if err != nil { log.Printf("Error in coping response body: %v", err) } if *fDebug { log.Printf("Respnse Status: %d\n", resp.StatusCode) log.Printf("Elapsed Time: %.3fs\n", elapsed.Seconds()) } } // ProxyServer represents a simple proxy server over GCS storage based API endpoint. type ProxyServer struct { server *http.Server shutdown chan os.Signal } // NewProxyServer creates a new ProxyServer instance func NewProxyServer() *ProxyServer { return &ProxyServer{ shutdown: make(chan os.Signal, 1), } } // Start starts the proxy server. func (ps *ProxyServer) Start() { // Create a listener on random available port. listener, err := net.Listen("tcp", ":0") if err != nil { log.Fatalf("Error on listening: %v", err) } gPort = strconv.Itoa(listener.Addr().(*net.TCPAddr).Port) // Log port number and proxy process Id for the proxy server. log.Printf(PortAndProxyProcessIdInfoLogFormat, gPort, os.Getpid()) ps.server = &http.Server{ Addr: ":" + gPort, Handler: ProxyHandler{}, } // Start the server in a new goroutine go func() { if err := ps.server.Serve(listener); err != nil && err != http.ErrServerClosed { log.Fatalf("Server error: %v", err) } }() // Handle graceful shutdown signal.Notify(ps.shutdown, syscall.SIGINT, syscall.SIGTERM) // Blocks until one of the Signal is recieved. <-ps.shutdown log.Println("Shutting down proxy server...") ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() if err := ps.server.Shutdown(ctx); err != nil { log.Fatalf("Proxy server forced to shutdown: %v", err) } else { log.Println("Proxy server exiting") } } func main() { // Parse the command-line flags flag.Parse() var err error gConfig, err = parseConfigFile(*fConfigPath) if err != nil { log.Printf("Parsing error: %v\n", err) os.Exit(1) } if *fLogFilePath == "" { log.Println("No log file path for proxy server provided.") os.Exit(1) } logFile, err := os.OpenFile(*fLogFilePath, os.O_WRONLY|os.O_APPEND, 0666) if err != nil { log.Printf("Error opening log file: %v\n", err) os.Exit(1) } defer logFile.Close() log.SetOutput(logFile) if *fDebug { printConfig(*gConfig) } gOpManager = NewOperationManager(*gConfig) ps := NewProxyServer() ps.Start() }