in dc/s2n-quic-dc/src/stream/endpoint.rs [157:370]
fn build_stream<Env, P>(
now: Timestamp,
env: &Env,
peer: P,
stream_id: packet::stream::Id,
crypto: secret::map::Bidirectional,
map: &Map,
parameters: dc::ApplicationParams,
endpoint_type: endpoint::Type,
subscriber: Env::Subscriber,
subscriber_ctx: <Env::Subscriber as event::Subscriber>::ConnectionContext,
) -> Result<application::Builder<Env::Subscriber>>
where
Env: Environment,
P: Peer<Env>,
{
let features = peer.features();
let (sockets, recv_buffer) = peer.setup(env)?;
let source_queue_id = sockets.source_queue_id;
// construct shared reader state
let reader = recv::shared::State::new(
stream_id,
¶meters,
features,
recv_buffer,
endpoint_type,
&now,
);
let writer = {
let worker = sockets
.write_worker
.map(|socket| (send::state::State::new(stream_id, ¶meters), socket));
let (flow_offset, send_quantum, bandwidth) =
if let Some((worker, _socket)) = worker.as_ref() {
let flow_offset = worker.flow_offset();
let send_quantum = worker.send_quantum_packets();
let bandwidth = Some(worker.cca.bandwidth());
(flow_offset, send_quantum, bandwidth)
} else {
debug_assert!(
features.is_flow_controlled(),
"transports without flow control need background workers"
);
let flow_offset = VarInt::MAX;
let send_quantum = 10;
let bandwidth = None;
(flow_offset, send_quantum, bandwidth)
};
let flow = flow::non_blocking::State::new(flow_offset);
let path = send::path::Info {
max_datagram_size: parameters.max_datagram_size(),
send_quantum,
ecn: ExplicitCongestionNotification::Ect0,
next_expected_control_packet: VarInt::ZERO,
};
// construct shared writer state
let state = send::shared::State::new(flow, path, bandwidth);
(state, worker)
};
// construct shared common state between readers/writers
let common = {
let application = send::application::state::State {
is_reliable: stream_id.is_reliable,
};
let remote_addr = sockets.remote_addr;
let fixed = shared::FixedValues {
remote_ip: UnsafeCell::new(remote_addr.ip()),
application: UnsafeCell::new(application),
credentials: UnsafeCell::new(crypto.credentials),
};
shared::Common {
clock: env.clock().clone(),
gso: env.gso(),
remote_port: remote_addr.port().into(),
remote_queue_id: stream_id.queue_id.as_u64().into(),
local_queue_id: if let Some(id) = source_queue_id {
id.as_u64()
} else {
// use MAX as `None`
u64::MAX
}
.into(),
last_peer_activity: Default::default(),
fixed,
closed_halves: 0u8.into(),
subscriber: shared::Subscriber {
subscriber,
context: subscriber_ctx,
},
}
};
let crypto = {
let secret::map::Bidirectional {
application,
control,
credentials: _,
} = crypto;
let control = control.map(|c| (c.sealer, c.opener));
shared::Crypto::new(application.sealer, application.opener, control, map)
};
let shared = Arc::new(shared::Shared {
receiver: reader,
sender: writer.0,
common,
crypto,
});
// spawn the read worker
if let Some(socket) = sockets.read_worker {
let socket = socket.setup();
let shared = shared.clone();
let task = async move {
let mut reader = recv::worker::Worker::new(socket, shared, endpoint_type);
let mut prev_waker: Option<core::task::Waker> = None;
core::future::poll_fn(|cx| {
// update the waker if needed
if prev_waker
.as_ref()
.map_or(true, |prev| !prev.will_wake(cx.waker()))
{
prev_waker = Some(cx.waker().clone());
reader.update_waker(cx);
}
// drive the reader to completion
reader.poll(cx)
})
.await;
};
let span = debug_span!("worker::read");
if span.is_disabled() {
env.spawn_reader(task);
} else {
env.spawn_reader(task.instrument(span));
}
}
// spawn the write worker
if let Some((worker, socket)) = writer.1 {
let (socket, recv_buffer) = socket.setup();
let shared = shared.clone();
let task = async move {
let mut writer = send::worker::Worker::new(
socket,
recv_buffer,
Random::default(),
shared,
worker,
endpoint_type,
);
let mut prev_waker: Option<core::task::Waker> = None;
core::future::poll_fn(|cx| {
// update the waker if needed
if prev_waker
.as_ref()
.map_or(true, |prev| !prev.will_wake(cx.waker()))
{
prev_waker = Some(cx.waker().clone());
writer.update_waker(cx);
}
// drive the writer to completion
writer.poll(cx)
})
.await;
};
let span = debug_span!("worker::write");
if span.is_disabled() {
env.spawn_writer(task);
} else {
env.spawn_writer(task.instrument(span));
}
}
let read = recv::application::Builder::new(endpoint_type, env.reader_rt());
let write = send::application::Builder::new(env.writer_rt());
let stream = application::Builder {
read,
write,
shared,
sockets: sockets.application,
queue_time: now,
};
Ok(stream)
}