-#!/usr/bin/bash
+#!/bin/bash
# ------------------------------------------------
# Copyright 2014 AT&T Intellectual Property
# Licensed under the Apache License, Version 2.0 (the "License");
string UDAF count_diff_lfta_s fstring20 (string);
+////////////////////////////////////////////////////////////////
+// string aggregation via catenation
+//////////////////////////////////////////////////////
+
+ string UDAF [HFTA_ONLY] CAT_aggr fstring8 (string, string);
+
///////////////////////////////////////////////////////////
-// integer array aggregation funciton
+// integer array aggregation function
// We are going to store 4 values in LFTA in fixed size buffer
-// plus one byte for array length (17 bytes total)
-// HFTA will combine partial aggregates
+// plus one byte for array length (17 bytes total)
+// HFTA will combine partial aggregates
+// This seems to create a string with a comma-separated list of the uints
///////////////////////////////////////////////////////////
string UDAF [RUNNING, SUBAGGR running_array_aggr_lfta, SUPERAGGR running_array_aggr_hfta] running_array_aggr string (uint);
string FUN [HFTA_ONLY, COST EXPENSIVE] int_to_string_map(ullong, string HANDLE);
string FUN [HFTA_ONLY, COST EXPENSIVE] int_to_string_map(int, string HANDLE);
string FUN [HFTA_ONLY, COST EXPENSIVE] int_to_string_map(uint, string HANDLE);
+
+//////////////////////////////////////////////////////////
+// time-averaged aggregate.
+// time_avg(sample, ts, window_size)
+ float UDAF [HFTA_ONLY, RUNNING] time_avg fstring44 (uint, llong, llong) ;
+ float UDAF [HFTA_ONLY, RUNNING] time_avg fstring44 (int, llong, llong) ;
+ float UDAF [HFTA_ONLY, RUNNING] time_avg fstring44 (ullong, llong, llong) ;
+ float UDAF [HFTA_ONLY, RUNNING] time_avg fstring44 (llong, llong, llong) ;
+ float UDAF [HFTA_ONLY, RUNNING] time_avg fstring44 (float, llong, llong) ;
+
+// ------------------------------------------------------------
+// running_sum_max : get the running sum of an int,
+// be able to report this sum and also its max value
+// during the time window
+
+ llong EXTR running_sum run_sum_max extr_running_sum (uint);
+ llong EXTR running_sum run_sum_max extr_running_sum (int);
+ llong EXTR running_sum run_sum_max extr_running_sum (llong);
+ llong EXTR running_sum run_sum_max extr_running_sum (ullong);
+ llong FUN [COST LOW] extr_running_sum(string);
+ llong EXTR running_sum_max run_sum_max extr_running_sum_max (uint);
+ llong EXTR running_sum_max run_sum_max extr_running_sum_max (int);
+ llong EXTR running_sum_max run_sum_max extr_running_sum_max (llong);
+ llong EXTR running_sum_max run_sum_max extr_running_sum_max (ullong);
+ llong FUN [COST LOW] extr_running_sum_max(string);
+ string UDAF [HFTA_ONLY, RUNNING] run_sum_max fstring16 (uint);
+ string UDAF [HFTA_ONLY, RUNNING] run_sum_max fstring16 (int);
+ string UDAF [HFTA_ONLY, RUNNING] run_sum_max fstring16 (ullong);
+ string UDAF [HFTA_ONLY, RUNNING] run_sum_max fstring16 (llong);
+
// support for KAFKA interfaces
//#define KAFKA_ENABLED
+// support for SSL decryption
+//#define SSL_ENABLED
+
#endif
-/* ------------------------------------------------
+/** ------------------------------------------------
Copyright 2014 AT&T Intellectual Property
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
#include <list>
#include "hash_table.h"
-#define _GB_FLUSH_PER_TUPLE_ 1
using namespace std;
class groupby_operator : public base_operator {
private :
groupby_func func;
- hash_table<group*, aggregate*, hasher_func, equal_func> group_table[2];
+ hash_table<group, aggregate, hasher_func, equal_func> group_table;
bool flush_finished;
- unsigned int curr_table;
- typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;
+ typename hash_table<group, aggregate, hasher_func, equal_func>::iterator flush_pos;
int n_patterns;
-
-
-
public:
groupby_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle) {
flush_finished = true;
- curr_table = 0;
- flush_pos = group_table[1-curr_table].end();
n_patterns = func.n_groupby_patterns();
}
// Push out completed groups
- if(!flush_finished) partial_flush(result);
-
- // create buffer on the stack to store key object
- char buffer[sizeof(group)];
-
// extract the key information from the tuple and
// copy it into buffer
- group* grp = func.create_group(tup, buffer);
- if (!grp) {
-/*
-// Ignore temp tuples until we can fix their timestamps.
-if (func.temp_status_received()) {
- tup.free_tuple();
- return 0;
-}*/
+ group grp;
+ if (!func.create_group(tup, (gs_sp_t)&grp)) {
+ if(func.disordered()){
+ fprintf(stderr,"Out of order record in %s\n",op_name);
+ return 0;
+ }
if (func.flush_needed()){
flush_old(result);
- }
+ }
if (func.temp_status_received()) {
host_tuple temp_tup;
if (!func.create_temp_status_tuple(temp_tup, flush_finished)) {
tup.free_tuple();
return 0;
}
+ if(func.disordered()){
+ fprintf(stderr,"Out of order record in %s\n",op_name);
+ return 0;
+ }
- typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
- if ((iter = group_table[curr_table].find(grp)) != group_table[curr_table].end()) {
+ typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
+ if ((iter = group_table.find(grp)) != group_table.end()) {
// Temporal GBvar is part of the group so no flush is needed.
- aggregate* old_aggr = (*iter).second;
- func.update_aggregate(tup, grp, old_aggr);
+ func.update_aggregate(tup, grp, (*iter).second);
}else{
if (func.flush_needed()) {
flush_old(result);
}
if(n_patterns <= 1){
- // create a copy of the group on the heap
- group* new_grp = new group(grp); // need a copy constructor for groups
-// aggregate* aggr = (aggregate*)malloc(sizeof(aggregate));
- aggregate* aggr = new aggregate();
- // create an aggregate in preallocated buffer
- aggr = func.create_aggregate(tup, (char*)aggr);
-
- group_table[curr_table].insert(new_grp, aggr);
+ aggregate aggr;
+ // create an aggregate in preallocated buffer
+ func.create_aggregate(tup, (char*)&aggr);
+ // need operator= doing a deep copy
+ group_table.insert(grp, aggr);
}else{
int p;
+// TODO this code is wrong, must check if each pattern is in the group table.
for(p=0;p<n_patterns;++p){
- group* new_grp = new group(grp, func.get_pattern(p));
- aggregate* aggr = new aggregate();
- aggr = func.create_aggregate(tup, (char*)aggr);
- group_table[curr_table].insert(new_grp, aggr);
+ // need shallow copy constructor for groups
+ group new_grp(grp, func.get_pattern(p));
+ aggregate aggr;
+ func.create_aggregate(tup, (char*)&aggr);
+ // need operator= doing a deep copy
+ group_table.insert(new_grp, aggr);
}
}
}
return 0;
}
- int partial_flush(list<host_tuple>& result) {
- host_tuple tup;
- unsigned int old_table = 1-curr_table;
- unsigned int i;
-
-// emit up to _GB_FLUSH_PER_TABLE_ output tuples.
- if (!group_table[old_table].empty()) {
- for (i=0; flush_pos != group_table[old_table].end() && i<_GB_FLUSH_PER_TUPLE_; ++flush_pos, ++i) {
- bool failed = false;
- tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
- if (!failed) {
- tup.channel = output_channel;
- result.push_back(tup);
- }
- delete ((*flush_pos).first);
- delete ((*flush_pos).second);
-// free((*flush_pos).second);
- }
- }
-
-// Finalize processing if empty.
- if(flush_pos == group_table[old_table].end()) {
- flush_finished = true;
- group_table[old_table].clear();
- group_table[old_table].rehash();
- }
- return 0;
- }
int flush(list<host_tuple>& result) {
host_tuple tup;
- typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
- unsigned int old_table = 1-curr_table;
-// If the old table isn't empty, flush it now.
- if (!group_table[old_table].empty()) {
- for (; flush_pos != group_table[old_table].end(); ++flush_pos) {
+ flush_pos = group_table.begin();
+// If the table isn't empty, flush it now.
+ if (!group_table.empty()) {
+ for (; flush_pos != group_table.end(); ++flush_pos) {
bool failed = false;
tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
if (!failed) {
tup.channel = output_channel;
result.push_back(tup);
}
- delete ((*flush_pos).first);
- delete ((*flush_pos).second);
// free((*flush_pos).second);
}
- group_table[old_table].clear();
- group_table[old_table].rehash();
- }
-
- flush_pos = group_table[curr_table].begin();
-// If the old table isn't empty, flush it now.
- if (!group_table[curr_table].empty()) {
- for (; flush_pos != group_table[curr_table].end(); ++flush_pos) {
- bool failed = false;
- tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
- if (!failed) {
-
- tup.channel = output_channel;
- result.push_back(tup);
- }
- delete ((*flush_pos).first);
- delete ((*flush_pos).second);
-// free((*flush_pos).second);
- }
- group_table[curr_table].clear();
+ group_table.clear();
}
flush_finished = true;
}
int flush_old(list<host_tuple>& result) {
- host_tuple tup;
- typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
- unsigned int old_table = 1-curr_table;
-
-// If the old table isn't empty, flush it now.
- if (!group_table[old_table].empty()) {
- for (; flush_pos != group_table[old_table].end(); ++flush_pos) {
- bool failed = false;
- tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
- if (!failed) {
-
- tup.channel = output_channel;
- result.push_back(tup);
- }
- delete ((*flush_pos).first);
- delete ((*flush_pos).second);
-// free((*flush_pos).second);
- }
- group_table[old_table].clear();
- group_table[old_table].rehash();
- }
-
-// swap tables, enable partial flush processing.
- flush_pos = group_table[curr_table].begin();
- curr_table = old_table;
- flush_finished = false;
+ flush(result);
+ group_table.clear();
+ group_table.resize();
return 0;
}
}
unsigned int get_mem_footprint() {
- return group_table[0].get_mem_footprint() + group_table[1].get_mem_footprint();
+ return group_table.get_mem_footprint();
}
};
--- /dev/null
+/** ------------------------------------------------
+Copyright 2014 AT&T Intellectual Property
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ------------------------------------------- */
+
+#ifndef GROUPBY_SLOWFLUSH_OPERATOR_H
+#define GROUPBY_SLOWFLUSH_OPERATOR_H
+
+#include "host_tuple.h"
+#include "base_operator.h"
+#include <list>
+#include "hash_table.h"
+
+#define _HFTA_SLOW_FLUSH
+
+using namespace std;
+
+// Group-by aggregation operator with incremental ("slow") flushing:
+// completed groups are emitted a few at a time (gb_per_flush per input
+// tuple) from a second hash table, instead of all at once, to smooth
+// the output burst at window boundaries.
+template <class groupby_func, class group, class aggregate, class hasher_func, class equal_func>
+class groupby_slowflush_operator : public base_operator {
+private :
+ groupby_func func;
+ // double-buffered group tables: curr_table receives new groups,
+ // the other table is drained incrementally by partial_flush().
+ hash_table<group, aggregate, hasher_func, equal_func> group_table[2];
+ bool flush_finished;
+ unsigned int curr_table;
+ typename hash_table<group, aggregate, hasher_func, equal_func>::iterator flush_pos;
+ int n_patterns;
+ int gb_per_flush; // max groups emitted per accepted tuple during a slow flush
+
+public:
+ groupby_slowflush_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle) {
+ flush_finished = true;
+ curr_table = 0;
+ flush_pos = group_table[1-curr_table].end();
+ n_patterns = func.n_groupby_patterns();
+ gb_per_flush = func.gb_flush_per_tuple();
+ }
+
+ int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
+
+// Push out completed groups
+ if(!flush_finished) partial_flush(result);
+
+ // extract the key information from the tuple and
+ // copy it into buffer
+ group grp;
+ if (!func.create_group(tup, (gs_sp_t)&grp)) {
+ if(func.disordered()){
+ fprintf(stderr,"Out of order record in %s\n",op_name);
+ return 0;
+ }
+ if (func.flush_needed()){
+ flush_old(result);
+ }
+ if (func.temp_status_received()) {
+ host_tuple temp_tup;
+ if (!func.create_temp_status_tuple(temp_tup, flush_finished)) {
+ temp_tup.channel = output_channel;
+ result.push_back(temp_tup);
+ }
+ }
+ tup.free_tuple();
+ return 0;
+ }
+ if(func.disordered()){
+ fprintf(stderr,"Out of order record in %s\n",op_name);
+ return 0;
+ }
+
+ typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
+ if ((iter = group_table[curr_table].find(grp)) != group_table[curr_table].end()) {
+// Temporal GBvar is part of the group so no flush is needed.
+ func.update_aggregate(tup, grp, (*iter).second);
+ }else{
+ if (func.flush_needed()) {
+ flush_old(result);
+ }
+ if(n_patterns <= 1){
+ aggregate aggr;
+ // create an aggregate in preallocated buffer
+ func.create_aggregate(tup, (char*)&aggr);
+ // need operator= doing a deep copy
+ group_table[curr_table].insert(grp, aggr);
+ }else{
+ int p;
+ for(p=0;p<n_patterns;++p){
+// TODO this code is wrong need to check each pattern to see if its in the table
+ // need shallow copy constructor for groups
+ group new_grp(grp, func.get_pattern(p));
+ aggregate aggr;
+ func.create_aggregate(tup, (char*)&aggr);
+ // need operator= doing a deep copy
+ group_table[curr_table].insert(new_grp, aggr);
+ }
+ }
+ }
+ tup.free_tuple();
+ return 0;
+ }
+
+ int partial_flush(list<host_tuple>& result) {
+ host_tuple tup;
+ unsigned int old_table = 1-curr_table;
+ int i; // signed to match gb_per_flush, avoids signed/unsigned comparison
+
+// emit up to gb_per_flush output tuples.
+ if (!group_table[old_table].empty()) {
+ for (i=0; flush_pos != group_table[old_table].end() && i<gb_per_flush; ++flush_pos, ++i) {
+ bool failed = false;
+ tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
+ if (!failed) {
+ tup.channel = output_channel;
+ result.push_back(tup);
+ }
+// free((*flush_pos).second);
+ }
+ }
+
+// Finalize processing if empty.
+ if(flush_pos == group_table[old_table].end()) {
+ flush_finished = true;
+ group_table[old_table].clear();
+ group_table[old_table].resize();
+ }
+ return 0;
+ }
+
+ int flush(list<host_tuple>& result) {
+ host_tuple tup;
+ typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
+ unsigned int old_table = 1-curr_table;
+
+// If the old table isn't empty, flush it now.
+ if (!group_table[old_table].empty()) {
+ for (; flush_pos != group_table[old_table].end(); ++flush_pos) {
+ bool failed = false;
+ tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
+ if (!failed) {
+
+ tup.channel = output_channel;
+ result.push_back(tup);
+ }
+// free((*flush_pos).second);
+ }
+ group_table[old_table].clear();
+ group_table[old_table].resize();
+ }
+
+ flush_pos = group_table[curr_table].begin();
+// If the table isn't empty, flush it now.
+ if (!group_table[curr_table].empty()) {
+ for (; flush_pos != group_table[curr_table].end(); ++flush_pos) {
+ bool failed = false;
+ tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
+ if (!failed) {
+
+ tup.channel = output_channel;
+ result.push_back(tup);
+ }
+// free((*flush_pos).second);
+ }
+ group_table[curr_table].clear();
+ }
+
+ flush_finished = true;
+
+ return 0;
+ }
+
+ int flush_old(list<host_tuple>& result) {
+
+ host_tuple tup;
+ typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
+ unsigned int old_table = 1-curr_table;
+
+// If the old table isn't empty, flush it now.
+ if (!group_table[old_table].empty()) {
+ for (; flush_pos != group_table[old_table].end(); ++flush_pos) {
+ bool failed = false;
+ tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
+ if (!failed) {
+
+ tup.channel = output_channel;
+ result.push_back(tup);
+ }
+// free((*flush_pos).second);
+ }
+ }
+
+ group_table[old_table].clear();
+ group_table[old_table].resize();
+
+// swap tables, enable partial flush processing.
+ flush_pos = group_table[curr_table].begin();
+ curr_table = old_table;
+ flush_finished = false;
+ return 0;
+ }
+
+ int set_param_block(int sz, void * value) {
+ func.set_param_block(sz, value);
+ return 0;
+ }
+
+ int get_temp_status(host_tuple& result) {
+ result.channel = output_channel;
+ return func.create_temp_status_tuple(result, flush_finished);
+ }
+
+ int get_blocked_status () {
+ return -1;
+ }
+
+ unsigned int get_mem_footprint() {
+ return group_table[0].get_mem_footprint() + group_table[1].get_mem_footprint();
+ }
+};
+
+#endif // GROUPBY_SLOWFLUSH_OPERATOR_H
value_type second;
data_item* next; // next data item in overflow chain
- data_item(const key_type& k, const value_type& v) : first(k), second(v), next(NULL) { }
+ data_item(const key_type& k, const value_type& v) {
+ first = k;
+ second = v;
+ next = NULL;
+ }
};
struct hash_bucket {
equal_func equal;
double load_factor;
-// double max_load;
size_t _size;
size_t _max_size;
size_t num_buckets;
size_t hash_mask;
+ // bucket allocation happens on first insert
+ // this flag indicates that buckets has been allocated
+ unsigned allocated;
+
+
hash_bucket* first_bucket; // first used bucket
- hash_bucket* last_bucket; // last used bucket
+ hash_bucket* last_bucket; // last used bucket
hash_bucket* buckets;
public:
- hash_table(const size_t n_buckets = 65536, const double load = 1.0) {
- load_factor = load;
+ hash_table(const size_t expected_size = 100000, const double max_load = 1.25) {
+
+ size_t min_buckets = (size_t)(expected_size / max_load);
+ load_factor = max_load;
int nb;
- for(nb=2;nb<n_buckets;nb*=2);
+ for(nb=2;nb<min_buckets;nb*=2);
num_buckets = nb;
hash_mask = nb-1;
- buckets = new hash_bucket[num_buckets];
+
+ // we will make hash table start with 0-size and expand of first insertion
+ buckets = new hash_bucket[0];
+ allocated = 0;
_size = 0;
_max_size = 0;
-// max_load = 0.0;
first_bucket = 0;
last_bucket = 0;
- total_memory = num_buckets * sizeof(hash_bucket);
+ total_memory = 0;
}
int insert(const key_type& key, const value_type& val) {
+
+ if (__builtin_expect(!allocated, 0)) { // we expect buckets to be allocated most of the time
+ delete[] buckets; // buckets was allocated with new[], scalar delete is UB
+ buckets = new hash_bucket[num_buckets];
+ allocated = 1;
+ total_memory = num_buckets * sizeof(hash_bucket);
+
+ fprintf(stderr, "Initial allocation of %d buckets\n", (int)num_buckets);
+ }
+
data_item* d = new data_item(key, val);
total_memory += sizeof(data_item);
iterator find(const key_type& key) {
iterator iter;
+ if (__builtin_expect(!allocated, 0)) { // we expect buckets to be allocated most of the time
+ iter.bucket = NULL;
+ iter.data = NULL;
+ return iter;
+ }
+
// find the insertion bucket
size_t bucket_index = hasher(key) & hash_mask;
// if the bucket is empty just add new data_item
}
void erase(const key_type& key) {
+
+ if (__builtin_expect(!allocated, 0)) { // we expect buckets to be allocated most of the time
+ return;
+ }
+
// find the bucket
size_t bucket_index = hasher(key) & hash_mask;
// if the bucket is empty just add new data_item
}
void erase_full(const key_type& key) {
+
+ if (__builtin_expect(!allocated, 0)) { // we expect buckets to be allocated most of the time
+ return;
+ }
+
// find the bucket
size_t bucket_index = hasher(key) & hash_mask;
// if the bucket is empty just add new data_item
}else{
prev->next = temp->next;
}
- delete (*temp).first;
- delete (*temp).second;
+// delete (*temp).first;
+// delete (*temp).second;
delete temp;
total_memory -= sizeof(data_item);
}
last_bucket = NULL;
_size = 0;
total_memory = num_buckets * sizeof(hash_bucket);
-
}
- int rehash() {
+ int resize() {
if (_size) {
- fprintf(stderr, "Error: rehashing non-empty hash table\n");
+ fprintf(stderr, "Error: resizing non-empty hash table\n");
exit(1);
- }
+ }
double max_load = (1.0 * _max_size) / num_buckets;
for(nb=2;nb<min_buckets;nb*=2);
num_buckets = nb;
+ fprintf(stderr, "Resizing to %d buckets\n", (int)num_buckets);
delete[] buckets;
hash_mask = num_buckets-1;
- buckets = new hash_bucket[num_buckets];
+ buckets = new hash_bucket[num_buckets];
+ total_memory = num_buckets * sizeof(hash_bucket);
}
return 0;
- }
-
-
+ }
size_t size() const {
return _size;
}
+ size_t max_size() const {
+ return _max_size;
+ }
+
bool empty() const {
return (_size == 0);
}
equal_func equal;
double load_factor;
-// double max_load;
size_t _size;
size_t _max_size;
hash_bucket* buckets;
public:
- hash_set(const size_t n_buckets = 65536, const double load = 1.0) {
- load_factor = load;
+ hash_set(const size_t expected_size = 100000, const double max_load = 1.25) {
+
+ size_t min_buckets = (size_t)(expected_size / max_load);
+ load_factor = max_load;
int nb;
- for(nb=2;nb<n_buckets;nb*=2);
+ for(nb=2;nb<min_buckets;nb*=2);
num_buckets = nb;
- hash_mask = num_buckets-1;
-
+ hash_mask = nb-1;
buckets = new hash_bucket[num_buckets];
_size = 0;
_max_size = 0;
-// max_load = 0.0;
first_bucket = 0;
last_bucket = 0;
}
}
- int rehash() {
+ int resize() {
if (_size) {
- fprintf(stderr, "Error: rehashing non-empty hash table\n");
+ fprintf(stderr, "Error: resizing non-empty hash table\n");
exit(1);
- }
-
+ }
+
double max_load = (1.0 * _max_size) / num_buckets;
// resize table if its maximum load exceeds the load factor
for(nb=2;nb<min_buckets;nb*=2);
num_buckets = nb;
+ fprintf(stderr, "Resizing to %d buckets\n", (int)num_buckets); // cast: num_buckets is size_t, %d expects int
delete[] buckets;
hash_mask = num_buckets-1;
// Internal functions
gs_retval_t Vstring_Constructor(vstring *, gs_csp_t);
gs_retval_t hfta_vstr_length(vstring *);
-void hfta_vstr_assign_with_copy_in_tuple(vstring32 *, vstring *, gs_sp_t, int);
-void hfta_vstr_assign_with_copy(vstring *, vstring *);
+void hfta_vstr_assign_with_copy_in_tuple(vstring32 *, const vstring *,
+ gs_sp_t, int);
+void hfta_vstr_assign_with_copy(vstring *, const vstring *);
void hfta_vstr_destroy(vstring *);
-void hfta_vstr_replace(vstring *, vstring *);
+void hfta_vstr_replace(vstring *, const vstring *);
gs_uint32_t hfta_vstr_hashfunc(const vstring *);
gs_uint64_t hfta_vstr_long_hashfunc(const vstring *);
gs_retval_t hfta_vstr_compare(const vstring *, const vstring *);
+gs_retval_t hfta_vstr_equal(const vstring *, const vstring *);
gs_retval_t hfta_ipv6_compare(const hfta_ipv6_str &i1, const hfta_ipv6_str &i2);
hfta_ipv6_str And_Ipv6(const hfta_ipv6_str &i1, const hfta_ipv6_str &i2);
gs_retval_t str_exists_substr(vstring * s1, vstring * s2);
gs_retval_t str_compare(vstring * s1, vstring * s2);
+gs_retval_t str_equal(vstring * s1, vstring * s2);
gs_uint32_t str_match_offset(gs_uint32_t offset,vstring *s1,vstring *s2);
gs_uint32_t byte_match_offset( gs_uint32_t offset, gs_uint32_t val,vstring *s2);
void count_diff_hfta_HFTA_AGGR_OUTPUT_(gs_uint32_t *res, gs_sp_t s);
void count_diff_hfta_HFTA_AGGR_DESTROY_(gs_sp_t scratch);
+//////////////////////////////////////////////
+// CAT_aggr, aggregate strings by catenation
+//////////////////////////////////////////////
+void CAT_aggr_HFTA_AGGR_INIT_(gs_sp_t s);
+void CAT_aggr_HFTA_AGGR_REINIT_(gs_sp_t s);
+void CAT_aggr_HFTA_AGGR_UPDATE_(gs_sp_t s, vstring *sep, vstring *str);
+void CAT_aggr_HFTA_AGGR_OUTPUT_(vstring *res, gs_sp_t s);
+void CAT_aggr_HFTA_AGGR_DESTROY_(gs_sp_t s);
+
+/////////////////////////////////////////////////////////
+// time-averaged sum, from aperiodic reports
+////////////////////////////////////////////////////////
+
+void time_avg_HFTA_AGGR_INIT_(gs_sp_t s);
+void time_avg_HFTA_AGGR_DESTROY_(gs_sp_t s);
+void time_avg_HFTA_AGGR_REINIT_(gs_sp_t s);
+void time_avg_HFTA_AGGR_OUTPUT_(gs_float_t *result, gs_sp_t s);
+void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_float_t val, gs_int64_t ts, gs_int64_t window);
+void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t val, gs_int64_t ts, gs_int64_t window);
+void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t val, gs_int64_t ts, gs_int64_t window);
+void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t val, gs_int64_t ts, gs_int64_t window);
+void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t val, gs_int64_t ts, gs_int64_t window);
+
+// ------------------------------------------------------------
+// running_sum_max : get the running sum of an int,
+// be able to report this sum and also its max value
+// during the time window
+// ------------------------------------------------------------
+
+void run_sum_max_HFTA_AGGR_INIT_(gs_sp_t s);
+void run_sum_max_HFTA_AGGR_REINIT_(gs_sp_t s);
+void run_sum_max_HFTA_AGGR_OUTPUT_(vstring *r,gs_sp_t b);
+void run_sum_max_HFTA_AGGR_DESTROY_(gs_sp_t b);
+void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t v);
+void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t v);
+void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t v);
+void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t v);
+gs_int64_t extr_running_sum(vstring *v);
+gs_int64_t extr_running_sum_max(vstring *v);
+
+
///////////////////////////////////////////////////////////////
}
}
}
- join_tbl[i].clear(); join_tbl[i].rehash();
+ join_tbl[i].clear(); join_tbl[i].resize();
}
}
class running_agg_operator : public base_operator {
private :
groupby_func func;
- hash_table<group*, aggregate*, hasher_func, equal_func> group_table;
- typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;
+ hash_table<group, aggregate, hasher_func, equal_func> group_table;
+ typename hash_table<group, aggregate, hasher_func, equal_func>::iterator flush_pos;
gs_int32_t nflushes;
// Push out completed groups
-// create buffer on the stack to store key object
- char buffer[sizeof(group)];
-// Number of flushes required
-
-// extract the key information from the tuple and
-// copy it into buffer
- group* grp = func.create_group(tup, buffer);
+ group grp, *ret;
+ ret = func.create_group(tup, (gs_sp_t)&grp);
nflushes = func.flush_needed();
-
- if (!grp) {
+ if(func.disordered()){
+ fprintf(stderr,"Out of order record in %s\n",op_name);
+ return 0;
+ }
+
+ if (! ret) {
if (nflushes>0){
flush(result);
}
if (nflushes>0) {
flush(result);
}
- typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
+ typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
if ((iter = group_table.find(grp)) != group_table.end()) {
- aggregate* old_aggr = (*iter).second;
- func.update_aggregate(tup, grp, old_aggr);
+ func.update_aggregate(tup, grp, (*iter).second);
}else{
- // create a copy of the group on the heap
- group* new_grp = new group(grp); // need a copy constructor for groups
-// aggregate* aggr = (aggregate*)malloc(sizeof(aggregate));
- aggregate* aggr = new aggregate();
- // create an aggregate in preallocated buffer
- aggr = func.create_aggregate(tup, (char*)aggr);
-
- group_table.insert(new_grp, aggr);
+ aggregate aggr;
+ // create an aggregate in preallocated buffer
+ func.create_aggregate(tup, (char*)&aggr);
+ // need operator= doing a deep copy
+ group_table.insert(grp, aggr);
}
tup.free_tuple();
return 0;
virtual int flush(list<host_tuple>& result) {
host_tuple tup;
- typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
+ typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
// Limit the number of successive flushes - avoid explosive behavior
const gs_int32_t max_flushes = 10;
result.push_back(tup);
}
if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){
- group* g = (*flush_pos).first;
- aggregate* a = (*flush_pos).second;
+ group &g = (*flush_pos).first;
+ //aggregate a = (*flush_pos).second;
++flush_pos;
group_table.erase(g);
- delete (g);
- delete (a);
}else{
func.reinit_aggregates((*flush_pos).first, (*flush_pos).second);
++flush_pos;
case PRED_IN:
ldt = pr->get_left_se()->get_data_type();
if(ldt->complex_comparison(ldt) ){
- fcn_set.insert( ldt->get_comparison_fcn(ldt) );
+ fcn_set.insert( ldt->get_equals_fcn(ldt) );
}
gather_se_opcmp_fcns(pr->get_left_se(), fcn_set);
return;
ldt = pr->get_left_se()->get_data_type();
rdt = pr->get_right_se()->get_data_type();
if(ldt->complex_comparison(rdt) ){
- fcn_set.insert( ldt->get_comparison_fcn(rdt) );
+ fcn_set.insert( ldt->get_comparison_fcn(ldt) ); // NOTE(review): code generation below calls get_comparison_fcn(rdt) — confirm ldt (not rdt) is intended here
}
gather_se_opcmp_fcns(pr->get_left_se(),fcn_set) ;
gather_se_opcmp_fcns(pr->get_right_se(),fcn_set) ;
ret += "( ";
if(ldt->complex_comparison(ldt) ){
- ret += ldt->get_comparison_fcn(ldt) ;
+ ret += ldt->get_equals_fcn(ldt) ;
ret += "( ";
if(ldt->is_buffer_type() ) ret += "&";
ret += generate_se_code(pr->get_left_se(), schema);
ret += "( ";
if(ldt->complex_comparison(rdt) ){
+// TODO can use get_equals_fcn if op is "=" ?
ret += ldt->get_comparison_fcn(rdt);
ret += "(";
if(ldt->is_buffer_type() ) ret += "&";
string ret;
if(dt->complex_comparison(dt) ){
- ret += dt->get_comparison_fcn(dt);
+ ret += dt->get_equals_fcn(dt);
ret += "(";
if(dt->is_buffer_type() ) ret += "&";
ret += lhs_op;
return(ret);
}
-static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
- string ret;
-
- if(dt->complex_comparison(dt) ){
- ret += dt->get_comparison_fcn(dt);
- ret += "(";
- if(dt->is_buffer_type() ) ret += "&";
- ret += lhs_op;
- ret += ", ";
- if(dt->is_buffer_type() ) ret += "&";
- ret += rhs_op;
- ret += ") == 0";
- }else{
- ret += lhs_op;
- ret += " == ";
- ret += rhs_op;
- }
-
- return(ret);
-}
+//static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
+// string ret;
+//
+ // if(dt->complex_comparison(dt) ){
+// ret += dt->get_equals_fcn(dt);
+// ret += "(";
+// if(dt->is_buffer_type() ) ret += "&";
+// ret += lhs_op;
+// ret += ", ";
+// if(dt->is_buffer_type() ) ret += "&";
+// ret += rhs_op;
+// ret += ") == 0";
+// }else{
+// ret += lhs_op;
+// ret += " == ";
+// ret += rhs_op;
+// }
+//
+// return(ret);
+//}
// Here I assume that only MIN and MAX aggregates can be computed
// over BUFFER data types.
-int compute_snap_len(qp_node *fs, table_list *schema){
+int compute_snap_len(qp_node *fs, table_list *schema, string snap_type){
// Initialize global vars
gb_tbl = NULL;
int tblref = (*csi).tblvar_ref;
string field = (*csi).field;
- param_list *field_params = schema->get_modifier_list(schref, field);
- if(field_params->contains_key("snap_len")){
- string fld_snap_str = field_params->val_of("snap_len");
- int fld_snap;
- if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){
- if(fld_snap > snap_len) snap_len = fld_snap;
- n_snap++;
- }else{
- fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() );
+ if(snap_type == "index"){
+ int pos = schema->get_field_idx(schref, field);
+ if(pos>snap_len) snap_len = pos;
+ n_snap++;
+ }else{
+ param_list *field_params = schema->get_modifier_list(schref, field);
+ if(field_params->contains_key("snap_len")){
+ string fld_snap_str = field_params->val_of("snap_len");
+ int fld_snap;
+ if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){
+ if(fld_snap > snap_len) snap_len = fld_snap;
+ n_snap++;
+ }else{
+ fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() );
+ }
}
}
}
std::string generate_lfta_prefilter(std::vector<cnf_set *> &pred_list, col_id_set &temp_cids, table_list *Schema, ext_fcn_list *Ext_fcns, std::vector<col_id_set> &lfta_cols, std::vector<long long int> &lfta_sigs, vector<int> &lfta_snap_lens, std::string iface);
std::string generate_lfta_prefilter_struct(col_id_set &temp_cids, table_list *Schema);
-int compute_snap_len(qp_node *fs, table_list *schema);
+// a snap_type of "index" computes position, else use snap_len metadata
+int compute_snap_len(qp_node *fs, table_list *schema, string snap_type);
std::string generate_watchlist_element_name(std::string node_name);
std::string generate_watchlist_struct_name(std::string node_name);
return tbl_list[t]->get_field(tbl_list[t]->get_field_idx(f));
}
int get_field_idx(std::string t, std::string f);
+ int get_field_idx(int t, std::string f){
+ return tbl_list[t]->get_field_idx(f);
+ }
int find_tbl(std::string t);
new_opl.push_back(new_se);
}
}
- stream_node->aggr_tbl.add_aggr(aggr_tbl.get_op(a), aggr_tbl.get_fcn_id(a), new_opl, aggr_tbl.get_storage_type(a),false, false,aggr_tbl.has_bailout(a));
+ stream_node->aggr_tbl.add_aggr(aggr_tbl.get_op(a), aggr_tbl.get_fcn_id(a), new_opl, aggr_tbl.get_storage_type(a),aggr_tbl.is_superaggr(a), aggr_tbl.is_running_aggr(a),aggr_tbl.has_bailout(a));
hse = new scalarexp_t(aggr_tbl.get_op(a).c_str(),new_opl);
hse->set_data_type(Ext_fcns->get_fcn_dt(aggr_tbl.get_fcn_id(a)));
hse->set_fcn_id(aggr_tbl.get_fcn_id(a));
stream_node->set_node_name( node_name );
stream_node->table_name->set_range_var(table_name->get_var_name());
-// allowed stream disorder. Default is 2,
+// allowed stream disorder. Default is 1,
// can override with max_lfta_disorder setting.
// Also limit the hfta disorder, set to lfta disorder + 1.
// can override with max_hfta_disorder.
- fta_node->lfta_disorder = 2;
+ fta_node->lfta_disorder = 1;
if(this->get_val_of_def("max_lfta_disorder") != ""){
int d = atoi(this->get_val_of_def("max_lfta_disorder").c_str() );
if(d<1){
}
}
+
// First, process the group-by variables.
// The fta must supply the values of all the gbvars.
// If a gb is computed, the computation must be
ret.append("( ");
if(ldt->complex_comparison(ldt) ){
- ret.append( ldt->get_hfta_comparison_fcn(ldt) );
+ ret.append( ldt->get_hfta_equals_fcn(ldt) );
ret.append("( ");
if(ldt->is_buffer_type() )
ret.append("&");
ret.append("( ");
if(ldt->complex_comparison(rdt) ){
+// TODO can use get_hfta_equals_fcn if op is "=" ?
ret.append(ldt->get_hfta_comparison_fcn(rdt));
ret.append("(");
if(ldt->is_buffer_type() )
ret.append("( ");
if(ldt->complex_comparison(ldt) ){
- ret.append( ldt->get_hfta_comparison_fcn(ldt) );
+ ret.append( ldt->get_hfta_equals_fcn(ldt) );
ret.append("( ");
if(ldt->is_buffer_type() )
ret.append("&");
ret.append("( ");
if(ldt->complex_comparison(rdt) ){
+// TODO can use get_hfta_equals_fcn if op is "=" ?
ret.append(ldt->get_hfta_comparison_fcn(rdt));
ret.append("(");
if(ldt->is_buffer_type() )
string ret;
if(dt->complex_comparison(dt) ){
- ret.append(dt->get_hfta_comparison_fcn(dt));
+ ret.append(dt->get_hfta_equals_fcn(dt));
ret.append("(");
if(dt->is_buffer_type() )
ret.append("&");
return(ret);
}
-static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
- string ret;
-
- if(dt->complex_comparison(dt) ){
- ret.append(dt->get_hfta_comparison_fcn(dt));
- ret.append("(");
- if(dt->is_buffer_type() )
- ret.append("&");
- ret.append(lhs_op);
- ret.append(", ");
- if(dt->is_buffer_type() )
- ret.append("&");
- ret.append(rhs_op );
- ret.append(") == 0");
- }else{
- ret.append(lhs_op );
- ret.append(" == ");
- ret.append(rhs_op );
- }
-
- return(ret);
-}
+//static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
+// string ret;
+//
+// if(dt->complex_comparison(dt) ){
+// ret.append(dt->get_hfta_equals_fcn(dt));
+// ret.append("(");
+// if(dt->is_buffer_type() )
+// ret.append("&");
+// ret.append(lhs_op);
+// ret.append(", ");
+// if(dt->is_buffer_type() )
+// ret.append("&");
+// ret.append(rhs_op );
+// ret.append(") == 0");
+// }else{
+// ret.append(lhs_op );
+// ret.append(" == ");
+// ret.append(rhs_op );
+// }
+//
+// return(ret);
+//}
// Here I assume that only MIN and MAX aggregates can be computed
string sgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, vector<bool> &needs_xform){
int a,g,w,s;
+// Regular or slow flush?
+ hfta_slow_flush = 0;
+ if(this->get_val_of_def("hfta_slow_flush") != ""){
+ int d = atoi(this->get_val_of_def("hfta_slow_flush").c_str() );
+ if(d<0){
+ fprintf(stderr,"Warning, hfta_slow_flush in node %s is %d, must be at least 0, setting to 0.\n",node_name.c_str(), d);
+ hfta_slow_flush = 0;
+ }else{
+ hfta_slow_flush = d;
+ }
+ }
+
// Initialize generate utility globals
segen_gb_tbl = &(gb_tbl);
ret+="\t"+this->gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n";
}
// empty strucutred literals
- map<int, string>::iterator sii;
- for(sii=structured_types.begin();sii!=structured_types.end();++sii){
- data_type dt(sii->second);
- literal_t empty_lit(sii->first);
- ret += "\t"+dt.make_host_cvar(empty_lit.hfta_empty_literal_name())+";\n";
- }
+// map<int, string>::iterator sii;
+// for(sii=structured_types.begin();sii!=structured_types.end();++sii){
+// data_type dt(sii->second);
+// literal_t empty_lit(sii->first);
+// ret += "\t"+dt.make_host_cvar(empty_lit.hfta_empty_literal_name())+";\n";
+// }
// Constructors
if(structured_types.size()==0){
ret += "\t"+generate_functor_name() + "_groupdef(){};\n";
}
+ ret += "\t// shallow copy constructors\n";
ret += "\t"+generate_functor_name() + "_groupdef("+
- this->generate_functor_name() + "_groupdef *gd){\n";
+ "const " + this->generate_functor_name() + "_groupdef &gd){\n";
for(g=0;g<gb_tbl.size();g++){
data_type *gdt = gb_tbl.get_data_type(g);
- if(gdt->is_buffer_type()){
- sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd->gb_var%d));\n",
- gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
- ret += tmpstr;
- }else{
- sprintf(tmpstr,"\t\tgb_var%d = gd->gb_var%d;\n",g,g);
- ret += tmpstr;
- }
+ sprintf(tmpstr,"\t\tgb_var%d = gd.gb_var%d;\n",g,g);
+ ret += tmpstr;
+// TODO : do strings persist after the call? it's a shallow copy
+// if(gdt->is_buffer_type()){
+// sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd->gb_var%d));\n",
+// gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
+// ret += tmpstr;
+// }else{
+// sprintf(tmpstr,"\t\tgb_var%d = gd->gb_var%d;\n",g,g);
+// ret += tmpstr;
+// }
}
ret += "\t}\n";
ret += "\t"+generate_functor_name() + "_groupdef("+
- this->generate_functor_name() + "_groupdef *gd, bool *pattern){\n";
+ "const " + this->generate_functor_name() + "_groupdef &gd, bool *pattern){\n";
+// -- For patterns, need empty structured literals
+ map<int, string>::iterator sii;
+ for(sii=structured_types.begin();sii!=structured_types.end();++sii){
+ data_type dt(sii->second);
+ literal_t empty_lit(sii->first);
+ ret += "\t"+dt.make_host_cvar(empty_lit.hfta_empty_literal_name())+";\n";
+ }
+
for(sii=structured_types.begin();sii!=structured_types.end();++sii){
literal_t empty_lit(sii->first);
ret += "\t\t"+empty_lit.to_hfta_C_code("&"+empty_lit.hfta_empty_literal_name())+";\n";
for(g=0;g<gb_tbl.size();g++){
data_type *gdt = gb_tbl.get_data_type(g);
ret += "\t\tif(pattern["+int_to_string(g)+"]){\n";
- if(gdt->is_buffer_type()){
- sprintf(tmpstr,"\t\t\t%s(&gb_var%d, &(gd->gb_var%d));\n",
- gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
- ret += tmpstr;
- }else{
- sprintf(tmpstr,"\t\t\tgb_var%d = gd->gb_var%d;\n",g,g);
- ret += tmpstr;
- }
+ sprintf(tmpstr,"\t\t\tgb_var%d = gd.gb_var%d;\n",g,g);
+ ret += tmpstr;
+// TODO Do strings persist long enough? it's a shallow copy constructor?
+// if(gdt->is_buffer_type()){
+// sprintf(tmpstr,"\t\t\t%s(&gb_var%d, &(gd->gb_var%d));\n",
+// gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
+// ret += tmpstr;
+// }else{
+// sprintf(tmpstr,"\t\t\tgb_var%d = gd->gb_var%d;\n",g,g);
+// ret += tmpstr;
+// }
ret += "\t\t}else{\n";
literal_t empty_lit(gdt->type_indicator());
if(empty_lit.is_cpx_lit()){
ret += "\t\t}\n";
}
ret += "\t};\n";
+
+ ret += "\t// deep assignment operator\n";
+ ret += "\t"+generate_functor_name() + "_groupdef& operator=(const "+
+ this->generate_functor_name() + "_groupdef &gd){\n";
+ for(g=0;g<gb_tbl.size();g++){
+ data_type *gdt = gb_tbl.get_data_type(g);
+ if(gdt->is_buffer_type()){
+ sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd.gb_var%d));\n",
+ gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
+ ret += tmpstr;
+ }else{
+ sprintf(tmpstr,"\t\tgb_var%d = gd.gb_var%d;\n",g,g);
+ ret += tmpstr;
+ }
+ }
+ ret += "\t}\n";
+
// destructor
ret += "\t~"+ generate_functor_name() + "_groupdef(){\n";
for(g=0;g<gb_tbl.size();g++){
}
}
ret += "\tbool needs_temporal_flush;\n";
+ ret += "\tbool disordered_arrival;\n";
}
"}\n\n"
;
+//---------------------------------------
+// Parameterized number of tuples output per slow flush
+ ret +=
+"int gb_flush_per_tuple(){\n"
+" return "+int_to_string(hfta_slow_flush)+";\n"
+"}\n\n";
+
+
ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n";
if(hfta_disorder < 2){
if(uses_temporal_flush){
- ret+= "\tif( !( (";
+ ret+= "\tif( ( (";
bool first_one = true;
+ string disorder_test;
for(g=0;g<gb_tbl.size();g++){
data_type *gdt = gb_tbl.get_data_type(g);
if(gdt->is_temporal()){
- sprintf(tmpstr,"last_gb%d",g); string lhs_op = tmpstr;
- sprintf(tmpstr,"gbval->gb_var%d",g); string rhs_op = tmpstr;
- if(first_one){first_one = false;} else {ret += ") && (";}
- ret += generate_equality_test(lhs_op, rhs_op, gdt);
+ sprintf(tmpstr,"last_gb%d",g); string lhs_op = tmpstr;
+ sprintf(tmpstr,"gbval->gb_var%d",g); string rhs_op = tmpstr;
+ if(first_one){first_one = false;} else {ret += ") && (";}
+ ret += generate_lt_test(lhs_op, rhs_op, gdt);
+ disorder_test += generate_lt_test(rhs_op, lhs_op, gdt);
}
}
ret += ") ) ){\n";
}
}
ret += "\t\tneeds_temporal_flush=true;\n";
- ret += "\t\t}else{\n"
- "\t\t\tneeds_temporal_flush=false;\n"
- "\t\t}\n";
+ ret += "\t}else{\n"
+ "\t\tneeds_temporal_flush=false;\n"
+ "\t}\n";
+
+ ret += "\tdisordered_arrival = "+disorder_test+";\n";
+// ret += "\tif( ( ("+disorder_test+") ) ){\n";
+// ret += "\t\tdisordered_arrival=true;\n";
+// ret += "\t}else{\n";
+// ret += "\t\tdisordered_arrival=false;\n";
+// ret += "\t}\n";
+
}
}else{
ret+= "\tif(temp_tuple_received && !( (";
// update an aggregate object
ret += "void update_aggregate(host_tuple &tup0, "
- +generate_functor_name()+"_groupdef *gbval, "+
- generate_functor_name()+"_aggrdef *aggval){\n";
+ +generate_functor_name()+"_groupdef &gbval, "+
+ generate_functor_name()+"_aggrdef &aggval){\n";
// Variables for execution of the function.
ret += "\tgs_int32_t problem = 0;\n"; // return unpack failure
// Unpack all remaining attributes
ret += gen_remaining_colrefs(schema, cid_set, found_cids, "", needs_xform);
for(a=0;a<aggr_tbl.size();a++){
- sprintf(tmpstr,"aggval->aggr_var%d",a);
+ sprintf(tmpstr,"aggval.aggr_var%d",a);
string varname = tmpstr;
ret.append(generate_aggr_update(varname,&aggr_tbl,a, schema));
}
}
ret += "\t};\n";
+ ret += "bool disordered(){return disordered_arrival;}\n";
+
//---------------------------------------------------
// create output tuple
// Unpack the partial functions ref'd in the where clause,
// so I'll leave it in longhand.
ret += "host_tuple create_output_tuple("
- +generate_functor_name()+"_groupdef *gbval, "+
- generate_functor_name()+"_aggrdef *aggval, bool &failed){\n";
+ +generate_functor_name()+"_groupdef &gbval, "+
+ generate_functor_name()+"_aggrdef &aggval, bool &failed){\n";
ret += "\thost_tuple tup;\n";
ret += "\tfailed = false;\n";
ret += "\tgs_retval_t retval = 0;\n";
- string gbvar = "gbval->gb_var";
- string aggvar = "aggval->";
+ string gbvar = "gbval.gb_var";
+ string aggvar = "aggval.";
// Create cached temporaries for UDAF return values.
for(a=0;a<aggr_tbl.size();a++){
ret += "struct "+generate_functor_name()+"_hash_func{\n";
ret += "\tgs_uint32_t operator()(const "+generate_functor_name()+
- "_groupdef *grp) const{\n";
+ "_groupdef &grp) const{\n";
ret += "\t\treturn( (";
for(g=0;g<gb_tbl.size();g++){
if(g>0) ret += "^";
data_type *gdt = gb_tbl.get_data_type(g);
if(gdt->use_hashfunc()){
if(gdt->is_buffer_type())
- sprintf(tmpstr,"(%s*%s(&(grp->gb_var%d)))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
+ sprintf(tmpstr,"(%s*%s(&(grp.gb_var%d)))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
else
- sprintf(tmpstr,"(%s*%s(grp->gb_var%d))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
+ sprintf(tmpstr,"(%s*%s(grp.gb_var%d))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
}else{
- sprintf(tmpstr,"(%s*grp->gb_var%d)",hash_nums[g%NRANDS].c_str(),g);
+ sprintf(tmpstr,"(%s*grp.gb_var%d)",hash_nums[g%NRANDS].c_str(),g);
}
ret += tmpstr;
}
// The comparison function
ret += "struct "+generate_functor_name()+"_equal_func{\n";
- ret += "\tbool operator()(const "+generate_functor_name()+"_groupdef *grp1, "+
- generate_functor_name()+"_groupdef *grp2) const{\n";
+ ret += "\tbool operator()(const "+generate_functor_name()+"_groupdef &grp1, "+
+ "const "+generate_functor_name()+"_groupdef &grp2) const{\n";
ret += "\t\treturn( (";
for(g=0;g<gb_tbl.size();g++){
if(g>0) ret += ") && (";
data_type *gdt = gb_tbl.get_data_type(g);
if(gdt->complex_comparison(gdt)){
- if(gdt->is_buffer_type())
- sprintf(tmpstr,"(%s(&(grp1->gb_var%d), &(grp2->gb_var%d))==0)",
- gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
- else
- sprintf(tmpstr,"(%s((grp1->gb_var%d), (grp2->gb_var%d))==0)",
- gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+ if(gdt->is_buffer_type())
+ sprintf(tmpstr,"(%s(&(grp1.gb_var%d), &(grp2.gb_var%d))==0)",
+ gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
+ else
+ sprintf(tmpstr,"(%s((grp1.gb_var%d), (grp2.gb_var%d))==0)",
+ gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
}else{
- sprintf(tmpstr,"grp1->gb_var%d == grp2->gb_var%d",g,g);
+ sprintf(tmpstr,"grp1.gb_var%d == grp2.gb_var%d",g,g);
}
ret += tmpstr;
}
string sgah_qpn::generate_operator(int i, string params){
if(hfta_disorder < 2){
+ string op_name = "groupby_operator";
+ if(hfta_slow_flush>0)
+ op_name = "groupby_slowflush_operator";
return(
- " groupby_operator<" +
+ " "+op_name+"<" +
generate_functor_name()+","+
generate_functor_name() + "_groupdef, " +
generate_functor_name() + "_aggrdef, " +
generate_functor_name()+"_hash_func, "+
generate_functor_name()+"_equal_func "
- "> *op"+int_to_string(i)+" = new groupby_operator<"+
+ "> *op"+int_to_string(i)+" = new "+op_name+"<"+
generate_functor_name()+","+
generate_functor_name() + "_groupdef, " +
generate_functor_name() + "_aggrdef, " +
if(hashkey_dt[p]->complex_comparison(hashkey_dt[p])){
if(hashkey_dt[p]->is_buffer_type())
sprintf(tmpstr,"(%s(&(key1->hashkey_var%d), &(key2->hashkey_var%d))==0)",
- hashkey_dt[p]->get_hfta_comparison_fcn(hashkey_dt[p]).c_str(),p,p);
+ hashkey_dt[p]->get_hfta_equals_fcn(hashkey_dt[p]).c_str(),p,p);
else
sprintf(tmpstr,"(%s((key1->hashkey_var%d), (key2->hashkey_var%d))==0)",
- hashkey_dt[p]->get_hfta_comparison_fcn(hashkey_dt[p]).c_str(),p,p);
+ hashkey_dt[p]->get_hfta_equals_fcn(hashkey_dt[p]).c_str(),p,p);
}else{
sprintf(tmpstr,"key1->hashkey_var%d == key2->hashkey_var%d",p,p);
}
if(gdt->complex_comparison(gdt)){
if(gdt->is_buffer_type())
sprintf(tmpstr,"(%s(&(grp1->gb_var%d), &(grp2->gb_var%d))==0)",
- gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+ gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
else
sprintf(tmpstr,"(%s((grp1->gb_var%d), (grp2->gb_var%d))==0)",
- gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+ gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
}else{
sprintf(tmpstr,"grp1->gb_var%d == grp2->gb_var%d",g,g);
}
if(gdt->complex_comparison(gdt)){
if(gdt->is_buffer_type())
sprintf(tmpstr,"(%s(&(grp1->gb_var%d), &(grp2->gb_var%d))==0)",
- gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+ gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
else
sprintf(tmpstr,"(%s((grp1->gb_var%d), (grp2->gb_var%d))==0)",
- gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+ gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
}else{
sprintf(tmpstr,"grp1->gb_var%d == grp2->gb_var%d",g,g);
}
ret+="\t"+this->gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n";
}
// Constructors
+
ret += "\t"+generate_functor_name() + "_groupdef(){};\n";
+ ret += "\t// shallow copy constructor\n";
ret += "\t"+generate_functor_name() + "_groupdef("+
- this->generate_functor_name() + "_groupdef *gd){\n";
+ this->generate_functor_name() + "_groupdef &gd){\n";
for(g=0;g<gb_tbl.size();g++){
data_type *gdt = gb_tbl.get_data_type(g);
- if(gdt->is_buffer_type()){
- sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd->gb_var%d));\n",
- gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
- ret += tmpstr;
- }else{
- sprintf(tmpstr,"\t\tgb_var%d = gd->gb_var%d;\n",g,g);
- ret += tmpstr;
- }
+ sprintf(tmpstr,"\t\tgb_var%d = gd.gb_var%d;\n",g,g);
+ ret += tmpstr;
}
ret += "\t};\n";
+
+ ret += "\t// deep assignment operator\n";
+ ret += "\t"+generate_functor_name() + "_groupdef& operator=(const "+
+ this->generate_functor_name() + "_groupdef &gd){\n";
+ for(g=0;g<gb_tbl.size();g++){
+ data_type *gdt = gb_tbl.get_data_type(g);
+ if(gdt->is_buffer_type()){
+ sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd.gb_var%d));\n",
+ gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
+ ret += tmpstr;
+ }else{
+ sprintf(tmpstr,"\t\tgb_var%d = gd.gb_var%d;\n",g,g);
+ ret += tmpstr;
+ }
+ }
+ ret += "\t}\n";
+
// destructor
ret += "\t~"+ generate_functor_name() + "_groupdef(){\n";
for(g=0;g<gb_tbl.size();g++){
}
}
ret += "\tgs_int32_t needs_temporal_flush;\n";
+ ret += "\tbool disordered_arrival;\n";
}
// The publicly exposed functions
// temporal flush variables
// ASSUME that structured values won't be temporal.
- gs_int32_t temporal_gb = 0;
if(uses_temporal_flush){
ret += "//\t\tInitialize temporal flush variables.\n";
for(g=0;g<gb_tbl.size();g++){
data_type *gdt = gb_tbl.get_data_type(g);
if(gdt->is_temporal()){
literal_t gl(gdt->type_indicator());
- sprintf(tmpstr,"\tlast_gb%d = %s;\n",g, gl.to_hfta_C_code("").c_str());
- ret.append(tmpstr);
sprintf(tmpstr,"\tcurr_gb%d = %s;\n",g, gl.to_hfta_C_code("").c_str());
ret.append(tmpstr);
- temporal_gb = g;
+ sprintf(tmpstr,"\tlast_gb%d = %s;\n",g, gl.to_hfta_C_code("").c_str());
+ ret.append(tmpstr);
}
}
ret += "\tneeds_temporal_flush = 0;\n";
if(uses_temporal_flush){
ret+= "\tif( ( (";
bool first_one = true;
+ string disorder_test;
for(g=0;g<gb_tbl.size();g++){
data_type *gdt = gb_tbl.get_data_type(g);
sprintf(tmpstr,"gbval->gb_var%d",g); string rhs_op = tmpstr;
if(first_one){first_one = false;} else {ret += ") && (";}
ret += generate_lt_test(lhs_op, rhs_op, gdt);
+ disorder_test += generate_lt_test(rhs_op, lhs_op, gdt);
}
}
ret += ") ) ){\n";
+ int temporal_gb=-1;
for(g=0;g<gb_tbl.size();g++){
data_type *gdt = gb_tbl.get_data_type(g);
if(gdt->is_temporal()){
- temporal_gb = g;
- if(gdt->is_buffer_type()){ // TODO first, last? or delete?
- sprintf(tmpstr,"\t\t%s(&(gbval->gb_var%d),&last_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g);
+ if(gdt->is_buffer_type()){
+ sprintf(tmpstr,"\t\t%s(&(gbval->gb_var%d),&curr_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g);
}else{
+// sprintf(tmpstr,"\t\tlast_gb%d = curr_gb%d;\n",g,g);
+// ret += tmpstr;
+// sprintf(tmpstr,"\t\tcurr_gb%d = gbval->gb_var%d;\n",g,g);
+
ret += "\t\tif(curr_gb"+to_string(g)+"==0){\n";
ret += "\t\t\tlast_gb"+to_string(g)+" = gbval->gb_var"+to_string(g)+";\n";
ret += "\t\t}else{\n";
ret += "\t\t\tlast_gb"+to_string(g)+" = curr_gb"+to_string(g)+";\n";
ret += "\t\t}\n";
sprintf(tmpstr,"\t\tcurr_gb%d = gbval->gb_var%d;\n",g,g);
+ temporal_gb=g;
}
ret += tmpstr;
}
}
- ret += "\t\tneeds_temporal_flush = curr_gb"+to_string (temporal_gb)+" - last_gb"+to_string(temporal_gb)+";\n";
- ret += "\t\t}else{\n"
- "\t\t\tneeds_temporal_flush=0;\n"
- "\t\t}\n";
+ ret += "\t\tneeds_temporal_flush = curr_gb"+to_string (temporal_gb)+" - last_gb"+to_string(temporal_gb)+";\n";
+ ret += "\t}else{\n"
+ "\t\tneeds_temporal_flush=0;\n"
+ "\t}\n";
+
+ ret += "\tdisordered_arrival = "+disorder_test+";\n";
+// ret += "\tif( ( ("+disorder_test+") ) ){\n";
+// ret += "\t\tdisordered_arrival=true;\n";
+// ret += "\t}else{\n";
+// ret += "\t\tdisordered_arrival=false;\n";
+// ret += "\t}\n";
}
// update an aggregate object
ret += "void update_aggregate(host_tuple &tup0, "
- +generate_functor_name()+"_groupdef *gbval, "+
- generate_functor_name()+"_aggrdef *aggval){\n";
+ +generate_functor_name()+"_groupdef &gbval, "+
+ generate_functor_name()+"_aggrdef &aggval){\n";
// Variables for execution of the function.
ret += "\tgs_int32_t problem = 0;\n"; // return unpack failure
// Unpack all remaining attributes
ret += gen_remaining_colrefs(schema, cid_set, found_cids, "", needs_xform);
for(a=0;a<aggr_tbl.size();a++){
- sprintf(tmpstr,"aggval->aggr_var%d",a);
+ sprintf(tmpstr,"aggval.aggr_var%d",a);
string varname = tmpstr;
ret.append(generate_aggr_update(varname,&aggr_tbl,a, schema));
}
// reinitialize an aggregate object
ret += "void reinit_aggregates( "+
- generate_functor_name()+"_groupdef *gbval, "+
- generate_functor_name()+"_aggrdef *aggval){\n";
+ generate_functor_name()+"_groupdef &gbval, "+
+ generate_functor_name()+"_aggrdef &aggval){\n";
// Variables for execution of the function.
ret += "\tgs_int32_t problem = 0;\n"; // return unpack failure
// use of temporaries depends on the aggregate,
// generate them in generate_aggr_update
+ int temporal_gb; // track the # of the temporal gb
for(g=0;g<gb_tbl.size();g++){
data_type *gdt = gb_tbl.get_data_type(g);
if(gdt->is_temporal()){
if(gdt->is_buffer_type()){
- sprintf(tmpstr,"\t\t%s(&(gbval->gb_var%d),&last_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g);
+ sprintf(tmpstr,"\t\t%s(&(gbval.gb_var%d),&last_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g);
}else{
- sprintf(tmpstr,"\t\t gbval->gb_var%d =last_gb%d;\n",g,g);
+ sprintf(tmpstr,"\t\t gbval.gb_var%d =last_gb%d;\n",g,g);
}
ret += tmpstr;
+ temporal_gb = g;
}
}
// Unpack all remaining attributes
for(a=0;a<aggr_tbl.size();a++){
- sprintf(tmpstr,"aggval->aggr_var%d",a);
+ sprintf(tmpstr,"aggval.aggr_var%d",a);
string varname = tmpstr;
ret.append(generate_aggr_reinitialize(varname,&aggr_tbl,a, schema));
}
if(uses_temporal_flush){
ret += "\treturn needs_temporal_flush;\n";
}else{
- ret += "\treturn 0;\n";
+ ret += "\treturn false;\n";
}
ret += "};\n";
+ ret += "bool disordered(){return disordered_arrival;}\n";
+
//------------------------------------------------
// time bucket management
ret += "void advance_last_tb(){\n";
// so I'll leave it in longhand.
ret += "host_tuple create_output_tuple("
- +generate_functor_name()+"_groupdef *gbval, "+
- generate_functor_name()+"_aggrdef *aggval, bool &failed){\n";
+ +generate_functor_name()+"_groupdef &gbval, "+
+ generate_functor_name()+"_aggrdef &aggval, bool &failed){\n";
ret += "\thost_tuple tup;\n";
ret += "\tfailed = false;\n";
ret += "\tgs_retval_t retval = 0;\n";
- string gbvar = "gbval->gb_var";
- string aggvar = "aggval->";
+ string gbvar = "gbval.gb_var";
+ string aggvar = "aggval.";
// First, get the return values from the UDAFS
// been unpacked. delete the string udaf return values at the end.
ret += "bool cleaning_when("
- +generate_functor_name()+"_groupdef *gbval, "+
- generate_functor_name()+"_aggrdef *aggval){\n";
+ +generate_functor_name()+"_groupdef &gbval, "+
+ generate_functor_name()+"_aggrdef &aggval){\n";
ret += "\tbool retval = true;\n";
- gbvar = "gbval->gb_var";
- aggvar = "aggval->";
+ gbvar = "gbval.gb_var";
+ aggvar = "aggval.";
set<int> clw_pfcns;
ret += "struct "+generate_functor_name()+"_hash_func{\n";
ret += "\tgs_uint32_t operator()(const "+generate_functor_name()+
- "_groupdef *grp) const{\n";
+ "_groupdef &grp) const{\n";
ret += "\t\treturn(0";
for(g=0;g<gb_tbl.size();g++){
data_type *gdt = gb_tbl.get_data_type(g);
ret += "^";
if(gdt->use_hashfunc()){
if(gdt->is_buffer_type())
- sprintf(tmpstr,"(%s*%s(&(grp->gb_var%d)))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
+ sprintf(tmpstr,"(%s*%s(&(grp.gb_var%d)))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
else
- sprintf(tmpstr,"(%s*%s(grp->gb_var%d))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
+ sprintf(tmpstr,"(%s*%s(grp.gb_var%d))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
}else{
- sprintf(tmpstr,"(%s*grp->gb_var%d)",hash_nums[g%NRANDS].c_str(),g);
+ sprintf(tmpstr,"(%s*grp.gb_var%d)",hash_nums[g%NRANDS].c_str(),g);
}
ret += tmpstr;
}
// The comparison function
ret += "struct "+generate_functor_name()+"_equal_func{\n";
- ret += "\tbool operator()(const "+generate_functor_name()+"_groupdef *grp1, "+
- generate_functor_name()+"_groupdef *grp2) const{\n";
+ ret += "\tbool operator()(const "+generate_functor_name()+"_groupdef &grp1, "+
+ "const "+generate_functor_name()+"_groupdef &grp2) const{\n";
ret += "\t\treturn( (";
string hcmpr = "";
if(first_exec){first_exec=false;}else{ hcmpr += ") && (";}
if(gdt->complex_comparison(gdt)){
if(gdt->is_buffer_type())
- sprintf(tmpstr,"(%s(&(grp1->gb_var%d), &(grp2->gb_var%d))==0)",
- gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+ sprintf(tmpstr,"(%s(&(grp1.gb_var%d), &(grp2.gb_var%d))==0)",
+ gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
else
- sprintf(tmpstr,"(%s((grp1->gb_var%d), (grp2->gb_var%d))==0)",
- gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+ sprintf(tmpstr,"(%s((grp1.gb_var%d), (grp2.gb_var%d))==0)",
+ gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
}else{
- sprintf(tmpstr,"grp1->gb_var%d == grp2->gb_var%d",g,g);
+ sprintf(tmpstr,"grp1.gb_var%d == grp2.gb_var%d",g,g);
}
hcmpr += tmpstr;
}
int lfta_disorder; // maximum disorder in the steam between lfta, hfta
int hfta_disorder; // maximum disorder in the hfta
+ int hfta_slow_flush; // outputs per input, 0 means no slow flush
// rollup, cube, and grouping_sets cannot be readily reconstructed by
// analyzing the patterns, so explicitly record them here.
std::string generate_operator(int i, std::string params);
std::string get_include_file(){
- if(hfta_disorder <= 1){
- return("#include <groupby_operator.h>\n");
+ if(hfta_disorder <= 1){
+ if(hfta_slow_flush>0){
+ return("#include <groupby_slowflush_operator.h>\n");
}else{
- return("#include <groupby_operator_oop.h>\n");
+ return("#include <groupby_operator.h>\n");
}
+ }else{
+ return("#include <groupby_operator_oop.h>\n");
+ }
};
std::vector<select_element *> get_select_list(){return select_list;};
sgah_qpn(){
lfta_disorder = 1;
hfta_disorder = 1;
+ hfta_slow_flush = 0;
};
sgah_qpn(query_summary_class *qs,table_list *Schema){
lfta_disorder = 1;
hfta_disorder = 1;
+ hfta_slow_flush = 0;
// Get the table name.
// NOTE the colrefs have the tablevar ref (an int)
param_tbl->handle_access(param_names[pi]));
}
ret->definitions = definitions;
+ ret->hfta_slow_flush = hfta_slow_flush;
ret->node_name = node_name + suffix;
vector<string> registration_query_names; // for lfta.c registration
map<string, vector<int> > mach_query_names; // list queries of machine
vector<int> snap_lengths; // for lfta.c registration
+ vector<int> snap_position; // for lfta.c registration
vector<string> interface_names; // for lfta.c registration
vector<string> machine_names; // machine of interface
vector<bool> lfta_reuse_options; // for lfta.c registration
lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
*/
- snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
+ snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "snap"));
+ snap_position.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "index"));
+
+// STOPPED HERE need to figure out how to generate the code that Vlad needs
+// from snap_position
// TODO NOTE : I'd like it to be the case that registration_query_names
// are the queries to be registered for subsciption.
// Find common filter predicates in the LFTAs.
-// in addition generate structs to store the temporal attributes unpacked by prefilter
+// in addition generate structs to store the
+// temporal attributes unpacked by prefilter
+// compute & provide interface for per-interface
+// record extraction properties
map<string, vector<stream_query *> >::iterator ssqi;
for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
#endif
map<string, vector<long long int> > lfta_sigs; // used again later
+ map<string, int> lfta_snap_pos; // optimize csv parsing
+ // compute now, use in get_iface_properties
for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
string liface = (*mvsi).first;
vector<long long int> empty_list;
lfta_sigs[liface] = empty_list;
+ lfta_snap_pos[liface] = -1;
vector<col_id_set> lfta_cols;
vector<int> lfta_snap_length;
}
lfta_sigs[liface].push_back(mask);
lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
- lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));
+ lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "snap"));
+ int this_snap_pos = compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "index");
+ if(this_snap_pos > lfta_snap_pos[liface])
+ lfta_snap_pos[liface] = this_snap_pos;
}
//for(li=0;li<mach_lftas.size();++li){
}
lfta_val[lmach] += "\";\n";
}
+ lfta_val[lmach] += "\t\t}else if(!strcmp(property_name, \"_max_csv_pos\")){\n";
+ lfta_val[lmach] += "\t\t\treturn \""+int_to_string(lfta_snap_pos[(*sir)])+"\";\n";
lfta_val[lmach] += "\t\t} else\n";
lfta_val[lmach] += "\t\t\treturn NULL;\n";
}
bool use_proto = false;
bool use_bsa = false;
bool use_kafka = false;
+ bool use_ssl = false;
int erri;
string err_str;
for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
#endif
}
}
+ ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "ENCRYPTED", erri, err_str);
+ for(int ift_i=0;ift_i<ift.size();ift_i++){
+ if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
+#ifdef SSL_ENABLED
+ use_ssl = true;
+#else
+ fprintf(stderr,"Runtime libraries built without SSL support. Rebuild with SSL_ENABLED defined in gsoptions.h\n");
+ exit(0);
+#endif
+ }
+ }
}
fprintf(outfl,
if(use_pads)
fprintf(outfl,"-lgscppads -lpads ");
fprintf(outfl,
-"-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
+"-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz -lrt");
if(use_pads)
fprintf(outfl, " -lpz -lz -lbz ");
if(libz_exists && libast_exists)
#endif
#ifdef KAFKA_ENABLED
fprintf(outfl, " -lrdkafka ");
+#endif
+#ifdef SSL_ENABLED
+ fprintf(outfl, " -lssl -lcrypto ");
#endif
fprintf(outfl," -lgscpaux");
#ifdef GCOV
default:
return("ERROR_NO_SUCH_COMPARISON_FCN");
}
-
-
+}
+string data_type::get_equals_fcn(data_type *dt){
+ switch(type){
+ case timeval_t:
+ return("Compare_Timeval");
+ case v_str_t:
+ return("str_equal");
+ case ipv6_t:
+ return("ipv6_compare");
+ default:
+ return("ERROR_NO_SUCH_COMPARISON_FCN");
+ }
}
string data_type::get_hfta_comparison_fcn(data_type *dt){
return("ERROR_NO_SUCH_COMPARISON_FCN");
}
}
+string data_type::get_hfta_equals_fcn(data_type *dt){
+ switch(type){
+ case timeval_t:
+ return("hfta_Compare_Timeval");
+ case v_str_t:
+ return("hfta_vstr_equal");
+ case ipv6_t:
+ return("hfta_ipv6_compare");
+ default:
+ return("ERROR_NO_SUCH_COMPARISON_FCN");
+ }
+}
// Return true if operating on these types requires
// a special function for this operator.
bool complex_comparison(data_type *dt);
std::string get_comparison_fcn(data_type *dt);
std::string get_hfta_comparison_fcn(data_type *dt);
+ std::string get_equals_fcn(data_type *dt);
+ std::string get_hfta_equals_fcn(data_type *dt);
bool complex_operator(data_type *dt, std::string &op);
std::string get_complex_operator(data_type *dt, std::string &op);
}
// Assume that SRC is either INTERNAL or SHALLOW_COPY
-void hfta_vstr_assign_with_copy_in_tuple(vstring32 * target, vstring * src,
- gs_sp_t data_offset, gs_retval_t int_offset) {
+void hfta_vstr_assign_with_copy_in_tuple(vstring32 * target,
+ const vstring * src, gs_sp_t data_offset, gs_retval_t int_offset) {
target->length = src->length;
target->offset = int_offset;
target->reserved = PACKED;
// Ted wrote the following function.
// make deep copy of src. Assume that dst is already empty.
// Assume that SRC is either INTERNAL or SHALLOW_COPY
-void hfta_vstr_assign_with_copy(vstring *dst, vstring *src){
+void hfta_vstr_assign_with_copy(vstring *dst, const vstring *src){
dst->length=src->length;
if(src->length){
dst->offset=(gs_p_t)malloc(dst->length);
// Ted wrote the following function.
// Make a deep copy of src. garbage collect dst if needed.
// Assume that SRC is either INTERNAL or SHALLOW_COPY
-void hfta_vstr_replace(vstring *dst, vstring *src){
+void hfta_vstr_replace(vstring *dst, const vstring *src){
hfta_vstr_destroy(dst);
hfta_vstr_assign_with_copy(dst,src);
}
return(s1->length - s2->length);
}
+// Equality test for vstrings: returns 0 when s1 and s2 hold identical
+// byte sequences, -1 otherwise (comparison-style return convention).
+gs_retval_t hfta_vstr_equal(const vstring * s1, const vstring * s2) {
+	if(s1->length != s2->length)
+		return -1;
+
+	// Lengths match; compare the payloads in one call rather than a
+	// hand-rolled per-byte loop.  Guard length==0 so we never hand
+	// memcmp a possibly-unset offset.
+	if(s1->length &&
+	   memcmp((void *)s1->offset,(void *)s2->offset,s1->length))
+		return -1;
+
+	return 0;
+}
+
+
gs_param_handle_t register_handle_for_str_regex_match_slot_1(vstring* pattern) {
gs_uint32_t cnt;
};
-// sctarchpad struct
+// scratchpad struct
struct avg_udaf_hfta_struct_t{
gs_int64_t sum;
gs_uint32_t cnt;
hfta_vstr_destroy(scratch);
}
+
+////////////////////////////////////////////
+// Aggregate strings by catenation
+
+
+// Scratchpad for the CAT_aggr UDAF: the framework gives us a raw,
+// fixed-size byte buffer, so the variable-length accumulation lives in
+// a heap-allocated object reached through a pointer kept in that buffer.
+struct CAT_aggr_scratch{
+	std::string val;	// catenation accumulated so far
+	int x;	// set to 101 at init; presumably a sanity/debug marker — TODO confirm
+};
+
+// Layout of the framework's scratch buffer itself: just the pointer.
+struct CAT_aggr_scratch_ptr{
+	CAT_aggr_scratch *ptr;
+};
+
+// Allocate the heap scratch object and plant its pointer in the
+// framework-provided scratch buffer s.
+void CAT_aggr_HFTA_AGGR_INIT_(gs_sp_t s){
+	CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
+	CAT_aggr_scratch *v = new CAT_aggr_scratch();
+	v->x = 101;	// marker value; not read by any CAT_aggr fcn visible here
+
+	p->ptr = v;
+}
+// Start a new group/window: clear the accumulated string, reusing the
+// heap object allocated by _INIT_.
+void CAT_aggr_HFTA_AGGR_REINIT_(gs_sp_t s){
+	CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
+	CAT_aggr_scratch *v = p->ptr;
+	v->val="";
+}
+// Append str to the aggregate, preceded by sep when the aggregate is
+// already nonempty (i.e. sep is the separator between items).
+// The old debug copies of sep/str into fixed stack buffers were removed:
+// they were filled with no bounds check (stack overflow for inputs of
+// length >= MAXTUPLESZ-20) and fed only a commented-out printf.
+void CAT_aggr_HFTA_AGGR_UPDATE_(gs_sp_t s, vstring *sep, vstring *str){
+	CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
+	CAT_aggr_scratch *v = p->ptr;
+	if(v->val.size()>0)
+		v->val.append((char *)(sep->offset), sep->length);
+	v->val.append((char *)(str->offset), str->length);
+}
+// Emit the catenated string as a heap-owned vstring (reserved=INTERNAL,
+// so the framework will free it).  The value is truncated to the
+// maximum that fits in a tuple.
+void CAT_aggr_HFTA_AGGR_OUTPUT_(vstring *res, gs_sp_t s){
+	CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
+	CAT_aggr_scratch *v = p->ptr;
+	// Cap the length BEFORE allocating: the previous code malloc'ed the
+	// full accumulated size and then truncated, over-allocating for
+	// over-long values.
+	res->length = v->val.size();
+	if(res->length>MAXTUPLESZ-20)
+		res->length=MAXTUPLESZ-20;
+	res->offset = (gs_p_t)malloc(res->length);
+	memcpy((char *)(res->offset), v->val.c_str(), res->length);
+	res->reserved = INTERNAL;
+}
+// Free the heap scratch object allocated by _INIT_.
+void CAT_aggr_HFTA_AGGR_DESTROY_(gs_sp_t s){
+	CAT_aggr_scratch_ptr *p = (CAT_aggr_scratch_ptr *)s;
+	CAT_aggr_scratch *v = p->ptr;
+	delete v;
+}
+
+
+
+///////////////////////////////////////////////////////////////
+// time_avg((sample, ts, window_size)
+// Compute time-weighted average sum(sample*duration)/window_size
+// duration is difference between current and next ts.
+// The idea is to compute a sum over a step function.
+//
+
+// Scratchpad for time_avg: integrates a step function (the sample value
+// held between consecutive timestamps) over the window.
+struct time_avg_udaf_str{
+	gs_float_t sum;	// accumulated sample*duration for the current window
+	gs_float_t last_val;	// most recent sample value (the current step height)
+	gs_uint64_t last_ts;	// timestamp of that sample
+	gs_uint64_t window;	// window size, from the UDAF's third argument
+	gs_uint64_t first_ts;	// ts of the very first sample; 0 doubles as "none yet"
+	gs_uint8_t event_occurred;	// nonzero once an update hit the current window
+};
+
+// Zero the whole scratchpad.  `window` was previously left
+// uninitialized; it is now cleared too so every member starts defined
+// (it is overwritten on the first _UPDATE_ call).
+void time_avg_HFTA_AGGR_INIT_(gs_sp_t s){
+	time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
+	scratch->sum = 0.0;
+	scratch->last_val = 0.0;
+	scratch->last_ts = 0;
+	scratch->window = 0;
+	scratch->first_ts = 0;
+	scratch->event_occurred = 0;
+}
+
+// Nothing to release: the scratchpad holds only plain scalars.
+void time_avg_HFTA_AGGR_DESTROY_(gs_sp_t s){
+}
+
+// Start a new window: clear the per-window sum and event flag, but keep
+// last_val/last_ts/first_ts so the step function carries across windows.
+void time_avg_HFTA_AGGR_REINIT_(gs_sp_t s){
+	time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
+	scratch->event_occurred = 0;
+	scratch->sum = 0;
+//printf("time_avg_reinit: occurred=%d, last_val=%lf, sum=%lf, first_ts=%lld, last_ts=%lld\n",scratch->event_occurred, scratch->last_val, scratch->sum, scratch->first_ts, scratch->last_ts);
+}
+
+// Close out the current window and compute the time-weighted average.
+// NOTE(review): this mutates scratch->sum (adds the final segment), so
+// it assumes the framework calls _OUTPUT_ exactly once per window —
+// a second call for the same window would double-count; confirm.
+void time_avg_HFTA_AGGR_OUTPUT_(gs_float_t *result, gs_sp_t s){
+	time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
+	if(scratch->event_occurred==0){
+		// No sample in this window: the step function held last_val
+		// for the whole window, so that is the average.
+		*result = scratch->last_val;
+//printf("\ttime_avg outpt1 sum=%lf, last_val=%lf, result=%lf\n", scratch->sum, scratch->last_val, *result);
+		return;
+	}
+	// End of the window containing the last sample, then extend the
+	// last step to that boundary.
+	gs_int64_t end_time = scratch->window * (scratch->last_ts/scratch->window + 1);
+	scratch->sum += (end_time - scratch->last_ts) * (gs_float_t)(scratch->last_val);
+	gs_int64_t start_time = end_time - scratch->window;
+	// NOTE(review): first_ts is unsigned, start_time signed — mixed
+	// signed/unsigned comparison below; confirm intended.
+	if(scratch->first_ts > start_time){
+		// First (partial) window: average over the span actually covered.
+		*result = scratch->sum / (end_time - scratch->first_ts);
+//printf("\ttime_avg outpt2 sum=%lf, start_time=%lld, end_time=%lld, first_ts=%lld, last_ts=%lld,result=%lf\n", scratch->sum, start_time, end_time, scratch->first_ts, scratch->last_ts, *result);
+	}else{
+		// Full window: average over the whole window length.
+		*result = scratch->sum / (end_time - start_time);
+//printf("\ttime_avg outpt3 sum=%lf, start_time=%lld, end_time=%lld, first_ts=%lld, last_ts=%lld,result=%lf\n", scratch->sum, start_time, end_time, scratch->first_ts, scratch->last_ts, *result);
+	}
+}
+
+// Fold one sample into the step-function integral: the PREVIOUS value
+// is weighted by the time elapsed until this sample's ts, then (val,ts)
+// become the new current step.
+// NOTE(review): first_ts==0 is used as the "no sample yet" sentinel, so
+// a genuine first timestamp of 0 would be mis-handled — confirm ts>0.
+void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_float_t val, gs_int64_t ts, gs_int64_t window){
+	time_avg_udaf_str *scratch = (time_avg_udaf_str *)s;
+	scratch->window = window;
+	if(scratch->first_ts==0){
+		// Very first sample ever: nothing to integrate yet.
+		scratch->first_ts = ts;
+	}else{
+		if(scratch->event_occurred){
+
+			// Previous sample was in this window: weight it from its ts.
+			scratch->sum += (ts - scratch->last_ts) * scratch->last_val;
+		}else{
+			// First sample of this window: the carried-over value only
+			// counts from the window's start boundary.
+			gs_int64_t start_time = scratch->window * (scratch->last_ts/scratch->window);
+			scratch->sum += (ts - start_time) * scratch->last_val;
+		}
+	}
+//printf("time_avg_upd: val=%lf, occurred=%d, last_val=%lf, sum=%lf, ts=%lld, first_ts=%lld, last_ts=%lld\n",val, scratch->event_occurred, scratch->last_val, scratch->sum, ts, scratch->first_ts, scratch->last_ts);
+	scratch->last_val = val;
+	scratch->last_ts = ts;
+	scratch->event_occurred = 1;
+}
+// Integer-typed overloads: each simply forwards to the float
+// implementation above.
+void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t val, gs_int64_t ts, gs_int64_t window){
+	time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
+}
+void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t val, gs_int64_t ts, gs_int64_t window){
+	time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
+}
+void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t val, gs_int64_t ts, gs_int64_t window){
+	time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
+}
+void time_avg_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t val, gs_int64_t ts, gs_int64_t window){
+	time_avg_HFTA_AGGR_UPDATE_(s, (gs_float_t)val, ts, window);
+}
+
+
+// ------------------------------------------------------------
+// running_sum_max : get the running sum of an int,
+// be able to report this sum and also its max value
+// during the time window
+
+// Scratchpad for run_sum_max: a running sum plus the highest value that
+// sum has reached.
+struct run_sum_max_udaf_str{
+	gs_int64_t sum;	// running sum of all updates
+	gs_int64_t max;	// high-water mark of sum
+};
+// Zero both the sum and its high-water mark.
+void run_sum_max_HFTA_AGGR_INIT_(gs_sp_t s){
+	run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
+	scratch->sum = 0;
+	scratch->max = 0;
+}
+// New window: the running sum carries over; restart the high-water mark
+// at the current sum.
+void run_sum_max_HFTA_AGGR_REINIT_(gs_sp_t s){
+	run_sum_max_udaf_str *scratch = (run_sum_max_udaf_str *)s;
+	scratch->max = scratch->sum;
+}
+// Expose the scratchpad itself as the aggregate value: the result
+// vstring ALIASES the scratch buffer (SHALLOW_COPY, no allocation).
+// extr_running_sum()/extr_running_sum_max() decode this encoding.
+void run_sum_max_HFTA_AGGR_OUTPUT_(vstring *r,gs_sp_t b){
+	r->length = sizeof(run_sum_max_udaf_str);
+	r->offset = (gs_p_t)(b);
+	r->reserved = SHALLOW_COPY;
+}
+// Nothing to release: the scratchpad holds only plain scalars.
+void run_sum_max_HFTA_AGGR_DESTROY_(gs_sp_t b){
+	return;
+}
+
+// Shared accumulation for all integer-width overloads: add v into the
+// running sum and bump the high-water mark when the sum exceeds it.
+static void run_sum_max_accum(gs_sp_t s, gs_int64_t v){
+	run_sum_max_udaf_str *sp = (run_sum_max_udaf_str *)s;
+	sp->sum += v;
+	if(sp->sum > sp->max)
+		sp->max = sp->sum;
+}
+void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint64_t v){
+	run_sum_max_accum(s, v);
+}
+void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int64_t v){
+	run_sum_max_accum(s, v);
+}
+void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_uint32_t v){
+	run_sum_max_accum(s, v);
+}
+void run_sum_max_HFTA_AGGR_UPDATE_(gs_sp_t s, gs_int32_t v){
+	run_sum_max_accum(s, v);
+}
+// the extraction functions
+// Decode a value produced by run_sum_max_HFTA_AGGR_OUTPUT_ back into
+// its components; return 0 when the vstring is not the expected size.
+gs_int64_t extr_running_sum(vstring *v){
+	if(v->length != sizeof(run_sum_max_udaf_str)) return 0;
+	run_sum_max_udaf_str *vs = (run_sum_max_udaf_str *)(v->offset);
+	return vs->sum;
+}
+gs_int64_t extr_running_sum_max(vstring *v){
+	if(v->length != sizeof(run_sum_max_udaf_str)) return 0;
+	run_sum_max_udaf_str *vs = (run_sum_max_udaf_str *)(v->offset);
+	return vs->max;
+}
+
+
#include <sys/socket.h>
#include <netinet/in.h>
#include <zlib.h>
-#include "errno.h"
-#include "stdio.h"
-#include "stdlib.h"
+#include <errno.h>
+#include <stdio.h>
+#include <dirent.h>
extern "C" {
BSA::FileStream::ISubStream* stream;
BSA::FileStream::IFileHandle* ifh;
BSA::FileStream::Reader* reader;
+
+#endif
+
+#ifdef SSL_ENABLED
+#include <openssl/pem.h>
+#include <openssl/x509.h>
+#include <openssl/x509v3.h>
+#include <openssl/ssl.h>
+#include <openssl/crypto.h>
+#include <openssl/err.h>
+
+EVP_PKEY *rkey;
+PKCS7 *p7;
+BIO* mem_io;
+char pwd[CSVMAXLINE];
+
+// callback for passing password to private key reader.
+// OpenSSL supplies a buffer of capacity `size`; clamp the copy so a
+// password longer than the buffer cannot overrun it (the previous code
+// memcpy'd strlen(pwd) bytes unconditionally).
+int pass_cb(char *buf, int size, int rwflag, void *u) {
+	int len = strlen(pwd);
+	if (len > size)
+		len = size;
+	memcpy(buf, pwd, len);
+	return len;
+}
+
#endif
gs_sp_t dev;
static int fd=-1;
static struct packet cur_packet;
static gs_sp_t name;
+static gs_sp_t dir_name;
+struct dirent **namelist;
+static gs_int32_t num_dir_files;
static gs_sp_t line;
static ssize_t len;
static size_t line_len;
static gs_uint32_t singlefile=0;
static gs_uint32_t use_gzip=0;
static gs_uint32_t use_bsa=0;
+static gs_uint32_t use_decryption=0;
static gs_uint32_t gshub=0;
static int socket_desc=0;
}
static void next_file() {
+
+ static gs_uint32_t file_pos = 0;
+ static gs_uint32_t scan_finished = 0;
+
+ char buf[CSVMAXLINE];
+
+ if (dir_name) {
+ if (scan_finished) {
+ if (verbose)
+ fprintf(stderr,"Done processing, waiting for things to shut down\n");
+ rts_fta_done();
+ // now just service message queue until we get killed or loose connectivity
+ while (true) {
+ fta_start_service(0); // service all waiting messages
+ usleep(1000); // sleep a millisecond
+ }
+ }
+ if (num_dir_files) { // we already started directory scan
+ free(name);
+ if (file_pos < num_dir_files) {
+ sprintf(buf, "%s/%s", dir_name, namelist[file_pos]->d_name);
+ name = strdup(buf);
+ free(namelist[file_pos]);
+ file_pos++;
+ } else {
+ free(namelist);
+ scan_finished = 1;
+ return;
+ }
+ } else {
+ num_dir_files = scandir(dir_name, &namelist, NULL, alphasort);
+ if (num_dir_files == -1) {
+ num_dir_files = 0;
+ print_error((gs_sp_t)"ERROR: Unable to scan directory");
+ return;
+ }
+ if (num_dir_files == 2) { // only . and . are there, empty dir
+ free(namelist[0]);
+ free(namelist[1]);
+ scan_finished = 1;
+ return;
+ } else
+ file_pos = 2;
+
+ sprintf(buf, "%s/%s", dir_name, namelist[file_pos]->d_name);
+ name = strdup(buf);
+ free(namelist[file_pos]);
+ file_pos++;
+ }
+ }
+
struct stat s;
if (verbose) {
fprintf(stderr,"Opening %s\n",name);
exit(10);
}
posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
- if (singlefile == 0) {
+
+#ifdef SSL_ENABLED
+ if (use_decryption) {
+ // free SSL resources
+ if (mem_io)
+ BIO_free(mem_io);
+ if (p7)
+ PKCS7_free(p7);
+
+ FILE *fp = fdopen(fd, "r");
+ p7 = d2i_PKCS7_fp(fp, NULL);
+ if (p7 == NULL) {
+ print_error((gs_sp_t)"Error reading SMIME message from file");
+ exit(-1);
+ }
+
+ if(!(mem_io = PKCS7_dataDecode(p7, rkey, NULL, NULL))) {
+ print_error((gs_sp_t)"Error decoding PKCS7 file\n");
+ exit(-1);
+ }
+
+ fclose(fp);
+ }
+#endif
+ if (!dir_name && !singlefile) {
unlink(name);
}
if (use_gzip) {
gs_sp_t tempdel;
gs_sp_t singlefiletmp;
gs_sp_t compressortmp;
- gs_sp_t bsatmp;
+ gs_sp_t bsatmp;
+ gs_sp_t encryptedtmp;
+	gs_sp_t maxfieldtmp;	// fixed: declaration was missing its semicolon
+
+ gs_sp_t pkey_fname;
+ gs_sp_t pwd_fname;
+
if ((verbosetmp=get_iface_properties(device,(gs_sp_t)"verbose"))!=0) {
if (strncmp(verbosetmp,"TRUE",4)==0) {
}
}
- if ((name=get_iface_properties(device,(gs_sp_t)"filename"))==0) {
- print_error((gs_sp_t)"csv_replay_init::No CSV \"Filename\" defined");
+ name=get_iface_properties(device,(gs_sp_t)"filename");
+ dir_name=get_iface_properties(device,(gs_sp_t)"directoryname");
+ if (!name && !dir_name) {
+ print_error((gs_sp_t)"csv_replay_init::Either \"Filename\" or \"Dirname\" must be defined");
exit(0);
}
+
tempdel=get_iface_properties(device,(gs_sp_t)"csvseparator");
if (tempdel != 0 ) {
csvdel = tempdel[0];
if ((delaytmp=get_iface_properties(device,(gs_sp_t)"startupdelay"))!=0) {
if (verbose) {
- fprintf(stderr,"Startup delay of %u seconds\n",atoi(get_iface_properties(device,(gs_sp_t)"startupdelay")));
+ fprintf(stderr,"Startup delay of %u seconds\n",atoi(delaytmp));
}
- startupdelay=atoi(get_iface_properties(device,(gs_sp_t)"startupdelay"));
+ startupdelay=atoi(delaytmp);
}
+
+ if ((maxfieldtmp=get_iface_properties(device,(gs_sp_t)"_max_csv_pos"))!=0) {
+ max_field_csv=atoi(maxfieldtmp);
+ }
+
if ((gshubtmp=get_iface_properties(device,(gs_sp_t)"gshub"))!=0) {
if (verbose) {
fprintf(stderr,"CSV format using gshub\n");
}
gshub=1;
+ if (!name) {
+ print_error((gs_sp_t)"csv_replay_init::Filename must be defined for gshub interfaces");
+ exit(0);
+ }
+ }
+
+ pkey_fname=get_iface_properties(device,(gs_sp_t)"privatekey");
+ pwd_fname=get_iface_properties(device,(gs_sp_t)"password");
+
+ if ((encryptedtmp=get_iface_properties(device,(gs_sp_t)"encrypted"))!=0) {
+ if (strncmp(encryptedtmp,"TRUE",4)==0) {
+ #ifndef SSL_ENABLED
+ print_error((gs_sp_t)"csv_replay_init::runtime not built with SSL support to use encrypted interfaces");
+ exit(0);
+ #else
+ use_decryption=1;
+ if (verbose) {
+ fprintf(stderr,"CSV file is encrypted\n");
+ }
+ if (!pkey_fname || !pwd_fname) {
+ print_error((gs_sp_t)"csv_replay_init::privatekey and/or password filenames not specified for encrypted itnerface");
+ exit(0);
+ }
+
+ OpenSSL_add_all_algorithms();
+ ERR_load_crypto_strings();
+
+ // Read password file
+ FILE* in_fd = fopen(pwd_fname, "r");
+ if (!in_fd) {
+ fprintf(stderr, "Unable to open password file %s\n", pwd_fname);
+ exit(0);
+ }
+
+ if (!fgets(pwd, CSVMAXLINE, in_fd)) {
+ fprintf(stderr, "Error reading password from file %s\n", pwd_fname);
+ exit(0);
+ }
+ strtok(pwd, "\r\n\t ");
+ fclose(in_fd);
+
+ // Read the private key
+ in_fd = fopen(pkey_fname, "r");
+ if (!in_fd) {
+ fprintf(stderr, "Unable to open private key file %s\n", pkey_fname);
+ exit(0);
+ }
+
+ rkey = PEM_read_PrivateKey(in_fd, NULL, pass_cb, NULL);
+ if (!rkey) {
+ fprintf(stderr, "Unable to read private key file %s\n", pkey_fname);
+ exit(-1);
+ }
+
+ fclose(in_fd);
+ #endif
+ }
}
cur_packet.ptype=PTYPE_CSV;
}
} else {
#endif
- if (fd <= 0) next_file();
- while ((have = read(fd, read_pos, CHUNK)) == 0) {
+ if (fd <= 0) next_file();
+
+#ifdef SSL_ENABLED
+ if (use_decryption) {
+
+ while ((have = BIO_read (mem_io, read_pos, CHUNK)) == 0) {
if (singlefile==1) {
if(verbose) {
fprintf(stderr,"SINGLEFILE PROCESSING DONE! RTS SAYS BYE\n");
} else {
next_file();
}
+ }
+
+ } else {
+#endif
+ while ((have = read(fd, read_pos, CHUNK)) == 0) {
+ if (singlefile==1) {
+ if(verbose) {
+ fprintf(stderr,"SINGLEFILE PROCESSING DONE! RTS SAYS BYE\n");
+ }
+ return -2;
+
+ } else {
+ next_file();
}
+ }
+#ifdef SSL_ENABLED
+ }
+#endif
#ifdef BSA_ENABLED
}
#endif
gs_sp_t verbosetmp;
gs_sp_t delaytmp;
gs_sp_t tempdel;
+ gs_sp_t maxfieldtmp;
if ((verbosetmp=get_iface_properties(device,(gs_sp_t)"verbose"))!=0) {
if (strncmp(verbosetmp,"TRUE",4)==0) {
if ((delaytmp=get_iface_properties(device,(gs_sp_t)"startupdelay"))!=0) {
if (verbose) {
- fprintf(stderr,"Startup delay of %u seconds\n",atoi(get_iface_properties(device,(gs_sp_t)"startupdelay")));
+ fprintf(stderr,"Startup delay of %u seconds\n",atoi(delaytmp));
}
- startupdelay=atoi(get_iface_properties(device,(gs_sp_t)"startupdelay"));
+ startupdelay=atoi(delaytmp);
}
+ if ((maxfieldtmp=get_iface_properties(device,(gs_sp_t)"_max_csv_pos"))!=0) {
+ max_field_kafka=atoi(maxfieldtmp);
+ }
+
// set maximum field nubmer to be extracted by csv parser
csv_set_maxfield(max_field_kafka);