1 /* ------------------------------------------------
\r
2 Copyright 2014 AT&T Intellectual Property
\r
3 Licensed under the Apache License, Version 2.0 (the "License");
\r
4 you may not use this file except in compliance with the License.
\r
5 You may obtain a copy of the License at
\r
7 http://www.apache.org/licenses/LICENSE-2.0
\r
9 Unless required by applicable law or agreed to in writing, software
\r
10 distributed under the License is distributed on an "AS IS" BASIS,
\r
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
12 See the License for the specific language governing permissions and
\r
13 limitations under the License.
\r
14 ------------------------------------------- */
\r
15 ///////////////////////////////////////////////
\r
16 //// State and functions for adaptive smart sampling
\r
17 /// written by Irina Rozenbaum
\r
// max(x,y): the larger of x and y.
// NOTE(review): as a function-like macro it evaluates the selected argument
// twice, and a lowercase `max` macro can clash with other max() definitions
// (e.g. std::max). In this listing it is only referenced from commented-out code.
20 #define max(x,y) ((x)>(y)?(x):(y))
\r
// Divisor applied to the previous window's threshold z when do_sooth is set.
// NOTE(review): names beginning with '_' followed by an uppercase letter are
// reserved for the implementation; consider renaming (all uses must change together).
21 #define _BURST_SOOTHING_FACTOR 10.0
\r
// State members for adaptive smart sampling; the functions below access them
// through `struct SSstate *`.
// NOTE(review): the opening `struct SSstate {` line is not visible in this
// listing — presumably these are its members; confirm.
24 //unsigned long long int count; // count to sample small packets with certain probability
// NOTE(review): `count` is commented out above, yet state->count is read and
// written by ssample() and reset by the dirty-init function; confirm it is
// declared on a line missing from this listing.
\r
26 double gcount; // count for clean_with() function
\r
27 double fcount; // count for final_clean() function
\r
28 double z; //z is the threshold for a size of the packet
\r
29 double z_prev; // value of threshold from the previous cleaning phase
\r
30 double gamma; //tolerance parameter for emergency control over the
\r
31 //number of samples, should be >= 1
\r
32 int do_clean; // trigger cleaning phase
\r
33 int bcount; //count for number of packets that exceed threshold,
\r
34 //need it for one of the variations of algorithms for
\r
35 //threshold adjustment
\r
36 int s_size; //need to remember sample size for the _sfun_state_*_init() functions
\r
37 int final_z; //bool that indicates if z was adjusted to its final value
\r
38 //before the final clean
\r
39 int time; //timestamp from previously processed packet (0 = not set yet in this window)
\r
40 //used in detecting frequency of the cleaning phases with aggregation
\r
42 int packet_count; //count for number of packets per time window
\r
43 int do_clean_count; //count number of cleaning phases per time window
\r
44 bool do_sooth; //turn on _BURST_SOOTHING_FACTOR
\r
47 int count_closed; //bool: set when a counting phase is triggered
\r
48 int count_closed_flows; //count for closed flows
\r
49 int count_notsampled_new; //count for the number of new groups that were not sampled during
\r
50 // the final sampling phase; will be used in the next time window to compute a reasonable
\r
51 // approximation for the threshold before the final sampling phase
\r
54 int delay; //interval within which there were no packets that belong to the current flow,
\r
55 //thus the flow is considered to be closed.
\r
56 //parameter specified by the query
\r
57 int closing_now; //count for groups that are closing at the final cleaning phase
\r
58 //or were closed during the most recent counting phase (newly closed)
\r
61 int how_many_cleaned; //running total of the per-group `sampled` results during a cleaning phase (only read by commented-out debug output here)
\r
65 // the function is called once at the initial initialization of the state
// Sets the starting threshold z and clears the phase flags and counters below.
// NOTE(review): several original lines of this function (e.g. 68-71, 74, 76,
// 78-81 and the closing brace) are missing from this listing, so other fields
// may be initialized there as well.
\r
66 void _sfun_state_clean_init_smart_sampling_state(void *s){
\r
67 struct SSstate *state = (struct SSstate *)s;
\r
72 state->z = 200; //TODO: need to figure out a good initial value for z
\r
73 // cout << "clean init Z: " << state->z << "\n";
\r
75 // cout << "clean init Z prev: " << state->z << "\n";
\r
77 state->do_clean = 0;
\r
82 state->count_closed = 0;
\r
83 state->count_closed_flows = 0;
\r
84 state->count_notsampled_new = 0;
\r
86 state->packet_count = 0;
\r
87 state->do_clean_count = 0;
\r
88 state->do_sooth = true;
\r
90 state->how_many_cleaned = 0;
\r
92 state->closing_now = 0;
\r
96 // the function will be called at the beginning of every time window
// Builds the new window's state (s_new) from the previous window's state
// (s_old). It rescales z by comparing curr_num_samples with the target
// s_size, copies gamma, do_clean, s_size and count_notsampled_new, and
// resets the per-window counters.
// NOTE(review): assumes state_old->s_size > 0 (it is used as a divisor below).
\r
97 void _sfun_state_dirty_init_smart_sampling_state(void *s_new, void *s_old, int curr_num_samples){
\r
98 struct SSstate *state_new = (struct SSstate *)s_new;
\r
99 struct SSstate *state_old = (struct SSstate *)s_old;
\r
101 // cout << "**flows were sampled: " << state_old->how_many_cleaned << "\n";
\r
102 // cout << "***dirty current num of samples: " << curr_num_samples << "\n";
\r
103 // cout << "dirty init Z old: " << state_old->z << "\n";
\r
// Fewer samples than the target: lower the threshold.
104 if(curr_num_samples < state_old->s_size){
\r
105 // state_new->z = state_old->z*((max((double)curr_num_samples-(double)state_old->bcount,1))/((double)state_old->s_size-(double)state_old->bcount));
\r
107 //temp fix for early aggregation
\r
108 state_new->z = (state_old->z)/2;
\r
// NOTE(review): this overwrites the halved z just computed with
// state_old->z / _BURST_SOOTHING_FACTOR. No visible code ever sets do_sooth
// to false, so this override appears to always apply. Confirm whether
// soothing was meant to scale state_new->z instead.
110 if(state_old->do_sooth)
\r
111 state_new->z = state_old->z / _BURST_SOOTHING_FACTOR;
\r
// At or above the target: scale z up by curr_num_samples / s_size.
114 if(curr_num_samples >= state_old->s_size){
\r
116 state_new->z = state_old->z*((double)curr_num_samples/(double)state_old->s_size);
\r
// NOTE(review): same override as above. Whenever do_sooth is set, the
// scaled-up z is replaced by state_old->z / _BURST_SOOTHING_FACTOR.
118 if(state_old->do_sooth)
\r
119 state_new->z = state_old->z / _BURST_SOOTHING_FACTOR;
\r
// NOTE(review): the statement this `if` guards (original line 124) is
// missing from this listing. As listed, the `if` would govern the next real
// statement (the gamma copy), which is almost certainly not intended.
123 if(state_new->z <= 1.0)
\r
125 // cout << "dirty init Z new: " << state_new->z << "\n";
\r
127 state_new->gamma = state_old->gamma;
\r
128 state_new->do_clean = state_old->do_clean;
\r
129 state_new->s_size = state_old->s_size;
\r
130 state_new->bcount = 0;
\r
131 state_new->gcount = 0;
\r
// NOTE(review): the `count` member declaration is commented out in the state member list.
132 state_new->count = 0;
\r
133 state_new->fcount = 0;
\r
134 state_new->final_z = 0;
\r
135 state_new->time = 0;
\r
136 state_new->count_closed = 0;
\r
137 state_new->count_closed_flows = 0;
\r
// Carried over: the final-init function subtracts it from curr_num_samples.
138 state_new->count_notsampled_new = state_old->count_notsampled_new;
\r
140 state_new->packet_count = 0;
\r
141 state_new->do_clean_count = 0;
\r
142 state_new->do_sooth = true;
\r
144 state_new->how_many_cleaned = 0;
\r
145 state_new->delay = 0;
\r
146 state_new->closing_now = 0;
\r
148 // cout << "dirty init gamma: " << state_new->gamma << "\n";
\r
149 // cout << "dirty init do_clean: " << state_new->do_clean << "\n";
\r
150 // cout << "dirty init s_size: " << state_new->s_size << "\n";
\r
151 // cout << "dirty init bcount: " << state_new->bcount << "\n";
\r
152 // cout << "dirty init gcount: " << state_new->gcount << "\n";
\r
153 // cout << "dirty init count: " << state_new->count << "\n";
\r
154 // cout << "dirty init fcount: " << state_new->fcount << "\n";
\r
155 // cout << "dirty init final_z: " << state_new->final_z << "\n";
\r
156 // cout << "dirty init time " << state_new->time << "\n";
\r
157 // cout << "dirty init count_closed: " << state_new->count_closed << "\n";
\r
158 // cout << "dirty init count_closed_flows: " << state_new->count_closed_flows << "\n";
\r
160 // cout << "dirty init packet count: " << state_new->packet_count << "\n";
\r
161 // cout << "dirty init do_clean_count: " << state_new->do_clean_count << "\n";
\r
165 // function is called in two cases:
\r
166 // adjustment of threshold before the cleaning phase is triggered
\r
167 // adjustment of threshold at the window border for the old state; this new
\r
168 // threshold will be used by partial_flush() and flush() functions (final_clean())
// final_z is checked on entry and set to 1 below. The dirty-init function
// resets it to 0 for each new window, so the adjustment runs at most once
// per window.
// NOTE(review): assumes state->s_size > 0 (it is used as a divisor below).
\r
170 void _sfun_state_final_init_smart_sampling_state(void *s, int curr_num_samples){
\r
171 struct SSstate *state = (struct SSstate *)s;
\r
173 // cout << "final init Z old: " << state->z << "\n";
\r
175 if(state->final_z == 0){
\r
177 // just returned from the counting phase
\r
178 // case when the time window is changed right after the counting phase
\r
179 if(state->count_closed == 1){
\r
180 state->count_closed = 0;
\r
181 state->count_closed_flows = 0;
\r
182 state->how_many_cleaned = 0;
\r
// Keep the pre-adjustment threshold; the clean functions use z_prev as a
// lower bound on a group's length.
185 state->z_prev = state->z;
\r
186 // cout << "final current num of samples: " << curr_num_samples << "\n";
\r
187 // cout << "final count not sampled new: " << state->count_notsampled_new << "\n";
\r
188 // adjust the current number of samples based on statistics
\r
189 // gathered in the previous time window;
\r
190 // otherwise we overestimate it
\r
191 if(state->count_notsampled_new != 0)
\r
192 curr_num_samples -= state->count_notsampled_new;
\r
// Fewer samples than the target: halve z. The proportional formula is
// disabled, and unlike the dirty-init path no soothing is applied here.
194 if(curr_num_samples < state->s_size){
\r
195 //state->z = state->z*((max((double)curr_num_samples-(double)state->bcount,1))/((double)state->s_size-(double)state->bcount));
\r
197 //temp fix for early aggregation
\r
198 state->z = state->z/2;
\r
// At or above the target: scale z up by curr_num_samples / s_size.
201 if(curr_num_samples >= state->s_size){
\r
202 state->z = state->z*((double)curr_num_samples/(double)state->s_size);
\r
208 // cout << "final init Z new: " << state->z << "\n";
\r
211 state->final_z = 1;
\r
212 state->do_clean_count++;
\r
// The carried-over statistic has been consumed.
213 state->count_notsampled_new = 0;
\r
217 // cout << "final init gamma: " << state->gamma << "\n";
\r
218 // cout << "final init do_clean: " << state->do_clean << "\n";
\r
219 // cout << "final init s_size: " << state->s_size << "\n";
\r
220 // cout << "final init bcount: " << state->bcount << "\n";
\r
221 // cout << "final init gcount: " << state->gcount << "\n";
\r
222 // cout << "final init count: " << state->count << "\n";
\r
223 // cout << "final init fcount: " << state->fcount << "\n";
\r
224 // cout << "final init final_z: " << state->final_z << "\n";
\r
225 // cout << "final init time " << state->time << "\n";
\r
226 // cout << "final init count_closed: " << state->count_closed << "\n";
\r
227 // cout << "final init count_closed_flows: " << state->count_closed_flows << "\n";
\r
229 // cout << "final init packet count: " << state->packet_count << "\n";
\r
230 // cout << "final init do_clean_count: " << state->do_clean_count << "\n";
\r
// Per-packet sampling decision (packet-level smart sampling).
// It counts the packet, records sample_size as the target s_size, and clears
// do_clean if a cleaning phase just ran. It then applies threshold sampling:
// - packets with len > z take one branch;
// - smaller packets add len to the running `count`; once `count` reaches z,
//   z is subtracted and the residue is kept for later packets.
// NOTE(review): the return statements (within original lines 246-263) are not
// visible in this listing. Presumably nonzero means "sample this packet" —
// confirm. curr_num_samples is not used by the visible code.
234 int ssample(void *s,int curr_num_samples, unsigned long long int len, unsigned int sample_size){
\r
235 struct SSstate *state = (struct SSstate *)s;
\r
237 state->packet_count++;
\r
239 state->s_size = sample_size;
\r
242 //just returned from the cleaning phase
\r
243 if(state->do_clean == 1){
\r
245 state->do_clean = 0;
\r
249 if(len > state->z){
\r
254 state->count += len;
\r
255 if(state->count >= state->z){
\r
257 state->count -= state->z;
\r
// Per-packet hook for flow-level smart sampling. It counts the packet,
// records sample_size as the target s_size and, after a cleaning phase,
// resets how_many_cleaned and do_clean.
// NOTE(review): the return statement is not visible in this listing.
264 int flow_ssample(void *s,int curr_num_samples, unsigned int sample_size){
\r
265 struct SSstate *state = (struct SSstate *)s;
\r
267 state->packet_count++;
\r
269 state->s_size = sample_size;
\r
272 //just returned from the counting phase
// NOTE(review): the only visible body of this `if` is the commented-out
// cout below. Unless the missing original line 275 holds a statement, this
// `if` actually controls the do_clean block that follows, so do_clean would
// only be cleared when count_closed == 1. Confirm; if unintended, give the
// `if` an explicit empty body or remove it.
\r
273 if(state->count_closed == 1)
\r
274 // cout << "closed flows after counting phase: " << state->count_closed_flows << "\n";
\r
276 //just returned from the cleaning phase
\r
277 if(state->do_clean == 1){
\r
278 // cout << "flows were sampled: " << state->how_many_cleaned << "\n";
\r
279 state->how_many_cleaned = 0;
\r
281 state->do_clean = 0;
\r
// Final-clean decision for one group (packet-level variant). Sets do_sooth
// and do_clean, then picks an effective length new_len: z_prev when
// glen < z_prev. The else branch (original lines 302-305) is not visible;
// presumably new_len = glen.
// When curr_num_samples <= s_size:
// - groups with new_len > z take one branch;
// - the rest accumulate new_len into fcount, and z is subtracted once
//   fcount reaches z.
// NOTE(review): return statements are not visible in this listing.
289 int ssfinal_clean(void *s, int curr_num_samples, unsigned long long int glen){
\r
290 struct SSstate *state = (struct SSstate *)s;
\r
292 state->do_sooth = true;
\r
294 // lets ssample() know it has just returned from the cleaning phase
\r
295 state->do_clean = 1;
\r
298 double new_len = 0;
\r
300 if (glen < state->z_prev)
\r
301 new_len = state->z_prev;
\r
306 if(curr_num_samples <= state->s_size){
\r
310 if(new_len > state->z){
\r
315 state->fcount += new_len;
\r
316 if(state->fcount >= state->z){
\r
318 state->fcount -= state->z;
\r
// Final-clean decision for one flow group.
// - A group counts as closed when ccondition == 1 or when
//   (state->time - maxtime) >= delay.
// - Closed groups are classified as newly closed (closed within the last
//   counting slot) or old.
// - Old groups use z_prev as a lower bound on their length; newly closed
//   groups increment closing_now.
// - Threshold sampling then uses fcount, as in ssfinal_clean().
// - Newly closed groups that are not sampled are counted in
//   count_notsampled_new for the next window.
// - `sampled` is added to how_many_cleaned.
// NOTE(review): `new_group`, `sampled` and the return statements are
// declared/assigned on lines missing from this listing.
// NOTE(review): state->time is int and maxtime is unsigned int, so
// (state->time - maxtime) is computed in unsigned arithmetic. If maxtime
// exceeds state->time, the difference wraps to a huge value and the flow is
// treated as closed. This can happen, for example, when time is still 0
// because flow_ssdo_clean() has not run this window.
330 int flow_ssfinal_clean(void *s, int curr_num_samples, unsigned int ccondition, unsigned int delay, unsigned int maxtime, unsigned long long int glen){
\r
331 struct SSstate *state = (struct SSstate *)s;
\r
333 state->do_sooth = true;
\r
334 // only for reporting
\r
335 state->delay = delay;
\r
337 //lets flow_ssample() know it has just returned from the cleaning phase
\r
338 state->do_clean = 1;
\r
341 double new_len = 0;
\r
342 // group is newly closed or not closed yet
\r
345 // the flow is closed
\r
346 if((ccondition == 1)||((state->time-maxtime)>=delay)){
\r
350 // TF case or TT case for ccondition and delay
\r
351 if(ccondition == 1){
\r
352 if((state->time-maxtime) <= 1){ // 1 is time interval for count of closed flows
\r
353 // the flow was closed in the previous counting slot
\r
357 //FT case for ccondition and delay
\r
359 if((state->time-maxtime) <= (delay+1)){ // 1 is time interval for count of closed flows
\r
360 // the flow was closed in the previous counting slot
\r
364 // the flow was closed earlier than the previous counting slot
\r
365 // it is closed and old and new_group = 0
\r
368 //adjust size only for old closed groups
\r
369 if(new_group == 0){
\r
370 if (glen < state->z_prev)
\r
371 new_len = state->z_prev;
\r
375 // newly closed group
\r
377 state->closing_now++;
\r
383 if(curr_num_samples <= state->s_size){
\r
384 state->count_notsampled_new = 0;
\r
388 if(new_len > state->z){
\r
393 state->fcount += new_len;
\r
394 if(state->fcount >= state->z){
\r
396 state->fcount -= state->z;
\r
398 //count number of not sampled newly closed groups
\r
399 //will use in the next time window at the final cleaning phase
\r
402 state->count_notsampled_new++;
\r
407 state->how_many_cleaned += sampled;
\r
// Emergency-control trigger for packet-level sampling. When curr_num_samples
// exceeds gamma * s_size, it saves z into z_prev, multiplies z by gamma and
// sets do_clean = 1. Returns do_clean, which stays 1 until it is cleared
// (e.g. by ssample()).
// NOTE(review): the closing brace of the `if` is not visible. Whether
// do_clean_count++ (original line 428) runs only on trigger cannot be
// confirmed from this listing.
415 int ssdo_clean(void *s, int curr_num_samples){
\r
416 struct SSstate *state = (struct SSstate *)s;
\r
418 //emergency control
\r
419 //bcount will be different after the cleaning phase
\r
420 if(curr_num_samples > (state->gamma*state->s_size)){
\r
421 state->z_prev = state->z;
\r
422 state->z=(double)state->gamma*state->z;
\r
423 state->do_clean = 1;
\r
428 state->do_clean_count++;
\r
431 return state->do_clean;
\r
// Cleaning trigger for flow-level sampling.
// - Tracks the current time slot in state->time. A value of 0 means "not set
//   yet", so the first maxtime seen initializes it.
// - When maxtime differs from state->time, a counting phase is started
//   (count_closed = 1).
// - If the number of closed flows counted exceeds gamma * s_size, z_prev is
//   saved, z is scaled by count_closed_flows / s_size, and do_clean is set.
// - After a counting iteration, count_closed and count_closed_flows are
//   cleared.
// Returns do_clean.
// NOTE(review): braces/else lines separating these sections (original lines
// 446-448, 459-465, 472-474) are missing from this listing, so the exact
// control flow between them cannot be confirmed here. A genuine maxtime of 0
// is indistinguishable from the "not set" sentinel.
434 int flow_ssdo_clean(void *s, int curr_num_samples, unsigned int maxtime){
\r
435 struct SSstate *state = (struct SSstate *)s;
\r
437 // initialize timestamp with first seen packet from the current time window
\r
438 if(state->time == 0)
\r
439 state->time = maxtime;
\r
441 // detect next counting time slot
\r
442 if(state->time != maxtime){
\r
443 // cout << "need to count\n";
\r
444 state->time = maxtime;
\r
445 state->count_closed = 1;
\r
449 //emergency control
\r
450 //bcount will be different after the cleaning phase
\r
451 //if(curr_num_samples > (state->gamma*state->s_size)){
\r
452 if(state->count_closed_flows > (state->gamma*state->s_size)){
\r
453 // cout << "need to clean, num closed flows: " << state->count_closed_flows << "\n";
\r
454 state->z_prev = state->z;
\r
455 // cout << "do clean Z old: " << state->z << "\n";
\r
456 state->z=(((double)state->count_closed_flows)/((double)state->s_size))*state->z;
\r
457 // cout << "do clean Z new: " << state->z << "\n";
\r
458 state->do_clean = 1;
\r
463 state->do_clean_count++;
\r
466 //just returned from the counting iteration
\r
467 if(state->count_closed == 1){
\r
468 state->count_closed = 0;
\r
469 // cout << "number of closed flows: " << state->count_closed_flows << "\n";
\r
470 state->count_closed_flows = 0;
\r
471 //reset gcount, since it was used for sampling newly closed flows during the counting phase
// NOTE(review): the statement itself (original line 472) is missing from this listing.
\r
475 return state->do_clean;
\r
// Threshold accessor.
// NOTE(review): its body (original lines 480-483) is missing from this
// listing; presumably it returns state->z — confirm.
478 double ssthreshold(void *s, int curr_num_samples){
\r
479 struct SSstate *state = (struct SSstate *)s;
\r
// Returns curr_num_samples unchanged; the state pointer s is not used.
484 int count_distinct(void *s, int curr_num_samples){
\r
485 return curr_num_samples;
\r
// Cleaning-phase decision for one group (packet-level variant).
// The effective length new_len is z_prev when glen < z_prev. The else branch
// (original lines 497-499) is not visible; presumably new_len = glen.
// - Groups with new_len > z take one branch.
// - The rest accumulate new_len into gcount, and z is subtracted once gcount
//   reaches z.
// NOTE(review): return statements are not visible in this listing.
488 int ssclean_with(void *s,int curr_num_samples, unsigned long long int glen){
\r
489 struct SSstate *state = (struct SSstate *)s;
\r
491 //cleaning condition
\r
493 double new_len = 0;
\r
495 if (glen < state->z_prev)
\r
496 new_len = state->z_prev;
\r
500 if(new_len > state->z){
\r
505 state->gcount += new_len;
\r
506 if(state->gcount >= state->z){
\r
508 state->gcount -= state->z;
\r
// Cleaning/counting decision for one flow group.
// - During a counting phase (count_closed == 1), each closed flow increments
//   count_closed_flows. A flow is closed when ccondition == 1 or it has been
//   idle for at least `delay`.
// - Closed flows are classified as new (closed within the last counting
//   slot) or old; old closed flows are not subsampled.
// - Still-open flows (the FF case) are not cleaned, per the original comment;
//   that branch's body is missing from this listing.
// - Old groups use z_prev as a lower bound on their length; threshold
//   sampling then accumulates into gcount.
// - A new group that is not sampled during a counting phase is removed from
//   count_closed_flows.
// - When do_clean == 1, `sampled` is added to how_many_cleaned.
// NOTE(review): `new_group`, `sampled` and the return statements are on
// lines missing from this listing.
// NOTE(review): (state->time - maxtime) mixes int and unsigned int and is
// evaluated in unsigned arithmetic; it wraps to a huge value if
// maxtime > state->time.
518 int flow_ssclean_with(void *s,int curr_num_samples, unsigned int ccondition, unsigned int delay, unsigned int maxtime, unsigned long long int glen){
\r
519 struct SSstate *state = (struct SSstate *)s;
\r
521 //cleaning condition
\r
523 double new_len = 0;
\r
526 //need to count closed flows
\r
527 if(state->count_closed == 1){
\r
528 //the flow is closed
\r
529 if((ccondition == 1)||((state->time-maxtime)>=delay)){
\r
530 state->count_closed_flows++;
\r
531 // TF case or TT case for ccondition and delay
\r
532 if(ccondition == 1){
\r
533 if((state->time-maxtime) <= 1) //1 is time interval for count of closed flows
\r
536 //FT case for ccondition and delay
\r
538 if((state->time-maxtime) <= (delay+1)) //1 is time interval for count of closed flows
\r
543 // if flow is closed but old, no need to subsample it
\r
544 if (new_group == 0)
\r
549 // clean only closed flows FF case
\r
550 // the flow is still open
\r
551 if((ccondition == 0)&&((state->time-maxtime)<delay)){
\r
555 // use glen for a new group; for an old one use at least z_prev
\r
556 if(new_group == 0){
\r
557 if (glen < state->z_prev)
\r
558 new_len = state->z_prev;
\r
567 // at this point either flow is closed and old and we are at the cleaning phase
\r
568 // or flow is closed and new and we are at the counting phase
\r
569 if(new_len > state->z){
\r
574 state->gcount += new_len;
\r
575 if(state->gcount >= state->z){
\r
577 state->gcount -= state->z;
\r
579 //new_group is not sampled during counting phase
\r
581 if(state->count_closed == 1)
\r
582 state->count_closed_flows--;
\r
587 if(state->do_clean == 1)
\r
588 state->how_many_cleaned += sampled;
\r
// Returns the number of packets counted in the current window. packet_count
// is incremented by ssample()/flow_ssample() and reset at each new window.
593 int packet_count(void *s, int curr_num_samples){
\r
594 struct SSstate *state = (struct SSstate *)s;
\r
595 return state->packet_count;
\r
// Returns the emergency-control tolerance gamma.
// NOTE(review): a global function named `gamma` may conflict with the
// gamma() declared by some <math.h> implementations (e.g. glibc) if both are
// visible in the same translation unit.
598 double gamma(void *s, int curr_num_samples){
\r
599 struct SSstate *state = (struct SSstate *)s;
\r
600 return state->gamma;
\r
// Returns how many cleaning phases have been counted in the current window.
// The counter is incremented by the do_clean / final-init functions and reset
// at each new window.
603 int do_clean_count(void *s, int curr_num_samples){
\r
604 struct SSstate *state = (struct SSstate *)s;
\r
605 return state->do_clean_count;
\r
// Returns the delay parameter last stored by flow_ssfinal_clean(). It is used
// for reporting only and reset to 0 at each new window.
608 int delay(void *s, int curr_num_samples){
\r
609 struct SSstate *state = (struct SSstate *)s;
\r
610 return state->delay;
\r
613 // number of groups which are newly closed (in the most recent
\r
614 // counting phase or not closed at all during the final
// cleaning phase) — presumed ending; original line 615 of this comment is
// missing from the listing. Returns state->closing_now, which
// flow_ssfinal_clean() increments for newly closed groups.
\r
616 int newly_closed(void *s, int curr_num_samples){
\r
617 struct SSstate *state = (struct SSstate *)s;
\r
618 return state->closing_now;
\r