Line data Source code
1 : /*
2 : * (C) Copyright 2010 Marek Dopiera
3 : *
4 : * This file is part of CoherentDB.
5 : *
6 : * CoherentDB is free software: you can redistribute it and/or modify it
7 : * under the terms of the GNU General Public License as published by
8 : * the Free Software Foundation, either version 3 of the License, or
9 : * (at your option) any later version.
10 : *
11 : * CoherentDB is distributed in the hope that it will be useful, but
12 : * WITHOUT ANY WARRANTY; without even the implied warranty of
13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 : * General Public License for more details.
15 : *
16 : * You should have received a copy of the GNU General Public
17 : * License along with CoherentDB. If not, see
18 : * http://www.gnu.org/licenses/.
19 : */
20 :
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/uio.h>

#include <cstring>
#include <vector>

#include <boost/lexical_cast.hpp>

#include <debug/asserts.h>
#include <util/file.h>
#include <util/misc.h>
33 :
34 : namespace coherent {
35 : namespace util {
36 :
37 : using namespace std;
38 : using namespace boost;
39 :
40 : //==== global helper ===========================================================
41 :
/**
 * Process-wide constants queried once during static initialisation.
 */
struct global_helper
{
	global_helper() : page_size(query_page_size())
	{
	}

	/** System page size in bytes (file::read uses it as the I/O alignment). */
	uint32_t const page_size;

private:
	/**
	 * Returns sysconf(_SC_PAGESIZE), falling back to 4096 if sysconf reports
	 * failure: the raw -1 would otherwise wrap to 0xFFFFFFFF when narrowed to
	 * uint32_t and break every alignment computation.
	 */
	static uint32_t query_page_size()
	{
		long const reported = ::sysconf(_SC_PAGESIZE);
		return (reported > 0) ? static_cast<uint32_t>(reported) : 4096u;
	}
};

static global_helper const globals;
52 1 :
53 : //==== io_exception ============================================================
54 :
55 : io_exception::io_exception(file const & file, std::string const & msg) :
56 : message(
57 : string("file \"") + file.path + "\" (fd: " +
58 2 : lexical_cast<string>(file.fd) + "): " + msg
59 : )
60 2 : {
61 4 : }
62 2 :
63 : io_exception::io_exception(file const & file, int err, std::string const & op) :
64 2 : message(
65 : string("file \"") + file.path + "\" (fd: " +
66 1 : lexical_cast<string>(file.fd) + "): " + ::strerror(err) + " (" +
67 : lexical_cast<string>(err) + ")"
68 1 : )
69 3 : {
70 2 : }
71 2 :
72 : const char* io_exception::what() const throw()
73 1 : {
74 : return this->message.c_str();
75 0 : }
76 :
77 4 : io_exception::~io_exception()
78 : {
79 : }
80 4 :
81 : //==== file ====================================================================
82 4 :
83 : file::file(string const & path) :
84 : path(path),
85 : fd(NOT_OPEN)
86 : {
87 7 : }
88 :
89 7 : auto_ptr<file> file::create(string const & path, int flags, int mode)
90 : {
91 7 : auto_ptr<file> res(new file(path));
92 : res->create(flags, mode);
93 4 : return res;
94 : }
95 5 :
96 4 : std::auto_ptr<file> file::open(string const & path, int flags)
97 : {
98 : auto_ptr<file> res(new file(path));
99 : res->open(flags);
100 3 : return res;
101 : }
102 3 :
103 3 : file::~file()
104 : {
105 : d_assert(
106 : !this->is_open(),
107 : "destroying an open file: \"" << this->path << "\" fd: " << this->fd
108 : );
109 7 : }
110 :
111 : void file::create(int flags, int mode)
112 0 : {
113 : LOG(
114 : DEBUG,
115 : "create path=\"" << this->path << "\" flags=" <<
116 : flags << " mode=" << oct << mode
117 : );
118 : this->open_internal(flags, true, mode);
119 : }
120 :
121 4 : void file::open(int flags)
122 4 : {
123 : LOG(
124 : DEBUG,
125 : "open path=\"" << this->path << "\" flags=" << flags
126 : );
127 3 : this->open_internal(flags, false);
128 : }
129 :
130 3 : void file::open_internal(int flags, bool create, int mode)
131 3 : {
132 : d_assert(
133 : (flags & (O_EXCL | O_CREAT)) == 0,
134 : "inapropriate flags: "<< flags
135 : );
136 : d_assert(!this->is_open(), "file already open");
137 : this->fd = ::open(
138 : this->path.c_str(),
139 0 : flags | (create ? (O_CREAT | O_EXCL) : 0),
140 7 : mode
141 : );
142 : if (this->fd == -1) {
143 : io_exception ex(*this, errno, "open");
144 : LOG(DEBUG, "open failed: " << ex.what());
145 7 : throw ex;
146 7 : }
147 : LOG(DEBUG, "open succeeded path=\"" << this->path << "\" fd=" << this->fd);
148 1 :
149 1 : }
150 :
151 6 : void file::close()
152 : {
153 : LOG(
154 : DEBUG,
155 : "close fd=" << this->fd << " path=\"" << this->path << "\""
156 : );
157 : d_assert(this->is_open(), "file \"" << this->path << "\" not open");
158 : int err = ::close(this->fd);
159 : if (err)
160 6 : throw io_exception(*this, errno, "close");
161 6 : this->fd = NOT_OPEN;
162 6 : }
163 6 :
164 0 : file::multi_buffer_ptr file::read(uint32_t size, uint64_t offset)
165 6 : {
166 : LOG(
167 : DEBUG,
168 : "read fd=" << this->fd << " path=\"" << this->path << "\" off=" <<
169 : offset << " size=" << size
170 : );
171 :
172 : d_assert(this->is_open(), "file \"" << this->path << "\" not open");
173 : if (size == 0)
174 1008 : {
175 : multi_buffer::buffer_list list;
176 1008 : return multi_buffer_ptr(new multi_buffer(list, 0, 0));
177 1008 : }
178 :
179 2 : //The buffers should not be too small in order to be sure that the kernel
180 1 : //merges them, but on the other hand they shouldn't be too big not to stress
181 : //the allocator (fragmentation). I've arbitrarilly chosen 512Kb.
182 : uint32_t const single_buf_size = 512 * 1024;
183 :
184 : //if we're already dividing the read into smaller chunks we can as well
185 : //align it (e.g. O_DIRECT requires alignment to 512b and there are some bugs
186 1007 : //which make that not enough - let's be wasteful and align it to one page)
187 : uint32_t const alignment = globals.page_size;
188 :
189 : uint32_t const aligned_off = align_down(offset, alignment);
190 : uint32_t const shift = offset - aligned_off;
191 1007 : uint32_t const aligned_end = align_up(offset + size, alignment);
192 : uint32_t const aligned_size = aligned_end - aligned_off;
193 1007 :
194 1007 : multi_buffer::buffer_list bufs;
195 1007 : uint32_t const num_bufs =
196 1007 : ((aligned_end - aligned_off) % single_buf_size == 0)
197 : ? ((aligned_end - aligned_off) / single_buf_size)
198 2015 : : ((aligned_end - aligned_off) / single_buf_size + 1);
199 :
200 : LOG(
201 : TRACE,
202 : "num_bufs=" << num_bufs << " aligned_size=" << aligned_size <<
203 : " aligned_off=" << aligned_off << " aligned_end=" << aligned_end
204 : );
205 : struct iovec vecs[num_bufs];
206 :
207 : uint64_t cur_off;
208 1007 : uint32_t i;
209 1007 : for (
210 : i = 0, cur_off = aligned_off;
211 : cur_off < aligned_end;
212 : cur_off += single_buf_size, ++i
213 3603 : )
214 1007 : {
215 : d_assert(i < num_bufs, "i=" << i << " num_bufs=" << num_bufs);
216 : uint32_t const buf_len = (cur_off + single_buf_size >= aligned_end)
217 : ? (aligned_end - cur_off)
218 : : single_buf_size;
219 2596 : multi_buffer::buffer_ptr buf(new buffer(buf_len, alignment));
220 : bufs.push_back(buf);
221 : vecs[i].iov_base = buf->get_data();
222 2596 : vecs[i].iov_len = buf_len;
223 : LOG(TRACE, "allocated buffer " << cur_off << "-" << cur_off + buf_len);
224 2596 : }
225 2596 : ssize_t err;
226 2596 : do {
227 2596 : LOG(
228 : TRACE,
229 : "preadv(" << this->fd << ", (" << aligned_size << "), " << num_bufs
230 1007 : << ", " << aligned_off
231 1007 : );
232 : err = preadv(this->fd, vecs, num_bufs, aligned_off);
233 : LOG(TRACE, "preadv returned " << err);
234 : } while (err == -1 && errno == EINTR);
235 1007 :
236 1007 : if (
237 1007 : err != static_cast<ssize_t>(aligned_size)
238 0 : && (err < 0 || err < static_cast<ssize_t>(size + shift))
239 : )
240 1007 : {
241 : if (err >= 0)
242 : throw io_exception(
243 : *this,
244 : string("short read: ") + lexical_cast<string>(err) + " "
245 2 : + lexical_cast<string>(aligned_size)
246 : );
247 : else
248 2 : throw io_exception(*this, errno, "preadv");
249 4 : }
250 4 : return multi_buffer_ptr(
251 : new multi_buffer(bufs, min(static_cast<ssize_t>(size), err), shift)
252 0 : );
253 : }
254 :
255 2010 : void file::write(multi_buffer const & buf, uint64_t offset)
256 2010 : {
257 : LOG(
258 : DEBUG,
259 : "write fd=" << this->fd << " path=\"" << this->path << "\" off=" <<
260 : offset << " size=" << buf.get_size()
261 : );
262 : uint32_t const num_bufs = buf.buffers.size();
263 :
264 : struct iovec vecs[num_bufs];
265 100 :
266 : LOG(TRACE, "start dumping buffers");
267 : uint32_t size_left = buf.get_size();
268 100 : for (uint32_t i = 0; i < num_bufs; ++i) {
269 : buffer const & b = *buf.buffers[i];
270 100 : uint32_t buf_off = (i == 0) ? buf.left_off : 0;
271 : d_assert(b.get_size() >= buf_off, "mismatch: " << b.get_size() << " " <<
272 : buf_off
273 : );
274 500 : uint32_t buf_len = min(b.get_size() - buf_off, size_left);
275 : LOG(TRACE, "buf_len=" << buf_len << " buf_off=" << buf_off);
276 : vecs[i].iov_base = const_cast<char*>(b.get_data() + buf_off);
277 0 : vecs[i].iov_len = buf_len;
278 500 : size_left -= buf_len;
279 500 : }
280 : LOG(TRACE, "dump complete, running pwritev with offset=" << offset);
281 : d_assert(size_left == 0, "mismatch " << size_left);
282 500 : ssize_t err = pwritev(this->fd, vecs, num_bufs, offset);
283 : LOG(TRACE, "pwritev returned " << err << " errno=" << errno);
284 100 : if (err < 0)
285 100 : throw io_exception(*this, errno, "pwritev");
286 100 : else
287 100 : if (static_cast<uint64_t>(err) != buf.get_size())
288 100 : throw io_exception(
289 0 : *this,
290 : string("short write: ") + lexical_cast<string>(err) + " "
291 100 : + lexical_cast<string>(buf.get_size())
292 : );
293 :
294 0 : }
295 0 :
296 0 : } // namespace util
297 : } // namespace coherent
298 100 :
|