LCOV - code coverage report
Current view: top level - util - aio.cpp (source / functions) Hit Total Coverage
Test: CoherentDB code coverage Lines: 136 136 100.0 %
Date: 2011-02-13 Functions: 35 35 100.0 %

          Line data    Source code
       1             : /*
       2             :  * (C) Copyright 2011 Marek Dopiera
       3             :  * 
       4             :  * This file is part of CoherentDB.
       5             :  * 
       6             :  * CoherentDB is free software: you can redistribute it and/or modify it
       7             :  * under the terms of the GNU General Public License as published by
       8             :  * the Free Software Foundation, either version 3 of the License, or
       9             :  * (at your option) any later version.
      10             :  * 
      11             :  * CoherentDB is distributed in the hope that it will be useful, but
      12             :  * WITHOUT ANY WARRANTY; without even the implied warranty of
      13             :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
      14             :  * General Public License for more details.
      15             :  * 
      16             :  * You should have received a copy of the GNU General Public
      17             :  * License along with CoherentDB. If not, see
      18             :  * http://www.gnu.org/licenses/.
      19             :  */
      20             : 
      21             : #include <fcntl.h>
      22             : #include <unistd.h>
      23             : #include <util/aio.h>
      24             : 
      25             : #include <debug/asserts.h>
      26             : 
      27             : namespace coherent {
      28             : namespace util {
      29             : 
      30             : using namespace std;
      31             : using namespace boost;
      32             : 
      33             : //======= aio_worker_factory ===================================================
      34             : 
      35          15 : aio_worker_factory::worker_ptr aio_worker_factory::create_worker(
      36             :         worker_pool<aio_request_ptr> & pool
      37             :         ) const
      38             : {
      39          15 :         return new thread(aio_worker(pool));
      40             : }
      41             : 
      42             : //======= requests =============================================================
      43             : 
      44           1 : aio_open_req::aio_open_req(
      45             :         callback & cb,
      46             :         std::string const & path,
      47             :         int flags,
      48             :         int mode
      49             :         ) :
      50             :         cb(cb),
      51             :         path(path),
      52             :         flags(flags),
      53           1 :         mode(mode)
      54             : {
      55           1 : }
      56             : 
      57           1 : void aio_open_req::handle_in_worker(aio_worker const & worker) const
      58             : {
      59           1 :         worker.handle_exact(*this);
      60           1 : }
      61             : 
      62           1 : aio_pread_req::aio_pread_req(
      63             :         callback & cb,
      64             :         int fd,
      65             :         char * buf,
      66             :         off_t off,
      67             :         ssize_t size
      68             :         ) :
      69             :         cb(cb),
      70             :         buf(buf),
      71             :         off(off),
      72             :         size(size),
      73           1 :         fd(fd)
      74             : {
      75           1 : }
      76             : 
      77           1 : void aio_pread_req::handle_in_worker(aio_worker const & worker) const
      78             : {
      79           1 :         worker.handle_exact(*this);
      80           1 : }
      81             : 
      82           1 : aio_preadv_req::aio_preadv_req(
      83             :         callback & cb,
      84             :         int fd,
      85             :         std::vector<iovec> & iovecs,
      86             :         off_t off
      87             :         ) :
      88             :         cb(cb),
      89             :         off(off),
      90           1 :         fd(fd)
      91             : {
      92           1 :         this->iovecs.swap(iovecs);
      93           1 : }
      94             : 
      95           1 : void aio_preadv_req::handle_in_worker(aio_worker const & worker) const
      96             : {
      97           1 :         worker.handle_exact(*this);
      98           1 : }
      99             : 
     100           1 : aio_pwrite_req::aio_pwrite_req(
     101             :         callback & cb,
     102             :         int fd,
     103             :         char const * buf,
     104             :         off_t off,
     105             :         ssize_t size
     106             :         ) :
     107             :         cb(cb),
     108             :         buf(buf),
     109             :         off(off),
     110             :         size(size),
     111           1 :         fd(fd)
     112             : {
     113           1 : }
     114             : 
     115           1 : void aio_pwrite_req::handle_in_worker(aio_worker const & worker) const
     116             : {
     117           1 :         worker.handle_exact(*this);
     118           1 : }
     119             : 
     120           1 : aio_pwritev_req::aio_pwritev_req(
     121             :         callback & cb,
     122             :         int fd,
     123             :         std::vector<iovec> & iovecs,
     124             :         off_t off
     125             :         ) :
     126             :         cb(cb),
     127             :         off(off),
     128           1 :         fd(fd)
     129             : {
     130           1 :         this->iovecs.swap(iovecs);
     131           1 : }
     132             : 
     133           1 : void aio_pwritev_req::handle_in_worker(aio_worker const & worker) const
     134             : {
     135           1 :         worker.handle_exact(*this);
     136           1 : }
     137             : 
     138           1 : aio_close_req::aio_close_req(callback & cb, int fd) :
     139             :         cb(cb),
     140           1 :         fd(fd)
     141             : {
     142           1 : }
     143             : 
     144           1 : void aio_close_req::handle_in_worker(aio_worker const & worker) const
     145             : {
     146           1 :         worker.handle_exact(*this);
     147           1 : }
     148             : 
     149             : //======= aio_worker ===========================================================
     150             : 
     151          15 : aio_worker::aio_worker(worker_pool<aio_request_ptr> & pool) :
     152          15 :         worker<aio_request_ptr>(pool)
     153             : {
     154          15 : }
     155             : 
     156           6 : void aio_worker::handle(aio_request_ptr const & req) const
     157             : {
     158           6 :         req->handle_in_worker(*this);
     159           6 : }
     160             : 
     161           1 : void aio_worker::handle_exact(aio_open_req const & req) const
     162             : {
     163           1 :         int fd = open(req.path.c_str(), req.flags, req.mode);
     164           1 :         req.cb.open_completed(req, fd, errno);
     165           1 : }
     166             : 
     167           1 : void aio_worker::handle_exact(aio_pread_req const & req) const
     168             : {
     169           1 :         ssize_t res = pread(req.fd, req.buf, req.size, req.off);
     170           1 :         req.cb.pread_completed(req, res, errno);
     171           1 : }
     172             : 
     173           1 : void aio_worker::handle_exact(aio_preadv_req const & req) const
     174             : {
     175             :         ssize_t res = preadv(
     176             :                 req.fd,
     177           1 :                 &(req.iovecs[0]),
     178           1 :                 req.iovecs.size(),
     179             :                 req.off
     180           1 :                 );
     181           1 :         req.cb.preadv_completed(req, res, errno);
     182           1 : }
     183             : 
     184           1 : void aio_worker::handle_exact(aio_pwrite_req const & req) const
     185             : {
     186           1 :         ssize_t res = pwrite(req.fd, req.buf, req.size, req.off);
     187           1 :         req.cb.pwrite_completed(req, res, errno);
     188           1 : }
     189             : 
     190           1 : void aio_worker::handle_exact(aio_pwritev_req const & req) const
     191             : {
     192             :         ssize_t res = pwritev(
     193             :                 req.fd,
     194           1 :                 &(req.iovecs[0]),
     195           1 :                 req.iovecs.size(),
     196             :                 req.off
     197           1 :                 );
     198           1 :         req.cb.pwritev_completed(req, res, errno);
     199           1 : }
     200             : 
     201           1 : void aio_worker::handle_exact(aio_close_req const & req) const
     202             : {
     203           1 :         int res = close(req.fd);
     204           1 :         req.cb.close_completed(req, res, errno);
     205           1 : }
     206             : 
     207             : //======= aio_context ==========================================================
     208             : 
     209             : aio_context::aio_context(uint32_t num_workers) : workers()
     210           1 : {
     211             :         workers.start(num_workers, aio_worker_factory());
     212           1 : }
     213           1 : 
     214             : aio_context::~aio_context()
     215           1 : {
     216             :         workers.stop();
     217           1 : }
     218           1 : 
     219             : void aio_context::submit_request(aio_request_ptr req)
     220           6 : {
     221             :         this->workers.schedule_work(req);
     222           6 : }
     223           6 : 
     224             : //======= async_file ===========================================================
     225             : 
     226             : async_file::async_file(aio_context & ctx, std::string const path) :
     227           1 :         ctx(ctx),
     228             :         path(path),
     229             :         open_cb(NULL),
     230             :         close_cb(NULL),
     231             :         fd(NOT_OPEN)
     232           1 : {
     233             : }
     234           1 : 
     235             : void async_file::submit_open(
     236           1 :         open_callback & cb,
     237             :         int flags,
     238             :         int mode
     239             :         )
     240             : {
     241             :         d_assert(!this->is_open(), "can't open an already opened file");
     242             :         d_assert(
     243             :                 this->open_cb == NULL,
     244             :                 "can't open a file which is just being opened"
     245             :                 );
     246             :         d_assert(this->close_cb == NULL, "waht?");
     247           1 :         aio_request_ptr req(new aio_open_req(*this, this->path, flags, mode));
     248           2 :         this->open_cb = & cb;
     249           1 :         this->ctx.submit_request(req);
     250           1 : }
     251           1 : 
     252             : void async_file::submit_pread(
     253           1 :         pread_callback & cb,
     254             :         char * buf,
     255             :         off_t off,
     256             :         ssize_t size
     257             :         )
     258             : {
     259             :         d_assert(this->is_open(), "can't pread from closed file");
     260           1 :         aio_request_ptr req(new aio_pread_req(cb, this->fd, buf, off, size));
     261           2 :         this->ctx.submit_request(req);
     262           1 : }
     263           1 : 
     264             : void async_file::submit_preadv(
     265           1 :         preadv_callback & cb,
     266             :         std::vector<iovec> & iovecs,
     267             :         off_t off
     268             :         )
     269             : {
     270             :         d_assert(this->is_open(), "can't preadv from closed file");
     271           1 :         aio_request_ptr req(new aio_preadv_req(cb, this->fd, iovecs, off));
     272           2 :         this->ctx.submit_request(req);
     273           1 : }
     274           1 : 
     275             : void async_file::submit_pwrite(
     276           1 :         pwrite_callback & cb,
     277             :         char const * buf,
     278             :         off_t off,
     279             :         ssize_t size
     280             :         )
     281             : {
     282             :         d_assert(this->is_open(), "can't pwrite to closed file");
     283           1 :         aio_request_ptr req(new aio_pwrite_req(cb, this->fd, buf, off, size));
     284           2 :         this->ctx.submit_request(req);
     285           1 : }
     286           1 : 
     287             : void async_file::submit_pwritev(
     288           1 :         pwritev_callback & cb,
     289             :         std::vector<iovec> & iovecs,
     290             :         off_t off
     291             :         )
     292             : {
     293             :         d_assert(this->is_open(), "can't pwritev to closed file");
     294           1 :         aio_request_ptr req(new aio_pwritev_req(cb, this->fd, iovecs, off));
     295           2 :         this->ctx.submit_request(req);
     296           1 : }
     297           1 : 
     298             : void async_file::submit_close(
     299           1 :         close_callback & cb
     300             :         )
     301             : {
     302             :         d_assert(this->is_open(), "can't close closed file");
     303             :         d_assert(
     304             :                 this->close_cb == NULL,
     305             :                 "can't close a file which is just being closed"
     306             :                 );
     307             :         d_assert(this->open_cb == NULL, "waht?");
     308           1 :         aio_request_ptr req(new aio_close_req(*this, this->fd));
     309           2 :         this->close_cb = & cb;
     310           1 :         this->ctx.submit_request(req);
     311           1 : }
     312           1 : 
     313             : void async_file::open_completed(
     314           1 :         aio_open_req const & req,
     315             :         int fd,
     316             :         int err
     317             :         )
     318             : {
     319             :         d_assert(this->fd == NOT_OPEN, "what? fd=" << this->fd);
     320             :         d_assert(this->open_cb != NULL, "what?");
     321           1 :         if (fd >= 0) {
     322           1 :                 this->fd = fd;
     323           1 :                 err = 0; // just to be sure
     324           1 :         }
     325             :         open_callback & cb = *this->open_cb;
     326           1 :         this->open_cb = NULL;
     327           1 :         cb.open_completed(req, 0);
     328           1 : }
     329           1 : 
     330             : void async_file::close_completed(
     331           1 :         aio_close_req const & req,
     332             :         int res,
     333             :         int err
     334             :         )
     335             : {
     336             :         d_assert(this->fd != NOT_OPEN, "what?");
     337             :         d_assert(this->close_cb != NULL, "what?");
     338           1 :         if (res == 0)
     339           1 :                 this->fd = NOT_OPEN;
     340           1 : 
     341             :         close_callback & cb = *this->close_cb;
     342           1 :         this->close_cb = NULL;
     343           1 :         cb.close_completed(req, res, err);
     344           1 : }
     345           1 : 
     346             : } // namespace util 
     347             : } // namespace coherent
     348             : 

Generated by: LCOV version 1.9