Add new udafs and RMR support to gsprintconsole_ves
[com/gs-lite.git] / src / lib / gscphftaaux / hfta_udaf.cc
index aa44d24..a5ff5cb 100644 (file)
@@ -27,6 +27,7 @@ Copyright 2014 AT&T Intellectual Property
 #include <iostream>
 
 #include "hfta_runtime_library.h"
+#include"stringhash.h"
 
 
 #define max(a,b) ((a) > (b) ? (a) : (b))
@@ -207,7 +208,7 @@ struct avg_udaf_lfta_struct_t{
         gs_uint32_t cnt;
 };
 
-//              sctarchpad struct
+//              scratchpad struct
 struct avg_udaf_hfta_struct_t{
         gs_int64_t sum;
         gs_uint32_t cnt;
@@ -274,6 +275,7 @@ gs_float_t extr_avg_fcn(vstring *v){
 //             FIRST aggregate
 // hfta only
 
+// uint
 void FIRST_HFTA_AGGR_INIT_(gs_uint32_t* scratch) {
        *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
 }
@@ -287,6 +289,22 @@ void FIRST_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {
 }
 void FIRST_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }
 
+// int
+
+void FIRST_HFTA_AGGR_INIT_(gs_int32_t* scratch) {
+       *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
+}
+void FIRST_HFTA_AGGR_REINIT_(gs_int32_t* scratch) { }
+void FIRST_HFTA_AGGR_UPDATE_(gs_int32_t* scratch, gs_int32_t val) {
+       if (*scratch == UINT_MAX)
+               *scratch = val;
+}
+void FIRST_HFTA_AGGR_OUTPUT_(gs_int32_t* res, gs_int32_t* scratch) {
+       *res = *scratch;
+}
+void FIRST_HFTA_AGGR_DESTROY_(gs_int32_t* scratch) { }
+
+// ullong
 void FIRST_HFTA_AGGR_INIT_(gs_uint64_t* scratch) {
        *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
 }
@@ -300,89 +318,100 @@ void FIRST_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
 }
 void FIRST_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
 
-
-
-void FIRST_ULL_HFTA_AGGR_INIT_(gs_uint64_t* scratch) {
+// llong
+void FIRST_HFTA_AGGR_INIT_(gs_int64_t* scratch) {
        *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
 }
-
-void FIRST_ULL_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
-
-void FIRST_ULL_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
+void FIRST_HFTA_AGGR_REINIT_(gs_int64_t* scratch) { }
+void FIRST_HFTA_AGGR_UPDATE_(gs_int64_t* scratch, gs_int64_t val) {
        if (*scratch == UINT_MAX)
                *scratch = val;
 }
-
-void FIRST_ULL_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
+void FIRST_HFTA_AGGR_OUTPUT_(gs_int64_t* res, gs_int64_t* scratch) {
        *res = *scratch;
 }
+void FIRST_HFTA_AGGR_DESTROY_(gs_int64_t* scratch) { }
 
-void FIRST_ULL_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
-
-
-void FIRST_STR_HFTA_AGGR_INIT_(vstring* scratch) {
+// string
+void FIRST_HFTA_AGGR_INIT_(vstring* scratch) {
        scratch->offset= 0;
 }
-
-void FIRST_STR_HFTA_AGGR_REINIT_(vstring* scratch) { }
-
-void FIRST_STR_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
+void FIRST_HFTA_AGGR_REINIT_(vstring* scratch) { }
+void FIRST_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
        if (!scratch->offset) {
     scratch->length = val->length;
     scratch->offset = val->offset;
     scratch->reserved = SHALLOW_COPY;
        }
 }
-
-void FIRST_STR_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
+void FIRST_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
        *res = *scratch;
 }
-
-void FIRST_STR_HFTA_AGGR_DESTROY_(vstring* scratch) { }
+void FIRST_HFTA_AGGR_DESTROY_(vstring* scratch) { }
 
 // hfta/lfta split
 
+// uint
 void FIRST_hfta_HFTA_AGGR_INIT_(gs_uint32_t* scratch) {
        *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
 }
-
 void FIRST_hfta_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }
-
 void FIRST_hfta_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {
        if (*scratch == UINT_MAX)
                *scratch = val;
 }
