Improvements to aggregation code and function library 49/4649/2
author vlad shkapenyuk <vshkap@research.att.com>
Mon, 31 Aug 2020 13:40:41 +0000 (09:40 -0400)
committerVlad Shkapenyuk <vshkap@research.att.com>
Mon, 31 Aug 2020 13:45:13 +0000 (13:45 +0000)
Signed-off-by: vlad shkapenyuk <vshkap@research.att.com>
Change-Id: I9282ff58d9add29eedfe89a6b4b6e1049282250c

23 files changed:
bin/start_processing
cfg/external_fcns.def
include/gsoptions.h
include/hfta/groupby_operator.h
include/hfta/groupby_slowflush_operator.h [new file with mode: 0644]
include/hfta/hash_table.h
include/hfta/hfta_runtime_library.h
include/hfta/hfta_udaf.h
include/hfta/join_eq_hash_operator.h
include/hfta/running_gb_operator.h
src/ftacmp/analyze_fta.cc
src/ftacmp/generate_lfta_code.cc
src/ftacmp/generate_lfta_code.h
src/ftacmp/parse_schema.h
src/ftacmp/query_plan.cc
src/ftacmp/query_plan.h
src/ftacmp/translate_fta.cc
src/ftacmp/type_objects.cc
src/ftacmp/type_objects.h
src/lib/gscphftaaux/hfta_runtime_library.cc
src/lib/gscphftaaux/hfta_udaf.cc
src/lib/gscprts/rts_csv.cc
src/lib/gscprts/rts_kafka.c

index 3b235cc..1210939 100755 (executable)
@@ -1,4 +1,4 @@
-#!/usr/bin/bash
+#!/bin/bash
 # ------------------------------------------------
 #   Copyright 2014 AT&T Intellectual Property
 #   Licensed under the Apache License, Version 2.0 (the "License");
index 1c9eb3e..071a290 100644 (file)
@@ -693,11 +693,18 @@ uint FUN [LFTA_LEGAL, COST EXPENSIVE]
        string UDAF  count_diff_lfta_s fstring20 (string);
 
 
+////////////////////////////////////////////////////////////////
+//     string aggregation via catenation
+//////////////////////////////////////////////////////
+
+       string UDAF [HFTA_ONLY] CAT_aggr fstring8 (string, string);
+
 ///////////////////////////////////////////////////////////
-//               integer array aggregation funciton
+//               integer array aggregation function
 //     We are going to store 4 values in LFTA in fixed size buffer
-//  plus one byte for array length (17 bytes total)
-//  HFTA will combine partial aggregates
+//     plus one byte for array length (17 bytes total)
+//     HFTA will combine partial aggregates
+//             This seems to create a string with a comma-separated list of the uints
 ///////////////////////////////////////////////////////////
 
        string UDAF [RUNNING, SUBAGGR running_array_aggr_lfta, SUPERAGGR running_array_aggr_hfta] running_array_aggr string (uint);
@@ -712,3 +719,33 @@ uint FUN [LFTA_LEGAL, COST EXPENSIVE]
        string FUN [HFTA_ONLY, COST EXPENSIVE] int_to_string_map(ullong, string HANDLE);
        string FUN [HFTA_ONLY, COST EXPENSIVE] int_to_string_map(int, string HANDLE);
        string FUN [HFTA_ONLY, COST EXPENSIVE] int_to_string_map(uint, string HANDLE);
+
+//////////////////////////////////////////////////////////
+//     time-averaged aggregate.
+//     time_avg(sample, ts, window_size)
+       float UDAF [HFTA_ONLY, RUNNING] time_avg fstring44 (uint, llong, llong) ;
+       float UDAF [HFTA_ONLY, RUNNING] time_avg fstring44 (int, llong, llong) ;
+       float UDAF [HFTA_ONLY, RUNNING] time_avg fstring44 (ullong, llong, llong) ;
+       float UDAF [HFTA_ONLY, RUNNING] time_avg fstring44 (llong, llong, llong) ;
+       float UDAF [HFTA_ONLY, RUNNING] time_avg fstring44 (float, llong, llong) ;
+
+// ------------------------------------------------------------
+//             running_sum_max : get the running sum of an int,
+//             be able to report this sum and also its max value
+//             during the time window
+
+       llong EXTR running_sum run_sum_max extr_running_sum (uint);
+       llong EXTR running_sum run_sum_max extr_running_sum (int);
+       llong EXTR running_sum run_sum_max extr_running_sum (llong);
+       llong EXTR running_sum run_sum_max extr_running_sum (ullong);
+       llong FUN [COST LOW] extr_running_sum(string);
+       llong EXTR running_sum_max run_sum_max extr_running_sum_max (uint);
+       llong EXTR running_sum_max run_sum_max extr_running_sum_max (int);
+       llong EXTR running_sum_max run_sum_max extr_running_sum_max (llong);
+       llong EXTR running_sum_max run_sum_max extr_running_sum_max (ullong);
+       llong FUN [COST LOW] extr_running_sum_max(string);
+       string UDAF [HFTA_ONLY, RUNNING] run_sum_max fstring16 (uint);
+       string UDAF [HFTA_ONLY, RUNNING] run_sum_max fstring16 (int);
+       string UDAF [HFTA_ONLY, RUNNING] run_sum_max fstring16 (ullong);
+       string UDAF [HFTA_ONLY, RUNNING] run_sum_max fstring16 (llong);
+
index 3c7e619..d3c0267 100644 (file)
@@ -26,5 +26,8 @@
 // support for KAFKA interfaces
 //#define KAFKA_ENABLED
 
+// support for SSL decryption
+//#define SSL_ENABLED
+
 #endif
 
