python/proton/_delivery.py (400 lines of code) (raw):
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import annotations
from cproton import (PN_ACCEPTED, PN_MODIFIED, PN_RECEIVED, PN_REJECTED, PN_RELEASED, PN_TRANSACTIONAL_STATE,
pn_delivery_abort, pn_delivery_aborted, pn_delivery_attachments, pn_delivery_link,
pn_delivery_local, pn_delivery_local_state,
pn_delivery_partial, pn_delivery_pending, pn_delivery_readable, pn_delivery_remote,
pn_delivery_remote_state,
pn_delivery_settle, pn_delivery_settled, pn_delivery_tag, pn_delivery_update, pn_delivery_updated,
pn_delivery_writable, pn_disposition_annotations, pn_disposition_condition, pn_disposition_data,
pn_disposition_get_section_number, pn_disposition_get_section_offset, pn_disposition_is_failed,
pn_disposition_is_undeliverable, pn_disposition_set_failed, pn_disposition_set_section_number,
pn_disposition_set_section_offset, pn_disposition_set_undeliverable, pn_disposition_type,
pn_custom_disposition,
pn_custom_disposition_get_type,
pn_custom_disposition_set_type,
pn_custom_disposition_data,
pn_rejected_disposition,
pn_rejected_disposition_condition,
pn_received_disposition,
pn_received_disposition_get_section_number,
pn_received_disposition_set_section_number,
pn_received_disposition_get_section_offset,
pn_received_disposition_set_section_offset,
pn_modified_disposition,
pn_modified_disposition_is_failed,
pn_modified_disposition_set_failed,
pn_modified_disposition_is_undeliverable,
pn_modified_disposition_set_undeliverable,
pn_modified_disposition_annotations,
pn_transactional_disposition,
pn_transactional_disposition_get_id,
pn_transactional_disposition_set_id,
pn_transactional_disposition_get_outcome_type,
pn_transactional_disposition_set_outcome_type,
pn_unsettled_next)
from ._condition import cond2obj, obj2cond, Condition
from ._data import dat2obj, obj2dat
from ._transport import Transport
from ._wrapper import Wrapper
from enum import IntEnum
from typing import Any, Optional, Union, TYPE_CHECKING
if TYPE_CHECKING:
from ._data import PythonAMQPData, symbol
from ._endpoints import Receiver, Sender # circular import
from ._reactor import Connection, Session
class DispositionType(IntEnum):
RECEIVED = PN_RECEIVED
"""
A non terminal state indicating how much (if any) message data
has been received for a delivery.
"""
ACCEPTED = PN_ACCEPTED
"""
A terminal state indicating that the delivery was successfully
processed. Once in this state there will be no further state
changes prior to the delivery being settled.
"""
REJECTED = PN_REJECTED
"""
A terminal state indicating that the delivery could not be
processed due to some error condition. Once in this state
there will be no further state changes prior to the delivery
being settled.
"""
RELEASED = PN_RELEASED
"""
A terminal state indicating that the delivery is being
returned to the sender. Once in this state there will be no
further state changes prior to the delivery being settled.
"""
MODIFIED = PN_MODIFIED
"""
A terminal state indicating that the delivery is being
returned to the sender and should be annotated by the
sender prior to further delivery attempts. Once in this
state there will be no further state changes prior to the
delivery being settled.
"""
TRANSACTIONAL_STATE = PN_TRANSACTIONAL_STATE
"""
A non-terminal delivery state indicating the transactional
state of a delivery
"""
@classmethod
def or_int(cls, i: int) -> Union[int, 'DispositionType']:
return cls(i) if i in cls._value2member_map_ else i
class Disposition:
"""
A delivery state.
Dispositions record the current state or final outcome of a
transfer. Every delivery contains both a local and remote
disposition. The local disposition holds the local state of the
delivery, and the remote disposition holds the last known remote
state of the delivery.
"""
RECEIVED = DispositionType.RECEIVED
ACCEPTED = DispositionType.ACCEPTED
REJECTED = DispositionType.REJECTED
RELEASED = DispositionType.RELEASED
MODIFIED = DispositionType.MODIFIED
TRANSACTIONAL_STATE = DispositionType.TRANSACTIONAL_STATE
@property
def type(self) -> Union[int, DispositionType]: ...
class RemoteDisposition(Disposition):
def __new__(cls, delivery_impl):
state = DispositionType.or_int(pn_delivery_remote_state(delivery_impl))
if state == 0:
return None
elif state == cls.RECEIVED:
return super().__new__(RemoteReceivedDisposition)
elif state == cls.REJECTED:
return super().__new__(RemoteRejectedDisposition)
elif state == cls.MODIFIED:
return super().__new__(RemoteModifiedDisposition)
elif state == cls.TRANSACTIONAL_STATE:
return super().__new__(RemoteTransactionalDisposition)
else:
return super().__new__(RemoteCustomDisposition)
class RemoteCustomDisposition(RemoteDisposition):
def __init__(self, delivery_impl):
impl = pn_custom_disposition(pn_delivery_remote(delivery_impl))
self._type = DispositionType.or_int(pn_custom_disposition_get_type(impl))
self._data = dat2obj(pn_custom_disposition_data(impl))
@property
def type(self) -> Union[int, DispositionType]:
return self._type
@property
def data(self) -> Optional[Any]:
"""Access the disposition as a :class:`Data` object.
Dispositions are an extension point in the AMQP protocol. The
disposition interface provides setters/getters for those
dispositions that are predefined by the specification, however
access to the raw disposition data is provided so that other
dispositions can be used.
The :class:`Data` object returned by this operation is valid until
the parent delivery is settled.
"""
r = self._data
return r if r != [] else None
def apply_to(self, local_disposition: LocalDisposition):
CustomDisposition(self._type, self._data).apply_to(local_disposition)
class RemoteReceivedDisposition(RemoteDisposition):
def __init__(self, delivery_impl):
impl = pn_received_disposition(pn_delivery_remote(delivery_impl))
self._section_number = pn_received_disposition_get_section_number(impl)
self._section_offset = pn_received_disposition_get_section_offset(impl)
@property
def type(self) -> Union[int, DispositionType]:
return Disposition.RECEIVED
@property
def section_number(self) -> int:
return self._section_number
@property
def section_offset(self) -> int:
return self._section_offset
def apply_to(self, local_disposition: LocalDisposition):
ReceivedDisposition(self._section_number, self._section_offset).apply_to(local_disposition)
class RemoteRejectedDisposition(RemoteDisposition):
def __init__(self, delivery_impl):
impl = pn_rejected_disposition(pn_delivery_remote(delivery_impl))
self._condition = cond2obj(pn_rejected_disposition_condition(impl))
@property
def type(self) -> Union[int, DispositionType]:
return Disposition.REJECTED
@property
def condition(self) -> Optional[Condition]:
return self._condition
def apply_to(self, local_disposition: LocalDisposition):
RejectedDisposition(self._condition).apply_to(local_disposition)
class RemoteModifiedDisposition(RemoteDisposition):
def __init__(self, delivery_impl):
impl = pn_modified_disposition(pn_delivery_remote(delivery_impl))
self._annotations = dat2obj(pn_modified_disposition_annotations(impl))
self._failed = pn_modified_disposition_is_failed(impl)
self._undeliverable = pn_modified_disposition_is_undeliverable(impl)
@property
def type(self) -> Union[int, DispositionType]:
return Disposition.MODIFIED
@property
def failed(self) -> bool:
return self._failed
@property
def undeliverable(self) -> bool:
return self._undeliverable
@property
def annotations(self) -> Optional[dict['symbol', 'PythonAMQPData']]:
return self._annotations
def apply_to(self, local_disposition: LocalDisposition):
ModifiedDisposition(self._failed, self._undeliverable, self._annotations).apply_to(local_disposition)
class RemoteTransactionalDisposition(RemoteDisposition):
def __init__(self, delivery_impl):
impl = pn_transactional_disposition(pn_delivery_remote(delivery_impl))
self._id = pn_transactional_disposition_get_id(impl)
self._outcome_type = pn_transactional_disposition_get_outcome_type(impl)
@property
def type(self) -> Union[int, DispositionType]:
return Disposition.TRANSACTIONAL_STATE
@property
def id(self):
return self._id
@property
def outcome_type(self):
return self._outcome_type
def apply_to(self, local_disposition: LocalDisposition):
TransactionalDisposition(self._id, self._outcome_type).apply_to(local_disposition)
class LocalDisposition(Disposition):
def __init__(self, delivery_impl):
self._impl = pn_delivery_local(delivery_impl)
self._data = None
self._condition = None
self._annotations = None
@property
def type(self) -> Union[int, DispositionType]:
"""
Get the type of this disposition object.
Defined values are:
* :const:`RECEIVED`
* :const:`ACCEPTED`
* :const:`REJECTED`
* :const:`RELEASED`
* :const:`MODIFIED`
"""
return DispositionType.or_int(pn_disposition_type(self._impl))
@property
def data(self) -> Optional[Any]:
return self._data
@data.setter
def data(self, obj: Any) -> None:
self._data = obj
@property
def section_number(self) -> int:
return pn_disposition_get_section_number(self._impl)
@section_number.setter
def section_number(self, n: int) -> None:
pn_disposition_set_section_number(self._impl, n)
@property
def section_offset(self) -> int:
return pn_disposition_get_section_offset(self._impl)
@section_offset.setter
def section_offset(self, n: int) -> None:
pn_disposition_set_section_offset(self._impl, n)
@property
def condition(self) -> Optional[Condition]:
return self._condition
@condition.setter
def condition(self, obj: Condition) -> None:
self._condition = obj
@property
def failed(self) -> bool:
return pn_disposition_is_failed(self._impl)
@failed.setter
def failed(self, b: bool) -> None:
pn_disposition_set_failed(self._impl, b)
@property
def undeliverable(self) -> bool:
return pn_disposition_is_undeliverable(self._impl)
@undeliverable.setter
def undeliverable(self, b: bool) -> None:
pn_disposition_set_undeliverable(self._impl, b)
@property
def annotations(self) -> Optional[dict['symbol', 'PythonAMQPData']]:
return self._annotations
@annotations.setter
def annotations(self, obj: dict['symbol', 'PythonAMQPData']) -> None:
self._annotations = obj
class ReceivedDisposition(LocalDisposition):
def __init__(self, section_number: int = None, section_offset: int = None):
self._section_number = section_number
self._section_offset = section_offset
@property
def type(self) -> Union[int, DispositionType]:
return Disposition.RECEIVED
@property
def section_number(self) -> int:
return self._section_number
@section_number.setter
def section_number(self, n: int) -> None:
self._section_number = n
@property
def section_offset(self) -> int:
return self._section_offset
@section_offset.setter
def section_offset(self, n: int) -> None:
self._section_offset = n
def apply_to(self, local_disposition: LocalDisposition):
disp = pn_received_disposition(local_disposition._impl)
if self._section_number:
pn_received_disposition_set_section_number(disp, self._section_number)
if self._section_offset:
pn_received_disposition_set_section_offset(disp, self._section_offset)
class CustomDisposition(LocalDisposition):
def __init__(self, type: int, data: Any = None):
self._type = type
self._data = data
def apply_to(self, local_disposition: LocalDisposition):
disp = pn_custom_disposition(local_disposition._impl)
pn_custom_disposition_set_type(disp, self._type)
obj2dat(self._data, pn_custom_disposition_data(disp))
class RejectedDisposition(LocalDisposition):
def __init__(self, condition: Optional[Condition] = None):
self._condition = condition
@property
def type(self) -> Union[int, DispositionType]:
return Disposition.REJECTED
@property
def condition(self) -> Optional[Condition]:
return self._condition
@condition.setter
def condition(self, obj: Condition) -> None:
self._condition = obj
def apply_to(self, local_disposition: LocalDisposition):
disp = pn_rejected_disposition(local_disposition._impl)
obj2cond(self._condition, pn_rejected_disposition_condition(disp))
class ModifiedDisposition(LocalDisposition):
def __init__(self, failed: bool = True, undeliverable: bool = None,
annotations: Optional[dict['symbol', 'PythonAMQPData']] = None):
self._failed = failed
self._undeliverable = undeliverable
self._annotations = annotations
@property
def type(self) -> Union[int, DispositionType]:
return Disposition.MODIFIED
@property
def failed(self) -> bool:
return self._failed
@failed.setter
def failed(self, b: bool) -> None:
self._failed = b
@property
def undeliverable(self) -> bool:
return self._undelivered
@undeliverable.setter
def undeliverable(self, b: bool) -> None:
self._undelivered = b
@property
def annotations(self) -> Optional[dict['symbol', 'PythonAMQPData']]:
return self._annotations
@annotations.setter
def annotations(self, obj: dict['symbol', 'PythonAMQPData']) -> None:
self._annotations = obj
def apply_to(self, local_disposition: LocalDisposition):
disp = pn_modified_disposition(local_disposition._impl)
if self._failed:
pn_modified_disposition_set_failed(disp, self._failed)
if self._undeliverable:
pn_modified_disposition_set_undeliverable(disp, self._undeliverable)
obj2dat(self._annotations, pn_modified_disposition_annotations(disp))
class TransactionalDisposition(LocalDisposition):
def __init__(self, id, outcome_type=None):
self._id = id
self._outcome_type = outcome_type
@property
def type(self) -> Union[int, DispositionType]:
return Disposition.TRANSACTIONAL_STATE
@property
def id(self):
return self._id
@id.setter
def id(self, id):
self._id = id
@property
def outcome_type(self):
return self._outcome_type
@outcome_type.setter
def outcome_type(self, type):
self._outcome_type = type
def apply_to(self, local_disposition: LocalDisposition):
disp = pn_transactional_disposition(local_disposition._impl)
pn_transactional_disposition_set_id(disp, self._id)
if self._outcome_type:
pn_transactional_disposition_set_outcome_type(disp, self._outcome_type)
class Delivery(Wrapper):
"""
Tracks and/or records the delivery of a message over a link.
"""
RECEIVED = Disposition.RECEIVED
"""
A non terminal state indicating how much (if any) message data
has been received for a delivery.
"""
ACCEPTED = Disposition.ACCEPTED
"""
A terminal state indicating that the delivery was successfully
processed. Once in this state there will be no further state
changes prior to the delivery being settled.
"""
REJECTED = Disposition.REJECTED
"""
A terminal state indicating that the delivery could not be
processed due to some error condition. Once in this state
there will be no further state changes prior to the delivery
being settled.
"""
RELEASED = Disposition.RELEASED
"""
A terminal state indicating that the delivery is being
returned to the sender. Once in this state there will be no
further state changes prior to the delivery being settled.
"""
MODIFIED = Disposition.MODIFIED
"""
A terminal state indicating that the delivery is being
returned to the sender and should be annotated by the
sender prior to further delivery attempts. Once in this
state there will be no further state changes prior to the
delivery being settled.
"""
get_context = pn_delivery_attachments
def __init__(self, impl):
if self.Uninitialized():
self._local = None
self._remote = None
@property
def remote(self) -> RemoteDisposition:
if self._remote is None:
self._remote = RemoteDisposition(self._impl)
return self._remote
@property
def local(self) -> LocalDisposition:
if self._local is None:
self._local = LocalDisposition(self._impl)
return self._local
@local.setter
def local(self, local_disposition: LocalDisposition) -> None:
if self._local is None:
self._local = LocalDisposition(self._impl)
local_disposition.apply_to(self._local)
@property
def tag(self) -> str:
"""
The identifier for the delivery.
"""
return pn_delivery_tag(self._impl)
@property
def writable(self) -> bool:
"""
``True`` for an outgoing delivery to which data can now be written,
``False`` otherwise.
"""
return pn_delivery_writable(self._impl)
@property
def readable(self) -> bool:
"""
``True`` for an incoming delivery that has data to read,
``False`` otherwise.
"""
return pn_delivery_readable(self._impl)
@property
def updated(self) -> bool:
"""
``True`` if the state of the delivery has been updated
(e.g. it has been settled and/or accepted, rejected etc.),
``False`` otherwise.
"""
return pn_delivery_updated(self._impl)
def update(self, state: Union[int, DispositionType, None] = None) -> None:
"""
Set the local state of the delivery e.g. :const:`ACCEPTED`,
:const:`REJECTED`, :const:`RELEASED`.
:param state: State of delivery, if omitted we assume the delivery state is already set
by other means
"""
if state:
if state == self.MODIFIED:
obj2dat(self.local._annotations, pn_disposition_annotations(self.local._impl))
elif state == self.REJECTED:
obj2cond(self.local._condition, pn_disposition_condition(self.local._impl))
elif state not in (self.ACCEPTED, self.RECEIVED, self.RELEASED):
obj2dat(self.local._data, pn_disposition_data(self.local._impl))
pn_delivery_update(self._impl, state)
else:
pn_delivery_update(self._impl, self.local_state)
@property
def pending(self) -> int:
"""
The amount of pending message data for a delivery.
"""
return pn_delivery_pending(self._impl)
@property
def partial(self) -> bool:
"""
``True`` for an incoming delivery if not all the data is
yet available, ``False`` otherwise.
"""
return pn_delivery_partial(self._impl)
@property
def local_state(self) -> Union[int, DispositionType]:
"""A local state of the delivery."""
return DispositionType.or_int(pn_delivery_local_state(self._impl))
@property
def remote_state(self) -> Union[int, DispositionType]:
"""A remote state of the delivery as indicated by the remote peer."""
return DispositionType.or_int(pn_delivery_remote_state(self._impl))
@property
def settled(self) -> bool:
"""
``True`` if the delivery has been settled by the remote peer,
``False`` otherwise.
"""
return pn_delivery_settled(self._impl)
def settle(self) -> None:
"""
Settles the delivery locally. This indicates the application
considers the delivery complete and does not wish to receive any
further events about it. Every delivery should be settled locally.
"""
pn_delivery_settle(self._impl)
@property
def unsettled_next(self) -> Optional['Delivery']:
"""
The next unsettled delivery on the link or ``None`` if there are
no more unsettled deliveries.
"""
return Delivery.wrap(pn_unsettled_next(self._impl))
@property
def aborted(self) -> bool:
"""
``True`` if the delivery has been aborted, ``False`` otherwise.
"""
return pn_delivery_aborted(self._impl)
def abort(self) -> None:
"""
Aborts the delivery. This indicates the application wishes to
invalidate any data that may have already been sent on this delivery.
The delivery cannot be aborted after it has been completely delivered.
"""
pn_delivery_abort(self._impl)
@property
def link(self) -> Union['Receiver', 'Sender']:
"""
The :class:`Link` on which the delivery was sent or received.
"""
from . import _endpoints
return _endpoints.Link.wrap(pn_delivery_link(self._impl))
@property
def session(self) -> Session:
"""
The :class:`Session` over which the delivery was sent or received.
"""
return self.link.session
@property
def connection(self) -> Connection:
"""
The :class:`Connection` over which the delivery was sent or received.
"""
return self.session.connection
@property
def transport(self) -> Optional[Transport]:
"""
The :class:`Transport` bound to the :class:`Connection` over which
the delivery was sent or received.
"""
return self.connection.transport