X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Flib%2Fgscphftaaux%2Fhfta_udaf.cc;h=a5ff5cba31602aa232c9bd45a66e2448ec398010;hb=2bc6bde491e4ae54fb54302c052f23a98482eb92;hp=066073915b38ba80f882ba667e7c9a3885d2b5d6;hpb=93d248304a68de7a8f9daf4aa74f9ee4cd27410c;p=com%2Fgs-lite.git diff --git a/src/lib/gscphftaaux/hfta_udaf.cc b/src/lib/gscphftaaux/hfta_udaf.cc index 0660739..a5ff5cb 100644 --- a/src/lib/gscphftaaux/hfta_udaf.cc +++ b/src/lib/gscphftaaux/hfta_udaf.cc @@ -27,6 +27,7 @@ Copyright 2014 AT&T Intellectual Property #include #include "hfta_runtime_library.h" +#include"stringhash.h" #define max(a,b) ((a) > (b) ? (a) : (b)) @@ -207,7 +208,7 @@ struct avg_udaf_lfta_struct_t{ gs_uint32_t cnt; }; -// sctarchpad struct +// scratchpad struct struct avg_udaf_hfta_struct_t{ gs_int64_t sum; gs_uint32_t cnt; @@ -274,104 +275,143 @@ gs_float_t extr_avg_fcn(vstring *v){ // FIRST aggregate // hfta only +// uint void FIRST_HFTA_AGGR_INIT_(gs_uint32_t* scratch) { *scratch = UINT_MAX; // we will encode uninitialized value of UINT_MAX } - void FIRST_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { } - void FIRST_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) { if (*scratch == UINT_MAX) *scratch = val; } - void FIRST_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) { *res = *scratch; } - void FIRST_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { } -void FIRST_ULL_HFTA_AGGR_INIT_(gs_uint64_t* scratch) { +// int + +void FIRST_HFTA_AGGR_INIT_(gs_int32_t* scratch) { *scratch = UINT_MAX; // we will encode uninitialized value of UINT_MAX } - -void FIRST_ULL_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { } - -void FIRST_ULL_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) { +void FIRST_HFTA_AGGR_REINIT_(gs_int32_t* scratch) { } +void FIRST_HFTA_AGGR_UPDATE_(gs_int32_t* scratch, gs_int32_t val) { if (*scratch == UINT_MAX) *scratch = val; } - -void FIRST_ULL_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) { +void FIRST_HFTA_AGGR_OUTPUT_(gs_int32_t* res, gs_int32_t* scratch) { *res = *scratch; } +void FIRST_HFTA_AGGR_DESTROY_(gs_int32_t* scratch) { } -void FIRST_ULL_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { } +// ullong +void FIRST_HFTA_AGGR_INIT_(gs_uint64_t* scratch) { + *scratch = UINT_MAX; // we will encode uninitialized value of UINT_MAX +} +void FIRST_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { } +void FIRST_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) { + if (*scratch == UINT_MAX) + *scratch = val; +} +void FIRST_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) { + *res = *scratch; +} +void FIRST_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { } +// llong +void FIRST_HFTA_AGGR_INIT_(gs_int64_t* scratch) { + *scratch = UINT_MAX; // we will encode uninitialized value of UINT_MAX +} +void FIRST_HFTA_AGGR_REINIT_(gs_int64_t* scratch) { } +void FIRST_HFTA_AGGR_UPDATE_(gs_int64_t* scratch, gs_int64_t val) { + if (*scratch == UINT_MAX) + *scratch = val; +} +void FIRST_HFTA_AGGR_OUTPUT_(gs_int64_t* res, gs_int64_t* scratch) { + *res = *scratch; +} +void FIRST_HFTA_AGGR_DESTROY_(gs_int64_t* scratch) { } -void FIRST_STR_HFTA_AGGR_INIT_(vstring* scratch) { +// string +void FIRST_HFTA_AGGR_INIT_(vstring* scratch) { scratch->offset= 0; } - -void FIRST_STR_HFTA_AGGR_REINIT_(vstring* scratch) { } - -void FIRST_STR_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) { +void FIRST_HFTA_AGGR_REINIT_(vstring* scratch) { } +void FIRST_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) { if (!scratch->offset) { scratch->length = val->length; scratch->offset = val->offset; scratch->reserved = SHALLOW_COPY; } } - -void FIRST_STR_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) { +void FIRST_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) { *res = *scratch; } - -void FIRST_STR_HFTA_AGGR_DESTROY_(vstring* scratch) { } +void FIRST_HFTA_AGGR_DESTROY_(vstring* scratch) { } // hfta/lfta split +// uint void FIRST_hfta_HFTA_AGGR_INIT_(gs_uint32_t* scratch) { *scratch = UINT_MAX; // we will encode uninitialized value of UINT_MAX } - void FIRST_hfta_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { } - void FIRST_hfta_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) { if (*scratch == UINT_MAX) *scratch = val; } - void FIRST_hfta_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) { *res = *scratch; } - void FIRST_hfta_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { } -void FIRST_ULL_hfta_HFTA_AGGR_INIT_(gs_uint64_t* scratch) { +// int +void FIRST_INT_hfta_HFTA_AGGR_INIT_(gs_int32_t* scratch) { *scratch = UINT_MAX; // we will encode uninitialized value of UINT_MAX } +void FIRST_INT_hfta_HFTA_AGGR_REINIT_(gs_int32_t* scratch) { } +void FIRST_INT_hfta_HFTA_AGGR_UPDATE_(gs_int32_t* scratch, gs_int32_t val) { + if (*scratch == UINT_MAX) + *scratch = val; +} +void FIRST_INT_hfta_HFTA_AGGR_OUTPUT_(gs_int32_t* res, gs_int32_t* scratch) { + *res = *scratch; +} +void FIRST_INT_hfta_HFTA_AGGR_DESTROY_(gs_int32_t* scratch) { } +// ullong +void FIRST_ULL_hfta_HFTA_AGGR_INIT_(gs_uint64_t* scratch) { + *scratch = UINT_MAX; // we will encode uninitialized value of UINT_MAX +} void FIRST_ULL_hfta_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { } - void FIRST_ULL_hfta_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) { if (*scratch == UINT_MAX) *scratch = val; } - void FIRST_ULL_hfta_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) { *res = *scratch; } - void FIRST_ULL_hfta_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { } +// llong +void FIRST_LL_hfta_HFTA_AGGR_INIT_(gs_int64_t* scratch) { + *scratch = UINT_MAX; // we will encode uninitialized value of UINT_MAX +} +void FIRST_LL_hfta_HFTA_AGGR_REINIT_(gs_int64_t* scratch) { } +void FIRST_LL_hfta_HFTA_AGGR_UPDATE_(gs_int64_t* scratch, gs_int64_t val) { + if (*scratch == UINT_MAX) + *scratch = val; +} +void FIRST_LL_hfta_HFTA_AGGR_OUTPUT_(gs_int64_t* res, gs_int64_t* scratch) { + *res = *scratch; +} +void FIRST_LL_hfta_HFTA_AGGR_DESTROY_(gs_int64_t* scratch) { } +// string void FIRST_STR_hfta_HFTA_AGGR_INIT_(vstring* scratch) { scratch->offset= 0; } - void FIRST_STR_hfta_HFTA_AGGR_REINIT_(vstring* scratch) { } - void FIRST_STR_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) { if (!scratch->offset) { scratch->length = val->length; @@ -379,11 +419,9 @@ void FIRST_STR_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) { scratch->reserved = SHALLOW_COPY; } } - void FIRST_STR_hfta_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) { *res = *scratch; } - void FIRST_STR_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) { } @@ -392,108 +430,244 @@ void FIRST_STR_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) { } // hfta only +// uint void LAST_HFTA_AGGR_INIT_(gs_uint32_t* scratch) { } - void LAST_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { } - void LAST_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) { *scratch = val; } - void LAST_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) { *res = *scratch; } - void LAST_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { } -void LAST_ULLHFTA_AGGR_INIT_(gs_uint64_t* scratch) { } - -void LAST_ULL_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { } - -void LAST_ULL_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) { +// int +void LAST_HFTA_AGGR_INIT_(gs_int32_t* scratch) { } +void LAST_HFTA_AGGR_REINIT_(gs_int32_t* scratch) { } +void LAST_HFTA_AGGR_UPDATE_(gs_int32_t* scratch, gs_int32_t val) { *scratch = val; } +void LAST_HFTA_AGGR_OUTPUT_(gs_int32_t* res, gs_int32_t* scratch) { + *res = *scratch; +} +void LAST_HFTA_AGGR_DESTROY_(gs_int32_t* scratch) { } -void LAST_ULL_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) { +// llong +void LAST_HFTA_AGGR_INIT_(gs_int64_t* scratch) { } +void LAST_HFTA_AGGR_REINIT_(gs_int64_t* scratch) { } +void LAST_HFTA_AGGR_UPDATE_(gs_int64_t* scratch, gs_int64_t val) { + *scratch = val; +} +void LAST_HFTA_AGGR_OUTPUT_(gs_int64_t* res, gs_int64_t* scratch) { *res = *scratch; } +void LAST_HFTA_AGGR_DESTROY_(gs_int64_t* scratch) { } -void LAST_ULL_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { } +// ullong +void LAST_HFTA_AGGR_INIT_(gs_uint64_t* scratch) { } +void LAST_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { } +void LAST_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) { + *scratch = val; +} +void LAST_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) { + *res = *scratch; +} +void LAST_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { } -void LAST_STR_HFTA_AGGR_INIT_(vstring* scratch) { +// string +void LAST_HFTA_AGGR_INIT_(vstring* scratch) { scratch->offset= 0; } - -void LAST_STR_HFTA_AGGR_REINIT_(vstring* scratch) { } - -void LAST_STR_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) { +void LAST_HFTA_AGGR_REINIT_(vstring* scratch) { } +void LAST_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) { scratch->length = val->length; scratch->offset = val->offset; scratch->reserved = SHALLOW_COPY; } - -void LAST_STR_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) { +void LAST_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) { *res = *scratch; } - -void LAST_STR_HFTA_AGGR_DESTROY_(vstring* scratch) { } +void LAST_HFTA_AGGR_DESTROY_(vstring* scratch) { } // hfta/lfta split void LAST_hfta_HFTA_AGGR_INIT_(gs_uint32_t* scratch) { } - void LAST_hfta_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { } - void LAST_hfta_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) { *scratch = val; } - void LAST_hfta_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) { *res = *scratch; } - void LAST_hfta_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { } -void LAST_ULL_hfta_HFTA_AGGR_INIT_(gs_uint64_t* scratch) { } +void LAST_INT_hfta_HFTA_AGGR_INIT_(gs_int32_t* scratch) { } +void LAST_INT_hfta_HFTA_AGGR_REINIT_(gs_int32_t* scratch) { } +void LAST_INT_hfta_HFTA_AGGR_UPDATE_(gs_int32_t* scratch, gs_int32_t val) { + *scratch = val; +} +void LAST_INT_hfta_HFTA_AGGR_OUTPUT_(gs_int32_t* res, gs_int32_t* scratch) { + *res = *scratch; +} +void LAST_INT_hfta_HFTA_AGGR_DESTROY_(gs_int32_t* scratch) { } +void LAST_ULL_hfta_HFTA_AGGR_INIT_(gs_uint64_t* scratch) { } void LAST_ULL_hfta_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { } - void LAST_ULL_hfta_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) { *scratch = val; } - void LAST_ULL_hfta_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) { *res = *scratch; } - void LAST_ULL_hfta_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { } +void LAST_LL_hfta_HFTA_AGGR_INIT_(gs_int64_t* scratch) { } +void LAST_LL_hfta_HFTA_AGGR_REINIT_(gs_int64_t* scratch) { } +void LAST_LL_hfta_HFTA_AGGR_UPDATE_(gs_int64_t* scratch, gs_int64_t val) { + *scratch = val; +} +void LAST_LL_hfta_HFTA_AGGR_OUTPUT_(gs_int64_t* res, gs_int64_t* scratch) { + *res = *scratch; +} +void LAST_LL_hfta_HFTA_AGGR_DESTROY_(gs_int64_t* scratch) { } + void LAST_STR_hfta_HFTA_AGGR_INIT_(vstring* scratch) { scratch->offset= 0; } - void LAST_STR_hfta_HFTA_AGGR_REINIT_(vstring* scratch) { } - void LAST_STR_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) { scratch->length = val->length; scratch->offset = val->offset; scratch->reserved = SHALLOW_COPY; } - void LAST_STR_hfta_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) { *res = *scratch; } - void LAST_STR_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) { } +//////////////////////////////////////////////////////////// +// count different (# of times the value is different than the previous) + +struct count_diff_scratch{ + gs_uint32_t count; + union{ + gs_uint32_t ui; + gs_int32_t i; + gs_uint64_t ul; + gs_int64_t l; + } r; +}; + +////////// HFTA only + +// uint32 +void count_diff_HFTA_AGGR_INIT_(gs_sp_t s){ + count_diff_scratch *scratch = (count_diff_scratch *)s; + scratch->count = 0; + scratch->r.ul = 0; +} +void count_diff_HFTA_AGGR_REINIT_(gs_sp_t s){ + count_diff_scratch *scratch = (count_diff_scratch *)s; + scratch->count = 0; +} +void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t val){ + count_diff_scratch *scratch = (count_diff_scratch *)s; + if(scratch->count==0 || scratch->r.ui != val) + scratch->count++; + scratch->r.ui = val; +} +void count_diff_HFTA_AGGR_OUTPUT_(gs_uint32_t *res, gs_sp_t s){ + count_diff_scratch *scratch = (count_diff_scratch *)s; + *res = scratch->count; +} +void count_diff_HFTA_AGGR_DESTROY_(gs_sp_t s){ } + +// int32 +void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t val){ + count_diff_scratch *scratch = (count_diff_scratch *)s; + if(scratch->count==0 || scratch->r.i != val) + scratch->count++; + scratch->r.i = val; +} + +// uint64 +void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t val){ + count_diff_scratch *scratch = (count_diff_scratch *)s; + if(scratch->count==0 || scratch->r.ul != val) + scratch->count++; + scratch->r.ul = val; +} + +// int64 +void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t val){ + count_diff_scratch *scratch = (count_diff_scratch *)s; + if(scratch->count==0 || scratch->r.l != val) + scratch->count++; + scratch->r.l = val; +} + +// vstring +void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, vstring* val){ + count_diff_scratch *scratch = (count_diff_scratch *)s; + gs_uint64_t hashval = hfta_vstr_long_hashfunc(val); + if(scratch->count==0 || scratch->r.l != hashval) + scratch->count++; + scratch->r.l = hashval; +} + +////////// HFTA / LFTA split + +struct lfta_count_diff_scratch{ + gs_uint32_t count; + union{ + gs_uint32_t ui; + gs_int32_t i; + gs_uint64_t ul; + gs_int64_t l; + } first; + union{ + gs_uint32_t ui; + gs_int32_t i; + gs_uint64_t ul; + gs_int64_t l; + } last; +}; + + +void count_diff_hfta_HFTA_AGGR_INIT_(gs_sp_t s){ + count_diff_scratch *scratch = (count_diff_scratch *)s; + scratch->count = 0; + scratch->r.ul = 0; +} +void count_diff_hfta_HFTA_AGGR_REINIT_(gs_sp_t s){ + count_diff_scratch *scratch = (count_diff_scratch *)s; + scratch->count = 0; + scratch->r.ul = 0; +} +void count_diff_hfta_HFTA_AGGR_UPDATE_(gs_sp_t s, vstring *v){ + lfta_count_diff_scratch *val = (lfta_count_diff_scratch *)v; + count_diff_scratch *scratch = (count_diff_scratch *)(v->offset); + scratch->count += val->count - 1; + if(scratch->r.l != val->first.l) + scratch->count++; + scratch->r.l = val->last.l; +} +void count_diff_hfta_HFTA_AGGR_OUTPUT_(gs_uint32_t *res, gs_sp_t s){ + count_diff_scratch *scratch = (count_diff_scratch *)s; + *res = (scratch->count)+1; +} +void count_diff_hfta_HFTA_AGGR_DESTROY_(gs_sp_t scratch){ } + + + ///////////////////////////////////////////////////////// // running_array_aggr aggregate void running_array_aggr_hfta_HFTA_AGGR_INIT_(vstring* scratch) { - scratch->offset = NULL; + scratch->offset = (gs_p_t)NULL; scratch->length = 0; } @@ -542,3 +716,358 @@ void running_array_aggr_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) { hfta_vstr_destroy(scratch); } + +//////////////////////////////////////////// +// Aggregate strings by catenation + + +struct CAT_aggr_scratch{ + std::string val; + int x; +}; + +struct CAT_aggr_scratch_ptr{ + CAT_aggr_scratch *ptr; +}; + +void CAT_aggr_HFTA_AGGR_INIT_(gs_sp_t s){ + CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s; + CAT_aggr_scratch *v = new CAT_aggr_scratch(); + v->x = 101; + + p->ptr = v; +} +void CAT_aggr_HFTA_AGGR_REINIT_(gs_sp_t s){ + CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s; + CAT_aggr_scratch *v = p->ptr; + v->val=""; +} +void CAT_aggr_HFTA_AGGR_UPDATE_(gs_sp_t s, vstring *sep, vstring *str){ +//char buf1[MAXTUPLESZ-20], buf2[MAXTUPLESZ-20]; +//int i; +//for(i=0;ilength;++i) buf1[i] = *(((char *)sep->offset)+i); +//buf1[i]='\0'; +//for(i=0;ilength;++i) buf2[i] = *(((char *)str->offset)+i); +//buf2[i]='\0'; + CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s; + CAT_aggr_scratch *v = p->ptr; + if(v->val.size()>0) + v->val.append((char *)(sep->offset), sep->length); + v->val.append((char *)(str->offset), str->length); +//printf("sep=%s, str=%s, val=%s\n",buf1,buf2,v->val.c_str()); +} +void CAT_aggr_HFTA_AGGR_OUTPUT_(vstring *res, gs_sp_t s){ + CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s; + CAT_aggr_scratch *v = p->ptr; +//printf("output val=%s\n",v->val.c_str()); + res->offset = (gs_p_t)malloc(v->val.size()); + res->length = v->val.size(); + if(res->length>MAXTUPLESZ-20) + res->length=MAXTUPLESZ-20; +// v->val.copy((char *)(res->offset), 0, res->length); + const char *dat = v->val.c_str(); + memcpy((char *)(res->offset), dat, res->length); +// for(int i=0;ilength;++i) +// *(((char *)res->offset)+i) = dat[i]; + res->reserved = INTERNAL; +} +void CAT_aggr_HFTA_AGGR_DESTROY_(gs_sp_t s){ + CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s; + CAT_aggr_scratch *v = p->ptr; + delete v; +} + + + +/////////////////////////////////////////////////////////////// +// time_avg((sample, ts, window_size) +// Compute time-weighted average sum(sample*duration)/window_size +// duration is difference between current and next ts. +// The idea is to compute a sum over a step function. +// + +struct time_avg_udaf_str{ + gs_float_t sum; + gs_float_t last_val; + gs_uint64_t last_ts; + gs_uint64_t window; + gs_uint64_t first_ts; + gs_uint8_t event_occurred; +}; + +void time_avg_HFTA_AGGR_INIT_(gs_sp_t s){ + time_avg_udaf_str *scratch = (time_avg_udaf_str *)s; + scratch->sum = 0.0; + scratch->last_val = 0.0; + scratch->last_ts = 0; + scratch->first_ts = 0; + scratch->event_occurred = 0; +} + +void time_avg_HFTA_AGGR_DESTROY_(gs_sp_t s){ +} + +void time_avg_HFTA_AGGR_REINIT_(gs_sp_t s){ + time_avg_udaf_str *scratch = (time_avg_udaf_str *)s; + scratch->event_occurred = 0; + scratch->sum = 0; +//printf("time_avg_reinit: occurred=%d, last_val=%lf, sum=%lf, first_ts=%lld, last_ts=%lld\n",scratch->event_occurred, scratch->last_val, scratch->sum, scratch->first_ts, scratch->last_ts); +} + +void time_avg_HFTA_AGGR_OUTPUT_(gs_float_t *result, gs_sp_t s){ + time_avg_udaf_str *scratch = (time_avg_udaf_str *)s; + if(scratch->event_occurred==0){ + *result = scratch->last_val; +//printf("\ttime_avg outpt1 sum=%lf, last_val=%lf, result=%lf\n", scratch->sum, scratch->last_val, *result); + return; + } + gs_int64_t end_time = scratch->window * (scratch->last_ts/scratch->window + 1); + scratch->sum += (end_time - scratch->last_ts) * (gs_float_t)(scratch->last_val); + gs_int64_t start_time = end_time - scratch->window; + if(scratch->first_ts > start_time){ + *result = scratch->sum / (end_time - scratch->first_ts); +//printf("\ttime_avg outpt2 sum=%lf, start_time=%lld, end_time=%lld, first_ts=%lld, last_ts=%lld,result=%lf\n", scratch->sum, start_time, end_time, scratch->first_ts, scratch->last_ts, *result); + }else{ + *result = scratch->sum / (end_time - start_time); +//printf("\ttime_avg outpt3 sum=%lf, start_time=%lld, end_time=%lld, first_ts=%lld, last_ts=%lld,result=%lf\n", scratch->sum, start_time, end_time, scratch->first_ts, scratch->last_ts, *result); + } +} + +void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_float_t val, gs_int64_t ts, gs_int64_t window){ + time_avg_udaf_str *scratch = (time_avg_udaf_str *)s; + scratch->window = window; + if(scratch->first_ts==0){ + scratch->first_ts = ts; + }else{ + if(scratch->event_occurred){ + + scratch->sum += (ts - scratch->last_ts) * scratch->last_val; + }else{ + gs_int64_t start_time = scratch->window * (scratch->last_ts/scratch->window); + scratch->sum += (ts - start_time) * scratch->last_val; + } + } +//printf("time_avg_upd: val=%lf, occurred=%d, last_val=%lf, sum=%lf, ts=%lld, first_ts=%lld, last_ts=%lld\n",val, scratch->event_occurred, scratch->last_val, scratch->sum, ts, scratch->first_ts, scratch->last_ts); + scratch->last_val = val; + scratch->last_ts = ts; + scratch->event_occurred = 1; +} +void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t val, gs_int64_t ts, gs_int64_t window){ + time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window); +} +void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t val, gs_int64_t ts, gs_int64_t window){ + time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window); +} +void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t val, gs_int64_t ts, gs_int64_t window){ + time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window); +} +void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t val, gs_int64_t ts, gs_int64_t window){ + time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window); +} + + +// ------------------------------------------------------------ +// running_sum_max : get the running sum of an int, +// be able to report this sum and also its max value +// during the time window + +struct run_sum_max_udaf_str{ + gs_int64_t sum; + gs_int64_t max; +}; +void run_sum_max_HFTA_AGGR_INIT_(gs_sp_t s){ + run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s; + scratch->sum = 0; + scratch->max = 0; +} +void run_sum_max_HFTA_AGGR_REINIT_(gs_sp_t s){ + run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s; + scratch->max = scratch->sum; +} +void run_sum_max_HFTA_AGGR_OUTPUT_(vstring *r,gs_sp_t b){ + r->length = sizeof(run_sum_max_udaf_str); + r->offset = (gs_p_t)(b); + r->reserved = SHALLOW_COPY; +} +void run_sum_max_HFTA_AGGR_DESTROY_(gs_sp_t b){ + return; +} + +void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t v){ + run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s; + scratch->sum+=v; + if(scratch->sum>scratch->max) scratch->max=scratch->sum; +} +void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t v){ + run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s; + scratch->sum+=v; + if(scratch->sum>scratch->max) scratch->max=scratch->sum; +} +void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t v){ + run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s; + scratch->sum+=v; + if(scratch->sum>scratch->max) scratch->max=scratch->sum; +} +void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t v){ + run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s; + scratch->sum+=v; + if(scratch->sum>scratch->max) scratch->max=scratch->sum; +} +// the extraction functions +gs_int64_t extr_running_sum(vstring *v){ + if(v->length != sizeof(run_sum_max_udaf_str)) return 0; + run_sum_max_udaf_str *vs = (run_sum_max_udaf_str *)(v->offset); + return vs->sum; +} +gs_int64_t extr_running_sum_max(vstring *v){ + if(v->length != sizeof(run_sum_max_udaf_str)) return 0; + run_sum_max_udaf_str *vs = (run_sum_max_udaf_str *)(v->offset); + return vs->max; +} + + +// --------------------------------------------- +// aggr_diff : from a sequence of strings, collect +// the ones which are different than the previous. +// this includes the prior time period. +// the idea is to see the sequence of handovers + +struct CAT_aggr_diff_scratch{ + std::string val; + std::string prev_s; +// gs_int64_t prev_ts; // for now, just catenate strings +}; + +struct CAT_aggr_diff_scratch_ptr{ + CAT_aggr_diff_scratch *ptr; +}; + + + +void CAT_aggr_diff_HFTA_AGGR_INIT_(gs_sp_t s){ + CAT_aggr_diff_scratch_ptr *p = (CAT_aggr_diff_scratch_ptr *)s; + CAT_aggr_diff_scratch *v = new CAT_aggr_diff_scratch(); + v->prev_s = ""; + v->val = ""; + + p->ptr = v; +} +void CAT_aggr_diff_HFTA_AGGR_REINIT_(gs_sp_t s){ + CAT_aggr_diff_scratch_ptr *p = (CAT_aggr_diff_scratch_ptr *)s; + CAT_aggr_diff_scratch *v = p->ptr; + v->val=v->prev_s; +} +void CAT_aggr_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, vstring *str){ + char str_buf[MAXTUPLESZ-20]; + int i; + for(i=0;ilength;++i) str_buf[i] = *(((char *)str->offset)+i); + str_buf[i]='\0'; + + CAT_aggr_diff_scratch_ptr *p = (CAT_aggr_diff_scratch_ptr *)s; + CAT_aggr_diff_scratch *v = p->ptr; + if(str_buf != v->prev_s){ + if(v->val.size()>0) + v->val += ':'; + v->val += str_buf; + v->prev_s = str_buf; + } +} + +void CAT_aggr_diff_HFTA_AGGR_OUTPUT_(vstring *res, gs_sp_t s){ + CAT_aggr_diff_scratch_ptr *p = (CAT_aggr_diff_scratch_ptr *)s; + CAT_aggr_diff_scratch *v = p->ptr; +//printf("output val=%s\n",v->val.c_str()); + res->offset = (gs_p_t)malloc(v->val.size()); + res->length = v->val.size(); + if(res->length>MAXTUPLESZ-20) + res->length=MAXTUPLESZ-20; +// v->val.copy((char *)(res->offset), 0, res->length); + const char *dat = v->val.c_str(); + memcpy((char *)(res->offset), dat, res->length); +// for(int i=0;ilength;++i) +// *(((char *)res->offset)+i) = dat[i]; + res->reserved = INTERNAL; +} +void CAT_aggr_diff_HFTA_AGGR_DESTROY_(gs_sp_t s){ + CAT_aggr_diff_scratch_ptr *p = (CAT_aggr_diff_scratch_ptr *)s; + CAT_aggr_diff_scratch *v = p->ptr; + delete v; +} + + + +// --------------------------------------------- +// Approximate count distinct. +// Rely on the minhashing approach. +// Currently HFTA-only +// Uses a 32-bit hash, tested up to 100,000,000 elements +// and it gave good results (within 7%) + + +#define COUNT_DISTINCT_NREPS 250 +#define COUNT_DISTINCT_MAX_STRING_LEN 200 // number of 4-byte words + +static Hash32bit2univID hids[COUNT_DISTINCT_NREPS]; +static int approx_count_distinct_udaf_initialized = 0; +struct approx_count_distinct_udaf_str{ + unsigned int mn[COUNT_DISTINCT_NREPS]; +}; + + +void approx_count_distinct_udaf_HFTA_AGGR_INIT_(gs_sp_t buf){ + approx_count_distinct_udaf_str *cd = (approx_count_distinct_udaf_str *)buf; + for(int i=0;imn[i]=4294967295; + if(approx_count_distinct_udaf_initialized==0){ + for(int i=0;ilength/4] = 0; + memcpy((char *)buffer, (char *)val->offset, min(val->length, 800)); + unsigned int len4 = val->length/4 + ((val->length&0x03)>0); + + for(int i=0; imn[i]) cd->mn[i] = h; + } +} +void running_approx_count_distinct_udaf_HFTA_AGGR_UPDATE_(gs_sp_t buf, vstring *val){ + approx_count_distinct_udaf_HFTA_AGGR_UPDATE_(buf, val); +} + +void approx_count_distinct_udaf_HFTA_AGGR_OUTPUT_(vstring *res, gs_sp_t buf){ + res->offset = (gs_p_t)buf; + res->length = sizeof(approx_count_distinct_udaf_str); + res->reserved = SHALLOW_COPY; +} +void running_approx_count_distinct_udaf_HFTA_AGGR_OUTPUT_(vstring *res, gs_sp_t buf){ + approx_count_distinct_udaf_HFTA_AGGR_OUTPUT_(res, buf); +} + +gs_float_t extr_approx_count_distinct(vstring *v){ + approx_count_distinct_udaf_str *cd = (approx_count_distinct_udaf_str *)(v->offset); + gs_float_t avg = 0.0; + for(int i=0;imn[i]; + } + avg /= COUNT_DISTINCT_NREPS; + gs_float_t est = (4294967295.0 / avg) - 1; + return est; +} + +