LCOV - code coverage report
Current view: top level - include/util - worker_pool.h (source / functions) Hit Total Coverage
Test: CoherentDB code coverage Lines: 57 57 100.0 %
Date: 2011-02-13 Functions: 58 82 70.7 %

          Line data    Source code
       1             : /*
       2             :  * (C) Copyright 2011 Marek Dopiera
       3             :  * 
       4             :  * This file is part of CoherentDB.
       5             :  * 
       6             :  * CoherentDB is free software: you can redistribute it and/or modify it
       7             :  * under the terms of the GNU General Public License as published by
       8             :  * the Free Software Foundation, either version 3 of the License, or
       9             :  * (at your option) any later version.
      10             :  * 
      11             :  * CoherentDB is distributed in the hope that it will be useful, but
      12             :  * WITHOUT ANY WARRANTY; without even the implied warranty of
      13             :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
      14             :  * General Public License for more details.
      15             :  * 
      16             :  * You should have received a copy of the GNU General Public
      17             :  * License along with CoherentDB. If not, see
      18             :  * http://www.gnu.org/licenses/.
      19             :  */
      20             : 
      21             : #ifndef WORKER_POOL_H_3392
      22             : #define WORKER_POOL_H_3392
      23             : 
      24             : #include <queue>
      25             : 
      26             : #include <boost/optional.hpp>
      27             : #include <boost/thread/thread.hpp>
      28             : #include <boost/thread/mutex.hpp>
      29             : #include <boost/thread/condition_variable.hpp>
      30             : #include <boost/ptr_container/ptr_vector.hpp>
      31             : 
      32             : #include <debug/asserts.h>
      33             : 
      34             : namespace coherent {
      35             : namespace util {
      36             : 
      37             : template <class T>
      38             : class sync_queue
      39             : {
      40             : public:
      41             :         sync_queue();
      42             :         ~sync_queue(); //for assertions
      43             :         void push(T const & t);
      44             :         boost::optional<T> pop();
      45             :         void no_more_input();
      46             : 
      47             : private:
      48             :         boost::mutex mutex;
      49             :         boost::condition_variable cond;
      50             :         std::queue<T> q;
      51             :         bool no_more_data;
      52             : };
      53             : 
      54             : template <class T>
      55             : class worker_pool;
      56             : 
      57             : template <class T>
      58             : class worker
      59         171 : {
      60             : public:
      61             :         worker(worker_pool<T> & worker_pool);
      62             :         virtual ~worker();
      63             :         void operator()() const;
      64             : protected:
      65             :         virtual void handle(T const & t) const = 0; //I want workers to be stateless
      66             : private:
      67             :         worker_pool<T> & pool;
      68             : };
      69             : 
      70             : template <class T>
      71             : class worker_factory
      72           6 : {
      73             : public:
      74             :         typedef boost::thread* worker_ptr;
      75             : 
      76             :         virtual worker_ptr create_worker(worker_pool<T> & pool) const = 0;
      77             :         virtual ~worker_factory();
      78             : };
      79             : 
      80             : template <class T>
      81             : class worker_pool
      82             : {
      83             : public:
      84             :         worker_pool();
      85             :         ~worker_pool();
      86             :         void start(uint32_t num_workers, worker_factory<T> const & factory);
      87             :         void stop();
      88             :         void schedule_work(T const & work);
      89             : private:
      90             :         friend class worker<T>;
      91             : 
      92             :         typedef boost::ptr_vector<boost::thread> workers_t;
      93             :         workers_t workers;
      94             :         sync_queue<T> queue;
      95             : };
      96             : 
      97             : //========== IMPLEMENTATION ====================================================
      98             : 
      99             : //========== sync_queue ========================================================
     100             : 
     101             : template <class T>
     102           7 : sync_queue<T>::sync_queue() : no_more_data(false)
     103             : {
     104           7 : }
     105             : 
     106             : template <class T>
     107           7 : sync_queue<T>::~sync_queue()
     108             : {
     109             :         d_assert(
     110             :                 no_more_data && q.empty(),
     111             :                 "trying to destruct a queue but empty=" << q.empty() << " no_more_data="
     112             :                 << no_more_data
     113             :                 );
     114           7 : }
     115             : 
     116             : template <class T>
     117         199 : void sync_queue<T>::push(T const & t)
     118             : {
     119         398 :         boost::mutex::scoped_lock lock(this->mutex);
     120             :         d_assert(!no_more_data, "trying to push to a closing queue");
     121             : 
     122         199 :         this->q.push(t);
     123         199 :         this->cond.notify_one();
     124         199 : }
     125             : 
     126             : template <class T>
     127         258 : boost::optional<T> sync_queue<T>::pop()
     128             : {
     129         517 :         boost::mutex::scoped_lock lock(this->mutex);
     130         393 :         while (q.empty() && !no_more_data)
     131             :         {
     132         134 :                 this->cond.wait(lock);
     133             :         }
     134         259 :         if (!q.empty()) {
     135         295 :                 T res = this->q.front();
     136         199 :                 this->q.pop();
     137         199 :                 return boost::make_optional(res);
     138             :         } else {
     139             :                 d_assert(no_more_data, "what?!");
     140          60 :                 return boost::optional<T>();
     141             :         }
     142             :                 
     143             : }
     144             : 
     145             : template <class T>
     146           9 : void sync_queue<T>::no_more_input()
     147             : {
     148          18 :         boost::mutex::scoped_lock lock(this->mutex);
     149           9 :         this->no_more_data = true;
     150           9 :         this->cond.notify_all();
     151           9 : }
     152             : 
     153             : //========== worker ============================================================
     154             : 
     155             : template <class T>
     156          57 : worker<T>::worker(worker_pool<T> & pool) : pool(pool)
     157             : {
     158          57 : }
     159             : 
     160             : template <class T>
     161         228 : worker<T>::~worker()
     162         228 : {
     163         456 : }
     164             : 
     165             : template <class T>
     166          57 : void worker<T>::operator()() const
     167             : {
     168             :         LOG(TRACE, "thread " << pthread_self() << " has been started");
     169         114 :         boost::optional<T> res = this->pool.queue.pop();
     170         193 :         while (res)
     171             :         {
     172             :                 LOG(TRACE, "thread " << pthread_self() << " handle work");
     173         136 :                 this->handle(res.get());
     174         136 :                 res = this->pool.queue.pop();
     175             :         }
     176             :         LOG(TRACE, "thread " << pthread_self() << " finishing");
     177          57 : }
     178             : 
     179             : //========== worker_factory ====================================================
     180             : 
     181             : template <class T>
     182           6 : worker_factory<T>::~worker_factory()
     183           6 : {
     184          12 : }
     185             : 
     186             : //========== worker_pool =======================================================
     187             : 
     188             : template <class T>
     189           4 : worker_pool<T>::worker_pool()
     190             : {
     191           4 : }
     192             : 
     193             : template <class T>
     194           4 : worker_pool<T>::~worker_pool()
     195             : {
     196             :         d_assert(this->workers.empty(), "there are still workers in dtor");
     197           4 : }
     198             : 
     199             : template <class T>
     200           6 : void worker_pool<T>::start(uint32_t num_workers, worker_factory<T> const & factory)
     201             : {
     202             :         LOG(DEBUG, "worker pool starting " << num_workers << " threads");
     203             :         //FIXME exception safety
     204          63 :         for (uint32_t i = 0; i < num_workers; ++i)
     205          57 :                 this->workers.push_back(factory.create_worker(*this));
     206             :         LOG(DEBUG, "worker pool started");
     207           6 : }
     208             : 
     209             : template <class T>
     210           6 : void worker_pool<T>::stop()
     211             : {
     212             :         LOG(DEBUG, "worker pool stopping");
     213           6 :         this->queue.no_more_input();
     214             :         LOG(TRACE, "informed threads");
     215         126 :         for (
     216          69 :                 workers_t::iterator i = this->workers.begin();
     217             :                 i != this->workers.end();
     218             :                 ++i
     219             :                 )
     220             :         {
     221          57 :                 i->join();
     222             :         }
     223             :         LOG(DEBUG, "worker pool has joined all threads");
     224           6 :         this->workers.clear();
     225           6 : }
     226             : 
     227             : template <class T>
     228         136 : void worker_pool<T>::schedule_work(T const & work)
     229             : {
     230         136 :         this->queue.push(work);
     231         136 : }
     232             : 
     233             : } // namespace util
     234             : } // namespace coherent
     235             : 
     236             : #endif /* WORKER_POOL_H_3392 */

Generated by: LCOV version 1.9