Added quantiling UDAFs
[com/gs-lite.git] / src / lib / gscplftaaux / flip_udaf.c
1 #include <stdio.h>\r
2 #include <limits.h>\r
3 #include <math.h>\r
4 #include "rts_udaf.h"\r
5 #include "gsconfig.h"\r
6 #include "gstypes.h"\r
7 \r
8 /*      Full size\r
9 #define QUANT_LFTA1_SIZE 729\r
10 #define QUANT_LFTA2_SIZE 181\r
11 #define QUANT_LFTA3_SIZE 100\r
12 */\r
13 \r
14 /*      half size\r
15 */\r
16 #define QUANT_LFTA1_SIZE 378\r
17 #define QUANT_LFTA2_SIZE 93\r
18 #define QUANT_LFTA3_SIZE 50\r
19 \r
20 /*      quarter size\r
21 #define QUANT_LFTA1_SIZE 202\r
22 #define QUANT_LFTA2_SIZE 49\r
23 #define QUANT_LFTA3_SIZE 25\r
24 */\r
25 \r
26 \r
27 #define QUANT_EPS 0.01\r
28 #define SKIPDIR_SIZE 100\r
29 #define SKIPDIR_HEIGHT_MAX 7\r
30 #define max(a,b) ((a) > (b) ? (a) : (b))\r
31 #define COMPRESSED_XFER\r
32 \r
33 /****************************************************************/\r
34 /* Data Structures                                              */\r
35 /****************************************************************/\r
36 typedef struct tuple_t {\r
37         gs_uint32_t val;\r
38         gs_uint32_t gap;\r
39         gs_uint32_t del;\r
40         gs_uint32_t next;\r
41 } tuple_t;\r
42 \r
43 // For skip list\r
44 typedef gs_uint32_t val_type;\r
45 \r
46 typedef struct skipnode {\r
47         val_type val;\r
48         gs_uint32_t next;\r
49         gs_uint32_t down;\r
50 } skipnode_t;\r
51 \r
52 typedef struct skipdir {\r
53         gs_uint32_t height;                             // height of tree\r
54         gs_uint32_t freeptr;                            // cursor space stack\r
55         gs_uint32_t headptr[SKIPDIR_HEIGHT_MAX+1];      // ptrs to levels\r
56         skipnode_t list[SKIPDIR_SIZE+1];\r
57 } skipdir_t;\r
58 \r
59 \r
60 /****************************************************************/\r
61 \r
62 // fstring(5+(QUANT_LFTA3_SIZE+1)*4 +\r
63 //         (2+lg(QUANT_LFTA3_SIZE)+(QUANT_LFTA3_SIZE+1)*3)*4)\r
64 typedef struct quant_udaf_lfta3_struct_t {\r
65         gs_uint32_t nelts;      // # stream elements\r
66         gs_uint32_t freeptr;    // ptr to cursor stack\r
67         gs_uint32_t usedptr;    // ptr to allocated memory\r
68         gs_uint32_t circptr;    // circulating ptr used for compression\r
69         gs_uint32_t size;\r
70         tuple_t t[QUANT_LFTA3_SIZE+1];  // samples + auxiliary info\r
71         skipdir_t sd;           // directory for searching tuples\r
72 } quant_udaf_lfta3_struct_t;\r
73 \r
74 /****************************************************************/\r
75 /* Skip List Functions                                          */\r
76 /****************************************************************/\r
77 \r
78 // Skip list cursor stack operations\r
79 gs_uint32_t skipdir_alloc(skipdir_t *sd)\r
80 {\r
81         gs_uint32_t ptr = sd->freeptr;\r
82         if (sd->freeptr != 0)\r
83                 sd->freeptr = sd->list[ptr].next;\r
84         return ptr;\r
85 }\r
86 \r
87 void skipdir_free(skipdir_t *sd, gs_uint32_t ptr)\r
88 {\r
89         sd->list[ptr].val = 0;\r
90         sd->list[ptr].down = 0;\r
91         sd->list[ptr].next = sd->freeptr;\r
92         sd->freeptr = ptr;\r
93 }\r
94 \r
95 \r
96 void skipdir_create(skipdir_t *sd)\r
97 {\r
98         gs_int32_t i;\r
99 \r
100         sd->height = 0;\r
101         sd->freeptr = 1;\r
102         for (i=0; i < SKIPDIR_HEIGHT_MAX; i++)\r
103                 sd->headptr[i] = 0;\r
104         for (i=1; i < SKIPDIR_SIZE; i++)\r
105                 sd->list[i].next = i+1;\r
106         sd->list[SKIPDIR_SIZE].next = 0;\r
107 }\r
108 \r
109 void skipdir_destroy(skipdir_t *sd)\r
110 {\r
111         sd->height = 0;\r
112 }\r
113 \r
114 \r
115 void skipdir_search(skipdir_t *sd, val_type val, gs_uint32_t *ptrstack)\r
116 {\r
117         gs_uint32_t ptr;\r
118         gs_int32_t l;\r
119 \r
120         if (sd->height == 0) {\r
121                 ptrstack[0] = ptrstack[1] = 0;\r
122                 return;\r
123         }\r
124         // search nonleaf nodes\r
125         ptr = sd->headptr[sd->height-1];\r
126         for (l=sd->height-1; l >= 0; l--) {\r
127                 if (ptr == 0) {\r
128                         ptrstack[l+1] = 0;\r
129                         ptr = (l > 0) ? sd->headptr[l-1] : 0;\r
130                 }\r
131                 else if (val <= sd->list[ptr].val) {\r
132                         ptrstack[l+1] = 0;\r
133                         ptr = (l > 0) ? sd->headptr[l-1] : 0;\r
134                 }\r
135                 else {\r
136                         while ((sd->list[ptr].next != 0) &&\r
137                         (sd->list[sd->list[ptr].next].val < val))\r
138                                 ptr = sd->list[ptr].next;\r
139                         ptrstack[l+1] = ptr;\r
140                         ptr = sd->list[ptr].down;\r
141                 }\r
142         }\r
143         ptrstack[0] = ptr;\r
144 }\r
145 \r
146 \r
147 void skipdir_insert(skipdir_t *sd, gs_uint32_t *ptrstack,\r
148                         gs_uint32_t leafptr, val_type val)\r
149 {\r
150         gs_uint32_t newptr, oldptr;\r
151         gs_int32_t l;\r
152 \r
153         // if path already existed then point to new duplicate\r
154         if ((ptrstack[1] == 0) && (sd->headptr[0] != 0)\r
155         && (sd->list[sd->headptr[0]].val == val)) {\r
156                 sd->list[sd->headptr[0]].down = leafptr;\r
157                 return;\r
158         }\r
159         if ((ptrstack[1] != 0) && (sd->list[ptrstack[1]].next != 0)\r
160         && (sd->list[sd->list[ptrstack[1]].next].val == val)) {\r
161                 sd->list[sd->list[ptrstack[1]].next].down = leafptr;\r
162                 return;\r
163         }\r
164 \r
165         for (l=0; l < SKIPDIR_HEIGHT_MAX; l++) {\r
166                 if (random() % 2) break;\r
167                 newptr = skipdir_alloc(sd);\r
168                 if (!newptr) break;     // out of memory\r
169                 sd->list[newptr].val = val;\r
170                 //copy(&val, &list[newptr[l]].val);\r
171                 // link new directory node to level below\r
172                 if (l > 0)\r
173                         sd->list[newptr].down = oldptr;\r
174                 else\r
175                         sd->list[newptr].down = leafptr;\r
176                 // insert node into current level\r
177                 if ((l >= sd->height) || (ptrstack[l+1] == 0)) {\r
178                         sd->list[newptr].next = sd->headptr[l];\r
179                         sd->headptr[l] = newptr;\r
180                 }\r
181                 else {\r
182                         sd->list[newptr].next = sd->list[ptrstack[l+1]].next;\r
183                         sd->list[ptrstack[l+1]].next = newptr;\r
184                 }\r
185                 oldptr = newptr;\r
186         }\r
187         if (l > sd->height) sd->height = l;\r
188         //fprintf(stderr,"new height = %u\n",sd->height);\r
189 }\r
190 \r
191 \r
192 void skipdir_delete(skipdir_t *sd, gs_uint32_t *ptrstack, val_type val)\r
193 {\r
194         gs_uint32_t delptr;\r
195         gs_int32_t l;\r
196 \r
197         for (l=0; l < sd->height; l++) {\r
198                 if (ptrstack[l+1] == 0) {\r
199                         delptr = sd->headptr[l];\r
200                         if (delptr == 0) break;\r
201                         if (sd->list[delptr].val == val) {\r
202                                 sd->headptr[l] = sd->list[delptr].next;\r
203                                 skipdir_free(sd, delptr);\r
204                         }\r
205                         else\r
206                                 break;\r
207                 }\r
208                 else {\r
209                         delptr = sd->list[ptrstack[l+1]].next;\r
210                         if (delptr == 0) break;\r
211                         if (sd->list[delptr].val == val) {\r
212                                 sd->list[ptrstack[l+1]].next = sd->list[delptr].next;\r
213                                 skipdir_free(sd, delptr);\r
214                         }\r
215                         else\r
216                                 break;\r
217                 }\r
218         }\r
219 }\r
220 \r
221 // For Debugging\r
222 void skipdir_print(skipdir_t *sd)\r
223 {\r
224         gs_uint32_t ptr;\r
225         gs_int32_t l;\r
226 \r
227         for (l=sd->height-1; l >= 0; l--) {\r
228                 for (ptr=sd->headptr[l]; ptr != 0; ptr=sd->list[ptr].next)\r
229                         fprintf(stderr,"%u ", sd->list[ptr].val);\r
230                 fprintf(stderr,"\n");\r
231         }\r
232         fprintf(stderr,"-------\n");\r
233         for (l=sd->height-1; l > 0; l--) {\r
234                 for (ptr=sd->headptr[l]; ptr != 0; ptr=sd->list[ptr].next)\r
235                         fprintf(stderr,"%u ", sd->list[sd->list[ptr].down].val);\r
236                 fprintf(stderr,"\n");\r
237         }\r
238         fprintf(stderr,"-------\n");\r
239 }\r
240 \r
241 \r
242 \r
243 \r
244 /*************************** Version 3 **************************/\r
245 /* Version 3: LFTA-medium                                       */\r
246 /*                                                              */\r
247 /* NIC performs O(log n) operations at each update.             */\r
248 /****************************************************************/\r
249 \r
250 /****************************************************************/\r
251 /* Helper functions                                             */\r
252 /****************************************************************/\r
253 gs_uint32_t quant_udaf_lfta3_cursor_alloc(quant_udaf_lfta3_struct_t *s)\r
254 {\r
255         gs_uint32_t ptr = s->freeptr;\r
256         if (s->freeptr != 0) s->freeptr = s->t[ptr].next;\r
257         s->size++;\r
258         return ptr;\r
259 }\r
260 \r
261 void quant_udaf_lfta3_cursor_free(quant_udaf_lfta3_struct_t *s, gs_uint32_t ptr)\r
262 {\r
263         s->t[ptr].next = s->freeptr;\r
264         s->freeptr = ptr;\r
265         s->size--;\r
266 }\r
267 \r
268 void quant_lfta3_print(quant_udaf_lfta3_struct_t *s)\r
269 {\r
270         tuple_t *t=s->t;\r
271         gs_uint32_t ptr = s->usedptr;\r
272 \r
273         if (ptr == 0) {\r
274                 fprintf(stderr,"<empty>\n");\r
275                 return;\r
276         }\r
277         //skipdir_print(&s->sd);\r
278         for (; ptr != 0; ptr=t[ptr].next) {\r
279                 fprintf(stderr,"(%u, %u, %u) ",t[ptr].val,t[ptr].gap,t[ptr].del);\r
280         }\r
281         fprintf(stderr,"\n");\r
282 }\r
283 \r
284 void quant_lfta3_compress(quant_udaf_lfta3_struct_t *s)\r
285 {\r
286         tuple_t *t = s->t;\r
287         gs_uint32_t delptr;\r
288         gs_uint32_t threshold;\r
289         gs_uint32_t ptrstack[SKIPDIR_HEIGHT_MAX+5];\r
290 \r
291         threshold = (gs_uint32_t)ceil(2.0 * QUANT_EPS * (gs_float_t)s->nelts);\r
292 //if(s->circptr < 0 || s->circptr >= QUANT_LFTA3_SIZE)\r
293 // printf("1) s->circptr = %d\n",s->circptr);\r
294 //if(t[s->circptr].next < 0 || t[s->circptr].next >= QUANT_LFTA3_SIZE)\r
295 // printf("t[s->circptr].next = %d\n",t[s->circptr].next);\r
296         if ((s->circptr == 0) || (t[s->circptr].next == 0)\r
297         || (t[t[s->circptr].next].next == 0))\r
298                 s->circptr = s->usedptr;\r
299         //if ((s->size % 10) != 0) return;\r
300         if (s->nelts > 2) {\r
301 //if(s->circptr < 0 || s->circptr >= QUANT_LFTA3_SIZE)\r
302 // printf("2) s->circptr = %d\n",s->circptr);\r
303                 delptr = t[s->circptr].next;\r
304 //if(delptr < 0 || delptr >= QUANT_LFTA3_SIZE)\r
305 // printf("delptr = %d\n",delptr);\r
306 //if(t[delptr].next < 0 || t[delptr].next >= QUANT_LFTA3_SIZE)\r
307 // printf("t[delptr].next = %d\n",t[delptr].next);\r
308                 if (t[delptr].gap+t[t[delptr].next].gap+t[t[delptr].next].del < threshold) {\r
309                         // delete from directory\r
310                         if (t[s->circptr].val != t[delptr].val) {\r
311                                 // leftmost duplicate (if multiplicity)\r
312                                 skipdir_search(&(s->sd), t[delptr].val, ptrstack);\r
313                                 if (t[delptr].val == t[t[delptr].next].val) {\r
314 //if(s->sd.headptr[0] < 0 || s->sd.headptr[0] >= QUANT_LFTA3_SIZE)\r
315 // printf("s->sd.headptr[0] = %d\n",s->sd.headptr[0]);\r
316                                         // duplicates case\r
317                                         if ((ptrstack[1] == 0)\r
318                                         && (s->sd.headptr[0] != 0)\r
319                                         && (s->sd.list[s->sd.headptr[0]].val == t[delptr].val))\r
320                                                 s->sd.list[s->sd.headptr[0]].down = t[delptr].next;\r
321                                         else if ((ptrstack[1] != 0)\r
322                                         && (s->sd.list[ptrstack[1]].next != 0)\r
323                                         && (s->sd.list[s->sd.list[ptrstack[1]].next].val == t[delptr].val))\r
324                                                 s->sd.list[s->sd.list[ptrstack[1]].next].down = t[delptr].next;\r
325                                 }\r
326                                 else {\r
327                                         // non-duplicates case\r
328                                         skipdir_delete(&(s->sd), ptrstack, t[delptr].val);\r
329                                 }\r
330                         }\r
331                         // delete from list\r
332                         //fprintf(stderr,"DELETED %u\n", t[delptr].val);\r
333                         t[s->circptr].next = t[delptr].next;\r
334                         quant_udaf_lfta3_cursor_free(s, delptr);\r
335                 }\r
336                 else {\r
337                         s->circptr = t[s->circptr].next;\r
338                 }\r
339         }\r
340 }\r
341 \r
342 \r
343 /****************************************************************/\r
344 /* LFTA3 functions                                              */\r
345 /****************************************************************/\r
346 void quant_udaf_lfta3_LFTA_AGGR_INIT_(gs_sp_t b)\r
347 {\r
348         gs_uint32_t i;\r
349 \r
350         quant_udaf_lfta3_struct_t *s = (quant_udaf_lfta3_struct_t *)b;\r
351         s->nelts = 0;\r
352         s->usedptr = 0;         // NULL ptr\r
353         s->circptr = 0;\r
354         // initialize cursor stack\r
355         s->freeptr = 1;\r
356         s->size = 0;\r
357         for (i=1; i < QUANT_LFTA3_SIZE; i++)\r
358                 s->t[i].next = i+1;\r
359         s->t[QUANT_LFTA3_SIZE].next = 0;\r
360         skipdir_create(&(s->sd));\r
361 }\r
362 \r
363 void quant_udaf_lfta3_LFTA_AGGR_UPDATE_(gs_sp_t b, gs_uint32_t v)\r
364 {\r
365         quant_udaf_lfta3_struct_t *s = (quant_udaf_lfta3_struct_t *)b;\r
366         tuple_t *t = s->t;\r
367         gs_uint32_t ptr = s->usedptr;\r
368         gs_uint32_t newptr, delptr;\r
369         gs_uint32_t obj;        // objective function\r
370         gs_uint32_t threshold;\r
371         gs_uint32_t ptrstack[SKIPDIR_HEIGHT_MAX+5];\r
372         gs_uint32_t debugptr;\r
373 \r
374 //printf("AGGR_UPDATE start\n");\r
375         s->nelts++;\r
376         //fprintf(stderr,"nelts = %u\n",s->nelts);\r
377         // left boundary case\r
378         if ((ptr == 0) || (v < t[ptr].val)) {\r
379                 if (t[ptr].val == v) {\r
380                         t[ptr].gap++;\r
381 //printf("AGGR_UPDATE END 1\n");\r
382                         return;\r
383                 }\r
384                 newptr = quant_udaf_lfta3_cursor_alloc(s);\r
385                 if (newptr == 0) {\r
386                         gslog(LOG_ALERT, "Out of space.\n");\r
387                         return;\r
388                 }\r
389                 t[newptr].val = v;\r
390                 t[newptr].gap = 1;\r
391                 t[newptr].del = 0;\r
392                 t[newptr].next = s->usedptr;\r
393                 s->usedptr = newptr;\r
394 //printf("AGGR_UPDATE END 2\n");\r
395                 return;\r
396         }\r
397 \r
398         // locate $i$ such that (v_i-1 < v <= v_i)\r
399         skipdir_search(&(s->sd), v, ptrstack);\r
400 \r
401         //ptr = (ptrstack[0] == 0) ? s->usedptr : s->sd.list[ptrstack[0]].down;\r
402         ptr = (ptrstack[0] == 0) ? s->usedptr : ptrstack[0];\r
403         while ((t[ptr].next != 0) && (t[t[ptr].next].val < v))\r
404                 ptr = t[ptr].next;\r
405 \r
406 /*\r
407         // duplicate value\r
408         if ((t[ptr].next != 0) && (t[t[ptr].next].val == v)) {\r
409                 t[t[ptr].next].gap++;\r
410 printf("AGGR_UPDATE END 3\n");\r
411                 return;\r
412         }\r
413 */\r
414 \r
415         // right boundary case\r
416         if (t[ptr].next == 0) {\r
417                 newptr = quant_udaf_lfta3_cursor_alloc(s);\r
418                 if (newptr == 0) {\r
419                         gslog(LOG_ALERT, "Out of space.\n");\r
420                         return;\r
421                 }\r
422                 t[newptr].val = v;\r
423                 t[newptr].gap = 1;\r
424                 t[newptr].del = 0;\r
425                 t[newptr].next = 0;\r
426                 t[ptr].next = newptr;\r
427 //printf("AGGR_UPDATE END 4\n");\r
428                 return;\r
429         }\r
430 \r
431         // non-boundary case\r
432 //printf("1) t[ptr].next =%d, ptr=%d\n",t[ptr].next,ptr);\r
433         obj = t[ptr].gap+t[t[ptr].next].gap+t[t[ptr].next].del;\r
434         threshold = (gs_uint32_t)ceil(2.0 * QUANT_EPS * (gs_float_t)s->nelts);\r
435         if (obj > threshold) {\r
436                 newptr = quant_udaf_lfta3_cursor_alloc(s);\r
437                 if (newptr == 0) {\r
438                         gslog(LOG_ALERT, "Out of memory.\n");\r
439                         return;\r
440                 }\r
441 //printf("newptr=%d\n",newptr);\r
442                 t[newptr].val = v;\r
443                 t[newptr].gap = 1;\r
444                 t[newptr].del = t[t[ptr].next].gap+t[t[ptr].next].del - 1;\r
445                 t[newptr].next = t[ptr].next;\r
446                 t[ptr].next = newptr;\r
447                 skipdir_insert(&(s->sd), ptrstack, newptr, v);\r
448         }\r
449         else {\r
450                 // insert into existing bucket\r
451 //printf("t[ptr].next =%d\n",t[ptr].next);\r
452                 t[t[ptr].next].gap++;\r
453         }\r
454         quant_lfta3_compress(s);\r
455 //printf("AGGR_UPDATE END 5\n");\r
456 }\r
457 \r
458 gs_int32_t quant_udaf_lfta3_LFTA_AGGR_FLUSHME_(gs_sp_t b)\r
459 {\r
460         quant_udaf_lfta3_struct_t *s = (quant_udaf_lfta3_struct_t *)b;\r
461 \r
462         if (s->freeptr == 0)\r
463                 return 1;\r
464         else\r
465                 return 0;\r
466 }\r
467 \r
468 void quant_udaf_lfta3_LFTA_AGGR_OUTPUT_(struct gs_string *r, gs_sp_t b)\r
469 {\r
470 #ifdef COMPRESSED_XFER\r
471         quant_udaf_lfta3_struct_t *s = (quant_udaf_lfta3_struct_t *)b;\r
472         tuple_t tmp[QUANT_LFTA3_SIZE+1];\r
473         gs_uint32_t ptr=s->usedptr;\r
474         gs_int32_t i=0,j;\r
475 \r
476         for (; ptr != 0; ptr=s->t[ptr].next) {\r
477                 tmp[i].val = s->t[ptr].val;\r
478                 tmp[i].gap = s->t[ptr].gap;\r
479                 tmp[i].del = s->t[ptr].del;\r
480                 i++;\r
481         }\r
482         for (j=1; j <= i; j++) {\r
483                 s->t[j].val = tmp[j-1].val;\r
484                 s->t[j].gap = tmp[j-1].gap;\r
485                 s->t[j].del = tmp[j-1].del;\r
486                 s->t[j].next = j+1;\r
487         }\r
488         s->t[i].next = 0;\r
489         s->usedptr = 1;\r
490 \r
491         r->length = (5 + 4*(i+1))*sizeof(gs_uint32_t);\r
492 #endif\r
493 #ifndef COMPRESSED_XFER\r
494         r->length = sizeof(quant_udaf_lfta3_struct_t);\r
495 #endif\r
496         r->data = b;\r
497 }\r
498 \r
499 void quant_udaf_lfta3_LFTA_AGGR_DESTROY_(gs_sp_t b)\r
500 {\r
501         return;\r
502 }\r