pulsar/asyncio.py (50 lines of code) (raw):

# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # """ The Pulsar Python client APIs that work with the asyncio module. """ import asyncio import functools from typing import Any import _pulsar import pulsar class PulsarException(BaseException): """ The exception that wraps the Pulsar error code """ def __init__(self, result: pulsar.Result) -> None: """ Create the Pulsar exception. Parameters ---------- result: pulsar.Result The error code of the underlying Pulsar APIs. """ self._result = result def error(self) -> pulsar.Result: """ Returns the Pulsar error code. """ return self._result def __str__(self): """ Convert the exception to string. """ return f'{self._result.value} {self._result.name}' class Producer: """ The Pulsar message producer, used to publish messages on a topic. """ def __init__(self, producer: _pulsar.Producer) -> None: """ Create the producer. Users should not call this constructor directly. Instead, create the producer via `Client.create_producer`. Parameters ---------- producer: _pulsar.Producer The underlying Producer object from the C extension. """ self._producer: _pulsar.Producer = producer async def send(self, content: bytes) -> pulsar.MessageId: """ Send a message asynchronously. parameters ---------- content: bytes The message payload Returns ------- pulsar.MessageId The message id that represents the persisted position of the message. Raises ------ PulsarException """ builder = _pulsar.MessageBuilder() builder.content(content) future = asyncio.get_running_loop().create_future() self._producer.send_async(builder.build(), functools.partial(_set_future, future)) msg_id = await future return pulsar.MessageId( msg_id.partition(), msg_id.ledger_id(), msg_id.entry_id(), msg_id.batch_index(), ) async def close(self) -> None: """ Close the producer. Raises ------ PulsarException """ future = asyncio.get_running_loop().create_future() self._producer.close_async(functools.partial(_set_future, future, value=None)) await future class Client: """ The asynchronous version of `pulsar.Client`. """ def __init__(self, service_url, **kwargs) -> None: """ See `pulsar.Client.__init__` """ self._client: _pulsar.Client = pulsar.Client(service_url, **kwargs)._client async def create_producer(self, topic: str) -> Producer: """ Create a new producer on a given topic Parameters ---------- topic: str The topic name Returns ------- Producer The producer created Raises ------ PulsarException """ future = asyncio.get_running_loop().create_future() conf = _pulsar.ProducerConfiguration() # TODO: add more configs self._client.create_producer_async(topic, conf, functools.partial(_set_future, future)) return Producer(await future) async def close(self) -> None: """ Close the client and all the associated producers and consumers Raises ------ PulsarException """ future = asyncio.get_running_loop().create_future() self._client.close_async(functools.partial(_set_future, future, value=None)) await future def _set_future(future: asyncio.Future, result: _pulsar.Result, value: Any): def complete(): if result == _pulsar.Result.Ok: future.set_result(value) else: future.set_exception(PulsarException(result)) future.get_loop().call_soon_threadsafe(complete)