func()

in pkg/filter/http/dubboproxy/dubbo.go [100:242]


// Decode proxies an HTTP request to a Dubbo provider through a generic
// ("$invoke") invocation.
//
// The request path must consist of exactly three segments,
// /{application}/{service}/{method}; the second and third segments are used as
// the Dubbo interface and method name. Group, version and the comma-separated
// parameter types are read from the request headers constant.DubboGroup,
// constant.DubboServiceVersion and constant.DubboServiceMethodTypes. The JSON
// request body supplies the argument values: a top-level array is spread into
// individual arguments, any other JSON value is passed as a single argument.
//
// On success the reply is stored in hc.SourceResp and filter.Continue is
// returned. On failure a JSON ErrResponse is sent with hc.SendLocalReply and
// filter.Stop is returned, using these status codes:
//   - 404 when the context has no route entry,
//   - 400 for a wrong path pattern or an unreadable / non-JSON body,
//   - 504 when the invocation error text contains "timeout",
//   - 503 for every other failure (no endpoint, URL / refer / invoke errors).
func (f *Filter) Decode(hc *pixiuHttp.HttpContext) filter.FilterStatus {
	// reject sends a JSON error reply and yields the status that stops the
	// filter chain. Logging stays at the call sites because level and wording
	// differ per failure.
	reject := func(status int, msg string) filter.FilterStatus {
		// Marshal error ignored as before; ErrResponse is presumably a plain
		// struct that cannot fail to encode — verify against its definition.
		bt, _ := json.Marshal(pixiuHttp.ErrResponse{Message: msg})
		hc.SendLocalReply(status, bt)
		return filter.Stop
	}

	rEntry := hc.GetRouteEntry()
	if rEntry == nil {
		logger.Info("[dubbo-go-pixiu] http not match route")
		return reject(http.StatusNotFound, "not match route")
	}
	logger.Debugf("[dubbo-go-pixiu] client choose endpoint from cluster :%v", rEntry.Cluster)

	endpoint := server.GetClusterManager().PickEndpoint(rEntry.Cluster, hc)
	if endpoint == nil {
		logger.Info("[dubbo-go-pixiu] cluster not found endpoint")
		return reject(http.StatusServiceUnavailable, "cluster not found endpoint")
	}

	// http://host/{application}/{service}/{method} or https://host/{application}/{service}/{method}
	splits := strings.Split(strings.Trim(hc.Request.URL.Path, "/"), "/")
	if len(splits) != 3 {
		logger.Info("[dubbo-go-pixiu] http path pattern error. path pattern should be http://127.0.0.1/{application}/{service}/{method}")
		return reject(http.StatusBadRequest, "http path pattern error")
	}

	// splits[0] (the application segment) is not used below.
	interfaceKey := splits[1]
	method := splits[2]

	groupKey := hc.Request.Header.Get(constant.DubboGroup)
	versionKey := hc.Request.Header.Get(constant.DubboServiceVersion)
	types := hc.Request.Header.Get(constant.DubboServiceMethodTypes)

	rawBody, err := io.ReadAll(hc.Request.Body)
	if err != nil {
		logger.Infof("[dubbo-go-pixiu] read request body error %v", err)
		return reject(http.StatusBadRequest, fmt.Sprintf("read request body error %v", err))
	}

	var body any
	if err := json.Unmarshal(rawBody, &body); err != nil {
		logger.Infof("[dubbo-go-pixiu] unmarshal request body error %v", err)
		return reject(http.StatusBadRequest, fmt.Sprintf("unmarshal request body error %v", err))
	}

	var (
		typesList  []string
		valuesList []hessian.Object
	)

	if types != "" {
		typesList = strings.Split(types, ",")
	}

	// A JSON array is spread into one argument per element; anything else is a
	// single argument. An empty array leaves valuesList nil.
	if list, ok := body.([]any); ok {
		for _, v := range list {
			valuesList = append(valuesList, v)
		}
	} else {
		valuesList = append(valuesList, body)
	}

	// Generic invocation arguments: method name, parameter types, values.
	inIArr := []any{method, typesList, valuesList}
	inVArr := []reflect.Value{
		reflect.ValueOf(method),
		reflect.ValueOf(typesList),
		reflect.ValueOf(valuesList),
	}

	invoc := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("$invoke"),
		invocation.WithArguments(inIArr),
		invocation.WithParameterValues(inVArr))

	referURL, err := common.NewURL(endpoint.Address.GetAddress(),
		common.WithProtocol(dubbo.DUBBO), common.WithParamsValue(dubboConstant.SerializationKey, dubboConstant.Hessian2Serialization),
		common.WithParamsValue(dubboConstant.GenericFilterKey, "true"),
		common.WithParamsValue(dubboConstant.InterfaceKey, interfaceKey),
		common.WithParamsValue(dubboConstant.ReferenceFilterKey, "generic,filter"),
		// dubboAttachment must contain group and version info
		common.WithParamsValue(dubboConstant.GroupKey, groupKey),
		common.WithParamsValue(dubboConstant.VersionKey, versionKey),
		common.WithPath(interfaceKey),
	)
	if err != nil {
		logger.Infof("[dubbo-go-pixiu] newURL error %v", err)
		return reject(http.StatusServiceUnavailable, fmt.Sprintf("newURL error %v", err))
	}

	dubboProtocol := dubbo.NewDubboProtocol()

	// TODO: will print many Error when failed to connect server
	invoker := dubboProtocol.Refer(referURL)
	if invoker == nil {
		logger.Info("[dubbo-go-pixiu] dubbo protocol refer error")
		return reject(http.StatusServiceUnavailable, "dubbo protocol refer error")
	}
	// Destroy the per-request invoker on every exit path; previously it was
	// only destroyed after a successful invocation.
	defer invoker.Destroy()

	var resp any
	invoc.SetReply(&resp)

	invCtx, cancel := context.WithTimeout(context.Background(), hc.Timeout)
	defer cancel()
	result := invoker.Invoke(invCtx, invoc)
	result.SetAttachments(invoc.Attachments())

	if invokeErr := result.Error(); invokeErr != nil {
		logger.Debugf("[dubbo-go-pixiu] invoke result error %v", invokeErr)
		// TODO statusCode I don't know what dubbo returns when it times out, first use the string to judge
		// Choose exactly one status: the timeout branch used to fall through
		// and send a second reply with 503.
		status := http.StatusServiceUnavailable
		if strings.Contains(invokeErr.Error(), "timeout") {
			status = http.StatusGatewayTimeout
		}
		return reject(status, fmt.Sprintf("invoke result error %v", invokeErr))
	}

	// Replace the pointer held by the result with the value it points to.
	// Guarded so that a nil or non-pointer result does not panic in Elem.
	if value := reflect.ValueOf(result.Result()); value.Kind() == reflect.Ptr && !value.IsNil() {
		result.SetResult(value.Elem().Interface())
	}
	hc.SourceResp = resp
	// response write in hcm
	return filter.Continue
}