Thread pooling in C++11

This is adapted from my answer to another very similar post.

Let’s build a ThreadPool class:

class ThreadPool {
public:
    void Start();
    void QueueJob(const std::function<void()>& job);
    void Stop();
    bool busy();

private:
    void ThreadLoop();

    bool should_terminate = false;           // Tells threads to stop looking for jobs
    std::mutex queue_mutex;                  // Prevents data races to the job queue
    std::condition_variable mutex_condition; // Allows threads to wait on new jobs or termination 
    std::vector<std::thread> threads;
    std::queue<std::function<void()>> jobs;
};
  1. ThreadPool::Start

For an efficient thread pool implementation, create the threads once (num_threads of them) and then neither create new
ones nor destroy old ones (by joining) while the pool is in use. Repeatedly creating and destroying threads carries a
performance penalty and can even make your application slower than the serial version. Thus, we keep a pool of threads
that can be used at any time (if they aren’t already running a job).

Each thread should be running its own infinite loop, constantly waiting for new tasks to grab and run.

void ThreadPool::Start() {
    const uint32_t num_threads = std::thread::hardware_concurrency(); // Hint for # of concurrent threads; may be 0
    threads.resize(num_threads);
    for (uint32_t i = 0; i < num_threads; i++) {
        threads.at(i) = std::thread(&ThreadPool::ThreadLoop, this); // Member function needs the object pointer
    }
}
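
std::thread::hardware_concurrency() is only a hint and is allowed to return 0 when the value cannot be determined, in
which case Start() as written would create no workers. A minimal sketch of a guard, assuming a hypothetical helper
pick_thread_count and an arbitrary fallback of 2 (neither is part of the original answer):

```cpp
#include <cassert>
#include <cstdint>
#include <thread>

// Hypothetical helper (not in the original answer): use the hardware hint
// when it is available, otherwise fall back to a caller-chosen default.
inline std::uint32_t pick_thread_count(std::uint32_t hint, std::uint32_t fallback = 2) {
    return hint != 0 ? hint : fallback;
}
```

Start() could then call pick_thread_count(std::thread::hardware_concurrency()) instead of using the hint directly.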
  2. ThreadPool::ThreadLoop

The infinite loop function. This is a while (true) loop in which each worker blocks on the condition variable until a job is queued or termination is requested.

void ThreadPool::ThreadLoop() {
    while (true) {
        std::function<void()> job;
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            mutex_condition.wait(lock, [this] {
                return !jobs.empty() || should_terminate;
            });
            if (should_terminate) {
                return;
            }
            job = jobs.front();
            jobs.pop();
        }
        job();
    }
}
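
The predicate passed to wait() matters: it is checked before blocking and again after every wakeup, so a worker does
not miss a job that was queued before it started waiting, and spurious wakeups are harmless. A small sketch of that
behaviour in isolation (the function name and the ready flag are illustrative, not part of the pool):

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Returns true if the waiter observes the flag even though notify_one()
// was called before the waiter existed.
inline bool wait_sees_earlier_notification() {
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;

    {
        std::lock_guard<std::mutex> lock(m);
        ready = true;
    }
    cv.notify_one(); // Nobody is waiting yet, so this notification reaches no one

    bool observed = false;
    std::thread waiter([&] {
        std::unique_lock<std::mutex> lock(m);
        // The predicate is evaluated first; it is already true, so wait() returns without blocking
        cv.wait(lock, [&] { return ready; });
        observed = ready;
    });
    waiter.join();
    return observed;
}
```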
  3. ThreadPool::QueueJob

Add a new job to the pool; use a lock so that there isn’t a data race.

void ThreadPool::QueueJob(const std::function<void()>& job) {
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        jobs.push(job);
    }
    mutex_condition.notify_one();
}

To use it:

thread_pool->QueueJob([] { /* ... */ });
  4. ThreadPool::busy
bool ThreadPool::busy() {
    bool poolbusy;
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        poolbusy = !jobs.empty(); // Busy while jobs are still queued
    }
    return poolbusy;
}

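The busy() function can be used in a while loop so that the main thread waits for the thread pool to drain its queue before stopping or destroying the pool. Note that busy() only reports whether jobs are still queued; jobs that are already running are waited for by the join in Stop().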
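
Such a polling loop might look like the sketch below. wait_until_idle is a hypothetical helper, written as a template
so it works with any object exposing bool busy(); the 1 ms poll interval is an arbitrary assumption, and FakePool
exists only to exercise the helper without a real pool.

```cpp
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical helper: sleep-poll a pool-like object until it reports idle.
// Returns how many times it had to sleep.
template <typename Pool>
int wait_until_idle(Pool& pool,
                    std::chrono::milliseconds poll = std::chrono::milliseconds(1)) {
    int polls = 0;
    while (pool.busy()) {
        std::this_thread::sleep_for(poll); // Avoid spinning at 100% CPU
        ++polls;
    }
    return polls;
}

// Stand-in for a pool: reports busy a fixed number of times, then idle.
struct FakePool {
    int remaining;
    bool busy() { return remaining-- > 0; }
};
```

With the real class this would be wait_until_idle(pool) followed by pool.Stop().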

  5. ThreadPool::Stop

Stop the pool.

void ThreadPool::Stop() {
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        should_terminate = true;
    }
    mutex_condition.notify_all();
    for (std::thread& active_thread : threads) {
        active_thread.join();
    }
    threads.clear();
}

Once you integrate these ingredients, you have your own dynamic thread pool. These threads always run, waiting for
jobs to do.
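
As a rough sketch of how the pieces fit together in one translation unit: the class below follows the members
described above, with the bodies written inline for brevity. The fallback of 2 workers and the run_jobs driver are
assumptions added for illustration, not part of the original answer.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class ThreadPool {
public:
    void Start() {
        std::uint32_t n = std::thread::hardware_concurrency();
        if (n == 0) n = 2; // Assumption: fall back to 2 workers if the hint is unavailable
        threads.reserve(n);
        for (std::uint32_t i = 0; i < n; ++i) {
            threads.emplace_back(&ThreadPool::ThreadLoop, this);
        }
    }
    void QueueJob(const std::function<void()>& job) {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            jobs.push(job);
        }
        mutex_condition.notify_one();
    }
    bool busy() {
        std::unique_lock<std::mutex> lock(queue_mutex);
        return !jobs.empty(); // True while jobs are still queued
    }
    void Stop() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            should_terminate = true;
        }
        mutex_condition.notify_all();
        for (std::thread& t : threads) t.join(); // Waits for running jobs to finish
        threads.clear();
    }

private:
    void ThreadLoop() {
        while (true) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(queue_mutex);
                mutex_condition.wait(lock, [this] { return !jobs.empty() || should_terminate; });
                if (should_terminate) return;
                job = std::move(jobs.front());
                jobs.pop();
            }
            job(); // Run outside the lock so other workers can take jobs
        }
    }

    bool should_terminate = false;
    std::mutex queue_mutex;
    std::condition_variable mutex_condition;
    std::vector<std::thread> threads;
    std::queue<std::function<void()>> jobs;
};

// Hypothetical driver: queue n increments, wait for the queue to drain, then stop.
inline int run_jobs(int n) {
    ThreadPool pool;
    std::atomic<int> counter{0};
    pool.Start();
    for (int i = 0; i < n; ++i) pool.QueueJob([&counter] { ++counter; });
    while (pool.busy()) std::this_thread::yield();
    pool.Stop(); // Joins the workers, so jobs already running complete first
    return counter.load();
}
```

The driver waits for busy() to become false before calling Stop() because, in this design, workers exit as soon as
should_terminate is set, even if jobs remain in the queue.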

I apologize if there are some syntax errors; I typed this code from memory, and I have a bad memory. Sorry that I cannot
provide you the complete thread pool code; that would violate my job integrity.

Notes:

  • The anonymous code blocks are used so that when they are exited, the std::unique_lock variables created within them
    go out of scope, unlocking the mutex.
  • ThreadPool::Stop will not terminate any currently running jobs; it just waits for them to finish via active_thread.join().
    Jobs still in the queue when Stop is called are not run.
