pkg/common/http/manager.go (198 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package http import ( "context" "encoding/json" "fmt" "io" stdHttp "net/http" "sync" ) import ( "github.com/pkg/errors" ) import ( "github.com/apache/dubbo-go-pixiu/pkg/client" "github.com/apache/dubbo-go-pixiu/pkg/client/http" "github.com/apache/dubbo-go-pixiu/pkg/common/constant" "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter" router2 "github.com/apache/dubbo-go-pixiu/pkg/common/router" "github.com/apache/dubbo-go-pixiu/pkg/common/util" pch "github.com/apache/dubbo-go-pixiu/pkg/context/http" "github.com/apache/dubbo-go-pixiu/pkg/logger" "github.com/apache/dubbo-go-pixiu/pkg/model" ) // HttpConnectionManager network filter for http type HttpConnectionManager struct { filter.EmptyNetworkFilter config *model.HttpConnectionManagerConfig routerCoordinator *router2.RouterCoordinator filterManager *filter.FilterManager pool sync.Pool } // CreateHttpConnectionManager create http connection manager func CreateHttpConnectionManager(hcmc *model.HttpConnectionManagerConfig) *HttpConnectionManager { hcm := &HttpConnectionManager{config: hcmc} hcm.pool.New = func() any { return hcm.allocateContext() } hcm.routerCoordinator = router2.CreateRouterCoordinator(&hcmc.RouteConfig) hcm.filterManager = filter.NewFilterManager(hcmc.HTTPFilters) hcm.filterManager.Load() return hcm } func (hcm *HttpConnectionManager) allocateContext() *pch.HttpContext { return &pch.HttpContext{ Params: make(map[string]any), } } func (hcm *HttpConnectionManager) Handle(hc *pch.HttpContext) error { hc.Ctx = context.Background() err := hcm.findRoute(hc) if err != nil { return err } hcm.handleHTTPRequest(hc) return nil } func (hcm *HttpConnectionManager) ServeHTTP(w stdHttp.ResponseWriter, r *stdHttp.Request) { hc := hcm.pool.Get().(*pch.HttpContext) defer hcm.pool.Put(hc) hc.Writer = w hc.Request = r hc.Reset() hc.Timeout = hcm.config.Timeout err := hcm.Handle(hc) if err != nil { logger.Errorf("ServeHTTP %v", err) } } // handleHTTPRequest handle http request func (hcm *HttpConnectionManager) handleHTTPRequest(c *pch.HttpContext) { filterChain := hcm.filterManager.CreateFilterChain(c) // recover any err when filterChain run defer func() { if err := recover(); err != nil { logger.Warnf("[dubbopixiu go] Occur An Unexpected Err: %+v", err) c.SendLocalReply(stdHttp.StatusInternalServerError, []byte(fmt.Sprintf("Occur An Unexpected Err: %v", err))) } }() //todo timeout filterChain.OnDecode(c) hcm.buildTargetResponse(c) //todo: stream resp has to set HTTP Server's WriteTimeout to 0, need to check it filterChain.OnEncode(c) hcm.writeResponse(c) } func (hcm *HttpConnectionManager) writeResponse(c *pch.HttpContext) { if !c.LocalReply() { c.Writer.WriteHeader(c.GetStatusCode()) if c.TargetResp != nil { switch res := c.TargetResp.(type) { case *client.UnaryResponse: _, err := c.Writer.Write(res.Data) if err != nil { logger.Errorf("Write response failed: %v", err) } case *client.StreamResponse: // create ctx helps goroutine exit ctx, cancel := context.WithCancel(c.Ctx) defer cancel() dataC := make(chan []byte) errC := make(chan error, 1) // goroutine read stream go func() { defer close(dataC) defer close(errC) buf := make([]byte, 1024) // 1KB buffer for { select { case <-ctx.Done(): return default: n, err := res.Stream.Read(buf) if n > 0 { // copy data to prevent data cover data := make([]byte, n) copy(data, buf[:n]) select { case dataC <- data: case <-ctx.Done(): return } } if err != nil { if err != io.EOF { errC <- fmt.Errorf("stream read error: %w", err) } else { errC <- io.EOF } return } } } }() for { select { case <-ctx.Done(): _ = res.Stream.Close() return case data, ok := <-dataC: if !ok { return } if _, err := c.Writer.Write(data); err != nil { cancel() _ = res.Stream.Close() return } case err := <-errC: if err != nil && err != io.EOF { logger.Errorf("Stream error: %v", err) } return } } default: logger.Errorf("Unknown response type: %T", c.TargetResp) } } } } func (hcm *HttpConnectionManager) buildTargetResponse(c *pch.HttpContext) { if c.LocalReply() { return } switch res := c.SourceResp.(type) { case *stdHttp.Response: //Merge header remoteHeader := res.Header for k := range remoteHeader { c.AddHeader(k, remoteHeader.Get(k)) } //status code c.StatusCode(res.StatusCode) if http.IsSSEStream(res) { c.TargetResp = &client.StreamResponse{Stream: res.Body} } else { body, err := io.ReadAll(res.Body) if err != nil { panic(err) } //close body _ = res.Body.Close() c.TargetResp = &client.UnaryResponse{Data: body} } case []byte: c.StatusCode(stdHttp.StatusOK) if json.Valid(res) { c.AddHeader(constant.HeaderKeyContextType, constant.HeaderValueApplicationJson) } else { c.AddHeader(constant.HeaderKeyContextType, constant.HeaderValueTextPlain) } c.TargetResp = &client.UnaryResponse{Data: res} default: //dubbo go generic invoke response := util.NewDubboResponse(res, false) c.StatusCode(stdHttp.StatusOK) c.AddHeader(constant.HeaderKeyContextType, constant.HeaderValueJsonUtf8) c.TargetResp = response } } func (hcm *HttpConnectionManager) findRoute(hc *pch.HttpContext) error { ra, err := hcm.routerCoordinator.Route(hc) if err != nil { hc.SendLocalReply(stdHttp.StatusNotFound, constant.Default404Body) e := errors.Errorf("Requested URL %s not found", hc.GetUrl()) logger.Debug(e.Error()) return e // return 404 } hc.RouteEntry(ra) return nil }