internal/discovery/group_manager.go (217 lines of code) (raw):
/*
* Copyright (c) 2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package discovery
import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"sync"
"github.com/alibaba/schedulerx-worker-go/internal/common"
"github.com/alibaba/schedulerx-worker-go/internal/constants"
"github.com/alibaba/schedulerx-worker-go/internal/openapi"
"github.com/alibaba/schedulerx-worker-go/logger"
)
const (
appGroupIdURL = "/worker/v1/appgroup/getId"
appGroupURL = "/worker/v1/appgroup/get"
)
var (
once sync.Once
groupManager *GroupManager
triggerChan = make(chan *TriggerEvent, 1000)
)
type TriggerEvent struct {
ChildGroupId string
ChildAppKey string
ParentGroupId string
}
type GroupManager struct {
groupId2AppGroupIdMap sync.Map
groupId2AppKeyMap sync.Map
groupId2ParentAppGroupMap map[string]*common.AppGroupInfo
parentGroupId2CountMap map[string]int
client *openapi.Client
stopCh chan struct{}
}
func GetGroupManager() *GroupManager {
once.Do(func() {
groupManager = newGroupManager(openapi.GetOpenAPIClient())
})
return groupManager
}
func (g *GroupManager) GroupId2AppGroupIdMap() map[string]int64 {
normalMap := make(map[string]int64)
g.groupId2AppGroupIdMap.Range(func(key, value interface{}) bool {
k, v := key.(string), value.(int64)
normalMap[k] = v
return true
})
return normalMap
}
func (g *GroupManager) groupExist(groupId string) bool {
_, ok := g.groupId2AppGroupIdMap.Load(groupId)
return ok
}
func (g *GroupManager) appendGroupId(groupId, parentGroupId, appKey string) error {
appGroupId, err := g.getAppGroupId(groupId, appKey)
if err != nil {
return fmt.Errorf("groupId=%s is not exist, namespace=%s %w", groupId, g.client.Namespace(), err)
}
g.groupId2AppGroupIdMap.Store(groupId, appGroupId)
// get config of the parent application group
parentAppGroup, err := g.getAppGroup(groupId, appKey)
if err != nil {
return fmt.Errorf("getAppGroup failed, groupId=%s, namespace=%s, err=%s", groupId, g.client.Namespace(), err.Error())
}
if parentAppGroup != nil {
g.groupId2ParentAppGroupMap[groupId] = parentAppGroup
}
if _, ok := g.parentGroupId2CountMap[parentGroupId]; ok {
count := g.parentGroupId2CountMap[parentGroupId]
g.parentGroupId2CountMap[parentGroupId] = count + 1
} else {
g.parentGroupId2CountMap[parentGroupId] = 1
}
return nil
}
func (g *GroupManager) getAppGroupId(groupId, appKey string) (int64, error) {
if len(g.client.Domain()) == 0 {
return 0, errors.New("domain missing")
}
var urlStr string
if len(g.client.Namespace()) > 0 {
urlStr = fmt.Sprintf("http://%s%s?groupId=%s&namespace=%s&appKey=%s", g.client.Domain(), appGroupIdURL, groupId, g.client.Namespace(), url.QueryEscape(appKey))
if len(g.client.NamespaceSource()) > 0 {
urlStr += "&namespaceSource=" + g.client.NamespaceSource()
}
} else {
urlStr = fmt.Sprintf("http://%s%s?groupId=%s&appKeys=%s", g.client.Domain(), appGroupIdURL, groupId, appKey)
}
resp, err := g.client.HttpClient().Get(urlStr)
if err != nil {
return 0, fmt.Errorf("request appGroupId failed, groupId:%s, err:%s", groupId, err.Error())
}
if resp.StatusCode != http.StatusOK {
return 0, fmt.Errorf("request appGroupId failed, groupId:%s, status:%d", groupId, resp.StatusCode)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return 0, fmt.Errorf("request appGroupId failed, groupId:%s, read body error:%s", groupId, err.Error())
}
defer resp.Body.Close()
var respData struct {
Success bool `json:"success"`
Message string `json:"message"`
Data string `json:"data"`
}
err = json.Unmarshal(body, &respData)
if err != nil {
return 0, fmt.Errorf("request appGroupId failed, groupId:%s, body:%s unmarshal body error:%s", groupId, string(body), err.Error())
}
if !respData.Success {
return 0, fmt.Errorf("request appGroupId failed, groupId:%s, message:%s", groupId, respData.Message)
}
ret, err := strconv.ParseInt(respData.Data, 10, 64)
if err != nil {
return 0, fmt.Errorf("request appGroupId failed, data expect int, but got=%s", respData.Data)
}
return ret, nil
}
func (g *GroupManager) putGroupId2AppKeyMap(groupId, appKey string) {
g.groupId2AppKeyMap.Store(groupId, appKey)
}
func (g *GroupManager) GetAppKeyByGroupId(groupId string) string {
ret, ok := g.groupId2AppKeyMap.Load(groupId)
if ok {
return ret.(string)
}
return ""
}
func (g *GroupManager) StartServerDiscovery(groupId, appKey string) {
if g.groupExist(groupId) {
return
}
discovery := GetDiscovery(groupId)
discovery.refreshActiveServer(groupId, appKey)
go discovery.Start(groupId, appKey)
go discovery.Stop(g.stopCh)
err := g.appendGroupId(groupId, groupId, appKey)
if err != nil {
logger.Errorf("appendGroupId error %s", err.Error())
}
g.putGroupId2AppKeyMap(groupId, appKey)
}
func (g *GroupManager) Stop() {
g.stopCh <- struct{}{}
}
func newGroupManager(client *openapi.Client) *GroupManager {
instance := &GroupManager{
groupId2AppGroupIdMap: sync.Map{},
groupId2AppKeyMap: sync.Map{},
groupId2ParentAppGroupMap: make(map[string]*common.AppGroupInfo),
parentGroupId2CountMap: make(map[string]int),
client: client,
stopCh: make(chan struct{}),
}
go func() {
for event := range triggerChan {
logger.Infof("receive trigger event childGroupId: %s parentGroupId:%s", event.ChildGroupId, event.ParentGroupId)
instance.StartServerDiscovery(event.ChildGroupId, event.ChildAppKey)
}
}()
return instance
}
func (g *GroupManager) getParentAppGroup(groupId string) *common.AppGroupInfo {
return g.groupId2ParentAppGroupMap[groupId]
}
func (g *GroupManager) IsAdvancedVersion(groupId string) bool {
if appGroupInfo := g.getParentAppGroup(groupId); appGroupInfo != nil {
return appGroupInfo.GetVersion() == int32(constants.Advanced)
}
return false
}
func (g *GroupManager) getAppGroup(groupId, appKey string) (*common.AppGroupInfo, error) {
var (
urlStr string
domain = g.client.Domain()
namespace = g.client.Namespace()
namespaceSource = g.client.NamespaceSource()
)
if domain == "" {
return nil, errors.New("domain missing")
}
if namespace != "" {
urlStr = fmt.Sprintf("http://%s%s?groupId=%s&namespace=%s&appKey=%s", domain, appGroupURL, groupId, namespace, url.QueryEscape(appKey))
if namespaceSource != "" {
urlStr += fmt.Sprintf("&namespaceSource=%s", namespaceSource)
}
} else {
urlStr = fmt.Sprintf("http://%s%s?groupId=%s&appKeys=%s", domain, appGroupURL, groupId, appKey)
}
resp, err := g.client.HttpClient().Get(urlStr)
if err != nil {
return nil, fmt.Errorf("request appGroup failed, groupId:%s, err:%s", groupId, err.Error())
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("request appGroup failed, groupId:%s, status:%d", groupId, resp.StatusCode)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("request appGroup failed, groupId:%s, read body error:%s", groupId, err.Error())
}
defer resp.Body.Close()
var respData struct {
Success bool `json:"success"`
Message string `json:"message"`
Data *common.AppGroupInfo `json:"data"`
}
err = json.Unmarshal(body, &respData)
if err != nil {
return nil, fmt.Errorf("request appGroup failed, groupId:%s, body:%s unmarshal body error:%s", groupId, string(body), err.Error())
}
if !respData.Success {
return nil, fmt.Errorf("request appGroup failed, groupId:%s, message:%s", groupId, respData.Message)
}
return respData.Data, nil
}