proxy/protocol/http/sidecar.go (275 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package http import ( "context" "encoding/json" "errors" "io" "net/http" "strconv" "strings" "time" "github.com/apache/servicecomb-mesher/proxy/common" "github.com/apache/servicecomb-mesher/proxy/pkg/egress" "github.com/apache/servicecomb-mesher/proxy/pkg/metrics" "github.com/apache/servicecomb-mesher/proxy/protocol" "github.com/apache/servicecomb-mesher/proxy/resolver" "github.com/apache/servicecomb-mesher/proxy/util" "github.com/go-chassis/foundation/stringutil" "github.com/go-chassis/go-chassis/v2/client/rest" chassisCommon "github.com/go-chassis/go-chassis/v2/core/common" "github.com/go-chassis/go-chassis/v2/core/fault" "github.com/go-chassis/go-chassis/v2/core/handler" "github.com/go-chassis/go-chassis/v2/core/invocation" "github.com/go-chassis/go-chassis/v2/core/loadbalancer" "github.com/go-chassis/go-chassis/v2/pkg/runtime" utiltags "github.com/go-chassis/go-chassis/v2/pkg/util/tags" "github.com/go-chassis/go-chassis/v2/third_party/forked/afex/hystrix-go/hystrix" "github.com/go-chassis/openlog" ) var dr = resolver.GetDestinationResolver("http") var sr = resolver.GetSourceResolver() //constants for headers const ( XForwardedPort = "X-Forwarded-Port" XForwardedHost = "X-Forwarded-Host" ) var ( //ErrRestFaultAbort is a variable of type error ErrRestFaultAbort = errors.New("injecting abort") //ErrRestFault is a variable of type error ErrRestFault = errors.New("injecting abort and delay") //ErrNilResponse is a variable of type error ErrNilResponse = errors.New("http response is nil") ) func preHandler(req *http.Request) *invocation.Invocation { inv := &invocation.Invocation{} inv.Args = req inv.Reply = rest.NewResponse() inv.Protocol = "rest" inv.URLPath = req.URL.Path return inv } func consumerPreHandler(req *http.Request) *invocation.Invocation { inv := preHandler(req) inv.SourceServiceID = runtime.ServiceID inv.Ctx = context.TODO() var m map[string]string cseContextStr := req.Header.Get(chassisCommon.HeaderXCseContent) if cseContextStr == "" { m = map[string]string{chassisCommon.HeaderSourceName: runtime.ServiceName} chassisCommon.SetXCSEContext(m, req) return inv } if err := json.Unmarshal([]byte(cseContextStr), &m); err != nil { openlog.Debug("unmarshal " + chassisCommon.HeaderXCseContent + " header content failed: " + err.Error()) m = make(map[string]string) } m[chassisCommon.HeaderSourceName] = runtime.ServiceName chassisCommon.SetXCSEContext(m, req) return inv } func providerPreHandler(req *http.Request) *invocation.Invocation { inv := preHandler(req) inv.MicroServiceName = runtime.ServiceName inv.RouteTags = utiltags.NewDefaultTag(runtime.Version, runtime.App) inv.SourceMicroService = chassisCommon.GetXCSEContext(chassisCommon.HeaderSourceName, req) inv.Ctx = context.TODO() return inv } //LocalRequestHandler is for request from local func LocalRequestHandler(w http.ResponseWriter, r *http.Request) { prepareRequest(r) inv := consumerPreHandler(r) remoteIP := stringutil.SplitFirstSep(r.RemoteAddr, ":") var err error h := make(map[string]string) for k := range r.Header { h[k] = r.Header.Get(k) } //Resolve Destination destination, port, err := dr.Resolve(remoteIP, r.Host, r.URL.String(), h) if err != nil { handleErrorResponse(inv, w, http.StatusBadRequest, err) return } inv.MicroServiceName = destination if port != "" { h[XForwardedPort] = port } //transfer header into ctx inv.Ctx = context.WithValue(inv.Ctx, chassisCommon.ContextHeaderKey{}, h) var c *handler.Chain ok, egressRule := egress.Match(inv.MicroServiceName) if ok { var targetPort int32 = 80 for _, port := range egressRule.Ports { if strings.EqualFold(port.Protocol, common.HTTPProtocol) { targetPort = port.Port break } } inv.Endpoint = inv.MicroServiceName + ":" + strconv.Itoa(int(targetPort)) c, err = handler.GetChain(common.ConsumerEgress, common.ChainConsumerEgress) if err != nil { handleErrorResponse(inv, w, http.StatusBadGateway, err) openlog.Error("Get chain failed" + err.Error()) return } } else { c, err = handler.GetChain(chassisCommon.Consumer, common.ChainConsumerOutgoing) if err != nil { handleErrorResponse(inv, w, http.StatusBadGateway, err) openlog.Error("Get chain failed: " + err.Error()) return } } defer func(begin time.Time) { timeTaken := time.Since(begin).Seconds() serviceLabelValues := map[string]string{metrics.LServiceName: inv.MicroServiceName, metrics.LApp: inv.RouteTags.AppID(), metrics.LVersion: inv.RouteTags.Version()} metrics.RecordLatency(serviceLabelValues, timeTaken) }(time.Now()) var invRsp *invocation.Response c.Next(inv, func(ir *invocation.Response) { //Send the request to the destination invRsp = ir }) resp, err := handleRequest(w, inv, invRsp) if err != nil { openlog.Error("handle request failed: " + err.Error()) return } RecordStatus(inv, resp.StatusCode) } //RemoteRequestHandler is for request from remote func RemoteRequestHandler(w http.ResponseWriter, r *http.Request) { prepareRequest(r) inv := providerPreHandler(r) if inv.SourceMicroService == "" { source := stringutil.SplitFirstSep(r.RemoteAddr, ":") //Resolve Source si := sr.Resolve(source) if si != nil { inv.SourceMicroService = si.Name } } h := make(map[string]string) for k := range r.Header { h[k] = r.Header.Get(k) } //transfer header into ctx inv.Ctx = context.WithValue(inv.Ctx, chassisCommon.ContextHeaderKey{}, h) c, err := handler.GetChain(chassisCommon.Provider, common.ChainProviderIncoming) if err != nil { handleErrorResponse(inv, w, http.StatusBadGateway, err) openlog.Error("Get chain failed: " + err.Error()) return } if err = util.SetLocalServiceAddress(inv, r.Header.Get("X-Forwarded-Port")); err != nil { handleErrorResponse(inv, w, http.StatusBadGateway, err) } if r.Header.Get(XForwardedHost) == "" { r.Header.Set(XForwardedHost, r.Host) } var invRsp *invocation.Response c.Next(inv, func(ir *invocation.Response) { //Send the request to the destination invRsp = ir }) if _, err = handleRequest(w, inv, invRsp); err != nil { openlog.Error("Handle request failed: " + err.Error()) } } func copyChassisResp2HttpResp(w http.ResponseWriter, resp *http.Response) { if resp == nil { openlog.Warn("response is nil because of unknown reason") return } postProcessResponse(resp) copyHeader(w.Header(), resp.Header) w.WriteHeader(resp.StatusCode) _, err := io.Copy(w, resp.Body) if err != nil { openlog.Error("can not copy: " + err.Error()) } err = resp.Body.Close() if err != nil { openlog.Error("Http response close error: " + err.Error()) } } func handleRequest(w http.ResponseWriter, inv *invocation.Invocation, ir *invocation.Response) (*http.Response, error) { if ir != nil { if ir.Err != nil { //handler only mesher errors, ignore http response err switch ir.Err.(type) { case hystrix.FallbackNullError: handleErrorResponse(inv, w, http.StatusOK, nil) case loadbalancer.LBError: handleErrorResponse(inv, w, http.StatusBadGateway, ir.Err) case hystrix.CircuitError: handleErrorResponse(inv, w, http.StatusServiceUnavailable, ir.Err) case fault.Fault: handleErrorResponse(inv, w, ir.Status, ir.Err) default: //for other error, check response and response body, if there is body, just transparent response resp, ok := inv.Reply.(*http.Response) if ok { // return raw transport error if resp != nil { if resp.Body == nil { //resp.Resp can be nil, for example network error, must handle it handleErrorResponse(inv, w, http.StatusBadGateway, ir.Err) return nil, ir.Err } copyChassisResp2HttpResp(w, resp) RecordStatus(inv, resp.StatusCode) } else { // unknown error, resp is nil, e.g. connection refused handleErrorResponse(inv, w, http.StatusBadGateway, ir.Err) } } else { // unknown err in handler chain handleErrorResponse(inv, w, http.StatusInternalServerError, ir.Err) } } return nil, ir.Err } if inv.Endpoint == "" { handleErrorResponse(inv, w, http.StatusBadGateway, protocol.ErrUnknown) return nil, protocol.ErrUnknown } if ir.Result == nil { if ir.Err != nil { handleErrorResponse(inv, w, http.StatusBadGateway, ir.Err) return nil, ir.Err } handleErrorResponse(inv, w, http.StatusBadGateway, ErrNilResponse) return nil, protocol.ErrUnknown } resp, ok := ir.Result.(*http.Response) if !ok { err := errors.New("invocationResponse result is not type *rest.Response") handleErrorResponse(inv, w, http.StatusBadGateway, err) return nil, err } //transparent proxy copyChassisResp2HttpResp(w, resp) return resp, nil } else { handleErrorResponse(inv, w, http.StatusBadGateway, protocol.ErrUnExpectedHandlerChainResponse) return nil, protocol.ErrUnExpectedHandlerChainResponse } } //handleErrorResponse return proxy errors, not err from real service func handleErrorResponse(inv *invocation.Invocation, w http.ResponseWriter, statusCode int, err error) { w.WriteHeader(statusCode) if err != nil { _, err := w.Write([]byte(err.Error())) if err != nil { openlog.Error("can not write err to client: " + err.Error()) } } RecordStatus(inv, statusCode) } //RecordStatus record an operation status func RecordStatus(inv *invocation.Invocation, statusCode int) { LabelValues := map[string]string{metrics.LServiceName: inv.MicroServiceName, metrics.LApp: inv.RouteTags.AppID(), metrics.LVersion: inv.RouteTags.Version()} metrics.RecordStatus(LabelValues, statusCode) } func prepareRequest(req *http.Request) { if req.ContentLength == 0 { req.Body = nil } req.RequestURI = "" // client is forbidden to set RequestURI req.Close = false req.Header.Del("Connection") } func copyHeader(dst, src http.Header) { for k, vs := range src { for _, v := range vs { dst.Add(k, v) } } } func postProcessResponse(rsp *http.Response) { rsp.Header.Del("Connection") }