Added quantiling UDAFs
[com/gs-lite.git] / src / lib / gscphftaaux / hfta_udaf.cc
1 /* ------------------------------------------------\r
2 Copyright 2014 AT&T Intellectual Property\r
3    Licensed under the Apache License, Version 2.0 (the "License");\r
4    you may not use this file except in compliance with the License.\r
5    You may obtain a copy of the License at\r
6 \r
7      http://www.apache.org/licenses/LICENSE-2.0\r
8 \r
9    Unless required by applicable law or agreed to in writing, software\r
10    distributed under the License is distributed on an "AS IS" BASIS,\r
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12    See the License for the specific language governing permissions and\r
13    limitations under the License.\r
14  ------------------------------------------- */\r
15 \r
16 \r
17 #include "gsconfig.h"\r
18 #include "gstypes.h"\r
19 #include "hfta_udaf.h"\r
20 #include "rts_udaf.h"\r
21 #include <stdio.h>\r
22 #include <limits.h>\r
23 #include <math.h>\r
24 //#include <memory.h>\r
25 #include <string.h>\r
26 #include <sys/time.h>\r
27 #include <iostream>\r
28 \r
29 #include "hfta_runtime_library.h"\r
30 \r
31 \r
32 #define max(a,b) ((a) > (b) ? (a) : (b))\r
33 #define min(x,y) ((x) < (y) ? (x) : (y))\r
34 #define lg(x)    (log(x) / log(2))\r
35 \r
36 using namespace std;\r
37 \r
38 \r
39 // -------------------------------------------------------------------\r
40 //              moving sum over N intervals\r
41 \r
42 struct moving_sum_udaf_str{\r
43         gs_uint32_t N;\r
44         gs_uint32_t pos;\r
45         gs_uint32_t *sums;\r
46 };\r
47 \r
48 void moving_sum_udaf_HFTA_AGGR_INIT_(gs_sp_t buf){\r
49   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;\r
50   u->N=0; u->pos=0; u->sums=NULL;\r
51 }\r
52 \r
53 void moving_sum_udaf_HFTA_AGGR_UPDATE_(gs_sp_t buf, gs_uint32_t s, gs_uint32_t N) {\r
54   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;\r
55   if(u->sums == NULL){\r
56         u->sums = (gs_uint32_t *)malloc(N*sizeof(gs_uint32_t));\r
57         for(gs_int32_t i=0;i<N;i++)\r
58                 u->sums[i] = 0;\r
59     u->N = N;\r
60   }\r
61   u->sums[u->pos] += s;\r
62 }\r
63 \r
64 void super_moving_sum_udaf_HFTA_AGGR_UPDATE_(gs_sp_t  buf, gs_uint64_t sub_sum) {\r
65   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;\r
66   gs_uint32_t s = (gs_uint32_t)(sub_sum & 0xffffffff);\r
67   if(u->sums == NULL){\r
68     gs_uint32_t N = (gs_uint32_t)((sub_sum & 0xffffffff00000000ull) >> 32);\r
69         u->sums = (gs_uint32_t *)malloc(N*sizeof(gs_uint32_t));\r
70         for(gs_int32_t i=0;i<N;i++)\r
71                 u->sums[i] = 0;\r
72     u->N = N;\r
73   }\r
74   u->sums[u->pos] += s;\r
75 }\r
76 \r
77 void moving_sum_udaf_HFTA_AGGR_OUTPUT_(gs_p_t *result, gs_sp_t  buf){\r
78         *result = (gs_p_t)(buf);\r
79 }\r
80 \r
81 void moving_sum_udaf_HFTA_AGGR_DESTROY_(gs_sp_t  buf){\r
82   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;\r
83   if(u->sums != NULL)\r
84         free(u->sums);\r
85 }\r
86 \r
87 void moving_sum_udaf_HFTA_AGGR_REINIT_( gs_sp_t  buf){\r
88   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;\r
89   u->pos++;\r
90   if(u->pos >= u->N)\r
91         u->pos = 0;\r
92   u->sums[u->pos] = 0;\r
93 }\r
94 \r
95 gs_uint32_t moving_sum_extract(gs_p_t result){\r
96   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) result;\r
97   gs_uint32_t s=0, i=0;\r
98   for(i=0; i<u->N;i++){\r
99     s += u->sums[i];\r
100   }\r
101   return s;\r
102 }\r
103 \r
104 gs_float_t moving_sum_extract_exp(gs_p_t result, gs_float_t alpha){\r
105   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) result;\r
106   gs_uint32_t p=0, i=0;\r
107   gs_float_t s=0.0, m=1.0;\r
108   p=u->pos;\r
109   for(i=0; i<u->N;i++){\r
110     s += u->sums[i]*m;\r
111     if(p==0)\r
112                 p=u->N - 1;\r
113     else\r
114                 p--;\r
115         m *= alpha;\r
116   }\r
117   return s;\r
118 }\r
119 \r
120 \r
121 // -------------------------------------------------------------------\r
122 //              sum over 3 intervals : test rUDAF\r
123 \r
124 struct sum3_udaf_str{\r
125   gs_uint32_t s_2;\r
126   gs_uint32_t s_1;\r
127   gs_uint32_t s_0;\r
128 };\r
129 \r
130 void sum3_HFTA_AGGR_INIT_(gs_sp_t  buf) {\r
131   struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;\r
132   u->s_0 = 0; u->s_1 = 0; u->s_2 = 0;\r
133   return;\r
134 }          \r
135 \r
136 void sum3_HFTA_AGGR_UPDATE_(gs_sp_t  buf, gs_uint32_t s) {\r
137   struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;\r
138   u->s_0 += s;\r
139   return;\r
140 }\r
141 \r
142 void sum3_HFTA_AGGR_OUTPUT_(gs_uint32_t *result, gs_sp_t  buf) {\r
143   struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;\r
144   *result = u->s_0 + u->s_1 + u->s_2;\r
145   return; \r
146 }\r
147 \r
148 void sum3_HFTA_AGGR_DESTROY_(gs_sp_t  buf) {\r
149   return;\r
150 }\r
151 \r
152 void sum3_HFTA_AGGR_REINIT_( gs_sp_t  buf) {\r
153   struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;\r
154   u->s_2 = u->s_1;\r
155   u->s_1 = u->s_0;\r
156   u->s_0 = 0;\r
157   return;\r
158 }\r
159 \r
160 \r
161 #define HISTORY_LENGTH 1024\r
162 \r
163 /////////////////////////////////////////////////////////////////////////\r
164 /////   Calculate the average of all positive gs_float_t numbers\r
165 \r
166 struct posavg_struct{\r
167   gs_float_t sum;\r
168   gs_float_t cnt;\r
169 };\r
170 \r
171 void POSAVG_HFTA_AGGR_INIT_(gs_sp_t  buf) {\r
172   struct posavg_struct * a = (struct posavg_struct *) buf;\r
173   a->sum=0;\r
174   a->cnt=0;\r
175   return;\r
176 }\r
177 \r
178 void POSAVG_HFTA_AGGR_UPDATE_(gs_sp_t  buf, gs_float_t v) {\r
179   struct posavg_struct * a = (struct posavg_struct *) buf;\r
180   if (v>=0) {\r
181     a->sum=a->sum+v;\r
182     a->cnt=a->cnt+1;\r
183   }\r
184   return;\r
185 }\r
186 \r
187 void POSAVG_HFTA_AGGR_OUTPUT_(gs_float_t * v, gs_sp_t  buf) {\r
188   struct posavg_struct * a = (struct posavg_struct *) buf;\r
189   if (a->cnt>0) {\r
190     *v=(a->sum/a->cnt);\r
191   } else {\r
192     *v=-1;\r
193   }\r
194   return;\r
195 }\r
196 \r
197 void POSAVG_HFTA_AGGR_DESTROY_(gs_sp_t  buf) {\r
198   return;\r
199 }\r
200 \r
201 /////////////////////////////////////////////////////////////////////////\r
202 /////                   avg_udaf (simple example)\r
203 \r
204 //              struct received from subaggregate\r
205 struct avg_udaf_lfta_struct_t{\r
206         gs_int64_t  sum;\r
207         gs_uint32_t cnt;\r
208 };\r
209 \r
210 //              sctarchpad struct\r
211 struct avg_udaf_hfta_struct_t{\r
212         gs_int64_t sum;\r
213         gs_uint32_t cnt;\r
214 };\r
215 \r
216 //                      avg_udaf functions\r
217 void avg_udaf_HFTA_AGGR_INIT_(gs_sp_t b){\r
218         avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;\r
219         s->sum = 0;\r
220         s->cnt = 0;\r
221 }\r
222 \r
223 void avg_udaf_HFTA_AGGR_UPDATE_(gs_sp_t b, gs_uint32_t v){\r
224         avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;\r
225         s->sum += v;\r
226         s->cnt ++;\r
227 }\r
228 \r
229 void avg_udaf_HFTA_AGGR_OUTPUT_(vstring *r,gs_sp_t b){\r
230         r->length = 12;\r
231         r->offset = (gs_p_t)(b);\r
232         r->reserved = SHALLOW_COPY;\r
233 }\r
234 \r
235 void avg_udaf_HFTA_AGGR_DESTROY_(gs_sp_t b){\r
236         return;\r
237 }\r
238 \r
239 \r
240 //                      avg_udaf superaggregate functions\r
241 void avg_udaf_hfta_HFTA_AGGR_INIT_(gs_sp_t b){\r
242         avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;\r
243         s->sum = 0;\r
244         s->cnt = 0;\r
245 }\r
246 \r
247 void avg_udaf_hfta_HFTA_AGGR_UPDATE_(gs_sp_t b, vstring *v){\r
248         if(v->length != 12) return;\r
249         avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;\r
250         avg_udaf_lfta_struct_t *vs = (avg_udaf_lfta_struct_t *)(v->offset);\r
251         s->sum += vs->sum;\r
252         s->cnt += vs->cnt;\r
253 }\r
254 \r
255 void avg_udaf_hfta_HFTA_AGGR_OUTPUT_(vstring *r,gs_sp_t b){\r
256         r->length = 12;\r
257         r->offset = (gs_p_t)(b);\r
258         r->reserved = SHALLOW_COPY;\r
259 }\r
260 \r
261 void avg_udaf_hfta_HFTA_AGGR_DESTROY_(gs_sp_t b){\r
262         return;\r
263 }\r
264 \r
265 //              Extraction function\r
266 gs_float_t extr_avg_fcn(vstring *v){\r
267         if(v->length != 12) return 0;\r
268         avg_udaf_hfta_struct_t *vs = (avg_udaf_hfta_struct_t *)(v->offset);\r
269         gs_float_t r = (gs_float_t)(vs->sum) / vs->cnt;\r
270     return r;\r
271 }\r
272 \r
273 /////////////////////////////////////////////////////////\r
274 //              FIRST aggregate\r
275 // hfta only\r
276 \r
277 void FIRST_HFTA_AGGR_INIT_(gs_uint32_t* scratch) {\r
278         *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX\r
279 }\r
280 \r
281 void FIRST_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }\r
282 \r
283 void FIRST_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {\r
284         if (*scratch == UINT_MAX)\r
285                 *scratch = val;\r
286 }\r
287 \r
288 void FIRST_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {\r
289         *res = *scratch;\r
290 }\r
291 \r
292 void FIRST_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }\r
293 \r
294 void FIRST_ULL_HFTA_AGGR_INIT_(gs_uint64_t* scratch) {\r
295         *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX\r
296 }\r
297 \r
298 void FIRST_ULL_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }\r
299 \r
300 void FIRST_ULL_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {\r
301         if (*scratch == UINT_MAX)\r
302                 *scratch = val;\r
303 }\r
304 \r
305 void FIRST_ULL_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {\r
306         *res = *scratch;\r
307 }\r
308 \r
309 void FIRST_ULL_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }\r
310 \r
311 \r
312 void FIRST_STR_HFTA_AGGR_INIT_(vstring* scratch) {\r
313         scratch->offset= 0;\r
314 }\r
315 \r
316 void FIRST_STR_HFTA_AGGR_REINIT_(vstring* scratch) { }\r
317 \r
318 void FIRST_STR_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {\r
319         if (!scratch->offset) {\r
320     scratch->length = val->length;\r
321     scratch->offset = val->offset;\r
322     scratch->reserved = SHALLOW_COPY;\r
323         }\r
324 }\r
325 \r
326 void FIRST_STR_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {\r
327         *res = *scratch;\r
328 }\r
329 \r
330 void FIRST_STR_HFTA_AGGR_DESTROY_(vstring* scratch) { }\r
331 \r
332 // hfta/lfta split\r
333 \r
334 void FIRST_hfta_HFTA_AGGR_INIT_(gs_uint32_t* scratch) {\r
335         *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX\r
336 }\r
337 \r
338 void FIRST_hfta_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }\r
339 \r
340 void FIRST_hfta_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {\r
341         if (*scratch == UINT_MAX)\r
342                 *scratch = val;\r
343 }\r
344 \r
345 void FIRST_hfta_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {\r
346         *res = *scratch;\r
347 }\r
348 \r
349 void FIRST_hfta_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }\r
350 \r
351 void FIRST_ULL_hfta_HFTA_AGGR_INIT_(gs_uint64_t* scratch) {\r
352         *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX\r
353 }\r
354 \r
355 void FIRST_ULL_hfta_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }\r
356 \r
357 void FIRST_ULL_hfta_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {\r
358         if (*scratch == UINT_MAX)\r
359                 *scratch = val;\r
360 }\r
361 \r
362 void FIRST_ULL_hfta_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {\r
363         *res = *scratch;\r
364 }\r
365 \r
366 void FIRST_ULL_hfta_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }\r
367 \r
368 \r
369 void FIRST_STR_hfta_HFTA_AGGR_INIT_(vstring* scratch) {\r
370         scratch->offset= 0;\r
371 }\r
372 \r
373 void FIRST_STR_hfta_HFTA_AGGR_REINIT_(vstring* scratch) { }\r
374 \r
375 void FIRST_STR_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {\r
376         if (!scratch->offset) {\r
377     scratch->length = val->length;\r
378     scratch->offset = val->offset;\r
379     scratch->reserved = SHALLOW_COPY;\r
380         }\r
381 }\r
382 \r
383 void FIRST_STR_hfta_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {\r
384         *res = *scratch;\r
385 }\r
386 \r
387 void FIRST_STR_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) { }\r
388 \r
389 \r
390 /////////////////////////////////////////////////////////\r
391 //              LAST aggregate\r
392 \r
393 // hfta only\r
394 \r
395 void LAST_HFTA_AGGR_INIT_(gs_uint32_t* scratch) { }\r
396 \r
397 void LAST_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }\r
398 \r
399 void LAST_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {\r
400         *scratch = val;\r
401 }\r
402 \r
403 void LAST_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {\r
404         *res = *scratch;\r
405 }\r
406 \r
407 void LAST_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }\r
408 \r
409 void LAST_ULLHFTA_AGGR_INIT_(gs_uint64_t* scratch) { }\r
410 \r
411 void LAST_ULL_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }\r
412 \r
413 void LAST_ULL_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {\r
414         *scratch = val;\r
415 }\r
416 \r
417 void LAST_ULL_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {\r
418         *res = *scratch;\r
419 }\r
420 \r
421 void LAST_ULL_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }\r
422 \r
423 \r
424 void LAST_STR_HFTA_AGGR_INIT_(vstring* scratch) {\r
425         scratch->offset= 0;\r
426 }\r
427 \r
428 void LAST_STR_HFTA_AGGR_REINIT_(vstring* scratch) { }\r
429 \r
430 void LAST_STR_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {\r
431         scratch->length = val->length;\r
432   scratch->offset = val->offset;\r
433   scratch->reserved = SHALLOW_COPY;\r
434 }\r
435 \r
436 void LAST_STR_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {\r
437         *res = *scratch;\r
438 }\r
439 \r
440 void LAST_STR_HFTA_AGGR_DESTROY_(vstring* scratch) { }\r
441 \r
442 // hfta/lfta split\r
443 \r
444 void LAST_hfta_HFTA_AGGR_INIT_(gs_uint32_t* scratch) { }\r
445 \r
446 void LAST_hfta_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }\r
447 \r
448 void LAST_hfta_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {\r
449         *scratch = val;\r
450 }\r
451 \r
452 void LAST_hfta_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {\r
453         *res = *scratch;\r
454 }\r
455 \r
456 void LAST_hfta_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }\r
457 \r
458 void LAST_ULL_hfta_HFTA_AGGR_INIT_(gs_uint64_t* scratch) { }\r
459 \r
460 void LAST_ULL_hfta_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }\r
461 \r
462 void LAST_ULL_hfta_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {\r
463         *scratch = val;\r
464 }\r
465 \r
466 void LAST_ULL_hfta_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {\r
467         *res = *scratch;\r
468 }\r
469 \r
470 void LAST_ULL_hfta_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }\r
471 \r
472 \r
473 void LAST_STR_hfta_HFTA_AGGR_INIT_(vstring* scratch) {\r
474         scratch->offset= 0;\r
475 }\r
476 \r
477 void LAST_STR_hfta_HFTA_AGGR_REINIT_(vstring* scratch) { }\r
478 \r
479 void LAST_STR_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {\r
480         scratch->length = val->length;\r
481   scratch->offset = val->offset;\r
482   scratch->reserved = SHALLOW_COPY;\r
483 }\r
484 \r
485 void LAST_STR_hfta_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {\r
486         *res = *scratch;\r
487 }\r
488 \r
489 void LAST_STR_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) { }\r
490 \r
491 \r
492 /////////////////////////////////////////////////////////\r
493 //              running_array_aggr aggregate\r
494 \r
495 void running_array_aggr_hfta_HFTA_AGGR_INIT_(vstring* scratch) {\r
496   scratch->offset = NULL;  \r
497   scratch->length = 0;\r
498 }\r
499 \r
500 void running_array_aggr_hfta_HFTA_AGGR_REINIT_(vstring* scratch) { }\r
501 \r
502 void running_array_aggr_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {\r
503   char buffer[100];\r
504 \r
505   gs_uint32_t* ints = (gs_uint32_t*)val->offset;\r
506   switch (val->length / sizeof (gs_uint32_t)) {\r
507     case 4:\r
508       sprintf(buffer, "%u,%u,%u,%u", ints[0], ints[1], ints[2], ints[3]);\r
509       break;\r
510     case 3:\r
511       sprintf(buffer, "%u,%u,%u", ints[0], ints[1], ints[2]);\r
512       break;   \r
513     case 2:\r
514       sprintf(buffer, "%u,%u", ints[0], ints[1]);\r
515       break;        \r
516     case 1:\r
517       sprintf(buffer, "%u", ints[0]);\r
518       break;  \r
519     case 0:\r
520       return;        \r
521   }\r
522   int buf_len = strlen(buffer);\r
523 \r
524   // append the content of buffer to scratch\r
525         if (!scratch->offset) {\r
526     Vstring_Constructor(scratch, buffer);\r
527         } else {\r
528     scratch->offset = (gs_p_t)realloc((void*)scratch->offset, scratch->length + buf_len + 1);\r
529     *((char*)scratch->offset + scratch->length) = ',';\r
530     memcpy((void*)(scratch->offset + scratch->length + 1), (void*)buffer, buf_len);\r
531     scratch->length += buf_len + 1;\r
532     scratch->reserved = INTERNAL;\r
533   }\r
534 }\r
535 \r
536 void running_array_aggr_hfta_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {\r
537         *res = *scratch;\r
538   res->reserved = SHALLOW_COPY;\r
539 }\r
540 \r
541 void running_array_aggr_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) {\r
542   hfta_vstr_destroy(scratch);\r
543  }\r
544 \r