cluster/router/polaris/router.go (258 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package polaris
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
)
import (
"github.com/dubbogo/gost/log/logger"
"github.com/polarismesh/polaris-go"
"github.com/polarismesh/polaris-go/pkg/model"
v1 "github.com/polarismesh/polaris-go/pkg/model/pb/v1"
)
import (
"dubbo.apache.org/dubbo-go/v3/cluster/router"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/protocol"
remotingpolaris "dubbo.apache.org/dubbo-go/v3/remoting/polaris"
"dubbo.apache.org/dubbo-go/v3/remoting/polaris/parser"
)
var (
_ router.PriorityRouter = (*polarisRouter)(nil)
)
var (
ErrorPolarisServiceRouteRuleEmpty = errors.New("service route rule is empty")
)
func newPolarisRouter() (*polarisRouter, error) {
if err := remotingpolaris.Check(); errors.Is(err, remotingpolaris.ErrorNoOpenPolarisAbility) {
return &polarisRouter{
openRoute: false,
}, nil
}
routerAPI, err := remotingpolaris.GetRouterAPI()
if err != nil {
return nil, err
}
consumerAPI, err := remotingpolaris.GetConsumerAPI()
if err != nil {
return nil, err
}
return &polarisRouter{
openRoute: true,
routerAPI: routerAPI,
consumerAPI: consumerAPI,
}, nil
}
type polarisRouter struct {
openRoute bool
routerAPI polaris.RouterAPI
consumerAPI polaris.ConsumerAPI
cancel context.CancelFunc
lock sync.RWMutex
instances map[string]model.Instance
}
// Route Determine the target invokers list.
func (p *polarisRouter) Route(invokers []protocol.Invoker, url *common.URL,
invoaction protocol.Invocation) []protocol.Invoker {
if !p.openRoute {
logger.Debug("[Router][Polaris] not open polaris route ability")
return invokers
}
if len(invokers) == 0 {
logger.Warn("[Router][Polaris] invokers from previous router is empty")
return invokers
}
service := getService(url)
instanceMap := p.buildInstanceMap(service)
if len(instanceMap) == 0 {
return invokers
}
invokersMap := make(map[string]protocol.Invoker, len(invokers))
targetIns := make([]model.Instance, 0, len(invokers))
for i := range invokers {
invoker := invokers[i]
instanceID := invoker.GetURL().GetParam(constant.PolarisInstanceID, "")
if len(instanceID) == 0 {
continue
}
invokersMap[instanceID] = invoker
if val, ok := instanceMap[instanceID]; ok {
targetIns = append(targetIns, val)
}
}
req, err := p.buildRouteRequest(service, url, invoaction)
if err != nil {
return invokers
}
req.DstInstances = model.NewDefaultServiceInstances(model.ServiceInfo{
Service: service,
Namespace: remotingpolaris.GetNamespace(),
}, targetIns)
resp, err := p.routerAPI.ProcessRouters(&req)
if err != nil {
return invokers
}
ret := make([]protocol.Invoker, 0, len(resp.GetInstances()))
for i := range resp.GetInstances() {
if val, ok := invokersMap[resp.GetInstances()[i].GetId()]; ok {
ret = append(ret, val)
}
}
return ret
}
func getService(url *common.URL) string {
applicationMode := false
for _, item := range config.GetRootConfig().Registries {
if item.Protocol == constant.PolarisKey {
applicationMode = item.RegistryType == constant.ServiceKey
}
}
service := url.Interface()
if applicationMode {
service = config.GetApplicationConfig().Name
}
return service
}
func (p *polarisRouter) buildRouteRequest(svc string, url *common.URL,
invocation protocol.Invocation) (polaris.ProcessRoutersRequest, error) {
routeReq := polaris.ProcessRoutersRequest{
ProcessRoutersRequest: model.ProcessRoutersRequest{
SourceService: model.ServiceInfo{
Metadata: map[string]string{},
},
},
}
attachement := invocation.Attachments()
arguments := invocation.Arguments()
labels, err := p.buildTrafficLabels(svc)
if err != nil {
return polaris.ProcessRoutersRequest{}, err
}
for i := range labels {
label := labels[i]
if strings.Compare(label, model.LabelKeyPath) == 0 {
routeReq.AddArguments(model.BuildPathArgument(getInvokeMethod(url, invocation)))
continue
}
if strings.HasPrefix(label, model.LabelKeyHeader) {
if val, ok := attachement[strings.TrimPrefix(label, model.LabelKeyHeader)]; ok {
routeReq.SourceService.Metadata[label] = fmt.Sprintf("%+v", val)
routeReq.AddArguments(model.BuildArgumentFromLabel(label, fmt.Sprintf("%+v", val)))
}
}
if strings.HasPrefix(label, model.LabelKeyQuery) {
if val := parser.ParseArgumentsByExpression(label, arguments); val != nil {
routeReq.AddArguments(model.BuildArgumentFromLabel(label, fmt.Sprintf("%+v", val)))
}
}
}
return routeReq, nil
}
func (p *polarisRouter) buildTrafficLabels(svc string) ([]string, error) {
req := &model.GetServiceRuleRequest{}
req.Namespace = remotingpolaris.GetNamespace()
req.Service = svc
req.SetTimeout(time.Second)
engine := p.routerAPI.SDKContext().GetEngine()
resp, err := engine.SyncGetServiceRule(model.EventRouting, req)
if err != nil {
logger.Errorf("[Router][Polaris] ns:%s svc:%s get route rule fail : %+v", req.GetNamespace(), req.GetService(), err)
return nil, err
}
if resp == nil || resp.GetValue() == nil {
logger.Errorf("[Router][Polaris] ns:%s svc:%s get route rule empty", req.GetNamespace(), req.GetService())
return nil, ErrorPolarisServiceRouteRuleEmpty
}
routeRule := resp.GetValue().(*v1.Routing)
labels := make([]string, 0, 4)
labels = append(labels, collectRouteLabels(routeRule.GetInbounds())...)
labels = append(labels, collectRouteLabels(routeRule.GetInbounds())...)
return labels, nil
}
func getInvokeMethod(url *common.URL, invoaction protocol.Invocation) string {
applicationMode := false
for _, item := range config.GetRootConfig().Registries {
if item.Protocol == constant.PolarisKey {
applicationMode = item.RegistryType == constant.ServiceKey
}
}
method := invoaction.MethodName()
if applicationMode {
method = url.Interface() + "/" + invoaction.MethodName()
}
return method
}
func collectRouteLabels(routings []*v1.Route) []string {
ret := make([]string, 0, 4)
for i := range routings {
route := routings[i]
sources := route.GetSources()
for p := range sources {
source := sources[p]
for k := range source.GetMetadata() {
ret = append(ret, k)
}
}
}
return ret
}
func (p *polarisRouter) buildInstanceMap(svc string) map[string]model.Instance {
resp, err := p.consumerAPI.GetAllInstances(&polaris.GetAllInstancesRequest{
GetAllInstancesRequest: model.GetAllInstancesRequest{
Service: svc,
Namespace: remotingpolaris.GetNamespace(),
},
})
if err != nil {
logger.Errorf("[Router][Polaris] ns:%s svc:%s get all instances fail : %+v", remotingpolaris.GetNamespace(), svc, err)
return nil
}
ret := make(map[string]model.Instance, len(resp.GetInstances()))
for i := range resp.GetInstances() {
ret[resp.GetInstances()[i].GetId()] = resp.GetInstances()[i]
}
return ret
}
// URL Return URL in router
func (p *polarisRouter) URL() *common.URL {
return nil
}
// Priority Return Priority in router
// 0 to ^int(0) is better
func (p *polarisRouter) Priority() int64 {
return 0
}
// Notify the router the invoker list
func (p *polarisRouter) Notify(invokers []protocol.Invoker) {
if !p.openRoute {
return
}
if len(invokers) == 0 {
return
}
service := getService(invokers[0].GetURL())
if service == "" {
logger.Error("url service is empty")
return
}
req := &model.GetServiceRuleRequest{}
req.Namespace = remotingpolaris.GetNamespace()
req.Service = service
req.SetTimeout(time.Second)
engine := p.routerAPI.SDKContext().GetEngine()
_, err := engine.SyncGetServiceRule(model.EventRouting, req)
if err != nil {
logger.Errorf("[Router][Polaris] ns:%s svc:%s get route rule fail : %+v", req.GetNamespace(), req.GetService(), err)
return
}
_, err = p.consumerAPI.GetAllInstances(&polaris.GetAllInstancesRequest{
GetAllInstancesRequest: model.GetAllInstancesRequest{
Service: service,
Namespace: remotingpolaris.GetNamespace(),
},
})
if err != nil {
logger.Errorf("[Router][Polaris] ns:%s svc:%s get all instances fail : %+v", req.GetNamespace(), req.GetService(), err)
return
}
}