Line data Source code
1 : /*
2 : * (C) Copyright 2010 Marek Dopiera
3 : *
4 : * This file is part of CoherentDB.
5 : *
6 : * CoherentDB is free software: you can redistribute it and/or modify it
7 : * under the terms of the GNU General Public License as published by
8 : * the Free Software Foundation, either version 3 of the License, or
9 : * (at your option) any later version.
10 : *
11 : * CoherentDB is distributed in the hope that it will be useful, but
12 : * WITHOUT ANY WARRANTY; without even the implied warranty of
13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 : * General Public License for more details.
15 : *
16 : * You should have received a copy of the GNU General Public
17 : * License along with CoherentDB. If not, see
18 : * http://www.gnu.org/licenses/.
19 : */
20 :
21 : #include <cmath>
22 : #include <ctime>
23 : #include <pthread.h>
24 : #include <memory_manager/manager.h>
25 : #include <memory_manager/session.h>
26 : #include <memory_manager/allocator.h>
27 : #include <config/config.h>
28 : #include <debug/asserts.h>
29 : #include <cstdio>
30 : #include <vector>
31 : #include <math.h>
32 :
33 : using namespace std;
34 : using namespace coherent::memory_manager;
35 : using namespace coherent::config;
36 :
namespace
{
// Number of top-level threads main() starts for each phase (writers, then sorters).
const int threads_count = 4;
// Bytes of int data each writer thread allocates and fills.
const int writer_bytes = 16 * 1024 * 1024;
// Thread budget handed to the root merge_sub_sorter of every merge_sorter.
const int sorting_threads_count = 4;
// Bytes of random int data each merge_sorter generates and sorts.
const int sorted_bytes = 4 * 1024 * 1024;
}
41 :
42 : // total memory usage for writer <= 2 * writer_bytes
43 : // total memory usage for mergesort < threads_count * sorted_bytes * (5 + ceil(lg sorting_threads_count))
44 :
45 3 : void* writer(void*)
46 : {
47 7 : memory_session ms;
48 :
49 : try
50 : {
51 4 : const int alloc_size = writer_bytes / sizeof (int);
52 :
53 4 : int* a = allocate<int>(alloc_size);
54 5468847 : for (int i = 0; i < alloc_size; ++i)
55 5468843 : a[i] = i;
56 4 : deallocate(a, alloc_size);
57 :
58 8 : vector<int, coherent::memory_manager::allocator<int> > v;
59 1619306 : for (int i = 0; i < alloc_size / 8; ++i)
60 1619302 : v.push_back(3);
61 4 : v.resize(alloc_size);
62 4 : v.clear();
63 :
64 4 : printf("writer ended allocating and writing data\n");
65 : }
66 0 : catch (out_of_session_memory& ex)
67 : {
68 : r_assert(false, "out_of_session_memory");
69 : }
70 0 : catch (out_of_total_memory)
71 : {
72 : r_assert(false, "out_of_total_memory");
73 : }
74 :
75 : r_assert(ms.get_allocated_bytes() == 0, "Not all memory was deallocated in writer");
76 :
77 4 : return 0;
78 : }
79 :
/**
 * Merges the two sorted halves of [begin, end) into target.
 *
 * The range is split at begin + (end - begin) / 2; each half must already be
 * sorted in non-decreasing order. Exactly end - begin elements are written to
 * target. On equal keys the element from the first half is emitted first.
 *
 * @param begin  first element of the source range
 * @param end    one past the last element of the source range
 * @param target destination with room for end - begin ints
 */
