plugins/wasm-go/pkg/wrapper/plugin_wrapper.go (643 lines of code) (raw):

// Copyright (c) 2022 Alibaba Group Holding Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package wrapper import ( "encoding/json" "fmt" "strconv" "time" "unsafe" "github.com/higress-group/proxy-wasm-go-sdk/proxywasm" "github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types" "github.com/tidwall/gjson" "github.com/tidwall/sjson" "github.com/alibaba/higress/plugins/wasm-go/pkg/log" "github.com/alibaba/higress/plugins/wasm-go/pkg/matcher" ) type Log log.Log const ( CustomLogKey = "custom_log" AILogKey = "ai_log" TraceSpanTagPrefix = "trace_span_tag." PluginIDKey = "_plugin_id_" ) type HttpContext interface { Scheme() string Host() string Path() string Method() string SetContext(key string, value interface{}) GetContext(key string) interface{} GetBoolContext(key string, defaultValue bool) bool GetStringContext(key, defaultValue string) string GetByteSliceContext(key string, defaultValue []byte) []byte GetUserAttribute(key string) interface{} SetUserAttribute(key string, value interface{}) SetUserAttributeMap(kvmap map[string]interface{}) GetUserAttributeMap() map[string]interface{} // You can call this function to set custom log WriteUserAttributeToLog() error // You can call this function to set custom log with your specific key WriteUserAttributeToLogWithKey(key string) error // You can call this function to set custom trace span attribute WriteUserAttributeToTrace() error // If the onHttpRequestBody handle is not set, the request body will not be read by default DontReadRequestBody() // If the onHttpResponseBody handle is not set, the request body will not be read by default DontReadResponseBody() // If the onHttpStreamingRequestBody handle is not set, and the onHttpRequestBody handle is set, the request body will be buffered by default BufferRequestBody() // If the onHttpStreamingResponseBody handle is not set, and the onHttpResponseBody handle is set, the response body will be buffered by default BufferResponseBody() // If any request header is changed in onHttpRequestHeaders, envoy will re-calculate the route. Call this function to disable the re-routing. // You need to call this before making any header modification operations. DisableReroute() // Note that this parameter affects the gateway's memory usage!Support setting a maximum buffer size for each request body individually in request phase. SetRequestBodyBufferLimit(byteSize uint32) // Note that this parameter affects the gateway's memory usage! Support setting a maximum buffer size for each response body individually in response phase. SetResponseBodyBufferLimit(byteSize uint32) // Get contextId of HttpContext GetContextId() uint32 } type oldParseConfigFunc[PluginConfig any] func(json gjson.Result, config *PluginConfig, log Log) error type oldParseRuleConfigFunc[PluginConfig any] func(json gjson.Result, global PluginConfig, config *PluginConfig, log Log) error type oldOnHttpHeadersFunc[PluginConfig any] func(context HttpContext, config PluginConfig, log Log) types.Action type oldOnHttpBodyFunc[PluginConfig any] func(context HttpContext, config PluginConfig, body []byte, log Log) types.Action type oldOnHttpStreamingBodyFunc[PluginConfig any] func(context HttpContext, config PluginConfig, chunk []byte, isLastChunk bool, log Log) []byte type oldOnHttpStreamDoneFunc[PluginConfig any] func(context HttpContext, config PluginConfig, log Log) type ParseConfigFunc[PluginConfig any] func(json gjson.Result, config *PluginConfig) error type ParseRuleConfigFunc[PluginConfig any] func(json gjson.Result, global PluginConfig, config *PluginConfig) error type onHttpHeadersFunc[PluginConfig any] func(context HttpContext, config PluginConfig) types.Action type onHttpBodyFunc[PluginConfig any] func(context HttpContext, config PluginConfig, body []byte) types.Action type onHttpStreamingBodyFunc[PluginConfig any] func(context HttpContext, config PluginConfig, chunk []byte, isLastChunk bool) []byte type onHttpStreamDoneFunc[PluginConfig any] func(context HttpContext, config PluginConfig) type CommonVmCtx[PluginConfig any] struct { types.DefaultVMContext pluginName string log Log hasCustomConfig bool parseConfig ParseConfigFunc[PluginConfig] parseRuleConfig ParseRuleConfigFunc[PluginConfig] onHttpRequestHeaders onHttpHeadersFunc[PluginConfig] onHttpRequestBody onHttpBodyFunc[PluginConfig] onHttpStreamingRequestBody onHttpStreamingBodyFunc[PluginConfig] onHttpResponseHeaders onHttpHeadersFunc[PluginConfig] onHttpResponseBody onHttpBodyFunc[PluginConfig] onHttpStreamingResponseBody onHttpStreamingBodyFunc[PluginConfig] onHttpStreamDone onHttpStreamDoneFunc[PluginConfig] } type TickFuncEntry struct { lastExecuted int64 tickPeriod int64 tickFunc func() } var globalOnTickFuncs []TickFuncEntry = []TickFuncEntry{} // Registe multiple onTick functions. Parameters include: // 1) tickPeriod: the execution period of tickFunc, this value should be a multiple of 100 // 2) tickFunc: the function to be executed // // You should call this function in parseConfig phase, for example: // // func parseConfig(json gjson.Result, config *HelloWorldConfig, log wrapper.Log) error { // wrapper.RegisteTickFunc(1000, func() { proxywasm.LogInfo("onTick 1s") }) // wrapper.RegisteTickFunc(3000, func() { proxywasm.LogInfo("onTick 3s") }) // return nil // } func RegisteTickFunc(tickPeriod int64, tickFunc func()) { globalOnTickFuncs = append(globalOnTickFuncs, TickFuncEntry{0, tickPeriod, tickFunc}) } func SetCtx[PluginConfig any](pluginName string, options ...CtxOption[PluginConfig]) { proxywasm.SetVMContext(NewCommonVmCtx(pluginName, options...)) } func SetCtxWithOptions[PluginConfig any](pluginName string, options ...CtxOption[PluginConfig]) { proxywasm.SetVMContext(NewCommonVmCtxWithOptions(pluginName, options...)) } type CtxOption[PluginConfig any] interface { Apply(*CommonVmCtx[PluginConfig]) } type parseConfigOption[PluginConfig any] struct { f ParseConfigFunc[PluginConfig] oldF oldParseConfigFunc[PluginConfig] } func (o parseConfigOption[PluginConfig]) Apply(ctx *CommonVmCtx[PluginConfig]) { if o.f != nil { ctx.parseConfig = o.f } else { ctx.parseConfig = func(json gjson.Result, config *PluginConfig) error { return o.oldF(json, config, ctx.log) } } } // Deprecated: Please use `ParseConfig` instead. func ParseConfigBy[PluginConfig any](f oldParseConfigFunc[PluginConfig]) CtxOption[PluginConfig] { return &parseConfigOption[PluginConfig]{oldF: f} } func ParseConfig[PluginConfig any](f ParseConfigFunc[PluginConfig]) CtxOption[PluginConfig] { return &parseConfigOption[PluginConfig]{f: f} } type parseOverrideConfigOption[PluginConfig any] struct { parseConfigF ParseConfigFunc[PluginConfig] parseRuleConfigF ParseRuleConfigFunc[PluginConfig] oldParseConfigF oldParseConfigFunc[PluginConfig] oldParseRuleConfigF oldParseRuleConfigFunc[PluginConfig] } func (o *parseOverrideConfigOption[PluginConfig]) Apply(ctx *CommonVmCtx[PluginConfig]) { if o.parseConfigF != nil && o.parseRuleConfigF != nil { ctx.parseConfig = o.parseConfigF ctx.parseRuleConfig = o.parseRuleConfigF } else { ctx.parseConfig = func(json gjson.Result, config *PluginConfig) error { return o.oldParseConfigF(json, config, ctx.log) } ctx.parseRuleConfig = func(json gjson.Result, global PluginConfig, config *PluginConfig) error { return o.oldParseRuleConfigF(json, global, config, ctx.log) } } } // Deprecated: Please use `ParseOverrideConfig` instead. func ParseOverrideConfigBy[PluginConfig any](f oldParseConfigFunc[PluginConfig], g oldParseRuleConfigFunc[PluginConfig]) CtxOption[PluginConfig] { return &parseOverrideConfigOption[PluginConfig]{ oldParseConfigF: f, oldParseRuleConfigF: g, } } func ParseOverrideConfig[PluginConfig any](f ParseConfigFunc[PluginConfig], g ParseRuleConfigFunc[PluginConfig]) CtxOption[PluginConfig] { return &parseOverrideConfigOption[PluginConfig]{ parseConfigF: f, parseRuleConfigF: g, } } type onProcessRequestHeadersOption[PluginConfig any] struct { f onHttpHeadersFunc[PluginConfig] oldF oldOnHttpHeadersFunc[PluginConfig] } func (o *onProcessRequestHeadersOption[PluginConfig]) Apply(ctx *CommonVmCtx[PluginConfig]) { if o.f != nil { ctx.onHttpRequestHeaders = o.f } else { ctx.onHttpRequestHeaders = func(context HttpContext, config PluginConfig) types.Action { return o.oldF(context, config, ctx.log) } } } // Deprecated: Please use `ProcessRequestHeaders` instead. func ProcessRequestHeadersBy[PluginConfig any](f oldOnHttpHeadersFunc[PluginConfig]) CtxOption[PluginConfig] { return &onProcessRequestHeadersOption[PluginConfig]{oldF: f} } func ProcessRequestHeaders[PluginConfig any](f onHttpHeadersFunc[PluginConfig]) CtxOption[PluginConfig] { return &onProcessRequestHeadersOption[PluginConfig]{f: f} } type onProcessRequestBodyOption[PluginConfig any] struct { f onHttpBodyFunc[PluginConfig] oldF oldOnHttpBodyFunc[PluginConfig] } func (o *onProcessRequestBodyOption[PluginConfig]) Apply(ctx *CommonVmCtx[PluginConfig]) { if o.f != nil { ctx.onHttpRequestBody = o.f } else { ctx.onHttpRequestBody = func(context HttpContext, config PluginConfig, body []byte) types.Action { return o.oldF(context, config, body, ctx.log) } } } // Deprecated: Please use `ProcessRequestBody` instead. func ProcessRequestBodyBy[PluginConfig any](f oldOnHttpBodyFunc[PluginConfig]) CtxOption[PluginConfig] { return &onProcessRequestBodyOption[PluginConfig]{oldF: f} } func ProcessRequestBody[PluginConfig any](f onHttpBodyFunc[PluginConfig]) CtxOption[PluginConfig] { return &onProcessRequestBodyOption[PluginConfig]{f: f} } type onProcessStreamingRequestBodyOption[PluginConfig any] struct { f onHttpStreamingBodyFunc[PluginConfig] oldF oldOnHttpStreamingBodyFunc[PluginConfig] } func (o *onProcessStreamingRequestBodyOption[PluginConfig]) Apply(ctx *CommonVmCtx[PluginConfig]) { if o.f != nil { ctx.onHttpStreamingRequestBody = o.f } else { ctx.onHttpStreamingRequestBody = func(context HttpContext, config PluginConfig, chunk []byte, isLastChunk bool) []byte { return o.oldF(context, config, chunk, isLastChunk, ctx.log) } } } // Deprecated: Please use `ProcessStreamingRequestBody` instead. func ProcessStreamingRequestBodyBy[PluginConfig any](f oldOnHttpStreamingBodyFunc[PluginConfig]) CtxOption[PluginConfig] { return &onProcessStreamingRequestBodyOption[PluginConfig]{oldF: f} } func ProcessStreamingRequestBody[PluginConfig any](f onHttpStreamingBodyFunc[PluginConfig]) CtxOption[PluginConfig] { return &onProcessStreamingRequestBodyOption[PluginConfig]{f: f} } type onProcessResponseHeadersOption[PluginConfig any] struct { f onHttpHeadersFunc[PluginConfig] oldF oldOnHttpHeadersFunc[PluginConfig] } func (o *onProcessResponseHeadersOption[PluginConfig]) Apply(ctx *CommonVmCtx[PluginConfig]) { if o.f != nil { ctx.onHttpResponseHeaders = o.f } else { ctx.onHttpResponseHeaders = func(context HttpContext, config PluginConfig) types.Action { return o.oldF(context, config, ctx.log) } } } // Deprecated: Please use `ProcessResponseHeaders` instead. func ProcessResponseHeadersBy[PluginConfig any](f oldOnHttpHeadersFunc[PluginConfig]) CtxOption[PluginConfig] { return &onProcessResponseHeadersOption[PluginConfig]{oldF: f} } func ProcessResponseHeaders[PluginConfig any](f onHttpHeadersFunc[PluginConfig]) CtxOption[PluginConfig] { return &onProcessResponseHeadersOption[PluginConfig]{f: f} } type onProcessResponseBodyOption[PluginConfig any] struct { f onHttpBodyFunc[PluginConfig] oldF oldOnHttpBodyFunc[PluginConfig] } func (o *onProcessResponseBodyOption[PluginConfig]) Apply(ctx *CommonVmCtx[PluginConfig]) { if o.f != nil { ctx.onHttpResponseBody = o.f } else { ctx.onHttpResponseBody = func(context HttpContext, config PluginConfig, body []byte) types.Action { return o.oldF(context, config, body, ctx.log) } } } // Deprecated: Please use `ProcessResponseBody` instead. func ProcessResponseBodyBy[PluginConfig any](f oldOnHttpBodyFunc[PluginConfig]) CtxOption[PluginConfig] { return &onProcessResponseBodyOption[PluginConfig]{oldF: f} } func ProcessResponseBody[PluginConfig any](f onHttpBodyFunc[PluginConfig]) CtxOption[PluginConfig] { return &onProcessResponseBodyOption[PluginConfig]{f: f} } type onProcessStreamingResponseBodyOption[PluginConfig any] struct { f onHttpStreamingBodyFunc[PluginConfig] oldF oldOnHttpStreamingBodyFunc[PluginConfig] } func (o *onProcessStreamingResponseBodyOption[PluginConfig]) Apply(ctx *CommonVmCtx[PluginConfig]) { if o.f != nil { ctx.onHttpStreamingResponseBody = o.f } else { ctx.onHttpStreamingResponseBody = func(context HttpContext, config PluginConfig, chunk []byte, isLastChunk bool) []byte { return o.oldF(context, config, chunk, isLastChunk, ctx.log) } } } // Deprecated: Please use `ProcessStreamingResponseBody` instead. func ProcessStreamingResponseBodyBy[PluginConfig any](f oldOnHttpStreamingBodyFunc[PluginConfig]) CtxOption[PluginConfig] { return &onProcessStreamingResponseBodyOption[PluginConfig]{oldF: f} } func ProcessStreamingResponseBody[PluginConfig any](f onHttpStreamingBodyFunc[PluginConfig]) CtxOption[PluginConfig] { return &onProcessStreamingResponseBodyOption[PluginConfig]{f: f} } type onProcessStreamDoneOption[PluginConfig any] struct { f onHttpStreamDoneFunc[PluginConfig] oldF oldOnHttpStreamDoneFunc[PluginConfig] } func (o *onProcessStreamDoneOption[PluginConfig]) Apply(ctx *CommonVmCtx[PluginConfig]) { if o.f != nil { ctx.onHttpStreamDone = o.f } else { ctx.onHttpStreamDone = func(context HttpContext, config PluginConfig) { o.oldF(context, config, ctx.log) } } } // Deprecated: Please use `ProcessStreamDoneBy` instead. func ProcessStreamDoneBy[PluginConfig any](f oldOnHttpStreamDoneFunc[PluginConfig]) CtxOption[PluginConfig] { return &onProcessStreamDoneOption[PluginConfig]{oldF: f} } func ProcessStreamDone[PluginConfig any](f onHttpStreamDoneFunc[PluginConfig]) CtxOption[PluginConfig] { return &onProcessStreamDoneOption[PluginConfig]{f: f} } type logOption[PluginConfig any] struct { logger Log } func (o *logOption[PluginConfig]) Apply(ctx *CommonVmCtx[PluginConfig]) { log.SetPluginLog(o.logger) ctx.log = o.logger } func WithLogger[PluginConfig any](logger Log) CtxOption[PluginConfig] { return &logOption[PluginConfig]{logger} } func parseEmptyPluginConfig[PluginConfig any](gjson.Result, *PluginConfig) error { return nil } func NewCommonVmCtx[PluginConfig any](pluginName string, options ...CtxOption[PluginConfig]) *CommonVmCtx[PluginConfig] { logger := &DefaultLog{pluginName, "nil"} opts := []CtxOption[PluginConfig]{WithLogger[PluginConfig](logger)} for _, opt := range options { if opt == nil { continue } opts = append(opts, opt) } return NewCommonVmCtxWithOptions(pluginName, opts...) } func NewCommonVmCtxWithOptions[PluginConfig any](pluginName string, options ...CtxOption[PluginConfig]) *CommonVmCtx[PluginConfig] { ctx := &CommonVmCtx[PluginConfig]{ pluginName: pluginName, hasCustomConfig: true, } for _, opt := range options { opt.Apply(ctx) } if ctx.parseConfig == nil { var config PluginConfig if unsafe.Sizeof(config) != 0 { msg := "the `parseConfig` is missing in NewCommonVmCtx's arguments" panic(msg) } ctx.hasCustomConfig = false ctx.parseConfig = parseEmptyPluginConfig[PluginConfig] } return ctx } func (ctx *CommonVmCtx[PluginConfig]) NewPluginContext(uint32) types.PluginContext { return &CommonPluginCtx[PluginConfig]{ vm: ctx, } } type CommonPluginCtx[PluginConfig any] struct { types.DefaultPluginContext matcher.RuleMatcher[PluginConfig] vm *CommonVmCtx[PluginConfig] onTickFuncs []TickFuncEntry } func (ctx *CommonPluginCtx[PluginConfig]) OnPluginStart(int) types.OnPluginStartStatus { data, err := proxywasm.GetPluginConfiguration() globalOnTickFuncs = nil if err != nil && err != types.ErrorStatusNotFound { ctx.vm.log.Criticalf("error reading plugin configuration: %v", err) return types.OnPluginStartStatusFailed } var jsonData gjson.Result if len(data) == 0 { if ctx.vm.hasCustomConfig { ctx.vm.log.Warn("config is empty, but has ParseConfigFunc") } } else { if !gjson.ValidBytes(data) { ctx.vm.log.Warnf("the plugin configuration is not a valid json: %s", string(data)) return types.OnPluginStartStatusFailed } pluginID := gjson.GetBytes(data, PluginIDKey).String() if pluginID != "" { ctx.vm.log.ResetID(pluginID) data, _ = sjson.DeleteBytes([]byte(data), PluginIDKey) } jsonData = gjson.ParseBytes(data) } var parseOverrideConfig func(gjson.Result, PluginConfig, *PluginConfig) error if ctx.vm.parseRuleConfig != nil { parseOverrideConfig = func(js gjson.Result, global PluginConfig, cfg *PluginConfig) error { return ctx.vm.parseRuleConfig(js, global, cfg) } } err = ctx.ParseRuleConfig(jsonData, func(js gjson.Result, cfg *PluginConfig) error { return ctx.vm.parseConfig(js, cfg) }, parseOverrideConfig, ) if err != nil { ctx.vm.log.Warnf("parse rule config failed: %v", err) ctx.vm.log.Error("plugin start failed") return types.OnPluginStartStatusFailed } if globalOnTickFuncs != nil { ctx.onTickFuncs = globalOnTickFuncs if err := proxywasm.SetTickPeriodMilliSeconds(100); err != nil { ctx.vm.log.Error("SetTickPeriodMilliSeconds failed, onTick functions will not take effect.") ctx.vm.log.Error("plugin start failed") return types.OnPluginStartStatusFailed } } ctx.vm.log.Info("plugin start successfully") return types.OnPluginStartStatusOK } func (ctx *CommonPluginCtx[PluginConfig]) OnTick() { for i := range ctx.onTickFuncs { currentTimeStamp := time.Now().UnixMilli() if currentTimeStamp-ctx.onTickFuncs[i].lastExecuted >= ctx.onTickFuncs[i].tickPeriod { ctx.onTickFuncs[i].tickFunc() ctx.onTickFuncs[i].lastExecuted = currentTimeStamp } } } func (ctx *CommonPluginCtx[PluginConfig]) NewHttpContext(contextID uint32) types.HttpContext { httpCtx := &CommonHttpCtx[PluginConfig]{ plugin: ctx, contextID: contextID, userContext: map[string]interface{}{}, userAttribute: map[string]interface{}{}, } if ctx.vm.onHttpRequestBody != nil || ctx.vm.onHttpStreamingRequestBody != nil { httpCtx.needRequestBody = true } if ctx.vm.onHttpResponseBody != nil || ctx.vm.onHttpStreamingResponseBody != nil { httpCtx.needResponseBody = true } if ctx.vm.onHttpStreamingRequestBody != nil { httpCtx.streamingRequestBody = true } if ctx.vm.onHttpStreamingResponseBody != nil { httpCtx.streamingResponseBody = true } return httpCtx } type CommonHttpCtx[PluginConfig any] struct { types.DefaultHttpContext plugin *CommonPluginCtx[PluginConfig] config *PluginConfig needRequestBody bool needResponseBody bool streamingRequestBody bool streamingResponseBody bool requestBodySize int responseBodySize int contextID uint32 userContext map[string]interface{} userAttribute map[string]interface{} } func (ctx *CommonHttpCtx[PluginConfig]) SetContext(key string, value interface{}) { ctx.userContext[key] = value } func (ctx *CommonHttpCtx[PluginConfig]) GetContext(key string) interface{} { return ctx.userContext[key] } func (ctx *CommonHttpCtx[PluginConfig]) SetUserAttribute(key string, value interface{}) { ctx.userAttribute[key] = value } func (ctx *CommonHttpCtx[PluginConfig]) GetUserAttribute(key string) interface{} { return ctx.userAttribute[key] } func (ctx *CommonHttpCtx[PluginConfig]) SetUserAttributeMap(kvmap map[string]interface{}) { ctx.userAttribute = kvmap } func (ctx *CommonHttpCtx[PluginConfig]) GetUserAttributeMap() map[string]interface{} { return ctx.userAttribute } func (ctx *CommonHttpCtx[PluginConfig]) WriteUserAttributeToLog() error { return ctx.WriteUserAttributeToLogWithKey(CustomLogKey) } func (ctx *CommonHttpCtx[PluginConfig]) WriteUserAttributeToLogWithKey(key string) error { // e.g. {\"field1\":\"value1\",\"field2\":\"value2\"} preMarshalledJsonLogStr, _ := proxywasm.GetProperty([]string{key}) newAttributeMap := map[string]interface{}{} if string(preMarshalledJsonLogStr) != "" { // e.g. {"field1":"value1","field2":"value2"} preJsonLogStr := unmarshalStr(fmt.Sprintf(`"%s"`, string(preMarshalledJsonLogStr))) err := json.Unmarshal([]byte(preJsonLogStr), &newAttributeMap) if err != nil { ctx.plugin.vm.log.Warnf("Unmarshal failed, will overwrite %s, pre value is: %s", key, string(preMarshalledJsonLogStr)) return err } } // update customLog for k, v := range ctx.userAttribute { newAttributeMap[k] = v } // e.g. {"field1":"value1","field2":2,"field3":"value3"} jsonStr, _ := json.Marshal(newAttributeMap) // e.g. {\"field1\":\"value1\",\"field2\":2,\"field3\":\"value3\"} marshalledJsonStr := marshalStr(string(jsonStr)) if err := proxywasm.SetProperty([]string{key}, []byte(marshalledJsonStr)); err != nil { ctx.plugin.vm.log.Warnf("failed to set %s in filter state, raw is %s, err is %v", key, marshalledJsonStr, err) return err } return nil } func (ctx *CommonHttpCtx[PluginConfig]) WriteUserAttributeToTrace() error { for k, v := range ctx.userAttribute { traceSpanTag := TraceSpanTagPrefix + k traceSpanValue := fmt.Sprint(v) var err error if traceSpanValue != "" { err = proxywasm.SetProperty([]string{traceSpanTag}, []byte(traceSpanValue)) } else { err = fmt.Errorf("value of %s is empty", traceSpanTag) } if err != nil { ctx.plugin.vm.log.Warnf("Failed to set trace attribute - %s: %s, error message: %v", traceSpanTag, traceSpanValue, err) } } return nil } func (ctx *CommonHttpCtx[PluginConfig]) GetBoolContext(key string, defaultValue bool) bool { if b, ok := ctx.userContext[key].(bool); ok { return b } return defaultValue } func (ctx *CommonHttpCtx[PluginConfig]) GetStringContext(key, defaultValue string) string { if s, ok := ctx.userContext[key].(string); ok { return s } return defaultValue } func (ctx *CommonHttpCtx[PluginConfig]) GetByteSliceContext(key string, defaultValue []byte) []byte { if s, ok := ctx.userContext[key].([]byte); ok { return s } return defaultValue } func (ctx *CommonHttpCtx[PluginConfig]) Scheme() string { proxywasm.SetEffectiveContext(ctx.contextID) return GetRequestScheme() } func (ctx *CommonHttpCtx[PluginConfig]) Host() string { proxywasm.SetEffectiveContext(ctx.contextID) return GetRequestHost() } func (ctx *CommonHttpCtx[PluginConfig]) Path() string { proxywasm.SetEffectiveContext(ctx.contextID) return GetRequestPath() } func (ctx *CommonHttpCtx[PluginConfig]) Method() string { proxywasm.SetEffectiveContext(ctx.contextID) return GetRequestMethod() } func (ctx *CommonHttpCtx[PluginConfig]) DontReadRequestBody() { ctx.needRequestBody = false } func (ctx *CommonHttpCtx[PluginConfig]) DontReadResponseBody() { ctx.needResponseBody = false } func (ctx *CommonHttpCtx[PluginConfig]) BufferRequestBody() { ctx.streamingRequestBody = false } func (ctx *CommonHttpCtx[PluginConfig]) BufferResponseBody() { ctx.streamingResponseBody = false } func (ctx *CommonHttpCtx[PluginConfig]) DisableReroute() { _ = proxywasm.SetProperty([]string{"clear_route_cache"}, []byte("off")) } func (ctx *CommonHttpCtx[PluginConfig]) SetRequestBodyBufferLimit(size uint32) { ctx.plugin.vm.log.Infof("SetRequestBodyBufferLimit: %d", size) _ = proxywasm.SetProperty([]string{"set_decoder_buffer_limit"}, []byte(strconv.Itoa(int(size)))) } func (ctx *CommonHttpCtx[PluginConfig]) SetResponseBodyBufferLimit(size uint32) { ctx.plugin.vm.log.Infof("SetResponseBodyBufferLimit: %d", size) _ = proxywasm.SetProperty([]string{"set_encoder_buffer_limit"}, []byte(strconv.Itoa(int(size)))) } func (ctx *CommonHttpCtx[PluginConfig]) GetContextId() uint32 { return ctx.contextID } func (ctx *CommonHttpCtx[PluginConfig]) OnHttpRequestHeaders(numHeaders int, endOfStream bool) types.Action { requestID, _ := proxywasm.GetHttpRequestHeader("x-request-id") _ = proxywasm.SetProperty([]string{"x_request_id"}, []byte(requestID)) config, err := ctx.plugin.GetMatchConfig() if err != nil { ctx.plugin.vm.log.Errorf("get match config failed, err:%v", err) return types.ActionContinue } if config == nil { return types.ActionContinue } ctx.config = config // To avoid unexpected operations, plugins do not read the binary content body if IsBinaryRequestBody() { ctx.needRequestBody = false } if ctx.plugin.vm.onHttpRequestHeaders == nil { return types.ActionContinue } return ctx.plugin.vm.onHttpRequestHeaders(ctx, *config) } func (ctx *CommonHttpCtx[PluginConfig]) OnHttpRequestBody(bodySize int, endOfStream bool) types.Action { if ctx.config == nil { return types.ActionContinue } if !ctx.needRequestBody { return types.ActionContinue } if ctx.plugin.vm.onHttpStreamingRequestBody != nil && ctx.streamingRequestBody { chunk, _ := proxywasm.GetHttpRequestBody(0, bodySize) modifiedChunk := ctx.plugin.vm.onHttpStreamingRequestBody(ctx, *ctx.config, chunk, endOfStream) err := proxywasm.ReplaceHttpRequestBody(modifiedChunk) if err != nil { ctx.plugin.vm.log.Warnf("replace request body chunk failed: %v", err) return types.ActionContinue } return types.ActionContinue } if ctx.plugin.vm.onHttpRequestBody != nil { ctx.requestBodySize += bodySize if !endOfStream { return types.ActionPause } body, err := proxywasm.GetHttpRequestBody(0, ctx.requestBodySize) if err != nil { ctx.plugin.vm.log.Warnf("get request body failed: %v", err) return types.ActionContinue } return ctx.plugin.vm.onHttpRequestBody(ctx, *ctx.config, body) } return types.ActionContinue } func (ctx *CommonHttpCtx[PluginConfig]) OnHttpResponseHeaders(numHeaders int, endOfStream bool) types.Action { if ctx.config == nil { return types.ActionContinue } // To avoid unexpected operations, plugins do not read the binary content body if IsBinaryResponseBody() { ctx.needResponseBody = false } if ctx.plugin.vm.onHttpResponseHeaders == nil { return types.ActionContinue } return ctx.plugin.vm.onHttpResponseHeaders(ctx, *ctx.config) } func (ctx *CommonHttpCtx[PluginConfig]) OnHttpResponseBody(bodySize int, endOfStream bool) types.Action { if ctx.config == nil { return types.ActionContinue } if !ctx.needResponseBody { return types.ActionContinue } if ctx.plugin.vm.onHttpStreamingResponseBody != nil && ctx.streamingResponseBody { chunk, _ := proxywasm.GetHttpResponseBody(0, bodySize) modifiedChunk := ctx.plugin.vm.onHttpStreamingResponseBody(ctx, *ctx.config, chunk, endOfStream) err := proxywasm.ReplaceHttpResponseBody(modifiedChunk) if err != nil { ctx.plugin.vm.log.Warnf("replace response body chunk failed: %v", err) return types.ActionContinue } return types.ActionContinue } if ctx.plugin.vm.onHttpResponseBody != nil { ctx.responseBodySize += bodySize if !endOfStream { return types.ActionPause } body, err := proxywasm.GetHttpResponseBody(0, ctx.responseBodySize) if err != nil { ctx.plugin.vm.log.Warnf("get response body failed: %v", err) return types.ActionContinue } return ctx.plugin.vm.onHttpResponseBody(ctx, *ctx.config, body) } return types.ActionContinue } func (ctx *CommonHttpCtx[PluginConfig]) OnHttpStreamDone() { if ctx.config == nil { return } if ctx.plugin.vm.onHttpStreamDone == nil { return } ctx.plugin.vm.onHttpStreamDone(ctx, *ctx.config) }