cluster/router/script/router.go (170 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package script
import (
"strings"
"sync"
)
import (
"github.com/dubbogo/gost/log/logger"
"gopkg.in/yaml.v2"
)
import (
ins "dubbo.apache.org/dubbo-go/v3/cluster/router/script/instance"
"dubbo.apache.org/dubbo-go/v3/common"
conf "dubbo.apache.org/dubbo-go/v3/common/config"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/remoting"
)
// ScriptRouter hands candidate invokers to a dynamically configured script.
// It only takes effect on consumers and only supports application granular
// management.
type ScriptRouter struct {
	// applicationName is the provider application whose script rule is
	// currently being listened to; empty until Notify subscribes to one.
	applicationName string

	// mu guards enabled, scriptType and rawScript: Process updates them
	// under the write lock and Route snapshots them under the read lock.
	mu sync.RWMutex
	// enabled is true only while a rule is active.
	enabled bool
	// scriptType names the script instance used for the rule and rawScript
	// is the rule's script source text.
	scriptType, rawScript string
}
// NewScriptRouter returns a fresh ScriptRouter that is disabled and has no
// application name yet; the zero value of the struct is exactly that state.
func NewScriptRouter() *ScriptRouter {
	return new(ScriptRouter)
}
// parseRoute decodes the YAML text of a routing rule into a
// config.RouterConfig. When decoding fails the decoder's error is returned
// unchanged together with a nil config.
func parseRoute(routeContent string) (*config.RouterConfig, error) {
	var parsed config.RouterConfig
	if err := yaml.NewDecoder(strings.NewReader(routeContent)).Decode(&parsed); err != nil {
		return nil, err
	}
	return &parsed, nil
}
// Process applies a script rule change delivered by the config center.
//
// Delete events drop the current rule without looking at the event value.
// Add and update events expect the value to be the YAML text of a router
// config; a value of any other type is logged and ignored instead of
// panicking. Event types other than add, update and delete are ignored.
//
// The whole call runs under the write lock so Route never observes a
// half-applied rule.
func (s *ScriptRouter) Process(event *config_center.ConfigChangeEvent) {
	s.mu.Lock()
	defer s.mu.Unlock()

	switch event.ConfigType {
	case remoting.EventTypeDel:
		// Handled before any parsing: a removal may carry an empty or
		// non-string value and must still clear the rule.
		s.dropRuleLocked()
	case remoting.EventTypeAdd, remoting.EventTypeUpdate:
		rawConf, ok := event.Value.(string)
		if !ok {
			logger.Errorf("Script rule value must be a string, got %T", event.Value)
			return
		}
		s.applyRuleLocked(rawConf)
	}
}

// applyRuleLocked parses rawConf and installs it as the current rule.
// The caller must hold s.mu for writing.
//
// If rawConf cannot be parsed the current rule is left untouched. Once it
// parses, the previous rule is dropped first; a new rule that then fails
// validation, instance lookup or compilation leaves the router disabled.
func (s *ScriptRouter) applyRuleLocked(rawConf string) {
	cfg, err := parseRoute(rawConf)
	if err != nil {
		logger.Errorf("Parse route cfg failed: %v", err)
		return
	}

	// Destroy the old instance and clear the state, so an early return below
	// cannot leave a destroyed script marked as enabled.
	s.dropRuleLocked()

	// check new config
	switch {
	case cfg.ScriptType == "":
		logger.Errorf("`type` field must be set in config")
		return
	case cfg.Script == "":
		logger.Errorf("`script` field must be set in config")
		return
	case cfg.Key == "":
		logger.Errorf("`applicationName` field must be set in config")
		return
	}

	// Enabled is a pointer; treat a missing value as false rather than
	// dereferencing nil.
	wantEnabled := cfg.Enabled != nil && *cfg.Enabled
	if !wantEnabled {
		logger.Infof("`enabled` field equiles false, this rule will be ignored :%s", cfg.Script)
	}

	// rewrite to ScriptRouter; enabled stays false until compilation succeeds.
	s.rawScript = cfg.Script
	s.scriptType = cfg.ScriptType

	in, err := ins.GetInstances(s.scriptType)
	if err != nil {
		logger.Errorf("GetInstances failed: %v", err)
		return
	}
	if !wantEnabled {
		return
	}
	if err = in.Compile(s.rawScript); err != nil {
		// fail, rule stays disabled
		logger.Errorf("Compile Script failed: %v", err)
		return
	}
	s.enabled = true
}

// dropRuleLocked destroys the currently enabled script, if any, and resets
// the rule state to disabled and empty. The caller must hold s.mu for writing.
func (s *ScriptRouter) dropRuleLocked() {
	if s.enabled && s.scriptType != "" {
		in, err := ins.GetInstances(s.scriptType)
		if err != nil {
			logger.Errorf("GetInstances failed to Destroy: %v", err)
		} else if in != nil {
			in.Destroy(s.rawScript)
		}
	}
	s.enabled = false
	s.rawScript = ""
	s.scriptType = ""
}
// runScript looks up the script instance registered for scriptType and lets
// it run rawScript against invokers and invocation. If the lookup fails, the
// lookup error is returned with a nil invoker slice; otherwise the instance's
// own result and error are passed through unchanged.
func (s *ScriptRouter) runScript(scriptType, rawScript string, invokers []protocol.Invoker, invocation protocol.Invocation) ([]protocol.Invoker, error) {
	engine, lookupErr := ins.GetInstances(scriptType)
	if lookupErr == nil {
		return engine.Run(rawScript, invokers, invocation)
	}
	return nil, lookupErr
}
// Route runs the active script rule over invokers for the given invocation.
//
// It returns an empty slice when invokers is empty, and returns invokers
// unchanged when no rule is enabled or the rule has no type or script.
// Otherwise it returns whatever runScript produced; an error from the script
// is only logged, so the result may be nil (runScript returns nil when the
// script instance cannot be looked up).
func (s *ScriptRouter) Route(invokers []protocol.Invoker, _ *common.URL, invocation protocol.Invocation) []protocol.Invoker {
	if len(invokers) == 0 {
		return []protocol.Invoker{}
	}

	// Snapshot the rule under the read lock and use only the snapshot below,
	// so a concurrent Process cannot be observed half-way through.
	s.mu.RLock()
	enabled, scriptType, rawScript := s.enabled, s.scriptType, s.rawScript
	s.mu.RUnlock()

	if !enabled || scriptType == "" || rawScript == "" {
		return invokers
	}

	res, err := s.runScript(scriptType, rawScript, invokers, invocation)
	if err != nil {
		logger.Warnf("ScriptRouter.Route error: %v", err)
	}
	return res
}
// URL always returns nil for the script router.
// NOTE(review): presumably present to satisfy the router interface — confirm.
func (s *ScriptRouter) URL() *common.URL {
return nil
}
// Priority always returns 0 for the script router.
// NOTE(review): how this value orders the router relative to others is
// decided by the caller and is not visible here — confirm.
func (s *ScriptRouter) Priority() int64 {
return 0
}
// Notify subscribes the router to the script rule of the provider application
// found on the first invoker's URL.
//
// It does nothing when invokers is empty, the URL is nil, no dynamic
// configuration is available, the URL has no "application" parameter, or the
// application is the one already subscribed to. When the application changes,
// the listener for the previous application (if any) is removed, a listener
// for the new one is added, and the rule currently stored for it is fetched
// and fed to Process as an update event.
func (s *ScriptRouter) Notify(invokers []protocol.Invoker) {
	if len(invokers) == 0 {
		return
	}
	url := invokers[0].GetURL()
	if url == nil {
		logger.Error("Failed to notify a dynamically Script rule, because url is empty")
		return
	}
	dynamicConfiguration := conf.GetEnvInstance().GetDynamicConfiguration()
	if dynamicConfiguration == nil {
		logger.Infof("Config center does not start, Script router will not be enabled")
		return
	}
	providerApplication := url.GetParam("application", "")
	if providerApplication == "" {
		logger.Warn("Script router get providerApplication is empty, will not subscribe to provider app rules.")
		return
	}
	if providerApplication == s.applicationName {
		// Already listening to this application's rule.
		return
	}

	if s.applicationName != "" {
		dynamicConfiguration.RemoveListener(s.applicationName+constant.ScriptRouterRuleSuffix, s)
	}
	listenTarget := providerApplication + constant.ScriptRouterRuleSuffix
	dynamicConfiguration.AddListener(listenTarget, s)
	s.applicationName = providerApplication

	value, err := dynamicConfiguration.GetRule(listenTarget)
	if err != nil {
		// Nothing usable was fetched; the listener added above still
		// delivers later changes, so stop instead of processing an empty rule.
		logger.Errorf("Failed to query Script rule, applicationName=%s, listening=%s, err=%v", s.applicationName, listenTarget, err)
		return
	}
	if value == "" {
		// No rule is published yet; passing the empty text to Process would
		// only produce a parse error.
		logger.Infof("No Script rule found, applicationName=%s, listening=%s", s.applicationName, listenTarget)
		return
	}
	s.Process(&config_center.ConfigChangeEvent{Key: listenTarget, Value: value, ConfigType: remoting.EventTypeUpdate})
}