GRASS GIS 8 Programmer's Manual  8.5.0dev(2024)-ed80a6eaeb
embuffer.h
Go to the documentation of this file.
1 /****************************************************************************
2  *
3  * MODULE: iostream
4  *
5 
6  * COPYRIGHT (C) 2007 Laura Toma
7  *
8  *
9 
10  * Iostream is a library that implements streams, external memory
11  * sorting on streams, and an external memory priority queue on
12  * streams. These are the fundamental components used in external
13  * memory algorithms.
14 
15  * Credits: The library was developed by Laura Toma. The kernel of
16  * class STREAM is based on the similar class existent in the GPL TPIE
17  * project developed at Duke University. The sorting and priority
18  * queue have been developed by Laura Toma based on communications
19  * with Rajiv Wickremesinghe. The library was developed as part of
20  * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
21  * Rajiv Wickremesinghe as part of the Terracost project.
22 
23  *
24  * This program is free software; you can redistribute it and/or modify
25  * it under the terms of the GNU General Public License as published by
26  * the Free Software Foundation; either version 2 of the License, or
27  * (at your option) any later version.
28  *
29 
30  * This program is distributed in the hope that it will be useful,
31  * but WITHOUT ANY WARRANTY; without even the implied warranty of
32  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
33  * General Public License for more details. *
34  * **************************************************************************/
35 
36 #ifndef __EMBUFFER_H
37 #define __EMBUFFER_H
38 
39 #include <stdio.h>
40 #include <assert.h>
41 #include <stdlib.h>
42 #include <math.h>
43 
44 #include "ami_config.h" //for SAVE_MEMORY
45 #include "ami_stream.h"
46 #include "mm.h"
47 #include "mm_utils.h"
48 #include "pqheap.h"
49 
50 #define MY_LOG_DEBUG_ID(x) // inhibit debug printing
51 // #define MY_LOG_DEBUG_ID(x) LOG_DEBUG_ID(x)
52 
53 /**********************************************************
54  DEBUGGING FLAGS
55 ***********************************************************/
56 
57 // setting this enables checking that the streams/arrays inserted in
58 // buffers are sorted in increasing order
59 // #define EMBUF_CHECK_INSERT
60 
61 // enable checking that stream name is the same as the one stored in
62 // the buffer name[]
63 // #define EMBUF_CHECK_NAME
64 
65 // enable printing names as they are checked
66 // #define EMBUF_CHECK_NAME_PRINT
67 
68 // enable printing when streams in a buffer are shifted left to
69 // check that names are shifted accordingly
70 // #define EMBUF_DELETE_STREAM_PRINT
71 
72 // enable printing the name of the stream which is inserted in buff
73 // #define EMBUF_PRINT_INSERT
74 
75 // enable printing the stream names/sizes in cleanup()
76 // #define EMBUF_CLEANUP_PRINT
77 
78 // enable printing when get/put_stream is called (for each stream)
79 // #define EMBUF_PRINT_GETPUT_STREAM
80 
81 // enable printing when get/put_streams is called
82 // #define EMBUF_PRINT_GETPUT_STREAMS
83 
84 /***********************************************************/
85 
86 /*****************************************************************/
/* encapsulation of the key together with stream_id; used during
   stream merging to save space: only the key and the id of the stream
   it was read from are kept in the merge heap, not the whole item.
*/
template <class KEY>
class merge_key {
public:
    KEY k;
    unsigned int str_id; // id of the stream where key comes from

public:
    merge_key() : str_id(0) {}

    merge_key(const KEY &x, const unsigned int sid) : k(x), str_id(sid) {}

    // overwrite both the key and the stream id
    void set(const KEY &x, const unsigned int sid)
    {
        k = x;
        str_id = sid;
    }
    KEY key() const { return k; }
    unsigned int stream_id() const { return str_id; }
    // the priority used by the heap is the key itself
    KEY getPriority() const { return k; }

    friend std::ostream &operator<<(std::ostream &s, const merge_key<KEY> &x)
    {
        return s << "<str_id=" << x.str_id << "> " << x.k << " ";
    }

    // all comparisons look only at the key; str_id is ignored
    friend bool operator<(const merge_key &x, const merge_key &y)
    {
        return (x.k < y.k);
    }
    friend bool operator<=(const merge_key &x, const merge_key &y)
    {
        return (x.k <= y.k);
    }
    friend bool operator>(const merge_key &x, const merge_key &y)
    {
        return (x.k > y.k);
    }
    friend bool operator>=(const merge_key &x, const merge_key &y)
    {
        return (x.k >= y.k);
    }
    friend bool operator!=(const merge_key &x, const merge_key &y)
    {
        return (x.k != y.k);
    }
    friend bool operator==(const merge_key &x, const merge_key &y)
    {
        return (x.k == y.k);
    }