-
 void FIRST_hfta_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {
        *res = *scratch;
 }
-
 void FIRST_hfta_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }
 
-void FIRST_ULL_hfta_HFTA_AGGR_INIT_(gs_uint64_t* scratch) {
+// int
+void FIRST_INT_hfta_HFTA_AGGR_INIT_(gs_int32_t* scratch) {
        *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
 }
+void FIRST_INT_hfta_HFTA_AGGR_REINIT_(gs_int32_t* scratch) { }
+void FIRST_INT_hfta_HFTA_AGGR_UPDATE_(gs_int32_t* scratch, gs_int32_t val) {
+       if (*scratch == UINT_MAX)
+               *scratch = val;
+}
+void FIRST_INT_hfta_HFTA_AGGR_OUTPUT_(gs_int32_t* res, gs_int32_t* scratch) {
+       *res = *scratch;
+}
+void FIRST_INT_hfta_HFTA_AGGR_DESTROY_(gs_int32_t* scratch) { }
 
+// ullong
+void FIRST_ULL_hfta_HFTA_AGGR_INIT_(gs_uint64_t* scratch) {
+       *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
+}
 void FIRST_ULL_hfta_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
-
 void FIRST_ULL_hfta_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
        if (*scratch == UINT_MAX)
                *scratch = val;
 }
-
 void FIRST_ULL_hfta_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
        *res = *scratch;
 }
-
 void FIRST_ULL_hfta_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
 
+// llong
+void FIRST_LL_hfta_HFTA_AGGR_INIT_(gs_int64_t* scratch) {
+       *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
+}
+void FIRST_LL_hfta_HFTA_AGGR_REINIT_(gs_int64_t* scratch) { }
+void FIRST_LL_hfta_HFTA_AGGR_UPDATE_(gs_int64_t* scratch, gs_int64_t val) {
+       if (*scratch == UINT_MAX)
+               *scratch = val;
+}
+void FIRST_LL_hfta_HFTA_AGGR_OUTPUT_(gs_int64_t* res, gs_int64_t* scratch) {
+       *res = *scratch;
+}
+void FIRST_LL_hfta_HFTA_AGGR_DESTROY_(gs_int64_t* scratch) { }
 
+// string
 void FIRST_STR_hfta_HFTA_AGGR_INIT_(vstring* scratch) {
        scratch->offset= 0;
 }
-
 void FIRST_STR_hfta_HFTA_AGGR_REINIT_(vstring* scratch) { }
-
 void FIRST_STR_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
        if (!scratch->offset) {
     scratch->length = val->length;
@@ -390,11 +419,9 @@ void FIRST_STR_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
     scratch->reserved = SHALLOW_COPY;
        }
 }
-
 void FIRST_STR_hfta_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
        *res = *scratch;
 }
-
 void FIRST_STR_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) { }
 
 
@@ -403,6 +430,7 @@ void FIRST_STR_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) { }
 
 // hfta only
 
+// uint
 void LAST_HFTA_AGGR_INIT_(gs_uint32_t* scratch) { }
 void LAST_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }
 void LAST_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {
@@ -413,107 +441,233 @@ void LAST_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {
 }
 void LAST_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }
 
-
-void LAST_HFTA_AGGR_INIT_(gs_uint64_t* scratch) { }
-void LAST_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
-void LAST_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
+// int
+void LAST_HFTA_AGGR_INIT_(gs_int32_t* scratch) { }
+void LAST_HFTA_AGGR_REINIT_(gs_int32_t* scratch) { }
+void LAST_HFTA_AGGR_UPDATE_(gs_int32_t* scratch, gs_int32_t val) {
        *scratch = val;
 }
-void LAST_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
+void LAST_HFTA_AGGR_OUTPUT_(gs_int32_t* res, gs_int32_t* scratch) {
        *res = *scratch;
 }
-void LAST_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
-
+void LAST_HFTA_AGGR_DESTROY_(gs_int32_t* scratch) { }
 
