parallelcxx11.hh
Go to the documentation of this file.
1 /* -*- mia-c++ -*-
2  *
3  * This file is part of MIA - a toolbox for medical image analysis
4  * Copyright (c) Leipzig, Madrid 1999-2017 Gert Wollny
5  *
6  * MIA is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with MIA; if not, see <http://www.gnu.org/licenses/>.
18  *
19  */
20 
21 #ifndef mia_core_parallelcxx11_hh
22 #define mia_core_parallelcxx11_hh
23 
24 #include <mia/core/defines.hh>
25 
26 #include <thread>
27 #include <atomic>
28 #include <mutex>
29 #include <cassert>
30 #include <vector>
31 
33 
34 typedef std::mutex CMutex;
35 typedef std::recursive_mutex CRecursiveMutex;
36 
37 
39 public:
40  static int get_max_tasks();
41  static void set_max_tasks(int mt);
42 private:
43  static int max_tasks;
44 };
45 
46 #define ATOMIC std::atomic
47 
48 template <typename Mutex>
49 class TScopedLock {
50 public:
51  TScopedLock(Mutex& m): m_mutex(m){
52  m_mutex.lock();
53  own_lock = true;
54  };
55 
56  TScopedLock(const TScopedLock<Mutex>& other) = delete;
57  TScopedLock& operator = (const TScopedLock<Mutex>& other) = delete;
58 
60  if (own_lock)
61  m_mutex.unlock();
62  };
63 
64  void release() {
65  if (own_lock) {
66  own_lock = false;
67  m_mutex.unlock();
68  }
69  }
70 private:
71  Mutex& m_mutex;
72  bool own_lock;
73 };
74 
77 
79 public:
80  C1DParallelRange(int begin, int end, int block = 1):
81  m_begin(begin),
82  m_end(end),
83  m_block(block),
84  m_current_wp(0)
85  {
86  assert(begin <= end);
87  }
88 
90  m_begin(orig.m_begin),
91  m_end(orig.m_end),
92  m_block(orig.m_block)
93  {
94  m_current_wp = orig.m_current_wp.load();
95  }
96 
98  int wp = m_current_wp++;
99  int begin = m_begin + wp * m_block;
100  int end = begin + m_block;
101  if (begin > m_end) {
102  return C1DParallelRange(m_end,m_end,0);
103  }
104  if (end > m_end) {
105  return C1DParallelRange(begin, m_end, 1);
106  }
107  return C1DParallelRange(begin, end, 1);
108  }
109 
110  bool empty() const {
111  return m_begin >= m_end;
112  }
113 
114  int begin() const {
115  return m_begin;
116  }
117 
118  int end() const {
119  return m_end;
120  }
121 
122 private:
123  int m_begin;
124  int m_end;
125  int m_block;
126  std::atomic<int> m_current_wp;
127 };
128 
129 // The functor f must actually be passed by value because a copy must
130 // be used.
131 //coverity[PASS_BY_VALUE]
132 template <typename Range, typename Func>
133 void pfor_callback(Range& range, Func f)
134 {
135  while (true) {
136  Range wp = range.get_next_workpackage();
137  if (!wp.empty())
138  f(wp);
139  else
140  break;
141  }
142 }
143 
144 template <typename Range, typename Func>
145 void pfor(Range range, const Func& f) {
146 
147  int max_treads = CMaxTasks::get_max_tasks();
148 
149  std::thread::hardware_concurrency();
150 
151  std::vector<std::thread> threads;
152  for (int i = 0; i < max_treads; ++i) {
153  threads.push_back(std::thread(pfor_callback<Range, Func>, std::ref(range), f));
154  }
155 
156  for (int i = 0; i < max_treads; ++i) {
157  threads[i].join();
158  }
159 };
160 
161 template <typename V>
162 class ReduceValue {
163 public:
164  typedef V Value;
165  ReduceValue(const Value& i):identity(i), value(i) {
166  }
167 
168  template <typename Reduce>
169  void reduce(const Value& v, Reduce r)
170  {
171  CScopedLock sl(mutex);
172  value = r(v, value);
173  }
174  const Value& get_identity() const {
175  return identity;
176  }
177  const Value& get_reduced() const {
178  return value;
179  }
180 private:
181  mutable CMutex mutex;
182  Value identity;
183  Value value;
184 };
185 
186 // The functor f must actually be passed by value because a copy must
187 // be used.
188 //coverity[PASS_BY_VALUE]
189 template <typename Range, typename Value, typename Func, typename Reduce>
190 void preduce_callback(Range& range, ReduceValue<Value>& v, Func f, Reduce r)
191 {
192  Value value = v.get_identity();
193  while (true) {
194  Range wp = range.get_next_workpackage();
195  if (!wp.empty())
196  value = f(wp, value);
197  else
198  break;
199  }
200  v.reduce(value, r);
201 }
202 
203 template <typename Range, typename Value, typename Func, typename Reduce>
204 Value preduce(Range range, Value identity, const Func& f, Reduce r)
205 {
206  int max_treads = CMaxTasks::get_max_tasks();
207 
208  ReduceValue<Value> value(identity);
209 
210  std::vector<std::thread> threads;
211  for (int i = 0; i < max_treads; ++i) {
212  threads.push_back(std::thread(preduce_callback<Range, Value, Func, Reduce>,
213  std::ref(range), std::ref(value), f, r));
214  }
215 
216  for (int i = 0; i < max_treads; ++i) {
217  threads[i].join();
218  }
219  return value.get_reduced();
220 };
221 
222 NS_MIA_END
223 
224 #endif
void pfor(Range range, const Func &f)
TScopedLock(Mutex &m)
const Value & get_reduced() const
C1DParallelRange get_next_workpackage()
TScopedLock< CRecursiveMutex > CRecursiveScopedLock
#define NS_MIA_BEGIN
conveniance define to start the mia namespace
Definition: defines.hh:33
void reduce(const Value &v, Reduce r)
ReduceValue(const Value &i)
bool empty() const
std::recursive_mutex CRecursiveMutex
static int get_max_tasks()
void release()
void pfor_callback(Range &range, Func f)
C1DParallelRange(const C1DParallelRange &orig)
#define EXPORT_CORE
Macro to manage Visual C++ style dllimport/dllexport.
Definition: defines.hh:101
const Value & get_identity() const
Value preduce(Range range, Value identity, const Func &f, Reduce r)
TScopedLock< CMutex > CScopedLock
void preduce_callback(Range &range, ReduceValue< Value > &v, Func f, Reduce r)
std::mutex CMutex
C1DParallelRange(int begin, int end, int block=1)
int begin() const
#define NS_MIA_END
conveniance define to end the mia namespace
Definition: defines.hh:36