Added quantiling UDAFs
[com/gs-lite.git] / src / lib / gscplftaaux / flip_udaf.c
diff --git a/src/lib/gscplftaaux/flip_udaf.c b/src/lib/gscplftaaux/flip_udaf.c
new file mode 100644 (file)
index 0000000..9a8fbf9
--- /dev/null
@@ -0,0 +1,502 @@
+#include <stdio.h>\r
+#include <limits.h>\r
+#include <math.h>\r
+#include "rts_udaf.h"\r
+#include "gsconfig.h"\r
+#include "gstypes.h"\r
+\r
+/*      Full size\r
+#define QUANT_LFTA1_SIZE 729\r
+#define QUANT_LFTA2_SIZE 181\r
+#define QUANT_LFTA3_SIZE 100\r
+*/\r
+\r
+/*      half size\r
+*/\r
+#define QUANT_LFTA1_SIZE 378\r
+#define QUANT_LFTA2_SIZE 93\r
+#define QUANT_LFTA3_SIZE 50\r
+\r
+/*      quarter size\r
+#define QUANT_LFTA1_SIZE 202\r
+#define QUANT_LFTA2_SIZE 49\r
+#define QUANT_LFTA3_SIZE 25\r
+*/\r
+\r
+\r
+#define QUANT_EPS 0.01\r
+#define SKIPDIR_SIZE 100\r
+#define SKIPDIR_HEIGHT_MAX 7\r
+#define max(a,b) ((a) > (b) ? (a) : (b))\r
+#define COMPRESSED_XFER\r
+\r
+/****************************************************************/\r
+/* Data Structures                                             */\r
+/****************************************************************/\r
+typedef struct tuple_t {\r
+       gs_uint32_t val;\r
+       gs_uint32_t gap;\r
+       gs_uint32_t del;\r
+       gs_uint32_t next;\r
+} tuple_t;\r
+\r
+// For skip list\r
+typedef gs_uint32_t val_type;\r
+\r
+typedef struct skipnode {\r
+       val_type val;\r
+       gs_uint32_t next;\r
+       gs_uint32_t down;\r
+} skipnode_t;\r
+\r
+typedef struct skipdir {\r
+       gs_uint32_t height;                             // height of tree\r
+       gs_uint32_t freeptr;                            // cursor space stack\r
+       gs_uint32_t headptr[SKIPDIR_HEIGHT_MAX+1];      // ptrs to levels\r
+       skipnode_t list[SKIPDIR_SIZE+1];\r
+} skipdir_t;\r
+\r
+\r
+/****************************************************************/\r
+\r
+// fstring(5+(QUANT_LFTA3_SIZE+1)*4 +\r
+//         (2+lg(QUANT_LFTA3_SIZE)+(QUANT_LFTA3_SIZE+1)*3)*4)\r
+typedef struct quant_udaf_lfta3_struct_t {\r
+       gs_uint32_t nelts;      // # stream elements\r
+       gs_uint32_t freeptr;    // ptr to cursor stack\r
+       gs_uint32_t usedptr;    // ptr to allocated memory\r
+       gs_uint32_t circptr;    // circulating ptr used for compression\r
+       gs_uint32_t size;\r
+       tuple_t t[QUANT_LFTA3_SIZE+1];  // samples + auxiliary info\r
+       skipdir_t sd;           // directory for searching tuples\r
+} quant_udaf_lfta3_struct_t;\r
+\r
+/****************************************************************/\r
+/* Skip List Functions                                         */\r
+/****************************************************************/\r
+\r
+// Skip list cursor stack operations\r
+gs_uint32_t skipdir_alloc(skipdir_t *sd)\r
+{\r
+       gs_uint32_t ptr = sd->freeptr;\r
+       if (sd->freeptr != 0)\r
+               sd->freeptr = sd->list[ptr].next;\r
+       return ptr;\r
+}\r
+\r
+void skipdir_free(skipdir_t *sd, gs_uint32_t ptr)\r
+{\r
+       sd->list[ptr].val = 0;\r
+       sd->list[ptr].down = 0;\r
+       sd->list[ptr].next = sd->freeptr;\r
+       sd->freeptr = ptr;\r
+}\r
+\r
+\r
+void skipdir_create(skipdir_t *sd)\r
+{\r
+       gs_int32_t i;\r
+\r
+       sd->height = 0;\r
+       sd->freeptr = 1;\r
+       for (i=0; i < SKIPDIR_HEIGHT_MAX; i++)\r
+               sd->headptr[i] = 0;\r
+       for (i=1; i < SKIPDIR_SIZE; i++)\r
+               sd->list[i].next = i+1;\r
+       sd->list[SKIPDIR_SIZE].next = 0;\r
+}\r
+\r
+void skipdir_destroy(skipdir_t *sd)\r
+{\r
+       sd->height = 0;\r
+}\r
+\r
+\r
+void skipdir_search(skipdir_t *sd, val_type val, gs_uint32_t *ptrstack)\r
+{\r
+       gs_uint32_t ptr;\r
+       gs_int32_t l;\r
+\r
+       if (sd->height == 0) {\r
+               ptrstack[0] = ptrstack[1] = 0;\r
+               return;\r
+       }\r
+       // search nonleaf nodes\r
+       ptr = sd->headptr[sd->height-1];\r
+       for (l=sd->height-1; l >= 0; l--) {\r
+               if (ptr == 0) {\r
+                       ptrstack[l+1] = 0;\r
+                       ptr = (l > 0) ? sd->headptr[l-1] : 0;\r
+               }\r
+               else if (val <= sd->list[ptr].val) {\r
+                       ptrstack[l+1] = 0;\r
+                       ptr = (l > 0) ? sd->headptr[l-1] : 0;\r
+               }\r
+               else {\r
+                       while ((sd->list[ptr].next != 0) &&\r
+                       (sd->list[sd->list[ptr].next].val < val))\r
+                               ptr = sd->list[ptr].next;\r
+                       ptrstack[l+1] = ptr;\r
+                       ptr = sd->list[ptr].down;\r
+               }\r
+       }\r
+       ptrstack[0] = ptr;\r
+}\r
+\r
+\r
+void skipdir_insert(skipdir_t *sd, gs_uint32_t *ptrstack,\r
+                       gs_uint32_t leafptr, val_type val)\r
+{\r
+       gs_uint32_t newptr, oldptr;\r
+       gs_int32_t l;\r
+\r
+       // if path already existed then point to new duplicate\r
+       if ((ptrstack[1] == 0) && (sd->headptr[0] != 0)\r
+       && (sd->list[sd->headptr[0]].val == val)) {\r
+               sd->list[sd->headptr[0]].down = leafptr;\r
+               return;\r
+       }\r
+       if ((ptrstack[1] != 0) && (sd->list[ptrstack[1]].next != 0)\r
+       && (sd->list[sd->list[ptrstack[1]].next].val == val)) {\r
+               sd->list[sd->list[ptrstack[1]].next].down = leafptr;\r
+               return;\r
+       }\r
+\r
+       for (l=0; l < SKIPDIR_HEIGHT_MAX; l++) {\r
+               if (random() % 2) break;\r
+               newptr = skipdir_alloc(sd);\r
+               if (!newptr) break;     // out of memory\r
+               sd->list[newptr].val = val;\r
+               //copy(&val, &list[newptr[l]].val);\r
+               // link new directory node to level below\r
+               if (l > 0)\r
+                       sd->list[newptr].down = oldptr;\r
+               else\r
+                       sd->list[newptr].down = leafptr;\r
+               // insert node into current level\r
+               if ((l >= sd->height) || (ptrstack[l+1] == 0)) {\r
+                       sd->list[newptr].next = sd->headptr[l];\r
+                       sd->headptr[l] = newptr;\r
+               }\r
+               else {\r
+                       sd->list[newptr].next = sd->list[ptrstack[l+1]].next;\r
+                       sd->list[ptrstack[l+1]].next = newptr;\r
+               }\r
+               oldptr = newptr;\r
+       }\r
+       if (l > sd->height) sd->height = l;\r
+       //fprintf(stderr,"new height = %u\n",sd->height);\r
+}\r
+\r
+\r
+void skipdir_delete(skipdir_t *sd, gs_uint32_t *ptrstack, val_type val)\r
+{\r
+       gs_uint32_t delptr;\r
+       gs_int32_t l;\r
+\r
+       for (l=0; l < sd->height; l++) {\r
+               if (ptrstack[l+1] == 0) {\r
+                       delptr = sd->headptr[l];\r
+                       if (delptr == 0) break;\r
+                       if (sd->list[delptr].val == val) {\r
+                               sd->headptr[l] = sd->list[delptr].next;\r
+                               skipdir_free(sd, delptr);\r
+                       }\r
+                       else\r
+                               break;\r
+               }\r
+               else {\r
+                       delptr = sd->list[ptrstack[l+1]].next;\r
+                       if (delptr == 0) break;\r
+                       if (sd->list[delptr].val == val) {\r
+                               sd->list[ptrstack[l+1]].next = sd->list[delptr].next;\r
+                               skipdir_free(sd, delptr);\r
+                       }\r
+                       else\r
+                               break;\r
+               }\r
+       }\r
+}\r
+\r
+// For Debugging\r
+void skipdir_print(skipdir_t *sd)\r
+{\r
+       gs_uint32_t ptr;\r
+       gs_int32_t l;\r
+\r
+       for (l=sd->height-1; l >= 0; l--) {\r
+               for (ptr=sd->headptr[l]; ptr != 0; ptr=sd->list[ptr].next)\r
+                       fprintf(stderr,"%u ", sd->list[ptr].val);\r
+               fprintf(stderr,"\n");\r
+       }\r
+       fprintf(stderr,"-------\n");\r
+       for (l=sd->height-1; l > 0; l--) {\r
+               for (ptr=sd->headptr[l]; ptr != 0; ptr=sd->list[ptr].next)\r
+                       fprintf(stderr,"%u ", sd->list[sd->list[ptr].down].val);\r
+               fprintf(stderr,"\n");\r
+       }\r
+       fprintf(stderr,"-------\n");\r
+}\r
+\r
+\r
+\r
+\r
+/*************************** Version 3 **************************/\r
+/* Version 3: LFTA-medium                                      */\r
+/*                                                             */\r
+/* NIC performs O(log n) operations at each update.            */\r
+/****************************************************************/\r
+\r
+/****************************************************************/\r
+/* Helper functions                                            */\r
+/****************************************************************/\r
+gs_uint32_t quant_udaf_lfta3_cursor_alloc(quant_udaf_lfta3_struct_t *s)\r
+{\r
+       gs_uint32_t ptr = s->freeptr;\r
+       if (s->freeptr != 0) s->freeptr = s->t[ptr].next;\r
+       s->size++;\r
+       return ptr;\r
+}\r
+\r
+void quant_udaf_lfta3_cursor_free(quant_udaf_lfta3_struct_t *s, gs_uint32_t ptr)\r
+{\r
+       s->t[ptr].next = s->freeptr;\r
+       s->freeptr = ptr;\r
+       s->size--;\r
+}\r
+\r
+void quant_lfta3_print(quant_udaf_lfta3_struct_t *s)\r
+{\r
+       tuple_t *t=s->t;\r
+       gs_uint32_t ptr = s->usedptr;\r
+\r
+       if (ptr == 0) {\r
+               fprintf(stderr,"<empty>\n");\r
+               return;\r
+       }\r
+       //skipdir_print(&s->sd);\r
+       for (; ptr != 0; ptr=t[ptr].next) {\r
+               fprintf(stderr,"(%u, %u, %u) ",t[ptr].val,t[ptr].gap,t[ptr].del);\r
+       }\r
+       fprintf(stderr,"\n");\r
+}\r
+\r
+void quant_lfta3_compress(quant_udaf_lfta3_struct_t *s)\r
+{\r
+       tuple_t *t = s->t;\r
+       gs_uint32_t delptr;\r
+       gs_uint32_t threshold;\r
+       gs_uint32_t ptrstack[SKIPDIR_HEIGHT_MAX+5];\r
+\r
+       threshold = (gs_uint32_t)ceil(2.0 * QUANT_EPS * (gs_float_t)s->nelts);\r
+//if(s->circptr < 0 || s->circptr >= QUANT_LFTA3_SIZE)\r
+// printf("1) s->circptr = %d\n",s->circptr);\r
+//if(t[s->circptr].next < 0 || t[s->circptr].next >= QUANT_LFTA3_SIZE)\r
+// printf("t[s->circptr].next = %d\n",t[s->circptr].next);\r
+       if ((s->circptr == 0) || (t[s->circptr].next == 0)\r
+       || (t[t[s->circptr].next].next == 0))\r
+               s->circptr = s->usedptr;\r
+       //if ((s->size % 10) != 0) return;\r
+       if (s->nelts > 2) {\r
+//if(s->circptr < 0 || s->circptr >= QUANT_LFTA3_SIZE)\r
+// printf("2) s->circptr = %d\n",s->circptr);\r
+               delptr = t[s->circptr].next;\r
+//if(delptr < 0 || delptr >= QUANT_LFTA3_SIZE)\r
+// printf("delptr = %d\n",delptr);\r
+//if(t[delptr].next < 0 || t[delptr].next >= QUANT_LFTA3_SIZE)\r
+// printf("t[delptr].next = %d\n",t[delptr].next);\r
+               if (t[delptr].gap+t[t[delptr].next].gap+t[t[delptr].next].del < threshold) {\r
+                       // delete from directory\r
+                       if (t[s->circptr].val != t[delptr].val) {\r
+                               // leftmost duplicate (if multiplicity)\r
+                               skipdir_search(&(s->sd), t[delptr].val, ptrstack);\r
+                               if (t[delptr].val == t[t[delptr].next].val) {\r
+//if(s->sd.headptr[0] < 0 || s->sd.headptr[0] >= QUANT_LFTA3_SIZE)\r
+// printf("s->sd.headptr[0] = %d\n",s->sd.headptr[0]);\r
+                                       // duplicates case\r
+                                       if ((ptrstack[1] == 0)\r
+                                       && (s->sd.headptr[0] != 0)\r
+                                       && (s->sd.list[s->sd.headptr[0]].val == t[delptr].val))\r
+                                               s->sd.list[s->sd.headptr[0]].down = t[delptr].next;\r
+                                       else if ((ptrstack[1] != 0)\r
+                                       && (s->sd.list[ptrstack[1]].next != 0)\r
+                                       && (s->sd.list[s->sd.list[ptrstack[1]].next].val == t[delptr].val))\r
+                                               s->sd.list[s->sd.list[ptrstack[1]].next].down = t[delptr].next;\r
+                               }\r
+                               else {\r
+                                       // non-duplicates case\r
+                                       skipdir_delete(&(s->sd), ptrstack, t[delptr].val);\r
+                               }\r
+                       }\r
+                       // delete from list\r
+                       //fprintf(stderr,"DELETED %u\n", t[delptr].val);\r
+                       t[s->circptr].next = t[delptr].next;\r
+                       quant_udaf_lfta3_cursor_free(s, delptr);\r
+               }\r
+               else {\r
+                       s->circptr = t[s->circptr].next;\r
+               }\r
+       }\r
+}\r
+\r
+\r
+/****************************************************************/\r
+/* LFTA3 functions                                             */\r
+/****************************************************************/\r
+void quant_udaf_lfta3_LFTA_AGGR_INIT_(gs_sp_t b)\r
+{\r
+       gs_uint32_t i;\r
+\r
+       quant_udaf_lfta3_struct_t *s = (quant_udaf_lfta3_struct_t *)b;\r
+       s->nelts = 0;\r
+       s->usedptr = 0;         // NULL ptr\r
+       s->circptr = 0;\r
+       // initialize cursor stack\r
+       s->freeptr = 1;\r
+       s->size = 0;\r
+       for (i=1; i < QUANT_LFTA3_SIZE; i++)\r
+               s->t[i].next = i+1;\r
+       s->t[QUANT_LFTA3_SIZE].next = 0;\r
+       skipdir_create(&(s->sd));\r
+}\r
+\r
+void quant_udaf_lfta3_LFTA_AGGR_UPDATE_(gs_sp_t b, gs_uint32_t v)\r
+{\r
+       quant_udaf_lfta3_struct_t *s = (quant_udaf_lfta3_struct_t *)b;\r
+       tuple_t *t = s->t;\r
+       gs_uint32_t ptr = s->usedptr;\r
+       gs_uint32_t newptr, delptr;\r
+       gs_uint32_t obj;        // objective function\r
+       gs_uint32_t threshold;\r
+       gs_uint32_t ptrstack[SKIPDIR_HEIGHT_MAX+5];\r
+       gs_uint32_t debugptr;\r
+\r
+//printf("AGGR_UPDATE start\n");\r
+       s->nelts++;\r
+       //fprintf(stderr,"nelts = %u\n",s->nelts);\r
+       // left boundary case\r
+       if ((ptr == 0) || (v < t[ptr].val)) {\r
+               if (t[ptr].val == v) {\r
+                       t[ptr].gap++;\r
+//printf("AGGR_UPDATE END 1\n");\r
+                       return;\r
+               }\r
+               newptr = quant_udaf_lfta3_cursor_alloc(s);\r
+               if (newptr == 0) {\r
+                       gslog(LOG_ALERT, "Out of space.\n");\r
+                       return;\r
+               }\r
+               t[newptr].val = v;\r
+               t[newptr].gap = 1;\r
+               t[newptr].del = 0;\r
+               t[newptr].next = s->usedptr;\r
+               s->usedptr = newptr;\r
+//printf("AGGR_UPDATE END 2\n");\r
+               return;\r
+       }\r
+\r
+       // locate $i$ such that (v_i-1 < v <= v_i)\r
+       skipdir_search(&(s->sd), v, ptrstack);\r
+\r
+       //ptr = (ptrstack[0] == 0) ? s->usedptr : s->sd.list[ptrstack[0]].down;\r
+       ptr = (ptrstack[0] == 0) ? s->usedptr : ptrstack[0];\r
+       while ((t[ptr].next != 0) && (t[t[ptr].next].val < v))\r
+               ptr = t[ptr].next;\r
+\r
+/*\r
+       // duplicate value\r
+       if ((t[ptr].next != 0) && (t[t[ptr].next].val == v)) {\r
+               t[t[ptr].next].gap++;\r
+printf("AGGR_UPDATE END 3\n");\r
+               return;\r
+       }\r
+*/\r
+\r
+       // right boundary case\r
+       if (t[ptr].next == 0) {\r
+               newptr = quant_udaf_lfta3_cursor_alloc(s);\r
+               if (newptr == 0) {\r
+                       gslog(LOG_ALERT, "Out of space.\n");\r
+                       return;\r
+               }\r
+               t[newptr].val = v;\r
+               t[newptr].gap = 1;\r
+               t[newptr].del = 0;\r
+               t[newptr].next = 0;\r
+               t[ptr].next = newptr;\r
+//printf("AGGR_UPDATE END 4\n");\r
+               return;\r
+       }\r
+\r
+       // non-boundary case\r
+//printf("1) t[ptr].next =%d, ptr=%d\n",t[ptr].next,ptr);\r
+       obj = t[ptr].gap+t[t[ptr].next].gap+t[t[ptr].next].del;\r
+       threshold = (gs_uint32_t)ceil(2.0 * QUANT_EPS * (gs_float_t)s->nelts);\r
+       if (obj > threshold) {\r
+               newptr = quant_udaf_lfta3_cursor_alloc(s);\r
+               if (newptr == 0) {\r
+                       gslog(LOG_ALERT, "Out of memory.\n");\r
+                       return;\r
+               }\r
+//printf("newptr=%d\n",newptr);\r
+               t[newptr].val = v;\r
+               t[newptr].gap = 1;\r
+               t[newptr].del = t[t[ptr].next].gap+t[t[ptr].next].del - 1;\r
+               t[newptr].next = t[ptr].next;\r
+               t[ptr].next = newptr;\r
+               skipdir_insert(&(s->sd), ptrstack, newptr, v);\r
+       }\r
+       else {\r
+               // insert into existing bucket\r
+//printf("t[ptr].next =%d\n",t[ptr].next);\r
+               t[t[ptr].next].gap++;\r
+       }\r
+       quant_lfta3_compress(s);\r
+//printf("AGGR_UPDATE END 5\n");\r
+}\r
+\r
+gs_int32_t quant_udaf_lfta3_LFTA_AGGR_FLUSHME_(gs_sp_t b)\r
+{\r
+       quant_udaf_lfta3_struct_t *s = (quant_udaf_lfta3_struct_t *)b;\r
+\r
+       if (s->freeptr == 0)\r
+               return 1;\r
+       else\r
+               return 0;\r
+}\r
+\r
+void quant_udaf_lfta3_LFTA_AGGR_OUTPUT_(struct gs_string *r, gs_sp_t b)\r
+{\r
+#ifdef COMPRESSED_XFER\r
+       quant_udaf_lfta3_struct_t *s = (quant_udaf_lfta3_struct_t *)b;\r
+       tuple_t tmp[QUANT_LFTA3_SIZE+1];\r
+       gs_uint32_t ptr=s->usedptr;\r
+       gs_int32_t i=0,j;\r
+\r
+       for (; ptr != 0; ptr=s->t[ptr].next) {\r
+               tmp[i].val = s->t[ptr].val;\r
+               tmp[i].gap = s->t[ptr].gap;\r
+               tmp[i].del = s->t[ptr].del;\r
+               i++;\r
+       }\r
+       for (j=1; j <= i; j++) {\r
+               s->t[j].val = tmp[j-1].val;\r
+               s->t[j].gap = tmp[j-1].gap;\r
+               s->t[j].del = tmp[j-1].del;\r
+               s->t[j].next = j+1;\r
+       }\r
+       s->t[i].next = 0;\r
+       s->usedptr = 1;\r
+\r
+       r->length = (5 + 4*(i+1))*sizeof(gs_uint32_t);\r
+#endif\r
+#ifndef COMPRESSED_XFER\r
+       r->length = sizeof(quant_udaf_lfta3_struct_t);\r
+#endif\r
+       r->data = b;\r
+}\r
+\r
+void quant_udaf_lfta3_LFTA_AGGR_DESTROY_(gs_sp_t b)\r
+{\r
+       return;\r
+}\r