kirk - 一个高度灵活的线程池

jopen 8年前

kirk - 一个高度灵活的线程池Rust

Build Status Coverage Status


The documentation is hosted on GitHub Pages.


The API and internal designs are subject to rapid change, so breakages could happen frequently until a pleasant and high-performance interface is found. Play around with this and figure out ways to improve it, but don't rely on it yet for anything truly important.


Pools are generic not only to the jobs performed, but also to the method for giving jobs to workers. They may also be scoped, allowing safe access to data in the stack of the originating thread.

// unscoped, dynamic dispatch using a Chase-Lev deque  let mut pool1 = Pool::<Deque<Task>>::new(deque::Options::default());  pool1.push(|| { println!("Hello!") });  pool1.push(|| { println!("World!") });    // unscoped, dynamic dispatch using a locked channel  let mut pool2 = Pool::<Channel<Task>>::new(channel::Options::default());  pool2.push(|| { println!("Hello!") });  pool2.push(|| { println!("World!") });    // unscoped, static dispatch  enum Msg { Hello, World };  impl Job for Msg {      fn perform(self) {          match Msg {              Msg::Hello => println!("Hello!"),              Msg::World => println!("Hello!"),          }      }  }  let mut pool3 = Pool::<Deque<Msg>>::new(deque::Options::default());  pool3.push(Msg::Hello);  pool3.push(Msg::World);    // scoped, dynamic dispatch  let mut items = [0usize; 8];  crossbeam::scope(|scope| {      let mut pool = Pool::<Deque<Task>>::scoped(&scope, deque::Options::default());      for (i, e) in items.iter_mut().enumerate() {          pool.push(move || *e = i)      }  });    // scoped, static dispatch  struct Work<'a> { i: usize, e: &'a mut usize }  impl Job for Work {      fn perform(self) {          *self.e = i;      }  }  crossbeam::scope(|scope| {      let mut pool = Pool::<Deque<Work>>::scoped(&scope, deque::Options::default());      for (i, e) in items.iter_mut().enumerate() {          pool.push(Work { i: i, e: e });      }  }


I was inspired to start this after attending the January 2016 Bay Area Rust meetup and watching Aaron Turon's presentation about crossbeam. The initial version of this crate supported only boxed, dynamically-dispatched tasks (closures). At this stage, it was essentially scoped_threadpool with a different task-passing mechanism.

The first breakthrough came when I realized that a simple Job trait would make both static and dynamic dispatch possible:

pub trait Job: Send {      fn perform(self);  }    pub struct Task<'a>(Box<FnBox + Send + 'a>);    impl<'a> Job for Task<'a> {      #[inline]      fn perform(self) {          let Task(task) = self;          task.call_box();      }  }    impl<'a, F> From<F> for Task<'a> where F: FnOnce() + Send + 'a  {      #[inline]      fn from(f: F) -> Task<'a> {          Task(Box::new(f))      }  }

Much better: static dispatch with no additional allocation, while maintaining convenient dynamic dispatch. I soon added support for 'static jobs that run outside a crossbeam::scope.

At this point, I was pretty happy with the result. However, scoped_threadpool, with its simple Arc<Mutex<Receiver>> method for delivering tasks to worker threads, stayed in the back of my mind. Eventually I decided to write benchmarks comparing the two, but since it didn't support static dispatch, I wrote a parallel set of pool and worker types. That's when the next insight came: the pool implementations were highly similar, except for some book-keeping related to how each implemented job communication. So I factored out those details into a family of traits:

pub trait Crew {      type Job: Job;      type Member: Worker;      type Settings: Parameters;        fn new(settings: Self::Settings) -> Self;      fn hire(&mut self) -> Self::Member;      fn give<F>(&mut self, job: F) where Self::Job: From<F>;      fn stop(&mut self);  }    pub trait Worker: Send {      fn run(&mut self);  }    pub trait Parameters: Copy + Clone {      fn num_workers(&self) -> usize;  }

The implementation of Pool became rather straightforward:

pub struct Pool<C>      where C: Crew  {      crew: C,  }    impl<'scope, C, J> Pool<C>      where J: Job + 'scope,            C: Crew<Job = J> + 'scope  {      pub fn scoped(scope: &Scope<'scope>, settings: C::Settings) -> Pool<C> {          let mut crew = C::new(settings);          for _ in 0..settings.num_workers() {              let mut worker = crew.hire();              scope.spawn(move || {        ;              });          }          Pool { crew: crew }      }        pub fn push<F>(&mut self, f: F)          where J: From<F>      {          self.crew.give(f);      }  }

The icing on the cake: this crate never uses unsafe:

$ grep -r --include=*.rs unsafe .  $

As dbaupp pointed out, however, the current design has a major drawback compared to other scoped threadpools because of this, since it cannot reuse the same pool for different scopes throughout the lifetime of an application.