LCOV - code coverage report
Current view: top level - journal - journal.cpp (source / functions) Hit Total Coverage
Test: CoherentDB code coverage Lines: 94 98 95.9 %
Date: 2011-02-13 Functions: 21 22 95.5 %

          Line data    Source code
       1             : /*
       2             :  * (C) Copyright 2011 Marek Dopiera
       3             :  * 
       4             :  * This file is part of CoherentDB.
       5             :  * 
       6             :  * CoherentDB is free software: you can redistribute it and/or modify it
       7             :  * under the terms of the GNU General Public License as published by
       8             :  * the Free Software Foundation, either version 3 of the License, or
       9             :  * (at your option) any later version.
      10             :  * 
      11             :  * CoherentDB is distributed in the hope that it will be useful, but
      12             :  * WITHOUT ANY WARRANTY; without even the implied warranty of
      13             :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
      14             :  * General Public License for more details.
      15             :  * 
      16             :  * You should have received a copy of the GNU General Public
      17             :  * License along with CoherentDB. If not, see
      18             :  * http://www.gnu.org/licenses/.
      19             :  */
      20             : 
      21             : #include <journal/journal.h>
      22             : 
      23             : namespace coherent {
      24             : namespace journal {
      25             : 
      26             : using namespace std;
      27             : using namespace util;
      28             : using namespace boost;
      29             : 
      30             : //====== recovery_except =======================================================
      31             : 
      32             : //====== journal ===============================================================
      33             : 
      34             : journal::journal()
      35             : {
      36             :         in_mem_thread = new boost::thread(in_mem_journal_thread(*this));
      37           1 :         sync_thread = new boost::thread(sync_journal_thread(*this));
      38             : 
      39           1 :         journal_callback_worker_factory factory(*this);
      40           1 :         this->cb_workers.start(2,factory);
      41             : }
      42           2 : 
      43           1 : journal::~journal()
      44           1 : {
      45             :         LOG(TRACE, "thread " << pthread_self() << " is ~journal");
      46           1 :         
      47             :         reqs.no_more_input();
      48           1 :         sync_reqs.no_more_input();
      49             : 
      50           1 :         in_mem_thread->join();
      51           1 :         delete in_mem_thread;
      52             : 
      53           1 :         sync_thread->join();
      54           1 :         delete sync_thread;
      55             : 
      56           1 :         this->cb_workers.stop();
      57           1 : }
      58             : 
      59           1 : void journal::insert(
      60           1 :                         owner_id_t owner,
      61             :                         multi_buffer const & buf,
      62          15 :                         insert_cb & cb
      63             :                         ) throw()
      64             : {
      65             :         reqs.push(req_ptr(new insert_req(owner,buf,cb)));
      66           0 :         //cb.insert_success(owner,buf,random());
      67             : }
      68          15 : 
      69             : void journal::erase(
      70          15 :                 owner_id_t owner,
      71             :                 handle_t handle,
      72          15 :                 erase_cb & cb
      73             :                 ) throw()
      74             : {
      75             :         reqs.push(req_ptr(new erase_req(owner,handle,cb)));
      76           0 : }
      77             : 
      78          15 : void journal::recover(
      79          15 :                 recovery_dispatcher & dispatcher
      80             :                 ) throw(recovery_dispatcher)
      81           0 : {
      82             : }
      83             : 
      84             : //====== in_mem_journal_thread =================================================
      85           0 : in_mem_journal_thread::in_mem_journal_thread( journal & j)
      86             :         : journal_single_thread(j)
      87             : {
      88           1 : }
      89           1 : 
      90             : void in_mem_journal_thread::operator()() const
      91           1 : {
      92             :         LOG(TRACE, "thread " << pthread_self() << " has been started");
      93           1 : 
      94             :         boost::optional<journal::req_ptr> res = j.reqs.pop();
      95           1 :         while (res)
      96             :         {
      97           2 :                 res.get()->execute(j);
      98          31 :                 res = j.reqs.pop();
      99             :         }
     100          30 : 
     101             :         LOG(TRACE, "thread " << pthread_self() << " finishing");
     102             : }
     103             : 
     104           1 : //====== requests ==============================================================
     105           1 : 
     106             : journal::insert_req::insert_req(
     107             :                         owner_id_t owner,
     108             :                         multi_buffer const & buf,
     109          15 :                         insert_cb & cb
     110             :                         ) :
     111             :                 owner(owner),
     112             :                 buf(buf),
     113             :                 cb(cb)
     114             : {
     115             : }
     116          15 : 
     117             : void journal::insert_req::execute(journal & j)
     118          15 : {
     119             :         handle_t handle;
     120             :         do {
     121          15 :                 handle = random();
     122             :         } while( key_exists(j.insert_reqs,make_pair(this->owner,handle)));
     123             : 
     124          15 :         j.insert_reqs[make_pair(this->owner,handle)] = shared_from_this();
     125          15 : 
     126          15 :         j.sync_reqs.push(
     127             :                 sync_req_ptr(
     128          15 :                         new insert_sync_req(this->owner,handle,this->buf)));
     129             : }
     130             : 
     131             : journal::erase_req::erase_req(
     132          15 :                 owner_id_t const owner,
     133          15 :                 handle_t const handle,
     134             :                 erase_cb & cb
     135          15 :          ) :
     136             :         owner(owner),
     137             :         handle(handle),
     138             :         cb(cb)
     139             : {
     140             : }
     141             : 
     142          15 : void journal::erase_req::execute(journal & j)
     143             : {
     144             :         LOG(TRACE, "thread " << pthread_self() << " made a sync_req");
     145             : 
     146          15 :         j.erase_reqs[make_pair(this->owner,this->handle)] = shared_from_this();
     147             : 
     148          15 :         j.sync_reqs.push(
     149             :                 sync_req_ptr(
     150          15 :                         new erase_sync_req(this->owner,this->handle)));
     151             : }
     152             : 
     153             : //====== sync_journal_thread ===================================================
     154          15 : sync_journal_thread::sync_journal_thread( journal & j)
     155          15 :         : journal_single_thread(j)
     156             : {
     157             : //otworzyc plik
     158           1 : }
     159           1 : 
     160             : void sync_journal_thread::operator()() const
     161             : {
     162             :         LOG(TRACE, "thread " << pthread_self() << " has been started");
     163             :         boost::optional<journal::sync_req_ptr> res = j.sync_reqs.pop();
     164           1 :         while (res)
     165             :         {
     166             :                 LOG(TRACE, "thread " << pthread_self() << " handle sync_req");
     167           2 :                 res.get()->execute(j);
     168          31 :                 res = j.sync_reqs.pop();
     169             :         }
     170             :         LOG(TRACE, "thread " << pthread_self() << " finishing");
     171          30 : }
     172          30 : 
     173             : //====== sync_reqs =============================================================
     174           1 : 
     175           1 : journal::sync_req::sync_req(
     176             :                 owner_id_t owner,
     177             :                 handle_t handle
     178             :                 ) : owner(owner), handle(handle)
     179          30 : {
     180             : }
     181             : 
     182          30 : journal::insert_sync_req::insert_sync_req(
     183             :                 owner_id_t owner,
     184          30 :                 handle_t handle,
     185             :                 const multi_buffer & buf
     186          15 :                 ) : sync_req(owner,handle), buf(buf)
     187             : {
     188             : }
     189             : 
     190          15 : void journal::insert_sync_req::execute( journal & j)
     191             : {
     192             :         LOG(TRACE, "thread " << pthread_self() << " execute");
     193             :         j.cb_workers.schedule_work( cb_req_ptr( new insert_cb_req(this->owner, this->handle)));
     194          15 : }
     195             : 
     196          15 : journal::erase_sync_req::erase_sync_req(
     197          15 :                 owner_id_t owner,
     198          15 :                 handle_t handle
     199             :                 ) : sync_req(owner,handle)
     200          15 : {
     201             : }
     202             : 
     203          15 : void journal::erase_sync_req::execute( journal & j)
     204             : {
     205             :         LOG(TRACE, "thread " << pthread_self() << " execute");
     206             :         j.cb_workers.schedule_work( cb_req_ptr( new erase_cb_req(this->owner, this->handle)));
     207          15 : }
     208             : 
     209          15 : //====== cb_reqs ===============================================================
     210          15 : 
     211          15 : journal::cb_req::cb_req(
     212             :                 owner_id_t owner,
     213             :                 handle_t handle
     214             :                 ) : owner(owner), handle(handle)
     215          30 : {
     216             : }
     217             : 
     218          30 : journal::insert_cb_req::insert_cb_req(
     219             :                 owner_id_t owner,
     220          30 :                 handle_t handle
     221             :                 ) : cb_req( owner,handle)
     222             : {
     223          15 : }
     224             : 
     225             : journal::erase_cb_req::erase_cb_req(
     226          15 :                 owner_id_t owner,
     227             :                 handle_t handle
     228          15 :                 ) : cb_req( owner,handle)
     229             : {
     230          15 : }
     231             : 
     232             : void journal::insert_cb_req::execute( journal & j)
     233          15 : {
     234             :         shared_ptr<insert_req> req =
     235          15 :                 get_map_val(j.insert_reqs,make_pair(this->owner,this->handle));
     236             : 
     237             :         r_assert( req->owner == this->owner, "owner " << req->owner << " != " << this->owner);
     238             : 
     239             :         LOG(TRACE, "thread " << pthread_self() << " cb.insert_success");
     240          30 :         req->cb.insert_success( req->owner, req->buf, this->handle);
     241             : }
     242          15 : 
     243             : void journal::erase_cb_req::execute( journal & j)
     244          15 : {
     245          15 :         shared_ptr<erase_req> req =
     246          15 :                 get_map_val(j.erase_reqs, make_pair(this->owner,this->handle));
     247             :         r_assert( req->owner == this->owner, "owner " << req->owner << " != " << this->owner);
     248          15 : 
     249             :         LOG(TRACE, "thread " << pthread_self() << " cb.erase_success");
     250             :         req->cb.erase_success( req->owner, req->handle);
     251          30 : }
     252          15 : 
     253             : } // namespace journal 
     254          15 : } // namespace coherent
     255          15 : 

Generated by: LCOV version 1.9