Fixed newline characters throughout the code
[com/gs-lite.git] / src / lib / gscphftaaux / SSstateful_count_distinct.cc
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6  
7  http://www.apache.org/licenses/LICENSE-2.0
8  
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
15 ///////////////////////////////////////////////
16 ////    State, functions for adaptive smart sampling
17 ///             written by Irina Rozenbaum
18
19 #include <iostream>
20 #define max(x,y) ((x)>(y)?(x):(y))
21 #define _BURST_SOOTHING_FACTOR 10.0
22
23 struct SSstate {
24     //unsigned long long int count;   // count to sample small packets with certain probability
25     double count;
26     double gcount;  // count for clean_with() function
27     double fcount;  // count for final_clean() function
28     double z;       //z is the threshold for a size of the packet
29     double z_prev;  // value of threshold from the previos cleaning phase
30     double gamma;   //tolerance parameter for emergency control over the
31     //number of samples, should be >= 1
32     int do_clean; // trigger cleaning phase
33     int bcount;  //count for number of packets that exceed threshold ,
34     //need it for one of the variations of algorithms for
35     //threshold adjustment
36     int s_size;  //need to remember sample size for _sfun_state_init()
37     int final_z; //bool that indicated if z was adjusted to its filnal value
38     //before the final clean
39     int time;    //timestamp from previously processed packet
40     //used in detecting frequency of the clening phases with aggregation
41     
42     int packet_count; //count for number of packets per time window
43     int do_clean_count; //count number of cleaning phases per time window
44     bool do_sooth; //turn on _BURST_SOOTHING_FACTOR
45     
46     // flow sampling
47     int count_closed; //bool indicates that counting phase is being triggered
48     int count_closed_flows; //count for closed flows
49     int count_notsampled_new; //count for the number of new groups that were not sampled during
50     // the final sampling phase will be used in next time window to compute resonable
51     // approximation for threshold before final sampling phase
52     
53     // for reporting
54     int delay; //interval within which there was no packets that belong to the current flow
55     //thus flow is considered to be closed.
56     //parameter specified by the query
57     int closing_now; //count for groups that are closing at the final cleraning phase
58     //or were closed during the most recent counting phase (newly closed)
59     
60     //for debugging
61     int how_many_cleaned;
62 };
63
64
65 // the function is called once at the initial initialization of the state
66 void _sfun_state_clean_init_smart_sampling_state(void *s){
67     struct SSstate *state  = (struct SSstate *)s;
68     
69     state->count = 0;
70     state->gcount = 0;
71     state->fcount = 0;
72     state->z = 200; //need to figure out good initial value for z
73     //    cout << "clean init Z: " <<  state->z << "\n";
74     state->z_prev = 0;
75     //    cout << "clean init Z prev: " <<  state->z << "\n";
76     state->gamma = 2;
77     state->do_clean = 0;
78     state->bcount = 0;
79     state->s_size = 0;
80     state->final_z = 0;
81     state->time = 0;
82     state->count_closed = 0;
83     state->count_closed_flows = 0;
84     state->count_notsampled_new = 0;
85     
86     state->packet_count = 0;
87     state->do_clean_count = 0;
88     state->do_sooth = true;
89     
90     state->how_many_cleaned = 0;
91     state->delay = 0;
92     state->closing_now = 0;
93 };
94
95
96 // the function will be called at the beginning of every time window
97 void _sfun_state_dirty_init_smart_sampling_state(void *s_new, void *s_old, int curr_num_samples){
98     struct SSstate *state_new  = (struct SSstate *)s_new;
99     struct SSstate *state_old  = (struct SSstate *)s_old;
100     
101     //    cout << "**flows were sampled: " << state_old->how_many_cleaned << "\n";
102     //    cout << "***dirty current num of samples: " << curr_num_samples << "\n";
103     //    cout << "dirty init Z old: " <<  state_old->z << "\n";
104     if(curr_num_samples < state_old->s_size){
105         // state_new->z = state_old->z*((max((double)curr_num_samples-(double)state_old->bcount,1))/((double)state_old->s_size-(double)state_old->bcount));
106         
107         //temp fix for early aggregation
108         state_new->z = (state_old->z)/2;
109         
110         if(state_old->do_sooth)
111             state_new->z = state_old->z / _BURST_SOOTHING_FACTOR;
112     }
113     else {
114         if(curr_num_samples >= state_old->s_size){
115             //cout << "yes\n";
116             state_new->z = state_old->z*((double)curr_num_samples/(double)state_old->s_size);
117             
118             if(state_old->do_sooth)
119                 state_new->z = state_old->z / _BURST_SOOTHING_FACTOR;
120         }
121     }
122     
123     if(state_new->z <= 1.0)
124         state_new->z = 1;
125     //    cout << "dirty init Z new: " <<  state_new->z << "\n";
126     
127     state_new->gamma = state_old->gamma;
128     state_new->do_clean = state_old->do_clean;
129     state_new->s_size = state_old->s_size;
130     state_new->bcount = 0;
131     state_new->gcount = 0;
132     state_new->count = 0;
133     state_new->fcount = 0;
134     state_new->final_z = 0;
135     state_new->time = 0;
136     state_new->count_closed = 0;
137     state_new->count_closed_flows = 0;
138     state_new->count_notsampled_new = state_old->count_notsampled_new;
139     
140     state_new->packet_count = 0;
141     state_new->do_clean_count = 0;
142     state_new->do_sooth = true;
143     
144     state_new->how_many_cleaned = 0;
145     state_new->delay = 0;
146     state_new->closing_now = 0;
147     
148     //  cout << "dirty init gamma: " <<   state_new->gamma << "\n";
149     //     cout << "dirty init do_clean: " <<  state_new->do_clean << "\n";
150     //     cout << "dirty init s_size: " <<  state_new->s_size << "\n";
151     //     cout << "dirty init bcount: " <<  state_new->bcount << "\n";
152     //     cout << "dirty init gcount: " <<  state_new->gcount << "\n";
153     //     cout << "dirty init count: " <<  state_new->count << "\n";
154     //     cout << "dirty init fcount: " <<  state_new->fcount << "\n";
155     //     cout << "dirty init final_z: " <<  state_new->final_z << "\n";
156     //     cout << "dirty init time " <<  state_new->time  << "\n";
157     //     cout << "dirty init count_closed: " <<  state_new->count_closed << "\n";
158     //     cout << "dirty init count_closed_flows: " <<  state_new->count_closed_flows << "\n";
159     
160     //     cout << "dirty init packet count: " <<  state_new->packet_count  << "\n";
161     //     cout << "dirty init do_clean_count: " <<  state_new->do_clean_count  << "\n";
162     
163 };
164
165 // function is called in two cases:
166 // adjustment of threshold before the cleaning phase is being triggered
167 // adjustment of threshold at the window border for the old state, this new
168 // threshold will be used by partial_flush() and flush() functions (final_clean())
169
170 void _sfun_state_final_init_smart_sampling_state(void *s, int curr_num_samples){
171     struct SSstate *state  = (struct SSstate *)s;
172     
173     //  cout << "final init Z old: " <<  state->z << "\n";
174     
175     if(state->final_z == 0){
176         
177         // just returned from the counting phase
178         // case when the time window is changed right after the counting phase
179         if(state->count_closed == 1){
180             state->count_closed = 0;
181             state->count_closed_flows = 0;
182             state->how_many_cleaned = 0;
183         }
184         
185         state->z_prev = state->z;
186         //    cout << "final current num of samples: " << curr_num_samples << "\n";
187         //    cout << "final count not sampled new: " << state->count_notsampled_new << "\n";
188         // adjust count for current number of sampled based on statistics
189         // gathered in the previous time window
190         // otherwise we overestimate it
191         if(state->count_notsampled_new != 0)
192             curr_num_samples -= state->count_notsampled_new;
193         
194         if(curr_num_samples < state->s_size){
195             //state->z = state->z*((max((double)curr_num_samples-(double)state->bcount,1))/((double)state->s_size-(double)state->bcount));
196             
197             //temp fix for early aggregation
198             state->z = state->z/2;
199         }
200         else {
201             if(curr_num_samples >= state->s_size){
202                 state->z = state->z*((double)curr_num_samples/(double)state->s_size);
203             }
204         }
205         
206         if(state->z <= 0)
207             state->z = 1;
208         //    cout << "final init Z new: " <<  state->z << "\n";
209         
210         state->bcount = 0;
211         state->final_z = 1;
212         state->do_clean_count++;
213         state->count_notsampled_new = 0;
214         
215     }
216     
217     //  cout << "final init gamma: " <<   state->gamma << "\n";
218     //     cout << "final init do_clean: " <<  state->do_clean << "\n";
219     //     cout << "final init s_size: " <<  state->s_size << "\n";
220     //     cout << "final init bcount: " <<  state->bcount << "\n";
221     //     cout << "final init gcount: " <<  state->gcount << "\n";
222     //     cout << "final init count: " <<  state->count << "\n";
223     //     cout << "final init fcount: " <<  state->fcount << "\n";
224     //     cout << "final init final_z: " <<  state->final_z << "\n";
225     //     cout << "final init time " <<  state->time  << "\n";
226     //     cout << "final init count_closed: " <<  state->count_closed << "\n";
227     //     cout << "final init count_closed_flows: " <<  state->count_closed_flows << "\n";
228     
229     //     cout << "final init packet count: " <<  state->packet_count  << "\n";
230     //     cout << "final init do_clean_count: " <<  state->do_clean_count  << "\n";
231     
232 };
233
234 int ssample(void *s,int curr_num_samples, unsigned long long int len, unsigned int sample_size){
235     struct SSstate *state  = (struct SSstate *)s;
236     
237     state->packet_count++;
238     
239     state->s_size = sample_size;
240     int sampled = 0;
241     
242     //just returned from the cleaning phase
243     if(state->do_clean == 1){
244         state->gcount = 0;
245         state->do_clean = 0;
246     }
247     
248     //sampling
249     if(len > state->z){
250         state->bcount++;
251         sampled=1;
252     }
253     else{
254         state->count += len;
255         if(state->count >= state->z){
256             sampled=1;
257             state->count -= state->z;
258         }
259     }
260     return sampled;
261     
262 };
263
264 int flow_ssample(void *s,int curr_num_samples, unsigned int sample_size){
265     struct SSstate *state  = (struct SSstate *)s;
266     
267     state->packet_count++;
268     
269     state->s_size = sample_size;
270     int sampled = 0;
271     
272     //just returned from the counting phase
273     if(state->count_closed == 1)
274         //      cout << "closed flows after counting phase: " << state->count_closed_flows << "\n";
275         
276         //just returned from the cleaning phase
277         if(state->do_clean == 1){
278             //      cout << "flows were sampled: " << state->how_many_cleaned << "\n";
279             state->how_many_cleaned = 0;
280             state->gcount = 0;
281             state->do_clean = 0;
282         }
283     
284     return 1;
285     
286 };
287
288
289 int ssfinal_clean(void *s, int curr_num_samples, unsigned long long int glen){
290     struct SSstate *state  = (struct SSstate *)s;
291     
292     state->do_sooth = true;
293     
294     // for ssample() where just returned from the clening phase
295     state->do_clean = 1;
296     
297     int sampled = 0;
298     double new_len = 0;
299     
300     if (glen < state->z_prev)
301         new_len = state->z_prev;
302     else
303         new_len = glen;
304     
305     //no need to clean
306     if(curr_num_samples <= state->s_size){
307         return 1;
308     }
309     else{
310         if(new_len > state->z){
311             sampled = 1;
312             state->bcount++;
313         }
314         else{
315             state->fcount += new_len;
316             if(state->fcount >= state->z){
317                 sampled = 1;
318                 state->fcount -= state->z;
319             }
320             //else{
321             //state->scount--;
322             //}
323         }
324         
325         return sampled;
326     }
327     
328 };
329
330 int flow_ssfinal_clean(void *s, int curr_num_samples, unsigned int ccondition, unsigned int delay, unsigned int maxtime, unsigned long long int glen){
331     struct SSstate *state  = (struct SSstate *)s;
332     
333     state->do_sooth = true;
334     // only for reporting
335     state->delay = delay;
336     
337     //for ssample() where just returned from the clening phase
338     state->do_clean = 1;
339     
340     int sampled = 0;
341     double new_len = 0;
342     // group is newly closed or not closed yet
343     int new_group = 1;
344     
345     // the flow is closed
346     if((ccondition == 1)||((state->time-maxtime)>=delay)){
347         
348         new_group = 0;
349         
350         // TF case or TT case for ccondition and delay
351         if(ccondition == 1){
352             if((state->time-maxtime) <= 1){ // 1 is time interval for count of closed flows
353                 // the flow was closed in the previous counting slot
354                 new_group = 1;
355             }
356         }
357         //FT case for ccondition and delay
358         else{
359             if((state->time-maxtime) <= (delay+1)){ // 1 is time interval for count of closed flows
360                 // the flow was closed in the previous counting slot
361                 new_group = 1;
362             }
363         }
364         // the flow was closed earlier than previous counting slot
365         // it is closed and old and new_group = 0
366     }
367     
368     //adjust size only for old closed groups
369     if(new_group == 0){
370         if (glen < state->z_prev)
371             new_len = state->z_prev;
372         else
373             new_len = glen;
374     }
375     // newly closed group
376     else{
377         state->closing_now++;
378         new_len = glen;
379     }
380     
381     
382     //no need to clean
383     if(curr_num_samples <= state->s_size){
384         state->count_notsampled_new = 0;
385         return 1;
386     }
387     else{
388         if(new_len > state->z){
389             sampled = 1;
390             state->bcount++;
391         }
392         else{
393             state->fcount += new_len;
394             if(state->fcount >= state->z){
395                 sampled = 1;
396                 state->fcount -= state->z;
397             }
398             //count number of not sampled newly closed groups
399             //will use in the next time window at the final cleaning phase
400             else{
401                 if(new_group == 1)
402                     state->count_notsampled_new++;
403             }
404             
405         }
406         
407         state->how_many_cleaned += sampled;
408         
409         return sampled;
410     }
411     
412 };
413
414
415 int ssdo_clean(void *s, int curr_num_samples){
416     struct SSstate *state  = (struct SSstate *)s;
417     
418     //emergency control
419     //bcount will be different after the cleaning phase
420     if(curr_num_samples > (state->gamma*state->s_size)){
421         state->z_prev = state->z;
422         state->z=(double)state->gamma*state->z;
423         state->do_clean = 1;
424         state->bcount = 0;
425         state->count = 0;
426         state->gcount = 0;
427         
428         state->do_clean_count++;
429     }
430     
431     return state->do_clean;
432 };
433
434 int flow_ssdo_clean(void *s, int curr_num_samples, unsigned int maxtime){
435     struct SSstate *state  = (struct SSstate *)s;
436     
437     // initialize timestamp with first seen packet from the current timewindow
438     if(state->time == 0)
439         state->time = maxtime;
440     
441     // detect next counting time slot
442     if(state->time != maxtime){
443         //      cout << "need to count\n";
444         state->time = maxtime;
445         state->count_closed = 1;
446         return 1;
447     }
448     
449     //emergency control
450     //bcount will be different after the cleaning phase
451     //if(curr_num_samples > (state->gamma*state->s_size)){
452     if(state->count_closed_flows > (state->gamma*state->s_size)){
453         //      cout << "need to clean, num closed flows: " << state->count_closed_flows << "\n";
454         state->z_prev = state->z;
455         //      cout << "do clean Z old: " <<  state->z << "\n";
456         state->z=(((double)state->count_closed_flows)/((double)state->s_size))*state->z;
457         //      cout << "do clean Z new: " <<  state->z << "\n";
458         state->do_clean = 1;
459         state->bcount = 0;
460         state->count = 0;
461         state->gcount = 0;
462         
463         state->do_clean_count++;
464     }
465     
466     //just returned from the counting iteration
467     if(state->count_closed == 1){
468         state->count_closed = 0;
469         //      cout << "number of closed flows: " << state->count_closed_flows << "\n";
470         state->count_closed_flows = 0;
471         //initialize gcount since was used for sampling of new closed flows during counting phase
472         state->gcount = 0;
473     }
474     
475     return state->do_clean;
476 };
477
478 double ssthreshold(void *s, int curr_num_samples){
479     struct SSstate *state = (struct SSstate *)s;
480     
481     return state->z;
482 };
483
484 int count_distinct(void *s, int curr_num_samples){
485     return curr_num_samples;
486 };
487
488 int ssclean_with(void *s,int curr_num_samples, unsigned long long int glen){
489     struct SSstate *state  = (struct SSstate *)s;
490     
491     //cleaning condition
492     int sampled = 0;
493     double new_len = 0;
494     
495     if (glen < state->z_prev)
496         new_len = state->z_prev;
497     else
498         new_len = glen;
499     
500     if(new_len > state->z){
501         state->bcount++;
502         sampled = 1;
503     }
504     else{
505         state->gcount += new_len;
506         if(state->gcount >= state->z){
507             sampled = 1;
508             state->gcount -= state->z;
509         }
510         //else{
511         //state->scount--;
512         //}
513     }
514     
515     return sampled;
516 };
517
518 int flow_ssclean_with(void *s,int curr_num_samples, unsigned int ccondition, unsigned int delay, unsigned int maxtime, unsigned long long int glen){
519     struct SSstate *state  = (struct SSstate *)s;
520     
521     //cleaning condition
522     int sampled = 0;
523     double new_len = 0;
524     int new_group = 0;
525     
526     //need to count closed flows
527     if(state->count_closed == 1){
528         //the flow is closed
529         if((ccondition == 1)||((state->time-maxtime)>=delay)){
530             state->count_closed_flows++;
531             // TF case or TT case for ccondition and delay
532             if(ccondition == 1){
533                 if((state->time-maxtime) <= 1) //1 is time interval for count of closed flows
534                     new_group = 1;
535             }
536             //FT case for ccondition and delay
537             else{
538                 if((state->time-maxtime) <= (delay+1)) //1 is time interval for count of closed flows
539                     new_group = 1;
540             }
541         }
542         
543         // if flow is closed but old, no need to subsample it
544         if (new_group == 0)
545             return true;
546         
547     }
548     
549     // clean only closed flows FF case
550     // the flow is still open
551     if((ccondition == 0)&&((state->time-maxtime)<delay)){
552         return true;
553     }
554     
555     // use glen for a new group and z_prev for old one
556     if(new_group == 0){
557         if (glen < state->z_prev)
558             new_len = state->z_prev;
559         else
560             new_len = glen;
561     }
562     //the group is new
563     else{
564         new_len = glen;
565     }
566     
567     // at this point either flow is closed and old and we are at the cleaning phase
568     // or flow is closed and new and we are at the counting phase
569     if(new_len > state->z){
570         state->bcount++;
571         sampled = 1;
572     }
573     else{
574         state->gcount += new_len;
575         if(state->gcount >= state->z){
576             sampled = 1;
577             state->gcount -= state->z;
578         }
579         //new_group is not sampled during counting phase
580         else{
581             if(state->count_closed == 1)
582                 state->count_closed_flows--;
583         }
584         
585     }
586     
587     if(state->do_clean == 1)
588         state->how_many_cleaned += sampled;
589     
590     return sampled;
591 };
592
593 int packet_count(void *s, int curr_num_samples){
594     struct SSstate *state = (struct SSstate *)s;  
595     return state->packet_count;
596 };
597
598 double gamma(void *s, int curr_num_samples){
599     struct SSstate *state = (struct SSstate *)s;  
600     return state->gamma;
601 };
602
603 int do_clean_count(void *s, int curr_num_samples){
604     struct SSstate *state = (struct SSstate *)s;  
605     return state->do_clean_count;
606 };
607
608 int delay(void *s, int curr_num_samples){
609     struct SSstate *state = (struct SSstate *)s;  
610     return state->delay;
611 };
612
613 // number of groups which are newly closed (in the most recent
614 // counting phase or not closed at all during the final
615 // cleaning phase
616 int newly_closed(void *s, int curr_num_samples){
617     struct SSstate *state = (struct SSstate *)s;  
618     return state->closing_now;
619 };
620
621
622
623