GRASS GIS 8 Programmer's Manual  8.5.0dev(2024)-f63024f571
empq.h
Go to the documentation of this file.
1 /****************************************************************************
2  *
3  * MODULE: iostream
4  *
5 
6  * COPYRIGHT (C) 2007 Laura Toma
7  *
8  *
9 
10  * Iostream is a library that implements streams, external memory
11  * sorting on streams, and an external memory priority queue on
12  * streams. These are the fundamental components used in external
13  * memory algorithms.
14 
15  * Credits: The library was developed by Laura Toma. The kernel of
16  * class STREAM is based on the similar class existent in the GPL TPIE
17  * project developed at Duke University. The sorting and priority
18  * queue have been developed by Laura Toma based on communications
19  * with Rajiv Wickremesinghe. The library was developed as part of
20  * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
21  * Rajiv Wickremesinghe as part of the Terracost project.
22 
23  *
24  * This program is free software; you can redistribute it and/or modify
25  * it under the terms of the GNU General Public License as published by
26  * the Free Software Foundation; either version 2 of the License, or
27  * (at your option) any later version.
28  *
29 
30  * This program is distributed in the hope that it will be useful,
31  * but WITHOUT ANY WARRANTY; without even the implied warranty of
32  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
33  * General Public License for more details. *
34  * **************************************************************************/
35 
36 #ifndef __EMPQ_H
37 #define __EMPQ_H
38 
39 #include <stdio.h>
40 #include <assert.h>
41 
42 #include "ami_config.h" //for SAVE_MEMORY
43 #include "ami_stream.h"
44 #include "mm.h"
45 #include "mm_utils.h" //for MEMORY_LOG, getAvailableMemory
46 #include "imbuffer.h"
47 #include "embuffer.h"
48 #include "pqheap.h"
49 #include "minmaxheap.h"
50 
51 template <class T, class Key>
52 class ExtendedEltMergeType; // forward declaration; named by ExtendedMergeStream below
53 #define ExtendedMergeStream AMI_STREAM<ExtendedEltMergeType<T, Key>>
54 
55 /**********************************************************
56  DEBUGGING FLAGS
57 ***********************************************************/
58 
59 // enables printing messages when buffers are emptied
60 // #define EMPQ_EMPTY_BUF_PRINT
61 
62 // enables printing when pq gets filled from buffers
63 // #define EMPQ_PQ_FILL_PRINT
64 
65 // enables printing inserts
66 // #define EMPQ_PRINT_INSERT
67 
68 // enables printing deletes
69 // #define EMPQ_PRINT_EXTRACTALL
70 
71 // enables printing the empq on insert/extract_all_min
72 // #define EMPQ_PRINT_EMPQ
73 
74 // enable printing the size of the EMPQ and nb of active streams
75 // on fillpq() and on empty_buff_0()
76 // #define EMPQ_PRINT_SIZE
77 
78 // enable printing 'fill pq from B0' in extract_min()
79 // #define EMPQ_PRINT_FILLPQ_FROM_BUFF0
80 
81 // enable expensive size asserts
82 // #define EMPQ_ASSERT_EXPENSIVE
83 
84 /**********************************************************/
85 
86 /* external memory priority queue
87 
88  Functionality:
89 
90  Keep a pqueue PQ of size \theta(M) in memory. Keep a buffer B0 of
91  size \theta(M) in memory. keep an array of external-memory
92  buffers, one for each level 1..log_m{N/m} (where N is the maximum
93  number of items in pqueue at any time).
94 
95  invariants:
96  1. PQ contains the smallest items in the structure.
97 
98  2. each stream of any external memory buffers is sorted in
99  increasing order.
100 
101  insert(x): if (x < maximum_item(PQ)) exchange x with
102  maximum_item(PQ); if buffer B0 is full, empty it; insert x in B0;
103 
104  extract_min():
105 
106  analysis:
107 
108  1. inserts: once the buffer B0 is empty, the next sizeof(B0)
109  inserts are free; one insert can cause many I/Os if cascading
110  emptying of external buffers Bi occurs. Emptying level-i buffer
111  costs <arity>^i*sizeof(B0)/B I/Os and occurs every
112  N/<arity>^i*sizeof(B0) inserts (or less, if deletes too). It can be
113  proved that the amortized time of 1 insert is 1/B*maxnb_buffers.
114 */
115 
116 /*
117 T is assumed to be a class for which getPriority() and getValue()
118 are implemented; for simplicity it is assumed that the comparison
119 operators have been overloaded on T such that
120 x < y <==> x.getPriority() < y.getPriority()
121 */
122 
123 template <class T, class Key>
124 class em_pqueue {
125 
126 private:
127  // in memory priority queue
128  MinMaxHeap<T> *pq;
129 
130  // pqueue size
131  unsigned long pqsize;
132 
133  // in-memory buffer
134  im_buffer<T> *buff_0;
135 
136  // in-memory buffer size
137  unsigned long bufsize;
138 
139  // external memory buffers
140  em_buffer<T, Key> **buff;
141 
142  /* number of external memory buffers statically allocated in the
143  beginning; since the number of buffers needed is \log_m{n/m}, we
144  cannot know it in advance; estimate it roughly and then reallocate
145  it dynamically on request;
146 
147  TO DO: dynamic reallocation with a bigger nb of external buffer
148  if structure becomes full */
149  unsigned short max_nbuf;
150 
151  // index of next external buffer entry available for use (i.e. is NULL)
152  unsigned short crt_buf;
153 
154  // external buffer arity
155  unsigned int buf_arity;
156 
157 public:
158  // create an em_pqueue of specified size
159  em_pqueue(long pq_sz, long buf_sz, unsigned short nb_buf,
160  unsigned int buf_ar);
161 
162  // create an em_pqueue capable to store <= N elements
163  em_pqueue();
164  em_pqueue(long N UNUSED) { em_pqueue(); }; // N not used
165 
166 #ifdef SAVE_MEMORY
167  // create an empq, initialize its pq with im and insert amis in
168  // buff[0]; im should not be used/deleted after that outside empq
170 #endif
171 
172  // copy constructor
173  em_pqueue(const em_pqueue &ep);
174 
175  // clean up
176  ~em_pqueue();
177 
178  // return the nb of elements in the structure
179  unsigned long size();
180 
181  // return true if empty
182  bool is_empty();
183 
184  // return true if full
185  bool is_full()
186  {
187  cout << "em_pqueue::is_full(): sorry not implemented\n";
188  exit(1);
189  }
190 
191  // return the element with minimum priority in the structure
192  bool min(T &elt);
193 
194  // delete the element with minimum priority in the structure;
195  // return false if pq is empty
196  bool extract_min(T &elt);
197 
198  // extract all elts with min key, add them and return their sum
199  bool extract_all_min(T &elt);
200 
201  // insert an element; return false if insertion fails
202  bool insert(const T &elt);
203 
204  // return maximum capacity of i-th external buffer
205  long maxlen(unsigned short i);
206 
207  // return maximum capacity of em_pqueue
208  long maxlen();
209 
210  // delete all the data in the pq; reset to empty but don't free memory
211  void clear();
212 
213  // print structure
214  void print_range();
215 
216  void print();
217 
218  // print the detailed size of empq (pq, buf_0, buff[i])
219  void print_size();
220 
221  friend ostream &operator<<(ostream &s, const em_pqueue &empq)
222  {
223  s << "EM_PQ: pq size=" << empq.pqsize
224  << ", buff_0 size=" << empq.bufsize << ", ext_bufs=" << empq.crt_buf
225  << "(max " << empq.max_nbuf << ")\n";
226  s << "IN_MEMORY PQ: \n" << *(empq.pq) << "\n";
227  s << "IN_MEMORY BUFFER: \n" << *(empq.buff_0) << "\n";
228  for (unsigned short i = 0; i < empq.crt_buf; i++) {
229  // s << "EM_BUFFER " << i << ":\n" ;
230  s << *(empq.buff[i]);
231  }
232  return s;
233  }
234 
235 protected:
236  // return the nb of active streams in the buffer
238  {
239  int totstr = 0;
240  for (unsigned short i = 0; i < crt_buf; i++) {
241  totstr += buff[i]->get_nbstreams();
242  }
243  return totstr;
244  }
245 
246  // called when buff_0 is full to empty it on external level_1 buffer;
247  // can produce cascading emptying
248  bool empty_buff_0();
249 
250  // sort and empty buffer i into buffer (i+1) recursively;
251  // called recursively by empty_buff_0() to empty subsequent buffers
252  // i must be a valid (i<crt_buf) full buffer
253  void empty_buff(unsigned short i);
254 
255  /* merge the first <K> elements of the streams of input buffer,
256  starting at position <buf.deleted[i]> in each stream; there are
257  <buf.arity> streams in total; write output in <outstream>; the
258  items written in outstream are of type <merge_output_type> which
259  extends T with the stream nb and buffer nb the item comes from;
260  this information is needed later to distribute items back; do not
261  delete the K merged elements from the input streams; <bufid> is the
262  id of the buffer whose streams are being merged;
263 
264  the input streams are assumed sorted in increasing order of keys; */
266  long K);
267 
268  /* merge the first <K> elements of the input streams; there are
269  <arity> streams in total; write output in <outstream>;
270 
271  the input streams are assumed sorted in increasing order of their
272  keys; */
273  AMI_err merge_streams(ExtendedMergeStream **instr, unsigned short arity,
274  ExtendedMergeStream *outstr, long K);
275 
276  // deletes one element from <buffer, stream>
277  void delete_str_elt(unsigned short buf_id, unsigned int stream_id);
278 
279  /* copy the minstream in the internal pqueue while merging with
280  buff_0; the smallest <pqsize> elements between minstream and
281  buff_0 will be inserted in internal pqueue; also, the elements
282  from minstram which are inserted in pqueue must be marked as
283  deleted in the source streams; */
284  void merge_bufs2pq(ExtendedMergeStream *minstream);
285 
286  // clean buffers in case some streams have been emptied
287  void cleanup();
288 
289  // called when pq must be filled from external buffers
290  bool fillpq();
291 
292  // print the nb of elements in each stream
293  void print_stream_sizes();
294 };
295 
296 #endif
AMI_err
Definition: ami_stream.h:83
unsigned int get_nbstreams() const
Definition: embuffer.h:263
void clear()
Definition: empq_impl.h:1482
em_pqueue(const em_pqueue &ep)
void print_stream_sizes()
Definition: empq_impl.h:1568
void print()
Definition: empq_impl.h:1523
void delete_str_elt(unsigned short buf_id, unsigned int stream_id)
Definition: empq_impl.h:954
bool fillpq()
Definition: empq_impl.h:574
void print_range()
Definition: empq_impl.h:1498
bool insert(const T &elt)
Definition: empq_impl.h:1000
~em_pqueue()
Definition: empq_impl.h:490
bool extract_min(T &elt)
Definition: empq_impl.h:727
bool is_full()
Definition: empq.h:185
void merge_bufs2pq(ExtendedMergeStream *minstream)
Definition: empq_impl.h:851
bool min(T &elt)
Definition: empq_impl.h:658
em_pqueue(long N UNUSED)
Definition: empq.h:164
long maxlen()
Definition: empq_impl.h:538
unsigned long size()
Definition: empq_impl.h:550
AMI_err merge_buffer(em_buffer< T, Key > *buf, ExtendedMergeStream *outstr, long K)
Definition: empq_impl.h:1257
void empty_buff(unsigned short i)
Definition: empq_impl.h:1155
bool extract_all_min(T &elt)
Definition: empq_impl.h:802
void cleanup()
Definition: empq_impl.h:968
AMI_err merge_streams(ExtendedMergeStream **instr, unsigned short arity, ExtendedMergeStream *outstr, long K)
Definition: empq_impl.h:1380
int active_streams()
Definition: empq.h:237
bool is_empty()
Definition: empq_impl.h:563
void print_size()
Definition: empq_impl.h:1546
bool empty_buff_0()
Definition: empq_impl.h:1086
friend ostream & operator<<(ostream &s, const em_pqueue &empq)
Definition: empq.h:221
#define N
Definition: e_intersect.c:926
#define ExtendedMergeStream
Definition: empq.h:53
#define UNUSED
A macro for an attribute, if attached to a variable, indicating that the variable is not used.
Definition: gis.h:47