pkg/discovery/etcd3.go (212 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package discovery
import (
"context"
"fmt"
etcd3 "go.etcd.io/etcd/client/v3"
"seata.apache.org/seata-go/pkg/util/log"
"strconv"
"strings"
"sync"
)
const (
	// etcdClusterPrefix is the key prefix handed to the watcher when the
	// registry service is created.
	etcdClusterPrefix = "registry-seata"

	// clusterNameSplitChar separates the segments of an etcd registry key.
	clusterNameSplitChar = "-"

	// addressSplitChar separates the host part from the port part of an
	// instance address.
	addressSplitChar = ":"
)
// EtcdRegistryService is the etcd-backed value returned as a RegistryService
// by newEtcdRegistryService. It keeps an in-memory list of server instances
// per cluster, which a background watch goroutine updates from etcd.
type EtcdRegistryService struct {
// client is the etcd v3 client used to read and watch registry keys.
client *etcd3.Client
// cfg is the configuration the client was created from.
cfg etcd3.Config
// vgroupMapping maps a lookup key to a cluster name (see Lookup).
vgroupMapping map[string]string
// grouplist maps a cluster name to its known instances; access is guarded
// by rwLock.
grouplist map[string][]*ServiceInstance
// rwLock guards grouplist: the watcher takes the write lock, Lookup takes
// the read lock.
rwLock sync.RWMutex
// stopCh tells the watch goroutine to stop (see Close).
stopCh chan struct{}
}
// newEtcdRegistryService builds an etcd-backed RegistryService.
//
// config supplies the vgroup-to-cluster mapping used by Lookup; etcd3Config
// supplies the etcd server address. Both are required: a nil value, or a
// failure to create the etcd client, is reported through log.Fatalf and a
// panic. The panic is kept as a backstop in case the logger does not terminate
// the process.
//
// On success a goroutine is started that loads and watches every key under
// etcdClusterPrefix; call Close on the returned service to stop it.
func newEtcdRegistryService(config *ServiceConfig, etcd3Config *Etcd3Config) RegistryService {
	if etcd3Config == nil {
		log.Fatalf("etcd config is nil")
		panic("etcd config is nil")
	}
	// Fail with an explicit message instead of an anonymous nil dereference.
	if config == nil {
		log.Fatalf("service config is nil")
		panic("service config is nil")
	}

	cfg := etcd3.Config{
		Endpoints: []string{etcd3Config.ServerAddr},
	}
	cli, err := etcd3.New(cfg)
	if err != nil {
		// Surface the underlying cause; without it the failure cannot be diagnosed.
		log.Fatalf("failed to create etcd3 client: %v", err)
		panic(fmt.Sprintf("failed to create etcd3 client: %v", err))
	}

	etcdRegistryService := &EtcdRegistryService{
		client:        cli,
		cfg:           cfg,
		vgroupMapping: config.VgroupMapping,
		grouplist:     make(map[string][]*ServiceInstance),
		stopCh:        make(chan struct{}),
	}
	go etcdRegistryService.watch(etcdClusterPrefix)

	return etcdRegistryService
}
// watch keeps s.grouplist in sync with the etcd keys that start with key.
//
// It first loads every entry already stored under the prefix, then consumes
// the etcd watch stream until that stream is closed or s.stopCh becomes
// readable. It is started in its own goroutine by newEtcdRegistryService.
//
// Malformed keys or values are logged and skipped, so one bad entry cannot
// stop discovery for every other cluster.
//
// NOTE(review): events that happen between the initial Get and the Watch call
// are not replayed; consider watching from the Get revision if that matters.
func (s *EtcdRegistryService) watch(key string) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Seed the cache with the instances that are already registered.
	resp, err := s.client.Get(ctx, key, etcd3.WithPrefix())
	if err != nil {
		// Keep going: the watch below can still populate the cache.
		log.Errorf("cant get server instances from etcd: %v", err)
	}
	if resp != nil {
		for _, kv := range resp.Kvs {
			s.cacheInstance(kv.Key, kv.Value)
		}
	}

	// watch the changes of endpoints
	watchCh := s.client.Watch(ctx, key, etcd3.WithPrefix())
	for {
		select {
		case watchResp, ok := <-watchCh:
			if !ok {
				log.Warnf("Watch channel closed")
				return
			}
			// A response can carry an error instead of events; make it
			// visible rather than silently seeing an empty event list.
			if err := watchResp.Err(); err != nil {
				log.Errorf("etcd watch err: %v", err)
			}
			for _, event := range watchResp.Events {
				switch event.Type {
				case etcd3.EventTypePut:
					log.Infof("Key %s updated. New value: %s\n", event.Kv.Key, event.Kv.Value)
					s.cacheInstance(event.Kv.Key, event.Kv.Value)
				case etcd3.EventTypeDelete:
					log.Infof("Key %s deleted.\n", event.Kv.Key)
					s.evictInstance(event.Kv.Key)
				}
			}
		case <-s.stopCh:
			log.Warn("stop etcd watch")
			return
		}
	}
}

