GRASS GIS 8 Programmer's Manual  8.5.0dev(2024)-d6dec75dd4
empq_adaptive_impl.h
Go to the documentation of this file.
1 /****************************************************************************
2  *
3  * MODULE: iostream
4  *
5 
6  * COPYRIGHT (C) 2007 Laura Toma
7  *
8  *
9 
10  * Iostream is a library that implements streams, external memory
11  * sorting on streams, and an external memory priority queue on
12  * streams. These are the fundamental components used in external
13  * memory algorithms.
14 
15  * Credits: The library was developed by Laura Toma. The kernel of
16  * class STREAM is based on the similar class existent in the GPL TPIE
17  * project developed at Duke University. The sorting and priority
18  * queue have been developed by Laura Toma based on communications
19  * with Rajiv Wickremesinghe. The library was developed as part of
20  * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
21  * Rajiv Wickremesinghe as part of the Terracost project.
22 
23  *
24  * This program is free software; you can redistribute it and/or modify
25  * it under the terms of the GNU General Public License as published by
26  * the Free Software Foundation; either version 2 of the License, or
27  * (at your option) any later version.
28  *
29 
30  * This program is distributed in the hope that it will be useful,
31  * but WITHOUT ANY WARRANTY; without even the implied warranty of
32  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
33  * General Public License for more details. *
34  * **************************************************************************/
35 
36 #ifndef __EMPQ_ADAPTIVE_IMPL_H
37 #define __EMPQ_ADAPTIVE_IMPL_H
38 
39 #include <stdio.h>
40 #include <assert.h>
41 
42 #include "ami_config.h"
43 #include "ami_stream.h"
44 #include "mm.h"
45 #include "mm_utils.h"
46 #include "empq_adaptive.h"
47 
48 #include "ami_sort.h"
49 
50 // EMPQAD_DEBUG defined in "empqAdaptive.H"
51 
52 //------------------------------------------------------------
53 // allocate an internal pqueue of size precisely twice
54 // the size of the pqueue within the em_pqueue;
55 //
56 // This constructor uses a user defined amount of memory
57 
58 template <class T, class Key>
60 {
61  regim = INMEM;
62  EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: starting in-memory pqueue" << endl;
63 
64  //------------------------------------------------------------
65  // set the size precisely as in empq constructor since we cannot
66  // really call the em__pqueue constructor, because we don't want
67  // the space allocated; we just want the sizes;
68  // AMI_err ae;
69  EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: desired memory: "
70  << ((float)inMem / (1 << 20)) << "MB" << endl;
71 
72  initPQ(inMem);
73 }
74 
75 //------------------------------------------------------------
76 // This more resembles the original constructor which is greedy
77 template <class T, class Key>
79 {
80  regim = INMEM;
81  EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: starting in-memory pqueue" << endl;
82 
83  //------------------------------------------------------------
84  // set the size precisely as in empq constructor since we cannot
85  // really call the em__pqueue constructor, because we don't want
86  // the space allocated; we just want the sizes;
87  size_t mm_avail = getAvailableMemory();
88  EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: available memory: "
89  << ((float)mm_avail / (1 << 20)) << "MB" << endl;
90 
91  initPQ(mm_avail);
92 }
93 
94 //------------------------------------------------------------
95 // This method initialized the PQ based on the memory passed
96 // into it
97 template <class T, class Key>
98 void EMPQueueAdaptive<T, Key>::initPQ(size_t initMem)
99 {
100  AMI_err ae;
101  EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: desired memory: "
102  << ((float)initMem / (1 << 20)) << "MB" << endl;
103 
104  /* same calculations as empq constructor in order to estimate
105  overhead memory; this is because we want to allocate a pqueue of
106  size exactly double the size of the pqueue inside the empq;
107  switching from this pqueue to empq when the memory fills up will
108  be simple */
109  //------------------------------------------------------------
110  // AMI_STREAM memory usage
111  size_t sz_stream;
112  AMI_STREAM<T> dummy;
113  if ((ae = dummy.main_memory_usage(&sz_stream, MM_STREAM_USAGE_MAXIMUM)) !=
115  cerr << "EMPQueueAdaptive constr: failing to get stream_usage\n";
116  exit(1);
117  }
118 
119  // account for temporary memory usage
120  unsigned short max_nbuf = 2;
121  unsigned int buf_arity = initMem / (2 * sz_stream);
122  if (buf_arity > MAX_STREAMS_OPEN)
123  buf_arity = MAX_STREAMS_OPEN;
124  unsigned long mm_overhead = buf_arity * sizeof(merge_key<Key>) +
125  max_nbuf * sizeof(em_buffer<T, Key>) +
126  2 * sz_stream + max_nbuf * sz_stream;
127  mm_overhead *= 8; // overestimate..this should be fixed with
128  // a precise accounting of the extra memory required
129 
130  EMPQAD_DEBUG cout << "sz_stream: " << sz_stream
131  << " buf_arity: " << buf_arity
132  << " mm_overhead: " << mm_overhead
133  << " mm_avail: " << initMem << ".\n";
134 
135  EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: memory overhead set to "
136  << ((float)mm_overhead / (1 << 20)) << "MB" << endl;
137  if (mm_overhead > initMem) {
138  cerr << "overhead bigger than available memory (" << initMem << "); "
139  << "increase -m and try again\n";
140  exit(1);
141  }
142  initMem -= mm_overhead;
143  //------------------------------------------------------------
144 
145  long pqsize = initMem / sizeof(T);
146  EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: pqsize set to " << pqsize << endl;
147 
148  // initialize in memory pqueue and set em to NULL
149  im = new MinMaxHeap<T>(pqsize);
150  assert(im);
151  em = NULL;
152 }
153 
154 template <class T, class Key>
156 {
157  switch (regim) {
158  case INMEM:
159  delete im;
160  break;
161  case EXTMEM:
162  delete em;
163  break;
164  case EXTMEM_DEBUG:
165  delete dim;
166  delete em;
167  break;
168  }
169 }
170 
171 // return the maximum nb of elts that can fit
172 template <class T, class Key>
174 {
175  long m = -1;
176  switch (regim) {
177  case INMEM:
178  assert(im);
179  m = im->get_maxsize();
180  break;
181  case EXTMEM:
182  assert(em);
183  m = em->maxlen();
184  break;
185  case EXTMEM_DEBUG:
186  m = em->maxlen();
187  break;
188  }
189  return m;
190 }
191 
192 // return true if empty
193 template <class T, class Key>
195 {
196  bool v = false;
197  switch (regim) {
198  case INMEM:
199  assert(im);
200  v = im->empty();
201  break;
202  case EXTMEM:
203  assert(em);
204  v = em->is_empty();
205  break;
206  case EXTMEM_DEBUG:
207  assert(dim->empty() == em->is_empty());
208  v = em->is_empty();
209  break;
210  }
211  return v;
212 }
213 
214 // return true if full
215 template <class T, class Key>
217 {
218  cerr << "EMPQueueAdaptive::is_full(): sorry not implemented\n";
219  assert(0);
220  exit(1);
221 }
222 
223 // return the element with minimum priority in the structure
224 template <class T, class Key>
226 {
227  bool v = false, v1;
228  T tmp;
229  switch (regim) {
230  case INMEM:
231  assert(im);
232  v = im->min(elt);
233  break;
234  case EXTMEM:
235  assert(em);
236  v = em->min(elt);
237  break;
238  case EXTMEM_DEBUG:
239  v1 = dim->min(tmp);
240  v = em->min(elt);
241  // dim->verify();
242  if (!(tmp == elt)) {
243  cerr << "------------------------------" << endl;
244  cerr << dim << endl;
245  cerr << "------------------------------" << endl;
246  em->print();
247  cerr << "------------------------------" << endl;
248  cerr << "tmp=" << tmp << endl;
249  cerr << "elt=" << elt << endl;
250  cerr << "------------------------------" << endl;
251  dim->destructiveVerify();
252  }
253  assert(v == v1);
254  assert(tmp == elt);
255  break;
256  }
257  return v;
258 }
259 
260 /* switch over to using an external priority queue */
261 template <class T, class Key>
263 {
264  switch (regim) {
265  case INMEM:
266  im->clear();
267  break;
268  case EXTMEM:
269  em->clear();
270  break;
271  case EXTMEM_DEBUG:
272  dim->clear();
273  break;
274  }
275 }
276 
277 template <class T, class Key>
279 {
280  switch (regim) {
281  case INMEM:
282  im->verify();
283  break;
284  case EXTMEM:
285  break;
286  case EXTMEM_DEBUG:
287  dim->verify();
288  break;
289  }
290 }
291 
292 // extract all elts with min key, add them and return their sum
293 template <class T, class Key>
295 {
296  bool v = false, v1;
297  T tmp;
298  switch (regim) {
299  case INMEM:
300  assert(im);
301  v = im->extract_all_min(elt);
302  break;
303  case EXTMEM:
304  assert(em);
305  v = em->extract_all_min(elt);
306  break;
307  case EXTMEM_DEBUG:
308  v1 = dim->extract_all_min(tmp);
309  v = em->extract_all_min(elt);
310  assert(dim->size() == em->size());
311  assert(v == v1);
312  assert(tmp == elt);
313  break;
314  }
315  return v;
316 }
317 
318 // return the nb of elements in the structure
319 template <class T, class Key>
321 {
322  long v = 0, v1;
323  switch (regim) {
324  case INMEM:
325  assert(im);
326  v = im->size();
327  break;
328  case EXTMEM:
329  assert(em);
330  v = em->size();
331  break;
332  case EXTMEM_DEBUG:
333  v1 = dim->size();
334  v = em->size();
335  assert(v == v1);
336  break;
337  }
338  return v;
339 }
340 
341 // ----------------------------------------------------------------------
342 template <class T, class Key>
344 {
345  bool v = false, v1;
346  T tmp;
347  switch (regim) {
348  case INMEM:
349  assert(im);
350  v = im->extract_min(elt);
351  break;
352  case EXTMEM:
353  assert(em);
354  v = em->extract_min(elt);
355  break;
356  case EXTMEM_DEBUG:
357  v1 = dim->extract_min(tmp);
358  v = em->extract_min(elt);
359  assert(v == v1);
360  assert(tmp == elt);
361  assert(dim->size() == em->size());
362  break;
363  }
364  return v;
365 }
366 
367 //------------------------------------------------------------
368 /* insert an element; if regim == INMEM, try insert it in im, and if
369  it is full, extract_max pqsize/2 elements of im into a stream,
370  switch to EXTMEM and insert the stream into em; if regim is
371  EXTMEM, insert in em; */
372 template <class T, class Key>
374 {
375  bool v = false;
376  switch (regim) {
377  case INMEM:
378  if (!im->full()) {
379  im->insert(elt);
380  v = true;
381  }
382  else {
383  makeExternal();
384  v = em->insert(elt); // insert the element
385  }
386  break;
387  case EXTMEM:
388  v = em->insert(elt);
389  break;
390  case EXTMEM_DEBUG:
391  dim->insert(elt);
392  v = em->insert(elt);
393  assert(dim->size() == em->size());
394  break;
395  }
396  return v;
397 }
398 
399 template <class T, class Key>
401 {
402  assert(size() == 0);
403  switch (regim) {
404  case INMEM:
405  makeExternal();
406  break;
407  case EXTMEM:
408  break;
409  case EXTMEM_DEBUG:
410  assert(0);
411  break;
412  }
413  dim = new UnboundedMinMaxHeap<T>();
414  regim = EXTMEM_DEBUG;
415 }
416 
417 template <class T>
418 class baseCmpType {
419 public:
420  static int compare(const T &x, const T &y)
421  {
422  return (x < y ? -1 : (x > y ? 1 : 0));
423  }
424 };
425 
426 /* switch over to using an external priority queue */
427 template <class T, class Key>
429 {
430  AMI_err ae;
431 #ifndef NDEBUG
432  long sizeCheck;
433  sizeCheck = size();
434 #endif
435 
436  assert(regim == INMEM);
437  regim = EXTMEM;
438 
439  EMPQAD_DEBUG cout << endl
440  << "EMPQUEUEADAPTIVE: memory full: "
441  << "switching to external-memory pqueue " << endl;
442 
443  // create an AMI_stream and write in it biggest half elts of im;
444  AMI_STREAM<T> *amis0 = new AMI_STREAM<T>();
445  AMI_STREAM<T> *amis1 = NULL;
446  assert(amis0);
447  unsigned long pqsize = im->size();
448  // assert(im->size() == im->get_maxsize());
449  T x;
450  for (unsigned long i = 0; i < pqsize / 2; i++) {
451  int z = im->extract_max(x);
452  assert(z);
453  ae = amis0->write_item(x);
454  assert(ae == AMI_ERROR_NO_ERROR);
455  }
456  assert(amis0->stream_len() == pqsize / 2);
458  {
459  cout << "written " << pqsize / 2 << " elts to stream\n";
460  cout.flush();
461  }
462 
463  assert(im->size() == pqsize / 2 + (pqsize % 2));
464 
466 
467  // sort the stream
468  baseCmpType<T> fun;
469  AMI_sort(amis0, &amis1, &fun); // XXX laura: replaced this to use a cmp obj
470  assert(amis1);
471  delete amis0;
473  {
474  cout << "sorted the stream\n";
475  cout.flush();
476  }
477 
479 
480  // set im to NULL and initialize em from im and amis1
481  em = new em_pqueue<T, Key>(im, amis1);
482  im = NULL;
483  assert(em);
485  {
486  cout << "empq initialized from im\n";
487  cout.flush();
488  }
490  {
491  em->print_size();
492  }
493 
495 #ifndef NDEBUG
496  assert(sizeCheck == size());
497 #endif
498 }
499 
500 #endif
AMI_err AMI_sort(AMI_STREAM< T > *instream, AMI_STREAM< T > **outstream, Compare *cmp, int deleteInputStream=0)
Definition: ami_sort.h:85
#define MAX_STREAMS_OPEN
Definition: ami_stream.h:63
AMI_err
Definition: ami_stream.h:83
@ AMI_ERROR_NO_ERROR
Definition: ami_stream.h:84
#define NULL
Definition: ccmath.h:32
static AMI_err main_memory_usage(size_t *usage, MM_stream_usage usage_type=MM_STREAM_USAGE_OVERHEAD)
Definition: ami_stream.h:477
AMI_err write_item(const T &elt)
Definition: ami_stream.h:588
off_t stream_len(void)
Definition: ami_stream.h:375
bool extract_all_min(T &elt)
bool extract_min(T &elt)
bool insert(const T &elt)
static int compare(const T &x, const T &y)
@ EXTMEM_DEBUG
Definition: empq_adaptive.h:45
@ EXTMEM
Definition: empq_adaptive.h:45
@ INMEM
Definition: empq_adaptive.h:45
#define EMPQAD_DEBUG
Definition: empq_adaptive.h:43
#define assert(condition)
Definition: lz4.c:291
@ MM_STREAM_USAGE_MAXIMUM
Definition: mm.h:79
void LOG_avail_memo()
Definition: mm_utils.cpp:45
size_t getAvailableMemory()
Definition: mm_utils.cpp:52
#define x