Change VES version to 7. Added additional UDAFs
[com/gs-lite.git] / src / lib / gscphftaaux / hfta_udaf.cc
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16
17 #include "gsconfig.h"
18 #include "gstypes.h"
19 #include "hfta_udaf.h"
20 #include "rts_udaf.h"
21 #include <stdio.h>
22 #include <limits.h>
23 #include <math.h>
24 //#include <memory.h>
25 #include <string.h>
26 #include <sys/time.h>
27 #include <iostream>
28
29 #include "hfta_runtime_library.h"
30
31
32 #define max(a,b) ((a) > (b) ? (a) : (b))
33 #define min(x,y) ((x) < (y) ? (x) : (y))
34 #define lg(x)    (log(x) / log(2))
35
36 using namespace std;
37
38
39 // -------------------------------------------------------------------
40 //              moving sum over N intervals
41
42 struct moving_sum_udaf_str{
43         gs_uint32_t N;
44         gs_uint32_t pos;
45         gs_uint32_t *sums;
46 };
47
48 void moving_sum_udaf_HFTA_AGGR_INIT_(gs_sp_t buf){
49   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
50   u->N=0; u->pos=0; u->sums=NULL;
51 }
52
53 void moving_sum_udaf_HFTA_AGGR_UPDATE_(gs_sp_t buf, gs_uint32_t s, gs_uint32_t N) {
54   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
55   if(u->sums == NULL){
56         u->sums = (gs_uint32_t *)malloc(N*sizeof(gs_uint32_t));
57         for(gs_int32_t i=0;i<N;i++)
58                 u->sums[i] = 0;
59     u->N = N;
60   }
61   u->sums[u->pos] += s;
62 }
63
64 void super_moving_sum_udaf_HFTA_AGGR_UPDATE_(gs_sp_t  buf, gs_uint64_t sub_sum) {
65   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
66   gs_uint32_t s = (gs_uint32_t)(sub_sum & 0xffffffff);
67   if(u->sums == NULL){
68     gs_uint32_t N = (gs_uint32_t)((sub_sum & 0xffffffff00000000ull) >> 32);
69         u->sums = (gs_uint32_t *)malloc(N*sizeof(gs_uint32_t));
70         for(gs_int32_t i=0;i<N;i++)
71                 u->sums[i] = 0;
72     u->N = N;
73   }
74   u->sums[u->pos] += s;
75 }
76
77 void moving_sum_udaf_HFTA_AGGR_OUTPUT_(gs_p_t *result, gs_sp_t  buf){
78         *result = (gs_p_t)(buf);
79 }
80
81 void moving_sum_udaf_HFTA_AGGR_DESTROY_(gs_sp_t  buf){
82   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
83   if(u->sums != NULL)
84         free(u->sums);
85 }
86
87 void moving_sum_udaf_HFTA_AGGR_REINIT_( gs_sp_t  buf){
88   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
89   u->pos++;
90   if(u->pos >= u->N)
91         u->pos = 0;
92   u->sums[u->pos] = 0;
93 }
94
95 gs_uint32_t moving_sum_extract(gs_p_t result){
96   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) result;
97   gs_uint32_t s=0, i=0;
98   for(i=0; i<u->N;i++){
99     s += u->sums[i];
100   }
101   return s;
102 }
103
104 gs_float_t moving_sum_extract_exp(gs_p_t result, gs_float_t alpha){
105   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) result;
106   gs_uint32_t p=0, i=0;
107   gs_float_t s=0.0, m=1.0;
108   p=u->pos;
109   for(i=0; i<u->N;i++){
110     s += u->sums[i]*m;
111     if(p==0)
112                 p=u->N - 1;
113     else
114                 p--;
115         m *= alpha;
116   }
117   return s;
118 }
119
120
121 // -------------------------------------------------------------------
122 //              sum over 3 intervals : test rUDAF
123
124 struct sum3_udaf_str{
125   gs_uint32_t s_2;
126   gs_uint32_t s_1;
127   gs_uint32_t s_0;
128 };
129
130 void sum3_HFTA_AGGR_INIT_(gs_sp_t  buf) {
131   struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;
132   u->s_0 = 0; u->s_1 = 0; u->s_2 = 0;
133   return;
134 }          
135
136 void sum3_HFTA_AGGR_UPDATE_(gs_sp_t  buf, gs_uint32_t s) {
137   struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;
138   u->s_0 += s;
139   return;
140 }
141
142 void sum3_HFTA_AGGR_OUTPUT_(gs_uint32_t *result, gs_sp_t  buf) {
143   struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;
144   *result = u->s_0 + u->s_1 + u->s_2;
145   return; 
146 }
147
148 void sum3_HFTA_AGGR_DESTROY_(gs_sp_t  buf) {
149   return;
150 }
151
152 void sum3_HFTA_AGGR_REINIT_( gs_sp_t  buf) {
153   struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;
154   u->s_2 = u->s_1;
155   u->s_1 = u->s_0;
156   u->s_0 = 0;
157   return;
158 }
159
160
161 #define HISTORY_LENGTH 1024
162
163 /////////////////////////////////////////////////////////////////////////
164 /////   Calculate the average of all positive gs_float_t numbers
165
166 struct posavg_struct{
167   gs_float_t sum;
168   gs_float_t cnt;
169 };
170
171 void POSAVG_HFTA_AGGR_INIT_(gs_sp_t  buf) {
172   struct posavg_struct * a = (struct posavg_struct *) buf;
173   a->sum=0;
174   a->cnt=0;
175   return;
176 }
177
178 void POSAVG_HFTA_AGGR_UPDATE_(gs_sp_t  buf, gs_float_t v) {
179   struct posavg_struct * a = (struct posavg_struct *) buf;
180   if (v>=0) {
181     a->sum=a->sum+v;
182     a->cnt=a->cnt+1;
183   }
184   return;
185 }
186
187 void POSAVG_HFTA_AGGR_OUTPUT_(gs_float_t * v, gs_sp_t  buf) {
188   struct posavg_struct * a = (struct posavg_struct *) buf;
189   if (a->cnt>0) {
190     *v=(a->sum/a->cnt);
191   } else {
192     *v=-1;
193   }
194   return;
195 }
196
197 void POSAVG_HFTA_AGGR_DESTROY_(gs_sp_t  buf) {
198   return;
199 }
200
201 /////////////////////////////////////////////////////////////////////////
202 /////                   avg_udaf (simple example)
203
204 //              struct received from subaggregate
205 struct avg_udaf_lfta_struct_t{
206         gs_int64_t  sum;
207         gs_uint32_t cnt;
208 };
209
210 //              sctarchpad struct
211 struct avg_udaf_hfta_struct_t{
212         gs_int64_t sum;
213         gs_uint32_t cnt;
214 };
215
216 //                      avg_udaf functions
217 void avg_udaf_HFTA_AGGR_INIT_(gs_sp_t b){
218         avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;
219         s->sum = 0;
220         s->cnt = 0;
221 }
222
223 void avg_udaf_HFTA_AGGR_UPDATE_(gs_sp_t b, gs_uint32_t v){
224         avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;
225         s->sum += v;
226         s->cnt ++;
227 }
228
229 void avg_udaf_HFTA_AGGR_OUTPUT_(vstring *r,gs_sp_t b){
230         r->length = 12;
231         r->offset = (gs_p_t)(b);
232         r->reserved = SHALLOW_COPY;
233 }
234
235 void avg_udaf_HFTA_AGGR_DESTROY_(gs_sp_t b){
236         return;
237 }
238
239
240 //                      avg_udaf superaggregate functions
241 void avg_udaf_hfta_HFTA_AGGR_INIT_(gs_sp_t b){
242         avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;
243         s->sum = 0;
244         s->cnt = 0;
245 }
246
247 void avg_udaf_hfta_HFTA_AGGR_UPDATE_(gs_sp_t b, vstring *v){
248         if(v->length != 12) return;
249         avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;
250         avg_udaf_lfta_struct_t *vs = (avg_udaf_lfta_struct_t *)(v->offset);
251         s->sum += vs->sum;
252         s->cnt += vs->cnt;
253 }
254
255 void avg_udaf_hfta_HFTA_AGGR_OUTPUT_(vstring *r,gs_sp_t b){
256         r->length = 12;
257         r->offset = (gs_p_t)(b);
258         r->reserved = SHALLOW_COPY;
259 }
260
261 void avg_udaf_hfta_HFTA_AGGR_DESTROY_(gs_sp_t b){
262         return;
263 }
264
265 //              Extraction function
266 gs_float_t extr_avg_fcn(vstring *v){
267         if(v->length != 12) return 0;
268         avg_udaf_hfta_struct_t *vs = (avg_udaf_hfta_struct_t *)(v->offset);
269         gs_float_t r = (gs_float_t)(vs->sum) / vs->cnt;
270     return r;
271 }
272
273 /////////////////////////////////////////////////////////
274 //              FIRST aggregate
275
276 void FIRST_hfta_HFTA_AGGR_INIT_(gs_uint32_t* scratch) {
277         *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
278 }
279
280 void FIRST_hfta_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }
281
282 void FIRST_hfta_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {
283         if (*scratch == UINT_MAX)
284                 *scratch = val;
285 }
286
287 void FIRST_hfta_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {
288         *res = *scratch;
289 }
290
291 void FIRST_hfta_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }
292
293 void FIRST_ULL_hfta_HFTA_AGGR_INIT_(gs_uint64_t* scratch) {
294         *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
295 }
296
297 void FIRST_ULL_hfta_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
298
299 void FIRST_ULL_hfta_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
300         if (*scratch == UINT_MAX)
301                 *scratch = val;
302 }
303
304 void FIRST_ULL_hfta_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
305         *res = *scratch;
306 }
307
308 void FIRST_ULL_hfta_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
309
310
311 void FIRST_STR_hfta_HFTA_AGGR_INIT_(vstring* scratch) {
312         scratch->offset= 0;
313 }
314
315 void FIRST_STR_hfta_HFTA_AGGR_REINIT_(vstring* scratch) { }
316
317 void FIRST_STR_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
318         if (!scratch->offset) {
319     scratch->length = val->length;
320     scratch->offset = val->offset;
321     scratch->reserved = SHALLOW_COPY;
322         }
323 }
324
325 void FIRST_STR_hfta_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
326         *res = *scratch;
327 }
328
329 void FIRST_STR_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) { }
330
331 /////////////////////////////////////////////////////////
332 //              LAST aggregate
333
334 // hfta only
335
336 void LAST_HFTA_AGGR_INIT_(gs_uint32_t* scratch) { }
337
338 void LAST_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }
339
340 void LAST_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {
341         *scratch = val;
342 }
343
344 void LAST_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {
345         *res = *scratch;
346 }
347
348 void LAST_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }
349
350 void LAST_ULLHFTA_AGGR_INIT_(gs_uint64_t* scratch) { }
351
352 void LAST_ULL_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
353
354 void LAST_ULL_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
355         *scratch = val;
356 }
357
358 void LAST_ULL_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
359         *res = *scratch;
360 }
361
362 void LAST_ULL_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
363
364
365 void LAST_STR_HFTA_AGGR_INIT_(vstring* scratch) {
366         scratch->offset= 0;
367 }
368
369 void LAST_STR_HFTA_AGGR_REINIT_(vstring* scratch) { }
370
371 void LAST_STR_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
372         scratch->length = val->length;
373   scratch->offset = val->offset;
374   scratch->reserved = SHALLOW_COPY;
375 }
376
377 void LAST_STR_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
378         *res = *scratch;
379 }
380
381 void LAST_STR_HFTA_AGGR_DESTROY_(vstring* scratch) { }
382
383 // hfta/lfta split
384
385 void LAST_hfta_HFTA_AGGR_INIT_(gs_uint32_t* scratch) { }
386
387 void LAST_hfta_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }
388
389 void LAST_hfta_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {
390         *scratch = val;
391 }
392
393 void LAST_hfta_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {
394         *res = *scratch;
395 }
396
397 void LAST_hfta_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }
398
399 void LAST_ULL_hfta_HFTA_AGGR_INIT_(gs_uint64_t* scratch) { }
400
401 void LAST_ULL_hfta_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
402
403 void LAST_ULL_hfta_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
404         *scratch = val;
405 }
406
407 void LAST_ULL_hfta_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
408         *res = *scratch;
409 }
410
411 void LAST_ULL_hfta_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
412
413
414 void LAST_STR_hfta_HFTA_AGGR_INIT_(vstring* scratch) {
415         scratch->offset= 0;
416 }
417
418 void LAST_STR_hfta_HFTA_AGGR_REINIT_(vstring* scratch) { }
419
420 void LAST_STR_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
421         scratch->length = val->length;
422   scratch->offset = val->offset;
423   scratch->reserved = SHALLOW_COPY;
424 }
425
426 void LAST_STR_hfta_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
427         *res = *scratch;
428 }
429
430 void LAST_STR_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) { }
431
432
433 /////////////////////////////////////////////////////////
434 //              running_array_aggr aggregate
435
436 void running_array_aggr_hfta_HFTA_AGGR_INIT_(vstring* scratch) {
437   scratch->offset = NULL;  
438   scratch->length = 0;
439 }
440
441 void running_array_aggr_hfta_HFTA_AGGR_REINIT_(vstring* scratch) { }
442
443 void running_array_aggr_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
444   char buffer[100];
445
446   gs_uint32_t* ints = (gs_uint32_t*)val->offset;
447   switch (val->length / sizeof (gs_uint32_t)) {
448     case 4:
449       sprintf(buffer, "%u,%u,%u,%u", ints[0], ints[1], ints[2], ints[3]);
450       break;
451     case 3:
452       sprintf(buffer, "%u,%u,%u", ints[0], ints[1], ints[2]);
453       break;   
454     case 2:
455       sprintf(buffer, "%u,%u", ints[0], ints[1]);
456       break;        
457     case 1:
458       sprintf(buffer, "%u", ints[0]);
459       break;  
460     case 0:
461       return;        
462   }
463   int buf_len = strlen(buffer);
464
465   // append the content of buffer to scratch
466         if (!scratch->offset) {
467     Vstring_Constructor(scratch, buffer);
468         } else {
469     scratch->offset = (gs_p_t)realloc((void*)scratch->offset, scratch->length + buf_len + 1);
470     *((char*)scratch->offset + scratch->length) = ',';
471     memcpy((void*)(scratch->offset + scratch->length + 1), (void*)buffer, buf_len);
472     scratch->length += buf_len + 1;
473     scratch->reserved = INTERNAL;
474   }
475 }
476
477 void running_array_aggr_hfta_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
478         *res = *scratch;
479   res->reserved = SHALLOW_COPY;
480 }
481
482 void running_array_aggr_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) {
483   hfta_vstr_destroy(scratch);
484  }
485