Added quantiling UDAFs
[com/gs-lite.git] / src / lib / gscphftaaux / SSstateful_count_distinct.cc
index 0adec27..47e0ca8 100644 (file)
-/* ------------------------------------------------
- Copyright 2014 AT&T Intellectual Property
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ------------------------------------------- */
-///////////////////////////////////////////////
-////   State, functions for adaptive smart sampling
-///            written by Irina Rozenbaum
-
-#include <iostream>
-#define max(x,y) ((x)>(y)?(x):(y))
-#define _BURST_SOOTHING_FACTOR 10.0
-
-struct SSstate { // state record shared by every smart-sampling function in this file (each one casts its void *s argument to it)
-    //unsigned long long int count;   // count to sample small packets with certain probability
-    double count;   // running sum of lengths not exceeding z; ssample() emits a sample and subtracts z once the sum reaches z
-    double gcount;  // accumulator of the same kind used by ssclean_with() / flow_ssclean_with()
-    double fcount;  // accumulator of the same kind used by ssfinal_clean() / flow_ssfinal_clean()
-    double z;       // current size threshold: a length greater than z is always sampled
-    double z_prev;  // value of the threshold from the previous cleaning phase
-    double gamma;   // tolerance parameter for emergency control over the
-    // number of samples, should be >= 1
-    int do_clean; // set to 1 to trigger / mark a cleaning phase
-    int bcount;  // number of packets (or groups) whose length exceeded the threshold;
-    // needed by one of the variants of the threshold-adjustment
-    // algorithm (currently commented out in the init functions)
-    int s_size;  // last sample_size passed to ssample()/flow_ssample(); read by the _sfun_state_*_init functions
-    int final_z; // bool: 1 once z has been adjusted to its final value
-    // before the final clean
-    int time;    // timestamp of the previously processed packet,
-    // used to detect the frequency of the cleaning phases with aggregation
-    
-    int packet_count; // number of packets per time window
-    int do_clean_count; // number of cleaning phases per time window
-    bool do_sooth; // when true, the dirty init divides z by _BURST_SOOTHING_FACTOR
-    
-    // flow sampling
-    int count_closed; // bool: 1 while a counting phase is being triggered
-    int count_closed_flows; // number of closed flows seen in the counting phase
-    int count_notsampled_new; // number of new groups that were not sampled during
-    // the final sampling phase; used in the next time window to compute a reasonable
-    // approximation for the threshold before the final sampling phase
-    
-    // for reporting
-    int delay; // interval during which no packets belonging to the current flow were seen,
-    // after which the flow is considered to be closed;
-    // parameter specified by the query
-    int closing_now; // number of groups that are closing at the final cleaning phase
-    // or were closed during the most recent counting phase (newly closed)
-    
-    // for debugging
-    int how_many_cleaned; // sum of the "sampled" results accumulated by the flow cleaning functions
-};
-
-
-// Called once at the initial initialization of the state: zeroes every counter and flag and installs the starting values z = 200, z_prev = 0, gamma = 2 and do_sooth = true.
-void _sfun_state_clean_init_smart_sampling_state(void *s){
-    struct SSstate *state  = (struct SSstate *)s;
-    
-    state->count = 0;
-    state->gcount = 0;
-    state->fcount = 0;
-    state->z = 200; // TODO (from the original author): need to figure out a good initial value for z
-    //    cout << "clean init Z: " <<  state->z << "\n";
-    state->z_prev = 0;
-    //    cout << "clean init Z prev: " <<  state->z << "\n";
-    state->gamma = 2;
-    state->do_clean = 0;
-    state->bcount = 0;
-    state->s_size = 0; // stays 0 until ssample()/flow_ssample() stores the caller's sample_size
-    state->final_z = 0;
-    state->time = 0;
-    state->count_closed = 0;
-    state->count_closed_flows = 0;
-    state->count_notsampled_new = 0;
-    
-    state->packet_count = 0;
-    state->do_clean_count = 0;
-    state->do_sooth = true;
-    
-    state->how_many_cleaned = 0;
-    state->delay = 0;
-    state->closing_now = 0;
-};
-
-
-// Called at the beginning of every time window: derives the new threshold z from the old state, copies gamma, do_clean, s_size and count_notsampled_new, and resets every other field it touches. NOTE(review): z_prev of the new state is not assigned here.
-void _sfun_state_dirty_init_smart_sampling_state(void *s_new, void *s_old, int curr_num_samples){
-    struct SSstate *state_new  = (struct SSstate *)s_new;
-    struct SSstate *state_old  = (struct SSstate *)s_old;
-    
-    //    cout << "**flows were sampled: " << state_old->how_many_cleaned << "\n";
-    //    cout << "***dirty current num of samples: " << curr_num_samples << "\n";
-    //    cout << "dirty init Z old: " <<  state_old->z << "\n";
-    if(curr_num_samples < state_old->s_size){ // fewer samples than the requested size: lower the threshold
-        // state_new->z = state_old->z*((max((double)curr_num_samples-(double)state_old->bcount,1))/((double)state_old->s_size-(double)state_old->bcount));
-        
-        //temp fix for early aggregation
-        state_new->z = (state_old->z)/2;
-        
-        if(state_old->do_sooth) // NOTE(review): when set, this replaces the halved value computed just above
-            state_new->z = state_old->z / _BURST_SOOTHING_FACTOR;
-    }
-    else {
-        if(curr_num_samples >= state_old->s_size){ // always true inside this else branch
-            //cout << "yes\n";
-            state_new->z = state_old->z*((double)curr_num_samples/(double)state_old->s_size); // NOTE(review): divides by s_size; confirm it cannot be 0 here
-            
-            if(state_old->do_sooth) // NOTE(review): when set, this replaces the scaled value computed just above
-                state_new->z = state_old->z / _BURST_SOOTHING_FACTOR;
-        }
-    }
-    
-    if(state_new->z <= 1.0) // the threshold is never allowed below 1
-        state_new->z = 1;
-    //    cout << "dirty init Z new: " <<  state_new->z << "\n";
-    
-    state_new->gamma = state_old->gamma;
-    state_new->do_clean = state_old->do_clean;
-    state_new->s_size = state_old->s_size;
-    state_new->bcount = 0;
-    state_new->gcount = 0;
-    state_new->count = 0;
-    state_new->fcount = 0;
-    state_new->final_z = 0; // re-arms the once-per-window adjustment in _sfun_state_final_init_smart_sampling_state()
-    state_new->time = 0;
-    state_new->count_closed = 0;
-    state_new->count_closed_flows = 0;
-    state_new->count_notsampled_new = state_old->count_notsampled_new;
-    
-    state_new->packet_count = 0;
-    state_new->do_clean_count = 0;
-    state_new->do_sooth = true;
-    
-    state_new->how_many_cleaned = 0;
-    state_new->delay = 0;
-    state_new->closing_now = 0;
-    
-    //  cout << "dirty init gamma: " <<   state_new->gamma << "\n";
-    //     cout << "dirty init do_clean: " <<  state_new->do_clean << "\n";
-    //     cout << "dirty init s_size: " <<  state_new->s_size << "\n";
-    //     cout << "dirty init bcount: " <<  state_new->bcount << "\n";
-    //     cout << "dirty init gcount: " <<  state_new->gcount << "\n";
-    //     cout << "dirty init count: " <<  state_new->count << "\n";
-    //     cout << "dirty init fcount: " <<  state_new->fcount << "\n";
-    //     cout << "dirty init final_z: " <<  state_new->final_z << "\n";
-    //     cout << "dirty init time " <<  state_new->time  << "\n";
-    //     cout << "dirty init count_closed: " <<  state_new->count_closed << "\n";
-    //     cout << "dirty init count_closed_flows: " <<  state_new->count_closed_flows << "\n";
-    
-    //     cout << "dirty init packet count: " <<  state_new->packet_count  << "\n";
-    //     cout << "dirty init do_clean_count: " <<  state_new->do_clean_count  << "\n";
-    
-};
-
-// Adjusts the threshold z at most once per window (guarded by final_z). According to the original note it is called in two cases:
-//   1. adjustment of the threshold before the cleaning phase is triggered;
-//   2. adjustment of the threshold at the window border for the old state; this new
-//      threshold is then used by the partial_flush() and flush() functions (final_clean()).
-
-void _sfun_state_final_init_smart_sampling_state(void *s, int curr_num_samples){
-    struct SSstate *state  = (struct SSstate *)s;
-    
-    //  cout << "final init Z old: " <<  state->z << "\n";
-    
-    if(state->final_z == 0){ // skip everything if z was already finalized for this window
-        
-        // just returned from the counting phase
-        // case when the time window is changed right after the counting phase
-        if(state->count_closed == 1){
-            state->count_closed = 0;
-            state->count_closed_flows = 0;
-            state->how_many_cleaned = 0;
-        }
-        
-        state->z_prev = state->z; // remember the threshold in force before this adjustment
-        //    cout << "final current num of samples: " << curr_num_samples << "\n";
-        //    cout << "final count not sampled new: " << state->count_notsampled_new << "\n";
-        // adjust the current number of samples based on statistics
-        // gathered in the previous time window,
-        // otherwise we overestimate it
-        if(state->count_notsampled_new != 0)
-            curr_num_samples -= state->count_notsampled_new;
-        
-        if(curr_num_samples < state->s_size){
-            //state->z = state->z*((max((double)curr_num_samples-(double)state->bcount,1))/((double)state->s_size-(double)state->bcount));
-            
-            //temp fix for early aggregation
-            state->z = state->z/2;
-        }
-        else {
-            if(curr_num_samples >= state->s_size){ // always true inside this else branch
-                state->z = state->z*((double)curr_num_samples/(double)state->s_size); // NOTE(review): divides by s_size; confirm it cannot be 0 here
-            }
-        }
-        
-        if(state->z <= 0) // NOTE(review): the dirty init clamps at <= 1.0, here only non-positive values are replaced by 1
-            state->z = 1;
-        //    cout << "final init Z new: " <<  state->z << "\n";
-        
-        state->bcount = 0;
-        state->final_z = 1; // mark z as final so a second call in the same window is a no-op
-        state->do_clean_count++;
-        state->count_notsampled_new = 0;
-        
-    }
-    
-    //  cout << "final init gamma: " <<   state->gamma << "\n";
-    //     cout << "final init do_clean: " <<  state->do_clean << "\n";
-    //     cout << "final init s_size: " <<  state->s_size << "\n";
-    //     cout << "final init bcount: " <<  state->bcount << "\n";
-    //     cout << "final init gcount: " <<  state->gcount << "\n";
-    //     cout << "final init count: " <<  state->count << "\n";
-    //     cout << "final init fcount: " <<  state->fcount << "\n";
-    //     cout << "final init final_z: " <<  state->final_z << "\n";
-    //     cout << "final init time " <<  state->time  << "\n";
-    //     cout << "final init count_closed: " <<  state->count_closed << "\n";
-    //     cout << "final init count_closed_flows: " <<  state->count_closed_flows << "\n";
-    
-    //     cout << "final init packet count: " <<  state->packet_count  << "\n";
-    //     cout << "final init do_clean_count: " <<  state->do_clean_count  << "\n";
-    
-};
-
-int ssample(void *s,int curr_num_samples, unsigned long long int len, unsigned int sample_size){ // per-packet sampling decision: returns 1 if the packet of length len is sampled, 0 otherwise; curr_num_samples is unused
-    struct SSstate *state  = (struct SSstate *)s;
-    
-    state->packet_count++;
-    
-    state->s_size = sample_size; // remembered so the _sfun_state_*_init functions can read it
-    int sampled = 0;
-    
-    //just returned from the cleaning phase
-    if(state->do_clean == 1){
-        state->gcount = 0;
-        state->do_clean = 0;
-    }
-    
-    //sampling
-    if(len > state->z){ // longer than the threshold: always sampled
-        state->bcount++;
-        sampled=1;
-    }
-    else{ // otherwise accumulate lengths and emit one sample each time the sum reaches z
-        state->count += len;
-        if(state->count >= state->z){
-            sampled=1;
-            state->count -= state->z; // keep the remainder for the next packets
-        }
-    }
-    return sampled;
-    
-};
-
-int flow_ssample(void *s,int curr_num_samples, unsigned int sample_size){ // flow variant of ssample(): counts the packet, stores sample_size and always returns 1; curr_num_samples is unused
-    struct SSstate *state  = (struct SSstate *)s;
-    
-    state->packet_count++;
-    
-    state->s_size = sample_size;
-    int sampled = 0; // never read in this function
-    
-    //just returned from the counting phase
-    if(state->count_closed == 1) // NOTE(review): the only statement after this if is commented out, so the if(do_clean) block below becomes its body and runs only when count_closed == 1 - confirm this is intended
-        //      cout << "closed flows after counting phase: " << state->count_closed_flows << "\n";
-        
-        //just returned from the cleaning phase
-        if(state->do_clean == 1){
-            //      cout << "flows were sampled: " << state->how_many_cleaned << "\n";
-            state->how_many_cleaned = 0;
-            state->gcount = 0;
-            state->do_clean = 0;
-        }
-    
-    return 1;
-    
-};
-
-
-int ssfinal_clean(void *s, int curr_num_samples, unsigned long long int glen){ // final-clean decision for one group of length glen: returns 1 to keep it, 0 to drop it
-    struct SSstate *state  = (struct SSstate *)s;
-    
-    state->do_sooth = true;
-    
-    // for ssample(): signals that we have just returned from the cleaning phase
-    state->do_clean = 1;
-    
-    int sampled = 0;
-    double new_len = 0;
-    
-    if (glen < state->z_prev) // the length used below is never smaller than the previous threshold
-        new_len = state->z_prev;
-    else
-        new_len = glen;
-    
-    //no need to clean
-    if(curr_num_samples <= state->s_size){ // sample count already within the requested size: keep everything
-        return 1;
-    }
-    else{
-        if(new_len > state->z){ // above the threshold: always kept
-            sampled = 1;
-            state->bcount++;
-        }
-        else{ // otherwise accumulate in fcount and keep one group each time the sum reaches z
-            state->fcount += new_len;
-            if(state->fcount >= state->z){
-                sampled = 1;
-                state->fcount -= state->z;
-            }
-            //else{
-            //state->scount--;
-            //}
-        }
-        
-        return sampled;
-    }
-    
-};
-
-int flow_ssfinal_clean(void *s, int curr_num_samples, unsigned int ccondition, unsigned int delay, unsigned int maxtime, unsigned long long int glen){ // flow variant of ssfinal_clean(): returns 1 to keep the group, 0 to drop it, and tracks newly closed groups
-    struct SSstate *state  = (struct SSstate *)s;
-    
-    state->do_sooth = true;
-    // only for reporting
-    state->delay = delay;
-    
-    // for ssample(): signals that we have just returned from the cleaning phase
-    state->do_clean = 1;
-    
-    int sampled = 0;
-    double new_len = 0;
-    // group is newly closed or not closed yet
-    int new_group = 1;
-    
-    // the flow is closed
-    if((ccondition == 1)||((state->time-maxtime)>=delay)){ // NOTE(review): time is int and maxtime is unsigned, so the subtraction is done in unsigned arithmetic and wraps if maxtime > time
-        
-        new_group = 0;
-        
-        // TF case or TT case for ccondition and delay
-        if(ccondition == 1){
-            if((state->time-maxtime) <= 1){ // 1 is time interval for count of closed flows
-                // the flow was closed in the previous counting slot
-                new_group = 1;
-            }
-        }
-        //FT case for ccondition and delay
-        else{
-            if((state->time-maxtime) <= (delay+1)){ // 1 is time interval for count of closed flows
-                // the flow was closed in the previous counting slot
-                new_group = 1;
-            }
-        }
-        // the flow was closed earlier than previous counting slot
-        // it is closed and old and new_group = 0
-    }
-    
-    //adjust size only for old closed groups
-    if(new_group == 0){
-        if (glen < state->z_prev) // old groups are treated as at least z_prev long
-            new_len = state->z_prev;
-        else
-            new_len = glen;
-    }
-    // newly closed group
-    else{
-        state->closing_now++;
-        new_len = glen;
-    }
-    
-    
-    //no need to clean
-    if(curr_num_samples <= state->s_size){ // sample count already within the requested size: keep everything
-        state->count_notsampled_new = 0;
-        return 1;
-    }
-    else{
-        if(new_len > state->z){ // above the threshold: always kept
-            sampled = 1;
-            state->bcount++;
-        }
-        else{
-            state->fcount += new_len;
-            if(state->fcount >= state->z){
-                sampled = 1;
-                state->fcount -= state->z;
-            }
-            //count number of not sampled newly closed groups
-            //will use in the next time window at the final cleaning phase
-            else{
-                if(new_group == 1)
-                    state->count_notsampled_new++;
-            }
-            
-        }
-        
-        state->how_many_cleaned += sampled;
-        
-        return sampled;
-    }
-    
-};
-
-
-int ssdo_clean(void *s, int curr_num_samples){ // decides whether a cleaning phase is needed; returns the resulting do_clean flag
-    struct SSstate *state  = (struct SSstate *)s;
-    
-    //emergency control
-    //bcount will be different after the cleaning phase
-    if(curr_num_samples > (state->gamma*state->s_size)){ // more than gamma times the requested sample size
-        state->z_prev = state->z;
-        state->z=(double)state->gamma*state->z; // raise the threshold by the factor gamma
-        state->do_clean = 1;
-        state->bcount = 0;
-        state->count = 0;
-        state->gcount = 0;
-        
-        state->do_clean_count++;
-    }
-    
-    return state->do_clean; // may also be 1 from an earlier call, since this function never clears it
-};
-
-int flow_ssdo_clean(void *s, int curr_num_samples, unsigned int maxtime){ // flow variant of ssdo_clean(): returns 1 to start a counting pass when the timestamp changes, otherwise the do_clean flag; curr_num_samples is unused
-    struct SSstate *state  = (struct SSstate *)s;
-    
-    // initialize timestamp with first seen packet from the current timewindow
-    if(state->time == 0)
-        state->time = maxtime;
-    
-    // detect next counting time slot
-    if(state->time != maxtime){
-        //      cout << "need to count\n";
-        state->time = maxtime;
-        state->count_closed = 1; // flow_ssclean_with() counts closed flows while this is set
-        return 1;
-    }
-    
-    //emergency control
-    //bcount will be different after the cleaning phase
-    //if(curr_num_samples > (state->gamma*state->s_size)){
-    if(state->count_closed_flows > (state->gamma*state->s_size)){ // more closed flows than gamma times the requested sample size
-        //      cout << "need to clean, num closed flows: " << state->count_closed_flows << "\n";
-        state->z_prev = state->z;
-        //      cout << "do clean Z old: " <<  state->z << "\n";
-        state->z=(((double)state->count_closed_flows)/((double)state->s_size))*state->z; // scale z by closed flows / sample size
-        //      cout << "do clean Z new: " <<  state->z << "\n";
-        state->do_clean = 1;
-        state->bcount = 0;
-        state->count = 0;
-        state->gcount = 0;
-        
-        state->do_clean_count++;
-    }
-    
-    //just returned from the counting iteration
-    if(state->count_closed == 1){
-        state->count_closed = 0;
-        //      cout << "number of closed flows: " << state->count_closed_flows << "\n";
-        state->count_closed_flows = 0;
-        //initialize gcount since was used for sampling of new closed flows during counting phase
-        state->gcount = 0;
-    }
-    
-    return state->do_clean;
-};
-
-double ssthreshold(void *s, int curr_num_samples){ // returns the current threshold z; curr_num_samples is unused
-    struct SSstate *state = (struct SSstate *)s;
-    
-    return state->z;
-};
-
-int count_distinct(void *s, int curr_num_samples){ // returns curr_num_samples unchanged; the state pointer s is unused
-    return curr_num_samples;
-};
-
-int ssclean_with(void *s,int curr_num_samples, unsigned long long int glen){ // cleaning-phase decision for one group of length glen: returns 1 to keep it, 0 to drop it; curr_num_samples is unused
-    struct SSstate *state  = (struct SSstate *)s;
-    
-    //cleaning condition
-    int sampled = 0;
-    double new_len = 0;
-    
-    if (glen < state->z_prev) // the length used below is never smaller than the previous threshold
-        new_len = state->z_prev;
-    else
-        new_len = glen;
-    
-    if(new_len > state->z){ // above the threshold: always kept
-        state->bcount++;
-        sampled = 1;
-    }
-    else{ // otherwise accumulate in gcount and keep one group each time the sum reaches z
-        state->gcount += new_len;
-        if(state->gcount >= state->z){
-            sampled = 1;
-            state->gcount -= state->z;
-        }
-        //else{
-        //state->scount--;
-        //}
-    }
-    
-    return sampled;
-};
-
-int flow_ssclean_with(void *s,int curr_num_samples, unsigned int ccondition, unsigned int delay, unsigned int maxtime, unsigned long long int glen){ // flow variant of ssclean_with(), used for both the counting and the cleaning phase: returns 1 to keep the group, 0 to drop it; curr_num_samples is unused
-    struct SSstate *state  = (struct SSstate *)s;
-    
-    //cleaning condition
-    int sampled = 0;
-    double new_len = 0;
-    int new_group = 0;
-    
-    //need to count closed flows
-    if(state->count_closed == 1){
-        //the flow is closed
-        if((ccondition == 1)||((state->time-maxtime)>=delay)){ // NOTE(review): time is int and maxtime is unsigned, so the subtraction is done in unsigned arithmetic and wraps if maxtime > time
-            state->count_closed_flows++;
-            // TF case or TT case for ccondition and delay
-            if(ccondition == 1){
-                if((state->time-maxtime) <= 1) //1 is time interval for count of closed flows
-                    new_group = 1;
-            }
-            //FT case for ccondition and delay
-            else{
-                if((state->time-maxtime) <= (delay+1)) //1 is time interval for count of closed flows
-                    new_group = 1;
-            }
-        }
-        
-        // during the counting phase only newly closed groups are subsampled; every other group (still open, or closed but old) is kept as is
-        if (new_group == 0)
-            return true;
-        
-    }
-    
-    // clean only closed flows FF case
-    // the flow is still open
-    if((ccondition == 0)&&((state->time-maxtime)<delay)){
-        return true;
-    }
-    
-    // use glen for a new group and z_prev for old one
-    if(new_group == 0){
-        if (glen < state->z_prev) // old groups are treated as at least z_prev long
-            new_len = state->z_prev;
-        else
-            new_len = glen;
-    }
-    //the group is new
-    else{
-        new_len = glen;
-    }
-    
-    // at this point either flow is closed and old and we are at the cleaning phase
-    // or flow is closed and new and we are at the counting phase
-    if(new_len > state->z){ // above the threshold: always kept
-        state->bcount++;
-        sampled = 1;
-    }
-    else{
-        state->gcount += new_len;
-        if(state->gcount >= state->z){
-            sampled = 1;
-            state->gcount -= state->z;
-        }
-        //new_group is not sampled during counting phase
-        else{
-            if(state->count_closed == 1)
-                state->count_closed_flows--; // undo the increment above: a dropped group no longer counts as a closed flow
-        }
-        
-    }
-    
-    if(state->do_clean == 1)
-        state->how_many_cleaned += sampled;
-    
-    return sampled;
-};
-
-int packet_count(void *s, int curr_num_samples){ // returns the packet_count field, which ssample()/flow_ssample() increment on every call; curr_num_samples is unused
-    struct SSstate *state = (struct SSstate *)s;  
-    return state->packet_count;
-};
-
-double gamma(void *s, int curr_num_samples){ // returns the gamma tolerance parameter stored in the state; curr_num_samples is unused
-    struct SSstate *state = (struct SSstate *)s;  
-    return state->gamma;
-};
-
-int do_clean_count(void *s, int curr_num_samples){ // returns the do_clean_count field (number of cleaning phases recorded in the state); curr_num_samples is unused
-    struct SSstate *state = (struct SSstate *)s;  
-    return state->do_clean_count;
-};
-
-int delay(void *s, int curr_num_samples){ // returns the delay field, which flow_ssfinal_clean() stores for reporting; curr_num_samples is unused
-    struct SSstate *state = (struct SSstate *)s;  
-    return state->delay;
-};
-
-// Returns closing_now: the number of groups which are newly closed (in the most
-// recent counting phase) or not closed at all during the final
-// cleaning phase, as counted by flow_ssfinal_clean(). curr_num_samples is unused.
-int newly_closed(void *s, int curr_num_samples){
-    struct SSstate *state = (struct SSstate *)s;  
-    return state->closing_now;
-};
-
-
-
-
+/* ------------------------------------------------\r
+ Copyright 2014 AT&T Intellectual Property\r
+ Licensed under the Apache License, Version 2.0 (the "License");\r
+ you may not use this file except in compliance with the License.\r
+ You may obtain a copy of the License at\r
\r
+ http://www.apache.org/licenses/LICENSE-2.0\r
\r
+ Unless required by applicable law or agreed to in writing, software\r
+ distributed under the License is distributed on an "AS IS" BASIS,\r
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ See the License for the specific language governing permissions and\r
+ limitations under the License.\r
+ ------------------------------------------- */\r
+///////////////////////////////////////////////\r
+////   State, functions for adaptive smart sampling\r
+///            written by Irina Rozenbaum\r
+\r
+#include <iostream>\r
+#define max(x,y) ((x)>(y)?(x):(y))\r
+#define _BURST_SOOTHING_FACTOR 10.0\r
+\r
+struct SSstate { /* state record shared by every smart-sampling function in this file (each one casts its void *s argument to it) */\r
+    //unsigned long long int count;   // count to sample small packets with certain probability\r
+    double count; /* running sum of lengths not exceeding z, used by ssample() */\r
+    double gcount;  // accumulator of the same kind used by the clean_with() functions\r
+    double fcount;  // accumulator of the same kind used by the final_clean() functions\r
+    double z;       // current size threshold: a length greater than z is always sampled\r
+    double z_prev;  // value of the threshold from the previous cleaning phase\r
+    double gamma;   // tolerance parameter for emergency control over the\r
+    // number of samples, should be >= 1\r
+    int do_clean; // set to 1 to trigger / mark a cleaning phase\r
+    int bcount;  // number of packets (or groups) whose length exceeded the threshold;\r
+    // needed by one of the variants of the threshold-adjustment\r
+    // algorithm (currently commented out in the init functions)\r
+    int s_size;  // last sample_size passed to ssample()/flow_ssample(); read by the _sfun_state_*_init functions\r
+    int final_z; // bool: 1 once z has been adjusted to its final value\r
+    // before the final clean\r
+    int time;    // timestamp of the previously processed packet,\r
+    // used to detect the frequency of the cleaning phases with aggregation\r
+    \r
+    int packet_count; // number of packets per time window\r
+    int do_clean_count; // number of cleaning phases per time window\r
+    bool do_sooth; // when true, the dirty init divides z by _BURST_SOOTHING_FACTOR\r
+    \r
+    // flow sampling\r
+    int count_closed; // bool: 1 while a counting phase is being triggered\r
+    int count_closed_flows; // number of closed flows\r
+    int count_notsampled_new; // number of new groups that were not sampled during\r
+    // the final sampling phase; used in the next time window to compute a reasonable\r
+    // approximation for the threshold before the final sampling phase\r
+    \r
+    // for reporting\r
+    int delay; // interval during which no packets belonging to the current flow were seen,\r
+    // after which the flow is considered to be closed;\r
+    // parameter specified by the query\r
+    int closing_now; // number of groups that are closing at the final cleaning phase\r
+    // or were closed during the most recent counting phase (newly closed)\r
+    \r
+    // for debugging\r
+    int how_many_cleaned;\r
+};\r
+\r
+\r
+// Called once at the initial initialization of the state: zeroes every counter and flag and installs the starting values z = 200, z_prev = 0, gamma = 2 and do_sooth = true.\r
+void _sfun_state_clean_init_smart_sampling_state(void *s){\r
+    struct SSstate *state  = (struct SSstate *)s;\r
+    \r
+    state->count = 0;\r
+    state->gcount = 0;\r
+    state->fcount = 0;\r
+    state->z = 200; // TODO (from the original author): need to figure out a good initial value for z\r
+    //    cout << "clean init Z: " <<  state->z << "\n";\r
+    state->z_prev = 0;\r
+    //    cout << "clean init Z prev: " <<  state->z << "\n";\r
+    state->gamma = 2;\r
+    state->do_clean = 0;\r
+    state->bcount = 0;\r
+    state->s_size = 0; /* stays 0 until ssample()/flow_ssample() stores the caller's sample_size */\r
+    state->final_z = 0;\r
+    state->time = 0;\r
+    state->count_closed = 0;\r
+    state->count_closed_flows = 0;\r
+    state->count_notsampled_new = 0;\r
+    \r
+    state->packet_count = 0;\r
+    state->do_clean_count = 0;\r
+    state->do_sooth = true;\r
+    \r
+    state->how_many_cleaned = 0;\r
+    state->delay = 0;\r
+    state->closing_now = 0;\r
+};\r
+\r
+\r
+// Called at the beginning of every time window: derives the new threshold z from the old state, copies gamma, do_clean, s_size and count_notsampled_new, and resets every other field it touches. NOTE(review): z_prev of the new state is not assigned here.\r
+void _sfun_state_dirty_init_smart_sampling_state(void *s_new, void *s_old, int curr_num_samples){\r
+    struct SSstate *state_new  = (struct SSstate *)s_new;\r
+    struct SSstate *state_old  = (struct SSstate *)s_old;\r
+    \r
+    //    cout << "**flows were sampled: " << state_old->how_many_cleaned << "\n";\r
+    //    cout << "***dirty current num of samples: " << curr_num_samples << "\n";\r
+    //    cout << "dirty init Z old: " <<  state_old->z << "\n";\r
+    if(curr_num_samples < state_old->s_size){ /* fewer samples than the requested size: lower the threshold */\r
+        // state_new->z = state_old->z*((max((double)curr_num_samples-(double)state_old->bcount,1))/((double)state_old->s_size-(double)state_old->bcount));\r
+        \r
+        //temp fix for early aggregation\r
+        state_new->z = (state_old->z)/2;\r
+        \r
+        if(state_old->do_sooth) /* NOTE(review): when set, this replaces the halved value computed just above */\r
+            state_new->z = state_old->z / _BURST_SOOTHING_FACTOR;\r
+    }\r
+    else {\r
+        if(curr_num_samples >= state_old->s_size){ /* always true inside this else branch */\r
+            //cout << "yes\n";\r
+            state_new->z = state_old->z*((double)curr_num_samples/(double)state_old->s_size); /* NOTE(review): divides by s_size; confirm it cannot be 0 here */\r
+            \r
+            if(state_old->do_sooth) /* NOTE(review): when set, this replaces the scaled value computed just above */\r
+                state_new->z = state_old->z / _BURST_SOOTHING_FACTOR;\r
+        }\r
+    }\r
+    \r
+    if(state_new->z <= 1.0) /* the threshold is never allowed below 1 */\r
+        state_new->z = 1;\r
+    //    cout << "dirty init Z new: " <<  state_new->z << "\n";\r
+    \r
+    state_new->gamma = state_old->gamma;\r
+    state_new->do_clean = state_old->do_clean;\r
+    state_new->s_size = state_old->s_size;\r
+    state_new->bcount = 0;\r
+    state_new->gcount = 0;\r
+    state_new->count = 0;\r
+    state_new->fcount = 0;\r
+    state_new->final_z = 0; /* re-arms the once-per-window adjustment in _sfun_state_final_init_smart_sampling_state() */\r
+    state_new->time = 0;\r
+    state_new->count_closed = 0;\r
+    state_new->count_closed_flows = 0;\r
+    state_new->count_notsampled_new = state_old->count_notsampled_new;\r
+    \r
+    state_new->packet_count = 0;\r
+    state_new->do_clean_count = 0;\r
+    state_new->do_sooth = true;\r
+    \r
+    state_new->how_many_cleaned = 0;\r
+    state_new->delay = 0;\r
+    state_new->closing_now = 0;\r
+    \r
+    //  cout << "dirty init gamma: " <<   state_new->gamma << "\n";\r
+    //     cout << "dirty init do_clean: " <<  state_new->do_clean << "\n";\r
+    //     cout << "dirty init s_size: " <<  state_new->s_size << "\n";\r
+    //     cout << "dirty init bcount: " <<  state_new->bcount << "\n";\r
+    //     cout << "dirty init gcount: " <<  state_new->gcount << "\n";\r
+    //     cout << "dirty init count: " <<  state_new->count << "\n";\r
+    //     cout << "dirty init fcount: " <<  state_new->fcount << "\n";\r
+    //     cout << "dirty init final_z: " <<  state_new->final_z << "\n";\r
+    //     cout << "dirty init time " <<  state_new->time  << "\n";\r
+    //     cout << "dirty init count_closed: " <<  state_new->count_closed << "\n";\r
+    //     cout << "dirty init count_closed_flows: " <<  state_new->count_closed_flows << "\n";\r
+    \r
+    //     cout << "dirty init packet count: " <<  state_new->packet_count  << "\n";\r
+    //     cout << "dirty init do_clean_count: " <<  state_new->do_clean_count  << "\n";\r
+    \r
+};\r
+\r
+// Adjusts the threshold z at most once per window (guarded by final_z). According to the original note it is called in two cases:\r
+//   1. adjustment of the threshold before the cleaning phase is triggered;\r
+//   2. adjustment of the threshold at the window border for the old state; this new\r
+//      threshold is then used by the partial_flush() and flush() functions (final_clean()).\r
+\r
+void _sfun_state_final_init_smart_sampling_state(void *s, int curr_num_samples){\r
+    struct SSstate *state  = (struct SSstate *)s;\r
+    \r
+    //  cout << "final init Z old: " <<  state->z << "\n";\r
+    \r
+    if(state->final_z == 0){ /* skip everything if z was already finalized for this window */\r
+        \r
+        // just returned from the counting phase\r
+        // case when the time window is changed right after the counting phase\r
+        if(state->count_closed == 1){\r
+            state->count_closed = 0;\r
+            state->count_closed_flows = 0;\r
+            state->how_many_cleaned = 0;\r
+        }\r
+        \r
+        state->z_prev = state->z; /* remember the threshold in force before this adjustment */\r
+        //    cout << "final current num of samples: " << curr_num_samples << "\n";\r
+        //    cout << "final count not sampled new: " << state->count_notsampled_new << "\n";\r
+        // adjust the current number of samples based on statistics\r
+        // gathered in the previous time window,\r
+        // otherwise we overestimate it\r
+        if(state->count_notsampled_new != 0)\r
+            curr_num_samples -= state->count_notsampled_new;\r
+        \r
+        if(curr_num_samples < state->s_size){\r
+            //state->z = state->z*((max((double)curr_num_samples-(double)state->bcount,1))/((double)state->s_size-(double)state->bcount));\r
+            \r
+            //temp fix for early aggregation\r
+            state->z = state->z/2;\r
+        }\r
+        else {\r
+            if(curr_num_samples >= state->s_size){ /* always true inside this else branch */\r
+                state->z = state->z*((double)curr_num_samples/(double)state->s_size); /* NOTE(review): divides by s_size; confirm it cannot be 0 here */\r
+            }\r
+        }\r
+        \r
+        if(state->z <= 0) /* NOTE(review): the dirty init clamps at <= 1.0, here only non-positive values are replaced by 1 */\r
+            state->z = 1;\r
+        //    cout << "final init Z new: " <<  state->z << "\n";\r
+        \r
+        state->bcount = 0;\r
+        state->final_z = 1; /* mark z as final so a second call in the same window is a no-op */\r
+        state->do_clean_count++;\r
+        state->count_notsampled_new = 0;\r
+        \r
+    }\r
+    \r
+    //  cout << "final init gamma: " <<   state->gamma << "\n";\r
+    //     cout << "final init do_clean: " <<  state->do_clean << "\n";\r
+    //     cout << "final init s_size: " <<  state->s_size << "\n";\r
+    //     cout << "final init bcount: " <<  state->bcount << "\n";\r
+    //     cout << "final init gcount: " <<  state->gcount << "\n";\r
+    //     cout << "final init count: " <<  state->count << "\n";\r
+    //     cout << "final init fcount: " <<  state->fcount << "\n";\r
+    //     cout << "final init final_z: " <<  state->final_z << "\n";\r
+    //     cout << "final init time " <<  state->time  << "\n";\r
+    //     cout << "final init count_closed: " <<  state->count_closed << "\n";\r
+    //     cout << "final init count_closed_flows: " <<  state->count_closed_flows << "\n";\r
+    \r
+    //     cout << "final init packet count: " <<  state->packet_count  << "\n";\r
+    //     cout << "final init do_clean_count: " <<  state->do_clean_count  << "\n";\r
+    \r
+};\r
+\r
+int ssample(void *s,int curr_num_samples, unsigned long long int len, unsigned int sample_size){ /* per-packet sampling decision: returns 1 if the packet of length len is sampled, 0 otherwise; curr_num_samples is unused */\r
+    struct SSstate *state  = (struct SSstate *)s;\r
+    \r
+    state->packet_count++;\r
+    \r
+    state->s_size = sample_size; /* remembered so the _sfun_state_*_init functions can read it */\r
+    int sampled = 0;\r
+    \r
+    //just returned from the cleaning phase\r
+    if(state->do_clean == 1){\r
+        state->gcount = 0;\r
+        state->do_clean = 0;\r
+    }\r
+    \r
+    //sampling\r
+    if(len > state->z){ /* longer than the threshold: always sampled */\r
+        state->bcount++;\r
+        sampled=1;\r
+    }\r
+    else{ /* otherwise accumulate lengths and emit one sample each time the sum reaches z */\r
+        state->count += len;\r
+        if(state->count >= state->z){\r
+            sampled=1;\r
+            state->count -= state->z; /* keep the remainder for the next packets */\r
+        }\r
+    }\r
+    return sampled;\r
+    \r
+};\r
+\r
+int flow_ssample(void *s,int curr_num_samples, unsigned int sample_size){\r
+    struct SSstate *state  = (struct SSstate *)s;\r
+    \r
+    state->packet_count++;\r
+    \r
+    state->s_size = sample_size;\r
+    int sampled = 0;\r
+    \r
+    //just returned from the counting phase\r
+    if(state->count_closed == 1)\r
+        //      cout << "closed flows after counting phase: " << state->count_closed_flows << "\n";\r
+        \r
+        //just returned from the cleaning phase\r
+        if(state->do_clean == 1){\r
+            //      cout << "flows were sampled: " << state->how_many_cleaned << "\n";\r
+            state->how_many_cleaned = 0;\r
+            state->gcount = 0;\r
+            state->do_clean = 0;\r
+        }\r
+    \r
+    return 1;\r
+    \r
+};\r
+\r
+\r
+int ssfinal_clean(void *s, int curr_num_samples, unsigned long long int glen){\r
+    struct SSstate *state  = (struct SSstate *)s;\r
+    \r
+    state->do_sooth = true;\r
+    \r
+    // for ssample() where just returned from the clening phase\r
+    state->do_clean = 1;\r
+    \r
+    int sampled = 0;\r
+    double new_len = 0;\r
+    \r
+    if (glen < state->z_prev)\r
+        new_len = state->z_prev;\r
+    else\r
+        new_len = glen;\r
+    \r
+    //no need to clean\r
+    if(curr_num_samples <= state->s_size){\r
+        return 1;\r
+    }\r
+    else{\r
+        if(new_len > state->z){\r
+            sampled = 1;\r
+            state->bcount++;\r
+        }\r
+        else{\r
+            state->fcount += new_len;\r
+            if(state->fcount >= state->z){\r
+                sampled = 1;\r
+                state->fcount -= state->z;\r
+            }\r
+            //else{\r
+            //state->scount--;\r
+            //}\r
+        }\r
+        \r
+        return sampled;\r
+    }\r
+    \r
+};\r
+\r
+int flow_ssfinal_clean(void *s, int curr_num_samples, unsigned int ccondition, unsigned int delay, unsigned int maxtime, unsigned long long int glen){\r
+    struct SSstate *state  = (struct SSstate *)s;\r
+    \r
+    state->do_sooth = true;\r
+    // only for reporting\r
+    state->delay = delay;\r
+    \r
+    //for ssample() where just returned from the clening phase\r
+    state->do_clean = 1;\r
+    \r
+    int sampled = 0;\r
+    double new_len = 0;\r
+    // group is newly closed or not closed yet\r
+    int new_group = 1;\r
+    \r
+    // the flow is closed\r
+    if((ccondition == 1)||((state->time-maxtime)>=delay)){\r
+        \r
+        new_group = 0;\r
+        \r
+        // TF case or TT case for ccondition and delay\r
+        if(ccondition == 1){\r
+            if((state->time-maxtime) <= 1){ // 1 is time interval for count of closed flows\r
+                // the flow was closed in the previous counting slot\r
+                new_group = 1;\r
+            }\r
+        }\r
+        //FT case for ccondition and delay\r
+        else{\r
+            if((state->time-maxtime) <= (delay+1)){ // 1 is time interval for count of closed flows\r
+                // the flow was closed in the previous counting slot\r
+                new_group = 1;\r
+            }\r
+        }\r
+        // the flow was closed earlier than previous counting slot\r
+        // it is closed and old and new_group = 0\r
+    }\r
+    \r
+    //adjust size only for old closed groups\r
+    if(new_group == 0){\r
+        if (glen < state->z_prev)\r
+            new_len = state->z_prev;\r
+        else\r
+            new_len = glen;\r
+    }\r
+    // newly closed group\r
+    else{\r
+        state->closing_now++;\r
+        new_len = glen;\r
+    }\r
+    \r
+    \r
+    //no need to clean\r
+    if(curr_num_samples <= state->s_size){\r
+        state->count_notsampled_new = 0;\r
+        return 1;\r
+    }\r
+    else{\r
+        if(new_len > state->z){\r
+            sampled = 1;\r
+            state->bcount++;\r
+        }\r
+        else{\r
+            state->fcount += new_len;\r
+            if(state->fcount >= state->z){\r
+                sampled = 1;\r
+                state->fcount -= state->z;\r
+            }\r
+            //count number of not sampled newly closed groups\r
+            //will use in the next time window at the final cleaning phase\r
+            else{\r
+                if(new_group == 1)\r
+                    state->count_notsampled_new++;\r
+            }\r
+            \r
+        }\r
+        \r
+        state->how_many_cleaned += sampled;\r
+        \r
+        return sampled;\r
+    }\r
+    \r
+};\r
+\r
+\r
+int ssdo_clean(void *s, int curr_num_samples){\r
+    struct SSstate *state  = (struct SSstate *)s;\r
+    \r
+    //emergency control\r
+    //bcount will be different after the cleaning phase\r
+    if(curr_num_samples > (state->gamma*state->s_size)){\r
+        state->z_prev = state->z;\r
+        state->z=(double)state->gamma*state->z;\r
+        state->do_clean = 1;\r
+        state->bcount = 0;\r
+        state->count = 0;\r
+        state->gcount = 0;\r
+        \r
+        state->do_clean_count++;\r
+    }\r
+    \r
+    return state->do_clean;\r
+};\r
+\r
+int flow_ssdo_clean(void *s, int curr_num_samples, unsigned int maxtime){\r
+    struct SSstate *state  = (struct SSstate *)s;\r
+    \r
+    // initialize timestamp with first seen packet from the current timewindow\r
+    if(state->time == 0)\r
+        state->time = maxtime;\r
+    \r
+    // detect next counting time slot\r
+    if(state->time != maxtime){\r
+        //      cout << "need to count\n";\r
+        state->time = maxtime;\r
+        state->count_closed = 1;\r
+        return 1;\r
+    }\r
+    \r
+    //emergency control\r
+    //bcount will be different after the cleaning phase\r
+    //if(curr_num_samples > (state->gamma*state->s_size)){\r
+    if(state->count_closed_flows > (state->gamma*state->s_size)){\r
+        //      cout << "need to clean, num closed flows: " << state->count_closed_flows << "\n";\r
+        state->z_prev = state->z;\r
+        //      cout << "do clean Z old: " <<  state->z << "\n";\r
+        state->z=(((double)state->count_closed_flows)/((double)state->s_size))*state->z;\r
+        //      cout << "do clean Z new: " <<  state->z << "\n";\r
+        state->do_clean = 1;\r
+        state->bcount = 0;\r
+        state->count = 0;\r
+        state->gcount = 0;\r
+        \r
+        state->do_clean_count++;\r
+    }\r
+    \r
+    //just returned from the counting iteration\r
+    if(state->count_closed == 1){\r
+        state->count_closed = 0;\r
+        //      cout << "number of closed flows: " << state->count_closed_flows << "\n";\r
+        state->count_closed_flows = 0;\r
+        //initialize gcount since was used for sampling of new closed flows during counting phase\r
+        state->gcount = 0;\r
+    }\r
+    \r
+    return state->do_clean;\r
+};\r
+\r
+// Returns the current threshold z stored in the state.
+// curr_num_samples is not used.
+double ssthreshold(void *s, int curr_num_samples){
+    return ((struct SSstate *)s)->z;
+};
+\r
+// Returns the current number of samples exactly as passed in.
+// The state pointer is not touched.
+int count_distinct(void *s, int curr_num_samples){
+    (void)s;
+    return curr_num_samples;
+};
+\r
+int ssclean_with(void *s,int curr_num_samples, unsigned long long int glen){\r
+    struct SSstate *state  = (struct SSstate *)s;\r
+    \r
+    //cleaning condition\r
+    int sampled = 0;\r
+    double new_len = 0;\r
+    \r
+    if (glen < state->z_prev)\r
+        new_len = state->z_prev;\r
+    else\r
+        new_len = glen;\r
+    \r
+    if(new_len > state->z){\r
+        state->bcount++;\r
+        sampled = 1;\r
+    }\r
+    else{\r
+        state->gcount += new_len;\r
+        if(state->gcount >= state->z){\r
+            sampled = 1;\r
+            state->gcount -= state->z;\r
+        }\r
+        //else{\r
+        //state->scount--;\r
+        //}\r
+    }\r
+    \r
+    return sampled;\r
+};\r
+\r
+// Cleaning / counting predicate for flow sampling, applied to one group.
+//   s                - opaque pointer to the SSstate block
+//   curr_num_samples - current number of samples (not used here)
+//   ccondition       - 1 when the flow is flagged as closed
+//   delay            - a flow also counts as closed once state->time - maxtime
+//                      reaches this value
+//   maxtime          - timestamp of the group (presumably the time of its
+//                      latest packet - confirm against the caller)
+//   glen             - length of the group
+// Returns non-zero when the group is kept and 0 when it is dropped.
+// While state->count_closed is 1 the call also maintains
+// state->count_closed_flows.
+int flow_ssclean_with(void *s,int curr_num_samples, unsigned int ccondition, unsigned int delay, unsigned int maxtime, unsigned long long int glen){
+    struct SSstate *state  = (struct SSstate *)s;
+    
+    //cleaning condition
+    int sampled = 0;
+    double new_len = 0;
+    // set to 1 only during the counting phase, for a flow that was closed
+    // within the previous counting slot
+    int new_group = 0;
+    
+    //need to count closed flows
+    if(state->count_closed == 1){
+        //the flow is closed
+        if((ccondition == 1)||((state->time-maxtime)>=delay)){
+            // counted here; taken back below if the flow is new and not sampled
+            state->count_closed_flows++;
+            // TF case or TT case for ccondition and delay
+            if(ccondition == 1){
+                if((state->time-maxtime) <= 1) //1 is time interval for count of closed flows
+                    new_group = 1;
+            }
+            //FT case for ccondition and delay
+            else{
+                if((state->time-maxtime) <= (delay+1)) //1 is time interval for count of closed flows
+                    new_group = 1;
+            }
+        }
+        
+        // if flow is closed but old, no need to subsample it
+        // (a flow that is still open also has new_group == 0 and is kept here)
+        if (new_group == 0)
+            return true;
+        
+    }
+    
+    // clean only closed flows FF case
+    // the flow is still open
+    if((ccondition == 0)&&((state->time-maxtime)<delay)){
+        return true;
+    }
+    
+    // use glen for a new group and z_prev for old one
+    if(new_group == 0){
+        // an old group never weighs less than the previous threshold
+        if (glen < state->z_prev)
+            new_len = state->z_prev;
+        else
+            new_len = glen;
+    }
+    //the group is new
+    else{
+        new_len = glen;
+    }
+    
+    // at this point either flow is closed and old and we are at the cleaning phase
+    // or flow is closed and new and we are at the counting phase
+    if(new_len > state->z){
+        // groups above the threshold are always kept
+        state->bcount++;
+        sampled = 1;
+    }
+    else{
+        // smaller groups are accumulated into gcount; one is kept each time
+        // the running total reaches the threshold
+        state->gcount += new_len;
+        if(state->gcount >= state->z){
+            sampled = 1;
+            state->gcount -= state->z;
+        }
+        //new_group is not sampled during counting phase
+        else{
+            // undo the increment made above so that only surviving
+            // closed flows remain counted
+            if(state->count_closed == 1)
+                state->count_closed_flows--;
+        }
+        
+    }
+    
+    // tally the groups kept by a cleaning phase
+    if(state->do_clean == 1)
+        state->how_many_cleaned += sampled;
+    
+    return sampled;
+};
+\r
+// Returns state->packet_count, the counter incremented by every call to
+// ssample() and flow_ssample(). curr_num_samples is not used.
+int packet_count(void *s, int curr_num_samples){
+    return ((struct SSstate *)s)->packet_count;
+};
+\r
+// Returns the gamma value stored in the state, the factor used by the
+// emergency-control checks. curr_num_samples is not used.
+double gamma(void *s, int curr_num_samples){
+    return ((struct SSstate *)s)->gamma;
+};
+\r
+// Returns state->do_clean_count, the counter incremented each time a
+// threshold adjustment is made. curr_num_samples is not used.
+int do_clean_count(void *s, int curr_num_samples){
+    return ((struct SSstate *)s)->do_clean_count;
+};
+\r
+// Returns the delay value recorded in the state by flow_ssfinal_clean().
+// curr_num_samples is not used.
+int delay(void *s, int curr_num_samples){
+    return ((struct SSstate *)s)->delay;
+};
+\r
+// Returns state->closing_now: the number of groups that, during the final
+// cleaning phase, were either newly closed (in the most recent counting
+// phase) or not closed at all. curr_num_samples is not used.
+int newly_closed(void *s, int curr_num_samples){
+    return ((struct SSstate *)s)->closing_now;
+};
+\r
+\r
+\r
+\r