rd-hashd/src/workqueue.rs (142 lines of code) (raw):
// Copyright (c) Facebook, Inc. and its affiliates.
//! A dynamically sized worker pool.
//!
//! Workers are created on-demand as work is queued and dropped after idling
//! longer than the specified timeout.
//!
//! Idle worker management is implicitly performed on each queueing. To reap
//! idle workers while no new work items are being queued, call
//! `WorkQueue::keep_books()` periodically.
//!
//! # Examples
//! ```
//! use std::time::Duration;
//! use std::thread::sleep;
//!
//! let mut wq = workqueue::WorkQueue::new(Duration::from_secs(3));
//!
//! wq.queue(|| { println!("hello 1"); sleep(Duration::from_secs(1)); println!("bye 1"); });
//! wq.queue(|| { println!("hello 2"); sleep(Duration::from_secs(1)); println!("bye 2"); });
//! wq.queue(|| { println!("hello 3"); sleep(Duration::from_secs(1)); println!("bye 3"); });
//!
//! while wq.nr_workers() > 0 {
//! wq.keep_books();
//! sleep(Duration::from_secs(1));
//! }
//! ```
use log::{debug, error, trace};
use std::cell::RefCell;
use std::collections::{HashMap, VecDeque};
use std::rc::Rc;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{spawn, JoinHandle};
use std::time::{Duration, Instant};
/// A worker thread's self representation.
///
/// This is the state moved into the spawned thread; `run()` consumes it.
struct Worker {
/// Identifier used in this worker's log messages and sent over `ack_tx`
/// and `err_tx` so the queue can tell which worker is reporting.
id: usize,
/// Source of boxed closures to execute. `run()` iterates over it until
/// the channel yields no more items.
work_rx: Receiver<Box<dyn FnOnce() + Send>>,
/// This worker's id is sent here after each work item completes.
ack_tx: Sender<usize>,
/// This worker's id is sent here when a work item panics, right before
/// the panic is resumed on the worker thread.
err_tx: Sender<usize>,
}
/// A worker seen from the workqueue.
struct WorkerHandle {
/// Sending half of the worker's private work channel; `queue()` pushes
/// boxed closures through it.
work_tx: Sender<Box<dyn FnOnce() + Send>>,
/// Set when the handle is created and refreshed whenever `keep_books()`
/// processes an ack from this worker. Compared against the idle timeout
/// to decide when to reap the worker.
last_active_at: Instant,
/// Join handle of the worker thread. `None` until the thread has been
/// spawned and again after the handle has been taken for joining.
join_handle: Option<JoinHandle<()>>,
}
/// Shared, mutable reference to a `WorkerHandle`.
///
/// A `WorkerHandle` has mutable fields and must live in two collections at
/// once - the main worker map and the idle list. The two are never accessed
/// concurrently, so the handle is wrapped in `Rc` with interior mutability
/// (`RefCell`).
type WorkerRef = Rc<RefCell<WorkerHandle>>;
/// A dynamically sized worker pool.
pub struct WorkQueue {
/// Idle workers whose last activity is at least this old are reaped by
/// `keep_books()`.
idle_timeout: Duration,
/// Every worker currently owned by the queue, keyed by worker id.
workers: HashMap<usize, WorkerRef>,
/// Workers with no work in flight. Newly idle workers are pushed to the
/// front, reuse pops from the front and reaping pops from the back.
idle_workers: VecDeque<WorkerRef>,
/// Completion channel sender; cloned into every spawned worker.
ack_tx: Sender<usize>,
/// Completion channel receiver; drained by `keep_books()`.
ack_rx: Receiver<usize>,
/// Panic notification channel sender; cloned into every spawned worker.
err_tx: Sender<usize>,
/// Panic notification channel receiver; drained by `keep_books()`.
err_rx: Receiver<usize>,
}
impl Worker {
    /// Bundle the worker's id with the receiving end of its work channel and
    /// the two channels it reports back on.
    pub fn new(
        id: usize,
        work_rx: Receiver<Box<dyn FnOnce() + Send>>,
        ack_tx: Sender<usize>,
        err_tx: Sender<usize>,
    ) -> Self {
        Self {
            id,
            work_rx,
            ack_tx,
            err_tx,
        }
    }

