1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
19 #include "hfta_udaf.h"
29 #include "hfta_runtime_library.h"
// Function-like helper macros. max/min expand each argument twice, so an
// argument with side effects (e.g. i++) may be evaluated more than once.
32 #define max(a,b) ((a) > (b) ? (a) : (b))
33 #define min(x,y) ((x) < (y) ? (x) : (y))
// lg(x): logarithm of x to base 2, computed as log(x) / log(2).
34 #define lg(x) (log(x) / log(2))
39 // -------------------------------------------------------------------
40 // moving sum over N intervals
42 struct moving_sum_udaf_str{
48 void moving_sum_udaf_HFTA_AGGR_INIT_(gs_sp_t buf){
49 struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
50 u->N=0; u->pos=0; u->sums=NULL;
53 void moving_sum_udaf_HFTA_AGGR_UPDATE_(gs_sp_t buf, gs_uint32_t s, gs_uint32_t N) {
54 struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
56 u->sums = (gs_uint32_t *)malloc(N*sizeof(gs_uint32_t));
57 for(gs_int32_t i=0;i<N;i++)
64 void super_moving_sum_udaf_HFTA_AGGR_UPDATE_(gs_sp_t buf, gs_uint64_t sub_sum) {
65 struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
66 gs_uint32_t s = (gs_uint32_t)(sub_sum & 0xffffffff);
68 gs_uint32_t N = (gs_uint32_t)((sub_sum & 0xffffffff00000000ull) >> 32);
69 u->sums = (gs_uint32_t *)malloc(N*sizeof(gs_uint32_t));
70 for(gs_int32_t i=0;i<N;i++)
77 void moving_sum_udaf_HFTA_AGGR_OUTPUT_(gs_p_t *result, gs_sp_t buf){
78 *result = (gs_p_t)(buf);
81 void moving_sum_udaf_HFTA_AGGR_DESTROY_(gs_sp_t buf){
82 struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
87 void moving_sum_udaf_HFTA_AGGR_REINIT_( gs_sp_t buf){
88 struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
95 gs_uint32_t moving_sum_extract(gs_p_t result){
96 struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) result;
104 gs_float_t moving_sum_extract_exp(gs_p_t result, gs_float_t alpha){
105 struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) result;
106 gs_uint32_t p=0, i=0;
107 gs_float_t s=0.0, m=1.0;
109 for(i=0; i<u->N;i++){
121 // -------------------------------------------------------------------
122 // sum over 3 intervals : test rUDAF
124 struct sum3_udaf_str{
130 void sum3_HFTA_AGGR_INIT_(gs_sp_t buf) {
131 struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;
132 u->s_0 = 0; u->s_1 = 0; u->s_2 = 0;
136 void sum3_HFTA_AGGR_UPDATE_(gs_sp_t buf, gs_uint32_t s) {
137 struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;
142 void sum3_HFTA_AGGR_OUTPUT_(gs_uint32_t *result, gs_sp_t buf) {
143 struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;
144 *result = u->s_0 + u->s_1 + u->s_2;
148 void sum3_HFTA_AGGR_DESTROY_(gs_sp_t buf) {
152 void sum3_HFTA_AGGR_REINIT_( gs_sp_t buf) {
153 struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;
// NOTE(review): no use of HISTORY_LENGTH is visible in this excerpt — confirm it is still referenced.
161 #define HISTORY_LENGTH 1024
163 /////////////////////////////////////////////////////////////////////////
164 ///// Calculate the average of all positive gs_float_t numbers
166 struct posavg_struct{
171 void POSAVG_HFTA_AGGR_INIT_(gs_sp_t buf) {
172 struct posavg_struct * a = (struct posavg_struct *) buf;
178 void POSAVG_HFTA_AGGR_UPDATE_(gs_sp_t buf, gs_float_t v) {
179 struct posavg_struct * a = (struct posavg_struct *) buf;
187 void POSAVG_HFTA_AGGR_OUTPUT_(gs_float_t * v, gs_sp_t buf) {
188 struct posavg_struct * a = (struct posavg_struct *) buf;
197 void POSAVG_HFTA_AGGR_DESTROY_(gs_sp_t buf) {
201 /////////////////////////////////////////////////////////////////////////
202 ///// avg_udaf (simple example)
204 // struct received from subaggregate
205 struct avg_udaf_lfta_struct_t{
211 struct avg_udaf_hfta_struct_t{
216 // avg_udaf functions
217 void avg_udaf_HFTA_AGGR_INIT_(gs_sp_t b){
218 avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;
223 void avg_udaf_HFTA_AGGR_UPDATE_(gs_sp_t b, gs_uint32_t v){
224 avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;
229 void avg_udaf_HFTA_AGGR_OUTPUT_(vstring *r,gs_sp_t b){
231 r->offset = (gs_p_t)(b);
232 r->reserved = SHALLOW_COPY;
235 void avg_udaf_HFTA_AGGR_DESTROY_(gs_sp_t b){
240 // avg_udaf superaggregate functions
241 void avg_udaf_hfta_HFTA_AGGR_INIT_(gs_sp_t b){
242 avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;
247 void avg_udaf_hfta_HFTA_AGGR_UPDATE_(gs_sp_t b, vstring *v){
248 if(v->length != 12) return;
249 avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;
250 avg_udaf_lfta_struct_t *vs = (avg_udaf_lfta_struct_t *)(v->offset);
255 void avg_udaf_hfta_HFTA_AGGR_OUTPUT_(vstring *r,gs_sp_t b){
257 r->offset = (gs_p_t)(b);
258 r->reserved = SHALLOW_COPY;
261 void avg_udaf_hfta_HFTA_AGGR_DESTROY_(gs_sp_t b){
265 // Extraction function
266 gs_float_t extr_avg_fcn(vstring *v){
267 if(v->length != 12) return 0;
268 avg_udaf_hfta_struct_t *vs = (avg_udaf_hfta_struct_t *)(v->offset);
269 gs_float_t r = (gs_float_t)(vs->sum) / vs->cnt;
273 /////////////////////////////////////////////////////////
278 void FIRST_HFTA_AGGR_INIT_(gs_uint32_t* scratch) {
279 *scratch = UINT_MAX; // we will encode uninitialized value of UINT_MAX
// REINIT step of FIRST (gs_uint32_t overload): empty body, *scratch is left unchanged.
281 void FIRST_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }
282 void FIRST_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {
283 if (*scratch == UINT_MAX)
286 void FIRST_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {
// DESTROY step of FIRST (gs_uint32_t overload): empty body, nothing is released.
289 void FIRST_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }
293 void FIRST_HFTA_AGGR_INIT_(gs_int32_t* scratch) {
294 *scratch = UINT_MAX; // we will encode uninitialized value of UINT_MAX
// REINIT step of FIRST (gs_int32_t overload): empty body, *scratch is left unchanged.
296 void FIRST_HFTA_AGGR_REINIT_(gs_int32_t* scratch) { }
297 void FIRST_HFTA_AGGR_UPDATE_(gs_int32_t* scratch, gs_int32_t val) {
298 if (*scratch == UINT_MAX)
301 void FIRST_HFTA_AGGR_OUTPUT_(gs_int32_t* res, gs_int32_t* scratch) {
// DESTROY step of FIRST (gs_int32_t overload): empty body, nothing is released.
304 void FIRST_HFTA_AGGR_DESTROY_(gs_int32_t* scratch) { }
307 void FIRST_HFTA_AGGR_INIT_(gs_uint64_t* scratch) {
308 *scratch = UINT_MAX; // we will encode uninitialized value of UINT_MAX
// REINIT step of FIRST (gs_uint64_t overload): empty body, *scratch is left unchanged.
310 void FIRST_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
311 void FIRST_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
312 if (*scratch == UINT_MAX)
315 void FIRST_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
// DESTROY step of FIRST (gs_uint64_t overload): empty body, nothing is released.
318 void FIRST_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
321 void FIRST_HFTA_AGGR_INIT_(gs_int64_t* scratch) {
322 *scratch = UINT_MAX; // we will encode uninitialized value of UINT_MAX
// REINIT step of FIRST (gs_int64_t overload): empty body, *scratch is left unchanged.
324 void FIRST_HFTA_AGGR_REINIT_(gs_int64_t* scratch) { }
325 void FIRST_HFTA_AGGR_UPDATE_(gs_int64_t* scratch, gs_int64_t val) {
326 if (*scratch == UINT_MAX)
329 void FIRST_HFTA_AGGR_OUTPUT_(gs_int64_t* res, gs_int64_t* scratch) {
// DESTROY step of FIRST (gs_int64_t overload): empty body, nothing is released.
332 void FIRST_HFTA_AGGR_DESTROY_(gs_int64_t* scratch) { }
335 void FIRST_HFTA_AGGR_INIT_(vstring* scratch) {
// REINIT step of FIRST (vstring overload): empty body, scratch is left unchanged.
338 void FIRST_HFTA_AGGR_REINIT_(vstring* scratch) { }
339 void FIRST_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
340 if (!scratch->offset) {
341 scratch->length = val->length;
342 scratch->offset = val->offset;
343 scratch->reserved = SHALLOW_COPY;
346 void FIRST_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
// DESTROY step of FIRST (vstring overload): empty body, scratch's buffer is not freed here.
349 void FIRST_HFTA_AGGR_DESTROY_(vstring* scratch) { }
354 void FIRST_hfta_HFTA_AGGR_INIT_(gs_uint32_t* scratch) {
355 *scratch = UINT_MAX; // we will encode uninitialized value of UINT_MAX
// REINIT step of FIRST_hfta (gs_uint32_t): empty body, *scratch is left unchanged.
357 void FIRST_hfta_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }
358 void FIRST_hfta_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {
359 if (*scratch == UINT_MAX)
362 void FIRST_hfta_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {
// DESTROY step of FIRST_hfta (gs_uint32_t): empty body, nothing is released.
365 void FIRST_hfta_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }
368 void FIRST_INT_hfta_HFTA_AGGR_INIT_(gs_int32_t* scratch) {
369 *scratch = UINT_MAX; // we will encode uninitialized value of UINT_MAX
// REINIT step of FIRST_INT_hfta (gs_int32_t): empty body, *scratch is left unchanged.
371 void FIRST_INT_hfta_HFTA_AGGR_REINIT_(gs_int32_t* scratch) { }
372 void FIRST_INT_hfta_HFTA_AGGR_UPDATE_(gs_int32_t* scratch, gs_int32_t val) {
373 if (*scratch == UINT_MAX)
376 void FIRST_INT_hfta_HFTA_AGGR_OUTPUT_(gs_int32_t* res, gs_int32_t* scratch) {
// DESTROY step of FIRST_INT_hfta (gs_int32_t): empty body, nothing is released.
379 void FIRST_INT_hfta_HFTA_AGGR_DESTROY_(gs_int32_t* scratch) { }
382 void FIRST_ULL_hfta_HFTA_AGGR_INIT_(gs_uint64_t* scratch) {
383 *scratch = UINT_MAX; // we will encode uninitialized value of UINT_MAX
// REINIT step of FIRST_ULL_hfta (gs_uint64_t): empty body, *scratch is left unchanged.
385 void FIRST_ULL_hfta_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
386 void FIRST_ULL_hfta_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
387 if (*scratch == UINT_MAX)
390 void FIRST_ULL_hfta_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
// DESTROY step of FIRST_ULL_hfta (gs_uint64_t): empty body, nothing is released.
393 void FIRST_ULL_hfta_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
396 void FIRST_LL_hfta_HFTA_AGGR_INIT_(gs_int64_t* scratch) {
397 *scratch = UINT_MAX; // we will encode uninitialized value of UINT_MAX
// REINIT step of FIRST_LL_hfta (gs_int64_t): empty body, *scratch is left unchanged.
399 void FIRST_LL_hfta_HFTA_AGGR_REINIT_(gs_int64_t* scratch) { }
400 void FIRST_LL_hfta_HFTA_AGGR_UPDATE_(gs_int64_t* scratch, gs_int64_t val) {
401 if (*scratch == UINT_MAX)
404 void FIRST_LL_hfta_HFTA_AGGR_OUTPUT_(gs_int64_t* res, gs_int64_t* scratch) {
// DESTROY step of FIRST_LL_hfta (gs_int64_t): empty body, nothing is released.
407 void FIRST_LL_hfta_HFTA_AGGR_DESTROY_(gs_int64_t* scratch) { }
410 void FIRST_STR_hfta_HFTA_AGGR_INIT_(vstring* scratch) {
// REINIT step of FIRST_STR_hfta (vstring): empty body, scratch is left unchanged.
413 void FIRST_STR_hfta_HFTA_AGGR_REINIT_(vstring* scratch) { }
414 void FIRST_STR_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
415 if (!scratch->offset) {
416 scratch->length = val->length;
417 scratch->offset = val->offset;
418 scratch->reserved = SHALLOW_COPY;
421 void FIRST_STR_hfta_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
// DESTROY step of FIRST_STR_hfta (vstring): empty body, scratch's buffer is not freed here.
424 void FIRST_STR_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) { }
427 /////////////////////////////////////////////////////////
// INIT step of LAST (gs_uint32_t overload): empty body, *scratch is not initialized here.
433 void LAST_HFTA_AGGR_INIT_(gs_uint32_t* scratch) { }
// REINIT step of LAST (gs_uint32_t overload): empty body, *scratch is left unchanged.
434 void LAST_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }
435 void LAST_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {
438 void LAST_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {
// DESTROY step of LAST (gs_uint32_t overload): empty body, nothing is released.
441 void LAST_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }
// INIT step of LAST (gs_int32_t overload): empty body, *scratch is not initialized here.
444 void LAST_HFTA_AGGR_INIT_(gs_int32_t* scratch) { }
// REINIT step of LAST (gs_int32_t overload): empty body, *scratch is left unchanged.
445 void LAST_HFTA_AGGR_REINIT_(gs_int32_t* scratch) { }
446 void LAST_HFTA_AGGR_UPDATE_(gs_int32_t* scratch, gs_int32_t val) {
449 void LAST_HFTA_AGGR_OUTPUT_(gs_int32_t* res, gs_int32_t* scratch) {
// DESTROY step of LAST (gs_int32_t overload): empty body, nothing is released.
452 void LAST_HFTA_AGGR_DESTROY_(gs_int32_t* scratch) { }
// INIT step of LAST (gs_int64_t overload): empty body, *scratch is not initialized here.
455 void LAST_HFTA_AGGR_INIT_(gs_int64_t* scratch) { }
// REINIT step of LAST (gs_int64_t overload): empty body, *scratch is left unchanged.
456 void LAST_HFTA_AGGR_REINIT_(gs_int64_t* scratch) { }
457 void LAST_HFTA_AGGR_UPDATE_(gs_int64_t* scratch, gs_int64_t val) {
460 void LAST_HFTA_AGGR_OUTPUT_(gs_int64_t* res, gs_int64_t* scratch) {
// DESTROY step of LAST (gs_int64_t overload): empty body, nothing is released.
463 void LAST_HFTA_AGGR_DESTROY_(gs_int64_t* scratch) { }
// INIT step of LAST (gs_uint64_t overload): empty body, *scratch is not initialized here.
467 void LAST_HFTA_AGGR_INIT_(gs_uint64_t* scratch) { }
// REINIT step of LAST (gs_uint64_t overload): empty body, *scratch is left unchanged.
468 void LAST_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
469 void LAST_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
472 void LAST_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
// DESTROY step of LAST (gs_uint64_t overload): empty body, nothing is released.
475 void LAST_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
478 void LAST_HFTA_AGGR_INIT_(vstring* scratch) {
// REINIT step of LAST (vstring overload): empty body, scratch is left unchanged.
481 void LAST_HFTA_AGGR_REINIT_(vstring* scratch) { }
482 void LAST_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
483 scratch->length = val->length;
484 scratch->offset = val->offset;
485 scratch->reserved = SHALLOW_COPY;
487 void LAST_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
// DESTROY step of LAST (vstring overload): empty body, scratch's buffer is not freed here.
490 void LAST_HFTA_AGGR_DESTROY_(vstring* scratch) { }
// INIT step of LAST_hfta (gs_uint32_t): empty body, *scratch is not initialized here.
494 void LAST_hfta_HFTA_AGGR_INIT_(gs_uint32_t* scratch) { }
// REINIT step of LAST_hfta (gs_uint32_t): empty body, *scratch is left unchanged.
495 void LAST_hfta_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }
496 void LAST_hfta_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {
499 void LAST_hfta_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {
// DESTROY step of LAST_hfta (gs_uint32_t): empty body, nothing is released.
502 void LAST_hfta_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }
// INIT step of LAST_INT_hfta (gs_int32_t): empty body, *scratch is not initialized here.
504 void LAST_INT_hfta_HFTA_AGGR_INIT_(gs_int32_t* scratch) { }
// REINIT step of LAST_INT_hfta (gs_int32_t): empty body, *scratch is left unchanged.
505 void LAST_INT_hfta_HFTA_AGGR_REINIT_(gs_int32_t* scratch) { }
506 void LAST_INT_hfta_HFTA_AGGR_UPDATE_(gs_int32_t* scratch, gs_int32_t val) {
509 void LAST_INT_hfta_HFTA_AGGR_OUTPUT_(gs_int32_t* res, gs_int32_t* scratch) {
// DESTROY step of LAST_INT_hfta (gs_int32_t): empty body, nothing is released.
512 void LAST_INT_hfta_HFTA_AGGR_DESTROY_(gs_int32_t* scratch) { }
// INIT step of LAST_ULL_hfta (gs_uint64_t): empty body, *scratch is not initialized here.
514 void LAST_ULL_hfta_HFTA_AGGR_INIT_(gs_uint64_t* scratch) { }
// REINIT step of LAST_ULL_hfta (gs_uint64_t): empty body, *scratch is left unchanged.
515 void LAST_ULL_hfta_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
516 void LAST_ULL_hfta_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
519 void LAST_ULL_hfta_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
// DESTROY step of LAST_ULL_hfta (gs_uint64_t): empty body, nothing is released.
522 void LAST_ULL_hfta_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
// INIT step of LAST_LL_hfta (gs_int64_t): empty body, *scratch is not initialized here.
524 void LAST_LL_hfta_HFTA_AGGR_INIT_(gs_int64_t* scratch) { }
// REINIT step of LAST_LL_hfta (gs_int64_t): empty body, *scratch is left unchanged.
525 void LAST_LL_hfta_HFTA_AGGR_REINIT_(gs_int64_t* scratch) { }
526 void LAST_LL_hfta_HFTA_AGGR_UPDATE_(gs_int64_t* scratch, gs_int64_t val) {
529 void LAST_LL_hfta_HFTA_AGGR_OUTPUT_(gs_int64_t* res, gs_int64_t* scratch) {
// DESTROY step of LAST_LL_hfta (gs_int64_t): empty body, nothing is released.
532 void LAST_LL_hfta_HFTA_AGGR_DESTROY_(gs_int64_t* scratch) { }
535 void LAST_STR_hfta_HFTA_AGGR_INIT_(vstring* scratch) {
// REINIT step of LAST_STR_hfta (vstring): empty body, scratch is left unchanged.
538 void LAST_STR_hfta_HFTA_AGGR_REINIT_(vstring* scratch) { }
539 void LAST_STR_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
540 scratch->length = val->length;
541 scratch->offset = val->offset;
542 scratch->reserved = SHALLOW_COPY;
544 void LAST_STR_hfta_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
// DESTROY step of LAST_STR_hfta (vstring): empty body, scratch's buffer is not freed here.
547 void LAST_STR_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) { }
550 ////////////////////////////////////////////////////////////
551 // count_diff: number of times the value differs from the previous value
553 struct count_diff_scratch{
566 void count_diff_HFTA_AGGR_INIT_(gs_sp_t s){
567 count_diff_scratch *scratch = (count_diff_scratch *)s;
571 void count_diff_HFTA_AGGR_REINIT_(gs_sp_t s){
572 count_diff_scratch *scratch = (count_diff_scratch *)s;
575 void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t val){
576 count_diff_scratch *scratch = (count_diff_scratch *)s;
577 if(scratch->count==0 || scratch->r.ui != val)
581 void count_diff_HFTA_AGGR_OUTPUT_(gs_uint32_t *res, gs_sp_t s){
582 count_diff_scratch *scratch = (count_diff_scratch *)s;
583 *res = scratch->count;
// DESTROY step of count_diff: empty body, the scratch buffer s is neither freed nor modified.
585 void count_diff_HFTA_AGGR_DESTROY_(gs_sp_t s){ }
588 void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t val){
589 count_diff_scratch *scratch = (count_diff_scratch *)s;
590 if(scratch->count==0 || scratch->r.i != val)
596 void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t val){
597 count_diff_scratch *scratch = (count_diff_scratch *)s;
598 if(scratch->count==0 || scratch->r.ul != val)
604 void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t val){
605 count_diff_scratch *scratch = (count_diff_scratch *)s;
606 if(scratch->count==0 || scratch->r.l != val)
612 void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, vstring* val){
613 count_diff_scratch *scratch = (count_diff_scratch *)s;
614 gs_uint64_t hashval = hfta_vstr_long_hashfunc(val);
615 if(scratch->count==0 || scratch->r.l != hashval)
617 scratch->r.l = hashval;
620 ////////// HFTA / LFTA split
622 struct lfta_count_diff_scratch{
639 void count_diff_hfta_HFTA_AGGR_INIT_(gs_sp_t s){
640 count_diff_scratch *scratch = (count_diff_scratch *)s;
644 void count_diff_hfta_HFTA_AGGR_REINIT_(gs_sp_t s){
645 count_diff_scratch *scratch = (count_diff_scratch *)s;
649 void count_diff_hfta_HFTA_AGGR_UPDATE_(gs_sp_t s, vstring *v){
650 lfta_count_diff_scratch *val = (lfta_count_diff_scratch *)v;
651 count_diff_scratch *scratch = (count_diff_scratch *)(v->offset);
652 scratch->count += val->count - 1;
653 if(scratch->r.l != val->first.l)
655 scratch->r.l = val->last.l;
657 void count_diff_hfta_HFTA_AGGR_OUTPUT_(gs_uint32_t *res, gs_sp_t s){
658 count_diff_scratch *scratch = (count_diff_scratch *)s;
659 *res = (scratch->count)+1;
// DESTROY step of count_diff_hfta: empty body, the scratch buffer is neither freed nor modified.
661 void count_diff_hfta_HFTA_AGGR_DESTROY_(gs_sp_t scratch){ }
665 /////////////////////////////////////////////////////////
666 // running_array_aggr aggregate
668 void running_array_aggr_hfta_HFTA_AGGR_INIT_(vstring* scratch) {
669 scratch->offset = NULL;
// REINIT step of running_array_aggr_hfta: empty body, scratch is left unchanged.
673 void running_array_aggr_hfta_HFTA_AGGR_REINIT_(vstring* scratch) { }
675 void running_array_aggr_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
678 gs_uint32_t* ints = (gs_uint32_t*)val->offset;
679 switch (val->length / sizeof (gs_uint32_t)) {
681 sprintf(buffer, "%u,%u,%u,%u", ints[0], ints[1], ints[2], ints[3]);
684 sprintf(buffer, "%u,%u,%u", ints[0], ints[1], ints[2]);
687 sprintf(buffer, "%u,%u", ints[0], ints[1]);
690 sprintf(buffer, "%u", ints[0]);
695 int buf_len = strlen(buffer);
697 // append the content of buffer to scratch
698 if (!scratch->offset) {
699 Vstring_Constructor(scratch, buffer);
701 scratch->offset = (gs_p_t)realloc((void*)scratch->offset, scratch->length + buf_len + 1);
702 *((char*)scratch->offset + scratch->length) = ',';
703 memcpy((void*)(scratch->offset + scratch->length + 1), (void*)buffer, buf_len);
704 scratch->length += buf_len + 1;
705 scratch->reserved = INTERNAL;
709 void running_array_aggr_hfta_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
711 res->reserved = SHALLOW_COPY;
714 void running_array_aggr_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) {
715 hfta_vstr_destroy(scratch);
719 ////////////////////////////////////////////
720 // Aggregate strings by catenation
723 struct CAT_aggr_scratch{
728 struct CAT_aggr_scratch_ptr{
729 CAT_aggr_scratch *ptr;
732 void CAT_aggr_HFTA_AGGR_INIT_(gs_sp_t s){
733 CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
734 CAT_aggr_scratch *v = new CAT_aggr_scratch();
739 void CAT_aggr_HFTA_AGGR_REINIT_(gs_sp_t s){
740 CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
741 CAT_aggr_scratch *v = p->ptr;
744 void CAT_aggr_HFTA_AGGR_UPDATE_(gs_sp_t s, vstring *sep, vstring *str){
745 char buf1[MAXTUPLESZ-20], buf2[MAXTUPLESZ-20];
747 for(i=0;i<sep->length;++i) buf1[i] = *(((char *)sep->offset)+i);
749 for(i=0;i<str->length;++i) buf2[i] = *(((char *)str->offset)+i);
751 CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
752 CAT_aggr_scratch *v = p->ptr;
754 v->val.append((char *)(sep->offset), sep->length);
755 v->val.append((char *)(str->offset), str->length);
756 //printf("sep=%s, str=%s, val=%s\n",buf1,buf2,v->val.c_str());
758 void CAT_aggr_HFTA_AGGR_OUTPUT_(vstring *res, gs_sp_t s){
759 CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
760 CAT_aggr_scratch *v = p->ptr;
761 //printf("output val=%s\n",v->val.c_str());
762 res->offset = (gs_p_t)malloc(v->val.size());
763 res->length = v->val.size();
764 if(res->length>MAXTUPLESZ-20)
765 res->length=MAXTUPLESZ-20;
766 // v->val.copy((char *)(res->offset), 0, res->length);
767 const char *dat = v->val.c_str();
768 memcpy((char *)(res->offset), dat, res->length);
769 // for(int i=0;i<res->length;++i)
770 // *(((char *)res->offset)+i) = dat[i];
771 res->reserved = INTERNAL;
773 void CAT_aggr_HFTA_AGGR_DESTROY_(gs_sp_t s){
774 CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
775 CAT_aggr_scratch *v = p->ptr;
781 ///////////////////////////////////////////////////////////////
782 // time_avg(sample, ts, window_size)
783 // Compute the time-weighted average: sum(sample*duration)/window_size,
784 // where duration is the difference between the current ts and the next ts.
785 // The idea is to compute a sum over a step function.
788 struct time_avg_udaf_str{
793 gs_uint64_t first_ts;
794 gs_uint8_t event_occurred;
797 void time_avg_HFTA_AGGR_INIT_(gs_sp_t s){
798 time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
800 scratch->last_val = 0.0;
801 scratch->last_ts = 0;
802 scratch->first_ts = 0;
803 scratch->event_occurred = 0;
806 void time_avg_HFTA_AGGR_DESTROY_(gs_sp_t s){
809 void time_avg_HFTA_AGGR_REINIT_(gs_sp_t s){
810 time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
811 scratch->event_occurred = 0;
813 //printf("time_avg_reinit: occurred=%d, last_val=%lf, sum=%lf, first_ts=%lld, last_ts=%lld\n",scratch->event_occurred, scratch->last_val, scratch->sum, scratch->first_ts, scratch->last_ts);
816 void time_avg_HFTA_AGGR_OUTPUT_(gs_float_t *result, gs_sp_t s){
817 time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
818 if(scratch->event_occurred==0){
819 *result = scratch->last_val;
820 //printf("\ttime_avg outpt1 sum=%lf, last_val=%lf, result=%lf\n", scratch->sum, scratch->last_val, *result);
823 gs_int64_t end_time = scratch->window * (scratch->last_ts/scratch->window + 1);
824 scratch->sum += (end_time - scratch->last_ts) * (gs_float_t)(scratch->last_val);
825 gs_int64_t start_time = end_time - scratch->window;
826 if(scratch->first_ts > start_time){
827 *result = scratch->sum / (end_time - scratch->first_ts);
828 //printf("\ttime_avg outpt2 sum=%lf, start_time=%lld, end_time=%lld, first_ts=%lld, last_ts=%lld,result=%lf\n", scratch->sum, start_time, end_time, scratch->first_ts, scratch->last_ts, *result);
830 *result = scratch->sum / (end_time - start_time);
831 //printf("\ttime_avg outpt3 sum=%lf, start_time=%lld, end_time=%lld, first_ts=%lld, last_ts=%lld,result=%lf\n", scratch->sum, start_time, end_time, scratch->first_ts, scratch->last_ts, *result);
835 void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_float_t val, gs_int64_t ts, gs_int64_t window){
836 time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
837 scratch->window = window;
838 if(scratch->first_ts==0){
839 scratch->first_ts = ts;
841 if(scratch->event_occurred){
843 scratch->sum += (ts - scratch->last_ts) * scratch->last_val;
845 gs_int64_t start_time = scratch->window * (scratch->last_ts/scratch->window);
846 scratch->sum += (ts - start_time) * scratch->last_val;
849 //printf("time_avg_upd: val=%lf, occurred=%d, last_val=%lf, sum=%lf, ts=%lld, first_ts=%lld, last_ts=%lld\n",val, scratch->event_occurred, scratch->last_val, scratch->sum, ts, scratch->first_ts, scratch->last_ts);
850 scratch->last_val = val;
851 scratch->last_ts = ts;
852 scratch->event_occurred = 1;
854 void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t val, gs_int64_t ts, gs_int64_t window){
855 time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
857 void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t val, gs_int64_t ts, gs_int64_t window){
858 time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
860 void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t val, gs_int64_t ts, gs_int64_t window){
861 time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
863 void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t val, gs_int64_t ts, gs_int64_t window){
864 time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
868 // ------------------------------------------------------------
869 // running_sum_max : maintain the running sum of an integer value,
870 // and report both this sum and the maximum value it reached
871 // during the time window
873 struct run_sum_max_udaf_str{
877 void run_sum_max_HFTA_AGGR_INIT_(gs_sp_t s){
878 run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
882 void run_sum_max_HFTA_AGGR_REINIT_(gs_sp_t s){
883 run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
884 scratch->max = scratch->sum;
886 void run_sum_max_HFTA_AGGR_OUTPUT_(vstring *r,gs_sp_t b){
887 r->length = sizeof(run_sum_max_udaf_str);
888 r->offset = (gs_p_t)(b);
889 r->reserved = SHALLOW_COPY;
891 void run_sum_max_HFTA_AGGR_DESTROY_(gs_sp_t b){
895 void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t v){
896 run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
898 if(scratch->sum>scratch->max) scratch->max=scratch->sum;
900 void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t v){
901 run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
903 if(scratch->sum>scratch->max) scratch->max=scratch->sum;
905 void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t v){
906 run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
908 if(scratch->sum>scratch->max) scratch->max=scratch->sum;
910 void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t v){
911 run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
913 if(scratch->sum>scratch->max) scratch->max=scratch->sum;
915 // the extraction functions
916 gs_int64_t extr_running_sum(vstring *v){
917 if(v->length != sizeof(run_sum_max_udaf_str)) return 0;
918 run_sum_max_udaf_str *vs = (run_sum_max_udaf_str *)(v->offset);
921 gs_int64_t extr_running_sum_max(vstring *v){
922 if(v->length != sizeof(run_sum_max_udaf_str)) return 0;
923 run_sum_max_udaf_str *vs = (run_sum_max_udaf_str *)(v->offset);