-
-void LAST_ULLHFTA_AGGR_INIT_(gs_uint64_t* scratch) { }
-
-void LAST_ULL_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
-
-void LAST_ULL_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
+// llong
+void LAST_HFTA_AGGR_INIT_(gs_int64_t* scratch) { }
+void LAST_HFTA_AGGR_REINIT_(gs_int64_t* scratch) { }
+void LAST_HFTA_AGGR_UPDATE_(gs_int64_t* scratch, gs_int64_t val) {
        *scratch = val;
 }
-
-void LAST_ULL_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
+void LAST_HFTA_AGGR_OUTPUT_(gs_int64_t* res, gs_int64_t* scratch) {
        *res = *scratch;
 }
+void LAST_HFTA_AGGR_DESTROY_(gs_int64_t* scratch) { }
 
-void LAST_ULL_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
 
+// ullong
+void LAST_HFTA_AGGR_INIT_(gs_uint64_t* scratch) { }
+void LAST_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
+void LAST_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
+       *scratch = val;
+}
+void LAST_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
+       *res = *scratch;
+}
+void LAST_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
 
-void LAST_STR_HFTA_AGGR_INIT_(vstring* scratch) {
+// string
+void LAST_HFTA_AGGR_INIT_(vstring* scratch) {
        scratch->offset= 0;
 }
-
-void LAST_STR_HFTA_AGGR_REINIT_(vstring* scratch) { }
-
-void LAST_STR_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
+void LAST_HFTA_AGGR_REINIT_(vstring* scratch) { }
+void LAST_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
        scratch->length = val->length;
   scratch->offset = val->offset;
   scratch->reserved = SHALLOW_COPY;
 }
-
-void LAST_STR_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
+void LAST_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
        *res = *scratch;
 }
-
-void LAST_STR_HFTA_AGGR_DESTROY_(vstring* scratch) { }
+void LAST_HFTA_AGGR_DESTROY_(vstring* scratch) { }
 
 // hfta/lfta split
 
 void LAST_hfta_HFTA_AGGR_INIT_(gs_uint32_t* scratch) { }
-
 void LAST_hfta_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }
-
 void LAST_hfta_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {
        *scratch = val;
 }
-
 void LAST_hfta_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {
        *res = *scratch;
 }
-
 void LAST_hfta_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }
 
-void LAST_ULL_hfta_HFTA_AGGR_INIT_(gs_uint64_t* scratch) { }
+void LAST_INT_hfta_HFTA_AGGR_INIT_(gs_int32_t* scratch) { }
+void LAST_INT_hfta_HFTA_AGGR_REINIT_(gs_int32_t* scratch) { }
+void LAST_INT_hfta_HFTA_AGGR_UPDATE_(gs_int32_t* scratch, gs_int32_t val) {
+       *scratch = val;
+}
+void LAST_INT_hfta_HFTA_AGGR_OUTPUT_(gs_int32_t* res, gs_int32_t* scratch) {
+       *res = *scratch;
+}
+void LAST_INT_hfta_HFTA_AGGR_DESTROY_(gs_int32_t* scratch) { }
 
+void LAST_ULL_hfta_HFTA_AGGR_INIT_(gs_uint64_t* scratch) { }
 void LAST_ULL_hfta_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
-
 void LAST_ULL_hfta_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
        *scratch = val;
 }
-
 void LAST_ULL_hfta_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
        *res = *scratch;
 }
-
 void LAST_ULL_hfta_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
 
+void LAST_LL_hfta_HFTA_AGGR_INIT_(gs_int64_t* scratch) { }
+void LAST_LL_hfta_HFTA_AGGR_REINIT_(gs_int64_t* scratch) { }
+void LAST_LL_hfta_HFTA_AGGR_UPDATE_(gs_int64_t* scratch, gs_int64_t val) {
+       *scratch = val;
+}
+void LAST_LL_hfta_HFTA_AGGR_OUTPUT_(gs_int64_t* res, gs_int64_t* scratch) {
+       *res = *scratch;
+}
+void LAST_LL_hfta_HFTA_AGGR_DESTROY_(gs_int64_t* scratch) { }
+
 
 void LAST_STR_hfta_HFTA_AGGR_INIT_(vstring* scratch) {
        scratch->offset= 0;
 }
-
 void LAST_STR_hfta_HFTA_AGGR_REINIT_(vstring* scratch) { }
