fn build_stream()

in dc/s2n-quic-dc/src/stream/endpoint.rs [157:370]


/// Assembles the shared state for a stream and spawns the background workers
/// requested by the peer's socket setup.
///
/// The steps, in order:
///
/// 1. Query the peer's transport features and run `peer.setup(env)` to obtain
///    the sockets and the receive buffer.
/// 2. Build the receiver state, the sender state and the state common to both.
/// 3. Build the crypto state from the `application`/`control` keys in `crypto`.
/// 4. Wrap everything in an `Arc` and, if the sockets carry a read and/or write
///    worker socket, spawn the matching task through `env`.
/// 5. Return the application-facing builder holding the shared state, the
///    application sockets and `now` as the queue time.
///
/// # Errors
///
/// Propagates the error returned by `peer.setup(env)`.
fn build_stream<Env, P>(
    now: Timestamp,
    env: &Env,
    peer: P,
    stream_id: packet::stream::Id,
    crypto: secret::map::Bidirectional,
    map: &Map,
    parameters: dc::ApplicationParams,
    endpoint_type: endpoint::Type,
    subscriber: Env::Subscriber,
    subscriber_ctx: <Env::Subscriber as event::Subscriber>::ConnectionContext,
) -> Result<application::Builder<Env::Subscriber>>
where
    Env: Environment,
    P: Peer<Env>,
{
    let features = peer.features();
    let (sockets, recv_buffer) = peer.setup(env)?;
    let source_queue_id = sockets.source_queue_id;

    // shared state for the receiving half
    let receiver = recv::shared::State::new(
        stream_id,
        &parameters,
        features,
        recv_buffer,
        endpoint_type,
        &now,
    );

    // pair the write worker socket (if any) with a freshly created sender state
    let write_worker = sockets
        .write_worker
        .map(|socket| (send::state::State::new(stream_id, &parameters), socket));

    // seed the application-side sender from the worker state when there is one
    let (flow_offset, send_quantum, bandwidth) = match write_worker.as_ref() {
        Some((state, _)) => (
            state.flow_offset(),
            state.send_quantum_packets(),
            Some(state.cca.bandwidth()),
        ),
        None => {
            debug_assert!(
                features.is_flow_controlled(),
                "transports without flow control need background workers"
            );

            // no background worker: fall back to fixed values
            (VarInt::MAX, 10, None)
        }
    };

    // shared state for the sending half
    let sender = send::shared::State::new(
        flow::non_blocking::State::new(flow_offset),
        send::path::Info {
            max_datagram_size: parameters.max_datagram_size(),
            send_quantum,
            ecn: ExplicitCongestionNotification::Ect0,
            next_expected_control_packet: VarInt::ZERO,
        },
        bandwidth,
    );

    // split the key material once; each part is consumed exactly once below
    let secret::map::Bidirectional {
        application: application_keys,
        control: control_keys,
        credentials,
    } = crypto;

    // state shared between the reading and writing halves
    let common = {
        let remote_addr = sockets.remote_addr;

        // `u64::MAX` stands in for "no local queue id"
        let local_queue_id = match source_queue_id {
            Some(id) => id.as_u64(),
            None => u64::MAX,
        };

        let fixed = shared::FixedValues {
            remote_ip: UnsafeCell::new(remote_addr.ip()),
            application: UnsafeCell::new(send::application::state::State {
                is_reliable: stream_id.is_reliable,
            }),
            credentials: UnsafeCell::new(credentials),
        };

        shared::Common {
            clock: env.clock().clone(),
            gso: env.gso(),
            remote_port: remote_addr.port().into(),
            remote_queue_id: stream_id.queue_id.as_u64().into(),
            local_queue_id: local_queue_id.into(),
            last_peer_activity: Default::default(),
            fixed,
            closed_halves: 0u8.into(),
            subscriber: shared::Subscriber {
                subscriber,
                context: subscriber_ctx,
            },
        }
    };

    let keys = shared::Crypto::new(
        application_keys.sealer,
        application_keys.opener,
        control_keys.map(|c| (c.sealer, c.opener)),
        map,
    );

    let shared_state = Arc::new(shared::Shared {
        receiver,
        sender,
        common,
        crypto: keys,
    });

    // background task for the receiving half
    if let Some(socket) = sockets.read_worker {
        let socket = socket.setup();
        let worker_state = Arc::clone(&shared_state);

        let task = async move {
            let mut worker = recv::worker::Worker::new(socket, worker_state, endpoint_type);
            let mut last_waker: Option<core::task::Waker> = None;

            core::future::poll_fn(|cx| {
                // only hand the waker to the worker when it differs from the last one seen
                let waker_changed = match last_waker.as_ref() {
                    Some(prev) => !prev.will_wake(cx.waker()),
                    None => true,
                };

                if waker_changed {
                    last_waker = Some(cx.waker().clone());
                    worker.update_waker(cx);
                }

                // keep polling the worker until it reports completion
                worker.poll(cx)
            })
            .await;
        };

        let span = debug_span!("worker::read");

        // skip the instrumentation wrapper when the span is disabled
        if span.is_disabled() {
            env.spawn_reader(task);
        } else {
            env.spawn_reader(task.instrument(span));
        }
    }

    // background task for the sending half
    if let Some((state, socket)) = write_worker {
        let (socket, buffer) = socket.setup();
        let worker_state = Arc::clone(&shared_state);

        let task = async move {
            let mut worker = send::worker::Worker::new(
                socket,
                buffer,
                Random::default(),
                worker_state,
                state,
                endpoint_type,
            );
            let mut last_waker: Option<core::task::Waker> = None;

            core::future::poll_fn(|cx| {
                // only hand the waker to the worker when it differs from the last one seen
                let waker_changed = match last_waker.as_ref() {
                    Some(prev) => !prev.will_wake(cx.waker()),
                    None => true,
                };

                if waker_changed {
                    last_waker = Some(cx.waker().clone());
                    worker.update_waker(cx);
                }

                // keep polling the worker until it reports completion
                worker.poll(cx)
            })
            .await;
        };

        let span = debug_span!("worker::write");

        // skip the instrumentation wrapper when the span is disabled
        if span.is_disabled() {
            env.spawn_writer(task);
        } else {
            env.spawn_writer(task.instrument(span));
        }
    }

    // application-facing handle over the shared state
    Ok(application::Builder {
        read: recv::application::Builder::new(endpoint_type, env.reader_rt()),
        write: send::application::Builder::new(env.writer_rt()),
        shared: shared_state,
        sockets: sockets.application,
        queue_time: now,
    })
}