    // adding merge keys is not meaningful: asserts in debug builds and
    // otherwise returns x unchanged
    friend merge_key operator+(const merge_key &x, const merge_key & /* y */)
    {
        assert(0);
        return x;
    }
};
149 
150 /*****************************************************************
151  *****************************************************************
152  *****************************************************************
153 
154  external_memory buffer
155 
156  Each level-i buffer can store up to <arity>^i * <basesize> items,
157  where typically <arity> is \theta(m) and <basesize> is \theta(M);
158  therefore log_m{n/m} buffers are needed to store N items, one
159  buffer for each level 1..log_m{n/m}. All buffers must have the same
160  values of <arity> and <basesize>.
161 
162  Functionality:
163 
164  A level-i on-disk buffer stores <arity>^i * <basesize> items of
165  data, organized in <arity> streams of <arity>^{i-1}*<basesize> items each;
166  <basesize> is same for all buffers and equal to the size of the
167  level 0 buffer (in memory buffer).
168 
169  Invariant: all the <arity> streams of a level-i buffer are in
170  sorted order; in this way sorting the buffer is done by merging the
171  <arity> streams in linear time.
172 
173  Items are inserted in level i-buffer only a whole stream at a time
174  (<arity>^{i-1}*<basesize> items). When all the <arity> streams of
175  the buffer are full, the buffer is sorted and emptied into a stream
176  of a level (i+1)-buffer.
177 
178  The <arity> streams of a buffer are allocated contigously from left
179  to r ight. The unused streams are NULL; The buffer keeps the index of
180  the last used(non-NULL) stream. When a buffer becomes full and is
181  empty, all its buffers are set to NULL.
182 
183  *****************************************************************
184  *****************************************************************
185  ***************************************************************** */
186 
187 /* T is a type with priority of type K and method getPriority() */
188 template <class T, class Key>
189 class em_buffer {
190 private:
191  // number of streams in a buffer;
192  unsigned int arity;
193 
194  // level of buffer: between 1 and log_arity{n/arity}; (level-0 buffer
195  // has a slightly different behaviour so it is implemented as a
196  // different class <im_buffer>)
197  unsigned short level;
198 
199  // level-i buffer contains m streams of data, each of size
200  // arity^{i-1}*basesize;
201  AMI_STREAM<T> **data;
202 
203  // the buffers can be depleted to fill the internal pq;
204  // keep an array which counts, for each stream, how many elements
205  // have been deleted (implicitly from the beginning of stream)
206  long *deleted;
207 
208  // nb of items in each substream; this can be found out by calling
209  // stream_len() on the stream, but it is more costly esp in the case
210  // when streams are on disk and must be moved in and out just to find
211  // stream length; streamsize is set only at stream creation, and the
212  // actual size must subtract the number of iteme deleted from the
213  // bos
214  unsigned long *streamsize;
215 
216  // index of the next available(empty) stream (out of the total m
217  // streams in the buffer);
218  unsigned int index;
219 
220  // nb of items in a stream of level_1 buffer
221  unsigned long basesize;
222 
223 public:
224  // create a level-i buffer of given basesize;
225  em_buffer(const unsigned short i, const unsigned long bs,
226  const unsigned int ar);
227 
228  // copy constructor;
229  em_buffer(const em_buffer &buf);
230 
231  // free the stream array and the streams pointers
232  ~em_buffer();
233 
234  // return the level of the buffer;
235  unsigned short get_level() const { return level; }
236 
237  // return the ith stream (load stream in memory)
238  AMI_STREAM<T> *get_stream(unsigned int i);
239 
240  // return a pointer to the streams of the buffer (loads streams in
241  // memory)
243 
244  // put the ith stream back to disk
245  void put_stream(unsigned int i);
246 
247  // called in pair with get_streams to put all streams back to disk
248  void put_streams();
249 
250  // return a pointer to the array of deletion count for each stream
251  long *get_bos() const { return deleted; }
252 
253  // return the index of the last stream in buffer which contains data;
254  unsigned int laststream() const { return index - 1; }
255 
256  // return the index of the next available stream in the buffer
257  unsigned int nextstream() const { return index; }
258 
259  // increment the index of the next available stream in the buffer
260  void incr_nextstream() { ++index; }
261 
262  // return nb of (non-empty) streams in buffer
263  unsigned int get_nbstreams() const { return index; }
264 
265  // return arity
266  unsigned int get_arity() const { return arity; }
267 
268  // return total nb of deleted elements in all active streams of the buffer
269  long total_deleted() const
270  {
271  long tot = 0;
272  for (unsigned int i = 0; i < index; i++) {
273  tot += deleted[i];
274  }
275  return tot;
276  }
277 
278  // mark as deleted one more element from i'th stream
279  void incr_deleted(unsigned int i)
280  {
281  assert(i < index);
282  deleted[i]++;
283  }
284 
285  // return the nominal size of a stream (nb of items):
286  // arity^{level-1}*basesize;
287  unsigned long get_stream_maxlen() const
288  {
289  return (unsigned long)pow((double)arity, (double)level - 1) * basesize;
290  }
291 
292  // return the actual size of stream i; i must be the index of a valid
293  // stream
294  unsigned long get_stream_len(unsigned int i)
295  {
296  // assert(i>= 0 && i<index);
297  return streamsize[i] - deleted[i];
298  }
299 
300  // return the total current size of the buffer; account for the
301  // deleted elements;
302  unsigned long get_buf_len()
303  {
304  unsigned long tot = 0;
305  for (unsigned int i = 0; i < index; i++) {
306  tot += get_stream_len(i);
307  }
308  return tot;
309  }
310 
311  // return the total maximal capacity of the buffer
312  unsigned long get_buf_maxlen() { return arity * get_stream_maxlen(); }
313 
314  // return true if buffer is empty (all streams are empty)
315  bool is_empty() { return ((nextstream() == 0) || (get_buf_len() == 0)); }
316 
317  // return true if buffer is full(all streams are full)
318  bool is_full() const { return (nextstream() == arity); }
319 
320  // reset
321  void reset();
322 
323  // clean buffer: in case some streams have been emptied by deletion
324  // delete them and shift streams left;
325  void cleanup();
326 
327  // create and return a stream which contains all elements of all
328  // streams of the buffer in sorted ascending order of their
329  // keys(priorities);
330  AMI_STREAM<T> *sort();
331 
332  // insert an array into the buffer; can only insert one
333  // level-i-full-stream-len nb of items at a time; assume the length
334  // of the array is precisely the streamlen of level-i buffer n =
335  // (pow(arity,level-1)*basesize); assume array is sorted; return the
336  // number of items actually inserted
337  long insert(T *a, long n);
338 
339  // insert a stream into the buffer; assume the length of the stream
340  // is precisely the streamlen of level-i buffer n =
341  // (pow(arity,level-1)*basesize); the <nextstream> pointer of buffer
342  // is set to point to the argument stream; (in this way no stream
343  // copying is done, just one pointer copy). The user should be aware
344  // the the argument stream is 'lost' - that is a stream cannot be
345  // inserted repeatedly into many buffers because this would lead to
346  // several buffers pointing to the same stream.
347 
348  // stream is assumed sorted; bos = how many elements are deleted
349  // from the beginning of stream;
350 
351  // return the number of items actually inserted
352  long insert(AMI_STREAM<T> *str, long bos = 0);
353 
354  // print range of elements in buffer
355  void print_range();
356 
357  // print all elements in buffer
358  void print();
359 
360  // prints the sizes of the streams in the buffer
361  void print_stream_sizes();
362 
363  // print the elements in the buffer
364  friend ostream &operator<<(ostream &s, em_buffer &b)
365  {
366  s << "BUFFER_" << b.level << ": ";
367  if (b.index == 0) {
368  s << "[]";
369  }
370  s << "\n";
371  b.get_streams();
372  for (unsigned int i = 0; i < b.index; i++) {
373  b.print_stream(s, i);
374  }
375  b.put_streams();
376  return s;
377  }
378 
379 private:
380  // merge the input streams; there are <arity> streams in total;
381  // write output in <outstream>; the input streams are assumed sorted
382  // in increasing order of their keys;
383  AMI_err substream_merge(AMI_STREAM<T> **instreams, unsigned int arity,
384  AMI_STREAM<T> *outstream);
385 
386  // print to stream the elements in i'th stream
387  void print_stream(ostream &s, unsigned int i);
388 
389 #ifdef SAVE_MEMORY
390  // array of names of streams;
391  char **name;
392 
393  // return the designated name for stream i
394  char *get_stream_name(unsigned int i) const;
395 
396  // print all stream names in buffer
397  void print_stream_names();
398 
399  // checks that name[i] is the same as stream name; stream i must be in
400  // memory (by a previous get_stream call, for instance) in order to
401  // find its length
402  void check_name(unsigned int i);
403 #endif
404 };
405 
406 /************************************************************/
407 // create a level-i buffer of given basesize;
408 template <class T, class Key>
409 em_buffer<T, Key>::em_buffer(const unsigned short i, const unsigned long bs,
410  const unsigned int ar)
411  : arity(ar), level(i), basesize(bs)
412 {
413 
414  assert(level >= 1);
415 
416  char str[100];
417  snprintf(str, sizeof(str),
418  "em_buffer: allocate %d AMI_STREAM*, total %ld\n", arity,
419  (long)(arity * sizeof(AMI_STREAM<T> *)));
420  MEMORY_LOG(str);
421  // allocate STREAM* array
422  data = new AMI_STREAM<T> *[arity];
423 
424  // allocate deleted array
425  snprintf(str, sizeof(str), "em_buffer: allocate deleted array: %ld\n",
426  (long)(arity * sizeof(long)));
427  MEMORY_LOG(str);
428  deleted = new long[arity];
429 
430  // allocate streamsize array
431  snprintf(str, sizeof(str), "em_buffer: allocate streamsize array: %ld\n",
432  (long)(arity * sizeof(long)));
433  MEMORY_LOG(str);
434  streamsize = new unsigned long[arity];
435 
436 #ifdef SAVE_MEMORY
437  // allocate name array
438  snprintf(str, sizeof(str), "em_buffer: allocate name array: %ld\n",
439  (long)(arity * sizeof(char *)));
440  MEMORY_LOG(str);
441  name = new char *[arity];
442  assert(name);
443 #endif
444 
445  // assert data
446  if ((!data) || (!deleted) || (!streamsize)) {
447  cerr << "em_buffer: cannot allocate\n";
448  exit(1);
449  }
450 
451  // initialize the <arity> streams to NULL, deleted[], streamsize[]
452  // and name[]
453  for (unsigned int ui = 0; ui < arity; ui++) {
454  data[ui] = NULL;
455  deleted[ui] = 0;
456  streamsize[ui] = 0;
457 #ifdef SAVE_MEMORY
458  name[ui] = NULL;
459 #endif
460  }
461  // set index
462  index = 0;
463 
464 #ifdef SAVE_MEMORY
465  // streams_in_memory = false;
466 #endif
467 }
468 
469 /************************************************************/
470 // copy constructor;
471 template <class T, class Key>
473  : level(buf.level), basesize(buf.basesize), index(buf.index),
474  arity(buf.arity)
475 {
476 
477  assert(0); // should not get called
478 
479  MEMORY_LOG("em_buffer: copy constr start\n");
480  get_streams();
481  for (unsigned int i = 0; i < index; i++) {
482  assert(data[i]);
483  delete data[i]; // delete old stream if existing
484  data[i] = NULL;
485 
486  // call copy constructor; i'm not sure that it actually duplicates
487  // the stream and copies the data; should that in the BTE
488  // sometimes..
489  data[i] = new AMI_STREAM<T>(*buf.data[i]);
490  deleted[i] = buf.deleted[i];
491  streamsize[i] = buf.streamsize[i];
492 #ifdef SAVE_MEMORY
493  assert(name[i]);
494  delete name[i];
495  name[i] = NULL;
496  name[i] = buf.name[i];
497 #endif
498  }
499  put_streams();
500  MEMORY_LOG("em_buffer: copy constr end\n");
501 }
502 
503 /************************************************************/
504 // free the stream array and the streams pointers
505 template <class T, class Key>
507 {
508 
509  assert(data);
510  // delete the m streams in the buffer
511  get_streams();
512  for (unsigned int i = 0; i < index; i++) {
513  assert(data[i]);
514 #ifdef SAVE_MEMORY
515  check_name(i);
516  delete name[i];
517 #endif
518  delete data[i];
519  data[i] = NULL;
520  }
521 
522  delete[] data;
523  delete[] deleted;
524  delete[] streamsize;
525 #ifdef SAVE_MEMORY
526  delete[] name;
527 #endif
528 }
529 
530 #ifdef SAVE_MEMORY
531 /************************************************************/
532 // checks that name[i] is the same as stream name; stream i must be in
533 // memory (by a previous get_stream call, for instance) in order to
534 // find its length
535 template <class T, class Key>
536 void em_buffer<T, Key>::check_name(unsigned int i UNUSED)
537 {
538 
539 #ifdef EMBUF_CHECK_NAME
540  assert(i >= 0 && i < index);
541  assert(data[i]);
542 
543  char *fooname;
544  data[i]->name(&fooname); // name() allocates the string
545 #ifdef EMBUF_CHECK_NAME_PRINT
546  cout << "::check_name: checking stream [" << level << "," << i
547  << "] name:" << fooname << endl;
548  cout.flush();
549 #endif
550  if (strcmp(name[i], fooname) != 0) {
551  cerr << "name[" << i << "]=" << name[i] << ", streamname=" << fooname
552  << endl;
553  }
554  assert(strcmp(fooname, name[i]) == 0);
555  delete fooname;
556 #endif
557 }
558 #endif
559 
560 /************************************************************/
561 // if SAVE_MEMORY flag is set, load the stream in memory; return the
562 // ith stream
563 template <class T, class Key>
565 {
566 
567  assert(i < index);
568 
569 #ifdef SAVE_MEMORY
570  MY_LOG_DEBUG_ID("em_buffer::get_stream");
571  MY_LOG_DEBUG_ID(i);
572 
573  if (data[i] == NULL) {
574 
575  // stream is on disk, load it in memory
576  assert(name[i]);
577  MY_LOG_DEBUG_ID("load stream in memory");
578  MY_LOG_DEBUG_ID(name[i]);
579 
580 #ifdef EMBUF_PRINT_GETPUT_STREAM
581  cout << "get_stream:: name[" << i << "]=" << name[i] << " from disk\n";
582  cout.flush();
583 #endif
584 
585  // assert that file exists
586  FILE *fp;
587  if ((fp = fopen(name[i], "rb")) == NULL) {
588  cerr << "get_stream: checking that stream " << name[i]
589  << "exists\n";
590  perror(name[i]);
591  assert(0);
592  exit(1);
593  }
594  fclose(fp);
595 
596  // create an AMI_STREAM from file
597  data[i] = new AMI_STREAM<T>(name[i]);
598  assert(data[i]);
599  }
600  else {
601 
602  // if data[i] not NULL, stream must be already in memory
603  MY_LOG_DEBUG_ID("stream not NULL");
604  MY_LOG_DEBUG_ID(data[i]->sprint());
605  }
606 #endif
607 
608  // NOW STREAM IS IN MEMORY
609 
610  // some assertion checks
611  assert(data[i]);
612  assert(data[i]->stream_len() == streamsize[i]);
613 #ifdef SAVE_MEMORY
614  check_name(i);
615 #endif
616 
617  return data[i];
618 }
619 
620 /************************************************************/
621 // if SAVE_MEMORY flag is set, put the i'th stream back on disk
622 template <class T, class Key>
623 void em_buffer<T, Key>::put_stream(unsigned int i)
624 {
625 
626  assert(i < index);
627 
628 #ifdef SAVE_MEMORY
629  MY_LOG_DEBUG_ID("em_buffer::put_stream");
630  MY_LOG_DEBUG_ID(i);
631 
632  if (data[i] != NULL) {
633 
634  // stream is in memory, put it on disk
635  MY_LOG_DEBUG_ID("stream put to disk");
636  MY_LOG_DEBUG_ID(data[i]->sprint());
637 
638  check_name(i);
639 #ifdef EMBUF_PRINT_GETPUT_STREAM
640  cout << "put_stream:: name[" << i << "]=" << name[i] << " to disk\n";
641  cout.flush();
642 #endif
643 
644  // make stream persistent and delete it
645  data[i]->persist(PERSIST_PERSISTENT);
646  delete data[i];
647  data[i] = NULL;
648  }
649  else {
650 
651  // data[i] is NULL, so stream must be already put on disk
652  MY_LOG_DEBUG_ID("stream is NULL");
653  }
654 #endif
655 }
656 
657 /************************************************************/
658 // return a pointer to the streams of the buffer
659 template <class T, class Key>
661 {
662 
663 #ifdef SAVE_MEMORY
664  MY_LOG_DEBUG_ID("em_buffer::get_streams: reading streams from disk");
665 #ifdef EMBUF_PRINT_GETPUT_STREAMS
666  cout << "em_buffer::get_streams (buffer " << level << ")";
667  cout << ": index = " << index << "(arity=" << arity << ")\n";
668  cout.flush();
669 #endif
670 
671  for (unsigned int i = 0; i < index; i++) {
672  get_stream(i);
673  assert(data[i]);
674  }
675 
676 #endif
677 
678  return data;
679 }
680 
681 /************************************************************/
682 // called in pair with load_streams to store streams on disk
683 // and release the memory
684 template <class T, class Key>
686 {
687 
688 #ifdef SAVE_MEMORY
689  MY_LOG_DEBUG_ID("em_buffer::put_streams: writing streams on disk");
690 #ifdef EMBUF_PRINT_GETPUT_STREAMS
691  cout << "em_buffer::put_streams (buffer " << level << ")";
692  cout << ": index = " << index << "(arity=" << arity << ")\n";
693  cout.flush();
694 #endif
695 
696  for (unsigned int i = 0; i < index; i++) {
697  put_stream(i);
698  assert(data[i] == NULL);
699  }
700 #endif
701 }
702 
703 #ifdef SAVE_MEMORY
704 /************************************************************/
705 // return the name of the ith stream
706 template <class T, class Key>
707 char *em_buffer<T, Key>::get_stream_name(unsigned int i) const
708 {
709 
710  assert(i >= 0 && i < index);
711  assert(name[i]);
712  return name[i];
713 }
714 #endif
715 
716 #ifdef SAVE_MEMORY
717 /************************************************************/
718 template <class T, class Key>
720 {
721  unsigned int i;
722  for (i = 0; i < index; i++) {
723  assert(name[i]);
724  cout << "stream " << i << ": " << name[i] << endl;
725  }
726  cout.flush();
727 }
728 #endif
729 
730 /************************************************************/
731 // clean buffer in case some streams have been emptied by deletion
732 template <class T, class Key>
734 {
735 
736  MY_LOG_DEBUG_ID("em_buffer::cleanup()");
737 #ifdef EMBUF_CLEANUP_PRINT
738 #ifdef SAVE_MEMORY
739  if (index > 0) {
740  cout << "before cleanup:\n";
741  print_stream_names();
742  print_stream_sizes();
743  cout.flush();
744  }
745 #endif
746 #endif
747 
748  // load all streams in memory
749  get_streams();
750 
751  // count streams of size=0
752  unsigned int i, empty = 0;
753  for (i = 0; i < index; i++) {
754 
755  if (get_stream_len(i) == 0) {
756  // printing..
757 #ifdef EMBUF_DELETE_STREAM_PRINT
758  cout << "deleting stream [" << level << "," << i << "]:";
759 #ifdef SAVE_MEMORY
760  cout << name[i];
761 #endif
762  cout << endl;
763  cout.flush();
764 #endif
765 
766 #ifdef SAVE_MEMORY
767  // stream is empty ==> delete its name
768  assert(name[i]);
769  delete name[i];
770  name[i] = NULL;
771 #endif
772 
773  // stream is empty ==> reset data
774  assert(data[i]);
775  // data[i]->persist(PERSIST_DELETE); //this is done automatically..
776  delete data[i];
777  data[i] = NULL;
778  deleted[i] = 0;
779  streamsize[i] = 0;
780  empty++;
781  }
782  }
783  // streams are in memory; all streams which are NULL must have been
784  // deleted
785 
786  // shift streams to the left in case holes were introduced
787  unsigned int j = 0;
788  if (empty) {
789 #ifdef EMBUF_DELETE_STREAM_PRINT
790  cout << "em_buffer::cleanup: shifting streams\n";
791  cout.flush();
792 #endif
793  for (i = 0; i < index; i++) {
794  // if i'th stream is not empty, shift it left if necessary
795  if (data[i]) {
796  if (i != j) {
797  // set j'th stream to point to i'th stream
798  // cout << j << " set to " << i << endl; cout.flush();
799  data[j] = data[i];
800  deleted[j] = deleted[i];
801  streamsize[j] = streamsize[i];
802  // set i'th stream to point to NULL
803  data[i] = NULL;
804  deleted[i] = 0;
805  streamsize[i] = 0;
806 #ifdef SAVE_MEMORY
807  // fix the names
808  /* already done assert(name[j]); */
809  /* delete name[j]; */
810  name[j] = name[i];
811  name[i] = NULL;
812  check_name(j);
813 #endif
814  }
815  else {
816  // cout << i << " left the same" << endl;
817  }
818  j++;
819  } // if data[i] != NULL
820  } // for i
821 
822  // set the index
823  assert(index == j + empty);
824  index = j;
825 
826 #ifdef EMBUF_DELETE_STREAM_PRINT
827  cout << "em_buffer::cleanup: index set to " << index << endl;
828  cout.flush();
829 #endif
830  } // if empty
831 
832  // put streams back to disk
833  put_streams();
834 
835 #ifdef EMBUF_CLEANUP_PRINT
836 #ifdef SAVE_MEMORY
837  if (index > 0) {
838  cout << "after cleanup:\n";
839  print_stream_names();
840  print_stream_sizes();
841  cout.flush();
842  }
843 #endif
844 #endif
845 }
846 
847 /************************************************************/
848 // delete all streams
849 template <class T, class Key>
851 {
852 
853  get_streams();
854 
855  // make streams not-persistent and delete them
856  for (unsigned int i = 0; i < index; i++) {
857  assert(data[i]);
858  assert(streamsize[i] == data[i]->stream_len());
859 #ifdef SAVE_MEMORY
860  check_name(i);
861  assert(name[i]);
862  delete name[i];
863  name[i] = NULL;
864 #endif
865 
866  data[i]->persist(PERSIST_DELETE);
867 
868  delete data[i];
869  data[i] = NULL;
870  deleted[i] = 0;
871  streamsize[i] = 0;
872  }
873 
874  index = 0;
875 }
876 
877 /************************************************************/
878 // create and return a stream which contains all elements of
879 // all streams of the buffer in sorted ascending order of
880 // their keys (priorities);
881 template <class T, class Key>
883 {
884 
885  // create stream
886  MEMORY_LOG("em_buffer::sort: allocate new AMI_STREAM\n");
887 
888  AMI_STREAM<T> *sorted_stream =
889  new AMI_STREAM<T>(); /* will be deleted in insert() */
890  assert(sorted_stream);
891 
892  // merge the streams into sorted stream
893  AMI_err aerr;
894  // Key dummykey;
895  // must modify this to seek after deleted[i] elements!!!!!!!!!!!!!
896  // aerr = MIAMI_single_merge_Key(data, arity, sorted_stream,
897  // 0, dummykey);
898  // could not use AMI_merge so i had to write my own..
899 
900  get_streams();
901 
902  aerr = substream_merge(data, arity, sorted_stream);
903  assert(aerr == AMI_ERROR_NO_ERROR);
904 
905  put_streams();
906 
907  return sorted_stream;
908 }
909 
910 /************************************************************/
911 /* merge the input streams; there are <arity> streams in total; write
912  output in <outstream>;
913 
914  the input streams are assumed sorted in increasing order of their
915  keys;
916 
917  assumes the instreams are in memory (no need for get_streams()) */
918 template <class T, class Key>
920  unsigned int arity,
921  AMI_STREAM<T> *outstream)
922 {
923 
924  unsigned int i, j;
925 
926  // some assertion checks
927  assert(instreams);
928  assert(outstream);
929  for (i = 0; i < arity; i++) {
930  assert(instreams[i]);
931 #ifdef SAVE_MEMORY
932  check_name(i);
933 #endif
934  }
935 
936  std::vector<T *> in_objects(
937  arity); // pointers to current leading elements of streams
938  AMI_err ami_err;
939 
940  char str[200];
941  snprintf(str, sizeof(str),
942  "em_buffer::substream_merge: allocate keys array, total %ldB\n",
943  (long)((long)arity * sizeof(merge_key<Key>)));
944  MEMORY_LOG(str);
945 
946  // keys array is initialized with smallest key from each stream (only
947  // non-null keys must be included)
948  merge_key<Key> *keys;
949  // merge_key<Key>* keys = new (merge_key<Key>)[arity];
950  typedef merge_key<Key> footype;
951  keys = new footype[arity];
952  assert(keys);
953 
954  // count number of non-empty streams
955  j = 0;
956  // rewind and read the first item from every stream initializing
957  // in_objects and keys
958  for (i = 0; i < arity; i++) {
959  assert(instreams[i]);
960  // rewind stream
961  if ((ami_err = instreams[i]->seek(deleted[i])) != AMI_ERROR_NO_ERROR) {
962  return ami_err;
963  }
964  // read first item from stream
965  if ((ami_err = instreams[i]->read_item(&(in_objects[i]))) !=
967  if (ami_err == AMI_ERROR_END_OF_STREAM) {
968  in_objects[i] = NULL;
969  }
970  else {
971  return ami_err;
972  }
973  }
974  else {
975  // include this key in the array of keys
976  Key k = in_objects[i]->getPriority();
977  keys[j].set(k, i);
978  j++;
979  }
980  }
981  unsigned int NonEmptyRuns = j;
982 
983  // build heap from the array of keys
984  pqheap_t1<merge_key<Key>> mergeheap(keys, NonEmptyRuns);
985 
986  // repeatedly extract_min from heap, write it to output stream and
987  // insert next element from same stream
988  merge_key<Key> minelt;
989  // rewind output buffer
990  ami_err = outstream->seek(0);
991  assert(ami_err == AMI_ERROR_NO_ERROR);
992  while (!mergeheap.empty()) {
993  // find min key and id of the stream from whereit comes
994  mergeheap.min(minelt);
995  i = minelt.stream_id();
996  // write min item to output stream
997  if ((ami_err = outstream->write_item(*in_objects[i])) !=
999  return ami_err;
1000  }
1001  // read next item from same input stream
1002  if ((ami_err = instreams[i]->read_item(&(in_objects[i]))) !=
1004  if (ami_err != AMI_ERROR_END_OF_STREAM) {
1005  return ami_err;
1006  }
1007  }
1008  // extract the min from the heap and insert next key from same stream
1009  if (ami_err == AMI_ERROR_END_OF_STREAM) {
1010  mergeheap.delete_min();
1011  }
1012  else {
1013  Key k = in_objects[i]->getPriority();
1014  merge_key<Key> nextit(k, i);
1015  mergeheap.delete_min_and_insert(nextit);
1016  }
1017  } // while
1018 
1019  // delete [] keys;
1020  //!!! KEYS BELONGS NOW TO MERGEHEAP, AND WILL BE DELETED BY THE
1021  // DESTRUCTOR OF MERGEHEAP (CALLED AUUTOMATICALLY ON FUNCTION EXIT) IF
1022  // I DELETE KEYS EXPLICITLY, THEY WILL BE DELETED AGAIN BY DESTRUCTOR,
1023  // AND EVERYTHING SCREWS UP..
1024 
1025  return AMI_ERROR_NO_ERROR;
1026 }
1027 
1028 /************************************************************/
1029 // insert an array into the buffer; assume array is sorted; return the
1030 // number of items actually inserted; if SAVE_MEMORY FLAG is on, put
1031 // stream on disk and release its memory
1032 template <class T, class Key>
1033 long em_buffer<T, Key>::insert(T *a, long n)
1034 {
1035 
1036  assert(a);
1037 
1038  if (is_full()) {
1039  cout << "em_buffer::insert: buffer full\n";
1040  return 0;
1041  }
1042 
1043  // can only insert one full stream at a time
1044  // relaxed..
1045  // assert(n == get_stream_maxlen());
1046 
1047  // create the stream
1048  MEMORY_LOG("em_buffer::insert(from array): allocate AMI_STREAM\n");
1049  AMI_STREAM<T> *str = new AMI_STREAM<T>();
1050  assert(str);
1051 
1052  // write the array to stream
1053  AMI_err ae;
1054  for (long i = 0; i < n; i++) {
1055  ae = str->write_item(a[i]);
1056  assert(ae == AMI_ERROR_NO_ERROR);
1057  }
1058  assert(n == str->stream_len());
1059 
1060  // insert the stream in the buffer
1061  return insert(str);
1062 }
1063 
1064 /************************************************************/
1065 /* insert a stream into the buffer; the next free entry in the buffer
1066  is set to point to the stream; if SAVE_MEMORY flag is on, the
1067  stream is put to disk;
1068 
1069  the <nextstream> pointer of buffer is set to point to the argument
1070  stream; (in this way no stream copying is done, just one pointer
1071  copy). The user should be aware the the argument stream is 'lost' -
1072  that is a stream cannot be inserted repeatedly into many buffers
1073  because this would lead to several buffers pointing to the same
1074  stream.
1075 
1076  stream is assume stream is sorted; bos = how many elements must be
1077  skipped (were deleted) from the beginning of stream;
1078 
1079  return the number of items actually inserted */
1080 template <class T, class Key>
1082 {
1083 
1084  assert(str);
1085 
1086  if (is_full()) {
1087  cout << "em_buffer::insert: buffer full\n";
1088  return 0;
1089  }
1090 
1091  // can only insert one level-i-full-stream at a time;
1092  // relaxed..can specify bos;
1093  // not only that, but the length of the stream can be smaller
1094  // than nominal length, because a stream is normally obtained by
1095  // merging streams which can be shorter;
1096  // assert(str->stream_len() == get_stream_len() - bos);
1097 
1098 #ifdef EMBUF_CHECK_INSERT
1099  // check that stream is sorted
1100  cout << "CHECK_INSERT: checking stream is sorted\n";
1101  AMI_err ae;
1102  str->seek(0);
1103  T *crt = NULL, *prev = NULL;
1104  while (str->read_item(&crt)) {
1105  assert(ae == AMI_ERROR_NO_ERROR);
1106  if (prev)
1107  assert(*prev <= *crt);
1108  }
1109 #endif
1110 
1111  // nextstream must be empty
1112  assert(str);
1113  assert(data[nextstream()] == NULL);
1114  assert(deleted[nextstream()] == 0);
1115  assert(streamsize[nextstream()] == 0);
1116 #ifdef SAVE_MEMORY
1117  assert(name[nextstream()] == NULL);
1118 #endif
1119 
1120  // set next entry i the buffer to point to this stream
1121  data[nextstream()] = str;
1122  deleted[nextstream()] = bos;
1123  streamsize[nextstream()] = str->stream_len();
1124 #ifdef SAVE_MEMORY
1125  // set next name entry in buffer to point to this stream's name
1126  char *s;
1127  str->name(&s); // name() allocates the string
1128  name[nextstream()] = s;
1129 
1130  // put stream on disk and release its memory
1132  delete str; // stream should be persistent; just delete it
1133  data[nextstream()] = NULL;
1134 
1135 #ifdef EMBUF_PRINT_INSERT
1136  cout << "insert stream " << s << " at buf [" << level << "," << nextstream()
1137  << "]" << endl;
1138 #endif
1139 #endif
1140 
1141  // increment the index of next available stream in buffer
1142  incr_nextstream();
1143 
1144 #ifdef EMBUF_PRINT_INSERT
1145  print_stream_sizes();
1146  print_stream_names();
1147 #endif
1148 
1149 #ifdef SAVE_MEMORY
1150  MY_LOG_DEBUG_ID("em_buffer::insert(): inserted stream ");
1151  MY_LOG_DEBUG_ID(name[nextstream() - 1]);
1152 #endif
1153 
1154  // return nb of items inserted
1155  return get_stream_len(nextstream() - 1);
1156 }
1157 
1158 /************************************************************/
1159 // print the elements of the i'th stream of the buffer to a stream;
1160 // assumes stream is in memory;
1161 template <class T, class Key>
1162 void em_buffer<T, Key>::print_stream(ostream &s, unsigned int i)
1163 {
1164 
1165  assert(data[i]);
1166  assert((i >= 0) && (i < index));
1167 
1168  AMI_err ae;
1169  T *x;
1170 
1171  s << "STREAM " << i << ": [";
1172 
1173  ae = data[i]->seek(deleted[i]);
1174  assert(ae == AMI_ERROR_NO_ERROR);
1175 
1176  for (long j = 0; j < get_stream_len(i); j++) {
1177  ae = data[i]->read_item(&x);
1178  assert(ae == AMI_ERROR_NO_ERROR);
1179  s << *x << ",";
1180  }
1181  s << "]\n";
1182 }
1183 
1184 /************************************************************/
1185 // print elements range in buffer (read first and last element in each
1186 // substream and find global min and max)
1187 template <class T, class Key>
1189 {
1190 
1191  T *min, *max;
1192  AMI_err ae;
1193 
1194  get_streams();
1195 
1196  for (unsigned int i = 0; i < index; i++) {
1197  cout << "[";
1198  // read min element in substream i
1199  ae = data[i]->seek(deleted[i]);
1200  assert(ae == AMI_ERROR_NO_ERROR);
1201  ae = data[i]->read_item(&min);
1202  assert(ae == AMI_ERROR_NO_ERROR);
1203  cout << min->getPriority() << "..";
1204  // read max element in substream i
1205  ae = data[i]->seek(streamsize[i] - 1);
1206  assert(ae == AMI_ERROR_NO_ERROR);
1207  ae = data[i]->read_item(&max);
1208  assert(ae == AMI_ERROR_NO_ERROR);
1209  cout << max->getPriority() << " (sz=" << get_stream_len(i) << ")] ";
1210  }
1211  for (unsigned int i = index; i < arity; i++) {
1212  cout << "[] ";
1213  }
1214 
1215  put_streams();
1216 }
1217 
1218 /************************************************************/
1219 // print all elements in buffer
1220 template <class T, class Key>
1222 {
1223 
1224  T *x;
1225  AMI_err ae;
1226 
1227  get_streams();
1228 
1229  for (unsigned int i = 0; i < index; i++) {
1230  cout << " [";
1231  ae = data[i]->seek(deleted[i]);
1232  assert(ae == AMI_ERROR_NO_ERROR);
1233  for (unsigned long j = 0; j < get_stream_len(i); j++) {
1234  ae = data[i]->read_item(&x);
1235  assert(ae == AMI_ERROR_NO_ERROR);
1236  cout << x->getPriority() << ",";
1237  }
1238  cout << "]" << endl;
1239  }
1240  for (unsigned int i = index; i < arity; i++) {
1241  cout << "[] ";
1242  }
1243 
1244  put_streams();
1245 }
1246 
1247 /************************************************************/
1248 // print the sizes of the substreams in the buffer
1249 template <class T, class Key>
1251 {
1252 
1253  cout << "(streams=" << index << ") sizes=[";
1254  for (unsigned int i = 0; i < arity; i++) {
1255  cout << get_stream_len(i) << ",";
1256  }
1257  cout << "]" << endl;
1258  cout.flush();
1259 }
1260 
1261 #endif
@ PERSIST_DELETE
Definition: ami_stream.h:114
@ PERSIST_PERSISTENT
Definition: ami_stream.h:116
AMI_err
Definition: ami_stream.h:83
@ AMI_ERROR_END_OF_STREAM
Definition: ami_stream.h:86
@ AMI_ERROR_NO_ERROR
Definition: ami_stream.h:84
#define NULL
Definition: ccmath.h:32
AMI_err write_item(const T &elt)
Definition: ami_stream.h:588
AMI_err name(char **stream_name)
Definition: ami_stream.h:426
AMI_err seek(off_t offset)
Definition: ami_stream.h:445
AMI_err read_item(T **elt)
Definition: ami_stream.h:525
off_t stream_len(void)
Definition: ami_stream.h:375
void persist(persistence p)
Definition: ami_stream.h:639
unsigned int laststream() const
Definition: embuffer.h:254
void cleanup()
Definition: embuffer.h:733
void incr_deleted(unsigned int i)
Definition: embuffer.h:279
long insert(T *a, long n)
Definition: embuffer.h:1033
void print_stream_sizes()
Definition: embuffer.h:1250
unsigned short get_level() const
Definition: embuffer.h:235
unsigned int nextstream() const
Definition: embuffer.h:257
void put_streams()
Definition: embuffer.h:685
void print_range()
Definition: embuffer.h:1188
em_buffer(const unsigned short i, const unsigned long bs, const unsigned int ar)
Definition: embuffer.h:409
unsigned long get_stream_len(unsigned int i)
Definition: embuffer.h:294
~em_buffer()
Definition: embuffer.h:506
long total_deleted() const
Definition: embuffer.h:269
AMI_STREAM< T > * sort()
Definition: embuffer.h:882
void reset()
Definition: embuffer.h:850
unsigned long get_buf_maxlen()
Definition: embuffer.h:312
friend ostream & operator<<(ostream &s, em_buffer &b)
Definition: embuffer.h:364
bool is_empty()
Definition: embuffer.h:315
unsigned long get_buf_len()
Definition: embuffer.h:302
unsigned long get_stream_maxlen() const
Definition: embuffer.h:287
bool is_full() const
Definition: embuffer.h:318
unsigned int get_nbstreams() const
Definition: embuffer.h:263
void incr_nextstream()
Definition: embuffer.h:260
AMI_STREAM< T > ** get_streams()
Definition: embuffer.h:660
long * get_bos() const
Definition: embuffer.h:251
void print()
Definition: embuffer.h:1221
void put_stream(unsigned int i)
Definition: embuffer.h:623
unsigned int get_arity() const
Definition: embuffer.h:266
AMI_STREAM< T > * get_stream(unsigned int i)
Definition: embuffer.h:564
friend int operator!=(const merge_key &x, const merge_key &y)
Definition: embuffer.h:132
KEY key() const
Definition: embuffer.h:108
friend int operator<=(const merge_key &x, const merge_key &y)
Definition: embuffer.h:120
merge_key()
Definition: embuffer.h:97
friend merge_key operator+(const merge_key &x, const merge_key &y UNUSED)
Definition: embuffer.h:140
KEY k
Definition: embuffer.h:93
void set(const KEY &x, const unsigned int sid)
Definition: embuffer.h:103
friend int operator==(const merge_key &x, const merge_key &y)
Definition: embuffer.h:136
~merge_key()
Definition: embuffer.h:101
unsigned int stream_id() const
Definition: embuffer.h:109
friend ostream & operator<<(ostream &s, const merge_key< KEY > &x)
Definition: embuffer.h:112
unsigned int str_id
Definition: embuffer.h:94
KEY getPriority() const
Definition: embuffer.h:110
friend int operator>=(const merge_key &x, const merge_key &y)
Definition: embuffer.h:128
merge_key(const KEY &x, const unsigned int sid)
Definition: embuffer.h:99
friend int operator<(const merge_key &x, const merge_key &y)
Definition: embuffer.h:116
friend int operator>(const merge_key &x, const merge_key &y)
Definition: embuffer.h:124
#define min(x, y)
Definition: draw2.c:29
#define max(x, y)
Definition: draw2.c:30
#define MY_LOG_DEBUG_ID(x)
Definition: embuffer.h:50
#define UNUSED
A macro for an attribute, if attached to a variable, indicating that the variable is not used.
Definition: gis.h:47
#define assert(condition)
Definition: lz4.c:291
void MEMORY_LOG(const std::string &str)
Definition: mm_utils.cpp:59
const char * name
Definition: named_colr.c:6
double b
Definition: r_raster.c:39
#define x