1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
15 ///////////////////////////////////////////////
16 //// State, functions for adaptive smart sampling
17 /// written by Irina Rozenbaum
// Function-like maximum of two values.
// NOTE(review): each argument is expanded twice, so arguments with side effects are unsafe.
// Among the visible lines it is referenced only from commented-out threshold formulas.
20 #define max(x,y) ((x)>(y)?(x):(y))
// Divisor applied to the previous window's threshold z when do_sooth is set
// (see _sfun_state_dirty_init_smart_sampling_state).
21 #define _BURST_SOOTHING_FACTOR 10.0
// Fields of the sampling state that the functions below access through struct SSstate pointers.
// NOTE(review): the struct's opening/closing lines and the declarations of `count` and
// `how_many_cleaned` (both used by the functions below) are not among the visible lines.
24 //unsigned long long int count; // count to sample small packets with certain probability
26 double gcount; // count for the clean_with() functions
27 double fcount; // count for the final_clean() functions
28 double z; // z is the threshold for the size of a packet
29 double z_prev; // value of the threshold from the previous cleaning phase
30 double gamma; // tolerance parameter for emergency control over the
31 // number of samples, should be >= 1
32 int do_clean; // trigger cleaning phase
33 int bcount; // count of packets that exceed the threshold;
34 // needed by one of the variations of the algorithm for
35 // threshold adjustment
36 int s_size; // need to remember sample size for _sfun_state_init()
37 int final_z; // bool that indicates whether z was adjusted to its final value
38 // before the final clean
39 int time; // timestamp from the previously processed packet,
40 // used in detecting the frequency of the cleaning phases with aggregation
42 int packet_count; // count of packets per time window
43 int do_clean_count; // count of cleaning phases per time window
44 bool do_sooth; // turn on _BURST_SOOTHING_FACTOR
47 int count_closed; // bool indicating that the counting phase is being triggered
48 int count_closed_flows; // count of closed flows
49 int count_notsampled_new; // count of new groups that were not sampled during
50 // the final sampling phase; used in the next time window to compute a reasonable
51 // approximation for the threshold before the final sampling phase
54 int delay; // interval within which there were no packets belonging to the current flow,
55 // so the flow is considered to be closed;
56 // parameter specified by the query
57 int closing_now; // count of groups that are closing at the final cleaning phase
58 // or were closed during the most recent counting phase (newly closed)
65 // Called once, when the state is initialized for the first time.
// Sets the starting threshold z and resets the counters and flags assigned below.
// s: pointer to the state, interpreted as struct SSstate.
// NOTE(review): this listing skips some lines of the body; only the visible assignments are described.
66 void _sfun_state_clean_init_smart_sampling_state(void *s){
67 struct SSstate *state = (struct SSstate *)s;
72 state->z = 200; //need to figure out good initial value for z
73 // cout << "clean init Z: " << state->z << "\n";
75 // cout << "clean init Z prev: " << state->z << "\n";
82 state->count_closed = 0;
83 state->count_closed_flows = 0;
84 state->count_notsampled_new = 0;
86 state->packet_count = 0;
87 state->do_clean_count = 0;
88 state->do_sooth = true; // burst soothing starts enabled
90 state->how_many_cleaned = 0;
92 state->closing_now = 0;
96 // Called at the beginning of every time window.
// Derives the new window's threshold z from the old state and the current number of
// samples, carries over gamma, do_clean, s_size and count_notsampled_new, and resets
// the remaining per-window counters and flags.
// s_new: state for the new window; s_old: state of the previous window (both cast to struct SSstate).
// curr_num_samples: current number of samples, compared against the old s_size.
97 void _sfun_state_dirty_init_smart_sampling_state(void *s_new, void *s_old, int curr_num_samples){
98 struct SSstate *state_new = (struct SSstate *)s_new;
99 struct SSstate *state_old = (struct SSstate *)s_old;
101 // cout << "**flows were sampled: " << state_old->how_many_cleaned << "\n";
102 // cout << "***dirty current num of samples: " << curr_num_samples << "\n";
103 // cout << "dirty init Z old: " << state_old->z << "\n";
104 if(curr_num_samples < state_old->s_size){ // fewer samples than the remembered sample size
105 // state_new->z = state_old->z*((max((double)curr_num_samples-(double)state_old->bcount,1))/((double)state_old->s_size-(double)state_old->bcount));
107 //temp fix for early aggregation
108 state_new->z = (state_old->z)/2;
110 if(state_old->do_sooth) // NOTE(review): as far as the visible lines show, this replaces the value assigned just above
111 state_new->z = state_old->z / _BURST_SOOTHING_FACTOR;
114 if(curr_num_samples >= state_old->s_size){ // at or above the remembered sample size
116 state_new->z = state_old->z*((double)curr_num_samples/(double)state_old->s_size); // scale z by the ratio of samples to sample size
118 if(state_old->do_sooth) // NOTE(review): here too the soothing assignment replaces the scaled value
119 state_new->z = state_old->z / _BURST_SOOTHING_FACTOR;
123 if(state_new->z <= 1.0) // NOTE(review): the statement guarded by this check is not among the visible lines
125 // cout << "dirty init Z new: " << state_new->z << "\n";
127 state_new->gamma = state_old->gamma;
128 state_new->do_clean = state_old->do_clean;
129 state_new->s_size = state_old->s_size;
130 state_new->bcount = 0;
131 state_new->gcount = 0;
132 state_new->count = 0;
133 state_new->fcount = 0;
134 state_new->final_z = 0;
136 state_new->count_closed = 0;
137 state_new->count_closed_flows = 0;
138 state_new->count_notsampled_new = state_old->count_notsampled_new; // kept for the threshold adjustment in the final-init function
140 state_new->packet_count = 0;
141 state_new->do_clean_count = 0;
142 state_new->do_sooth = true;
144 state_new->how_many_cleaned = 0;
145 state_new->delay = 0;
146 state_new->closing_now = 0;
148 // cout << "dirty init gamma: " << state_new->gamma << "\n";
149 // cout << "dirty init do_clean: " << state_new->do_clean << "\n";
150 // cout << "dirty init s_size: " << state_new->s_size << "\n";
151 // cout << "dirty init bcount: " << state_new->bcount << "\n";
152 // cout << "dirty init gcount: " << state_new->gcount << "\n";
153 // cout << "dirty init count: " << state_new->count << "\n";
154 // cout << "dirty init fcount: " << state_new->fcount << "\n";
155 // cout << "dirty init final_z: " << state_new->final_z << "\n";
156 // cout << "dirty init time " << state_new->time << "\n";
157 // cout << "dirty init count_closed: " << state_new->count_closed << "\n";
158 // cout << "dirty init count_closed_flows: " << state_new->count_closed_flows << "\n";
160 // cout << "dirty init packet count: " << state_new->packet_count << "\n";
161 // cout << "dirty init do_clean_count: " << state_new->do_clean_count << "\n";
165 // This function is called in two cases:
166 // - adjustment of the threshold before the cleaning phase is triggered;
167 // - adjustment of the threshold at the window border for the old state; this new
168 // threshold will be used by the partial_flush() and flush() functions (final_clean()).
// s: pointer to the state (cast to struct SSstate); curr_num_samples: current number of samples.
// NOTE(review): closing braces and some statements are missing from this listing, so the
// exact nesting of the statements below cannot be confirmed from here.
170 void _sfun_state_final_init_smart_sampling_state(void *s, int curr_num_samples){
171 struct SSstate *state = (struct SSstate *)s;
173 // cout << "final init Z old: " << state->z << "\n";
175 if(state->final_z == 0){ // z has not yet been adjusted to its final value
177 // just returned from the counting phase
178 // case when the time window is changed right after the counting phase
179 if(state->count_closed == 1){
180 state->count_closed = 0;
181 state->count_closed_flows = 0;
182 state->how_many_cleaned = 0;
185 state->z_prev = state->z; // remember the threshold before adjusting it
186 // cout << "final current num of samples: " << curr_num_samples << "\n";
187 // cout << "final count not sampled new: " << state->count_notsampled_new << "\n";
188 // adjust the current number of samples based on statistics
189 // gathered in the previous time window,
190 // otherwise we overestimate it
191 if(state->count_notsampled_new != 0)
192 curr_num_samples -= state->count_notsampled_new;
194 if(curr_num_samples < state->s_size){
195 //state->z = state->z*((max((double)curr_num_samples-(double)state->bcount,1))/((double)state->s_size-(double)state->bcount));
197 //temp fix for early aggregation
198 state->z = state->z/2;
201 if(curr_num_samples >= state->s_size){
202 state->z = state->z*((double)curr_num_samples/(double)state->s_size); // scale z by the ratio of samples to sample size
208 // cout << "final init Z new: " << state->z << "\n";
212 state->do_clean_count++;
213 state->count_notsampled_new = 0;
217 // cout << "final init gamma: " << state->gamma << "\n";
218 // cout << "final init do_clean: " << state->do_clean << "\n";
219 // cout << "final init s_size: " << state->s_size << "\n";
220 // cout << "final init bcount: " << state->bcount << "\n";
221 // cout << "final init gcount: " << state->gcount << "\n";
222 // cout << "final init count: " << state->count << "\n";
223 // cout << "final init fcount: " << state->fcount << "\n";
224 // cout << "final init final_z: " << state->final_z << "\n";
225 // cout << "final init time " << state->time << "\n";
226 // cout << "final init count_closed: " << state->count_closed << "\n";
227 // cout << "final init count_closed_flows: " << state->count_closed_flows << "\n";
229 // cout << "final init packet count: " << state->packet_count << "\n";
230 // cout << "final init do_clean_count: " << state->do_clean_count << "\n";
// Counts the call in packet_count, stores sample_size in s_size, and compares the
// accumulated count against the threshold z.
// s: pointer to the state (cast to struct SSstate); sample_size: value remembered in s_size.
// NOTE(review): the return statements, the use of len and curr_num_samples, and the body of
// the do_clean branch are not among the visible lines.
234 int ssample(void *s,int curr_num_samples, unsigned long long int len, unsigned int sample_size){
235 struct SSstate *state = (struct SSstate *)s;
237 state->packet_count++;
239 state->s_size = sample_size;
242 //just returned from the cleaning phase
243 if(state->do_clean == 1){
255 if(state->count >= state->z){ // accumulated count reached the threshold
257 state->count -= state->z; // keep the remainder above the threshold
// Flow variant of the sampling call: counts the call in packet_count, stores sample_size
// in s_size, and resets how_many_cleaned when do_clean is set.
// s: pointer to the state (cast to struct SSstate); sample_size: value remembered in s_size.
// NOTE(review): the return statements are not among the visible lines.
264 int flow_ssample(void *s,int curr_num_samples, unsigned int sample_size){
265 struct SSstate *state = (struct SSstate *)s;
267 state->packet_count++;
269 state->s_size = sample_size;
272 //just returned from the counting phase
273 if(state->count_closed == 1) // NOTE(review): the only visible body is the commented-out print below; if nothing else follows, this if binds to the next statement (the do_clean check) - confirm intent
274 // cout << "closed flows after counting phase: " << state->count_closed_flows << "\n";
276 //just returned from the cleaning phase
277 if(state->do_clean == 1){
278 // cout << "flows were sampled: " << state->how_many_cleaned << "\n";
279 state->how_many_cleaned = 0;
// Final-clean step: sets do_sooth, then compares a working length (new_len) against the
// threshold z, accumulating it in fcount.
// s: pointer to the state (cast to struct SSstate); glen: length compared against z_prev.
// NOTE(review): the declaration of new_len, the branch bodies and the return statements
// are not among the visible lines.
289 int ssfinal_clean(void *s, int curr_num_samples, unsigned long long int glen){
290 struct SSstate *state = (struct SSstate *)s;
292 state->do_sooth = true;
294 // for ssample() where just returned from the cleaning phase
300 if (glen < state->z_prev) // lengths below the previous threshold are raised to it
301 new_len = state->z_prev;
306 if(curr_num_samples <= state->s_size){
310 if(new_len > state->z){
315 state->fcount += new_len;
316 if(state->fcount >= state->z){ // accumulated length reached the threshold
318 state->fcount -= state->z; // keep the remainder above the threshold
// Flow variant of the final-clean step. Sets do_sooth, records delay in the state,
// distinguishes closed from not-closed groups using ccondition and the gap between
// state->time and maxtime, and accumulates the working length in fcount against z.
// s: pointer to the state (cast to struct SSstate).
// ccondition: a value of 1 marks the flow as closed in the checks below.
// delay: closing interval, also stored in state->delay; maxtime: subtracted from state->time.
// glen: length compared against z_prev.
// NOTE(review): local declarations (new_len, sampled), else arms, closing braces and return
// statements are not among the visible lines, so the nesting below cannot be confirmed.
// NOTE(review): state->time is declared int among the visible fields while maxtime is unsigned,
// so state->time-maxtime is evaluated in unsigned arithmetic and wraps if maxtime is larger.
330 int flow_ssfinal_clean(void *s, int curr_num_samples, unsigned int ccondition, unsigned int delay, unsigned int maxtime, unsigned long long int glen){
331 struct SSstate *state = (struct SSstate *)s;
333 state->do_sooth = true;
334 // only for reporting
335 state->delay = delay;
337 //for ssample() where just returned from the cleaning phase
342 // group is newly closed or not closed yet
345 // the flow is closed
346 if((ccondition == 1)||((state->time-maxtime)>=delay)){
350 // TF case or TT case for ccondition and delay
352 if((state->time-maxtime) <= 1){ // 1 is time interval for count of closed flows
353 // the flow was closed in the previous counting slot
357 //FT case for ccondition and delay
359 if((state->time-maxtime) <= (delay+1)){ // 1 is time interval for count of closed flows
360 // the flow was closed in the previous counting slot
364 // the flow was closed earlier than previous counting slot
365 // it is closed and old and new_group = 0
368 //adjust size only for old closed groups
370 if (glen < state->z_prev)
371 new_len = state->z_prev;
375 // newly closed group
377 state->closing_now++;
383 if(curr_num_samples <= state->s_size){
384 state->count_notsampled_new = 0;
388 if(new_len > state->z){
393 state->fcount += new_len;
394 if(state->fcount >= state->z){ // accumulated length reached the threshold
396 state->fcount -= state->z; // keep the remainder above the threshold
398 //count the number of newly closed groups that were not sampled;
399 //will be used in the next time window at the final cleaning phase
402 state->count_notsampled_new++;
407 state->how_many_cleaned += sampled;
// Decides whether a cleaning phase is needed: when the current number of samples exceeds
// gamma*s_size, the threshold z is saved in z_prev and multiplied by gamma.
// s: pointer to the state (cast to struct SSstate); curr_num_samples: current number of samples.
// Returns state->do_clean.
// NOTE(review): the assignment to do_clean and the closing braces are not among the visible
// lines, so it is unclear which of the statements below are inside the if.
415 int ssdo_clean(void *s, int curr_num_samples){
416 struct SSstate *state = (struct SSstate *)s;
419 //bcount will be different after the cleaning phase
420 if(curr_num_samples > (state->gamma*state->s_size)){
421 state->z_prev = state->z; // remember the threshold before raising it
422 state->z=(double)state->gamma*state->z;
428 state->do_clean_count++;
431 return state->do_clean;
// Flow variant of the cleaning decision. Tracks maxtime in state->time, sets count_closed
// when the timestamp changes, and raises z by count_closed_flows/s_size when the number of
// closed flows exceeds gamma*s_size.
// s: pointer to the state (cast to struct SSstate); maxtime: timestamp compared with state->time.
// Returns state->do_clean.
// NOTE(review): the condition guarding the first assignment to state->time, the assignment
// to do_clean, and the closing braces are not among the visible lines.
434 int flow_ssdo_clean(void *s, int curr_num_samples, unsigned int maxtime){
435 struct SSstate *state = (struct SSstate *)s;
437 // initialize timestamp with first seen packet from the current timewindow
439 state->time = maxtime;
441 // detect next counting time slot
442 if(state->time != maxtime){
443 // cout << "need to count\n";
444 state->time = maxtime;
445 state->count_closed = 1;
450 //bcount will be different after the cleaning phase
451 //if(curr_num_samples > (state->gamma*state->s_size)){
452 if(state->count_closed_flows > (state->gamma*state->s_size)){
453 // cout << "need to clean, num closed flows: " << state->count_closed_flows << "\n";
454 state->z_prev = state->z; // remember the threshold before raising it
455 // cout << "do clean Z old: " << state->z << "\n";
456 state->z=(((double)state->count_closed_flows)/((double)state->s_size))*state->z; // scale z by closed flows over sample size
457 // cout << "do clean Z new: " << state->z << "\n";
463 state->do_clean_count++;
466 //just returned from the counting iteration
467 if(state->count_closed == 1){
468 state->count_closed = 0;
469 // cout << "number of closed flows: " << state->count_closed_flows << "\n";
470 state->count_closed_flows = 0;
471 //initialize gcount since it was used for sampling of new closed flows during the counting phase
475 return state->do_clean;
// Accessor on the sampling state; s is cast to struct SSstate.
// NOTE(review): the return statement is not among the visible lines; presumably it returns
// the threshold z - confirm.
478 double ssthreshold(void *s, int curr_num_samples){
479 struct SSstate *state = (struct SSstate *)s;
// Returns curr_num_samples unchanged; the state pointer s is not used by the visible code.
484 int count_distinct(void *s, int curr_num_samples){
485 return curr_num_samples;
// Cleaning step: compares a working length (new_len) against the threshold z and
// accumulates it in gcount.
// s: pointer to the state (cast to struct SSstate); glen: length compared against z_prev.
// NOTE(review): the declaration of new_len, the branch bodies and the return statements
// are not among the visible lines.
488 int ssclean_with(void *s,int curr_num_samples, unsigned long long int glen){
489 struct SSstate *state = (struct SSstate *)s;
495 if (glen < state->z_prev) // lengths below the previous threshold are raised to it
496 new_len = state->z_prev;
500 if(new_len > state->z){
505 state->gcount += new_len;
506 if(state->gcount >= state->z){ // accumulated length reached the threshold
508 state->gcount -= state->z; // keep the remainder above the threshold
// Flow variant of the cleaning step. While count_closed is set it counts closed flows in
// count_closed_flows; it then accumulates the working length in gcount against z and, when
// do_clean is set, adds the local `sampled` value to how_many_cleaned.
// s: pointer to the state (cast to struct SSstate).
// ccondition: a value of 1 marks the flow as closed in the checks below.
// delay: closing interval; maxtime: subtracted from state->time; glen: length compared against z_prev.
// NOTE(review): local declarations (new_len, sampled), branch bodies, closing braces and
// return statements are not among the visible lines, so the nesting below cannot be confirmed.
// NOTE(review): state->time is declared int among the visible fields while maxtime is unsigned,
// so state->time-maxtime is evaluated in unsigned arithmetic and wraps if maxtime is larger.
518 int flow_ssclean_with(void *s,int curr_num_samples, unsigned int ccondition, unsigned int delay, unsigned int maxtime, unsigned long long int glen){
519 struct SSstate *state = (struct SSstate *)s;
526 //need to count closed flows
527 if(state->count_closed == 1){
529 if((ccondition == 1)||((state->time-maxtime)>=delay)){
530 state->count_closed_flows++;
531 // TF case or TT case for ccondition and delay
533 if((state->time-maxtime) <= 1) //1 is time interval for count of closed flows
536 //FT case for ccondition and delay
538 if((state->time-maxtime) <= (delay+1)) //1 is time interval for count of closed flows
543 // if flow is closed but old, no need to subsample it
549 // clean only closed flows FF case
550 // the flow is still open
551 if((ccondition == 0)&&((state->time-maxtime)<delay)){
555 // use glen for a new group and z_prev for old one
557 if (glen < state->z_prev)
558 new_len = state->z_prev;
567 // at this point either flow is closed and old and we are at the cleaning phase
568 // or flow is closed and new and we are at the counting phase
569 if(new_len > state->z){
574 state->gcount += new_len;
575 if(state->gcount >= state->z){ // accumulated length reached the threshold
577 state->gcount -= state->z; // keep the remainder above the threshold
579 //new_group is not sampled during counting phase
581 if(state->count_closed == 1)
582 state->count_closed_flows--; // undo the earlier increment for a group that was not sampled
587 if(state->do_clean == 1)
588 state->how_many_cleaned += sampled;
// Accessor: returns the packet_count field of the state (s is cast to struct SSstate).
// curr_num_samples is not used by the visible code.
593 int packet_count(void *s, int curr_num_samples){
594 struct SSstate *state = (struct SSstate *)s;
595 return state->packet_count;
// Accessor on the sampling state; s is cast to struct SSstate.
// NOTE(review): the return statement is not among the visible lines; presumably it returns
// the gamma field - confirm.
598 double gamma(void *s, int curr_num_samples){
599 struct SSstate *state = (struct SSstate *)s;
// Accessor: returns the do_clean_count field of the state (s is cast to struct SSstate).
// curr_num_samples is not used by the visible code.
603 int do_clean_count(void *s, int curr_num_samples){
604 struct SSstate *state = (struct SSstate *)s;
605 return state->do_clean_count;
// Accessor on the sampling state; s is cast to struct SSstate.
// NOTE(review): the return statement is not among the visible lines; presumably it returns
// the delay field - confirm.
608 int delay(void *s, int curr_num_samples){
609 struct SSstate *state = (struct SSstate *)s;
613 // number of groups which are newly closed (in the most recent
614 // counting phase) or not closed at all during the final
// NOTE(review): the rest of the sentence above is missing from this listing; presumably
// it continues "cleaning phase" - confirm.
// Returns the closing_now field of the state (s is cast to struct SSstate).
616 int newly_closed(void *s, int curr_num_samples){
617 struct SSstate *state = (struct SSstate *)s;
618 return state->closing_now;