index 26645c3..e831bc0 100644 (file)
@@ -1,4 +1,4 @@
-/* ------------------------------------------------
+/** ------------------------------------------------
 Copyright 2014 AT&T Intellectual Property
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
@@ -21,7 +21,6 @@ Copyright 2014 AT&T Intellectual Property
 #include <list>
 #include "hash_table.h"
 
-#define _GB_FLUSH_PER_TUPLE_ 1
 
 using namespace std;
 
@@ -29,19 +28,13 @@ template <class groupby_func, class group, class aggregate, class hasher_func, c
 class groupby_operator : public base_operator {
 private :
        groupby_func func;
-       hash_table<group*, aggregate*, hasher_func, equal_func> group_table[2];
+       hash_table<group, aggregate, hasher_func, equal_func> group_table;
        bool flush_finished;
-       unsigned int curr_table;
-       typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;
+       typename hash_table<group, aggregate, hasher_func, equal_func>::iterator flush_pos;
        int n_patterns;
-
-
-
 public:
        groupby_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle) {
                flush_finished = true;
-               curr_table = 0;
-               flush_pos = group_table[1-curr_table].end();
                n_patterns = func.n_groupby_patterns();
        }
 
@@ -49,24 +42,17 @@ public:
 
 //                     Push out completed groups
 
-               if(!flush_finished) partial_flush(result);
-
-               // create buffer on the stack to store key object
-               char buffer[sizeof(group)];
-
                // extract the key information from the tuple and
                // copy it into buffer
-               group* grp = func.create_group(tup, buffer);
-               if (!grp) {
-/*
-//                     Ignore temp tuples until we can fix their timestamps.
-if (func.temp_status_received()) {
- tup.free_tuple();
- return 0;
-}*/
+               group grp;
+               if (!func.create_group(tup, (gs_sp_t)&grp)) {
+                       if(func.disordered()){
+                               fprintf(stderr,"Out of order record in %s\n",op_name);
+                               return 0;
+                       }
                        if (func.flush_needed()){
                                flush_old(result);
-               }
+                       }
                        if (func.temp_status_received()) {
                                host_tuple temp_tup;
                                if (!func.create_temp_status_tuple(temp_tup, flush_finished)) {
@@ -77,32 +63,35 @@ if (func.temp_status_received()) {
                        tup.free_tuple();
                        return 0;
                }
+               if(func.disordered()){
+                       fprintf(stderr,"Out of order record in %s\n",op_name);
+                       return 0;
+               }
 
-               typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
-               if ((iter = group_table[curr_table].find(grp)) != group_table[curr_table].end()) {
+               typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
+               if ((iter = group_table.find(grp)) != group_table.end()) {
 //                             Temporal GBvar is part of the group so no flush is needed.
-                       aggregate* old_aggr = (*iter).second;
-                       func.update_aggregate(tup, grp, old_aggr);
+                       func.update_aggregate(tup, grp, (*iter).second);
                }else{
                        if (func.flush_needed()) {
                                flush_old(result);
                        }
                        if(n_patterns <= 1){
-                       // create a copy of the group on the heap
-                               group* new_grp = new group(grp);        // need a copy constructor for groups
-//                     aggregate* aggr = (aggregate*)malloc(sizeof(aggregate));
-                               aggregate* aggr = new aggregate();
-                       // create an aggregate in preallocated buffer
-                               aggr = func.create_aggregate(tup, (char*)aggr);
-
-                               group_table[curr_table].insert(new_grp, aggr);
+                               aggregate aggr;
+                               // create an aggregate in preallocated buffer
+                               func.create_aggregate(tup, (char*)&aggr);
+                               // need operator= doing a deep copy
+                               group_table.insert(grp, aggr);
                        }else{
                                int p;
+// TODO this code is wrong, must check if each pattern is in the group table.
                                for(p=0;p<n_patterns;++p){
-                                       group* new_grp = new group(grp, func.get_pattern(p));
-                                       aggregate* aggr = new aggregate();
-                                       aggr = func.create_aggregate(tup, (char*)aggr);
-                                       group_table[curr_table].insert(new_grp, aggr);
+                                       // need shallow copy constructor for groups
+                                       group new_grp(grp, func.get_pattern(p));
+                                       aggregate aggr;
+                                       func.create_aggregate(tup, (char*)&aggr);
+                                       // need operator= doing a deep copy
+                                       group_table.insert(new_grp, aggr);
                                }
                        }
                }
@@ -110,43 +99,14 @@ if (func.temp_status_received()) {
                return 0;
        }
 
-       int partial_flush(list<host_tuple>& result) {
-               host_tuple tup;
-               unsigned int old_table = 1-curr_table;
-               unsigned int i;
-
-//                             emit up to _GB_FLUSH_PER_TABLE_ output tuples.
-               if (!group_table[old_table].empty()) {
-                       for (i=0; flush_pos != group_table[old_table].end() && i<_GB_FLUSH_PER_TUPLE_; ++flush_pos, ++i) {
-                               bool failed = false;
-                               tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
-                               if (!failed) {
-                                       tup.channel = output_channel;
-                                       result.push_back(tup);
-                               }
-                               delete ((*flush_pos).first);
-                               delete ((*flush_pos).second);
-//                             free((*flush_pos).second);
-                       }
-               }
-
-//                     Finalize processing if empty.
-               if(flush_pos == group_table[old_table].end()) {
-                       flush_finished = true;
-                       group_table[old_table].clear();
-                       group_table[old_table].rehash();
-               }
-               return 0;
-       }
 
        int flush(list<host_tuple>& result) {
                host_tuple tup;
-               typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
-               unsigned int old_table = 1-curr_table;
 
-//                     If the old table isn't empty, flush it now.
-               if (!group_table[old_table].empty()) {
-                       for (; flush_pos != group_table[old_table].end(); ++flush_pos) {
+               flush_pos = group_table.begin();
+//                     If the table isn't empty, flush it now.
+               if (!group_table.empty()) {
+                       for (; flush_pos != group_table.end(); ++flush_pos) {
                                bool failed = false;
                                tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
                                if (!failed) {
@@ -154,30 +114,9 @@ if (func.temp_status_received()) {
                                        tup.channel = output_channel;
                                        result.push_back(tup);
                                }
-                               delete ((*flush_pos).first);
-                               delete ((*flush_pos).second);
 //                             free((*flush_pos).second);
                        }
-                       group_table[old_table].clear();
-                       group_table[old_table].rehash();
-               }
-
-               flush_pos = group_table[curr_table].begin();
-//                     If the old table isn't empty, flush it now.
-               if (!group_table[curr_table].empty()) {
-                       for (; flush_pos != group_table[curr_table].end(); ++flush_pos) {
-                               bool failed = false;
-                               tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
-                               if (!failed) {
-
-                                       tup.channel = output_channel;
-                                       result.push_back(tup);
-                               }
-                               delete ((*flush_pos).first);
-                               delete ((*flush_pos).second);
-//                             free((*flush_pos).second);
-                       }
-                       group_table[curr_table].clear();
+                       group_table.clear();
                }
 
                flush_finished = true;
@@ -186,33 +125,10 @@ if (func.temp_status_received()) {
        }
 
        int flush_old(list<host_tuple>& result) {
-               host_tuple tup;
-               typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
-               unsigned int old_table = 1-curr_table;
-
-//                     If the old table isn't empty, flush it now.
-               if (!group_table[old_table].empty()) {
-                       for (; flush_pos != group_table[old_table].end(); ++flush_pos) {
-                               bool failed = false;
-                               tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
-                               if (!failed) {
-
-                                       tup.channel = output_channel;
-                                       result.push_back(tup);
-                               }
-                               delete ((*flush_pos).first);
-                               delete ((*flush_pos).second);
-//                             free((*flush_pos).second);
-                       }
-                       group_table[old_table].clear();
-                       group_table[old_table].rehash();
-               }
-
-//                     swap tables, enable partial flush processing.
-               flush_pos = group_table[curr_table].begin();
-               curr_table = old_table;
-               flush_finished = false;
 
+               flush(result);
+               group_table.clear();
+               group_table.resize();
                return 0;
        }
 
@@ -231,7 +147,7 @@ if (func.temp_status_received()) {
        }
 
        unsigned int get_mem_footprint() {
-               return group_table[0].get_mem_footprint() + group_table[1].get_mem_footprint();
+               return group_table.get_mem_footprint();
        }
 };
 
diff --git a/include/hfta/groupby_slowflush_operator.h b/include/hfta/groupby_slowflush_operator.h
new file mode 100644 (file)
index 0000000..d2120ba
--- /dev/null
@@ -0,0 +1,231 @@
+/** ------------------------------------------------
+Copyright 2014 AT&T Intellectual Property
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+ ------------------------------------------- */
+
+#ifndef GROUPBY_SLOWFLUSH_OPERATOR_H
+#define GROUPBY_SLOWFLUSH_OPERATOR_H
+
+#include "host_tuple.h"
+#include "base_operator.h"
+#include <list>
+#include "hash_table.h"
+
+#define _HFTA_SLOW_FLUSH
+
+using namespace std;
+
+template <class groupby_func, class group, class aggregate, class hasher_func, class equal_func>
+class groupby_slowflush_operator : public base_operator {
+private :
+       groupby_func func;
+       hash_table<group, aggregate, hasher_func, equal_func> group_table[2];
+       bool flush_finished;
+       unsigned int curr_table;
+       typename hash_table<group, aggregate, hasher_func, equal_func>::iterator flush_pos;
+       int n_patterns;
+       int gb_per_flush;
+
+public:
+       groupby_slowflush_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle) {
+               flush_finished = true;
+               curr_table = 0;
+               flush_pos = group_table[1-curr_table].end();
+               n_patterns = func.n_groupby_patterns();
+               gb_per_flush = func.gb_flush_per_tuple();
+       }
+
+       int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
+
+//                     Push out completed groups
+               if(!flush_finished) partial_flush(result);
+
+               // extract the key information from the tuple and
+               // copy it into buffer
+               group grp;
+               if (!func.create_group(tup, (gs_sp_t)&grp)) {
+                       if(func.disordered()){
+                               fprintf(stderr,"Out of order record in %s\n",op_name);
+                               return 0;
+                       }
+                       if (func.flush_needed()){
+                               flush_old(result);
+                       }
+                       if (func.temp_status_received()) {
+                               host_tuple temp_tup;
+                               if (!func.create_temp_status_tuple(temp_tup, flush_finished)) {
+                                       temp_tup.channel = output_channel;
+                                       result.push_back(temp_tup);
+                               }
+                       }
+                       tup.free_tuple();
+                       return 0;
+               }
+               if(func.disordered()){
+                       fprintf(stderr,"Out of order record in %s\n",op_name);
+                       return 0;
+               }
+
+               typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
+               if ((iter = group_table[curr_table].find(grp)) != group_table[curr_table].end()) {
+//                             Temporal GBvar is part of the group so no flush is needed.
+                       func.update_aggregate(tup, grp, (*iter).second);
+               }else{
+                       if (func.flush_needed()) {
+                               flush_old(result);
+                       }
+                       if(n_patterns <= 1){
+                               aggregate aggr;
+                               // create an aggregate in preallocated buffer
+                               func.create_aggregate(tup, (char*)&aggr);
+                               // need operator= doing a deep copy
+                               group_table[curr_table].insert(grp, aggr);
+                       }else{
+                               int p;
+                               for(p=0;p<n_patterns;++p){
+// TODO this code is wrong need to check each pattern to see if its in the table
+                                       // need shallow copy constructor for groups
+                                       group new_grp(grp, func.get_pattern(p));
+                                       aggregate aggr;
+                                       func.create_aggregate(tup, (char*)&aggr);
+                                       // need operator= doing a deep copy
+                                       group_table[curr_table].insert(new_grp, aggr);
+                               }
+                       }
+               }
+               tup.free_tuple();
+               return 0;
+       }
+
+       int partial_flush(list<host_tuple>& result) {
+               host_tuple tup;
+               unsigned int old_table = 1-curr_table;
+               unsigned int i;
+
+//                             emit up to _GB_FLUSH_PER_TABLE_ output tuples.
+               if (!group_table[old_table].empty()) {
+                       for (i=0; flush_pos != group_table[old_table].end() && i<gb_per_flush; ++flush_pos, ++i) {
+                               bool failed = false;
+                               tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
+                               if (!failed) {
+                                       tup.channel = output_channel;
+                                       result.push_back(tup);
+                               }
+//                             free((*flush_pos).second);
+                       }
+               }
+
+//                     Finalize processing if empty.
+               if(flush_pos == group_table[old_table].end()) {
+                       flush_finished = true;
+                       group_table[old_table].clear();
+                       group_table[old_table].resize();
+               }
+               return 0;
+       }
+
+       int flush(list<host_tuple>& result) {
+               host_tuple tup;
+               typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
+               unsigned int old_table = 1-curr_table;
+
+//                     If the old table isn't empty, flush it now.
+               if (!group_table[old_table].empty()) {
+                       for (; flush_pos != group_table[old_table].end(); ++flush_pos) {
+                               bool failed = false;
+                               tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
+                               if (!failed) {
+
+                                       tup.channel = output_channel;
+                                       result.push_back(tup);
+                               }
+//                             free((*flush_pos).second);
+                       }
+                       group_table[old_table].clear();
+                       group_table[old_table].resize();
+               }
+
+               flush_pos = group_table[curr_table].begin();
+//                     If the table isn't empty, flush it now.
+               if (!group_table[curr_table].empty()) {
+                       for (; flush_pos != group_table[curr_table].end(); ++flush_pos) {
+                               bool failed = false;
+                               tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
+                               if (!failed) {
+
+                                       tup.channel = output_channel;
+                                       result.push_back(tup);
+                               }
+//                             free((*flush_pos).second);
+                       }
+                       group_table[curr_table].clear();
+               }
+
+               flush_finished = true;
+
+               return 0;
+       }
+
+       int flush_old(list<host_tuple>& result) {
+
+               host_tuple tup;
+               typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
+               unsigned int old_table = 1-curr_table;
+
+//                     If the old table isn't empty, flush it now.
+               if (!group_table[old_table].empty()) {
+                       for (; flush_pos != group_table[old_table].end(); ++flush_pos) {
+                               bool failed = false;
+                               tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
+                               if (!failed) {
+
+                                       tup.channel = output_channel;
+                                       result.push_back(tup);
+                               }
+//                             free((*flush_pos).second);
+                       }
+
+                       //group_table[old_table].clear();
+                       //group_table[old_table].resize();
+               }
+
+               group_table[old_table].clear();
+               group_table[old_table].resize();
+
+//                     swap tables, enable partial flush processing.
+               flush_pos = group_table[curr_table].begin();
+               curr_table = old_table;
+               flush_finished = false;
+               return 0;
+       }
+
+       int set_param_block(int sz, void * value) {
+               func.set_param_block(sz, value);
+               return 0;
+       }
+
+       int get_temp_status(host_tuple& result) {
+               result.channel = output_channel;
+               return func.create_temp_status_tuple(result, flush_finished);
+       }
+
+       int get_blocked_status () {
+               return -1;
+       }
+
+       unsigned int get_mem_footprint() {
+               return group_table[0].get_mem_footprint() + group_table[1].get_mem_footprint();
+       }
+};
+
+#endif // GROUPBY_SLOWFLUSH_OPERATOR_H
index cf913e5..94d077b 100644 (file)
@@ -40,7 +40,11 @@ public :
                value_type second;
                data_item* next;        // next data item in overflow chain
 
-               data_item(const key_type& k, const value_type& v) : first(k), second(v), next(NULL) { }
+               data_item(const key_type& k, const value_type& v) {
+                       first = k;
+                       second = v;
+                       next = NULL;
+                }
        };
 
        struct hash_bucket {
@@ -101,15 +105,19 @@ private:
        equal_func equal;
 
        double load_factor;
-//     double max_load;
 
        size_t _size;
        size_t _max_size;
        size_t num_buckets;
        size_t hash_mask;
 
+       // bucket allocation happens on first insert
+       // this flag indicates that buckets has been allocated
+       unsigned allocated;
+
+
        hash_bucket* first_bucket;      // first used bucket
-       hash_bucket* last_bucket;               // last used bucket
+       hash_bucket* last_bucket;       // last used bucket
 
        hash_bucket* buckets;
 
@@ -119,24 +127,38 @@ private:
 public:
 
 
-       hash_table(const size_t n_buckets = 65536, const double load = 1.0) {
-               load_factor = load;
+       hash_table(const size_t expected_size = 100000, const double max_load = 1.25) {
+
+               size_t min_buckets = (size_t)(expected_size / max_load);
+               load_factor = max_load;
                int nb;
-               for(nb=2;nb<n_buckets;nb*=2);
+               for(nb=2;nb<min_buckets;nb*=2);
                num_buckets = nb;
                hash_mask = nb-1;
-               buckets = new hash_bucket[num_buckets];
+
+               // we will make hash table start with 0-size and expand of first insertion
+               buckets = new hash_bucket[0];
+               allocated = 0;
 
                _size = 0;
                _max_size = 0;
-//             max_load = 0.0;
                first_bucket = 0;
                last_bucket = 0;
 
-               total_memory = num_buckets * sizeof(hash_bucket);
+               total_memory = 0;
        }
 
        int insert(const key_type& key, const value_type& val) {
+
+               if (__builtin_expect(!allocated, 0)) {          // we expect buckets to be allocated most of the time
+                       delete[] buckets;
+                       buckets = new hash_bucket[num_buckets];
+                       allocated = 1;  
+                       total_memory = num_buckets * sizeof(hash_bucket);
+
+                       fprintf(stderr, "Initial allocation of %d buckets\n", (int)num_buckets);
+               }
+
                data_item* d = new data_item(key, val);
                total_memory += sizeof(data_item);
 
@@ -182,6 +204,12 @@ public:
        iterator find(const key_type& key) {
                iterator iter;
 
+               if (__builtin_expect(!allocated, 0)) {          // we expect buckets to be allocated most of the time
+                       iter.bucket = NULL;
+                       iter.data = NULL;
+                       return iter;
+               }               
+
                // find the insertion bucket
                size_t bucket_index = hasher(key) & hash_mask;
                // if the bucket is empty just add new data_item
@@ -205,6 +233,11 @@ public:
        }
 
        void erase(const key_type& key) {
+
+               if (__builtin_expect(!allocated, 0)) {          // we expect buckets to be allocated most of the time
+                       return;
+               }       
+
                // find the  bucket
                size_t bucket_index = hasher(key) & hash_mask;
                // if the bucket is empty just add new data_item
@@ -247,6 +280,11 @@ public:
        }
 
        void erase_full(const key_type& key) {
+
+               if (__builtin_expect(!allocated, 0)) {          // we expect buckets to be allocated most of the time
+                       return;
+               }
+
                // find the  bucket
                size_t bucket_index = hasher(key) & hash_mask;
                // if the bucket is empty just add new data_item
@@ -284,8 +322,8 @@ public:
                }else{
                        prev->next = temp->next;
                }
-        delete (*temp).first;
-        delete (*temp).second;
+//        delete (*temp).first;
+//        delete (*temp).second;
         delete temp;
         total_memory -= sizeof(data_item);
        }
@@ -326,14 +364,13 @@ public:
                last_bucket = NULL;
                _size = 0;
                total_memory = num_buckets * sizeof(hash_bucket);
-
        }
 
-       int rehash() {
+       int resize() {
                if (_size) {
-                       fprintf(stderr, "Error: rehashing non-empty hash table\n");
+                       fprintf(stderr, "Error: resizing non-empty hash table\n");
                        exit(1);
-               }               
+               }
 
                double max_load = (1.0 * _max_size) / num_buckets;
 
@@ -350,21 +387,25 @@ public:
                        for(nb=2;nb<min_buckets;nb*=2);
                        num_buckets = nb;
 
+                       fprintf(stderr, "Resizing to %d buckets\n", (int)num_buckets);
                        delete[] buckets;
                        hash_mask = num_buckets-1;
 
-                       buckets = new hash_bucket[num_buckets];         
+                       buckets = new hash_bucket[num_buckets];
+                       total_memory = num_buckets * sizeof(hash_bucket);                       
                }
 
                return 0;
-       }
-
-
+       }               
 
        size_t size() const {
                return _size;
        }
 
+       size_t max_size() const {
+               return _max_size;
+       }
+
        bool empty() const {
                return (_size == 0);
        }
@@ -438,7 +479,6 @@ private:
        equal_func equal;
 
        double load_factor;
-//     double max_load;
 
        size_t _size;
        size_t _max_size;
@@ -451,19 +491,19 @@ private:
        hash_bucket* buckets;
 
 public:
-       hash_set(const size_t n_buckets = 65536, const double load = 1.0) {
-               load_factor = load;
 
+       hash_set(const size_t expected_size = 100000, const double max_load = 1.25) {
+
+               size_t min_buckets = (size_t)(expected_size / max_load);
+               load_factor = max_load;
                int nb;
-               for(nb=2;nb<n_buckets;nb*=2);
+               for(nb=2;nb<min_buckets;nb*=2);
                num_buckets = nb;
-               hash_mask = num_buckets-1;
-
+               hash_mask = nb-1;
                buckets = new hash_bucket[num_buckets];
 
                _size = 0;
                _max_size = 0;
-//             max_load = 0.0;
                first_bucket = 0;
                last_bucket = 0;
        }
@@ -580,12 +620,12 @@ public:
 
        }
 
-       int rehash() {
+       int resize() {
                if (_size) {
-                       fprintf(stderr, "Error: rehashing non-empty hash table\n");
+                       fprintf(stderr, "Error: resizing non-empty hash table\n");
                        exit(1);
-               }               
-
+               }
+               
                double max_load = (1.0 * _max_size) / num_buckets;
 
                // reize table if its maximum load exceed the load factor
@@ -601,6 +641,7 @@ public:
                        for(nb=2;nb<min_buckets;nb*=2);
                        num_buckets = nb;
 
+                       fprintf(stderr, "Resizing to %d buckets\n", num_buckets);
                        delete[] buckets;
                        hash_mask = num_buckets-1;
 
index 2839bef..1106406 100644 (file)
@@ -32,14 +32,16 @@ Copyright 2014 AT&T Intellectual Property
 //             Internal functions
 gs_retval_t Vstring_Constructor(vstring *, gs_csp_t);
 gs_retval_t hfta_vstr_length(vstring *);
-void hfta_vstr_assign_with_copy_in_tuple(vstring32 *, vstring *, gs_sp_t, int);
-void hfta_vstr_assign_with_copy(vstring *, vstring *);
+void hfta_vstr_assign_with_copy_in_tuple(vstring32 *, const vstring *,
+       gs_sp_t, int);
+void hfta_vstr_assign_with_copy(vstring *, const vstring *);
 void hfta_vstr_destroy(vstring *);
-void hfta_vstr_replace(vstring *, vstring *);
+void hfta_vstr_replace(vstring *, const vstring *);
 
 gs_uint32_t hfta_vstr_hashfunc(const vstring *);
 gs_uint64_t hfta_vstr_long_hashfunc(const vstring *);
 gs_retval_t hfta_vstr_compare(const vstring *, const vstring *);
+gs_retval_t hfta_vstr_equal(const vstring *, const vstring *);
 
 gs_retval_t hfta_ipv6_compare(const hfta_ipv6_str &i1, const hfta_ipv6_str &i2);
 hfta_ipv6_str And_Ipv6(const hfta_ipv6_str &i1, const hfta_ipv6_str &i2);
@@ -63,6 +65,7 @@ inline static gs_retval_t str_truncate(vstring * result, vstring *str, gs_uint32
 
 gs_retval_t str_exists_substr(vstring * s1, vstring * s2);
 gs_retval_t str_compare(vstring * s1, vstring * s2);
+gs_retval_t str_equal(vstring * s1, vstring * s2);
 
 gs_uint32_t str_match_offset(gs_uint32_t offset,vstring *s1,vstring *s2);
 gs_uint32_t byte_match_offset( gs_uint32_t offset, gs_uint32_t val,vstring *s2);
index d3bb8d0..6281859 100644 (file)
@@ -220,6 +220,47 @@ void count_diff_hfta_HFTA_AGGR_UPDATE_(gs_sp_t s, vstring *val);
 void count_diff_hfta_HFTA_AGGR_OUTPUT_(gs_uint32_t *res, gs_sp_t s);
 void count_diff_hfta_HFTA_AGGR_DESTROY_(gs_sp_t scratch);
 
+//////////////////////////////////////////////
+//     CAT_aggr, aggregate strings by catenation
+//////////////////////////////////////////////
+void CAT_aggr_HFTA_AGGR_INIT_(gs_sp_t s);
+void CAT_aggr_HFTA_AGGR_REINIT_(gs_sp_t s);
+void CAT_aggr_HFTA_AGGR_UPDATE_(gs_sp_t s, vstring *sep, vstring *str);
+void CAT_aggr_HFTA_AGGR_OUTPUT_(vstring *res, gs_sp_t s);
+void CAT_aggr_HFTA_AGGR_DESTROY_(gs_sp_t s);
+
+/////////////////////////////////////////////////////////
+//     time-averaged sum, from aperiodic reports
+////////////////////////////////////////////////////////
+
+void time_avg_HFTA_AGGR_INIT_(gs_sp_t s);
+void time_avg_HFTA_AGGR_DESTROY_(gs_sp_t s);
+void time_avg_HFTA_AGGR_REINIT_(gs_sp_t s);
+void time_avg_HFTA_AGGR_OUTPUT_(gs_float_t *result, gs_sp_t s);
+void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_float_t val, gs_int64_t ts, gs_int64_t window);
+void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t val, gs_int64_t ts, gs_int64_t window);
+void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t val, gs_int64_t ts, gs_int64_t window);
+void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t val, gs_int64_t ts, gs_int64_t window);
+void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t val, gs_int64_t ts, gs_int64_t window);
+
+// ------------------------------------------------------------
+//             running_sum_max : get the running sum of an int,
+//             be able to report this sum and also its max value
+//             during the time window
+// ------------------------------------------------------------
+
+void run_sum_max_HFTA_AGGR_INIT_(gs_sp_t s);
+void run_sum_max_HFTA_AGGR_REINIT_(gs_sp_t s);
+void run_sum_max_HFTA_AGGR_OUTPUT_(vstring *r,gs_sp_t b);
+void run_sum_max_HFTA_AGGR_DESTROY_(gs_sp_t b);
+void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t v);
+void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t v);
+void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t v);
+void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t v);
+gs_int64_t extr_running_sum(vstring *v);
+gs_int64_t extr_running_sum_max(vstring *v);
+
+
 
 
 ///////////////////////////////////////////////////////////////
