in backends/trtllm/src/looper.rs [65:168]
/// Drives the TensorRT-LLM executor: schedules queued requests and streams results back.
///
/// Every iteration of the scheduler loop does two things:
/// 1. Drains the requests currently queued in `backlog`, submits each of them to `backend`
///    and stores its context in an in-flight tracker keyed by the request id returned by
///    the executor. If the submission fails, an `InferError::Overloaded` is sent back on the
///    request's `streamer` instead.
/// 2. If the executor reports ready tokens, pulls them, turns each generation step into a
///    response through `post_process_decoded_token` and sends it on the `streamer` of the
///    matching in-flight request. When that send fails the request is cancelled on the
///    executor and removed from the tracker.
///
/// The loop busy-waits between iterations (`hint::spin_loop`). It returns when `backlog` is
/// closed and fully drained, or when pulling tokens from the executor fails.
///
/// # Arguments
/// * `max_inflight_requests` - sizing hint for the in-flight tracker (its initial capacity is
///   twice this value).
/// * `tokenizer` - forwarded to `post_process_decoded_token` for every decoded token.
/// * `backend` - handle used to submit requests to, pull tokens from and cancel requests on
///   the TensorRT-LLM executor.
/// * `backlog` - receiving end of the queue of requests waiting to be scheduled.
///
/// # Panics
/// Panics if a queued request carries no `input_ids`; this is expected to be ruled out by
/// `validate()` before the request is queued.
fn executor_status_looper(
    max_inflight_requests: usize,
    tokenizer: Tokenizer,
    mut backend: UniquePtr<TensorRtLlmBackendImpl>,
    mut backlog: UnboundedReceiver<GenerationContext>,
) {
    // Track the generation context of each scheduled request, keyed by its request id
    let mut in_flights =
        HashMap::<u64, GenerationContext>::with_capacity(max_inflight_requests * 2);

    'scheduler: loop {
        // Read the closed flag BEFORE the length: once the channel is observed closed no new
        // message can be queued, so a zero length read afterwards means it is truly drained.
        let backlog_closed = backlog.is_closed();

        // Is there any request pending to be scheduled?
        let awaiting_requests = backlog.len();

        // `blocking_recv` is only called while messages are queued, so it can never report the
        // closed channel by itself: detect the shutdown explicitly instead of spinning forever.
        if backlog_closed && awaiting_requests == 0 {
            break 'scheduler;
        }

        for _ in 0..awaiting_requests {
            // Retrieve the next queued request (defensive exit if the channel reports closed)
            let ctx = match backlog.blocking_recv() {
                Some(ctx) => ctx,
                None => break 'scheduler,
            };

            let request = &ctx.request;
            let generation_params = &request.parameters;
            let stopping_params = &request.stopping_parameters;
            let input_ids = request
                .input_ids
                .as_deref()
                .expect("input_ids must be set: this is checked beforehand in validate()");

            // Submit to the TensorRT-LLM executor for scheduling
            match backend.pin_mut().submit(
                input_ids,
                stopping_params.max_new_tokens,
                generation_params.top_k,
                generation_params.top_p,
                generation_params.temperature,
                generation_params.repetition_penalty,
                generation_params.frequency_penalty,
                generation_params.seed,
            ) {
                Ok(request_id) => {
                    // Move the context to the in-flight tracker under the generated request id
                    debug!("[in-flight] Added {}", request_id);
                    in_flights.insert(request_id, ctx);
                }
                Err(e) => {
                    // Log the executor error and report the failure to the caller
                    let what = e.to_string();
                    error!(error = what.as_str(), "Failed to schedule request");

                    let err = Err(InferError::Overloaded(TryAcquireError::NoPermits));
                    if ctx.streamer.send(err).is_err() {
                        error!("Failed to send back error to the client");
                    }
                }
            }
        }

        if backend.num_tokens_ready() > 0 {
            let mut backend = backend.pin_mut();
            match backend.as_mut().pull_tokens() {
                Ok(responses) => {
                    // Iterate through all the decoded token
                    for step in responses.deref() {
                        if let Some(ctx) = in_flights.get_mut(&step.request_id) {
                            // Update the starting timestamp if not set
                            // This value might not be the actual real starting time of the request
                            // on the executor side - Need to expose more info from the executor to
                            // retrieve this value
                            // TODO : Expose actual real starting time for a request on FFI layer
                            if ctx.start.is_none() {
                                ctx.start = Some(Instant::now());
                            }

                            // Try to map the generation step to a DecodedToken, then post-process it
                            let response = DecodedToken::try_from(step).and_then(|decoded_token| {
                                post_process_decoded_token(&tokenizer, ctx, decoded_token)
                            });

                            // Attempt to send back the response to the client
                            if ctx.streamer.send(response).is_err() {
                                // Client has dropped, remove from tracked requests
                                debug!(
                                    "Client dropped - removing request {} from tracked requests",
                                    step.request_id
                                );
                                backend.as_mut().cancel(step.request_id);
                                let _ = in_flights.remove(&step.request_id);
                            }
                            // NOTE(review): entries are only evicted when the send fails; nothing
                            // visible here removes the context of a request that ran to
                            // completion - confirm whether finished requests must be evicted too.
                        } else {
                            warn!("Untracked request {}", step.request_id,);
                        }
                    }
                }
                Err(ref err) => {
                    error!("Failed to get responses from the executor: {}.", err.what());
                    break 'scheduler;
                }
            }
        }

        // Hint the CPU we are spin-locking
        hint::spin_loop();
    }
}