pkg/mesh/app.go (181 lines of code) (raw):

// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package mesh import ( "errors" "fmt" "io/ioutil" "log" "net" "net/http" "os" "os/exec" "strconv" "strings" "syscall" "time" ) const ( serverStateKey = "server.state" serverStateCheckRegex = "^server.state" listenerCheckKey = "listener_manager.workers_started" listenerCheckRegex = "^listener_manager.workers_started" ) // StartApp uses the reminder of the command line to exec an app, using K8S_UID as UID, if present. func (kr *KRun) StartApp() { var cmd *exec.Cmd if len(os.Args) == 1 { return } else if len(os.Args) == 2 { cmd = exec.Command(os.Args[1]) } else { cmd = exec.Command(os.Args[1], os.Args[2:]...) } if os.Getuid() == 0 { uid := os.Getenv("K8S_UID") if uid != "" { uidi, err := strconv.Atoi(uid) if err == nil { cmd.SysProcAttr = &syscall.SysProcAttr{} cmd.SysProcAttr.Credential = &syscall.Credential{Uid: uint32(uidi)} } } } cmd.Stdin = os.Stdin cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr // Set port to 8080 - some apps use the PORT from knative to start.a cmd.Env = []string{"PORT=8080"} for _, e := range os.Environ() { if strings.HasPrefix(e, "PORT=") { continue } cmd.Env = append(cmd.Env, e) } if os.Getenv("GRPC_XDS_BOOTSTRAP") == "" { cmd.Env = append(cmd.Env, "GRPC_XDS_BOOTSTRAP=/etc/istio/proxy/grpc_bootstrap.json") // This is set by injector cmd.Env = append(cmd.Env, "GRPC_XDS_EXPERIMENTAL_RBAC=true") cmd.Env = append(cmd.Env, "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT=true") } if kr.WhiteboxMode { cmd.Env = append(cmd.Env, "HTTP_PROXY=127.0.0.1:15007") cmd.Env = append(cmd.Env, "http_proxy=127.0.0.1:15007") } go func() { err := cmd.Start() if err != nil { log.Println("Failed to start ", cmd, err) } kr.appCmd = cmd err = cmd.Wait() if err != nil { log.Println("Application err exit ", err, cmd.ProcessState.ExitCode(), time.Since(kr.StartTime)) } else { log.Println("Application clean exit ", err, cmd.ProcessState.ExitCode(), time.Since(kr.StartTime)) } kr.Exit(cmd.ProcessState.ExitCode()) }() kr.Signals() } // WaitTCPReady uses the same detection as CloudRun, i.e. TCP connect. func (kr *KRun) WaitTCPReady(addr string, max time.Duration) error { t0 := time.Now() deadline := t0.Add(max) for { // if we cant connect, count as fail conn, err := net.DialTimeout("tcp", addr, deadline.Sub(time.Now())) if err != nil { if time.Now().After(deadline) { return err } time.Sleep(50 * time.Millisecond) if conn != nil { _ = conn.Close() } continue } err = conn.Close() if err != nil { log.Println("WaitTCP.Close()", err) } log.Println("Application ready", time.Since(t0), time.Since(kr.StartTime)) return nil } return nil } // WaitAppStartup waits for app to be ready to accept requests. // - default is KNative 'listen on the app port' ( 8080 default, PORT_http overrides ) // - startupProbe.tcp and startupProbe.http can define alternate port and using http ready. func (kr *KRun) WaitAppStartup() error { var err error startupTimeout := 10 * time.Second // TODO: make customizable // PORT_http is used as an alternative to PORT - which is taken over by the tunnel. appPort := kr.Config("PORT_http", "8080") // Wait for app to be ready startupProbeHttp := kr.Config("startupProbe.http", "") startupProbeTcp := kr.Config("startupProbe.tcp", "") if startupProbeHttp != "" { err = kr.WaitHTTPReady(startupProbeHttp, startupTimeout) } else if startupProbeTcp != "" { err = kr.WaitTCPReady(startupProbeTcp, startupTimeout) } else if appPort != "-" && len(os.Args) > 1 { err = kr.WaitTCPReady("127.0.0.1:" + appPort, startupTimeout) } return err } func (kr *KRun) WaitHTTPReady(url string, max time.Duration) error { t0 := time.Now() for { res, _ := http.Get(url) if res != nil && res.StatusCode == 200 { return nil } if time.Since(t0) > max { return errors.New("Timeout waiting for ready") } time.Sleep(100 * time.Millisecond) } } // WaitEnvoyReady waits for envoy to be ready until max is reached, otherwise returns a non-nil error. func (kr *KRun) WaitEnvoyReady(addr string, max time.Duration) error { t0 := time.Now() for { serverStateReady, serverStateErr := kr.envoyServerStateCheck(addr) listenerReady, listenerErr := kr.envoyListenerWorkersStartedCheck(addr) if serverStateErr == nil && listenerErr == nil && serverStateReady && listenerReady { log.Println("Envoy is ready") return nil } if time.Since(t0) > max { return fmt.Errorf("Timeout waiting for ready from envoy") } time.Sleep(100 * time.Millisecond) } } func (kr *KRun) envoyServerStateCheck(addr string) (bool, error) { checkURL := fmt.Sprintf("http://%s/stats?used_only&filter=%s", addr, serverStateCheckRegex) res, err := http.Get(checkURL) if err != nil { return false, fmt.Errorf("Unable to check envoy server state because of %s", err) } defer res.Body.Close() return kr.processHealthCheckResponse(res, serverStateKey, "0") // 0 indicates live. } func (kr *KRun) envoyListenerWorkersStartedCheck(addr string) (bool, error) { checkURL := fmt.Sprintf("http://%s/stats?used_only&filter=%s", addr, listenerCheckRegex) res, err := http.Get(checkURL) if err != nil { return false, fmt.Errorf("Unable to check envoy listener worker state because of : %s", err) } defer res.Body.Close() return kr.processHealthCheckResponse(res, listenerCheckKey, "1") // 1 indicates listener works have started. } // Checks the res has the following structure: "key: val" where key and val should be as specified. func (kr *KRun) processHealthCheckResponse(res *http.Response, key string, val string) (bool, error) { if res == nil || res.StatusCode != 200 { return false, fmt.Errorf("Unable to check envoy for %s", key) } rawResponse, err := ioutil.ReadAll(res.Body) if err != nil { return false, fmt.Errorf("Unable to check envoy for %s because of %s", key, err) } response := string(rawResponse) splits := strings.Split(response, ":") if len(splits) != 2 { return false, nil } // Check key if strings.TrimSpace(splits[0]) != key { return false, nil } // Check val if strings.TrimSpace(splits[1]) != val { return false, nil } return true, nil }