60ae7501eae769e9e08fe9d28d94700262ea92db
[com/gs-lite.git] / src / lib / gscphftaaux / hfta_udaf.cc
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16
#include "gsconfig.h"
#include "gstypes.h"
#include "hfta_udaf.h"
#include "rts_udaf.h"
#include <stdio.h>
#include <stdlib.h>
#include <limits.h>
#include <math.h>
//#include <memory.h>
#include <string.h>
#include <sys/time.h>
#include <iostream>
#include <string>

#include "hfta_runtime_library.h"
#include "stringhash.h"
31
32
// Small arithmetic helpers kept as function-like macros. The selected
// argument of max/min is evaluated twice, so never pass expressions with
// side effects (e.g. i++).
#define max(a, b) (((a) > (b)) ? (a) : (b))
#define min(x, y) (((x) < (y)) ? (x) : (y))
// Base-2 logarithm via change of base.
#define lg(x)     (log(x) / log(2))

using namespace std;
38
39
40 // -------------------------------------------------------------------
41 //              moving sum over N intervals
42
43 struct moving_sum_udaf_str{
44         gs_uint32_t N;
45         gs_uint32_t pos;
46         gs_uint32_t *sums;
47 };
48
49 void moving_sum_udaf_HFTA_AGGR_INIT_(gs_sp_t buf){
50   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
51   u->N=0; u->pos=0; u->sums=NULL;
52 }
53
54 void moving_sum_udaf_HFTA_AGGR_UPDATE_(gs_sp_t buf, gs_uint32_t s, gs_uint32_t N) {
55   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
56   if(u->sums == NULL){
57         u->sums = (gs_uint32_t *)malloc(N*sizeof(gs_uint32_t));
58         for(gs_int32_t i=0;i<N;i++)
59                 u->sums[i] = 0;
60     u->N = N;
61   }
62   u->sums[u->pos] += s;
63 }
64
65 void super_moving_sum_udaf_HFTA_AGGR_UPDATE_(gs_sp_t  buf, gs_uint64_t sub_sum) {
66   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
67   gs_uint32_t s = (gs_uint32_t)(sub_sum & 0xffffffff);
68   if(u->sums == NULL){
69     gs_uint32_t N = (gs_uint32_t)((sub_sum & 0xffffffff00000000ull) >> 32);
70         u->sums = (gs_uint32_t *)malloc(N*sizeof(gs_uint32_t));
71         for(gs_int32_t i=0;i<N;i++)
72                 u->sums[i] = 0;
73     u->N = N;
74   }
75   u->sums[u->pos] += s;
76 }
77
78 void moving_sum_udaf_HFTA_AGGR_OUTPUT_(gs_p_t *result, gs_sp_t  buf){
79         *result = (gs_p_t)(buf);
80 }
81
82 void moving_sum_udaf_HFTA_AGGR_DESTROY_(gs_sp_t  buf){
83   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
84   if(u->sums != NULL)
85         free(u->sums);
86 }
87
88 void moving_sum_udaf_HFTA_AGGR_REINIT_( gs_sp_t  buf){
89   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
90   u->pos++;
91   if(u->pos >= u->N)
92         u->pos = 0;
93   u->sums[u->pos] = 0;
94 }
95
96 gs_uint32_t moving_sum_extract(gs_p_t result){
97   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) result;
98   gs_uint32_t s=0, i=0;
99   for(i=0; i<u->N;i++){
100     s += u->sums[i];
101   }
102   return s;
103 }
104
105 gs_float_t moving_sum_extract_exp(gs_p_t result, gs_float_t alpha){
106   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) result;
107   gs_uint32_t p=0, i=0;
108   gs_float_t s=0.0, m=1.0;
109   p=u->pos;
110   for(i=0; i<u->N;i++){
111     s += u->sums[i]*m;
112     if(p==0)
113                 p=u->N - 1;
114     else
115                 p--;
116         m *= alpha;
117   }
118   return s;
119 }
120
121
122 // -------------------------------------------------------------------
123 //              sum over 3 intervals : test rUDAF
124
125 struct sum3_udaf_str{
126   gs_uint32_t s_2;
127   gs_uint32_t s_1;
128   gs_uint32_t s_0;
129 };
130
131 void sum3_HFTA_AGGR_INIT_(gs_sp_t  buf) {
132   struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;
133   u->s_0 = 0; u->s_1 = 0; u->s_2 = 0;
134   return;
135 }          
136
137 void sum3_HFTA_AGGR_UPDATE_(gs_sp_t  buf, gs_uint32_t s) {
138   struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;
139   u->s_0 += s;
140   return;
141 }
142
143 void sum3_HFTA_AGGR_OUTPUT_(gs_uint32_t *result, gs_sp_t  buf) {
144   struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;
145   *result = u->s_0 + u->s_1 + u->s_2;
146   return; 
147 }
148
149 void sum3_HFTA_AGGR_DESTROY_(gs_sp_t  buf) {
150   return;
151 }
152
153 void sum3_HFTA_AGGR_REINIT_( gs_sp_t  buf) {
154   struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;
155   u->s_2 = u->s_1;
156   u->s_1 = u->s_0;
157   u->s_0 = 0;
158   return;
159 }
160
161
// Fixed history length. NOTE(review): not referenced in the code around this
// definition; confirm its users before changing the value.
#define HISTORY_LENGTH 1024
163
164 /////////////////////////////////////////////////////////////////////////
165 /////   Calculate the average of all positive gs_float_t numbers
166
167 struct posavg_struct{
168   gs_float_t sum;
169   gs_float_t cnt;
170 };
171
172 void POSAVG_HFTA_AGGR_INIT_(gs_sp_t  buf) {
173   struct posavg_struct * a = (struct posavg_struct *) buf;
174   a->sum=0;
175   a->cnt=0;
176   return;
177 }
178
179 void POSAVG_HFTA_AGGR_UPDATE_(gs_sp_t  buf, gs_float_t v) {
180   struct posavg_struct * a = (struct posavg_struct *) buf;
181   if (v>=0) {
182     a->sum=a->sum+v;
183     a->cnt=a->cnt+1;
184   }
185   return;
186 }
187
188 void POSAVG_HFTA_AGGR_OUTPUT_(gs_float_t * v, gs_sp_t  buf) {
189   struct posavg_struct * a = (struct posavg_struct *) buf;
190   if (a->cnt>0) {
191     *v=(a->sum/a->cnt);
192   } else {
193     *v=-1;
194   }
195   return;
196 }
197
198 void POSAVG_HFTA_AGGR_DESTROY_(gs_sp_t  buf) {
199   return;
200 }
201
202 /////////////////////////////////////////////////////////////////////////
203 /////                   avg_udaf (simple example)
204
205 //              struct received from subaggregate
206 struct avg_udaf_lfta_struct_t{
207         gs_int64_t  sum;
208         gs_uint32_t cnt;
209 };
210
211 //              scratchpad struct
212 struct avg_udaf_hfta_struct_t{
213         gs_int64_t sum;
214         gs_uint32_t cnt;
215 };
216
217 //                      avg_udaf functions
218 void avg_udaf_HFTA_AGGR_INIT_(gs_sp_t b){
219         avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;
220         s->sum = 0;
221         s->cnt = 0;
222 }
223
224 void avg_udaf_HFTA_AGGR_UPDATE_(gs_sp_t b, gs_uint32_t v){
225         avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;
226         s->sum += v;
227         s->cnt ++;
228 }
229
230 void avg_udaf_HFTA_AGGR_OUTPUT_(vstring *r,gs_sp_t b){
231         r->length = 12;
232         r->offset = (gs_p_t)(b);
233         r->reserved = SHALLOW_COPY;
234 }
235
236 void avg_udaf_HFTA_AGGR_DESTROY_(gs_sp_t b){
237         return;
238 }
239
240
241 //                      avg_udaf superaggregate functions
242 void avg_udaf_hfta_HFTA_AGGR_INIT_(gs_sp_t b){
243         avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;
244         s->sum = 0;
245         s->cnt = 0;
246 }
247
248 void avg_udaf_hfta_HFTA_AGGR_UPDATE_(gs_sp_t b, vstring *v){
249         if(v->length != 12) return;
250         avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;
251         avg_udaf_lfta_struct_t *vs = (avg_udaf_lfta_struct_t *)(v->offset);
252         s->sum += vs->sum;
253         s->cnt += vs->cnt;
254 }
255
256 void avg_udaf_hfta_HFTA_AGGR_OUTPUT_(vstring *r,gs_sp_t b){
257         r->length = 12;
258         r->offset = (gs_p_t)(b);
259         r->reserved = SHALLOW_COPY;
260 }
261
262 void avg_udaf_hfta_HFTA_AGGR_DESTROY_(gs_sp_t b){
263         return;
264 }
265
266 //              Extraction function
267 gs_float_t extr_avg_fcn(vstring *v){
268         if(v->length != 12) return 0;
269         avg_udaf_hfta_struct_t *vs = (avg_udaf_hfta_struct_t *)(v->offset);
270         gs_float_t r = (gs_float_t)(vs->sum) / vs->cnt;
271     return r;
272 }
273
274 /////////////////////////////////////////////////////////
275 //              FIRST aggregate
276 // hfta only
277
278 // uint
279 void FIRST_HFTA_AGGR_INIT_(gs_uint32_t* scratch) {
280         *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
281 }
282 void FIRST_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }
283 void FIRST_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {
284         if (*scratch == UINT_MAX)
285                 *scratch = val;
286 }
287 void FIRST_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {
288         *res = *scratch;
289 }
290 void FIRST_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }
291
292 // int
293
294 void FIRST_HFTA_AGGR_INIT_(gs_int32_t* scratch) {
295         *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
296 }
297 void FIRST_HFTA_AGGR_REINIT_(gs_int32_t* scratch) { }
298 void FIRST_HFTA_AGGR_UPDATE_(gs_int32_t* scratch, gs_int32_t val) {
299         if (*scratch == UINT_MAX)
300                 *scratch = val;
301 }
302 void FIRST_HFTA_AGGR_OUTPUT_(gs_int32_t* res, gs_int32_t* scratch) {
303         *res = *scratch;
304 }
305 void FIRST_HFTA_AGGR_DESTROY_(gs_int32_t* scratch) { }
306
307 // ullong
308 void FIRST_HFTA_AGGR_INIT_(gs_uint64_t* scratch) {
309         *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
310 }
311 void FIRST_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
312 void FIRST_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
313         if (*scratch == UINT_MAX)
314                 *scratch = val;
315 }
316 void FIRST_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
317         *res = *scratch;
318 }
319 void FIRST_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
320
321 // llong
322 void FIRST_HFTA_AGGR_INIT_(gs_int64_t* scratch) {
323         *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
324 }
325 void FIRST_HFTA_AGGR_REINIT_(gs_int64_t* scratch) { }
326 void FIRST_HFTA_AGGR_UPDATE_(gs_int64_t* scratch, gs_int64_t val) {
327         if (*scratch == UINT_MAX)
328                 *scratch = val;
329 }
330 void FIRST_HFTA_AGGR_OUTPUT_(gs_int64_t* res, gs_int64_t* scratch) {
331         *res = *scratch;
332 }
333 void FIRST_HFTA_AGGR_DESTROY_(gs_int64_t* scratch) { }
334
335 // string
336 void FIRST_HFTA_AGGR_INIT_(vstring* scratch) {
337         scratch->offset= 0;
338 }
339 void FIRST_HFTA_AGGR_REINIT_(vstring* scratch) { }
340 void FIRST_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
341         if (!scratch->offset) {
342     scratch->length = val->length;
343     scratch->offset = val->offset;
344     scratch->reserved = SHALLOW_COPY;
345         }
346 }
347 void FIRST_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
348         *res = *scratch;
349 }
350 void FIRST_HFTA_AGGR_DESTROY_(vstring* scratch) { }
351
352 // hfta/lfta split
353
354 // uint
355 void FIRST_hfta_HFTA_AGGR_INIT_(gs_uint32_t* scratch) {
356         *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
357 }
358 void FIRST_hfta_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }
359 void FIRST_hfta_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {
360         if (*scratch == UINT_MAX)
361                 *scratch = val;
362 }
363 void FIRST_hfta_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {
364         *res = *scratch;
365 }
366 void FIRST_hfta_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }
367
368 // int
369 void FIRST_INT_hfta_HFTA_AGGR_INIT_(gs_int32_t* scratch) {
370         *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
371 }
372 void FIRST_INT_hfta_HFTA_AGGR_REINIT_(gs_int32_t* scratch) { }
373 void FIRST_INT_hfta_HFTA_AGGR_UPDATE_(gs_int32_t* scratch, gs_int32_t val) {
374         if (*scratch == UINT_MAX)
375                 *scratch = val;
376 }
377 void FIRST_INT_hfta_HFTA_AGGR_OUTPUT_(gs_int32_t* res, gs_int32_t* scratch) {
378         *res = *scratch;
379 }
380 void FIRST_INT_hfta_HFTA_AGGR_DESTROY_(gs_int32_t* scratch) { }
381
382 // ullong
383 void FIRST_ULL_hfta_HFTA_AGGR_INIT_(gs_uint64_t* scratch) {
384         *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
385 }
386 void FIRST_ULL_hfta_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
387 void FIRST_ULL_hfta_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
388         if (*scratch == UINT_MAX)
389                 *scratch = val;
390 }
391 void FIRST_ULL_hfta_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
392         *res = *scratch;
393 }
394 void FIRST_ULL_hfta_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
395
396 // llong
397 void FIRST_LL_hfta_HFTA_AGGR_INIT_(gs_int64_t* scratch) {
398         *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
399 }
400 void FIRST_LL_hfta_HFTA_AGGR_REINIT_(gs_int64_t* scratch) { }
401 void FIRST_LL_hfta_HFTA_AGGR_UPDATE_(gs_int64_t* scratch, gs_int64_t val) {
402         if (*scratch == UINT_MAX)
403                 *scratch = val;
404 }
405 void FIRST_LL_hfta_HFTA_AGGR_OUTPUT_(gs_int64_t* res, gs_int64_t* scratch) {
406         *res = *scratch;
407 }
408 void FIRST_LL_hfta_HFTA_AGGR_DESTROY_(gs_int64_t* scratch) { }
409
410 // string
411 void FIRST_STR_hfta_HFTA_AGGR_INIT_(vstring* scratch) {
412         scratch->offset= 0;
413 }
414 void FIRST_STR_hfta_HFTA_AGGR_REINIT_(vstring* scratch) { }
415 void FIRST_STR_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
416         if (!scratch->offset) {
417     scratch->length = val->length;
418     scratch->offset = val->offset;
419     scratch->reserved = SHALLOW_COPY;
420         }
421 }
422 void FIRST_STR_hfta_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
423         *res = *scratch;
424 }
425 void FIRST_STR_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) { }
426
427
428 /////////////////////////////////////////////////////////
429 //              LAST aggregate
430
431 // hfta only
432
433 // uint
434 void LAST_HFTA_AGGR_INIT_(gs_uint32_t* scratch) { }
435 void LAST_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }
436 void LAST_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {
437         *scratch = val;
438 }
439 void LAST_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {
440         *res = *scratch;
441 }
442 void LAST_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }
443
444 // int
445 void LAST_HFTA_AGGR_INIT_(gs_int32_t* scratch) { }
446 void LAST_HFTA_AGGR_REINIT_(gs_int32_t* scratch) { }
447 void LAST_HFTA_AGGR_UPDATE_(gs_int32_t* scratch, gs_int32_t val) {
448         *scratch = val;
449 }
450 void LAST_HFTA_AGGR_OUTPUT_(gs_int32_t* res, gs_int32_t* scratch) {
451         *res = *scratch;
452 }
453 void LAST_HFTA_AGGR_DESTROY_(gs_int32_t* scratch) { }
454
455 // llong
456 void LAST_HFTA_AGGR_INIT_(gs_int64_t* scratch) { }
457 void LAST_HFTA_AGGR_REINIT_(gs_int64_t* scratch) { }
458 void LAST_HFTA_AGGR_UPDATE_(gs_int64_t* scratch, gs_int64_t val) {
459         *scratch = val;
460 }
461 void LAST_HFTA_AGGR_OUTPUT_(gs_int64_t* res, gs_int64_t* scratch) {
462         *res = *scratch;
463 }
464 void LAST_HFTA_AGGR_DESTROY_(gs_int64_t* scratch) { }
465
466
467 // ullong
468 void LAST_HFTA_AGGR_INIT_(gs_uint64_t* scratch) { }
469 void LAST_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
470 void LAST_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
471         *scratch = val;
472 }
473 void LAST_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
474         *res = *scratch;
475 }
476 void LAST_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
477
478 // string
479 void LAST_HFTA_AGGR_INIT_(vstring* scratch) {
480         scratch->offset= 0;
481 }
482 void LAST_HFTA_AGGR_REINIT_(vstring* scratch) { }
483 void LAST_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
484         scratch->length = val->length;
485   scratch->offset = val->offset;
486   scratch->reserved = SHALLOW_COPY;
487 }
488 void LAST_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
489         *res = *scratch;
490 }
491 void LAST_HFTA_AGGR_DESTROY_(vstring* scratch) { }
492
493 // hfta/lfta split
494
495 void LAST_hfta_HFTA_AGGR_INIT_(gs_uint32_t* scratch) { }
496 void LAST_hfta_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }
497 void LAST_hfta_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {
498         *scratch = val;
499 }
500 void LAST_hfta_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {
501         *res = *scratch;
502 }
503 void LAST_hfta_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }
504
505 void LAST_INT_hfta_HFTA_AGGR_INIT_(gs_int32_t* scratch) { }
506 void LAST_INT_hfta_HFTA_AGGR_REINIT_(gs_int32_t* scratch) { }
507 void LAST_INT_hfta_HFTA_AGGR_UPDATE_(gs_int32_t* scratch, gs_int32_t val) {
508         *scratch = val;
509 }
510 void LAST_INT_hfta_HFTA_AGGR_OUTPUT_(gs_int32_t* res, gs_int32_t* scratch) {
511         *res = *scratch;
512 }
513 void LAST_INT_hfta_HFTA_AGGR_DESTROY_(gs_int32_t* scratch) { }
514
515 void LAST_ULL_hfta_HFTA_AGGR_INIT_(gs_uint64_t* scratch) { }
516 void LAST_ULL_hfta_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
517 void LAST_ULL_hfta_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
518         *scratch = val;
519 }
520 void LAST_ULL_hfta_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
521         *res = *scratch;
522 }
523 void LAST_ULL_hfta_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
524
525 void LAST_LL_hfta_HFTA_AGGR_INIT_(gs_int64_t* scratch) { }
526 void LAST_LL_hfta_HFTA_AGGR_REINIT_(gs_int64_t* scratch) { }
527 void LAST_LL_hfta_HFTA_AGGR_UPDATE_(gs_int64_t* scratch, gs_int64_t val) {
528         *scratch = val;
529 }
530 void LAST_LL_hfta_HFTA_AGGR_OUTPUT_(gs_int64_t* res, gs_int64_t* scratch) {
531         *res = *scratch;
532 }
533 void LAST_LL_hfta_HFTA_AGGR_DESTROY_(gs_int64_t* scratch) { }
534
535
536 void LAST_STR_hfta_HFTA_AGGR_INIT_(vstring* scratch) {
537         scratch->offset= 0;
538 }
539 void LAST_STR_hfta_HFTA_AGGR_REINIT_(vstring* scratch) { }
540 void LAST_STR_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
541         scratch->length = val->length;
542   scratch->offset = val->offset;
543   scratch->reserved = SHALLOW_COPY;
544 }
545 void LAST_STR_hfta_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
546         *res = *scratch;
547 }
548 void LAST_STR_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) { }
549
550
551 ////////////////////////////////////////////////////////////
552 //              count different (# of times the value is different than the previous)
553
554 struct count_diff_scratch{
555         gs_uint32_t count;
556         union{
557                 gs_uint32_t ui;
558                 gs_int32_t i;
559                 gs_uint64_t ul;
560                 gs_int64_t l;
561         } r;
562 };
563
564 //////////  HFTA only
565
566 // uint32
567 void count_diff_HFTA_AGGR_INIT_(gs_sp_t s){
568         count_diff_scratch *scratch = (count_diff_scratch *)s;
569         scratch->count = 0;
570         scratch->r.ul = 0;
571 }
572 void count_diff_HFTA_AGGR_REINIT_(gs_sp_t s){
573         count_diff_scratch *scratch = (count_diff_scratch *)s;
574         scratch->count = 0;
575 }
576 void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t val){
577         count_diff_scratch *scratch = (count_diff_scratch *)s;
578         if(scratch->count==0 || scratch->r.ui != val)
579                 scratch->count++;
580         scratch->r.ui = val;
581 }
582 void count_diff_HFTA_AGGR_OUTPUT_(gs_uint32_t *res, gs_sp_t s){
583         count_diff_scratch *scratch = (count_diff_scratch *)s;
584         *res = scratch->count;
585 }
586 void count_diff_HFTA_AGGR_DESTROY_(gs_sp_t s){ }
587
588 // int32
589 void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t val){
590         count_diff_scratch *scratch = (count_diff_scratch *)s;
591         if(scratch->count==0 || scratch->r.i != val)
592                 scratch->count++;
593         scratch->r.i = val;
594 }
595
596 // uint64
597 void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t val){
598         count_diff_scratch *scratch = (count_diff_scratch *)s;
599         if(scratch->count==0 || scratch->r.ul != val)
600                 scratch->count++;
601         scratch->r.ul = val;
602 }
603
604 // int64
605 void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t val){
606         count_diff_scratch *scratch = (count_diff_scratch *)s;
607         if(scratch->count==0 || scratch->r.l != val)
608                 scratch->count++;
609         scratch->r.l = val;
610 }
611
612 // vstring
613 void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, vstring* val){
614         count_diff_scratch *scratch = (count_diff_scratch *)s;
615         gs_uint64_t hashval = hfta_vstr_long_hashfunc(val);
616         if(scratch->count==0 || scratch->r.l != hashval)
617                 scratch->count++;
618         scratch->r.l = hashval;
619 }
620
621 ////////// HFTA / LFTA split
622
623 struct lfta_count_diff_scratch{
624         gs_uint32_t count;
625         union{
626                 gs_uint32_t ui;
627                 gs_int32_t i;
628                 gs_uint64_t ul;
629                 gs_int64_t l;
630         } first;
631         union{
632                 gs_uint32_t ui;
633                 gs_int32_t i;
634                 gs_uint64_t ul;
635                 gs_int64_t l;
636         } last;
637 };
638
639
640 void count_diff_hfta_HFTA_AGGR_INIT_(gs_sp_t s){
641         count_diff_scratch *scratch = (count_diff_scratch *)s;
642         scratch->count = 0;
643         scratch->r.ul = 0;
644 }
645 void count_diff_hfta_HFTA_AGGR_REINIT_(gs_sp_t s){
646         count_diff_scratch *scratch = (count_diff_scratch *)s;
647         scratch->count = 0;
648         scratch->r.ul = 0;
649 }
650 void count_diff_hfta_HFTA_AGGR_UPDATE_(gs_sp_t s, vstring *v){
651         lfta_count_diff_scratch *val = (lfta_count_diff_scratch *)v;
652         count_diff_scratch *scratch = (count_diff_scratch *)(v->offset);
653         scratch->count += val->count - 1;
654         if(scratch->r.l != val->first.l)
655                 scratch->count++;
656         scratch->r.l = val->last.l;
657 }
658 void count_diff_hfta_HFTA_AGGR_OUTPUT_(gs_uint32_t *res, gs_sp_t s){
659         count_diff_scratch *scratch = (count_diff_scratch *)s;
660         *res = (scratch->count)+1;
661 }
662 void count_diff_hfta_HFTA_AGGR_DESTROY_(gs_sp_t scratch){ }
663         
664
665
666 /////////////////////////////////////////////////////////
667 //              running_array_aggr aggregate
668
669 void running_array_aggr_hfta_HFTA_AGGR_INIT_(vstring* scratch) {
670   scratch->offset = NULL;  
671   scratch->length = 0;
672 }
673
674 void running_array_aggr_hfta_HFTA_AGGR_REINIT_(vstring* scratch) { }
675
676 void running_array_aggr_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
677   char buffer[100];
678
679   gs_uint32_t* ints = (gs_uint32_t*)val->offset;
680   switch (val->length / sizeof (gs_uint32_t)) {
681     case 4:
682       sprintf(buffer, "%u,%u,%u,%u", ints[0], ints[1], ints[2], ints[3]);
683       break;
684     case 3:
685       sprintf(buffer, "%u,%u,%u", ints[0], ints[1], ints[2]);
686       break;   
687     case 2:
688       sprintf(buffer, "%u,%u", ints[0], ints[1]);
689       break;        
690     case 1:
691       sprintf(buffer, "%u", ints[0]);
692       break;  
693     case 0:
694       return;        
695   }
696   int buf_len = strlen(buffer);
697
698   // append the content of buffer to scratch
699         if (!scratch->offset) {
700     Vstring_Constructor(scratch, buffer);
701         } else {
702     scratch->offset = (gs_p_t)realloc((void*)scratch->offset, scratch->length + buf_len + 1);
703     *((char*)scratch->offset + scratch->length) = ',';
704     memcpy((void*)(scratch->offset + scratch->length + 1), (void*)buffer, buf_len);
705     scratch->length += buf_len + 1;
706     scratch->reserved = INTERNAL;
707   }
708 }
709
710 void running_array_aggr_hfta_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
711         *res = *scratch;
712   res->reserved = SHALLOW_COPY;
713 }
714
715 void running_array_aggr_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) {
716   hfta_vstr_destroy(scratch);
717  }
718
719
////////////////////////////////////////////
//              Aggregate strings by catenation

// Heap-allocated state holding the accumulated text.
// NOTE(review): x is set to 101 by INIT and not read in this section; looks
// like a leftover debug marker.
struct CAT_aggr_scratch{
        std::string val;
        int         x;
};

// What the scratchpad itself holds: a pointer to the heap state.
struct CAT_aggr_scratch_ptr{
        CAT_aggr_scratch *ptr;
};
732
733 void CAT_aggr_HFTA_AGGR_INIT_(gs_sp_t s){
734         CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
735         CAT_aggr_scratch *v = new CAT_aggr_scratch();
736         v->x = 101;
737
738         p->ptr = v;
739 }
740 void CAT_aggr_HFTA_AGGR_REINIT_(gs_sp_t s){
741         CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
742         CAT_aggr_scratch *v = p->ptr;
743         v->val="";
744 }
745 void CAT_aggr_HFTA_AGGR_UPDATE_(gs_sp_t s, vstring *sep, vstring *str){
746 char buf1[MAXTUPLESZ-20], buf2[MAXTUPLESZ-20];
747 int i;
748 for(i=0;i<sep->length;++i) buf1[i] = *(((char *)sep->offset)+i);
749 buf1[i]='\0';
750 for(i=0;i<str->length;++i) buf2[i] = *(((char *)str->offset)+i);
751 buf2[i]='\0';
752         CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
753         CAT_aggr_scratch *v = p->ptr;
754         if(v->val.size()>0)
755                 v->val.append((char *)(sep->offset), sep->length);
756         v->val.append((char *)(str->offset), str->length);
757 //printf("sep=%s, str=%s, val=%s\n",buf1,buf2,v->val.c_str());
758 }
759 void CAT_aggr_HFTA_AGGR_OUTPUT_(vstring *res, gs_sp_t s){
760         CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
761         CAT_aggr_scratch *v = p->ptr;
762 //printf("output val=%s\n",v->val.c_str());
763         res->offset = (gs_p_t)malloc(v->val.size());
764         res->length = v->val.size();
765         if(res->length>MAXTUPLESZ-20)
766                 res->length=MAXTUPLESZ-20;
767 //      v->val.copy((char *)(res->offset), 0, res->length);
768         const char *dat = v->val.c_str();
769         memcpy((char *)(res->offset), dat, res->length);
770 //      for(int i=0;i<res->length;++i)
771 //              *(((char *)res->offset)+i) = dat[i];
772         res->reserved = INTERNAL;
773 }
774 void CAT_aggr_HFTA_AGGR_DESTROY_(gs_sp_t s){
775         CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
776         CAT_aggr_scratch *v = p->ptr;
777         delete v;
778 }
779
780         
781         
782 ///////////////////////////////////////////////////////////////
783 //      time_avg((sample, ts, window_size)
784 //  Compute time-weighted average sum(sample*duration)/window_size
785 //  duration is difference between current and next ts.
786 //  The idea is to compute a sum over a step function.
787 //  
788         
789 struct time_avg_udaf_str{
790         gs_float_t sum;
791         gs_float_t last_val;
792         gs_uint64_t last_ts;
793         gs_uint64_t window;
794         gs_uint64_t first_ts;
795         gs_uint8_t event_occurred;
796 };
797         
798 void time_avg_HFTA_AGGR_INIT_(gs_sp_t s){
799         time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
800         scratch->sum = 0.0;
801         scratch->last_val = 0.0;
802         scratch->last_ts = 0;
803         scratch->first_ts = 0;
804         scratch->event_occurred = 0;
805 }
806
807 void time_avg_HFTA_AGGR_DESTROY_(gs_sp_t s){
808 }
809
810 void time_avg_HFTA_AGGR_REINIT_(gs_sp_t s){
811         time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
812         scratch->event_occurred = 0;
813         scratch->sum = 0;
814 //printf("time_avg_reinit: occurred=%d, last_val=%lf, sum=%lf, first_ts=%lld, last_ts=%lld\n",scratch->event_occurred, scratch->last_val, scratch->sum, scratch->first_ts, scratch->last_ts);
815 }
816
817 void time_avg_HFTA_AGGR_OUTPUT_(gs_float_t *result, gs_sp_t s){
818         time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
819         if(scratch->event_occurred==0){
820                 *result =  scratch->last_val;
821 //printf("\ttime_avg outpt1 sum=%lf, last_val=%lf, result=%lf\n", scratch->sum, scratch->last_val, *result);
822                 return;
823         }
824         gs_int64_t end_time = scratch->window * (scratch->last_ts/scratch->window + 1);
825         scratch->sum += (end_time - scratch->last_ts) * (gs_float_t)(scratch->last_val); 
826         gs_int64_t start_time = end_time - scratch->window;
827         if(scratch->first_ts > start_time){
828                 *result = scratch->sum / (end_time - scratch->first_ts);
829 //printf("\ttime_avg outpt2 sum=%lf, start_time=%lld, end_time=%lld, first_ts=%lld, last_ts=%lld,result=%lf\n", scratch->sum, start_time, end_time, scratch->first_ts, scratch->last_ts, *result);
830         }else{
831                 *result = scratch->sum / (end_time - start_time);
832 //printf("\ttime_avg outpt3 sum=%lf, start_time=%lld, end_time=%lld, first_ts=%lld, last_ts=%lld,result=%lf\n", scratch->sum, start_time, end_time, scratch->first_ts, scratch->last_ts, *result);
833         }
834 }
835
836 void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_float_t val, gs_int64_t ts, gs_int64_t window){
837         time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
838         scratch->window = window;
839         if(scratch->first_ts==0){
840                 scratch->first_ts = ts;
841         }else{
842                 if(scratch->event_occurred){
843
844                         scratch->sum += (ts - scratch->last_ts) * scratch->last_val;
845                 }else{
846                         gs_int64_t start_time = scratch->window * (scratch->last_ts/scratch->window);
847                         scratch->sum += (ts - start_time) * scratch->last_val;
848                 }
849         }
850 //printf("time_avg_upd: val=%lf, occurred=%d, last_val=%lf, sum=%lf, ts=%lld, first_ts=%lld, last_ts=%lld\n",val, scratch->event_occurred, scratch->last_val, scratch->sum, ts, scratch->first_ts, scratch->last_ts);
851         scratch->last_val = val;
852         scratch->last_ts = ts;
853         scratch->event_occurred = 1;
854 }
855 void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t val, gs_int64_t ts, gs_int64_t window){
856         time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
857 }
858 void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t val, gs_int64_t ts, gs_int64_t window){
859         time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
860 }
861 void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t val, gs_int64_t ts, gs_int64_t window){
862         time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
863 }
864 void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t val, gs_int64_t ts, gs_int64_t window){
865         time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
866 }
867         
868
869 // ------------------------------------------------------------
870 //              running_sum_max : get the running sum of an int,
871 //              be able to report this sum and also its max value
872 //              during the time window
873
874 struct run_sum_max_udaf_str{
875         gs_int64_t sum;
876         gs_int64_t max;
877 };
878 void run_sum_max_HFTA_AGGR_INIT_(gs_sp_t s){
879         run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
880         scratch->sum = 0;
881         scratch->max = 0;
882 }
883 void run_sum_max_HFTA_AGGR_REINIT_(gs_sp_t s){
884         run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
885         scratch->max = scratch->sum;
886 }
887 void run_sum_max_HFTA_AGGR_OUTPUT_(vstring *r,gs_sp_t b){
888         r->length = sizeof(run_sum_max_udaf_str);
889         r->offset = (gs_p_t)(b);
890         r->reserved = SHALLOW_COPY;
891 }
892 void run_sum_max_HFTA_AGGR_DESTROY_(gs_sp_t b){
893         return;
894 }
895
896 void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t v){
897         run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
898         scratch->sum+=v;
899         if(scratch->sum>scratch->max) scratch->max=scratch->sum;
900 }
901 void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t v){
902         run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
903         scratch->sum+=v;
904         if(scratch->sum>scratch->max) scratch->max=scratch->sum;
905 }
906 void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t v){
907         run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
908         scratch->sum+=v;
909         if(scratch->sum>scratch->max) scratch->max=scratch->sum;
910 }
911 void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t v){
912         run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
913         scratch->sum+=v;
914         if(scratch->sum>scratch->max) scratch->max=scratch->sum;
915 }
916 //      the extraction functions
917 gs_int64_t extr_running_sum(vstring *v){
918         if(v->length != sizeof(run_sum_max_udaf_str)) return 0;
919         run_sum_max_udaf_str *vs = (run_sum_max_udaf_str *)(v->offset);
920         return vs->sum;
921 }
922 gs_int64_t extr_running_sum_max(vstring *v){
923         if(v->length != sizeof(run_sum_max_udaf_str)) return 0;
924         run_sum_max_udaf_str *vs = (run_sum_max_udaf_str *)(v->offset);
925         return vs->max;
926 }
927
928 // ---------------------------------------------
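//              Approximate count distinct.
//              Relies on the min-hashing approach: keep the minimum of
//              several independent hash functions and estimate the count
//              from the average of those minima.
//              Currently HFTA-only.
//              Uses a 32-bit hash; tested with up to 100,000,000 elements,
//              where it gave good results (within 7%).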
934
935
// Number of independent hash functions, i.e. the number of per-hash minima
// kept in each approx_count_distinct_udaf_str sketch.
#define COUNT_DISTINCT_NREPS 250
// Longest value the hash family is initialized for, measured in 4-byte words
// (200 words = 800 bytes). UPDATE copies at most this many bytes of each
// value into its hash buffer.
#define COUNT_DISTINCT_MAX_STRING_LEN 200       // in 4-byte words, not bytes

// Hash functions shared by every sketch in this process. They are created
// lazily by the first approx_count_distinct_udaf_HFTA_AGGR_INIT_ call, which
// then sets approx_count_distinct_udaf_initialized to 1.
// NOTE(review): the flag is a plain int with no synchronization, so this lazy
// setup assumes INIT is never called concurrently -- confirm.
static Hash32bit2univID hids[COUNT_DISTINCT_NREPS];
static int approx_count_distinct_udaf_initialized = 0;
// Sketch state: mn[i] is the smallest value hash function hids[i] has
// produced over all updates so far (INIT sets every entry to 4294967295).
struct approx_count_distinct_udaf_str{
        unsigned int mn[COUNT_DISTINCT_NREPS];
};
944
945
946 void approx_count_distinct_udaf_HFTA_AGGR_INIT_(gs_sp_t buf){
947         approx_count_distinct_udaf_str *cd = (approx_count_distinct_udaf_str *)buf;
948         for(int i=0;i<COUNT_DISTINCT_NREPS;++i)
949                 cd->mn[i]=4294967295;
950         if(approx_count_distinct_udaf_initialized==0){
951                 for(int i=0;i<COUNT_DISTINCT_NREPS;++i)
952                         hids[i] = InitStringHash32bit2univ(COUNT_DISTINCT_MAX_STRING_LEN);
953                 approx_count_distinct_udaf_initialized=1;
954         }
955 }
956 void running_approx_count_distinct_udaf_HFTA_AGGR_INIT_(gs_sp_t buf){
957         approx_count_distinct_udaf_HFTA_AGGR_INIT_(buf);
958 }
959
960 void approx_count_distinct_udaf_HFTA_AGGR_REINIT_(gs_sp_t buf){ }
961 void running_approx_count_distinct_udaf_HFTA_AGGR_REINIT_(gs_sp_t buf){}
962
963 void approx_count_distinct_udaf_HFTA_AGGR_DESTROY_(gs_sp_t buf){ }
964 void running_approx_count_distinct_udaf_HFTA_AGGR_DESTROY_(gs_sp_t buf){ }
965
966 void approx_count_distinct_udaf_HFTA_AGGR_UPDATE_(gs_sp_t buf, vstring *val){
967         approx_count_distinct_udaf_str *cd = (approx_count_distinct_udaf_str *)buf;
968         unsigned int buffer[sizeof(unsigned int)*COUNT_DISTINCT_MAX_STRING_LEN];
969         buffer[val->length/4] = 0;
970         memcpy((char *)buffer, (char *)val->offset, min(val->length, 800));
971         unsigned int len4 = val->length/4 + ((val->length&0x03)>0);
972
973         for(int i=0; i<COUNT_DISTINCT_NREPS; ++i){
974                 unsigned int h = StringHash32bit2univ(buffer, len4, hids[i]);
975                 if(h < cd->mn[i]) cd->mn[i] = h;
976         }
977 }
978 void running_approx_count_distinct_udaf_HFTA_AGGR_UPDATE_(gs_sp_t buf, vstring *val){
979         approx_count_distinct_udaf_HFTA_AGGR_UPDATE_(buf, val);
980 }
981
982 void approx_count_distinct_udaf_HFTA_AGGR_OUTPUT_(vstring *res, gs_sp_t buf){
983         res->offset = (gs_p_t)buf;
984         res->length = sizeof(approx_count_distinct_udaf_str);
985         res->reserved = SHALLOW_COPY;
986 }
987 void running_approx_count_distinct_udaf_HFTA_AGGR_OUTPUT_(vstring *res, gs_sp_t buf){
988         approx_count_distinct_udaf_HFTA_AGGR_OUTPUT_(res, buf);
989 }
990
991 gs_float_t extr_approx_count_distinct(vstring *v){
992         approx_count_distinct_udaf_str *cd = (approx_count_distinct_udaf_str *)(v->offset);
993         gs_float_t avg = 0.0;
994         for(int i=0;i<COUNT_DISTINCT_NREPS;++i){
995                 avg += cd->mn[i];
996         }
997         avg /= COUNT_DISTINCT_NREPS;
998         gs_float_t est = (4294967295.0 / avg) - 1;
999         return est;
1000 }
1001
1002