parallel-cellular-automata
Framework for building parallel cellular automata.
threadpool.hpp
Go to the documentation of this file.
1 
11 #ifndef PARALLEL_CELLULAR_AUTOMATA_THREADPOOL_HPP
12 #define PARALLEL_CELLULAR_AUTOMATA_THREADPOOL_HPP
13 #include <atomic>
14 #include <future>
15 #include <memory>
16 #include <pthread.h>
17 #include <queues.hpp>
18 #include <thread>
19 #include <type_traits>
20 #include <vector>
26 {
27  private:
28  std::vector<std::thread> &threads;
29 
30  public:
36  ThreadJoiner(std::vector<std::thread> &t);
41  ~ThreadJoiner();
42 };
43 
44 namespace ca
45 {
51 {
52 
53  using QueuesContentType = std::function<void()>;
54 
55  public:
61  Threadpool(unsigned nw = 0);
62 
67  ~Threadpool();
68 
74  size_t get_number_workers() const;
75 
87  template <class F, class... Args>
88  auto submit(F &&f, Args &&...args) -> std::future<typename std::result_of<F(Args...)>::type>
89  {
90  // type returned by f(args)
91  using return_value_type = typename std::result_of<F(Args...)>::type;
92 
93  /*
94  * std::bind creates a function wrapper "g" s.t. g == f(args)
95  */
96  auto task = std::make_shared<std::packaged_task<return_value_type()>>(
97  std::bind(std::forward<F>(f), std::forward<Args>(args)...));
98  // with the packaged task we split the execution from the return value
99  std::future<return_value_type> future_result(task->get_future());
100 
101  if (local_queue)
102  {
103  // to avoid cache ping-pong and improve performances each thread pushes on its individual queue if it can.
104  local_queue->push([task]() {
105  // dereference the package
106  (*task)();
107  });
108  }
109  else
110  {
111  threadpool_work_queue.push([task]() { (*task)(); });
112  }
113 
114  return future_result;
115  }
116 
117  private:
118  std::atomic<bool> done;
119  ThreadSafeQueue<QueuesContentType> threadpool_work_queue;
120  std::vector<std::unique_ptr<WorkStealingQueue<QueuesContentType>>> workers_queues;
121  std::vector<std::thread> threads;
122  ThreadJoiner joiner;
123 
124  // Try to steal the work from other thread's queues.
125  bool try_stealing_work(QueuesContentType &result)
126  {
127  unsigned qsize = workers_queues.size();
128  for (unsigned i = 0; i < qsize; ++i)
129  {
130  unsigned const index = (thread_index + 1) % qsize;
131  if (workers_queues[index]->try_steal(result))
132  {
133  return true;
134  }
135  }
136  return false;
137  }
138 
139  // thread-local variables containing the thread index and the local queue.
140  static thread_local WorkStealingQueue<QueuesContentType> *local_queue;
141  static thread_local unsigned thread_index;
142 
143  // work stealing worker
144  void worker_thread(unsigned index);
145 };
146 } // namespace ca
147 
148 #endif
RAII class to join threads.
Definition: threadpool.hpp:26
ThreadJoiner(std::vector< std::thread > &t)
Construct a new ThreadJoiner object.
Definition: threadpool.cpp:17
~ThreadJoiner()
Destroy the ThreadJoiner object, joining all the threads in the vector.
Definition: threadpool.cpp:21
void push(T elem)
Insert an element into the queue.
Definition: queues.hpp:51
void push(T data)
Push data into the queue.
Definition: queues.hpp:179
Work-stealing threadpool.
Definition: threadpool.hpp:51
~Threadpool()
Destroy the Threadpool object.
Definition: threadpool.cpp:71
size_t get_number_workers() const
Get the number of worker threads.
Definition: threadpool.cpp:66
auto submit(F &&f, Args &&...args) -> std::future< typename std::result_of< F(Args...)>::type >
Submit work to the threadpool. This function takes a function and its arguments and returns a future ...
Definition: threadpool.hpp:88
Threadpool(unsigned nw=0)
Construct a new Threadpool object.
Definition: threadpool.cpp:40
Namespace of the framework.
Definition: barrier.hpp:20
This file contains the definition and implementation of some useful thread-safe queues.