Line data Source code
1 : /*
2 : * (C) Copyright 2011 Marek Dopiera
3 : *
4 : * This file is part of CoherentDB.
5 : *
6 : * CoherentDB is free software: you can redistribute it and/or modify it
7 : * under the terms of the GNU General Public License as published by
8 : * the Free Software Foundation, either version 3 of the License, or
9 : * (at your option) any later version.
10 : *
11 : * CoherentDB is distributed in the hope that it will be useful, but
12 : * WITHOUT ANY WARRANTY; without even the implied warranty of
13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 : * General Public License for more details.
15 : *
16 : * You should have received a copy of the GNU General Public
17 : * License along with CoherentDB. If not, see
18 : * http://www.gnu.org/licenses/.
19 : */
20 :
21 : #ifndef WORKER_POOL_H_3392
22 : #define WORKER_POOL_H_3392
23 :
24 : #include <queue>
25 :
26 : #include <boost/optional.hpp>
27 : #include <boost/thread/thread.hpp>
28 : #include <boost/thread/mutex.hpp>
29 : #include <boost/thread/condition_variable.hpp>
30 : #include <boost/ptr_container/ptr_vector.hpp>
31 :
32 : #include <debug/asserts.h>
33 :
34 : namespace coherent {
35 : namespace util {
36 :
37 : template <class T>
38 : class sync_queue
39 : {
40 : public:
41 : sync_queue();
42 : ~sync_queue(); //for assertions
43 : void push(T const & t);
44 : boost::optional<T> pop();
45 : void no_more_input();
46 :
47 : private:
48 : boost::mutex mutex;
49 : boost::condition_variable cond;
50 : std::queue<T> q;
51 : bool no_more_data;
52 : };
53 :
// Forward declaration: worker and worker_factory mention worker_pool before
// its definition.
template <typename T>
class worker_pool;
56 :
57 : template <class T>
58 : class worker
59 171 : {
60 : public:
61 : worker(worker_pool<T> & worker_pool);
62 : virtual ~worker();
63 : void operator()() const;
64 : protected:
65 : virtual void handle(T const & t) const = 0; //I want workers to be stateless
66 : private:
67 : worker_pool<T> & pool;
68 : };
69 :
70 : template <class T>
71 : class worker_factory
72 6 : {
73 : public:
74 : typedef boost::thread* worker_ptr;
75 :
76 : virtual worker_ptr create_worker(worker_pool<T> & pool) const = 0;
77 : virtual ~worker_factory();
78 : };
79 :
80 : template <class T>
81 : class worker_pool
82 : {
83 : public:
84 : worker_pool();
85 : ~worker_pool();
86 : void start(uint32_t num_workers, worker_factory<T> const & factory);
87 : void stop();
88 : void schedule_work(T const & work);
89 : private:
90 : friend class worker<T>;
91 :
92 : typedef boost::ptr_vector<boost::thread> workers_t;
93 : workers_t workers;
94 : sync_queue<T> queue;
95 : };
96 :
97 : //========== IMPLEMENTATION ====================================================
98 :
99 : //========== sync_queue ========================================================
100 :
101 : template <class T>
102 7 : sync_queue<T>::sync_queue() : no_more_data(false)
103 : {
104 7 : }
105 :
106 : template <class T>
107 7 : sync_queue<T>::~sync_queue()
108 : {
109 : d_assert(
110 : no_more_data && q.empty(),
111 : "trying to destruct a queue but empty=" << q.empty() << " no_more_data="
112 : << no_more_data
113 : );
114 7 : }
115 :
116 : template <class T>
117 199 : void sync_queue<T>::push(T const & t)
118 : {
119 398 : boost::mutex::scoped_lock lock(this->mutex);
120 : d_assert(!no_more_data, "trying to push to a closing queue");
121 :
122 199 : this->q.push(t);
123 199 : this->cond.notify_one();
124 199 : }
125 :
126 : template <class T>
127 258 : boost::optional<T> sync_queue<T>::pop()
128 : {
129 517 : boost::mutex::scoped_lock lock(this->mutex);
130 393 : while (q.empty() && !no_more_data)
131 : {
132 134 : this->cond.wait(lock);
133 : }
134 259 : if (!q.empty()) {
135 295 : T res = this->q.front();
136 199 : this->q.pop();
137 199 : return boost::make_optional(res);
138 : } else {
139 : d_assert(no_more_data, "what?!");
140 60 : return boost::optional<T>();
141 : }
142 :
143 : }
144 :
145 : template <class T>
146 9 : void sync_queue<T>::no_more_input()
147 : {
148 18 : boost::mutex::scoped_lock lock(this->mutex);
149 9 : this->no_more_data = true;
150 9 : this->cond.notify_all();
151 9 : }
152 :
153 : //========== worker ============================================================
154 :
155 : template <class T>
156 57 : worker<T>::worker(worker_pool<T> & pool) : pool(pool)
157 : {
158 57 : }
159 :
160 : template <class T>
161 228 : worker<T>::~worker()
162 228 : {
163 456 : }
164 :
165 : template <class T>
166 57 : void worker<T>::operator()() const
167 : {
168 : LOG(TRACE, "thread " << pthread_self() << " has been started");
169 114 : boost::optional<T> res = this->pool.queue.pop();
170 193 : while (res)
171 : {
172 : LOG(TRACE, "thread " << pthread_self() << " handle work");
173 136 : this->handle(res.get());
174 136 : res = this->pool.queue.pop();
175 : }
176 : LOG(TRACE, "thread " << pthread_self() << " finishing");
177 57 : }
178 :
179 : //========== worker_factory ====================================================
180 :
181 : template <class T>
182 6 : worker_factory<T>::~worker_factory()
183 6 : {
184 12 : }
185 :
186 : //========== worker_pool =======================================================
187 :
188 : template <class T>
189 4 : worker_pool<T>::worker_pool()
190 : {
191 4 : }
192 :
193 : template <class T>
194 4 : worker_pool<T>::~worker_pool()
195 : {
196 : d_assert(this->workers.empty(), "there are still workers in dtor");
197 4 : }
198 :
199 : template <class T>
200 6 : void worker_pool<T>::start(uint32_t num_workers, worker_factory<T> const & factory)
201 : {
202 : LOG(DEBUG, "worker pool starting " << num_workers << " threads");
203 : //FIXME exception safety
204 63 : for (uint32_t i = 0; i < num_workers; ++i)
205 57 : this->workers.push_back(factory.create_worker(*this));
206 : LOG(DEBUG, "worker pool started");
207 6 : }
208 :
209 : template <class T>
210 6 : void worker_pool<T>::stop()
211 : {
212 : LOG(DEBUG, "worker pool stopping");
213 6 : this->queue.no_more_input();
214 : LOG(TRACE, "informed threads");
215 126 : for (
216 69 : workers_t::iterator i = this->workers.begin();
217 : i != this->workers.end();
218 : ++i
219 : )
220 : {
221 57 : i->join();
222 : }
223 : LOG(DEBUG, "worker pool has joined all threads");
224 6 : this->workers.clear();
225 6 : }
226 :
227 : template <class T>
228 136 : void worker_pool<T>::schedule_work(T const & work)
229 : {
230 136 : this->queue.push(work);
231 136 : }
232 :
233 : } // namespace util
234 : } // namespace coherent
235 :
236 : #endif /* WORKER_POOL_H_3392 */
|