-/* ------------------------------------------------\r
- Copyright 2014 AT&T Intellectual Property\r
- Licensed under the Apache License, Version 2.0 (the "License");\r
- you may not use this file except in compliance with the License.\r
- You may obtain a copy of the License at\r
- \r
- http://www.apache.org/licenses/LICENSE-2.0\r
- \r
- Unless required by applicable law or agreed to in writing, software\r
- distributed under the License is distributed on an "AS IS" BASIS,\r
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- See the License for the specific language governing permissions and\r
- limitations under the License.\r
- ------------------------------------------- */\r
-///////////////////////////////////////////////\r
-//// State, functions for adaptive smart sampling\r
-/// written by Irina Rozenbaum\r
-\r
-#include <iostream>\r
-#define max(x,y) ((x)>(y)?(x):(y))\r
-#define _BURST_SOOTHING_FACTOR 10.0\r
-\r
-struct SSstate {\r
- //unsigned long long int count; // count to sample small packets with certain probability\r
- double count;\r
- double gcount; // count for clean_with() function\r
- double fcount; // count for final_clean() function\r
- double z; //z is the threshold for a size of the packet\r
- double z_prev; // value of threshold from the previos cleaning phase\r
- double gamma; //tolerance parameter for emergency control over the\r
- //number of samples, should be >= 1\r
- int do_clean; // trigger cleaning phase\r
- int bcount; //count for number of packets that exceed threshold ,\r
- //need it for one of the variations of algorithms for\r
- //threshold adjustment\r
- int s_size; //need to remember sample size for _sfun_state_init()\r
- int final_z; //bool that indicated if z was adjusted to its filnal value\r
- //before the final clean\r
- int time; //timestamp from previously processed packet\r
- //used in detecting frequency of the clening phases with aggregation\r
- \r
- int packet_count; //count for number of packets per time window\r
- int do_clean_count; //count number of cleaning phases per time window\r
- bool do_sooth; //turn on _BURST_SOOTHING_FACTOR\r
- \r
- // flow sampling\r
- int count_closed; //bool indicates that counting phase is being triggered\r
- int count_closed_flows; //count for closed flows\r
- int count_notsampled_new; //count for the number of new groups that were not sampled during\r
- // the final sampling phase will be used in next time window to compute resonable\r
- // approximation for threshold before final sampling phase\r
- \r
- // for reporting\r
- int delay; //interval within which there was no packets that belong to the current flow\r
- //thus flow is considered to be closed.\r
- //parameter specified by the query\r
- int closing_now; //count for groups that are closing at the final cleraning phase\r
- //or were closed during the most recent counting phase (newly closed)\r
- \r
- //for debugging\r
- int how_many_cleaned;\r
-};\r
-\r
-\r
-// the function is called once at the initial initialization of the state\r
-void _sfun_state_clean_init_smart_sampling_state(void *s){\r
- struct SSstate *state = (struct SSstate *)s;\r
- \r
- state->count = 0;\r
- state->gcount = 0;\r
- state->fcount = 0;\r
- state->z = 200; //need to figure out good initial value for z\r
- // cout << "clean init Z: " << state->z << "\n";\r
- state->z_prev = 0;\r
- // cout << "clean init Z prev: " << state->z << "\n";\r
- state->gamma = 2;\r
- state->do_clean = 0;\r
- state->bcount = 0;\r
- state->s_size = 0;\r
- state->final_z = 0;\r
- state->time = 0;\r
- state->count_closed = 0;\r
- state->count_closed_flows = 0;\r
- state->count_notsampled_new = 0;\r
- \r
- state->packet_count = 0;\r
- state->do_clean_count = 0;\r
- state->do_sooth = true;\r
- \r
- state->how_many_cleaned = 0;\r
- state->delay = 0;\r
- state->closing_now = 0;\r
-};\r
-\r
-\r
-// the function will be called at the beginning of every time window\r
-void _sfun_state_dirty_init_smart_sampling_state(void *s_new, void *s_old, int curr_num_samples){\r
- struct SSstate *state_new = (struct SSstate *)s_new;\r
- struct SSstate *state_old = (struct SSstate *)s_old;\r
- \r
- // cout << "**flows were sampled: " << state_old->how_many_cleaned << "\n";\r
- // cout << "***dirty current num of samples: " << curr_num_samples << "\n";\r
- // cout << "dirty init Z old: " << state_old->z << "\n";\r
- if(curr_num_samples < state_old->s_size){\r
- // state_new->z = state_old->z*((max((double)curr_num_samples-(double)state_old->bcount,1))/((double)state_old->s_size-(double)state_old->bcount));\r
- \r
- //temp fix for early aggregation\r
- state_new->z = (state_old->z)/2;\r
- \r
- if(state_old->do_sooth)\r
- state_new->z = state_old->z / _BURST_SOOTHING_FACTOR;\r
- }\r
- else {\r
- if(curr_num_samples >= state_old->s_size){\r
- //cout << "yes\n";\r
- state_new->z = state_old->z*((double)curr_num_samples/(double)state_old->s_size);\r
- \r
- if(state_old->do_sooth)\r
- state_new->z = state_old->z / _BURST_SOOTHING_FACTOR;\r
- }\r
- }\r
- \r
- if(state_new->z <= 1.0)\r
- state_new->z = 1;\r
- // cout << "dirty init Z new: " << state_new->z << "\n";\r
- \r
- state_new->gamma = state_old->gamma;\r
- state_new->do_clean = state_old->do_clean;\r
- state_new->s_size = state_old->s_size;\r
- state_new->bcount = 0;\r
- state_new->gcount = 0;\r
- state_new->count = 0;\r
- state_new->fcount = 0;\r
- state_new->final_z = 0;\r
- state_new->time = 0;\r
- state_new->count_closed = 0;\r
- state_new->count_closed_flows = 0;\r
- state_new->count_notsampled_new = state_old->count_notsampled_new;\r
- \r
- state_new->packet_count = 0;\r
- state_new->do_clean_count = 0;\r
- state_new->do_sooth = true;\r
- \r
- state_new->how_many_cleaned = 0;\r
- state_new->delay = 0;\r
- state_new->closing_now = 0;\r
- \r
- // cout << "dirty init gamma: " << state_new->gamma << "\n";\r
- // cout << "dirty init do_clean: " << state_new->do_clean << "\n";\r
- // cout << "dirty init s_size: " << state_new->s_size << "\n";\r
- // cout << "dirty init bcount: " << state_new->bcount << "\n";\r
- // cout << "dirty init gcount: " << state_new->gcount << "\n";\r
- // cout << "dirty init count: " << state_new->count << "\n";\r
- // cout << "dirty init fcount: " << state_new->fcount << "\n";\r
- // cout << "dirty init final_z: " << state_new->final_z << "\n";\r
- // cout << "dirty init time " << state_new->time << "\n";\r
- // cout << "dirty init count_closed: " << state_new->count_closed << "\n";\r
- // cout << "dirty init count_closed_flows: " << state_new->count_closed_flows << "\n";\r
- \r
- // cout << "dirty init packet count: " << state_new->packet_count << "\n";\r
- // cout << "dirty init do_clean_count: " << state_new->do_clean_count << "\n";\r
- \r
-};\r
-\r
-// function is called in two cases:\r
-// adjustment of threshold before the cleaning phase is being triggered\r
-// adjustment of threshold at the window border for the old state, this new\r
-// threshold will be used by partial_flush() and flush() functions (final_clean())\r
-\r
-void _sfun_state_final_init_smart_sampling_state(void *s, int curr_num_samples){\r
- struct SSstate *state = (struct SSstate *)s;\r
- \r
- // cout << "final init Z old: " << state->z << "\n";\r
- \r
- if(state->final_z == 0){\r
- \r
- // just returned from the counting phase\r
- // case when the time window is changed right after the counting phase\r
- if(state->count_closed == 1){\r
- state->count_closed = 0;\r
- state->count_closed_flows = 0;\r
- state->how_many_cleaned = 0;\r
- }\r
- \r
- state->z_prev = state->z;\r
- // cout << "final current num of samples: " << curr_num_samples << "\n";\r
- // cout << "final count not sampled new: " << state->count_notsampled_new << "\n";\r
- // adjust count for current number of sampled based on statistics\r
- // gathered in the previous time window\r
- // otherwise we overestimate it\r
- if(state->count_notsampled_new != 0)\r
- curr_num_samples -= state->count_notsampled_new;\r
- \r
- if(curr_num_samples < state->s_size){\r
- //state->z = state->z*((max((double)curr_num_samples-(double)state->bcount,1))/((double)state->s_size-(double)state->bcount));\r
- \r
- //temp fix for early aggregation\r
- state->z = state->z/2;\r
- }\r
- else {\r
- if(curr_num_samples >= state->s_size){\r
- state->z = state->z*((double)curr_num_samples/(double)state->s_size);\r
- }\r
- }\r
- \r
- if(state->z <= 0)\r
- state->z = 1;\r
- // cout << "final init Z new: " << state->z << "\n";\r
- \r
- state->bcount = 0;\r
- state->final_z = 1;\r
- state->do_clean_count++;\r
- state->count_notsampled_new = 0;\r
- \r
- }\r
- \r
- // cout << "final init gamma: " << state->gamma << "\n";\r
- // cout << "final init do_clean: " << state->do_clean << "\n";\r
- // cout << "final init s_size: " << state->s_size << "\n";\r
- // cout << "final init bcount: " << state->bcount << "\n";\r
- // cout << "final init gcount: " << state->gcount << "\n";\r
- // cout << "final init count: " << state->count << "\n";\r
- // cout << "final init fcount: " << state->fcount << "\n";\r
- // cout << "final init final_z: " << state->final_z << "\n";\r
- // cout << "final init time " << state->time << "\n";\r
- // cout << "final init count_closed: " << state->count_closed << "\n";\r
- // cout << "final init count_closed_flows: " << state->count_closed_flows << "\n";\r
- \r
- // cout << "final init packet count: " << state->packet_count << "\n";\r
- // cout << "final init do_clean_count: " << state->do_clean_count << "\n";\r
- \r
-};\r
-\r
-int ssample(void *s,int curr_num_samples, unsigned long long int len, unsigned int sample_size){\r
- struct SSstate *state = (struct SSstate *)s;\r
- \r
- state->packet_count++;\r
- \r
- state->s_size = sample_size;\r
- int sampled = 0;\r
- \r
- //just returned from the cleaning phase\r
- if(state->do_clean == 1){\r
- state->gcount = 0;\r
- state->do_clean = 0;\r
- }\r
- \r
- //sampling\r
- if(len > state->z){\r
- state->bcount++;\r
- sampled=1;\r
- }\r
- else{\r
- state->count += len;\r
- if(state->count >= state->z){\r
- sampled=1;\r
- state->count -= state->z;\r
- }\r
- }\r
- return sampled;\r
- \r
-};\r
-\r
-int flow_ssample(void *s,int curr_num_samples, unsigned int sample_size){\r
- struct SSstate *state = (struct SSstate *)s;\r
- \r
- state->packet_count++;\r
- \r
- state->s_size = sample_size;\r
- int sampled = 0;\r
- \r
- //just returned from the counting phase\r
- if(state->count_closed == 1)\r
- // cout << "closed flows after counting phase: " << state->count_closed_flows << "\n";\r
- \r
- //just returned from the cleaning phase\r
- if(state->do_clean == 1){\r
- // cout << "flows were sampled: " << state->how_many_cleaned << "\n";\r
- state->how_many_cleaned = 0;\r
- state->gcount = 0;\r
- state->do_clean = 0;\r
- }\r
- \r
- return 1;\r
- \r
-};\r
-\r
-\r
-int ssfinal_clean(void *s, int curr_num_samples, unsigned long long int glen){\r
- struct SSstate *state = (struct SSstate *)s;\r
- \r
- state->do_sooth = true;\r
- \r
- // for ssample() where just returned from the clening phase\r
- state->do_clean = 1;\r
- \r
- int sampled = 0;\r
- double new_len = 0;\r
- \r
- if (glen < state->z_prev)\r
- new_len = state->z_prev;\r
- else\r
- new_len = glen;\r
- \r
- //no need to clean\r
- if(curr_num_samples <= state->s_size){\r
- return 1;\r
- }\r
- else{\r
- if(new_len > state->z){\r
- sampled = 1;\r
- state->bcount++;\r
- }\r
- else{\r
- state->fcount += new_len;\r
- if(state->fcount >= state->z){\r
- sampled = 1;\r
- state->fcount -= state->z;\r
- }\r
- //else{\r
- //state->scount--;\r
- //}\r
- }\r
- \r
- return sampled;\r
- }\r
- \r
-};\r
-\r
-int flow_ssfinal_clean(void *s, int curr_num_samples, unsigned int ccondition, unsigned int delay, unsigned int maxtime, unsigned long long int glen){\r
- struct SSstate *state = (struct SSstate *)s;\r
- \r
- state->do_sooth = true;\r
- // only for reporting\r
- state->delay = delay;\r
- \r
- //for ssample() where just returned from the clening phase\r
- state->do_clean = 1;\r
- \r
- int sampled = 0;\r
- double new_len = 0;\r
- // group is newly closed or not closed yet\r
- int new_group = 1;\r
- \r
- // the flow is closed\r
- if((ccondition == 1)||((state->time-maxtime)>=delay)){\r
- \r
- new_group = 0;\r
- \r
- // TF case or TT case for ccondition and delay\r
- if(ccondition == 1){\r
- if((state->time-maxtime) <= 1){ // 1 is time interval for count of closed flows\r
- // the flow was closed in the previous counting slot\r
- new_group = 1;\r
- }\r
- }\r
- //FT case for ccondition and delay\r
- else{\r
- if((state->time-maxtime) <= (delay+1)){ // 1 is time interval for count of closed flows\r
- // the flow was closed in the previous counting slot\r
- new_group = 1;\r
- }\r
- }\r
- // the flow was closed earlier than previous counting slot\r
- // it is closed and old and new_group = 0\r
- }\r
- \r
- //adjust size only for old closed groups\r
- if(new_group == 0){\r
- if (glen < state->z_prev)\r
- new_len = state->z_prev;\r
- else\r
- new_len = glen;\r
- }\r
- // newly closed group\r
- else{\r
- state->closing_now++;\r
- new_len = glen;\r
- }\r
- \r
- \r
- //no need to clean\r
- if(curr_num_samples <= state->s_size){\r
- state->count_notsampled_new = 0;\r
- return 1;\r
- }\r
- else{\r
- if(new_len > state->z){\r
- sampled = 1;\r
- state->bcount++;\r
- }\r
- else{\r
- state->fcount += new_len;\r
- if(state->fcount >= state->z){\r
- sampled = 1;\r
- state->fcount -= state->z;\r
- }\r
- //count number of not sampled newly closed groups\r
- //will use in the next time window at the final cleaning phase\r
- else{\r
- if(new_group == 1)\r
- state->count_notsampled_new++;\r
- }\r
- \r
- }\r
- \r
- state->how_many_cleaned += sampled;\r
- \r
- return sampled;\r
- }\r
- \r
-};\r
-\r
-\r
-int ssdo_clean(void *s, int curr_num_samples){\r
- struct SSstate *state = (struct SSstate *)s;\r
- \r
- //emergency control\r
- //bcount will be different after the cleaning phase\r
- if(curr_num_samples > (state->gamma*state->s_size)){\r
- state->z_prev = state->z;\r
- state->z=(double)state->gamma*state->z;\r
- state->do_clean = 1;\r
- state->bcount = 0;\r
- state->count = 0;\r
- state->gcount = 0;\r
- \r
- state->do_clean_count++;\r
- }\r
- \r
- return state->do_clean;\r
-};\r
-\r
-int flow_ssdo_clean(void *s, int curr_num_samples, unsigned int maxtime){\r
- struct SSstate *state = (struct SSstate *)s;\r
- \r
- // initialize timestamp with first seen packet from the current timewindow\r
- if(state->time == 0)\r
- state->time = maxtime;\r
- \r
- // detect next counting time slot\r
- if(state->time != maxtime){\r
- // cout << "need to count\n";\r
- state->time = maxtime;\r
- state->count_closed = 1;\r
- return 1;\r
- }\r
- \r
- //emergency control\r
- //bcount will be different after the cleaning phase\r
- //if(curr_num_samples > (state->gamma*state->s_size)){\r
- if(state->count_closed_flows > (state->gamma*state->s_size)){\r
- // cout << "need to clean, num closed flows: " << state->count_closed_flows << "\n";\r
- state->z_prev = state->z;\r
- // cout << "do clean Z old: " << state->z << "\n";\r
- state->z=(((double)state->count_closed_flows)/((double)state->s_size))*state->z;\r
- // cout << "do clean Z new: " << state->z << "\n";\r
- state->do_clean = 1;\r
- state->bcount = 0;\r
- state->count = 0;\r
- state->gcount = 0;\r
- \r
- state->do_clean_count++;\r
- }\r
- \r
- //just returned from the counting iteration\r
- if(state->count_closed == 1){\r
- state->count_closed = 0;\r
- // cout << "number of closed flows: " << state->count_closed_flows << "\n";\r
- state->count_closed_flows = 0;\r
- //initialize gcount since was used for sampling of new closed flows during counting phase\r
- state->gcount = 0;\r
- }\r
- \r
- return state->do_clean;\r
-};\r
-\r
-double ssthreshold(void *s, int curr_num_samples){\r
- struct SSstate *state = (struct SSstate *)s;\r
- \r
- return state->z;\r
-};\r
-\r
-int count_distinct(void *s, int curr_num_samples){\r
- return curr_num_samples;\r
-};\r
-\r
-int ssclean_with(void *s,int curr_num_samples, unsigned long long int glen){\r
- struct SSstate *state = (struct SSstate *)s;\r
- \r
- //cleaning condition\r
- int sampled = 0;\r
- double new_len = 0;\r
- \r
- if (glen < state->z_prev)\r
- new_len = state->z_prev;\r
- else\r
- new_len = glen;\r
- \r
- if(new_len > state->z){\r
- state->bcount++;\r
- sampled = 1;\r
- }\r
- else{\r
- state->gcount += new_len;\r
- if(state->gcount >= state->z){\r
- sampled = 1;\r
- state->gcount -= state->z;\r
- }\r
- //else{\r
- //state->scount--;\r
- //}\r
- }\r
- \r
- return sampled;\r
-};\r
-\r
-int flow_ssclean_with(void *s,int curr_num_samples, unsigned int ccondition, unsigned int delay, unsigned int maxtime, unsigned long long int glen){\r
- struct SSstate *state = (struct SSstate *)s;\r
- \r
- //cleaning condition\r
- int sampled = 0;\r
- double new_len = 0;\r
- int new_group = 0;\r
- \r
- //need to count closed flows\r
- if(state->count_closed == 1){\r
- //the flow is closed\r
- if((ccondition == 1)||((state->time-maxtime)>=delay)){\r
- state->count_closed_flows++;\r
- // TF case or TT case for ccondition and delay\r
- if(ccondition == 1){\r
- if((state->time-maxtime) <= 1) //1 is time interval for count of closed flows\r
- new_group = 1;\r
- }\r
- //FT case for ccondition and delay\r
- else{\r
- if((state->time-maxtime) <= (delay+1)) //1 is time interval for count of closed flows\r
- new_group = 1;\r
- }\r
- }\r
- \r
- // if flow is closed but old, no need to subsample it\r
- if (new_group == 0)\r
- return true;\r
- \r
- }\r
- \r
- // clean only closed flows FF case\r
- // the flow is still open\r
- if((ccondition == 0)&&((state->time-maxtime)<delay)){\r
- return true;\r
- }\r
- \r
- // use glen for a new group and z_prev for old one\r
- if(new_group == 0){\r
- if (glen < state->z_prev)\r
- new_len = state->z_prev;\r
- else\r
- new_len = glen;\r
- }\r
- //the group is new\r
- else{\r
- new_len = glen;\r
- }\r
- \r
- // at this point either flow is closed and old and we are at the cleaning phase\r
- // or flow is closed and new and we are at the counting phase\r
- if(new_len > state->z){\r
- state->bcount++;\r
- sampled = 1;\r
- }\r
- else{\r
- state->gcount += new_len;\r
- if(state->gcount >= state->z){\r
- sampled = 1;\r
- state->gcount -= state->z;\r
- }\r
- //new_group is not sampled during counting phase\r
- else{\r
- if(state->count_closed == 1)\r
- state->count_closed_flows--;\r
- }\r
- \r
- }\r
- \r
- if(state->do_clean == 1)\r
- state->how_many_cleaned += sampled;\r
- \r
- return sampled;\r
-};\r
-\r
-int packet_count(void *s, int curr_num_samples){\r
- struct SSstate *state = (struct SSstate *)s; \r
- return state->packet_count;\r
-};\r
-\r
-double gamma(void *s, int curr_num_samples){\r
- struct SSstate *state = (struct SSstate *)s; \r
- return state->gamma;\r
-};\r
-\r
-int do_clean_count(void *s, int curr_num_samples){\r
- struct SSstate *state = (struct SSstate *)s; \r
- return state->do_clean_count;\r
-};\r
-\r
-int delay(void *s, int curr_num_samples){\r
- struct SSstate *state = (struct SSstate *)s; \r
- return state->delay;\r
-};\r
-\r
-// number of groups which are newly closed (in the most recent\r
-// counting phase or not closed at all during the final\r
-// cleaning phase\r
-int newly_closed(void *s, int curr_num_samples){\r
- struct SSstate *state = (struct SSstate *)s; \r
- return state->closing_now;\r
-};\r
-\r
-\r
-\r
-\r
+/* ------------------------------------------------
+ Copyright 2014 AT&T Intellectual Property
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ------------------------------------------- */
+///////////////////////////////////////////////
+//// State, functions for adaptive smart sampling
+/// written by Irina Rozenbaum
+
+#include <iostream>
+#define max(x,y) ((x)>(y)?(x):(y)) // NOTE: evaluates each argument twice; in the visible part of this file it is referenced only from commented-out code
+#define _BURST_SOOTHING_FACTOR 10.0 // divisor applied to the previous threshold z at window start when do_sooth is set; NOTE(review): underscore+uppercase names are reserved identifiers
+
+struct SSstate { // state shared by the smart-sampling functions in this file (passed to them as void *)
+ // accumulator of packet lengths that did not exceed z; ssample() emits a sample each time it reaches z (was: unsigned long long int)
+ double count;
+ double gcount; // accumulator used by the clean_with() functions
+ double fcount; // accumulator used by the final_clean() functions
+ double z; // z is the size threshold: lengths greater than z are always sampled
+ double z_prev; // value of the threshold from the previous cleaning phase
+ double gamma; // tolerance parameter for emergency control over the
+ // number of samples, should be >= 1
+ int do_clean; // set to 1 when a cleaning phase has been triggered
+ int bcount; // number of packets/groups whose size exceeded the threshold;
+ // needed by one of the variations of the algorithms for
+ // threshold adjustment
+ int s_size; // remembered target sample size, read by the _sfun_state_*_init functions
+ int final_z; // flag: nonzero once z has been adjusted to its final value
+ // before the final clean
+ int time; // timestamp of the previously processed packet,
+ // used to detect the frequency of the cleaning phases with aggregation
+ 
+ int packet_count; // number of packets seen in the current time window
+ int do_clean_count; // number of cleaning phases in the current time window
+ bool do_sooth; // turns on division of the threshold by _BURST_SOOTHING_FACTOR
+ 
+ // flow sampling
+ int count_closed; // flag: a counting phase has been triggered
+ int count_closed_flows; // number of closed flows
+ int count_notsampled_new; // number of new groups that were not sampled during
+ // the final sampling phase; used in the next time window to compute a reasonable
+ // approximation of the threshold before the final sampling phase
+ 
+ // for reporting
+ int delay; // interval without packets for the current flow after which
+ // the flow is considered to be closed;
+ // parameter specified by the query
+ int closing_now; // number of groups that are closing at the final cleaning phase
+ // or were closed during the most recent counting phase (newly closed)
+ 
+ // for debugging
+ int how_many_cleaned;
+};
+
+
+// First-time initialisation of the sampling state (called once, when the
+// state is initially set up). Every field of the SSstate is assigned.
+//   s : pointer to the SSstate to initialise
+void _sfun_state_clean_init_smart_sampling_state(void *s){
+    struct SSstate *st = (struct SSstate *)s;
+
+    // threshold control
+    st->z = 200;            // starting threshold; a good initial value is still to be figured out
+    st->z_prev = 0;
+    st->gamma = 2;
+    st->final_z = 0;
+    st->do_sooth = true;
+
+    // length accumulators
+    st->count = st->gcount = st->fcount = 0;
+
+    // phase flags and per-window counters
+    st->do_clean = st->do_clean_count = 0;
+    st->bcount = st->packet_count = 0;
+    st->s_size = 0;
+    st->time = 0;
+
+    // flow sampling bookkeeping
+    st->count_closed = st->count_closed_flows = 0;
+    st->count_notsampled_new = 0;
+
+    // reporting / debugging
+    st->delay = 0;
+    st->closing_now = 0;
+    st->how_many_cleaned = 0;
+}
+
+
+// Start-of-window initialisation (called at the beginning of every time
+// window): builds the new window's state from the state of the previous one.
+//   s_new            : SSstate to fill in for the new window
+//   s_old            : SSstate of the previous window
+//   curr_num_samples : current number of samples, compared with the remembered
+//                      target size s_size
+void _sfun_state_dirty_init_smart_sampling_state(void *s_new, void *s_old, int curr_num_samples){
+    struct SSstate *next = (struct SSstate *)s_new;
+    struct SSstate *prev = (struct SSstate *)s_old;
+
+    // First estimate of the new threshold from how full the sample was.
+    if(curr_num_samples < prev->s_size)
+        next->z = prev->z / 2;      // temp fix for early aggregation
+    else
+        next->z = prev->z * ((double)curr_num_samples / (double)prev->s_size);
+
+    // Burst soothing, when enabled, replaces the estimate computed above.
+    if(prev->do_sooth)
+        next->z = prev->z / _BURST_SOOTHING_FACTOR;
+
+    // the threshold is never allowed below 1
+    if(next->z <= 1.0)
+        next->z = 1;
+
+    // values carried over from the previous window
+    next->gamma = prev->gamma;
+    next->do_clean = prev->do_clean;
+    next->s_size = prev->s_size;
+    next->count_notsampled_new = prev->count_notsampled_new;
+
+    // everything else starts the window from scratch
+    next->count = next->gcount = next->fcount = 0;
+    next->bcount = 0;
+    next->final_z = 0;
+    next->time = 0;
+    next->count_closed = next->count_closed_flows = 0;
+    next->packet_count = next->do_clean_count = 0;
+    next->do_sooth = true;
+    next->how_many_cleaned = 0;
+    next->delay = 0;
+    next->closing_now = 0;
+}
+
+// Final threshold adjustment. Per the original notes it is called in two cases:
+//   - before a cleaning phase is triggered, and
+//   - at the window border for the old state, so that partial_flush() and
+//     flush() (the final_clean() functions) use the adjusted threshold.
+//   s                : pointer to the SSstate to adjust
+//   curr_num_samples : current number of samples
+// The adjustment is applied at most once per window: it is skipped when
+// final_z is already nonzero.
+void _sfun_state_final_init_smart_sampling_state(void *s, int curr_num_samples){
+    struct SSstate *state = (struct SSstate *)s;
+
+    if(state->final_z != 0)
+        return;
+
+    // just returned from the counting phase:
+    // the time window changed right after it, so drop its bookkeeping
+    if(state->count_closed == 1){
+        state->count_closed = 0;
+        state->count_closed_flows = 0;
+        state->how_many_cleaned = 0;
+    }
+
+    state->z_prev = state->z;
+
+    // correct the current sample count with the statistic gathered in the
+    // previous time window, otherwise it is overestimated
+    curr_num_samples -= state->count_notsampled_new;
+
+    if(curr_num_samples < state->s_size){
+        // temp fix for early aggregation
+        state->z = state->z/2;
+    }
+    else if(state->s_size != 0){
+        state->z = state->z*((double)curr_num_samples/(double)state->s_size);
+    }
+    // else: no target size has been recorded (s_size == 0). There is no ratio
+    // to scale by, and dividing by zero would turn z into inf or NaN, which no
+    // later comparison against z could ever recover from. Leave z unchanged.
+
+    // written as !(z > 0) so that a NaN threshold is reset as well
+    if(!(state->z > 0))
+        state->z = 1;
+
+    state->bcount = 0;
+    state->final_z = 1;
+    state->do_clean_count++;
+    state->count_notsampled_new = 0;
+}
+
+// Per-packet sampling decision.
+//   s                : pointer to the SSstate
+//   curr_num_samples : current number of samples (not used here)
+//   len              : length of the packet
+//   sample_size      : target sample size; stored in the state on every call
+// Returns 1 when the packet is sampled, 0 otherwise.
+int ssample(void *s,int curr_num_samples, unsigned long long int len, unsigned int sample_size){
+    struct SSstate *st = (struct SSstate *)s;
+
+    st->packet_count++;
+    st->s_size = sample_size;
+
+    // first packet after a cleaning phase: clear the cleaning bookkeeping
+    if(st->do_clean == 1){
+        st->gcount = 0;
+        st->do_clean = 0;
+    }
+
+    // packets longer than the threshold are always taken
+    if(len > st->z){
+        st->bcount++;
+        return 1;
+    }
+
+    // shorter packets accumulate; one sample is emitted per z accumulated
+    st->count += len;
+    if(st->count >= st->z){
+        st->count -= st->z;
+        return 1;
+    }
+
+    return 0;
+}
+
+// Per-packet hook for flow sampling: every packet is accepted, the function
+// only maintains the state.
+//   s                : pointer to the SSstate
+//   curr_num_samples : current number of samples (not used here)
+//   sample_size      : target sample size; stored in the state on every call
+// Always returns 1.
+int flow_ssample(void *s,int curr_num_samples, unsigned int sample_size){
+    struct SSstate *state = (struct SSstate *)s;
+
+    state->packet_count++;
+    state->s_size = sample_size;
+
+    // The former "just returned from the counting phase" check only guarded a
+    // debug print. With the print commented out, that brace-less if swallowed
+    // the block below, so the reset ran only when count_closed was 1.
+    // The check is removed so the reset is unconditional again.
+
+    // just returned from the cleaning phase
+    if(state->do_clean == 1){
+        state->how_many_cleaned = 0;
+        state->gcount = 0;
+        state->do_clean = 0;
+    }
+
+    return 1;
+}
+
+
+// Final cleaning decision for one group.
+//   s                : pointer to the SSstate
+//   curr_num_samples : current number of samples
+//   glen             : length accumulated for the group
+// Returns 1 when the group is kept, 0 when it is dropped.
+int ssfinal_clean(void *s, int curr_num_samples, unsigned long long int glen){
+    struct SSstate *st = (struct SSstate *)s;
+
+    st->do_sooth = true;
+    st->do_clean = 1;       // tells ssample() that a cleaning phase has just run
+
+    // sample is within the target size: no need to clean
+    if(curr_num_samples <= st->s_size)
+        return 1;
+
+    // a group weighs at least the previous threshold
+    double weight = (glen < st->z_prev) ? st->z_prev : (double)glen;
+
+    if(weight > st->z){
+        st->bcount++;
+        return 1;
+    }
+
+    st->fcount += weight;
+    if(st->fcount >= st->z){
+        st->fcount -= st->z;
+        return 1;
+    }
+
+    return 0;
+}
+
+// Final cleaning decision for one flow group.
+//   s                : pointer to the SSstate
+//   curr_num_samples : current number of samples
+//   ccondition       : 1 when the query's closing condition holds for the flow
+//   delay            : idle interval after which a flow counts as closed;
+//                      stored in the state for reporting
+//   maxtime          : timestamp compared against state->time for this group
+//   glen             : length accumulated for the group
+// Returns 1 when the group is kept, 0 when it is dropped.
+int flow_ssfinal_clean(void *s, int curr_num_samples, unsigned int ccondition, unsigned int delay, unsigned int maxtime, unsigned long long int glen){
+    struct SSstate *st = (struct SSstate *)s;
+
+    st->do_sooth = true;
+    st->delay = delay;      // only for reporting
+    st->do_clean = 1;       // tells the sample function that a cleaning phase has just run
+
+    // state->time - maxtime, evaluated in unsigned arithmetic exactly as the
+    // mixed int/unsigned expression is
+    const unsigned int idle = st->time - maxtime;
+
+    // old_closed: the flow is closed and was closed earlier than the previous
+    // counting slot (1 is the time interval for the count of closed flows).
+    // Otherwise the group is newly closed or not closed yet.
+    int old_closed;
+    if(ccondition == 1)
+        old_closed = (idle > 1);                                // TF / TT case
+    else
+        old_closed = (idle >= delay) && (idle > (delay+1));     // FT case
+
+    double weight;
+    if(old_closed){
+        // size is adjusted only for old closed groups
+        weight = (glen < st->z_prev) ? st->z_prev : (double)glen;
+    }
+    else{
+        st->closing_now++;
+        weight = (double)glen;
+    }
+
+    // sample is within the target size: no need to clean
+    if(curr_num_samples <= st->s_size){
+        st->count_notsampled_new = 0;
+        return 1;
+    }
+
+    int keep = 0;
+    if(weight > st->z){
+        keep = 1;
+        st->bcount++;
+    }
+    else{
+        st->fcount += weight;
+        if(st->fcount >= st->z){
+            keep = 1;
+            st->fcount -= st->z;
+        }
+        else if(!old_closed){
+            // newly closed group that was not sampled: counted for use in the
+            // next time window at the final cleaning phase
+            st->count_notsampled_new++;
+        }
+    }
+
+    st->how_many_cleaned += keep;
+    return keep;
+}
+
+
+// Emergency control for plain smart sampling.
+// When curr_num_samples exceeds gamma * s_size the threshold z is multiplied
+// by gamma (the old value is saved in z_prev), the bcount/count/gcount
+// counters are reset and a cleaning phase is requested.
+// Returns the current value of do_clean.
+int ssdo_clean(void *s, int curr_num_samples){
+    struct SSstate *st = (struct SSstate *)s;
+
+    const int over_budget = curr_num_samples > (st->gamma * st->s_size);
+    if(!over_budget)
+        return st->do_clean;
+
+    // raise the threshold, remembering the one used so far
+    st->z_prev = st->z;
+    st->z = st->gamma * st->z;
+
+    // counters restart; bcount will differ after the cleaning phase
+    st->bcount = 0;
+    st->count = 0;
+    st->gcount = 0;
+
+    st->do_clean = 1;
+    st->do_clean_count++;
+
+    return st->do_clean;
+}
+
+// Decides whether the flow sampler needs a counting or a cleaning pass.
+//   s                - opaque pointer to the struct SSstate that is updated
+//   curr_num_samples - not used by this variant
+//   maxtime          - timestamp compared with the one remembered in the state
+// Returns 1 when the timestamp changed (a counting pass is requested),
+// otherwise the current value of do_clean.
+int flow_ssdo_clean(void *s, int curr_num_samples, unsigned int maxtime){
+    struct SSstate *st = (struct SSstate *)s;
+
+    // first packet of the time window seeds the remembered timestamp
+    if(st->time == 0)
+        st->time = maxtime;
+
+    // timestamp moved on: next counting slot, closed flows must be counted
+    if(st->time != maxtime){
+        st->time = maxtime;
+        st->count_closed = 1;
+        return 1;
+    }
+
+    // emergency control, driven by the number of closed flows
+    if(st->count_closed_flows > (st->gamma * st->s_size)){
+        const double scale = ((double)st->count_closed_flows) / ((double)st->s_size);
+
+        st->z_prev = st->z;
+        st->z = scale * st->z;
+
+        // bcount will differ after the cleaning phase
+        st->bcount = 0;
+        st->count = 0;
+        st->gcount = 0;
+
+        st->do_clean = 1;
+        st->do_clean_count++;
+    }
+
+    // a counting iteration has just finished: reset its bookkeeping
+    if(st->count_closed == 1){
+        st->count_closed = 0;
+        st->count_closed_flows = 0;
+        // gcount was used to sample newly closed flows while counting
+        st->gcount = 0;
+    }
+
+    return st->do_clean;
+}
+
+// Returns the current threshold z stored in the sampling state.
+// curr_num_samples is not used.
+double ssthreshold(void *s, int curr_num_samples){
+    const struct SSstate *st = (const struct SSstate *)s;
+    const double threshold = st->z;
+    return threshold;
+}
+
+// Returns curr_num_samples unchanged; the state pointer s is never read.
+int count_distinct(void *s, int curr_num_samples){
+    (void)s;
+    const int distinct = curr_num_samples;
+    return distinct;
+}
+
+// Cleaning-phase test for one group in plain smart sampling.
+//   s                - opaque pointer to the struct SSstate that is updated
+//   curr_num_samples - not used
+//   glen             - length of the group
+// Returns 1 when the group stays in the sample, 0 when it is dropped.
+int ssclean_with(void *s,int curr_num_samples, unsigned long long int glen){
+    struct SSstate *st = (struct SSstate *)s;
+
+    // a group weighs at least as much as the previous threshold
+    double eff_len = glen;
+    if(eff_len < st->z_prev)
+        eff_len = st->z_prev;
+
+    // above the new threshold: always kept
+    if(eff_len > st->z){
+        st->bcount++;
+        return 1;
+    }
+
+    // otherwise accumulate and keep one group each time z is reached
+    st->gcount += eff_len;
+    if(st->gcount >= st->z){
+        st->gcount -= st->z;
+        return 1;
+    }
+
+    return 0;
+}
+
+// Per-group test used by the flow sampler in both the counting phase
+// (count_closed == 1) and the cleaning phase.
+//   s                - opaque pointer to the struct SSstate that is updated
+//   curr_num_samples - not used
+//   ccondition       - 1 when the flow satisfies its closing condition
+//   delay            - a flow whose age (state->time - maxtime) reaches this
+//                      value is also treated as closed
+//   maxtime          - timestamp subtracted from state->time to get the age
+//   glen             - length of the group
+// Returns 1 when the group stays in the sample, 0 when it is dropped.
+int flow_ssclean_with(void *s,int curr_num_samples, unsigned int ccondition, unsigned int delay, unsigned int maxtime, unsigned long long int glen){
+    struct SSstate *st = (struct SSstate *)s;
+
+    // int minus unsigned: the subtraction is carried out in unsigned arithmetic
+    const unsigned int age = st->time - maxtime;
+    const int counting = (st->count_closed == 1);
+
+    // set only while counting, for flows closed within the last slot (width 1)
+    int fresh = 0;
+
+    if(counting){
+        if(ccondition == 1){
+            // closed by condition, whatever the delay says
+            st->count_closed_flows++;
+            fresh = (age <= 1) ? 1 : 0;
+        }
+        else if(age >= delay){
+            // closed by delay only
+            st->count_closed_flows++;
+            fresh = (age <= (delay + 1)) ? 1 : 0;
+        }
+
+        // open flows and old closed flows are not subsampled while counting
+        if(!fresh)
+            return 1;
+    }
+
+    // only closed flows are cleaned; a still-open flow is kept as is
+    if((ccondition == 0) && (age < delay))
+        return 1;
+
+    // an old group weighs at least the previous threshold, a fresh one
+    // keeps its own length
+    double eff_len = glen;
+    if(!fresh && (eff_len < st->z_prev))
+        eff_len = st->z_prev;
+
+    // here the flow is either closed and old (cleaning phase) or closed and
+    // fresh (counting phase)
+    int keep = 0;
+    if(eff_len > st->z){
+        st->bcount++;
+        keep = 1;
+    }
+    else{
+        st->gcount += eff_len;
+        if(st->gcount >= st->z){
+            keep = 1;
+            st->gcount -= st->z;
+        }
+        else if(counting){
+            // fresh group dropped while counting: take it out of the tally
+            st->count_closed_flows--;
+        }
+    }
+
+    if(st->do_clean == 1)
+        st->how_many_cleaned += keep;
+
+    return keep;
+}
+
+// Accessor: returns the packet_count field of the sampling state.
+// curr_num_samples is not used.
+int packet_count(void *s, int curr_num_samples){
+    return ((struct SSstate *)s)->packet_count;
+}
+
+// Accessor: returns the gamma field (tolerance parameter) of the sampling
+// state. curr_num_samples is not used.
+double gamma(void *s, int curr_num_samples){
+    return ((struct SSstate *)s)->gamma;
+}
+
+// Accessor: returns do_clean_count, the counter incremented each time the
+// emergency control triggers a cleaning phase. curr_num_samples is not used.
+int do_clean_count(void *s, int curr_num_samples){
+    return ((struct SSstate *)s)->do_clean_count;
+}
+
+// Accessor: returns the delay value recorded in the sampling state.
+// curr_num_samples is not used.
+int delay(void *s, int curr_num_samples){
+    return ((struct SSstate *)s)->delay;
+}
+
+// Accessor: returns closing_now, the number of groups seen by the final
+// cleaning phase that were newly closed (in the most recent counting slot)
+// or not closed at all. curr_num_samples is not used.
+int newly_closed(void *s, int curr_num_samples){
+    return ((struct SSstate *)s)->closing_now;
+}
+
+
+
+