Change lfta code generation form C to C++
[com/gs-lite.git] / src / ftacmp / query_plan.cc
index 512299f..a746e44 100644 (file)
@@ -3874,7 +3874,7 @@ vector<qp_node *> rsgah_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_li
                                        new_opl.push_back(new_se);
                                }
                        }
-                       stream_node->aggr_tbl.add_aggr(aggr_tbl.get_op(a), aggr_tbl.get_fcn_id(a), new_opl, aggr_tbl.get_storage_type(a),false, false,aggr_tbl.has_bailout(a));
+                       stream_node->aggr_tbl.add_aggr(aggr_tbl.get_op(a), aggr_tbl.get_fcn_id(a), new_opl, aggr_tbl.get_storage_type(a),aggr_tbl.is_superaggr(a), aggr_tbl.is_running_aggr(a),aggr_tbl.has_bailout(a));
                        hse = new scalarexp_t(aggr_tbl.get_op(a).c_str(),new_opl);
                        hse->set_data_type(Ext_fcns->get_fcn_dt(aggr_tbl.get_fcn_id(a)));
                        hse->set_fcn_id(aggr_tbl.get_fcn_id(a));
@@ -4332,12 +4332,12 @@ vector<qp_node *> sgah_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_lis
                stream_node->set_node_name( node_name );
                stream_node->table_name->set_range_var(table_name->get_var_name());
 
-//                     allowed stream disorder.  Default is 2,
+//                     allowed stream disorder.  Default is 1,
 //                     can override with max_lfta_disorder setting.
 //                     Also limit the hfta disorder, set to lfta disorder + 1.
 //                     can override with max_hfta_disorder.
 
