Improvements to aggregation code and fucntion library
[com/gs-lite.git] / src / lib / gscphftaaux / hfta_udaf.cc
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16
17 #include "gsconfig.h"
18 #include "gstypes.h"
19 #include "hfta_udaf.h"
20 #include "rts_udaf.h"
21 #include <stdio.h>
22 #include <limits.h>
23 #include <math.h>
24 //#include <memory.h>
25 #include <string.h>
26 #include <sys/time.h>
27 #include <iostream>
28
29 #include "hfta_runtime_library.h"
30
31
32 #define max(a,b) ((a) > (b) ? (a) : (b))
33 #define min(x,y) ((x) < (y) ? (x) : (y))
34 #define lg(x)    (log(x) / log(2))
35
36 using namespace std;
37
38
39 // -------------------------------------------------------------------
40 //              moving sum over N intervals
41
42 struct moving_sum_udaf_str{
43         gs_uint32_t N;
44         gs_uint32_t pos;
45         gs_uint32_t *sums;
46 };
47
48 void moving_sum_udaf_HFTA_AGGR_INIT_(gs_sp_t buf){
49   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
50   u->N=0; u->pos=0; u->sums=NULL;
51 }
52
53 void moving_sum_udaf_HFTA_AGGR_UPDATE_(gs_sp_t buf, gs_uint32_t s, gs_uint32_t N) {
54   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
55   if(u->sums == NULL){
56         u->sums = (gs_uint32_t *)malloc(N*sizeof(gs_uint32_t));
57         for(gs_int32_t i=0;i<N;i++)
58                 u->sums[i] = 0;
59     u->N = N;
60   }
61   u->sums[u->pos] += s;
62 }
63
64 void super_moving_sum_udaf_HFTA_AGGR_UPDATE_(gs_sp_t  buf, gs_uint64_t sub_sum) {
65   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
66   gs_uint32_t s = (gs_uint32_t)(sub_sum & 0xffffffff);
67   if(u->sums == NULL){
68     gs_uint32_t N = (gs_uint32_t)((sub_sum & 0xffffffff00000000ull) >> 32);
69         u->sums = (gs_uint32_t *)malloc(N*sizeof(gs_uint32_t));
70         for(gs_int32_t i=0;i<N;i++)
71                 u->sums[i] = 0;
72     u->N = N;
73   }
74   u->sums[u->pos] += s;
75 }
76
77 void moving_sum_udaf_HFTA_AGGR_OUTPUT_(gs_p_t *result, gs_sp_t  buf){
78         *result = (gs_p_t)(buf);
79 }
80
81 void moving_sum_udaf_HFTA_AGGR_DESTROY_(gs_sp_t  buf){
82   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
83   if(u->sums != NULL)
84         free(u->sums);
85 }
86
87 void moving_sum_udaf_HFTA_AGGR_REINIT_( gs_sp_t  buf){
88   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
89   u->pos++;
90   if(u->pos >= u->N)
91         u->pos = 0;
92   u->sums[u->pos] = 0;
93 }
94
95 gs_uint32_t moving_sum_extract(gs_p_t result){
96   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) result;
97   gs_uint32_t s=0, i=0;
98   for(i=0; i<u->N;i++){
99     s += u->sums[i];
100   }
101   return s;
102 }
103
104 gs_float_t moving_sum_extract_exp(gs_p_t result, gs_float_t alpha){
105   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) result;
106   gs_uint32_t p=0, i=0;
107   gs_float_t s=0.0, m=1.0;
108   p=u->pos;
109   for(i=0; i<u->N;i++){
110     s += u->sums[i]*m;
111     if(p==0)
112                 p=u->N - 1;
113     else
114                 p--;
115         m *= alpha;
116   }
117   return s;
118 }
119
120
121 // -------------------------------------------------------------------
122 //              sum over 3 intervals : test rUDAF
123
124 struct sum3_udaf_str{
125   gs_uint32_t s_2;
126   gs_uint32_t s_1;
127   gs_uint32_t s_0;
128 };
129
130 void sum3_HFTA_AGGR_INIT_(gs_sp_t  buf) {
131   struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;
132   u->s_0 = 0; u->s_1 = 0; u->s_2 = 0;
133   return;
134 }          
135
136 void sum3_HFTA_AGGR_UPDATE_(gs_sp_t  buf, gs_uint32_t s) {
137   struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;
138   u->s_0 += s;
139   return;
140 }
141
142 void sum3_HFTA_AGGR_OUTPUT_(gs_uint32_t *result, gs_sp_t  buf) {
143   struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;
144   *result = u->s_0 + u->s_1 + u->s_2;
145   return; 
146 }
147
148 void sum3_HFTA_AGGR_DESTROY_(gs_sp_t  buf) {
149   return;
150 }
151
152 void sum3_HFTA_AGGR_REINIT_( gs_sp_t  buf) {
153   struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;
154   u->s_2 = u->s_1;
155   u->s_1 = u->s_0;
156   u->s_0 = 0;
157   return;
158 }
159
160
161 #define HISTORY_LENGTH 1024
162
163 /////////////////////////////////////////////////////////////////////////
164 /////   Calculate the average of all positive gs_float_t numbers
165
166 struct posavg_struct{
167   gs_float_t sum;
168   gs_float_t cnt;
169 };
170
171 void POSAVG_HFTA_AGGR_INIT_(gs_sp_t  buf) {
172   struct posavg_struct * a = (struct posavg_struct *) buf;
173   a->sum=0;
174   a->cnt=0;
175   return;
176 }
177
178 void POSAVG_HFTA_AGGR_UPDATE_(gs_sp_t  buf, gs_float_t v) {
179   struct posavg_struct * a = (struct posavg_struct *) buf;
180   if (v>=0) {
181     a->sum=a->sum+v;
182     a->cnt=a->cnt+1;
183   }
184   return;
185 }
186
187 void POSAVG_HFTA_AGGR_OUTPUT_(gs_float_t * v, gs_sp_t  buf) {
188   struct posavg_struct * a = (struct posavg_struct *) buf;
189   if (a->cnt>0) {
190     *v=(a->sum/a->cnt);
191   } else {
192     *v=-1;
193   }
194   return;
195 }
196
197 void POSAVG_HFTA_AGGR_DESTROY_(gs_sp_t  buf) {
198   return;
199 }
200
201 /////////////////////////////////////////////////////////////////////////
202 /////                   avg_udaf (simple example)
203
204 //              struct received from subaggregate
205 struct avg_udaf_lfta_struct_t{
206         gs_int64_t  sum;
207         gs_uint32_t cnt;
208 };
209
210 //              scratchpad struct
211 struct avg_udaf_hfta_struct_t{
212         gs_int64_t sum;
213         gs_uint32_t cnt;
214 };
215
216 //                      avg_udaf functions
217 void avg_udaf_HFTA_AGGR_INIT_(gs_sp_t b){
218         avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;
219         s->sum = 0;
220         s->cnt = 0;
221 }
222
223 void avg_udaf_HFTA_AGGR_UPDATE_(gs_sp_t b, gs_uint32_t v){
224         avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;
225         s->sum += v;
226         s->cnt ++;
227 }
228
229 void avg_udaf_HFTA_AGGR_OUTPUT_(vstring *r,gs_sp_t b){
230         r->length = 12;
231         r->offset = (gs_p_t)(b);
232         r->reserved = SHALLOW_COPY;
233 }
234
235 void avg_udaf_HFTA_AGGR_DESTROY_(gs_sp_t b){
236         return;
237 }
238
239
240 //                      avg_udaf superaggregate functions
241 void avg_udaf_hfta_HFTA_AGGR_INIT_(gs_sp_t b){
242         avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;
243         s->sum = 0;
244         s->cnt = 0;
245 }
246
247 void avg_udaf_hfta_HFTA_AGGR_UPDATE_(gs_sp_t b, vstring *v){
248         if(v->length != 12) return;
249         avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;
250         avg_udaf_lfta_struct_t *vs = (avg_udaf_lfta_struct_t *)(v->offset);
251         s->sum += vs->sum;
252         s->cnt += vs->cnt;
253 }
254
255 void avg_udaf_hfta_HFTA_AGGR_OUTPUT_(vstring *r,gs_sp_t b){
256         r->length = 12;
257         r->offset = (gs_p_t)(b);
258         r->reserved = SHALLOW_COPY;
259 }
260
261 void avg_udaf_hfta_HFTA_AGGR_DESTROY_(gs_sp_t b){
262         return;
263 }
264
265 //              Extraction function
266 gs_float_t extr_avg_fcn(vstring *v){
267         if(v->length != 12) return 0;
268         avg_udaf_hfta_struct_t *vs = (avg_udaf_hfta_struct_t *)(v->offset);
269         gs_float_t r = (gs_float_t)(vs->sum) / vs->cnt;
270     return r;
271 }
272
273 /////////////////////////////////////////////////////////
274 //              FIRST aggregate
275 // hfta only
276
277 // uint
278 void FIRST_HFTA_AGGR_INIT_(gs_uint32_t* scratch) {
279         *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
280 }
281 void FIRST_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }
282 void FIRST_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {
283         if (*scratch == UINT_MAX)
284                 *scratch = val;
285 }
286 void FIRST_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {
287         *res = *scratch;
288 }
289 void FIRST_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }
290
291 // int
292
293 void FIRST_HFTA_AGGR_INIT_(gs_int32_t* scratch) {
294         *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
295 }
296 void FIRST_HFTA_AGGR_REINIT_(gs_int32_t* scratch) { }
297 void FIRST_HFTA_AGGR_UPDATE_(gs_int32_t* scratch, gs_int32_t val) {
298         if (*scratch == UINT_MAX)
299                 *scratch = val;
300 }
301 void FIRST_HFTA_AGGR_OUTPUT_(gs_int32_t* res, gs_int32_t* scratch) {
302         *res = *scratch;
303 }
304 void FIRST_HFTA_AGGR_DESTROY_(gs_int32_t* scratch) { }
305
306 // ullong
307 void FIRST_HFTA_AGGR_INIT_(gs_uint64_t* scratch) {
308         *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
309 }
310 void FIRST_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
311 void FIRST_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
312         if (*scratch == UINT_MAX)
313                 *scratch = val;
314 }
315 void FIRST_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
316         *res = *scratch;
317 }
318 void FIRST_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
319
320 // llong
321 void FIRST_HFTA_AGGR_INIT_(gs_int64_t* scratch) {
322         *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
323 }
324 void FIRST_HFTA_AGGR_REINIT_(gs_int64_t* scratch) { }
325 void FIRST_HFTA_AGGR_UPDATE_(gs_int64_t* scratch, gs_int64_t val) {
326         if (*scratch == UINT_MAX)
327                 *scratch = val;
328 }
329 void FIRST_HFTA_AGGR_OUTPUT_(gs_int64_t* res, gs_int64_t* scratch) {
330         *res = *scratch;
331 }
332 void FIRST_HFTA_AGGR_DESTROY_(gs_int64_t* scratch) { }
333
334 // string
335 void FIRST_HFTA_AGGR_INIT_(vstring* scratch) {
336         scratch->offset= 0;
337 }
338 void FIRST_HFTA_AGGR_REINIT_(vstring* scratch) { }
339 void FIRST_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
340         if (!scratch->offset) {
341     scratch->length = val->length;
342     scratch->offset = val->offset;
343     scratch->reserved = SHALLOW_COPY;
344         }
345 }
346 void FIRST_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
347         *res = *scratch;
348 }
349 void FIRST_HFTA_AGGR_DESTROY_(vstring* scratch) { }
350
351 // hfta/lfta split
352
353 // uint
354 void FIRST_hfta_HFTA_AGGR_INIT_(gs_uint32_t* scratch) {
355         *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
356 }
357 void FIRST_hfta_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }
358 void FIRST_hfta_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {
359         if (*scratch == UINT_MAX)
360                 *scratch = val;
361 }
362 void FIRST_hfta_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {
363         *res = *scratch;
364 }
365 void FIRST_hfta_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }
366
367 // int
368 void FIRST_INT_hfta_HFTA_AGGR_INIT_(gs_int32_t* scratch) {
369         *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
370 }
371 void FIRST_INT_hfta_HFTA_AGGR_REINIT_(gs_int32_t* scratch) { }
372 void FIRST_INT_hfta_HFTA_AGGR_UPDATE_(gs_int32_t* scratch, gs_int32_t val) {
373         if (*scratch == UINT_MAX)
374                 *scratch = val;
375 }
376 void FIRST_INT_hfta_HFTA_AGGR_OUTPUT_(gs_int32_t* res, gs_int32_t* scratch) {
377         *res = *scratch;
378 }
379 void FIRST_INT_hfta_HFTA_AGGR_DESTROY_(gs_int32_t* scratch) { }
380
381 // ullong
382 void FIRST_ULL_hfta_HFTA_AGGR_INIT_(gs_uint64_t* scratch) {
383         *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
384 }
385 void FIRST_ULL_hfta_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
386 void FIRST_ULL_hfta_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
387         if (*scratch == UINT_MAX)
388                 *scratch = val;
389 }
390 void FIRST_ULL_hfta_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
391         *res = *scratch;
392 }
393 void FIRST_ULL_hfta_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
394
395 // llong
396 void FIRST_LL_hfta_HFTA_AGGR_INIT_(gs_int64_t* scratch) {
397         *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
398 }
399 void FIRST_LL_hfta_HFTA_AGGR_REINIT_(gs_int64_t* scratch) { }
400 void FIRST_LL_hfta_HFTA_AGGR_UPDATE_(gs_int64_t* scratch, gs_int64_t val) {
401         if (*scratch == UINT_MAX)
402                 *scratch = val;
403 }
404 void FIRST_LL_hfta_HFTA_AGGR_OUTPUT_(gs_int64_t* res, gs_int64_t* scratch) {
405         *res = *scratch;
406 }
407 void FIRST_LL_hfta_HFTA_AGGR_DESTROY_(gs_int64_t* scratch) { }
408
409 // string
410 void FIRST_STR_hfta_HFTA_AGGR_INIT_(vstring* scratch) {
411         scratch->offset= 0;
412 }
413 void FIRST_STR_hfta_HFTA_AGGR_REINIT_(vstring* scratch) { }
414 void FIRST_STR_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
415         if (!scratch->offset) {
416     scratch->length = val->length;
417     scratch->offset = val->offset;
418     scratch->reserved = SHALLOW_COPY;
419         }
420 }
421 void FIRST_STR_hfta_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
422         *res = *scratch;
423 }
424 void FIRST_STR_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) { }
425
426
427 /////////////////////////////////////////////////////////
428 //              LAST aggregate
429
430 // hfta only
431
432 // uint
433 void LAST_HFTA_AGGR_INIT_(gs_uint32_t* scratch) { }
434 void LAST_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }
435 void LAST_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {
436         *scratch = val;
437 }
438 void LAST_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {
439         *res = *scratch;
440 }
441 void LAST_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }
442
443 // int
444 void LAST_HFTA_AGGR_INIT_(gs_int32_t* scratch) { }
445 void LAST_HFTA_AGGR_REINIT_(gs_int32_t* scratch) { }
446 void LAST_HFTA_AGGR_UPDATE_(gs_int32_t* scratch, gs_int32_t val) {
447         *scratch = val;
448 }
449 void LAST_HFTA_AGGR_OUTPUT_(gs_int32_t* res, gs_int32_t* scratch) {
450         *res = *scratch;
451 }
452 void LAST_HFTA_AGGR_DESTROY_(gs_int32_t* scratch) { }
453
454 // llong
455 void LAST_HFTA_AGGR_INIT_(gs_int64_t* scratch) { }
456 void LAST_HFTA_AGGR_REINIT_(gs_int64_t* scratch) { }
457 void LAST_HFTA_AGGR_UPDATE_(gs_int64_t* scratch, gs_int64_t val) {
458         *scratch = val;
459 }
460 void LAST_HFTA_AGGR_OUTPUT_(gs_int64_t* res, gs_int64_t* scratch) {
461         *res = *scratch;
462 }
463 void LAST_HFTA_AGGR_DESTROY_(gs_int64_t* scratch) { }
464
465
466 // ullong
467 void LAST_HFTA_AGGR_INIT_(gs_uint64_t* scratch) { }
468 void LAST_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
469 void LAST_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
470         *scratch = val;
471 }
472 void LAST_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
473         *res = *scratch;
474 }
475 void LAST_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
476
477 // string
478 void LAST_HFTA_AGGR_INIT_(vstring* scratch) {
479         scratch->offset= 0;
480 }
481 void LAST_HFTA_AGGR_REINIT_(vstring* scratch) { }
482 void LAST_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
483         scratch->length = val->length;
484   scratch->offset = val->offset;
485   scratch->reserved = SHALLOW_COPY;
486 }
487 void LAST_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
488         *res = *scratch;
489 }
490 void LAST_HFTA_AGGR_DESTROY_(vstring* scratch) { }
491
492 // hfta/lfta split
493
494 void LAST_hfta_HFTA_AGGR_INIT_(gs_uint32_t* scratch) { }
495 void LAST_hfta_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }
496 void LAST_hfta_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {
497         *scratch = val;
498 }
499 void LAST_hfta_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {
500         *res = *scratch;
501 }
502 void LAST_hfta_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }
503
504 void LAST_INT_hfta_HFTA_AGGR_INIT_(gs_int32_t* scratch) { }
505 void LAST_INT_hfta_HFTA_AGGR_REINIT_(gs_int32_t* scratch) { }
506 void LAST_INT_hfta_HFTA_AGGR_UPDATE_(gs_int32_t* scratch, gs_int32_t val) {
507         *scratch = val;
508 }
509 void LAST_INT_hfta_HFTA_AGGR_OUTPUT_(gs_int32_t* res, gs_int32_t* scratch) {
510         *res = *scratch;
511 }
512 void LAST_INT_hfta_HFTA_AGGR_DESTROY_(gs_int32_t* scratch) { }
513
514 void LAST_ULL_hfta_HFTA_AGGR_INIT_(gs_uint64_t* scratch) { }
515 void LAST_ULL_hfta_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
516 void LAST_ULL_hfta_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
517         *scratch = val;
518 }
519 void LAST_ULL_hfta_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
520         *res = *scratch;
521 }
522 void LAST_ULL_hfta_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
523
524 void LAST_LL_hfta_HFTA_AGGR_INIT_(gs_int64_t* scratch) { }
525 void LAST_LL_hfta_HFTA_AGGR_REINIT_(gs_int64_t* scratch) { }
526 void LAST_LL_hfta_HFTA_AGGR_UPDATE_(gs_int64_t* scratch, gs_int64_t val) {
527         *scratch = val;
528 }
529 void LAST_LL_hfta_HFTA_AGGR_OUTPUT_(gs_int64_t* res, gs_int64_t* scratch) {
530         *res = *scratch;
531 }
532 void LAST_LL_hfta_HFTA_AGGR_DESTROY_(gs_int64_t* scratch) { }
533
534
535 void LAST_STR_hfta_HFTA_AGGR_INIT_(vstring* scratch) {
536         scratch->offset= 0;
537 }
538 void LAST_STR_hfta_HFTA_AGGR_REINIT_(vstring* scratch) { }
539 void LAST_STR_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
540         scratch->length = val->length;
541   scratch->offset = val->offset;
542   scratch->reserved = SHALLOW_COPY;
543 }
544 void LAST_STR_hfta_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
545         *res = *scratch;
546 }
547 void LAST_STR_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) { }
548
549
550 ////////////////////////////////////////////////////////////
551 //              count different (# of times the value is different than the previous)
552
553 struct count_diff_scratch{
554         gs_uint32_t count;
555         union{
556                 gs_uint32_t ui;
557                 gs_int32_t i;
558                 gs_uint64_t ul;
559                 gs_int64_t l;
560         } r;
561 };
562
563 //////////  HFTA only
564
565 // uint32
566 void count_diff_HFTA_AGGR_INIT_(gs_sp_t s){
567         count_diff_scratch *scratch = (count_diff_scratch *)s;
568         scratch->count = 0;
569         scratch->r.ul = 0;
570 }
571 void count_diff_HFTA_AGGR_REINIT_(gs_sp_t s){
572         count_diff_scratch *scratch = (count_diff_scratch *)s;
573         scratch->count = 0;
574 }
575 void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t val){
576         count_diff_scratch *scratch = (count_diff_scratch *)s;
577         if(scratch->count==0 || scratch->r.ui != val)
578                 scratch->count++;
579         scratch->r.ui = val;
580 }
581 void count_diff_HFTA_AGGR_OUTPUT_(gs_uint32_t *res, gs_sp_t s){
582         count_diff_scratch *scratch = (count_diff_scratch *)s;
583         *res = scratch->count;
584 }
585 void count_diff_HFTA_AGGR_DESTROY_(gs_sp_t s){ }
586
587 // int32
588 void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t val){
589         count_diff_scratch *scratch = (count_diff_scratch *)s;
590         if(scratch->count==0 || scratch->r.i != val)
591                 scratch->count++;
592         scratch->r.i = val;
593 }
594
595 // uint64
596 void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t val){
597         count_diff_scratch *scratch = (count_diff_scratch *)s;
598         if(scratch->count==0 || scratch->r.ul != val)
599                 scratch->count++;
600         scratch->r.ul = val;
601 }
602
603 // int64
604 void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t val){
605         count_diff_scratch *scratch = (count_diff_scratch *)s;
606         if(scratch->count==0 || scratch->r.l != val)
607                 scratch->count++;
608         scratch->r.l = val;
609 }
610
611 // vstring
612 void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, vstring* val){
613         count_diff_scratch *scratch = (count_diff_scratch *)s;
614         gs_uint64_t hashval = hfta_vstr_long_hashfunc(val);
615         if(scratch->count==0 || scratch->r.l != hashval)
616                 scratch->count++;
617         scratch->r.l = hashval;
618 }
619
620 ////////// HFTA / LFTA split
621
622 struct lfta_count_diff_scratch{
623         gs_uint32_t count;
624         union{
625                 gs_uint32_t ui;
626                 gs_int32_t i;
627                 gs_uint64_t ul;
628                 gs_int64_t l;
629         } first;
630         union{
631                 gs_uint32_t ui;
632                 gs_int32_t i;
633                 gs_uint64_t ul;
634                 gs_int64_t l;
635         } last;
636 };
637
638
639 void count_diff_hfta_HFTA_AGGR_INIT_(gs_sp_t s){
640         count_diff_scratch *scratch = (count_diff_scratch *)s;
641         scratch->count = 0;
642         scratch->r.ul = 0;
643 }
644 void count_diff_hfta_HFTA_AGGR_REINIT_(gs_sp_t s){
645         count_diff_scratch *scratch = (count_diff_scratch *)s;
646         scratch->count = 0;
647         scratch->r.ul = 0;
648 }
649 void count_diff_hfta_HFTA_AGGR_UPDATE_(gs_sp_t s, vstring *v){
650         lfta_count_diff_scratch *val = (lfta_count_diff_scratch *)v;
651         count_diff_scratch *scratch = (count_diff_scratch *)(v->offset);
652         scratch->count += val->count - 1;
653         if(scratch->r.l != val->first.l)
654                 scratch->count++;
655         scratch->r.l = val->last.l;
656 }
657 void count_diff_hfta_HFTA_AGGR_OUTPUT_(gs_uint32_t *res, gs_sp_t s){
658         count_diff_scratch *scratch = (count_diff_scratch *)s;
659         *res = (scratch->count)+1;
660 }
661 void count_diff_hfta_HFTA_AGGR_DESTROY_(gs_sp_t scratch){ }
662         
663
664
665 /////////////////////////////////////////////////////////
666 //              running_array_aggr aggregate
667
668 void running_array_aggr_hfta_HFTA_AGGR_INIT_(vstring* scratch) {
669   scratch->offset = NULL;  
670   scratch->length = 0;
671 }
672
673 void running_array_aggr_hfta_HFTA_AGGR_REINIT_(vstring* scratch) { }
674
675 void running_array_aggr_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
676   char buffer[100];
677
678   gs_uint32_t* ints = (gs_uint32_t*)val->offset;
679   switch (val->length / sizeof (gs_uint32_t)) {
680     case 4:
681       sprintf(buffer, "%u,%u,%u,%u", ints[0], ints[1], ints[2], ints[3]);
682       break;
683     case 3:
684       sprintf(buffer, "%u,%u,%u", ints[0], ints[1], ints[2]);
685       break;   
686     case 2:
687       sprintf(buffer, "%u,%u", ints[0], ints[1]);
688       break;        
689     case 1:
690       sprintf(buffer, "%u", ints[0]);
691       break;  
692     case 0:
693       return;        
694   }
695   int buf_len = strlen(buffer);
696
697   // append the content of buffer to scratch
698         if (!scratch->offset) {
699     Vstring_Constructor(scratch, buffer);
700         } else {
701     scratch->offset = (gs_p_t)realloc((void*)scratch->offset, scratch->length + buf_len + 1);
702     *((char*)scratch->offset + scratch->length) = ',';
703     memcpy((void*)(scratch->offset + scratch->length + 1), (void*)buffer, buf_len);
704     scratch->length += buf_len + 1;
705     scratch->reserved = INTERNAL;
706   }
707 }
708
709 void running_array_aggr_hfta_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
710         *res = *scratch;
711   res->reserved = SHALLOW_COPY;
712 }
713
714 void running_array_aggr_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) {
715   hfta_vstr_destroy(scratch);
716  }
717
718
719 ////////////////////////////////////////////
720 //              Aggregate strings by catenation
721         
722
723 struct CAT_aggr_scratch{
724         std::string val;
725         int x;
726 };
727
728 struct CAT_aggr_scratch_ptr{
729         CAT_aggr_scratch *ptr;
730 };
731
732 void CAT_aggr_HFTA_AGGR_INIT_(gs_sp_t s){
733         CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
734         CAT_aggr_scratch *v = new CAT_aggr_scratch();
735         v->x = 101;
736
737         p->ptr = v;
738 }
739 void CAT_aggr_HFTA_AGGR_REINIT_(gs_sp_t s){
740         CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
741         CAT_aggr_scratch *v = p->ptr;
742         v->val="";
743 }
744 void CAT_aggr_HFTA_AGGR_UPDATE_(gs_sp_t s, vstring *sep, vstring *str){
745 char buf1[MAXTUPLESZ-20], buf2[MAXTUPLESZ-20];
746 int i;
747 for(i=0;i<sep->length;++i) buf1[i] = *(((char *)sep->offset)+i);
748 buf1[i]='\0';
749 for(i=0;i<str->length;++i) buf2[i] = *(((char *)str->offset)+i);
750 buf2[i]='\0';
751         CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
752         CAT_aggr_scratch *v = p->ptr;
753         if(v->val.size()>0)
754                 v->val.append((char *)(sep->offset), sep->length);
755         v->val.append((char *)(str->offset), str->length);
756 //printf("sep=%s, str=%s, val=%s\n",buf1,buf2,v->val.c_str());
757 }
758 void CAT_aggr_HFTA_AGGR_OUTPUT_(vstring *res, gs_sp_t s){
759         CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
760         CAT_aggr_scratch *v = p->ptr;
761 //printf("output val=%s\n",v->val.c_str());
762         res->offset = (gs_p_t)malloc(v->val.size());
763         res->length = v->val.size();
764         if(res->length>MAXTUPLESZ-20)
765                 res->length=MAXTUPLESZ-20;
766 //      v->val.copy((char *)(res->offset), 0, res->length);
767         const char *dat = v->val.c_str();
768         memcpy((char *)(res->offset), dat, res->length);
769 //      for(int i=0;i<res->length;++i)
770 //              *(((char *)res->offset)+i) = dat[i];
771         res->reserved = INTERNAL;
772 }
773 void CAT_aggr_HFTA_AGGR_DESTROY_(gs_sp_t s){
774         CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
775         CAT_aggr_scratch *v = p->ptr;
776         delete v;
777 }
778
779         
780         
781 ///////////////////////////////////////////////////////////////
782 //      time_avg((sample, ts, window_size)
783 //  Compute time-weighted average sum(sample*duration)/window_size
784 //  duration is difference between current and next ts.
785 //  The idea is to compute a sum over a step function.
786 //  
787         
788 struct time_avg_udaf_str{
789         gs_float_t sum;
790         gs_float_t last_val;
791         gs_uint64_t last_ts;
792         gs_uint64_t window;
793         gs_uint64_t first_ts;
794         gs_uint8_t event_occurred;
795 };
796         
797 void time_avg_HFTA_AGGR_INIT_(gs_sp_t s){
798         time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
799         scratch->sum = 0.0;
800         scratch->last_val = 0.0;
801         scratch->last_ts = 0;
802         scratch->first_ts = 0;
803         scratch->event_occurred = 0;
804 }
805
806 void time_avg_HFTA_AGGR_DESTROY_(gs_sp_t s){
807 }
808
809 void time_avg_HFTA_AGGR_REINIT_(gs_sp_t s){
810         time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
811         scratch->event_occurred = 0;
812         scratch->sum = 0;
813 //printf("time_avg_reinit: occurred=%d, last_val=%lf, sum=%lf, first_ts=%lld, last_ts=%lld\n",scratch->event_occurred, scratch->last_val, scratch->sum, scratch->first_ts, scratch->last_ts);
814 }
815
816 void time_avg_HFTA_AGGR_OUTPUT_(gs_float_t *result, gs_sp_t s){
817         time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
818         if(scratch->event_occurred==0){
819                 *result =  scratch->last_val;
820 //printf("\ttime_avg outpt1 sum=%lf, last_val=%lf, result=%lf\n", scratch->sum, scratch->last_val, *result);
821                 return;
822         }
823         gs_int64_t end_time = scratch->window * (scratch->last_ts/scratch->window + 1);
824         scratch->sum += (end_time - scratch->last_ts) * (gs_float_t)(scratch->last_val); 
825         gs_int64_t start_time = end_time - scratch->window;
826         if(scratch->first_ts > start_time){
827                 *result = scratch->sum / (end_time - scratch->first_ts);
828 //printf("\ttime_avg outpt2 sum=%lf, start_time=%lld, end_time=%lld, first_ts=%lld, last_ts=%lld,result=%lf\n", scratch->sum, start_time, end_time, scratch->first_ts, scratch->last_ts, *result);
829         }else{
830                 *result = scratch->sum / (end_time - start_time);
831 //printf("\ttime_avg outpt3 sum=%lf, start_time=%lld, end_time=%lld, first_ts=%lld, last_ts=%lld,result=%lf\n", scratch->sum, start_time, end_time, scratch->first_ts, scratch->last_ts, *result);
832         }
833 }
834
835 void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_float_t val, gs_int64_t ts, gs_int64_t window){
836         time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
837         scratch->window = window;
838         if(scratch->first_ts==0){
839                 scratch->first_ts = ts;
840         }else{
841                 if(scratch->event_occurred){
842
843                         scratch->sum += (ts - scratch->last_ts) * scratch->last_val;
844                 }else{
845                         gs_int64_t start_time = scratch->window * (scratch->last_ts/scratch->window);
846                         scratch->sum += (ts - start_time) * scratch->last_val;
847                 }
848         }
849 //printf("time_avg_upd: val=%lf, occurred=%d, last_val=%lf, sum=%lf, ts=%lld, first_ts=%lld, last_ts=%lld\n",val, scratch->event_occurred, scratch->last_val, scratch->sum, ts, scratch->first_ts, scratch->last_ts);
850         scratch->last_val = val;
851         scratch->last_ts = ts;
852         scratch->event_occurred = 1;
853 }
854 void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t val, gs_int64_t ts, gs_int64_t window){
855         time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
856 }
857 void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t val, gs_int64_t ts, gs_int64_t window){
858         time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
859 }
860 void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t val, gs_int64_t ts, gs_int64_t window){
861         time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
862 }
863 void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t val, gs_int64_t ts, gs_int64_t window){
864         time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
865 }
866         
867
868 // ------------------------------------------------------------
869 //              running_sum_max : get the running sum of an int,
870 //              be able to report this sum and also its max value
871 //              during the time window
872
873 struct run_sum_max_udaf_str{
874         gs_int64_t sum;
875         gs_int64_t max;
876 };
877 void run_sum_max_HFTA_AGGR_INIT_(gs_sp_t s){
878         run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
879         scratch->sum = 0;
880         scratch->max = 0;
881 }
882 void run_sum_max_HFTA_AGGR_REINIT_(gs_sp_t s){
883         run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
884         scratch->max = scratch->sum;
885 }
886 void run_sum_max_HFTA_AGGR_OUTPUT_(vstring *r,gs_sp_t b){
887         r->length = sizeof(run_sum_max_udaf_str);
888         r->offset = (gs_p_t)(b);
889         r->reserved = SHALLOW_COPY;
890 }
891 void run_sum_max_HFTA_AGGR_DESTROY_(gs_sp_t b){
892         return;
893 }
894
895 void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t v){
896         run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
897         scratch->sum+=v;
898         if(scratch->sum>scratch->max) scratch->max=scratch->sum;
899 }
900 void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t v){
901         run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
902         scratch->sum+=v;
903         if(scratch->sum>scratch->max) scratch->max=scratch->sum;
904 }
905 void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t v){
906         run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
907         scratch->sum+=v;
908         if(scratch->sum>scratch->max) scratch->max=scratch->sum;
909 }
910 void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t v){
911         run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
912         scratch->sum+=v;
913         if(scratch->sum>scratch->max) scratch->max=scratch->sum;
914 }
915 //      the extraction functions
916 gs_int64_t extr_running_sum(vstring *v){
917         if(v->length != sizeof(run_sum_max_udaf_str)) return 0;
918         run_sum_max_udaf_str *vs = (run_sum_max_udaf_str *)(v->offset);
919         return vs->sum;
920 }
921 gs_int64_t extr_running_sum_max(vstring *v){
922         if(v->length != sizeof(run_sum_max_udaf_str)) return 0;
923         run_sum_max_udaf_str *vs = (run_sum_max_udaf_str *)(v->offset);
924         return vs->max;
925 }
926
927