From dec9c93423775db0f4783a93145f016d5d780ffd Mon Sep 17 00:00:00 2001 From: vlad shkapenyuk Date: Mon, 31 Aug 2020 09:40:41 -0400 Subject: [PATCH] Improvements to aggregation code and fucntion library Signed-off-by: vlad shkapenyuk Change-Id: I9282ff58d9add29eedfe89a6b4b6e1049282250c --- bin/start_processing | 2 +- cfg/external_fcns.def | 43 +++- include/gsoptions.h | 3 + include/hfta/groupby_operator.h | 160 +++--------- include/hfta/groupby_slowflush_operator.h | 231 +++++++++++++++++ include/hfta/hash_table.h | 101 +++++--- include/hfta/hfta_runtime_library.h | 9 +- include/hfta/hfta_udaf.h | 41 +++ include/hfta/join_eq_hash_operator.h | 2 +- include/hfta/running_gb_operator.h | 47 ++-- src/ftacmp/analyze_fta.cc | 4 +- src/ftacmp/generate_lfta_code.cc | 71 +++--- src/ftacmp/generate_lfta_code.h | 3 +- src/ftacmp/parse_schema.h | 3 + src/ftacmp/query_plan.cc | 373 ++++++++++++++++++---------- src/ftacmp/query_plan.h | 14 +- src/ftacmp/translate_fta.cc | 39 ++- src/ftacmp/type_objects.cc | 26 +- src/ftacmp/type_objects.h | 2 + src/lib/gscphftaaux/hfta_runtime_library.cc | 25 +- src/lib/gscphftaaux/hfta_udaf.cc | 212 +++++++++++++++- src/lib/gscprts/rts_csv.cc | 216 +++++++++++++++- src/lib/gscprts/rts_kafka.c | 9 +- 23 files changed, 1251 insertions(+), 385 deletions(-) create mode 100644 include/hfta/groupby_slowflush_operator.h diff --git a/bin/start_processing b/bin/start_processing index 3b235cc..1210939 100755 --- a/bin/start_processing +++ b/bin/start_processing @@ -1,4 +1,4 @@ -#!/usr/bin/bash +#!/bin/bash # ------------------------------------------------ # Copyright 2014 AT&T Intellectual Property # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/cfg/external_fcns.def b/cfg/external_fcns.def index 1c9eb3e..071a290 100644 --- a/cfg/external_fcns.def +++ b/cfg/external_fcns.def @@ -693,11 +693,18 @@ uint FUN [LFTA_LEGAL, COST EXPENSIVE] string UDAF count_diff_lfta_s fstring20 (string); 
+//////////////////////////////////////////////////////////////// +// string aggregation via catenation +////////////////////////////////////////////////////// + + string UDAF [HFTA_ONLY] CAT_aggr fstring8 (string, string); + /////////////////////////////////////////////////////////// -// integer array aggregation funciton +// integer array aggregation function // We are going to store 4 values in LFTA in fixed size buffer -// plus one byte for array length (17 bytes total) -// HFTA will combine partial aggregates +// plus one byte for array length (17 bytes total) +// HFTA will combine partial aggregates +// This seems to create a string with a comma-separated list of the uints /////////////////////////////////////////////////////////// string UDAF [RUNNING, SUBAGGR running_array_aggr_lfta, SUPERAGGR running_array_aggr_hfta] running_array_aggr string (uint); @@ -712,3 +719,33 @@ uint FUN [LFTA_LEGAL, COST EXPENSIVE] string FUN [HFTA_ONLY, COST EXPENSIVE] int_to_string_map(ullong, string HANDLE); string FUN [HFTA_ONLY, COST EXPENSIVE] int_to_string_map(int, string HANDLE); string FUN [HFTA_ONLY, COST EXPENSIVE] int_to_string_map(uint, string HANDLE); + +////////////////////////////////////////////////////////// +// time-averaged aggregate. 
+// time_avg(sample, ts, window_size) + float UDAF [HFTA_ONLY, RUNNING] time_avg fstring44 (uint, llong, llong) ; + float UDAF [HFTA_ONLY, RUNNING] time_avg fstring44 (int, llong, llong) ; + float UDAF [HFTA_ONLY, RUNNING] time_avg fstring44 (ullong, llong, llong) ; + float UDAF [HFTA_ONLY, RUNNING] time_avg fstring44 (llong, llong, llong) ; + float UDAF [HFTA_ONLY, RUNNING] time_avg fstring44 (float, llong, llong) ; + +// ------------------------------------------------------------ +// running_sum_max : get the running sum of an int, +// be able to report this sum and also its max value +// during the time window + + llong EXTR running_sum run_sum_max extr_running_sum (uint); + llong EXTR running_sum run_sum_max extr_running_sum (int); + llong EXTR running_sum run_sum_max extr_running_sum (llong); + llong EXTR running_sum run_sum_max extr_running_sum (ullong); + llong FUN [COST LOW] extr_running_sum(string); + llong EXTR running_sum_max run_sum_max extr_running_sum_max (uint); + llong EXTR running_sum_max run_sum_max extr_running_sum_max (int); + llong EXTR running_sum_max run_sum_max extr_running_sum_max (llong); + llong EXTR running_sum_max run_sum_max extr_running_sum_max (ullong); + llong FUN [COST LOW] extr_running_sum_max(string); + string UDAF [HFTA_ONLY, RUNNING] run_sum_max fstring16 (uint); + string UDAF [HFTA_ONLY, RUNNING] run_sum_max fstring16 (int); + string UDAF [HFTA_ONLY, RUNNING] run_sum_max fstring16 (ullong); + string UDAF [HFTA_ONLY, RUNNING] run_sum_max fstring16 (llong); + diff --git a/include/gsoptions.h b/include/gsoptions.h index 3c7e619..d3c0267 100644 --- a/include/gsoptions.h +++ b/include/gsoptions.h @@ -26,5 +26,8 @@ // support for KAFKA interfaces //#define KAFKA_ENABLED +// support for SSL decryption +//#define SSL_ENABLED + #endif diff --git a/include/hfta/groupby_operator.h b/include/hfta/groupby_operator.h index 26645c3..e831bc0 100644 --- a/include/hfta/groupby_operator.h +++ b/include/hfta/groupby_operator.h @@ -1,4 +1,4 @@ 
-/* ------------------------------------------------ +/** ------------------------------------------------ Copyright 2014 AT&T Intellectual Property Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -21,7 +21,6 @@ Copyright 2014 AT&T Intellectual Property #include #include "hash_table.h" -#define _GB_FLUSH_PER_TUPLE_ 1 using namespace std; @@ -29,19 +28,13 @@ template group_table[2]; + hash_table group_table; bool flush_finished; - unsigned int curr_table; - typename hash_table::iterator flush_pos; + typename hash_table::iterator flush_pos; int n_patterns; - - - public: groupby_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle) { flush_finished = true; - curr_table = 0; - flush_pos = group_table[1-curr_table].end(); n_patterns = func.n_groupby_patterns(); } @@ -49,24 +42,17 @@ public: // Push out completed groups - if(!flush_finished) partial_flush(result); - - // create buffer on the stack to store key object - char buffer[sizeof(group)]; - // extract the key information from the tuple and // copy it into buffer - group* grp = func.create_group(tup, buffer); - if (!grp) { -/* -// Ignore temp tuples until we can fix their timestamps. 
-if (func.temp_status_received()) { - tup.free_tuple(); - return 0; -}*/ + group grp; + if (!func.create_group(tup, (gs_sp_t)&grp)) { + if(func.disordered()){ + fprintf(stderr,"Out of order record in %s\n",op_name); + return 0; + } if (func.flush_needed()){ flush_old(result); - } + } if (func.temp_status_received()) { host_tuple temp_tup; if (!func.create_temp_status_tuple(temp_tup, flush_finished)) { @@ -77,32 +63,35 @@ if (func.temp_status_received()) { tup.free_tuple(); return 0; } + if(func.disordered()){ + fprintf(stderr,"Out of order record in %s\n",op_name); + return 0; + } - typename hash_table::iterator iter; - if ((iter = group_table[curr_table].find(grp)) != group_table[curr_table].end()) { + typename hash_table::iterator iter; + if ((iter = group_table.find(grp)) != group_table.end()) { // Temporal GBvar is part of the group so no flush is needed. - aggregate* old_aggr = (*iter).second; - func.update_aggregate(tup, grp, old_aggr); + func.update_aggregate(tup, grp, (*iter).second); }else{ if (func.flush_needed()) { flush_old(result); } if(n_patterns <= 1){ - // create a copy of the group on the heap - group* new_grp = new group(grp); // need a copy constructor for groups -// aggregate* aggr = (aggregate*)malloc(sizeof(aggregate)); - aggregate* aggr = new aggregate(); - // create an aggregate in preallocated buffer - aggr = func.create_aggregate(tup, (char*)aggr); - - group_table[curr_table].insert(new_grp, aggr); + aggregate aggr; + // create an aggregate in preallocated buffer + func.create_aggregate(tup, (char*)&aggr); + // neeed operator= doing a deep copy + group_table.insert(grp, aggr); }else{ int p; +// TODO this code is wrong, must check if each pattern is in the group table. for(p=0;p& result) { - host_tuple tup; - unsigned int old_table = 1-curr_table; - unsigned int i; - -// emit up to _GB_FLUSH_PER_TABLE_ output tuples. 
- if (!group_table[old_table].empty()) { - for (i=0; flush_pos != group_table[old_table].end() && i<_GB_FLUSH_PER_TUPLE_; ++flush_pos, ++i) { - bool failed = false; - tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed); - if (!failed) { - tup.channel = output_channel; - result.push_back(tup); - } - delete ((*flush_pos).first); - delete ((*flush_pos).second); -// free((*flush_pos).second); - } - } - -// Finalize processing if empty. - if(flush_pos == group_table[old_table].end()) { - flush_finished = true; - group_table[old_table].clear(); - group_table[old_table].rehash(); - } - return 0; - } int flush(list& result) { host_tuple tup; - typename hash_table::iterator iter; - unsigned int old_table = 1-curr_table; -// If the old table isn't empty, flush it now. - if (!group_table[old_table].empty()) { - for (; flush_pos != group_table[old_table].end(); ++flush_pos) { + flush_pos = group_table.begin(); +// If the table isn't empty, flush it now. + if (!group_table.empty()) { + for (; flush_pos != group_table.end(); ++flush_pos) { bool failed = false; tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed); if (!failed) { @@ -154,30 +114,9 @@ if (func.temp_status_received()) { tup.channel = output_channel; result.push_back(tup); } - delete ((*flush_pos).first); - delete ((*flush_pos).second); // free((*flush_pos).second); } - group_table[old_table].clear(); - group_table[old_table].rehash(); - } - - flush_pos = group_table[curr_table].begin(); -// If the old table isn't empty, flush it now. 
- if (!group_table[curr_table].empty()) { - for (; flush_pos != group_table[curr_table].end(); ++flush_pos) { - bool failed = false; - tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed); - if (!failed) { - - tup.channel = output_channel; - result.push_back(tup); - } - delete ((*flush_pos).first); - delete ((*flush_pos).second); -// free((*flush_pos).second); - } - group_table[curr_table].clear(); + group_table.clear(); } flush_finished = true; @@ -186,33 +125,10 @@ if (func.temp_status_received()) { } int flush_old(list& result) { - host_tuple tup; - typename hash_table::iterator iter; - unsigned int old_table = 1-curr_table; - -// If the old table isn't empty, flush it now. - if (!group_table[old_table].empty()) { - for (; flush_pos != group_table[old_table].end(); ++flush_pos) { - bool failed = false; - tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed); - if (!failed) { - - tup.channel = output_channel; - result.push_back(tup); - } - delete ((*flush_pos).first); - delete ((*flush_pos).second); -// free((*flush_pos).second); - } - group_table[old_table].clear(); - group_table[old_table].rehash(); - } - -// swap tables, enable partial flush processing. 
- flush_pos = group_table[curr_table].begin(); - curr_table = old_table; - flush_finished = false; + flush(result); + group_table.clear(); + group_table.resize(); return 0; } @@ -231,7 +147,7 @@ if (func.temp_status_received()) { } unsigned int get_mem_footprint() { - return group_table[0].get_mem_footprint() + group_table[1].get_mem_footprint(); + return group_table.get_mem_footprint(); } }; diff --git a/include/hfta/groupby_slowflush_operator.h b/include/hfta/groupby_slowflush_operator.h new file mode 100644 index 0000000..d2120ba --- /dev/null +++ b/include/hfta/groupby_slowflush_operator.h @@ -0,0 +1,231 @@ +/** ------------------------------------------------ +Copyright 2014 AT&T Intellectual Property + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ ------------------------------------------- */ + +#ifndef GROUPBY_SLOWFLUSH_OPERATOR_H +#define GROUPBY_OPERATOR_H + +#include "host_tuple.h" +#include "base_operator.h" +#include +#include "hash_table.h" + +#define _HFTA_SLOW_FLUSH + +using namespace std; + +template +class groupby_slowflush_operator : public base_operator { +private : + groupby_func func; + hash_table group_table[2]; + bool flush_finished; + unsigned int curr_table; + typename hash_table::iterator flush_pos; + int n_patterns; + int gb_per_flush; + +public: + groupby_slowflush_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle) { + flush_finished = true; + curr_table = 0; + flush_pos = group_table[1-curr_table].end(); + n_patterns = func.n_groupby_patterns(); + gb_per_flush = func.gb_flush_per_tuple(); + } + + int accept_tuple(host_tuple& tup, list& result) { + +// Push out completed groups + if(!flush_finished) partial_flush(result); + + // extract the key information from the tuple and + // copy it into buffer + group grp; + if (!func.create_group(tup, (gs_sp_t)&grp)) { + if(func.disordered()){ + fprintf(stderr,"Out of order record in %s\n",op_name); + return 0; + } + if (func.flush_needed()){ + flush_old(result); + } + if (func.temp_status_received()) { + host_tuple temp_tup; + if (!func.create_temp_status_tuple(temp_tup, flush_finished)) { + temp_tup.channel = output_channel; + result.push_back(temp_tup); + } + } + tup.free_tuple(); + return 0; + } + if(func.disordered()){ + fprintf(stderr,"Out of order record in %s\n",op_name); + return 0; + } + + typename hash_table::iterator iter; + if ((iter = group_table[curr_table].find(grp)) != group_table[curr_table].end()) { +// Temporal GBvar is part of the group so no flush is needed. 
+ func.update_aggregate(tup, grp, (*iter).second); + }else{ + if (func.flush_needed()) { + flush_old(result); + } + if(n_patterns <= 1){ + aggregate aggr; + // create an aggregate in preallocated buffer + func.create_aggregate(tup, (char*)&aggr); + // neeed operator= doing a deep copy + group_table[curr_table].insert(grp, aggr); + }else{ + int p; + for(p=0;p& result) { + host_tuple tup; + unsigned int old_table = 1-curr_table; + unsigned int i; + +// emit up to _GB_FLUSH_PER_TABLE_ output tuples. + if (!group_table[old_table].empty()) { + for (i=0; flush_pos != group_table[old_table].end() && i& result) { + host_tuple tup; + typename hash_table::iterator iter; + unsigned int old_table = 1-curr_table; + +// If the old table isn't empty, flush it now. + if (!group_table[old_table].empty()) { + for (; flush_pos != group_table[old_table].end(); ++flush_pos) { + bool failed = false; + tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed); + if (!failed) { + + tup.channel = output_channel; + result.push_back(tup); + } +// free((*flush_pos).second); + } + group_table[old_table].clear(); + group_table[old_table].resize(); + } + + flush_pos = group_table[curr_table].begin(); +// If the table isn't empty, flush it now. + if (!group_table[curr_table].empty()) { + for (; flush_pos != group_table[curr_table].end(); ++flush_pos) { + bool failed = false; + tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed); + if (!failed) { + + tup.channel = output_channel; + result.push_back(tup); + } +// free((*flush_pos).second); + } + group_table[curr_table].clear(); + } + + flush_finished = true; + + return 0; + } + + int flush_old(list& result) { + + host_tuple tup; + typename hash_table::iterator iter; + unsigned int old_table = 1-curr_table; + +// If the old table isn't empty, flush it now. 
+ if (!group_table[old_table].empty()) { + for (; flush_pos != group_table[old_table].end(); ++flush_pos) { + bool failed = false; + tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed); + if (!failed) { + + tup.channel = output_channel; + result.push_back(tup); + } +// free((*flush_pos).second); + } + + //group_table[old_table].clear(); + //group_table[old_table].resize(); + } + + group_table[old_table].clear(); + group_table[old_table].resize(); + +// swap tables, enable partial flush processing. + flush_pos = group_table[curr_table].begin(); + curr_table = old_table; + flush_finished = false; + return 0; + } + + int set_param_block(int sz, void * value) { + func.set_param_block(sz, value); + return 0; + } + + int get_temp_status(host_tuple& result) { + result.channel = output_channel; + return func.create_temp_status_tuple(result, flush_finished); + } + + int get_blocked_status () { + return -1; + } + + unsigned int get_mem_footprint() { + return group_table[0].get_mem_footprint() + group_table[1].get_mem_footprint(); + } +}; + +#endif // GROUPBY_OPERATOR_H diff --git a/include/hfta/hash_table.h b/include/hfta/hash_table.h index cf913e5..94d077b 100644 --- a/include/hfta/hash_table.h +++ b/include/hfta/hash_table.h @@ -40,7 +40,11 @@ public : value_type second; data_item* next; // next data item in overflow chain - data_item(const key_type& k, const value_type& v) : first(k), second(v), next(NULL) { } + data_item(const key_type& k, const value_type& v) { + first = k; + second = v; + next = NULL; + } }; struct hash_bucket { @@ -101,15 +105,19 @@ private: equal_func equal; double load_factor; -// double max_load; size_t _size; size_t _max_size; size_t num_buckets; size_t hash_mask; + // bucket allocation happens on first insert + // this flag indicates that buckets has been allocated + unsigned allocated; + + hash_bucket* first_bucket; // first used bucket - hash_bucket* last_bucket; // last used bucket + hash_bucket* last_bucket; // last 
used bucket hash_bucket* buckets; @@ -119,24 +127,38 @@ private: public: - hash_table(const size_t n_buckets = 65536, const double load = 1.0) { - load_factor = load; + hash_table(const size_t expected_size = 100000, const double max_load = 1.25) { + + size_t min_buckets = (size_t)(expected_size / max_load); + load_factor = max_load; int nb; - for(nb=2;nbnext = temp->next; } - delete (*temp).first; - delete (*temp).second; +// delete (*temp).first; +// delete (*temp).second; delete temp; total_memory -= sizeof(data_item); } @@ -326,14 +364,13 @@ public: last_bucket = NULL; _size = 0; total_memory = num_buckets * sizeof(hash_bucket); - } - int rehash() { + int resize() { if (_size) { - fprintf(stderr, "Error: rehashing non-empty hash table\n"); + fprintf(stderr, "Error: resizing non-empty hash table\n"); exit(1); - } + } double max_load = (1.0 * _max_size) / num_buckets; @@ -350,21 +387,25 @@ public: for(nb=2;nb group_table; - typename hash_table::iterator flush_pos; + hash_table group_table; + typename hash_table::iterator flush_pos; gs_int32_t nflushes; @@ -44,16 +44,15 @@ public: // Push out completed groups -// create buffer on the stack to store key object - char buffer[sizeof(group)]; -// Number of flushes required - -// extract the key information from the tuple and -// copy it into buffer - group* grp = func.create_group(tup, buffer); + group grp, *ret; + ret = func.create_group(tup, (gs_sp_t)&grp); nflushes = func.flush_needed(); - - if (!grp) { + if(func.disordered()){ + fprintf(stderr,"Out of order record in %s\n",op_name); + return 0; + } + + if (! 
ret) { if (nflushes>0){ flush(result); } @@ -71,19 +70,15 @@ public: if (nflushes>0) { flush(result); } - typename hash_table::iterator iter; + typename hash_table::iterator iter; if ((iter = group_table.find(grp)) != group_table.end()) { - aggregate* old_aggr = (*iter).second; - func.update_aggregate(tup, grp, old_aggr); + func.update_aggregate(tup, grp, (*iter).second); }else{ - // create a copy of the group on the heap - group* new_grp = new group(grp); // need a copy constructor for groups -// aggregate* aggr = (aggregate*)malloc(sizeof(aggregate)); - aggregate* aggr = new aggregate(); - // create an aggregate in preallocated buffer - aggr = func.create_aggregate(tup, (char*)aggr); - - group_table.insert(new_grp, aggr); + aggregate aggr; + // create an aggregate in preallocated buffer + func.create_aggregate(tup, (char*)&aggr); + // neeed operator= doing a deep copy + group_table.insert(grp, aggr); } tup.free_tuple(); return 0; @@ -91,7 +86,7 @@ public: virtual int flush(list& result) { host_tuple tup; - typename hash_table::iterator iter; + typename hash_table::iterator iter; // Limit the number of successive flushes - avoid explosive behavior const gs_int32_t max_flushes = 10; @@ -116,12 +111,10 @@ public: result.push_back(tup); } if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){ - group* g = (*flush_pos).first; - aggregate* a = (*flush_pos).second; + group &g = (*flush_pos).first; + //aggregate a = (*flush_pos).second; ++flush_pos; group_table.erase(g); - delete (g); - delete (a); }else{ func.reinit_aggregates((*flush_pos).first, (*flush_pos).second); ++flush_pos; diff --git a/src/ftacmp/analyze_fta.cc b/src/ftacmp/analyze_fta.cc index d793577..57fc11f 100644 --- a/src/ftacmp/analyze_fta.cc +++ b/src/ftacmp/analyze_fta.cc @@ -5252,7 +5252,7 @@ void gather_pr_opcmp_fcns(predicate_t *pr, set &fcn_set){ case PRED_IN: ldt = pr->get_left_se()->get_data_type(); if(ldt->complex_comparison(ldt) ){ - fcn_set.insert( ldt->get_comparison_fcn(ldt) ); + 
fcn_set.insert( ldt->get_equals_fcn(ldt) ); } gather_se_opcmp_fcns(pr->get_left_se(), fcn_set); return; @@ -5260,7 +5260,7 @@ void gather_pr_opcmp_fcns(predicate_t *pr, set &fcn_set){ ldt = pr->get_left_se()->get_data_type(); rdt = pr->get_right_se()->get_data_type(); if(ldt->complex_comparison(rdt) ){ - fcn_set.insert( ldt->get_comparison_fcn(rdt) ); + fcn_set.insert( ldt->get_comparison_fcn(ldt) ); } gather_se_opcmp_fcns(pr->get_left_se(),fcn_set) ; gather_se_opcmp_fcns(pr->get_right_se(),fcn_set) ; diff --git a/src/ftacmp/generate_lfta_code.cc b/src/ftacmp/generate_lfta_code.cc index 55227b1..d6026bb 100644 --- a/src/ftacmp/generate_lfta_code.cc +++ b/src/ftacmp/generate_lfta_code.cc @@ -1053,7 +1053,7 @@ static string generate_predicate_code(predicate_t *pr,table_list *schema){ ret += "( "; if(ldt->complex_comparison(ldt) ){ - ret += ldt->get_comparison_fcn(ldt) ; + ret += ldt->get_equals_fcn(ldt) ; ret += "( "; if(ldt->is_buffer_type() ) ret += "&"; ret += generate_se_code(pr->get_left_se(), schema); @@ -1083,6 +1083,7 @@ static string generate_predicate_code(predicate_t *pr,table_list *schema){ ret += "( "; if(ldt->complex_comparison(rdt) ){ +// TODO can use get_equals_fcn if op is "=" ? 
ret += ldt->get_comparison_fcn(rdt); ret += "("; if(ldt->is_buffer_type() ) ret += "&"; @@ -1153,7 +1154,7 @@ static string generate_equality_test(string &lhs_op, string &rhs_op, data_type * string ret; if(dt->complex_comparison(dt) ){ - ret += dt->get_comparison_fcn(dt); + ret += dt->get_equals_fcn(dt); ret += "("; if(dt->is_buffer_type() ) ret += "&"; ret += lhs_op; @@ -1170,26 +1171,26 @@ static string generate_equality_test(string &lhs_op, string &rhs_op, data_type * return(ret); } -static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){ - string ret; - - if(dt->complex_comparison(dt) ){ - ret += dt->get_comparison_fcn(dt); - ret += "("; - if(dt->is_buffer_type() ) ret += "&"; - ret += lhs_op; - ret += ", "; - if(dt->is_buffer_type() ) ret += "&"; - ret += rhs_op; - ret += ") == 0"; - }else{ - ret += lhs_op; - ret += " == "; - ret += rhs_op; - } - - return(ret); -} +//static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){ +// string ret; +// + // if(dt->complex_comparison(dt) ){ +// ret += dt->get_equals_fcn(dt); +// ret += "("; +// if(dt->is_buffer_type() ) ret += "&"; +// ret += lhs_op; +// ret += ", "; +// if(dt->is_buffer_type() ) ret += "&"; +// ret += rhs_op; +// ret += ") == 0"; +// }else{ +// ret += lhs_op; +// ret += " == "; +// ret += rhs_op; +// } +// +// return(ret); +//} // Here I assume that only MIN and MAX aggregates can be computed // over BUFFER data types. 
@@ -4630,7 +4631,7 @@ string generate_lfta_block(qp_node *fs, table_list *schema, int gid, -int compute_snap_len(qp_node *fs, table_list *schema){ +int compute_snap_len(qp_node *fs, table_list *schema, string snap_type){ // Initialize global vars gb_tbl = NULL; @@ -4691,15 +4692,21 @@ int compute_snap_len(qp_node *fs, table_list *schema){ int tblref = (*csi).tblvar_ref; string field = (*csi).field; - param_list *field_params = schema->get_modifier_list(schref, field); - if(field_params->contains_key("snap_len")){ - string fld_snap_str = field_params->val_of("snap_len"); - int fld_snap; - if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){ - if(fld_snap > snap_len) snap_len = fld_snap; - n_snap++; - }else{ - fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() ); + if(snap_type == "index"){ + int pos = schema->get_field_idx(schref, field); + if(pos>snap_len) snap_len = pos; + n_snap++; + }else{ + param_list *field_params = schema->get_modifier_list(schref, field); + if(field_params->contains_key("snap_len")){ + string fld_snap_str = field_params->val_of("snap_len"); + int fld_snap; + if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){ + if(fld_snap > snap_len) snap_len = fld_snap; + n_snap++; + }else{ + fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() ); + } } } } diff --git a/src/ftacmp/generate_lfta_code.h b/src/ftacmp/generate_lfta_code.h index 875247f..2716111 100644 --- a/src/ftacmp/generate_lfta_code.h +++ b/src/ftacmp/generate_lfta_code.h @@ -30,7 +30,8 @@ std::string generate_lfta_block(qp_node *fs, table_list *schema, int gid, std::string generate_lfta_prefilter(std::vector &pred_list, col_id_set &temp_cids, table_list *Schema, ext_fcn_list *Ext_fcns, std::vector &lfta_cols, std::vector &lfta_sigs, vector &lfta_snap_lens, std::string iface); std::string generate_lfta_prefilter_struct(col_id_set 
&temp_cids, table_list *Schema); -int compute_snap_len(qp_node *fs, table_list *schema); +// a snap_type of "index" computes position, else use snap_len metadata +int compute_snap_len(qp_node *fs, table_list *schema, string snap_type); std::string generate_watchlist_element_name(std::string node_name); std::string generate_watchlist_struct_name(std::string node_name); diff --git a/src/ftacmp/parse_schema.h b/src/ftacmp/parse_schema.h index 763ebd5..e1f21fa 100644 --- a/src/ftacmp/parse_schema.h +++ b/src/ftacmp/parse_schema.h @@ -542,6 +542,9 @@ public: return tbl_list[t]->get_field(tbl_list[t]->get_field_idx(f)); } int get_field_idx(std::string t, std::string f); + int get_field_idx(int t, std::string f){ + return tbl_list[t]->get_field_idx(f); + } int find_tbl(std::string t); diff --git a/src/ftacmp/query_plan.cc b/src/ftacmp/query_plan.cc index 8e05ae2..ec391dd 100644 --- a/src/ftacmp/query_plan.cc +++ b/src/ftacmp/query_plan.cc @@ -3874,7 +3874,7 @@ vector rsgah_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_li new_opl.push_back(new_se); } } - stream_node->aggr_tbl.add_aggr(aggr_tbl.get_op(a), aggr_tbl.get_fcn_id(a), new_opl, aggr_tbl.get_storage_type(a),false, false,aggr_tbl.has_bailout(a)); + stream_node->aggr_tbl.add_aggr(aggr_tbl.get_op(a), aggr_tbl.get_fcn_id(a), new_opl, aggr_tbl.get_storage_type(a),aggr_tbl.is_superaggr(a), aggr_tbl.is_running_aggr(a),aggr_tbl.has_bailout(a)); hse = new scalarexp_t(aggr_tbl.get_op(a).c_str(),new_opl); hse->set_data_type(Ext_fcns->get_fcn_dt(aggr_tbl.get_fcn_id(a))); hse->set_fcn_id(aggr_tbl.get_fcn_id(a)); @@ -4332,12 +4332,12 @@ vector sgah_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_lis stream_node->set_node_name( node_name ); stream_node->table_name->set_range_var(table_name->get_var_name()); -// allowed stream disorder. Default is 2, +// allowed stream disorder. Default is 1, // can override with max_lfta_disorder setting. // Also limit the hfta disorder, set to lfta disorder + 1. 
// can override with max_hfta_disorder. - fta_node->lfta_disorder = 2; + fta_node->lfta_disorder = 1; if(this->get_val_of_def("max_lfta_disorder") != ""){ int d = atoi(this->get_val_of_def("max_lfta_disorder").c_str() ); if(d<1){ @@ -4364,6 +4364,7 @@ printf("node %s setting lfta_disorder = %d\n",node_name.c_str(),fta_node->lfta_d } } + // First, process the group-by variables. // The fta must supply the values of all the gbvars. // If a gb is computed, the computation must be @@ -6823,7 +6824,7 @@ static string generate_predicate_code(predicate_t *pr,table_list *schema){ ret.append("( "); if(ldt->complex_comparison(ldt) ){ - ret.append( ldt->get_hfta_comparison_fcn(ldt) ); + ret.append( ldt->get_hfta_equals_fcn(ldt) ); ret.append("( "); if(ldt->is_buffer_type() ) ret.append("&"); @@ -6855,6 +6856,7 @@ static string generate_predicate_code(predicate_t *pr,table_list *schema){ ret.append("( "); if(ldt->complex_comparison(rdt) ){ +// TODO can use get_hfta_equals_fcn if op is "=" ? ret.append(ldt->get_hfta_comparison_fcn(rdt)); ret.append("("); if(ldt->is_buffer_type() ) @@ -6924,7 +6926,7 @@ static string generate_predicate_code_fm_aggr(predicate_t *pr, string gbvar, str ret.append("( "); if(ldt->complex_comparison(ldt) ){ - ret.append( ldt->get_hfta_comparison_fcn(ldt) ); + ret.append( ldt->get_hfta_equals_fcn(ldt) ); ret.append("( "); if(ldt->is_buffer_type() ) ret.append("&"); @@ -6956,6 +6958,7 @@ static string generate_predicate_code_fm_aggr(predicate_t *pr, string gbvar, str ret.append("( "); if(ldt->complex_comparison(rdt) ){ +// TODO can use get_hfta_equals_fcn if op is "=" ? 
ret.append(ldt->get_hfta_comparison_fcn(rdt)); ret.append("("); if(ldt->is_buffer_type() ) @@ -7014,7 +7017,7 @@ static string generate_equality_test(string &lhs_op, string &rhs_op, data_type * string ret; if(dt->complex_comparison(dt) ){ - ret.append(dt->get_hfta_comparison_fcn(dt)); + ret.append(dt->get_hfta_equals_fcn(dt)); ret.append("("); if(dt->is_buffer_type() ) ret.append("&"); @@ -7056,28 +7059,28 @@ static string generate_lt_test(string &lhs_op, string &rhs_op, data_type *dt){ return(ret); } -static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){ - string ret; - - if(dt->complex_comparison(dt) ){ - ret.append(dt->get_hfta_comparison_fcn(dt)); - ret.append("("); - if(dt->is_buffer_type() ) - ret.append("&"); - ret.append(lhs_op); - ret.append(", "); - if(dt->is_buffer_type() ) - ret.append("&"); - ret.append(rhs_op ); - ret.append(") == 0"); - }else{ - ret.append(lhs_op ); - ret.append(" == "); - ret.append(rhs_op ); - } - - return(ret); -} +//static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){ +// string ret; +// +// if(dt->complex_comparison(dt) ){ +// ret.append(dt->get_hfta_equals_fcn(dt)); +// ret.append("("); +// if(dt->is_buffer_type() ) +// ret.append("&"); +// ret.append(lhs_op); +// ret.append(", "); +// if(dt->is_buffer_type() ) +// ret.append("&"); +// ret.append(rhs_op ); +// ret.append(") == 0"); +// }else{ +// ret.append(lhs_op ); +// ret.append(" == "); +// ret.append(rhs_op ); +// } +// +// return(ret); +//} // Here I assume that only MIN and MAX aggregates can be computed @@ -8881,6 +8884,18 @@ string sgah_qpn::generate_functor_name(){ string sgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, vector &needs_xform){ int a,g,w,s; +// Regular or slow flush? 
+ hfta_slow_flush = 0; + if(this->get_val_of_def("hfta_slow_flush") != ""){ + int d = atoi(this->get_val_of_def("hfta_slow_flush").c_str() ); + if(d<0){ + fprintf(stderr,"Warning, hfta_slow_flush in node %s is %d, must be at least 0, setting to 0.\n",node_name.c_str(), d); + hfta_slow_flush = 0; + }else{ + hfta_slow_flush = d; + } + } + // Initialize generate utility globals segen_gb_tbl = &(gb_tbl); @@ -8902,12 +8917,12 @@ string sgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, ve ret+="\t"+this->gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n"; } // empty strucutred literals - map::iterator sii; - for(sii=structured_types.begin();sii!=structured_types.end();++sii){ - data_type dt(sii->second); - literal_t empty_lit(sii->first); - ret += "\t"+dt.make_host_cvar(empty_lit.hfta_empty_literal_name())+";\n"; - } +// map::iterator sii; +// for(sii=structured_types.begin();sii!=structured_types.end();++sii){ +// data_type dt(sii->second); +// literal_t empty_lit(sii->first); +// ret += "\t"+dt.make_host_cvar(empty_lit.hfta_empty_literal_name())+";\n"; +// } // Constructors if(structured_types.size()==0){ ret += "\t"+generate_functor_name() + "_groupdef(){};\n"; @@ -8916,22 +8931,34 @@ string sgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, ve } + ret += "\t// shallow copy constructors\n"; ret += "\t"+generate_functor_name() + "_groupdef("+ - this->generate_functor_name() + "_groupdef *gd){\n"; + "const " + this->generate_functor_name() + "_groupdef &gd){\n"; for(g=0;gis_buffer_type()){ - sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd->gb_var%d));\n", - gdt->get_hfta_buffer_assign_copy().c_str(),g,g ); - ret += tmpstr; - }else{ - sprintf(tmpstr,"\t\tgb_var%d = gd->gb_var%d;\n",g,g); - ret += tmpstr; - } + sprintf(tmpstr,"\t\tgb_var%d = gd.gb_var%d;\n",g,g); + ret += tmpstr; +// TODO : do strings perisist after the call? 
its a shllow copy +// if(gdt->is_buffer_type()){ +// sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd->gb_var%d));\n", +// gdt->get_hfta_buffer_assign_copy().c_str(),g,g ); +// ret += tmpstr; +// }else{ +// sprintf(tmpstr,"\t\tgb_var%d = gd->gb_var%d;\n",g,g); +// ret += tmpstr; +// } } ret += "\t}\n"; ret += "\t"+generate_functor_name() + "_groupdef("+ - this->generate_functor_name() + "_groupdef *gd, bool *pattern){\n"; + "const " + this->generate_functor_name() + "_groupdef &gd, bool *pattern){\n"; +// -- For patterns, need empty strucutred literals + map::iterator sii; + for(sii=structured_types.begin();sii!=structured_types.end();++sii){ + data_type dt(sii->second); + literal_t empty_lit(sii->first); + ret += "\t"+dt.make_host_cvar(empty_lit.hfta_empty_literal_name())+";\n"; + } + for(sii=structured_types.begin();sii!=structured_types.end();++sii){ literal_t empty_lit(sii->first); ret += "\t\t"+empty_lit.to_hfta_C_code("&"+empty_lit.hfta_empty_literal_name())+";\n"; @@ -8939,14 +8966,17 @@ string sgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, ve for(g=0;gis_buffer_type()){ - sprintf(tmpstr,"\t\t\t%s(&gb_var%d, &(gd->gb_var%d));\n", - gdt->get_hfta_buffer_assign_copy().c_str(),g,g ); - ret += tmpstr; - }else{ - sprintf(tmpstr,"\t\t\tgb_var%d = gd->gb_var%d;\n",g,g); - ret += tmpstr; - } + sprintf(tmpstr,"\t\t\tgb_var%d = gd.gb_var%d;\n",g,g); + ret += tmpstr; +// TODO Do strings persist long enough? its a shllow copy constructor? 
+// if(gdt->is_buffer_type()){ +// sprintf(tmpstr,"\t\t\t%s(&gb_var%d, &(gd->gb_var%d));\n", +// gdt->get_hfta_buffer_assign_copy().c_str(),g,g ); +// ret += tmpstr; +// }else{ +// sprintf(tmpstr,"\t\t\tgb_var%d = gd->gb_var%d;\n",g,g); +// ret += tmpstr; +// } ret += "\t\t}else{\n"; literal_t empty_lit(gdt->type_indicator()); if(empty_lit.is_cpx_lit()){ @@ -8957,6 +8987,23 @@ string sgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, ve ret += "\t\t}\n"; } ret += "\t};\n"; + + ret += "\t// deep assignment operator\n"; + ret += "\t"+generate_functor_name() + "_groupdef& operator=(const "+ + this->generate_functor_name() + "_groupdef &gd){\n"; + for(g=0;gis_buffer_type()){ + sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd.gb_var%d));\n", + gdt->get_hfta_buffer_assign_copy().c_str(),g,g ); + ret += tmpstr; + }else{ + sprintf(tmpstr,"\t\tgb_var%d = gd.gb_var%d;\n",g,g); + ret += tmpstr; + } + } + ret += "\t}\n"; + // destructor ret += "\t~"+ generate_functor_name() + "_groupdef(){\n"; for(g=0;gis_temporal()){ - sprintf(tmpstr,"last_gb%d",g); string lhs_op = tmpstr; - sprintf(tmpstr,"gbval->gb_var%d",g); string rhs_op = tmpstr; - if(first_one){first_one = false;} else {ret += ") && (";} - ret += generate_equality_test(lhs_op, rhs_op, gdt); + sprintf(tmpstr,"last_gb%d",g); string lhs_op = tmpstr; + sprintf(tmpstr,"gbval->gb_var%d",g); string rhs_op = tmpstr; + if(first_one){first_one = false;} else {ret += ") && (";} + ret += generate_lt_test(lhs_op, rhs_op, gdt); + disorder_test += generate_lt_test(rhs_op, lhs_op, gdt); } } ret += ") ) ){\n"; @@ -9364,9 +9422,17 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n"; } } ret += "\t\tneeds_temporal_flush=true;\n"; - ret += "\t\t}else{\n" - "\t\t\tneeds_temporal_flush=false;\n" - "\t\t}\n"; + ret += "\t}else{\n" + "\t\tneeds_temporal_flush=false;\n" + "\t}\n"; + + ret += "\tdisordered_arrival = "+disorder_test+";\n"; +// ret += "\tif( ( ("+disorder_test+") ) ){\n"; +// ret += 
"\t\tdisordered_arrival=true;\n"; +// ret += "\t}else{\n"; +// ret += "\t\tdisordered_arrival=false;\n"; +// ret += "\t}\n"; + } }else{ ret+= "\tif(temp_tuple_received && !( ("; @@ -9515,8 +9581,8 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n"; // update an aggregate object ret += "void update_aggregate(host_tuple &tup0, " - +generate_functor_name()+"_groupdef *gbval, "+ - generate_functor_name()+"_aggrdef *aggval){\n"; + +generate_functor_name()+"_groupdef &gbval, "+ + generate_functor_name()+"_aggrdef &aggval){\n"; // Variables for execution of the function. ret += "\tgs_int32_t problem = 0;\n"; // return unpack failure @@ -9527,7 +9593,7 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n"; // Unpack all remaining attributes ret += gen_remaining_colrefs(schema, cid_set, found_cids, "", needs_xform); for(a=0;aaggr_var%d",a); + sprintf(tmpstr,"aggval.aggr_var%d",a); string varname = tmpstr; ret.append(generate_aggr_update(varname,&aggr_tbl,a, schema)); } @@ -9546,6 +9612,8 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n"; } ret += "\t};\n"; + ret += "bool disordered(){return disordered_arrival;}\n"; + //--------------------------------------------------- // create output tuple // Unpack the partial functions ref'd in the where clause, @@ -9556,15 +9624,15 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n"; // so I'll leave it in longhand. ret += "host_tuple create_output_tuple(" - +generate_functor_name()+"_groupdef *gbval, "+ - generate_functor_name()+"_aggrdef *aggval, bool &failed){\n"; + +generate_functor_name()+"_groupdef &gbval, "+ + generate_functor_name()+"_aggrdef &aggval, bool &failed){\n"; ret += "\thost_tuple tup;\n"; ret += "\tfailed = false;\n"; ret += "\tgs_retval_t retval = 0;\n"; - string gbvar = "gbval->gb_var"; - string aggvar = "aggval->"; + string gbvar = "gbval.gb_var"; + string aggvar = "aggval."; // Create cached temporaries for UDAF return values. 
for(a=0;a0) ret += "^"; data_type *gdt = gb_tbl.get_data_type(g); if(gdt->use_hashfunc()){ if(gdt->is_buffer_type()) - sprintf(tmpstr,"(%s*%s(&(grp->gb_var%d)))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g); + sprintf(tmpstr,"(%s*%s(&(grp.gb_var%d)))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g); else - sprintf(tmpstr,"(%s*%s(grp->gb_var%d))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g); + sprintf(tmpstr,"(%s*%s(grp.gb_var%d))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g); }else{ - sprintf(tmpstr,"(%s*grp->gb_var%d)",hash_nums[g%NRANDS].c_str(),g); + sprintf(tmpstr,"(%s*grp.gb_var%d)",hash_nums[g%NRANDS].c_str(),g); } ret += tmpstr; } @@ -9802,22 +9870,22 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n"; // The comparison function ret += "struct "+generate_functor_name()+"_equal_func{\n"; - ret += "\tbool operator()(const "+generate_functor_name()+"_groupdef *grp1, "+ - generate_functor_name()+"_groupdef *grp2) const{\n"; + ret += "\tbool operator()(const "+generate_functor_name()+"_groupdef &grp1, "+ + "const "+generate_functor_name()+"_groupdef &grp2) const{\n"; ret += "\t\treturn( ("; for(g=0;g0) ret += ") && ("; data_type *gdt = gb_tbl.get_data_type(g); if(gdt->complex_comparison(gdt)){ - if(gdt->is_buffer_type()) - sprintf(tmpstr,"(%s(&(grp1->gb_var%d), &(grp2->gb_var%d))==0)", - gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g); - else - sprintf(tmpstr,"(%s((grp1->gb_var%d), (grp2->gb_var%d))==0)", - gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g); + if(gdt->is_buffer_type()) + sprintf(tmpstr,"(%s(&(grp1.gb_var%d), &(grp2.gb_var%d))==0)", + gdt->get_hfta_equals_fcn(gdt).c_str(),g,g); + else + sprintf(tmpstr,"(%s((grp1.gb_var%d), (grp2.gb_var%d))==0)", + gdt->get_hfta_equals_fcn(gdt).c_str(),g,g); }else{ - sprintf(tmpstr,"grp1->gb_var%d == grp2->gb_var%d",g,g); + sprintf(tmpstr,"grp1.gb_var%d == grp2.gb_var%d",g,g); } ret += tmpstr; } @@ -9832,14 +9900,17 @@ ret += "// 
hfta_disorder = "+int_to_string(hfta_disorder)+"\n"; string sgah_qpn::generate_operator(int i, string params){ if(hfta_disorder < 2){ + string op_name = "groupby_operator"; + if(hfta_slow_flush>0) + op_name = "groupby_slowflush_operator"; return( - " groupby_operator<" + + " "+op_name+"<" + generate_functor_name()+","+ generate_functor_name() + "_groupdef, " + generate_functor_name() + "_aggrdef, " + generate_functor_name()+"_hash_func, "+ generate_functor_name()+"_equal_func " - "> *op"+int_to_string(i)+" = new groupby_operator<"+ + "> *op"+int_to_string(i)+" = new "+op_name+"<"+ generate_functor_name()+","+ generate_functor_name() + "_groupdef, " + generate_functor_name() + "_aggrdef, " + @@ -11244,10 +11315,10 @@ string join_eq_hash_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_ if(hashkey_dt[p]->complex_comparison(hashkey_dt[p])){ if(hashkey_dt[p]->is_buffer_type()) sprintf(tmpstr,"(%s(&(key1->hashkey_var%d), &(key2->hashkey_var%d))==0)", - hashkey_dt[p]->get_hfta_comparison_fcn(hashkey_dt[p]).c_str(),p,p); + hashkey_dt[p]->get_hfta_equals_fcn(hashkey_dt[p]).c_str(),p,p); else sprintf(tmpstr,"(%s((key1->hashkey_var%d), (key2->hashkey_var%d))==0)", - hashkey_dt[p]->get_hfta_comparison_fcn(hashkey_dt[p]).c_str(),p,p); + hashkey_dt[p]->get_hfta_equals_fcn(hashkey_dt[p]).c_str(),p,p); }else{ sprintf(tmpstr,"key1->hashkey_var%d == key2->hashkey_var%d",p,p); } @@ -12591,10 +12662,10 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; if(gdt->complex_comparison(gdt)){ if(gdt->is_buffer_type()) sprintf(tmpstr,"(%s(&(grp1->gb_var%d), &(grp2->gb_var%d))==0)", - gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g); + gdt->get_hfta_equals_fcn(gdt).c_str(),g,g); else sprintf(tmpstr,"(%s((grp1->gb_var%d), (grp2->gb_var%d))==0)", - gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g); + gdt->get_hfta_equals_fcn(gdt).c_str(),g,g); }else{ sprintf(tmpstr,"grp1->gb_var%d == grp2->gb_var%d",g,g); } @@ -12621,10 +12692,10 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; 
if(gdt->complex_comparison(gdt)){ if(gdt->is_buffer_type()) sprintf(tmpstr,"(%s(&(grp1->gb_var%d), &(grp2->gb_var%d))==0)", - gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g); + gdt->get_hfta_equals_fcn(gdt).c_str(),g,g); else sprintf(tmpstr,"(%s((grp1->gb_var%d), (grp2->gb_var%d))==0)", - gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g); + gdt->get_hfta_equals_fcn(gdt).c_str(),g,g); }else{ sprintf(tmpstr,"grp1->gb_var%d == grp2->gb_var%d",g,g); } @@ -12695,21 +12766,34 @@ string rsgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, v ret+="\t"+this->gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n"; } // Constructors + ret += "\t"+generate_functor_name() + "_groupdef(){};\n"; + ret += "\t// shallow copy constructor\n"; ret += "\t"+generate_functor_name() + "_groupdef("+ - this->generate_functor_name() + "_groupdef *gd){\n"; + this->generate_functor_name() + "_groupdef &gd){\n"; for(g=0;gis_buffer_type()){ - sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd->gb_var%d));\n", - gdt->get_hfta_buffer_assign_copy().c_str(),g,g ); - ret += tmpstr; - }else{ - sprintf(tmpstr,"\t\tgb_var%d = gd->gb_var%d;\n",g,g); - ret += tmpstr; - } + sprintf(tmpstr,"\t\tgb_var%d = gd.gb_var%d;\n",g,g); + ret += tmpstr; } ret += "\t};\n"; + + ret += "\t// deep assignment operator\n"; + ret += "\t"+generate_functor_name() + "_groupdef& operator=(const "+ + this->generate_functor_name() + "_groupdef &gd){\n"; + for(g=0;gis_buffer_type()){ + sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd.gb_var%d));\n", + gdt->get_hfta_buffer_assign_copy().c_str(),g,g ); + ret += tmpstr; + }else{ + sprintf(tmpstr,"\t\tgb_var%d = gd.gb_var%d;\n",g,g); + ret += tmpstr; + } + } + ret += "\t}\n"; + // destructor ret += "\t~"+ generate_functor_name() + "_groupdef(){\n"; for(g=0;gis_temporal()){ literal_t gl(gdt->type_indicator()); - sprintf(tmpstr,"\tlast_gb%d = %s;\n",g, gl.to_hfta_C_code("").c_str()); - ret.append(tmpstr); sprintf(tmpstr,"\tcurr_gb%d = %s;\n",g, gl.to_hfta_C_code("").c_str()); 
ret.append(tmpstr); - temporal_gb = g; + sprintf(tmpstr,"\tlast_gb%d = %s;\n",g, gl.to_hfta_C_code("").c_str()); + ret.append(tmpstr); } } ret += "\tneeds_temporal_flush = 0;\n"; @@ -13044,6 +13127,7 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; if(uses_temporal_flush){ ret+= "\tif( ( ("; bool first_one = true; + string disorder_test; for(g=0;ggb_var%d",g); string rhs_op = tmpstr; if(first_one){first_one = false;} else {ret += ") && (";} ret += generate_lt_test(lhs_op, rhs_op, gdt); + disorder_test += generate_lt_test(rhs_op, lhs_op, gdt); } } ret += ") ) ){\n"; + int temporal_gb=-1; for(g=0;gis_temporal()){ - temporal_gb = g; - if(gdt->is_buffer_type()){ // TODO first, last? or delete? - sprintf(tmpstr,"\t\t%s(&(gbval->gb_var%d),&last_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g); + if(gdt->is_buffer_type()){ + sprintf(tmpstr,"\t\t%s(&(gbval->gb_var%d),&curr_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g); }else{ +// sprintf(tmpstr,"\t\tlast_gb%d = curr_gb%d;\n",g,g); +// ret += tmpstr; +// sprintf(tmpstr,"\t\tcurr_gb%d = gbval->gb_var%d;\n",g,g); + ret += "\t\tif(curr_gb"+to_string(g)+"==0){\n"; ret += "\t\t\tlast_gb"+to_string(g)+" = gbval->gb_var"+to_string(g)+";\n"; ret += "\t\t}else{\n"; ret += "\t\t\tlast_gb"+to_string(g)+" = curr_gb"+to_string(g)+";\n"; ret += "\t\t}\n"; sprintf(tmpstr,"\t\tcurr_gb%d = gbval->gb_var%d;\n",g,g); + temporal_gb=g; } ret += tmpstr; } } - ret += "\t\tneeds_temporal_flush = curr_gb"+to_string (temporal_gb)+" - last_gb"+to_string(temporal_gb)+";\n"; - ret += "\t\t}else{\n" - "\t\t\tneeds_temporal_flush=0;\n" - "\t\t}\n"; + ret += "\t\tneeds_temporal_flush = curr_gb"+to_string (temporal_gb)+" - last_gb"+to_string(temporal_gb)+";\n"; + ret += "\t}else{\n" + "\t\tneeds_temporal_flush=0;\n" + "\t}\n"; + + ret += "\tdisordered_arrival = "+disorder_test+";\n"; +// ret += "\tif( ( ("+disorder_test+") ) ){\n"; +// ret += "\t\tdisordered_arrival=true;\n"; +// ret += "\t}else{\n"; +// ret += 
"\t\tdisordered_arrival=false;\n"; +// ret += "\t}\n"; } @@ -13184,8 +13281,8 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; // update an aggregate object ret += "void update_aggregate(host_tuple &tup0, " - +generate_functor_name()+"_groupdef *gbval, "+ - generate_functor_name()+"_aggrdef *aggval){\n"; + +generate_functor_name()+"_groupdef &gbval, "+ + generate_functor_name()+"_aggrdef &aggval){\n"; // Variables for execution of the function. ret += "\tgs_int32_t problem = 0;\n"; // return unpack failure @@ -13196,7 +13293,7 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; // Unpack all remaining attributes ret += gen_remaining_colrefs(schema, cid_set, found_cids, "", needs_xform); for(a=0;aaggr_var%d",a); + sprintf(tmpstr,"aggval.aggr_var%d",a); string varname = tmpstr; ret.append(generate_aggr_update(varname,&aggr_tbl,a, schema)); } @@ -13208,29 +13305,31 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; // reinitialize an aggregate object ret += "void reinit_aggregates( "+ - generate_functor_name()+"_groupdef *gbval, "+ - generate_functor_name()+"_aggrdef *aggval){\n"; + generate_functor_name()+"_groupdef &gbval, "+ + generate_functor_name()+"_aggrdef &aggval){\n"; // Variables for execution of the function. 
ret += "\tgs_int32_t problem = 0;\n"; // return unpack failure // use of temporaries depends on the aggregate, // generate them in generate_aggr_update + int temporal_gb; // track the # of the temporal gb for(g=0;gis_temporal()){ if(gdt->is_buffer_type()){ - sprintf(tmpstr,"\t\t%s(&(gbval->gb_var%d),&last_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g); + sprintf(tmpstr,"\t\t%s(&(gbval.gb_var%d),&last_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g); }else{ - sprintf(tmpstr,"\t\t gbval->gb_var%d =last_gb%d;\n",g,g); + sprintf(tmpstr,"\t\t gbval.gb_var%d =last_gb%d;\n",g,g); } ret += tmpstr; + temporal_gb = g; } } // Unpack all remaining attributes for(a=0;aaggr_var%d",a); + sprintf(tmpstr,"aggval.aggr_var%d",a); string varname = tmpstr; ret.append(generate_aggr_reinitialize(varname,&aggr_tbl,a, schema)); } @@ -13249,10 +13348,12 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; if(uses_temporal_flush){ ret += "\treturn needs_temporal_flush;\n"; }else{ - ret += "\treturn 0;\n"; + ret += "\treturn false;\n"; } ret += "};\n"; + ret += "bool disordered(){return disordered_arrival;}\n"; + //------------------------------------------------ // time bucket management ret += "void advance_last_tb(){\n"; @@ -13272,15 +13373,15 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; // so I'll leave it in longhand. ret += "host_tuple create_output_tuple(" - +generate_functor_name()+"_groupdef *gbval, "+ - generate_functor_name()+"_aggrdef *aggval, bool &failed){\n"; + +generate_functor_name()+"_groupdef &gbval, "+ + generate_functor_name()+"_aggrdef &aggval, bool &failed){\n"; ret += "\thost_tuple tup;\n"; ret += "\tfailed = false;\n"; ret += "\tgs_retval_t retval = 0;\n"; - string gbvar = "gbval->gb_var"; - string aggvar = "aggval->"; + string gbvar = "gbval.gb_var"; + string aggvar = "aggval."; // First, get the return values from the UDAFS @@ -13424,14 +13525,14 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; // been unpacked. 
delete the string udaf return values at the end. ret += "bool cleaning_when(" - +generate_functor_name()+"_groupdef *gbval, "+ - generate_functor_name()+"_aggrdef *aggval){\n"; + +generate_functor_name()+"_groupdef &gbval, "+ + generate_functor_name()+"_aggrdef &aggval){\n"; ret += "\tbool retval = true;\n"; - gbvar = "gbval->gb_var"; - aggvar = "aggval->"; + gbvar = "gbval.gb_var"; + aggvar = "aggval."; set clw_pfcns; @@ -13508,7 +13609,7 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; ret += "struct "+generate_functor_name()+"_hash_func{\n"; ret += "\tgs_uint32_t operator()(const "+generate_functor_name()+ - "_groupdef *grp) const{\n"; + "_groupdef &grp) const{\n"; ret += "\t\treturn(0"; for(g=0;guse_hashfunc()){ if(gdt->is_buffer_type()) - sprintf(tmpstr,"(%s*%s(&(grp->gb_var%d)))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g); + sprintf(tmpstr,"(%s*%s(&(grp.gb_var%d)))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g); else - sprintf(tmpstr,"(%s*%s(grp->gb_var%d))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g); + sprintf(tmpstr,"(%s*%s(grp.gb_var%d))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g); }else{ - sprintf(tmpstr,"(%s*grp->gb_var%d)",hash_nums[g%NRANDS].c_str(),g); + sprintf(tmpstr,"(%s*grp.gb_var%d)",hash_nums[g%NRANDS].c_str(),g); } ret += tmpstr; } @@ -13533,8 +13634,8 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; // The comparison function ret += "struct "+generate_functor_name()+"_equal_func{\n"; - ret += "\tbool operator()(const "+generate_functor_name()+"_groupdef *grp1, "+ - generate_functor_name()+"_groupdef *grp2) const{\n"; + ret += "\tbool operator()(const "+generate_functor_name()+"_groupdef &grp1, "+ + "const "+generate_functor_name()+"_groupdef &grp2) const{\n"; ret += "\t\treturn( ("; string hcmpr = ""; @@ -13545,13 +13646,13 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; if(first_exec){first_exec=false;}else{ hcmpr += ") && (";} if(gdt->complex_comparison(gdt)){ 
if(gdt->is_buffer_type()) - sprintf(tmpstr,"(%s(&(grp1->gb_var%d), &(grp2->gb_var%d))==0)", - gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g); + sprintf(tmpstr,"(%s(&(grp1.gb_var%d), &(grp2.gb_var%d))==0)", + gdt->get_hfta_equals_fcn(gdt).c_str(),g,g); else - sprintf(tmpstr,"(%s((grp1->gb_var%d), (grp2->gb_var%d))==0)", - gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g); + sprintf(tmpstr,"(%s((grp1.gb_var%d), (grp2.gb_var%d))==0)", + gdt->get_hfta_equals_fcn(gdt).c_str(),g,g); }else{ - sprintf(tmpstr,"grp1->gb_var%d == grp2->gb_var%d",g,g); + sprintf(tmpstr,"grp1.gb_var%d == grp2.gb_var%d",g,g); } hcmpr += tmpstr; } diff --git a/src/ftacmp/query_plan.h b/src/ftacmp/query_plan.h index ec782e0..c1001d6 100644 --- a/src/ftacmp/query_plan.h +++ b/src/ftacmp/query_plan.h @@ -492,6 +492,7 @@ public: int lfta_disorder; // maximum disorder in the steam between lfta, hfta int hfta_disorder; // maximum disorder in the hfta + int hfta_slow_flush; // outputs per input, 0 means no slow flush // rollup, cube, and grouping_sets cannot be readily reconstructed by // analyzing the patterns, so explicitly record them here. @@ -517,11 +518,15 @@ public: std::string generate_operator(int i, std::string params); std::string get_include_file(){ - if(hfta_disorder <= 1){ - return("#include \n"); + if(hfta_disorder <= 1){ + if(hfta_slow_flush>0){ + return("#include \n"); }else{ - return("#include \n"); + return("#include \n"); } + }else{ + return("#include \n"); + } }; std::vector get_select_list(){return select_list;}; @@ -554,10 +559,12 @@ public: sgah_qpn(){ lfta_disorder = 1; hfta_disorder = 1; + hfta_slow_flush = 0; }; sgah_qpn(query_summary_class *qs,table_list *Schema){ lfta_disorder = 1; hfta_disorder = 1; + hfta_slow_flush = 0; // Get the table name. 
// NOTE the colrefs have the tablevar ref (an int) @@ -640,6 +647,7 @@ public: param_tbl->handle_access(param_names[pi])); } ret->definitions = definitions; + ret->hfta_slow_flush = hfta_slow_flush; ret->node_name = node_name + suffix; diff --git a/src/ftacmp/translate_fta.cc b/src/ftacmp/translate_fta.cc index 4ee0bb4..19ff634 100644 --- a/src/ftacmp/translate_fta.cc +++ b/src/ftacmp/translate_fta.cc @@ -169,6 +169,7 @@ int main(int argc, char **argv){ vector registration_query_names; // for lfta.c registration map > mach_query_names; // list queries of machine vector snap_lengths; // for lfta.c registration + vector snap_position; // for lfta.c registration vector interface_names; // for lfta.c registration vector machine_names; // machine of interface vector lfta_reuse_options; // for lfta.c registration @@ -2095,7 +2096,11 @@ for(q=0;qquery_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop); */ - snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema)); + snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "snap")); + snap_position.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "index")); + +// STOPPED HERE need to figure out how to generate the code that Vlad needs +// from snap_postion // TODO NOTE : I'd like it to be the case that registration_query_names // are the queries to be registered for subsciption. @@ -2552,7 +2557,10 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e // Find common filter predicates in the LFTAs. 
-// in addition generate structs to store the temporal attributes unpacked by prefilter +// in addition generate structs to store the +// temporal attributes unpacked by prefilter +// compute & provide interface for per-interface +// record extraction properties map >::iterator ssqi; for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){ @@ -2655,10 +2663,13 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n"; #endif map > lfta_sigs; // used again later + map lfta_snap_pos; // optimize csv parsing + // compute now, use in get_iface_properties for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){ string liface = (*mvsi).first; vector empty_list; lfta_sigs[liface] = empty_list; + lfta_snap_pos[liface] = -1; vector lfta_cols; vector lfta_snap_length; @@ -2672,7 +2683,10 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e } lfta_sigs[liface].push_back(mask); lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema)); - lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema)); + lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "snap")); + int this_snap_pos = compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "index"); + if(this_snap_pos > lfta_snap_pos[liface]) + lfta_snap_pos[liface] = this_snap_pos; } //for(li=0;li &input_file_names, int nfiles, bool use_proto = false; bool use_bsa = false; bool use_kafka = false; + bool use_ssl = false; int erri; string err_str; for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){ @@ -3116,6 +3133,17 @@ void generate_makefile(vector &input_file_names, int nfiles, #endif } } + ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "ENCRYPTED", erri, err_str); + for(int ift_i=0;ift_i &input_file_names, int nfiles, if(use_pads) 
fprintf(outfl,"-lgscppads -lpads "); fprintf(outfl, -"-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz"); +"-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz -lrt"); if(use_pads) fprintf(outfl, " -lpz -lz -lbz "); if(libz_exists && libast_exists) @@ -3152,6 +3180,9 @@ void generate_makefile(vector &input_file_names, int nfiles, #endif #ifdef KAFKA_ENABLED fprintf(outfl, " -lrdkafka "); +#endif +#ifdef SSL_ENABLED + fprintf(outfl, " -lssl -lcrypto "); #endif fprintf(outfl," -lgscpaux"); #ifdef GCOV diff --git a/src/ftacmp/type_objects.cc b/src/ftacmp/type_objects.cc index 9772ec0..5464dcf 100644 --- a/src/ftacmp/type_objects.cc +++ b/src/ftacmp/type_objects.cc @@ -1172,8 +1172,18 @@ string data_type::get_comparison_fcn(data_type *dt){ default: return("ERROR_NO_SUCH_COMPARISON_FCN"); } - - +} +string data_type::get_equals_fcn(data_type *dt){ + switch(type){ + case timeval_t: + return("Compare_Timeval"); + case v_str_t: + return("str_equal"); + case ipv6_t: + return("ipv6_compare"); + default: + return("ERROR_NO_SUCH_COMPARISON_FCN"); + } } string data_type::get_hfta_comparison_fcn(data_type *dt){ @@ -1188,6 +1198,18 @@ string data_type::get_hfta_comparison_fcn(data_type *dt){ return("ERROR_NO_SUCH_COMPARISON_FCN"); } } +string data_type::get_hfta_equals_fcn(data_type *dt){ + switch(type){ + case timeval_t: + return("hfta_Compare_Timeval"); + case v_str_t: + return("hfta_vstr_equal"); + case ipv6_t: + return("hfta_ipv6_compare"); + default: + return("ERROR_NO_SUCH_COMPARISON_FCN"); + } +} // Return true if operating on these types requires // a special function for this operator. 
diff --git a/src/ftacmp/type_objects.h b/src/ftacmp/type_objects.h index f136ae9..e99eeb5 100644 --- a/src/ftacmp/type_objects.h +++ b/src/ftacmp/type_objects.h @@ -95,6 +95,8 @@ public: bool complex_comparison(data_type *dt); std::string get_comparison_fcn(data_type *dt); std::string get_hfta_comparison_fcn(data_type *dt); + std::string get_equals_fcn(data_type *dt); + std::string get_hfta_equals_fcn(data_type *dt); bool complex_operator(data_type *dt, std::string &op); std::string get_complex_operator(data_type *dt, std::string &op); diff --git a/src/lib/gscphftaaux/hfta_runtime_library.cc b/src/lib/gscphftaaux/hfta_runtime_library.cc index 8396e4d..f9b057a 100644 --- a/src/lib/gscphftaaux/hfta_runtime_library.cc +++ b/src/lib/gscphftaaux/hfta_runtime_library.cc @@ -65,8 +65,8 @@ gs_retval_t hfta_vstr_length(vstring *str) { } // Assume that SRC is either INTERNAL or SHALLOW_COPY -void hfta_vstr_assign_with_copy_in_tuple(vstring32 * target, vstring * src, - gs_sp_t data_offset, gs_retval_t int_offset) { +void hfta_vstr_assign_with_copy_in_tuple(vstring32 * target, + const vstring * src, gs_sp_t data_offset, gs_retval_t int_offset) { target->length = src->length; target->offset = int_offset; target->reserved = PACKED; @@ -77,7 +77,7 @@ void hfta_vstr_assign_with_copy_in_tuple(vstring32 * target, vstring * src, // Ted wrote the following function. // make deep copy of src. Assume that dst is already empty. // Assume that SRC is either INTERNAL or SHALLOW_COPY -void hfta_vstr_assign_with_copy(vstring *dst, vstring *src){ +void hfta_vstr_assign_with_copy(vstring *dst, const vstring *src){ dst->length=src->length; if(src->length){ dst->offset=(gs_p_t)malloc(dst->length); @@ -89,7 +89,7 @@ void hfta_vstr_assign_with_copy(vstring *dst, vstring *src){ // Ted wrote the following function. // Make a deep copy of src. garbage collect dst if needed. 
// Assume that SRC is either INTERNAL or SHALLOW_COPY -void hfta_vstr_replace(vstring *dst, vstring *src){ +void hfta_vstr_replace(vstring *dst, const vstring *src){ hfta_vstr_destroy(dst); hfta_vstr_assign_with_copy(dst,src); } @@ -171,6 +171,23 @@ gs_retval_t hfta_vstr_compare(const vstring * s1, const vstring * s2) { return(s1->length - s2->length); } +gs_retval_t hfta_vstr_equal(const vstring * s1, const vstring * s2) { + gs_int32_t x; + + if(s1->length != s2->length) + return -1; + +// cmp_ret=memcmp((void *)s1->offset,(void *)s2->offset,s1->length); + for(x=0;x<s1->length;x++) { + if (((char *)(s1->offset))[x]!=((char *)(s2->offset))[x]) { + return -1; + } + } + + + return 0; +} + gs_param_handle_t register_handle_for_str_regex_match_slot_1(vstring* pattern) { diff --git a/src/lib/gscphftaaux/hfta_udaf.cc b/src/lib/gscphftaaux/hfta_udaf.cc index 1366cef..7eef46d 100644 --- a/src/lib/gscphftaaux/hfta_udaf.cc +++ b/src/lib/gscphftaaux/hfta_udaf.cc @@ -207,7 +207,7 @@ struct avg_udaf_lfta_struct_t{ gs_uint32_t cnt; }; -// sctarchpad struct +// scratchpad struct struct avg_udaf_hfta_struct_t{ gs_int64_t sum; gs_uint32_t cnt; @@ -715,3 +715,213 @@ void running_array_aggr_hfta_HFTA_AGGR_DESTROY_(vstring* scratch) { hfta_vstr_destroy(scratch); } + +//////////////////////////////////////////// +// Aggregate strings by catenation + + +struct CAT_aggr_scratch{ + std::string val; + int x; +}; + +struct CAT_aggr_scratch_ptr{ + CAT_aggr_scratch *ptr; +}; + +void CAT_aggr_HFTA_AGGR_INIT_(gs_sp_t s){ + CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s; + CAT_aggr_scratch *v = new CAT_aggr_scratch(); + v->x = 101; + + p->ptr = v; +} +void CAT_aggr_HFTA_AGGR_REINIT_(gs_sp_t s){ + CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s; + CAT_aggr_scratch *v = p->ptr; + v->val=""; +} +void CAT_aggr_HFTA_AGGR_UPDATE_(gs_sp_t s, vstring *sep, vstring *str){ +char buf1[MAXTUPLESZ-20], buf2[MAXTUPLESZ-20]; +int i; +for(i=0;i<sep->length;++i) buf1[i] = *(((char *)sep->offset)+i); +buf1[i]='\0'; 
+for(i=0;i<str->length;++i) buf2[i] = *(((char *)str->offset)+i); +buf2[i]='\0'; + CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s; + CAT_aggr_scratch *v = p->ptr; + if(v->val.size()>0) + v->val.append((char *)(sep->offset), sep->length); + v->val.append((char *)(str->offset), str->length); +//printf("sep=%s, str=%s, val=%s\n",buf1,buf2,v->val.c_str()); +} +void CAT_aggr_HFTA_AGGR_OUTPUT_(vstring *res, gs_sp_t s){ + CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s; + CAT_aggr_scratch *v = p->ptr; +//printf("output val=%s\n",v->val.c_str()); + res->offset = (gs_p_t)malloc(v->val.size()); + res->length = v->val.size(); + if(res->length>MAXTUPLESZ-20) + res->length=MAXTUPLESZ-20; +// v->val.copy((char *)(res->offset), 0, res->length); + const char *dat = v->val.c_str(); + memcpy((char *)(res->offset), dat, res->length); +// for(int i=0;i<res->length;++i) +// *(((char *)res->offset)+i) = dat[i]; + res->reserved = INTERNAL; +} +void CAT_aggr_HFTA_AGGR_DESTROY_(gs_sp_t s){ + CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s; + CAT_aggr_scratch *v = p->ptr; + delete v; +} + + + +/////////////////////////////////////////////////////////////// +// time_avg(sample, ts, window_size) +// Compute time-weighted average sum(sample*duration)/window_size +// duration is difference between current and next ts. +// The idea is to compute a sum over a step function. 
+// + +struct time_avg_udaf_str{ + gs_float_t sum; + gs_float_t last_val; + gs_uint64_t last_ts; + gs_uint64_t window; + gs_uint64_t first_ts; + gs_uint8_t event_occurred; +}; + +void time_avg_HFTA_AGGR_INIT_(gs_sp_t s){ + time_avg_udaf_str *scratch = (time_avg_udaf_str *)s; + scratch->sum = 0.0; + scratch->last_val = 0.0; + scratch->last_ts = 0; + scratch->first_ts = 0; + scratch->event_occurred = 0; +} + +void time_avg_HFTA_AGGR_DESTROY_(gs_sp_t s){ +} + +void time_avg_HFTA_AGGR_REINIT_(gs_sp_t s){ + time_avg_udaf_str *scratch = (time_avg_udaf_str *)s; + scratch->event_occurred = 0; + scratch->sum = 0; +//printf("time_avg_reinit: occurred=%d, last_val=%lf, sum=%lf, first_ts=%lld, last_ts=%lld\n",scratch->event_occurred, scratch->last_val, scratch->sum, scratch->first_ts, scratch->last_ts); +} + +void time_avg_HFTA_AGGR_OUTPUT_(gs_float_t *result, gs_sp_t s){ + time_avg_udaf_str *scratch = (time_avg_udaf_str *)s; + if(scratch->event_occurred==0){ + *result = scratch->last_val; +//printf("\ttime_avg outpt1 sum=%lf, last_val=%lf, result=%lf\n", scratch->sum, scratch->last_val, *result); + return; + } + gs_int64_t end_time = scratch->window * (scratch->last_ts/scratch->window + 1); + scratch->sum += (end_time - scratch->last_ts) * (gs_float_t)(scratch->last_val); + gs_int64_t start_time = end_time - scratch->window; + if(scratch->first_ts > start_time){ + *result = scratch->sum / (end_time - scratch->first_ts); +//printf("\ttime_avg outpt2 sum=%lf, start_time=%lld, end_time=%lld, first_ts=%lld, last_ts=%lld,result=%lf\n", scratch->sum, start_time, end_time, scratch->first_ts, scratch->last_ts, *result); + }else{ + *result = scratch->sum / (end_time - start_time); +//printf("\ttime_avg outpt3 sum=%lf, start_time=%lld, end_time=%lld, first_ts=%lld, last_ts=%lld,result=%lf\n", scratch->sum, start_time, end_time, scratch->first_ts, scratch->last_ts, *result); + } +} + +void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_float_t val, gs_int64_t ts, gs_int64_t window){ + 
time_avg_udaf_str *scratch = (time_avg_udaf_str *)s; + scratch->window = window; + if(scratch->first_ts==0){ + scratch->first_ts = ts; + }else{ + if(scratch->event_occurred){ + + scratch->sum += (ts - scratch->last_ts) * scratch->last_val; + }else{ + gs_int64_t start_time = scratch->window * (scratch->last_ts/scratch->window); + scratch->sum += (ts - start_time) * scratch->last_val; + } + } +//printf("time_avg_upd: val=%lf, occurred=%d, last_val=%lf, sum=%lf, ts=%lld, first_ts=%lld, last_ts=%lld\n",val, scratch->event_occurred, scratch->last_val, scratch->sum, ts, scratch->first_ts, scratch->last_ts); + scratch->last_val = val; + scratch->last_ts = ts; + scratch->event_occurred = 1; +} +void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t val, gs_int64_t ts, gs_int64_t window){ + time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window); +} +void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t val, gs_int64_t ts, gs_int64_t window){ + time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window); +} +void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t val, gs_int64_t ts, gs_int64_t window){ + time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window); +} +void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t val, gs_int64_t ts, gs_int64_t window){ + time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window); +} + + +// ------------------------------------------------------------ +// running_sum_max : get the running sum of an int, +// be able to report this sum and also its max value +// during the time window + +struct run_sum_max_udaf_str{ + gs_int64_t sum; + gs_int64_t max; +}; +void run_sum_max_HFTA_AGGR_INIT_(gs_sp_t s){ + run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s; + scratch->sum = 0; + scratch->max = 0; +} +void run_sum_max_HFTA_AGGR_REINIT_(gs_sp_t s){ + run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s; + scratch->max = scratch->sum; +} +void run_sum_max_HFTA_AGGR_OUTPUT_(vstring *r,gs_sp_t b){ + r->length = 
sizeof(run_sum_max_udaf_str); + r->offset = (gs_p_t)(b); + r->reserved = SHALLOW_COPY; +} +void run_sum_max_HFTA_AGGR_DESTROY_(gs_sp_t b){ + return; +} + +void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t v){ + run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s; + scratch->sum+=v; + if(scratch->sum>scratch->max) scratch->max=scratch->sum; +} +void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t v){ + run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s; + scratch->sum+=v; + if(scratch->sum>scratch->max) scratch->max=scratch->sum; +} +void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t v){ + run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s; + scratch->sum+=v; + if(scratch->sum>scratch->max) scratch->max=scratch->sum; +} +void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t v){ + run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s; + scratch->sum+=v; + if(scratch->sum>scratch->max) scratch->max=scratch->sum; +} +// the extraction functions +gs_int64_t extr_running_sum(vstring *v){ + if(v->length != sizeof(run_sum_max_udaf_str)) return 0; + run_sum_max_udaf_str *vs = (run_sum_max_udaf_str *)(v->offset); + return vs->sum; +} +gs_int64_t extr_running_sum_max(vstring *v){ + if(v->length != sizeof(run_sum_max_udaf_str)) return 0; + run_sum_max_udaf_str *vs = (run_sum_max_udaf_str *)(v->offset); + return vs->max; +} + + diff --git a/src/lib/gscprts/rts_csv.cc b/src/lib/gscprts/rts_csv.cc index 33eae61..ed7403a 100644 --- a/src/lib/gscprts/rts_csv.cc +++ b/src/lib/gscprts/rts_csv.cc @@ -23,9 +23,9 @@ #include #include #include -#include "errno.h" -#include "stdio.h" -#include "stdlib.h" +#include +#include +#include extern "C" { @@ -64,6 +64,29 @@ z_stream strm; BSA::FileStream::ISubStream* stream; BSA::FileStream::IFileHandle* ifh; BSA::FileStream::Reader* reader; + +#endif + +#ifdef SSL_ENABLED +#include +#include +#include +#include +#include +#include + +EVP_PKEY *rkey; +PKCS7 *p7; +BIO* mem_io; +char pwd[CSVMAXLINE]; + 
+// callback for passing password to private key reader +int pass_cb(char *buf, int size, int rwflag, void *u) { + int len = strlen(pwd); + memcpy(buf, pwd, len); + return len; +} + #endif gs_sp_t dev; @@ -72,6 +95,9 @@ static int listensockfd=-1; static int fd=-1; static struct packet cur_packet; static gs_sp_t name; +static gs_sp_t dir_name; +struct dirent **namelist; +static gs_int32_t num_dir_files; static gs_sp_t line; static ssize_t len; static size_t line_len; @@ -82,6 +108,7 @@ static gs_uint32_t startupdelay=0; static gs_uint32_t singlefile=0; static gs_uint32_t use_gzip=0; static gs_uint32_t use_bsa=0; +static gs_uint32_t use_decryption=0; static gs_uint32_t gshub=0; static int socket_desc=0; @@ -191,6 +218,57 @@ static void init_socket() { } static void next_file() { + + static gs_uint32_t file_pos = 0; + static gs_uint32_t scan_finished = 0; + + char buf[CSVMAXLINE]; + + if (dir_name) { + if (scan_finished) { + if (verbose) + fprintf(stderr,"Done processing, waiting for things to shut down\n"); + rts_fta_done(); + // now just service message queue until we get killed or loose connectivity + while (true) { + fta_start_service(0); // service all waiting messages + usleep(1000); // sleep a millisecond + } + } + if (num_dir_files) { // we already started directory scan + free(name); + if (file_pos < num_dir_files) { + sprintf(buf, "%s/%s", dir_name, namelist[file_pos]->d_name); + name = strdup(buf); + free(namelist[file_pos]); + file_pos++; + } else { + free(namelist); + scan_finished = 1; + return; + } + } else { + num_dir_files = scandir(dir_name, &namelist, NULL, alphasort); + if (num_dir_files == -1) { + num_dir_files = 0; + print_error((gs_sp_t)"ERROR: Unable to scan directory"); + return; + } + if (num_dir_files == 2) { // only . and . 
are there, empty dir + free(namelist[0]); + free(namelist[1]); + scan_finished = 1; + return; + } else + file_pos = 2; + + sprintf(buf, "%s/%s", dir_name, namelist[file_pos]->d_name); + name = strdup(buf); + free(namelist[file_pos]); + file_pos++; + } + } + struct stat s; if (verbose) { fprintf(stderr,"Opening %s\n",name); } @@ -213,7 +291,31 @@ static void next_file() { exit(10); } posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL); - if (singlefile == 0) { + +#ifdef SSL_ENABLED + if (use_decryption) { + // free SSL resources + if (mem_io) + BIO_free(mem_io); + if (p7) + PKCS7_free(p7); + + FILE *fp = fdopen(fd, "r"); + p7 = d2i_PKCS7_fp(fp, NULL); + if (p7 == NULL) { + print_error((gs_sp_t)"Error reading SMIME message from file"); + exit(-1); + } + + if(!(mem_io = PKCS7_dataDecode(p7, rkey, NULL, NULL))) { + print_error((gs_sp_t)"Error decoding PKCS7 file\n"); + exit(-1); + } + + fclose(fp); + } +#endif + if (!dir_name && !singlefile) { unlink(name); } if (use_gzip) { @@ -274,7 +376,13 @@ static gs_retval_t csv_replay_init(gs_sp_t device) gs_sp_t tempdel; gs_sp_t singlefiletmp; gs_sp_t compressortmp; - gs_sp_t bsatmp; + gs_sp_t bsatmp; + gs_sp_t encryptedtmp; + gs_sp_t maxfieldtmp; + + gs_sp_t pkey_fname; + gs_sp_t pwd_fname; + + if ((verbosetmp=get_iface_properties(device,(gs_sp_t)"verbose"))!=0) { if (strncmp(verbosetmp,"TRUE",4)==0) { @@ -285,10 +393,13 @@ static gs_retval_t csv_replay_init(gs_sp_t device) } } - if ((name=get_iface_properties(device,(gs_sp_t)"filename"))==0) { - print_error((gs_sp_t)"csv_replay_init::No CSV \"Filename\" defined"); + name=get_iface_properties(device,(gs_sp_t)"filename"); + dir_name=get_iface_properties(device,(gs_sp_t)"directoryname"); + if (!name && !dir_name) { + print_error((gs_sp_t)"csv_replay_init::Either \"Filename\" or \"Dirname\" must be defined"); exit(0); } + tempdel=get_iface_properties(device,(gs_sp_t)"csvseparator"); if (tempdel != 0 ) { csvdel = tempdel[0]; @@ -332,15 +443,77 @@ static gs_retval_t csv_replay_init(gs_sp_t 
device) if ((delaytmp=get_iface_properties(device,(gs_sp_t)"startupdelay"))!=0) { if (verbose) { - fprintf(stderr,"Startup delay of %u seconds\n",atoi(get_iface_properties(device,(gs_sp_t)"startupdelay"))); + fprintf(stderr,"Startup delay of %u seconds\n",atoi(delaytmp)); } - startupdelay=atoi(get_iface_properties(device,(gs_sp_t)"startupdelay")); + startupdelay=atoi(delaytmp); } + + if ((maxfieldtmp=get_iface_properties(device,(gs_sp_t)"_max_csv_pos"))!=0) { + max_field_csv=atoi(maxfieldtmp); + } + if ((gshubtmp=get_iface_properties(device,(gs_sp_t)"gshub"))!=0) { if (verbose) { fprintf(stderr,"CSV format using gshub\n"); } gshub=1; + if (!name) { + print_error((gs_sp_t)"csv_replay_init::Filename must be defined for gshub interfaces"); + exit(0); + } + } + + pkey_fname=get_iface_properties(device,(gs_sp_t)"privatekey"); + pwd_fname=get_iface_properties(device,(gs_sp_t)"password"); + + if ((encryptedtmp=get_iface_properties(device,(gs_sp_t)"encrypted"))!=0) { + if (strncmp(encryptedtmp,"TRUE",4)==0) { + #ifndef SSL_ENABLED + print_error((gs_sp_t)"csv_replay_init::runtime not built with SSL support to use encrypted interfaces"); + exit(0); + #else + use_decryption=1; + if (verbose) { + fprintf(stderr,"CSV file is encrypted\n"); + } + if (!pkey_fname || !pwd_fname) { + print_error((gs_sp_t)"csv_replay_init::privatekey and/or password filenames not specified for encrypted itnerface"); + exit(0); + } + + OpenSSL_add_all_algorithms(); + ERR_load_crypto_strings(); + + // Read password file + FILE* in_fd = fopen(pwd_fname, "r"); + if (!in_fd) { + fprintf(stderr, "Unable to open password file %s\n", pwd_fname); + exit(0); + } + + if (!fgets(pwd, CSVMAXLINE, in_fd)) { + fprintf(stderr, "Error reading password from file %s\n", pwd_fname); + exit(0); + } + strtok(pwd, "\r\n\t "); + fclose(in_fd); + + // Read the private key + in_fd = fopen(pkey_fname, "r"); + if (!in_fd) { + fprintf(stderr, "Unable to open private key file %s\n", pkey_fname); + exit(0); + } + + rkey = 
PEM_read_PrivateKey(in_fd, NULL, pass_cb, NULL); + if (!rkey) { + fprintf(stderr, "Unable to read private key file %s\n", pkey_fname); + exit(-1); + } + + fclose(in_fd); + #endif + } } cur_packet.ptype=PTYPE_CSV; @@ -439,8 +612,12 @@ static gs_int32_t csv_read_chunk() { } } else { #endif - if (fd <= 0) next_file(); - while ((have = read(fd, read_pos, CHUNK)) == 0) { + if (fd <= 0) next_file(); + +#ifdef SSL_ENABLED + if (use_decryption) { + + while ((have = BIO_read (mem_io, read_pos, CHUNK)) == 0) { if (singlefile==1) { if(verbose) { fprintf(stderr,"SINGLEFILE PROCESSING DONE! RTS SAYS BYE\n"); @@ -450,7 +627,24 @@ static gs_int32_t csv_read_chunk() { } else { next_file(); } + } + + } else { +#endif + while ((have = read(fd, read_pos, CHUNK)) == 0) { + if (singlefile==1) { + if(verbose) { + fprintf(stderr,"SINGLEFILE PROCESSING DONE! RTS SAYS BYE\n"); + } + return -2; + + } else { + next_file(); } + } +#ifdef SSL_ENABLED + } +#endif #ifdef BSA_ENABLED } #endif diff --git a/src/lib/gscprts/rts_kafka.c b/src/lib/gscprts/rts_kafka.c index 31049e5..4e3eda4 100644 --- a/src/lib/gscprts/rts_kafka.c +++ b/src/lib/gscprts/rts_kafka.c @@ -167,6 +167,7 @@ static gs_retval_t kafka_replay_init(gs_sp_t device) gs_sp_t verbosetmp; gs_sp_t delaytmp; gs_sp_t tempdel; + gs_sp_t maxfieldtmp; if ((verbosetmp=get_iface_properties(device,(gs_sp_t)"verbose"))!=0) { if (strncmp(verbosetmp,"TRUE",4)==0) { @@ -195,11 +196,15 @@ static gs_retval_t kafka_replay_init(gs_sp_t device) if ((delaytmp=get_iface_properties(device,(gs_sp_t)"startupdelay"))!=0) { if (verbose) { - fprintf(stderr,"Startup delay of %u seconds\n",atoi(get_iface_properties(device,(gs_sp_t)"startupdelay"))); + fprintf(stderr,"Startup delay of %u seconds\n",atoi(delaytmp)); } - startupdelay=atoi(get_iface_properties(device,(gs_sp_t)"startupdelay")); + startupdelay=atoi(delaytmp); } + if ((maxfieldtmp=get_iface_properties(device,(gs_sp_t)"_max_csv_pos"))!=0) { + max_field_kafka=atoi(maxfieldtmp); + } + // set maximum field 
number to be extracted by csv parser csv_set_maxfield(max_field_kafka); -- 2.16.6