GRASS GIS 8 Programmer's Manual  8.5.0dev(2024)-d6dec75dd4
replacementHeapBlock.h
Go to the documentation of this file.
1 /****************************************************************************
2  *
3  * MODULE: iostream
4  *
5 
6  * COPYRIGHT (C) 2007 Laura Toma
7  *
8  *
9 
10  * Iostream is a library that implements streams, external memory
11  * sorting on streams, and an external memory priority queue on
12  * streams. These are the fundamental components used in external
13  * memory algorithms.
14 
15  * Credits: The library was developed by Laura Toma. The kernel of
16  * class STREAM is based on the similar class existent in the GPL TPIE
17  * project developed at Duke University. The sorting and priority
18  * queue have been developed by Laura Toma based on communications
19  * with Rajiv Wickremesinghe. The library was developed as part of
20  * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
21  * Rajiv Wickremesinghe as part of the Terracost project.
22 
23  *
24  * This program is free software; you can redistribute it and/or modify
25  * it under the terms of the GNU General Public License as published by
26  * the Free Software Foundation; either version 2 of the License, or
27  * (at your option) any later version.
28  *
29 
30  * This program is distributed in the hope that it will be useful,
31  * but WITHOUT ANY WARRANTY; without even the implied warranty of
32  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
33  * General Public License for more details. *
34  * **************************************************************************/
35 
36 #ifndef REPLACEMENT_HEAPBLOCK_H
37 #define REPLACEMENT_HEAPBLOCK_H
38 
39 #include <assert.h>
40 
41 #include "mem_stream.h"
42 #include "replacementHeap.h"
43 
44 #define RBHEAP_DEBUG if (0)
45 
46 /*****************************************************************/
47 /* encapsulation of the element and the run it comes from;
48  */
49 template <class T>
51 public:
52  T value;
54 
56 
57  friend ostream &operator<<(ostream &s, const BlockHeapElement &p)
58  {
59  return s << "[" << p.value << "]";
60  };
61 };
62 
63 /*****************************************************************/
64 /*
65 This is a heap of HeapElements, i.e. elements which come from streams;
66 when an element is consumed, the heap knows how to replace it with the
67 next element from the same stream.
68 
69 Compare is a class that has a member function called "compare" which
70 is used to compare two elements of type T
71 */
72 template <class T, class Compare>
74 private:
75  BlockHeapElement<T> *mergeHeap; // the heap;
76  size_t arity; // max size
77  size_t size; // represents actual size, i.e. the nb of (non-empty)
78  // runs; they are stored contigously in the first
79  //<size> positions of mergeHeap; once a run becomes
80  // empty, it is deleted and size is decremented. size
81  // stores the next position where a HeapElement can be
82  // added.
83 
84 protected:
85  void heapify(size_t i);
86 
87  void buildheap();
88 
89  /* for each run in the heap, read an element from the run into the heap */
90  void init();
91 
92  /* add a run; make sure total nb of runs does not exceed heap arity */
93  void addRun(MEM_STREAM<T> *run);
94 
95  /* delete the i-th run (and the element); that is, swap the ith run
96  with the last one, and decrement size; just like in a heap, but
97  no heapify. Note: this function messes up the heap order. If the
98  user wants to maintain heap property should call heapify
99  specifically.
100  */
101  void deleteRun(size_t i);
102 
103 public:
104  // allocate array mergeHeap, where the streams are stored in runList
106 
107  // delete array mergeHeap
109 
110  // is heap empty?
111  int empty() const { return (size == 0); }
112 
113  // delete mergeHeap[0].value, replace it with the next element from
114  // the same stream, and re-heapify
115  T extract_min();
116 
117  ostream &print(ostream &s) const
118  {
119  s << "ReplacementheapBlock " << this << ": " << size << " runs";
120 #if (0)
121  char *runname;
122  off_t runlen;
123  for (size_t i = 0; i < size; i++) {
124  s << endl << " <- i=" << i << ": " << mergeHeap[i].run;
125  assert(mergeHeap[i].run);
126  mergeHeap[i].run->name(&runname);
127  runlen = mergeHeap[i].run->stream_len();
128  s << ", " << runname << ", len=" << runlen;
129  delete runname; // this should be safe
130  }
131 #endif
132  s << endl;
133  return s;
134  }
135 };
136 
137 /*****************************************************************/
138 // allocate array mergeHeap, where the streams are stored in runList
139 template <class T, class Compare>
141  queue<MEM_STREAM<T> *> *runList)
142 {
143 
144  RBHEAP_DEBUG cerr << "ReplacementHeapBlock " << endl;
145 
146  arity = runList->length();
147 
148  size = 0; // no run yet
149 
150  MEM_STREAM<T> *str;
151  mergeHeap = new BlockHeapElement<T>[arity];
152  for (unsigned int i = 0; i < arity; i++) {
153  // pop a stream from the list and add it to heap
154  str = NULL;
155  runList->dequeue(&str);
156  assert(str);
157  addRun(str);
158  }
159  init();
160 }
161 
162 /*****************************************************************/
163 template <class T, class Compare>
165 {
166 
167  if (!empty()) {
168  cerr << "warning: ~ReplacementHeapBlock: heap not empty!\n";
169  }
170  // delete the runs first
171  for (size_t i = 0; i < size; i++) {
172  if (mergeHeap[i].run)
173  delete mergeHeap[i].run;
174  }
175  delete[] mergeHeap;
176 }
177 
178 /*****************************************************************/
179 /* add a run; make sure total nb of runs does not exceed heap arity
180  */
181 template <class T, class Compare>
183 {
184 
185  assert(r);
186 
187  if (size == arity) {
188  cerr << "ReplacementHeapBlockBlock::addRun size =" << size
189  << ",arity=" << arity << " full, cannot add another run.\n";
190  assert(0);
191  exit(1);
192  }
193  assert(size < arity);
194 
195  mergeHeap[size].run = r;
196  size++;
197 
199  {
200  char *strname;
201  r->name(&strname);
202  cerr << "ReplacementHeapBlock::addRun added run " << strname
203  << " (rheap size=" << size << ")" << endl;
204  delete strname;
205  cerr.flush();
206  }
207 }
208 
209 /*****************************************************************/
210 /* delete the i-th run (and the value); that is, swap ith element with
211  the last one, and decrement size; just like in a heap, but no
212  heapify. Note: this function messes up the heap order. If the user
213  wants to maintain heap property should call heapify specifically.
214  */
215 template <class T, class Compare>
217 {
218 
219  assert(i < size && mergeHeap[i].run);
220 
222  {
223  cerr << "ReplacementHeapBlock::deleteRun deleting run " << i << ", "
224  << mergeHeap[i].run << endl;
225  print(cerr);
226  }
227 
228  // delete it
229  delete mergeHeap[i].run;
230  // and replace it with
231  if (size > 1) {
232  mergeHeap[i].value = mergeHeap[size - 1].value;
233  mergeHeap[i].run = mergeHeap[size - 1].run;
234  }
235  size--;
236 }
237 
238 /*****************************************************************/
239 /* for each run in the heap, read an element from the run into the
240  heap; if ith run is empty, delete it
241 */
242 template <class T, class Compare>
244 {
245  AMI_err err;
246  T *elt;
247  size_t i;
248 
249  RBHEAP_DEBUG cerr << "ReplacementHeapBlock::init ";
250 
251  i = 0;
252  while (i < size) {
253 
254  assert(mergeHeap[i].run);
255 
256  // Rewind run i
257  err = mergeHeap[i].run->seek(0);
258  if (err != AMI_ERROR_NO_ERROR) {
259  cerr << "ReplacementHeapBlock::Init(): cannot seek run " << i
260  << "\n";
261  assert(0);
262  exit(1);
263  }
264  // read first item from run i
265  err = mergeHeap[i].run->read_item(&elt);
266  if (err != AMI_ERROR_NO_ERROR) {
267  if (err == AMI_ERROR_END_OF_STREAM) {
268  deleteRun(i);
269  // need to iterate one more time with same i;
270  }
271  else {
272  cerr << "ReplacementHeapBlock::Init(): cannot read run " << i
273  << "\n";
274  assert(0);
275  exit(1);
276  }
277  }
278  else {
279  // copy.... can this be avoided? xxx
280  mergeHeap[i].value = *elt;
281 
282  i++;
283  }
284  }
285  buildheap();
286 }
287 
288 /*****************************************************************/
289 template <class T, class Compare>
291 {
292  size_t min_index = i;
293  size_t lc = rheap_lchild(i);
294  size_t rc = rheap_rchild(i);
295 
296  Compare cmpobj;
297  assert(i < size);
298  if ((lc < size) && (cmpobj.compare(mergeHeap[lc].value,
299  mergeHeap[min_index].value) == -1)) {
300  min_index = lc;
301  }
302  if ((rc < size) && (cmpobj.compare(mergeHeap[rc].value,
303  mergeHeap[min_index].value) == -1)) {
304  min_index = rc;
305  }
306 
307  if (min_index != i) {
308  BlockHeapElement<T> tmp = mergeHeap[min_index];
309 
310  mergeHeap[min_index] = mergeHeap[i];
311  mergeHeap[i] = tmp;
312 
313  heapify(min_index);
314  }
315 
316  return;
317 }
318 
319 /*****************************************************************/
320 template <class T, class Compare>
322 {
323 
324  if (size > 1) {
325  for (int i = rheap_parent(size - 1); i >= 0; i--) {
326  heapify(i);
327  }
328  }
329  RBHEAP_DEBUG cerr << "Buildheap done\n";
330  return;
331 }
332 
333 /*****************************************************************/
334 template <class T, class Compare>
336 {
337  T *elt, min;
338  AMI_err err;
339 
340  assert(!empty()); // user's job to check first if it's empty
341  min = mergeHeap[0].value;
342 
343  // read a new element from the same run
344  assert(mergeHeap[0].run);
345  err = mergeHeap[0].run->read_item(&elt);
346  if (err != AMI_ERROR_NO_ERROR) {
347  // if run is empty, delete it
348  if (err == AMI_ERROR_END_OF_STREAM) {
349  RBHEAP_DEBUG cerr << "rheap extract_min: run " << mergeHeap[0].run
350  << " empty. deleting\n ";
351  deleteRun(0);
352  }
353  else {
354  cerr << "ReplacementHeapBlock::extract_min: cannot read\n";
355  assert(0);
356  exit(1);
357  }
358  }
359  else {
360  // copy...can this be avoided?
361  mergeHeap[0].value = *elt;
362  }
363 
364  // restore heap
365  if (size > 0)
366  heapify(0);
367 
368  return min;
369 }
370 
371 #endif
AMI_err
Definition: ami_stream.h:83
@ AMI_ERROR_END_OF_STREAM
Definition: ami_stream.h:86
@ AMI_ERROR_NO_ERROR
Definition: ami_stream.h:84
void init(double work[])
Definition: as177.c:61
#define NULL
Definition: ccmath.h:32
MEM_STREAM< T > * run
friend ostream & operator<<(ostream &s, const BlockHeapElement &p)
void addRun(MEM_STREAM< T > *run)
ReplacementHeapBlock(queue< MEM_STREAM< T > * > *runList)
ostream & print(ostream &s) const
Definition: queue.h:43
#define min(x, y)
Definition: draw2.c:29
#define assert(condition)
Definition: lz4.c:291
double r
Definition: r_raster.c:39
#define RBHEAP_DEBUG
SYMBOL * err(FILE *fp, SYMBOL *s, char *msg)
Definition: symbol/read.c:216