Initial commit
[com/gs-lite.git] / src / lib / gscphftaaux / hfta_udaf.cc
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16
17 #include "gsconfig.h"
18 #include "gstypes.h"
19 #include "hfta_udaf.h"
20 #include "rts_udaf.h"
21 #include <stdio.h>
22 #include <limits.h>
23 #include <math.h>
24 //#include <memory.h>
25 #include <string.h>
26 #include <sys/time.h>
27 #include <iostream>
28
29 #include "hfta_runtime_library.h"
30
31
32 #define max(a,b) ((a) > (b) ? (a) : (b))
33 #define min(x,y) ((x) < (y) ? (x) : (y))
34 #define lg(x)    (log(x) / log(2))
35
36 using namespace std;
37
38
39 // -------------------------------------------------------------------
40 //              moving sum over N intervals
41
42 struct moving_sum_udaf_str{
43         gs_uint32_t N;
44         gs_uint32_t pos;
45         gs_uint32_t *sums;
46 };
47
48 void moving_sum_udaf_HFTA_AGGR_INIT_(gs_sp_t buf){
49   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
50   u->N=0; u->pos=0; u->sums=NULL;
51 }
52
53 void moving_sum_udaf_HFTA_AGGR_UPDATE_(gs_sp_t buf, gs_uint32_t s, gs_uint32_t N) {
54   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
55   if(u->sums == NULL){
56         u->sums = (gs_uint32_t *)malloc(N*sizeof(gs_uint32_t));
57         for(gs_int32_t i=0;i<N;i++)
58                 u->sums[i] = 0;
59     u->N = N;
60   }
61   u->sums[u->pos] += s;
62 }
63
64 void super_moving_sum_udaf_HFTA_AGGR_UPDATE_(gs_sp_t  buf, gs_uint64_t sub_sum) {
65   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
66   gs_uint32_t s = (gs_uint32_t)(sub_sum & 0xffffffff);
67   if(u->sums == NULL){
68     gs_uint32_t N = (gs_uint32_t)((sub_sum & 0xffffffff00000000ull) >> 32);
69         u->sums = (gs_uint32_t *)malloc(N*sizeof(gs_uint32_t));
70         for(gs_int32_t i=0;i<N;i++)
71                 u->sums[i] = 0;
72     u->N = N;
73   }
74   u->sums[u->pos] += s;
75 }
76
77 void moving_sum_udaf_HFTA_AGGR_OUTPUT_(gs_p_t *result, gs_sp_t  buf){
78         *result = (gs_p_t)(buf);
79 }
80
81 void moving_sum_udaf_HFTA_AGGR_DESTROY_(gs_sp_t  buf){
82   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
83   if(u->sums != NULL)
84         free(u->sums);
85 }
86
87 void moving_sum_udaf_HFTA_AGGR_REINIT_( gs_sp_t  buf){
88   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) buf;
89   u->pos++;
90   if(u->pos >= u->N)
91         u->pos = 0;
92   u->sums[u->pos] = 0;
93 }
94
95 gs_uint32_t moving_sum_extract(gs_p_t result){
96   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) result;
97   gs_uint32_t s=0, i=0;
98   for(i=0; i<u->N;i++){
99     s += u->sums[i];
100   }
101   return s;
102 }
103
104 gs_float_t moving_sum_extract_exp(gs_p_t result, gs_float_t alpha){
105   struct moving_sum_udaf_str * u = (struct moving_sum_udaf_str *) result;
106   gs_uint32_t p=0, i=0;
107   gs_float_t s=0.0, m=1.0;
108   p=u->pos;
109   for(i=0; i<u->N;i++){
110     s += u->sums[i]*m;
111     if(p==0)
112                 p=u->N - 1;
113     else
114                 p--;
115         m *= alpha;
116   }
117   return s;
118 }
119
120
121 // -------------------------------------------------------------------
122 //              sum over 3 intervals : test rUDAF
123
124 struct sum3_udaf_str{
125   gs_uint32_t s_2;
126   gs_uint32_t s_1;
127   gs_uint32_t s_0;
128 };
129
130 void sum3_HFTA_AGGR_INIT_(gs_sp_t  buf) {
131   struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;
132   u->s_0 = 0; u->s_1 = 0; u->s_2 = 0;
133   return;
134 }          
135
136 void sum3_HFTA_AGGR_UPDATE_(gs_sp_t  buf, gs_uint32_t s) {
137   struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;
138   u->s_0 += s;
139   return;
140 }
141
142 void sum3_HFTA_AGGR_OUTPUT_(gs_uint32_t *result, gs_sp_t  buf) {
143   struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;
144   *result = u->s_0 + u->s_1 + u->s_2;
145   return; 
146 }
147
148 void sum3_HFTA_AGGR_DESTROY_(gs_sp_t  buf) {
149   return;
150 }
151
152 void sum3_HFTA_AGGR_REINIT_( gs_sp_t  buf) {
153   struct sum3_udaf_str * u = (struct sum3_udaf_str *) buf;
154   u->s_2 = u->s_1;
155   u->s_1 = u->s_0;
156   u->s_0 = 0;
157   return;
158 }
159
160
161 #define HISTORY_LENGTH 1024
162
163 /////////////////////////////////////////////////////////////////////////
164 /////   Calculate the average of all positive gs_float_t numbers
165
166 struct posavg_struct{
167   gs_float_t sum;
168   gs_float_t cnt;
169 };
170
171 void POSAVG_HFTA_AGGR_INIT_(gs_sp_t  buf) {
172   struct posavg_struct * a = (struct posavg_struct *) buf;
173   a->sum=0;
174   a->cnt=0;
175   return;
176 }
177
178 void POSAVG_HFTA_AGGR_UPDATE_(gs_sp_t  buf, gs_float_t v) {
179   struct posavg_struct * a = (struct posavg_struct *) buf;
180   if (v>=0) {
181     a->sum=a->sum+v;
182     a->cnt=a->cnt+1;
183   }
184   return;
185 }
186
187 void POSAVG_HFTA_AGGR_OUTPUT_(gs_float_t * v, gs_sp_t  buf) {
188   struct posavg_struct * a = (struct posavg_struct *) buf;
189   if (a->cnt>0) {
190     *v=(a->sum/a->cnt);
191   } else {
192     *v=-1;
193   }
194   return;
195 }
196
197 void POSAVG_HFTA_AGGR_DESTROY_(gs_sp_t  buf) {
198   return;
199 }
200
201 /////////////////////////////////////////////////////////////////////////
202 /////                   avg_udaf (simple example)
203
204 //              struct received from subaggregate
205 struct avg_udaf_lfta_struct_t{
206         gs_int64_t  sum;
207         gs_uint32_t cnt;
208 };
209
210 //              sctarchpad struct
211 struct avg_udaf_hfta_struct_t{
212         gs_int64_t sum;
213         gs_uint32_t cnt;
214 };
215
216 //                      avg_udaf functions
217 void avg_udaf_HFTA_AGGR_INIT_(gs_sp_t b){
218         avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;
219         s->sum = 0;
220         s->cnt = 0;
221 }
222
223 void avg_udaf_HFTA_AGGR_UPDATE_(gs_sp_t b, gs_uint32_t v){
224         avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;
225         s->sum += v;
226         s->cnt ++;
227 }
228
229 void avg_udaf_HFTA_AGGR_OUTPUT_(vstring *r,gs_sp_t b){
230         r->length = 12;
231         r->offset = (gs_p_t)(b);
232         r->reserved = SHALLOW_COPY;
233 }
234
235 void avg_udaf_HFTA_AGGR_DESTROY_(gs_sp_t b){
236         return;
237 }
238
239
240 //                      avg_udaf superaggregate functions
241 void avg_udaf_hfta_HFTA_AGGR_INIT_(gs_sp_t b){
242         avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;
243         s->sum = 0;
244         s->cnt = 0;
245 }
246
247 void avg_udaf_hfta_HFTA_AGGR_UPDATE_(gs_sp_t b, vstring *v){
248         if(v->length != 12) return;
249         avg_udaf_hfta_struct_t *s = (avg_udaf_hfta_struct_t *) b;
250         avg_udaf_lfta_struct_t *vs = (avg_udaf_lfta_struct_t *)(v->offset);
251         s->sum += vs->sum;
252         s->cnt += vs->cnt;
253 }
254
255 void avg_udaf_hfta_HFTA_AGGR_OUTPUT_(vstring *r,gs_sp_t b){
256         r->length = 12;
257         r->offset = (gs_p_t)(b);
258         r->reserved = SHALLOW_COPY;
259 }
260
261 void avg_udaf_hfta_HFTA_AGGR_DESTROY_(gs_sp_t b){
262         return;
263 }
264
265 //              Extraction function
266 gs_float_t extr_avg_fcn(vstring *v){
267         if(v->length != 12) return 0;
268         avg_udaf_hfta_struct_t *vs = (avg_udaf_hfta_struct_t *)(v->offset);
269         gs_float_t r = (gs_float_t)(vs->sum) / vs->cnt;
270     return r;
271 }
272
273 /////////////////////////////////////////////////////////
274 //              FIRST aggregate
275
276 void FIRST_hfta_HFTA_AGGR_INIT_(gs_uint32_t* scratch) {
277         *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
278 }
279
280 void FIRST_hfta_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }
281
282 void FIRST_hfta_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {
283         if (*scratch == UINT_MAX)
284                 *scratch = val;
285 }
286
287 void FIRST_hfta_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {
288         *res = *scratch;
289 }
290
291 void FIRST_hfta_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }
292
293 void FIRST_ULL_hfta_HFTA_AGGR_INIT_(gs_uint64_t* scratch) {
294         *scratch = UINT_MAX;            // we will encode uninitialized value of UINT_MAX
295 }
296
297 void FIRST_ULL_hfta_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
298
299 void FIRST_ULL_hfta_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
300         if (*scratch == UINT_MAX)
301                 *scratch = val;
302 }
303
304 void FIRST_ULL_hfta_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
305         *res = *scratch;
306 }
307
308 void FIRST_ULL_hfta_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
309
310
311 void FIRST_STR_hfta_HFTA_AGGR_INIT_(vstring* scratch) {
312         scratch->offset= 0;
313 }
314
315 void FIRST_STR_hfta_HFTA_AGGR_REINIT_(vstring* scratch) { }
316
317 void FIRST_STR_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
318         if (!scratch->offset) {
319     scratch->length = val->length;
320     scratch->offset = val->offset;
321     scratch->reserved = SHALLOW_COPY;
322         }
323 }
324
325 void FIRST_STR_hfta_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
326         *res = *scratch;
327 }
328
329 void FIRST_STR_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) { }
330
331 /////////////////////////////////////////////////////////
332 //              LAST aggregate
333
334 void LAST_hfta_HFTA_AGGR_INIT_(gs_uint32_t* scratch) { }
335
336 void LAST_hfta_HFTA_AGGR_REINIT_(gs_uint32_t* scratch) { }
337
338 void LAST_hfta_HFTA_AGGR_UPDATE_(gs_uint32_t* scratch, gs_uint32_t val) {
339         *scratch = val;
340 }
341
342 void LAST_hfta_HFTA_AGGR_OUTPUT_(gs_uint32_t* res, gs_uint32_t* scratch) {
343         *res = *scratch;
344 }
345
346 void LAST_hfta_HFTA_AGGR_DESTROY_(gs_uint32_t* scratch) { }
347
348 void LAST_ULL_hfta_HFTA_AGGR_INIT_(gs_uint64_t* scratch) { }
349
350 void LAST_ULL_hfta_HFTA_AGGR_REINIT_(gs_uint64_t* scratch) { }
351
352 void LAST_ULL_hfta_HFTA_AGGR_UPDATE_(gs_uint64_t* scratch, gs_uint64_t val) {
353         *scratch = val;
354 }
355
356 void LAST_ULL_hfta_HFTA_AGGR_OUTPUT_(gs_uint64_t* res, gs_uint64_t* scratch) {
357         *res = *scratch;
358 }
359
360 void LAST_ULL_hfta_HFTA_AGGR_DESTROY_(gs_uint64_t* scratch) { }
361
362
363 void LAST_STR_hfta_HFTA_AGGR_INIT_(vstring* scratch) {
364         scratch->offset= 0;
365 }
366
367 void LAST_STR_hfta_HFTA_AGGR_REINIT_(vstring* scratch) { }
368
369 void LAST_STR_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
370         scratch->length = val->length;
371   scratch->offset = val->offset;
372   scratch->reserved = SHALLOW_COPY;
373 }
374
375 void LAST_STR_hfta_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
376         *res = *scratch;
377 }
378
379 void LAST_STR_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) { }
380
381
382 /////////////////////////////////////////////////////////
383 //              running_array_aggr aggregate
384
385 void running_array_aggr_hfta_HFTA_AGGR_INIT_(vstring* scratch) {
386   scratch->offset = NULL;  
387   scratch->length = 0;
388 }
389
390 void running_array_aggr_hfta_HFTA_AGGR_REINIT_(vstring* scratch) { }
391
392 void running_array_aggr_hfta_HFTA_AGGR_UPDATE_(vstring* scratch, vstring* val) {
393   char buffer[100];
394
395   gs_uint32_t* ints = (gs_uint32_t*)val->offset;
396   switch (val->length / sizeof (gs_uint32_t)) {
397     case 4:
398       sprintf(buffer, "%u,%u,%u,%u", ints[0], ints[1], ints[2], ints[3]);
399       break;
400     case 3:
401       sprintf(buffer, "%u,%u,%u", ints[0], ints[1], ints[2]);
402       break;   
403     case 2:
404       sprintf(buffer, "%u,%u", ints[0], ints[1]);
405       break;        
406     case 1:
407       sprintf(buffer, "%u", ints[0]);
408       break;  
409     case 0:
410       return;        
411   }
412   int buf_len = strlen(buffer);
413
414   // append the content of buffer to scratch
415         if (!scratch->offset) {
416     Vstring_Constructor(scratch, buffer);
417         } else {
418     scratch->offset = (gs_p_t)realloc((void*)scratch->offset, scratch->length + buf_len + 1);
419     *((char*)scratch->offset + scratch->length) = ',';
420     memcpy((void*)(scratch->offset + scratch->length + 1), (void*)buffer, buf_len);
421     scratch->length += buf_len + 1;
422     scratch->reserved = INTERNAL;
423   }
424 }
425
426 void running_array_aggr_hfta_HFTA_AGGR_OUTPUT_(vstring* res, vstring* scratch) {
427         *res = *scratch;
428   res->reserved = SHALLOW_COPY;
429 }
430
431 void running_array_aggr_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) {
432   hfta_vstr_destroy(scratch);
433  }
434