client/apache_shenyu_client/api.py (249 lines of code) (raw):
# -*- coding: utf-8 -*-
"""
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
"""
import requests
from requests.exceptions import (ReadTimeout, RequestException, ConnectTimeout)
from .config import GatewayConfig, ALL_ENV
from .exception import (EnvTypeExp, SetUpUriExp, SetUpRegisterExp, SetUpGatewayExp, GetRegisterTokenErr)
__all__ = ["GatewayProxy"]
class GatewayProxy(object):
SYS_DEFAULT_NAMESPACE_ID = "649330b6-c2d7-4edc-be8e-8a54df9eb385"
"""
gateway proxy class
"""
def __init__(self):
self.headers = {"Content-Type": "application/json;charset=UTF-8"}
self.env = GatewayConfig.uri.get("environment")
if not isinstance(self.env, str) or self.env not in ALL_ENV:
raise EnvTypeExp(env=self.env)
self.register_token = None
self._set_up_gateway_service_url()
self._setup_uri_params()
self._setup_register_params()
self._setup_register_discovery_config()
self._get_register_token()
if not self.register_token:
raise GetRegisterTokenErr(msg="can't get register token")
else:
self.headers.update({"X-Access-Token": self.register_token})
def _set_up_gateway_service_url(self):
try:
self.gateway_base_urls = GatewayConfig.__dict__.get(self.env, {}).get("servers", "").split(",")
self.port = GatewayConfig.__dict__.get(self.env, {}).get("port")
url_pre = "http://{}:{}"
self.gateway_base_urls = [url_pre.format(_url, self.port) for _url in self.gateway_base_urls]
self.register_meta_data_suffix = "/shenyu-client/register-metadata"
self.register_uri_suffix = "/shenyu-client/register-uri"
self.register_discovery_config_suffix = "/shenyu-client/register-discoveryConfig"
self.register_offline_suffix = "/shenyu-client/offline"
self.register_meta_data_path_list = [_url + self.register_meta_data_suffix for _url in
self.gateway_base_urls]
self.register_uri_list = [_url + self.register_uri_suffix for _url in self.gateway_base_urls]
self.register_discovery_config_list = [_url + self.register_discovery_config_suffix for _url in
self.gateway_base_urls]
except SetUpGatewayExp as sue:
raise SetUpUriExp(app_name=GatewayConfig.uri.get("app_name"), msg=str(sue), env=self.env)
def _setup_uri_params(self):
"""
setup uri params
"""
try:
self.host = GatewayConfig.uri.get("host")
self.port = GatewayConfig.uri.get("port")
self.app_name = GatewayConfig.uri.get("app_name")
self.rpc_type = GatewayConfig.uri.get("rpc_type")
self.context_path = GatewayConfig.uri.get("context_path")
self.register_type = GatewayConfig.register.get("register_type")
self.register_servers = GatewayConfig.register.get("register_servers")
except SetUpUriExp as se:
raise SetUpUriExp(app_name=GatewayConfig.uri.get("app_name"), msg=str(se), env=self.env)
def _setup_register_params(self):
"""
setup register params
"""
try:
self.register_token_type = GatewayConfig.register.get("register_type")
self.register_base_servers = GatewayConfig.register.get("servers").split(",")
self.register_namespace_id = GatewayConfig.register.get("namespace_id")
self.register_namespace_id = self.register_namespace_id.split(";") if self.register_namespace_id else [
self.SYS_DEFAULT_NAMESPACE_ID]
self.register_path = "/platform/login"
self.register_token_servers = [_url + self.register_path for _url in self.register_base_servers]
self.register_username = GatewayConfig.register.get("props", {}).get("username")
self.register_password = GatewayConfig.register.get("props", {}).get("password")
except SetUpRegisterExp as se:
raise SetUpRegisterExp(app_name=GatewayConfig.uri.get("app_name"), msg=str(se), env=self.env)
def _setup_register_discovery_config(self):
"""
setup register discovery config
"""
try:
self.discovery_type = GatewayConfig.discovery_config.get("discovery_type")
self.discovery_server_lists = GatewayConfig.discovery_config.get("server_lists")
self.discovery_register_path = GatewayConfig.discovery_config.get("register_path")
self.discovery_plugin_name = GatewayConfig.discovery_config.get("plugin_name")
self.discovery_props = GatewayConfig.discovery_config.get("props")
except SetUpRegisterExp as se:
raise SetUpRegisterExp(app_name=GatewayConfig.uri.get("app_name"), msg=str(se), env=self.env)
def _request(self, url, json_data):
"""
base post request
"""
if not url or not isinstance(url, str) or not isinstance(json_data, dict):
print("_request url or data format error")
return False
try:
res = requests.post(url, json=json_data, headers=self.headers, timeout=5)
status_code = res.status_code
msg = res.text
except ConnectTimeout as ce:
print("connect timeout, detail is:{}".format(str(ce)))
return False
except ReadTimeout as rte:
print("read time out, detail is:{}".format(str(rte)))
return False
except RequestException as rqe:
print("request except, detail is:{}".format(str(rqe)))
return False
except Exception as e:
print("request ({}) except, detail is:{}".format(url, str(e)))
return False
else:
# According to the interface return value of the gateway registry, the request is considered successful
# only when msg==success; if the interface return value of the gateway registry changes, the judgment
# method should also be modified
if msg == "success":
return True
print("request ({}) fail, status code is:{}, msg is:{}".format(res.url, status_code, msg))
return False
def _get_register_token(self):
"""
base get http request
"""
default_res = ""
params = {
"userName": self.register_username,
"password": self.register_password
}
try:
for url in self.register_token_servers:
res = requests.get(url, params=params, timeout=5)
status_code = res.status_code
res_data = res.json()
token = res_data.get("data", {}).get("token", "")
if token:
self.register_token = token
break
except ConnectTimeout as ce:
print("connect timeout, detail is:{}".format(str(ce)))
return False
except ReadTimeout as rte:
print("read time out, detail is:{}".format(str(rte)))
return False
except RequestException as rqe:
print("request except, detail is:{}".format(str(rqe)))
return False
except Exception as e:
print("get register token except, detail is:{}".format(str(e)))
return False
def register_uri(self):
"""
register uri
"""
json_data = {
"appName": self.app_name,
"contextPath": self.context_path,
"rpcType": self.rpc_type,
"namespaceId": self.register_namespace_id[0],
"host": self.host,
"port": self.port
}
register_flag = False
for _url in self.register_uri_list:
for _namespace in self.register_namespace_id:
if not _namespace:
continue
json_data["namespaceId"] = _namespace
res = self._request(_url, json_data)
if not res:
continue
else:
print("[SUCCESS], register uri success, register data is:{}".format(str(json_data)))
register_flag = True
break
if not register_flag:
print("[ERROR], register uri fail, app_name is:{}, host is:{}, port is:{}".format(self.app_name,
self.host,
self.port))
return register_flag
def register_metadata(self, **kwargs):
"""
register path to gateway
path: The path needs to be unique, for example, your path is: /order/findById, your request prefix
is: /hello, the path must be /hello/order/findById
register_all Register all paths ?
rule_name: Can be the same as path
enabled: Whether to open, If you want to open the gateway proxy, you must fill in True
path_desc: Path description, optional filling
register_meta_data: Need to register metadata, not for http request, fill in false
"""
if not kwargs.get("register_all") and not kwargs.get("path"):
return False
register_all = kwargs.get("register_all", False)
path = kwargs.get("path", "")
rule_name = kwargs.get("rule_name", "")
enabled = kwargs.get("enabled", True)
path_desc = kwargs.get("path_desc", "")
register_meta_data = kwargs.get("register_meta_data", False)
if register_all:
path = self.context_path + "**" if self.context_path.endswith("/") else self.context_path + "/**"
rule_name = path if not rule_name else rule_name
json_data = {
"appName": self.app_name,
"contextPath": self.context_path,
"namespaceId": self.register_namespace_id[0],
"path": path,
"pathDesc": path_desc,
"rpcType": self.rpc_type,
"ruleName": rule_name,
"enabled": enabled,
"registerMetaData": register_meta_data,
"pluginNames": []
}
register_flag = False
for _url in self.register_meta_data_path_list:
for _namespace in self.register_namespace_id:
if not _namespace:
continue
json_data["namespaceId"] = _namespace
res = self._request(_url, json_data)
if not res:
continue
else:
print("[SUCCESS], register metadata success, register data is:{}".format(str(json_data)))
register_flag = True
break
if not register_flag:
print("[ERROR],register metadata fail, app_name:{}, path:{}, contextPath:{}".format(self.app_name,
path,
self.context_path))
return register_flag
def register_discovery_config(self, **kwargs):
"""
register discovery config
"""
json_data = {
"name": "default" + self.discovery_type,
"selectorName": self.context_path,
"handler": {},
"listenerNode": self.discovery_register_path,
"serverList": self.register_servers,
"discoveryType": self.discovery_type,
"pluginName": self.discovery_plugin_name,
"props": self.discovery_props
}
register_flag = False
for _url in self.register_discovery_config_suffix:
for _namespace in self.register_namespace_id:
if not _namespace:
continue
json_data["namespaceId"] = _namespace
res = self._request(_url, json_data)
if not res:
continue
else:
print("[SUCCESS], register discovery config success, register data is:{}".format(str(json_data)))
register_flag = True
break
if not register_flag:
print("[ERROR],register discovery config fail, app_name:{}, contextPath:{}".format(self.app_name,
self.context_path))
return register_flag
def offline_register(self):
"""
offline register
let json_data = serde_json::json!({
"appName": app_name,
"contextPath": context_path,
"protocol": rpc_type,
"host": host.clone().unwrap(),
"port": port,
"namespaceId": namespace_id,
"eventType": EventType::REGISTER.to_string(),
});
"""
json_data = {
"appName": self.app_name,
"contextPath": self.context_path,
"rpcType": self.rpc_type,
"protocol": self.rpc_type,
"host": self.host,
"port": self.port,
"namespaceId": self.register_namespace_id[0],
"eventType": "OFFLINE"
}
register_flag = False
for _url in self.register_offline_suffix:
for _namespace in self.register_namespace_id:
if not _namespace:
continue
json_data["namespaceId"] = _namespace
res = self._request(_url, json_data)
if not res:
continue
else:
print("[SUCCESS], offline register success, register data is:{}".format(str(json_data)))
register_flag = True
break
if not register_flag:
print("[ERROR],offline register fail, app_name:{}, contextPath:{}".format(self.app_name,
self.context_path))
return register_flag