    /// Worker thread main loop.
    ///
    /// Executes work items one at a time until the work channel is closed.
    /// After each successful item the worker's id is sent on the ack channel.
    /// If an item panics, the id is sent on the error channel and the panic
    /// is re-raised so that it surfaces when the thread is joined.
    pub fn run(self) {
        let Worker {
            id,
            work_rx,
            ack_tx,
            err_tx,
        } = self;

        debug!("worker-{:x}: starting", id);
        for job in work_rx {
            trace!("worker-{:x}: executing {:p}", id, job);
            // The closure is consumed by the call, so no state can be
            // observed in a broken condition after a panic.
            let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(job));
            match outcome {
                Ok(()) => {
                    trace!("worker-{:x}: complete", id);
                    ack_tx.send(id).unwrap();
                }
                Err(payload) => {
                    // notify the queue first, then let the panic continue
                    err_tx.send(id).unwrap();
                    std::panic::resume_unwind(payload);
                }
            }
        }
        debug!("worker-{:x}: exiting", id);
    }
}
impl WorkQueue {
    /// Create an empty pool. Idle workers are reaped by `keep_books()` once
    /// they have been idle for at least `idle_timeout`.
    pub fn new(idle_timeout: Duration) -> Self {
        let (ack_tx, ack_rx) = channel();
        let (err_tx, err_rx) = channel();
        WorkQueue {
            idle_timeout,
            workers: HashMap::new(),
            idle_workers: VecDeque::new(),
            ack_tx,
            ack_rx,
            err_tx,
            err_rx,
        }
    }

    /// Produce a unique stable id for the worker.
    ///
    /// The id is the address of the `WorkerHandle` behind the `Rc`, so it
    /// stays the same for as long as any reference to the worker is alive.
    fn wref_id(wref: &WorkerRef) -> usize {
        wref.as_ptr() as usize
    }

    /// Manage idle workers. Implicitly called on queueing. Call periodically
    /// to trigger idle worker management.
    ///
    /// # Panics
    ///
    /// Panics if a worker thread panicked while executing a work item. The
    /// worker's panic message is included when its payload is a `&str` or a
    /// `String`.
    pub fn keep_books(&mut self) {
        let now = Instant::now();

        // process acks and queue newly idle workers at the front
        for id in self.ack_rx.try_iter() {
            let wref = self.workers[&id].clone();
            wref.borrow_mut().last_active_at = now;
            self.idle_workers.push_front(wref);
            trace!("worker-{:x}: idle", id);
        }

        // process errors and panic the thread if any.
        for id in self.err_rx.try_iter() {
            let wref = self
                .workers
                .remove(&id)
                .expect("errored worker must be registered");
            let jh = wref.borrow_mut().join_handle.take().unwrap();
            if let Err(e) = jh.join() {
                // `panic!("literal")` carries a `&str` while a formatted
                // `panic!("{}", x)` carries a `String`; recognize both so
                // that the message isn't lost.
                let msg: Option<&str> = match e.downcast_ref::<&str>() {
                    Some(s) => Some(*s),
                    None => e.downcast_ref::<String>().map(String::as_str),
                };
                match msg {
                    Some(s) => {
                        error!("worker-{:x}: {}", id, s);
                        panic!("A worker thread panicked: {}", s);
                    }
                    None => {
                        error!("worker-{:x}: panic", id);
                        panic!("A worker thread panicked!")
                    }
                }
            }
        }

        // pop workers which idled for too long from the back and destroy
        let mut jhs: Vec<JoinHandle<()>> = Vec::new();
        while let Some(wref) = self.idle_workers.pop_back() {
            if now.duration_since(wref.borrow().last_active_at) < self.idle_timeout {
                // The list is ordered by idle time, so everything in front
                // of this one is fresher. Put it back and stop.
                self.idle_workers.push_back(wref);
                break;
            }

            // removing from workers and idle_workers drops the worker, which
            // closes its work channel and lets the thread exit
            let id = Self::wref_id(&wref);
            self.workers.remove(&id);
            jhs.push(wref.borrow_mut().join_handle.take().unwrap());
            debug!("worker-{:x}: dropped", id);
        }

        for jh in jhs {
            jh.join().unwrap();
        }
    }

    /// Return a worker ready to accept a work item, reusing the most
    /// recently idled one if available and spawning a new thread otherwise.
    fn get_next_worker(&mut self) -> WorkerRef {
        // use an existing idle worker if there's one
        if let Some(wref) = self.idle_workers.pop_front() {
            return wref;
        }

        // gotta spawn a new one
        let (work_tx, work_rx) = channel();
        let wref: WorkerRef = Rc::new(RefCell::new(WorkerHandle {
            work_tx,
            last_active_at: Instant::now(),
            join_handle: None,
        }));
        let id = Self::wref_id(&wref);
        let worker = Worker::new(id, work_rx, self.ack_tx.clone(), self.err_tx.clone());

        // Spawn before registering. `spawn()` panics if the thread can't be
        // created and a registered worker without a join handle would trip
        // up later bookkeeping and `Drop`.
        let jh = spawn(move || worker.run());
        wref.borrow_mut().join_handle = Some(jh);
        self.workers.insert(id, wref.clone());
        wref
    }

