From f6db93ae9480bc87fb36b61cec0bf17e6082b7c1 Mon Sep 17 00:00:00 2001 From: vlad shkapenyuk Date: Tue, 16 Jun 2020 15:54:21 -0400 Subject: [PATCH] Fix out-of-order errors in join operator Signed-off-by: vlad shkapenyuk Change-Id: Ifca20766b9092cc68c3bbc7ee90dd73647e32da6 --- cfg/external_fcns.def | 8 ++ include/hfta/hfta_runtime_library.h | 6 ++ include/hfta/join_eq_hash_operator.h | 72 ++++++++++++------ src/ftacmp/parse_fta.h | 4 +- src/ftacmp/query_plan.cc | 111 ++++++++++++---------------- src/lib/gscphftaaux/hfta_runtime_library.cc | 85 +++++++++++++++++++++ 6 files changed, 198 insertions(+), 88 deletions(-) diff --git a/cfg/external_fcns.def b/cfg/external_fcns.def index 0043b84..1c9eb3e 100644 --- a/cfg/external_fcns.def +++ b/cfg/external_fcns.def @@ -704,3 +704,11 @@ uint FUN [LFTA_LEGAL, COST EXPENSIVE] string UDAF [RUNNING] running_array_aggr_hfta string (string); string UDAF [RUNNING] running_array_aggr_lfta fstring17 (uint); +////////////////////////////////////////////////////////// +// Mapping functions +// Should eventually be replaced by watchlist join ? +////////////////////////////////// + string FUN [HFTA_ONLY, COST EXPENSIVE] int_to_string_map(llong, string HANDLE); + string FUN [HFTA_ONLY, COST EXPENSIVE] int_to_string_map(ullong, string HANDLE); + string FUN [HFTA_ONLY, COST EXPENSIVE] int_to_string_map(int, string HANDLE); + string FUN [HFTA_ONLY, COST EXPENSIVE] int_to_string_map(uint, string HANDLE); diff --git a/include/hfta/hfta_runtime_library.h b/include/hfta/hfta_runtime_library.h index afcf682..2839bef 100644 --- a/include/hfta/hfta_runtime_library.h +++ b/include/hfta/hfta_runtime_library.h @@ -126,6 +126,12 @@ inline gs_uint32_t str_match_offset( gs_uint32_t offset, struct vstring * s1, st return 1; } +// ------------------------------------------------------- +// map_int_to_string and its support functions, structs + +gs_param_handle_t register_handle_for_int_to_string_map_slot_1(vstring *filename); +gs_retval_t int_to_string_map(vstring *result, gs_int64_t val, gs_param_handle_t handle); +gs_param_handle_t deregister_handle_for_int_to_string_map_slot_1(gs_param_handle_t handle); #endif diff --git a/include/hfta/join_eq_hash_operator.h b/include/hfta/join_eq_hash_operator.h index 8324a31..0b9a9f1 100644 --- a/include/hfta/join_eq_hash_operator.h +++ b/include/hfta/join_eq_hash_operator.h @@ -30,7 +30,7 @@ using namespace std; #define JOIN_OP_OUTER_JOIN 3 -#define MAX_TUPLE_SIZE 1024 +#define MAX_TUPLE_SIZE 10240 template class join_eq_hash_operator : public base_operator { @@ -44,11 +44,18 @@ int n_calls, n_iters, n_eqk; // list of tuples from one of the channel waiting to be compared // against tuple from the other channel + // Normally at least one should be empty after processing accept_tuple list input_queue[2]; // Admission control timestamp objects - timestamp *max_input_ts[2], *curr_ts; - bool hash_empty, curr_ts_valid; + timestamp *max_input_ts[2]; // largest timestamp received on this channel + // perhaps from a temporal tuple + timestamp *curr_ts; // current ts being processed. + bool curr_ts_valid; // both channels have a ts so curr_ts has been + // assigned a value + + bool hash_empty; // always true, seems an artifact + // max tuples received from input channels char max_input_tuple_data[2][MAX_TUPLE_SIZE]; @@ -91,6 +98,7 @@ int n_calls, n_iters, n_eqk; int compare_qts_to_hashts(int i){ timestamp tmp_ts; if(max_input_ts[i] == NULL) return(-1); +//printf("compare_qts_to_hashts channel %d: ",i); if(input_queue[i].empty()) return(func.compare_ts_with_ts(max_input_ts[i], curr_ts)); func.load_ts_from_tup(&tmp_ts,input_queue[i].front()); @@ -117,18 +125,21 @@ int n_calls, n_iters, n_eqk; rts = &tmp_rts; } +//printf("compare_qts : "); return(func.compare_ts_with_ts(lts,rts)); } int compare_tup_with_ts(host_tuple &tup, timestamp *ts){ timestamp tmp_ts; func.load_ts_from_tup(&tmp_ts, tup); +//printf("compare_tup_with_ts channel %d: ",tup.channel); return(func.compare_ts_with_ts(&tmp_ts, ts)); } void process_join(list& result){ int i; for(i=0;i<2;++i){ +//printf("\tprocess join channel %d input q len is %lu\n",i, input_queue[i].size()); while(curr_ts_valid && !input_queue[i].empty() && compare_tup_with_ts(input_queue[i].front(), curr_ts) == 0 ){ // apply tuples to join int other = 1-i; // the other channel @@ -176,6 +187,8 @@ int n_calls, n_iters, n_eqk; host_tuple empty_tuple; empty_tuple.tuple_size = 0; empty_tuple.data = NULL; +//printf("Processing outer join\n"); + hash_empty = true; typename hash_table::iterator jti; for(i=0;i<2;++i){ @@ -249,6 +262,7 @@ n_calls=0; n_iters=0; n_eqk=0; // Ensure that the queue ts is initialized. if(max_input_ts[tup.channel] == NULL){ +//printf("Loading channel %d\n",tup.channel); max_input_ts[tup.channel] = new timestamp(); if(! func.load_ts_from_tup(max_input_ts[tup.channel],tup)){ tup.free_tuple(); @@ -275,9 +289,10 @@ n_calls=0; n_iters=0; n_eqk=0; return(0); // can't load ts -- bail out. } +//printf("accept_tuple channel=%d: ",tup.channel); int tup_order=func.compare_ts_with_ts(&tup_ts,max_input_ts[tup.channel]); if (tup_order < 0){ -printf("%s: out of order ts.\n", op_name); + printf("%s: out of order ts, channel=%d.\n", op_name, tup.channel); tup.free_tuple(); // even for out of order temporal tuples we need to post new temporal tuple @@ -292,6 +307,7 @@ printf("%s: out of order ts.\n", op_name); // Update max if larger if(tup_order > 0){ +//printf("Loading channel %d\n",tup.channel); func.load_ts_from_ts(max_input_ts[tup.channel],&tup_ts); // save the content of the max tuple @@ -345,6 +361,7 @@ printf("%s: out of order ts.\n", op_name); } int flush(list& result) { +//printf("Calling flush\n"); process_outer_join(result); @@ -376,28 +393,39 @@ printf("%s: out of order ts.\n", op_name); int qcmp = compare_qts(); int minq = 0; if(qcmp>0) minq = 1; - host_tuple left_tuple, right_tuple; - left_tuple.tuple_size=0; left_tuple.data=NULL; - right_tuple.tuple_size=0; right_tuple.data=NULL; + timestamp left_ts, right_ts; + timestamp *left_ts_ptr = &left_ts; + timestamp *right_ts_ptr = &right_ts; - if (minq == 0) { - if(max_input_ts[minq]) { - if (input_queue[minq].empty()) - left_tuple = max_input_tuple[minq]; - else - left_tuple = input_queue[minq].front(); - } - } else { - if(max_input_ts[minq]) { - if (input_queue[minq].empty()) - right_tuple = max_input_tuple[minq]; - else - right_tuple = input_queue[minq].front(); - } + +/* + if (input_queue[0].empty()){ + printf("L=max_ts, "); + left_ts_ptr = max_input_ts[0]; + }else{ + printf("L=q, "); + func.load_ts_from_tup(left_ts_ptr, input_queue[0].front()); + } + + if (input_queue[1].empty()){ + printf("R=max_ts, "); + right_ts_ptr = max_input_ts[1]; + }else{ + printf("L=q, "); + func.load_ts_from_tup(right_ts_ptr, input_queue[1].front()); + } +*/ + if(curr_ts_valid){ + left_ts_ptr = curr_ts; + right_ts_ptr = curr_ts; + }else{ +//printf("curr_ts invalid\n"); + left_ts_ptr = NULL; + right_ts_ptr = NULL; } result.channel = output_channel; - return func.create_temp_status_tuple(left_tuple, right_tuple, result); + return func.create_temp_status_tuple(left_ts_ptr, right_ts_ptr, result); } diff --git a/src/ftacmp/parse_fta.h b/src/ftacmp/parse_fta.h index 94c2469..cae9898 100644 --- a/src/ftacmp/parse_fta.h +++ b/src/ftacmp/parse_fta.h @@ -693,8 +693,8 @@ public: #define LEFT_OUTER_JOIN_PROPERTY 1 #define RIGHT_OUTER_JOIN_PROPERTY 2 #define OUTER_JOIN_PROPERTY 3 -#define WATCHLIST_JOIN_PROPERTY 3 -#define FILTER_JOIN_PROPERTY 4 +#define WATCHLIST_JOIN_PROPERTY 4 +#define FILTER_JOIN_PROPERTY 5 // tablevar_list_t is the list of tablevars in a FROM clause diff --git a/src/ftacmp/query_plan.cc b/src/ftacmp/query_plan.cc index 2e2286e..8e05ae2 100644 --- a/src/ftacmp/query_plan.cc +++ b/src/ftacmp/query_plan.cc @@ -11134,84 +11134,67 @@ string join_eq_hash_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_ // create a temp status tuple - ret += "int create_temp_status_tuple(const host_tuple &tup0, const host_tuple &tup1, host_tuple& result) {\n\n"; + ret += "int create_temp_status_tuple("+this->generate_functor_name()+"_tempeqdef *lts,"+this->generate_functor_name()+"_tempeqdef *rts, host_tuple& result) {\n\n"; ret += "\tgs_retval_t retval = 0;\n"; ret += "\tgs_int32_t problem = 0;\n"; - ret += "\tif(tup0.data){\n"; - -// Unpack all the temporal attributes references in select list - col_id_set found_cids; - - for(s=0;sse->get_data_type()->is_temporal()) { -// Find the set of attributes accessed in this SE - col_id_set new_cids; - get_new_se_cids(select_list[s]->se,found_cids, new_cids, NULL); - } + for(p=0;pmake_host_cvar(tmpstr)+";\n"; + sprintf(tmpstr,"rhs_var"); + ret+="\t"+temporal_dt[p]->make_host_cvar(tmpstr)+";\n"; } - // Deal with outer join stuff - l_cids.clear(), r_cids.clear(); - for(ocsi=found_cids.begin();ocsi!=found_cids.end();++ocsi){ - if((*ocsi).tblvar_ref == 0) l_cids.insert((*ocsi)); - else r_cids.insert((*ocsi)); + ret += "\tif(lts!=NULL){\n"; + for(p=0;pget_type_name(schref,field)); - literal_t empty_lit(dt.type_indicator()); - if(empty_lit.is_cpx_lit()){ - sprintf(tmpstr,"&(unpack_var_%s_1)",field.c_str()); - unpack_null += "\t"+empty_lit.to_hfta_C_code(tmpstr)+";\n"; - }else{ - unpack_null+="\tunpack_var_"+field+"_1="+empty_lit.to_hfta_C_code("")+";\n"; - } - } + ret += "\t}else{\n"; + for(p=0;pget_type_name(schref,field)); - literal_t empty_lit(dt.type_indicator()); - if(empty_lit.is_cpx_lit()){ - sprintf(tmpstr,"&(unpack_var_%s_0)",field.c_str()); - unpack_null += "\t"+empty_lit.to_hfta_C_code(tmpstr)+";\n"; - }else{ - unpack_null+="\tunpack_var_"+field+"_0="+empty_lit.to_hfta_C_code("")+";\n"; - } - } + ret += "\tif(rts!=NULL){\n"; + for(p=0;pget_node_name()); // Start packing. - ret += "//\t\tPack the fields into the tuple.\n"; - ret += gen_pack_tuple(schema,select_list,this->get_node_name(), true ); + + +// This is checked in the query analyzer so I think its safe, +// But a lot of older code has complex code to propagate multiple +// timestamps + for(s=0;sse; + data_type *sdt = se->get_data_type(); + if(sdt->is_temporal()){ + string target = "\ttuple->tuple_var"+to_string(s)+" = "; + if(from[0]->get_property()==0 && from[1]->get_property()==0){ // INNER + ret += target+"(lhs_var>rhs_var ? lhs_var : rhs_var); // INNER\n"; + } + if(from[0]->get_property()!=0 && from[1]->get_property()==0){ // LEFT + ret += target+"lhs_var; // LEFT\n"; +// ret += target+"rhs_var; // LEFT\n"; + } + if(from[0]->get_property()==0 && from[1]->get_property()!=0){ // RIGHT + ret += target+"rhs_var; // RIGHT\n"; +// ret += target+"lhs_var; // RIGHT\n"; + } + if(from[0]->get_property()!=0 && from[1]->get_property()!=0){ // OUTER + ret += target+"(lhs_var #include +#include + // for htonl,ntohl #include @@ -551,3 +553,86 @@ gs_uint32_t byte_match_offset( gs_uint32_t offset, gs_uint32_t val,vstring *s2) } +// ------------------------------------------------------- +// map_int_to_string and its support functions, structs + +struct int_to_string_map_struct{ + std::map i2s_map; + std::string fname; + vstring empty_string; +}; + +gs_param_handle_t register_handle_for_int_to_string_map_slot_1(vstring *filename){ + int_to_string_map_struct *map_struct; + + map_struct = new int_to_string_map_struct(); + if(map_struct == NULL){ + gslog(LOG_EMERG, "int_to_string_map:: Could not allocate handle memory\n"); + return 0; + } + + map_struct->empty_string.offset = (gs_p_t)malloc(1); + map_struct->empty_string.reserved = INTERNAL; + map_struct->empty_string.length = 0; + + gs_sp_t filenamec; + filenamec = (gs_sp_t)alloca(filename->length+1); + if (filenamec==0) { + gslog(LOG_EMERG, "int_to_string_map:: Could not allocate filename memory\n"); + return 0; + } + memcpy(filenamec,(gs_sp_t)filename->offset,filename->length); + filenamec[filename->length]=0; + map_struct->fname = filenamec; + + FILE *fl = fopen(filenamec, "r"); + if(fl==NULL){ + gslog(LOG_EMERG, "int_to_string_map:: Could not open regex file %s \n",filename); + return 0; + } + + char buf[10000], buf_str[10000]; + gs_int32_t buflen; + gs_int64_t val; + while(fgets(buf, buflen, fl) > 0){ + int nvals = sscanf(buf, "%lld,%s", &val, buf_str); + if(nvals >= 2){ + vstring new_str; + new_str.reserved = SHALLOW_COPY; + new_str.length = strlen(buf_str); + new_str.offset = (gs_p_t)malloc(new_str.length); + memcpy((char *)new_str.offset, buf_str, new_str.length); + map_struct->i2s_map[val] = new_str; + } + } + + fclose(fl); + + return (gs_param_handle_t) map_struct; +} + +gs_retval_t int_to_string_map(vstring *result, gs_int64_t val, gs_param_handle_t handle){ + int_to_string_map_struct *map_struct = (int_to_string_map_struct *)handle; + if(map_struct->i2s_map.count(val)>0){ + vstring ret = map_struct->i2s_map[val]; + result->offset = ret.offset; + result->reserved = ret.reserved; + result->length = ret.length; + }else{ + result->offset = map_struct->empty_string.offset; + result->reserved = map_struct->empty_string.reserved; + result->length = map_struct->empty_string.length; + } + + return 0; +} + +gs_param_handle_t deregister_handle_for_int_to_string_map_slot_1(gs_param_handle_t handle){ + int_to_string_map_struct *map_struct = (int_to_string_map_struct *)handle; + for(std::map::iterator i2si = map_struct->i2s_map.begin(); i2si!=map_struct->i2s_map.end(); ++i2si){ + free((void *)((*i2si).second.offset)); + } + free((void *)(map_struct->empty_string.offset)); + delete map_struct; +} + -- 2.16.6