void merge(int* begin, int* end, int* target)
{
    int* const middle = begin + (end - begin) / 2;
    int* lhs = begin;
    int* rhs = middle;
    int* out = target;

    // Both halves still have elements: emit the smaller head, preferring the first half on ties.
    while (lhs < middle && rhs < end)
    {
        if (*lhs <= *rhs)
            *out++ = *lhs++;
        else
            *out++ = *rhs++;
    }

    // At most one of the two loops below does any work.
    while (lhs < middle)
        *out++ = *lhs++;
    while (rhs < end)
        *out++ = *rhs++;
}
101 :
102 8379939 : void merge_sort(int* begin, int* end, memory_session* ms)
103 : {
104 8379939 : if (end - begin <= 1)
105 4191312 : return;
106 :
107 4188627 : const int len = end - begin;
108 4188627 : int* buffer = allocate<int>(len);
109 :
110 74734553 : for (int i = 0; i < len; ++i)
111 : {
112 70546358 : buffer[i] = begin[i];
113 : }
114 :
115 4188195 : merge_sort(buffer, buffer + len / 2, ms);
116 4186827 : merge_sort(buffer + len / 2, buffer + len, ms);
117 :
118 4186252 : merge(buffer, buffer + len, begin);
119 :
120 8380662 : deallocate(buffer, len);
121 : }
122 :
123 : struct merge_sub_sorter_data
124 : {
125 : memory_session* ms;
126 : int* begin;
127 : int* end;
128 : int threadsSpawn;
129 : };
130 :
131 28 : void* merge_sub_sorter(void* data)
132 : {
133 : try
134 : {
135 28 : merge_sub_sorter_data* d = static_cast<merge_sub_sorter_data*> (data);
136 :
137 28 : d->ms->begin();
138 :
139 28 : if (d->threadsSpawn == 1)
140 : {
141 16 : merge_sort(d->begin, d->end, d->ms);
142 : }
143 : else
144 : {
145 12 : const int len = d->end - d->begin;
146 12 : int* buffer = allocate<int>(len);
147 5685834 : for (int i = 0; i < len; ++i)
148 5685822 : buffer[i] = d->begin[i];
149 :
150 12 : pthread_t* threads = new pthread_t[2];
151 12 : merge_sub_sorter_data* spawn_data = allocate<merge_sub_sorter_data > (2);
152 36 : for (int i = 0; i < 2; ++i)
153 : {
154 24 : spawn_data[i].ms = d->ms;
155 24 : spawn_data[i].begin = buffer + i * len / 2;
156 24 : spawn_data[i].end = buffer + (i + 1) * len / 2;
157 24 : spawn_data[i].threadsSpawn = d->threadsSpawn / 2 + d->threadsSpawn % 2 * i;
158 : r_assert(!pthread_create(threads + i, 0, merge_sub_sorter, spawn_data + i), "Could not create thread");
159 : }
160 :
161 36 : for (int i = 0; i < 2; ++i)
162 24 : pthread_join(threads[i], 0);
163 :
164 12 : deallocate(spawn_data, 2);
165 :
166 12 : merge(buffer, buffer + len, d->begin);
167 :
168 12 : deallocate(buffer, len);
169 : }
170 :
171 28 : d->ms->end();
172 : }
173 0 : catch (out_of_session_memory& ex)
174 : {
175 : r_assert(false, "out_of_session_memory");
176 : }
177 0 : catch (out_of_total_memory)
178 : {
179 : r_assert(false, "out_of_total_memory");
180 : }
181 :
182 28 : return 0;
183 : }
184 :
185 4 : void* merge_sorter(void*)
186 : {
187 8 : memory_session ms;
188 :
189 : try
190 : {
191 4 : ms.set_limit_bytes(sorted_bytes * (5 + ceil(log2(sorting_threads_count))));
192 :
193 4 : const int data_size = sorted_bytes / sizeof (int);
194 :
195 4 : int* data = allocate<int>(data_size);
196 4027470 : for (int i = 0; i < data_size; ++i)
197 : {
198 4027466 : data[i] = rand() - RAND_MAX / 2;
199 : }
200 :
201 : clockid_t cid;
202 : timespec start_ts;
203 4 : pthread_getcpuclockid(pthread_self(), &cid);
204 4 : clock_gettime(cid, &start_ts);
205 4 : printf("merge_sorter initialized and generated %dB in %4ld.%03ld\n", sorted_bytes, start_ts.tv_sec, start_ts.tv_nsec / 1000000);
206 :
207 4 : merge_sub_sorter_data* d = allocate<merge_sub_sorter_data > (1);
208 4 : d->ms = &ms;
209 4 : d->begin = data;
210 4 : d->end = data + data_size;
211 4 : d->threadsSpawn = sorting_threads_count;
212 : pthread_t thread;
213 : r_assert(!pthread_create(&thread, 0, merge_sub_sorter, d), "Could not create thread");
214 :
215 4 : pthread_join(thread, 0);
216 :
217 4 : deallocate(d, 1);
218 :
219 4194304 : for (int i = 0; i < data_size - 1; ++i)
220 : {
221 : r_assert(data[i] <= data[i + 1], "Data was not sorted correctly");
222 : }
223 :
224 4 : deallocate(data, data_size);
225 :
226 : timespec ts;
227 4 : pthread_getcpuclockid(pthread_self(), &cid);
228 4 : clock_gettime(cid, &ts);
229 4 : ts.tv_nsec -= start_ts.tv_nsec;
230 4 : ts.tv_sec -= start_ts.tv_sec;
231 4 : if (ts.tv_nsec < 0)
232 : {
233 0 : ts.tv_sec -= 1;
234 0 : ts.tv_nsec += 1000000000;
235 : }
236 4 : printf("merge_sorter sorted %dB in %4ld.%03ld\n", sorted_bytes, ts.tv_sec, ts.tv_nsec / 1000000);
237 : }
238 0 : catch (out_of_session_memory& ex)
239 : {
240 : r_assert(false, "out_of_session_memory");
241 : }
242 0 : catch (out_of_total_memory)
243 : {
244 : r_assert(false, "out_of_total_memory");
245 : }
246 :
247 : r_assert(ms.get_allocated_bytes() == 0, "Not all memory was deallocated in merge_sorter");
248 :
249 4 : return 0;
250 : }
251 :
252 1 : int main(const int argc, const char *const *const argv)
253 : {
254 1 : srand(time(0));
255 :
256 2 : scoped_test_enabler test_setup(argc, argv);
257 :
258 1 : memory_manager::init(*test_setup.get_config());
259 1 : fprintf(stderr, "page size: %u\n", memory_manager::instance->get_page_size());
260 :
261 : try
262 : {
263 1 : pthread_t* threads = new pthread_t[threads_count];
264 :
265 : {
266 5 : for (int i = 0; i < threads_count; ++i)
267 : r_assert(!pthread_create(threads + i, 0, writer, 0), "Could not create thread");
268 :
269 5 : for (int i = 0; i < threads_count; ++i)
270 4 : pthread_join(threads[i], 0);
271 : }
272 :
273 : {
274 5 : for (int i = 0; i < threads_count; ++i)
275 : r_assert(!pthread_create(threads + i, 0, merge_sorter, 0), "Could not create thread");
276 :
277 5 : for (int i = 0; i < threads_count; ++i)
278 4 : pthread_join(threads[i], 0);
279 : }
280 : }
281 0 : catch (out_of_session_memory& ex)
282 : {
283 : r_assert(false, "out_of_session_memory");
284 : }
285 0 : catch (out_of_total_memory)
286 : {
287 : r_assert(false, "out_of_total_memory");
288 : }
289 :
290 1 : return 0;
291 3 : }
|