Fix out-of-order errors in join operator 46/4146/1
author vlad shkapenyuk <vshkap@research.att.com>
Tue, 16 Jun 2020 19:54:21 +0000 (15:54 -0400)
committer vlad shkapenyuk <vshkap@research.att.com>
Tue, 16 Jun 2020 19:54:21 +0000 (15:54 -0400)
Signed-off-by: vlad shkapenyuk <vshkap@research.att.com>
Change-Id: Ifca20766b9092cc68c3bbc7ee90dd73647e32da6

cfg/external_fcns.def
include/hfta/hfta_runtime_library.h
include/hfta/join_eq_hash_operator.h
src/ftacmp/parse_fta.h
src/ftacmp/query_plan.cc
src/lib/gscphftaaux/hfta_runtime_library.cc

index 0043b84..1c9eb3e 100644 (file)
@@ -704,3 +704,11 @@ uint FUN [LFTA_LEGAL, COST EXPENSIVE]
        string UDAF [RUNNING] running_array_aggr_hfta string (string);
        string UDAF [RUNNING] running_array_aggr_lfta fstring17 (uint);
        
+//////////////////////////////////////////////////////////
+//     Mapping functions
+//     Should eventually be replaced by watchlist join ?
+//////////////////////////////////
+       string FUN [HFTA_ONLY, COST EXPENSIVE] int_to_string_map(llong, string HANDLE);
+       string FUN [HFTA_ONLY, COST EXPENSIVE] int_to_string_map(ullong, string HANDLE);
+       string FUN [HFTA_ONLY, COST EXPENSIVE] int_to_string_map(int, string HANDLE);
+       string FUN [HFTA_ONLY, COST EXPENSIVE] int_to_string_map(uint, string HANDLE);
index afcf682..2839bef 100644 (file)
@@ -126,6 +126,12 @@ inline gs_uint32_t str_match_offset( gs_uint32_t offset, struct vstring * s1, st
        return 1;
 }
 
+// -------------------------------------------------------
+//             int_to_string_map and its support functions, structs
+
+// Load the integer-to-string table from the file named by filename; returns the
+// table as an opaque handle, or 0 on failure.
+gs_param_handle_t register_handle_for_int_to_string_map_slot_1(vstring *filename);
+// Set *result to the string mapped to val in the table behind handle, or to an
+// empty string when val has no entry.
+gs_retval_t int_to_string_map(vstring *result, gs_int64_t val, gs_param_handle_t handle);
+// Release the table behind handle together with the strings it holds.
+gs_param_handle_t deregister_handle_for_int_to_string_map_slot_1(gs_param_handle_t handle);
 
 
 #endif
index 8324a31..0b9a9f1 100644 (file)
@@ -30,7 +30,7 @@ using namespace std;
 #define JOIN_OP_OUTER_JOIN 3
 
 
-#define MAX_TUPLE_SIZE 1024
+#define MAX_TUPLE_SIZE 10240
 
 template <class join_eq_hash_functor, class timestamp, class hashkey, class hasher_func, class equal_func>
 class join_eq_hash_operator : public base_operator {
@@ -44,11 +44,18 @@ int n_calls, n_iters, n_eqk;
 
        // list of tuples from one of the channel waiting to be compared
        // against tuple from the other channel
+       // Normally at least one should be empty after processing accept_tuple
        list<host_tuple> input_queue[2];
 
 //             Admission control timestamp objects
-       timestamp *max_input_ts[2], *curr_ts;
-       bool hash_empty, curr_ts_valid;
+       timestamp *max_input_ts[2];     // largest timestamp received on this channel
+                                                               // perhaps from a temporal tuple
+       timestamp *curr_ts;                     // current ts being processed.
+       bool curr_ts_valid;                     // both channels have a ts so curr_ts has been
+                                                               // assigned a value
+
+       bool hash_empty;                        // always true, seems an artifact
+
 
 //     max tuples received from input channels
        char max_input_tuple_data[2][MAX_TUPLE_SIZE];
@@ -91,6 +98,7 @@ int n_calls, n_iters, n_eqk;
        int compare_qts_to_hashts(int i){
                timestamp tmp_ts;
                if(max_input_ts[i] == NULL) return(-1);
+//printf("compare_qts_to_hashts channel %d: ",i);
                if(input_queue[i].empty())
                        return(func.compare_ts_with_ts(max_input_ts[i], curr_ts));
                func.load_ts_from_tup(&tmp_ts,input_queue[i].front());
@@ -117,18 +125,21 @@ int n_calls, n_iters, n_eqk;
                        rts = &tmp_rts;
                }
 
+//printf("compare_qts : ");
                return(func.compare_ts_with_ts(lts,rts));
        }
 
        int compare_tup_with_ts(host_tuple &tup, timestamp *ts){
                timestamp tmp_ts;
                func.load_ts_from_tup(&tmp_ts, tup);
+//printf("compare_tup_with_ts channel %d: ",tup.channel);
                return(func.compare_ts_with_ts(&tmp_ts, ts));
        }
 
        void process_join(list<host_tuple>& result){
          int i;
          for(i=0;i<2;++i){
+//printf("\tprocess join channel %d input q len is %lu\n",i, input_queue[i].size());
                while(curr_ts_valid && !input_queue[i].empty() && compare_tup_with_ts(input_queue[i].front(), curr_ts) == 0 ){
 //                             apply tuples to join
                        int other = 1-i;        // the other channel
@@ -176,6 +187,8 @@ int n_calls, n_iters, n_eqk;
     host_tuple empty_tuple;
        empty_tuple.tuple_size = 0; empty_tuple.data = NULL;
 
+//printf("Processing outer join\n");
+
        hash_empty = true;
        typename hash_table<hashkey*, host_tuple, hasher_func, equal_func>::iterator jti;
        for(i=0;i<2;++i){
@@ -249,6 +262,7 @@ n_calls=0; n_iters=0; n_eqk=0;
 
 //                     Ensure that the queue ts is initialized.
                if(max_input_ts[tup.channel] == NULL){
+//printf("Loading channel %d\n",tup.channel);
                        max_input_ts[tup.channel] = new timestamp();
                        if(! func.load_ts_from_tup(max_input_ts[tup.channel],tup)){
                                tup.free_tuple();
@@ -275,9 +289,10 @@ n_calls=0; n_iters=0; n_eqk=0;
                        return(0);      // can't load ts -- bail out.
                }
 
+//printf("accept_tuple channel=%d: ",tup.channel);
            int tup_order=func.compare_ts_with_ts(&tup_ts,max_input_ts[tup.channel]);
                if (tup_order < 0){
-printf("%s: out of order ts.\n", op_name);
+                       printf("%s: out of order ts, channel=%d.\n", op_name, tup.channel);
                        tup.free_tuple();
 
                        // even for out of order temporal tuples we need to post new temporal tuple
@@ -292,6 +307,7 @@ printf("%s: out of order ts.\n", op_name);
 
 //     Update max if larger
                if(tup_order > 0){
+//printf("Loading channel %d\n",tup.channel);
                        func.load_ts_from_ts(max_input_ts[tup.channel],&tup_ts);
 
                        // save the content of the max tuple
@@ -345,6 +361,7 @@ printf("%s: out of order ts.\n", op_name);
        }
 
        int flush(list<host_tuple>& result) {
+//printf("Calling flush\n");
 
                process_outer_join(result);
 
@@ -376,28 +393,39 @@ printf("%s: out of order ts.\n", op_name);
                int qcmp = compare_qts();
                int minq = 0; if(qcmp>0) minq = 1;
 
-               host_tuple left_tuple, right_tuple;
-               left_tuple.tuple_size=0; left_tuple.data=NULL;
-               right_tuple.tuple_size=0; right_tuple.data=NULL;
+               timestamp left_ts, right_ts;
+               timestamp *left_ts_ptr = &left_ts;
+               timestamp *right_ts_ptr = &right_ts;
 
-               if (minq == 0) {
-                       if(max_input_ts[minq]) {
-                               if (input_queue[minq].empty())
-                                       left_tuple = max_input_tuple[minq];
-                               else
-                                       left_tuple = input_queue[minq].front();
-                       }
-               } else {
-                       if(max_input_ts[minq]) {
-                               if (input_queue[minq].empty())
-                                       right_tuple = max_input_tuple[minq];
-                               else
-                                       right_tuple = input_queue[minq].front();
-                       }
+               
+/*
+               if (input_queue[0].empty()){
+                       printf("L=max_ts, ");
+                       left_ts_ptr = max_input_ts[0];
+               }else{
+                       printf("L=q, ");
+                       func.load_ts_from_tup(left_ts_ptr, input_queue[0].front());
+               }
+               
+               if (input_queue[1].empty()){
+                       printf("R=max_ts, ");
+                       right_ts_ptr = max_input_ts[1];
+               }else{
+                       printf("L=q, ");
+                       func.load_ts_from_tup(right_ts_ptr, input_queue[1].front());
+               }
+*/
+               if(curr_ts_valid){
+                       left_ts_ptr = curr_ts;
+                       right_ts_ptr = curr_ts;
+               }else{
+//printf("curr_ts invalid\n");
+                       left_ts_ptr = NULL;
+                       right_ts_ptr = NULL;
                }
 
                result.channel = output_channel;
-               return func.create_temp_status_tuple(left_tuple, right_tuple, result);
+               return func.create_temp_status_tuple(left_ts_ptr, right_ts_ptr, result);
        }
 
 
index 94c2469..cae9898 100644 (file)
@@ -693,8 +693,8 @@ public:
 #define LEFT_OUTER_JOIN_PROPERTY 1
 #define RIGHT_OUTER_JOIN_PROPERTY 2
 #define OUTER_JOIN_PROPERTY 3
-#define WATCHLIST_JOIN_PROPERTY 3
-#define FILTER_JOIN_PROPERTY 4
+#define WATCHLIST_JOIN_PROPERTY 4
+#define FILTER_JOIN_PROPERTY 5
 
 //             tablevar_list_t is the list of tablevars in a FROM clause
 
index 2e2286e..8e05ae2 100644 (file)
@@ -11134,84 +11134,67 @@ string join_eq_hash_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_
 
 
 //             create a temp status tuple
-       ret += "int create_temp_status_tuple(const host_tuple &tup0, const host_tuple &tup1, host_tuple& result) {\n\n";
+       ret += "int create_temp_status_tuple("+this->generate_functor_name()+"_tempeqdef *lts,"+this->generate_functor_name()+"_tempeqdef *rts, host_tuple& result) {\n\n";
 
        ret += "\tgs_retval_t retval = 0;\n";
        ret += "\tgs_int32_t problem = 0;\n";
 
-       ret += "\tif(tup0.data){\n";
-
-//             Unpack all the temporal attributes references in select list
-       col_id_set found_cids;
-
-       for(s=0;s<select_list.size();s++){
-               if (select_list[s]->se->get_data_type()->is_temporal()) {
-//                     Find the set of attributes accessed in this SE
-                       col_id_set new_cids;
-                       get_new_se_cids(select_list[s]->se,found_cids, new_cids, NULL);
-               }
+       for(p=0;p<temporal_dt.size();p++){
+               sprintf(tmpstr,"lhs_var");
+               ret+="\t"+temporal_dt[p]->make_host_cvar(tmpstr)+";\n";         
+               sprintf(tmpstr,"rhs_var");
+               ret+="\t"+temporal_dt[p]->make_host_cvar(tmpstr)+";\n";         
        }
 
-       //                      Deal with outer join stuff
-       l_cids.clear(), r_cids.clear();
-       for(ocsi=found_cids.begin();ocsi!=found_cids.end();++ocsi){
-               if((*ocsi).tblvar_ref == 0) l_cids.insert((*ocsi));
-               else                                            r_cids.insert((*ocsi));
+       ret += "\tif(lts!=NULL){\n";
+       for(p=0;p<temporal_dt.size();p++){
+               ret += "\t\tlhs_var = lts->tempeq_var"+to_string(p)+";\n";
        }
-       unpack_null = "";
-       extra_cids.clear();
-       for(ocsi=r_cids.begin();ocsi!=r_cids.end();++ocsi){
-               string field = (*ocsi).field;
-               if(r_equiv.count(field)){
-                       unpack_null+="\t\tunpack_var_"+field+"_1="+generate_se_code(r_equiv[field],schema)+";\n";
-                       col_id_set addnl_cids;
-                       get_new_se_cids(r_equiv[field],l_cids,addnl_cids,NULL);
-               }else{
-               int schref = (*ocsi).schema_ref;
-                       data_type dt(schema->get_type_name(schref,field));
-                       literal_t empty_lit(dt.type_indicator());
-                       if(empty_lit.is_cpx_lit()){
-                               sprintf(tmpstr,"&(unpack_var_%s_1)",field.c_str());
-                               unpack_null += "\t"+empty_lit.to_hfta_C_code(tmpstr)+";\n";
-                       }else{
-                               unpack_null+="\tunpack_var_"+field+"_1="+empty_lit.to_hfta_C_code("")+";\n";
-                       }
-               }
+       ret += "\t}else{\n";
+       for(p=0;p<temporal_dt.size();p++){
+               ret += "\t\tlhs_var = 0;\n";
        }
-       ret += gen_unpack_cids(schema,  l_cids, "1", needs_xform);
-       ret += gen_unpack_cids(schema,  extra_cids, "1", needs_xform);
-       ret += unpack_null;
+       ret += "\t}\n";
 
-       ret+="\t}else if (tup1.data) {\n";
-       unpack_null = ""; extra_cids.clear();
-       for(ocsi=l_cids.begin();ocsi!=l_cids.end();++ocsi){
-               string field = (*ocsi).field;
-               if(l_equiv.count(field)){
-                       unpack_null+="\t\tunpack_var_"+field+"_0="+generate_se_code(l_equiv[field],schema)+";\n";
-                       col_id_set addnl_cids;
-                       get_new_se_cids(l_equiv[field],r_cids,addnl_cids,NULL);
-               }else{
-               int schref = (*ocsi).schema_ref;
-                       data_type dt(schema->get_type_name(schref,field));
-                       literal_t empty_lit(dt.type_indicator());
-                       if(empty_lit.is_cpx_lit()){
-                               sprintf(tmpstr,"&(unpack_var_%s_0)",field.c_str());
-                               unpack_null += "\t"+empty_lit.to_hfta_C_code(tmpstr)+";\n";
-                       }else{
-                               unpack_null+="\tunpack_var_"+field+"_0="+empty_lit.to_hfta_C_code("")+";\n";
-                       }
-               }
+       ret += "\tif(rts!=NULL){\n";
+       for(p=0;p<temporal_dt.size();p++){
+               ret += "\t\trhs_var = rts->tempeq_var"+to_string(p)+";\n";
        }
-       ret += gen_unpack_cids(schema,  r_cids, "1", needs_xform);
-       ret += gen_unpack_cids(schema,  extra_cids, "1", needs_xform);
-       ret += unpack_null;
-       ret+="\t}\n";
+       ret += "\t}else{\n";
+       for(p=0;p<temporal_dt.size();p++){
+               ret += "\t\trhs_var = 0;\n";
+       }
+       ret += "\t}\n";
 
        ret += gen_init_temp_status_tuple(this->get_node_name());
 
 //             Start packing.
-       ret += "//\t\tPack the fields into the tuple.\n";
-       ret += gen_pack_tuple(schema,select_list,this->get_node_name(), true );
+
+
+//             This is checked in the query analyzer so I think its safe,
+//             But a lot of older code has complex code to propagate multiple
+//             timestamps
+    for(s=0;s<select_list.size();s++){
+               scalarexp_t *se  = select_list[s]->se;
+        data_type *sdt = se->get_data_type();
+               if(sdt->is_temporal()){
+                       string target = "\ttuple->tuple_var"+to_string(s)+" = ";
+                       if(from[0]->get_property()==0 && from[1]->get_property()==0){ // INNER
+                               ret += target+"(lhs_var>rhs_var ? lhs_var : rhs_var); // INNER\n";
+                       }
+                       if(from[0]->get_property()!=0 && from[1]->get_property()==0){ // LEFT
+                               ret += target+"lhs_var; // LEFT\n";
+//                             ret += target+"rhs_var; // LEFT\n";
+                       }
+                       if(from[0]->get_property()==0 && from[1]->get_property()!=0){ // RIGHT
+                               ret += target+"rhs_var; // RIGHT\n";
+//                             ret += target+"lhs_var; // RIGHT\n";
+                       }
+                       if(from[0]->get_property()!=0 && from[1]->get_property()!=0){ // OUTER
+                               ret += target+"(lhs_var<rhs_var ? lhs_var : rhs_var); // OUTER\n";
+                       }
+               }
+       }
 
 
        ret += "\treturn 0;\n";
index ccae6e3..5cabb20 100644 (file)
@@ -31,6 +31,8 @@ extern "C" {
 #include <host_tuple.h>
 #include <fta.h>
 
+#include<map>
+
 // for htonl,ntohl
 #include <netinet/in.h>
 
@@ -551,3 +553,86 @@ gs_uint32_t byte_match_offset( gs_uint32_t offset, gs_uint32_t val,vstring *s2)
 }
 
 
+// -------------------------------------------------------
+//             int_to_string_map and its support functions, structs
+
+// State behind one int_to_string_map handle.
+struct int_to_string_map_struct{
+       std::map<gs_int64_t, vstring> i2s_map;  // integer key -> string read from the map file
+       std::string fname;                      // name of the file the table was loaded from
+       vstring empty_string;                   // zero-length string returned for unknown keys
+};
+
+gs_param_handle_t register_handle_for_int_to_string_map_slot_1(vstring *filename){
+       // Load the "<integer>,<token>" lines of the file named by `filename` into a
+       // new lookup table and return it as the handle used by int_to_string_map().
+       // <token> ends at the first whitespace (sscanf %s); lines that do not parse
+       // are skipped. Returns 0 if the file cannot be opened.
+       int_to_string_map_struct *map_struct = new int_to_string_map_struct();
+
+       // Zero-length string handed back for keys that are not in the table.
+       map_struct->empty_string.offset = (gs_p_t)malloc(1);
+       map_struct->empty_string.reserved = INTERNAL;
+       map_struct->empty_string.length = 0;
+
+       // filename->length is the byte count; no terminating NUL is assumed.
+       map_struct->fname.assign((const char *)filename->offset, filename->length);
+
+       FILE *fl = fopen(map_struct->fname.c_str(), "r");
+       if(fl==NULL){
+               gslog(LOG_EMERG, "int_to_string_map:: Could not open map file %s \n",map_struct->fname.c_str());
+               free((void *)(map_struct->empty_string.offset));
+               delete map_struct;      // do not leak the half-built table
+               return 0;
+       }
+
+       char buf[10000], buf_str[10000];        // a token is never longer than its line
+       long long val;          // matches %lld however gs_int64_t is defined
+       while(fgets(buf, (int)sizeof(buf), fl) != NULL){
+               if(sscanf(buf, "%lld,%s", &val, buf_str) < 2)
+                       continue;
+               vstring new_str;
+               new_str.reserved = SHALLOW_COPY;
+               new_str.length = strlen(buf_str);
+               new_str.offset = (gs_p_t)malloc(new_str.length);
+               // Out of memory: skip this entry rather than memcpy into NULL.
+               if(new_str.offset == 0)
+                       continue;
+               memcpy((char *)new_str.offset, buf_str, new_str.length);
+
+               // A repeated key replaces the earlier entry; release its buffer.
+               std::map<gs_int64_t, vstring>::iterator old = map_struct->i2s_map.find(val);
+               if(old != map_struct->i2s_map.end())
+                       free((void *)(old->second.offset));
+               map_struct->i2s_map[val] = new_str;
+       }
+
+       fclose(fl);
+
+       return (gs_param_handle_t) map_struct;
+}
+
+gs_retval_t int_to_string_map(vstring *result, gs_int64_t val, gs_param_handle_t handle){
+       // Point *result at the string stored under val in the table behind handle,
+       // or at the table's empty string when val has no entry. The string bytes
+       // are not copied; *result refers to memory owned by the table.
+       // Always returns 0.
+       int_to_string_map_struct *tbl = (int_to_string_map_struct *)handle;
+       std::map<gs_int64_t, vstring>::iterator hit = tbl->i2s_map.find(val);
+       const vstring *src = &tbl->empty_string;
+       if(hit != tbl->i2s_map.end())
+               src = &hit->second;
+       result->offset = src->offset;
+       result->reserved = src->reserved;
+       result->length = src->length;
+       return 0;
+}
+
+gs_param_handle_t deregister_handle_for_int_to_string_map_slot_1(gs_param_handle_t handle){
+       // Free every string buffer held by the table behind handle, then the table
+       // itself. Always returns 0.
+       int_to_string_map_struct *map_struct = (int_to_string_map_struct *)handle;
+       if(map_struct == NULL)
+               return 0;       // registration returns 0 on failure; nothing to free
+       for(std::map<gs_int64_t, vstring>::iterator i2si = map_struct->i2s_map.begin(); i2si!=map_struct->i2s_map.end(); ++i2si){
+               free((void *)((*i2si).second.offset));
+       }
+       free((void *)(map_struct->empty_string.offset));
+       delete map_struct;
+       return 0;       // declared non-void: flowing off the end would be undefined
+}
+