Line data Source code
1 : /*
2 : * (C) Copyright 2011 Marek Dopiera
3 : *
4 : * This file is part of CoherentDB.
5 : *
6 : * CoherentDB is free software: you can redistribute it and/or modify it
7 : * under the terms of the GNU General Public License as published by
8 : * the Free Software Foundation, either version 3 of the License, or
9 : * (at your option) any later version.
10 : *
11 : * CoherentDB is distributed in the hope that it will be useful, but
12 : * WITHOUT ANY WARRANTY; without even the implied warranty of
13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 : * General Public License for more details.
15 : *
16 : * You should have received a copy of the GNU General Public
17 : * License along with CoherentDB. If not, see
18 : * http://www.gnu.org/licenses/.
19 : */
20 :
21 : #ifndef JOURNAL_H_5312
22 : #define JOURNAL_H_5312
23 :
24 : #include <stdint.h>
25 :
26 : #include <exception>
27 :
28 : #include <boost/noncopyable.hpp>
29 : #include <boost/enable_shared_from_this.hpp>
30 :
31 : #include <util/misc.h>
32 : #include <util/multi_buffer.h>
33 : #include <util/worker_pool.h>
34 : #include <journal/absjournal.h>
35 : #include <iostream>
36 : #include <map>
37 :
38 : template<class K,class V>
39 15 : bool inline key_exists( const std::map<K,V> &m, const K &k)
40 : {
41 15 : return m.find(k) != m.end();
42 : }
43 :
44 : template<class K,class V>
45 30 : const V & get_map_val( const std::map<K,V> &m, const K &k)
46 : {
47 30 : typename std::map<K,V>::const_iterator it = m.find(k);
48 : r_assert(
49 : it != m.end(),
50 : "Key does not exist"//byc moze jakis toString by sie przydal
51 : );
52 30 : return it->second;
53 : }
54 :
55 : #define traverse(coll,it) \
56 : for( const_cast<__typeof( (coll).begin())> it = (cool).begin(); \
57 : it != (coll).end(); \
58 : it++)
59 :
60 : namespace coherent {
61 : namespace journal {
62 :
63 : typedef uint32_t owner_id_t;
64 : typedef uint32_t handle_t;
65 :
66 : class journal : public abs_journal
67 : {
68 :
69 : boost::thread * in_mem_thread;
70 : boost::thread * sync_thread;
71 :
72 : struct internal_req : public util::virtual_dest
73 : {
74 180 : virtual void execute(journal & ) = 0;
75 : };
76 :
77 : struct insert_req : public internal_req,
78 : public boost::enable_shared_from_this<insert_req>
79 : {
80 30 : insert_req(
81 : owner_id_t owner,
82 : util::multi_buffer const & buf,
83 : insert_cb & cb
84 : );
85 : virtual void execute(journal & );
86 :
87 : owner_id_t const owner;
88 : util::multi_buffer const & buf;
89 : insert_cb & cb;
90 : };
91 :
92 : struct erase_req : public internal_req,
93 : public boost::enable_shared_from_this<erase_req>
94 : {
95 30 : erase_req(
96 : owner_id_t const owner,
97 : handle_t const handle,
98 : erase_cb & cb
99 : );
100 : virtual void execute(journal & );
101 :
102 : owner_id_t const owner;
103 : handle_t const handle;
104 : erase_cb & cb;
105 : };
106 :
107 : struct sync_req : public internal_req//sprawdzic o co chodzi z virtual destem
108 : {
109 : sync_req(
110 30 : owner_id_t owner,
111 : handle_t handle
112 : );
113 :
114 : owner_id_t const owner;
115 : handle_t const handle;
116 : };
117 :
118 : struct insert_sync_req : public sync_req
119 : {
120 : insert_sync_req(
121 30 : owner_id_t owner,
122 : handle_t handle,
123 : const util::multi_buffer & buf
124 : );
125 : virtual void execute(journal & );
126 :
127 : const util::multi_buffer & buf;
128 : };
129 :
130 : struct erase_sync_req : public sync_req
131 : {
132 : erase_sync_req(
133 30 : owner_id_t owner,
134 : handle_t handle
135 : );
136 : virtual void execute(journal & );
137 : };
138 :
139 : struct cb_req : public internal_req
140 : {
141 : cb_req(
142 30 : owner_id_t owner,
143 : handle_t handle
144 : );
145 :
146 : owner_id_t const owner;
147 : handle_t const handle;
148 :
149 : };
150 :
151 : struct insert_cb_req : public cb_req
152 : {
153 : insert_cb_req(
154 30 : owner_id_t owner,
155 : handle_t handle
156 : );
157 : virtual void execute(journal &);
158 : };
159 :
160 : struct erase_cb_req : public cb_req
161 : {
162 : erase_cb_req(
163 30 : owner_id_t owner,
164 : handle_t handle
165 : );
166 : virtual void execute(journal &);
167 : };
168 :
169 : public:
170 :
171 : journal();
172 : ~journal();
173 :
174 : //XXX replace with something reasonable
175 : typedef std::exception recovery_except;
176 :
177 : virtual void insert(
178 : owner_id_t owner,
179 : util::multi_buffer const & buf,
180 : insert_cb & cb
181 : ) throw();
182 :
183 : virtual void erase(
184 : owner_id_t owner,
185 : handle_t handle,
186 : erase_cb & erase
187 : ) throw();
188 :
189 : // should be called before any insert or erase
190 : virtual void recover(
191 : recovery_dispatcher & dispatcher
192 : ) throw(recovery_dispatcher);
193 :
194 : public:
195 : typedef boost::shared_ptr<internal_req> req_ptr;//pomyslec nad requestem ktory nie bylby z najwyzszej klasy
196 : typedef boost::shared_ptr<sync_req> sync_req_ptr;
197 : typedef boost::shared_ptr<cb_req> cb_req_ptr;
198 :
199 : private:
200 :
201 : friend class in_mem_journal_thread;
202 : friend class sync_journal_thread;
203 : friend class journal_callback_worker;
204 :
205 : typedef std::map<std::pair<owner_id_t,handle_t>,
206 : boost::shared_ptr<insert_req> > insert_reqs_map_t;
207 :
208 : typedef std::map<std::pair<owner_id_t,handle_t>,
209 : boost::shared_ptr<erase_req> > erase_reqs_map_t;
210 :
211 : typedef util::worker_pool<cb_req_ptr> cb_workers_t;
212 :
213 : util::sync_queue<req_ptr> reqs;
214 : util::sync_queue<sync_req_ptr> sync_reqs;
215 :
216 : insert_reqs_map_t insert_reqs;
217 : erase_reqs_map_t erase_reqs;
218 :
219 : cb_workers_t cb_workers;
220 : };
221 :
222 : class journal_thread : public util::virtual_dest
223 : {
224 : public:
225 : journal_thread(journal & j) : j(j)
226 28 : {
227 : }
228 4 : protected:
229 : journal & j;
230 4 : };
231 :
232 : class journal_single_thread : public journal_thread
233 : {
234 : public:
235 : journal_single_thread(journal & j)
236 14 : : journal_thread(j)
237 : {
238 2 : }
239 2 : virtual void operator()() const = 0;
240 : };
241 2 :
242 : class in_mem_journal_thread : public journal_single_thread
243 : {
244 : public:
245 : in_mem_journal_thread( journal & j);
246 7 : virtual void operator()() const;
247 : };
248 :
249 : class sync_journal_thread : public journal_single_thread
250 : {
251 : public:
252 : sync_journal_thread( journal & j);
253 7 : virtual void operator()() const;
254 : };
255 :
256 : class journal_callback_worker : public journal_thread,
257 : public util::worker<journal::cb_req_ptr>
258 : {
259 : public:
260 : journal_callback_worker( journal &j) :
261 14 : journal_thread(j),
262 : util::worker<journal::cb_req_ptr>(j.cb_workers)
263 2 : {
264 : }
265 2 :
266 : virtual void handle(journal::cb_req_ptr const &t) const
267 2 : {
268 : t->execute(this->j);
269 30 : }
270 : };
271 30 :
272 30 : class journal_callback_worker_factory
273 : : public util::worker_factory<journal::cb_req_ptr>
274 : {
275 : public:
276 : journal_callback_worker_factory(journal &j) : j(j)
277 1 : {
278 : }
279 1 :
280 : virtual worker_ptr create_worker(
281 1 : util::worker_pool<journal::cb_req_ptr> & pool
282 : ) const
283 2 : {
284 : return new boost::thread(journal_callback_worker(this->j));
285 : }
286 : private:
287 2 : journal & j;
288 : };
289 :
290 : } // namespace journal
291 : } // namespace coherent
292 :
293 : #endif /* JOURNAL_H_5316 */
294 :
|