src/jobserver.rs (102 lines of code) (raw):

use std::io; use std::process::Command; use std::sync::Arc; use futures::channel::mpsc; use futures::channel::oneshot; use futures::StreamExt; use crate::errors::*; // The execution model of sccache is that on the first run it spawns a server // in the background and detaches it. // When normally executing the rust compiler from either cargo or make, it // will use cargo/make's jobserver and limit its resource usage accordingly. // When executing the rust compiler through the sccache server, that jobserver // is not available, and spawning as many rustc as there are CPUs can lead to // a quadratic use of the CPU resources (each rustc spawning as many threads // as there are CPUs). // One way around this issue is to inherit the jobserver from cargo or make // when the sccache server is spawned, but that means that in some cases, the // cargo or make process can't terminate until the sccache server terminates // after its idle timeout (which also never happens if SCCACHE_IDLE_TIMEOUT=0). // Also, if the sccache server ends up shared between multiple runs of // cargo/make, then which jobserver is used doesn't make sense anymore. // Ideally, the sccache client would give a handle to the jobserver it has // access to, so that the rust compiler would "just" use the jobserver it // would have used if it had run without sccache, but that adds some extra // complexity, and requires to use Unix domain sockets. // What we do instead is to arbitrary use our own jobserver. // Unfortunately, that doesn't absolve us from having to deal with the original // jobserver, because make may give us file descriptors to its pipes, and the // simple fact of keeping them open can block it. // So if it does give us those file descriptors, close the preemptively. // // unsafe because it can use the wrong fds. #[cfg(not(windows))] pub unsafe fn discard_inherited_jobserver() { if let Some(value) = ["CARGO_MAKEFLAGS", "MAKEFLAGS", "MFLAGS"] .into_iter() .find_map(|env| std::env::var(env).ok()) { if let Some(auth) = value.rsplit(' ').find_map(|arg| { arg.strip_prefix("--jobserver-auth=") .or_else(|| arg.strip_prefix("--jobserver-fds=")) }) { if !auth.starts_with("fifo:") { let mut parts = auth.splitn(2, ','); let read = parts.next().unwrap(); let write = match parts.next() { Some(w) => w, None => return, }; let read = read.parse().unwrap(); let write = write.parse().unwrap(); if read < 0 || write < 0 { return; } unsafe { if libc::fcntl(read, libc::F_GETFD) == -1 { return; } if libc::fcntl(write, libc::F_GETFD) == -1 { return; } libc::close(read); libc::close(write); } } } } } #[derive(Clone)] pub struct Client { helper: Option<Arc<jobserver::HelperThread>>, tx: Option<mpsc::UnboundedSender<oneshot::Sender<io::Result<jobserver::Acquired>>>>, inner: jobserver::Client, } pub struct Acquired { _token: Option<jobserver::Acquired>, } impl Client { pub fn new() -> Client { Client::new_num(crate::util::num_cpus()) } pub fn new_num(num: usize) -> Client { let inner = jobserver::Client::new(num).expect("failed to create jobserver"); Client::_new(inner, false) } fn _new(inner: jobserver::Client, inherited: bool) -> Client { let (helper, tx) = if inherited { (None, None) } else { let (tx, mut rx) = mpsc::unbounded::<oneshot::Sender<_>>(); let helper = inner .clone() .into_helper_thread(move |token| { let rt = tokio::runtime::Builder::new_current_thread() .build() .unwrap(); rt.block_on(async { if let Some(sender) = rx.next().await { drop(sender.send(token)); } }); }) .expect("failed to spawn helper thread"); (Some(Arc::new(helper)), Some(tx)) }; Client { inner, helper, tx } } /// Configures this jobserver to be inherited by the specified command pub fn configure(&self, cmd: &mut Command) { self.inner.configure(cmd) } /// Returns a future that represents an acquired jobserver token. /// /// This should be invoked before any "work" is spawned (for whatever the /// definition of "work" is) to ensure that the system is properly /// rate-limiting itself. pub async fn acquire(&self) -> Result<Acquired> { let (helper, tx) = match (self.helper.as_ref(), self.tx.as_ref()) { (Some(a), Some(b)) => (a, b), _ => return Ok(Acquired { _token: None }), }; let (mytx, myrx) = oneshot::channel(); helper.request_token(); tx.unbounded_send(mytx).unwrap(); let acquired = myrx .await .context("jobserver helper panicked")? .context("failed to acquire jobserver token")?; Ok(Acquired { _token: Some(acquired), }) } }