-       fta_node->lfta_disorder = 2;
+       fta_node->lfta_disorder = 1;
        if(this->get_val_of_def("max_lfta_disorder") != ""){
                int d = atoi(this->get_val_of_def("max_lfta_disorder").c_str() );
                if(d<1){
@@ -4364,6 +4364,7 @@ printf("node %s setting lfta_disorder = %d\n",node_name.c_str(),fta_node->lfta_d
                }
        }
 
+
 //                     First, process the group-by variables.
 //                     The fta must supply the values of all the gbvars.
 //                     If a gb is computed, the computation must be
@@ -4914,6 +4915,7 @@ vector<qp_node *> join_eq_hash_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, t
                        child_qpn->table_name = new tablevar_t(
                           from[f]->get_interface().c_str(), from[f]->get_schema_name().c_str(), from[f]->get_ifq());
                        child_qpn->table_name->set_range_var(from[f]->get_var_name());
+                       child_qpn->table_name->set_machine(from[f]->get_machine());
 
                        child_vec.push_back(child_qpn);
                        select_vec.push_back(&(child_qpn->select_list));
@@ -6822,7 +6824,7 @@ static string generate_predicate_code(predicate_t *pr,table_list *schema){
                        ret.append("( ");
 
                if(ldt->complex_comparison(ldt) ){
-                               ret.append( ldt->get_hfta_comparison_fcn(ldt) );
+                               ret.append( ldt->get_hfta_equals_fcn(ldt) );
                                ret.append("( ");
                                if(ldt->is_buffer_type() )
                                        ret.append("&");
@@ -6854,6 +6856,7 @@ static string generate_predicate_code(predicate_t *pr,table_list *schema){
 
                ret.append("( ");
         if(ldt->complex_comparison(rdt) ){
+// TODO can use get_hfta_equals_fcn if op is "=" ?
                        ret.append(ldt->get_hfta_comparison_fcn(rdt));
                        ret.append("(");
                        if(ldt->is_buffer_type() )
@@ -6923,7 +6926,7 @@ static string generate_predicate_code_fm_aggr(predicate_t *pr, string gbvar, str
                        ret.append("( ");
 
                if(ldt->complex_comparison(ldt) ){
-                               ret.append( ldt->get_hfta_comparison_fcn(ldt) );
+                               ret.append( ldt->get_hfta_equals_fcn(ldt) );
                                ret.append("( ");
                                if(ldt->is_buffer_type() )
                                        ret.append("&");
@@ -6955,6 +6958,7 @@ static string generate_predicate_code_fm_aggr(predicate_t *pr, string gbvar, str
 
                ret.append("( ");
         if(ldt->complex_comparison(rdt) ){
+// TODO can use get_hfta_equals_fcn if op is "=" ?
                        ret.append(ldt->get_hfta_comparison_fcn(rdt));
                        ret.append("(");
                        if(ldt->is_buffer_type() )
@@ -7013,7 +7017,7 @@ static string generate_equality_test(string &lhs_op, string &rhs_op, data_type *
        string ret;
 
     if(dt->complex_comparison(dt) ){
-               ret.append(dt->get_hfta_comparison_fcn(dt));
+               ret.append(dt->get_hfta_equals_fcn(dt));
                ret.append("(");
                        if(dt->is_buffer_type() )
                                ret.append("&");
@@ -7032,7 +7036,7 @@ static string generate_equality_test(string &lhs_op, string &rhs_op, data_type *
        return(ret);
 }
 
-static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
+static string generate_lt_test(string &lhs_op, string &rhs_op, data_type *dt){
        string ret;
 
     if(dt->complex_comparison(dt) ){
@@ -7045,16 +7049,39 @@ static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt)
                        if(dt->is_buffer_type() )
                                ret.append("&");
                ret.append(rhs_op );
-               ret.append(") == 0");
+               ret.append(") == 1");
        }else{
                ret.append(lhs_op );
-               ret.append(" == ");
+               ret.append(" < ");
                ret.append(rhs_op );
        }
 
        return(ret);
 }
 
+//static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
+//     string ret;
+//
+//    if(dt->complex_comparison(dt) ){
+//             ret.append(dt->get_hfta_equals_fcn(dt));
+//             ret.append("(");
+//                     if(dt->is_buffer_type() )
+//                             ret.append("&");
+//             ret.append(lhs_op);
+//             ret.append(", ");
+//                     if(dt->is_buffer_type() )
+//                             ret.append("&");
+//             ret.append(rhs_op );
+//             ret.append(") == 0");
+//     }else{
+//             ret.append(lhs_op );
+//             ret.append(" == ");
+//             ret.append(rhs_op );
+//     }
+//
+//     return(ret);
+//}
+
 
 //             Here I assume that only MIN and MAX aggregates can be computed
 //             over BUFFER data types.
@@ -7829,9 +7856,9 @@ static string gen_unpack_cids(table_list *schema, col_id_set &new_cids, string o
                        unpack_fcn = dt.get_hfta_unpack_fcn_noxf();
                }
                if(dt.is_buffer_type()){
-                       sprintf(tmpstr,"\t unpack_var_%s_%d = %s(tup%d.data, tup%d.tuple_size, unpack_offset_%s_%d, &problem);\n",field.c_str(), tblref, unpack_fcn.c_str(), tblref, tblref, field.c_str(), tblref);
+                       sprintf(tmpstr,"\tunpack_var_%s_%d = %s(tup%d.data, tup%d.tuple_size, unpack_offset_%s_%d, &problem); // unpack_cid\n",field.c_str(), tblref, unpack_fcn.c_str(), tblref, tblref, field.c_str(), tblref);
                }else{
-                       sprintf(tmpstr,"\t unpack_var_%s_%d = %s_nocheck(tup%d.data,  unpack_offset_%s_%d);\n",field.c_str(), tblref, unpack_fcn.c_str(), tblref,  field.c_str(), tblref);
+                       sprintf(tmpstr,"\tunpack_var_%s_%d = %s_nocheck(tup%d.data,  unpack_offset_%s_%d); // unpack_cid\n",field.c_str(), tblref, unpack_fcn.c_str(), tblref,  field.c_str(), tblref);
                }
                ret += tmpstr;
                if(dt.is_buffer_type()){
@@ -8857,6 +8884,18 @@ string sgah_qpn::generate_functor_name(){
 string sgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, vector<bool> &needs_xform){
        int a,g,w,s;
 
+//                     Regular or slow flush?
+       hfta_slow_flush = 0;
+       if(this->get_val_of_def("hfta_slow_flush") != ""){
+               int d = atoi(this->get_val_of_def("hfta_slow_flush").c_str() );
+               if(d<0){
+                       fprintf(stderr,"Warning, hfta_slow_flush in node %s is %d, must be at least 0, setting to 0.\n",node_name.c_str(), d);
+                       hfta_slow_flush = 0;
+               }else{
+                       hfta_slow_flush = d;
+               }
+       }
+       
 
 //                     Initialize generate utility globals
        segen_gb_tbl = &(gb_tbl);
@@ -8878,36 +8917,57 @@ string sgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, ve
                ret+="\t"+this->gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n";
        }
 //             empty strucutred literals
-       map<int, string>::iterator sii;
-       for(sii=structured_types.begin();sii!=structured_types.end();++sii){
-               data_type dt(sii->second);
-               literal_t empty_lit(sii->first);
-               ret += "\t"+dt.make_host_cvar(empty_lit.hfta_empty_literal_name())+";\n";
-       }
+//     map<int, string>::iterator sii;
+//     for(sii=structured_types.begin();sii!=structured_types.end();++sii){
+//             data_type dt(sii->second);
+//             literal_t empty_lit(sii->first);
+//             ret += "\t"+dt.make_host_cvar(empty_lit.hfta_empty_literal_name())+";\n";
+//     }
 //             Constructors
        if(structured_types.size()==0){
                ret += "\t"+generate_functor_name() + "_groupdef(){};\n";
        }else{
-               ret += "\t"+generate_functor_name() + "_groupdef(){}\n";
+               ret += "\t"+generate_functor_name() + "_groupdef(){\n";
+               for(g=0;g<gb_tbl.size();g++){
+                       data_type *gdt = gb_tbl.get_data_type(g);
+                       if(gdt->is_buffer_type()){
+                               sprintf(tmpstr,"\t\t%s(&gb_var%d);\n",
+                                       gdt->get_hfta_buffer_init().c_str(), g );
+                               ret += tmpstr;
+                       }
+               }
+               ret += "\t};\n";
        }
 
 
+       ret += "\t// shallow copy constructors\n";
        ret += "\t"+generate_functor_name() + "_groupdef("+
-               this->generate_functor_name() + "_groupdef *gd){\n";
+               "const " + this->generate_functor_name() + "_groupdef &gd){\n";
        for(g=0;g<gb_tbl.size();g++){
                data_type *gdt = gb_tbl.get_data_type(g);
-               if(gdt->is_buffer_type()){
-                       sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd->gb_var%d));\n",
-                         gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
-                       ret += tmpstr;
-               }else{
-                       sprintf(tmpstr,"\t\tgb_var%d = gd->gb_var%d;\n",g,g);
-                       ret += tmpstr;
-               }
+               sprintf(tmpstr,"\t\tgb_var%d = gd.gb_var%d;\n",g,g);
+               ret += tmpstr;
+// TODO : do strings perisist after the call?  its a shllow copy 
+//             if(gdt->is_buffer_type()){
+//                     sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd->gb_var%d));\n",
+//                       gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
+//                     ret += tmpstr;
+//             }else{
+//                     sprintf(tmpstr,"\t\tgb_var%d = gd->gb_var%d;\n",g,g);
+//                     ret += tmpstr;
+//             }
        }
        ret += "\t}\n";
        ret += "\t"+generate_functor_name() + "_groupdef("+
-               this->generate_functor_name() + "_groupdef *gd, bool *pattern){\n";
+               "const " + this->generate_functor_name() + "_groupdef &gd, bool *pattern){\n";
+//     -- For patterns, need empty strucutred literals
+       map<int, string>::iterator sii;
+       for(sii=structured_types.begin();sii!=structured_types.end();++sii){
+               data_type dt(sii->second);
+               literal_t empty_lit(sii->first);
+               ret += "\t"+dt.make_host_cvar(empty_lit.hfta_empty_literal_name())+";\n";
+       }
+
        for(sii=structured_types.begin();sii!=structured_types.end();++sii){
                literal_t empty_lit(sii->first);
                ret += "\t\t"+empty_lit.to_hfta_C_code("&"+empty_lit.hfta_empty_literal_name())+";\n";
@@ -8915,14 +8975,17 @@ string sgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, ve
        for(g=0;g<gb_tbl.size();g++){
                data_type *gdt = gb_tbl.get_data_type(g);
                ret += "\t\tif(pattern["+int_to_string(g)+"]){\n";
-               if(gdt->is_buffer_type()){
-                       sprintf(tmpstr,"\t\t\t%s(&gb_var%d, &(gd->gb_var%d));\n",
-                         gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
-                       ret += tmpstr;
-               }else{
-                       sprintf(tmpstr,"\t\t\tgb_var%d = gd->gb_var%d;\n",g,g);
-                       ret += tmpstr;
-               }
+               sprintf(tmpstr,"\t\t\tgb_var%d = gd.gb_var%d;\n",g,g);
+               ret += tmpstr;
+//     TODO Do strings persist long enough?  its a shllow copy constructor?
+//             if(gdt->is_buffer_type()){
+//                     sprintf(tmpstr,"\t\t\t%s(&gb_var%d, &(gd->gb_var%d));\n",
+//                       gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
+//                     ret += tmpstr;
+//             }else{
+//                     sprintf(tmpstr,"\t\t\tgb_var%d = gd->gb_var%d;\n",g,g);
+//                     ret += tmpstr;
+//             }
                ret += "\t\t}else{\n";
                literal_t empty_lit(gdt->type_indicator());
                if(empty_lit.is_cpx_lit()){
@@ -8933,6 +8996,23 @@ string sgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, ve
                ret += "\t\t}\n";
        }
        ret += "\t};\n";
+
+       ret += "\t// deep assignment operator\n";
+       ret += "\t"+generate_functor_name() + "_groupdef& operator=(const "+
+                this->generate_functor_name() + "_groupdef &gd){\n";
+        for(g=0;g<gb_tbl.size();g++){
+                data_type *gdt = gb_tbl.get_data_type(g);
+                if(gdt->is_buffer_type()){
+                        sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd.gb_var%d));\n",
+                          gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
+                        ret += tmpstr;
+                }else{
+                        sprintf(tmpstr,"\t\tgb_var%d = gd.gb_var%d;\n",g,g);
+                        ret += tmpstr;
+                }
+        }
+        ret += "\t}\n";
+
 //             destructor
        ret += "\t~"+ generate_functor_name() + "_groupdef(){\n";
        for(g=0;g<gb_tbl.size();g++){
@@ -9134,6 +9214,7 @@ string sgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, ve
                        }
                }
                ret += "\tbool needs_temporal_flush;\n";
+               ret += "\tbool disordered_arrival;\n";
        }
 
 
@@ -9242,6 +9323,14 @@ string sgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, ve
 "}\n\n"
 ;
 
+//---------------------------------------
+//             Parameterized number of tuples output per slow flush
+       ret += 
+"int gb_flush_per_tuple(){\n"
+"      return "+int_to_string(hfta_slow_flush)+";\n"
+"}\n\n";
+
+
 
 
 
@@ -9313,16 +9402,18 @@ string sgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, ve
 ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n";
        if(hfta_disorder < 2){
                if(uses_temporal_flush){
-                       ret+= "\tif( !( (";
+                       ret+= "\tif( ( (";
                        bool first_one = true;
+                       string disorder_test;
                        for(g=0;g<gb_tbl.size();g++){
                                data_type *gdt = gb_tbl.get_data_type(g);
 
                                if(gdt->is_temporal()){
-                               sprintf(tmpstr,"last_gb%d",g);   string lhs_op = tmpstr;
-                               sprintf(tmpstr,"gbval->gb_var%d",g);   string rhs_op = tmpstr;
-                               if(first_one){first_one = false;} else {ret += ") && (";}
-                               ret += generate_equality_test(lhs_op, rhs_op, gdt);
+                                       sprintf(tmpstr,"last_gb%d",g);   string lhs_op = tmpstr;
+                                       sprintf(tmpstr,"gbval->gb_var%d",g);   string rhs_op = tmpstr;
+                                       if(first_one){first_one = false;} else {ret += ") && (";}
+                                       ret += generate_lt_test(lhs_op, rhs_op, gdt);
+                                       disorder_test += generate_lt_test(rhs_op, lhs_op, gdt);
                                }
                        }
                        ret += ") ) ){\n";
@@ -9340,9 +9431,17 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n";
                                }
                        }
                        ret += "\t\tneeds_temporal_flush=true;\n";
-                       ret += "\t\t}else{\n"
-                               "\t\t\tneeds_temporal_flush=false;\n"
-                               "\t\t}\n";
+                       ret += "\t}else{\n"
+                               "\t\tneeds_temporal_flush=false;\n"
+                               "\t}\n";
+
+                       ret += "\tdisordered_arrival = "+disorder_test+";\n";
+//                     ret += "\tif( ( ("+disorder_test+") ) ){\n";
+//                     ret += "\t\tdisordered_arrival=true;\n";
+//                     ret += "\t}else{\n";
+//                     ret += "\t\tdisordered_arrival=false;\n";
+//                     ret += "\t}\n";
+
                }
        }else{
                ret+= "\tif(temp_tuple_received && !( (";
@@ -9386,7 +9485,10 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n";
 
 
 //             For temporal status tuple we don't need to do anything else
-       ret += "\tif (temp_tuple_received) return NULL;\n\n";
+       ret += "\tif (temp_tuple_received){\n";
+       ret += "\t\tdisordered_arrival = false;\n";
+       ret += "\t\treturn NULL;\n\n";
+       ret += "\t}\n";
 
        for(w=0;w<where.size();++w){
                sprintf(tmpstr,"//\t\tPredicate clause %d.\n",w);
@@ -9491,8 +9593,8 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n";
 //                     update an aggregate object
 
        ret += "void update_aggregate(host_tuple &tup0, "
-               +generate_functor_name()+"_groupdef *gbval, "+
-               generate_functor_name()+"_aggrdef *aggval){\n";
+               +generate_functor_name()+"_groupdef &gbval, "+
+               generate_functor_name()+"_aggrdef &aggval){\n";
        //              Variables for execution of the function.
        ret += "\tgs_int32_t problem = 0;\n";   // return unpack failure
 
@@ -9503,7 +9605,7 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n";
 //             Unpack all remaining attributes
        ret += gen_remaining_colrefs(schema, cid_set, found_cids, "", needs_xform);
        for(a=0;a<aggr_tbl.size();a++){
-         sprintf(tmpstr,"aggval->aggr_var%d",a);
+         sprintf(tmpstr,"aggval.aggr_var%d",a);
          string varname = tmpstr;
          ret.append(generate_aggr_update(varname,&aggr_tbl,a, schema));
        }
@@ -9522,6 +9624,8 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n";
        }
        ret += "\t};\n";
 
+       ret += "bool disordered(){return disordered_arrival;}\n";
+
 //---------------------------------------------------
 //                     create output tuple
 //                     Unpack the partial functions ref'd in the where clause,
@@ -9532,15 +9636,15 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n";
 //                     so I'll leave it in longhand.
 
        ret += "host_tuple create_output_tuple("
-               +generate_functor_name()+"_groupdef *gbval, "+
-               generate_functor_name()+"_aggrdef *aggval, bool &failed){\n";
+               +generate_functor_name()+"_groupdef &gbval, "+
+               generate_functor_name()+"_aggrdef &aggval, bool &failed){\n";
 
        ret += "\thost_tuple tup;\n";
        ret += "\tfailed = false;\n";
        ret += "\tgs_retval_t retval = 0;\n";
 
-       string gbvar = "gbval->gb_var";
-       string aggvar = "aggval->";
+       string gbvar = "gbval.gb_var";
+       string aggvar = "aggval.";
 
 //                     Create cached temporaries for UDAF return values.
        for(a=0;a<aggr_tbl.size();a++){
@@ -9755,18 +9859,18 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n";
 
        ret += "struct "+generate_functor_name()+"_hash_func{\n";
        ret += "\tgs_uint32_t operator()(const "+generate_functor_name()+
-                               "_groupdef *grp) const{\n";
+                               "_groupdef &grp) const{\n";
        ret += "\t\treturn( (";
        for(g=0;g<gb_tbl.size();g++){
                if(g>0) ret += "^";
                data_type *gdt = gb_tbl.get_data_type(g);
                if(gdt->use_hashfunc()){
                        if(gdt->is_buffer_type())
-                               sprintf(tmpstr,"(%s*%s(&(grp->gb_var%d)))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
+                               sprintf(tmpstr,"(%s*%s(&(grp.gb_var%d)))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
                        else
-                               sprintf(tmpstr,"(%s*%s(grp->gb_var%d))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
+                               sprintf(tmpstr,"(%s*%s(grp.gb_var%d))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
                }else{
-                       sprintf(tmpstr,"(%s*grp->gb_var%d)",hash_nums[g%NRANDS].c_str(),g);
+                       sprintf(tmpstr,"(%s*grp.gb_var%d)",hash_nums[g%NRANDS].c_str(),g);
                }
                ret += tmpstr;
        }
@@ -9778,22 +9882,22 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n";
 //                     The comparison function
 
        ret += "struct "+generate_functor_name()+"_equal_func{\n";
-       ret += "\tbool operator()(const "+generate_functor_name()+"_groupdef *grp1, "+
-                       generate_functor_name()+"_groupdef *grp2) const{\n";
+       ret += "\tbool operator()(const "+generate_functor_name()+"_groupdef &grp1, "+
+                       "const "+generate_functor_name()+"_groupdef &grp2) const{\n";
        ret += "\t\treturn( (";
 
        for(g=0;g<gb_tbl.size();g++){
                if(g>0) ret += ") && (";
                data_type *gdt = gb_tbl.get_data_type(g);
                if(gdt->complex_comparison(gdt)){
-               if(gdt->is_buffer_type())
-                       sprintf(tmpstr,"(%s(&(grp1->gb_var%d), &(grp2->gb_var%d))==0)",
-                               gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
-               else
-                       sprintf(tmpstr,"(%s((grp1->gb_var%d), (grp2->gb_var%d))==0)",
-                               gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+                       if(gdt->is_buffer_type())
+                               sprintf(tmpstr,"(%s(&(grp1.gb_var%d), &(grp2.gb_var%d))==0)",
+                                       gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
+                       else
+                               sprintf(tmpstr,"(%s((grp1.gb_var%d), (grp2.gb_var%d))==0)",
+                                       gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
                }else{
-                       sprintf(tmpstr,"grp1->gb_var%d == grp2->gb_var%d",g,g);
+                       sprintf(tmpstr,"grp1.gb_var%d == grp2.gb_var%d",g,g);
                }
                ret += tmpstr;
        }
@@ -9808,14 +9912,17 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n";
 string sgah_qpn::generate_operator(int i, string params){
 
        if(hfta_disorder < 2){
+               string op_name = "groupby_operator";
+               if(hfta_slow_flush>0)
+                       op_name = "groupby_slowflush_operator";
                return(
-                       "       groupby_operator<" +
+                       "       "+op_name+"<" +
                        generate_functor_name()+","+
                        generate_functor_name() + "_groupdef, " +
                        generate_functor_name() + "_aggrdef, " +
                        generate_functor_name()+"_hash_func, "+
                        generate_functor_name()+"_equal_func "
-                       "> *op"+int_to_string(i)+" = new groupby_operator<"+
+                       "> *op"+int_to_string(i)+" = new "+op_name+"<"+
                        generate_functor_name()+","+
                        generate_functor_name() + "_groupdef, " +
                        generate_functor_name() + "_aggrdef, " +
@@ -10973,22 +11080,31 @@ string join_eq_hash_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_
 
 //                     Deal with outer join stuff
        col_id_set l_cids, r_cids;
+       col_id_set l_base_cids, r_base_cids; // l_cids and r_cids get modified
+                                                                                 // to account for extra_f fields to
+                                                                                 // unpack for value imputation
        col_id_set::iterator ocsi;
        for(ocsi=local_cids.begin();ocsi!=local_cids.end();++ocsi){
-               if((*ocsi).tblvar_ref == 0) l_cids.insert((*ocsi));
-               else                                            r_cids.insert((*ocsi));
+               if((*ocsi).tblvar_ref == 0){
+                       l_cids.insert((*ocsi)); l_base_cids.insert((*ocsi));
+               }else{
+                       r_cids.insert((*ocsi)); r_base_cids.insert((*ocsi));
+               }
        }
        for(ocsi=se_cids.begin();ocsi!=se_cids.end();++ocsi){
-               if((*ocsi).tblvar_ref == 0) l_cids.insert((*ocsi));
-               else                                            r_cids.insert((*ocsi));
+               if((*ocsi).tblvar_ref == 0){
+                       l_cids.insert((*ocsi)); l_base_cids.insert((*ocsi));
+               }else{
+                       r_cids.insert((*ocsi)); r_base_cids.insert((*ocsi));
+               }
        }
 
        ret += "\t}else if(tup0.data){\n";
        string unpack_null = ""; col_id_set extra_cids;
-       for(ocsi=r_cids.begin();ocsi!=r_cids.end();++ocsi){
+       for(ocsi=r_base_cids.begin();ocsi!=r_base_cids.end();++ocsi){
                string field = (*ocsi).field;
                if(r_equiv.count(field)){
-                       unpack_null+="\t\tunpack_var_"+field+"_1="+generate_se_code(r_equiv[field],schema)+";\n";
+                       unpack_null+="\t\tunpack_var_"+field+"_1="+generate_se_code(r_equiv[field],schema)+"; // r_equiv\n";
                        get_new_se_cids(r_equiv[field],l_cids,new_cids,NULL);
                }else{
                int schref = (*ocsi).schema_ref;
@@ -11000,23 +11116,25 @@ string join_eq_hash_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_
 //                                     NB : works for string type only
 //                                     NNB: installed fix for ipv6, more of this should be pushed
 //                                             into the literal_t code.
-                               unpack_null+="\tunpack_var_"+field+"_1= "+empty_lit.hfta_empty_literal_name()+";\n";
+                               unpack_null+="\t\tunpack_var_"+field+"_1= "+empty_lit.hfta_empty_literal_name()+"; // empty\n";
                        }else{
-                               unpack_null+="\tunpack_var_"+field+"_1="+empty_lit.to_hfta_C_code("")+";\n";
+                               unpack_null+="\t\tunpack_var_"+field+"_1="+empty_lit.to_hfta_C_code("")+"; // empty\n";
                        }
                }
        }
+       ret += "// l_cids\n";
        ret += gen_unpack_cids(schema,  l_cids, "tup", needs_xform);
+       ret += "// extra_cids\n";
        ret += gen_unpack_cids(schema,  extra_cids, "tup", needs_xform);
        ret += unpack_null;
        ret += gen_unpack_partial_fcn(schema, partial_fcns, sl_pfcns, "tup");
 
        ret+="\t}else{\n";
-       unpack_null = ""; extra_cids.clear();
-       for(ocsi=l_cids.begin();ocsi!=l_cids.end();++ocsi){
+       unpack_null = ""; extra_cids.clear(); new_cids.clear();
+       for(ocsi=l_base_cids.begin();ocsi!=l_base_cids.end();++ocsi){
                string field = (*ocsi).field;
                if(l_equiv.count(field)){
-                       unpack_null+="\t\tunpack_var_"+field+"_0="+generate_se_code(l_equiv[field],schema)+";\n";
+                       unpack_null+="\t\tunpack_var_"+field+"_0="+generate_se_code(l_equiv[field],schema)+"; // l_equiv\n";
                        get_new_se_cids(l_equiv[field],r_cids,new_cids,NULL);
                }else{
                int schref = (*ocsi).schema_ref;
@@ -11028,13 +11146,15 @@ string join_eq_hash_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_
 //                                     NB : works for string type only
 //                                     NNB: installed fix for ipv6, more of this should be pushed
 //                                             into the literal_t code.
-                               unpack_null+="\tunpack_var_"+field+"_0= "+empty_lit.hfta_empty_literal_name()+";\n";
+                               unpack_null+="\t\tunpack_var_"+field+"_0= "+empty_lit.hfta_empty_literal_name()+"; // empty\n";
                        }else{
-                               unpack_null+="\tunpack_var_"+field+"_0="+empty_lit.to_hfta_C_code("")+";\n";
+                               unpack_null+="\t\tunpack_var_"+field+"_0="+empty_lit.to_hfta_C_code("")+"; // empty\n";
                        }
                }
        }
+       ret += "// r_cids\n";
        ret += gen_unpack_cids(schema,  r_cids, "tup", needs_xform);
+       ret += "// extra_cids\n";
        ret += gen_unpack_cids(schema,  extra_cids, "tup", needs_xform);
        ret += unpack_null;
        ret += gen_unpack_partial_fcn(schema, partial_fcns, sl_pfcns, "tup");
@@ -11110,84 +11230,67 @@ string join_eq_hash_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_
 
 
 //             create a temp status tuple
-       ret += "int create_temp_status_tuple(const host_tuple &tup0, const host_tuple &tup1, host_tuple& result) {\n\n";
+       ret += "int create_temp_status_tuple("+this->generate_functor_name()+"_tempeqdef *lts,"+this->generate_functor_name()+"_tempeqdef *rts, host_tuple& result) {\n\n";
 
        ret += "\tgs_retval_t retval = 0;\n";
        ret += "\tgs_int32_t problem = 0;\n";
 
-       ret += "\tif(tup0.data){\n";
-
-//             Unpack all the temporal attributes references in select list
-       col_id_set found_cids;
-
-       for(s=0;s<select_list.size();s++){
-               if (select_list[s]->se->get_data_type()->is_temporal()) {
-//                     Find the set of attributes accessed in this SE
-                       col_id_set new_cids;
-                       get_new_se_cids(select_list[s]->se,found_cids, new_cids, NULL);
-               }
+       for(p=0;p<temporal_dt.size();p++){
+               sprintf(tmpstr,"lhs_var");
+               ret+="\t"+temporal_dt[p]->make_host_cvar(tmpstr)+";\n";         
+               sprintf(tmpstr,"rhs_var");
+               ret+="\t"+temporal_dt[p]->make_host_cvar(tmpstr)+";\n";         
        }
 
-       //                      Deal with outer join stuff
-       l_cids.clear(), r_cids.clear();
-       for(ocsi=found_cids.begin();ocsi!=found_cids.end();++ocsi){
-               if((*ocsi).tblvar_ref == 0) l_cids.insert((*ocsi));
-               else                                            r_cids.insert((*ocsi));
+       ret += "\tif(lts!=NULL){\n";
+       for(p=0;p<temporal_dt.size();p++){
+               ret += "\t\tlhs_var = lts->tempeq_var"+to_string(p)+";\n";
        }
-       unpack_null = "";
-       extra_cids.clear();
-       for(ocsi=r_cids.begin();ocsi!=r_cids.end();++ocsi){
-               string field = (*ocsi).field;
-               if(r_equiv.count(field)){
-                       unpack_null+="\t\tunpack_var_"+field+"_1="+generate_se_code(r_equiv[field],schema)+";\n";
-                       col_id_set addnl_cids;
-                       get_new_se_cids(r_equiv[field],l_cids,addnl_cids,NULL);
-               }else{
-               int schref = (*ocsi).schema_ref;
-                       data_type dt(schema->get_type_name(schref,field));
-                       literal_t empty_lit(dt.type_indicator());
-                       if(empty_lit.is_cpx_lit()){
-                               sprintf(tmpstr,"&(unpack_var_%s_1)",field.c_str());
-                               unpack_null += "\t"+empty_lit.to_hfta_C_code(tmpstr)+";\n";
-                       }else{
-                               unpack_null+="\tunpack_var_"+field+"_1="+empty_lit.to_hfta_C_code("")+";\n";
-                       }
-               }
+       ret += "\t}else{\n";
+       for(p=0;p<temporal_dt.size();p++){
+               ret += "\t\tlhs_var = 0;\n";
        }
-       ret += gen_unpack_cids(schema,  l_cids, "1", needs_xform);
-       ret += gen_unpack_cids(schema,  extra_cids, "1", needs_xform);
-       ret += unpack_null;
+       ret += "\t}\n";
 
-       ret+="\t}else if (tup1.data) {\n";
-       unpack_null = ""; extra_cids.clear();
-       for(ocsi=l_cids.begin();ocsi!=l_cids.end();++ocsi){
-               string field = (*ocsi).field;
-               if(l_equiv.count(field)){
-                       unpack_null+="\t\tunpack_var_"+field+"_0="+generate_se_code(l_equiv[field],schema)+";\n";
-                       col_id_set addnl_cids;
-                       get_new_se_cids(l_equiv[field],r_cids,addnl_cids,NULL);
-               }else{
-               int schref = (*ocsi).schema_ref;
-                       data_type dt(schema->get_type_name(schref,field));
-                       literal_t empty_lit(dt.type_indicator());
-                       if(empty_lit.is_cpx_lit()){
-                               sprintf(tmpstr,"&(unpack_var_%s_0)",field.c_str());
-                               unpack_null += "\t"+empty_lit.to_hfta_C_code(tmpstr)+";\n";
-                       }else{
-                               unpack_null+="\tunpack_var_"+field+"_0="+empty_lit.to_hfta_C_code("")+";\n";
-                       }
-               }
+       ret += "\tif(rts!=NULL){\n";
+       for(p=0;p<temporal_dt.size();p++){
+               ret += "\t\trhs_var = rts->tempeq_var"+to_string(p)+";\n";
        }
-       ret += gen_unpack_cids(schema,  r_cids, "1", needs_xform);
-       ret += gen_unpack_cids(schema,  extra_cids, "1", needs_xform);
-       ret += unpack_null;
-       ret+="\t}\n";
+       ret += "\t}else{\n";
+       for(p=0;p<temporal_dt.size();p++){
+               ret += "\t\trhs_var = 0;\n";
+       }
+       ret += "\t}\n";
 
        ret += gen_init_temp_status_tuple(this->get_node_name());
 
 //             Start packing.
-       ret += "//\t\tPack the fields into the tuple.\n";
-       ret += gen_pack_tuple(schema,select_list,this->get_node_name(), true );
+
+
+//             This is checked in the query analyzer so I think its safe,
+//             But a lot of older code has complex code to propagate multiple
+//             timestamps
+    for(s=0;s<select_list.size();s++){
+               scalarexp_t *se  = select_list[s]->se;
+        data_type *sdt = se->get_data_type();
+               if(sdt->is_temporal()){
+                       string target = "\ttuple->tuple_var"+to_string(s)+" = ";
+                       if(from[0]->get_property()==0 && from[1]->get_property()==0){ // INNER
+                               ret += target+"(lhs_var>rhs_var ? lhs_var : rhs_var); // INNER\n";
+                       }
+                       if(from[0]->get_property()!=0 && from[1]->get_property()==0){ // LEFT
+                               ret += target+"lhs_var; // LEFT\n";
+//                             ret += target+"rhs_var; // LEFT\n";
+                       }
+                       if(from[0]->get_property()==0 && from[1]->get_property()!=0){ // RIGHT
+                               ret += target+"rhs_var; // RIGHT\n";
+//                             ret += target+"lhs_var; // RIGHT\n";
+                       }
+                       if(from[0]->get_property()!=0 && from[1]->get_property()!=0){ // OUTER
+                               ret += target+"(lhs_var<rhs_var ? lhs_var : rhs_var); // OUTER\n";
+                       }
+               }
+       }
 
 
        ret += "\treturn 0;\n";
@@ -11237,10 +11340,10 @@ string join_eq_hash_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_
                if(hashkey_dt[p]->complex_comparison(hashkey_dt[p])){
                  if(hashkey_dt[p]->is_buffer_type())
                        sprintf(tmpstr,"(%s(&(key1->hashkey_var%d), &(key2->hashkey_var%d))==0)",
-                               hashkey_dt[p]->get_hfta_comparison_fcn(hashkey_dt[p]).c_str(),p,p);
+                               hashkey_dt[p]->get_hfta_equals_fcn(hashkey_dt[p]).c_str(),p,p);
                  else
                        sprintf(tmpstr,"(%s((key1->hashkey_var%d), (key2->hashkey_var%d))==0)",
-                               hashkey_dt[p]->get_hfta_comparison_fcn(hashkey_dt[p]).c_str(),p,p);
+                               hashkey_dt[p]->get_hfta_equals_fcn(hashkey_dt[p]).c_str(),p,p);
                }else{
                        sprintf(tmpstr,"key1->hashkey_var%d == key2->hashkey_var%d",p,p);
                }
@@ -12584,10 +12687,10 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
                if(gdt->complex_comparison(gdt)){
                  if(gdt->is_buffer_type())
                        sprintf(tmpstr,"(%s(&(grp1->gb_var%d), &(grp2->gb_var%d))==0)",
-                               gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+                               gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
                  else
                        sprintf(tmpstr,"(%s((grp1->gb_var%d), (grp2->gb_var%d))==0)",
-                               gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+                               gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
                }else{
                        sprintf(tmpstr,"grp1->gb_var%d == grp2->gb_var%d",g,g);
                }
@@ -12614,10 +12717,10 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
                                if(gdt->complex_comparison(gdt)){
                                  if(gdt->is_buffer_type())
                                        sprintf(tmpstr,"(%s(&(grp1->gb_var%d), &(grp2->gb_var%d))==0)",
-                                               gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+                                               gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
                                  else
                                        sprintf(tmpstr,"(%s((grp1->gb_var%d), (grp2->gb_var%d))==0)",
-                                               gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+                                               gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
                                }else{
                                        sprintf(tmpstr,"grp1->gb_var%d == grp2->gb_var%d",g,g);
                                }
@@ -12688,21 +12791,44 @@ string rsgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, v
                ret+="\t"+this->gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n";
        }
 //             Constructors
-       ret += "\t"+generate_functor_name() + "_groupdef(){};\n";
-       ret += "\t"+generate_functor_name() + "_groupdef("+
-               this->generate_functor_name() + "_groupdef *gd){\n";
+
+       ret += "\t"+generate_functor_name() + "_groupdef(){\n";
        for(g=0;g<gb_tbl.size();g++){
                data_type *gdt = gb_tbl.get_data_type(g);
                if(gdt->is_buffer_type()){
-                       sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd->gb_var%d));\n",
-                         gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
-                       ret += tmpstr;
-               }else{
-                       sprintf(tmpstr,"\t\tgb_var%d = gd->gb_var%d;\n",g,g);
+                       sprintf(tmpstr,"\t\t%s(&gb_var%d);\n",
+                               gdt->get_hfta_buffer_init().c_str(), g );
                        ret += tmpstr;
                }
        }
        ret += "\t};\n";
+
+       ret += "\t// shallow copy constructor\n";
+       ret += "\t"+generate_functor_name() + "_groupdef("+
+               this->generate_functor_name() + "_groupdef &gd){\n";
+       for(g=0;g<gb_tbl.size();g++){
+               data_type *gdt = gb_tbl.get_data_type(g);
+               sprintf(tmpstr,"\t\tgb_var%d = gd.gb_var%d;\n",g,g);
+               ret += tmpstr;
+       }
+       ret += "\t};\n";
+
+       ret += "\t// deep assignment operator\n";
+       ret += "\t"+generate_functor_name() + "_groupdef& operator=(const "+
+            this->generate_functor_name() + "_groupdef &gd){\n";
+    for(g=0;g<gb_tbl.size();g++){
+            data_type *gdt = gb_tbl.get_data_type(g);
+            if(gdt->is_buffer_type()){
+                    sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd.gb_var%d));\n",
+                      gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
+                    ret += tmpstr;
+            }else{
+                    sprintf(tmpstr,"\t\tgb_var%d = gd.gb_var%d;\n",g,g);
+                    ret += tmpstr;
+            }
+    }
+    ret += "\t}\n";    
+
 //             destructor
        ret += "\t~"+ generate_functor_name() + "_groupdef(){\n";
        for(g=0;g<gb_tbl.size();g++){
@@ -12863,13 +12989,14 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
                for(g=0;g<gb_tbl.size();g++){
                        data_type *gdt = gb_tbl.get_data_type(g);
                        if(gdt->is_temporal()){
-                         sprintf(tmpstr,"last_gb%d",g);
+                         sprintf(tmpstr,"curr_gb%d",g);
                          ret+="\t"+gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n";
-                         sprintf(tmpstr,"last_flushed_gb%d",g);
+                         sprintf(tmpstr,"last_gb%d",g);
                          ret+="\t"+gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n";
                        }
                }
-               ret += "\tbool needs_temporal_flush;\n";
+               ret += "\tgs_int32_t needs_temporal_flush;\n";
+               ret += "\tbool disordered_arrival;\n";
        }
 
 //                     The publicly exposed functions
@@ -12913,11 +13040,13 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
                        data_type *gdt = gb_tbl.get_data_type(g);
                        if(gdt->is_temporal()){
                                literal_t gl(gdt->type_indicator());
+                               sprintf(tmpstr,"\tcurr_gb%d = %s;\n",g, gl.to_hfta_C_code("").c_str());
+                               ret.append(tmpstr);
                                sprintf(tmpstr,"\tlast_gb%d = %s;\n",g, gl.to_hfta_C_code("").c_str());
                                ret.append(tmpstr);
                        }
                }
-               ret += "\tneeds_temporal_flush = false;\n";
+               ret += "\tneeds_temporal_flush = 0;\n";
        }
 
        //              Init temporal attributes referenced in select list
@@ -13031,36 +13160,54 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
 //                     set flush indicator and update stored GB vars if there is any change.
 
        if(uses_temporal_flush){
-               ret+= "\tif( !( (";
+               ret+= "\tif( ( (";
                bool first_one = true;
+               string disorder_test;
                for(g=0;g<gb_tbl.size();g++){
                        data_type *gdt = gb_tbl.get_data_type(g);
 
                        if(gdt->is_temporal()){
-                         sprintf(tmpstr,"last_gb%d",g);   string lhs_op = tmpstr;
+                         sprintf(tmpstr,"curr_gb%d",g);   string lhs_op = tmpstr;
                          sprintf(tmpstr,"gbval->gb_var%d",g);   string rhs_op = tmpstr;
                          if(first_one){first_one = false;} else {ret += ") && (";}
-                         ret += generate_equality_test(lhs_op, rhs_op, gdt);
+                         ret += generate_lt_test(lhs_op, rhs_op, gdt);
+                         disorder_test += generate_lt_test(rhs_op, lhs_op, gdt);
                        }
                }
                ret += ") ) ){\n";
+               int temporal_gb=-1;
                for(g=0;g<gb_tbl.size();g++){
                  data_type *gdt = gb_tbl.get_data_type(g);
                  if(gdt->is_temporal()){
                          if(gdt->is_buffer_type()){
-                               sprintf(tmpstr,"\t\t%s(&(gbval->gb_var%d),&last_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g);
+                               sprintf(tmpstr,"\t\t%s(&(gbval->gb_var%d),&curr_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g);
                          }else{
-                               sprintf(tmpstr,"\t\tlast_flushed_gb%d = last_gb%d;\n",g,g);
-                               ret += tmpstr;
-                               sprintf(tmpstr,"\t\tlast_gb%d = gbval->gb_var%d;\n",g,g);
+//                             sprintf(tmpstr,"\t\tlast_gb%d = curr_gb%d;\n",g,g);
+//                             ret += tmpstr;
+//                             sprintf(tmpstr,"\t\tcurr_gb%d = gbval->gb_var%d;\n",g,g);
+
+                               ret += "\t\tif(curr_gb"+to_string(g)+"==0){\n";
+                               ret += "\t\t\tlast_gb"+to_string(g)+" = gbval->gb_var"+to_string(g)+";\n";
+                               ret += "\t\t}else{\n";
+                               ret += "\t\t\tlast_gb"+to_string(g)+" = curr_gb"+to_string(g)+";\n";
+                               ret += "\t\t}\n";
+                               sprintf(tmpstr,"\t\tcurr_gb%d = gbval->gb_var%d;\n",g,g);
+                               temporal_gb=g;
                          }
                          ret += tmpstr;
                        }
                }
-               ret += "\t\tneeds_temporal_flush=true;\n";
-               ret += "\t\t}else{\n"
-                       "\t\t\tneeds_temporal_flush=false;\n"
-                       "\t\t}\n";
+               ret += "\t\tneeds_temporal_flush = curr_gb"+to_string (temporal_gb)+" - last_gb"+to_string(temporal_gb)+";\n";
+               ret += "\t}else{\n"
+                       "\t\tneeds_temporal_flush=0;\n"
+                       "\t}\n";
+
+               ret += "\tdisordered_arrival = "+disorder_test+";\n";
+//             ret += "\tif( ( ("+disorder_test+") ) ){\n";
+//             ret += "\t\tdisordered_arrival=true;\n";
+//             ret += "\t}else{\n";
+//             ret += "\t\tdisordered_arrival=false;\n";
+//             ret += "\t}\n";
        }
 
 
@@ -13169,8 +13316,8 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
 //                     update an aggregate object
 
        ret += "void update_aggregate(host_tuple &tup0, "
-               +generate_functor_name()+"_groupdef *gbval, "+
-               generate_functor_name()+"_aggrdef *aggval){\n";
+               +generate_functor_name()+"_groupdef &gbval, "+
+               generate_functor_name()+"_aggrdef &aggval){\n";
        //              Variables for execution of the function.
        ret += "\tgs_int32_t problem = 0;\n";   // return unpack failure
 
@@ -13181,7 +13328,7 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
 //             Unpack all remaining attributes
        ret += gen_remaining_colrefs(schema, cid_set, found_cids, "", needs_xform);
        for(a=0;a<aggr_tbl.size();a++){
-         sprintf(tmpstr,"aggval->aggr_var%d",a);
+         sprintf(tmpstr,"aggval.aggr_var%d",a);
          string varname = tmpstr;
          ret.append(generate_aggr_update(varname,&aggr_tbl,a, schema));
        }
@@ -13193,29 +13340,31 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
 //                     reinitialize an aggregate object
 
        ret += "void reinit_aggregates( "+
-               generate_functor_name()+"_groupdef *gbval, "+
-               generate_functor_name()+"_aggrdef *aggval){\n";
+               generate_functor_name()+"_groupdef &gbval, "+
+               generate_functor_name()+"_aggrdef &aggval){\n";
        //              Variables for execution of the function.
        ret += "\tgs_int32_t problem = 0;\n";   // return unpack failure
 
 //                     use of temporaries depends on the aggregate,
 //                     generate them in generate_aggr_update
 
+       int temporal_gb;        // track the # of the temporal gb
        for(g=0;g<gb_tbl.size();g++){
          data_type *gdt = gb_tbl.get_data_type(g);
          if(gdt->is_temporal()){
                  if(gdt->is_buffer_type()){
-                       sprintf(tmpstr,"\t\t%s(&(gbval->gb_var%d),&last_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g);
+                       sprintf(tmpstr,"\t\t%s(&(gbval.gb_var%d),&last_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g);
                  }else{
-                       sprintf(tmpstr,"\t\t gbval->gb_var%d =last_gb%d;\n",g,g);
+                       sprintf(tmpstr,"\t\t gbval.gb_var%d =last_gb%d;\n",g,g);
                  }
                  ret += tmpstr;
+                 temporal_gb = g;
                }
        }
 
 //             Unpack all remaining attributes
        for(a=0;a<aggr_tbl.size();a++){
-         sprintf(tmpstr,"aggval->aggr_var%d",a);
+         sprintf(tmpstr,"aggval.aggr_var%d",a);
          string varname = tmpstr;
          ret.append(generate_aggr_reinitialize(varname,&aggr_tbl,a, schema));
        }
@@ -13230,13 +13379,24 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
 //---------------------------------------------------
 //                     Flush test
 
-       ret += "\tbool flush_needed(){\n";
+       ret += "gs_int32_t flush_needed(){\n";
        if(uses_temporal_flush){
-               ret += "\t\treturn needs_temporal_flush;\n";
+               ret += "\treturn needs_temporal_flush;\n";
        }else{
-               ret += "\t\treturn false;\n";
+               ret += "\treturn false;\n";
        }
-       ret += "\t};\n";
+       ret += "};\n";
+
+       ret += "bool disordered(){return disordered_arrival;}\n";
+
+//------------------------------------------------
+//     time bucket management
+       ret += "void advance_last_tb(){\n";
+       ret += "\tlast_gb"+to_string(temporal_gb)+"++;\n";
+       ret += "}\n\n";
+       ret += "void reset_last_tb(){\n";
+       ret += "\tlast_gb"+to_string(temporal_gb)+" = curr_gb"+to_string(temporal_gb)+";\n";
+       ret += "}\n\n";
 
 //---------------------------------------------------
 //                     create output tuple
@@ -13248,15 +13408,15 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
 //                     so I'll leave it in longhand.
 
        ret += "host_tuple create_output_tuple("
-               +generate_functor_name()+"_groupdef *gbval, "+
-               generate_functor_name()+"_aggrdef *aggval, bool &failed){\n";
+               +generate_functor_name()+"_groupdef &gbval, "+
+               generate_functor_name()+"_aggrdef &aggval, bool &failed){\n";
 
        ret += "\thost_tuple tup;\n";
        ret += "\tfailed = false;\n";
        ret += "\tgs_retval_t retval = 0;\n";
 
-       string gbvar = "gbval->gb_var";
-       string aggvar = "aggval->";
+       string gbvar = "gbval.gb_var";
+       string aggvar = "aggval.";
 
 
 //                     First, get the return values from the UDAFS
@@ -13400,14 +13560,14 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
 //             been unpacked.  delete the string udaf return values at the end.
 
        ret += "bool cleaning_when("
-               +generate_functor_name()+"_groupdef *gbval, "+
-               generate_functor_name()+"_aggrdef *aggval){\n";
+               +generate_functor_name()+"_groupdef &gbval, "+
+               generate_functor_name()+"_aggrdef &aggval){\n";
 
        ret += "\tbool retval = true;\n";
 
 
-       gbvar = "gbval->gb_var";
-       aggvar = "aggval->";
+       gbvar = "gbval.gb_var";
+       aggvar = "aggval.";
 
 
        set<int> clw_pfcns;
@@ -13469,7 +13629,7 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
                if(sdt->is_temporal()){
                        sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
                        ret += tmpstr;
-                       sprintf(tmpstr,"(flush_finished) ? %s : %s ", generate_se_code(select_list[s]->se,schema).c_str(), generate_se_code_fm_aggr(select_list[s]->se,"last_flushed_gb", "", schema).c_str());
+                       sprintf(tmpstr,"(flush_finished) ? %s : %s ", generate_se_code(select_list[s]->se,schema).c_str(), generate_se_code_fm_aggr(select_list[s]->se,"last_gb", "", schema).c_str());
                        ret += tmpstr;
                        ret += ";\n";
                }
@@ -13484,7 +13644,7 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
 
        ret += "struct "+generate_functor_name()+"_hash_func{\n";
        ret += "\tgs_uint32_t operator()(const "+generate_functor_name()+
-                               "_groupdef *grp) const{\n";
+                               "_groupdef &grp) const{\n";
        ret += "\t\treturn(0";
        for(g=0;g<gb_tbl.size();g++){
                data_type *gdt = gb_tbl.get_data_type(g);
@@ -13492,11 +13652,11 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
                        ret += "^";
                        if(gdt->use_hashfunc()){
                                if(gdt->is_buffer_type())
-                                       sprintf(tmpstr,"(%s*%s(&(grp->gb_var%d)))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
+                                       sprintf(tmpstr,"(%s*%s(&(grp.gb_var%d)))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
                                        else
-                               sprintf(tmpstr,"(%s*%s(grp->gb_var%d))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
+                               sprintf(tmpstr,"(%s*%s(grp.gb_var%d))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
                        }else{
-                               sprintf(tmpstr,"(%s*grp->gb_var%d)",hash_nums[g%NRANDS].c_str(),g);
+                               sprintf(tmpstr,"(%s*grp.gb_var%d)",hash_nums[g%NRANDS].c_str(),g);
                        }
                        ret += tmpstr;
                }
@@ -13509,8 +13669,8 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
 //                     The comparison function
 
        ret += "struct "+generate_functor_name()+"_equal_func{\n";
-       ret += "\tbool operator()(const "+generate_functor_name()+"_groupdef *grp1, "+
-                       generate_functor_name()+"_groupdef *grp2) const{\n";
+       ret += "\tbool operator()(const "+generate_functor_name()+"_groupdef &grp1, "+
+                       "const "+generate_functor_name()+"_groupdef &grp2) const{\n";
        ret += "\t\treturn( (";
 
        string hcmpr = "";
@@ -13521,13 +13681,13 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
                        if(first_exec){first_exec=false;}else{ hcmpr += ") && (";}
                        if(gdt->complex_comparison(gdt)){
                          if(gdt->is_buffer_type())
-                               sprintf(tmpstr,"(%s(&(grp1->gb_var%d), &(grp2->gb_var%d))==0)",
-                               gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+                               sprintf(tmpstr,"(%s(&(grp1.gb_var%d), &(grp2.gb_var%d))==0)",
+                               gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
                          else
-                               sprintf(tmpstr,"(%s((grp1->gb_var%d), (grp2->gb_var%d))==0)",
-                               gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+                               sprintf(tmpstr,"(%s((grp1.gb_var%d), (grp2.gb_var%d))==0)",
+                               gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
                        }else{
-                               sprintf(tmpstr,"grp1->gb_var%d == grp2->gb_var%d",g,g);
+                               sprintf(tmpstr,"grp1.gb_var%d == grp2.gb_var%d",g,g);
                        }
                        hcmpr += tmpstr;
                }