
Thread Pool In C++ – Writing a Concurrent Task Queue

Although resorting to multi-threading isn’t always the best solution, having extra threads can sometimes unblock your work. A very common pattern is to use a thread pool to pick up blocking work from your main thread, so the main thread can instead be used for more important updates. In this article, we write a simple implementation of thread pools in C++!

Figure 1: Some weaving looms making amazing things with multiple threads. This is truly an example of concurrent thread pools in the real world!

So, if you are interested in learning how to use threads, mutexes, and condition variables in practice, you are in the right place! Aside from dealing with common C++ structures like classes and queues, we will make heavy use of concurrency synchronisation tools.

The thread pool implemented in this post is distributed on Github as a library (check out how to create libraries with CMake).

What Are Thread Pools?

In simple terms, a thread pool is a set of worker threads that pick up and execute tasks. Typically, the user pushes tasks to a queue, and the worker threads pop tasks from the queue to execute them, as shown in the diagram below.

Figure 2: A diagram of how thread pools are structured: the task queue, the worker threads, and the thread executors.

In practice, thread pools can be an efficient way of managing threads, as the worker threads are only initialised once. This avoids some of the expensive operations involved in creating threads, namely initialising and destroying OS threads over and over again.

As an example, you may be running an application where the main thread runs the important program logic and updates the UI, while computationally-heavy tasks execute in the thread pool, so your main thread stays unblocked.

Here’s a list of tasks that can potentially block your main thread:

  • Loading large files.
  • Client-server requests.
  • Computationally-heavy operations (such as image processing).

Theoretically, your program’s responsiveness can benefit from running these “blocking” tasks in a thread-pool backend. Some popular frameworks even build thread pools into their core multi-threading runtime: look at Rust’s Tokio.

The C++ Thread Pool Requirements and Architecture

As you can see in Figure 2, the architecture for the thread pool written in this post is very simple. We assume the thread pool doesn’t have crazy requirements, so:

  • We can pass arbitrary functions that take arbitrary input parameter types. In this case, we make the parameters to our tasks std::variants for simplicity.
  • The thread pool needs an appropriate queue to hold tasks and their parameters, hence the queue should be thread-safe.
  • The execution of the thread pool will only stop when all the tasks in the queue have executed. Interestingly, this avoids a lot of extra logic in the implementation to request threads to stop, as well as any calls to the pthread primitives (or the Windows equivalent).

Finally, the list below outlines the main concepts needed for our implementation:

  • Tasks – a structure representing a task that our thread pool can execute. In the simplest implementation, this structure holds a lambda (or function pointer) of the function to be executed, as well as its inputs (a minimal sketch of this structure follows the list).
  • A task queue – the container that holds the tasks to be picked up by the thread pool. For more information, check out the thread-safe queue section.
  • The thread pool – a struct/class that actually carries the thread pool logic. This holds the tasks queue, an std::vector of std::jthreads, as well as the logic to push/pop from the tasks queue.
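
For reference, here is a minimal sketch of what such a Task structure could look like. This is only an illustration: the exact definitions live in the Github repository, and the alternatives I put in the Param variant below (int, double, std::string) are just an assumption for this sketch.

#include <functional>
#include <string>
#include <variant>
#include <vector>

// Hypothetical sketch only: the real definitions live in the repository.
using Param = std::variant<int, double, std::string>;

enum class TaskType { Execute, Stop };

struct Task
{
  TaskType type;                                        // execute some work, or stop the worker
  std::function<void(std::vector<Param> const&)> task;  // the callable to run
  std::vector<Param> arguments;                         // inputs forwarded to the callable
};

The important part is the shape: a tag telling the worker what to do, a callable, and a list of parameters for it.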

Working With Queues in a Multi-Threading Environment In C++

At the heart of our implementation, we use a queue to hold a list of tasks to be executed. Therefore, we need queues that can be accessed by different threads at the same time: a thread-safe queue.

Thread-safe queues are actually quite common in software development. However, the implementation can vary depending on your requirements:

  • How many producers are there for this queue? How many threads will be “pushing” to it?
  • Will there be many threads “popping” from the queue?
  • Do we always need the “pop” operation to return something? Can it block a thread?
  • Does the queue need to be atomic – no mutex locking allowed?

The list goes on and on. The more you think about this theoretical thread-safe queue, the more requirements you can come up with.

However, since this post aims to keep it stupidly simple, we only require a thread-safe queue that protects the underlying queue data and can potentially block when “popping” an empty queue.

Of course, blocking isn’t ideal, but as you will see in the actual ThreadPool implementation, we will ensure this never happens. For fancier thread-safe queues, I’d recommend taking a look at Boost’s lockfree::queue and lockfree::spsc_queue.
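
As an aside, if blocking on an empty queue were not acceptable, a common alternative is a non-blocking try_pop that returns an std::optional. This is not part of the implementation in this post; it’s only a sketch of what that variation could look like:

#include <mutex>
#include <optional>
#include <queue>

// A non-blocking variation (not used by the ThreadPool in this post):
// try_pop() returns std::nullopt instead of waiting when the queue is empty.
template <typename T>
class TsQueueNonBlocking
{
public:
  void push(T const& val)
  {
    std::lock_guard<std::mutex> lock{_mutex};
    _queue.push(val);
  }

  std::optional<T> try_pop()
  {
    std::lock_guard<std::mutex> lock{_mutex};
    if (_queue.empty())
      return std::nullopt;
    T val = _queue.front();
    _queue.pop();
    return val;
  }

private:
  std::queue<T> _queue;
  std::mutex _mutex;
};

The caller then has to decide what to do when try_pop comes back empty, which is exactly the complexity the blocking pop below avoids.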

A C++ Implementation of a Simple Thread-Safe Queue

Without further ado, here’s the first important code block: the thread-safe queue, the heart of our thread pool.

#include <condition_variable>
#include <mutex>
#include <queue>

template <typename T>
class TsQueue
{
public:
  // Pushes a value and wakes up one thread waiting in pop().
  void push(T const& val)
  {
    std::lock_guard<std::mutex> queue_lock{_queue_mutex};
    _queue.push(val);
    _queue_cv.notify_one();
  }

  // Blocks until the queue is non-empty, then pops and returns the front element.
  T pop()
  {
    std::unique_lock<std::mutex> queue_lock{_queue_mutex};
    _queue_cv.wait(queue_lock, [&]{ return !_queue.empty(); });
    T ret = _queue.front();
    _queue.pop();
    return ret;
  }

private:
  std::queue<T> _queue;
  std::condition_variable _queue_cv;
  std::mutex _queue_mutex;
};

The implementation above is a combination of the first thread-safe queue implementation that came to mind, as well as some implementations on StackOverflow using std::condition_variables.

Some notes on the above implementation:

  • The underlying container is a simple std::queue.
  • A mutex is used to protect the underlying queue.
  • The notification/waiting mechanism is done with a std::condition_variable.
  • During the push operation, we simply lock the queue mutex, push the value, and notify a waiting thread with the condition variable.
  • However, on the pop operation, we use a further std::condition_variable::wait(...) call to wait until there’s an item to be popped (see the short usage sketch below).
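
To see the push/wait/notify interplay in action, here is a tiny standalone sketch of the queue shared between a producer and a consumer thread. It only assumes the TsQueue template above:

#include <iostream>
#include <thread>

int main()
{
  TsQueue<int> queue;

  // Consumer: each pop() blocks until the producer has pushed something.
  std::jthread consumer{[&queue]{
    for (int i = 0; i < 3; ++i)
      std::cout << "popped " << queue.pop() << '\n';
  }};

  // Producer: each push() notifies the waiting consumer.
  std::jthread producer{[&queue]{
    for (int i = 0; i < 3; ++i)
      queue.push(i);
  }};
}

Each push wakes the waiting consumer via notify_one, and this is exactly the mechanism the thread pool relies on to hand tasks to its workers.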

Implementing a Thread Pool In C++

Interestingly, with a thread-safe queue, the thread pool implementation becomes really simple. Take a look at the implementation below, followed by some points on the code.

#include <cstddef>
#include <thread>
#include <vector>

class ThreadPool
{
public:
  // Spin up n_threads worker threads, each running the handler defined further below.
  ThreadPool(std::size_t n_threads)
  {
    for (std::size_t i = 0; i < n_threads; ++i)
    {
      _threads.push_back(make_thread_handler(_queue));
    }
  }

  // On destruction, push one stop task per worker so every thread eventually exits.
  ~ThreadPool()
  {
    // Task = {Execute/Stop, function, args}
    Task const stop_task{TaskType::Stop, {}, {}};
    for (std::size_t i = 0; i < _threads.size(); ++i)
    {
      push(stop_task);
    }
  }

  bool push(Task const& task)
  {
    _queue.push(task);
    return true;
  }
private:
  TsQueue<Task> _queue;
  std::vector<std::jthread> _threads;
};

  • The thread pool is really just a management layer on top of the task queue.
  • Pushing to the thread pool simply means pushing a task to the queue.
  • Due to our simple architecture where the thread pool will only stop when all tasks are executed, there’s no need for a popping function.
  • The destructor must ensure that all threads in the pool are signalled to stop, hence it pushes one stop task per thread in the pool. Since std::jthread joins automatically in its destructor, each worker is then joined once it consumes its stop task.

How Do The Threads Execute Tasks?

In the thread pool code above, you can see that the _threads vector contains n_threads std::jthread elements. But what code is running in each thread? In simple terms, just the logic to pop from the queue and to stop when needed.

auto make_thread_handler(TsQueue<Task>& queue)
{
  return std::jthread{
    [&queue]{
      while (true)
      {
        // Blocks on the queue until a task arrives.
        auto const elem = queue.pop();
        switch (elem.type) {
        case TaskType::Execute:
          elem.task(elem.arguments);
          break;
        case TaskType::Stop:
          return;
        }
      }
    }
  };
}

If you’re wondering where the definitions for TaskType and Task are, take a look at the full library in the Github link in the following section.

Usage Of The Thread Pool

Take a look at a simple app that uses the thread pool to print an incrementing job counter. It also displays the thread id so you can check that the jobs are indeed being executed on different threads!

// Standard headers used by this example (plus the thread pool library's own header).
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

int main()
{
  ThreadPool my_pool{4};
  std::mutex iostream_m;

  int job_n = 0;
  for (int i = 0; i < 100; ++i)
  {
    my_pool.push({
      TaskType::Execute, // TaskType
      [&iostream_m, job_n](std::vector<Param> const&) // Lambda
      {
        {
          std::lock_guard cout_guard{iostream_m};
          std::cout << "Hi from thread " << std::this_thread::get_id()
                    << " request " << job_n << '\n';
        }
        std::this_thread::sleep_for(std::chrono::milliseconds{50});
      },
      {} // Arguments
    });
    job_n++;
  }
}

The Thread Pool Library – Use This In Your Projects

You can find the full code in the C++ thread pool Github repository. The license is currently MIT, so you are free to use this code in your projects!

There are a few TODOs that I would certainly like to tackle, given there’s demand:

  • Clean up the API and make it easier to push arbitrary tasks.
  • Runtime checking on the arguments given to a task.
  • Support future returns for each task – very important!

Feel free to post a comment in case you have any feedback, improvement suggestions, or if you have spotted a typo!
