Line data Source code
1 : /*
2 : * (C) Copyright 2011 Marek Dopiera
3 : *
4 : * This file is part of CoherentDB.
5 : *
6 : * CoherentDB is free software: you can redistribute it and/or modify it
7 : * under the terms of the GNU General Public License as published by
8 : * the Free Software Foundation, either version 3 of the License, or
9 : * (at your option) any later version.
10 : *
11 : * CoherentDB is distributed in the hope that it will be useful, but
12 : * WITHOUT ANY WARRANTY; without even the implied warranty of
13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 : * General Public License for more details.
15 : *
16 : * You should have received a copy of the GNU General Public
17 : * License along with CoherentDB. If not, see
18 : * http://www.gnu.org/licenses/.
19 : */
20 :
21 : #include <fcntl.h>
22 : #include <unistd.h>
23 : #include <util/aio.h>
24 :
25 : #include <debug/asserts.h>
26 :
27 : namespace coherent {
28 : namespace util {
29 :
30 : using namespace std;
31 : using namespace boost;
32 :
33 : //======= aio_worker_factory ===================================================
34 :
35 15 : aio_worker_factory::worker_ptr aio_worker_factory::create_worker(
36 : worker_pool<aio_request_ptr> & pool
37 : ) const
38 : {
39 15 : return new thread(aio_worker(pool));
40 : }
41 :
42 : //======= requests =============================================================
43 :
44 1 : aio_open_req::aio_open_req(
45 : callback & cb,
46 : std::string const & path,
47 : int flags,
48 : int mode
49 : ) :
50 : cb(cb),
51 : path(path),
52 : flags(flags),
53 1 : mode(mode)
54 : {
55 1 : }
56 :
57 1 : void aio_open_req::handle_in_worker(aio_worker const & worker) const
58 : {
59 1 : worker.handle_exact(*this);
60 1 : }
61 :
62 1 : aio_pread_req::aio_pread_req(
63 : callback & cb,
64 : int fd,
65 : char * buf,
66 : off_t off,
67 : ssize_t size
68 : ) :
69 : cb(cb),
70 : buf(buf),
71 : off(off),
72 : size(size),
73 1 : fd(fd)
74 : {
75 1 : }
76 :
77 1 : void aio_pread_req::handle_in_worker(aio_worker const & worker) const
78 : {
79 1 : worker.handle_exact(*this);
80 1 : }
81 :
82 1 : aio_preadv_req::aio_preadv_req(
83 : callback & cb,
84 : int fd,
85 : std::vector<iovec> & iovecs,
86 : off_t off
87 : ) :
88 : cb(cb),
89 : off(off),
90 1 : fd(fd)
91 : {
92 1 : this->iovecs.swap(iovecs);
93 1 : }
94 :
95 1 : void aio_preadv_req::handle_in_worker(aio_worker const & worker) const
96 : {
97 1 : worker.handle_exact(*this);
98 1 : }
99 :
100 1 : aio_pwrite_req::aio_pwrite_req(
101 : callback & cb,
102 : int fd,
103 : char const * buf,
104 : off_t off,
105 : ssize_t size
106 : ) :
107 : cb(cb),
108 : buf(buf),
109 : off(off),
110 : size(size),
111 1 : fd(fd)
112 : {
113 1 : }
114 :
115 1 : void aio_pwrite_req::handle_in_worker(aio_worker const & worker) const
116 : {
117 1 : worker.handle_exact(*this);
118 1 : }
119 :
120 1 : aio_pwritev_req::aio_pwritev_req(
121 : callback & cb,
122 : int fd,
123 : std::vector<iovec> & iovecs,
124 : off_t off
125 : ) :
126 : cb(cb),
127 : off(off),
128 1 : fd(fd)
129 : {
130 1 : this->iovecs.swap(iovecs);
131 1 : }
132 :
133 1 : void aio_pwritev_req::handle_in_worker(aio_worker const & worker) const
134 : {
135 1 : worker.handle_exact(*this);
136 1 : }
137 :
138 1 : aio_close_req::aio_close_req(callback & cb, int fd) :
139 : cb(cb),
140 1 : fd(fd)
141 : {
142 1 : }
143 :
144 1 : void aio_close_req::handle_in_worker(aio_worker const & worker) const
145 : {
146 1 : worker.handle_exact(*this);
147 1 : }
148 :
149 : //======= aio_worker ===========================================================
150 :
151 15 : aio_worker::aio_worker(worker_pool<aio_request_ptr> & pool) :
152 15 : worker<aio_request_ptr>(pool)
153 : {
154 15 : }
155 :
156 6 : void aio_worker::handle(aio_request_ptr const & req) const
157 : {
158 6 : req->handle_in_worker(*this);
159 6 : }
160 :
161 1 : void aio_worker::handle_exact(aio_open_req const & req) const
162 : {
163 1 : int fd = open(req.path.c_str(), req.flags, req.mode);
164 1 : req.cb.open_completed(req, fd, errno);
165 1 : }
166 :
167 1 : void aio_worker::handle_exact(aio_pread_req const & req) const
168 : {
169 1 : ssize_t res = pread(req.fd, req.buf, req.size, req.off);
170 1 : req.cb.pread_completed(req, res, errno);
171 1 : }
172 :
173 1 : void aio_worker::handle_exact(aio_preadv_req const & req) const
174 : {
175 : ssize_t res = preadv(
176 : req.fd,
177 1 : &(req.iovecs[0]),
178 1 : req.iovecs.size(),
179 : req.off
180 1 : );
181 1 : req.cb.preadv_completed(req, res, errno);
182 1 : }
183 :
184 1 : void aio_worker::handle_exact(aio_pwrite_req const & req) const
185 : {
186 1 : ssize_t res = pwrite(req.fd, req.buf, req.size, req.off);
187 1 : req.cb.pwrite_completed(req, res, errno);
188 1 : }
189 :
190 1 : void aio_worker::handle_exact(aio_pwritev_req const & req) const
191 : {
192 : ssize_t res = pwritev(
193 : req.fd,
194 1 : &(req.iovecs[0]),
195 1 : req.iovecs.size(),
196 : req.off
197 1 : );
198 1 : req.cb.pwritev_completed(req, res, errno);
199 1 : }
200 :
201 1 : void aio_worker::handle_exact(aio_close_req const & req) const
202 : {
203 1 : int res = close(req.fd);
204 1 : req.cb.close_completed(req, res, errno);
205 1 : }
206 :
207 : //======= aio_context ==========================================================
208 :
209 : aio_context::aio_context(uint32_t num_workers) : workers()
210 1 : {
211 : workers.start(num_workers, aio_worker_factory());
212 1 : }
213 1 :
214 : aio_context::~aio_context()
215 1 : {
216 : workers.stop();
217 1 : }
218 1 :
219 : void aio_context::submit_request(aio_request_ptr req)
220 6 : {
221 : this->workers.schedule_work(req);
222 6 : }
223 6 :
224 : //======= async_file ===========================================================
225 :
226 : async_file::async_file(aio_context & ctx, std::string const path) :
227 1 : ctx(ctx),
228 : path(path),
229 : open_cb(NULL),
230 : close_cb(NULL),
231 : fd(NOT_OPEN)
232 1 : {
233 : }
234 1 :
235 : void async_file::submit_open(
236 1 : open_callback & cb,
237 : int flags,
238 : int mode
239 : )
240 : {
241 : d_assert(!this->is_open(), "can't open an already opened file");
242 : d_assert(
243 : this->open_cb == NULL,
244 : "can't open a file which is just being opened"
245 : );
246 : d_assert(this->close_cb == NULL, "waht?");
247 1 : aio_request_ptr req(new aio_open_req(*this, this->path, flags, mode));
248 2 : this->open_cb = & cb;
249 1 : this->ctx.submit_request(req);
250 1 : }
251 1 :
252 : void async_file::submit_pread(
253 1 : pread_callback & cb,
254 : char * buf,
255 : off_t off,
256 : ssize_t size
257 : )
258 : {
259 : d_assert(this->is_open(), "can't pread from closed file");
260 1 : aio_request_ptr req(new aio_pread_req(cb, this->fd, buf, off, size));
261 2 : this->ctx.submit_request(req);
262 1 : }
263 1 :
264 : void async_file::submit_preadv(
265 1 : preadv_callback & cb,
266 : std::vector<iovec> & iovecs,
267 : off_t off
268 : )
269 : {
270 : d_assert(this->is_open(), "can't preadv from closed file");
271 1 : aio_request_ptr req(new aio_preadv_req(cb, this->fd, iovecs, off));
272 2 : this->ctx.submit_request(req);
273 1 : }
274 1 :
275 : void async_file::submit_pwrite(
276 1 : pwrite_callback & cb,
277 : char const * buf,
278 : off_t off,
279 : ssize_t size
280 : )
281 : {
282 : d_assert(this->is_open(), "can't pwrite to closed file");
283 1 : aio_request_ptr req(new aio_pwrite_req(cb, this->fd, buf, off, size));
284 2 : this->ctx.submit_request(req);
285 1 : }
286 1 :
287 : void async_file::submit_pwritev(
288 1 : pwritev_callback & cb,
289 : std::vector<iovec> & iovecs,
290 : off_t off
291 : )
292 : {
293 : d_assert(this->is_open(), "can't pwritev to closed file");
294 1 : aio_request_ptr req(new aio_pwritev_req(cb, this->fd, iovecs, off));
295 2 : this->ctx.submit_request(req);
296 1 : }
297 1 :
298 : void async_file::submit_close(
299 1 : close_callback & cb
300 : )
301 : {
302 : d_assert(this->is_open(), "can't close closed file");
303 : d_assert(
304 : this->close_cb == NULL,
305 : "can't close a file which is just being closed"
306 : );
307 : d_assert(this->open_cb == NULL, "waht?");
308 1 : aio_request_ptr req(new aio_close_req(*this, this->fd));
309 2 : this->close_cb = & cb;
310 1 : this->ctx.submit_request(req);
311 1 : }
312 1 :
313 : void async_file::open_completed(
314 1 : aio_open_req const & req,
315 : int fd,
316 : int err
317 : )
318 : {
319 : d_assert(this->fd == NOT_OPEN, "what? fd=" << this->fd);
320 : d_assert(this->open_cb != NULL, "what?");
321 1 : if (fd >= 0) {
322 1 : this->fd = fd;
323 1 : err = 0; // just to be sure
324 1 : }
325 : open_callback & cb = *this->open_cb;
326 1 : this->open_cb = NULL;
327 1 : cb.open_completed(req, 0);
328 1 : }
329 1 :
330 : void async_file::close_completed(
331 1 : aio_close_req const & req,
332 : int res,
333 : int err
334 : )
335 : {
336 : d_assert(this->fd != NOT_OPEN, "what?");
337 : d_assert(this->close_cb != NULL, "what?");
338 1 : if (res == 0)
339 1 : this->fd = NOT_OPEN;
340 1 :
341 : close_callback & cb = *this->close_cb;
342 1 : this->close_cb = NULL;
343 1 : cb.close_completed(req, res, err);
344 1 : }
345 1 :
346 : } // namespace util
347 : } // namespace coherent
348 :
|