internal/http/request.go (341 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package http import ( "context" "encoding/binary" "net" "net/http" "net/url" "reflect" "sync" "time" "github.com/api7/ext-plugin-proto/go/A6" ei "github.com/api7/ext-plugin-proto/go/A6/ExtraInfo" hrc "github.com/api7/ext-plugin-proto/go/A6/HTTPReqCall" flatbuffers "github.com/google/flatbuffers/go" "github.com/apache/apisix-go-plugin-runner/internal/util" "github.com/apache/apisix-go-plugin-runner/pkg/common" pkgHTTP "github.com/apache/apisix-go-plugin-runner/pkg/http" "github.com/apache/apisix-go-plugin-runner/pkg/log" ) type Request struct { // the root of the flatbuffers HTTPReqCall Request msg r *hrc.Req conn net.Conn extraInfoHeader []byte path []byte hdr *Header rawHdr http.Header args url.Values rawArgs url.Values vars map[string][]byte body []byte ctx context.Context cancel context.CancelFunc respHdr http.Header } func (r *Request) ConfToken() uint32 { return r.r.ConfToken() } func (r *Request) ID() uint32 { return r.r.Id() } func (r *Request) SrcIP() net.IP { return r.r.SrcIpBytes() } func (r *Request) Method() string { return r.r.Method().String() } func (r *Request) Path() []byte { if r.path == nil { return r.r.Path() } return r.path } func (r *Request) SetPath(path []byte) { r.path = path } func (r *Request) Header() pkgHTTP.Header { if r.hdr == nil { hdr := newHeader() hh := hdr.View() size := r.r.HeadersLength() obj := A6.TextEntry{} for i := 0; i < size; i++ { if r.r.Headers(&obj, i) { hh.Add(string(obj.Name()), string(obj.Value())) } } r.hdr = hdr r.rawHdr = hdr.Clone() } return r.hdr } func (r *Request) RespHeader() http.Header { if r.respHdr == nil { r.respHdr = http.Header{} } return r.respHdr } func cloneUrlValues(oldV url.Values) url.Values { nv := 0 for _, vv := range oldV { nv += len(vv) } sv := make([]string, nv) newV := make(url.Values, len(oldV)) for k, vv := range oldV { n := copy(sv, vv) newV[k] = sv[:n:n] sv = sv[n:] } return newV } func (r *Request) Args() url.Values { if r.args == nil { args := url.Values{} size := r.r.ArgsLength() obj := A6.TextEntry{} for i := 0; i < size; i++ { if r.r.Args(&obj, i) { args.Add(string(obj.Name()), string(obj.Value())) } } r.args = args r.rawArgs = cloneUrlValues(args) } return r.args } func (r *Request) Var(name string) ([]byte, error) { if r.vars == nil { r.vars = map[string][]byte{} } var v []byte var found bool if v, found = r.vars[name]; !found { var err error builder := util.GetBuilder() varName := builder.CreateString(name) ei.VarStart(builder) ei.VarAddName(builder, varName) varInfo := ei.VarEnd(builder) v, err = r.askExtraInfo(builder, ei.InfoVar, varInfo) util.PutBuilder(builder) if err != nil { return nil, err } r.vars[name] = v } return v, nil } func (r *Request) Body() ([]byte, error) { if len(r.body) > 0 { return r.body, nil } builder := util.GetBuilder() ei.ReqBodyStart(builder) bodyInfo := ei.ReqBodyEnd(builder) v, err := r.askExtraInfo(builder, ei.InfoReqBody, bodyInfo) if err != nil { return nil, err } r.body = v return v, nil } func (r *Request) Reset() { defer r.cancel() r.path = nil r.hdr = nil r.args = nil r.vars = nil r.body = nil r.conn = nil r.ctx = nil r.respHdr = nil // Keep the fields below // r.extraInfoHeader = nil } func (r *Request) FetchChanges(id uint32, builder *flatbuffers.Builder) bool { if r.path == nil && r.hdr == nil && r.args == nil && r.respHdr == nil { return false } var path flatbuffers.UOffsetT if r.path != nil { path = builder.CreateByteString(r.path) } var hdrVec, respHdrVec flatbuffers.UOffsetT if r.hdr != nil { hdrs := []flatbuffers.UOffsetT{} oldHdr := r.rawHdr newHdr := r.hdr.View() for n := range oldHdr { if _, ok := newHdr[n]; !ok { // deleted name := builder.CreateString(n) A6.TextEntryStart(builder) A6.TextEntryAddName(builder, name) te := A6.TextEntryEnd(builder) hdrs = append(hdrs, te) } } for n, v := range newHdr { if raw, ok := oldHdr[n]; !ok || raw[0] != v[0] { // set name := builder.CreateString(n) value := builder.CreateString(v[0]) A6.TextEntryStart(builder) A6.TextEntryAddName(builder, name) A6.TextEntryAddValue(builder, value) te := A6.TextEntryEnd(builder) hdrs = append(hdrs, te) } } size := len(hdrs) hrc.RewriteStartHeadersVector(builder, size) for i := size - 1; i >= 0; i-- { te := hdrs[i] builder.PrependUOffsetT(te) } hdrVec = builder.EndVector(size) } if r.respHdr != nil { respHdrs := []flatbuffers.UOffsetT{} for n, arr := range r.respHdr { for _, v := range arr { name := builder.CreateString(n) value := builder.CreateString(v) A6.TextEntryStart(builder) A6.TextEntryAddName(builder, name) A6.TextEntryAddValue(builder, value) te := A6.TextEntryEnd(builder) respHdrs = append(respHdrs, te) } } size := len(respHdrs) hrc.RewriteStartRespHeadersVector(builder, size) for i := size - 1; i >= 0; i-- { te := respHdrs[i] builder.PrependUOffsetT(te) } respHdrVec = builder.EndVector(size) } var argsVec flatbuffers.UOffsetT if r.args != nil { args := []flatbuffers.UOffsetT{} oldArgs := r.rawArgs newArgs := r.args for n := range oldArgs { if _, ok := newArgs[n]; !ok { // deleted name := builder.CreateString(n) A6.TextEntryStart(builder) A6.TextEntryAddName(builder, name) te := A6.TextEntryEnd(builder) args = append(args, te) } } for n, v := range newArgs { if raw, ok := oldArgs[n]; !ok || !reflect.DeepEqual(raw, v) { // set / add for _, vv := range v { name := builder.CreateString(n) value := builder.CreateString(vv) A6.TextEntryStart(builder) A6.TextEntryAddName(builder, name) A6.TextEntryAddValue(builder, value) te := A6.TextEntryEnd(builder) args = append(args, te) } } } size := len(args) hrc.RewriteStartArgsVector(builder, size) for i := size - 1; i >= 0; i-- { te := args[i] builder.PrependUOffsetT(te) } argsVec = builder.EndVector(size) } hrc.RewriteStart(builder) if path > 0 { hrc.RewriteAddPath(builder, path) } if hdrVec > 0 { hrc.RewriteAddHeaders(builder, hdrVec) } if respHdrVec > 0 { hrc.RewriteAddRespHeaders(builder, respHdrVec) } if argsVec > 0 { hrc.RewriteAddArgs(builder, argsVec) } rewrite := hrc.RewriteEnd(builder) hrc.RespStart(builder) hrc.RespAddId(builder, id) hrc.RespAddActionType(builder, hrc.ActionRewrite) hrc.RespAddAction(builder, rewrite) res := hrc.RespEnd(builder) builder.Finish(res) return true } func (r *Request) BindConn(c net.Conn) { r.conn = c } func (r *Request) Context() context.Context { if r.ctx != nil { return r.ctx } return context.Background() } func (r *Request) askExtraInfo(builder *flatbuffers.Builder, infoType ei.Info, info flatbuffers.UOffsetT) ([]byte, error) { ei.ReqStart(builder) ei.ReqAddInfoType(builder, infoType) ei.ReqAddInfo(builder, info) eiRes := ei.ReqEnd(builder) builder.Finish(eiRes) c := r.conn if len(r.extraInfoHeader) == 0 { r.extraInfoHeader = make([]byte, util.HeaderLen) } header := r.extraInfoHeader out := builder.FinishedBytes() size := len(out) binary.BigEndian.PutUint32(header, uint32(size)) header[0] = util.RPCExtraInfo n, err := util.WriteBytes(c, header, len(header)) if err != nil { util.WriteErr(n, err) return nil, common.ErrConnClosed } n, err = util.WriteBytes(c, out, size) if err != nil { util.WriteErr(n, err) return nil, common.ErrConnClosed } n, err = util.ReadBytes(c, header, util.HeaderLen) if util.ReadErr(n, err, util.HeaderLen) { return nil, common.ErrConnClosed } ty := header[0] header[0] = 0 length := binary.BigEndian.Uint32(header) log.Infof("receive rpc type: %d data length: %d", ty, length) buf := make([]byte, length) n, err = util.ReadBytes(c, buf, int(length)) if util.ReadErr(n, err, int(length)) { return nil, common.ErrConnClosed } resp := ei.GetRootAsResp(buf, 0) res := resp.ResultBytes() return res, nil } var reqPool = sync.Pool{ New: func() interface{} { return &Request{} }, } func CreateRequest(buf []byte) *Request { req := reqPool.Get().(*Request) req.r = hrc.GetRootAsReq(buf, 0) // because apisix has an implicit 60s timeout, so set the timeout to 56 seconds(smaller than 60s) // so plugin writer can still break the execution with a custom response before the apisix implicit timeout. ctx, cancel := context.WithTimeout(context.Background(), 56*time.Second) req.ctx = ctx req.cancel = cancel return req } func ReuseRequest(r *Request) { r.Reset() reqPool.Put(r) }