clients/zk_client/zk_client.go (167 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zk_client
import (
"encoding/json"
"github.com/apache/shenyu-client-golang/common/constants"
"github.com/apache/shenyu-client-golang/model"
"github.com/samuel/go-zookeeper/zk"
"github.com/sirupsen/logrus"
"time"
)
// logger is the package-level logrus instance used by the methods in this
// file for informational and fatal messages.
var logger = logrus.New()
/**
 * ShenYuZkClient pairs a ZooKeeper connection with the parameters it was
 * created from. The methods of this type read both fields and do not check
 * them for nil, so a usable value is expected to come from NewClient.
 **/
type ShenYuZkClient struct {
ZkClient *zk.Conn // connection used for the ZooKeeper calls (Exists, Create, Get, Children, Delete, Close)
Zcp *ZkClientParam // client parameters; Zcp.ZkRoot is the prefix of every node path the methods build
}
/**
 * ZkClientParam holds the settings NewClient needs to open a connection.
 **/
type ZkClientParam struct {
ZkServers []string // server addresses handed to zk.Connect, ex: 127.0.0.1
ZkRoot string // root node path under which service nodes live; NewClient rejects an empty value
}
/**
 * NewClient opens a ZooKeeper connection described by clientParam and returns
 * a new *ShenYuZkClient wrapping it.
 *
 * clientParam must be a non-nil *ZkClientParam with a non-empty ZkRoot; a
 * violation is treated as a programming error and aborts via logger.Fatalf,
 * which is how this file validates parameters elsewhere.
 *
 * Returns (client, true, nil) on success. When zk.Connect fails it returns an
 * empty *ShenYuZkClient, false and the connect error, so the caller never
 * receives a "successful" client that has no connection.
 **/
func (zc *ShenYuZkClient) NewClient(clientParam interface{}) (client interface{}, createResult bool, err error) {
	zcp, ok := clientParam.(*ZkClientParam)
	// A typed nil pointer satisfies the assertion, so it is rejected explicitly
	// instead of being dereferenced below.
	if !ok || zcp == nil {
		logger.Fatalf("The clientParam must not nil!")
	}
	if len(zcp.ZkRoot) == 0 {
		logger.Fatalf("The param zkRoot must set a value!")
	}

	conn, _, err := zk.Connect(zcp.ZkServers, time.Duration(constants.DEFAULT_ZOOKEEPER_CLIENT_TIME)*time.Second)
	if err != nil {
		// Surface the failure. The receiver is unrelated to the connection being
		// created, so it must not be used for recovery here.
		return &ShenYuZkClient{}, false, err
	}

	return &ShenYuZkClient{
		Zcp: &ZkClientParam{
			ZkRoot:    zcp.ZkRoot,
			ZkServers: zcp.ZkServers,
		},
		ZkClient: conn,
	}, true, nil
}
/**
 * DeregisterServiceInstance removes every child node registered under
 * <ZkRoot>/<AppName>.
 *
 * metaData must be a *model.MetaDataRegister (validated by checkCommonParam).
 * The service node itself is ensured to exist first and is left in place;
 * only its children are deleted.
 *
 * Returns (true, nil) when all children are gone, otherwise (false, err) with
 * the first ZooKeeper error encountered.
 **/
func (zc *ShenYuZkClient) DeregisterServiceInstance(metaData interface{}) (deRegisterResult bool, err error) {
	mdr := zc.checkCommonParam(metaData, err)
	if err := zc.ensureName(mdr.AppName); err != nil {
		return false, err
	}

	path := zc.Zcp.ZkRoot + "/" + mdr.AppName
	children, _, err := zc.ZkClient.Children(path)
	if err != nil {
		return false, err
	}

	for _, child := range children {
		// Version -1 matches any node version. The parent's stat version says
		// nothing about a child's version and would cause ErrBadVersion.
		err := zc.ZkClient.Delete(path+"/"+child, -1)
		if err != nil && err != zk.ErrNoNode {
			return false, err
		}
		// ErrNoNode means the child vanished in the meantime, which is the
		// outcome we wanted anyway.
	}
	return true, nil
}
/**
 * GetServiceInstanceInfo reads the node <ZkRoot>/<AppName> and decodes its
 * JSON payload into a model.MetaDataRegister.
 *
 * metaData must be a *model.MetaDataRegister (validated by checkCommonParam).
 *
 * Returns a one-element []*model.MetaDataRegister on success. A ZooKeeper
 * read error or a JSON decode error is returned to the caller as (nil, err)
 * rather than terminating the process.
 **/
func (zc *ShenYuZkClient) GetServiceInstanceInfo(metaData interface{}) (instances interface{}, err error) {
	mdr := zc.checkCommonParam(metaData, err)
	path := zc.Zcp.ZkRoot + "/" + mdr.AppName

	data, _, err := zc.ZkClient.Get(path)
	if err != nil {
		// A failed read (e.g. missing node) is recoverable; let the caller decide.
		return nil, err
	}

	node := new(model.MetaDataRegister)
	if err := json.Unmarshal(data, node); err != nil {
		return nil, err
	}
	return []*model.MetaDataRegister{node}, nil
}
/**
 * GetEphemeralServiceInstanceInfo lists the children of <ZkRoot>/<AppName>
 * and decodes each child's JSON payload into a model.MetaDataRegister.
 *
 * metaData must be a *model.MetaDataRegister (validated by checkCommonParam).
 *
 * A missing parent node yields an empty, non-nil slice. A child that no
 * longer exists when it is read is skipped. Any other ZooKeeper error or a
 * JSON decode error is returned as (nil, err).
 **/
func (zc *ShenYuZkClient) GetEphemeralServiceInstanceInfo(metaData interface{}) (instances interface{}, err error) {
	register := zc.checkCommonParam(metaData, err)
	parent := zc.Zcp.ZkRoot + "/" + register.AppName

	names, _, err := zc.ZkClient.Children(parent)
	switch {
	case err == zk.ErrNoNode:
		// No parent node: report "no instances" instead of an error.
		return []*model.MetaDataRegister{}, nil
	case err != nil:
		return nil, err
	}

	var result []*model.MetaDataRegister
	for _, name := range names {
		raw, _, getErr := zc.ZkClient.Get(parent + "/" + name)
		if getErr == zk.ErrNoNode {
			// The child was listed but is gone by now; ignore it.
			continue
		}
		if getErr != nil {
			return nil, getErr
		}

		entry := new(model.MetaDataRegister)
		if decodeErr := json.Unmarshal(raw, entry); decodeErr != nil {
			return nil, decodeErr
		}
		result = append(result, entry)
	}
	return result, nil
}
/**
 * RegisterServiceInstance creates the node <ZkRoot>/<AppName> holding the
 * JSON encoding of metaData.
 *
 * metaData must be a *model.MetaDataRegister (validated by checkCommonParam).
 * The root node is ensured to exist before the service node is created.
 *
 * Returns (true, nil) on success. Failures from ensureRoot, json.Marshal or
 * the ZooKeeper Create call are returned as (false, err) rather than
 * terminating the process.
 **/
func (zc *ShenYuZkClient) RegisterServiceInstance(metaData interface{}) (registerResult bool, err error) {
	mdr := zc.checkCommonParam(metaData, err)

	if err := zc.ensureRoot(); err != nil {
		// Recoverable ZooKeeper failure: report it instead of exiting.
		return false, err
	}

	path := zc.Zcp.ZkRoot + "/" + mdr.AppName
	data, err := json.Marshal(metaData)
	if err != nil {
		return false, err
	}

	if _, err := zc.ZkClient.Create(path, data, 0, zk.WorldACL(zk.PermAll)); err != nil {
		return false, err
	}
	return true, nil
}
/**
 * checkCommonParam asserts that metaData is a *model.MetaDataRegister and
 * returns it. On a type mismatch it reports through logger.Fatalf, including
 * the err value supplied by the caller in the message.
 **/
func (zc *ShenYuZkClient) checkCommonParam(metaData interface{}, err error) *model.MetaDataRegister {
	if register, isRegister := metaData.(*model.MetaDataRegister); isRegister {
		return register
	}
	logger.Fatalf("get zk client metaData error %v:", err)
	return nil
}
/**
 * Close closes the underlying ZooKeeper connection.
 *
 * It is a no-op when the client or its connection is nil, so it is safe to
 * call on the empty client that NewClient returns on failure.
 **/
func (zc *ShenYuZkClient) Close() {
	if zc == nil || zc.ZkClient == nil {
		return
	}
	zc.ZkClient.Close()
}
/**
 * ensureRoot makes sure the node Zcp.ZkRoot exists, creating it with empty
 * data when the Exists check reports it missing.
 *
 * A Create that fails with zk.ErrNodeExists is treated as success. Any other
 * error from Exists or Create is returned.
 **/
func (zc *ShenYuZkClient) ensureRoot() error {
	found, _, err := zc.ZkClient.Exists(zc.Zcp.ZkRoot)
	if err != nil {
		return err
	}
	if found {
		return nil
	}

	_, createErr := zc.ZkClient.Create(zc.Zcp.ZkRoot, []byte(""), 0, zk.WorldACL(zk.PermAll))
	if createErr == nil || createErr == zk.ErrNodeExists {
		return nil
	}
	return createErr
}
/**
 * ensureName makes sure the node <ZkRoot>/<name> exists, creating it with
 * empty data when the Exists check reports it missing.
 *
 * A Create that fails with zk.ErrNodeExists is treated as success, matching
 * ensureRoot. Any other error from Exists or Create is returned instead of
 * being dropped.
 **/
func (zc *ShenYuZkClient) ensureName(name string) error {
	path := zc.Zcp.ZkRoot + "/" + name
	logger.Infof("ensureName check, path is %v: ->", path)

	exists, _, err := zc.ZkClient.Exists(path) //avoid create error
	if err != nil {
		return err
	}
	logger.Infof("ensureName check result is %v: ->", exists)
	if exists {
		return nil
	}

	_, err = zc.ZkClient.Create(path, []byte(""), 0, zk.WorldACL(zk.PermAll))
	if err != nil && err != zk.ErrNodeExists {
		// Real failure (ACL, connection, missing parent, ...): report it.
		return err
	}
	logger.Infof("ensureName inner create success")
	return nil
}