Fixed newline characters throughout the code
[com/gs-lite.git] / src / lib / gscphftaaux / SSstateful_count_distinct.cc
index 47e0ca8..0adec27 100644 (file)
-/* ------------------------------------------------\r
- Copyright 2014 AT&T Intellectual Property\r
- Licensed under the Apache License, Version 2.0 (the "License");\r
- you may not use this file except in compliance with the License.\r
- You may obtain a copy of the License at\r
\r
- http://www.apache.org/licenses/LICENSE-2.0\r
\r
- Unless required by applicable law or agreed to in writing, software\r
- distributed under the License is distributed on an "AS IS" BASIS,\r
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- See the License for the specific language governing permissions and\r
- limitations under the License.\r
- ------------------------------------------- */\r
-///////////////////////////////////////////////\r
-////   State, functions for adaptive smart sampling\r
-///            written by Irina Rozenbaum\r
-\r
-#include <iostream>\r
-#define max(x,y) ((x)>(y)?(x):(y))\r
-#define _BURST_SOOTHING_FACTOR 10.0\r
-\r
-struct SSstate {\r
-    //unsigned long long int count;   // count to sample small packets with certain probability\r
-    double count;\r
-    double gcount;  // count for clean_with() function\r
-    double fcount;  // count for final_clean() function\r
-    double z;       //z is the threshold for a size of the packet\r
-    double z_prev;  // value of threshold from the previos cleaning phase\r
-    double gamma;   //tolerance parameter for emergency control over the\r
-    //number of samples, should be >= 1\r
-    int do_clean; // trigger cleaning phase\r
-    int bcount;  //count for number of packets that exceed threshold ,\r
-    //need it for one of the variations of algorithms for\r
-    //threshold adjustment\r
-    int s_size;  //need to remember sample size for _sfun_state_init()\r
-    int final_z; //bool that indicated if z was adjusted to its filnal value\r
-    //before the final clean\r
-    int time;    //timestamp from previously processed packet\r
-    //used in detecting frequency of the clening phases with aggregation\r
-    \r
-    int packet_count; //count for number of packets per time window\r
-    int do_clean_count; //count number of cleaning phases per time window\r
-    bool do_sooth; //turn on _BURST_SOOTHING_FACTOR\r
-    \r
-    // flow sampling\r
-    int count_closed; //bool indicates that counting phase is being triggered\r
-    int count_closed_flows; //count for closed flows\r
-    int count_notsampled_new; //count for the number of new groups that were not sampled during\r
-    // the final sampling phase will be used in next time window to compute resonable\r
-    // approximation for threshold before final sampling phase\r
-    \r
-    // for reporting\r
-    int delay; //interval within which there was no packets that belong to the current flow\r
-    //thus flow is considered to be closed.\r
-    //parameter specified by the query\r
-    int closing_now; //count for groups that are closing at the final cleraning phase\r
-    //or were closed during the most recent counting phase (newly closed)\r
-    \r
-    //for debugging\r
-    int how_many_cleaned;\r
-};\r
-\r
-\r
-// the function is called once at the initial initialization of the state\r
-void _sfun_state_clean_init_smart_sampling_state(void *s){\r
-    struct SSstate *state  = (struct SSstate *)s;\r
-    \r
-    state->count = 0;\r
-    state->gcount = 0;\r
-    state->fcount = 0;\r
-    state->z = 200; //need to figure out good initial value for z\r
-    //    cout << "clean init Z: " <<  state->z << "\n";\r
-    state->z_prev = 0;\r
-    //    cout << "clean init Z prev: " <<  state->z << "\n";\r
-    state->gamma = 2;\r
-    state->do_clean = 0;\r
-    state->bcount = 0;\r
-    state->s_size = 0;\r
-    state->final_z = 0;\r
-    state->time = 0;\r
-    state->count_closed = 0;\r
-    state->count_closed_flows = 0;\r
-    state->count_notsampled_new = 0;\r
-    \r
-    state->packet_count = 0;\r
-    state->do_clean_count = 0;\r
-    state->do_sooth = true;\r
-    \r
-    state->how_many_cleaned = 0;\r
-    state->delay = 0;\r
-    state->closing_now = 0;\r
-};\r
-\r
-\r
-// the function will be called at the beginning of every time window\r
-void _sfun_state_dirty_init_smart_sampling_state(void *s_new, void *s_old, int curr_num_samples){\r
-    struct SSstate *state_new  = (struct SSstate *)s_new;\r
-    struct SSstate *state_old  = (struct SSstate *)s_old;\r
-    \r
-    //    cout << "**flows were sampled: " << state_old->how_many_cleaned << "\n";\r
-    //    cout << "***dirty current num of samples: " << curr_num_samples << "\n";\r
-    //    cout << "dirty init Z old: " <<  state_old->z << "\n";\r
-    if(curr_num_samples < state_old->s_size){\r
-        // state_new->z = state_old->z*((max((double)curr_num_samples-(double)state_old->bcount,1))/((double)state_old->s_size-(double)state_old->bcount));\r
-        \r
-        //temp fix for early aggregation\r
-        state_new->z = (state_old->z)/2;\r
-        \r
-        if(state_old->do_sooth)\r
-            state_new->z = state_old->z / _BURST_SOOTHING_FACTOR;\r
-    }\r
-    else {\r
-        if(curr_num_samples >= state_old->s_size){\r
-            //cout << "yes\n";\r
-            state_new->z = state_old->z*((double)curr_num_samples/(double)state_old->s_size);\r
-            \r
-            if(state_old->do_sooth)\r
-                state_new->z = state_old->z / _BURST_SOOTHING_FACTOR;\r
-        }\r
-    }\r
-    \r
-    if(state_new->z <= 1.0)\r
-        state_new->z = 1;\r
-    //    cout << "dirty init Z new: " <<  state_new->z << "\n";\r
-    \r
-    state_new->gamma = state_old->gamma;\r
-    state_new->do_clean = state_old->do_clean;\r
-    state_new->s_size = state_old->s_size;\r
-    state_new->bcount = 0;\r
-    state_new->gcount = 0;\r
-    state_new->count = 0;\r
-    state_new->fcount = 0;\r
-    state_new->final_z = 0;\r
-    state_new->time = 0;\r
-    state_new->count_closed = 0;\r
-    state_new->count_closed_flows = 0;\r
-    state_new->count_notsampled_new = state_old->count_notsampled_new;\r
-    \r
-    state_new->packet_count = 0;\r
-    state_new->do_clean_count = 0;\r
-    state_new->do_sooth = true;\r
-    \r
-    state_new->how_many_cleaned = 0;\r
-    state_new->delay = 0;\r
-    state_new->closing_now = 0;\r
-    \r
-    //  cout << "dirty init gamma: " <<   state_new->gamma << "\n";\r
-    //     cout << "dirty init do_clean: " <<  state_new->do_clean << "\n";\r
-    //     cout << "dirty init s_size: " <<  state_new->s_size << "\n";\r
-    //     cout << "dirty init bcount: " <<  state_new->bcount << "\n";\r
-    //     cout << "dirty init gcount: " <<  state_new->gcount << "\n";\r
-    //     cout << "dirty init count: " <<  state_new->count << "\n";\r
-    //     cout << "dirty init fcount: " <<  state_new->fcount << "\n";\r
-    //     cout << "dirty init final_z: " <<  state_new->final_z << "\n";\r
-    //     cout << "dirty init time " <<  state_new->time  << "\n";\r
-    //     cout << "dirty init count_closed: " <<  state_new->count_closed << "\n";\r
-    //     cout << "dirty init count_closed_flows: " <<  state_new->count_closed_flows << "\n";\r
-    \r
-    //     cout << "dirty init packet count: " <<  state_new->packet_count  << "\n";\r
-    //     cout << "dirty init do_clean_count: " <<  state_new->do_clean_count  << "\n";\r
-    \r
-};\r
-\r
-// function is called in two cases:\r
-// adjustment of threshold before the cleaning phase is being triggered\r
-// adjustment of threshold at the window border for the old state, this new\r
-// threshold will be used by partial_flush() and flush() functions (final_clean())\r
-\r
-void _sfun_state_final_init_smart_sampling_state(void *s, int curr_num_samples){\r
-    struct SSstate *state  = (struct SSstate *)s;\r
-    \r
-    //  cout << "final init Z old: " <<  state->z << "\n";\r
-    \r
-    if(state->final_z == 0){\r
-        \r
-        // just returned from the counting phase\r
-        // case when the time window is changed right after the counting phase\r
-        if(state->count_closed == 1){\r
-            state->count_closed = 0;\r
-            state->count_closed_flows = 0;\r
-            state->how_many_cleaned = 0;\r
-        }\r
-        \r
-        state->z_prev = state->z;\r
-        //    cout << "final current num of samples: " << curr_num_samples << "\n";\r
-        //    cout << "final count not sampled new: " << state->count_notsampled_new << "\n";\r
-        // adjust count for current number of sampled based on statistics\r
-        // gathered in the previous time window\r
-        // otherwise we overestimate it\r
-        if(state->count_notsampled_new != 0)\r
-            curr_num_samples -= state->count_notsampled_new;\r
-        \r
-        if(curr_num_samples < state->s_size){\r
-            //state->z = state->z*((max((double)curr_num_samples-(double)state->bcount,1))/((double)state->s_size-(double)state->bcount));\r
-            \r
-            //temp fix for early aggregation\r
-            state->z = state->z/2;\r
-        }\r
-        else {\r
-            if(curr_num_samples >= state->s_size){\r
-                state->z = state->z*((double)curr_num_samples/(double)state->s_size);\r
-            }\r
-        }\r
-        \r
-        if(state->z <= 0)\r
-            state->z = 1;\r
-        //    cout << "final init Z new: " <<  state->z << "\n";\r
-        \r
-        state->bcount = 0;\r
-        state->final_z = 1;\r
-        state->do_clean_count++;\r
-        state->count_notsampled_new = 0;\r
-        \r
-    }\r
-    \r
-    //  cout << "final init gamma: " <<   state->gamma << "\n";\r
-    //     cout << "final init do_clean: " <<  state->do_clean << "\n";\r
-    //     cout << "final init s_size: " <<  state->s_size << "\n";\r
-    //     cout << "final init bcount: " <<  state->bcount << "\n";\r
-    //     cout << "final init gcount: " <<  state->gcount << "\n";\r
-    //     cout << "final init count: " <<  state->count << "\n";\r
-    //     cout << "final init fcount: " <<  state->fcount << "\n";\r
-    //     cout << "final init final_z: " <<  state->final_z << "\n";\r
-    //     cout << "final init time " <<  state->time  << "\n";\r
-    //     cout << "final init count_closed: " <<  state->count_closed << "\n";\r
-    //     cout << "final init count_closed_flows: " <<  state->count_closed_flows << "\n";\r
-    \r
-    //     cout << "final init packet count: " <<  state->packet_count  << "\n";\r
-    //     cout << "final init do_clean_count: " <<  state->do_clean_count  << "\n";\r
-    \r
-};\r
-\r
-int ssample(void *s,int curr_num_samples, unsigned long long int len, unsigned int sample_size){\r
-    struct SSstate *state  = (struct SSstate *)s;\r
-    \r
-    state->packet_count++;\r
-    \r
-    state->s_size = sample_size;\r
-    int sampled = 0;\r
-    \r
-    //just returned from the cleaning phase\r
-    if(state->do_clean == 1){\r
-        state->gcount = 0;\r
-        state->do_clean = 0;\r
-    }\r
-    \r
-    //sampling\r
-    if(len > state->z){\r
-        state->bcount++;\r
-        sampled=1;\r
-    }\r
-    else{\r
-        state->count += len;\r
-        if(state->count >= state->z){\r
-            sampled=1;\r
-            state->count -= state->z;\r
-        }\r
-    }\r
-    return sampled;\r
-    \r
-};\r
-\r
-int flow_ssample(void *s,int curr_num_samples, unsigned int sample_size){\r
-    struct SSstate *state  = (struct SSstate *)s;\r
-    \r
-    state->packet_count++;\r
-    \r
-    state->s_size = sample_size;\r
-    int sampled = 0;\r
-    \r
-    //just returned from the counting phase\r
-    if(state->count_closed == 1)\r
-        //      cout << "closed flows after counting phase: " << state->count_closed_flows << "\n";\r
-        \r
-        //just returned from the cleaning phase\r
-        if(state->do_clean == 1){\r
-            //      cout << "flows were sampled: " << state->how_many_cleaned << "\n";\r
-            state->how_many_cleaned = 0;\r
-            state->gcount = 0;\r
-            state->do_clean = 0;\r
-        }\r
-    \r
-    return 1;\r
-    \r
-};\r
-\r
-\r
-int ssfinal_clean(void *s, int curr_num_samples, unsigned long long int glen){\r
-    struct SSstate *state  = (struct SSstate *)s;\r
-    \r
-    state->do_sooth = true;\r
-    \r
-    // for ssample() where just returned from the clening phase\r
-    state->do_clean = 1;\r
-    \r
-    int sampled = 0;\r
-    double new_len = 0;\r
-    \r
-    if (glen < state->z_prev)\r
-        new_len = state->z_prev;\r
-    else\r
-        new_len = glen;\r
-    \r
-    //no need to clean\r
-    if(curr_num_samples <= state->s_size){\r
-        return 1;\r
-    }\r
-    else{\r
-        if(new_len > state->z){\r
-            sampled = 1;\r
-            state->bcount++;\r
-        }\r
-        else{\r
-            state->fcount += new_len;\r
-            if(state->fcount >= state->z){\r
-                sampled = 1;\r
-                state->fcount -= state->z;\r
-            }\r
-            //else{\r
-            //state->scount--;\r
-            //}\r
-        }\r
-        \r
-        return sampled;\r
-    }\r
-    \r
-};\r
-\r
-int flow_ssfinal_clean(void *s, int curr_num_samples, unsigned int ccondition, unsigned int delay, unsigned int maxtime, unsigned long long int glen){\r
-    struct SSstate *state  = (struct SSstate *)s;\r
-    \r
-    state->do_sooth = true;\r
-    // only for reporting\r
-    state->delay = delay;\r
-    \r
-    //for ssample() where just returned from the clening phase\r
-    state->do_clean = 1;\r
-    \r
-    int sampled = 0;\r
-    double new_len = 0;\r
-    // group is newly closed or not closed yet\r
-    int new_group = 1;\r
-    \r
-    // the flow is closed\r
-    if((ccondition == 1)||((state->time-maxtime)>=delay)){\r
-        \r
-        new_group = 0;\r
-        \r
-        // TF case or TT case for ccondition and delay\r
-        if(ccondition == 1){\r
-            if((state->time-maxtime) <= 1){ // 1 is time interval for count of closed flows\r
-                // the flow was closed in the previous counting slot\r
-                new_group = 1;\r
-            }\r
-        }\r
-        //FT case for ccondition and delay\r
-        else{\r
-            if((state->time-maxtime) <= (delay+1)){ // 1 is time interval for count of closed flows\r
-                // the flow was closed in the previous counting slot\r
-                new_group = 1;\r
-            }\r
-        }\r
-        // the flow was closed earlier than previous counting slot\r
-        // it is closed and old and new_group = 0\r
-    }\r
-    \r
-    //adjust size only for old closed groups\r
-    if(new_group == 0){\r
-        if (glen < state->z_prev)\r
-            new_len = state->z_prev;\r
-        else\r
-            new_len = glen;\r
-    }\r
-    // newly closed group\r
-    else{\r
-        state->closing_now++;\r
-        new_len = glen;\r
-    }\r
-    \r
-    \r
-    //no need to clean\r
-    if(curr_num_samples <= state->s_size){\r
-        state->count_notsampled_new = 0;\r
-        return 1;\r
-    }\r
-    else{\r
-        if(new_len > state->z){\r
-            sampled = 1;\r
-            state->bcount++;\r
-        }\r
-        else{\r
-            state->fcount += new_len;\r
-            if(state->fcount >= state->z){\r
-                sampled = 1;\r
-                state->fcount -= state->z;\r
-            }\r
-            //count number of not sampled newly closed groups\r
-            //will use in the next time window at the final cleaning phase\r
-            else{\r
-                if(new_group == 1)\r
-                    state->count_notsampled_new++;\r
-            }\r
-            \r
-        }\r
-        \r
-        state->how_many_cleaned += sampled;\r
-        \r
-        return sampled;\r
-    }\r
-    \r
-};\r
-\r
-\r
-int ssdo_clean(void *s, int curr_num_samples){\r
-    struct SSstate *state  = (struct SSstate *)s;\r
-    \r
-    //emergency control\r
-    //bcount will be different after the cleaning phase\r
-    if(curr_num_samples > (state->gamma*state->s_size)){\r
-        state->z_prev = state->z;\r
-        state->z=(double)state->gamma*state->z;\r
-        state->do_clean = 1;\r
-        state->bcount = 0;\r
-        state->count = 0;\r
-        state->gcount = 0;\r
-        \r
-        state->do_clean_count++;\r
-    }\r
-    \r
-    return state->do_clean;\r
-};\r
-\r
-int flow_ssdo_clean(void *s, int curr_num_samples, unsigned int maxtime){\r
-    struct SSstate *state  = (struct SSstate *)s;\r
-    \r
-    // initialize timestamp with first seen packet from the current timewindow\r
-    if(state->time == 0)\r
-        state->time = maxtime;\r
-    \r
-    // detect next counting time slot\r
-    if(state->time != maxtime){\r
-        //      cout << "need to count\n";\r
-        state->time = maxtime;\r
-        state->count_closed = 1;\r
-        return 1;\r
-    }\r
-    \r
-    //emergency control\r
-    //bcount will be different after the cleaning phase\r
-    //if(curr_num_samples > (state->gamma*state->s_size)){\r
-    if(state->count_closed_flows > (state->gamma*state->s_size)){\r
-        //      cout << "need to clean, num closed flows: " << state->count_closed_flows << "\n";\r
-        state->z_prev = state->z;\r
-        //      cout << "do clean Z old: " <<  state->z << "\n";\r
-        state->z=(((double)state->count_closed_flows)/((double)state->s_size))*state->z;\r
-        //      cout << "do clean Z new: " <<  state->z << "\n";\r
-        state->do_clean = 1;\r
-        state->bcount = 0;\r
-        state->count = 0;\r
-        state->gcount = 0;\r
-        \r
-        state->do_clean_count++;\r
-    }\r
-    \r
-    //just returned from the counting iteration\r
-    if(state->count_closed == 1){\r
-        state->count_closed = 0;\r
-        //      cout << "number of closed flows: " << state->count_closed_flows << "\n";\r
-        state->count_closed_flows = 0;\r
-        //initialize gcount since was used for sampling of new closed flows during counting phase\r
-        state->gcount = 0;\r
-    }\r
-    \r
-    return state->do_clean;\r
-};\r
-\r
-double ssthreshold(void *s, int curr_num_samples){\r
-    struct SSstate *state = (struct SSstate *)s;\r
-    \r
-    return state->z;\r
-};\r
-\r
-int count_distinct(void *s, int curr_num_samples){\r
-    return curr_num_samples;\r
-};\r
-\r
-int ssclean_with(void *s,int curr_num_samples, unsigned long long int glen){\r
-    struct SSstate *state  = (struct SSstate *)s;\r
-    \r
-    //cleaning condition\r
-    int sampled = 0;\r
-    double new_len = 0;\r
-    \r
-    if (glen < state->z_prev)\r
-        new_len = state->z_prev;\r
-    else\r
-        new_len = glen;\r
-    \r
-    if(new_len > state->z){\r
-        state->bcount++;\r
-        sampled = 1;\r
-    }\r
-    else{\r
-        state->gcount += new_len;\r
-        if(state->gcount >= state->z){\r
-            sampled = 1;\r
-            state->gcount -= state->z;\r
-        }\r
-        //else{\r
-        //state->scount--;\r
-        //}\r
-    }\r
-    \r
-    return sampled;\r
-};\r
-\r
-int flow_ssclean_with(void *s,int curr_num_samples, unsigned int ccondition, unsigned int delay, unsigned int maxtime, unsigned long long int glen){\r
-    struct SSstate *state  = (struct SSstate *)s;\r
-    \r
-    //cleaning condition\r
-    int sampled = 0;\r
-    double new_len = 0;\r
-    int new_group = 0;\r
-    \r
-    //need to count closed flows\r
-    if(state->count_closed == 1){\r
-        //the flow is closed\r
-        if((ccondition == 1)||((state->time-maxtime)>=delay)){\r
-            state->count_closed_flows++;\r
-            // TF case or TT case for ccondition and delay\r
-            if(ccondition == 1){\r
-                if((state->time-maxtime) <= 1) //1 is time interval for count of closed flows\r
-                    new_group = 1;\r
-            }\r
-            //FT case for ccondition and delay\r
-            else{\r
-                if((state->time-maxtime) <= (delay+1)) //1 is time interval for count of closed flows\r
-                    new_group = 1;\r
-            }\r
-        }\r
-        \r
-        // if flow is closed but old, no need to subsample it\r
-        if (new_group == 0)\r
-            return true;\r
-        \r
-    }\r
-    \r
-    // clean only closed flows FF case\r
-    // the flow is still open\r
-    if((ccondition == 0)&&((state->time-maxtime)<delay)){\r
-        return true;\r
-    }\r
-    \r
-    // use glen for a new group and z_prev for old one\r
-    if(new_group == 0){\r
-        if (glen < state->z_prev)\r
-            new_len = state->z_prev;\r
-        else\r
-            new_len = glen;\r
-    }\r
-    //the group is new\r
-    else{\r
-        new_len = glen;\r
-    }\r
-    \r
-    // at this point either flow is closed and old and we are at the cleaning phase\r
-    // or flow is closed and new and we are at the counting phase\r
-    if(new_len > state->z){\r
-        state->bcount++;\r
-        sampled = 1;\r
-    }\r
-    else{\r
-        state->gcount += new_len;\r
-        if(state->gcount >= state->z){\r
-            sampled = 1;\r
-            state->gcount -= state->z;\r
-        }\r
-        //new_group is not sampled during counting phase\r
-        else{\r
-            if(state->count_closed == 1)\r
-                state->count_closed_flows--;\r
-        }\r
-        \r
-    }\r
-    \r
-    if(state->do_clean == 1)\r
-        state->how_many_cleaned += sampled;\r
-    \r
-    return sampled;\r
-};\r
-\r
-int packet_count(void *s, int curr_num_samples){\r
-    struct SSstate *state = (struct SSstate *)s;  \r
-    return state->packet_count;\r
-};\r
-\r
-double gamma(void *s, int curr_num_samples){\r
-    struct SSstate *state = (struct SSstate *)s;  \r
-    return state->gamma;\r
-};\r
-\r
-int do_clean_count(void *s, int curr_num_samples){\r
-    struct SSstate *state = (struct SSstate *)s;  \r
-    return state->do_clean_count;\r
-};\r
-\r
-int delay(void *s, int curr_num_samples){\r
-    struct SSstate *state = (struct SSstate *)s;  \r
-    return state->delay;\r
-};\r
-\r
-// number of groups which are newly closed (in the most recent\r
-// counting phase or not closed at all during the final\r
-// cleaning phase\r
-int newly_closed(void *s, int curr_num_samples){\r
-    struct SSstate *state = (struct SSstate *)s;  \r
-    return state->closing_now;\r
-};\r
-\r
-\r
-\r
-\r
+/* ------------------------------------------------
+ Copyright 2014 AT&T Intellectual Property
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ------------------------------------------- */
+///////////////////////////////////////////////
+////   State, functions for adaptive smart sampling
+///            written by Irina Rozenbaum
+
+#include <iostream>
+#define max(x,y) ((x)>(y)?(x):(y))
+#define _BURST_SOOTHING_FACTOR 10.0
+
+struct SSstate {
+    //unsigned long long int count;   // count to sample small packets with certain probability
+    double count;
+    double gcount;  // count for clean_with() function
+    double fcount;  // count for final_clean() function
+    double z;       //z is the threshold for a size of the packet
+    double z_prev;  // value of threshold from the previos cleaning phase
+    double gamma;   //tolerance parameter for emergency control over the
+    //number of samples, should be >= 1
+    int do_clean; // trigger cleaning phase
+    int bcount;  //count for number of packets that exceed threshold ,
+    //need it for one of the variations of algorithms for
+    //threshold adjustment
+    int s_size;  //need to remember sample size for _sfun_state_init()
+    int final_z; //bool that indicated if z was adjusted to its filnal value
+    //before the final clean
+    int time;    //timestamp from previously processed packet
+    //used in detecting frequency of the clening phases with aggregation
+    
+    int packet_count; //count for number of packets per time window
+    int do_clean_count; //count number of cleaning phases per time window
+    bool do_sooth; //turn on _BURST_SOOTHING_FACTOR
+    
+    // flow sampling
+    int count_closed; //bool indicates that counting phase is being triggered
+    int count_closed_flows; //count for closed flows
+    int count_notsampled_new; //count for the number of new groups that were not sampled during
+    // the final sampling phase will be used in next time window to compute resonable
+    // approximation for threshold before final sampling phase
+    
+    // for reporting
+    int delay; //interval within which there was no packets that belong to the current flow
+    //thus flow is considered to be closed.
+    //parameter specified by the query
+    int closing_now; //count for groups that are closing at the final cleraning phase
+    //or were closed during the most recent counting phase (newly closed)
+    
+    //for debugging
+    int how_many_cleaned;
+};
+
+
+// the function is called once at the initial initialization of the state
+void _sfun_state_clean_init_smart_sampling_state(void *s){
+    struct SSstate *state  = (struct SSstate *)s;
+    
+    state->count = 0;
+    state->gcount = 0;
+    state->fcount = 0;
+    state->z = 200; //need to figure out good initial value for z
+    //    cout << "clean init Z: " <<  state->z << "\n";
+    state->z_prev = 0;
+    //    cout << "clean init Z prev: " <<  state->z << "\n";
+    state->gamma = 2;
+    state->do_clean = 0;
+    state->bcount = 0;
+    state->s_size = 0;
+    state->final_z = 0;
+    state->time = 0;
+    state->count_closed = 0;
+    state->count_closed_flows = 0;
+    state->count_notsampled_new = 0;
+    
+    state->packet_count = 0;
+    state->do_clean_count = 0;
+    state->do_sooth = true;
+    
+    state->how_many_cleaned = 0;
+    state->delay = 0;
+    state->closing_now = 0;
+};
+
+
+// the function will be called at the beginning of every time window
+void _sfun_state_dirty_init_smart_sampling_state(void *s_new, void *s_old, int curr_num_samples){
+    struct SSstate *state_new  = (struct SSstate *)s_new;
+    struct SSstate *state_old  = (struct SSstate *)s_old;
+    
+    //    cout << "**flows were sampled: " << state_old->how_many_cleaned << "\n";
+    //    cout << "***dirty current num of samples: " << curr_num_samples << "\n";
+    //    cout << "dirty init Z old: " <<  state_old->z << "\n";
+    if(curr_num_samples < state_old->s_size){
+        // state_new->z = state_old->z*((max((double)curr_num_samples-(double)state_old->bcount,1))/((double)state_old->s_size-(double)state_old->bcount));
+        
+        //temp fix for early aggregation
+        state_new->z = (state_old->z)/2;
+        
+        if(state_old->do_sooth)
+            state_new->z = state_old->z / _BURST_SOOTHING_FACTOR;
+    }
+    else {
+        if(curr_num_samples >= state_old->s_size){
+            //cout << "yes\n";
+            state_new->z = state_old->z*((double)curr_num_samples/(double)state_old->s_size);
+            
+            if(state_old->do_sooth)
+                state_new->z = state_old->z / _BURST_SOOTHING_FACTOR;
+        }
+    }
+    
+    if(state_new->z <= 1.0)
+        state_new->z = 1;
+    //    cout << "dirty init Z new: " <<  state_new->z << "\n";
+    
+    state_new->gamma = state_old->gamma;
+    state_new->do_clean = state_old->do_clean;
+    state_new->s_size = state_old->s_size;
+    state_new->bcount = 0;
+    state_new->gcount = 0;
+    state_new->count = 0;
+    state_new->fcount = 0;
+    state_new->final_z = 0;
+    state_new->time = 0;
+    state_new->count_closed = 0;
+    state_new->count_closed_flows = 0;
+    state_new->count_notsampled_new = state_old->count_notsampled_new;
+    
+    state_new->packet_count = 0;
+    state_new->do_clean_count = 0;
+    state_new->do_sooth = true;
+    
+    state_new->how_many_cleaned = 0;
+    state_new->delay = 0;
+    state_new->closing_now = 0;
+    
+    //  cout << "dirty init gamma: " <<   state_new->gamma << "\n";
+    //     cout << "dirty init do_clean: " <<  state_new->do_clean << "\n";
+    //     cout << "dirty init s_size: " <<  state_new->s_size << "\n";
+    //     cout << "dirty init bcount: " <<  state_new->bcount << "\n";
+    //     cout << "dirty init gcount: " <<  state_new->gcount << "\n";
+    //     cout << "dirty init count: " <<  state_new->count << "\n";
+    //     cout << "dirty init fcount: " <<  state_new->fcount << "\n";
+    //     cout << "dirty init final_z: " <<  state_new->final_z << "\n";
+    //     cout << "dirty init time " <<  state_new->time  << "\n";
+    //     cout << "dirty init count_closed: " <<  state_new->count_closed << "\n";
+    //     cout << "dirty init count_closed_flows: " <<  state_new->count_closed_flows << "\n";
+    
+    //     cout << "dirty init packet count: " <<  state_new->packet_count  << "\n";
+    //     cout << "dirty init do_clean_count: " <<  state_new->do_clean_count  << "\n";
+    
+};
+
+// function is called in two cases:
+// adjustment of threshold before the cleaning phase is being triggered
+// adjustment of threshold at the window border for the old state, this new
+// threshold will be used by partial_flush() and flush() functions (final_clean())
+
+void _sfun_state_final_init_smart_sampling_state(void *s, int curr_num_samples){
+    struct SSstate *state  = (struct SSstate *)s;
+    
+    //  cout << "final init Z old: " <<  state->z << "\n";
+    
+    if(state->final_z == 0){
+        
+        // just returned from the counting phase
+        // case when the time window is changed right after the counting phase
+        if(state->count_closed == 1){
+            state->count_closed = 0;
+            state->count_closed_flows = 0;
+            state->how_many_cleaned = 0;
+        }
+        
+        state->z_prev = state->z;
+        //    cout << "final current num of samples: " << curr_num_samples << "\n";
+        //    cout << "final count not sampled new: " << state->count_notsampled_new << "\n";
+        // adjust count for current number of sampled based on statistics
+        // gathered in the previous time window
+        // otherwise we overestimate it
+        if(state->count_notsampled_new != 0)
+            curr_num_samples -= state->count_notsampled_new;
+        
+        if(curr_num_samples < state->s_size){
+            //state->z = state->z*((max((double)curr_num_samples-(double)state->bcount,1))/((double)state->s_size-(double)state->bcount));
+            
+            //temp fix for early aggregation
+            state->z = state->z/2;
+        }
+        else {
+            if(curr_num_samples >= state->s_size){
+                state->z = state->z*((double)curr_num_samples/(double)state->s_size);
+            }
+        }
+        
+        if(state->z <= 0)
+            state->z = 1;
+        //    cout << "final init Z new: " <<  state->z << "\n";
+        
+        state->bcount = 0;
+        state->final_z = 1;
+        state->do_clean_count++;
+        state->count_notsampled_new = 0;
+        
+    }
+    
+    //  cout << "final init gamma: " <<   state->gamma << "\n";
+    //     cout << "final init do_clean: " <<  state->do_clean << "\n";
+    //     cout << "final init s_size: " <<  state->s_size << "\n";
+    //     cout << "final init bcount: " <<  state->bcount << "\n";
+    //     cout << "final init gcount: " <<  state->gcount << "\n";
+    //     cout << "final init count: " <<  state->count << "\n";
+    //     cout << "final init fcount: " <<  state->fcount << "\n";
+    //     cout << "final init final_z: " <<  state->final_z << "\n";
+    //     cout << "final init time " <<  state->time  << "\n";
+    //     cout << "final init count_closed: " <<  state->count_closed << "\n";
+    //     cout << "final init count_closed_flows: " <<  state->count_closed_flows << "\n";
+    
+    //     cout << "final init packet count: " <<  state->packet_count  << "\n";
+    //     cout << "final init do_clean_count: " <<  state->do_clean_count  << "\n";
+    
+};
+
+int ssample(void *s,int curr_num_samples, unsigned long long int len, unsigned int sample_size){
+    struct SSstate *state  = (struct SSstate *)s;
+    
+    state->packet_count++;
+    
+    state->s_size = sample_size;
+    int sampled = 0;
+    
+    //just returned from the cleaning phase
+    if(state->do_clean == 1){
+        state->gcount = 0;
+        state->do_clean = 0;
+    }
+    
+    //sampling
+    if(len > state->z){
+        state->bcount++;
+        sampled=1;
+    }
+    else{
+        state->count += len;
+        if(state->count >= state->z){
+            sampled=1;
+            state->count -= state->z;
+        }
+    }
+    return sampled;
+    
+};
+
+int flow_ssample(void *s,int curr_num_samples, unsigned int sample_size){
+    struct SSstate *state  = (struct SSstate *)s;
+    
+    state->packet_count++;
+    
+    state->s_size = sample_size;
+    int sampled = 0;
+    
+    //just returned from the counting phase
+    if(state->count_closed == 1)
+        //      cout << "closed flows after counting phase: " << state->count_closed_flows << "\n";
+        
+        //just returned from the cleaning phase
+        if(state->do_clean == 1){
+            //      cout << "flows were sampled: " << state->how_many_cleaned << "\n";
+            state->how_many_cleaned = 0;
+            state->gcount = 0;
+            state->do_clean = 0;
+        }
+    
+    return 1;
+    
+};
+
+
+int ssfinal_clean(void *s, int curr_num_samples, unsigned long long int glen){
+    struct SSstate *state  = (struct SSstate *)s;
+    
+    state->do_sooth = true;
+    
+    // for ssample() where just returned from the clening phase
+    state->do_clean = 1;
+    
+    int sampled = 0;
+    double new_len = 0;
+    
+    if (glen < state->z_prev)
+        new_len = state->z_prev;
+    else
+        new_len = glen;
+    
+    //no need to clean
+    if(curr_num_samples <= state->s_size){
+        return 1;
+    }
+    else{
+        if(new_len > state->z){
+            sampled = 1;
+            state->bcount++;
+        }
+        else{
+            state->fcount += new_len;
+            if(state->fcount >= state->z){
+                sampled = 1;
+                state->fcount -= state->z;
+            }
+            //else{
+            //state->scount--;
+            //}
+        }
+        
+        return sampled;
+    }
+    
+};
+
+int flow_ssfinal_clean(void *s, int curr_num_samples, unsigned int ccondition, unsigned int delay, unsigned int maxtime, unsigned long long int glen){
+    struct SSstate *state  = (struct SSstate *)s;
+    
+    state->do_sooth = true;
+    // only for reporting
+    state->delay = delay;
+    
+    //for ssample() where just returned from the clening phase
+    state->do_clean = 1;
+    
+    int sampled = 0;
+    double new_len = 0;
+    // group is newly closed or not closed yet
+    int new_group = 1;
+    
+    // the flow is closed
+    if((ccondition == 1)||((state->time-maxtime)>=delay)){
+        
+        new_group = 0;
+        
+        // TF case or TT case for ccondition and delay
+        if(ccondition == 1){
+            if((state->time-maxtime) <= 1){ // 1 is time interval for count of closed flows
+                // the flow was closed in the previous counting slot
+                new_group = 1;
+            }
+        }
+        //FT case for ccondition and delay
+        else{
+            if((state->time-maxtime) <= (delay+1)){ // 1 is time interval for count of closed flows
+                // the flow was closed in the previous counting slot
+                new_group = 1;
+            }
+        }
+        // the flow was closed earlier than previous counting slot
+        // it is closed and old and new_group = 0
+    }
+    
+    //adjust size only for old closed groups
+    if(new_group == 0){
+        if (glen < state->z_prev)
+            new_len = state->z_prev;
+        else
+            new_len = glen;
+    }
+    // newly closed group
+    else{
+        state->closing_now++;
+        new_len = glen;
+    }
+    
+    
+    //no need to clean
+    if(curr_num_samples <= state->s_size){
+        state->count_notsampled_new = 0;
+        return 1;
+    }
+    else{
+        if(new_len > state->z){
+            sampled = 1;
+            state->bcount++;
+        }
+        else{
+            state->fcount += new_len;
+            if(state->fcount >= state->z){
+                sampled = 1;
+                state->fcount -= state->z;
+            }
+            //count number of not sampled newly closed groups
+            //will use in the next time window at the final cleaning phase
+            else{
+                if(new_group == 1)
+                    state->count_notsampled_new++;
+            }
+            
+        }
+        
+        state->how_many_cleaned += sampled;
+        
+        return sampled;
+    }
+    
+};
+
+
+int ssdo_clean(void *s, int curr_num_samples){
+    struct SSstate *state  = (struct SSstate *)s;
+    
+    //emergency control
+    //bcount will be different after the cleaning phase
+    if(curr_num_samples > (state->gamma*state->s_size)){
+        state->z_prev = state->z;
+        state->z=(double)state->gamma*state->z;
+        state->do_clean = 1;
+        state->bcount = 0;
+        state->count = 0;
+        state->gcount = 0;
+        
+        state->do_clean_count++;
+    }
+    
+    return state->do_clean;
+};
+
+int flow_ssdo_clean(void *s, int curr_num_samples, unsigned int maxtime){
+    struct SSstate *state  = (struct SSstate *)s;
+    
+    // initialize timestamp with first seen packet from the current timewindow
+    if(state->time == 0)
+        state->time = maxtime;
+    
+    // detect next counting time slot
+    if(state->time != maxtime){
+        //      cout << "need to count\n";
+        state->time = maxtime;
+        state->count_closed = 1;
+        return 1;
+    }
+    
+    //emergency control
+    //bcount will be different after the cleaning phase
+    //if(curr_num_samples > (state->gamma*state->s_size)){
+    if(state->count_closed_flows > (state->gamma*state->s_size)){
+        //      cout << "need to clean, num closed flows: " << state->count_closed_flows << "\n";
+        state->z_prev = state->z;
+        //      cout << "do clean Z old: " <<  state->z << "\n";
+        state->z=(((double)state->count_closed_flows)/((double)state->s_size))*state->z;
+        //      cout << "do clean Z new: " <<  state->z << "\n";
+        state->do_clean = 1;
+        state->bcount = 0;
+        state->count = 0;
+        state->gcount = 0;
+        
+        state->do_clean_count++;
+    }
+    
+    //just returned from the counting iteration
+    if(state->count_closed == 1){
+        state->count_closed = 0;
+        //      cout << "number of closed flows: " << state->count_closed_flows << "\n";
+        state->count_closed_flows = 0;
+        //initialize gcount since was used for sampling of new closed flows during counting phase
+        state->gcount = 0;
+    }
+    
+    return state->do_clean;
+};
+
+double ssthreshold(void *s, int curr_num_samples){
+    struct SSstate *state = (struct SSstate *)s;
+    
+    return state->z;
+};
+
+int count_distinct(void *s, int curr_num_samples){
+    return curr_num_samples;
+};
+
+int ssclean_with(void *s,int curr_num_samples, unsigned long long int glen){
+    struct SSstate *state  = (struct SSstate *)s;
+    
+    //cleaning condition
+    int sampled = 0;
+    double new_len = 0;
+    
+    if (glen < state->z_prev)
+        new_len = state->z_prev;
+    else
+        new_len = glen;
+    
+    if(new_len > state->z){
+        state->bcount++;
+        sampled = 1;
+    }
+    else{
+        state->gcount += new_len;
+        if(state->gcount >= state->z){
+            sampled = 1;
+            state->gcount -= state->z;
+        }
+        //else{
+        //state->scount--;
+        //}
+    }
+    
+    return sampled;
+};
+
+int flow_ssclean_with(void *s,int curr_num_samples, unsigned int ccondition, unsigned int delay, unsigned int maxtime, unsigned long long int glen){
+    struct SSstate *state  = (struct SSstate *)s;
+    
+    //cleaning condition
+    int sampled = 0;
+    double new_len = 0;
+    int new_group = 0;
+    
+    //need to count closed flows
+    if(state->count_closed == 1){
+        //the flow is closed
+        if((ccondition == 1)||((state->time-maxtime)>=delay)){
+            state->count_closed_flows++;
+            // TF case or TT case for ccondition and delay
+            if(ccondition == 1){
+                if((state->time-maxtime) <= 1) //1 is time interval for count of closed flows
+                    new_group = 1;
+            }
+            //FT case for ccondition and delay
+            else{
+                if((state->time-maxtime) <= (delay+1)) //1 is time interval for count of closed flows
+                    new_group = 1;
+            }
+        }
+        
+        // if flow is closed but old, no need to subsample it
+        if (new_group == 0)
+            return true;
+        
+    }
+    
+    // clean only closed flows FF case
+    // the flow is still open
+    if((ccondition == 0)&&((state->time-maxtime)<delay)){
+        return true;
+    }
+    
+    // use glen for a new group and z_prev for old one
+    if(new_group == 0){
+        if (glen < state->z_prev)
+            new_len = state->z_prev;
+        else
+            new_len = glen;
+    }
+    //the group is new
+    else{
+        new_len = glen;
+    }
+    
+    // at this point either flow is closed and old and we are at the cleaning phase
+    // or flow is closed and new and we are at the counting phase
+    if(new_len > state->z){
+        state->bcount++;
+        sampled = 1;
+    }
+    else{
+        state->gcount += new_len;
+        if(state->gcount >= state->z){
+            sampled = 1;
+            state->gcount -= state->z;
+        }
+        //new_group is not sampled during counting phase
+        else{
+            if(state->count_closed == 1)
+                state->count_closed_flows--;
+        }
+        
+    }
+    
+    if(state->do_clean == 1)
+        state->how_many_cleaned += sampled;
+    
+    return sampled;
+};
+
+int packet_count(void *s, int curr_num_samples){
+    struct SSstate *state = (struct SSstate *)s;  
+    return state->packet_count;
+};
+
+double gamma(void *s, int curr_num_samples){
+    struct SSstate *state = (struct SSstate *)s;  
+    return state->gamma;
+};
+
+int do_clean_count(void *s, int curr_num_samples){
+    struct SSstate *state = (struct SSstate *)s;  
+    return state->do_clean_count;
+};
+
+int delay(void *s, int curr_num_samples){
+    struct SSstate *state = (struct SSstate *)s;  
+    return state->delay;
+};
+
+// number of groups which are newly closed (in the most recent
+// counting phase or not closed at all during the final
+// cleaning phase
+int newly_closed(void *s, int curr_num_samples){
+    struct SSstate *state = (struct SSstate *)s;  
+    return state->closing_now;
+};
+
+
+
+