python/rocketmq/v5/client/client_configuration.py (82 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import socket
from rocketmq.grpc_protocol import AddressScheme, Endpoints
from rocketmq.v5.client.connection import RpcEndpoints
from rocketmq.v5.log import logger
class Credentials:
def __init__(self, ak="", sk=""):
self.__ak = ak if ak is not None else ""
self.__sk = sk if sk is not None else ""
@property
def ak(self):
return self.__ak
@property
def sk(self):
return self.__sk
class ClientConfiguration:
def __init__(
self, endpoints: str, credentials: Credentials, namespace="", request_timeout=3
):
self.__rpc_endpoints = RpcEndpoints(
ClientConfiguration.__parse_endpoints(endpoints)
)
self.__credentials = credentials
self.__request_timeout = request_timeout # seconds
self.__namespace = namespace
@staticmethod
def __parse_endpoints(endpoints_str):
if len(endpoints_str) == 0:
return None
else:
try:
endpoints = Endpoints()
addresses = endpoints_str.split(";")
endpoints.scheme = ClientConfiguration.__parse_endpoints_scheme_type(
ClientConfiguration.__parse_endpoints_prefix(
addresses[0].split(":")[0]
)
)
for address in addresses:
if len(address) == 0:
continue
ad = endpoints.addresses.add()
address = ClientConfiguration.__parse_endpoints_prefix(address)
ad.host = address.split(":")[0]
ad.port = int(address.split(":")[1])
return endpoints
except Exception as e:
logger.error(
f"client configuration parse {endpoints_str} exception: {e}"
)
return None
@staticmethod
def __parse_endpoints_scheme_type(host):
try:
socket.inet_pton(socket.AF_INET, host)
return AddressScheme.IPv4
except socket.error:
try:
socket.inet_pton(socket.AF_INET6, host)
return AddressScheme.IPv6
except socket.error:
return AddressScheme.DOMAIN_NAME
@staticmethod
def __parse_endpoints_prefix(endpoints_str):
http_prefix = "http://"
https_prefix = "https://"
if endpoints_str.startswith(http_prefix):
return endpoints_str[len(http_prefix):]
elif endpoints_str.startswith(https_prefix):
return endpoints_str[len(https_prefix):]
return endpoints_str
""" property """
@property
def rpc_endpoints(self) -> RpcEndpoints:
return self.__rpc_endpoints
@property
def namespace(self):
return self.__namespace
@property
def credentials(self):
return self.__credentials
@property
def request_timeout(self):
return self.__request_timeout