9 #include "udaf_common.h"
// Sizing constants for the LFTA quantile summaries. QUANT_LFTA3_SIZE is the
// one used in this file: it dimensions the tuple array in
// quant_udaf_lfta3_struct_t and the scratch copy made by the OUTPUT function.
// NOTE(review): the three sizes are #defined twice below (729/181/100, then
// 202/49/25). Unless a preprocessor conditional separates the two groups, the
// second group redefines the first -- confirm which set is meant to be live.
13 // NOTE: does not seem to be stable or correct with this setting
14 // compress only activates with this one, so compress is broken?
15 #define QUANT_LFTA1_SIZE 729
16 #define QUANT_LFTA2_SIZE 181
17 #define QUANT_LFTA3_SIZE 100
22 // #define QUANT_LFTA1_SIZE 378
23 // #define QUANT_LFTA2_SIZE 93
24 // #define QUANT_LFTA3_SIZE 50
27 #define QUANT_LFTA1_SIZE 202
28 #define QUANT_LFTA2_SIZE 49
29 #define QUANT_LFTA3_SIZE 25
// QUANT_EPS, SKIPDIR_SIZE and SKIPDIR_HEIGHT_MAX are used further down but
// appear here only commented out; presumably they are supplied by
// udaf_common.h -- confirm.
33 // #define QUANT_EPS 0.01
34 // #define SKIPDIR_SIZE 100
35 // #define SKIPDIR_HEIGHT_MAX 7
36 //#define max(a,b) ((a) > (b) ? (a) : (b))
// When defined, quant_udaf_lfta3_LFTA_AGGR_OUTPUT_ compiles its
// #ifdef COMPRESSED_XFER branch instead of reporting the full struct size.
38 #define COMPRESSED_XFER
40 /****************************************************************/
42 /****************************************************************/
// Element type of the tuple array in quant_udaf_lfta3_struct_t. Code in this
// file reads and writes its val, gap, del and next members; next is used as an
// index into the same array, with 0 serving as the end-of-list marker.
43 template <class T> struct tuple_t {
// Element type of skipdir_t::list. Code in this file uses its val, next and
// down members; next is an index into skipdir_t::list (0 = end of level) and
// down is an index of the entry one level below (see skipdir_insert).
52 template <class T> struct skipnode_t {
// Skip-list directory stored in fixed-size arrays and addressed by index
// rather than by pointer. Index 0 is treated as "null" throughout this file,
// which is why list[] has SKIPDIR_SIZE+1 entries and the free list built by
// skipdir_create starts at index 1.
58 template <class T> struct skipdir_t {
59 gs_uint32_t height; // height of tree
60 gs_uint32_t freeptr; // cursor space stack
61 gs_uint32_t headptr[SKIPDIR_HEIGHT_MAX+1]; // ptrs to levels
62 skipnode_t<T> list[SKIPDIR_SIZE+1];
66 /****************************************************************/
// Aggregate state for the "version 3" LFTA quantile UDAF. The LFTA_AGGR_*
// functions cast their gs_sp_t buffer to this struct. Tuples live in t[] and
// are chained by index through tuple_t::next; index 0 means "none".
68 // fstring(5+(QUANT_LFTA3_SIZE+1)*4 +
69 // (2+lg(QUANT_LFTA3_SIZE)+(QUANT_LFTA3_SIZE+1)*3)*4)
70 template <class T> struct quant_udaf_lfta3_struct_t {
71 gs_uint32_t nelts; // # stream elements
72 gs_uint32_t freeptr; // ptr to cursor stack
73 gs_uint32_t usedptr; // ptr to allocated memory
74 gs_uint32_t circptr; // circulating ptr used for compression
76 tuple_t<T> t[QUANT_LFTA3_SIZE+1]; // samples + auxiliary info
77 skipdir_t<T> sd; // directory for searching tuples
80 /****************************************************************/
81 /* Skip List Functions */
82 /****************************************************************/
84 // Skip list cursor stack operations
// Takes the node at the head of the directory's free list: reads sd->freeptr
// and advances it to that node's next link. Callers treat a result of 0 as
// "out of memory" (see skipdir_insert).
// NOTE(review): presumably returns ptr -- confirm.
85 template <class T> gs_uint32_t skipdir_alloc(skipdir_t<T> *sd)
87 gs_uint32_t ptr = sd->freeptr;
89 sd->freeptr = sd->list[ptr].next;
90 //printf("skipdir_alloc %d\n",ptr);
// Returns directory node ptr to the free list: clears its val and down
// members and links it in front of the current free-list head.
// NOTE(review): presumably sd->freeptr is then set to ptr -- confirm.
94 template <class T> void skipdir_free(skipdir_t<T> *sd, gs_uint32_t ptr)
96 sd->list[ptr].val = 0;
97 sd->list[ptr].down = 0;
98 sd->list[ptr].next = sd->freeptr;
100 //printf("skipdir_free %d\n",ptr);
// Initialises a directory. The second loop threads list[1..SKIPDIR_SIZE] into
// a singly linked chain via next, and the final store terminates the chain
// with 0 (the null index).
104 template <class T> void skipdir_create(skipdir_t<T> *sd)
// Per-level setup for levels 0..SKIPDIR_HEIGHT_MAX-1.
110 for (i=0; i < SKIPDIR_HEIGHT_MAX; i++)
112 for (i=1; i < SKIPDIR_SIZE; i++)
113 sd->list[i].next = i+1;
114 sd->list[SKIPDIR_SIZE].next = 0;
// NOTE(review): presumably the teardown counterpart of skipdir_create --
// confirm against the body.
117 template <class T> void skipdir_destroy(skipdir_t<T> *sd)
// Searches the directory for val and reports the search path in ptrstack.
// Other code in this file reads ptrstack[l+1] as the predecessor node on
// level l (0 = insert/delete at the head of that level) and ptrstack[0] as
// the tuple index at which to start the leaf-level scan.
// NOTE(review): the countdown loop relies on l being a signed type
// (l >= 0) -- confirm its declaration.
123 template <class T> void skipdir_search(skipdir_t<T> *sd, T val, gs_uint32_t *ptrstack)
// Empty directory: no predecessor on any level.
128 if (sd->height == 0) {
129 ptrstack[0] = ptrstack[1] = 0;
132 // search nonleaf nodes
133 ptr = sd->headptr[sd->height-1];
134 for (l=sd->height-1; l >= 0; l--) {
// No predecessor on this level: restart from the head of the level below
// (or 0 when already on the bottom level).
137 ptr = (l > 0) ? sd->headptr[l-1] : 0;
139 else if (val <= sd->list[ptr].val) {
141 ptr = (l > 0) ? sd->headptr[l-1] : 0;
// Otherwise walk right while the next node's value is still below val,
// then drop one level through the down link.
144 while ((sd->list[ptr].next != 0) &&
145 (sd->list[sd->list[ptr].next].val < val))
146 ptr = sd->list[ptr].next;
148 ptr = sd->list[ptr].down;
// Adds val to the directory so that it leads to tuple index leafptr.
// ptrstack is the path produced by skipdir_search for the same val.
// If a level-0 node with this value already exists, only its down link is
// redirected to leafptr. Otherwise a tower of new nodes is built bottom-up,
// growing one level per iteration until random() % 2 is non-zero, the free
// list is exhausted, or SKIPDIR_HEIGHT_MAX levels have been added.
155 template <class T> void skipdir_insert(skipdir_t<T> *sd, gs_uint32_t *ptrstack,
156 gs_uint32_t leafptr, T val)
158 gs_uint32_t newptr, oldptr;
161 // if path already existed then point to new duplicate
// Case 1: the existing node is the head of level 0.
162 if ((ptrstack[1] == 0) && (sd->headptr[0] != 0)
163 && (sd->list[sd->headptr[0]].val == val)) {
164 sd->list[sd->headptr[0]].down = leafptr;
// Case 2: the existing node is the successor of the level-0 predecessor.
167 if ((ptrstack[1] != 0) && (sd->list[ptrstack[1]].next != 0)
168 && (sd->list[sd->list[ptrstack[1]].next].val == val)) {
169 sd->list[sd->list[ptrstack[1]].next].down = leafptr;
173 for (l=0; l < SKIPDIR_HEIGHT_MAX; l++) {
174 if (random() % 2) break;
175 newptr = skipdir_alloc<T>(sd);
176 if (!newptr) break; // out of memory
177 sd->list[newptr].val = val;
178 //copy(&val, &list[newptr[l]].val);
179 // link new directory node to level below
// Two alternative down targets: the node created on the previous level
// (oldptr), or the leaf tuple itself.
181 sd->list[newptr].down = oldptr;
183 sd->list[newptr].down = leafptr;
184 // insert node into current level
// A level above the current height, or one with no predecessor, gets the
// new node as its head; otherwise splice it in after the predecessor.
185 if ((l >= sd->height) || (ptrstack[l+1] == 0)) {
186 sd->list[newptr].next = sd->headptr[l];
187 sd->headptr[l] = newptr;
190 sd->list[newptr].next = sd->list[ptrstack[l+1]].next;
191 sd->list[ptrstack[l+1]].next = newptr;
// Record the new height if the tower grew past the old one.
195 if (l > sd->height) sd->height = l;
196 //fprintf(stderr,"new height = %u\n",sd->height);
// Removes the directory nodes holding val, level by level from the bottom,
// using the predecessor path in ptrstack (as produced by skipdir_search).
// On each level the candidate is either the level head (no predecessor) or
// the predecessor's successor; it is unlinked and freed only when its value
// equals val. The walk stops at the first level that has no candidate.
200 template <class T> void skipdir_delete(skipdir_t<T> *sd, gs_uint32_t *ptrstack, T val)
205 for (l=0; l < sd->height; l++) {
// No predecessor on this level: the candidate is the level head.
206 if (ptrstack[l+1] == 0) {
207 delptr = sd->headptr[l];
208 if (delptr == 0) break;
209 if (sd->list[delptr].val == val) {
210 sd->headptr[l] = sd->list[delptr].next;
211 skipdir_free<T>(sd, delptr);
// Otherwise the candidate is the node following the predecessor.
217 delptr = sd->list[ptrstack[l+1]].next;
218 if (delptr == 0) break;
219 if (sd->list[delptr].val == val) {
220 sd->list[ptrstack[l+1]].next = sd->list[delptr].next;
221 skipdir_free<T>(sd, delptr);
// Debug dump of the directory to stderr. The first pass prints the values on
// each level from the top level down; the second pass prints, for every level
// above 0, the value stored in each node's down target (read from sd->list).
// NOTE(review): values are printed with "%u" although their type is T, and
// this file instantiates the aggregate for 64-bit and floating-point types
// as well -- confirm the format is appropriate before enabling this.
230 template <class T> void skipdir_print(skipdir_t<T> *sd)
235 for (l=sd->height-1; l >= 0; l--) {
236 for (ptr=sd->headptr[l]; ptr != 0; ptr=sd->list[ptr].next)
237 fprintf(stderr,"%u ", sd->list[ptr].val);
238 fprintf(stderr,"\n");
240 fprintf(stderr,"-------\n");
241 for (l=sd->height-1; l > 0; l--) {
242 for (ptr=sd->headptr[l]; ptr != 0; ptr=sd->list[ptr].next)
243 fprintf(stderr,"%u ", sd->list[sd->list[ptr].down].val);
244 fprintf(stderr,"\n");
246 fprintf(stderr,"-------\n");
252 /*************************** Version 3 **************************/
253 /* Version 3: LFTA-medium */
255 /* NIC performs O(log n) operations at each update. */
256 /****************************************************************/
258 /****************************************************************/
259 /* Helper functions */
260 /****************************************************************/
// Takes a tuple slot from the free list of s. The free-list head is advanced
// only when it is non-zero, so an exhausted list stays at 0; callers log
// "Out of space" after a failed allocation.
// NOTE(review): presumably returns ptr -- confirm.
261 template <class T> gs_uint32_t quant_udaf_lfta3_cursor_alloc(quant_udaf_lfta3_struct_t<T> *s)
263 gs_uint32_t ptr = s->freeptr;
264 if (s->freeptr != 0) s->freeptr = s->t[ptr].next;
266 // printf("quant_udaf_lfta3_cursor_alloc %d freeptr %d\n",ptr, s->freeptr);
// Returns tuple slot ptr to the free list of s by linking it in front of the
// current free-list head.
// NOTE(review): presumably s->freeptr is then set to ptr -- confirm.
270 template <class T> void quant_udaf_lfta3_cursor_free(quant_udaf_lfta3_struct_t<T> *s, gs_uint32_t ptr)
272 s->t[ptr].next = s->freeptr;
275 //printf("quant_udaf_lfta3_cursor_free %d\n",ptr);
// Debug dump of the summary to stderr: follows the used list from s->usedptr
// and prints each tuple as "(val, gap, del)"; prints "<empty>" for the empty
// case.
// NOTE(review): val is printed with "%u" although its type is T -- confirm
// this is acceptable for the 64-bit and floating-point instantiations.
278 template <class T> void quant_lfta3_print(quant_udaf_lfta3_struct_t<T> *s)
281 gs_uint32_t ptr = s->usedptr;
284 fprintf(stderr,"<empty>\n");
287 //skipdir_print(&s->sd);
288 for (; ptr != 0; ptr=t[ptr].next) {
289 fprintf(stderr,"(%u, %u, %u) ",t[ptr].val,t[ptr].gap,t[ptr].del);
291 fprintf(stderr,"\n");
// Incremental compression step, called from the UPDATE function. It examines
// the tuple that follows the circulating cursor s->circptr and removes it
// when gap(candidate) + gap(successor) + del(successor) is below the
// threshold ceil(2 * QUANT_EPS * nelts). The cursor then advances to its
// (possibly new) successor.
294 template <class T> void quant_lfta3_compress(quant_udaf_lfta3_struct_t<T> *s)
296 tuple_t<T> *t = s->t;
298 gs_uint32_t threshold;
299 gs_uint32_t ptrstack[SKIPDIR_HEIGHT_MAX+5];
301 threshold = (gs_uint32_t)ceil(2.0 * QUANT_EPS * (gs_float_t)s->nelts);
302 //if(s->circptr < 0 || s->circptr >= QUANT_LFTA3_SIZE)
303 // printf("1) s->circptr = %d\n",s->circptr);
304 //if(t[s->circptr].next < 0 || t[s->circptr].next >= QUANT_LFTA3_SIZE)
305 // printf("t[s->circptr].next = %d\n",t[s->circptr].next);
// Wrap the cursor back to the start of the used list when it is unset or
// has fewer than two successors left.
306 if ((s->circptr == 0) || (t[s->circptr].next == 0)
307 || (t[t[s->circptr].next].next == 0))
308 s->circptr = s->usedptr;
309 //if ((s->size % 10) != 0) return;
311 //if(s->circptr < 0 || s->circptr >= QUANT_LFTA3_SIZE)
312 // printf("2) s->circptr = %d\n",s->circptr);
// The deletion candidate is the tuple right after the cursor.
313 delptr = t[s->circptr].next;
314 //if(delptr < 0 || delptr >= QUANT_LFTA3_SIZE)
315 // printf("delptr = %d\n",delptr);
316 //if(t[delptr].next < 0 || t[delptr].next >= QUANT_LFTA3_SIZE)
317 // printf("t[delptr].next = %d\n",t[delptr].next);
318 if (t[delptr].gap+t[t[delptr].next].gap+t[t[delptr].next].del < threshold) {
319 // delete from directory
// The directory is touched only when the candidate's value differs from
// its predecessor's value.
320 if (t[s->circptr].val != t[delptr].val) {
321 // leftmost duplicate (if multiplicity)
322 skipdir_search<T>(&(s->sd), t[delptr].val, ptrstack);
// If the successor carries the same value, redirect the level-0
// directory node's down link to the successor.
323 if (t[delptr].val == t[t[delptr].next].val) {
324 //if(s->sd.headptr[0] < 0 || s->sd.headptr[0] >= QUANT_LFTA3_SIZE)
325 // printf("s->sd.headptr[0] = %d\n",s->sd.headptr[0]);
327 if ((ptrstack[1] == 0)
328 && (s->sd.headptr[0] != 0)
329 && (s->sd.list[s->sd.headptr[0]].val == t[delptr].val))
330 s->sd.list[s->sd.headptr[0]].down = t[delptr].next;
331 else if ((ptrstack[1] != 0)
332 && (s->sd.list[ptrstack[1]].next != 0)
333 && (s->sd.list[s->sd.list[ptrstack[1]].next].val == t[delptr].val))
334 s->sd.list[s->sd.list[ptrstack[1]].next].down = t[delptr].next;
337 // non-duplicates case
338 skipdir_delete<T>(&(s->sd), ptrstack, t[delptr].val);
342 //fprintf(stderr,"DELETED %u\n", t[delptr].val);
// Unlink the candidate from the used list and recycle its slot.
343 t[s->circptr].next = t[delptr].next;
344 quant_udaf_lfta3_cursor_free<T>(s, delptr);
347 s->circptr = t[s->circptr].next;
353 /****************************************************************/
354 /* LFTA3 functions */
355 /****************************************************************/
// Initialises the aggregate state held in buffer b (viewed as
// quant_udaf_lfta3_struct_t<T>): empties the used list, prepares the tuple
// free list and creates the skip-list directory.
356 template <class T> void quant_udaf_lfta3_LFTA_AGGR_INIT_(gs_sp_t b) {
358 //printf("LFTA, sizeof(quant_udaf_lfta3_struct_t) is %lu\n",sizeof(quant_udaf_lfta3_struct_t<T>));
359 quant_udaf_lfta3_struct_t<T> *s = (quant_udaf_lfta3_struct_t<T> *)b;
361 s->usedptr = 0; // NULL ptr
363 // initialize cursor stack
// Covers slots 1..QUANT_LFTA3_SIZE-1; the last slot is terminated with 0
// just below.
366 for (i=1; i < QUANT_LFTA3_SIZE; i++)
368 s->t[QUANT_LFTA3_SIZE].next = 0;
369 skipdir_create<T>(&(s->sd));
371 //printf("sizeof(quant_udaf_lfta3_struct_t)=%lu\n",sizeof(quant_udaf_lfta3_struct_t<T>));
// Folds one stream value v into the summary held in buffer b.
// The visible cases, in order:
//   1. left boundary  - list empty or v below the first tuple: allocate a
//      tuple and link it in front of the used list;
//   2. exact match    - a tuple with value v already exists: bump its gap;
//   3. right boundary - v belongs after the last tuple: allocate and append;
//   4. interior       - either split (allocate a new tuple and register it
//      in the directory) when the local objective exceeds the threshold
//      ceil(2 * QUANT_EPS * nelts), or bump the gap of the following tuple.
// A compression step (quant_lfta3_compress) follows the interior case.
// Every failed allocation is reported through gslog(LOG_ALERT, ...).
374 template <class T> void quant_udaf_lfta3_LFTA_AGGR_UPDATE_(gs_sp_t b, T v)
376 quant_udaf_lfta3_struct_t<T> *s = (quant_udaf_lfta3_struct_t<T> *)b;
377 tuple_t<T> *t = s->t;
378 gs_uint32_t ptr = s->usedptr;
// NOTE(review): delptr and debugptr (below) have no visible use in this
// function -- confirm whether they can be removed.
379 gs_uint32_t newptr, delptr;
380 gs_uint32_t obj; // objective function
381 gs_uint32_t threshold;
382 gs_uint32_t ptrstack[SKIPDIR_HEIGHT_MAX+5];
383 gs_uint32_t debugptr;
385 //printf("AGGR_UPDATE start\n");
387 //fprintf(stderr,"nelts = %u\n",s->nelts);
388 // left boundary case
389 if ((ptr == 0) || (v < t[ptr].val)) {
// NOTE(review): given the enclosing condition, this equality can only hold
// when ptr == 0, i.e. it compares against slot 0, which the rest of the
// file treats as the null index -- confirm this is intended.
390 if (t[ptr].val == v) {
392 //printf("AGGR_UPDATE END 1\n");
395 //printf("allocating (1) for %u ",v);
396 newptr = quant_udaf_lfta3_cursor_alloc<T>(s);
398 gslog(LOG_ALERT, "Out of space in quant_udaf_lfta3_LFTA_AGGR_UPDATE_ (1).\n");
404 t[newptr].next = s->usedptr;
406 //printf("AGGR_UPDATE END 2\n");
410 // locate $i$ such that (v_i-1 < v <= v_i)
411 skipdir_search<T>(&(s->sd), v, ptrstack);
413 //ptr = (ptrstack[0] == 0) ? s->usedptr : s->sd.list[ptrstack[0]].down;
// Start the leaf scan at the tuple reported by the directory, or at the
// head of the used list when the directory gives no starting point.
414 ptr = (ptrstack[0] == 0) ? s->usedptr : ptrstack[0];
415 while ((t[ptr].next != 0) && (t[t[ptr].next].val < v))
// Exact match on the following tuple: count v there.
420 if ((t[ptr].next != 0) && (t[t[ptr].next].val == v)) {
421 t[t[ptr].next].gap++;
// NOTE(review): this is the only trace printf in the function that is not
// commented out; it writes to stdout on every exact-match update --
// confirm whether it was left enabled by mistake.
422 printf("AGGR_UPDATE END 3\n");
427 // right boundary case
428 if (t[ptr].next == 0) {
429 //printf("allocating (2) for %u ",v);
430 newptr = quant_udaf_lfta3_cursor_alloc<T>(s);
432 gslog(LOG_ALERT, "Out of space in quant_udaf_lfta3_LFTA_AGGR_UPDATE_ (2).\n");
439 t[ptr].next = newptr;
440 //printf("AGGR_UPDATE END 4\n");
445 //printf("1) t[ptr].next =%d, ptr=%d\n",t[ptr].next,ptr);
// Interior case: objective = gap of the predecessor plus gap and del of
// the following tuple.
446 obj = t[ptr].gap+t[t[ptr].next].gap+t[t[ptr].next].del;
447 threshold = (gs_uint32_t)ceil(2.0 * QUANT_EPS * (gs_float_t)s->nelts);
448 if (obj > threshold) {
449 //printf("allocating (3) for %u ",v);
450 newptr = quant_udaf_lfta3_cursor_alloc<T>(s);
452 gslog(LOG_ALERT, "Out of space in quant_udaf_lfta3_LFTA_AGGR_UPDATE_ (3).\n");
455 //printf("newptr=%d\n",newptr);
// The new tuple's del is derived from its successor (gap + del - 1); it is
// then spliced in after ptr and registered in the directory.
458 t[newptr].del = t[t[ptr].next].gap+t[t[ptr].next].del - 1;
459 t[newptr].next = t[ptr].next;
460 t[ptr].next = newptr;
461 skipdir_insert<T>(&(s->sd), ptrstack, newptr, v);
464 // insert into existing bucket
465 //printf("t[ptr].next =%d\n",t[ptr].next);
466 t[t[ptr].next].gap++;
468 quant_lfta3_compress<T>(s);
469 //printf("AGGR_UPDATE END 5\n");
// Flush predicate for the aggregate state in buffer b; returns a gs_int32_t.
// NOTE(review): presumably non-zero asks the caller to flush (e.g. when the
// tuple free list is exhausted) -- confirm against the body.
472 template <class T> gs_int32_t quant_udaf_lfta3_LFTA_AGGR_FLUSHME_(gs_sp_t b) {
473 quant_udaf_lfta3_struct_t<T> *s = (quant_udaf_lfta3_struct_t<T> *)b;
// Prepares the aggregate state in buffer b for transfer and reports its size
// in r->length.
// With COMPRESSED_XFER defined, the used tuples are copied in list order into
// a scratch array and written back contiguously into s->t[1..i], so only a
// header plus the leading tuples need to be sent. Without it, the whole
// struct is sent.
// NOTE(review): the compressed length uses a header of 5 gs_uint32_t words;
// only four header members are visible in quant_udaf_lfta3_struct_t --
// confirm the header size.
482 template <class T> void quant_udaf_lfta3_LFTA_AGGR_OUTPUT_(struct gs_string *r, gs_sp_t b)
484 #ifdef COMPRESSED_XFER
485 quant_udaf_lfta3_struct_t<T> *s = (quant_udaf_lfta3_struct_t<T> *)b;
486 tuple_t<T> tmp[QUANT_LFTA3_SIZE+1];
487 gs_uint32_t ptr=s->usedptr;
// Pass 1: snapshot val/gap/del of each used tuple in list order.
490 for (; ptr != 0; ptr=s->t[ptr].next) {
491 tmp[i].val = s->t[ptr].val;
492 tmp[i].gap = s->t[ptr].gap;
493 tmp[i].del = s->t[ptr].del;
// Pass 2: write the snapshot back into slots 1..i (slot 0 is the null slot).
496 for (j=1; j <= i; j++) {
497 s->t[j].val = tmp[j-1].val;
498 s->t[j].gap = tmp[j-1].gap;
499 s->t[j].del = tmp[j-1].del;
505 // r->length = (5 + 4*(i+1))*sizeof(gs_uint32_t);
506 r->length = 5*sizeof(gs_uint32_t) + (i+1)*sizeof(tuple_t<T>);
508 #ifndef COMPRESSED_XFER
509 r->length = sizeof(quant_udaf_lfta3_struct_t<T>);
511 //printf("OUTPUT, size is %d\n",r->length);
// NOTE(review): presumably the teardown hook for the aggregate state in
// buffer b -- confirm against the body.
515 template <class T> void quant_udaf_lfta3_LFTA_AGGR_DESTROY_(gs_sp_t b)
521 // -----------------------------------------------------------------
// gs_uint32_t entry point: forwards b to the templated INIT.
525 void quant_ui_udaf_lfta3_LFTA_AGGR_INIT_(gs_sp_t b){
526 quant_udaf_lfta3_LFTA_AGGR_INIT_<gs_uint32_t>(b);
// gs_uint32_t entry point: forwards b and the value v to the templated UPDATE.
528 void quant_ui_udaf_lfta3_LFTA_AGGR_UPDATE_(gs_sp_t b, gs_uint32_t v){
529 quant_udaf_lfta3_LFTA_AGGR_UPDATE_<gs_uint32_t>(b, v);
// gs_uint32_t entry point: returns the result of the templated FLUSHME for b.
531 gs_int32_t quant_ui_udaf_lfta3_LFTA_AGGR_FLUSHME_(gs_sp_t b){
532 return quant_udaf_lfta3_LFTA_AGGR_FLUSHME_<gs_uint32_t>(b);
// gs_uint32_t entry point: forwards r and b to the templated OUTPUT.
534 void quant_ui_udaf_lfta3_LFTA_AGGR_OUTPUT_(struct gs_string *r, gs_sp_t b){
535 quant_udaf_lfta3_LFTA_AGGR_OUTPUT_<gs_uint32_t>(r, b);
// gs_uint32_t entry point: forwards b to the templated DESTROY.
537 void quant_ui_udaf_lfta3_LFTA_AGGR_DESTROY_(gs_sp_t b){
538 quant_udaf_lfta3_LFTA_AGGR_DESTROY_<gs_uint32_t>(b);
// gs_int32_t entry point: forwards b to the templated INIT.
542 void quant_i_udaf_lfta3_LFTA_AGGR_INIT_(gs_sp_t b){
543 quant_udaf_lfta3_LFTA_AGGR_INIT_<gs_int32_t>(b);
// gs_int32_t entry point: forwards b and the value v to the templated UPDATE.
545 void quant_i_udaf_lfta3_LFTA_AGGR_UPDATE_(gs_sp_t b, gs_int32_t v){
546 quant_udaf_lfta3_LFTA_AGGR_UPDATE_<gs_int32_t>(b, v);
// gs_int32_t entry point: returns the result of the templated FLUSHME for b.
548 gs_int32_t quant_i_udaf_lfta3_LFTA_AGGR_FLUSHME_(gs_sp_t b){
549 return quant_udaf_lfta3_LFTA_AGGR_FLUSHME_<gs_int32_t>(b);
// gs_int32_t entry point: forwards r and b to the templated OUTPUT.
551 void quant_i_udaf_lfta3_LFTA_AGGR_OUTPUT_(struct gs_string *r, gs_sp_t b){
552 quant_udaf_lfta3_LFTA_AGGR_OUTPUT_<gs_int32_t>(r, b);
// gs_int32_t entry point: forwards b to the templated DESTROY.
554 void quant_i_udaf_lfta3_LFTA_AGGR_DESTROY_(gs_sp_t b){
555 quant_udaf_lfta3_LFTA_AGGR_DESTROY_<gs_int32_t>(b);
// gs_uint64_t entry point: forwards b to the templated INIT.
559 void quant_ul_udaf_lfta3_LFTA_AGGR_INIT_(gs_sp_t b){
560 quant_udaf_lfta3_LFTA_AGGR_INIT_<gs_uint64_t>(b);
// gs_uint64_t entry point: forwards b and the value v to the templated UPDATE.
562 void quant_ul_udaf_lfta3_LFTA_AGGR_UPDATE_(gs_sp_t b, gs_uint64_t v){
563 quant_udaf_lfta3_LFTA_AGGR_UPDATE_<gs_uint64_t>(b, v);
// gs_uint64_t entry point: returns the result of the templated FLUSHME for b.
565 gs_int32_t quant_ul_udaf_lfta3_LFTA_AGGR_FLUSHME_(gs_sp_t b){
566 return quant_udaf_lfta3_LFTA_AGGR_FLUSHME_<gs_uint64_t>(b);
// gs_uint64_t entry point: forwards r and b to the templated OUTPUT.
568 void quant_ul_udaf_lfta3_LFTA_AGGR_OUTPUT_(struct gs_string *r, gs_sp_t b){
569 quant_udaf_lfta3_LFTA_AGGR_OUTPUT_<gs_uint64_t>(r, b);
// gs_uint64_t entry point: forwards b to the templated DESTROY.
571 void quant_ul_udaf_lfta3_LFTA_AGGR_DESTROY_(gs_sp_t b){
572 quant_udaf_lfta3_LFTA_AGGR_DESTROY_<gs_uint64_t>(b);
// gs_int64_t entry point: forwards b to the templated INIT.
577 void quant_l_udaf_lfta3_LFTA_AGGR_INIT_(gs_sp_t b){
578 quant_udaf_lfta3_LFTA_AGGR_INIT_<gs_int64_t>(b);
// gs_int64_t entry point: forwards b and the value v to the templated UPDATE.
580 void quant_l_udaf_lfta3_LFTA_AGGR_UPDATE_(gs_sp_t b, gs_int64_t v){
581 quant_udaf_lfta3_LFTA_AGGR_UPDATE_<gs_int64_t>(b, v);
// gs_int64_t entry point: returns the result of the templated FLUSHME for b.
583 gs_int32_t quant_l_udaf_lfta3_LFTA_AGGR_FLUSHME_(gs_sp_t b){
584 return quant_udaf_lfta3_LFTA_AGGR_FLUSHME_<gs_int64_t>(b);
// gs_int64_t entry point: forwards r and b to the templated OUTPUT.
586 void quant_l_udaf_lfta3_LFTA_AGGR_OUTPUT_(struct gs_string *r, gs_sp_t b){
587 quant_udaf_lfta3_LFTA_AGGR_OUTPUT_<gs_int64_t>(r, b);
// gs_int64_t entry point: forwards b to the templated DESTROY.
589 void quant_l_udaf_lfta3_LFTA_AGGR_DESTROY_(gs_sp_t b){
590 quant_udaf_lfta3_LFTA_AGGR_DESTROY_<gs_int64_t>(b);
// gs_float_t entry point: forwards b to the templated INIT.
595 void quant_f_udaf_lfta3_LFTA_AGGR_INIT_(gs_sp_t b){
596 quant_udaf_lfta3_LFTA_AGGR_INIT_<gs_float_t>(b);
// gs_float_t entry point: forwards b and the value v to the templated UPDATE.
598 void quant_f_udaf_lfta3_LFTA_AGGR_UPDATE_(gs_sp_t b, gs_float_t v){
599 quant_udaf_lfta3_LFTA_AGGR_UPDATE_<gs_float_t>(b, v);
// gs_float_t entry point: returns the result of the templated FLUSHME for b.
601 gs_int32_t quant_f_udaf_lfta3_LFTA_AGGR_FLUSHME_(gs_sp_t b){
602 return quant_udaf_lfta3_LFTA_AGGR_FLUSHME_<gs_float_t>(b);
// gs_float_t entry point: forwards r and b to the templated OUTPUT.
604 void quant_f_udaf_lfta3_LFTA_AGGR_OUTPUT_(struct gs_string *r, gs_sp_t b){
605 quant_udaf_lfta3_LFTA_AGGR_OUTPUT_<gs_float_t>(r, b);
// gs_float_t entry point: forwards b to the templated DESTROY.
607 void quant_f_udaf_lfta3_LFTA_AGGR_DESTROY_(gs_sp_t b){
608 quant_udaf_lfta3_LFTA_AGGR_DESTROY_<gs_float_t>(b);