internal/discovery/discover.go (123 lines of code) (raw):
/*
* Copyright (c) 2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package discovery
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"time"
"go.uber.org/atomic"
"github.com/alibaba/schedulerx-worker-go/internal/openapi"
"github.com/alibaba/schedulerx-worker-go/logger"
)
const (
ServiceDiscoverInterval = 5 * time.Second
ActiveServerQueryURI = "/worker/v1/appgroup/getLeaderAddr"
)
const GroupHasChild = 300
type ServiceDiscover struct {
timer *time.Ticker
client *openapi.Client
activeServer atomic.String
stopCh chan struct{}
changedCh chan struct{}
}
func NewServiceDiscovery() *ServiceDiscover {
return &ServiceDiscover{
timer: time.NewTicker(ServiceDiscoverInterval),
client: openapi.GetOpenAPIClient(),
stopCh: make(chan struct{}),
changedCh: make(chan struct{}, 1),
}
}
func (s *ServiceDiscover) refreshActiveServer(groupId, appKey string) {
activeServerAddr, err := s.queryActiveServer(groupId, appKey)
if err != nil {
logger.Errorf("query active server from console failed err:%s", err.Error())
return
}
if len(activeServerAddr) > 0 && activeServerAddr != s.activeServer.String() {
logger.Infof("[ServerDiscovery]: active server change from [%s] to [%s], groupId=%s, namespace=%s, namespaceSource=%s",
s.activeServer.String(), activeServerAddr, groupId, s.client.Namespace(), s.client.NamespaceSource())
s.activeServer.Store(activeServerAddr)
s.changedCh <- struct{}{}
}
logger.Debugf("active server: %s", s.activeServer)
}
func (s *ServiceDiscover) Start(groupId, appKey string) {
for {
select {
case <-s.timer.C:
s.refreshActiveServer(groupId, appKey)
case <-s.stopCh:
logger.Infof("receive stop signal")
return
}
}
}
func (s *ServiceDiscover) queryActiveServer(groupId, appKey string) (string, error) {
urlStr := fmt.Sprintf("http://%s%s?groupId=%s&appKey=%s", s.client.Domain(), ActiveServerQueryURI, groupId, url.QueryEscape(appKey))
if len(s.client.Namespace()) > 0 {
urlStr += "&namespace=" + s.client.Namespace()
}
if len(s.client.NamespaceSource()) > 0 {
urlStr += "&namespaceSource=" + s.client.NamespaceSource()
}
urlStr += "&enableScale=true"
resp, err := s.client.HttpClient().Get(urlStr)
if err != nil {
return "", fmt.Errorf("http.Get error %w", err)
}
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("http.Get statusCode %d", resp.StatusCode)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("read body error %w", err)
}
defer resp.Body.Close()
var respData struct {
Success bool
RequestId string
Message string
Code int
Data interface{}
}
err = json.Unmarshal(body, &respData)
if err != nil {
return "", fmt.Errorf("unmarshal resp body[%s] fail %w, url=%s", string(body), err, urlStr)
}
if !respData.Success {
return "", fmt.Errorf("result is not success requestId:%s message:%s, url=%s", respData.RequestId, respData.Message, urlStr)
}
if respData.Code != GroupHasChild {
return respData.Data.(string), nil
}
// This application group has enabled automatic scaling and has split child nodes, requiring the parsing of all groupIds, and register serverDiscovery.
var groupResult struct {
CurrentLeaderAddr string
GroupIdMap map[string]string // key=groupId, val=appKey
}
err = json.Unmarshal([]byte(respData.Data.(string)), &groupResult)
if err != nil {
return "", fmt.Errorf("unmarshal group result[%s] fail %w", respData.Data, err)
}
for childGroupId, childAppKey := range groupResult.GroupIdMap {
triggerChan <- &TriggerEvent{
ChildGroupId: childGroupId,
ChildAppKey: childAppKey,
ParentGroupId: groupId,
}
}
return groupResult.CurrentLeaderAddr, nil
}
func (s *ServiceDiscover) ActiveServer() string {
return s.activeServer.String()
}
func (s *ServiceDiscover) Stop(stop <-chan struct{}) {
<-stop
s.stopCh <- struct{}{}
s.timer.Stop()
}
func (s *ServiceDiscover) ResultChangedCh() chan struct{} {
return s.changedCh
}