Added quantiling UDAFs
[com/gs-lite.git] / src / lib / gscphftaaux / SSstateful_count_distinct.cc
1 /* ------------------------------------------------\r
2  Copyright 2014 AT&T Intellectual Property\r
3  Licensed under the Apache License, Version 2.0 (the "License");\r
4  you may not use this file except in compliance with the License.\r
5  You may obtain a copy of the License at\r
6  \r
7  http://www.apache.org/licenses/LICENSE-2.0\r
8  \r
9  Unless required by applicable law or agreed to in writing, software\r
10  distributed under the License is distributed on an "AS IS" BASIS,\r
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12  See the License for the specific language governing permissions and\r
13  limitations under the License.\r
14  ------------------------------------------- */\r
15 ///////////////////////////////////////////////\r
16 ////    State, functions for adaptive smart sampling\r
17 ///             written by Irina Rozenbaum\r
18 \r
19 #include <iostream>\r
20 #define max(x,y) ((x)>(y)?(x):(y))\r
21 #define _BURST_SOOTHING_FACTOR 10.0\r
22 \r
23 struct SSstate {\r
24     //unsigned long long int count;   // count to sample small packets with certain probability\r
25     double count;\r
26     double gcount;  // count for clean_with() function\r
27     double fcount;  // count for final_clean() function\r
28     double z;       //z is the threshold for a size of the packet\r
29     double z_prev;  // value of threshold from the previos cleaning phase\r
30     double gamma;   //tolerance parameter for emergency control over the\r
31     //number of samples, should be >= 1\r
32     int do_clean; // trigger cleaning phase\r
33     int bcount;  //count for number of packets that exceed threshold ,\r
34     //need it for one of the variations of algorithms for\r
35     //threshold adjustment\r
36     int s_size;  //need to remember sample size for _sfun_state_init()\r
37     int final_z; //bool that indicated if z was adjusted to its filnal value\r
38     //before the final clean\r
39     int time;    //timestamp from previously processed packet\r
40     //used in detecting frequency of the clening phases with aggregation\r
41     \r
42     int packet_count; //count for number of packets per time window\r
43     int do_clean_count; //count number of cleaning phases per time window\r
44     bool do_sooth; //turn on _BURST_SOOTHING_FACTOR\r
45     \r
46     // flow sampling\r
47     int count_closed; //bool indicates that counting phase is being triggered\r
48     int count_closed_flows; //count for closed flows\r
49     int count_notsampled_new; //count for the number of new groups that were not sampled during\r
50     // the final sampling phase will be used in next time window to compute resonable\r
51     // approximation for threshold before final sampling phase\r
52     \r
53     // for reporting\r
54     int delay; //interval within which there was no packets that belong to the current flow\r
55     //thus flow is considered to be closed.\r
56     //parameter specified by the query\r
57     int closing_now; //count for groups that are closing at the final cleraning phase\r
58     //or were closed during the most recent counting phase (newly closed)\r
59     \r
60     //for debugging\r
61     int how_many_cleaned;\r
62 };\r
63 \r
64 \r
65 // the function is called once at the initial initialization of the state\r
66 void _sfun_state_clean_init_smart_sampling_state(void *s){\r
67     struct SSstate *state  = (struct SSstate *)s;\r
68     \r
69     state->count = 0;\r
70     state->gcount = 0;\r
71     state->fcount = 0;\r
72     state->z = 200; //need to figure out good initial value for z\r
73     //    cout << "clean init Z: " <<  state->z << "\n";\r
74     state->z_prev = 0;\r
75     //    cout << "clean init Z prev: " <<  state->z << "\n";\r
76     state->gamma = 2;\r
77     state->do_clean = 0;\r
78     state->bcount = 0;\r
79     state->s_size = 0;\r
80     state->final_z = 0;\r
81     state->time = 0;\r
82     state->count_closed = 0;\r
83     state->count_closed_flows = 0;\r
84     state->count_notsampled_new = 0;\r
85     \r
86     state->packet_count = 0;\r
87     state->do_clean_count = 0;\r
88     state->do_sooth = true;\r
89     \r
90     state->how_many_cleaned = 0;\r
91     state->delay = 0;\r
92     state->closing_now = 0;\r
93 };\r
94 \r
95 \r
96 // the function will be called at the beginning of every time window\r
97 void _sfun_state_dirty_init_smart_sampling_state(void *s_new, void *s_old, int curr_num_samples){\r
98     struct SSstate *state_new  = (struct SSstate *)s_new;\r
99     struct SSstate *state_old  = (struct SSstate *)s_old;\r
100     \r
101     //    cout << "**flows were sampled: " << state_old->how_many_cleaned << "\n";\r
102     //    cout << "***dirty current num of samples: " << curr_num_samples << "\n";\r
103     //    cout << "dirty init Z old: " <<  state_old->z << "\n";\r
104     if(curr_num_samples < state_old->s_size){\r
105         // state_new->z = state_old->z*((max((double)curr_num_samples-(double)state_old->bcount,1))/((double)state_old->s_size-(double)state_old->bcount));\r
106         \r
107         //temp fix for early aggregation\r
108         state_new->z = (state_old->z)/2;\r
109         \r
110         if(state_old->do_sooth)\r
111             state_new->z = state_old->z / _BURST_SOOTHING_FACTOR;\r
112     }\r
113     else {\r
114         if(curr_num_samples >= state_old->s_size){\r
115             //cout << "yes\n";\r
116             state_new->z = state_old->z*((double)curr_num_samples/(double)state_old->s_size);\r
117             \r
118             if(state_old->do_sooth)\r
119                 state_new->z = state_old->z / _BURST_SOOTHING_FACTOR;\r
120         }\r
121     }\r
122     \r
123     if(state_new->z <= 1.0)\r
124         state_new->z = 1;\r
125     //    cout << "dirty init Z new: " <<  state_new->z << "\n";\r
126     \r
127     state_new->gamma = state_old->gamma;\r
128     state_new->do_clean = state_old->do_clean;\r
129     state_new->s_size = state_old->s_size;\r
130     state_new->bcount = 0;\r
131     state_new->gcount = 0;\r
132     state_new->count = 0;\r
133     state_new->fcount = 0;\r
134     state_new->final_z = 0;\r
135     state_new->time = 0;\r
136     state_new->count_closed = 0;\r
137     state_new->count_closed_flows = 0;\r
138     state_new->count_notsampled_new = state_old->count_notsampled_new;\r
139     \r
140     state_new->packet_count = 0;\r
141     state_new->do_clean_count = 0;\r
142     state_new->do_sooth = true;\r
143     \r
144     state_new->how_many_cleaned = 0;\r
145     state_new->delay = 0;\r
146     state_new->closing_now = 0;\r
147     \r
148     //  cout << "dirty init gamma: " <<   state_new->gamma << "\n";\r
149     //     cout << "dirty init do_clean: " <<  state_new->do_clean << "\n";\r
150     //     cout << "dirty init s_size: " <<  state_new->s_size << "\n";\r
151     //     cout << "dirty init bcount: " <<  state_new->bcount << "\n";\r
152     //     cout << "dirty init gcount: " <<  state_new->gcount << "\n";\r
153     //     cout << "dirty init count: " <<  state_new->count << "\n";\r
154     //     cout << "dirty init fcount: " <<  state_new->fcount << "\n";\r
155     //     cout << "dirty init final_z: " <<  state_new->final_z << "\n";\r
156     //     cout << "dirty init time " <<  state_new->time  << "\n";\r
157     //     cout << "dirty init count_closed: " <<  state_new->count_closed << "\n";\r
158     //     cout << "dirty init count_closed_flows: " <<  state_new->count_closed_flows << "\n";\r
159     \r
160     //     cout << "dirty init packet count: " <<  state_new->packet_count  << "\n";\r
161     //     cout << "dirty init do_clean_count: " <<  state_new->do_clean_count  << "\n";\r
162     \r
163 };\r
164 \r
165 // function is called in two cases:\r
166 // adjustment of threshold before the cleaning phase is being triggered\r
167 // adjustment of threshold at the window border for the old state, this new\r
168 // threshold will be used by partial_flush() and flush() functions (final_clean())\r
169 \r
170 void _sfun_state_final_init_smart_sampling_state(void *s, int curr_num_samples){\r
171     struct SSstate *state  = (struct SSstate *)s;\r
172     \r
173     //  cout << "final init Z old: " <<  state->z << "\n";\r
174     \r
175     if(state->final_z == 0){\r
176         \r
177         // just returned from the counting phase\r
178         // case when the time window is changed right after the counting phase\r
179         if(state->count_closed == 1){\r
180             state->count_closed = 0;\r
181             state->count_closed_flows = 0;\r
182             state->how_many_cleaned = 0;\r
183         }\r
184         \r
185         state->z_prev = state->z;\r
186         //    cout << "final current num of samples: " << curr_num_samples << "\n";\r
187         //    cout << "final count not sampled new: " << state->count_notsampled_new << "\n";\r
188         // adjust count for current number of sampled based on statistics\r
189         // gathered in the previous time window\r
190         // otherwise we overestimate it\r
191         if(state->count_notsampled_new != 0)\r
192             curr_num_samples -= state->count_notsampled_new;\r
193         \r
194         if(curr_num_samples < state->s_size){\r
195             //state->z = state->z*((max((double)curr_num_samples-(double)state->bcount,1))/((double)state->s_size-(double)state->bcount));\r
196             \r
197             //temp fix for early aggregation\r
198             state->z = state->z/2;\r
199         }\r
200         else {\r
201             if(curr_num_samples >= state->s_size){\r
202                 state->z = state->z*((double)curr_num_samples/(double)state->s_size);\r
203             }\r
204         }\r
205         \r
206         if(state->z <= 0)\r
207             state->z = 1;\r
208         //    cout << "final init Z new: " <<  state->z << "\n";\r
209         \r
210         state->bcount = 0;\r
211         state->final_z = 1;\r
212         state->do_clean_count++;\r
213         state->count_notsampled_new = 0;\r
214         \r
215     }\r
216     \r
217     //  cout << "final init gamma: " <<   state->gamma << "\n";\r
218     //     cout << "final init do_clean: " <<  state->do_clean << "\n";\r
219     //     cout << "final init s_size: " <<  state->s_size << "\n";\r
220     //     cout << "final init bcount: " <<  state->bcount << "\n";\r
221     //     cout << "final init gcount: " <<  state->gcount << "\n";\r
222     //     cout << "final init count: " <<  state->count << "\n";\r
223     //     cout << "final init fcount: " <<  state->fcount << "\n";\r
224     //     cout << "final init final_z: " <<  state->final_z << "\n";\r
225     //     cout << "final init time " <<  state->time  << "\n";\r
226     //     cout << "final init count_closed: " <<  state->count_closed << "\n";\r
227     //     cout << "final init count_closed_flows: " <<  state->count_closed_flows << "\n";\r
228     \r
229     //     cout << "final init packet count: " <<  state->packet_count  << "\n";\r
230     //     cout << "final init do_clean_count: " <<  state->do_clean_count  << "\n";\r
231     \r
232 };\r
233 \r
234 int ssample(void *s,int curr_num_samples, unsigned long long int len, unsigned int sample_size){\r
235     struct SSstate *state  = (struct SSstate *)s;\r
236     \r
237     state->packet_count++;\r
238     \r
239     state->s_size = sample_size;\r
240     int sampled = 0;\r
241     \r
242     //just returned from the cleaning phase\r
243     if(state->do_clean == 1){\r
244         state->gcount = 0;\r
245         state->do_clean = 0;\r
246     }\r
247     \r
248     //sampling\r
249     if(len > state->z){\r
250         state->bcount++;\r
251         sampled=1;\r
252     }\r
253     else{\r
254         state->count += len;\r
255         if(state->count >= state->z){\r
256             sampled=1;\r
257             state->count -= state->z;\r
258         }\r
259     }\r
260     return sampled;\r
261     \r
262 };\r
263 \r
264 int flow_ssample(void *s,int curr_num_samples, unsigned int sample_size){\r
265     struct SSstate *state  = (struct SSstate *)s;\r
266     \r
267     state->packet_count++;\r
268     \r
269     state->s_size = sample_size;\r
270     int sampled = 0;\r
271     \r
272     //just returned from the counting phase\r
273     if(state->count_closed == 1)\r
274         //      cout << "closed flows after counting phase: " << state->count_closed_flows << "\n";\r
275         \r
276         //just returned from the cleaning phase\r
277         if(state->do_clean == 1){\r
278             //      cout << "flows were sampled: " << state->how_many_cleaned << "\n";\r
279             state->how_many_cleaned = 0;\r
280             state->gcount = 0;\r
281             state->do_clean = 0;\r
282         }\r
283     \r
284     return 1;\r
285     \r
286 };\r
287 \r
288 \r
289 int ssfinal_clean(void *s, int curr_num_samples, unsigned long long int glen){\r
290     struct SSstate *state  = (struct SSstate *)s;\r
291     \r
292     state->do_sooth = true;\r
293     \r
294     // for ssample() where just returned from the clening phase\r
295     state->do_clean = 1;\r
296     \r
297     int sampled = 0;\r
298     double new_len = 0;\r
299     \r
300     if (glen < state->z_prev)\r
301         new_len = state->z_prev;\r
302     else\r
303         new_len = glen;\r
304     \r
305     //no need to clean\r
306     if(curr_num_samples <= state->s_size){\r
307         return 1;\r
308     }\r
309     else{\r
310         if(new_len > state->z){\r
311             sampled = 1;\r
312             state->bcount++;\r
313         }\r
314         else{\r
315             state->fcount += new_len;\r
316             if(state->fcount >= state->z){\r
317                 sampled = 1;\r
318                 state->fcount -= state->z;\r
319             }\r
320             //else{\r
321             //state->scount--;\r
322             //}\r
323         }\r
324         \r
325         return sampled;\r
326     }\r
327     \r
328 };\r
329 \r
330 int flow_ssfinal_clean(void *s, int curr_num_samples, unsigned int ccondition, unsigned int delay, unsigned int maxtime, unsigned long long int glen){\r
331     struct SSstate *state  = (struct SSstate *)s;\r
332     \r
333     state->do_sooth = true;\r
334     // only for reporting\r
335     state->delay = delay;\r
336     \r
337     //for ssample() where just returned from the clening phase\r
338     state->do_clean = 1;\r
339     \r
340     int sampled = 0;\r
341     double new_len = 0;\r
342     // group is newly closed or not closed yet\r
343     int new_group = 1;\r
344     \r
345     // the flow is closed\r
346     if((ccondition == 1)||((state->time-maxtime)>=delay)){\r
347         \r
348         new_group = 0;\r
349         \r
350         // TF case or TT case for ccondition and delay\r
351         if(ccondition == 1){\r
352             if((state->time-maxtime) <= 1){ // 1 is time interval for count of closed flows\r
353                 // the flow was closed in the previous counting slot\r
354                 new_group = 1;\r
355             }\r
356         }\r
357         //FT case for ccondition and delay\r
358         else{\r
359             if((state->time-maxtime) <= (delay+1)){ // 1 is time interval for count of closed flows\r
360                 // the flow was closed in the previous counting slot\r
361                 new_group = 1;\r
362             }\r
363         }\r
364         // the flow was closed earlier than previous counting slot\r
365         // it is closed and old and new_group = 0\r
366     }\r
367     \r
368     //adjust size only for old closed groups\r
369     if(new_group == 0){\r
370         if (glen < state->z_prev)\r
371             new_len = state->z_prev;\r
372         else\r
373             new_len = glen;\r
374     }\r
375     // newly closed group\r
376     else{\r
377         state->closing_now++;\r
378         new_len = glen;\r
379     }\r
380     \r
381     \r
382     //no need to clean\r
383     if(curr_num_samples <= state->s_size){\r
384         state->count_notsampled_new = 0;\r
385         return 1;\r
386     }\r
387     else{\r
388         if(new_len > state->z){\r
389             sampled = 1;\r
390             state->bcount++;\r
391         }\r
392         else{\r
393             state->fcount += new_len;\r
394             if(state->fcount >= state->z){\r
395                 sampled = 1;\r
396                 state->fcount -= state->z;\r
397             }\r
398             //count number of not sampled newly closed groups\r
399             //will use in the next time window at the final cleaning phase\r
400             else{\r
401                 if(new_group == 1)\r
402                     state->count_notsampled_new++;\r
403             }\r
404             \r
405         }\r
406         \r
407         state->how_many_cleaned += sampled;\r
408         \r
409         return sampled;\r
410     }\r
411     \r
412 };\r
413 \r
414 \r
415 int ssdo_clean(void *s, int curr_num_samples){\r
416     struct SSstate *state  = (struct SSstate *)s;\r
417     \r
418     //emergency control\r
419     //bcount will be different after the cleaning phase\r
420     if(curr_num_samples > (state->gamma*state->s_size)){\r
421         state->z_prev = state->z;\r
422         state->z=(double)state->gamma*state->z;\r
423         state->do_clean = 1;\r
424         state->bcount = 0;\r
425         state->count = 0;\r
426         state->gcount = 0;\r
427         \r
428         state->do_clean_count++;\r
429     }\r
430     \r
431     return state->do_clean;\r
432 };\r
433 \r
434 int flow_ssdo_clean(void *s, int curr_num_samples, unsigned int maxtime){\r
435     struct SSstate *state  = (struct SSstate *)s;\r
436     \r
437     // initialize timestamp with first seen packet from the current timewindow\r
438     if(state->time == 0)\r
439         state->time = maxtime;\r
440     \r
441     // detect next counting time slot\r
442     if(state->time != maxtime){\r
443         //      cout << "need to count\n";\r
444         state->time = maxtime;\r
445         state->count_closed = 1;\r
446         return 1;\r
447     }\r
448     \r
449     //emergency control\r
450     //bcount will be different after the cleaning phase\r
451     //if(curr_num_samples > (state->gamma*state->s_size)){\r
452     if(state->count_closed_flows > (state->gamma*state->s_size)){\r
453         //      cout << "need to clean, num closed flows: " << state->count_closed_flows << "\n";\r
454         state->z_prev = state->z;\r
455         //      cout << "do clean Z old: " <<  state->z << "\n";\r
456         state->z=(((double)state->count_closed_flows)/((double)state->s_size))*state->z;\r
457         //      cout << "do clean Z new: " <<  state->z << "\n";\r
458         state->do_clean = 1;\r
459         state->bcount = 0;\r
460         state->count = 0;\r
461         state->gcount = 0;\r
462         \r
463         state->do_clean_count++;\r
464     }\r
465     \r
466     //just returned from the counting iteration\r
467     if(state->count_closed == 1){\r
468         state->count_closed = 0;\r
469         //      cout << "number of closed flows: " << state->count_closed_flows << "\n";\r
470         state->count_closed_flows = 0;\r
471         //initialize gcount since was used for sampling of new closed flows during counting phase\r
472         state->gcount = 0;\r
473     }\r
474     \r
475     return state->do_clean;\r
476 };\r
477 \r
478 double ssthreshold(void *s, int curr_num_samples){\r
479     struct SSstate *state = (struct SSstate *)s;\r
480     \r
481     return state->z;\r
482 };\r
483 \r
484 int count_distinct(void *s, int curr_num_samples){\r
485     return curr_num_samples;\r
486 };\r
487 \r
488 int ssclean_with(void *s,int curr_num_samples, unsigned long long int glen){\r
489     struct SSstate *state  = (struct SSstate *)s;\r
490     \r
491     //cleaning condition\r
492     int sampled = 0;\r
493     double new_len = 0;\r
494     \r
495     if (glen < state->z_prev)\r
496         new_len = state->z_prev;\r
497     else\r
498         new_len = glen;\r
499     \r
500     if(new_len > state->z){\r
501         state->bcount++;\r
502         sampled = 1;\r
503     }\r
504     else{\r
505         state->gcount += new_len;\r
506         if(state->gcount >= state->z){\r
507             sampled = 1;\r
508             state->gcount -= state->z;\r
509         }\r
510         //else{\r
511         //state->scount--;\r
512         //}\r
513     }\r
514     \r
515     return sampled;\r
516 };\r
517 \r
518 int flow_ssclean_with(void *s,int curr_num_samples, unsigned int ccondition, unsigned int delay, unsigned int maxtime, unsigned long long int glen){\r
519     struct SSstate *state  = (struct SSstate *)s;\r
520     \r
521     //cleaning condition\r
522     int sampled = 0;\r
523     double new_len = 0;\r
524     int new_group = 0;\r
525     \r
526     //need to count closed flows\r
527     if(state->count_closed == 1){\r
528         //the flow is closed\r
529         if((ccondition == 1)||((state->time-maxtime)>=delay)){\r
530             state->count_closed_flows++;\r
531             // TF case or TT case for ccondition and delay\r
532             if(ccondition == 1){\r
533                 if((state->time-maxtime) <= 1) //1 is time interval for count of closed flows\r
534                     new_group = 1;\r
535             }\r
536             //FT case for ccondition and delay\r
537             else{\r
538                 if((state->time-maxtime) <= (delay+1)) //1 is time interval for count of closed flows\r
539                     new_group = 1;\r
540             }\r
541         }\r
542         \r
543         // if flow is closed but old, no need to subsample it\r
544         if (new_group == 0)\r
545             return true;\r
546         \r
547     }\r
548     \r
549     // clean only closed flows FF case\r
550     // the flow is still open\r
551     if((ccondition == 0)&&((state->time-maxtime)<delay)){\r
552         return true;\r
553     }\r
554     \r
555     // use glen for a new group and z_prev for old one\r
556     if(new_group == 0){\r
557         if (glen < state->z_prev)\r
558             new_len = state->z_prev;\r
559         else\r
560             new_len = glen;\r
561     }\r
562     //the group is new\r
563     else{\r
564         new_len = glen;\r
565     }\r
566     \r
567     // at this point either flow is closed and old and we are at the cleaning phase\r
568     // or flow is closed and new and we are at the counting phase\r
569     if(new_len > state->z){\r
570         state->bcount++;\r
571         sampled = 1;\r
572     }\r
573     else{\r
574         state->gcount += new_len;\r
575         if(state->gcount >= state->z){\r
576             sampled = 1;\r
577             state->gcount -= state->z;\r
578         }\r
579         //new_group is not sampled during counting phase\r
580         else{\r
581             if(state->count_closed == 1)\r
582                 state->count_closed_flows--;\r
583         }\r
584         \r
585     }\r
586     \r
587     if(state->do_clean == 1)\r
588         state->how_many_cleaned += sampled;\r
589     \r
590     return sampled;\r
591 };\r
592 \r
593 int packet_count(void *s, int curr_num_samples){\r
594     struct SSstate *state = (struct SSstate *)s;  \r
595     return state->packet_count;\r
596 };\r
597 \r
598 double gamma(void *s, int curr_num_samples){\r
599     struct SSstate *state = (struct SSstate *)s;  \r
600     return state->gamma;\r
601 };\r
602 \r
603 int do_clean_count(void *s, int curr_num_samples){\r
604     struct SSstate *state = (struct SSstate *)s;  \r
605     return state->do_clean_count;\r
606 };\r
607 \r
608 int delay(void *s, int curr_num_samples){\r
609     struct SSstate *state = (struct SSstate *)s;  \r
610     return state->delay;\r
611 };\r
612 \r
613 // number of groups which are newly closed (in the most recent\r
614 // counting phase or not closed at all during the final\r
615 // cleaning phase\r
616 int newly_closed(void *s, int curr_num_samples){\r
617     struct SSstate *state = (struct SSstate *)s;  \r
618     return state->closing_now;\r
619 };\r
620 \r
621 \r
622 \r
623 \r