GRASS GIS 8 Programmer's Manual  8.4.0dev(2024)-535c39c9fc
empq_impl.h
Go to the documentation of this file.
1 /****************************************************************************
2  *
3  * MODULE: iostream
4  *
5 
6  * COPYRIGHT (C) 2007 Laura Toma
7  *
8  *
9 
10  * Iostream is a library that implements streams, external memory
11  * sorting on streams, and an external memory priority queue on
12  * streams. These are the fundamental components used in external
13  * memory algorithms.
14 
15  * Credits: The library was developed by Laura Toma. The kernel of
16  * class STREAM is based on the similar class existent in the GPL TPIE
17  * project developed at Duke University. The sorting and priority
18  * queue have been developed by Laura Toma based on communications
19  * with Rajiv Wickremesinghe. The library was developed as part of
20  * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
21  * Rajiv Wickremesinghe as part of the Terracost project.
22 
23  *
24  * This program is free software; you can redistribute it and/or modify
25  * it under the terms of the GNU General Public License as published by
26  * the Free Software Foundation; either version 2 of the License, or
27  * (at your option) any later version.
28  *
29 
30  * This program is distributed in the hope that it will be useful,
31  * but WITHOUT ANY WARRANTY; without even the implied warranty of
32  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
33  * General Public License for more details. *
34  * **************************************************************************/
35 
36 #ifndef __EMPQ_IMPL_H
37 #define __EMPQ_IMPL_H
38 
39 #include <ostream>
40 #include <vector>
41 
42 #include "empq.h"
43 
44 #if (0)
45 #include "option.H"
46 #define MY_LOG_DEBUG_ID(x) \
47  if (GETOPT("debug")) \
48  cerr << __FILE__ << ":" << __LINE__ << " " << x << endl;
49 #endif
50 
51 #undef XXX
52 #define XXX if (0)
53 
54 #define MY_LOG_DEBUG_ID(x)
55 
56 /*****************************************************************/
57 /* encapsulation of the element=<key/prio, data> together with <buffer_id>
58  and <stream_id>; used during stream merging to remember where each
59  key comes from;
60 
61  assumes that class T implements: Key getPriority()
62 
63  implements operators {<, <=, ...} such that a< b iff a.x.prio < b.x.prio
64 */
65 template <class T, class Key>
67 
68 private:
69  T x;
70  unsigned short buf_id;
71  unsigned int str_id;
72 
73 public:
75 
76  ExtendedEltMergeType(T &e, unsigned short bid, unsigned int sid)
77  : x(e), buf_id(bid), str_id(sid)
78  {
79  }
80 
82 
83  void set(T &e, unsigned short bid, unsigned int sid)
84  {
85  x = e;
86  buf_id = bid;
87  str_id = sid;
88  }
89  T elt() const { return x; }
90  unsigned short buffer_id() const { return buf_id; }
91  unsigned int stream_id() const { return str_id; }
92  Key getPriority() const { return x.getPriority(); }
93  // print
94  friend ostream &operator<<(ostream &s,
96  {
97  return s << "<buf_id=" << elt.buf_id << ",str_id=" << elt.str_id << "> "
98  << elt.x << " ";
99  }
100 
103  {
104  return (e1.getPriority() < e2.getPriority());
105  }
108  {
109  return (e1.getPriority() <= e2.getPriority());
110  }
113  {
114  return (e1.getPriority() > e2.getPriority());
115  }
118  {
119  return (e1.getPriority() >= e2.getPriority());
120  }
123  {
124  return (e1.getPriority() != e2.getPriority());
125  }
128  {
129  return (e1.getPriority() == e2.getPriority());
130  }
131 };
132 
133 //************************************************************/
134 // create an em_pqueue
135 template <class T, class Key>
136 em_pqueue<T, Key>::em_pqueue(long pq_sz, long buf_sz, unsigned short nb_buf,
137  unsigned int buf_ar)
138  : pqsize(pq_sz), bufsize(buf_sz), max_nbuf(nb_buf), crt_buf(0),
139  buf_arity(buf_ar)
140 {
141 
142  //____________________________________________________________
143  // ESTIMATE AVAILABLE MEMORY BEFORE ALLOCATION
144  AMI_err ae;
145  size_t mm_avail = getAvailableMemory();
146  printf("EM_PQUEUE:available memory before allocation: %.2fMB\n",
147  mm_avail / (float)(1 << 20));
148  printf("EM_PQUEUE:available memory before allocation: %ldB\n", mm_avail);
149 
150  //____________________________________________________________
151  // ALLOCATE STRUCTURE
152  // some dummy checks..
153  assert(pqsize > 0 && bufsize > 0);
154 
155  MEMORY_LOG("em_pqueue: allocating int pqueue\n");
156  // initialize in memory pqueue
157  pq = new MinMaxHeap<T>(pqsize);
158  assert(pq);
159 
160  MEMORY_LOG("em_pqueue: allocating buff_0\n");
161  // initialize in memory buffer
162  buff_0 = new im_buffer<T>(bufsize);
163  assert(buff_0);
164 
165  char str[200];
166  snprintf(str, sizeof(str),
167  "em_pqueue: allocating array of %ld buff pointers\n",
168  (long)max_nbuf);
169  MEMORY_LOG(str);
170 
171  // allocate ext memory buffers array
172  buff = new em_buffer<T, Key> *[max_nbuf];
173  assert(buff);
174  for (unsigned short i = 0; i < max_nbuf; i++) {
175  buff[i] = NULL;
176  }
177 
178  //____________________________________________________________
179  // some memory checks- make sure the empq fits in memory !!
180 
181  // estimate available memory after allocation
182  mm_avail = getAvailableMemory();
183  printf("EM_PQUEUE: available memory after allocation: %.2fMB\n",
184  mm_avail / (float)(1 << 20));
185 
186  // estimate AMI_STREAM memory usage
187  size_t sz_stream;
188  AMI_STREAM<T> dummy;
189  if ((ae = dummy.main_memory_usage(&sz_stream, MM_STREAM_USAGE_MAXIMUM)) !=
191  cout << "em_pqueue constructor: failing to get stream_usage\n";
192  exit(1);
193  }
194  cout << "EM_PQUEUE:AMI_stream memory usage: " << sz_stream << endl;
195  cout << "EM_PQUEUE: item size=" << sizeof(T) << endl;
196 
197  // estimate memory overhead
198  long mm_overhead = buf_arity * sizeof(merge_key<Key>) +
199  max_nbuf * sizeof(em_buffer<T, Key>) + 2 * sz_stream +
200  max_nbuf * sz_stream;
201 
202  mm_overhead *= 8; // overestimate
203  cout << "EM_PQUEUE: mm_overhead estimated as " << mm_overhead << endl;
204  if (mm_overhead > mm_avail) {
205  cout << "overhead bigger than available memory"
206  << "increase -m and try again\n";
207  exit(1);
208  }
209  mm_avail -= mm_overhead;
210 
211  // arity*sizeof(AMI_STREAM) < memory
212  cout << "pqsize=" << pqsize << ", bufsize=" << bufsize
213  << ", maximum allowed arity=" << mm_avail / sz_stream << endl;
214  if (buf_arity * sz_stream > mm_avail) {
215  cout << "sorry - empq exceeds memory limits\n";
216  cout << "try again decreasing arity or pqsize/bufsize\n";
217  cout.flush();
218  }
219 }
220 
221 //************************************************************/
222 // create an em_pqueue capable to store <= N elements
223 template <class T, class Key>
225 {
226 
227  MY_LOG_DEBUG_ID("em_pqueue constructor");
228 
229  /************************************************************/
230  // available memory
231  AMI_err ae;
232  // available memory
233  size_t mm_avail = getAvailableMemory();
234  printf("EM_PQUEUE:available memory before allocation: %.2fMB\n",
235  mm_avail / (float)(1 << 20));
236  cout.flush();
237 
238  // AMI_STREAM memory usage
239  size_t sz_stream;
240  AMI_STREAM<T> dummy;
241  if ((ae = dummy.main_memory_usage(&sz_stream, MM_STREAM_USAGE_MAXIMUM)) !=
243  cout << "em_pqueue constructor: failing to get main_memory_usage\n";
244  exit(1);
245  }
246  cout << "EM_PQUEUE:AMI_stream memory usage: " << sz_stream << endl;
247  cout << "EM_PQUEUE: item size=" << sizeof(T) << endl;
248  cout.flush();
249  // assume max_nbuf=2 suffices; check after arity is computed
250  max_nbuf = 2;
251 
252  // account for temporary memory usage (set up a preliminary arity)
253  buf_arity = mm_avail / (2 * sz_stream);
254  long mm_overhead = buf_arity * sizeof(merge_key<Key>) +
255  max_nbuf * sizeof(em_buffer<T, Key>) + 2 * sz_stream +
256  max_nbuf * sz_stream;
257 
258  mm_overhead *= 8; // overestimate
259  cout << "EM_PQUEUE: mm_overhead estimated as " << mm_overhead << endl;
260  if (mm_overhead > mm_avail) {
261  cout << "overhead bigger than available memory"
262  << "increase -m and try again\n";
263  exit(1);
264  }
265  mm_avail -= mm_overhead;
266 
267 #ifdef SAVE_MEMORY
268  // assign M/2 to pq
269  pqsize = mm_avail / (2 * sizeof(T));
270  // assign M/2 to buff_0
271  bufsize = mm_avail / (2 * sizeof(T));
272 #else
273  // assign M/4 to pq
274  pqsize = mm_avail / (4 * sizeof(T));
275  // assign M/4 to buff_0
276  bufsize = mm_avail / (4 * sizeof(T));
277 #endif
278 
279  cout << "EM_PQUEUE: pqsize set to " << pqsize << endl;
280  cout << "EM_PQUEUE: bufsize set to " << bufsize << endl;
281  cout << "EM_PQUEUE: nb buffers set to " << max_nbuf << endl;
282 
283  // assign M/2 to AMI_STREAMS and compute arity
284  /* arity is mainly constrained by the size of an AMI_STREAM; the
285  rest of the memory must accommodate for arity * max_nbuf
286  *sizeof(AMI_STREAM); there are some temporary stuff like arity *
287  sizeof(long) (the deleted array), arity * sizeof(T) (the array of
288  keys for merging) and so on, but the main factor is the
289  AMI_STREAM size which is roughly B * LBS * 2 (each AMI_STREAM
290  allocates 2 logical blocks) */
291 #ifdef SAVE_MEMORY
292  buf_arity = mm_avail / (2 * sz_stream);
293 #else
294  buf_arity = mm_avail / (2 * max_nbuf * sz_stream);
295 #endif
296 
297  // overestimate usage
298  if (buf_arity > 3) {
299  buf_arity -= 3;
300  }
301  else {
302  buf_arity = 1;
303  }
304 
305  cout << "EM_PQUEUE: arity set to " << buf_arity << endl;
306 
307  crt_buf = 0;
308 
309  // initialize in memory pqueue
310  MEMORY_LOG("em_pqueue: allocating int pqueue\n");
311  pq = new MinMaxHeap<T>(pqsize);
312  assert(pq);
313 
314  // initialize in memory buffer
315  MEMORY_LOG("em_pqueue: allocating buff_0\n");
316  buff_0 = new im_buffer<T>(bufsize);
317  assert(buff_0);
318 
319  // allocate ext memory buffers array
320  char str[200];
321  snprintf(str, sizeof(str),
322  "em_pqueue: allocating array of %ld buff pointers\n",
323  (long)max_nbuf);
324  MEMORY_LOG(str);
325  // allocate ext memory buffers array
326  buff = new em_buffer<T, Key> *[max_nbuf];
327  assert(buff);
328  for (unsigned short i = 0; i < max_nbuf; i++) {
329  buff[i] = NULL;
330  }
331 
332  // max nb of items the structure can accommodate (constrained by max_nbuf)
333  cout << "EM_PQUEUE: maximum length is " << maxlen() << "\n";
334  cout.flush();
335 
336  // check that structure can accommodate N elements
337  // assert(N < buf_arity * (buf_arity + 1) * bufsize);
338  // assert(N < maxlen());
339  mm_avail = getAvailableMemory();
340  printf("EM_PQUEUE: available memory after allocation: %.2fMB\n",
341  mm_avail / (float)(1 << 20));
342 }
343 
344 #ifdef SAVE_MEMORY
345 //************************************************************/
346 // create an empq, initialize its pq with im and insert amis in
347 // buff[0]; im should not be used/deleted after that outside empq;
348 //
349 // assumption: im was allocated such that maxsize = mm_avail/T;
350 // when this constructor is called im is only half full, so we must
351 // free half of its space and give to buff_0
352 template <class T, class Key>
354 {
355  AMI_err ae;
356  int pqcapacity; /* amount of memory we can use for each of new
357  minmaxheap, and em-buffer */
358  unsigned int pqcurrentsize; /* number of elements currently in im */
359  assert(im && amis);
360 
361  pqcapacity = im->get_maxsize() / 2; // we think this memory is now available
362  pqsize = pqcapacity + 1; // truncate errors
363  pqcurrentsize = im->size();
364  // assert( pqcurrentsize <= pqsize);
365  if (!(pqcurrentsize <= pqsize)) {
366  cout << "EMPQ: pq maxsize=" << pqsize
367  << ", pq crtsize=" << pqcurrentsize << "\n";
368  assert(0);
369  exit(1);
370  }
371 
372  LOG_avail_memo();
373 
374  /* at this point im is allocated all memory, but it is only at most
375  half full; we need to relocate im to half space and to allocate
376  buff_0 the other half; since we use new, there is no realloc, so
377  we will copy to a file...*/
378 
379  {
380  // copy im to a stream and free its memory
381  T x;
382  AMI_STREAM<T> tmpstr;
383  for (unsigned int i = 0; i < pqcurrentsize; i++) {
384  im->extract_min(x);
385  ae = tmpstr.write_item(x);
386  assert(ae == AMI_ERROR_NO_ERROR);
387  }
388  delete im;
389  im = NULL;
390  LOG_avail_memo();
391 
392  // allocate pq and buff_0 half size
393  bufsize = pqcapacity;
394  cout << "EM_PQUEUE: allocating im_buffer size=" << bufsize << " total "
395  << (float)bufsize * sizeof(T) / (1 << 20) << "MB\n";
396  cout.flush();
397  buff_0 = new im_buffer<T>(bufsize);
398  assert(buff_0);
399  cout << "EM_PQUEUE: allocating pq size=" << pqsize << " total "
400  << (float)pqcapacity * sizeof(T) / (1 << 20) << "MB\n";
401  cout.flush();
402  pq = new MinMaxHeap<T>(pqsize);
403  assert(pq);
404 
405  // fill pq from tmp stream
406  ae = tmpstr.seek(0);
407  assert(ae == AMI_ERROR_NO_ERROR);
408  T *elt;
409  for (unsigned int i = 0; i < pqcurrentsize; i++) {
410  ae = tmpstr.read_item(&elt);
411  assert(ae == AMI_ERROR_NO_ERROR);
412  pq->insert(*elt);
413  }
414  assert(pq->size() == pqcurrentsize);
415  }
416 
417  // estimate buf_arity
418  // AMI_STREAM memory usage
419  size_t sz_stream;
420  AMI_STREAM<T> dummy;
421  if ((ae = dummy.main_memory_usage(&sz_stream, MM_STREAM_USAGE_MAXIMUM)) !=
423  cout << "em_pqueue constructor: failing to get main_memory_usage\n";
424  exit(1);
425  }
426  cout << "EM_PQUEUE: AMI_stream memory usage: " << sz_stream << endl;
427  cout << "EM_PQUEUE: item size=" << sizeof(T) << endl;
428  // assume max_nbuf=2 suffices; check after arity is computed
429  max_nbuf = 2;
430  buf_arity = pqcapacity * sizeof(T) / sz_stream;
431  // should account for some overhead
432  if (buf_arity == 0) {
433  cout << "EM_PQUEUE: arity=0 (not enough memory..)\n";
434  exit(1);
435  }
436  if (buf_arity > 3) {
437  buf_arity -= 3;
438  }
439  else {
440  buf_arity = 1;
441  }
442 
443  // added on 05/16/2005 by Laura
444  if (buf_arity > MAX_STREAMS_OPEN) {
445  buf_arity = MAX_STREAMS_OPEN;
446  }
447 
448  // allocate ext memory buffer array
449  char str[200];
450  snprintf(str, sizeof(str),
451  "em_pqueue: allocating array of %ld buff pointers\n",
452  (long)max_nbuf);
453  MEMORY_LOG(str);
454  buff = new em_buffer<T, Key> *[max_nbuf];
455  assert(buff);
456  for (unsigned short i = 0; i < max_nbuf; i++) {
457  buff[i] = NULL;
458  }
459  crt_buf = 0;
460 
461  cout << "EM_PQUEUE: new pqsize set to " << pqcapacity << endl;
462  cout << "EM_PQUEUE: bufsize set to " << bufsize << endl;
463  cout << "EM_PQUEUE: buf arity set to " << buf_arity << endl;
464  cout << "EM_PQUEUE: nb buffers set to " << max_nbuf << endl;
465  cout << "EM_PQUEUE: maximum length is " << maxlen() << "\n";
466  cout.flush();
467 
468  // estimate available remaining memory
469  size_t mm_avail = getAvailableMemory();
470  printf("EM_PQUEUE: available memory after allocation: %.2fMB\n",
471  mm_avail / (float)(1 << 20));
472 
473  // last thing: insert the input stream in external buffers
474  // allocate buffer if necessary
475  // assert(crt_buf==0 && !buff[0]);// given
476  if (amis->stream_len()) {
477  // create buff[0] as a level1 buffer
478  MEMORY_LOG("em_pqueue::empty_buff_0: create new em_buffer\n");
479  buff[0] = new em_buffer<T, Key>(1, bufsize, buf_arity);
480  buff[0]->insert(amis);
481  crt_buf = 1;
482  }
483 }
484 
485 #endif
486 
487 //************************************************************/
488 // free space
489 template <class T, class Key>
491 {
492  // delete in memory pqueue
493  if (pq) {
494  delete pq;
495  pq = NULL;
496  }
497  // delete in memory buffer
498  if (buff_0) {
499  delete buff_0;
500  buff_0 = NULL;
501  }
502  // delete ext memory buffers
503  for (unsigned short i = 0; i < crt_buf; i++) {
504  if (buff[i])
505  delete buff[i];
506  }
507  delete[] buff;
508 }
509 
510 //************************************************************/
511 // return maximum capacity of i-th external buffer
512 template <class T, class Key>
513 long em_pqueue<T, Key>::maxlen(unsigned short i)
514 {
515 
516  if (i >= max_nbuf) {
517  printf("em_pqueue::max_len: level=%d exceeds capacity=%d\n", i,
518  max_nbuf);
519  return 0;
520  }
521  if (i < crt_buf) {
522  return buff[i]->get_buf_maxlen();
523  }
524  // try allocating buffer
525  em_buffer<T, Key> *tmp = new em_buffer<T, Key>(i + 1, bufsize, buf_arity);
526  if (!tmp) {
527  cout << "em_pqueue::max_len: cannot allocate\n";
528  return 0;
529  }
530  long len = tmp->get_buf_maxlen();
531  delete tmp;
532  return len;
533 }
534 
535 //************************************************************/
536 // return maximum capacity of em_pqueue
537 template <class T, class Key>
539 {
540  long len = 0;
541  for (unsigned short i = 0; i < max_nbuf; i++) {
542  len += maxlen(i);
543  }
544  return len + buff_0->get_buf_maxlen();
545 }
546 
547 //************************************************************/
548 // return the total nb of elements in the structure
549 template <class T, class Key>
550 unsigned long em_pqueue<T, Key>::size()
551 {
552  // sum up the lengths(nb of elements) of the external buffers
553  unsigned long elen = 0;
554  for (unsigned short i = 0; i < crt_buf; i++) {
555  elen += buff[i]->get_buf_len();
556  }
557  return elen + pq->size() + buff_0->get_buf_len();
558 }
559 
560 //************************************************************/
561 // return true if empty
562 template <class T, class Key>
564 {
565 
566  // return (size() == 0);
567  // more efficient?
568  return ((pq->size() == 0) && (buff_0->get_buf_len() == 0) && (size() == 0));
569 }
570 
571 //************************************************************/
572 // called when pq must be filled from external buffers
573 template <class T, class Key>
575 {
576 
577 #ifndef NDEBUG
578  {
579  int k = 0;
580  for (unsigned short i = 0; i < crt_buf; i++) {
581  k |= buff[i]->get_buf_len();
582  }
583  if (!k) {
584  cerr << "fillpq called with empty external buff!" << endl;
585  }
586  assert(k);
587  }
588 #endif
589 
590 #ifdef EMPQ_PQ_FILL_PRINT
591  cout << "filling pq\n";
592  cout.flush();
593 #endif
594  XXX cerr << "filling pq" << endl;
595  MY_LOG_DEBUG_ID("fillpq");
596 
597  AMI_err ae;
598  {
599  char str[200];
600  snprintf(str, sizeof(str),
601  "em_pqueue::fillpq: allocate array of %hd AMI_STREAMs\n",
602  crt_buf);
603  MEMORY_LOG(str);
604  }
605  // merge pqsize smallest elements from each buffer into a new stream
606  ExtendedMergeStream **outstreams;
607  outstreams = new ExtendedMergeStream *[crt_buf];
608 
609  /* gets stream of smallest pqsize elts from each level */
610  for (unsigned short i = 0; i < crt_buf; i++) {
611  MY_LOG_DEBUG_ID(crt_buf);
612  outstreams[i] = new ExtendedMergeStream();
613  assert(buff[i]->get_buf_len());
614  ae = merge_buffer(buff[i], outstreams[i], pqsize);
615  assert(ae == AMI_ERROR_NO_ERROR);
616  assert(outstreams[i]->stream_len());
617  // print_stream(outstreams[i]); cout.flush();
618  }
619 
620  /* merge above streams into pqsize elts in minstream */
621  if (crt_buf == 1) {
622  // just one level; make common case faster :)
623  merge_bufs2pq(outstreams[0]);
624  delete outstreams[0];
625  delete[] outstreams;
626  }
627  else {
628  // merge the outstreams to get the global mins and delete them
629  // afterwards
630  ExtendedMergeStream *minstream = new ExtendedMergeStream();
631  // cout << "merging streams\n";
632  ae = merge_streams(outstreams, crt_buf, minstream, pqsize);
633  assert(ae == AMI_ERROR_NO_ERROR);
634  for (int i = 0; i < crt_buf; i++) {
635  delete outstreams[i];
636  }
637  delete[] outstreams;
638 
639  // copy the minstream in the internal pqueue while merging with buff_0;
640  // the smallest <pqsize> elements between minstream and buff_0 will be
641  // inserted in internal pqueue;
642  // also, the elements from minstram which are inserted in pqueue must be
643  // marked as deleted in the source streams;
644  merge_bufs2pq(minstream);
645  delete minstream;
646  minstream = NULL;
647  // cout << "after merge_bufs2pq: \n" << *this << "\n";
648  }
649 
650  XXX assert(pq->size());
651  XXX cerr << "fillpq done" << endl;
652  return true;
653 }
654 
655 //************************************************************/
656 // return the element with minimum priority in the structure
657 template <class T, class Key>
659 {
660 
661  bool ok;
662 
663  MY_LOG_DEBUG_ID("em_pqueue::min");
664 
665  // try first the internal pqueue
666  if (!pq->empty()) {
667  ok = pq->min(elt);
668  assert(ok);
669  return ok;
670  }
671 
672  MY_LOG_DEBUG_ID("extract_min: reset pq");
673  pq->reset();
674 
675  // if external buffers are empty
676  if (crt_buf == 0) {
677  // if internal buffer is empty too, then nothing to extract
678  if (buff_0->is_empty()) {
679  // cerr << "min called on empty empq" << endl;
680  return false;
681  }
682  else {
683 #ifdef EMPQ_PRINT_FILLPQ_FROM_BUFF0
684  cout << "filling pq from B0\n";
685  cout.flush();
686 #endif
687  // ext. buffs empty; fill int pqueue from buff_0
688  long n = pq->fill(buff_0->get_array(), buff_0->get_buf_len());
689  buff_0->reset(pqsize, n);
690  ok = pq->min(elt);
691  assert(ok);
692  return true;
693  }
694  }
695  else {
696  // external buffers are not empty;
697  // called when pq must be filled from external buffers
698  XXX print_size();
699  fillpq();
700  XXX cerr << "fillpq done; about to take min" << endl;
701  ok = pq->min(elt);
702  XXX cerr << "after taking min" << endl;
703  assert(ok);
704  return ok;
705  } // end of else (if ext buffers are not empty)
706 
707  assert(0); // not reached
708 }
709 
710 //************************************************************/
711 template <class T, class Key>
712 static void print_ExtendedMergeStream(ExtendedMergeStream &str)
713 {
714 
716  str.seek(0);
717  while (str.read_item(&x) == AMI_ERROR_NO_ERROR) {
718  cout << *x << ", ";
719  }
720  cout << "\n";
721 }
722 
723 //************************************************************/
724 // delete the element with minimum priority in the structure;
725 // return false if pq is empty
726 template <class T, class Key>
728 {
729 
730  bool ok;
731 
732  MY_LOG_DEBUG_ID("\n\nEM_PQUEUE::EXTRACT_MIN");
733 
734  // try first the internal pqueue
735  if (!pq->empty()) {
736  // cout << "extract from internal pq\n";
737  MY_LOG_DEBUG_ID("extract from internal pq");
738  ok = pq->extract_min(elt);
739  assert(ok);
740  return ok;
741  }
742 
743  // if internal pqueue is empty, fill it up
744  MY_LOG_DEBUG_ID("internal pq empty: filling it up from external buffers");
745  MY_LOG_DEBUG_ID("extract_min: reset pq");
746  pq->reset();
747 
748  // if external buffers are empty
749  if (crt_buf == 0) {
750  // if internal buffer is empty too, then nothing to extract
751  if (buff_0->is_empty()) {
752  return false;
753  }
754  else {
755 #ifdef EMPQ_PRINT_FILLPQ_FROM_BUFF0
756  cout << "filling pq from B0\n";
757  cout.flush();
758 #endif
759  MY_LOG_DEBUG_ID("filling pq from buff_0");
760  // ext. buffs empty; fill int pqueue from buff_0
761  long n = pq->fill(buff_0->get_array(), buff_0->get_buf_len());
762  buff_0->reset(pqsize, n);
763  ok = pq->extract_min(elt);
764  assert(ok);
765  return true;
766  }
767  }
768  else {
769  // external buffers are not empty;
770  // called when pq must be filled from external buffers
771  MY_LOG_DEBUG_ID("filling pq from buffers");
772 #ifdef EMPQ_PRINT_SIZE
773  long x = size(), y;
774  y = x * sizeof(T) >> 20;
775  cout << "pqsize:[" << active_streams() << " streams: ";
776  print_stream_sizes();
777  cout << " total " << x << "(" << y << "MB)]" << endl;
778  cout.flush();
779 #endif
780  fillpq();
781  MY_LOG_DEBUG_ID("pq filled");
782  XXX cerr << "about to get the min" << endl;
783  assert(pq);
784  ok = pq->extract_min(elt);
785  if (!ok) {
786  cout << "failing assertion: pq->extract_min == true\n";
787  this->print();
788  assert(ok);
789  }
790 
791  return ok;
792  } // end of else (if ext buffers are not empty)
793 
794  assert(0); // not reached
795 }
796 
797 //************************************************************/
798 // extract all elts with min key, add them and return their sum
799 // delete the element with minimum priority in the structure;
800 // return false if pq is empty
801 template <class T, class Key>
803 {
804  // cout << "em_pqueue::extract_min_all(T): sorry not implemented\n";
805  // exit(1);
806 
807  T next_elt;
808  bool done = false;
809 
810  MY_LOG_DEBUG_ID("\n\nEM_PQUEUE::EXTRACT_ALL_MIN");
811 
812  // extract first elt
813  if (!extract_min(elt)) {
814  return false;
815  }
816  else {
817  while (!done) {
818  // peek at the next min elt to see if matches
819  if ((!min(next_elt)) ||
820  !(next_elt.getPriority() == elt.getPriority())) {
821  done = true;
822  }
823  else {
824  extract_min(next_elt);
825  elt = elt + next_elt;
826 
827  MY_LOG_DEBUG_ID("EXTRACT_ALL_MIN: adding ");
828  MY_LOG_DEBUG_ID(elt);
829  }
830  }
831  }
832 
833 #ifdef EMPQ_PRINT_EXTRACTALL
834  cout << "EXTRACTED: " << elt << endl;
835  cout.flush();
836 #endif
837 #ifdef EMPQ_PRINT_EMPQ
838  this->print();
839  cout << endl;
840 #endif
841  return true;
842 }
843 
844 //************************************************************/
845 // copy the minstream in the internal pqueue while merging with buff_0;
846 // the smallest <pqsize> elements between minstream and buff_0 will be
847 // inserted in internal pqueue;
848 // also, the elements from minstram which are inserted in pqueue must be
849 // marked as deleted in the source streams;
850 template <class T, class Key>
852 {
853 
854  // cout << "bufs2pq: \nminstream: "; print_stream(minstream);
855  MY_LOG_DEBUG_ID("merge_bufs2pq: enter");
856 
857  AMI_err ae;
858 
859  // sort the internal buffer
860  buff_0->sort();
861  // cout << "bufs2pq: \nbuff0: " << *buff_0 << endl;
862 
863  ae = minstream->seek(0); // rewind minstream
864  assert(ae == AMI_ERROR_NO_ERROR);
865 
866  bool strEmpty = false, bufEmpty = false;
867 
868  unsigned int bufPos = 0;
870  T bufElt, strElt;
871 
872  ae = minstream->read_item(&strItem);
873  if (ae == AMI_ERROR_END_OF_STREAM) {
874  strEmpty = true;
875  }
876  else {
877  assert(ae == AMI_ERROR_NO_ERROR);
878  }
879  if (bufPos < buff_0->get_buf_len()) {
880  bufElt = buff_0->get_item(bufPos);
881  }
882  else {
883  // cout << "buff0 empty\n";
884  bufEmpty = true;
885  }
886 
887  XXX cerr << "pqsize=" << pqsize << endl;
888  XXX if (strEmpty) cerr << "stream is empty!!" << endl;
889  for (unsigned int i = 0; i < pqsize; i++) {
890 
891  if (!bufEmpty) {
892  if ((!strEmpty) && (strElt = strItem->elt(),
893  bufElt.getPriority() > strElt.getPriority())) {
894  delete_str_elt(strItem->buffer_id(), strItem->stream_id());
895  pq->insert(strElt);
896  ae = minstream->read_item(&strItem);
897  if (ae == AMI_ERROR_END_OF_STREAM) {
898  strEmpty = true;
899  }
900  else {
901  assert(ae == AMI_ERROR_NO_ERROR);
902  }
903  }
904  else {
905  bufPos++;
906  pq->insert(bufElt);
907  if (bufPos < buff_0->get_buf_len()) {
908  bufElt = buff_0->get_item(bufPos);
909  }
910  else {
911  bufEmpty = true;
912  }
913  }
914  }
915  else {
916  if (!strEmpty) { // stream not empty
917  strElt = strItem->elt();
918  // cout << "i=" << i << "str & buff empty\n";
919  delete_str_elt(strItem->buffer_id(), strItem->stream_id());
920  pq->insert(strElt);
921  // cout << "insert " << strElt << "\n";
922  ae = minstream->read_item(&strItem);
923  if (ae == AMI_ERROR_END_OF_STREAM) {
924  strEmpty = true;
925  }
926  else {
927  assert(ae == AMI_ERROR_NO_ERROR);
928  }
929  }
930  else { // both empty: < pqsize items
931  break;
932  }
933  }
934  }
935 
936  // shift left buff_0 in case elements were deleted from the beginning
937  buff_0->shift_left(bufPos);
938 
939  MY_LOG_DEBUG_ID("pq filled");
940 #ifdef EMPQ_PQ_FILL_PRINT
941  cout << "merge_bufs2pq: pq filled; now cleaning\n";
942  cout.flush();
943 #endif
944  // this->print();
945  // clean buffers in case some streams have been emptied
946  cleanup();
947 
948  MY_LOG_DEBUG_ID("merge_bufs2pq: done");
949 }
950 
951 //************************************************************/
952 // deletes one element from <buffer, stream>
953 template <class T, class Key>
954 void em_pqueue<T, Key>::delete_str_elt(unsigned short buf_id,
955  unsigned int stream_id)
956 {
957 
958  // check them
959  assert(buf_id < crt_buf);
960  assert(stream_id < buff[buf_id]->get_nbstreams());
961  // update;
962  buff[buf_id]->incr_deleted(stream_id);
963 }
964 
965 //************************************************************/
966 // clean buffers in case some streams have been emptied
967 template <class T, class Key>
969 {
970 
971  MY_LOG_DEBUG_ID("em_pqueue::cleanup()");
972 #ifdef EMPQ_PQ_FILL_PRINT
973  cout << "em_pqueue: cleanup enter\n";
974  cout.flush();
975 #endif
976  // adjust buffers in case whole streams got deleted
977  for (unsigned short i = 0; i < crt_buf; i++) {
978  // cout << "clean buffer " << i << ": "; cout.flush();
979  buff[i]->cleanup();
980  }
981  if (crt_buf) {
982  short i = crt_buf - 1;
983  while ((i >= 0) && buff[i]->is_empty()) {
984  crt_buf--;
985  i--;
986  }
987  }
988 #ifdef EMPQ_PQ_FILL_PRINT
989  cout << "em_pqueue: cleanup done\n";
990  cout.flush();
991 #endif
992  // if a stream becomes too short move it on previous level
993  // to be added..
994  // cout <<"done cleaning up\n";
995 }
996 
997 //************************************************************/
998 // insert an element; return false if insertion fails
999 template <class T, class Key>
1001 {
1002  bool ok;
1003 #ifdef EMPQ_ASSERT_EXPENSIVE
1004  long init_size = size();
1005 #endif
1006  T val = x;
1007 
1008  MY_LOG_DEBUG_ID("\nEM_PQUEUE::INSERT");
1009  // if structure is empty insert x in pq; not worth the trouble..
1010  if ((crt_buf == 0) && (buff_0->is_empty())) {
1011  if (!pq->full()) {
1012  MY_LOG_DEBUG_ID("insert in pq");
1013  pq->insert(x);
1014  return true;
1015  }
1016  }
1017  if (!pq->empty()) {
1018  T pqmax;
1019 
1020  ok = pq->max(pqmax);
1021  assert(ok);
1022  // cout << "insert " << x << " max: " << pqmax << "\n";
1023  if (x <= pqmax) {
1024  // if x is smaller then pq_max and pq not full, insert x in pq
1025  if (!pq->full()) {
1026 #ifdef EMPQ_ASSERT_EXPENSIVE
1027  assert(size() == init_size);
1028 #endif
1029  pq->insert(x);
1030  return true;
1031  }
1032  else {
1033  // if x is smaller then pq_max and pq full, exchange x with
1034  // pq_max
1035  pq->extract_max(val);
1036  pq->insert(x);
1037  // cout << "max is: " << val << endl;
1038  }
1039  }
1040  }
1041  /* at this point, x >= pqmax.
1042  we need to insert val==x or val==old max.
1043  */
1044 
1045  // if buff_0 full, empty it
1046 #ifdef EMPQ_ASSERT_EXPENSIVE
1047  assert(size() == init_size);
1048 #endif
1049  if (buff_0->is_full()) {
1050 #ifdef EMPQ_PRINT_SIZE
1051  long x = size(), y;
1052  y = x * sizeof(T) >> 20;
1053  cout << "pqsize:[" << active_streams() << " streams: ";
1054  print_stream_sizes();
1055  cout << " total " << x << "(" << y << "MB)]" << endl;
1056  cout.flush();
1057 #endif
1058  empty_buff_0();
1059  }
1060 #ifdef EMPQ_ASSERT_EXPENSIVE
1061  assert(size() == init_size);
1062 #endif
1063  // insert x in buff_0
1064  assert(!buff_0->is_full());
1065  MY_LOG_DEBUG_ID("insert in buff_0");
1066  ok = buff_0->insert(val);
1067  assert(ok);
1068 
1069 #ifdef EMPQ_PRINT_INSERT
1070  cout << "INSERTED: " << x << endl;
1071  cout.flush();
1072 #endif
1073 #ifdef EMPQ_PRINT_EMPQ
1074  this->print();
1075  cout << endl;
1076 #endif
1077  MY_LOG_DEBUG_ID("EM_PQUEUE: INSERTED");
1078  return true;
1079 }
1080 
1081 //************************************************************/
1082 /* called when buff_0 is full to empty it on external level_1 buffer;
1083  can produce cascading emptying
1084 */
1085 template <class T, class Key>
1087 {
1088 #ifdef EMPQ_ASSERT_EXPENSIVE
1089  long init_size = size();
1090 #endif
1091 
1092 #ifdef EMPQ_EMPTY_BUF_PRINT
1093  cout << "emptying buff_0\n";
1094  cout.flush();
1095  print_size();
1096 #endif
1097  MY_LOG_DEBUG_ID("empty buff 0");
1098 
1099  assert(buff_0->is_full());
1100 
1101  // sort the buffer
1102  buff_0->sort();
1103  // cout << "sorted buff_0: \n" << *buff_0 << "\n";
1104 #ifdef EMPQ_ASSERT_EXPENSIVE
1105  assert(size() == init_size);
1106 #endif
1107  // allocate buffer if necessary
1108  if (!buff[0]) { // XXX should check crt_buf
1109  // create buff[0] as a level1 buffer
1110  MEMORY_LOG("em_pqueue::empty_buff_0: create new em_buffer\n");
1111  buff[0] = new em_buffer<T, Key>(1, bufsize, buf_arity);
1112  }
1113  // check that buff_0 fills exactly a stream of buff[0]
1114  assert(buff_0->get_buf_len() == buff[0]->get_stream_maxlen());
1115 
1116  // save buff_0 to stream
1117  MY_LOG_DEBUG_ID("empty buff_0 to stream");
1118  AMI_STREAM<T> *buff_0_str = buff_0->save2str();
1119  assert(buff_0_str);
1120  // MY_LOG_DEBUG_ID("buff_0 emptied");
1121 
1122  // reset buff_0
1123  buff_0->reset();
1124  MY_LOG_DEBUG_ID("buf_0 now reset");
1125 
1126 #ifdef EMPQ_ASSERT_EXPENSIVE
1127  assert(size() + buff_0->maxlen() == init_size);
1128 #endif
1129 
1130  // copy data from buff_0 to buff[0]
1131  if (buff[0]->is_full()) {
1132  // if buff[0] full, empty it recursively
1133  empty_buff(0);
1134  }
1135  buff[0]->insert(buff_0_str);
1136  MY_LOG_DEBUG_ID("stream inserted in buff[0]");
1137 
1138  // update the crt_buf pointer if necessary
1139  if (crt_buf == 0)
1140  crt_buf = 1;
1141 
1142 #ifdef EMPQ_ASSERT_EXPENSIVE
1143  assert(size() == init_size);
1144 #endif
1145 
1146  return true;
1147 }
1148 
1149 //************************************************************/
1150 /* sort and empty buff[i] into buffer[i+1] recursively; called
1151  by empty_buff_0() to empty subsequent buffers; i must
1152  be a valid (i<crt_buf) full buffer;
1153 */
1154 template <class T, class Key>
1155 void em_pqueue<T, Key>::empty_buff(unsigned short i)
1156 {
1157 
1158 #ifdef EMPQ_ASSERT_EXPENSIVE
1159  long init_size = size();
1160 #endif
1161 #ifdef EMPQ_EMPTY_BUF_PRINT
1162  cout << "emptying buffer_" << i << "\n";
1163  cout.flush();
1164  print_size();
1165 #endif
1166  MY_LOG_DEBUG_ID("empty buff ");
1167  MY_LOG_DEBUG_ID(i);
1168 
1169  // i must be a valid, full buffer
1170  assert(i < crt_buf);
1171  assert(buff[i]->is_full());
1172 
1173  // check there is space to empty to
1174  if (i == max_nbuf - 1) {
1175  cerr << "empty_buff:: cannot empty further - structure is full..\n";
1176  print_size();
1177  cerr << "ext buff array should reallocate in a future version..\n";
1178  exit(1);
1179  }
1180 
1181  // create next buffer if necessary
1182  if (!buff[i + 1]) {
1183  // create buff[i+1] as a level-(i+2) buffer
1184  char str[200];
1185  snprintf(str, sizeof(str),
1186  "em_pqueue::empty_buff( %hd ) allocate new em_buffer\n", i);
1187  MEMORY_LOG(str);
1188  buff[i + 1] = new em_buffer<T, Key>(i + 2, bufsize, buf_arity);
1189  }
1190  assert(buff[i + 1]);
1191  // check that buff[i] fills exactly a stream of buff[i+1];
1192  // extraneous (its checked in insert)
1193  // assert(buff[i]->len() == buff[i+1]->streamlen());
1194 
1195  // sort the buffer into a new stream
1196  MY_LOG_DEBUG_ID("sort buffer ");
1197  AMI_STREAM<T> *sorted_buf = buff[i]->sort();
1198 
1199  // assert(sorted_buf->stream_len() == buff[i]->len());
1200  // this is just for debugging
1201  if (sorted_buf->stream_len() != buff[i]->get_buf_len()) {
1202  cout << "sorted_stream_len: " << sorted_buf->stream_len()
1203  << " , bufflen: " << buff[i]->get_buf_len() << endl;
1204  cout.flush();
1205  AMI_err ae;
1206  ae = sorted_buf->seek(0);
1207  assert(ae == AMI_ERROR_NO_ERROR);
1208  T *x;
1209  while (sorted_buf->read_item(&x) == AMI_ERROR_NO_ERROR) {
1210  cout << *x << ", ";
1211  cout.flush();
1212  }
1213  cout << "\n";
1214 #ifdef EMPQ_ASSERT_EXPENSIVE
1215  assert(sorted_buf->stream_len() == buff[i]->len());
1216 #endif
1217  }
1218 #ifdef EMPQ_ASSERT_EXPENSIVE
1219  assert(size() == init_size);
1220 #endif
1221  // reset buff[i] (delete all its streams )
1222  buff[i]->reset();
1223 #ifdef EMPQ_ASSERT_EXPENSIVE
1224  assert(size() == init_size - sorted_buf->stream_len());
1225 #endif
1226 
1227  // link sorted buff[i] as a substream into buff[i+1];
1228  // sorted_buf is a new stream, so it starts out with 0 deleted elements;
1229  // of ocurse, its length might be smaller than nominal;
1230  if (buff[i + 1]->is_full()) {
1231  empty_buff(i + 1);
1232  }
1233  buff[i + 1]->insert(sorted_buf, 0);
1234 
1235  // update the crt_buf pointer if necessary
1236  if (crt_buf < i + 2)
1237  crt_buf = i + 2;
1238 
1239 #ifdef EMPQ_ASSERT_EXPENSIVE
1240  assert(size() == init_size);
1241 #endif
1242 }
1243 
1244 //************************************************************/
1245 /* merge the first <K> elements of the streams of input buffer,
1246  starting at position <buf.deleted[i]> in each stream; there are
1247  <buf.arity> streams in total; write output in <outstream>; the
1248  items written in outstream are of type <merge_output_type> which
1249  extends T with the stream nb and buffer nb the item comes from;
1250  this information is needed later to distribute items back; do not
1251  delete the K merged elements from the input streams; <bufid> is the
1252  id of the buffer whose streams are being merged;
1253 
1254  the input streams are assumed sorted in increasing order of keys;
1255 */
1256 template <class T, class Key>
1258  ExtendedMergeStream *outstream, long K)
1259 {
1260  long *bos = buf->get_bos();
1261  /* buff[0] is a level-1 buffer and so on */
1262  unsigned short bufid = buf->get_level() - 1;
1263  /* Pointers to current leading elements of streams */
1264  unsigned int arity = buf->get_nbstreams();
1265  AMI_STREAM<T> **instreams = buf->get_streams();
1266  std::vector<T *> in_objects(arity);
1267  AMI_err ami_err;
1268  unsigned int i, j;
1269 
1270  MY_LOG_DEBUG_ID("merge_buffer ");
1271  MY_LOG_DEBUG_ID(buf->get_level());
1272 
1273  assert(outstream);
1274  assert(instreams);
1275  assert(buf->get_buf_len());
1276  assert(K > 0);
1277 
1278  // array initialized with first key from each stream (only non-null keys
1279  // must be included)
1280  MEMORY_LOG("em_pqueue::merge_buffer: allocate keys array\n");
1281  merge_key<Key> *keys = new merge_key<Key>[arity];
1282 
1283  /* count number of non-empty runs */
1284  j = 0;
1285  /* rewind and read the first item from every stream */
1286  for (i = 0; i < arity; i++) {
1287  assert(instreams[i]);
1288  // rewind stream
1289  if ((ami_err = instreams[i]->seek(bos[i])) != AMI_ERROR_NO_ERROR) {
1290  cerr << "WARNING!!! EARLY EXIT!!!" << endl;
1291  return ami_err;
1292  }
1293  /* read first item */
1294  ami_err = instreams[i]->read_item(&(in_objects[i]));
1295  switch (ami_err) {
1297  in_objects[i] = NULL;
1298  break;
1299  case AMI_ERROR_NO_ERROR:
1300  // cout << "stream " << i << " read " << *in_objects[i] << "\n";
1301  // cout.flush();
1302  // include this key in the array of keys
1303  keys[j] = merge_key<Key>(in_objects[i]->getPriority(), i);
1304  // cout << "key " << j << "set to " << keys[j] << "\n";
1305  j++;
1306  break;
1307  default:
1308  cerr << "WARNING!!! EARLY EXIT!!!" << endl;
1309  return ami_err;
1310  }
1311  }
1312  unsigned int NonEmptyRuns = j;
1313  // cout << "nonempyruns = " << NonEmptyRuns << "\n";
1314 
1315  // build heap from the array of keys
1316  pqheap_t1<merge_key<Key>> mergeheap(keys, NonEmptyRuns);
1317 
1318  // cout << "heap is : " << mergeheap << "\n";
1319  // repeatedly extract_min from heap and insert next item from same stream
1320  long extracted = 0;
1321  // rewind output buffer
1322  ami_err = outstream->seek(0);
1323  assert(ami_err == AMI_ERROR_NO_ERROR);
1325  while (!mergeheap.empty() && (extracted < K)) {
1326  // find min key and id of stream it comes from
1327  i = mergeheap.min().stream_id();
1328  // write min item to output stream
1329  out = ExtendedEltMergeType<T, Key>(*in_objects[i], bufid, i);
1330  if ((ami_err = outstream->write_item(out)) != AMI_ERROR_NO_ERROR) {
1331  cerr << "WARNING!!! EARLY EXIT!!!" << endl;
1332  return ami_err;
1333  }
1334  // cout << "wrote " << out << "\n";
1335  extracted++; // update nb of extracted elements
1336  // read next item from same input stream
1337  ami_err = instreams[i]->read_item(&(in_objects[i]));
1338  switch (ami_err) {
1340  mergeheap.delete_min();
1341  break;
1342  case AMI_ERROR_NO_ERROR:
1343  // extract the min from the heap and insert next key from the
1344  // same stream
1345  {
1346  Key k = in_objects[i]->getPriority();
1347  mergeheap.delete_min_and_insert(merge_key<Key>(k, i));
1348  }
1349  break;
1350  default:
1351  cerr << "WARNING!!! early breakout!!!" << endl;
1352  return ami_err;
1353  }
1354  // cout << "PQ: " << mergeheap << "\n";
1355  } // while
1356 
1357  // delete [] keys;
1358  //!!! KEYS BELONGS NOW TO MERGEHEAP, AND WILL BE DELETED BY THE
1359  // DESTRUCTOR OF MERGEHEAP (CALLED AUUTOMATICALLY ON FUNCTION EXIT)
1360  // IF I DELETE KEYS EXPLICITLY, THEY WILL BE DELETED AGAIN BY
1361  // DESTRUCTOR, AND EVERYTHING SCREWS UP..
1362 
1363  buf->put_streams();
1364  MY_LOG_DEBUG_ID("merge_buffer: done");
1365  // cout << "done merging buffer\n";
1366 
1367  assert(extracted == outstream->stream_len());
1368  assert(extracted); // something in, something out
1369  return AMI_ERROR_NO_ERROR;
1370 }
1371 
1372 //************************************************************/
1373 /* merge the first <K> elements of the input streams; there are <arity>
1374  streams in total; write output in <outstream>;
1375 
1376  the input streams are assumed sorted in increasing order of their
1377  keys;
1378 */
1379 template <class T, class Key>
1381  unsigned short arity,
1382  ExtendedMergeStream *outstream, long K)
1383 {
1384 
1385  MY_LOG_DEBUG_ID("enter merge_streams");
1386  assert(arity > 1);
1387 
1388  // Pointers to current leading elements of streams
1389  std::vector<ExtendedEltMergeType<T, Key>> in_objects(arity);
1390 
1391  AMI_err ami_err;
1392  // unsigned int i;
1393  unsigned int nonEmptyRuns = 0; // count number of non-empty runs
1394 
1395  // array initialized with first element from each stream (only non-null keys
1396  // must be included)
1397  MEMORY_LOG("em_pqueue::merge_streams: allocate keys array\n");
1398 
1399  merge_key<Key> *keys = new merge_key<Key>[arity];
1400  assert(keys);
1401 
1402  // rewind and read the first item from every stream
1403  for (int i = 0; i < arity; i++) {
1404  // rewind stream
1405  if ((ami_err = instreams[i]->seek(0)) != AMI_ERROR_NO_ERROR) {
1406  return ami_err;
1407  }
1408  // read first item
1410  ami_err = instreams[i]->read_item(&objp);
1411  switch (ami_err) {
1412  case AMI_ERROR_NO_ERROR:
1413  in_objects[i] = *objp;
1414  keys[nonEmptyRuns] = merge_key<Key>(in_objects[i].getPriority(), i);
1415  nonEmptyRuns++;
1416  break;
1418  break;
1419  default:
1420  return ami_err;
1421  }
1422  }
1423  assert(nonEmptyRuns <= arity);
1424 
1425  // build heap from the array of keys
1426  pqheap_t1<merge_key<Key>> mergeheap(
1427  keys, nonEmptyRuns); /* takes ownership of keys */
1428 
1429  // repeatedly extract_min from heap and insert next item from same stream
1430  long extracted = 0;
1431  // rewind output buffer
1432  ami_err = outstream->seek(0);
1433  assert(ami_err == AMI_ERROR_NO_ERROR);
1434 
1435  while (!mergeheap.empty() && (extracted < K)) {
1436  // find min key and id of stream it comes from
1437  int id = mergeheap.min().stream_id();
1438  // write min item to output stream
1439  assert(id < nonEmptyRuns);
1440  assert(id >= 0);
1441  assert(mergeheap.size() == nonEmptyRuns);
1442  ExtendedEltMergeType<T, Key> obj = in_objects[id];
1443  if ((ami_err = outstream->write_item(obj)) != AMI_ERROR_NO_ERROR) {
1444  return ami_err;
1445  }
1446  // cout << "wrote " << *in_objects[i] << "\n";
1447 
1448  // extract the min from the heap and insert next key from same stream
1449  assert(id < nonEmptyRuns);
1450  assert(id >= 0);
1452  ami_err = instreams[id]->read_item(&objp);
1453  switch (ami_err) {
1454  case AMI_ERROR_NO_ERROR: {
1455  in_objects[id] = *objp;
1456  merge_key<Key> tmp =
1457  merge_key<Key>(in_objects[id].getPriority(), id);
1458  mergeheap.delete_min_and_insert(tmp);
1459  }
1460  extracted++; // update nb of extracted elements
1461  break;
1463  mergeheap.delete_min();
1464  break;
1465  default:
1466  return ami_err;
1467  }
1468  } // while
1469 
1470  // delete [] keys;
1471  //!!! KEYS BELONGS NOW TO MERGEHEAP, AND WILL BE DELETED BY THE
1472  // DESTRUCTOR OF MERGEHEAP (CALLED AUUTOMATICALLY ON FUNCTION EXIT)
1473  // IF I DELETE KEYS EXPLICITLY, THEY WILL BE DELETED AGAIN BY
1474  // DESTRUCTOR, AND EVERYTHING SCREWS UP..
1475 
1476  MY_LOG_DEBUG_ID("merge_streams: done");
1477  return AMI_ERROR_NO_ERROR;
1478 }
1479 
1480 //************************************************************/
1481 template <class T, class Key>
1483 {
1484  pq->clear();
1485  buff_0->clear();
1486 
1487  for (int i = 0; i < crt_buf; i++) {
1488  if (buff[i]) {
1489  delete buff[i];
1490  buff[i] = NULL;
1491  }
1492  }
1493  crt_buf = 0;
1494 }
1495 
1496 //************************************************************/
1497 template <class T, class Key>
1499 {
1500  cout << "EM_PQ: [pq=" << pqsize << ", b=" << bufsize
1501  << ", bufs=" << max_nbuf << ", ar=" << buf_arity << "]\n";
1502 
1503  cout << "PQ: ";
1504  // pq->print_range();
1505  pq->print();
1506  cout << endl;
1507 
1508  cout << "B0: ";
1509  // buff_0->print_range();
1510  buff_0->print();
1511  cout << "\n";
1512 
1513  for (unsigned short i = 0; i < crt_buf; i++) {
1514  cout << "B" << i + 1 << ": ";
1515  buff[i]->print_range();
1516  cout << endl;
1517  }
1518  cout.flush();
1519 }
1520 
1521 //************************************************************/
1522 template <class T, class Key>
1524 {
1525  cout << "EM_PQ: [pq=" << pqsize << ", b=" << bufsize
1526  << ", bufs=" << max_nbuf << ", ar=" << buf_arity << "]\n";
1527 
1528  cout << "PQ: ";
1529  pq->print();
1530  cout << endl;
1531 
1532  cout << "B0: ";
1533  buff_0->print();
1534  cout << "\n";
1535 
1536  for (unsigned short i = 0; i < crt_buf; i++) {
1537  cout << "B" << i + 1 << ": " << endl;
1538  buff[i]->print();
1539  cout << endl;
1540  }
1541  cout.flush();
1542 }
1543 
1544 //************************************************************/
1545 template <class T, class Key>
1547 {
1548  // sum up the lengths(nb of elements) of the external buffers
1549  long elen = 0;
1550  cout << "EMPQ: pq=" << pq->size() << ",B0=" << buff_0->get_buf_len()
1551  << endl;
1552  cout.flush();
1553  for (unsigned short i = 0; i < crt_buf; i++) {
1554  assert(buff[i]);
1555  cout << "B_" << i + 1 << ":";
1556  cout.flush();
1557  buff[i]->print_stream_sizes();
1558  elen += buff[i]->get_buf_len();
1559  // cout << endl; cout.flush();
1560  }
1561  cout << "total: " << elen + pq->size() + buff_0->get_buf_len() << endl
1562  << endl;
1563  cout.flush();
1564 }
1565 
1566 /*****************************************************************/
1567 template <class T, class Key>
1569 {
1570  for (unsigned short i = 0; i < crt_buf; i++) {
1571  cout << "[";
1572  buff[i]->print_stream_sizes();
1573  cout << "]";
1574  }
1575  cout.flush();
1576 }
1577 
1578 #undef XXX
1579 
1580 #endif
#define MAX_STREAMS_OPEN
Definition: ami_stream.h:63
AMI_err
Definition: ami_stream.h:83
@ AMI_ERROR_END_OF_STREAM
Definition: ami_stream.h:86
@ AMI_ERROR_NO_ERROR
Definition: ami_stream.h:84
#define NULL
Definition: ccmath.h:32
static AMI_err main_memory_usage(size_t *usage, MM_stream_usage usage_type=MM_STREAM_USAGE_OVERHEAD)
Definition: ami_stream.h:477
AMI_err write_item(const T &elt)
Definition: ami_stream.h:588
AMI_err seek(off_t offset)
Definition: ami_stream.h:445
AMI_err read_item(T **elt)
Definition: ami_stream.h:525
off_t stream_len(void)
Definition: ami_stream.h:375
bool extract_min(T &elt)
Definition: minmaxheap.h:568
HeapIndex size() const
Definition: minmaxheap.h:112
friend int operator<=(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
Definition: empq_impl.h:106
friend ostream & operator<<(ostream &s, const ExtendedEltMergeType< T, Key > &elt)
Definition: empq_impl.h:94
unsigned int stream_id() const
Definition: empq_impl.h:91
friend int operator!=(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
Definition: empq_impl.h:121
friend int operator>(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
Definition: empq_impl.h:111
friend int operator>=(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
Definition: empq_impl.h:116
ExtendedEltMergeType(T &e, unsigned short bid, unsigned int sid)
Definition: empq_impl.h:76
unsigned short buffer_id() const
Definition: empq_impl.h:90
friend int operator<(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
Definition: empq_impl.h:101
Key getPriority() const
Definition: empq_impl.h:92
void set(T &e, unsigned short bid, unsigned int sid)
Definition: empq_impl.h:83
friend int operator==(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
Definition: empq_impl.h:126
HeapIndex get_maxsize() const
Definition: minmaxheap.h:786
unsigned short get_level() const
Definition: embuffer.h:235
void put_streams()
Definition: embuffer.h:685
unsigned long get_buf_maxlen()
Definition: embuffer.h:312
unsigned long get_buf_len()
Definition: embuffer.h:302
unsigned int get_nbstreams() const
Definition: embuffer.h:263
AMI_STREAM< T > ** get_streams()
Definition: embuffer.h:660
long * get_bos() const
Definition: embuffer.h:251
void clear()
Definition: empq_impl.h:1482
void print_stream_sizes()
Definition: empq_impl.h:1568
void print()
Definition: empq_impl.h:1523
void delete_str_elt(unsigned short buf_id, unsigned int stream_id)
Definition: empq_impl.h:954
bool fillpq()
Definition: empq_impl.h:574
void print_range()
Definition: empq_impl.h:1498
bool insert(const T &elt)
Definition: empq_impl.h:1000
~em_pqueue()
Definition: empq_impl.h:490
bool extract_min(T &elt)
Definition: empq_impl.h:727
void merge_bufs2pq(ExtendedMergeStream *minstream)
Definition: empq_impl.h:851
bool min(T &elt)
Definition: empq_impl.h:658
long maxlen()
Definition: empq_impl.h:538
unsigned long size()
Definition: empq_impl.h:550
AMI_err merge_buffer(em_buffer< T, Key > *buf, ExtendedMergeStream *outstr, long K)
Definition: empq_impl.h:1257
void empty_buff(unsigned short i)
Definition: empq_impl.h:1155
bool extract_all_min(T &elt)
Definition: empq_impl.h:802
void cleanup()
Definition: empq_impl.h:968
AMI_err merge_streams(ExtendedMergeStream **instr, unsigned short arity, ExtendedMergeStream *outstr, long K)
Definition: empq_impl.h:1380
bool is_empty()
Definition: empq_impl.h:563
void print_size()
Definition: empq_impl.h:1546
bool empty_buff_0()
Definition: empq_impl.h:1086
bool delete_min()
Definition: pqheap.h:435
void delete_min_and_insert(const T &x)
Definition: pqheap.h:507
bool empty(void)
Definition: pqheap.h:304
unsigned int size(void) const
Definition: pqheap.h:142
bool min(T &elt)
Definition: pqheap.h:318
#define min(x, y)
Definition: draw2.c:29
#define ExtendedMergeStream
Definition: empq.h:53
#define MY_LOG_DEBUG_ID(x)
Definition: empq_impl.h:54
#define XXX
Definition: empq_impl.h:52
#define assert(condition)
Definition: lz4.c:393
@ MM_STREAM_USAGE_MAXIMUM
Definition: mm.h:79
void MEMORY_LOG(const std::string &str)
Definition: mm_utils.cpp:59
void LOG_avail_memo()
Definition: mm_utils.cpp:45
size_t getAvailableMemory()
Definition: mm_utils.cpp:52
#define x