darabonba/runtime.py (169 lines of code) (raw):
from darabonba.core import DaraModel
from typing import Dict
from darabonba.policy.retry import RetryOptions
class ExtendsParameters(DaraModel):
def __init__(
self,
headers: Dict[str, str] = None,
queries: Dict[str, str] = None,
):
self.headers = headers
self.queries = queries
def validate(self):
pass
def to_map(self):
_map = super().to_map()
if _map is not None:
return _map
result = dict()
if self.headers is not None:
result['headers'] = self.headers
if self.queries is not None:
result['queries'] = self.queries
return result
def from_map(self, m: dict = None):
m = m or dict()
if m.get('headers') is not None:
self.headers = m.get('headers')
if m.get('queries') is not None:
self.queries = m.get('queries')
return self
class RuntimeOptions(DaraModel):
"""
The common runtime options model
"""
def __init__(
self,
retry_options: RetryOptions = None,
autoretry: bool = None,
ignore_ssl: bool = None,
max_attempts: int = None,
backoff_policy: str = None,
backoff_period: int = None,
read_timeout: int = None,
connect_timeout: int = None,
http_proxy: str = None,
https_proxy: str = None,
no_proxy: str = None,
max_idle_conns: int = None,
local_addr: str = None,
socks_5proxy: str = None,
socks_5net_work: str = None,
keep_alive: bool = None,
key: str = None,
cert: str = None,
ca: str = None,
extends_parameters: ExtendsParameters = None,
):
# retry options
self.retry_options = retry_options
# whether to try again
self.autoretry = autoretry
# ignore SSL validation
self.ignore_ssl = ignore_ssl
# privite key for client certificate
self.key = key
# client certificate
self.cert = cert
# server certificate
self.ca = ca
# maximum number of retries
self.max_attempts = max_attempts
# backoff policy
self.backoff_policy = backoff_policy
# backoff period
self.backoff_period = backoff_period
# read timeout
self.read_timeout = read_timeout
# connect timeout
self.connect_timeout = connect_timeout
# http proxy url
self.http_proxy = http_proxy
# https Proxy url
self.https_proxy = https_proxy
# agent blacklist
self.no_proxy = no_proxy
# maximum number of connections
self.max_idle_conns = max_idle_conns
# local addr
self.local_addr = local_addr
# SOCKS5 proxy
self.socks_5proxy = socks_5proxy
# SOCKS5 netWork
self.socks_5net_work = socks_5net_work
# whether to enable keep-alive
self.keep_alive = keep_alive
# Extends Parameters
self.extends_parameters = extends_parameters
def validate(self):
if self.retry_options:
self.retry_options.validate()
if self.extends_parameters:
self.extends_parameters.validate()
def to_map(self):
_map = super().to_map()
if _map is not None:
return _map
result = dict()
if self.retry_options is not None:
result['retryOptions'] = self.retry_options.to_map()
if self.autoretry is not None:
result['autoretry'] = self.autoretry
if self.ignore_ssl is not None:
result['ignoreSSL'] = self.ignore_ssl
if self.key is not None:
result['key'] = self.key
if self.cert is not None:
result['cert'] = self.cert
if self.ca is not None:
result['ca'] = self.ca
if self.max_attempts is not None:
result['max_attempts'] = self.max_attempts
if self.backoff_policy is not None:
result['backoff_policy'] = self.backoff_policy
if self.backoff_period is not None:
result['backoff_period'] = self.backoff_period
if self.read_timeout is not None:
result['readTimeout'] = self.read_timeout
if self.connect_timeout is not None:
result['connectTimeout'] = self.connect_timeout
if self.http_proxy is not None:
result['httpProxy'] = self.http_proxy
if self.https_proxy is not None:
result['httpsProxy'] = self.https_proxy
if self.no_proxy is not None:
result['noProxy'] = self.no_proxy
if self.max_idle_conns is not None:
result['maxIdleConns'] = self.max_idle_conns
if self.local_addr is not None:
result['localAddr'] = self.local_addr
if self.socks_5proxy is not None:
result['socks5Proxy'] = self.socks_5proxy
if self.socks_5net_work is not None:
result['socks5NetWork'] = self.socks_5net_work
if self.keep_alive is not None:
result['keepAlive'] = self.keep_alive
if self.extends_parameters is not None:
result['extendsParameters'] = self.extends_parameters.to_map()
return result
def from_map(self, m: dict = None):
m = m or dict()
if m.get('retryOptions') is not None:
self.retry_options = RetryOptions.from_map(m.get('retryOptions'))
if m.get('autoretry') is not None:
self.autoretry = m.get('autoretry')
if m.get('ignoreSSL') is not None:
self.ignore_ssl = m.get('ignoreSSL')
if m.get('key') is not None:
self.key = m.get('key')
if m.get('cert') is not None:
self.cert = m.get('cert')
if m.get('ca') is not None:
self.ca = m.get('ca')
if m.get('max_attempts') is not None:
self.max_attempts = m.get('max_attempts')
if m.get('backoff_policy') is not None:
self.backoff_policy = m.get('backoff_policy')
if m.get('backoff_period') is not None:
self.backoff_period = m.get('backoff_period')
if m.get('readTimeout') is not None:
self.read_timeout = m.get('readTimeout')
if m.get('connectTimeout') is not None:
self.connect_timeout = m.get('connectTimeout')
if m.get('httpProxy') is not None:
self.http_proxy = m.get('httpProxy')
if m.get('httpsProxy') is not None:
self.https_proxy = m.get('httpsProxy')
if m.get('noProxy') is not None:
self.no_proxy = m.get('noProxy')
if m.get('maxIdleConns') is not None:
self.max_idle_conns = m.get('maxIdleConns')
if m.get('localAddr') is not None:
self.local_addr = m.get('localAddr')
if m.get('socks5Proxy') is not None:
self.socks_5proxy = m.get('socks5Proxy')
if m.get('socks5NetWork') is not None:
self.socks_5net_work = m.get('socks5NetWork')
if m.get('keepAlive') is not None:
self.keep_alive = m.get('keepAlive')
if m.get('extendsParameters') is not None:
temp_model = ExtendsParameters()
self.extends_parameters = temp_model.from_map(m['extendsParameters'])
return self