asfpy/aioldap.py (96 lines of code) (raw):

#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Asynchronous LDAP client.""" # # TYPICAL USAGE: # # ### TBD. look at code for now. # import asyncio import logging import concurrent.futures import bonsai # Re-map the LDAPSearchScope constants to our namespace. # These can now be used as (eg.) asfpy.aioldap.SCOPE.SUBTREE from bonsai import LDAPSearchScope as SCOPE LOGGER = logging.getLogger(__name__) class ASF_LDAPConnection: def __init__(self, client, executor): # NOTE: must be instantiated within one of the EXECUTOR threads. # Shared Executor holding one or more threads. self.executor = executor # bonsai.LDAPConnection ties itself to a loop. Thus, whatever # loop is used to create the connection must also be used to # perform its operations. We will construct that loop here, # and use it for all bonsai actions. It does not have any # thread affinity, until it is running. Thus, we can use this # loop within any thread of the Executor. self.loop = asyncio.new_event_loop() # Hack around GnuTLS bug with async. # See: https://github.com/noirello/bonsai/issues/25 # and: https://github.com/noirello/bonsai/issues/69 # # In THIS executor thread, we'll perform the synchronous # connection, so that the main thread's event loop will # not be blocked. async def do_connect(): ### for testing: pretend this connection blocks #import time; time.sleep(5) # Tell bonsai to connect synchronously. bonsai.set_connect_async(False) try: # Ties to self.loop (the running loop now). return await client.connect(is_async=True) finally: # Make sure this always gets reset. bonsai.set_connect_async(True) # The underlying LDAP connection, tied to our loop. self.conn = self.loop.run_until_complete(do_connect()) def close(self): self.conn.close() self.conn = None # ensure self is unusable async def use_loop(self, loop, method, *args, **kw): "Running in LOOP, run/wait for METHOD with *ARGS and **KW." if loop is None: loop = asyncio.get_running_loop() def call_method(): # This synchronous function is now running within a thread # of the Executor. Use some async to run the LDAP connection's # method within our loop. return self.loop.run_until_complete(method(*args, **kw)) # Wait within the caller's loop for the result. return await loop.run_in_executor(self.executor, call_method) async def search(self, base, attrs, scope=SCOPE.SUBTREE, loop=None): return await self.use_loop(loop, self.conn.search, base, scope, attrlist=attrs) async def whoami(self, loop=None): return await self.use_loop(loop, self.conn.whoami) ### TBD ASF-specific custom methods? or use app-specific subclasses? class ASF_LDAPClient: CONNECTION_CLASS = ASF_LDAPConnection def __init__(self, uri, binddn, bindpw): self.client = bonsai.LDAPClient(uri) self.client.set_credentials("SIMPLE", binddn, bindpw) self.client.set_cert_policy("allow") # TODO: Load our cert(?) self.executor = concurrent.futures.ThreadPoolExecutor( thread_name_prefix='aioldap') def connect(self, loop=None): if loop is None: loop = asyncio.get_running_loop() # Run (blocking) in an executor thread. def blocking_connect(): return self.CONNECTION_CLASS(self.client, self.executor) future = loop.run_in_executor(self.executor, blocking_connect) #print('CONN-FUTURE:', future) return ConnectContextManager(future) # For debugging, we want the ASF_ prefix, but callers can skip it. LDAPClient = ASF_LDAPClient class ConnectContextManager: def __init__(self, future): self.future = future async def __aenter__(self): self.conn = await self.future return self.conn async def __aexit__(self, exc_type, exc_val, exc_tb): self.conn.close() def test_conns(client): # Run three tasks in parallel: heartbeat, connA, connB. The latter # two will open, run a couple LDAP queries, then close. Then re-open. # These will run in one event loop (the LDAPClient has some private # loops; no peeking). # # Should see: smooth heartbeat, even when an artifical delay is # introduced to the connect() process. # Some random services to read SERVICE_BASE = 'cn=%s,ou=groups,ou=services,dc=apache,dc=org' SERVICES = [ 'board', 'infrastructure-root', 'asf-secretary', ] # Start hanging tasks off this. loop = asyncio.new_event_loop() t0 = loop.time() def ts(): return f'[{loop.time() - t0 :.2f}]' async def print_me(): async with client.connect() as conn: print('ME:', await conn.whoami()) async def heartbeat(): while True: print(f'{ts()} heartbeat') await asyncio.sleep(2) import random async def conn_usage(name): while True: # Stagger the connections await asyncio.sleep(random.randint(0, 3)) async with client.connect() as conn: for _ in range(5): s = random.choice(SERVICES) rv = await conn.search(SERVICE_BASE % (s,), ['owner', 'member',]) print(f'{ts()} CONN[{name}]: RV=', rv) await asyncio.sleep(random.randint(1, 3)) # between reconnections await asyncio.sleep(random.randint(0, 5)) loop.create_task(print_me()) loop.create_task(heartbeat()) loop.create_task(conn_usage('A')) loop.create_task(conn_usage('B')) loop.run_forever() if __name__ == '__main__': import os, getpass u = os.environ.get('AIOLDAP_USER') or getpass.getuser() dn = 'uid=%s,ou=people,dc=apache,dc=org' % u p = os.environ.get('AIOLDAP_PASSWORD') or getpass.getpass(f"Password for {u}: ") c = ASF_LDAPClient('ldaps://ldap-us.apache.org:636', dn, p) test_conns(c)