Line data Source code
1 : /*
2 : * (C) Copyright 2011 Marek Dopiera
3 : *
4 : * This file is part of CoherentDB.
5 : *
6 : * CoherentDB is free software: you can redistribute it and/or modify it
7 : * under the terms of the GNU General Public License as published by
8 : * the Free Software Foundation, either version 3 of the License, or
9 : * (at your option) any later version.
10 : *
11 : * CoherentDB is distributed in the hope that it will be useful, but
12 : * WITHOUT ANY WARRANTY; without even the implied warranty of
13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 : * General Public License for more details.
15 : *
16 : * You should have received a copy of the GNU General Public
17 : * License along with CoherentDB. If not, see
18 : * http://www.gnu.org/licenses/.
19 : */
20 :
21 : #include <journal/journal.h>
22 :
23 : namespace coherent {
24 : namespace journal {
25 :
26 : using namespace std;
27 : using namespace util;
28 : using namespace boost;
29 :
30 : //====== recovery_except =======================================================
31 :
32 : //====== journal ===============================================================
33 :
34 : journal::journal()
35 : {
36 : in_mem_thread = new boost::thread(in_mem_journal_thread(*this));
37 1 : sync_thread = new boost::thread(sync_journal_thread(*this));
38 :
39 1 : journal_callback_worker_factory factory(*this);
40 1 : this->cb_workers.start(2,factory);
41 : }
42 2 :
43 1 : journal::~journal()
44 1 : {
45 : LOG(TRACE, "thread " << pthread_self() << " is ~journal");
46 1 :
47 : reqs.no_more_input();
48 1 : sync_reqs.no_more_input();
49 :
50 1 : in_mem_thread->join();
51 1 : delete in_mem_thread;
52 :
53 1 : sync_thread->join();
54 1 : delete sync_thread;
55 :
56 1 : this->cb_workers.stop();
57 1 : }
58 :
59 1 : void journal::insert(
60 1 : owner_id_t owner,
61 : multi_buffer const & buf,
62 15 : insert_cb & cb
63 : ) throw()
64 : {
65 : reqs.push(req_ptr(new insert_req(owner,buf,cb)));
66 0 : //cb.insert_success(owner,buf,random());
67 : }
68 15 :
69 : void journal::erase(
70 15 : owner_id_t owner,
71 : handle_t handle,
72 15 : erase_cb & cb
73 : ) throw()
74 : {
75 : reqs.push(req_ptr(new erase_req(owner,handle,cb)));
76 0 : }
77 :
78 15 : void journal::recover(
79 15 : recovery_dispatcher & dispatcher
80 : ) throw(recovery_dispatcher)
81 0 : {
82 : }
83 :
84 : //====== in_mem_journal_thread =================================================
85 0 : in_mem_journal_thread::in_mem_journal_thread( journal & j)
86 : : journal_single_thread(j)
87 : {
88 1 : }
89 1 :
90 : void in_mem_journal_thread::operator()() const
91 1 : {
92 : LOG(TRACE, "thread " << pthread_self() << " has been started");
93 1 :
94 : boost::optional<journal::req_ptr> res = j.reqs.pop();
95 1 : while (res)
96 : {
97 2 : res.get()->execute(j);
98 31 : res = j.reqs.pop();
99 : }
100 30 :
101 : LOG(TRACE, "thread " << pthread_self() << " finishing");
102 : }
103 :
104 1 : //====== requests ==============================================================
105 1 :
106 : journal::insert_req::insert_req(
107 : owner_id_t owner,
108 : multi_buffer const & buf,
109 15 : insert_cb & cb
110 : ) :
111 : owner(owner),
112 : buf(buf),
113 : cb(cb)
114 : {
115 : }
116 15 :
117 : void journal::insert_req::execute(journal & j)
118 15 : {
119 : handle_t handle;
120 : do {
121 15 : handle = random();
122 : } while( key_exists(j.insert_reqs,make_pair(this->owner,handle)));
123 :
124 15 : j.insert_reqs[make_pair(this->owner,handle)] = shared_from_this();
125 15 :
126 15 : j.sync_reqs.push(
127 : sync_req_ptr(
128 15 : new insert_sync_req(this->owner,handle,this->buf)));
129 : }
130 :
131 : journal::erase_req::erase_req(
132 15 : owner_id_t const owner,
133 15 : handle_t const handle,
134 : erase_cb & cb
135 15 : ) :
136 : owner(owner),
137 : handle(handle),
138 : cb(cb)
139 : {
140 : }
141 :
142 15 : void journal::erase_req::execute(journal & j)
143 : {
144 : LOG(TRACE, "thread " << pthread_self() << " made a sync_req");
145 :
146 15 : j.erase_reqs[make_pair(this->owner,this->handle)] = shared_from_this();
147 :
148 15 : j.sync_reqs.push(
149 : sync_req_ptr(
150 15 : new erase_sync_req(this->owner,this->handle)));
151 : }
152 :
153 : //====== sync_journal_thread ===================================================
154 15 : sync_journal_thread::sync_journal_thread( journal & j)
155 15 : : journal_single_thread(j)
156 : {
157 : //otworzyc plik
158 1 : }
159 1 :
160 : void sync_journal_thread::operator()() const
161 : {
162 : LOG(TRACE, "thread " << pthread_self() << " has been started");
163 : boost::optional<journal::sync_req_ptr> res = j.sync_reqs.pop();
164 1 : while (res)
165 : {
166 : LOG(TRACE, "thread " << pthread_self() << " handle sync_req");
167 2 : res.get()->execute(j);
168 31 : res = j.sync_reqs.pop();
169 : }
170 : LOG(TRACE, "thread " << pthread_self() << " finishing");
171 30 : }
172 30 :
173 : //====== sync_reqs =============================================================
174 1 :
175 1 : journal::sync_req::sync_req(
176 : owner_id_t owner,
177 : handle_t handle
178 : ) : owner(owner), handle(handle)
179 30 : {
180 : }
181 :
182 30 : journal::insert_sync_req::insert_sync_req(
183 : owner_id_t owner,
184 30 : handle_t handle,
185 : const multi_buffer & buf
186 15 : ) : sync_req(owner,handle), buf(buf)
187 : {
188 : }
189 :
190 15 : void journal::insert_sync_req::execute( journal & j)
191 : {
192 : LOG(TRACE, "thread " << pthread_self() << " execute");
193 : j.cb_workers.schedule_work( cb_req_ptr( new insert_cb_req(this->owner, this->handle)));
194 15 : }
195 :
196 15 : journal::erase_sync_req::erase_sync_req(
197 15 : owner_id_t owner,
198 15 : handle_t handle
199 : ) : sync_req(owner,handle)
200 15 : {
201 : }
202 :
203 15 : void journal::erase_sync_req::execute( journal & j)
204 : {
205 : LOG(TRACE, "thread " << pthread_self() << " execute");
206 : j.cb_workers.schedule_work( cb_req_ptr( new erase_cb_req(this->owner, this->handle)));
207 15 : }
208 :
209 15 : //====== cb_reqs ===============================================================
210 15 :
211 15 : journal::cb_req::cb_req(
212 : owner_id_t owner,
213 : handle_t handle
214 : ) : owner(owner), handle(handle)
215 30 : {
216 : }
217 :
218 30 : journal::insert_cb_req::insert_cb_req(
219 : owner_id_t owner,
220 30 : handle_t handle
221 : ) : cb_req( owner,handle)
222 : {
223 15 : }
224 :
225 : journal::erase_cb_req::erase_cb_req(
226 15 : owner_id_t owner,
227 : handle_t handle
228 15 : ) : cb_req( owner,handle)
229 : {
230 15 : }
231 :
232 : void journal::insert_cb_req::execute( journal & j)
233 15 : {
234 : shared_ptr<insert_req> req =
235 15 : get_map_val(j.insert_reqs,make_pair(this->owner,this->handle));
236 :
237 : r_assert( req->owner == this->owner, "owner " << req->owner << " != " << this->owner);
238 :
239 : LOG(TRACE, "thread " << pthread_self() << " cb.insert_success");
240 30 : req->cb.insert_success( req->owner, req->buf, this->handle);
241 : }
242 15 :
243 : void journal::erase_cb_req::execute( journal & j)
244 15 : {
245 15 : shared_ptr<erase_req> req =
246 15 : get_map_val(j.erase_reqs, make_pair(this->owner,this->handle));
247 : r_assert( req->owner == this->owner, "owner " << req->owner << " != " << this->owner);
248 15 :
249 : LOG(TRACE, "thread " << pthread_self() << " cb.erase_success");
250 : req->cb.erase_success( req->owner, req->handle);
251 30 : }
252 15 :
253 : } // namespace journal
254 15 : } // namespace coherent
255 15 :
|