    /// Queue a `FnOnce` closure for execution on a worker thread. If there is
    /// no available idle worker thread, a new one will be created immediately.
    ///
    /// # Panics
    ///
    /// Calls `keep_books()` first and thus panics under the same conditions.
    pub fn queue<F>(&mut self, work: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.keep_books();
        self.get_next_worker()
            .borrow()
            .work_tx
            .send(Box::new(work))
            .unwrap();
    }

    /// The number of all workers. This changes only across `queue()` and
    /// `keep_books()` calls.
    pub fn nr_workers(&self) -> usize {
        self.workers.len()
    }

    /// The number of idle workers. This changes only across `queue()` and
    /// `keep_books()` calls.
    pub fn nr_idle_workers(&self) -> usize {
        self.idle_workers.len()
    }
}
impl Drop for WorkQueue {
    /// Shut down all workers and wait for their threads to finish.
    ///
    /// A worker panic is propagated to the dropping thread unless that thread
    /// is already unwinding, in which case it is only logged; panicking
    /// again at that point would abort the process.
    fn drop(&mut self) {
        debug!("workqueue: dropping");

        // Remember all join handles and then clear both collections. A
        // worker without a handle has no thread to wait for, so skip it
        // rather than panicking inside drop.
        let jhs: Vec<JoinHandle<()>> = self
            .workers
            .values()
            .filter_map(|wref| wref.borrow_mut().join_handle.take())
            .collect();

        // Dropping the handles closes the work channels, which ends each
        // worker's receive loop.
        self.workers.clear();
        self.idle_workers.clear();

        // everyone is dying, wait for them
        for jh in jhs {
            match jh.join() {
                Err(_) if std::thread::panicking() => {
                    debug!("workqueue: ignoring worker panic while unwinding");
                }
                res => res.unwrap(),
            }
        }
        debug!("workqueue: dropped");
    }
}
#[cfg(test)]
mod tests {
    use super::WorkQueue;
    use std::sync::mpsc::channel;
    use std::thread::sleep;
    use std::time::Duration;

    /// Exercises queueing, idling and reaping end to end. The checks depend
    /// on wall-clock timing and may turn flaky on a heavily loaded machine.
    #[test]
    fn test() {
        let _ = ::env_logger::try_init();
        let mut wq = WorkQueue::new(Duration::from_millis(500));
        let (tx, rx) = channel::<String>();

        println!("Spawning three hello, bye workers");
        for nr in 1..=3 {
            let reply_tx = tx.clone();
            wq.queue(move || {
                reply_tx.send(format!("hello {}", nr)).unwrap();
                sleep(Duration::from_millis(100));
                reply_tx.send(format!("bye {}", nr)).unwrap();
            });
        }

        // Nothing has had time to finish, so every worker is busy.
        assert_eq!(wq.nr_workers(), 3);
        assert_eq!(wq.nr_idle_workers(), 0);

        println!("Waiting for execution");
        while wq.nr_idle_workers() < wq.nr_workers() {
            wq.keep_books();
            sleep(Duration::from_millis(10));
        }

        // With the last sender gone, the receiver iterator terminates.
        drop(tx);
        let replies: Vec<String> = rx.iter().collect();
        println!("Replies: {:?}", replies);
        assert_eq!(replies.len(), 6);
        let (greetings, farewells) = replies.split_at(3);
        for v in greetings {
            assert!(v.starts_with("hello"));
        }
        for v in farewells {
            assert!(v.starts_with("bye"));
        }

        // Everything finished but the idle timeout hasn't expired yet, so
        // all three workers must still exist and be idle.
        wq.keep_books();
        assert_eq!(wq.nr_workers(), 3);
        assert_eq!(wq.nr_idle_workers(), 3);

        println!("Sleeping 750ms and repeaing timed out workers");
        sleep(Duration::from_millis(750));
        wq.keep_books();

        // The idle timeout has passed; every worker must have been reaped.
        assert_eq!(wq.nr_workers(), 0);
        assert_eq!(wq.nr_idle_workers(), 0);
    }

    /// A panic inside a work item must resurface from `keep_books()` with
    /// the worker's message attached.
    #[test]
    #[should_panic(expected = "A worker thread panicked: Boom!!!")]
    fn test_panic_propagation() {
        let _ = ::env_logger::try_init();
        let mut wq = WorkQueue::new(Duration::from_millis(500));

        wq.queue(|| {
            panic!("Boom!!!");
        });

        let mut rounds_left = 5;
        while rounds_left > 0 {
            wq.keep_books();
            sleep(Duration::from_secs(1));
            rounds_left -= 1;
        }

        // Only reached if the worker's panic was never propagated.
        panic!("Boooooooooom!");
    }
}