-
 void LAST_STR_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
        scratch->length = val->length;
   scratch->offset = val->offset;
   scratch->reserved = SHALLOW_COPY;
 }
-
 void LAST_STR_hfta_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
        *res = *scratch;
 }
-
 void LAST_STR_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) { }
 
 
+////////////////////////////////////////////////////////////
+//             count different (# of times the value is different than the previous)
+
+struct count_diff_scratch{
+       gs_uint32_t count;
+       union{
+               gs_uint32_t ui;
+               gs_int32_t i;
+               gs_uint64_t ul;
+               gs_int64_t l;
+       } r;
+};
+
+//////////  HFTA only
+
+// uint32
+void count_diff_HFTA_AGGR_INIT_(gs_sp_t s){
+       count_diff_scratch *scratch = (count_diff_scratch *)s;
+       scratch->count = 0;
+       scratch->r.ul = 0;
+}
+void count_diff_HFTA_AGGR_REINIT_(gs_sp_t s){
+       count_diff_scratch *scratch = (count_diff_scratch *)s;
+       scratch->count = 0;
+}
+void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t val){
+       count_diff_scratch *scratch = (count_diff_scratch *)s;
+       if(scratch->count==0 || scratch->r.ui != val)
+               scratch->count++;
+       scratch->r.ui = val;
+}
+void count_diff_HFTA_AGGR_OUTPUT_(gs_uint32_t *res, gs_sp_t s){
+       count_diff_scratch *scratch = (count_diff_scratch *)s;
+       *res = scratch->count;
+}
+void count_diff_HFTA_AGGR_DESTROY_(gs_sp_t s){ }
+
+// int32
+void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t val){
+       count_diff_scratch *scratch = (count_diff_scratch *)s;
+       if(scratch->count==0 || scratch->r.i != val)
+               scratch->count++;
+       scratch->r.i = val;
+}
+
+// uint64
+void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t val){
+       count_diff_scratch *scratch = (count_diff_scratch *)s;
+       if(scratch->count==0 || scratch->r.ul != val)
+               scratch->count++;
+       scratch->r.ul = val;
+}
+
+// int64
+void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t val){
+       count_diff_scratch *scratch = (count_diff_scratch *)s;
+       if(scratch->count==0 || scratch->r.l != val)
+               scratch->count++;
+       scratch->r.l = val;
+}
+
+// vstring
+void count_diff_HFTA_AGGR_UPDATE_(gs_sp_t s, vstring* val){
+       count_diff_scratch *scratch = (count_diff_scratch *)s;
+       gs_uint64_t hashval = hfta_vstr_long_hashfunc(val);
+       if(scratch->count==0 || scratch->r.l != hashval)
+               scratch->count++;
+       scratch->r.l = hashval;
+}
+
+////////// HFTA / LFTA split
+
+struct lfta_count_diff_scratch{
+       gs_uint32_t count;
+       union{
+               gs_uint32_t ui;
+               gs_int32_t i;
+               gs_uint64_t ul;
+               gs_int64_t l;
+       } first;
+       union{
+               gs_uint32_t ui;
+               gs_int32_t i;
+               gs_uint64_t ul;
+               gs_int64_t l;
+       } last;
+};
+
+
+void count_diff_hfta_HFTA_AGGR_INIT_(gs_sp_t s){
+       count_diff_scratch *scratch = (count_diff_scratch *)s;
+       scratch->count = 0;
+       scratch->r.ul = 0;
+}
+void count_diff_hfta_HFTA_AGGR_REINIT_(gs_sp_t s){
+       count_diff_scratch *scratch = (count_diff_scratch *)s;
+       scratch->count = 0;
+       scratch->r.ul = 0;
+}
+void count_diff_hfta_HFTA_AGGR_UPDATE_(gs_sp_t s, vstring *v){
+       lfta_count_diff_scratch *val = (lfta_count_diff_scratch *)v;
+       count_diff_scratch *scratch = (count_diff_scratch *)(v->offset);
+       scratch->count += val->count - 1;
+       if(scratch->r.l != val->first.l)
+               scratch->count++;
+       scratch->r.l = val->last.l;
+}
+void count_diff_hfta_HFTA_AGGR_OUTPUT_(gs_uint32_t *res, gs_sp_t s){
+       count_diff_scratch *scratch = (count_diff_scratch *)s;
+       *res = (scratch->count)+1;
+}
+void count_diff_hfta_HFTA_AGGR_DESTROY_(gs_sp_t scratch){ }
+       
+
+
 /////////////////////////////////////////////////////////
 //             running_array_aggr aggregate
 
 void running_array_aggr_hfta_HFTA_AGGR_INIT_(vstring* scratch) {
-  scratch->offset = NULL;  
+  scratch->offset = (gs_p_t)NULL;  
   scratch->length = 0;
 }
 
