1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
19 #include "hfta_udaf.h"
29 #include "hfta_runtime_library.h"
30 #include"stringhash.h"
33 #define max(a,b) ((a) > (b) ? (a) : (b))
34 #define min(x,y) ((x) < (y) ? (x) : (y))
35 #define lg(x) (log(x) / log(2))
40 // -------------------------------------------------------------------
41 // moving sum over N intervals
43 struct moving_sum_udaf_str{
49 void moving_sum_udaf_HFTA_AGGR_INIT_(gs_sp_t buf){
50 struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
51 u->N=0; u->pos=0; u->sums=NULL;
54 void moving_sum_udaf_HFTA_AGGR_UPDATE_(gs_sp_t buf, gs_uint32_t s, gs_uint32_t N) {
55 struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
57 u->sums = (gs_uint32_t *)malloc(N*sizeof(gs_uint32_t));
58 for(gs_int32_t i=0;i<N;i++)
65 void super_moving_sum_udaf_HFTA_AGGR_UPDATE_(gs_sp_t buf, gs_uint64_t sub_sum) {
66 struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
67 gs_uint32_t s = (gs_uint32_t)(sub_sum & 0xffffffff);
69 gs_uint32_t N = (gs_uint32_t)((sub_sum & 0xffffffff00000000ull) >> 32);
70 u->sums = (gs_uint32_t *)malloc(N*sizeof(gs_uint32_t));
71 for(gs_int32_t i=0;i<N;i++)
78 void moving_sum_udaf_HFTA_AGGR_OUTPUT_(gs_p_t *result, gs_sp_t buf){
79 *result = (gs_p_t)(buf);
82 void moving_sum_udaf_HFTA_AGGR_DESTROY_(gs_sp_t buf){
83 struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
88 void moving_sum_udaf_HFTA_AGGR_REINIT_( gs_sp_t buf){
89 struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
96 gs_uint32_t moving_sum_extract(gs_p_t result){
97 struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) result;
105 gs_float_t moving_sum_extract_exp(gs_p_t result, gs_float_t alpha){
106 struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) result;
107 gs_uint32_t p=0, i=0;
108 gs_float_t s=0.0, m=1.0;
110 for(i=0; i<u->N;i++){
122 // -------------------------------------------------------------------
123 // sum over 3 intervals : test rUDAF
125 struct sum3_udaf_str{
131 void sum3_HFTA_AGGR_INIT_(gs_sp_t buf) {
132 struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;
133 u->s_0 = 0; u->s_1 = 0; u->s_2 = 0;
137 void sum3_HFTA_AGGR_UPDATE_(gs_sp_t buf, gs_uint32_t s) {
138 struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;
143 void sum3_HFTA_AGGR_OUTPUT_(gs_uint32_t *result, gs_sp_t buf) {
144 struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;
145 *result = u->s_0 + u->s_1 + u->s_2;
149 void sum3_HFTA_AGGR_DESTROY_(gs_sp_t buf) {
153 void sum3_HFTA_AGGR_REINIT_( gs_sp_t buf) {
154 struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;
162 #define HISTORY_LENGTH 1024
164 /////////////////////////////////////////////////////////////////////////
165 ///// Calculate the average of all positive gs_float_t numbers
167 struct posavg_struct{
172 void POSAVG_HFTA_AGGR_INIT_(gs_sp_t buf) {
173 struct posavg_struct * a = (struct posavg_struct *) buf;
179 void POSAVG_HFTA_AGGR_UPDATE_(gs_sp_t buf, gs_float_t v) {
180 struct posavg_struct * a = (struct posavg_struct *) buf;
188 void POSAVG_HFTA_AGGR_OUTPUT_(gs_float_t * v, gs_sp_t buf) {
189 struct posavg_struct * a = (struct posavg_struct *) buf;
198 void POSAVG_HFTA_AGGR_DESTROY_(gs_sp_t buf) {
202 /////////////////////////////////////////////////////////////////////////
203 ///// avg_udaf (simple example)
205 // struct received from subaggregate
206 struct avg_udaf_lfta_struct_t{
212 struct avg_udaf_hfta_struct_t{
217 // avg_udaf functions
218 void avg_udaf_HFTA_AGGR_INIT_(gs_sp_t b){
219 avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;
224 void avg_udaf_HFTA_AGGR_UPDATE_(gs_sp_t b, gs_uint32_t v){
225 avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;
230 void avg_udaf_HFTA_AGGR_OUTPUT_(vstring *r,gs_sp_t b){
232 r->offset = (gs_p_t)(b);
233 r->reserved = SHALLOW_COPY;
236 void avg_udaf_HFTA_AGGR_DESTROY_(gs_sp_t b){
241 // avg_udaf superaggregate functions
242 void avg_udaf_hfta_HFTA_AGGR_INIT_(gs_sp_t b){
243 avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;
248 void avg_udaf_hfta_HFTA_AGGR_UPDATE_(gs_sp_t b, vstring *v){
249 if(v->length != 12) return;
250 avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;
251 avg_udaf_lfta_struct_t *vs = (avg_udaf_lfta_struct_t *)(v->offset);
256 void avg_udaf_hfta_HFTA_AGGR_OUTPUT_(vstring *r,gs_sp_t b){
258 r->offset = (gs_p_t)(b);
259 r->reserved = SHALLOW_COPY;
262 void avg_udaf_hfta_HFTA_AGGR_DESTROY_(gs_sp_t b){
266 // Extraction function
267 gs_float_t extr_avg_fcn(vstring *v){
268 if(v->length != 12) return 0;
269 avg_udaf_hfta_struct_t *vs = (avg_udaf_hfta_struct_t *)(v->offset);
270 gs_float_t r = (gs_float_t)(vs->sum) / vs->cnt;
274 /////////////////////////////////////////////////////////
279 void FIRST_HFTA_AGGR_INIT_(gs_uint32_t* scratch) {
280 *scratch = UINT_MAX; // we will encode uninitialized value of UINT_MAX
282 void FIRST_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }
283 void FIRST_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {
284 if (*scratch == UINT_MAX)
287 void FIRST_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {
290 void FIRST_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }
294 void FIRST_HFTA_AGGR_INIT_(gs_int32_t* scratch) {
295 *scratch = UINT_MAX; // we will encode uninitialized value of UINT_MAX
297 void FIRST_HFTA_AGGR_REINIT_(gs_int32_t* scratch) { }
298 void FIRST_HFTA_AGGR_UPDATE_(gs_int32_t* scratch, gs_int32_t val) {
299 if (*scratch == UINT_MAX)
302 void FIRST_HFTA_AGGR_OUTPUT_(gs_int32_t* res, gs_int32_t* scratch) {
305 void FIRST_HFTA_AGGR_DESTROY_(gs_int32_t* scratch) { }
308 void FIRST_HFTA_AGGR_INIT_(gs_uint64_t* scratch) {
309 *scratch = UINT_MAX; // we will encode uninitialized value of UINT_MAX
311 void FIRST_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
312 void FIRST_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
313 if (*scratch == UINT_MAX)
316 void FIRST_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
319 void FIRST_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
322 void FIRST_HFTA_AGGR_INIT_(gs_int64_t* scratch) {
323 *scratch = UINT_MAX; // we will encode uninitialized value of UINT_MAX
325 void FIRST_HFTA_AGGR_REINIT_(gs_int64_t* scratch) { }
326 void FIRST_HFTA_AGGR_UPDATE_(gs_int64_t* scratch, gs_int64_t val) {
327 if (*scratch == UINT_MAX)
330 void FIRST_HFTA_AGGR_OUTPUT_(gs_int64_t* res, gs_int64_t* scratch) {
333 void FIRST_HFTA_AGGR_DESTROY_(gs_int64_t* scratch) { }
336 void FIRST_HFTA_AGGR_INIT_(vstring* scratch) {
339 void FIRST_HFTA_AGGR_REINIT_(vstring* scratch) { }
340 void FIRST_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
341 if (!scratch->offset) {
342 scratch->length = val->length;
343 scratch->offset = val->offset;
344 scratch->reserved = SHALLOW_COPY;
347 void FIRST_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
350 void FIRST_HFTA_AGGR_DESTROY_(vstring* scratch) { }
355 void FIRST_hfta_HFTA_AGGR_INIT_(gs_uint32_t* scratch) {
356 *scratch = UINT_MAX; // we will encode uninitialized value of UINT_MAX
358 void FIRST_hfta_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }
359 void FIRST_hfta_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {
360 if (*scratch == UINT_MAX)
363 void FIRST_hfta_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {
366 void FIRST_hfta_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }
369 void FIRST_INT_hfta_HFTA_AGGR_INIT_(gs_int32_t* scratch) {
370 *scratch = UINT_MAX; // we will encode uninitialized value of UINT_MAX
372 void FIRST_INT_hfta_HFTA_AGGR_REINIT_(gs_int32_t* scratch) { }
373 void FIRST_INT_hfta_HFTA_AGGR_UPDATE_(gs_int32_t* scratch, gs_int32_t val) {
374 if (*scratch == UINT_MAX)
377 void FIRST_INT_hfta_HFTA_AGGR_OUTPUT_(gs_int32_t* res, gs_int32_t* scratch) {
380 void FIRST_INT_hfta_HFTA_AGGR_DESTROY_(gs_int32_t* scratch) { }
383 void FIRST_ULL_hfta_HFTA_AGGR_INIT_(gs_uint64_t* scratch) {
384 *scratch = UINT_MAX; // we will encode uninitialized value of UINT_MAX
386 void FIRST_ULL_hfta_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
387 void FIRST_ULL_hfta_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
388 if (*scratch == UINT_MAX)
391 void FIRST_ULL_hfta_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
394 void FIRST_ULL_hfta_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
397 void FIRST_LL_hfta_HFTA_AGGR_INIT_(gs_int64_t* scratch) {
398 *scratch = UINT_MAX; // we will encode uninitialized value of UINT_MAX
400 void FIRST_LL_hfta_HFTA_AGGR_REINIT_(gs_int64_t* scratch) { }
401 void FIRST_LL_hfta_HFTA_AGGR_UPDATE_(gs_int64_t* scratch, gs_int64_t val) {
402 if (*scratch == UINT_MAX)
405 void FIRST_LL_hfta_HFTA_AGGR_OUTPUT_(gs_int64_t* res, gs_int64_t* scratch) {
408 void FIRST_LL_hfta_HFTA_AGGR_DESTROY_(gs_int64_t* scratch) { }
411 void FIRST_STR_hfta_HFTA_AGGR_INIT_(vstring* scratch) {
414 void FIRST_STR_hfta_HFTA_AGGR_REINIT_(vstring* scratch) { }
415 void FIRST_STR_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
416 if (!scratch->offset) {
417 scratch->length = val->length;
418 scratch->offset = val->offset;
419 scratch->reserved = SHALLOW_COPY;
422 void FIRST_STR_hfta_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
425 void FIRST_STR_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) { }
428 /////////////////////////////////////////////////////////
434 void LAST_HFTA_AGGR_INIT_(gs_uint32_t* scratch) { }
435 void LAST_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }
436 void LAST_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {
439 void LAST_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {
442 void LAST_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }
445 void LAST_HFTA_AGGR_INIT_(gs_int32_t* scratch) { }
446 void LAST_HFTA_AGGR_REINIT_(gs_int32_t* scratch) { }
447 void LAST_HFTA_AGGR_UPDATE_(gs_int32_t* scratch, gs_int32_t val) {
450 void LAST_HFTA_AGGR_OUTPUT_(gs_int32_t* res, gs_int32_t* scratch) {
453 void LAST_HFTA_AGGR_DESTROY_(gs_int32_t* scratch) { }
456 void LAST_HFTA_AGGR_INIT_(gs_int64_t* scratch) { }
457 void LAST_HFTA_AGGR_REINIT_(gs_int64_t* scratch) { }
458 void LAST_HFTA_AGGR_UPDATE_(gs_int64_t* scratch, gs_int64_t val) {
461 void LAST_HFTA_AGGR_OUTPUT_(gs_int64_t* res, gs_int64_t* scratch) {
464 void LAST_HFTA_AGGR_DESTROY_(gs_int64_t* scratch) { }
468 void LAST_HFTA_AGGR_INIT_(gs_uint64_t* scratch) { }
469 void LAST_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
470 void LAST_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
473 void LAST_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
476 void LAST_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
479 void LAST_HFTA_AGGR_INIT_(vstring* scratch) {
482 void LAST_HFTA_AGGR_REINIT_(vstring* scratch) { }
483 void LAST_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
484 scratch->length = val->length;
485 scratch->offset = val->offset;
486 scratch->reserved = SHALLOW_COPY;
488 void LAST_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
491 void LAST_HFTA_AGGR_DESTROY_(vstring* scratch) { }
495 void LAST_hfta_HFTA_AGGR_INIT_(gs_uint32_t* scratch) { }
496 void LAST_hfta_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }
497 void LAST_hfta_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {
500 void LAST_hfta_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {
503 void LAST_hfta_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }
505 void LAST_INT_hfta_HFTA_AGGR_INIT_(gs_int32_t* scratch) { }
506 void LAST_INT_hfta_HFTA_AGGR_REINIT_(gs_int32_t* scratch) { }
507 void LAST_INT_hfta_HFTA_AGGR_UPDATE_(gs_int32_t* scratch, gs_int32_t val) {
510 void LAST_INT_hfta_HFTA_AGGR_OUTPUT_(gs_int32_t* res, gs_int32_t* scratch) {
513 void LAST_INT_hfta_HFTA_AGGR_DESTROY_(gs_int32_t* scratch) { }
515 void LAST_ULL_hfta_HFTA_AGGR_INIT_(gs_uint64_t* scratch) { }
516 void LAST_ULL_hfta_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
517 void LAST_ULL_hfta_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
520 void LAST_ULL_hfta_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
523 void LAST_ULL_hfta_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
525 void LAST_LL_hfta_HFTA_AGGR_INIT_(gs_int64_t* scratch) { }
526 void LAST_LL_hfta_HFTA_AGGR_REINIT_(gs_int64_t* scratch) { }
527 void LAST_LL_hfta_HFTA_AGGR_UPDATE_(gs_int64_t* scratch, gs_int64_t val) {
530 void LAST_LL_hfta_HFTA_AGGR_OUTPUT_(gs_int64_t* res, gs_int64_t* scratch) {
533 void LAST_LL_hfta_HFTA_AGGR_DESTROY_(gs_int64_t* scratch) { }
536 void LAST_STR_hfta_HFTA_AGGR_INIT_(vstring* scratch) {
539 void LAST_STR_hfta_HFTA_AGGR_REINIT_(vstring* scratch) { }
540 void LAST_STR_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
541 scratch->length = val->length;
542 scratch->offset = val->offset;
543 scratch->reserved = SHALLOW_COPY;
545 void LAST_STR_hfta_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
548 void LAST_STR_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) { }
551 ////////////////////////////////////////////////////////////
552 // count different (# of times the value is different than the previous)
554 struct count_diff_scratch{
567 void count_diff_HFTA_AGGR_INIT_(gs_sp_t s){
568 count_diff_scratch *scratch = (count_diff_scratch *)s;
572 void count_diff_HFTA_AGGR_REINIT_(gs_sp_t s){
573 count_diff_scratch *scratch = (count_diff_scratch *)s;
576 void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t val){
577 count_diff_scratch *scratch = (count_diff_scratch *)s;
578 if(scratch->count==0 || scratch->r.ui != val)
582 void count_diff_HFTA_AGGR_OUTPUT_(gs_uint32_t *res, gs_sp_t s){
583 count_diff_scratch *scratch = (count_diff_scratch *)s;
584 *res = scratch->count;
586 void count_diff_HFTA_AGGR_DESTROY_(gs_sp_t s){ }
589 void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t val){
590 count_diff_scratch *scratch = (count_diff_scratch *)s;
591 if(scratch->count==0 || scratch->r.i != val)
597 void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t val){
598 count_diff_scratch *scratch = (count_diff_scratch *)s;
599 if(scratch->count==0 || scratch->r.ul != val)
605 void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t val){
606 count_diff_scratch *scratch = (count_diff_scratch *)s;
607 if(scratch->count==0 || scratch->r.l != val)
613 void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, vstring* val){
614 count_diff_scratch *scratch = (count_diff_scratch *)s;
615 gs_uint64_t hashval = hfta_vstr_long_hashfunc(val);
616 if(scratch->count==0 || scratch->r.l != hashval)
618 scratch->r.l = hashval;
621 ////////// HFTA / LFTA split
623 struct lfta_count_diff_scratch{
640 void count_diff_hfta_HFTA_AGGR_INIT_(gs_sp_t s){
641 count_diff_scratch *scratch = (count_diff_scratch *)s;
645 void count_diff_hfta_HFTA_AGGR_REINIT_(gs_sp_t s){
646 count_diff_scratch *scratch = (count_diff_scratch *)s;
650 void count_diff_hfta_HFTA_AGGR_UPDATE_(gs_sp_t s, vstring *v){
651 lfta_count_diff_scratch *val = (lfta_count_diff_scratch *)v;
652 count_diff_scratch *scratch = (count_diff_scratch *)(v->offset);
653 scratch->count += val->count - 1;
654 if(scratch->r.l != val->first.l)
656 scratch->r.l = val->last.l;
658 void count_diff_hfta_HFTA_AGGR_OUTPUT_(gs_uint32_t *res, gs_sp_t s){
659 count_diff_scratch *scratch = (count_diff_scratch *)s;
660 *res = (scratch->count)+1;
662 void count_diff_hfta_HFTA_AGGR_DESTROY_(gs_sp_t scratch){ }
666 /////////////////////////////////////////////////////////
667 // running_array_aggr aggregate
669 void running_array_aggr_hfta_HFTA_AGGR_INIT_(vstring* scratch) {
670 scratch->offset = NULL;
674 void running_array_aggr_hfta_HFTA_AGGR_REINIT_(vstring* scratch) { }
676 void running_array_aggr_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
679 gs_uint32_t* ints = (gs_uint32_t*)val->offset;
680 switch (val->length / sizeof (gs_uint32_t)) {
682 sprintf(buffer, "%u,%u,%u,%u", ints[0], ints[1], ints[2], ints[3]);
685 sprintf(buffer, "%u,%u,%u", ints[0], ints[1], ints[2]);
688 sprintf(buffer, "%u,%u", ints[0], ints[1]);
691 sprintf(buffer, "%u", ints[0]);
696 int buf_len = strlen(buffer);
698 // append the content of buffer to scratch
699 if (!scratch->offset) {
700 Vstring_Constructor(scratch, buffer);
702 scratch->offset = (gs_p_t)realloc((void*)scratch->offset, scratch->length + buf_len + 1);
703 *((char*)scratch->offset + scratch->length) = ',';
704 memcpy((void*)(scratch->offset + scratch->length + 1), (void*)buffer, buf_len);
705 scratch->length += buf_len + 1;
706 scratch->reserved = INTERNAL;
710 void running_array_aggr_hfta_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
712 res->reserved = SHALLOW_COPY;
715 void running_array_aggr_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) {
716 hfta_vstr_destroy(scratch);
720 ////////////////////////////////////////////
721 // Aggregate strings by catenation
724 struct CAT_aggr_scratch{
729 struct CAT_aggr_scratch_ptr{
730 CAT_aggr_scratch *ptr;
733 void CAT_aggr_HFTA_AGGR_INIT_(gs_sp_t s){
734 CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
735 CAT_aggr_scratch *v = new CAT_aggr_scratch();
740 void CAT_aggr_HFTA_AGGR_REINIT_(gs_sp_t s){
741 CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
742 CAT_aggr_scratch *v = p->ptr;
745 void CAT_aggr_HFTA_AGGR_UPDATE_(gs_sp_t s, vstring *sep, vstring *str){
746 char buf1[MAXTUPLESZ-20], buf2[MAXTUPLESZ-20];
748 for(i=0;i<sep->length;++i) buf1[i] = *(((char *)sep->offset)+i);
750 for(i=0;i<str->length;++i) buf2[i] = *(((char *)str->offset)+i);
752 CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
753 CAT_aggr_scratch *v = p->ptr;
755 v->val.append((char *)(sep->offset), sep->length);
756 v->val.append((char *)(str->offset), str->length);
757 //printf("sep=%s, str=%s, val=%s\n",buf1,buf2,v->val.c_str());
759 void CAT_aggr_HFTA_AGGR_OUTPUT_(vstring *res, gs_sp_t s){
760 CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
761 CAT_aggr_scratch *v = p->ptr;
762 //printf("output val=%s\n",v->val.c_str());
763 res->offset = (gs_p_t)malloc(v->val.size());
764 res->length = v->val.size();
765 if(res->length>MAXTUPLESZ-20)
766 res->length=MAXTUPLESZ-20;
767 // v->val.copy((char *)(res->offset), 0, res->length);
768 const char *dat = v->val.c_str();
769 memcpy((char *)(res->offset), dat, res->length);
770 // for(int i=0;i<res->length;++i)
771 // *(((char *)res->offset)+i) = dat[i];
772 res->reserved = INTERNAL;
774 void CAT_aggr_HFTA_AGGR_DESTROY_(gs_sp_t s){
775 CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
776 CAT_aggr_scratch *v = p->ptr;
782 ///////////////////////////////////////////////////////////////
783 // time_avg((sample, ts, window_size)
784 // Compute time-weighted average sum(sample*duration)/window_size
785 // duration is difference between current and next ts.
786 // The idea is to compute a sum over a step function.
789 struct time_avg_udaf_str{
794 gs_uint64_t first_ts;
795 gs_uint8_t event_occurred;
798 void time_avg_HFTA_AGGR_INIT_(gs_sp_t s){
799 time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
801 scratch->last_val = 0.0;
802 scratch->last_ts = 0;
803 scratch->first_ts = 0;
804 scratch->event_occurred = 0;
807 void time_avg_HFTA_AGGR_DESTROY_(gs_sp_t s){
810 void time_avg_HFTA_AGGR_REINIT_(gs_sp_t s){
811 time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
812 scratch->event_occurred = 0;
814 //printf("time_avg_reinit: occurred=%d, last_val=%lf, sum=%lf, first_ts=%lld, last_ts=%lld\n",scratch->event_occurred, scratch->last_val, scratch->sum, scratch->first_ts, scratch->last_ts);
817 void time_avg_HFTA_AGGR_OUTPUT_(gs_float_t *result, gs_sp_t s){
818 time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
819 if(scratch->event_occurred==0){
820 *result = scratch->last_val;
821 //printf("\ttime_avg outpt1 sum=%lf, last_val=%lf, result=%lf\n", scratch->sum, scratch->last_val, *result);
824 gs_int64_t end_time = scratch->window * (scratch->last_ts/scratch->window + 1);
825 scratch->sum += (end_time - scratch->last_ts) * (gs_float_t)(scratch->last_val);
826 gs_int64_t start_time = end_time - scratch->window;
827 if(scratch->first_ts > start_time){
828 *result = scratch->sum / (end_time - scratch->first_ts);
829 //printf("\ttime_avg outpt2 sum=%lf, start_time=%lld, end_time=%lld, first_ts=%lld, last_ts=%lld,result=%lf\n", scratch->sum, start_time, end_time, scratch->first_ts, scratch->last_ts, *result);
831 *result = scratch->sum / (end_time - start_time);
832 //printf("\ttime_avg outpt3 sum=%lf, start_time=%lld, end_time=%lld, first_ts=%lld, last_ts=%lld,result=%lf\n", scratch->sum, start_time, end_time, scratch->first_ts, scratch->last_ts, *result);
836 void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_float_t val, gs_int64_t ts, gs_int64_t window){
837 time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
838 scratch->window = window;
839 if(scratch->first_ts==0){
840 scratch->first_ts = ts;
842 if(scratch->event_occurred){
844 scratch->sum += (ts - scratch->last_ts) * scratch->last_val;
846 gs_int64_t start_time = scratch->window * (scratch->last_ts/scratch->window);
847 scratch->sum += (ts - start_time) * scratch->last_val;
850 //printf("time_avg_upd: val=%lf, occurred=%d, last_val=%lf, sum=%lf, ts=%lld, first_ts=%lld, last_ts=%lld\n",val, scratch->event_occurred, scratch->last_val, scratch->sum, ts, scratch->first_ts, scratch->last_ts);
851 scratch->last_val = val;
852 scratch->last_ts = ts;
853 scratch->event_occurred = 1;
855 void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t val, gs_int64_t ts, gs_int64_t window){
856 time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
858 void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t val, gs_int64_t ts, gs_int64_t window){
859 time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
861 void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t val, gs_int64_t ts, gs_int64_t window){
862 time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
864 void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t val, gs_int64_t ts, gs_int64_t window){
865 time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
869 // ------------------------------------------------------------
870 // running_sum_max : get the running sum of an int,
871 // be able to report this sum and also its max value
872 // during the time window
874 struct run_sum_max_udaf_str{
878 void run_sum_max_HFTA_AGGR_INIT_(gs_sp_t s){
879 run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
883 void run_sum_max_HFTA_AGGR_REINIT_(gs_sp_t s){
884 run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
885 scratch->max = scratch->sum;
887 void run_sum_max_HFTA_AGGR_OUTPUT_(vstring *r,gs_sp_t b){
888 r->length = sizeof(run_sum_max_udaf_str);
889 r->offset = (gs_p_t)(b);
890 r->reserved = SHALLOW_COPY;
892 void run_sum_max_HFTA_AGGR_DESTROY_(gs_sp_t b){
896 void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t v){
897 run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
899 if(scratch->sum>scratch->max) scratch->max=scratch->sum;
901 void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t v){
902 run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
904 if(scratch->sum>scratch->max) scratch->max=scratch->sum;
906 void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t v){
907 run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
909 if(scratch->sum>scratch->max) scratch->max=scratch->sum;
911 void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t v){
912 run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
914 if(scratch->sum>scratch->max) scratch->max=scratch->sum;
916 // the extraction functions
917 gs_int64_t extr_running_sum(vstring *v){
918 if(v->length != sizeof(run_sum_max_udaf_str)) return 0;
919 run_sum_max_udaf_str *vs = (run_sum_max_udaf_str *)(v->offset);
922 gs_int64_t extr_running_sum_max(vstring *v){
923 if(v->length != sizeof(run_sum_max_udaf_str)) return 0;
924 run_sum_max_udaf_str *vs = (run_sum_max_udaf_str *)(v->offset);
928 // ---------------------------------------------
929 // Approximate count distinct.
930 // Rely on the minhashing approach.
931 // Currently HFTA-only
932 // Uses a 32-bit hash, tested up to 100,000,000 elements
933 // and it gave good results (within 7%)
936 #define COUNT_DISTINCT_NREPS 250
937 #define COUNT_DISTINCT_MAX_STRING_LEN 200 // number of 4-byte words
939 static Hash32bit2univID hids[COUNT_DISTINCT_NREPS];
940 static int approx_count_distinct_udaf_initialized = 0;
941 struct approx_count_distinct_udaf_str{
942 unsigned int mn[COUNT_DISTINCT_NREPS];
946 void approx_count_distinct_udaf_HFTA_AGGR_INIT_(gs_sp_t buf){
947 approx_count_distinct_udaf_str *cd = (approx_count_distinct_udaf_str *)buf;
948 for(int i=0;i<COUNT_DISTINCT_NREPS;++i)
949 cd->mn[i]=4294967295;
950 if(approx_count_distinct_udaf_initialized==0){
951 for(int i=0;i<COUNT_DISTINCT_NREPS;++i)
952 hids[i] = InitStringHash32bit2univ(COUNT_DISTINCT_MAX_STRING_LEN);
953 approx_count_distinct_udaf_initialized=1;
956 void running_approx_count_distinct_udaf_HFTA_AGGR_INIT_(gs_sp_t buf){
957 approx_count_distinct_udaf_HFTA_AGGR_INIT_(buf);
960 void approx_count_distinct_udaf_HFTA_AGGR_REINIT_(gs_sp_t buf){ }
961 void running_approx_count_distinct_udaf_HFTA_AGGR_REINIT_(gs_sp_t buf){}
963 void approx_count_distinct_udaf_HFTA_AGGR_DESTROY_(gs_sp_t buf){ }
964 void running_approx_count_distinct_udaf_HFTA_AGGR_DESTROY_(gs_sp_t buf){ }
966 void approx_count_distinct_udaf_HFTA_AGGR_UPDATE_(gs_sp_t buf, vstring *val){
967 approx_count_distinct_udaf_str *cd = (approx_count_distinct_udaf_str *)buf;
968 unsigned int buffer[sizeof(unsigned int)*COUNT_DISTINCT_MAX_STRING_LEN];
969 buffer[val->length/4] = 0;
970 memcpy((char *)buffer, (char *)val->offset, min(val->length, 800));
971 unsigned int len4 = val->length/4 + ((val->length&0x03)>0);
973 for(int i=0; i<COUNT_DISTINCT_NREPS; ++i){
974 unsigned int h = StringHash32bit2univ(buffer, len4, hids[i]);
975 if(h < cd->mn[i]) cd->mn[i] = h;
978 void running_approx_count_distinct_udaf_HFTA_AGGR_UPDATE_(gs_sp_t buf, vstring *val){
979 approx_count_distinct_udaf_HFTA_AGGR_UPDATE_(buf, val);
982 void approx_count_distinct_udaf_HFTA_AGGR_OUTPUT_(vstring *res, gs_sp_t buf){
983 res->offset = (gs_p_t)buf;
984 res->length = sizeof(approx_count_distinct_udaf_str);
985 res->reserved = SHALLOW_COPY;
987 void running_approx_count_distinct_udaf_HFTA_AGGR_OUTPUT_(vstring *res, gs_sp_t buf){
988 approx_count_distinct_udaf_HFTA_AGGR_OUTPUT_(res, buf);
991 gs_float_t extr_approx_count_distinct(vstring *v){
992 approx_count_distinct_udaf_str *cd = (approx_count_distinct_udaf_str *)(v->offset);
993 gs_float_t avg = 0.0;
994 for(int i=0;i<COUNT_DISTINCT_NREPS;++i){
997 avg /= COUNT_DISTINCT_NREPS;
998 gs_float_t est = (4294967295.0 / avg) - 1;