fn executor_status_looper()

in backends/trtllm/src/looper.rs [65:168]


/// Blocking scheduler loop that bridges incoming generation requests to the
/// TensorRT-LLM executor and streams decoded tokens back to clients.
///
/// Runs until the backlog channel is closed (all senders dropped) or the
/// executor fails to return responses. Intended to run on a dedicated thread:
/// it uses `blocking_recv` and a spin-loop hint rather than async awaits.
///
/// * `max_inflight_requests` - sizing hint for the in-flight tracker (capacity
///   is doubled to limit rehashing).
/// * `tokenizer` - used to post-process decoded tokens into text.
/// * `backend` - owned FFI handle to the TensorRT-LLM executor.
/// * `backlog` - channel of validated requests waiting to be scheduled.
fn executor_status_looper(
    max_inflight_requests: usize,
    tokenizer: Tokenizer,
    mut backend: UniquePtr<TensorRtLlmBackendImpl>,
    mut backlog: UnboundedReceiver<GenerationContext>,
) {
    // Track the tuple (request_id, stream) for each request, keyed by the
    // executor-assigned request id.
    let mut in_flights =
        HashMap::<u64, GenerationContext>::with_capacity(max_inflight_requests * 2);

    'scheduler: loop {
        // Phase 1: drain the requests currently queued in the backlog and
        // submit them to the executor. `len()` is sampled once so this phase
        // is bounded and cannot starve the token-pulling phase below.
        let awaiting_requests = backlog.len();
        for _ in 0..awaiting_requests {
            if let Some(ctx) = backlog.blocking_recv() {
                // Submit the request to the executor and move the context to
                // the in-flight tracker
                let request = &ctx.request;
                let generation_params = &request.parameters;
                let stopping_params = &request.stopping_parameters;
                let input_ids = request.input_ids.as_deref();

                // Submit to the TensorRT-LLM executor for scheduling
                match backend.pin_mut().submit(
                    // Presence of input_ids is checked beforehand in validate()
                    &input_ids.expect("input_ids presence is checked in validate()"),
                    stopping_params.max_new_tokens,
                    generation_params.top_k,
                    generation_params.top_p,
                    generation_params.temperature,
                    generation_params.repetition_penalty,
                    generation_params.frequency_penalty,
                    generation_params.seed,
                ) {
                    Ok(request_id) => {
                        // Insert the context linked to the generated request id in the tracker
                        debug!("[in-flight] Added {}", request_id);
                        in_flights.insert(request_id, ctx);
                    }
                    Err(e) => {
                        // Scheduling failed: report the error back to the caller
                        // through the response stream
                        let what = e.to_string();
                        error!(error = what.as_str(), "Failed to schedule request");

                        let err = Err(InferError::Overloaded(TryAcquireError::NoPermits));
                        if ctx.streamer.send(err).is_err() {
                            error!("Failed to send back error to the client");
                        }
                    }
                };
            } else {
                // Channel closed: no request will ever arrive again, shut down
                break 'scheduler;
            }
        }

        // Phase 2: pull and dispatch any tokens the executor has produced.
        if backend.num_tokens_ready() > 0 {
            let mut backend = backend.pin_mut();
            match backend.as_mut().pull_tokens() {
                Ok(responses) => {
                    // Iterate through all the decoded token
                    for step in responses.deref() {
                        if let Some(ctx) = in_flights.get_mut(&step.request_id) {
                            // Update the starting timestamp if not set
                            // This value might not be the actual real starting time of the request
                            // on the executor side - Need to expose more info from the executor to
                            // retrieve this value
                            // TODO : Expose actual real starting time for a request on FFI layer
                            if ctx.start.is_none() {
                                ctx.start = Some(Instant::now());
                            }

                            // Try to map the generation step to a DecodedToken
                            let response = match DecodedToken::try_from(step) {
                                Ok(decoded_token) => {
                                    post_process_decoded_token(&tokenizer, ctx, decoded_token)
                                }
                                Err(err) => Err(err),
                            };

                            // Attempt to send back the response to the client
                            if ctx.streamer.send(response).is_err() {
                                // Client has dropped, remove from tracked requests
                                debug!(
                                    "Client dropped - removing request {} from tracked requests",
                                    step.request_id
                                );
                                backend.as_mut().cancel(step.request_id);
                                let _ = in_flights.remove(&step.request_id);
                            }
                        } else {
                            warn!("Untracked request {}", step.request_id);
                        }
                    }
                }
                Err(ref err) => {
                    error!("Failed to get responses from the executor: {}.", err.what());
                    break 'scheduler;
                }
            }
        }

        // Hint the CPU we are spin-locking
        hint::spin_loop();
    }
}