// crates/zbus/src/connection/mod.rs
//! Connection API.
use std::collections::HashMap;
use std::io::{
self,
ErrorKind,
};
use std::num::NonZeroU32;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::{
Arc,
OnceLock,
Weak,
};
use std::task::{
Context,
Poll,
};
use async_broadcast::{
InactiveReceiver,
Receiver,
Sender as Broadcaster,
broadcast,
};
use enumflags2::BitFlags;
use event_listener::{
Event,
EventListener,
};
use futures_core::Future;
use futures_util::StreamExt;
use ordered_stream::{
OrderedFuture,
OrderedStream,
PollResult,
};
use static_assertions::assert_impl_all;
use tracing::{
Instrument,
debug,
info_span,
instrument,
trace,
trace_span,
warn,
};
use zbus_names::{
BusName,
ErrorName,
InterfaceName,
MemberName,
OwnedUniqueName,
WellKnownName,
};
use zvariant::ObjectPath;
use crate::async_lock::{
Mutex,
Semaphore,
SemaphorePermit,
};
use crate::fdo::{
self,
ConnectionCredentials,
RequestNameFlags,
RequestNameReply,
};
use crate::message::{
Flags,
Message,
Type,
};
use crate::proxy::CacheProperties;
use crate::{
DBusError,
Error,
Executor,
MatchRule,
MessageStream,
ObjectServer,
OwnedGuid,
OwnedMatchRule,
Result,
Task,
blocking,
is_flatpak,
};
mod builder;
pub use builder::Builder;
pub mod socket;
pub use socket::Socket;
mod socket_reader;
use socket_reader::SocketReader;
pub(crate) mod handshake;
use handshake::Authenticated;
const DEFAULT_MAX_QUEUED: usize = 64;
const DEFAULT_MAX_METHOD_RETURN_QUEUED: usize = 8;
/// Inner state shared by Connection and WeakConnection
#[derive(Debug)]
pub(crate) struct ConnectionInner {
server_guid: OwnedGuid,
#[cfg(unix)]
cap_unix_fd: bool,
#[cfg(feature = "p2p")]
bus_conn: bool,
unique_name: OnceLock<OwnedUniqueName>,
registered_names: Mutex<HashMap<WellKnownName<'static>, NameStatus>>,
activity_event: Arc<Event>,
socket_write: Mutex<Box<dyn socket::WriteHalf>>,
// Our executor
executor: Executor<'static>,
// Socket reader task
#[allow(unused)]
socket_reader_task: OnceLock<Task<()>>,
pub(crate) msg_receiver: InactiveReceiver<Result<Message>>,
pub(crate) method_return_receiver: InactiveReceiver<Result<Message>>,
msg_senders: Arc<Mutex<HashMap<Option<OwnedMatchRule>, MsgBroadcaster>>>,
subscriptions: Mutex<Subscriptions>,
object_server: OnceLock<blocking::ObjectServer>,
object_server_dispatch_task: OnceLock<Task<()>>,
drop_event: Event,
}
impl Drop for ConnectionInner {
fn drop(&mut self) {
// Notify anyone waiting that the connection is going away. Since we're being dropped, it's
// not possible for any new listeners to be created after this notification, so this is
// race-free.
self.drop_event.notify(usize::MAX);
}
}
type Subscriptions = HashMap<OwnedMatchRule, (u64, InactiveReceiver<Result<Message>>)>;
pub(crate) type MsgBroadcaster = Broadcaster<Result<Message>>;
/// A D-Bus connection.
///
/// A connection to a D-Bus bus, or a direct peer.
///
/// Once created, the connection is authenticated and negotiated and messages can be sent or
/// received, such as [method calls] or [signals].
///
/// For higher-level message handling (typed methods, introspection, documentation, etc.), it is
/// recommended to wrap the low-level D-Bus messages in Rust functions with the [`proxy`] and
/// [`interface`] macros instead of doing it directly on a `Connection`.
///
/// Typically, a connection is made to the session bus with [`Connection::session`], or to the
/// system bus with [`Connection::system`]. Then the connection is used with [`crate::Proxy`]
/// instances or the on-demand [`ObjectServer`] instance that can be accessed through
/// [`Connection::object_server`].
///
/// `Connection` implements [`Clone`] and cloning it is a very cheap operation, as the underlying
/// data is not cloned. This makes it very convenient to share the connection between different
/// parts of your code. `Connection` also implements [`std::marker::Sync`] and [`std::marker::Send`]
/// so you can send and share a connection instance across threads as well.
///
/// `Connection` keeps internal queues of incoming messages. The default capacity of each of these is
/// 64. The capacity of the main (unfiltered) queue is configurable through the [`set_max_queued`]
/// method. When the queue is full, no more messages can be received until room is created for more.
/// This is why it's important to ensure that all [`crate::MessageStream`] and
/// [`crate::blocking::MessageIterator`] instances are continuously polled and iterated on,
/// respectively.
///
/// For sending messages you can use the [`Connection::send`] method.
///
/// To gracefully close a connection while waiting for any outstanding method calls to complete,
/// use [`Connection::graceful_shutdown`]. To immediately close a connection in a way that will
/// disrupt any outstanding method calls, use [`Connection::close`]. If you do not need the
/// shutdown to be immediate and do not care about waiting for outstanding method calls, you can
/// also simply drop the `Connection` instance, which will act similarly to spawning
/// `graceful_shutdown` in the background.
///
/// [method calls]: struct.Connection.html#method.call_method
/// [signals]: struct.Connection.html#method.emit_signal
/// [`proxy`]: attr.proxy.html
/// [`interface`]: attr.interface.html
/// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html
/// [`set_max_queued`]: struct.Connection.html#method.set_max_queued
///
/// ### Examples
///
/// #### Get the session bus ID
///
/// ```no_run
/// # zbus::block_on(async {
/// use zbus::Connection;
///
/// let connection = Connection::session().await?;
///
/// let reply_body = connection
/// .call_method(
/// Some("org.freedesktop.DBus"),
/// "/org/freedesktop/DBus",
/// Some("org.freedesktop.DBus"),
/// "GetId",
/// &(),
/// )
/// .await?
/// .body();
///
/// let id: &str = reply_body.deserialize()?;
/// println!("Unique ID of the bus: {}", id);
/// # Ok::<(), zbus::Error>(())
/// # }).unwrap();
/// ```
///
/// #### Monitoring all messages
///
/// Let's eavesdrop on the session bus 😈 using the [Monitor] interface:
///
/// ```rust,no_run
/// # zbus::block_on(async {
/// use futures_util::stream::TryStreamExt;
/// use zbus::{
/// Connection,
/// MessageStream,
/// };
///
/// let connection = Connection::session().await?;
///
/// connection
/// .call_method(
/// Some("org.freedesktop.DBus"),
/// "/org/freedesktop/DBus",
/// Some("org.freedesktop.DBus.Monitoring"),
/// "BecomeMonitor",
/// &(&[] as &[&str], 0u32),
/// )
/// .await?;
///
/// let mut stream = MessageStream::from(connection);
/// while let Some(msg) = stream.try_next().await? {
/// println!("Got message: {}", msg);
/// }
///
/// # Ok::<(), zbus::Error>(())
/// # }).unwrap();
/// ```
///
/// This should print something like:
///
/// ```console
/// Got message: Signal NameAcquired from org.freedesktop.DBus
/// Got message: Signal NameLost from org.freedesktop.DBus
/// Got message: Method call GetConnectionUnixProcessID from :1.1324
/// Got message: Error org.freedesktop.DBus.Error.NameHasNoOwner:
/// Could not get PID of name ':1.1332': no such name from org.freedesktop.DBus
/// Got message: Method call AddMatch from :1.918
/// Got message: Method return from org.freedesktop.DBus
/// ```
///
/// [Monitor]: https://dbus.freedesktop.org/doc/dbus-specification.html#bus-messages-become-monitor
#[derive(Clone, Debug)]
#[must_use = "Dropping a `Connection` will close the underlying socket."]
pub struct Connection {
pub(crate) inner: Arc<ConnectionInner>,
}
assert_impl_all!(Connection: Send, Sync, Unpin);
/// A method call whose completion can be awaited or joined with other streams.
///
/// This is useful for cache population method calls, where joining the `PendingMethodCall` with
/// an update signal stream can be used to ensure that cache updates are not overwritten by a cache
/// population whose task is scheduled later.
#[derive(Debug)]
pub(crate) struct PendingMethodCall {
stream: Option<MessageStream>,
serial: NonZeroU32,
}
impl Future for PendingMethodCall {
type Output = Result<Message>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.poll_before(cx, None).map(|ret| {
ret.map_or_else(
|| {
Err(crate::Error::InputOutput(
io::Error::new(ErrorKind::BrokenPipe, "socket closed").into(),
))
},
|(_, r)| r,
)
})
}
}
impl OrderedFuture for PendingMethodCall {
type Ordering = zbus::message::Sequence;
type Output = Result<Message>;
fn poll_before(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
before: Option<&Self::Ordering>,
) -> Poll<Option<(Self::Ordering, Self::Output)>> {
let this = self.get_mut();
if let Some(stream) = &mut this.stream {
loop {
match Pin::new(&mut *stream).poll_next_before(cx, before) {
Poll::Ready(PollResult::Item {
data: Ok(msg),
ordering,
}) => {
if msg.header().reply_serial() != Some(this.serial) {
continue;
}
let res = match msg.message_type() {
Type::Error => Err(msg.into()),
Type::MethodReturn => Ok(msg),
_ => continue,
};
this.stream = None;
return Poll::Ready(Some((ordering, res)));
},
Poll::Ready(PollResult::Item { data: Err(e), ordering }) => {
return Poll::Ready(Some((ordering, Err(e))));
},
Poll::Ready(PollResult::NoneBefore) => {
return Poll::Ready(None);
},
Poll::Ready(PollResult::Terminated) => {
return Poll::Ready(None);
},
Poll::Pending => return Poll::Pending,
}
}
}
Poll::Ready(None)
}
}
impl Connection {
/// Send `msg` to the peer.
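///
/// # Example
///
/// A minimal sketch of constructing a message manually and sending it:
///
/// ```no_run
/// # zbus::block_on(async {
/// use zbus::Connection;
/// use zbus::message::Message;
///
/// let connection = Connection::session().await?;
/// let msg = Message::method("/org/freedesktop/DBus", "GetId")?
///     .destination("org.freedesktop.DBus")?
///     .interface("org.freedesktop.DBus")?
///     .build(&())?;
/// connection.send(&msg).await?;
/// # Ok::<(), zbus::Error>(())
/// # }).unwrap();
/// ```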
pub async fn send(&self, msg: &Message) -> Result<()> {
#[cfg(unix)]
if !msg.data().fds().is_empty() && !self.inner.cap_unix_fd {
return Err(Error::Unsupported);
}
self.inner.activity_event.notify(usize::MAX);
let mut write = self.inner.socket_write.lock().await;
write.send_message(msg).await
}
/// Send a method call.
///
/// Create a method-call message, send it over the connection, then wait for the reply.
///
/// On successful reply, an `Ok(Message)` is returned. On error, an `Err` is returned. D-Bus
/// error replies are returned as [`Error::MethodError`].
pub async fn call_method<'d, 'p, 'i, 'm, D, P, I, M, B>(
&self,
destination: Option<D>,
path: P,
interface: Option<I>,
method_name: M,
body: &B,
) -> Result<Message>
where
D: TryInto<BusName<'d>>,
P: TryInto<ObjectPath<'p>>,
I: TryInto<InterfaceName<'i>>,
M: TryInto<MemberName<'m>>,
D::Error: Into<Error>,
P::Error: Into<Error>,
I::Error: Into<Error>,
M::Error: Into<Error>,
B: serde::ser::Serialize + zvariant::DynamicType,
{
self.call_method_raw(destination, path, interface, method_name, BitFlags::empty(), body)
.await?
.expect("no reply")
.await
}
/// Send a method call.
///
/// Send the given message, which must be a method call, over the connection and return an
/// object that allows the reply to be retrieved. Typically you'd want to use
/// [`Connection::call_method`] instead.
///
/// If the `flags` do not contain `Flags::NoReplyExpected`, the return value is guaranteed to be
/// `Ok(Some(_))` if no error was encountered.
///
/// INTERNAL NOTE: If this method is ever made pub, flags should become `BitFlags<MethodFlags>`.
pub(crate) async fn call_method_raw<'d, 'p, 'i, 'm, D, P, I, M, B>(
&self,
destination: Option<D>,
path: P,
interface: Option<I>,
method_name: M,
flags: BitFlags<Flags>,
body: &B,
) -> Result<Option<PendingMethodCall>>
where
D: TryInto<BusName<'d>>,
P: TryInto<ObjectPath<'p>>,
I: TryInto<InterfaceName<'i>>,
M: TryInto<MemberName<'m>>,
D::Error: Into<Error>,
P::Error: Into<Error>,
I::Error: Into<Error>,
M::Error: Into<Error>,
B: serde::ser::Serialize + zvariant::DynamicType,
{
let _permit = acquire_serial_num_semaphore().await;
let mut builder = Message::method(path, method_name)?;
if let Some(sender) = self.unique_name() {
builder = builder.sender(sender)?;
}
if let Some(destination) = destination {
builder = builder.destination(destination)?;
}
if let Some(interface) = interface {
builder = builder.interface(interface)?;
}
for flag in flags {
builder = builder.with_flags(flag)?;
}
let msg = builder.build(body)?;
let msg_receiver = self.inner.method_return_receiver.activate_cloned();
let stream = Some(MessageStream::for_subscription_channel(
msg_receiver,
// This is a lie but we only use the stream internally so it's fine.
None,
self,
));
let serial = msg.primary_header().serial_num();
self.send(&msg).await?;
if flags.contains(Flags::NoReplyExpected) {
Ok(None)
} else {
Ok(Some(PendingMethodCall { stream, serial }))
}
}
/// Emit a signal.
///
/// Create a signal message, and send it over the connection.
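///
/// # Example
///
/// A minimal sketch of broadcasting a signal (the path, interface, and signal names are made
/// up for illustration):
///
/// ```no_run
/// # zbus::block_on(async {
/// use zbus::Connection;
///
/// let connection = Connection::session().await?;
/// connection
///     .emit_signal(
///         // No destination: broadcast to all interested peers.
///         None::<()>,
///         "/org/example/Object",
///         "org.example.Iface",
///         "Greeting",
///         &("hello"),
///     )
///     .await?;
/// # Ok::<(), zbus::Error>(())
/// # }).unwrap();
/// ```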
pub async fn emit_signal<'d, 'p, 'i, 'm, D, P, I, M, B>(
&self,
destination: Option<D>,
path: P,
interface: I,
signal_name: M,
body: &B,
) -> Result<()>
where
D: TryInto<BusName<'d>>,
P: TryInto<ObjectPath<'p>>,
I: TryInto<InterfaceName<'i>>,
M: TryInto<MemberName<'m>>,
D::Error: Into<Error>,
P::Error: Into<Error>,
I::Error: Into<Error>,
M::Error: Into<Error>,
B: serde::ser::Serialize + zvariant::DynamicType,
{
let _permit = acquire_serial_num_semaphore().await;
let mut b = Message::signal(path, interface, signal_name)?;
if let Some(sender) = self.unique_name() {
b = b.sender(sender)?;
}
if let Some(destination) = destination {
b = b.destination(destination)?;
}
let m = b.build(body)?;
self.send(&m).await
}
/// Reply to a message.
///
/// Given an existing message (likely a method call), send a reply back to the caller with the
/// given `body`.
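///
/// # Example
///
/// A minimal sketch, assuming `call` is a method call received from a
/// [`crate::MessageStream`]:
///
/// ```no_run
/// # zbus::block_on(async {
/// # use futures_util::stream::TryStreamExt;
/// # use zbus::{Connection, MessageStream};
/// # let connection = Connection::session().await?;
/// # let mut stream = MessageStream::from(&connection);
/// # let call = stream.try_next().await?.unwrap();
/// // Reply with a single string as the body.
/// connection.reply(&call, &("hello")).await?;
/// # Ok::<(), zbus::Error>(())
/// # }).unwrap();
/// ```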
pub async fn reply<B>(&self, call: &Message, body: &B) -> Result<()>
where
B: serde::ser::Serialize + zvariant::DynamicType,
{
let _permit = acquire_serial_num_semaphore().await;
let mut b = Message::method_reply(call)?;
if let Some(sender) = self.unique_name() {
b = b.sender(sender)?;
}
let m = b.build(body)?;
self.send(&m).await
}
/// Reply with an error to a message.
///
/// Given an existing message (likely a method call), send an error reply back to the caller
/// with the given `error_name` and `body`.
pub async fn reply_error<'e, E, B>(&self, call: &Message, error_name: E, body: &B) -> Result<()>
where
B: serde::ser::Serialize + zvariant::DynamicType,
E: TryInto<ErrorName<'e>>,
E::Error: Into<Error>,
{
let _permit = acquire_serial_num_semaphore().await;
let mut b = Message::method_error(call, error_name)?;
if let Some(sender) = self.unique_name() {
b = b.sender(sender)?;
}
let m = b.build(body)?;
self.send(&m).await
}
/// Reply with an error to a message.
///
/// Given an existing message (likely a method call), send an error reply back to the caller
/// using one of the standard interface reply types.
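///
/// # Example
///
/// A minimal sketch, assuming `msg` is an incoming method call and using [`fdo::Error`] as the
/// [`DBusError`] implementation:
///
/// ```no_run
/// # zbus::block_on(async {
/// # use futures_util::stream::TryStreamExt;
/// # use zbus::{Connection, MessageStream};
/// # let connection = Connection::session().await?;
/// # let mut stream = MessageStream::from(&connection);
/// # let msg = stream.try_next().await?.unwrap();
/// connection
///     .reply_dbus_error(
///         &msg.header(),
///         zbus::fdo::Error::UnknownMethod("no such method".to_string()),
///     )
///     .await?;
/// # Ok::<(), zbus::Error>(())
/// # }).unwrap();
/// ```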
pub async fn reply_dbus_error(&self, call: &zbus::message::Header<'_>, err: impl DBusError) -> Result<()> {
let _permit = acquire_serial_num_semaphore().await;
let m = err.create_reply(call)?;
self.send(&m).await
}
/// Register a well-known name for this connection.
///
/// When connecting to a bus, the name is requested from the bus. In the case of a p2p
/// connection, the name (if requested) is used for self-identification.
///
/// You can request multiple names for the same connection. Use [`Connection::release_name`] for
/// deregistering names registered through this method.
///
/// Note that exclusive ownership without queueing is requested (using
/// [`RequestNameFlags::ReplaceExisting`] and [`RequestNameFlags::DoNotQueue`] flags) since that
/// is the most typical case. If that is not what you want, you should use
/// [`Connection::request_name_with_flags`] instead (but make sure then that name is requested
/// **after** you've set up your service implementation with the `ObjectServer`).
///
/// # Caveats
///
/// The associated `ObjectServer` will only handle method calls destined for the unique name of
/// this connection or any of the registered well-known names. If no well-known name is
/// registered, method calls destined for any well-known name will be handled.
///
/// Since names registered through any means other than the `Connection` or [`Builder`] API are
/// not known to the connection, method calls destined for those names will only be handled by
/// the associated `ObjectServer` if none of the names are registered through the `Connection*`
/// API. Simply put, either register all the names through the `Connection*` API or none of
/// them.
///
/// # Errors
///
/// Fails with `zbus::Error::NameTaken` if the name is already owned by another peer.
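///
/// # Example
///
/// A minimal sketch (the well-known name is made up for illustration):
///
/// ```no_run
/// # zbus::block_on(async {
/// use zbus::Connection;
///
/// let connection = Connection::session().await?;
/// // Typically you'd set up your `ObjectServer` handlers *before* requesting a name.
/// connection.request_name("org.example.MyService").await?;
/// # Ok::<(), zbus::Error>(())
/// # }).unwrap();
/// ```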
pub async fn request_name<'w, W>(&self, well_known_name: W) -> Result<()>
where
W: TryInto<WellKnownName<'w>>,
W::Error: Into<Error>,
{
self.request_name_with_flags(
well_known_name,
RequestNameFlags::ReplaceExisting | RequestNameFlags::DoNotQueue,
)
.await
.map(|_| ())
}
/// Register a well-known name for this connection.
///
/// This is the same as [`Connection::request_name`] but allows you to specify the flags to use
/// when requesting the name.
///
/// If the [`RequestNameFlags::DoNotQueue`] flag is not specified and the request ends up in the
/// queue, you can use [`fdo::NameAcquiredStream`] to be notified when the name is acquired. A
/// queued name request can be cancelled using [`Connection::release_name`].
///
/// If the [`RequestNameFlags::AllowReplacement`] flag is specified, the requested name can be
/// lost if another peer requests the same name. You can use [`fdo::NameLostStream`] to be
/// notified when the name is lost.
///
/// # Example
///
/// ```no_run
/// #
/// # zbus::block_on(async {
/// use enumflags2::BitFlags;
/// use futures_util::stream::StreamExt;
/// use zbus::Connection;
/// use zbus::fdo::{
/// DBusProxy,
/// RequestNameFlags,
/// RequestNameReply,
/// };
///
/// let name = "org.freedesktop.zbus.QueuedNameTest";
/// let conn1 = Connection::session().await?;
/// // This should just work right away.
/// conn1.request_name(name).await?;
///
/// let conn2 = Connection::session().await?;
/// // A second request from another connection will fail with the `DoNotQueue` flag, which is
/// // implicit with the `request_name` method.
/// assert!(conn2.request_name(name).await.is_err());
///
/// // Now let's try w/o `DoNotQueue` and we should be queued.
/// let reply = conn2
/// .request_name_with_flags(name, RequestNameFlags::AllowReplacement.into())
/// .await?;
/// assert_eq!(reply, RequestNameReply::InQueue);
/// // Another request should just give us the same response.
/// let reply = conn2
/// // The flags on subsequent requests will however be ignored.
/// .request_name_with_flags(name, BitFlags::empty())
/// .await?;
/// assert_eq!(reply, RequestNameReply::InQueue);
/// let mut acquired_stream = DBusProxy::new(&conn2)
/// .await?
/// .receive_name_acquired()
/// .await?;
/// assert!(conn1.release_name(name).await?);
/// // This would have waited forever if `conn1` hadn't just released the name.
/// let acquired = acquired_stream.next().await.unwrap();
/// assert_eq!(acquired.args().unwrap().name, name);
///
/// // conn2 made the mistake of being too nice and allowed name replacement, so conn1 should be
/// // able to take it back.
/// let mut lost_stream = DBusProxy::new(&conn2).await?.receive_name_lost().await?;
/// conn1.request_name(name).await?;
/// let lost = lost_stream.next().await.unwrap();
/// assert_eq!(lost.args().unwrap().name, name);
///
/// # Ok::<(), zbus::Error>(())
/// # }).unwrap();
/// ```
///
/// # Caveats
///
/// * Same as that of [`Connection::request_name`].
/// * If you wish to track changes to name ownership after this call, make sure that the
/// [`fdo::NameAcquiredStream`] and/or [`fdo::NameLostStream`] instance(s) are created **before**
/// calling this method. Otherwise, you may lose the signal if it's emitted after this call but
/// just before the stream instance gets created.
pub async fn request_name_with_flags<'w, W>(
&self,
well_known_name: W,
flags: BitFlags<RequestNameFlags>,
) -> Result<RequestNameReply>
where
W: TryInto<WellKnownName<'w>>,
W::Error: Into<Error>,
{
let well_known_name = well_known_name.try_into().map_err(Into::into)?;
// We keep the lock until the end of this function so that the (possibly) spawned task
// doesn't end up accessing the name entry before it's inserted.
let mut names = self.inner.registered_names.lock().await;
match names.get(&well_known_name) {
Some(NameStatus::Owner(_)) => return Ok(RequestNameReply::AlreadyOwner),
Some(NameStatus::Queued(_)) => return Ok(RequestNameReply::InQueue),
None => (),
}
if !self.is_bus() {
names.insert(well_known_name.to_owned(), NameStatus::Owner(None));
return Ok(RequestNameReply::PrimaryOwner);
}
let dbus_proxy = fdo::DBusProxy::builder(self)
.cache_properties(CacheProperties::No)
.build()
.await?;
let mut acquired_stream = dbus_proxy.receive_name_acquired().await?;
let mut lost_stream = dbus_proxy.receive_name_lost().await?;
let reply = dbus_proxy.request_name(well_known_name.clone(), flags).await?;
let lost_task_name = format!("monitor name {well_known_name} lost");
let name_lost_fut = if flags.contains(RequestNameFlags::AllowReplacement) {
let weak_conn = WeakConnection::from(self);
let well_known_name = well_known_name.to_owned();
Some(
async move {
loop {
let signal = lost_stream.next().await;
let inner = match weak_conn.upgrade() {
Some(conn) => conn.inner.clone(),
None => break,
};
match signal {
Some(signal) => match signal.args() {
Ok(args) if args.name == well_known_name => {
tracing::info!(
"Connection `{}` lost name `{}`",
// SAFETY: This is a bus connection, so the unique name can't be None.
inner.unique_name.get().unwrap(),
well_known_name
);
inner.registered_names.lock().await.remove(&well_known_name);
break;
},
Ok(_) => (),
Err(e) => warn!("Failed to parse `NameLost` signal: {}", e),
},
None => {
trace!("`NameLost` signal stream closed");
// This is a very strange state we end up in. Now the name in
// question remains in the queue forever. Maybe we can do better
// here but I think it's a very unlikely scenario anyway.
//
// Can happen if the connection is lost/dropped but then the whole
// `Connection` instance will go away soon anyway and hence this
// strange state along with it.
break;
},
}
}
}
.instrument(info_span!("{}", lost_task_name)),
)
} else {
None
};
let status = match reply {
RequestNameReply::InQueue => {
let weak_conn = WeakConnection::from(self);
let well_known_name = well_known_name.to_owned();
let task_name = format!("monitor name {well_known_name} acquired");
let task = self.executor().spawn(
async move {
loop {
let signal = acquired_stream.next().await;
let inner = match weak_conn.upgrade() {
Some(conn) => conn.inner.clone(),
None => break,
};
match signal {
Some(signal) => match signal.args() {
Ok(args) if args.name == well_known_name => {
let mut names = inner.registered_names.lock().await;
if let Some(status) = names.get_mut(&well_known_name) {
let task =
name_lost_fut.map(|fut| inner.executor.spawn(fut, &lost_task_name));
*status = NameStatus::Owner(task);
break;
}
// else the name was released in the meantime. :shrug:
},
Ok(_) => (),
Err(e) => warn!("Failed to parse `NameAcquired` signal: {}", e),
},
None => {
trace!("`NameAcquired` signal stream closed");
// See comment above for similar state in case of `NameLost`
// stream.
break;
},
}
}
}
.instrument(info_span!("{}", task_name)),
&task_name,
);
NameStatus::Queued(task)
},
RequestNameReply::PrimaryOwner | RequestNameReply::AlreadyOwner => {
let task = name_lost_fut.map(|fut| self.executor().spawn(fut, &lost_task_name));
NameStatus::Owner(task)
},
RequestNameReply::Exists => return Err(Error::NameTaken),
};
names.insert(well_known_name.to_owned(), status);
Ok(reply)
}
/// Deregister a previously registered well-known name for this service on the bus.
///
/// Use this method to deregister a well-known name, registered through
/// [`Connection::request_name`].
///
/// Unless an error is encountered, returns `Ok(true)` if the name was previously registered
/// with the bus through `self` and has now been successfully deregistered, and `Ok(false)` if
/// the name was not previously registered or was already deregistered.
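///
/// # Example
///
/// A minimal sketch, continuing from a name registered with [`Connection::request_name`] (the
/// names are made up for illustration):
///
/// ```no_run
/// # zbus::block_on(async {
/// use zbus::Connection;
///
/// let connection = Connection::session().await?;
/// connection.request_name("org.example.MyService").await?;
/// // ... serve requests ...
/// assert!(connection.release_name("org.example.MyService").await?);
/// // Names never registered through `self` just yield `Ok(false)`.
/// assert!(!connection.release_name("org.example.OtherName").await?);
/// # Ok::<(), zbus::Error>(())
/// # }).unwrap();
/// ```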
pub async fn release_name<'w, W>(&self, well_known_name: W) -> Result<bool>
where
W: TryInto<WellKnownName<'w>>,
W::Error: Into<Error>,
{
let well_known_name: WellKnownName<'w> = well_known_name.try_into().map_err(Into::into)?;
let mut names = self.inner.registered_names.lock().await;
// FIXME: Should be possible to avoid cloning/allocation here
if names.remove(&well_known_name.to_owned()).is_none() {
return Ok(false);
};
if !self.is_bus() {
return Ok(true);
}
fdo::DBusProxy::builder(self)
.cache_properties(CacheProperties::No)
.build()
.await?
.release_name(well_known_name)
.await
.map(|_| true)
.map_err(Into::into)
}
/// Check if `self` is a connection to a message bus.
///
/// This will return `false` for p2p connections. When the `p2p` feature is disabled, this will
/// always return `true`.
pub fn is_bus(&self) -> bool {
#[cfg(feature = "p2p")]
{
self.inner.bus_conn
}
#[cfg(not(feature = "p2p"))]
{
true
}
}
/// The unique name of the connection, if set/applicable.
///
/// The unique name is assigned by the message bus or set manually using
/// [`Connection::set_unique_name`].
pub fn unique_name(&self) -> Option<&OwnedUniqueName> {
self.inner.unique_name.get()
}
/// Set the unique name of the connection (if not already set).
///
/// This is mainly provided for bus implementations. All other users should not need this
/// method, which is why it is only available when the `bus-impl` feature is enabled.
///
/// # Panics
///
/// This method panics if the unique name is already set. It will always panic if the connection
/// is to a message bus as it's the bus that assigns peers their unique names.
#[cfg(feature = "bus-impl")]
pub fn set_unique_name<U>(&self, unique_name: U) -> Result<()>
where
U: TryInto<OwnedUniqueName>,
U::Error: Into<Error>,
{
let name = unique_name.try_into().map_err(Into::into)?;
self.set_unique_name_(name);
Ok(())
}
/// The capacity of the main (unfiltered) queue.
pub fn max_queued(&self) -> usize {
self.inner.msg_receiver.capacity()
}
/// Set the capacity of the main (unfiltered) queue.
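///
/// # Example
///
/// A minimal sketch:
///
/// ```no_run
/// # zbus::block_on(async {
/// use zbus::Connection;
///
/// let mut connection = Connection::session().await?;
/// // Raise the capacity of the unfiltered queue from the default of 64.
/// connection.set_max_queued(128);
/// assert_eq!(connection.max_queued(), 128);
///
/// // Do something useful with `connection`...
/// # Ok::<(), zbus::Error>(())
/// # }).unwrap();
/// ```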
pub fn set_max_queued(&mut self, max: usize) {
self.inner.msg_receiver.clone().set_capacity(max);
}
/// The server's GUID.
pub fn server_guid(&self) -> &OwnedGuid {
&self.inner.server_guid
}
/// The underlying executor.
///
/// When a connection is built with `internal_executor` set to false, zbus will not spawn a
/// thread to run the executor. You're then responsible for continuously [ticking the
/// executor][tte]. Failure to do so will result in hangs.
///
/// # Examples
///
/// Here is how one would typically run the zbus executor through tokio's scheduler:
///
/// ```no_run
/// use tokio::task::spawn;
/// use zbus::connection::Builder;
///
/// # struct SomeIface;
/// #
/// # #[zbus::interface]
/// # impl SomeIface {
/// # }
/// #
/// #[tokio::main]
/// async fn main() {
/// let conn = Builder::session()
/// .unwrap()
/// .internal_executor(false)
/// # // This is only for testing a deadlock that used to happen with this combo.
/// # .serve_at("/some/iface", SomeIface)
/// # .unwrap()
/// .build()
/// .await
/// .unwrap();
/// {
/// let conn = conn.clone();
/// spawn(async move {
/// loop {
/// conn.executor().tick().await;
/// }
/// });
/// }
///
/// // All your other async code goes here.
/// }
/// ```
///
/// **Note**: zbus 2.1 added support for tight integration with tokio. This means that if you
/// use zbus with tokio, you do not need to worry about this at all. All you need to do is
/// enable the `tokio` feature. You should also disable the (default) `async-io` feature in your
/// `Cargo.toml` to avoid unused dependencies. Also note that **prior** to zbus 3.0, disabling
/// `async-io` was required to enable tight `tokio` integration.
///
/// [tte]: https://docs.rs/async-executor/1.4.1/async_executor/struct.Executor.html#method.tick
pub fn executor(&self) -> &Executor<'static> {
&self.inner.executor
}
/// Get a reference to the associated [`ObjectServer`].
///
/// The `ObjectServer` is created on-demand.
///
/// **Note**: Once the `ObjectServer` is created, it will reply to all method calls received on
/// `self`. If you want to manually reply to method calls, do not use this method (or any of the
/// `ObjectServer` related API).
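///
/// # Example
///
/// A minimal sketch of serving an interface at an object path (the names are made up for
/// illustration):
///
/// ```no_run
/// # zbus::block_on(async {
/// use zbus::Connection;
///
/// struct Greeter;
///
/// #[zbus::interface(name = "org.example.Greeter")]
/// impl Greeter {
///     fn say_hello(&self, name: &str) -> String {
///         format!("Hello, {}!", name)
///     }
/// }
///
/// let connection = Connection::session().await?;
/// connection
///     .object_server()
///     .at("/org/example/Greeter", Greeter)
///     .await?;
/// # Ok::<(), zbus::Error>(())
/// # }).unwrap();
/// ```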
pub fn object_server(&self) -> impl Deref<Target = ObjectServer> + '_ {
// FIXME: Maybe it makes sense after all to implement Deref<Target= ObjectServer> for
// crate::ObjectServer instead of this wrapper?
struct Wrapper<'a>(&'a blocking::ObjectServer);
impl Deref for Wrapper<'_> {
type Target = ObjectServer;
fn deref(&self) -> &Self::Target {
self.0.inner()
}
}
Wrapper(self.sync_object_server(true, None))
}
pub(crate) fn sync_object_server(&self, start: bool, started_event: Option<Event>) -> &blocking::ObjectServer {
self.inner
.object_server
.get_or_init(move || self.setup_object_server(start, started_event))
}
fn setup_object_server(&self, start: bool, started_event: Option<Event>) -> blocking::ObjectServer {
if start {
self.start_object_server(started_event);
}
blocking::ObjectServer::new(self)
}
#[instrument(skip(self))]
pub(crate) fn start_object_server(&self, started_event: Option<Event>) {
self.inner.object_server_dispatch_task.get_or_init(|| {
trace!("starting ObjectServer task");
let weak_conn = WeakConnection::from(self);
let obj_server_task_name = "ObjectServer task";
self.inner.executor.spawn(
async move {
let mut stream = match weak_conn.upgrade() {
Some(conn) => {
let mut builder = MatchRule::builder().msg_type(Type::MethodCall);
if let Some(unique_name) = conn.unique_name() {
builder = builder.destination(&**unique_name).expect("unique name");
}
let rule = builder.build();
match conn.add_match(rule.into(), None).await {
Ok(stream) => stream,
Err(e) => {
// Very unlikely but can happen I guess if connection is closed.
debug!("Failed to create message stream: {}", e);
return;
},
}
},
None => {
trace!("Connection is gone, stopping associated object server task");
return;
},
};
if let Some(started_event) = started_event {
started_event.notify(1);
}
trace!("waiting for incoming method call messages..");
while let Some(msg) = stream.next().await.and_then(|m| {
if let Err(e) = &m {
debug!("Error while reading from object server stream: {:?}", e);
}
m.ok()
}) {
if let Some(conn) = weak_conn.upgrade() {
let hdr = msg.header();
// If we're connected to a bus, skip the destination check as the
// server will only send us method calls destined for us.
if !conn.is_bus() {
match hdr.destination() {
// Unique name is already checked by the match rule.
Some(BusName::Unique(_)) | None => (),
Some(BusName::WellKnown(dest)) => {
let names = conn.inner.registered_names.lock().await;
// destination doesn't matter if no name has been registered
// (probably means the name is registered through external
// means).
if !names.is_empty() && !names.contains_key(dest) {
trace!("Got a method call for a different destination: {}", dest);
continue;
}
},
}
}
let server = conn.object_server();
if let Err(e) = server.dispatch_call(&msg, &hdr).await {
debug!("Error dispatching message. Message: {:?}, error: {:?}", msg, e);
}
} else {
// If connection is completely gone, no reason to keep running the task
// anymore.
trace!("Connection is gone, stopping associated object server task");
break;
}
}
}
.instrument(info_span!("{}", obj_server_task_name)),
obj_server_task_name,
)
});
}
pub(crate) async fn add_match(
&self,
rule: OwnedMatchRule,
max_queued: Option<usize>,
) -> Result<Receiver<Result<Message>>> {
use std::collections::hash_map::Entry;
if self.inner.msg_senders.lock().await.is_empty() {
// This only happens if socket reader task has errored out.
return Err(Error::InputOutput(Arc::new(io::Error::new(
io::ErrorKind::BrokenPipe,
"Socket reader task has errored out",
))));
}
let mut subscriptions = self.inner.subscriptions.lock().await;
let msg_type = rule.msg_type().unwrap_or(Type::Signal);
match subscriptions.entry(rule.clone()) {
Entry::Vacant(e) => {
let max_queued = max_queued.unwrap_or(DEFAULT_MAX_QUEUED);
let (sender, mut receiver) = broadcast(max_queued);
receiver.set_await_active(false);
if self.is_bus() && msg_type == Type::Signal {
fdo::DBusProxy::builder(self)
.cache_properties(CacheProperties::No)
.build()
.await?
.add_match_rule(e.key().inner().clone())
.await?;
}
e.insert((1, receiver.clone().deactivate()));
self.inner.msg_senders.lock().await.insert(Some(rule), sender);
Ok(receiver)
},
Entry::Occupied(mut e) => {
let (num_subscriptions, receiver) = e.get_mut();
*num_subscriptions += 1;
if let Some(max_queued) = max_queued {
if max_queued > receiver.capacity() {
receiver.set_capacity(max_queued);
}
}
Ok(receiver.activate_cloned())
},
}
}
pub(crate) async fn remove_match(&self, rule: OwnedMatchRule) -> Result<bool> {
use std::collections::hash_map::Entry;
let mut subscriptions = self.inner.subscriptions.lock().await;
// TODO when it becomes stable, use HashMap::raw_entry and only require expr: &str
// (both here and in add_match)
let msg_type = rule.msg_type().unwrap_or(Type::Signal);
match subscriptions.entry(rule) {
Entry::Vacant(_) => Ok(false),
Entry::Occupied(mut e) => {
let rule = e.key().inner().clone();
e.get_mut().0 -= 1;
if e.get().0 == 0 {
if self.is_bus() && msg_type == Type::Signal {
fdo::DBusProxy::builder(self)
.cache_properties(CacheProperties::No)
.build()
.await?
.remove_match_rule(rule.clone())
.await?;
}
e.remove();
self.inner.msg_senders.lock().await.remove(&Some(rule.into()));
}
Ok(true)
},
}
}
pub(crate) fn queue_remove_match(&self, rule: OwnedMatchRule) {
let conn = self.clone();
let task_name = format!("Remove match `{}`", *rule);
let remove_match = async move { conn.remove_match(rule).await }.instrument(trace_span!("{}", task_name));
self.inner.executor.spawn(remove_match, &task_name).detach();
}
pub(crate) async fn new(
auth: Authenticated,
#[allow(unused)] bus_connection: bool,
executor: Executor<'static>,
) -> Result<Self> {
#[cfg(unix)]
let cap_unix_fd = auth.cap_unix_fd;
macro_rules! create_msg_broadcast_channel {
($size:expr) => {{
let (msg_sender, msg_receiver) = broadcast($size);
let mut msg_receiver = msg_receiver.deactivate();
msg_receiver.set_await_active(false);
(msg_sender, msg_receiver)
}};
}
// The unfiltered message channel.
let (msg_sender, msg_receiver) = create_msg_broadcast_channel!(DEFAULT_MAX_QUEUED);
let mut msg_senders = HashMap::new();
msg_senders.insert(None, msg_sender);
// The special method return & error channel.
let (method_return_sender, method_return_receiver) =
create_msg_broadcast_channel!(DEFAULT_MAX_METHOD_RETURN_QUEUED);
let rule = MatchRule::builder().msg_type(Type::MethodReturn).build().into();
msg_senders.insert(Some(rule), method_return_sender.clone());
let rule = MatchRule::builder().msg_type(Type::Error).build().into();
msg_senders.insert(Some(rule), method_return_sender);
let msg_senders = Arc::new(Mutex::new(msg_senders));
let subscriptions = Mutex::new(HashMap::new());
let connection = Self {
inner: Arc::new(ConnectionInner {
activity_event: Arc::new(Event::new()),
socket_write: Mutex::new(auth.socket_write),
server_guid: auth.server_guid,
#[cfg(unix)]
cap_unix_fd,
#[cfg(feature = "p2p")]
bus_conn: bus_connection,
unique_name: OnceLock::new(),
subscriptions,
object_server: OnceLock::new(),
object_server_dispatch_task: OnceLock::new(),
executor,
socket_reader_task: OnceLock::new(),
msg_senders,
msg_receiver,
method_return_receiver,
registered_names: Mutex::new(HashMap::new()),
drop_event: Event::new(),
}),
};
if let Some(unique_name) = auth.unique_name {
connection.set_unique_name_(unique_name);
}
Ok(connection)
}
/// Create a `Connection` to the session/user message bus.
pub async fn session() -> Result<Self> {
Builder::session()?.build().await
}
/// Create a `Connection` to the system-wide message bus.
pub async fn system() -> Result<Self> {
Builder::system()?.build().await
}
/// Return a listener, notified on various connection activity.
///
/// This method is meant for the caller to implement idle detection or a timeout on inactivity.
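///
/// # Example
///
/// A minimal sketch of waiting for the next activity on the connection. To implement a
/// timeout, you'd race the listener against a timer.
///
/// ```no_run
/// # zbus::block_on(async {
/// use zbus::Connection;
///
/// let connection = Connection::session().await?;
/// // Resolves on the next send or receive. Re-arm by calling `monitor_activity` again.
/// connection.monitor_activity().await;
/// println!("Connection was active.");
/// # Ok::<(), zbus::Error>(())
/// # }).unwrap();
/// ```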
pub fn monitor_activity(&self) -> EventListener {
self.inner.activity_event.listen()
}
/// Return the peer credentials.
///
/// The fields are populated on a best-effort basis. Some or all fields may not even make sense
/// for certain sockets or on certain platforms and hence will be set to `None`.
///
/// # Caveats
///
/// Currently `unix_group_ids` and `linux_security_label` fields are not populated.
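///
/// # Example
///
/// A minimal sketch:
///
/// ```no_run
/// # zbus::block_on(async {
/// use zbus::Connection;
///
/// let connection = Connection::session().await?;
/// let creds = connection.peer_credentials().await?;
/// // All fields are optional; what's available depends on the socket type and platform.
/// println!("Peer credentials: {:?}", creds);
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// # }).unwrap();
/// ```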
pub async fn peer_credentials(&self) -> io::Result<ConnectionCredentials> {
self.inner.socket_write.lock().await.peer_credentials().await
}
/// Close the connection.
///
/// After this call, all reading and writing operations will fail.
pub async fn close(self) -> Result<()> {
self.inner.activity_event.notify(usize::MAX);
self.inner.socket_write.lock().await.close().await.map_err(Into::into)
}
/// Gracefully close the connection, waiting for all other references to be dropped.
///
/// This will not disrupt any incoming or outgoing method calls, and will await their
/// completion.
///
/// # Caveats
///
/// * This will not prevent new incoming messages from keeping the connection alive (and
/// indefinitely delaying this method's completion).
///
/// * The shutdown will not complete until the underlying connection is fully dropped, so beware
/// of deadlocks if you are holding any other clones of this `Connection`.
///
/// # Example
///
/// ```no_run
/// # use std::error::Error;
/// # use zbus::connection::Builder;
/// # use zbus::interface;
/// #
/// # struct MyInterface;
/// #
/// # #[interface(name = "foo.bar.baz")]
/// # impl MyInterface {
/// # async fn do_thing(&self) {}
/// # }
/// #
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn Error>> {
/// let conn = Builder::session()?
/// .name("foo.bar.baz")?
/// .serve_at("/foo/bar/baz", MyInterface)?
/// .build()
/// .await?;
///
/// # let some_exit_condition = std::future::ready(());
/// some_exit_condition.await;
///
/// conn.graceful_shutdown().await;
/// #
/// # Ok(())
/// # }
/// ```
pub async fn graceful_shutdown(self) {
let listener = self.inner.drop_event.listen();
drop(self);
listener.await;
}
pub(crate) fn init_socket_reader(
&self,
socket_read: Box<dyn socket::ReadHalf>,
already_read: Vec<u8>,
#[cfg(unix)] already_received_fds: Vec<std::os::fd::OwnedFd>,
) {
let inner = &self.inner;
inner
.socket_reader_task
.set(
SocketReader::new(
socket_read,
inner.msg_senders.clone(),
already_read,
#[cfg(unix)]
already_received_fds,
inner.activity_event.clone(),
)
.spawn(&inner.executor),
)
.expect("Attempted to set `socket_reader_task` twice");
}
fn set_unique_name_(&self, name: OwnedUniqueName) {
self.inner
.unique_name
.set(name)
// Programmer (probably our own) error if this fails.
.expect("unique name already set");
}
}
impl From<crate::blocking::Connection> for Connection {
fn from(conn: crate::blocking::Connection) -> Self {
conn.into_inner()
}
}
// Internal API that allows keeping a weak connection ref around.
#[derive(Debug)]
pub(crate) struct WeakConnection {
inner: Weak<ConnectionInner>,
}
impl WeakConnection {
/// Upgrade to a Connection.
pub fn upgrade(&self) -> Option<Connection> {
self.inner.upgrade().map(|inner| Connection { inner })
}
}
impl From<&Connection> for WeakConnection {
fn from(conn: &Connection) -> Self {
Self {
inner: Arc::downgrade(&conn.inner),
}
}
}
#[derive(Debug)]
enum NameStatus {
// The task waits for name lost signal if owner allows replacement.
Owner(#[allow(unused)] Option<Task<()>>),
// The task waits for name acquisition signal.
Queued(#[allow(unused)] Task<()>),
}
static SERIAL_NUM_SEMAPHORE: Semaphore = Semaphore::new(1);
// Make message creation and sending an atomic operation, using an async
// semaphore if the flatpak portal is detected, to work around an xdg-dbus-proxy issue:
//
// https://github.com/flatpak/xdg-dbus-proxy/issues/46
async fn acquire_serial_num_semaphore() -> Option<SemaphorePermit<'static>> {
if is_flatpak() {
Some(SERIAL_NUM_SEMAPHORE.acquire().await)
} else {
None
}
}
#[cfg(test)]
mod tests {
use std::pin::pin;
use std::time::Duration;
use ntest::timeout;
use test_log::test;
use super::*;
use crate::fdo::DBusProxy;
#[cfg(windows)]
#[test]
fn connect_autolaunch_session_bus() {
let addr = crate::win32::autolaunch_bus_address().expect("Unable to get session bus address");
crate::block_on(async { addr.connect().await }).expect("Unable to connect to session bus");
}
#[cfg(target_os = "macos")]
#[test]
#[ignore = "fails in ci"]
fn connect_launchd_session_bus() {
use crate::address::transport::Launchd;
use crate::address::{
Address,
Transport,
};
crate::block_on(async {
let addr = Address::from(Transport::Launchd(Launchd::new("DBUS_LAUNCHD_SESSION_BUS_SOCKET")));
addr.connect().await
})
.expect("Unable to connect to session bus");
}
#[ignore = "fails in ci"]
#[test]
#[timeout(15000)]
fn disconnect_on_drop() {
// Reproducer for https://github.com/dbus2/zbus/issues/308 where setting up the
// objectserver would cause the connection to not disconnect on drop.
crate::utils::block_on(test_disconnect_on_drop());
}
async fn test_disconnect_on_drop() {
#[derive(Default)]
struct MyInterface {}
#[crate::interface(name = "dev.peelz.FooBar.Baz")]
impl MyInterface {
#[allow(clippy::unused_self)]
fn do_thing(&self) {}
}
let name = "dev.peelz.foobar";
let connection = Builder::session()
.unwrap()
.name(name)
.unwrap()
.serve_at("/dev/peelz/FooBar", MyInterface::default())
.unwrap()
.build()
.await
.unwrap();
let connection2 = Connection::session().await.unwrap();
let dbus = DBusProxy::new(&connection2).await.unwrap();
let mut stream = dbus
.receive_name_owner_changed_with_args(&[(0, name), (2, "")])
.await
.unwrap();
drop(connection);
// If the connection is not dropped, this will hang forever.
stream.next().await.unwrap();
// Let's still make sure the name is gone.
let name_has_owner = dbus.name_has_owner(name.try_into().unwrap()).await.unwrap();
assert!(!name_has_owner);
}
#[ignore = "fails in ci"]
#[tokio::test(start_paused = true)]
#[timeout(15000)]
async fn test_graceful_shutdown() {
// If we have a second reference, it should wait until we drop it.
let connection = Connection::session().await.unwrap();
let clone = connection.clone();
let mut shutdown = pin!(connection.graceful_shutdown());
// Due to start_paused above, tokio will auto-advance time once the runtime is idle.
// See https://docs.rs/tokio/latest/tokio/time/fn.pause.html.
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(u64::MAX)) => {},
_ = &mut shutdown => {
panic!("Graceful shutdown unexpectedly completed");
}
}
drop(clone);
shutdown.await;
// An outstanding method call should also be sufficient to keep the connection alive.
struct GracefulInterface {
method_called: Event,
wait_before_return: Option<EventListener>,
announce_done: Event,
}
#[crate::interface(name = "dev.peelz.TestGracefulShutdown")]
impl GracefulInterface {
async fn do_thing(&mut self) {
self.method_called.notify(1);
if let Some(listener) = self.wait_before_return.take() {
listener.await;
}
self.announce_done.notify(1);
}
}
let method_called = Event::new();
let method_called_listener = method_called.listen();
let trigger_return = Event::new();
let wait_before_return = Some(trigger_return.listen());
let announce_done = Event::new();
let done_listener = announce_done.listen();
let interface = GracefulInterface {
method_called,
wait_before_return,
announce_done,
};
let name = "dev.peelz.TestGracefulShutdown";
let obj = "/dev/peelz/TestGracefulShutdown";
let connection = Builder::session()
.unwrap()
.name(name)
.unwrap()
.serve_at(obj, interface)
.unwrap()
.build()
.await
.unwrap();
// Call the method from another connection - it won't return until we tell it to.
let client_conn = Connection::session().await.unwrap();
tokio::spawn(async move {
client_conn
.call_method(Some(name), obj, Some(name), "DoThing", &())
.await
.unwrap();
});
// Avoid races - make sure we've actually received the method call before we drop our
// Connection handle.
method_called_listener.await;
let mut shutdown = pin!(connection.graceful_shutdown());
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(u64::MAX)) => {},
_ = &mut shutdown => {
// While that method call is outstanding, graceful shutdown should not complete.
panic!("Graceful shutdown unexpectedly completed");
}
}
// If we let the call complete, then the shutdown should complete eventually.
trigger_return.notify(1);
shutdown.await;
// The method call should have been allowed to finish properly.
done_listener.await;
}
}
#[cfg(feature = "p2p")]
#[cfg(test)]
mod p2p_tests {
use futures_util::stream::TryStreamExt;
use ntest::timeout;
use test_log::test;
use zvariant::{
Endian,
NATIVE_ENDIAN,
};
use super::*;
use crate::{
AuthMechanism,
Guid,
};
// Same-numbered clients and servers are already paired up.
async fn test_p2p(
server1: Connection,
client1: Connection,
server2: Connection,
client2: Connection,
) -> Result<()> {
let forward1 = {
let stream = MessageStream::from(server1.clone());
let sink = client2.clone();
stream.try_for_each(move |msg| {
let sink = sink.clone();
async move { sink.send(&msg).await }
})
};
let forward2 = {
let stream = MessageStream::from(client2.clone());
let sink = server1.clone();
stream.try_for_each(move |msg| {
let sink = sink.clone();
async move { sink.send(&msg).await }
})
};
let _forward_task = client1.executor().spawn(
async move { futures_util::try_join!(forward1, forward2) },
"forward_task",
);
let server_ready = Event::new();
let server_ready_listener = server_ready.listen();
let client_done = Event::new();
let client_done_listener = client_done.listen();
let server_future = async move {
let mut stream = MessageStream::from(&server2);
server_ready.notify(1);
let method = loop {
let m = stream.try_next().await?.unwrap();
if m.to_string() == "Method call Test" {
assert_eq!(m.body().deserialize::<u64>().unwrap(), 64);
break m;
}
};
// Send another message first to check the queueing function on client side.
server2
.emit_signal(None::<()>, "/", "org.zbus.p2p", "ASignalForYou", &())
.await?;
server2.reply(&method, &("yay")).await?;
client_done_listener.await;
Ok(())
};
let client_future = async move {
let mut stream = MessageStream::from(&client1);
server_ready_listener.await;
// We want to set non-native endian to ensure that:
// 1. the message is actually encoded with the specified endian.
// 2. the server side is able to decode it and replies in the same encoding.
let endian = match NATIVE_ENDIAN {
Endian::Little => Endian::Big,
Endian::Big => Endian::Little,
};
let method = Message::method("/", "Test")?
.interface("org.zbus.p2p")?
.endian(endian)
.build(&64u64)?;
client1.send(&method).await?;
// Check we didn't miss the signal that was sent during the call.
let m = stream.try_next().await?.unwrap();
client_done.notify(1);
assert_eq!(m.to_string(), "Signal ASignalForYou");
let reply = stream.try_next().await?.unwrap();
assert_eq!(reply.to_string(), "Method return");
// Check if the reply was in the non-native endian.
assert_eq!(Endian::from(reply.primary_header().endian_sig()), endian);
reply.body().deserialize::<String>()
};
let (val, _) = futures_util::try_join!(client_future, server_future,)?;
assert_eq!(val, "yay");
Ok(())
}
#[test]
#[timeout(15000)]
fn tcp_p2p() {
crate::utils::block_on(test_tcp_p2p()).unwrap();
}
async fn test_tcp_p2p() -> Result<()> {
let (server1, client1) = tcp_p2p_pipe().await?;
let (server2, client2) = tcp_p2p_pipe().await?;
test_p2p(server1, client1, server2, client2).await
}
async fn tcp_p2p_pipe() -> Result<(Connection, Connection)> {
let guid = Guid::generate();
#[cfg(not(feature = "tokio"))]
let (server_conn_builder, client_conn_builder) = {
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let p1 = std::net::TcpStream::connect(addr).unwrap();
let p0 = listener.incoming().next().unwrap().unwrap();
(
Builder::tcp_stream(p0)
.server(guid)
.unwrap()
.p2p()
.auth_mechanism(AuthMechanism::Anonymous),
Builder::tcp_stream(p1).p2p(),
)
};
#[cfg(feature = "tokio")]
let (server_conn_builder, client_conn_builder) = {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let p1 = tokio::net::TcpStream::connect(addr).await.unwrap();
let p0 = listener.accept().await.unwrap().0;
(
Builder::tcp_stream(p0)
.server(guid)
.unwrap()
.p2p()
.auth_mechanism(AuthMechanism::Anonymous),
Builder::tcp_stream(p1).p2p(),
)
};
futures_util::try_join!(server_conn_builder.build(), client_conn_builder.build())
}
#[cfg(unix)]
#[test]
#[timeout(15000)]
fn unix_p2p() {
crate::utils::block_on(test_unix_p2p()).unwrap();
}
#[cfg(unix)]
async fn test_unix_p2p() -> Result<()> {
let (server1, client1) = unix_p2p_pipe().await?;
let (server2, client2) = unix_p2p_pipe().await?;
test_p2p(server1, client1, server2, client2).await
}
#[cfg(unix)]
async fn unix_p2p_pipe() -> Result<(Connection, Connection)> {
#[cfg(not(feature = "tokio"))]
use std::os::unix::net::UnixStream;
#[cfg(feature = "tokio")]
use tokio::net::UnixStream;
#[cfg(all(windows, not(feature = "tokio")))]
use uds_windows::UnixStream;
let guid = Guid::generate();
let (p0, p1) = UnixStream::pair().unwrap();
futures_util::try_join!(
Builder::unix_stream(p1).p2p().build(),
Builder::unix_stream(p0).server(guid).unwrap().p2p().build(),
)
}
// Compile-test only since we don't have a VM set up to run this in.
#[cfg(any(all(feature = "vsock", not(feature = "tokio")), feature = "tokio-vsock"))]
#[test]
#[timeout(15000)]
#[ignore]
fn vsock_p2p() {
crate::utils::block_on(test_vsock_p2p()).unwrap();
}
#[cfg(any(all(feature = "vsock", not(feature = "tokio")), feature = "tokio-vsock"))]
async fn test_vsock_p2p() -> Result<()> {
let (server1, client1) = vsock_p2p_pipe().await?;
let (server2, client2) = vsock_p2p_pipe().await?;
test_p2p(server1, client1, server2, client2).await
}
#[cfg(all(feature = "vsock", not(feature = "tokio")))]
async fn vsock_p2p_pipe() -> Result<(Connection, Connection)> {
let guid = Guid::generate();
let listener = vsock::VsockListener::bind_with_cid_port(vsock::VMADDR_CID_ANY, 42).unwrap();
let addr = listener.local_addr().unwrap();
let client = vsock::VsockStream::connect(&addr).unwrap();
let server = listener.incoming().next().unwrap().unwrap();
futures_util::try_join!(
Builder::vsock_stream(server)
.server(guid)
.unwrap()
.p2p()
.auth_mechanism(AuthMechanism::Anonymous)
.build(),
Builder::vsock_stream(client).p2p().build(),
)
}
#[cfg(feature = "tokio-vsock")]
async fn vsock_p2p_pipe() -> Result<(Connection, Connection)> {
let guid = Guid::generate();
let listener = tokio_vsock::VsockListener::bind(2, 42).unwrap();
let client = tokio_vsock::VsockStream::connect(3, 42).await.unwrap();
let server = listener.incoming().next().await.unwrap().unwrap();
futures_util::try_join!(
Builder::vsock_stream(server)
.server(guid)
.unwrap()
.p2p()
.auth_mechanism(AuthMechanism::Anonymous)
.build(),
Builder::vsock_stream(client).p2p().build(),
)
}
#[cfg(any(unix, not(feature = "tokio")))]
#[test]
#[timeout(15000)]
fn unix_p2p_cookie_auth() {
use std::fs::{
create_dir_all,
remove_file,
write,
};
use std::time::{
SystemTime as Time,
UNIX_EPOCH,
};
#[cfg(unix)]
use std::{
fs::{
Permissions,
set_permissions,
},
os::unix::fs::PermissionsExt,
};
use xdg_home::home_dir;
use crate::utils::block_on;
let cookie_context = "zbus-test-cookie-context";
let cookie_id = 123456789;
let cookie = hex::encode(b"our cookie");
// Ensure cookie directory exists.
let cookie_dir = home_dir().unwrap().join(".dbus-keyrings");
create_dir_all(&cookie_dir).unwrap();
#[cfg(unix)]
set_permissions(&cookie_dir, Permissions::from_mode(0o700)).unwrap();
// Create a cookie file.
let cookie_file = cookie_dir.join(cookie_context);
let ts = Time::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
let cookie_entry = format!("{cookie_id} {ts} {cookie}");
write(&cookie_file, cookie_entry).unwrap();
// Explicit cookie ID.
let res1 = block_on(test_unix_p2p_cookie_auth(cookie_context, Some(cookie_id)));
// Implicit cookie ID (first one should be picked).
let res2 = block_on(test_unix_p2p_cookie_auth(cookie_context, None));
// Remove the cookie file.
remove_file(&cookie_file).unwrap();
res1.unwrap();
res2.unwrap();
}
#[cfg(any(unix, not(feature = "tokio")))]
async fn test_unix_p2p_cookie_auth(cookie_context: &'static str, cookie_id: Option<usize>) -> Result<()> {
#[cfg(all(unix, not(feature = "tokio")))]
use std::os::unix::net::UnixStream;
#[cfg(all(unix, feature = "tokio"))]
use tokio::net::UnixStream;
#[cfg(all(windows, not(feature = "tokio")))]
use uds_windows::UnixStream;
let guid = Guid::generate();
let (p0, p1) = UnixStream::pair().unwrap();
let mut server_builder = Builder::unix_stream(p0)
.server(guid)
.unwrap()
.p2p()
.auth_mechanism(AuthMechanism::Cookie)
.cookie_context(cookie_context)
.unwrap();
if let Some(cookie_id) = cookie_id {
server_builder = server_builder.cookie_id(cookie_id);
}
futures_util::try_join!(Builder::unix_stream(p1).p2p().build(), server_builder.build(),).map(|_| ())
}
#[test]
#[timeout(15000)]
fn channel_pair() {
crate::utils::block_on(test_channel_pair()).unwrap();
}
async fn test_channel_pair() -> Result<()> {
let (server1, client1) = create_channel_pair().await;
let (server2, client2) = create_channel_pair().await;
test_p2p(server1, client1, server2, client2).await
}
async fn create_channel_pair() -> (Connection, Connection) {
let (a, b) = socket::Channel::pair();
let guid = crate::Guid::generate();
let conn1 = Builder::authenticated_socket(a, guid.clone())
.unwrap()
.p2p()
.build()
.await
.unwrap();
let conn2 = Builder::authenticated_socket(b, guid)
.unwrap()
.p2p()
.build()
.await
.unwrap();
(conn1, conn2)
}
}