ext/datasource/helper.go (228 lines of code) (raw):

// Copyright 1999-2020 Alibaba Group Holding Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package datasource import ( "encoding/json" "fmt" cb "github.com/alibaba/sentinel-golang/core/circuitbreaker" "github.com/alibaba/sentinel-golang/core/flow" "github.com/alibaba/sentinel-golang/core/hotspot" "github.com/alibaba/sentinel-golang/core/isolation" "github.com/alibaba/sentinel-golang/core/system" ) func checkSrcComplianceJson(src []byte) (bool, error) { if len(src) == 0 { return false, nil } return true, nil } // FlowRuleJsonArrayParser provide JSON as the default serialization for list of flow.Rule func FlowRuleJsonArrayParser(src []byte) (interface{}, error) { if valid, err := checkSrcComplianceJson(src); !valid { return nil, err } rules := make([]*flow.Rule, 0, 8) if err := json.Unmarshal(src, &rules); err != nil { desc := fmt.Sprintf("Fail to convert source bytes to []*flow.Rule, err: %s", err.Error()) return nil, NewError(ConvertSourceError, desc) } return rules, nil } // FlowRulesUpdater load the newest []flow.Rule to downstream flow component. func FlowRulesUpdater(data interface{}) error { if data == nil { return flow.ClearRules() } rules := make([]*flow.Rule, 0, 8) if val, ok := data.([]flow.Rule); ok { for _, v := range val { rules = append(rules, &v) } } else if val, ok := data.([]*flow.Rule); ok { rules = val } else { return NewError( UpdatePropertyError, fmt.Sprintf("Fail to type assert data to []flow.Rule or []*flow.Rule, in fact, data: %+v", data), ) } _, err := flow.LoadRules(rules) if err == nil { return nil } return NewError( UpdatePropertyError, fmt.Sprintf("%+v", err), ) } func NewFlowRulesHandler(converter PropertyConverter) PropertyHandler { return NewDefaultPropertyHandler(converter, FlowRulesUpdater) } // SystemRuleJsonArrayParser provide JSON as the default serialization for list of system.Rule func SystemRuleJsonArrayParser(src []byte) (interface{}, error) { if valid, err := checkSrcComplianceJson(src); !valid { return nil, err } rules := make([]*system.Rule, 0, 8) if err := json.Unmarshal(src, &rules); err != nil { desc := fmt.Sprintf("Fail to convert source bytes to []*system.Rule, err: %s", err.Error()) return nil, NewError(ConvertSourceError, desc) } return rules, nil } // SystemRulesUpdater load the newest []system.Rule to downstream system component. func SystemRulesUpdater(data interface{}) error { if data == nil { return system.ClearRules() } rules := make([]*system.Rule, 0, 8) if val, ok := data.([]system.Rule); ok { for _, v := range val { rules = append(rules, &v) } } else if val, ok := data.([]*system.Rule); ok { rules = val } else { return NewError( UpdatePropertyError, fmt.Sprintf("Fail to type assert data to []system.Rule or []*system.Rule, in fact, data: %+v", data), ) } _, err := system.LoadRules(rules) if err == nil { return nil } return NewError( UpdatePropertyError, fmt.Sprintf("%+v", err), ) } func NewSystemRulesHandler(converter PropertyConverter) *DefaultPropertyHandler { return NewDefaultPropertyHandler(converter, SystemRulesUpdater) } func CircuitBreakerRuleJsonArrayParser(src []byte) (interface{}, error) { if valid, err := checkSrcComplianceJson(src); !valid { return nil, err } rules := make([]*cb.Rule, 0, 8) if err := json.Unmarshal(src, &rules); err != nil { desc := fmt.Sprintf("Fail to convert source bytes to []*circuitbreaker.Rule, err: %s", err.Error()) return nil, NewError(ConvertSourceError, desc) } return rules, nil } // CircuitBreakerRulesUpdater load the newest []cb.Rule to downstream circuit breaker component. func CircuitBreakerRulesUpdater(data interface{}) error { if data == nil { return cb.ClearRules() } var rules []*cb.Rule if val, ok := data.([]*cb.Rule); ok { rules = val } else { return NewError( UpdatePropertyError, fmt.Sprintf("Fail to type assert data to []*circuitbreaker.Rule, in fact, data: %+v", data), ) } _, err := cb.LoadRules(rules) if err == nil { return nil } return NewError( UpdatePropertyError, fmt.Sprintf("%+v", err), ) } func NewCircuitBreakerRulesHandler(converter PropertyConverter) *DefaultPropertyHandler { return NewDefaultPropertyHandler(converter, CircuitBreakerRulesUpdater) } // HotSpotParamRuleJsonArrayParser decodes list of param flow rules from JSON bytes. func HotSpotParamRuleJsonArrayParser(src []byte) (interface{}, error) { if valid, err := checkSrcComplianceJson(src); !valid { return nil, err } hotspotRules := make([]*HotspotRule, 0, 8) if err := json.Unmarshal(src, &hotspotRules); err != nil { desc := fmt.Sprintf("Fail to convert source bytes to []*hotspot.Rule, err: %s", err.Error()) return nil, NewError(ConvertSourceError, desc) } rules := make([]*hotspot.Rule, len(hotspotRules)) for i, hotspotRule := range hotspotRules { rules[i] = &hotspot.Rule{ ID: hotspotRule.ID, Resource: hotspotRule.Resource, MetricType: hotspotRule.MetricType, ControlBehavior: hotspotRule.ControlBehavior, ParamIndex: hotspotRule.ParamIndex, Threshold: hotspotRule.Threshold, MaxQueueingTimeMs: hotspotRule.MaxQueueingTimeMs, BurstCount: hotspotRule.BurstCount, DurationInSec: hotspotRule.DurationInSec, ParamsMaxCapacity: hotspotRule.ParamsMaxCapacity, SpecificItems: parseSpecificItems(hotspotRule.SpecificItems), } } return rules, nil } // HotSpotParamRulesUpdater loads the provided hot-spot param rules to downstream rule manager. func HotSpotParamRulesUpdater(data interface{}) error { if data == nil { return hotspot.ClearRules() } rules := make([]*hotspot.Rule, 0, 8) if val, ok := data.([]hotspot.Rule); ok { for _, v := range val { rules = append(rules, &v) } } else if val, ok := data.([]*hotspot.Rule); ok { rules = val } else { return NewError( UpdatePropertyError, fmt.Sprintf("Fail to type assert data to []hotspot.Rule or []*hotspot.Rule, in fact, data: %+v", data), ) } _, err := hotspot.LoadRules(rules) if err == nil { return nil } return NewError( UpdatePropertyError, fmt.Sprintf("%+v", err), ) } func NewHotSpotParamRulesHandler(converter PropertyConverter) PropertyHandler { return NewDefaultPropertyHandler(converter, HotSpotParamRulesUpdater) } // IsolationRuleJsonArrayParser provide JSON as the default serialization for list of isolation.Rule func IsolationRuleJsonArrayParser(src []byte) (interface{}, error) { if valid, err := checkSrcComplianceJson(src); !valid { return nil, err } rules := make([]*isolation.Rule, 0, 8) if err := json.Unmarshal(src, &rules); err != nil { desc := fmt.Sprintf("Fail to convert source bytes to []*isolation.Rule, err: %s", err.Error()) return nil, NewError(ConvertSourceError, desc) } return rules, nil } // IsolationRulesUpdater load the newest []isolation.Rule to downstream system component. func IsolationRulesUpdater(data interface{}) error { if data == nil { return isolation.ClearRules() } rules := make([]*isolation.Rule, 0, 8) if val, ok := data.([]isolation.Rule); ok { for _, v := range val { rules = append(rules, &v) } } else if val, ok := data.([]*isolation.Rule); ok { rules = val } else { return NewError( UpdatePropertyError, fmt.Sprintf("Fail to type assert data to []isolation.Rule or []*isolation.Rule, in fact, data: %+v", data), ) } _, err := isolation.LoadRules(rules) if err == nil { return nil } return NewError( UpdatePropertyError, fmt.Sprintf("%+v", err), ) } func NewIsolationRulesHandler(converter PropertyConverter) *DefaultPropertyHandler { return NewDefaultPropertyHandler(converter, IsolationRulesUpdater) }