in pkg/filter/http/dubboproxy/dubbo.go [100:242]
// Decode converts the incoming HTTP request into a Dubbo generic ("$invoke")
// call against an endpoint picked from the routed cluster and stores the reply
// in hc.SourceResp.
//
// The request path must have exactly three segments,
// /{application}/{service}/{method}. The Dubbo group, version and parameter
// type list are read from request headers, and the JSON request body supplies
// the argument values: a JSON array is spread into one argument per element,
// any other JSON value is passed as a single argument.
//
// On failure a local reply is sent and filter.Stop is returned:
//   - 404 when no route entry matched,
//   - 400 when the path, body read or body JSON is invalid,
//   - 504 when the invocation error text contains "timeout",
//   - 503 for a missing endpoint, URL/refer failure or any other invoke error.
//
// On success filter.Continue is returned; the response itself is written later
// by the HTTP connection manager.
func (f *Filter) Decode(hc *pixiuHttp.HttpContext) filter.FilterStatus {
	// reject sends msg as a JSON error body with the given status and stops the
	// filter chain. The marshal error is deliberately ignored, as the payload is
	// built from a plain string message.
	reject := func(status int, msg string) filter.FilterStatus {
		bt, _ := json.Marshal(pixiuHttp.ErrResponse{Message: msg})
		hc.SendLocalReply(status, bt)
		return filter.Stop
	}

	rEntry := hc.GetRouteEntry()
	if rEntry == nil {
		logger.Info("[dubbo-go-pixiu] http not match route")
		return reject(http.StatusNotFound, "not match route")
	}

	logger.Debugf("[dubbo-go-pixiu] client choose endpoint from cluster :%v", rEntry.Cluster)
	endpoint := server.GetClusterManager().PickEndpoint(rEntry.Cluster, hc)
	if endpoint == nil {
		logger.Info("[dubbo-go-pixiu] cluster not found endpoint")
		return reject(http.StatusServiceUnavailable, "cluster not found endpoint")
	}

	// http://host/{application}/{service}/{method} or https://host/{application}/{service}/{method}
	splits := strings.Split(strings.Trim(hc.Request.URL.Path, "/"), "/")
	if len(splits) != 3 {
		logger.Info("[dubbo-go-pixiu] http path pattern error. path pattern should be http://127.0.0.1/{application}/{service}/{method}")
		return reject(http.StatusBadRequest, "http path pattern error")
	}
	interfaceKey := splits[1]
	method := splits[2]

	groupKey := hc.Request.Header.Get(constant.DubboGroup)
	versionKey := hc.Request.Header.Get(constant.DubboServiceVersion)
	types := hc.Request.Header.Get(constant.DubboServiceMethodTypes)

	rawBody, err := io.ReadAll(hc.Request.Body)
	if err != nil {
		logger.Infof("[dubbo-go-pixiu] read request body error %v", err)
		return reject(http.StatusBadRequest, fmt.Sprintf("read request body error %v", err))
	}

	var body any
	if err := json.Unmarshal(rawBody, &body); err != nil {
		logger.Infof("[dubbo-go-pixiu] unmarshal request body error %v", err)
		return reject(http.StatusBadRequest, fmt.Sprintf("unmarshal request body error %v", err))
	}

	var (
		typesList  []string
		valuesList []hessian.Object
	)
	if types != "" {
		typesList = strings.Split(types, ",")
	}
	// A JSON array is spread into one argument per element; any other JSON
	// value becomes the single argument of the call.
	if arr, ok := body.([]any); ok {
		for _, v := range arr {
			valuesList = append(valuesList, v)
		}
	} else {
		valuesList = append(valuesList, body)
	}

	// Generic invocation arguments: method name, parameter types, parameter values.
	inIArr := []any{method, typesList, valuesList}
	inVArr := []reflect.Value{
		reflect.ValueOf(inIArr[0]),
		reflect.ValueOf(inIArr[1]),
		reflect.ValueOf(inIArr[2]),
	}
	invoc := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("$invoke"),
		invocation.WithArguments(inIArr),
		invocation.WithParameterValues(inVArr))

	refURL, err := common.NewURL(endpoint.Address.GetAddress(),
		common.WithProtocol(dubbo.DUBBO), common.WithParamsValue(dubboConstant.SerializationKey, dubboConstant.Hessian2Serialization),
		common.WithParamsValue(dubboConstant.GenericFilterKey, "true"),
		common.WithParamsValue(dubboConstant.InterfaceKey, interfaceKey),
		common.WithParamsValue(dubboConstant.ReferenceFilterKey, "generic,filter"),
		// dubboAttachment must contains group and version info
		common.WithParamsValue(dubboConstant.GroupKey, groupKey),
		common.WithParamsValue(dubboConstant.VersionKey, versionKey),
		common.WithPath(interfaceKey),
	)
	if err != nil {
		logger.Infof("[dubbo-go-pixiu] newURL error %v", err)
		return reject(http.StatusServiceUnavailable, fmt.Sprintf("newURL error %v", err))
	}

	dubboProtocol := dubbo.NewDubboProtocol()
	// TODO: will print many Error when failed to connect server
	invoker := dubboProtocol.Refer(refURL)
	if invoker == nil {
		logger.Info("[dubbo-go-pixiu] dubbo protocol refer error")
		return reject(http.StatusServiceUnavailable, "dubbo protocol refer error")
	}
	// Release the invoker on every exit path, not only after a successful call.
	defer invoker.Destroy()

	var resp any
	invoc.SetReply(&resp)

	invCtx, cancel := context.WithTimeout(context.Background(), hc.Timeout)
	defer cancel()

	result := invoker.Invoke(invCtx, invoc)
	result.SetAttachments(invoc.Attachments())
	if invokeErr := result.Error(); invokeErr != nil {
		logger.Debugf("[dubbo-go-pixiu] invoke result error %v", invokeErr)
		msg := fmt.Sprintf("invoke result error %v", invokeErr)
		// TODO statusCode I don't know what dubbo returns when it times out, first use the string to judge
		if strings.Contains(invokeErr.Error(), "timeout") {
			// Return here so that exactly one local reply is sent.
			return reject(http.StatusGatewayTimeout, msg)
		}
		return reject(http.StatusServiceUnavailable, msg)
	}

	// Unwrap the reply pointer held by the result; skip the unwrap when the
	// result is nil or not a pointer, since Elem would panic on such values.
	if value := reflect.ValueOf(result.Result()); value.Kind() == reflect.Pointer && !value.IsNil() {
		result.SetResult(value.Elem().Interface())
	}
	hc.SourceResp = resp
	// response write in hcm
	return filter.Continue
}