Skip to content

kinghajj/kirk

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

26 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

kirk, a highly-flexible thread pool for Rust

Build Status Coverage Status

Documentation

The documentation is hosted on GitHub Pages.

Stability

The API and internal designs are subject to rapid change, so breakages could happen frequently until a pleasant and high-performance interface is found. Play around with this and figure out ways to improve it, but don't rely on it yet for anything truly important.

Examples

Pools are generic not only to the jobs performed, but also to the method for giving jobs to workers. They may also be scoped, allowing safe access to data in the stack of the originating thread.

// unscoped, dynamic dispatch using a Chase-Lev deque
let mut pool1 = Pool::<Deque<Task>>::new(deque::Options::default());
pool1.push(|| { println!("Hello!") });
pool1.push(|| { println!("World!") });

// unscoped, dynamic dispatch using a locked channel
let mut pool2 = Pool::<Channel<Task>>::new(channel::Options::default());
pool2.push(|| { println!("Hello!") });
pool2.push(|| { println!("World!") });

// unscoped, static dispatch
enum Msg { Hello, World };
impl Job for Msg {
    fn perform(self) {
        match Msg {
            Msg::Hello => println!("Hello!"),
            Msg::World => println!("World!"),
        }
    }
}
let mut pool3 = Pool::<Deque<Msg>>::new(deque::Options::default());
pool3.push(Msg::Hello);
pool3.push(Msg::World);

// scoped, dynamic dispatch
let mut items = [0usize; 8];
crossbeam::scope(|scope| {
    let mut pool = Pool::<Deque<Task>>::scoped(&scope, deque::Options::default());
    for (i, e) in items.iter_mut().enumerate() {
        pool.push(move || *e = i)
    }
});

// scoped, static dispatch
struct Work<'a> { i: usize, e: &'a mut usize }
impl Job for Work {
    fn perform(self) {
        *self.e = i;
    }
}
crossbeam::scope(|scope| {
    let mut pool = Pool::<Deque<Work>>::scoped(&scope, deque::Options::default());
    for (i, e) in items.iter_mut().enumerate() {
        pool.push(Work { i: i, e: e });
    }
}

Design

I was inspired to start this after attending the January 2016 Bay Area Rust meetup and watching Aaron Turon's presentation about crossbeam. The initial version of this crate supported only boxed, dynamically-dispatched tasks (closures). At this stage, it was essentially scoped_threadpool with a different task-passing mechanism.

The first breakthrough came when I realized that a simple Job trait would make both static and dynamic dispatch possible:

pub trait Job: Send {
    fn perform(self);
}

pub struct Task<'a>(Box<FnBox + Send + 'a>);

impl<'a> Job for Task<'a> {
    #[inline]
    fn perform(self) {
        let Task(task) = self;
        task.call_box();
    }
}

impl<'a, F> From<F> for Task<'a> where F: FnOnce() + Send + 'a
{
    #[inline]
    fn from(f: F) -> Task<'a> {
        Task(Box::new(f))
    }
}

Much better: static dispatch with no additional allocation, while maintaining convenient dynamic dispatch. I soon added support for 'static jobs that run outside a crossbeam::scope.

At this point, I was pretty happy with the result. However, scoped_threadpool, with its simple Arc<Mutex<Receiver>> method for delivering tasks to worker threads, stayed in the back of my mind. Eventually I decided to write benchmarks comparing the two, but since it didn't support static dispatch, I wrote a parallel set of pool and worker types. That's when the next insight came: the pool implementations were highly similar, except for some book-keeping related to how each implemented job communication. So I factored out those details into a family of traits:

pub trait Crew {
    type Job: Job;
    type Member: Worker;
    type Settings: Parameters;

    fn new(settings: Self::Settings) -> Self;
    fn hire(&mut self) -> Self::Member;
    fn give<F>(&mut self, job: F) where Self::Job: From<F>;
    fn stop(&mut self);
}

pub trait Worker: Send {
    fn run(&mut self);
}

pub trait Parameters: Copy + Clone {
    fn num_workers(&self) -> usize;
}

The implementation of Pool became rather straightforward:

pub struct Pool<C>
    where C: Crew
{
    crew: C,
}

impl<'scope, C, J> Pool<C>
    where J: Job + 'scope,
          C: Crew<Job = J> + 'scope
{
    pub fn scoped(scope: &Scope<'scope>, settings: C::Settings) -> Pool<C> {
        let mut crew = C::new(settings);
        for _ in 0..settings.num_workers() {
            let mut worker = crew.hire();
            scope.spawn(move || {
                worker.run();
            });
        }
        Pool { crew: crew }
    }

    pub fn push<F>(&mut self, f: F)
        where J: From<F>
    {
        self.crew.give(f);
    }
}

The icing on the cake: this crate never uses unsafe:

$ grep -r --include=*.rs unsafe .
$

As dbaupp pointed out, however, the current design has a major drawback compared to other scoped threadpools because of this, since it cannot reuse the same pool for different scopes throughout the lifetime of an application.

About

A highly-flexible thread pool for Rust

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages