Update running groupby operator 73/3673/1
authorvlad shkapenyuk <vshkap@research.att.com>
Tue, 12 May 2020 13:42:58 +0000 (09:42 -0400)
committervlad shkapenyuk <vshkap@research.att.com>
Tue, 12 May 2020 13:42:58 +0000 (09:42 -0400)
Signed-off-by: vlad shkapenyuk <vshkap@research.att.com>
Change-Id: I4e0edbf2914a2b92edb3ade592a3fc1f774fd2b3

include/hfta/join_eq_hash_operator.h
include/hfta/running_gb_operator.h
include/lfta/rts_external.h
src/ftacmp/query_plan.cc
src/ftacmp/translate_fta.cc
src/lib/gscplftaaux/rts_string.c

index 5038f7a..911074e 100644 (file)
@@ -277,7 +277,7 @@ n_calls=0; n_iters=0; n_eqk=0;
 
            int tup_order=func.compare_ts_with_ts(&tup_ts,max_input_ts[tup.channel]);
                if (tup_order < 0){
-printf("out of order ts.\n");
+printf("%s: out of order ts.\n", op_name);
                        tup.free_tuple();
 
                        // even for out of order temporal tuples we need to post new temporal tuple
index 5bd7388..c20c87f 100644 (file)
@@ -31,6 +31,7 @@ private :
        groupby_func func;
        hash_table<group*, aggregate*, hasher_func, equal_func> group_table;
        typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;
+       gs_int32_t nflushes;
 
 
 
@@ -43,21 +44,17 @@ public:
 
 //                     Push out completed groups
 
-               // create buffer on the stack to store key object
+// create buffer on the stack to store key object
                char buffer[sizeof(group)];
+//     Number of flushes required
 
-               // extract the key information from the tuple and
-               // copy it into buffer
+// extract the key information from the tuple and
+// copy it into buffer
                group* grp = func.create_group(tup, buffer);
-/*//                   Ignore temp tuples until we can fix their timestamps.
-if (func.temp_status_received()) {
- tup.free_tuple();
- return 0;
-}
-*/
-
+               nflushes = func.flush_needed();
+               
                if (!grp) {
-                       if (func.flush_needed()){
+                       if (nflushes>0){
                                flush(result);
                        }
                        if (func.temp_status_received()) {
@@ -71,7 +68,7 @@ if (func.temp_status_received()) {
                        return 0;
                }
 
-               if (func.flush_needed()) {
+               if (nflushes>0) {
                        flush(result);
                }
                typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
@@ -95,24 +92,40 @@ if (func.temp_status_received()) {
        virtual int flush(list<host_tuple>& result) {
                host_tuple tup;
                typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
-//             If the old table isn't empty, flush it now.
-               for (flush_pos = group_table.begin(); flush_pos != group_table.end(); ) {
-                       bool failed = false;
-                       tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
-                       if (!failed) {
-                               tup.channel = output_channel;
-                               result.push_back(tup);
-                       }
-                       if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){
-                           group* g = (*flush_pos).first;
-                       aggregate* a = (*flush_pos).second;
-                               ++flush_pos;
-                               group_table.erase(g);
-                               delete (g);
-                               delete (a);
+
+//     Limit the number of successive flushes - avoid explosive behavior
+               const gs_int32_t max_flushes = 10;
+               if(nflushes>max_flushes){
+                       fprintf(stderr,"Warning in operator %s, temporal advance of %d windows needed, max number of windows that can be reported at once is %d\n",op_name, nflushes, max_flushes);
+                       nflushes = max_flushes;
+               }
+
+               for(gs_int32_t flush_no = 0; flush_no < nflushes; ++flush_no){
+//     advance the TB for the reinit
+                       if(flush_no < nflushes-1){
+                               func.advance_last_tb();
                        }else{
-                               func.reinit_aggregates((*flush_pos).first, (*flush_pos).second);
-                               ++flush_pos;
+                               func.reset_last_tb();   // Move to current tb in case flush limit reached
+                       }
+//             If the old table isn't empty, flush it now.
+                       for (flush_pos = group_table.begin(); flush_pos != group_table.end(); ) {
+                               bool failed = false;
+                               tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
+                               if (!failed) {
+                                       tup.channel = output_channel;
+                                       result.push_back(tup);
+                               }
+                               if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){
+                               group* g = (*flush_pos).first;
+                               aggregate* a = (*flush_pos).second;
+                                       ++flush_pos;
+                                       group_table.erase(g);
+                                       delete (g);
+                                       delete (a);
+                               }else{
+                                       func.reinit_aggregates((*flush_pos).first, (*flush_pos).second);
+                                       ++flush_pos;
+                               }
                        }
                }
 
index 358527e..d1fdf7b 100644 (file)
@@ -69,6 +69,9 @@ gs_retval_t str_exists_substr( struct gs_string * str1, struct gs_string * str2)
 
 gs_retval_t str_compare( struct gs_string * str1, struct gs_string * str2);
 
+/* String equality */
+
+gs_retval_t str_equal( struct gs_string * str1, struct gs_string * str2);
 
 /*     Construct a string constant */
 
index 512299f..2e2286e 100644 (file)
@@ -4914,6 +4914,7 @@ vector<qp_node *> join_eq_hash_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, t
                        child_qpn->table_name = new tablevar_t(
                           from[f]->get_interface().c_str(), from[f]->get_schema_name().c_str(), from[f]->get_ifq());
                        child_qpn->table_name->set_range_var(from[f]->get_var_name());
+                       child_qpn->table_name->set_machine(from[f]->get_machine());
 
                        child_vec.push_back(child_qpn);
                        select_vec.push_back(&(child_qpn->select_list));
@@ -7032,6 +7033,29 @@ static string generate_equality_test(string &lhs_op, string &rhs_op, data_type *
        return(ret);
 }
 
+static string generate_lt_test(string &lhs_op, string &rhs_op, data_type *dt){
+       string ret;
+
+    if(dt->complex_comparison(dt) ){
+               ret.append(dt->get_hfta_comparison_fcn(dt));
+               ret.append("(");
+                       if(dt->is_buffer_type() )
+                               ret.append("&");
+               ret.append(lhs_op);
+               ret.append(", ");
+                       if(dt->is_buffer_type() )
+                               ret.append("&");
+               ret.append(rhs_op );
+               ret.append(") == 1");
+       }else{
+               ret.append(lhs_op );
+               ret.append(" < ");
+               ret.append(rhs_op );
+       }
+
+       return(ret);
+}
+
 static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
        string ret;
 
@@ -12863,13 +12887,13 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
                for(g=0;g<gb_tbl.size();g++){
                        data_type *gdt = gb_tbl.get_data_type(g);
                        if(gdt->is_temporal()){
-                         sprintf(tmpstr,"last_gb%d",g);
+                         sprintf(tmpstr,"curr_gb%d",g);
                          ret+="\t"+gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n";
-                         sprintf(tmpstr,"last_flushed_gb%d",g);
+                         sprintf(tmpstr,"last_gb%d",g);
                          ret+="\t"+gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n";
                        }
                }
-               ret += "\tbool needs_temporal_flush;\n";
+               ret += "\tgs_int32_t needs_temporal_flush;\n";
        }
 
 //                     The publicly exposed functions
@@ -12907,6 +12931,7 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
 
 //             temporal flush variables
 //             ASSUME that structured values won't be temporal.
+       gs_int32_t temporal_gb = 0;
        if(uses_temporal_flush){
                ret += "//\t\tInitialize temporal flush variables.\n";
                for(g=0;g<gb_tbl.size();g++){
@@ -12915,9 +12940,12 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
                                literal_t gl(gdt->type_indicator());
                                sprintf(tmpstr,"\tlast_gb%d = %s;\n",g, gl.to_hfta_C_code("").c_str());
                                ret.append(tmpstr);
+                               sprintf(tmpstr,"\tcurr_gb%d = %s;\n",g, gl.to_hfta_C_code("").c_str());
+                               ret.append(tmpstr);
+                               temporal_gb = g;
                        }
                }
-               ret += "\tneeds_temporal_flush = false;\n";
+               ret += "\tneeds_temporal_flush = 0;\n";
        }
 
        //              Init temporal attributes referenced in select list
@@ -13031,35 +13059,39 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
 //                     set flush indicator and update stored GB vars if there is any change.
 
        if(uses_temporal_flush){
-               ret+= "\tif( !( (";
+               ret+= "\tif( ( (";
                bool first_one = true;
                for(g=0;g<gb_tbl.size();g++){
                        data_type *gdt = gb_tbl.get_data_type(g);
 
                        if(gdt->is_temporal()){
-                         sprintf(tmpstr,"last_gb%d",g);   string lhs_op = tmpstr;
+                         sprintf(tmpstr,"curr_gb%d",g);   string lhs_op = tmpstr;
                          sprintf(tmpstr,"gbval->gb_var%d",g);   string rhs_op = tmpstr;
                          if(first_one){first_one = false;} else {ret += ") && (";}
-                         ret += generate_equality_test(lhs_op, rhs_op, gdt);
+                         ret += generate_lt_test(lhs_op, rhs_op, gdt);
                        }
                }
                ret += ") ) ){\n";
                for(g=0;g<gb_tbl.size();g++){
                  data_type *gdt = gb_tbl.get_data_type(g);
                  if(gdt->is_temporal()){
-                         if(gdt->is_buffer_type()){
+                               temporal_gb = g;
+                         if(gdt->is_buffer_type()){    // TODO first, last?  or delete?
                                sprintf(tmpstr,"\t\t%s(&(gbval->gb_var%d),&last_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g);
                          }else{
-                               sprintf(tmpstr,"\t\tlast_flushed_gb%d = last_gb%d;\n",g,g);
-                               ret += tmpstr;
-                               sprintf(tmpstr,"\t\tlast_gb%d = gbval->gb_var%d;\n",g,g);
+                               ret += "\t\tif(curr_gb"+to_string(g)+"==0){\n";
+                               ret += "\t\t\tlast_gb"+to_string(g)+" = gbval->gb_var"+to_string(g)+";\n";
+                               ret += "\t\t}else{\n";
+                               ret += "\t\t\tlast_gb"+to_string(g)+" = curr_gb"+to_string(g)+";\n";
+                               ret += "\t\t}\n";
+                               sprintf(tmpstr,"\t\tcurr_gb%d = gbval->gb_var%d;\n",g,g);
                          }
                          ret += tmpstr;
                        }
                }
-               ret += "\t\tneeds_temporal_flush=true;\n";
+               ret += "\t\tneeds_temporal_flush = curr_gb"+to_string (temporal_gb)+" - last_gb"+to_string(temporal_gb)+";\n"; 
                ret += "\t\t}else{\n"
-                       "\t\t\tneeds_temporal_flush=false;\n"
+                       "\t\t\tneeds_temporal_flush=0;\n"
                        "\t\t}\n";
        }
 
@@ -13230,13 +13262,22 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
 //---------------------------------------------------
 //                     Flush test
 
-       ret += "\tbool flush_needed(){\n";
+       ret += "gs_int32_t flush_needed(){\n";
        if(uses_temporal_flush){
-               ret += "\t\treturn needs_temporal_flush;\n";
+               ret += "\treturn needs_temporal_flush;\n";
        }else{
-               ret += "\t\treturn false;\n";
+               ret += "\treturn 0;\n";
        }
-       ret += "\t};\n";
+       ret += "};\n";
+
+//------------------------------------------------
+//     time bucket management
+       ret += "void advance_last_tb(){\n";
+       ret += "\tlast_gb"+to_string(temporal_gb)+"++;\n";
+       ret += "}\n\n";
+       ret += "void reset_last_tb(){\n";
+       ret += "\tlast_gb"+to_string(temporal_gb)+" = curr_gb"+to_string(temporal_gb)+";\n";
+       ret += "}\n\n";
 
 //---------------------------------------------------
 //                     create output tuple
@@ -13469,7 +13510,7 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
                if(sdt->is_temporal()){
                        sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
                        ret += tmpstr;
-                       sprintf(tmpstr,"(flush_finished) ? %s : %s ", generate_se_code(select_list[s]->se,schema).c_str(), generate_se_code_fm_aggr(select_list[s]->se,"last_flushed_gb", "", schema).c_str());
+                       sprintf(tmpstr,"(flush_finished) ? %s : %s ", generate_se_code(select_list[s]->se,schema).c_str(), generate_se_code_fm_aggr(select_list[s]->se,"last_gb", "", schema).c_str());
                        ret += tmpstr;
                        ret += ";\n";
                }
index b6855ff..31c552f 100644 (file)
@@ -1907,7 +1907,11 @@ for(q=0;q<hfta_sets.size();++q){
                string lmach = hostname;
                if(tvec.size()>0){
                        liface = tvec[0]->get_interface();      // iface queries have been resolved
-                       lmach = tvec[0]->get_machine();
+                       if(tvec[0]->get_machine() != ""){
+                               lmach = tvec[0]->get_machine();
+                       }else{
+                               fprintf(stderr,"WARNING, lfta %s has empty machine name, using %s\n",  split_queries[l]->query_plan[0]->node_name.c_str(), hostname.c_str());
+                       }
                } // else{
                        interface_names.push_back(liface);
                        machine_names.push_back(lmach);
index 8c31657..56c652a 100644 (file)
@@ -110,15 +110,11 @@ gs_uint32_t byte_match_offset( gs_uint32_t offset, gs_uint32_t val, struct gs_st
 gs_retval_t str_compare( struct gs_string * str1, struct gs_string * str2)
 {
     gs_int32_t len;
-    gs_int32_t x;
+    gs_int32_t x, ret;
     len = (str1->length>str2->length)?str2->length:str1->length;
     for(x=0;x<len;x++) {
-       if (str1->data[x]>str2->data[x]) {
-           return 1;
-       }
-       if (str1->data[x]<str2->data[x]) {
-           return -1;
-       }
+        if (ret = (str1->data[x]-str2->data[x]))
+            return ret;
     }
 
     if (str1->length>str2->length) {
@@ -130,6 +126,22 @@ gs_retval_t str_compare( struct gs_string * str1, struct gs_string * str2)
     return 0;
 }
 
+gs_retval_t str_equal( struct gs_string * str1, struct gs_string * str2)
+{
+    gs_int32_t x;
+
+    if (str1->length != str2->length)
+        return -1;
+
+    for(x=0;x<str1->length;x++) {
+        if (str1->data[x]!=str2->data[x]) {
+            return -1;
+        }
+    }
+    
+    return 0;
+}
+
 gs_retval_t str_constructor(struct gs_string *s, gs_sp_t l){
     s->data =  l;
     s->length = 0;