index 0b9a9f1..fd8bf00 100644 (file)
@@ -219,7 +219,7 @@ int n_calls, n_iters, n_eqk;
                                }
                        }
                }
-               join_tbl[i].clear(); join_tbl[i].rehash();
+               join_tbl[i].clear(); join_tbl[i].resize();
        }
 
   }
index c20c87f..569bbc4 100644 (file)
@@ -29,8 +29,8 @@ template <class groupby_func, class group, class aggregate, class hasher_func, c
 class running_agg_operator : public base_operator {
 private :
        groupby_func func;
-       hash_table<group*, aggregate*, hasher_func, equal_func> group_table;
-       typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;
+       hash_table<group, aggregate, hasher_func, equal_func> group_table;
+       typename hash_table<group, aggregate, hasher_func, equal_func>::iterator flush_pos;
        gs_int32_t nflushes;
 
 
@@ -44,16 +44,15 @@ public:
 
 //                     Push out completed groups
 
-// create buffer on the stack to store key object
-               char buffer[sizeof(group)];
-//     Number of flushes required
-
-// extract the key information from the tuple and
-// copy it into buffer
-               group* grp = func.create_group(tup, buffer);
+               group grp, *ret;
+               ret = func.create_group(tup, (gs_sp_t)&grp);
                nflushes = func.flush_needed();
-               
-               if (!grp) {
+               if(func.disordered()){
+                       fprintf(stderr,"Out of order record in %s\n",op_name);
+                       return 0;
+               }
+
+               if (! ret) {
                        if (nflushes>0){
                                flush(result);
                        }
@@ -71,19 +70,15 @@ public:
                if (nflushes>0) {
                        flush(result);
                }
-               typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
+               typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
                if ((iter = group_table.find(grp)) != group_table.end()) {
-                       aggregate* old_aggr = (*iter).second;
-                       func.update_aggregate(tup, grp, old_aggr);
+                       func.update_aggregate(tup, grp, (*iter).second);
                }else{
-                       // create a copy of the group on the heap
-                       group* new_grp = new group(grp);        // need a copy constructor for groups
-//                     aggregate* aggr = (aggregate*)malloc(sizeof(aggregate));
-                       aggregate* aggr = new aggregate();
-                       // create an aggregate in preallocated buffer
-                       aggr = func.create_aggregate(tup, (char*)aggr);
-
-                       group_table.insert(new_grp, aggr);
+                               aggregate aggr;
+                               // create an aggregate in preallocated buffer
+                               func.create_aggregate(tup, (char*)&aggr);
+                               // need operator= doing a deep copy
+                               group_table.insert(grp, aggr);
                }
                tup.free_tuple();
                return 0;
@@ -91,7 +86,7 @@ public:
 
        virtual int flush(list<host_tuple>& result) {
                host_tuple tup;
-               typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
+               typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
 
 //     Limit the number of successive flushes - avoid explosive behavior
                const gs_int32_t max_flushes = 10;
@@ -116,12 +111,10 @@ public:
                                        result.push_back(tup);
                                }
                                if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){
-                               group* g = (*flush_pos).first;
-                               aggregate* a = (*flush_pos).second;
+                                       group &g = (*flush_pos).first;
+                                       //aggregate a = (*flush_pos).second;
                                        ++flush_pos;
                                        group_table.erase(g);
-                                       delete (g);
-                                       delete (a);
                                }else{
                                        func.reinit_aggregates((*flush_pos).first, (*flush_pos).second);
                                        ++flush_pos;
index d793577..57fc11f 100644 (file)
@@ -5252,7 +5252,7 @@ void gather_pr_opcmp_fcns(predicate_t *pr, set<string> &fcn_set){
        case PRED_IN:
                ldt = pr->get_left_se()->get_data_type();
                if(ldt->complex_comparison(ldt) ){
-                       fcn_set.insert( ldt->get_comparison_fcn(ldt) );
+                       fcn_set.insert( ldt->get_equals_fcn(ldt) );
                }
                gather_se_opcmp_fcns(pr->get_left_se(), fcn_set);
                return;
@@ -5260,7 +5260,7 @@ void gather_pr_opcmp_fcns(predicate_t *pr, set<string> &fcn_set){
                ldt = pr->get_left_se()->get_data_type();
                rdt = pr->get_right_se()->get_data_type();
                if(ldt->complex_comparison(rdt) ){
-                       fcn_set.insert( ldt->get_comparison_fcn(rdt) );
+                       fcn_set.insert( ldt->get_comparison_fcn(rdt) );
                }
                gather_se_opcmp_fcns(pr->get_left_se(),fcn_set) ;
                gather_se_opcmp_fcns(pr->get_right_se(),fcn_set) ;
index 55227b1..d6026bb 100644 (file)
@@ -1053,7 +1053,7 @@ static string generate_predicate_code(predicate_t *pr,table_list *schema){
                        ret += "( ";
 
                if(ldt->complex_comparison(ldt) ){
-                               ret +=  ldt->get_comparison_fcn(ldt) ;
+                               ret +=  ldt->get_equals_fcn(ldt) ;
                                ret += "( ";
                                if(ldt->is_buffer_type() ) ret += "&";
                                ret += generate_se_code(pr->get_left_se(), schema);
@@ -1083,6 +1083,7 @@ static string generate_predicate_code(predicate_t *pr,table_list *schema){
 
                ret += "( ";
         if(ldt->complex_comparison(rdt) ){
+// TODO can use get_equals_fcn if op is "=" ?
                        ret += ldt->get_comparison_fcn(rdt);
                        ret += "(";
                        if(ldt->is_buffer_type() ) ret += "&";
@@ -1153,7 +1154,7 @@ static string generate_equality_test(string &lhs_op, string &rhs_op, data_type *
        string ret;
 
     if(dt->complex_comparison(dt) ){
-               ret += dt->get_comparison_fcn(dt);
+               ret += dt->get_equals_fcn(dt);
                ret += "(";
                        if(dt->is_buffer_type() ) ret += "&";
                ret += lhs_op;
@@ -1170,26 +1171,26 @@ static string generate_equality_test(string &lhs_op, string &rhs_op, data_type *
        return(ret);
 }
 
-static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
-       string ret;
-
-    if(dt->complex_comparison(dt) ){
-               ret += dt->get_comparison_fcn(dt);
-               ret += "(";
-                       if(dt->is_buffer_type() ) ret += "&";
-               ret += lhs_op;
-               ret += ", ";
-                       if(dt->is_buffer_type() ) ret += "&";
-               ret += rhs_op;
-               ret += ") == 0";
-       }else{
-               ret += lhs_op;
-               ret += " == ";
-               ret += rhs_op;
-       }
-
-       return(ret);
-}
+//static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
+//     string ret;
+//
//   if(dt->complex_comparison(dt) ){
+//             ret += dt->get_equals_fcn(dt);
+//             ret += "(";
+//                     if(dt->is_buffer_type() ) ret += "&";
+//             ret += lhs_op;
+//             ret += ", ";
+//                     if(dt->is_buffer_type() ) ret += "&";
+//             ret += rhs_op;
+//             ret += ") == 0";
+//     }else{
+//             ret += lhs_op;
+//             ret += " == ";
+//             ret += rhs_op;
+//     }
+//
+//     return(ret);
+//}
 
 //             Here I assume that only MIN and MAX aggregates can be computed
 //             over BUFFER data types.
@@ -4630,7 +4631,7 @@ string generate_lfta_block(qp_node *fs, table_list *schema, int gid,
 
 
 
-int compute_snap_len(qp_node *fs, table_list *schema){
+int compute_snap_len(qp_node *fs, table_list *schema, string snap_type){
 
 //             Initialize global vars
        gb_tbl = NULL;
@@ -4691,15 +4692,21 @@ int compute_snap_len(qp_node *fs, table_list *schema){
        int tblref = (*csi).tblvar_ref;
     string field = (*csi).field;
 
-       param_list *field_params = schema->get_modifier_list(schref, field);
-       if(field_params->contains_key("snap_len")){
-               string fld_snap_str = field_params->val_of("snap_len");
-               int fld_snap;
-               if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){
-                       if(fld_snap > snap_len) snap_len = fld_snap;
-                       n_snap++;
-               }else{
-                       fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() );
+       if(snap_type == "index"){
+               int pos = schema->get_field_idx(schref, field);
+               if(pos>snap_len) snap_len = pos;
+               n_snap++;
+       }else{
+               param_list *field_params = schema->get_modifier_list(schref, field);
+               if(field_params->contains_key("snap_len")){
+                       string fld_snap_str = field_params->val_of("snap_len");
+                       int fld_snap;
+                       if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){
+                               if(fld_snap > snap_len) snap_len = fld_snap;
+                               n_snap++;
+                       }else{
+                               fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() );
+                       }
                }
        }
   }
index 875247f..2716111 100644 (file)
@@ -30,7 +30,8 @@ std::string generate_lfta_block(qp_node *fs, table_list *schema, int gid,
 std::string generate_lfta_prefilter(std::vector<cnf_set *> &pred_list, col_id_set &temp_cids, table_list *Schema, ext_fcn_list *Ext_fcns, std::vector<col_id_set> &lfta_cols, std::vector<long long int> &lfta_sigs, vector<int> &lfta_snap_lens, std::string iface);
 std::string generate_lfta_prefilter_struct(col_id_set &temp_cids, table_list *Schema);
 
-int compute_snap_len(qp_node *fs, table_list *schema);
+// a snap_type of "index" computes position, else use snap_len metadata
+int compute_snap_len(qp_node *fs, table_list *schema, string snap_type);
 
 std::string generate_watchlist_element_name(std::string node_name);
 std::string generate_watchlist_struct_name(std::string node_name);
index 763ebd5..e1f21fa 100644 (file)
@@ -542,6 +542,9 @@ public:
                return tbl_list[t]->get_field(tbl_list[t]->get_field_idx(f));
        }
        int get_field_idx(std::string t, std::string f);
+       int get_field_idx(int t, std::string f){
+               return tbl_list[t]->get_field_idx(f);
+       }
 
        int find_tbl(std::string t);
 
index 8e05ae2..ec391dd 100644 (file)
@@ -3874,7 +3874,7 @@ vector<qp_node *> rsgah_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_li
                                        new_opl.push_back(new_se);
                                }
                        }
-                       stream_node->aggr_tbl.add_aggr(aggr_tbl.get_op(a), aggr_tbl.get_fcn_id(a), new_opl, aggr_tbl.get_storage_type(a),false, false,aggr_tbl.has_bailout(a));
+                       stream_node->aggr_tbl.add_aggr(aggr_tbl.get_op(a), aggr_tbl.get_fcn_id(a), new_opl, aggr_tbl.get_storage_type(a),aggr_tbl.is_superaggr(a), aggr_tbl.is_running_aggr(a),aggr_tbl.has_bailout(a));
                        hse = new scalarexp_t(aggr_tbl.get_op(a).c_str(),new_opl);
                        hse->set_data_type(Ext_fcns->get_fcn_dt(aggr_tbl.get_fcn_id(a)));
                        hse->set_fcn_id(aggr_tbl.get_fcn_id(a));
@@ -4332,12 +4332,12 @@ vector<qp_node *> sgah_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_lis
                stream_node->set_node_name( node_name );
                stream_node->table_name->set_range_var(table_name->get_var_name());
 
-//                     allowed stream disorder.  Default is 2,
+//                     allowed stream disorder.  Default is 1,
 //                     can override with max_lfta_disorder setting.
 //                     Also limit the hfta disorder, set to lfta disorder + 1.
 //                     can override with max_hfta_disorder.
 
-       fta_node->lfta_disorder = 2;
+       fta_node->lfta_disorder = 1;
        if(this->get_val_of_def("max_lfta_disorder") != ""){
                int d = atoi(this->get_val_of_def("max_lfta_disorder").c_str() );
                if(d<1){
@@ -4364,6 +4364,7 @@ printf("node %s setting lfta_disorder = %d\n",node_name.c_str(),fta_node->lfta_d
                }
        }
 
+
 //                     First, process the group-by variables.
 //                     The fta must supply the values of all the gbvars.
 //                     If a gb is computed, the computation must be
@@ -6823,7 +6824,7 @@ static string generate_predicate_code(predicate_t *pr,table_list *schema){
                        ret.append("( ");
 
                if(ldt->complex_comparison(ldt) ){
-                               ret.append( ldt->get_hfta_comparison_fcn(ldt) );
+                               ret.append( ldt->get_hfta_equals_fcn(ldt) );
                                ret.append("( ");
                                if(ldt->is_buffer_type() )
                                        ret.append("&");
@@ -6855,6 +6856,7 @@ static string generate_predicate_code(predicate_t *pr,table_list *schema){
 
                ret.append("( ");
         if(ldt->complex_comparison(rdt) ){
+// TODO can use get_hfta_equals_fcn if op is "=" ?
                        ret.append(ldt->get_hfta_comparison_fcn(rdt));
                        ret.append("(");
                        if(ldt->is_buffer_type() )
@@ -6924,7 +6926,7 @@ static string generate_predicate_code_fm_aggr(predicate_t *pr, string gbvar, str
                        ret.append("( ");
 
                if(ldt->complex_comparison(ldt) ){
-                               ret.append( ldt->get_hfta_comparison_fcn(ldt) );
+                               ret.append( ldt->get_hfta_equals_fcn(ldt) );
                                ret.append("( ");
                                if(ldt->is_buffer_type() )
                                        ret.append("&");
@@ -6956,6 +6958,7 @@ static string generate_predicate_code_fm_aggr(predicate_t *pr, string gbvar, str
 
                ret.append("( ");
         if(ldt->complex_comparison(rdt) ){
+// TODO can use get_hfta_equals_fcn if op is "=" ?
                        ret.append(ldt->get_hfta_comparison_fcn(rdt));
                        ret.append("(");
                        if(ldt->is_buffer_type() )
@@ -7014,7 +7017,7 @@ static string generate_equality_test(string &lhs_op, string &rhs_op, data_type *
        string ret;
 
     if(dt->complex_comparison(dt) ){
-               ret.append(dt->get_hfta_comparison_fcn(dt));
+               ret.append(dt->get_hfta_equals_fcn(dt));
                ret.append("(");
                        if(dt->is_buffer_type() )
                                ret.append("&");
@@ -7056,28 +7059,28 @@ static string generate_lt_test(string &lhs_op, string &rhs_op, data_type *dt){
        return(ret);
 }
 
-static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
-       string ret;
-
-    if(dt->complex_comparison(dt) ){
-               ret.append(dt->get_hfta_comparison_fcn(dt));
-               ret.append("(");
-                       if(dt->is_buffer_type() )
-                               ret.append("&");
-               ret.append(lhs_op);
-               ret.append(", ");
-                       if(dt->is_buffer_type() )
-                               ret.append("&");
-               ret.append(rhs_op );
-               ret.append(") == 0");
-       }else{
-               ret.append(lhs_op );
-               ret.append(" == ");
-               ret.append(rhs_op );
-       }
-
-       return(ret);
-}
+//static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
+//     string ret;
+//
+//    if(dt->complex_comparison(dt) ){
+//             ret.append(dt->get_hfta_equals_fcn(dt));
+//             ret.append("(");
+//                     if(dt->is_buffer_type() )
+//                             ret.append("&");
+//             ret.append(lhs_op);
+//             ret.append(", ");
+//                     if(dt->is_buffer_type() )
+//                             ret.append("&");
+//             ret.append(rhs_op );
+//             ret.append(") == 0");
+//     }else{
+//             ret.append(lhs_op );
+//             ret.append(" == ");
+//             ret.append(rhs_op );
+//     }
+//
+//     return(ret);
+//}
 
 
 //             Here I assume that only MIN and MAX aggregates can be computed
@@ -8881,6 +8884,18 @@ string sgah_qpn::generate_functor_name(){
 string sgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, vector<bool> &needs_xform){
        int a,g,w,s;
 
+//                     Regular or slow flush?
+       hfta_slow_flush = 0;
+       if(this->get_val_of_def("hfta_slow_flush") != ""){
+               int d = atoi(this->get_val_of_def("hfta_slow_flush").c_str() );
+               if(d<0){
+                       fprintf(stderr,"Warning, hfta_slow_flush in node %s is %d, must be at least 0, setting to 0.\n",node_name.c_str(), d);
+                       hfta_slow_flush = 0;
+               }else{
+                       hfta_slow_flush = d;
+               }
+       }
+       
 
 //                     Initialize generate utility globals
        segen_gb_tbl = &(gb_tbl);
@@ -8902,12 +8917,12 @@ string sgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, ve
                ret+="\t"+this->gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n";
        }
 //             empty strucutred literals
-       map<int, string>::iterator sii;
-       for(sii=structured_types.begin();sii!=structured_types.end();++sii){
-               data_type dt(sii->second);
-               literal_t empty_lit(sii->first);
-               ret += "\t"+dt.make_host_cvar(empty_lit.hfta_empty_literal_name())+";\n";
-       }
+//     map<int, string>::iterator sii;
+//     for(sii=structured_types.begin();sii!=structured_types.end();++sii){
+//             data_type dt(sii->second);
+//             literal_t empty_lit(sii->first);
+//             ret += "\t"+dt.make_host_cvar(empty_lit.hfta_empty_literal_name())+";\n";
+//     }
 //             Constructors
        if(structured_types.size()==0){
                ret += "\t"+generate_functor_name() + "_groupdef(){};\n";
@@ -8916,22 +8931,34 @@ string sgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, ve
        }
 
 
+       ret += "\t// shallow copy constructors\n";
        ret += "\t"+generate_functor_name() + "_groupdef("+
-               this->generate_functor_name() + "_groupdef *gd){\n";
+               "const " + this->generate_functor_name() + "_groupdef &gd){\n";
        for(g=0;g<gb_tbl.size();g++){
                data_type *gdt = gb_tbl.get_data_type(g);
-               if(gdt->is_buffer_type()){
-                       sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd->gb_var%d));\n",
-                         gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
-                       ret += tmpstr;
-               }else{
-                       sprintf(tmpstr,"\t\tgb_var%d = gd->gb_var%d;\n",g,g);
-                       ret += tmpstr;
-               }
+               sprintf(tmpstr,"\t\tgb_var%d = gd.gb_var%d;\n",g,g);
+               ret += tmpstr;
+// TODO : do strings persist after the call?  it's a shallow copy
+//             if(gdt->is_buffer_type()){
+//                     sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd->gb_var%d));\n",
+//                       gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
+//                     ret += tmpstr;
+//             }else{
+//                     sprintf(tmpstr,"\t\tgb_var%d = gd->gb_var%d;\n",g,g);
+//                     ret += tmpstr;
+//             }
        }
        ret += "\t}\n";
        ret += "\t"+generate_functor_name() + "_groupdef("+
-               this->generate_functor_name() + "_groupdef *gd, bool *pattern){\n";
+               "const " + this->generate_functor_name() + "_groupdef &gd, bool *pattern){\n";
+//     -- For patterns, need empty structured literals
+       map<int, string>::iterator sii;
+       for(sii=structured_types.begin();sii!=structured_types.end();++sii){
+               data_type dt(sii->second);
+               literal_t empty_lit(sii->first);
+               ret += "\t"+dt.make_host_cvar(empty_lit.hfta_empty_literal_name())+";\n";
+       }
+
        for(sii=structured_types.begin();sii!=structured_types.end();++sii){
                literal_t empty_lit(sii->first);
                ret += "\t\t"+empty_lit.to_hfta_C_code("&"+empty_lit.hfta_empty_literal_name())+";\n";
@@ -8939,14 +8966,17 @@ string sgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, ve
        for(g=0;g<gb_tbl.size();g++){
                data_type *gdt = gb_tbl.get_data_type(g);
                ret += "\t\tif(pattern["+int_to_string(g)+"]){\n";
-               if(gdt->is_buffer_type()){
-                       sprintf(tmpstr,"\t\t\t%s(&gb_var%d, &(gd->gb_var%d));\n",
-                         gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
-                       ret += tmpstr;
-               }else{
-                       sprintf(tmpstr,"\t\t\tgb_var%d = gd->gb_var%d;\n",g,g);
-                       ret += tmpstr;
-               }
+               sprintf(tmpstr,"\t\t\tgb_var%d = gd.gb_var%d;\n",g,g);
+               ret += tmpstr;
+//     TODO Do strings persist long enough?  it's a shallow copy constructor?
+//             if(gdt->is_buffer_type()){
+//                     sprintf(tmpstr,"\t\t\t%s(&gb_var%d, &(gd->gb_var%d));\n",
+//                       gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
+//                     ret += tmpstr;
+//             }else{
+//                     sprintf(tmpstr,"\t\t\tgb_var%d = gd->gb_var%d;\n",g,g);
+//                     ret += tmpstr;
+//             }
                ret += "\t\t}else{\n";
                literal_t empty_lit(gdt->type_indicator());
                if(empty_lit.is_cpx_lit()){
@@ -8957,6 +8987,23 @@ string sgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, ve
                ret += "\t\t}\n";
        }
        ret += "\t};\n";
+
+       ret += "\t// deep assignment operator\n";
+       ret += "\t"+generate_functor_name() + "_groupdef& operator=(const "+
+                this->generate_functor_name() + "_groupdef &gd){\n";
+        for(g=0;g<gb_tbl.size();g++){
+                data_type *gdt = gb_tbl.get_data_type(g);
+                if(gdt->is_buffer_type()){
+                        sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd.gb_var%d));\n",
+                          gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
+                        ret += tmpstr;
+                }else{
+                        sprintf(tmpstr,"\t\tgb_var%d = gd.gb_var%d;\n",g,g);
+                        ret += tmpstr;
+                }
+        }
+        ret += "\t\treturn *this;\n\t}\n";
+
 //             destructor
        ret += "\t~"+ generate_functor_name() + "_groupdef(){\n";
        for(g=0;g<gb_tbl.size();g++){
@@ -9158,6 +9205,7 @@ string sgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, ve
                        }
                }
                ret += "\tbool needs_temporal_flush;\n";
+               ret += "\tbool disordered_arrival;\n";
        }
 
 
@@ -9266,6 +9314,14 @@ string sgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, ve
 "}\n\n"
 ;
 
+//---------------------------------------
+//             Parameterized number of tuples output per slow flush
+       ret += 
+"int gb_flush_per_tuple(){\n"
+"      return "+int_to_string(hfta_slow_flush)+";\n"
+"}\n\n";
+
+
 
 
 
@@ -9337,16 +9393,18 @@ string sgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, ve
 ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n";
        if(hfta_disorder < 2){
                if(uses_temporal_flush){
-                       ret+= "\tif( !( (";
+                       ret+= "\tif( ( (";
                        bool first_one = true;
+                       string disorder_test;
                        for(g=0;g<gb_tbl.size();g++){
                                data_type *gdt = gb_tbl.get_data_type(g);
 
                                if(gdt->is_temporal()){
-                               sprintf(tmpstr,"last_gb%d",g);   string lhs_op = tmpstr;
-                               sprintf(tmpstr,"gbval->gb_var%d",g);   string rhs_op = tmpstr;
-                               if(first_one){first_one = false;} else {ret += ") && (";}
-                               ret += generate_equality_test(lhs_op, rhs_op, gdt);
+                                       sprintf(tmpstr,"last_gb%d",g);   string lhs_op = tmpstr;
+                                       sprintf(tmpstr,"gbval->gb_var%d",g);   string rhs_op = tmpstr;
+                                       if(first_one){first_one = false;} else {ret += ") && (";}
+                                       ret += generate_lt_test(lhs_op, rhs_op, gdt);
+                                       disorder_test += generate_lt_test(rhs_op, lhs_op, gdt);
                                }
                        }
                        ret += ") ) ){\n";
@@ -9364,9 +9422,17 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n";
                                }
                        }
                        ret += "\t\tneeds_temporal_flush=true;\n";
-                       ret += "\t\t}else{\n"
-                               "\t\t\tneeds_temporal_flush=false;\n"
-                               "\t\t}\n";
+                       ret += "\t}else{\n"
+                               "\t\tneeds_temporal_flush=false;\n"
+                               "\t}\n";
+
+                       ret += "\tdisordered_arrival = "+disorder_test+";\n";
+//                     ret += "\tif( ( ("+disorder_test+") ) ){\n";
+//                     ret += "\t\tdisordered_arrival=true;\n";
+//                     ret += "\t}else{\n";
+//                     ret += "\t\tdisordered_arrival=false;\n";
+//                     ret += "\t}\n";
+
                }
        }else{
                ret+= "\tif(temp_tuple_received && !( (";
@@ -9515,8 +9581,8 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n";
 //                     update an aggregate object
 
        ret += "void update_aggregate(host_tuple &tup0, "
-               +generate_functor_name()+"_groupdef *gbval, "+
-               generate_functor_name()+"_aggrdef *aggval){\n";
+               +generate_functor_name()+"_groupdef &gbval, "+
+               generate_functor_name()+"_aggrdef &aggval){\n";
        //              Variables for execution of the function.
        ret += "\tgs_int32_t problem = 0;\n";   // return unpack failure
 
@@ -9527,7 +9593,7 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n";
 //             Unpack all remaining attributes
        ret += gen_remaining_colrefs(schema, cid_set, found_cids, "", needs_xform);
        for(a=0;a<aggr_tbl.size();a++){
-         sprintf(tmpstr,"aggval->aggr_var%d",a);
+         sprintf(tmpstr,"aggval.aggr_var%d",a);
          string varname = tmpstr;
          ret.append(generate_aggr_update(varname,&aggr_tbl,a, schema));
        }
@@ -9546,6 +9612,8 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n";
        }
        ret += "\t};\n";
 
+       ret += "bool disordered(){return disordered_arrival;}\n";
+
 //---------------------------------------------------
 //                     create output tuple
 //                     Unpack the partial functions ref'd in the where clause,
@@ -9556,15 +9624,15 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n";
 //                     so I'll leave it in longhand.
 
        ret += "host_tuple create_output_tuple("
-               +generate_functor_name()+"_groupdef *gbval, "+
-               generate_functor_name()+"_aggrdef *aggval, bool &failed){\n";
+               +generate_functor_name()+"_groupdef &gbval, "+
+               generate_functor_name()+"_aggrdef &aggval, bool &failed){\n";
 
        ret += "\thost_tuple tup;\n";
        ret += "\tfailed = false;\n";
        ret += "\tgs_retval_t retval = 0;\n";
 
-       string gbvar = "gbval->gb_var";
-       string aggvar = "aggval->";
+       string gbvar = "gbval.gb_var";
+       string aggvar = "aggval.";
 
 //                     Create cached temporaries for UDAF return values.
        for(a=0;a<aggr_tbl.size();a++){
@@ -9779,18 +9847,18 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n";
 
        ret += "struct "+generate_functor_name()+"_hash_func{\n";
        ret += "\tgs_uint32_t operator()(const "+generate_functor_name()+
-                               "_groupdef *grp) const{\n";
+                               "_groupdef &grp) const{\n";
        ret += "\t\treturn( (";
        for(g=0;g<gb_tbl.size();g++){
                if(g>0) ret += "^";
                data_type *gdt = gb_tbl.get_data_type(g);
                if(gdt->use_hashfunc()){
                        if(gdt->is_buffer_type())
-                               sprintf(tmpstr,"(%s*%s(&(grp->gb_var%d)))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
+                               sprintf(tmpstr,"(%s*%s(&(grp.gb_var%d)))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
                        else
-                               sprintf(tmpstr,"(%s*%s(grp->gb_var%d))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
+                               sprintf(tmpstr,"(%s*%s(grp.gb_var%d))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
                }else{
-                       sprintf(tmpstr,"(%s*grp->gb_var%d)",hash_nums[g%NRANDS].c_str(),g);
+                       sprintf(tmpstr,"(%s*grp.gb_var%d)",hash_nums[g%NRANDS].c_str(),g);
                }
                ret += tmpstr;
        }
@@ -9802,22 +9870,22 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n";
 //                     The comparison function
 
        ret += "struct "+generate_functor_name()+"_equal_func{\n";
-       ret += "\tbool operator()(const "+generate_functor_name()+"_groupdef *grp1, "+
-                       generate_functor_name()+"_groupdef *grp2) const{\n";
+       ret += "\tbool operator()(const "+generate_functor_name()+"_groupdef &grp1, "+
+                       "const "+generate_functor_name()+"_groupdef &grp2) const{\n";
        ret += "\t\treturn( (";
 
        for(g=0;g<gb_tbl.size();g++){
                if(g>0) ret += ") && (";
                data_type *gdt = gb_tbl.get_data_type(g);
                if(gdt->complex_comparison(gdt)){
-               if(gdt->is_buffer_type())
-                       sprintf(tmpstr,"(%s(&(grp1->gb_var%d), &(grp2->gb_var%d))==0)",
-                               gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
-               else
-                       sprintf(tmpstr,"(%s((grp1->gb_var%d), (grp2->gb_var%d))==0)",
-                               gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+                       if(gdt->is_buffer_type())
+                               sprintf(tmpstr,"(%s(&(grp1.gb_var%d), &(grp2.gb_var%d))==0)",
+                                       gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
+                       else
+                               sprintf(tmpstr,"(%s((grp1.gb_var%d), (grp2.gb_var%d))==0)",
+                                       gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
                }else{
-                       sprintf(tmpstr,"grp1->gb_var%d == grp2->gb_var%d",g,g);
+                       sprintf(tmpstr,"grp1.gb_var%d == grp2.gb_var%d",g,g);
                }
                ret += tmpstr;
        }
@@ -9832,14 +9900,17 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n";
 string sgah_qpn::generate_operator(int i, string params){
 
        if(hfta_disorder < 2){
+               string op_name = "groupby_operator";
+               if(hfta_slow_flush>0)
+                       op_name = "groupby_slowflush_operator";
                return(
-                       "       groupby_operator<" +
+                       "       "+op_name+"<" +
                        generate_functor_name()+","+
                        generate_functor_name() + "_groupdef, " +
                        generate_functor_name() + "_aggrdef, " +
                        generate_functor_name()+"_hash_func, "+
                        generate_functor_name()+"_equal_func "
-                       "> *op"+int_to_string(i)+" = new groupby_operator<"+
+                       "> *op"+int_to_string(i)+" = new "+op_name+"<"+
                        generate_functor_name()+","+
                        generate_functor_name() + "_groupdef, " +
                        generate_functor_name() + "_aggrdef, " +
@@ -11244,10 +11315,10 @@ string join_eq_hash_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_
                if(hashkey_dt[p]->complex_comparison(hashkey_dt[p])){
                  if(hashkey_dt[p]->is_buffer_type())
                        sprintf(tmpstr,"(%s(&(key1->hashkey_var%d), &(key2->hashkey_var%d))==0)",
-                               hashkey_dt[p]->get_hfta_comparison_fcn(hashkey_dt[p]).c_str(),p,p);
+                               hashkey_dt[p]->get_hfta_equals_fcn(hashkey_dt[p]).c_str(),p,p);
                  else
                        sprintf(tmpstr,"(%s((key1->hashkey_var%d), (key2->hashkey_var%d))==0)",
-                               hashkey_dt[p]->get_hfta_comparison_fcn(hashkey_dt[p]).c_str(),p,p);
+                               hashkey_dt[p]->get_hfta_equals_fcn(hashkey_dt[p]).c_str(),p,p);
                }else{
                        sprintf(tmpstr,"key1->hashkey_var%d == key2->hashkey_var%d",p,p);
                }
@@ -12591,10 +12662,10 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
                if(gdt->complex_comparison(gdt)){
                  if(gdt->is_buffer_type())
                        sprintf(tmpstr,"(%s(&(grp1->gb_var%d), &(grp2->gb_var%d))==0)",
-                               gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+                               gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
                  else
                        sprintf(tmpstr,"(%s((grp1->gb_var%d), (grp2->gb_var%d))==0)",
-                               gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+                               gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
                }else{
                        sprintf(tmpstr,"grp1->gb_var%d == grp2->gb_var%d",g,g);
                }
@@ -12621,10 +12692,10 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
                                if(gdt->complex_comparison(gdt)){
                                  if(gdt->is_buffer_type())
                                        sprintf(tmpstr,"(%s(&(grp1->gb_var%d), &(grp2->gb_var%d))==0)",
-                                               gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+                                               gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
                                  else
                                        sprintf(tmpstr,"(%s((grp1->gb_var%d), (grp2->gb_var%d))==0)",
-                                               gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+                                               gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
                                }else{
                                        sprintf(tmpstr,"grp1->gb_var%d == grp2->gb_var%d",g,g);
                                }
@@ -12695,21 +12766,34 @@ string rsgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, v
                ret+="\t"+this->gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n";
        }
 //             Constructors
+
        ret += "\t"+generate_functor_name() + "_groupdef(){};\n";
+       ret += "\t// shallow copy constructor\n";
        ret += "\t"+generate_functor_name() + "_groupdef("+
-               this->generate_functor_name() + "_groupdef *gd){\n";
+               this->generate_functor_name() + "_groupdef &gd){\n";
        for(g=0;g<gb_tbl.size();g++){
                data_type *gdt = gb_tbl.get_data_type(g);
-               if(gdt->is_buffer_type()){
-                       sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd->gb_var%d));\n",
-                         gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
-                       ret += tmpstr;
-               }else{
-                       sprintf(tmpstr,"\t\tgb_var%d = gd->gb_var%d;\n",g,g);
-                       ret += tmpstr;
-               }
+               sprintf(tmpstr,"\t\tgb_var%d = gd.gb_var%d;\n",g,g);
+               ret += tmpstr;
        }
        ret += "\t};\n";
+
+       ret += "\t// deep assignment operator\n";
+       ret += "\t"+generate_functor_name() + "_groupdef& operator=(const "+
+            this->generate_functor_name() + "_groupdef &gd){\n";
+    for(g=0;g<gb_tbl.size();g++){
+            data_type *gdt = gb_tbl.get_data_type(g);
+            if(gdt->is_buffer_type()){
+                    sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd.gb_var%d));\n",
+                      gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
+                    ret += tmpstr;
+            }else{
+                    sprintf(tmpstr,"\t\tgb_var%d = gd.gb_var%d;\n",g,g);
+                    ret += tmpstr;
+            }
+    }
+    ret += "\t}\n";    
+
 //             destructor
        ret += "\t~"+ generate_functor_name() + "_groupdef(){\n";
        for(g=0;g<gb_tbl.size();g++){
@@ -12877,6 +12961,7 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
                        }
                }
                ret += "\tgs_int32_t needs_temporal_flush;\n";
+               ret += "\tbool disordered_arrival;\n";
        }
 
 //                     The publicly exposed functions
@@ -12914,18 +12999,16 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
 
 //             temporal flush variables
 //             ASSUME that structured values won't be temporal.
-       gs_int32_t temporal_gb = 0;
        if(uses_temporal_flush){
                ret += "//\t\tInitialize temporal flush variables.\n";
                for(g=0;g<gb_tbl.size();g++){
                        data_type *gdt = gb_tbl.get_data_type(g);
                        if(gdt->is_temporal()){
                                literal_t gl(gdt->type_indicator());
-                               sprintf(tmpstr,"\tlast_gb%d = %s;\n",g, gl.to_hfta_C_code("").c_str());
-                               ret.append(tmpstr);
                                sprintf(tmpstr,"\tcurr_gb%d = %s;\n",g, gl.to_hfta_C_code("").c_str());
                                ret.append(tmpstr);
-                               temporal_gb = g;
+                               sprintf(tmpstr,"\tlast_gb%d = %s;\n",g, gl.to_hfta_C_code("").c_str());
+                               ret.append(tmpstr);
                        }
                }
                ret += "\tneeds_temporal_flush = 0;\n";
@@ -13044,6 +13127,7 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
        if(uses_temporal_flush){
                ret+= "\tif( ( (";
                bool first_one = true;
+               string disorder_test;
                for(g=0;g<gb_tbl.size();g++){
                        data_type *gdt = gb_tbl.get_data_type(g);
 
@@ -13052,30 +13136,43 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
                          sprintf(tmpstr,"gbval->gb_var%d",g);   string rhs_op = tmpstr;
                          if(first_one){first_one = false;} else {ret += ") && (";}
                          ret += generate_lt_test(lhs_op, rhs_op, gdt);
+                         disorder_test += generate_lt_test(rhs_op, lhs_op, gdt);
                        }
                }
                ret += ") ) ){\n";
+               int temporal_gb=-1;
                for(g=0;g<gb_tbl.size();g++){
                  data_type *gdt = gb_tbl.get_data_type(g);
                  if(gdt->is_temporal()){
-                               temporal_gb = g;
-                         if(gdt->is_buffer_type()){    // TODO first, last?  or delete?
-                               sprintf(tmpstr,"\t\t%s(&(gbval->gb_var%d),&last_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g);
+                         if(gdt->is_buffer_type()){
+                               sprintf(tmpstr,"\t\t%s(&(gbval->gb_var%d),&curr_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g);
                          }else{
+//                             sprintf(tmpstr,"\t\tlast_gb%d = curr_gb%d;\n",g,g);
+//                             ret += tmpstr;
+//                             sprintf(tmpstr,"\t\tcurr_gb%d = gbval->gb_var%d;\n",g,g);
+
                                ret += "\t\tif(curr_gb"+to_string(g)+"==0){\n";
                                ret += "\t\t\tlast_gb"+to_string(g)+" = gbval->gb_var"+to_string(g)+";\n";
                                ret += "\t\t}else{\n";
                                ret += "\t\t\tlast_gb"+to_string(g)+" = curr_gb"+to_string(g)+";\n";
                                ret += "\t\t}\n";
                                sprintf(tmpstr,"\t\tcurr_gb%d = gbval->gb_var%d;\n",g,g);
+                               temporal_gb=g;
                          }
                          ret += tmpstr;
                        }
                }
-               ret += "\t\tneeds_temporal_flush = curr_gb"+to_string (temporal_gb)+" - last_gb"+to_string(temporal_gb)+";\n"; 
-               ret += "\t\t}else{\n"
-                       "\t\t\tneeds_temporal_flush=0;\n"
-                       "\t\t}\n";
+               ret += "\t\tneeds_temporal_flush = curr_gb"+to_string (temporal_gb)+" - last_gb"+to_string(temporal_gb)+";\n";
+               ret += "\t}else{\n"
+                       "\t\tneeds_temporal_flush=0;\n"
+                       "\t}\n";
+
+               ret += "\tdisordered_arrival = "+disorder_test+";\n";
+//             ret += "\tif( ( ("+disorder_test+") ) ){\n";
+//             ret += "\t\tdisordered_arrival=true;\n";
+//             ret += "\t}else{\n";
+//             ret += "\t\tdisordered_arrival=false;\n";
+//             ret += "\t}\n";
        }
 
 
@@ -13184,8 +13281,8 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
 //                     update an aggregate object
 
        ret += "void update_aggregate(host_tuple &tup0, "
-               +generate_functor_name()+"_groupdef *gbval, "+
-               generate_functor_name()+"_aggrdef *aggval){\n";
+               +generate_functor_name()+"_groupdef &gbval, "+
+               generate_functor_name()+"_aggrdef &aggval){\n";
        //              Variables for execution of the function.
        ret += "\tgs_int32_t problem = 0;\n";   // return unpack failure
 
@@ -13196,7 +13293,7 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
 //             Unpack all remaining attributes
        ret += gen_remaining_colrefs(schema, cid_set, found_cids, "", needs_xform);
        for(a=0;a<aggr_tbl.size();a++){
-         sprintf(tmpstr,"aggval->aggr_var%d",a);
+         sprintf(tmpstr,"aggval.aggr_var%d",a);
          string varname = tmpstr;
          ret.append(generate_aggr_update(varname,&aggr_tbl,a, schema));
        }
@@ -13208,29 +13305,31 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
 //                     reinitialize an aggregate object
 
        ret += "void reinit_aggregates( "+
-               generate_functor_name()+"_groupdef *gbval, "+
-               generate_functor_name()+"_aggrdef *aggval){\n";
+               generate_functor_name()+"_groupdef &gbval, "+
+               generate_functor_name()+"_aggrdef &aggval){\n";
        //              Variables for execution of the function.
        ret += "\tgs_int32_t problem = 0;\n";   // return unpack failure
 
 //                     use of temporaries depends on the aggregate,
 //                     generate them in generate_aggr_update
 
+       int temporal_gb;        // track the # of the temporal gb
        for(g=0;g<gb_tbl.size();g++){
          data_type *gdt = gb_tbl.get_data_type(g);
          if(gdt->is_temporal()){
                  if(gdt->is_buffer_type()){
-                       sprintf(tmpstr,"\t\t%s(&(gbval->gb_var%d),&last_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g);
+                       sprintf(tmpstr,"\t\t%s(&(gbval.gb_var%d),&last_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g);
                  }else{
-                       sprintf(tmpstr,"\t\t gbval->gb_var%d =last_gb%d;\n",g,g);
+                       sprintf(tmpstr,"\t\t gbval.gb_var%d =last_gb%d;\n",g,g);
                  }
                  ret += tmpstr;
+                 temporal_gb = g;
                }
        }
 
 //             Unpack all remaining attributes
        for(a=0;a<aggr_tbl.size();a++){
-         sprintf(tmpstr,"aggval->aggr_var%d",a);
+         sprintf(tmpstr,"aggval.aggr_var%d",a);
          string varname = tmpstr;
          ret.append(generate_aggr_reinitialize(varname,&aggr_tbl,a, schema));
        }
@@ -13249,10 +13348,12 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
        if(uses_temporal_flush){
                ret += "\treturn needs_temporal_flush;\n";
        }else{
-               ret += "\treturn 0;\n";
+               ret += "\treturn false;\n";
        }
        ret += "};\n";
 
+       ret += "bool disordered(){return disordered_arrival;}\n";
+
 //------------------------------------------------
 //     time bucket management
        ret += "void advance_last_tb(){\n";
@@ -13272,15 +13373,15 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
 //                     so I'll leave it in longhand.
 
        ret += "host_tuple create_output_tuple("
-               +generate_functor_name()+"_groupdef *gbval, "+
-               generate_functor_name()+"_aggrdef *aggval, bool &failed){\n";
+               +generate_functor_name()+"_groupdef &gbval, "+
+               generate_functor_name()+"_aggrdef &aggval, bool &failed){\n";
 
        ret += "\thost_tuple tup;\n";
        ret += "\tfailed = false;\n";
        ret += "\tgs_retval_t retval = 0;\n";
 
-       string gbvar = "gbval->gb_var";
-       string aggvar = "aggval->";
+       string gbvar = "gbval.gb_var";
+       string aggvar = "aggval.";
 
 
 //                     First, get the return values from the UDAFS
@@ -13424,14 +13525,14 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
 //             been unpacked.  delete the string udaf return values at the end.
 
        ret += "bool cleaning_when("
-               +generate_functor_name()+"_groupdef *gbval, "+
-               generate_functor_name()+"_aggrdef *aggval){\n";
+               +generate_functor_name()+"_groupdef &gbval, "+
+               generate_functor_name()+"_aggrdef &aggval){\n";
 
        ret += "\tbool retval = true;\n";
 
 
-       gbvar = "gbval->gb_var";
-       aggvar = "aggval->";
+       gbvar = "gbval.gb_var";
+       aggvar = "aggval.";
 
 
        set<int> clw_pfcns;
@@ -13508,7 +13609,7 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
 
        ret += "struct "+generate_functor_name()+"_hash_func{\n";
        ret += "\tgs_uint32_t operator()(const "+generate_functor_name()+
-                               "_groupdef *grp) const{\n";
+                               "_groupdef &grp) const{\n";
        ret += "\t\treturn(0";
        for(g=0;g<gb_tbl.size();g++){
                data_type *gdt = gb_tbl.get_data_type(g);
@@ -13516,11 +13617,11 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
                        ret += "^";
                        if(gdt->use_hashfunc()){
                                if(gdt->is_buffer_type())
-                                       sprintf(tmpstr,"(%s*%s(&(grp->gb_var%d)))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
+                                       sprintf(tmpstr,"(%s*%s(&(grp.gb_var%d)))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
                                        else
-                               sprintf(tmpstr,"(%s*%s(grp->gb_var%d))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
+                               sprintf(tmpstr,"(%s*%s(grp.gb_var%d))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
                        }else{
-                               sprintf(tmpstr,"(%s*grp->gb_var%d)",hash_nums[g%NRANDS].c_str(),g);
+                               sprintf(tmpstr,"(%s*grp.gb_var%d)",hash_nums[g%NRANDS].c_str(),g);
                        }
                        ret += tmpstr;
                }
@@ -13533,8 +13634,8 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
 //                     The comparison function
 
        ret += "struct "+generate_functor_name()+"_equal_func{\n";
-       ret += "\tbool operator()(const "+generate_functor_name()+"_groupdef *grp1, "+
-                       generate_functor_name()+"_groupdef *grp2) const{\n";
+       ret += "\tbool operator()(const "+generate_functor_name()+"_groupdef &grp1, "+
+                       "const "+generate_functor_name()+"_groupdef &grp2) const{\n";
        ret += "\t\treturn( (";
 
        string hcmpr = "";
@@ -13545,13 +13646,13 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
                        if(first_exec){first_exec=false;}else{ hcmpr += ") && (";}
                        if(gdt->complex_comparison(gdt)){
                          if(gdt->is_buffer_type())
-                               sprintf(tmpstr,"(%s(&(grp1->gb_var%d), &(grp2->gb_var%d))==0)",
-                               gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+                               sprintf(tmpstr,"(%s(&(grp1.gb_var%d), &(grp2.gb_var%d))==0)",
+                               gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
                          else
-                               sprintf(tmpstr,"(%s((grp1->gb_var%d), (grp2->gb_var%d))==0)",
-                               gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+                               sprintf(tmpstr,"(%s((grp1.gb_var%d), (grp2.gb_var%d))==0)",
+                               gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
                        }else{
-                               sprintf(tmpstr,"grp1->gb_var%d == grp2->gb_var%d",g,g);
+                               sprintf(tmpstr,"grp1.gb_var%d == grp2.gb_var%d",g,g);
                        }
                        hcmpr += tmpstr;
                }
index ec782e0..c1001d6 100644 (file)
@@ -492,6 +492,7 @@ public:
 
        int lfta_disorder;              // maximum disorder in the steam between lfta, hfta
        int hfta_disorder;              // maximum disorder in the  hfta
+       int hfta_slow_flush;    // outputs per input, 0 means no slow flush
 
 //             rollup, cube, and grouping_sets cannot be readily reconstructed by
 //             analyzing the patterns, so explicitly record them here.
@@ -517,11 +518,15 @@ public:
 
        std::string generate_operator(int i, std::string params);
        std::string get_include_file(){
-                       if(hfta_disorder <= 1){
-                               return("#include <groupby_operator.h>\n");
+               if(hfta_disorder <= 1){
+                       if(hfta_slow_flush>0){
+                               return("#include <groupby_slowflush_operator.h>\n");
                        }else{
-                               return("#include <groupby_operator_oop.h>\n");
+                               return("#include <groupby_operator.h>\n");
                        }
+               }else{
+                       return("#include <groupby_operator_oop.h>\n");
+               }
        };
 
     std::vector<select_element *> get_select_list(){return select_list;};
@@ -554,10 +559,12 @@ public:
        sgah_qpn(){
                lfta_disorder = 1;
                hfta_disorder = 1;
+               hfta_slow_flush = 0;
        };
        sgah_qpn(query_summary_class *qs,table_list *Schema){
                lfta_disorder = 1;
                hfta_disorder = 1;
+               hfta_slow_flush = 0;
 
 //                             Get the table name.
 //                             NOTE the colrefs have the tablevar ref (an int)
@@ -640,6 +647,7 @@ public:
                                                        param_tbl->handle_access(param_names[pi]));
                }
                ret->definitions = definitions;
+               ret->hfta_slow_flush = hfta_slow_flush;
 
                ret->node_name = node_name + suffix;
 
index 4ee0bb4..19ff634 100644 (file)
@@ -169,6 +169,7 @@ int main(int argc, char **argv){
   vector<string> registration_query_names;                     // for lfta.c registration
   map<string, vector<int> > mach_query_names;  // list queries of machine
   vector<int> snap_lengths;                            // for lfta.c registration
+  vector<int> snap_position;                           // for lfta.c registration
   vector<string> interface_names;                      // for lfta.c registration
   vector<string> machine_names;                        // machine of interface
   vector<bool> lfta_reuse_options;                     // for lfta.c registration
@@ -2095,7 +2096,11 @@ for(q=0;q<hfta_sets.size();++q){
                lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
 */
 
-               snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
+               snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "snap"));
+               snap_position.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "index"));
+
+// STOPPED HERE need to figure out how to generate the code that Vlad needs
+//             from snap_position
 
 // TODO NOTE : I'd like it to be the case that registration_query_names
 //     are the queries to be registered for subsciption.
@@ -2552,7 +2557,10 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e
 
 
 //                     Find common filter predicates in the LFTAs.
-//                     in addition generate structs to store the temporal attributes unpacked by prefilter
+//                     in addition generate structs to store the
+//                     temporal attributes unpacked by prefilter
+//                     compute & provide interface for per-interface
+//                     record extraction properties
        
        map<string, vector<stream_query *> >::iterator ssqi;
        for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
@@ -2655,10 +2663,13 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e
                        lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
 #endif
                map<string, vector<long long int> > lfta_sigs; // used again later
+               map<string, int> lfta_snap_pos; // optimize csv parsing
+                                                                       // compute now, use in get_iface_properties
                for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
                        string liface = (*mvsi).first;
                        vector<long long int> empty_list;
                        lfta_sigs[liface] = empty_list;
+                       lfta_snap_pos[liface] = -1;
 
                        vector<col_id_set> lfta_cols;
                        vector<int> lfta_snap_length;
@@ -2672,7 +2683,10 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e
                                }
                                lfta_sigs[liface].push_back(mask);
                                lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
-                               lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));
+                               lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "snap"));
+                               int this_snap_pos = compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "index");
+                               if(this_snap_pos > lfta_snap_pos[liface])
+                                       lfta_snap_pos[liface]  = this_snap_pos;
                        }
 
 //for(li=0;li<mach_lftas.size();++li){
@@ -2749,6 +2763,8 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e
                                }
                                lfta_val[lmach] += "\";\n";
                        }
+                       lfta_val[lmach] += "\t\t}else if(!strcmp(property_name, \"_max_csv_pos\")){\n";
+                       lfta_val[lmach] += "\t\t\treturn \""+int_to_string(lfta_snap_pos[(*sir)])+"\";\n";
                        lfta_val[lmach] += "\t\t} else\n";
                        lfta_val[lmach] += "\t\t\treturn NULL;\n";
                }
@@ -3079,6 +3095,7 @@ void generate_makefile(vector<string> &input_file_names, int nfiles,
        bool use_proto = false;
        bool use_bsa = false;
        bool use_kafka = false;
+       bool use_ssl = false;   
        int erri;
        string err_str;
        for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
@@ -3116,6 +3133,17 @@ void generate_makefile(vector<string> &input_file_names, int nfiles,
 #endif 
                        }
                }
+               ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "ENCRYPTED", erri, err_str);
+               for(int ift_i=0;ift_i<ift.size();ift_i++){
+                       if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){       
+#ifdef SSL_ENABLED                             
+                               use_ssl = true;
+#else
+                               fprintf(stderr,"Runtime libraries built without SSL support. Rebuild with SSL_ENABLED defined in gsoptions.h\n");
+                               exit(0);
+#endif 
+                       }
+               }               
        }
 
        fprintf(outfl,
@@ -3136,7 +3164,7 @@ void generate_makefile(vector<string> &input_file_names, int nfiles,
        if(use_pads)
                fprintf(outfl,"-lgscppads -lpads ");
        fprintf(outfl,
-"-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux  -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
+"-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux  -lclearinghouse -lresolv -lpthread -lgscpinterface -lz -lrt");
        if(use_pads)
                fprintf(outfl, " -lpz -lz -lbz ");
        if(libz_exists && libast_exists)
@@ -3152,6 +3180,9 @@ void generate_makefile(vector<string> &input_file_names, int nfiles,
 #endif
 #ifdef KAFKA_ENABLED   
        fprintf(outfl, " -lrdkafka ");
+#endif
+#ifdef SSL_ENABLED     
+       fprintf(outfl, " -lssl -lcrypto ");
 #endif
        fprintf(outfl," -lgscpaux");
 #ifdef GCOV
index 9772ec0..5464dcf 100644 (file)
@@ -1172,8 +1172,18 @@ string data_type::get_comparison_fcn(data_type *dt){
        default:
                return("ERROR_NO_SUCH_COMPARISON_FCN");
        }
-
-
+}
+string data_type::get_equals_fcn(data_type *dt){
+  switch(type){
+       case timeval_t:
+               return("Compare_Timeval");
+       case v_str_t:
+               return("str_equal");
+       case ipv6_t:
+               return("ipv6_compare");
+       default:
+               return("ERROR_NO_SUCH_COMPARISON_FCN");
+       }
 }
 
 string data_type::get_hfta_comparison_fcn(data_type *dt){
@@ -1188,6 +1198,18 @@ string data_type::get_hfta_comparison_fcn(data_type *dt){
                return("ERROR_NO_SUCH_COMPARISON_FCN");
        }
 }
+string data_type::get_hfta_equals_fcn(data_type *dt){
+  switch(type){
+       case timeval_t:
+               return("hfta_Compare_Timeval");
+       case v_str_t:
+               return("hfta_vstr_equal");
+       case ipv6_t:
+               return("hfta_ipv6_compare");
+       default:
+               return("ERROR_NO_SUCH_COMPARISON_FCN");
+       }
+}
 
 //             Return true if operating on  these types requires
 //             a special function for this operator.
index f136ae9..e99eeb5 100644 (file)
@@ -95,6 +95,8 @@ public:
   bool complex_comparison(data_type *dt);
   std::string get_comparison_fcn(data_type *dt);
   std::string get_hfta_comparison_fcn(data_type *dt);
+  std::string get_equals_fcn(data_type *dt);
+  std::string get_hfta_equals_fcn(data_type *dt);
 
   bool complex_operator(data_type *dt, std::string &op);
   std::string get_complex_operator(data_type *dt, std::string &op);
index 8396e4d..f9b057a 100644 (file)
@@ -65,8 +65,8 @@ gs_retval_t hfta_vstr_length(vstring *str) {
 }
 
 //             Assume that SRC is either INTERNAL or SHALLOW_COPY
-void hfta_vstr_assign_with_copy_in_tuple(vstring32 * target, vstring * src,
-       gs_sp_t data_offset,  gs_retval_t int_offset) {
+void hfta_vstr_assign_with_copy_in_tuple(vstring32 * target,
+               const vstring * src, gs_sp_t data_offset,  gs_retval_t int_offset) {
        target->length = src->length;
        target->offset = int_offset;
        target->reserved = PACKED;
@@ -77,7 +77,7 @@ void hfta_vstr_assign_with_copy_in_tuple(vstring32 * target, vstring * src,
 //             Ted wrote the following function.
 //             make deep copy of src.  Assume that dst is already empty.
 //             Assume that SRC is either INTERNAL or SHALLOW_COPY
-void hfta_vstr_assign_with_copy(vstring *dst, vstring *src){
+void hfta_vstr_assign_with_copy(vstring *dst, const vstring *src){
        dst->length=src->length;
        if(src->length){
                dst->offset=(gs_p_t)malloc(dst->length);
@@ -89,7 +89,7 @@ void hfta_vstr_assign_with_copy(vstring *dst, vstring *src){
 //             Ted wrote the following function.
 //             Make a deep copy of src.  garbage collect dst if needed.
 //             Assume that SRC is either INTERNAL or SHALLOW_COPY
-void hfta_vstr_replace(vstring *dst, vstring *src){
+void hfta_vstr_replace(vstring *dst, const vstring *src){
        hfta_vstr_destroy(dst);
        hfta_vstr_assign_with_copy(dst,src);
 }
@@ -171,6 +171,23 @@ gs_retval_t hfta_vstr_compare(const vstring * s1, const vstring * s2) {
        return(s1->length - s2->length);
 }
 
+gs_retval_t hfta_vstr_equal(const vstring * s1, const vstring * s2) {
+       gs_int32_t x;
+
+       if(s1->length != s2->length)
+               return -1;
+
+//     cmp_ret=memcmp((void *)s1->offset,(void *)s2->offset,s1->length);
+    for(x=0;x<s1->length;x++) {
+        if (((char *)(s1->offset))[x]!=((char *)(s2->offset))[x]) {
+            return -1;
+        }
+    }
+
+
+       return 0;
+}
+
 
 
 gs_param_handle_t register_handle_for_str_regex_match_slot_1(vstring* pattern) {
index 1366cef..7eef46d 100644 (file)
@@ -207,7 +207,7 @@ struct avg_udaf_lfta_struct_t{
         gs_uint32_t cnt;
 };
 
-//              sctarchpad struct
+//              scratchpad struct
 struct avg_udaf_hfta_struct_t{
         gs_int64_t sum;
         gs_uint32_t cnt;
@@ -715,3 +715,213 @@ void running_array_aggr_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) {
   hfta_vstr_destroy(scratch);
  }
 
+
+////////////////////////////////////////////
+//             Aggregate strings by catenation
+       
+
+struct CAT_aggr_scratch{
+       std::string val;
+       int x;
+};
+
+struct CAT_aggr_scratch_ptr{
+       CAT_aggr_scratch *ptr;
+};
+
+void CAT_aggr_HFTA_AGGR_INIT_(gs_sp_t s){
+       CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
+       CAT_aggr_scratch *v = new CAT_aggr_scratch();
+       v->x = 101;
+
+       p->ptr = v;
+}
+void CAT_aggr_HFTA_AGGR_REINIT_(gs_sp_t s){
+       CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
+       CAT_aggr_scratch *v = p->ptr;
+       v->val="";
+}
+void CAT_aggr_HFTA_AGGR_UPDATE_(gs_sp_t s, vstring *sep, vstring *str){
+char buf1[MAXTUPLESZ-20], buf2[MAXTUPLESZ-20];
+int i;
+for(i=0;i<sep->length;++i) buf1[i] = *(((char *)sep->offset)+i);
+buf1[i]='\0';
+for(i=0;i<str->length;++i) buf2[i] = *(((char *)str->offset)+i);
+buf2[i]='\0';
+       CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
+       CAT_aggr_scratch *v = p->ptr;
+       if(v->val.size()>0)
+               v->val.append((char *)(sep->offset), sep->length);
+       v->val.append((char *)(str->offset), str->length);
+//printf("sep=%s, str=%s, val=%s\n",buf1,buf2,v->val.c_str());
+}
+void CAT_aggr_HFTA_AGGR_OUTPUT_(vstring *res, gs_sp_t s){
+       CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
+       CAT_aggr_scratch *v = p->ptr;
+//printf("output val=%s\n",v->val.c_str());
+       res->offset = (gs_p_t)malloc(v->val.size());
+       res->length = v->val.size();
+       if(res->length>MAXTUPLESZ-20)
+               res->length=MAXTUPLESZ-20;
+//     v->val.copy((char *)(res->offset), 0, res->length);
+       const char *dat = v->val.c_str();
+       memcpy((char *)(res->offset), dat, res->length);
+//     for(int i=0;i<res->length;++i)
+//             *(((char *)res->offset)+i) = dat[i];
+       res->reserved = INTERNAL;
+}
+void CAT_aggr_HFTA_AGGR_DESTROY_(gs_sp_t s){
+       CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
+       CAT_aggr_scratch *v = p->ptr;
+       delete v;
+}
+
+       
+       
+///////////////////////////////////////////////////////////////
+//     time_avg(sample, ts, window_size)
+//  Compute time-weighted average sum(sample*duration)/window_size
+//  duration is difference between current and next ts.
+//  The idea is to compute a sum over a step function.
+//  
+       
+struct time_avg_udaf_str{
+       gs_float_t sum;
+       gs_float_t last_val;
+       gs_uint64_t last_ts;
+       gs_uint64_t window;
+       gs_uint64_t first_ts;
+       gs_uint8_t event_occurred;
+};
+       
+void time_avg_HFTA_AGGR_INIT_(gs_sp_t s){
+       time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
+       scratch->sum = 0.0;
+       scratch->last_val = 0.0;
+       scratch->last_ts = 0;
+       scratch->first_ts = 0;
+       scratch->event_occurred = 0;
+}
+
+void time_avg_HFTA_AGGR_DESTROY_(gs_sp_t s){
+}
+
+void time_avg_HFTA_AGGR_REINIT_(gs_sp_t s){
+       time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
+       scratch->event_occurred = 0;
+       scratch->sum = 0;
+//printf("time_avg_reinit: occurred=%d, last_val=%lf, sum=%lf, first_ts=%lld, last_ts=%lld\n",scratch->event_occurred, scratch->last_val, scratch->sum, scratch->first_ts, scratch->last_ts);
+}
+
+void time_avg_HFTA_AGGR_OUTPUT_(gs_float_t *result, gs_sp_t s){
+       time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
+       if(scratch->event_occurred==0){
+               *result =  scratch->last_val;
+//printf("\ttime_avg outpt1 sum=%lf, last_val=%lf, result=%lf\n", scratch->sum, scratch->last_val, *result);
+               return;
+       }
+       gs_int64_t end_time = scratch->window * (scratch->last_ts/scratch->window + 1);
+       scratch->sum += (end_time - scratch->last_ts) * (gs_float_t)(scratch->last_val); 
+       gs_int64_t start_time = end_time - scratch->window;
+       if(scratch->first_ts > start_time){
+               *result = scratch->sum / (end_time - scratch->first_ts);
+//printf("\ttime_avg outpt2 sum=%lf, start_time=%lld, end_time=%lld, first_ts=%lld, last_ts=%lld,result=%lf\n", scratch->sum, start_time, end_time, scratch->first_ts, scratch->last_ts, *result);
+       }else{
+               *result = scratch->sum / (end_time - start_time);
+//printf("\ttime_avg outpt3 sum=%lf, start_time=%lld, end_time=%lld, first_ts=%lld, last_ts=%lld,result=%lf\n", scratch->sum, start_time, end_time, scratch->first_ts, scratch->last_ts, *result);
+       }
+}
+
+void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_float_t val, gs_int64_t ts, gs_int64_t window){
+       time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
+       scratch->window = window;
+       if(scratch->first_ts==0){
+               scratch->first_ts = ts;
+       }else{
+               if(scratch->event_occurred){
+
+                       scratch->sum += (ts - scratch->last_ts) * scratch->last_val;
+               }else{
+                       gs_int64_t start_time = scratch->window * (scratch->last_ts/scratch->window);
+                       scratch->sum += (ts - start_time) * scratch->last_val;
+               }
+       }
+//printf("time_avg_upd: val=%lf, occurred=%d, last_val=%lf, sum=%lf, ts=%lld, first_ts=%lld, last_ts=%lld\n",val, scratch->event_occurred, scratch->last_val, scratch->sum, ts, scratch->first_ts, scratch->last_ts);
+       scratch->last_val = val;
+       scratch->last_ts = ts;
+       scratch->event_occurred = 1;
+}
+void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t val, gs_int64_t ts, gs_int64_t window){
+       time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
+}
+void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t val, gs_int64_t ts, gs_int64_t window){
+       time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
+}
+void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t val, gs_int64_t ts, gs_int64_t window){
+       time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
+}
+void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t val, gs_int64_t ts, gs_int64_t window){
+       time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
+}
+       
+
+// ------------------------------------------------------------
+//             running_sum_max : get the running sum of an int,
+//             be able to report this sum and also its max value
+//             during the time window
+
+struct run_sum_max_udaf_str{
+       gs_int64_t sum;
+       gs_int64_t max;
+};
+void run_sum_max_HFTA_AGGR_INIT_(gs_sp_t s){
+       run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
+       scratch->sum = 0;
+       scratch->max = 0;
+}
+void run_sum_max_HFTA_AGGR_REINIT_(gs_sp_t s){
+       run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
+       scratch->max = scratch->sum;
+}
+void run_sum_max_HFTA_AGGR_OUTPUT_(vstring *r,gs_sp_t b){
+        r->length = sizeof(run_sum_max_udaf_str);
+        r->offset = (gs_p_t)(b);
+        r->reserved = SHALLOW_COPY;
+}
+void run_sum_max_HFTA_AGGR_DESTROY_(gs_sp_t b){
+        return;
+}
+
+void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t v){
+       run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
+       scratch->sum+=v;
+       if(scratch->sum>scratch->max) scratch->max=scratch->sum;
+}
+void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t v){
+       run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
+       scratch->sum+=v;
+       if(scratch->sum>scratch->max) scratch->max=scratch->sum;
+}
+void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t v){
+       run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
+       scratch->sum+=v;
+       if(scratch->sum>scratch->max) scratch->max=scratch->sum;
+}
+void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t v){
+       run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
+       scratch->sum+=v;
+       if(scratch->sum>scratch->max) scratch->max=scratch->sum;
+}
+//     the extraction functions
+gs_int64_t extr_running_sum(vstring *v){
+       if(v->length != sizeof(run_sum_max_udaf_str)) return 0;
+       run_sum_max_udaf_str *vs = (run_sum_max_udaf_str *)(v->offset);
+       return vs->sum;
+}
+gs_int64_t extr_running_sum_max(vstring *v){
+       if(v->length != sizeof(run_sum_max_udaf_str)) return 0;
+       run_sum_max_udaf_str *vs = (run_sum_max_udaf_str *)(v->offset);
+       return vs->max;
+}
+
+
index 33eae61..ed7403a 100644 (file)
@@ -23,9 +23,9 @@
 #include <sys/socket.h>
 #include <netinet/in.h>
 #include <zlib.h>
-#include "errno.h"
-#include "stdio.h"
-#include "stdlib.h"
+#include <errno.h>
+#include <stdio.h>
+#include <dirent.h>
 
 
 extern "C" {
@@ -64,6 +64,29 @@ z_stream strm;
 BSA::FileStream::ISubStream* stream;
 BSA::FileStream::IFileHandle* ifh;
 BSA::FileStream::Reader* reader;
+
+#endif
+
+#ifdef SSL_ENABLED
+#include <openssl/pem.h>
+#include <openssl/x509.h>
+#include <openssl/x509v3.h>
+#include <openssl/ssl.h>
+#include <openssl/crypto.h>
+#include <openssl/err.h>
+
+EVP_PKEY *rkey;
+PKCS7 *p7;
+BIO* mem_io;
+char pwd[CSVMAXLINE];
+
+// callback for passing password to private key reader
+int pass_cb(char *buf, int size, int rwflag, void *u) {
+    int len = strlen(pwd);
+    memcpy(buf, pwd, len);
+    return len;
+}
+
 #endif
 
 gs_sp_t dev;
@@ -72,6 +95,9 @@ static int listensockfd=-1;
 static int fd=-1;
 static struct packet cur_packet;
 static gs_sp_t name;
+static gs_sp_t dir_name;
+struct dirent **namelist;
+static gs_int32_t num_dir_files;
 static gs_sp_t line;
 static ssize_t len;
 static size_t line_len;
@@ -82,6 +108,7 @@ static gs_uint32_t startupdelay=0;
 static gs_uint32_t singlefile=0;
 static gs_uint32_t use_gzip=0;
 static gs_uint32_t use_bsa=0;
+static gs_uint32_t use_decryption=0;
 static gs_uint32_t gshub=0;
 static int socket_desc=0;
 
@@ -191,6 +218,57 @@ static void init_socket() {
 }
 
 static void next_file() {
+
+       static gs_uint32_t file_pos = 0;
+       static gs_uint32_t scan_finished = 0;
+
+       char buf[CSVMAXLINE];
+
+       if (dir_name) {
+               if (scan_finished) {
+                       if (verbose)
+                               fprintf(stderr,"Done processing, waiting for things to shut down\n");
+                       rts_fta_done();
+                               // now just service message queue until we get killed or lose connectivity
+                       while (true) {
+                               fta_start_service(0); // service all waiting messages
+                               usleep(1000); // sleep a millisecond
+                       }
+               }
+               if (num_dir_files) {            // we already started directory scan
+                       free(name);
+                       if (file_pos < num_dir_files) {
+                               sprintf(buf, "%s/%s", dir_name, namelist[file_pos]->d_name);
+                               name = strdup(buf);
+                               free(namelist[file_pos]);
+                               file_pos++;
+                       } else {
+                               free(namelist);
+                               scan_finished = 1;
+                               return;
+                       }
+               } else {
+                       num_dir_files = scandir(dir_name, &namelist, NULL, alphasort);
+                       if (num_dir_files == -1) {
+                               num_dir_files = 0;
+                               print_error((gs_sp_t)"ERROR: Unable to scan directory");
+                               return;
+                       }
+                       if (num_dir_files == 2) {       // only . and .. are there, empty dir
+                               free(namelist[0]);
+                               free(namelist[1]);
+                               scan_finished = 1;
+                               return;
+                       } else
+                               file_pos = 2;
+                       
+                       sprintf(buf, "%s/%s", dir_name, namelist[file_pos]->d_name);
+                       name = strdup(buf);
+                       free(namelist[file_pos]);
+                       file_pos++;
+               }
+       }
+
        struct stat s;
        if (verbose) {
                fprintf(stderr,"Opening %s\n",name);
@@ -213,7 +291,31 @@ static void next_file() {
                exit(10);
        }
        posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
-       if (singlefile == 0) {
+
+#ifdef SSL_ENABLED
+       if (use_decryption) {
+               // free SSL resources
+               if (mem_io)
+                       BIO_free(mem_io);
+               if (p7)
+                       PKCS7_free(p7);
+
+               FILE *fp = fdopen(fd, "r");
+               p7 = d2i_PKCS7_fp(fp, NULL);
+       if (p7 == NULL) {
+               print_error((gs_sp_t)"Error reading SMIME message from file");
+               exit(-1);
+       }
+
+       if(!(mem_io = PKCS7_dataDecode(p7, rkey, NULL, NULL))) {
+               print_error((gs_sp_t)"Error decoding PKCS7 file\n");
+               exit(-1);
+       }
+
+               fclose(fp);
+       }
+#endif 
+       if (!dir_name && !singlefile) {
                unlink(name);
        }
        if (use_gzip) {
@@ -274,7 +376,13 @@ static gs_retval_t csv_replay_init(gs_sp_t device)
        gs_sp_t tempdel;
        gs_sp_t singlefiletmp;
        gs_sp_t compressortmp;
-       gs_sp_t bsatmp;    
+       gs_sp_t bsatmp;  
+       gs_sp_t encryptedtmp;  
+       gs_sp_t maxfieldtmp;
+
+       gs_sp_t pkey_fname;  
+       gs_sp_t pwd_fname;              
+
 
        if ((verbosetmp=get_iface_properties(device,(gs_sp_t)"verbose"))!=0) {
                if (strncmp(verbosetmp,"TRUE",4)==0) {
@@ -285,10 +393,13 @@ static gs_retval_t csv_replay_init(gs_sp_t device)
                }
        }
 
-       if ((name=get_iface_properties(device,(gs_sp_t)"filename"))==0) {
-               print_error((gs_sp_t)"csv_replay_init::No CSV \"Filename\" defined");
+       name=get_iface_properties(device,(gs_sp_t)"filename");
+       dir_name=get_iface_properties(device,(gs_sp_t)"directoryname");
+       if (!name && !dir_name) {
+               print_error((gs_sp_t)"csv_replay_init::Either \"Filename\" or \"Directoryname\" must be defined");
                exit(0);
        }
+
        tempdel=get_iface_properties(device,(gs_sp_t)"csvseparator");
        if (tempdel != 0 ) {
                csvdel = tempdel[0];
@@ -332,15 +443,77 @@ static gs_retval_t csv_replay_init(gs_sp_t device)
 
        if ((delaytmp=get_iface_properties(device,(gs_sp_t)"startupdelay"))!=0) {
                if (verbose) {
-                               fprintf(stderr,"Startup delay of %u seconds\n",atoi(get_iface_properties(device,(gs_sp_t)"startupdelay")));
+                               fprintf(stderr,"Startup delay of %u seconds\n",atoi(delaytmp));
                }
-               startupdelay=atoi(get_iface_properties(device,(gs_sp_t)"startupdelay"));
+               startupdelay=atoi(delaytmp);
        }
+
+       if ((maxfieldtmp=get_iface_properties(device,(gs_sp_t)"_max_csv_pos"))!=0) {
+               max_field_csv=atoi(maxfieldtmp);
+       }       
+
        if ((gshubtmp=get_iface_properties(device,(gs_sp_t)"gshub"))!=0) {
                if (verbose) {
                                fprintf(stderr,"CSV format using gshub\n");
                }
                gshub=1;
+               if (!name) {
+                       print_error((gs_sp_t)"csv_replay_init::Filename must be defined for gshub interfaces");
+                       exit(0);                        
+               }
+       }
+
+       pkey_fname=get_iface_properties(device,(gs_sp_t)"privatekey");
+       pwd_fname=get_iface_properties(device,(gs_sp_t)"password");
+
+       if ((encryptedtmp=get_iface_properties(device,(gs_sp_t)"encrypted"))!=0) {
+               if (strncmp(encryptedtmp,"TRUE",4)==0) {
+                       #ifndef SSL_ENABLED
+                               print_error((gs_sp_t)"csv_replay_init::runtime not built with SSL support to use encrypted interfaces");                
+                               exit(0);
+                       #else
+                               use_decryption=1;
+                               if (verbose) {
+                                       fprintf(stderr,"CSV file is encrypted\n");
+                               }
+                               if (!pkey_fname || !pwd_fname) {
+                                       print_error((gs_sp_t)"csv_replay_init::privatekey and/or password filenames not specified for encrypted interface");
+                                       exit(0);
+                               }
+
+                               OpenSSL_add_all_algorithms();
+                               ERR_load_crypto_strings();
+
+                               // Read password file
+                               FILE* in_fd = fopen(pwd_fname, "r");
+                               if (!in_fd) {
+                                       fprintf(stderr, "Unable to open password file %s\n", pwd_fname);
+                                       exit(0);        
+                               }
+
+                               if (!fgets(pwd, CSVMAXLINE, in_fd)) {
+                                       fprintf(stderr, "Error reading password from file %s\n", pwd_fname);
+                                       exit(0);                                        
+                               }
+                               strtok(pwd, "\r\n\t ");
+                               fclose(in_fd);                  
+
+                               // Read the private key
+                               in_fd = fopen(pkey_fname, "r");
+                               if (!in_fd) {
+                                       fprintf(stderr, "Unable to open private key file %s\n", pkey_fname);
+                                       exit(0);        
+                               }
+
+                               rkey = PEM_read_PrivateKey(in_fd, NULL, pass_cb, NULL);
+                               if (!rkey) {
+                                       fprintf(stderr, "Unable to read private key file %s\n", pkey_fname);
+                                       exit(-1);        
+                               }
+                               
+                               fclose(in_fd);
+                       #endif
+               }
        }
 
        cur_packet.ptype=PTYPE_CSV;
@@ -439,8 +612,12 @@ static gs_int32_t csv_read_chunk() {
                }
                } else {
 #endif
-                       if (fd <= 0) next_file();
-                       while ((have = read(fd, read_pos, CHUNK)) == 0) {
+               if (fd <= 0) next_file();
+                       
+#ifdef SSL_ENABLED
+               if (use_decryption) {
+
+               while ((have = BIO_read (mem_io, read_pos, CHUNK)) == 0) {
                                if (singlefile==1) {
                                        if(verbose) {
                                                fprintf(stderr,"SINGLEFILE PROCESSING DONE! RTS SAYS BYE\n");
@@ -450,7 +627,24 @@ static gs_int32_t csv_read_chunk() {
                                } else {
                                        next_file();
                                }
+               }
+
+               }       else {                  
+#endif
+               while ((have = read(fd, read_pos, CHUNK)) == 0) {
+                       if (singlefile==1) {
+                               if(verbose) {
+                                       fprintf(stderr,"SINGLEFILE PROCESSING DONE! RTS SAYS BYE\n");
+                               }
+                               return -2;
+
+                       } else {
+                               next_file();
                        }
+               }
+#ifdef SSL_ENABLED             
+               }
+#endif
 #ifdef BSA_ENABLED             
                }
 #endif
index 31049e5..4e3eda4 100644 (file)
@@ -167,6 +167,7 @@ static gs_retval_t kafka_replay_init(gs_sp_t device)
        gs_sp_t verbosetmp;
        gs_sp_t delaytmp;
        gs_sp_t tempdel;
+       gs_sp_t maxfieldtmp;    
 
        if ((verbosetmp=get_iface_properties(device,(gs_sp_t)"verbose"))!=0) {
                if (strncmp(verbosetmp,"TRUE",4)==0) {
@@ -195,11 +196,15 @@ static gs_retval_t kafka_replay_init(gs_sp_t device)
 
        if ((delaytmp=get_iface_properties(device,(gs_sp_t)"startupdelay"))!=0) {
                if (verbose) {
-                               fprintf(stderr,"Startup delay of %u seconds\n",atoi(get_iface_properties(device,(gs_sp_t)"startupdelay")));
+                               fprintf(stderr,"Startup delay of %u seconds\n",atoi(delaytmp));
                }
-               startupdelay=atoi(get_iface_properties(device,(gs_sp_t)"startupdelay"));
+               startupdelay=atoi(delaytmp);
        }
 
+       if ((maxfieldtmp=get_iface_properties(device,(gs_sp_t)"_max_csv_pos"))!=0) {
+               max_field_kafka=atoi(maxfieldtmp);
+       }       
+
        // set maximum field nubmer to be extracted by csv parser
        csv_set_maxfield(max_field_kafka);