// cacheInstance parses one etcd key/value pair and adds the instance to its
// cluster's list, unless an instance with the same address and port is
// already present. Entries that cannot be parsed are logged and ignored.
func (s *EtcdRegistryService) cacheInstance(k, v []byte) {
	clusterName, err := getClusterName(k)
	if err != nil {
		log.Errorf("etcd key err: %v", err)
		return
	}
	serverInstance, err := getServerInstance(v)
	if err != nil {
		log.Errorf("etcd value err: %v", err)
		return
	}

	s.rwLock.Lock()
	defer s.rwLock.Unlock()
	if ifHaveSameServiceInstances(s.grouplist[clusterName], serverInstance) {
		return
	}
	// append on a missing (nil) entry creates the list.
	s.grouplist[clusterName] = append(s.grouplist[clusterName], serverInstance)
}

// evictInstance removes the instance named by a deleted etcd key from its
// cluster's list. Keys that cannot be parsed are logged and ignored.
func (s *EtcdRegistryService) evictInstance(k []byte) {
	cluster, ip, port, err := getClusterAndAddress(k)
	if err != nil {
		log.Errorf("etcd key err: %v", err)
		return
	}

	s.rwLock.Lock()
	defer s.rwLock.Unlock()
	serviceInstances := s.grouplist[cluster]
	if serviceInstances == nil {
		log.Warnf("etcd doesnt exit cluster: %s", cluster)
		return
	}
	s.grouplist[cluster] = removeValueFromList(serviceInstances, ip, port)
}
// getClusterName extracts the cluster name from an etcd registry key.
//
// The key is split on clusterNameSplitChar and must yield exactly four
// segments; the third one is returned. Any other segment count yields an
// error that includes the offending key.
func getClusterName(key []byte) (string, error) {
	raw := string(key)
	if segments := strings.Split(raw, clusterNameSplitChar); len(segments) == 4 {
		return segments[2], nil
	}
	return "", fmt.Errorf("etcd key has an incorrect format. key: %s", raw)
}
// getServerInstance parses an etcd registry value into a ServiceInstance.
//
// The value is split on addressSplitChar and must yield exactly two parts:
// the first becomes Addr and the second, parsed as a decimal integer, becomes
// Port. A wrong part count or a non-numeric port yields a nil instance and an
// error.
func getServerInstance(value []byte) (*ServiceInstance, error) {
	raw := string(value)

	parts := strings.Split(raw, addressSplitChar)
	if len(parts) != 2 {
		return nil, fmt.Errorf("etcd value has an incorrect format. value: %s", raw)
	}

	host, portText := parts[0], parts[1]
	port, convErr := strconv.Atoi(portText)
	if convErr != nil {
		return nil, fmt.Errorf("etcd port has an incorrect format. err: %w", convErr)
	}

	return &ServiceInstance{Addr: host, Port: port}, nil
}
// getClusterAndAddress parses an etcd registry key into its cluster name and
// the instance address (ip and port) encoded in the key's last segment.
//
// The key is split on clusterNameSplitChar and must yield exactly four
// segments: the third is the cluster name, and the fourth is split on
// addressSplitChar into ip and port. An error is returned when the segment
// count is wrong, when the address has no port part, or when the port is not
// a decimal integer; in that case the other results are zero values.
func getClusterAndAddress(key []byte) (string, string, int, error) {
	stringKey := string(key)
	keySplit := strings.Split(stringKey, clusterNameSplitChar)
	if len(keySplit) != 4 {
		return "", "", 0, fmt.Errorf("etcd key has an incorrect format. key: %s", stringKey)
	}

	address := strings.Split(keySplit[3], addressSplitChar)
	// Without this guard a key whose last segment lacks the separator would
	// panic with an index-out-of-range on address[1].
	if len(address) < 2 {
		return "", "", 0, fmt.Errorf("etcd key has an incorrect format. key: %s", stringKey)
	}

	port, err := strconv.Atoi(address[1])
	if err != nil {
		return "", "", 0, fmt.Errorf("etcd port has an incorrect format. err: %w", err)
	}
	return keySplit[2], address[0], port, nil
}
// ifHaveSameServiceInstances reports whether list already holds an instance
// whose Addr and Port both equal those of value.
func ifHaveSameServiceInstances(list []*ServiceInstance, value *ServiceInstance) bool {
	for i := range list {
		candidate := list[i]
		if candidate.Addr != value.Addr || candidate.Port != value.Port {
			continue
		}
		return true
	}
	return false
}
// removeValueFromList returns list without the first instance whose Addr and
// Port equal ip and port.
//
// When a match is found the result is a freshly allocated slice, so the
// backing array of list is never modified; slices that alias list (for
// example ones previously handed out to callers) keep their contents. When
// nothing matches, list itself is returned unchanged.
func removeValueFromList(list []*ServiceInstance, ip string, port int) []*ServiceInstance {
	for i, inst := range list {
		if inst.Addr != ip || inst.Port != port {
			continue
		}
		// Copy instead of append(list[:i], list[i+1:]...), which would shift
		// elements inside the shared backing array.
		result := make([]*ServiceInstance, 0, len(list)-1)
		result = append(result, list[:i]...)
		return append(result, list[i+1:]...)
	}
	return list
}
// Lookup returns the server instances currently known for the cluster that
// key maps to in the vgroup mapping.
//
// An error is returned when key has no (or an empty) cluster mapping. A
// mapped cluster with no cached instances yields a nil slice and a nil error.
//
// The returned slice is a copy taken under the read lock. The watcher keeps
// modifying the cached list after this call returns, so handing out the
// internal slice would let callers observe those modifications concurrently.
// The *ServiceInstance elements themselves are shared, not copied.
func (s *EtcdRegistryService) Lookup(key string) ([]*ServiceInstance, error) {
	s.rwLock.RLock()
	defer s.rwLock.RUnlock()

	cluster := s.vgroupMapping[key]
	if cluster == "" {
		return nil, fmt.Errorf("cluster doesnt exit")
	}

	cached := s.grouplist[cluster]
	if cached == nil {
		return nil, nil
	}
	instances := make([]*ServiceInstance, len(cached))
	copy(instances, cached)
	return instances, nil
}
// Close stops the background watch goroutine and closes the etcd client.
//
// It never blocks and may be called more than once. The stop channel is
// closed rather than sent on, so the signal is delivered even if the watcher
// has already exited or is busy handling events. Only the first call has any
// effect.
func (s *EtcdRegistryService) Close() {
	// rwLock is reused here only to serialize concurrent Close calls.
	s.rwLock.Lock()
	first := false
	if s.stopCh != nil {
		select {
		case <-s.stopCh:
			// Already closed by an earlier call.
		default:
			close(s.stopCh)
			first = true
		}
	}
	s.rwLock.Unlock()

	if !first || s.client == nil {
		return
	}
	// Release the client's connections; done outside the lock so the watcher
	// is never blocked behind it.
	if err := s.client.Close(); err != nil {
		log.Warnf("close etcd client err: %v", err)
	}
}