@@ -562,3 +716,358 @@ void running_array_aggr_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) {
   hfta_vstr_destroy(scratch);
  }
 
+
+////////////////////////////////////////////
+//             Aggregate strings by catenation
+       
+
+struct CAT_aggr_scratch{
+       std::string val;
+       int x;
+};
+
+struct CAT_aggr_scratch_ptr{
+       CAT_aggr_scratch *ptr;
+};
+
+void CAT_aggr_HFTA_AGGR_INIT_(gs_sp_t s){
+       CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
+       CAT_aggr_scratch *v = new CAT_aggr_scratch();
+       v->x = 101;
+
+       p->ptr = v;
+}
+void CAT_aggr_HFTA_AGGR_REINIT_(gs_sp_t s){
+       CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
+       CAT_aggr_scratch *v = p->ptr;
+       v->val="";
+}
+void CAT_aggr_HFTA_AGGR_UPDATE_(gs_sp_t s, vstring *sep, vstring *str){
+//char buf1[MAXTUPLESZ-20], buf2[MAXTUPLESZ-20];
+//int i;
+//for(i=0;i<sep->length;++i) buf1[i] = *(((char *)sep->offset)+i);
+//buf1[i]='\0';
+//for(i=0;i<str->length;++i) buf2[i] = *(((char *)str->offset)+i);
+//buf2[i]='\0';
+       CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
+       CAT_aggr_scratch *v = p->ptr;
+       if(v->val.size()>0)
+               v->val.append((char *)(sep->offset), sep->length);
+       v->val.append((char *)(str->offset), str->length);
+//printf("sep=%s, str=%s, val=%s\n",buf1,buf2,v->val.c_str());
+}
+void CAT_aggr_HFTA_AGGR_OUTPUT_(vstring *res, gs_sp_t s){
+       CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
+       CAT_aggr_scratch *v = p->ptr;
+//printf("output val=%s\n",v->val.c_str());
+       res->offset = (gs_p_t)malloc(v->val.size());
+       res->length = v->val.size();
+       if(res->length>MAXTUPLESZ-20)
+               res->length=MAXTUPLESZ-20;
+//     v->val.copy((char *)(res->offset), 0, res->length);
+       const char *dat = v->val.c_str();
+       memcpy((char *)(res->offset), dat, res->length);
+//     for(int i=0;i<res->length;++i)
+//             *(((char *)res->offset)+i) = dat[i];
+       res->reserved = INTERNAL;
+}
+void CAT_aggr_HFTA_AGGR_DESTROY_(gs_sp_t s){
+       CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
+       CAT_aggr_scratch *v = p->ptr;
+       delete v;
+}
+
+       
+       
+///////////////////////////////////////////////////////////////
+//     time_avg((sample, ts, window_size)
+//  Compute time-weighted average sum(sample*duration)/window_size
+//  duration is difference between current and next ts.
+//  The idea is to compute a sum over a step function.
+//  
+       
+struct time_avg_udaf_str{
+       gs_float_t sum;
+       gs_float_t last_val;
+       gs_uint64_t last_ts;
+       gs_uint64_t window;
+       gs_uint64_t first_ts;
+       gs_uint8_t event_occurred;
+};
+       
+void time_avg_HFTA_AGGR_INIT_(gs_sp_t s){
+       time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
+       scratch->sum = 0.0;
+       scratch->last_val = 0.0;
+       scratch->last_ts = 0;
+       scratch->first_ts = 0;
+       scratch->event_occurred = 0;
+}
+
+void time_avg_HFTA_AGGR_DESTROY_(gs_sp_t s){
+}
+
+void time_avg_HFTA_AGGR_REINIT_(gs_sp_t s){
+       time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
+       scratch->event_occurred = 0;
+       scratch->sum = 0;
+//printf("time_avg_reinit: occurred=%d, last_val=%lf, sum=%lf, first_ts=%lld, last_ts=%lld\n",scratch->event_occurred, scratch->last_val, scratch->sum, scratch->first_ts, scratch->last_ts);
+}
+
+void time_avg_HFTA_AGGR_OUTPUT_(gs_float_t *result, gs_sp_t s){
+       time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
+       if(scratch->event_occurred==0){
+               *result =  scratch->last_val;
+//printf("\ttime_avg outpt1 sum=%lf, last_val=%lf, result=%lf\n", scratch->sum, scratch->last_val, *result);
+               return;
+       }
+       gs_int64_t end_time = scratch->window * (scratch->last_ts/scratch->window + 1);
+       scratch->sum += (end_time - scratch->last_ts) * (gs_float_t)(scratch->last_val); 
+       gs_int64_t start_time = end_time - scratch->window;
+       if(scratch->first_ts > start_time){
+               *result = scratch->sum / (end_time - scratch->first_ts);
+//printf("\ttime_avg outpt2 sum=%lf, start_time=%lld, end_time=%lld, first_ts=%lld, last_ts=%lld,result=%lf\n", scratch->sum, start_time, end_time, scratch->first_ts, scratch->last_ts, *result);
+       }else{
+               *result = scratch->sum / (end_time - start_time);
+//printf("\ttime_avg outpt3 sum=%lf, start_time=%lld, end_time=%lld, first_ts=%lld, last_ts=%lld,result=%lf\n", scratch->sum, start_time, end_time, scratch->first_ts, scratch->last_ts, *result);
+       }
+}
+
+void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_float_t val, gs_int64_t ts, gs_int64_t window){
+       time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
+       scratch->window = window;
+       if(scratch->first_ts==0){
+               scratch->first_ts = ts;
+       }else{
+               if(scratch->event_occurred){
+
+                       scratch->sum += (ts - scratch->last_ts) * scratch->last_val;
+               }else{
+                       gs_int64_t start_time = scratch->window * (scratch->last_ts/scratch->window);
+                       scratch->sum += (ts - start_time) * scratch->last_val;
+               }
+       }
+//printf("time_avg_upd: val=%lf, occurred=%d, last_val=%lf, sum=%lf, ts=%lld, first_ts=%lld, last_ts=%lld\n",val, scratch->event_occurred, scratch->last_val, scratch->sum, ts, scratch->first_ts, scratch->last_ts);
+       scratch->last_val = val;
+       scratch->last_ts = ts;
+       scratch->event_occurred = 1;
+}
+void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t val, gs_int64_t ts, gs_int64_t window){
+       time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
+}
+void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t val, gs_int64_t ts, gs_int64_t window){
+       time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
+}
+void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t val, gs_int64_t ts, gs_int64_t window){
+       time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
+}
+void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t val, gs_int64_t ts, gs_int64_t window){
+       time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
+}
+       
+
+// ------------------------------------------------------------
+//             running_sum_max : get the running sum of an int,
+//             be able to report this sum and also its max value
+//             during the time window
+
+struct run_sum_max_udaf_str{
+       gs_int64_t sum;
+       gs_int64_t max;
+};
+void run_sum_max_HFTA_AGGR_INIT_(gs_sp_t s){
+       run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
+       scratch->sum = 0;
+       scratch->max = 0;
+}
+void run_sum_max_HFTA_AGGR_REINIT_(gs_sp_t s){
+       run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
+       scratch->max = scratch->sum;
+}
+void run_sum_max_HFTA_AGGR_OUTPUT_(vstring *r,gs_sp_t b){
+        r->length = sizeof(run_sum_max_udaf_str);
+        r->offset = (gs_p_t)(b);
+        r->reserved = SHALLOW_COPY;
+}
+void run_sum_max_HFTA_AGGR_DESTROY_(gs_sp_t b){
+        return;
+}
+
+void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t v){
+       run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
+       scratch->sum+=v;
+       if(scratch->sum>scratch->max) scratch->max=scratch->sum;
+}
+void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t v){
+       run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
+       scratch->sum+=v;
+       if(scratch->sum>scratch->max) scratch->max=scratch->sum;
+}
+void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t v){
+       run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
+       scratch->sum+=v;
+       if(scratch->sum>scratch->max) scratch->max=scratch->sum;
+}
+void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t v){
+       run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
+       scratch->sum+=v;
+       if(scratch->sum>scratch->max) scratch->max=scratch->sum;
+}
+//     the extraction functions
+gs_int64_t extr_running_sum(vstring *v){
+       if(v->length != sizeof(run_sum_max_udaf_str)) return 0;
+       run_sum_max_udaf_str *vs = (run_sum_max_udaf_str *)(v->offset);
+       return vs->sum;
+}
+gs_int64_t extr_running_sum_max(vstring *v){
+       if(v->length != sizeof(run_sum_max_udaf_str)) return 0;
+       run_sum_max_udaf_str *vs = (run_sum_max_udaf_str *)(v->offset);
+       return vs->max;
+}
+
+
+// ---------------------------------------------
+//             aggr_diff : from a sequence of strings, collect
+//               the ones which are different than the previous.
+//               this includes the prior time period.
+//               the idea is to see the sequence of handovers
+
+struct CAT_aggr_diff_scratch{
+       std::string val;
+       std::string prev_s;
+//     gs_int64_t prev_ts;     // for now, just catenate strings
+};
+
+struct CAT_aggr_diff_scratch_ptr{
+       CAT_aggr_diff_scratch *ptr;
+};
+
+
+
+void CAT_aggr_diff_HFTA_AGGR_INIT_(gs_sp_t s){
+       CAT_aggr_diff_scratch_ptr *p = (CAT_aggr_diff_scratch_ptr *)s;
+       CAT_aggr_diff_scratch *v = new CAT_aggr_diff_scratch();
+       v->prev_s = "";
+       v->val = "";
+
+       p->ptr = v;
+}
+void CAT_aggr_diff_HFTA_AGGR_REINIT_(gs_sp_t s){
+       CAT_aggr_diff_scratch_ptr *p = (CAT_aggr_diff_scratch_ptr *)s;
+       CAT_aggr_diff_scratch *v = p->ptr;
+       v->val=v->prev_s;
+}
+void CAT_aggr_diff_HFTA_AGGR_UPDATE_(gs_sp_t s,  vstring *str){
+       char str_buf[MAXTUPLESZ-20];
+       int i;
+       for(i=0;i<str->length;++i) str_buf[i] = *(((char *)str->offset)+i);
+       str_buf[i]='\0';
+
+       CAT_aggr_diff_scratch_ptr *p = (CAT_aggr_diff_scratch_ptr *)s;
+       CAT_aggr_diff_scratch *v = p->ptr;
+       if(str_buf != v->prev_s){
+               if(v->val.size()>0)
+                       v->val += ':';
+               v->val += str_buf;
+               v->prev_s = str_buf;
+       }
+}
+
+void CAT_aggr_diff_HFTA_AGGR_OUTPUT_(vstring *res, gs_sp_t s){
+       CAT_aggr_diff_scratch_ptr *p = (CAT_aggr_diff_scratch_ptr *)s;
+       CAT_aggr_diff_scratch *v = p->ptr;
+//printf("output val=%s\n",v->val.c_str());
+       res->offset = (gs_p_t)malloc(v->val.size());
+       res->length = v->val.size();
+       if(res->length>MAXTUPLESZ-20)
+               res->length=MAXTUPLESZ-20;
+//     v->val.copy((char *)(res->offset), 0, res->length);
+       const char *dat = v->val.c_str();
+       memcpy((char *)(res->offset), dat, res->length);
+//     for(int i=0;i<res->length;++i)
+//             *(((char *)res->offset)+i) = dat[i];
+       res->reserved = INTERNAL;
+}
+void CAT_aggr_diff_HFTA_AGGR_DESTROY_(gs_sp_t s){
+       CAT_aggr_diff_scratch_ptr *p = (CAT_aggr_diff_scratch_ptr *)s;
+       CAT_aggr_diff_scratch *v = p->ptr;
+       delete v;
+}
+
+
+
+// ---------------------------------------------
+//             Approximate count distinct.
+//             Rely on the minhashing approach.
+//             Currently HFTA-only
+//             Uses a 32-bit hash, tested up to 100,000,000 elements
+//             and it gave good results (within 7%)
+
+
+#define COUNT_DISTINCT_NREPS 250
+#define COUNT_DISTINCT_MAX_STRING_LEN 200      // number of 4-byte words
+
+static Hash32bit2univID hids[COUNT_DISTINCT_NREPS];
+static int approx_count_distinct_udaf_initialized = 0;
+struct approx_count_distinct_udaf_str{
+       unsigned int mn[COUNT_DISTINCT_NREPS];
+};
+
+
+void approx_count_distinct_udaf_HFTA_AGGR_INIT_(gs_sp_t buf){
+       approx_count_distinct_udaf_str *cd = (approx_count_distinct_udaf_str *)buf;
+       for(int i=0;i<COUNT_DISTINCT_NREPS;++i)
+               cd->mn[i]=4294967295;
+       if(approx_count_distinct_udaf_initialized==0){
+               for(int i=0;i<COUNT_DISTINCT_NREPS;++i)
+                       hids[i] = InitStringHash32bit2univ(COUNT_DISTINCT_MAX_STRING_LEN);
+               approx_count_distinct_udaf_initialized=1;
+       }
+}
+void running_approx_count_distinct_udaf_HFTA_AGGR_INIT_(gs_sp_t buf){
+       approx_count_distinct_udaf_HFTA_AGGR_INIT_(buf);
+}
+
+void approx_count_distinct_udaf_HFTA_AGGR_REINIT_(gs_sp_t buf){ }
+void running_approx_count_distinct_udaf_HFTA_AGGR_REINIT_(gs_sp_t buf){}
+
+void approx_count_distinct_udaf_HFTA_AGGR_DESTROY_(gs_sp_t buf){ }
+void running_approx_count_distinct_udaf_HFTA_AGGR_DESTROY_(gs_sp_t buf){ }
+
+void approx_count_distinct_udaf_HFTA_AGGR_UPDATE_(gs_sp_t buf, vstring *val){
+       approx_count_distinct_udaf_str *cd = (approx_count_distinct_udaf_str *)buf;
+       unsigned int buffer[sizeof(unsigned int)*COUNT_DISTINCT_MAX_STRING_LEN];
+       buffer[val->length/4] = 0;
+       memcpy((char *)buffer, (char *)val->offset, min(val->length, 800));
+       unsigned int len4 = val->length/4 + ((val->length&0x03)>0);
+
+       for(int i=0; i<COUNT_DISTINCT_NREPS; ++i){
+               unsigned int h = StringHash32bit2univ(buffer, len4, hids[i]);
+               if(h < cd->mn[i]) cd->mn[i] = h;
+       }
+}
+void running_approx_count_distinct_udaf_HFTA_AGGR_UPDATE_(gs_sp_t buf, vstring *val){
+       approx_count_distinct_udaf_HFTA_AGGR_UPDATE_(buf, val);
+}
+
+void approx_count_distinct_udaf_HFTA_AGGR_OUTPUT_(vstring *res, gs_sp_t buf){
+       res->offset = (gs_p_t)buf;
+       res->length = sizeof(approx_count_distinct_udaf_str);
+       res->reserved = SHALLOW_COPY;
+}
+void running_approx_count_distinct_udaf_HFTA_AGGR_OUTPUT_(vstring *res, gs_sp_t buf){
+       approx_count_distinct_udaf_HFTA_AGGR_OUTPUT_(res, buf);
+}
+
+gs_float_t extr_approx_count_distinct(vstring *v){
+       approx_count_distinct_udaf_str *cd = (approx_count_distinct_udaf_str *)(v->offset);
+       gs_float_t avg = 0.0;
+       for(int i=0;i<COUNT_DISTINCT_NREPS;++i){
+               avg += cd->mn[i];
+       }
+       avg /= COUNT_DISTINCT_NREPS;
+       gs_float_t est = (4294967295.0 / avg) - 1;
+       return est;
+}
+
+