55 #define MY_LOG_DEBUG_ID(x) //inhibit debug printing 115 void set(
const KEY &
x,
const unsigned int sid) {
129 friend ostream& operator<<(ostream& s, const merge_key<KEY> &
x) {
130 return s <<
"<str_id=" << x.str_id <<
"> " << x.k <<
" ";
210 template<
class T,
class Key>
220 unsigned short level;
237 unsigned long* streamsize;
244 unsigned long basesize;
250 em_buffer(
const unsigned short i,
const unsigned long bs,
251 const unsigned int ar);
270 void put_stream(
unsigned int i);
296 for (
unsigned int i=0; i< index; i++) {
312 return (
unsigned long)pow((
double)arity,(
double)level-1)*basesize;
319 return streamsize[i] - deleted[i];
325 unsigned long tot = 0;
326 for (
unsigned int i=0; i< index; i++) {
327 tot += get_stream_len(i);
334 return arity * get_stream_maxlen();
339 return ((nextstream() == 0) || (get_buf_len() == 0));
344 return (nextstream() == arity);
366 long insert(T* a,
long n);
392 void print_stream_sizes();
396 s <<
"BUFFER_" << b.level <<
": ";
402 for (
unsigned int i=0; i < b.index; i++) {
403 b.print_stream(s, i);
421 void print_stream(ostream& s,
unsigned int i);
430 char* get_stream_name(
unsigned int i)
const;
433 void print_stream_names();
439 void check_name(
unsigned int i);
447 template <
class T,
class Key>
449 const unsigned int ar) :
450 arity(ar), level(i), basesize(bs) {
452 assert((level>=1) && (basesize >=0));
455 sprintf(str,
"em_buffer: allocate %d AMI_STREAM*, total %ld\n",
462 sprintf(str,
"em_buffer: allocate deleted array: %ld\n",
463 (
long)(arity*
sizeof(
long)));
465 deleted =
new long[arity];
468 sprintf(str,
"em_buffer: allocate streamsize array: %ld\n",
469 (
long)(arity*
sizeof(
long)));
471 streamsize =
new unsigned long[arity];
475 sprintf(str,
"em_buffer: allocate name array: %ld\n",
476 (
long)(arity*
sizeof(
char*)));
478 name =
new char* [arity];
483 if ((!data) || (!deleted) || (!streamsize)) {
484 cerr <<
"em_buffer: cannot allocate\n";
490 for (
unsigned int ui=0; ui< arity; ui++) {
509 template<
class T,
class Key>
511 level(buf.level), basesize(buf.basesize),
512 index(buf.index), arity(buf.arity) {
518 for (
unsigned int i=0; i< index; i++) {
527 deleted[i] = buf.deleted[i];
528 streamsize[i] = buf.streamsize[i];
533 name[i] = buf.name[i];
543 template<
class T,
class Key>
549 for (
unsigned int i=0; i<index; i++) {
561 delete [] streamsize;
573 template<
class T,
class Key>
576 #ifdef EMBUF_CHECK_NAME 577 assert(i>=0 && i < index);
581 data[i]->name(&fooname);
582 #ifdef EMBUF_CHECK_NAME_PRINT 583 cout <<
"::check_name: checking stream [" << level <<
"," << i <<
"] name:" 587 if (strcmp(name[i], fooname) != 0) {
588 cerr <<
"name[" << i <<
"]=" << name[i]
589 <<
", streamname=" << fooname << endl;
591 assert(strcmp(fooname, name[i]) == 0);
603 template<
class T,
class Key>
606 assert(i>=0 && i < index);
612 if (data[i] ==
NULL) {
619 #ifdef EMBUF_PRINT_GETPUT_STREAM 620 cout <<
"get_stream:: name[" << i <<
"]=" << name[i] <<
" from disk\n";
626 if ((fp = fopen(name[i],
"rb")) ==
NULL) {
627 cerr <<
"get_stream: checking that stream " << name[i] <<
"exists\n";
651 assert(data[i]->stream_len() == streamsize[i]);
664 template<
class T,
class Key>
667 assert(i>=0 && i < index);
673 if (data[i] !=
NULL) {
680 #ifdef EMBUF_PRINT_GETPUT_STREAM 681 cout <<
"put_stream:: name[" << i <<
"]=" << name[i] <<
" to disk\n";
704 template<
class T,
class Key>
709 #ifdef EMBUF_PRINT_GETPUT_STREAMS 710 cout <<
"em_buffer::get_streams (buffer " << level <<
")";
711 cout <<
": index = " << index <<
"(arity=" << arity <<
")\n";
715 for (
unsigned int i=0; i<index; i++) {
731 template<
class T,
class Key>
736 #ifdef EMBUF_PRINT_GETPUT_STREAMS 737 cout <<
"em_buffer::put_streams (buffer " << level <<
")";
738 cout <<
": index = " << index <<
"(arity=" << arity <<
")\n";
742 for (
unsigned int i=0; i<index; i++) {
755 template<
class T,
class Key>
769 template<
class T,
class Key>
772 for (i=0; i<index; i++) {
774 cout <<
"stream " << i <<
": " << name[i] << endl;
785 template<
class T,
class Key>
789 #ifdef EMBUF_CLEANUP_PRINT 792 cout <<
"before cleanup:\n";
793 print_stream_names();
804 unsigned int i, empty=0;
805 for (i=0; i<index; i++) {
809 #ifdef EMBUF_DELETE_STREAM_PRINT 810 cout<<
"deleting stream [" << level <<
"," << i <<
"]:" ;
841 #ifdef EMBUF_DELETE_STREAM_PRINT 842 cout <<
"em_buffer::cleanup: shifting streams\n"; cout.flush();
844 for (i=0; i<index; i++) {
851 deleted[j] = deleted[i];
852 streamsize[j] = streamsize[i];
873 assert(index == j + empty);
876 #ifdef EMBUF_DELETE_STREAM_PRINT 877 cout <<
"em_buffer::cleanup: index set to " << index << endl;
885 #ifdef EMBUF_CLEANUP_PRINT 888 cout <<
"after cleanup:\n";
889 print_stream_names();
902 template<
class T,
class Key>
908 for (
unsigned int i=0; i<index; i++) {
910 assert(streamsize[i] == data[i]->stream_len());
935 template<
class T,
class Key>
940 MEMORY_LOG(
"em_buffer::sort: allocate new AMI_STREAM\n");
955 aerr = substream_merge(data, arity, sorted_stream);
960 return sorted_stream;
974 template<
class T,
class Key>
984 for (i = 0; i < arity ; i++ ) {
991 std::vector<T*> in_objects(arity);
996 sprintf(str,
"em_buffer::substream_merge: allocate keys array, total %ldB\n",
1006 keys =
new footype[arity];
1013 for (i = 0; i < arity ; i++ ) {
1020 if ((ami_err = instreams[i]->read_item(&(in_objects[i]))) !=
1023 in_objects[i] =
NULL;
1029 Key k = in_objects[i]->getPriority();
1034 unsigned int NonEmptyRuns = j;
1043 ami_err = outstream->
seek(0);
1045 while (!mergeheap.
empty()) {
1047 mergeheap.
min(minelt);
1050 if ((ami_err = outstream->
write_item(*in_objects[i]))
1055 if ((ami_err = instreams[i]->read_item(&(in_objects[i])))
1065 Key k = in_objects[i]->getPriority();
1088 template<
class T,
class Key>
1094 cout <<
"em_buffer::insert: buffer full\n";
1103 MEMORY_LOG(
"em_buffer::insert(from array): allocate AMI_STREAM\n");
1109 for (
long i=0; i< n; i++) {
1138 template<
class T,
class Key>
1144 cout <<
"em_buffer::insert: buffer full\n";
1156 #ifdef EMBUF_CHECK_INSERT 1158 cout <<
"CHECK_INSERT: checking stream is sorted\n";
1164 if (prev)
assert(*prev <= *crt);
1193 #ifdef EMBUF_PRINT_INSERT 1194 cout <<
"insert stream " << s <<
" at buf [" << level
1202 #ifdef EMBUF_PRINT_INSERT 1204 print_stream_names();
1222 template<
class T,
class Key>
1226 assert((i>=0) && (i<index));
1231 s <<
"STREAM " << i <<
": [";
1233 ae = data[i]->seek(deleted[i]);
1237 ae = data[i]->read_item(&x);
1249 template<
class T,
class Key>
1257 for (
unsigned int i=0; i< index; i++) {
1260 ae = data[i]->seek(deleted[i]);
1262 ae = data[i]->read_item(&min);
1264 cout << min->getPriority() <<
"..";
1266 ae = data[i]->seek(streamsize[i] - 1);
1268 ae = data[i]->read_item(&max);
1270 cout << max->getPriority()
1273 for (
unsigned int i=index; i< arity; i++) {
1284 template<
class T,
class Key>
1292 for (
unsigned int i=0; i<index; i++) {
1294 ae = data[i]->seek(deleted[i]);
1297 ae = data[i]->read_item(&x);
1299 cout << x->getPriority() <<
",";
1301 cout <<
"]" << endl;
1303 for (
unsigned int i=index; i< arity; i++) {
1314 template<
class T,
class Key>
1317 cout <<
"(streams=" << index <<
") sizes=[";
1318 for (
unsigned int i=0; i< arity; i++) {
1321 cout <<
"]" << endl;
unsigned long get_buf_maxlen()
AMI_err read_item(T **elt)
unsigned int nextstream() const
friend int operator<(const merge_key &x, const merge_key &y)
void set(const KEY &x, const unsigned int sid)
void MEMORY_LOG(std::string str)
AMI_err write_item(const T &elt)
void incr_deleted(unsigned int i)
void delete_min_and_insert(const T &x)
AMI_STREAM< T > ** get_streams()
AMI_err seek(off_t offset)
#define MY_LOG_DEBUG_ID(x)
unsigned int stream_id() const
#define assert(condition)
merge_key(const KEY &x, const unsigned int sid)
AMI_STREAM< T > * get_stream(unsigned int i)
friend int operator==(const merge_key &x, const merge_key &y)
friend merge_key operator+(const merge_key &x, const merge_key &y)
em_buffer(const unsigned short i, const unsigned long bs, const unsigned int ar)
friend int operator>(const merge_key &x, const merge_key &y)
long insert(T *a, long n)
unsigned long get_buf_len()
void print_stream_sizes()
long total_deleted() const
AMI_err name(char **stream_name)
unsigned int get_nbstreams() const
unsigned long get_stream_maxlen() const
unsigned int get_arity() const
void persist(persistence p)
unsigned int laststream() const
friend int operator!=(const merge_key &x, const merge_key &y)
unsigned short get_level() const
unsigned long get_stream_len(unsigned int i)
void put_stream(unsigned int i)
friend int operator<=(const merge_key &x, const merge_key &y)
friend ostream & operator<<(ostream &s, em_buffer &b)
friend int operator>=(const merge_key &x, const merge_key &y)