/*
 * MY_LOG_DEBUG_ID(x): debug trace helper.
 *
 * When GETOPT("debug") evaluates true, writes "<file>:<line> <x>" followed
 * by endl to cerr; otherwise does nothing.
 *
 * The body is wrapped in do { ... } while (0) so the macro's internal `if`
 * can never capture an `else` that belongs to the caller, e.g.
 *     if (cond) MY_LOG_DEBUG_ID(msg) else other();
 * With the previous bare-`if` expansion that `else` silently attached to
 * the hidden `if (GETOPT("debug"))`.
 *
 * The trailing ';' is intentionally kept inside the macro so that every
 * call shape that compiled before (with or without its own semicolon)
 * still compiles.  `x` is spliced unparenthesized, exactly as before, so an
 * argument that itself contains `<<` still chains into the stream.
 */
#define MY_LOG_DEBUG_ID(x)                                           \
    do {                                                             \
        if (GETOPT("debug"))                                         \
            cerr << __FILE__ << ":" << __LINE__ << " " << x << endl; \
    } while (0);
// No-op variant of MY_LOG_DEBUG_ID: expands to nothing, so the argument is
// discarded by the preprocessor and never evaluated.
54 #define MY_LOG_DEBUG_ID(x)
65 template <
class T,
class Key>
70 unsigned short buf_id;
77 :
x(e), buf_id(bid), str_id(sid)
83 void set(T &e,
unsigned short bid,
unsigned int sid)
// Accessor: returns the member x by value (a copy of type T); const, so it
// does not modify the object it is called on.
89 T
elt()
const {
return x; }
97 return s <<
"<buf_id=" <<
elt.buf_id <<
",str_id=" <<
elt.str_id <<
"> "
135 template <
class T,
class Key>
138 : pqsize(pq_sz), bufsize(buf_sz), max_nbuf(nb_buf), crt_buf(0),
146 printf(
"EM_PQUEUE:available memory before allocation: %.2fMB\n",
147 mm_avail / (
float)(1 << 20));
148 printf(
"EM_PQUEUE:available memory before allocation: %ldB\n", mm_avail);
153 assert(pqsize > 0 && bufsize > 0);
155 MEMORY_LOG(
"em_pqueue: allocating int pqueue\n");
166 snprintf(str,
sizeof(str),
167 "em_pqueue: allocating array of %ld buff pointers\n",
174 for (
unsigned short i = 0; i < max_nbuf; i++) {
183 printf(
"EM_PQUEUE: available memory after allocation: %.2fMB\n",
184 mm_avail / (
float)(1 << 20));
191 cout <<
"em_pqueue constructor: failing to get stream_usage\n";
194 cout <<
"EM_PQUEUE:AMI_stream memory usage: " << sz_stream << endl;
195 cout <<
"EM_PQUEUE: item size=" <<
sizeof(T) << endl;
200 max_nbuf * sz_stream;
203 cout <<
"EM_PQUEUE: mm_overhead estimated as " << mm_overhead << endl;
204 if (mm_overhead > mm_avail) {
205 cout <<
"overhead bigger than available memory"
206 <<
"increase -m and try again\n";
209 mm_avail -= mm_overhead;
212 cout <<
"pqsize=" << pqsize <<
", bufsize=" << bufsize
213 <<
", maximum allowed arity=" << mm_avail / sz_stream << endl;
214 if (buf_arity * sz_stream > mm_avail) {
215 cout <<
"sorry - empq exceeds memory limits\n";
216 cout <<
"try again decreasing arity or pqsize/bufsize\n";
223 template <
class T,
class Key>
234 printf(
"EM_PQUEUE:available memory before allocation: %.2fMB\n",
235 mm_avail / (
float)(1 << 20));
243 cout <<
"em_pqueue constructor: failing to get main_memory_usage\n";
246 cout <<
"EM_PQUEUE:AMI_stream memory usage: " << sz_stream << endl;
247 cout <<
"EM_PQUEUE: item size=" <<
sizeof(T) << endl;
253 buf_arity = mm_avail / (2 * sz_stream);
256 max_nbuf * sz_stream;
259 cout <<
"EM_PQUEUE: mm_overhead estimated as " << mm_overhead << endl;
260 if (mm_overhead > mm_avail) {
261 cout <<
"overhead bigger than available memory"
262 <<
"increase -m and try again\n";
265 mm_avail -= mm_overhead;
269 pqsize = mm_avail / (2 *
sizeof(T));
271 bufsize = mm_avail / (2 *
sizeof(T));
274 pqsize = mm_avail / (4 *
sizeof(T));
276 bufsize = mm_avail / (4 *
sizeof(T));
279 cout <<
"EM_PQUEUE: pqsize set to " << pqsize << endl;
280 cout <<
"EM_PQUEUE: bufsize set to " << bufsize << endl;
281 cout <<
"EM_PQUEUE: nb buffers set to " << max_nbuf << endl;
292 buf_arity = mm_avail / (2 * sz_stream);
294 buf_arity = mm_avail / (2 * max_nbuf * sz_stream);
305 cout <<
"EM_PQUEUE: arity set to " << buf_arity << endl;
310 MEMORY_LOG(
"em_pqueue: allocating int pqueue\n");
321 snprintf(str,
sizeof(str),
322 "em_pqueue: allocating array of %ld buff pointers\n",
328 for (
unsigned short i = 0; i < max_nbuf; i++) {
333 cout <<
"EM_PQUEUE: maximum length is " << maxlen() <<
"\n";
340 printf(
"EM_PQUEUE: available memory after allocation: %.2fMB\n",
341 mm_avail / (
float)(1 << 20));
352 template <
class T,
class Key>
358 unsigned int pqcurrentsize;
362 pqsize = pqcapacity + 1;
363 pqcurrentsize = im->
size();
365 if (!(pqcurrentsize <= pqsize)) {
366 cout <<
"EMPQ: pq maxsize=" << pqsize
367 <<
", pq crtsize=" << pqcurrentsize <<
"\n";
383 for (
unsigned int i = 0; i < pqcurrentsize; i++) {
393 bufsize = pqcapacity;
394 cout <<
"EM_PQUEUE: allocating im_buffer size=" << bufsize <<
" total "
395 << (float)bufsize *
sizeof(T) / (1 << 20) <<
"MB\n";
399 cout <<
"EM_PQUEUE: allocating pq size=" << pqsize <<
" total "
400 << (float)pqcapacity *
sizeof(T) / (1 << 20) <<
"MB\n";
409 for (
unsigned int i = 0; i < pqcurrentsize; i++) {
414 assert(pq->size() == pqcurrentsize);
423 cout <<
"em_pqueue constructor: failing to get main_memory_usage\n";
426 cout <<
"EM_PQUEUE: AMI_stream memory usage: " << sz_stream << endl;
427 cout <<
"EM_PQUEUE: item size=" <<
sizeof(T) << endl;
430 buf_arity = pqcapacity *
sizeof(T) / sz_stream;
432 if (buf_arity == 0) {
433 cout <<
"EM_PQUEUE: arity=0 (not enough memory..)\n";
450 snprintf(str,
sizeof(str),
451 "em_pqueue: allocating array of %ld buff pointers\n",
456 for (
unsigned short i = 0; i < max_nbuf; i++) {
461 cout <<
"EM_PQUEUE: new pqsize set to " << pqcapacity << endl;
462 cout <<
"EM_PQUEUE: bufsize set to " << bufsize << endl;
463 cout <<
"EM_PQUEUE: buf arity set to " << buf_arity << endl;
464 cout <<
"EM_PQUEUE: nb buffers set to " << max_nbuf << endl;
465 cout <<
"EM_PQUEUE: maximum length is " << maxlen() <<
"\n";
470 printf(
"EM_PQUEUE: available memory after allocation: %.2fMB\n",
471 mm_avail / (
float)(1 << 20));
478 MEMORY_LOG(
"em_pqueue::empty_buff_0: create new em_buffer\n");
480 buff[0]->insert(amis);
489 template <
class T,
class Key>
503 for (
unsigned short i = 0; i < crt_buf; i++) {
512 template <
class T,
class Key>
517 printf(
"em_pqueue::max_len: level=%d exceeds capacity=%d\n", i,
522 return buff[i]->get_buf_maxlen();
527 cout <<
"em_pqueue::max_len: cannot allocate\n";
537 template <
class T,
class Key>
541 for (
unsigned short i = 0; i < max_nbuf; i++) {
544 return len + buff_0->get_buf_maxlen();
549 template <
class T,
class Key>
553 unsigned long elen = 0;
554 for (
unsigned short i = 0; i < crt_buf; i++) {
555 elen += buff[i]->get_buf_len();
557 return elen + pq->size() + buff_0->get_buf_len();
562 template <
class T,
class Key>
568 return ((pq->size() == 0) && (buff_0->get_buf_len() == 0) && (size() == 0));
573 template <
class T,
class Key>
580 for (
unsigned short i = 0; i < crt_buf; i++) {
581 k |= buff[i]->get_buf_len();
584 cerr <<
"fillpq called with empty external buff!" << endl;
590 #ifdef EMPQ_PQ_FILL_PRINT
591 cout <<
"filling pq\n";
594 XXX cerr <<
"filling pq" << endl;
600 snprintf(str,
sizeof(str),
601 "em_pqueue::fillpq: allocate array of %hd AMI_STREAMs\n",
610 for (
unsigned short i = 0; i < crt_buf; i++) {
613 assert(buff[i]->get_buf_len());
614 ae = merge_buffer(buff[i], outstreams[i], pqsize);
616 assert(outstreams[i]->stream_len());
623 merge_bufs2pq(outstreams[0]);
624 delete outstreams[0];
632 ae = merge_streams(outstreams, crt_buf, minstream, pqsize);
634 for (
int i = 0; i < crt_buf; i++) {
635 delete outstreams[i];
644 merge_bufs2pq(minstream);
651 XXX cerr <<
"fillpq done" << endl;
657 template <
class T,
class Key>
678 if (buff_0->is_empty()) {
683 #ifdef EMPQ_PRINT_FILLPQ_FROM_BUFF0
684 cout <<
"filling pq from B0\n";
688 long n = pq->fill(buff_0->get_array(), buff_0->get_buf_len());
689 buff_0->reset(pqsize, n);
700 XXX cerr <<
"fillpq done; about to take min" << endl;
702 XXX cerr <<
"after taking min" << endl;
711 template <
class T,
class Key>
726 template <
class T,
class Key>
738 ok = pq->extract_min(elt);
744 MY_LOG_DEBUG_ID(
"internal pq empty: filling it up from external buffers");
751 if (buff_0->is_empty()) {
755 #ifdef EMPQ_PRINT_FILLPQ_FROM_BUFF0
756 cout <<
"filling pq from B0\n";
761 long n = pq->fill(buff_0->get_array(), buff_0->get_buf_len());
762 buff_0->reset(pqsize, n);
763 ok = pq->extract_min(elt);
772 #ifdef EMPQ_PRINT_SIZE
774 y =
x *
sizeof(T) >> 20;
775 cout <<
"pqsize:[" << active_streams() <<
" streams: ";
776 print_stream_sizes();
777 cout <<
" total " <<
x <<
"(" << y <<
"MB)]" << endl;
782 XXX cerr <<
"about to get the min" << endl;
784 ok = pq->extract_min(elt);
786 cout <<
"failing assertion: pq->extract_min == true\n";
801 template <
class T,
class Key>
813 if (!extract_min(elt)) {
819 if ((!
min(next_elt)) ||
820 !(next_elt.getPriority() == elt.getPriority())) {
824 extract_min(next_elt);
825 elt = elt + next_elt;
833 #ifdef EMPQ_PRINT_EXTRACTALL
834 cout <<
"EXTRACTED: " << elt << endl;
837 #ifdef EMPQ_PRINT_EMPQ
850 template <
class T,
class Key>
863 ae = minstream->seek(0);
866 bool strEmpty =
false, bufEmpty =
false;
868 unsigned int bufPos = 0;
872 ae = minstream->read_item(&strItem);
879 if (bufPos < buff_0->get_buf_len()) {
880 bufElt = buff_0->get_item(bufPos);
887 XXX cerr <<
"pqsize=" << pqsize << endl;
888 XXX if (strEmpty) cerr <<
"stream is empty!!" << endl;
889 for (
unsigned int i = 0; i < pqsize; i++) {
892 if ((!strEmpty) && (strElt = strItem->
elt(),
893 bufElt.getPriority() > strElt.getPriority())) {
896 ae = minstream->read_item(&strItem);
907 if (bufPos < buff_0->get_buf_len()) {
908 bufElt = buff_0->get_item(bufPos);
917 strElt = strItem->
elt();
922 ae = minstream->read_item(&strItem);
937 buff_0->shift_left(bufPos);
940 #ifdef EMPQ_PQ_FILL_PRINT
941 cout <<
"merge_bufs2pq: pq filled; now cleaning\n";
953 template <
class T,
class Key>
955 unsigned int stream_id)
960 assert(stream_id < buff[buf_id]->get_nbstreams());
962 buff[buf_id]->incr_deleted(stream_id);
967 template <
class T,
class Key>
972 #ifdef EMPQ_PQ_FILL_PRINT
973 cout <<
"em_pqueue: cleanup enter\n";
977 for (
unsigned short i = 0; i < crt_buf; i++) {
982 short i = crt_buf - 1;
983 while ((i >= 0) && buff[i]->is_empty()) {
988 #ifdef EMPQ_PQ_FILL_PRINT
989 cout <<
"em_pqueue: cleanup done\n";
999 template <
class T,
class Key>
1003 #ifdef EMPQ_ASSERT_EXPENSIVE
1004 long init_size = size();
1010 if ((crt_buf == 0) && (buff_0->is_empty())) {
1020 ok = pq->max(pqmax);
1026 #ifdef EMPQ_ASSERT_EXPENSIVE
1027 assert(size() == init_size);
1035 pq->extract_max(val);
1046 #ifdef EMPQ_ASSERT_EXPENSIVE
1047 assert(size() == init_size);
1049 if (buff_0->is_full()) {
1050 #ifdef EMPQ_PRINT_SIZE
1052 y =
x *
sizeof(T) >> 20;
1053 cout <<
"pqsize:[" << active_streams() <<
" streams: ";
1054 print_stream_sizes();
1055 cout <<
" total " <<
x <<
"(" << y <<
"MB)]" << endl;
1060 #ifdef EMPQ_ASSERT_EXPENSIVE
1061 assert(size() == init_size);
1064 assert(!buff_0->is_full());
1066 ok = buff_0->insert(val);
1069 #ifdef EMPQ_PRINT_INSERT
1070 cout <<
"INSERTED: " <<
x << endl;
1073 #ifdef EMPQ_PRINT_EMPQ
1085 template <
class T,
class Key>
1088 #ifdef EMPQ_ASSERT_EXPENSIVE
1089 long init_size = size();
1092 #ifdef EMPQ_EMPTY_BUF_PRINT
1093 cout <<
"emptying buff_0\n";
1099 assert(buff_0->is_full());
1104 #ifdef EMPQ_ASSERT_EXPENSIVE
1105 assert(size() == init_size);
1110 MEMORY_LOG(
"em_pqueue::empty_buff_0: create new em_buffer\n");
1114 assert(buff_0->get_buf_len() == buff[0]->get_stream_maxlen());
1126 #ifdef EMPQ_ASSERT_EXPENSIVE
1127 assert(size() + buff_0->maxlen() == init_size);
1131 if (buff[0]->is_full()) {
1135 buff[0]->insert(buff_0_str);
1142 #ifdef EMPQ_ASSERT_EXPENSIVE
1143 assert(size() == init_size);
1154 template <
class T,
class Key>
1158 #ifdef EMPQ_ASSERT_EXPENSIVE
1159 long init_size = size();
1161 #ifdef EMPQ_EMPTY_BUF_PRINT
1162 cout <<
"emptying buffer_" << i <<
"\n";
1171 assert(buff[i]->is_full());
1174 if (i == max_nbuf - 1) {
1175 cerr <<
"empty_buff:: cannot empty further - structure is full..\n";
1177 cerr <<
"ext buff array should reallocate in a future version..\n";
1185 snprintf(str,
sizeof(str),
1186 "em_pqueue::empty_buff( %hd ) allocate new em_buffer\n", i);
1201 if (sorted_buf->
stream_len() != buff[i]->get_buf_len()) {
1202 cout <<
"sorted_stream_len: " << sorted_buf->
stream_len()
1203 <<
" , bufflen: " << buff[i]->get_buf_len() << endl;
1206 ae = sorted_buf->
seek(0);
1214 #ifdef EMPQ_ASSERT_EXPENSIVE
1218 #ifdef EMPQ_ASSERT_EXPENSIVE
1219 assert(size() == init_size);
1223 #ifdef EMPQ_ASSERT_EXPENSIVE
1230 if (buff[i + 1]->is_full()) {
1233 buff[i + 1]->insert(sorted_buf, 0);
1236 if (crt_buf < i + 2)
1239 #ifdef EMPQ_ASSERT_EXPENSIVE
1240 assert(size() == init_size);
1256 template <
class T,
class Key>
1262 unsigned short bufid = buf->
get_level() - 1;
1266 std::vector<T *> in_objects(arity);
1280 MEMORY_LOG(
"em_pqueue::merge_buffer: allocate keys array\n");
1286 for (i = 0; i < arity; i++) {
1290 cerr <<
"WARNING!!! EARLY EXIT!!!" << endl;
1294 ami_err = instreams[i]->
read_item(&(in_objects[i]));
1297 in_objects[i] =
NULL;
1308 cerr <<
"WARNING!!! EARLY EXIT!!!" << endl;
1312 unsigned int NonEmptyRuns = j;
1322 ami_err = outstream->seek(0);
1325 while (!mergeheap.
empty() && (extracted < K)) {
1327 i = mergeheap.
min().stream_id();
1331 cerr <<
"WARNING!!! EARLY EXIT!!!" << endl;
1337 ami_err = instreams[i]->
read_item(&(in_objects[i]));
1346 Key k = in_objects[i]->getPriority();
1351 cerr <<
"WARNING!!! early breakout!!!" << endl;
1367 assert(extracted == outstream->stream_len());
1379 template <
class T,
class Key>
1381 unsigned short arity,
1389 std::vector<ExtendedEltMergeType<T, Key>> in_objects(arity);
1393 unsigned int nonEmptyRuns = 0;
1397 MEMORY_LOG(
"em_pqueue::merge_streams: allocate keys array\n");
1403 for (
int i = 0; i < arity; i++) {
1410 ami_err = instreams[i]->read_item(&objp);
1413 in_objects[i] = *objp;
1414 keys[nonEmptyRuns] =
merge_key<Key>(in_objects[i].getPriority(), i);
1423 assert(nonEmptyRuns <= arity);
1427 keys, nonEmptyRuns);
1432 ami_err = outstream->seek(0);
1435 while (!mergeheap.
empty() && (extracted < K)) {
1437 int id = mergeheap.
min().stream_id();
1439 assert(
id < nonEmptyRuns);
1449 assert(
id < nonEmptyRuns);
1452 ami_err = instreams[id]->read_item(&objp);
1455 in_objects[id] = *objp;
1481 template <
class T,
class Key>
1487 for (
int i = 0; i < crt_buf; i++) {
1497 template <
class T,
class Key>
1500 cout <<
"EM_PQ: [pq=" << pqsize <<
", b=" << bufsize
1501 <<
", bufs=" << max_nbuf <<
", ar=" << buf_arity <<
"]\n";
1513 for (
unsigned short i = 0; i < crt_buf; i++) {
1514 cout <<
"B" << i + 1 <<
": ";
1515 buff[i]->print_range();
1522 template <
class T,
class Key>
1525 cout <<
"EM_PQ: [pq=" << pqsize <<
", b=" << bufsize
1526 <<
", bufs=" << max_nbuf <<
", ar=" << buf_arity <<
"]\n";
1536 for (
unsigned short i = 0; i < crt_buf; i++) {
1537 cout <<
"B" << i + 1 <<
": " << endl;
1545 template <
class T,
class Key>
1550 cout <<
"EMPQ: pq=" << pq->size() <<
",B0=" << buff_0->get_buf_len()
1553 for (
unsigned short i = 0; i < crt_buf; i++) {
1555 cout <<
"B_" << i + 1 <<
":";
1557 buff[i]->print_stream_sizes();
1558 elen += buff[i]->get_buf_len();
1561 cout <<
"total: " << elen + pq->size() + buff_0->get_buf_len() << endl
1567 template <
class T,
class Key>
1570 for (
unsigned short i = 0; i < crt_buf; i++) {
1572 buff[i]->print_stream_sizes();
@ AMI_ERROR_END_OF_STREAM
static AMI_err main_memory_usage(size_t *usage, MM_stream_usage usage_type=MM_STREAM_USAGE_OVERHEAD)
AMI_err write_item(const T &elt)
AMI_err seek(off_t offset)
AMI_err read_item(T **elt)
friend int operator<=(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
friend ostream & operator<<(ostream &s, const ExtendedEltMergeType< T, Key > &elt)
unsigned int stream_id() const
friend int operator!=(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
friend int operator>(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
friend int operator>=(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
ExtendedEltMergeType(T &e, unsigned short bid, unsigned int sid)
unsigned short buffer_id() const
friend int operator<(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
void set(T &e, unsigned short bid, unsigned int sid)
friend int operator==(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
HeapIndex get_maxsize() const
unsigned short get_level() const
unsigned long get_buf_maxlen()
unsigned long get_buf_len()
unsigned int get_nbstreams() const
AMI_STREAM< T > ** get_streams()
void print_stream_sizes()
void delete_str_elt(unsigned short buf_id, unsigned int stream_id)
bool insert(const T &elt)
void merge_bufs2pq(ExtendedMergeStream *minstream)
AMI_err merge_buffer(em_buffer< T, Key > *buf, ExtendedMergeStream *outstr, long K)
void empty_buff(unsigned short i)
bool extract_all_min(T &elt)
AMI_err merge_streams(ExtendedMergeStream **instr, unsigned short arity, ExtendedMergeStream *outstr, long K)
void delete_min_and_insert(const T &x)
unsigned int size(void) const
#define ExtendedMergeStream
#define MY_LOG_DEBUG_ID(x)
#define assert(condition)
@ MM_STREAM_USAGE_MAXIMUM
void MEMORY_LOG(const std::string &str)
size_t getAvailableMemory()