string UDAF [RUNNING] running_array_aggr_hfta string (string);
string UDAF [RUNNING] running_array_aggr_lfta fstring17 (uint);
+//////////////////////////////////////////////////////////
+// Mapping functions
+// Should eventually be replaced by a watchlist join?
+//////////////////////////////////
+ string FUN [HFTA_ONLY, COST EXPENSIVE] int_to_string_map(llong, string HANDLE);
+ string FUN [HFTA_ONLY, COST EXPENSIVE] int_to_string_map(ullong, string HANDLE);
+ string FUN [HFTA_ONLY, COST EXPENSIVE] int_to_string_map(int, string HANDLE);
+ string FUN [HFTA_ONLY, COST EXPENSIVE] int_to_string_map(uint, string HANDLE);
return 1;
}
+// -------------------------------------------------------
+// int_to_string_map and its support functions, structs
+
+gs_param_handle_t register_handle_for_int_to_string_map_slot_1(vstring *filename);
+gs_retval_t int_to_string_map(vstring *result, gs_int64_t val, gs_param_handle_t handle);
+gs_param_handle_t deregister_handle_for_int_to_string_map_slot_1(gs_param_handle_t handle);
#endif
#define JOIN_OP_OUTER_JOIN 3
-#define MAX_TUPLE_SIZE 1024
+#define MAX_TUPLE_SIZE 10240
template <class join_eq_hash_functor, class timestamp, class hashkey, class hasher_func, class equal_func>
class join_eq_hash_operator : public base_operator {
// list of tuples from one of the channel waiting to be compared
// against tuple from the other channel
+ // Normally at least one should be empty after processing accept_tuple
list<host_tuple> input_queue[2];
// Admission control timestamp objects
- timestamp *max_input_ts[2], *curr_ts;
- bool hash_empty, curr_ts_valid;
+ timestamp *max_input_ts[2]; // largest timestamp received on this channel
+ // perhaps from a temporal tuple
+ timestamp *curr_ts; // current ts being processed.
+ bool curr_ts_valid; // both channels have a ts so curr_ts has been
+ // assigned a value
+
+ bool hash_empty; // always true, seems an artifact
+
// max tuples received from input channels
char max_input_tuple_data[2][MAX_TUPLE_SIZE];
int compare_qts_to_hashts(int i){
timestamp tmp_ts;
if(max_input_ts[i] == NULL) return(-1);
+//printf("compare_qts_to_hashts channel %d: ",i);
if(input_queue[i].empty())
return(func.compare_ts_with_ts(max_input_ts[i], curr_ts));
func.load_ts_from_tup(&tmp_ts,input_queue[i].front());
rts = &tmp_rts;
}
+//printf("compare_qts : ");
return(func.compare_ts_with_ts(lts,rts));
}
int compare_tup_with_ts(host_tuple &tup, timestamp *ts){
timestamp tmp_ts;
func.load_ts_from_tup(&tmp_ts, tup);
+//printf("compare_tup_with_ts channel %d: ",tup.channel);
return(func.compare_ts_with_ts(&tmp_ts, ts));
}
void process_join(list<host_tuple>& result){
int i;
for(i=0;i<2;++i){
+//printf("\tprocess join channel %d input q len is %lu\n",i, input_queue[i].size());
while(curr_ts_valid && !input_queue[i].empty() && compare_tup_with_ts(input_queue[i].front(), curr_ts) == 0 ){
// apply tuples to join
int other = 1-i; // the other channel
host_tuple empty_tuple;
empty_tuple.tuple_size = 0; empty_tuple.data = NULL;
+//printf("Processing outer join\n");
+
hash_empty = true;
typename hash_table<hashkey*, host_tuple, hasher_func, equal_func>::iterator jti;
for(i=0;i<2;++i){
// Ensure that the queue ts is initialized.
if(max_input_ts[tup.channel] == NULL){
+//printf("Loading channel %d\n",tup.channel);
max_input_ts[tup.channel] = new timestamp();
if(! func.load_ts_from_tup(max_input_ts[tup.channel],tup)){
tup.free_tuple();
return(0); // can't load ts -- bail out.
}
+//printf("accept_tuple channel=%d: ",tup.channel);
int tup_order=func.compare_ts_with_ts(&tup_ts,max_input_ts[tup.channel]);
if (tup_order < 0){
-printf("%s: out of order ts.\n", op_name);
+ printf("%s: out of order ts, channel=%d.\n", op_name, tup.channel);
tup.free_tuple();
// even for out of order temporal tuples we need to post new temporal tuple
// Update max if larger
if(tup_order > 0){
+//printf("Loading channel %d\n",tup.channel);
func.load_ts_from_ts(max_input_ts[tup.channel],&tup_ts);
// save the content of the max tuple
}
int flush(list<host_tuple>& result) {
+//printf("Calling flush\n");
process_outer_join(result);
int qcmp = compare_qts();
int minq = 0; if(qcmp>0) minq = 1;
- host_tuple left_tuple, right_tuple;
- left_tuple.tuple_size=0; left_tuple.data=NULL;
- right_tuple.tuple_size=0; right_tuple.data=NULL;
+ timestamp left_ts, right_ts;
+ timestamp *left_ts_ptr = &left_ts;
+ timestamp *right_ts_ptr = &right_ts;
- if (minq == 0) {
- if(max_input_ts[minq]) {
- if (input_queue[minq].empty())
- left_tuple = max_input_tuple[minq];
- else
- left_tuple = input_queue[minq].front();
- }
- } else {
- if(max_input_ts[minq]) {
- if (input_queue[minq].empty())
- right_tuple = max_input_tuple[minq];
- else
- right_tuple = input_queue[minq].front();
- }
+
+/*
+ if (input_queue[0].empty()){
+ printf("L=max_ts, ");
+ left_ts_ptr = max_input_ts[0];
+ }else{
+ printf("L=q, ");
+ func.load_ts_from_tup(left_ts_ptr, input_queue[0].front());
+ }
+
+ if (input_queue[1].empty()){
+ printf("R=max_ts, ");
+ right_ts_ptr = max_input_ts[1];
+ }else{
+ printf("R=q, ");
+ func.load_ts_from_tup(right_ts_ptr, input_queue[1].front());
+ }
+*/
+ if(curr_ts_valid){
+ left_ts_ptr = curr_ts;
+ right_ts_ptr = curr_ts;
+ }else{
+//printf("curr_ts invalid\n");
+ left_ts_ptr = NULL;
+ right_ts_ptr = NULL;
}
result.channel = output_channel;
- return func.create_temp_status_tuple(left_tuple, right_tuple, result);
+ return func.create_temp_status_tuple(left_ts_ptr, right_ts_ptr, result);
}
#define LEFT_OUTER_JOIN_PROPERTY 1
#define RIGHT_OUTER_JOIN_PROPERTY 2
#define OUTER_JOIN_PROPERTY 3
-#define WATCHLIST_JOIN_PROPERTY 3
-#define FILTER_JOIN_PROPERTY 4
+#define WATCHLIST_JOIN_PROPERTY 4
+#define FILTER_JOIN_PROPERTY 5
// tablevar_list_t is the list of tablevars in a FROM clause
// create a temp status tuple
- ret += "int create_temp_status_tuple(const host_tuple &tup0, const host_tuple &tup1, host_tuple& result) {\n\n";
+ ret += "int create_temp_status_tuple("+this->generate_functor_name()+"_tempeqdef *lts,"+this->generate_functor_name()+"_tempeqdef *rts, host_tuple& result) {\n\n";
ret += "\tgs_retval_t retval = 0;\n";
ret += "\tgs_int32_t problem = 0;\n";
- ret += "\tif(tup0.data){\n";
-
-// Unpack all the temporal attributes references in select list
- col_id_set found_cids;
-
- for(s=0;s<select_list.size();s++){
- if (select_list[s]->se->get_data_type()->is_temporal()) {
-// Find the set of attributes accessed in this SE
- col_id_set new_cids;
- get_new_se_cids(select_list[s]->se,found_cids, new_cids, NULL);
- }
+ for(p=0;p<temporal_dt.size();p++){
+ sprintf(tmpstr,"lhs_var");
+ ret+="\t"+temporal_dt[p]->make_host_cvar(tmpstr)+";\n";
+ sprintf(tmpstr,"rhs_var");
+ ret+="\t"+temporal_dt[p]->make_host_cvar(tmpstr)+";\n";
}
- // Deal with outer join stuff
- l_cids.clear(), r_cids.clear();
- for(ocsi=found_cids.begin();ocsi!=found_cids.end();++ocsi){
- if((*ocsi).tblvar_ref == 0) l_cids.insert((*ocsi));
- else r_cids.insert((*ocsi));
+ ret += "\tif(lts!=NULL){\n";
+ for(p=0;p<temporal_dt.size();p++){
+ ret += "\t\tlhs_var = lts->tempeq_var"+to_string(p)+";\n";
}
- unpack_null = "";
- extra_cids.clear();
- for(ocsi=r_cids.begin();ocsi!=r_cids.end();++ocsi){
- string field = (*ocsi).field;
- if(r_equiv.count(field)){
- unpack_null+="\t\tunpack_var_"+field+"_1="+generate_se_code(r_equiv[field],schema)+";\n";
- col_id_set addnl_cids;
- get_new_se_cids(r_equiv[field],l_cids,addnl_cids,NULL);
- }else{
- int schref = (*ocsi).schema_ref;
- data_type dt(schema->get_type_name(schref,field));
- literal_t empty_lit(dt.type_indicator());
- if(empty_lit.is_cpx_lit()){
- sprintf(tmpstr,"&(unpack_var_%s_1)",field.c_str());
- unpack_null += "\t"+empty_lit.to_hfta_C_code(tmpstr)+";\n";
- }else{
- unpack_null+="\tunpack_var_"+field+"_1="+empty_lit.to_hfta_C_code("")+";\n";
- }
- }
+ ret += "\t}else{\n";
+ for(p=0;p<temporal_dt.size();p++){
+ ret += "\t\tlhs_var = 0;\n";
}
- ret += gen_unpack_cids(schema, l_cids, "1", needs_xform);
- ret += gen_unpack_cids(schema, extra_cids, "1", needs_xform);
- ret += unpack_null;
+ ret += "\t}\n";
- ret+="\t}else if (tup1.data) {\n";
- unpack_null = ""; extra_cids.clear();
- for(ocsi=l_cids.begin();ocsi!=l_cids.end();++ocsi){
- string field = (*ocsi).field;
- if(l_equiv.count(field)){
- unpack_null+="\t\tunpack_var_"+field+"_0="+generate_se_code(l_equiv[field],schema)+";\n";
- col_id_set addnl_cids;
- get_new_se_cids(l_equiv[field],r_cids,addnl_cids,NULL);
- }else{
- int schref = (*ocsi).schema_ref;
- data_type dt(schema->get_type_name(schref,field));
- literal_t empty_lit(dt.type_indicator());
- if(empty_lit.is_cpx_lit()){
- sprintf(tmpstr,"&(unpack_var_%s_0)",field.c_str());
- unpack_null += "\t"+empty_lit.to_hfta_C_code(tmpstr)+";\n";
- }else{
- unpack_null+="\tunpack_var_"+field+"_0="+empty_lit.to_hfta_C_code("")+";\n";
- }
- }
+ ret += "\tif(rts!=NULL){\n";
+ for(p=0;p<temporal_dt.size();p++){
+ ret += "\t\trhs_var = rts->tempeq_var"+to_string(p)+";\n";
}
- ret += gen_unpack_cids(schema, r_cids, "1", needs_xform);
- ret += gen_unpack_cids(schema, extra_cids, "1", needs_xform);
- ret += unpack_null;
- ret+="\t}\n";
+ ret += "\t}else{\n";
+ for(p=0;p<temporal_dt.size();p++){
+ ret += "\t\trhs_var = 0;\n";
+ }
+ ret += "\t}\n";
ret += gen_init_temp_status_tuple(this->get_node_name());
// Start packing.
- ret += "//\t\tPack the fields into the tuple.\n";
- ret += gen_pack_tuple(schema,select_list,this->get_node_name(), true );
+
+
+// This is checked in the query analyzer, so I think it's safe,
+// but a lot of older code has complex logic to propagate multiple
+// timestamps.
+ for(s=0;s<select_list.size();s++){
+ scalarexp_t *se = select_list[s]->se;
+ data_type *sdt = se->get_data_type();
+ if(sdt->is_temporal()){
+ string target = "\ttuple->tuple_var"+to_string(s)+" = ";
+ if(from[0]->get_property()==0 && from[1]->get_property()==0){ // INNER
+ ret += target+"(lhs_var>rhs_var ? lhs_var : rhs_var); // INNER\n";
+ }
+ if(from[0]->get_property()!=0 && from[1]->get_property()==0){ // LEFT
+ ret += target+"lhs_var; // LEFT\n";
+// ret += target+"rhs_var; // LEFT\n";
+ }
+ if(from[0]->get_property()==0 && from[1]->get_property()!=0){ // RIGHT
+ ret += target+"rhs_var; // RIGHT\n";
+// ret += target+"lhs_var; // RIGHT\n";
+ }
+ if(from[0]->get_property()!=0 && from[1]->get_property()!=0){ // OUTER
+ ret += target+"(lhs_var<rhs_var ? lhs_var : rhs_var); // OUTER\n";
+ }
+ }
+ }
ret += "\treturn 0;\n";
#include <host_tuple.h>
#include <fta.h>
+#include<map>
+
// for htonl,ntohl
#include <netinet/in.h>
}
+// -------------------------------------------------------
+// int_to_string_map and its support functions, structs
+
+struct int_to_string_map_struct{	// state behind the handle built by register_handle_for_int_to_string_map_slot_1
+ std::map<gs_int64_t, vstring> i2s_map;	// integer key -> string; each value's offset is a malloc'd buffer freed in the deregister function
+ std::string fname;	// name of the file the map was loaded from
+ vstring empty_string;	// zero-length value handed back by int_to_string_map when the key is not in i2s_map
+};
+
+// Free everything owned by a (possibly partially built) map handle:
+// every mapped string buffer, the empty-string buffer, and the struct itself.
+static void int_to_string_map_release(int_to_string_map_struct *map_struct){
+	if(map_struct == NULL)
+		return;
+	for(std::map<gs_int64_t, vstring>::iterator i2si = map_struct->i2s_map.begin(); i2si!=map_struct->i2s_map.end(); ++i2si){
+		free((void *)((*i2si).second.offset));
+	}
+	free((void *)(map_struct->empty_string.offset));
+	delete map_struct;
+}
+
+// Build the handle used by int_to_string_map.
+//
+// filename is a vstring (not NUL terminated) naming a text file with one
+// "<integer>,<string>" entry per line.  Lines that do not match are skipped.
+// The string part is read with %s, so it ends at the first whitespace.
+// If a key appears more than once, the last entry wins.
+//
+// Returns the handle, or 0 if memory could not be allocated or the file
+// could not be opened.  Nothing is leaked on the failure paths.
+gs_param_handle_t register_handle_for_int_to_string_map_slot_1(vstring *filename){
+	int_to_string_map_struct *map_struct;
+
+	map_struct = new int_to_string_map_struct();
+	// Defensive only: a throwing operator new never returns NULL.
+	if(map_struct == NULL){
+		gslog(LOG_EMERG, "int_to_string_map:: Could not allocate handle memory\n");
+		return 0;
+	}
+
+	// Value returned for keys that are not in the map.
+	void *empty_buf = malloc(1);
+	if(empty_buf == NULL){
+		gslog(LOG_EMERG, "int_to_string_map:: Could not allocate handle memory\n");
+		delete map_struct;
+		return 0;
+	}
+	map_struct->empty_string.offset = (gs_p_t)empty_buf;
+	map_struct->empty_string.reserved = INTERNAL;
+	map_struct->empty_string.length = 0;
+
+	// The vstring payload is not NUL terminated; copy it into a std::string
+	// (heap, not alloca) and keep the C-string view of it as the file name.
+	std::string raw_name((gs_sp_t)filename->offset, filename->length);
+	map_struct->fname = raw_name.c_str();
+
+	FILE *fl = fopen(map_struct->fname.c_str(), "r");
+	if(fl==NULL){
+		gslog(LOG_EMERG, "int_to_string_map:: Could not open map file %s \n", map_struct->fname.c_str());
+		int_to_string_map_release(map_struct);
+		return 0;
+	}
+
+	char buf[10000], buf_str[10000];
+	long long parsed_val;	// %lld needs a long long, whatever gs_int64_t is
+	while(fgets(buf, sizeof(buf), fl) != NULL){
+		// Field width keeps the string within buf_str.
+		int nvals = sscanf(buf, "%lld,%9999s", &parsed_val, buf_str);
+		if(nvals >= 2){
+			gs_int64_t val = (gs_int64_t)parsed_val;
+			vstring new_str;
+			new_str.reserved = SHALLOW_COPY;
+			new_str.length = strlen(buf_str);
+			void *str_buf = malloc(new_str.length);
+			if(str_buf == NULL){
+				gslog(LOG_EMERG, "int_to_string_map:: Could not allocate handle memory\n");
+				fclose(fl);
+				int_to_string_map_release(map_struct);
+				return 0;
+			}
+			memcpy(str_buf, buf_str, new_str.length);
+			new_str.offset = (gs_p_t)str_buf;
+
+			// A repeated key replaces the earlier entry; release its buffer first.
+			std::map<gs_int64_t, vstring>::iterator prev = map_struct->i2s_map.find(val);
+			if(prev != map_struct->i2s_map.end()){
+				free((void *)(prev->second.offset));
+				prev->second = new_str;
+			}else{
+				map_struct->i2s_map[val] = new_str;
+			}
+		}
+	}
+
+	fclose(fl);
+
+	return (gs_param_handle_t) map_struct;
+}
+
+// Look up val in the table behind handle and describe the match in *result.
+// A key that is not present yields the handle's zero-length empty_string.
+// Only offset/reserved/length are copied: the result refers to the buffer
+// held by the handle, no new string is allocated.  Always returns 0.
+gs_retval_t int_to_string_map(vstring *result, gs_int64_t val, gs_param_handle_t handle){
+	int_to_string_map_struct *state = (int_to_string_map_struct *)handle;
+	std::map<gs_int64_t, vstring>::iterator hit = state->i2s_map.find(val);
+	const vstring &src = (hit != state->i2s_map.end()) ? hit->second : state->empty_string;
+
+	result->offset = src.offset;
+	result->reserved = src.reserved;
+	result->length = src.length;
+	return 0;
+}
+
+// Release a handle created by register_handle_for_int_to_string_map_slot_1:
+// frees every mapped string buffer, the empty-string buffer and the struct.
+// A 0 handle (what the register function returns on failure) is ignored.
+// Always returns 0.
+gs_param_handle_t deregister_handle_for_int_to_string_map_slot_1(gs_param_handle_t handle){
+	int_to_string_map_struct *map_struct = (int_to_string_map_struct *)handle;
+	if(map_struct == NULL)
+		return 0;
+	for(std::map<gs_int64_t, vstring>::iterator i2si = map_struct->i2s_map.begin(); i2si!=map_struct->i2s_map.end(); ++i2si){
+		free((void *)((*i2si).second.offset));
+	}
+	free((void *)(map_struct->empty_string.offset));
+	delete map_struct;
+	// The function is declared to return a value; falling off the end is undefined behaviour.
+	return 0;
+}
+