in pypubsub.py [0:0]
def __init__(self, yml: dict) -> None:
    """Build the runtime configuration from a parsed YAML document.

    Reads the ``clients`` and ``server`` sections of *yml* and populates
    ``self.ldap``, ``self.sqs``, ``self.server``, ``self.backlog``,
    ``self.payloaders``, ``self.oldschoolers`` and ``self.secure_topics``.

    Args:
        yml: Parsed configuration mapping. The ``server`` section is
            required; ``clients``, ``sqs``, ``server.tls`` and
            ``server.backlog`` are optional.

    Raises:
        KeyError: If the ``server`` section is missing.
        AssertionError: If a TLS section is present but lacks ``port``,
            ``cert`` or ``key``, or if a referenced certificate, key or
            chain file does not exist.
        ValueError: If a port, payload limit, or the numeric part of
            ``backlog.max_age`` cannot be converted to an integer.
    """
    # An empty YAML key ("clients:") is parsed as None rather than being
    # absent, so "or {}" covers both the missing and the empty case.
    clients_yml = yml.get('clients') or {}

    # LDAP settings: only instantiate a connection for a mapping value.
    self.ldap = None
    ldap_yml = clients_yml.get('ldap')
    if isinstance(ldap_yml, dict):
        self.ldap = plugins.ldap.LDAPConnection(ldap_yml)

    # SQS settings are stored exactly as found in the configuration.
    self.sqs = yml.get('sqs')

    # Main server config
    server_yml = yml['server']
    server_ip = server_yml.get('bind', PUBSUB_DEFAULT_IP)
    server_port = int(server_yml.get('port', PUBSUB_DEFAULT_PORT))
    server_payload_limit = int(server_yml.get('max_payload_size', PUBSUB_DEFAULT_MAX_PAYLOAD_SIZE))

    # TLS support, if configured. Port 0 / no context means TLS is off.
    tls_port = 0
    tls_ctx = None
    tls_yml = server_yml.get('tls')
    if isinstance(tls_yml, dict):
        for required_element in ("port", "cert", "key"):
            assert tls_yml.get(required_element), f"TLS: configuration option '{required_element}' is missing or invalid, cannot enable TLS!"
        import ssl
        tls_port = int(tls_yml['port'])
        cert_file = tls_yml['cert']
        key_file = tls_yml['key']
        # Create TLS context and load cert+key
        tls_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        assert os.path.isfile(cert_file), f"Could not locate domain certificate file {cert_file}"
        assert os.path.isfile(key_file), f"Could not locate domain certificate key {key_file}"
        tls_ctx.load_cert_chain(cert_file, key_file)
        # Add intermediate cert chain, if available
        if 'chain' in tls_yml:
            chain_file = tls_yml['chain']
            assert os.path.isfile(chain_file), f"Could not locate domain certificate chain {chain_file}"
            tls_ctx.load_verify_locations(chain_file)
    self.server = ServerConfig(ip=server_ip, port=server_port, payload_limit=server_payload_limit, tls_port=tls_port, tls_ctx=tls_ctx)

    # Backlog settings
    backlog_yml = server_yml.get('backlog') or {}
    max_age = backlog_yml.get('max_age', PUBSUB_DEFAULT_BACKLOG_AGE)
    if isinstance(max_age, str):
        # Accept "<number><unit>" with unit s/m/h/d (case-insensitive) and
        # convert to seconds. Only the trailing unit character is removed.
        max_age = max_age.strip().lower()
        seconds_per_unit = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
        multiplier = seconds_per_unit.get(max_age[-1:])
        if multiplier is not None:
            max_age = int(max_age[:-1]) * multiplier
        elif max_age.isdigit():
            # A quoted plain number ("3600") is already in seconds.
            max_age = int(max_age)
        # Any other string is passed through unchanged, as before.
    queue_size = backlog_yml.get('size', PUBSUB_DEFAULT_BACKLOG_SIZE)
    storage = backlog_yml.get('storage')
    self.backlog = BacklogConfig(max_age=max_age, queue_size=queue_size, storage=storage)

    # Payloaders - clients that can post payloads
    self.payloaders = [netaddr.IPNetwork(x) for x in clients_yml.get('payloaders') or []]

    # Binary backwards compatibility
    self.oldschoolers = clients_yml.get('oldschoolers') or []

    # Secure topics, if any
    self.secure_topics = set(clients_yml.get('secure_topics') or [])