Line data Source code
1 : /*
2 : * (C) Copyright 2011 Marek Dopiera
3 : *
4 : * This file is part of CoherentDB.
5 : *
6 : * CoherentDB is free software: you can redistribute it and/or modify it
7 : * under the terms of the GNU General Public License as published by
8 : * the Free Software Foundation, either version 3 of the License, or
9 : * (at your option) any later version.
10 : *
11 : * CoherentDB is distributed in the hope that it will be useful, but
12 : * WITHOUT ANY WARRANTY; without even the implied warranty of
13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 : * General Public License for more details.
15 : *
16 : * You should have received a copy of the GNU General Public
17 : * License along with CoherentDB. If not, see
18 : * http://www.gnu.org/licenses/.
19 : */
20 :
21 : #include <config/config.h>
22 : #include <debug/asserts.h>
23 : #include <log/log.h>
24 : #include <util/misc.h>
25 : #include <util/thread.h>
26 : #include <journal/journal.h>
27 : #include <journal/test/journal_mocks.h>
28 :
29 : namespace coherent {
30 : namespace journal {
31 : namespace unittests {
32 :
33 : using namespace std;
34 : using namespace boost;
35 : using namespace coherent::util;
36 :
37 : //==== in_mem_journal ==========================================================
38 :
39 : struct journal_worker : public worker<in_mem_journal::req_ptr>
40 0 : {
41 : public:
42 0 : journal_worker(in_mem_journal & journal) :
43 : worker<in_mem_journal::req_ptr>(journal.workers),
44 0 : journal(journal)
45 : {
46 0 : }
47 :
48 : protected:
49 0 : virtual void handle(in_mem_journal::req_ptr const & t) const
50 : {
51 0 : t->execute(this->journal);
52 0 : }
53 :
54 : private:
55 : in_mem_journal & journal;
56 : };
57 :
58 : struct journal_worker_factory : public worker_factory<in_mem_journal::req_ptr>
59 0 : {
60 0 : journal_worker_factory(in_mem_journal & journal) : journal(journal)
61 : {
62 0 : }
63 :
64 0 : virtual worker_ptr create_worker(
65 : worker_pool<in_mem_journal::req_ptr> & pool
66 : ) const
67 : {
68 0 : return new thread(journal_worker(this->journal));
69 : }
70 : private:
71 : in_mem_journal & journal;
72 : };
73 :
74 0 : in_mem_journal::in_mem_journal()
75 : {
76 0 : journal_worker_factory fact(*this);
77 0 : this->workers.start(1, fact);
78 0 : }
79 :
80 0 : in_mem_journal::~in_mem_journal()
81 : {
82 0 : this->workers.stop();
83 0 : }
84 :
85 0 : void in_mem_journal::insert(
86 : owner_id_t owner,
87 : util::multi_buffer const & buf,
88 : insert_cb & cb
89 0 : ) throw()
90 : {
91 : req_ptr req = req_ptr(
92 : new insert_req(
93 : owner,
94 : buf,
95 : cb
96 0 : )
97 0 : );
98 0 : this->workers.schedule_work(req);
99 0 : }
100 :
101 0 : void in_mem_journal::erase(
102 : owner_id_t owner,
103 : handle_t handle,
104 : erase_cb & cb
105 0 : ) throw()
106 : {
107 : req_ptr req = req_ptr(
108 : new erase_req(
109 : owner,
110 : handle,
111 : cb
112 0 : )
113 0 : );
114 0 : this->workers.schedule_work(req);
115 0 : }
116 :
117 0 : void in_mem_journal::recover(
118 : recovery_dispatcher & dispatcher
119 0 : ) throw(recovery_dispatcher)
120 : {
121 0 : for (
122 0 : mapt_t::const_iterator it = this->contents.begin();
123 0 : it != this->contents.end();
124 : ++it)
125 : {
126 0 : multi_buffer::buffer_list lst;
127 0 : lst.push_back(it->second);
128 0 : multi_buffer buf(lst, it->second->get_size(), 0);
129 : dispatcher.dispatch(
130 0 : it->first.first, //owner
131 0 : it->first.second, //handle
132 : buf
133 0 : );
134 : }
135 :
136 0 : this->contents.clear();
137 0 : }
138 :
139 0 : in_mem_journal::insert_req::insert_req(
140 : owner_id_t owner,
141 : util::multi_buffer const & buf,
142 : insert_cb & cb
143 : ) :
144 : owner(owner),
145 : buf(buf),
146 0 : cb(cb)
147 : {
148 0 : }
149 :
150 0 : void in_mem_journal::insert_req::execute(in_mem_journal & journal)
151 : {
152 : handle_t handle;
153 0 : do {
154 0 : handle = random();
155 : } while (
156 0 : journal.contents.find(map_key_t(this->owner, handle))
157 0 : != journal.contents.end()
158 : );
159 0 : buffer_ptr new_buf(new buffer(this->buf.get_size()));
160 0 : this->buf.read(new_buf->get_data(), this->buf.get_size(), 0);
161 : journal.contents.insert(
162 0 : make_pair(map_key_t(this->owner, handle), new_buf)
163 0 : );
164 0 : this->cb.insert_success(this->owner, this->buf, handle);
165 0 : }
166 :
167 0 : in_mem_journal::erase_req::erase_req(
168 : owner_id_t const owner,
169 : handle_t const handle,
170 : erase_cb & cb
171 : ) :
172 : owner(owner),
173 : handle(handle),
174 0 : cb(cb)
175 : {
176 0 : }
177 :
178 0 : void in_mem_journal::erase_req::execute(in_mem_journal & journal)
179 : {
180 0 : bool erased = journal.contents.erase(map_key_t(this->owner, this->handle));
181 0 : r_assert(
182 : erased,
183 : "Entry (" << this->owner << "," << this->handle <<
184 : " does not exist in journal"
185 0 : );
186 0 : this->cb.erase_success(this->owner, this->handle);
187 0 : }
188 :
189 : //======== sync_journal_wrapper ================================================
190 :
191 1 : sync_journal_wrapper::sync_journal_wrapper(journal & impl) : impl(impl)
192 : {
193 1 : }
194 :
195 15 : journal::handle_t sync_journal_wrapper::insert(
196 : owner_id_t owner,
197 : util::multi_buffer const & buf
198 : )
199 : {
200 : struct dummy_cb : public journal::insert_cb
201 15 : {
202 15 : dummy_cb(
203 : completion & cmpl,
204 : owner_id_t owner,
205 : multi_buffer const & buf
206 : ) :
207 : cmpl(cmpl),
208 : owner(owner),
209 15 : buf(buf)
210 : {
211 15 : }
212 :
213 15 : virtual void insert_success(
214 : owner_id_t owner,
215 : util::multi_buffer const & buf,
216 : handle_t handle
217 : ) throw()
218 0 : {
219 : r_assert(
220 : this->owner == owner,
221 : "mismatch: " << this->owner << " " << owner
222 : );
223 : r_assert(
224 : &this->buf == &buf,
225 : "mismatch: " << reinterpret_cast<void const *>(&this->buf) <<
226 : " " << reinterpret_cast<void const *>(&buf)
227 : );
228 15 : this->handle = handle;
229 15 : this->cmpl.complete();
230 15 : }
231 :
232 0 : virtual void insert_failure(
233 : owner_id_t owner,
234 : util::multi_buffer const & buf,
235 : int err
236 : ) throw()
237 0 : {
238 : r_assert(
239 : this->owner == owner,
240 : "mismatch: " << this->owner << " " << owner
241 : );
242 : r_assert(
243 : &this->buf == &buf,
244 : "mismatch: " << reinterpret_cast<void const *>(&this->buf) <<
245 : " " << reinterpret_cast<void const *>(&buf)
246 : );
247 : r_assert(
248 : false,
249 : "unexpected failure for " << this->owner << " " <<
250 : reinterpret_cast<void const *>(&buf) << " " << err
251 : );
252 : this->cmpl.complete();
253 : }
254 :
255 : completion & cmpl;
256 : owner_id_t owner;
257 : multi_buffer const & buf;
258 : handle_t handle;
259 : };
260 :
261 30 : completion cmpl;
262 30 : dummy_cb cb(cmpl, owner, buf);
263 15 : this->impl.insert(owner, buf, cb);
264 15 : cmpl.wait();
265 :
266 15 : return cb.handle;
267 : }
268 :
269 15 : void sync_journal_wrapper::erase(
270 : owner_id_t owner,
271 : handle_t handle
272 : )
273 : {
274 : struct dummy_cb : public journal::erase_cb
275 15 : {
276 15 : dummy_cb(
277 : completion & cmpl,
278 : owner_id_t owner,
279 : handle_t handle
280 : ) :
281 : cmpl(cmpl),
282 : owner(owner),
283 15 : handle(handle)
284 : {
285 15 : }
286 :
287 15 : virtual void erase_success(
288 : owner_id_t owner,
289 : handle_t handle
290 : ) throw()
291 0 : {
292 : r_assert(
293 : this->owner == owner,
294 : "mismatch: " << this->owner << " " << owner
295 : );
296 : r_assert(
297 : this->handle == handle,
298 : "mismatch: " << this->handle << " " << handle
299 : );
300 15 : this->cmpl.complete();
301 15 : }
302 :
303 0 : virtual void erase_failure(
304 : owner_id_t owner,
305 : handle_t handle,
306 : int err
307 : ) throw()
308 0 : {
309 : r_assert(
310 : this->owner == owner,
311 : "mismatch: " << this->owner << " " << owner
312 : );
313 : r_assert(
314 : this->handle == handle,
315 : "mismatch: " << this->handle << " " << handle
316 : );
317 : r_assert(
318 : false,
319 : "unexpected failure for " << this->owner << " " <<
320 : this->handle << " " << err
321 : );
322 : this->cmpl.complete();
323 : }
324 :
325 : completion & cmpl;
326 : owner_id_t owner;
327 : handle_t handle;
328 : };
329 :
330 30 : completion cmpl;
331 30 : dummy_cb cb(cmpl, owner, handle);
332 15 : this->impl.erase(owner, handle, cb);
333 15 : cmpl.wait();
334 15 : }
335 :
336 0 : void sync_journal_wrapper::recover(
337 : recovery_dispatcher & dispatcher
338 : )
339 : {
340 0 : this->impl.recover(dispatcher);
341 0 : }
342 :
343 : } // namespace unittests
344 : } // namespace journal
345 3 : } // namespace coherent
346 :
|