Fix parsing of floating point numbers
[com/gs-lite.git] / src / ftacmp / query_plan.cc
index 512299f..8e05ae2 100644 (file)
@@ -4914,6 +4914,7 @@ vector<qp_node *> join_eq_hash_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, t
                        child_qpn->table_name = new tablevar_t(
                           from[f]->get_interface().c_str(), from[f]->get_schema_name().c_str(), from[f]->get_ifq());
                        child_qpn->table_name->set_range_var(from[f]->get_var_name());
+                       child_qpn->table_name->set_machine(from[f]->get_machine());
 
                        child_vec.push_back(child_qpn);
                        select_vec.push_back(&(child_qpn->select_list));
@@ -7032,6 +7033,29 @@ static string generate_equality_test(string &lhs_op, string &rhs_op, data_type *
        return(ret);
 }
 
+static string generate_lt_test(string &lhs_op, string &rhs_op, data_type *dt){
+       string ret;
+
+    if(dt->complex_comparison(dt) ){
+               ret.append(dt->get_hfta_comparison_fcn(dt));
+               ret.append("(");
+                       if(dt->is_buffer_type() )
+                               ret.append("&");
+               ret.append(lhs_op);
+               ret.append(", ");
+                       if(dt->is_buffer_type() )
+                               ret.append("&");
+               ret.append(rhs_op );
+               ret.append(") == 1");
+       }else{
+               ret.append(lhs_op );
+               ret.append(" < ");
+               ret.append(rhs_op );
+       }
+
+       return(ret);
+}
+
 static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
        string ret;
 
@@ -11110,84 +11134,67 @@ string join_eq_hash_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_
 
 
 //             create a temp status tuple
-       ret += "int create_temp_status_tuple(const host_tuple &tup0, const host_tuple &tup1, host_tuple& result) {\n\n";
+       ret += "int create_temp_status_tuple("+this->generate_functor_name()+"_tempeqdef *lts,"+this->generate_functor_name()+"_tempeqdef *rts, host_tuple& result) {\n\n";
 
        ret += "\tgs_retval_t retval = 0;\n";
        ret += "\tgs_int32_t problem = 0;\n";
 
-       ret += "\tif(tup0.data){\n";
-
-//             Unpack all the temporal attributes references in select list
-       col_id_set found_cids;
-
-       for(s=0;s<select_list.size();s++){
-               if (select_list[s]->se->get_data_type()->is_temporal()) {
-//                     Find the set of attributes accessed in this SE
-                       col_id_set new_cids;
-                       get_new_se_cids(select_list[s]->se,found_cids, new_cids, NULL);
-               }
+       for(p=0;p<temporal_dt.size();p++){
+               sprintf(tmpstr,"lhs_var");
+               ret+="\t"+temporal_dt[p]->make_host_cvar(tmpstr)+";\n";         
+               sprintf(tmpstr,"rhs_var");
+               ret+="\t"+temporal_dt[p]->make_host_cvar(tmpstr)+";\n";         
        }
 
-       //                      Deal with outer join stuff
-       l_cids.clear(), r_cids.clear();
-       for(ocsi=found_cids.begin();ocsi!=found_cids.end();++ocsi){
-               if((*ocsi).tblvar_ref == 0) l_cids.insert((*ocsi));
-               else                                            r_cids.insert((*ocsi));
+       ret += "\tif(lts!=NULL){\n";
+       for(p=0;p<temporal_dt.size();p++){
+               ret += "\t\tlhs_var = lts->tempeq_var"+to_string(p)+";\n";
        }
-       unpack_null = "";
-       extra_cids.clear();
-       for(ocsi=r_cids.begin();ocsi!=r_cids.end();++ocsi){
-               string field = (*ocsi).field;
-               if(r_equiv.count(field)){
-                       unpack_null+="\t\tunpack_var_"+field+"_1="+generate_se_code(r_equiv[field],schema)+";\n";
-                       col_id_set addnl_cids;
-                       get_new_se_cids(r_equiv[field],l_cids,addnl_cids,NULL);
-               }else{
-               int schref = (*ocsi).schema_ref;
-                       data_type dt(schema->get_type_name(schref,field));
-                       literal_t empty_lit(dt.type_indicator());
-                       if(empty_lit.is_cpx_lit()){
-                               sprintf(tmpstr,"&(unpack_var_%s_1)",field.c_str());
-                               unpack_null += "\t"+empty_lit.to_hfta_C_code(tmpstr)+";\n";
-                       }else{
-                               unpack_null+="\tunpack_var_"+field+"_1="+empty_lit.to_hfta_C_code("")+";\n";
-                       }
-               }
+       ret += "\t}else{\n";
+       for(p=0;p<temporal_dt.size();p++){
+               ret += "\t\tlhs_var = 0;\n";
        }
-       ret += gen_unpack_cids(schema,  l_cids, "1", needs_xform);
-       ret += gen_unpack_cids(schema,  extra_cids, "1", needs_xform);
-       ret += unpack_null;
+       ret += "\t}\n";
 
-       ret+="\t}else if (tup1.data) {\n";
-       unpack_null = ""; extra_cids.clear();
-       for(ocsi=l_cids.begin();ocsi!=l_cids.end();++ocsi){
-               string field = (*ocsi).field;
-               if(l_equiv.count(field)){
-                       unpack_null+="\t\tunpack_var_"+field+"_0="+generate_se_code(l_equiv[field],schema)+";\n";
-                       col_id_set addnl_cids;
-                       get_new_se_cids(l_equiv[field],r_cids,addnl_cids,NULL);
-               }else{
-               int schref = (*ocsi).schema_ref;
-                       data_type dt(schema->get_type_name(schref,field));
-                       literal_t empty_lit(dt.type_indicator());
-                       if(empty_lit.is_cpx_lit()){
-                               sprintf(tmpstr,"&(unpack_var_%s_0)",field.c_str());
-                               unpack_null += "\t"+empty_lit.to_hfta_C_code(tmpstr)+";\n";
-                       }else{
-                               unpack_null+="\tunpack_var_"+field+"_0="+empty_lit.to_hfta_C_code("")+";\n";
-                       }
-               }
+       ret += "\tif(rts!=NULL){\n";
+       for(p=0;p<temporal_dt.size();p++){
+               ret += "\t\trhs_var = rts->tempeq_var"+to_string(p)+";\n";
        }
-       ret += gen_unpack_cids(schema,  r_cids, "1", needs_xform);
-       ret += gen_unpack_cids(schema,  extra_cids, "1", needs_xform);
-       ret += unpack_null;
-       ret+="\t}\n";
+       ret += "\t}else{\n";
+       for(p=0;p<temporal_dt.size();p++){
+               ret += "\t\trhs_var = 0;\n";
+       }
+       ret += "\t}\n";
 
        ret += gen_init_temp_status_tuple(this->get_node_name());
 
 //             Start packing.
-       ret += "//\t\tPack the fields into the tuple.\n";
-       ret += gen_pack_tuple(schema,select_list,this->get_node_name(), true );
+
+
+//             This is checked in the query analyzer so I think its safe,
+//             But a lot of older code has complex code to propagate multiple
+//             timestamps
+    for(s=0;s<select_list.size();s++){
+               scalarexp_t *se  = select_list[s]->se;
+        data_type *sdt = se->get_data_type();
+               if(sdt->is_temporal()){
+                       string target = "\ttuple->tuple_var"+to_string(s)+" = ";
+                       if(from[0]->get_property()==0 && from[1]->get_property()==0){ // INNER
+                               ret += target+"(lhs_var>rhs_var ? lhs_var : rhs_var); // INNER\n";
+                       }
+                       if(from[0]->get_property()!=0 && from[1]->get_property()==0){ // LEFT
+                               ret += target+"lhs_var; // LEFT\n";
+//                             ret += target+"rhs_var; // LEFT\n";
+                       }
+                       if(from[0]->get_property()==0 && from[1]->get_property()!=0){ // RIGHT
+                               ret += target+"rhs_var; // RIGHT\n";
+//                             ret += target+"lhs_var; // RIGHT\n";
+                       }
+                       if(from[0]->get_property()!=0 && from[1]->get_property()!=0){ // OUTER
+                               ret += target+"(lhs_var<rhs_var ? lhs_var : rhs_var); // OUTER\n";
+                       }
+               }
+       }
 
 
        ret += "\treturn 0;\n";
@@ -12863,13 +12870,13 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
                for(g=0;g<gb_tbl.size();g++){
                        data_type *gdt = gb_tbl.get_data_type(g);
                        if(gdt->is_temporal()){
-                         sprintf(tmpstr,"last_gb%d",g);
+                         sprintf(tmpstr,"curr_gb%d",g);
                          ret+="\t"+gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n";
-                         sprintf(tmpstr,"last_flushed_gb%d",g);
+                         sprintf(tmpstr,"last_gb%d",g);
                          ret+="\t"+gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n";
                        }
                }
-               ret += "\tbool needs_temporal_flush;\n";
+               ret += "\tgs_int32_t needs_temporal_flush;\n";
        }
 
 //                     The publicly exposed functions
@@ -12907,6 +12914,7 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
 
 //             temporal flush variables
 //             ASSUME that structured values won't be temporal.
+       gs_int32_t temporal_gb = 0;
        if(uses_temporal_flush){
                ret += "//\t\tInitialize temporal flush variables.\n";
                for(g=0;g<gb_tbl.size();g++){
@@ -12915,9 +12923,12 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
                                literal_t gl(gdt->type_indicator());
                                sprintf(tmpstr,"\tlast_gb%d = %s;\n",g, gl.to_hfta_C_code("").c_str());
                                ret.append(tmpstr);
+                               sprintf(tmpstr,"\tcurr_gb%d = %s;\n",g, gl.to_hfta_C_code("").c_str());
+                               ret.append(tmpstr);
+                               temporal_gb = g;
                        }
                }
-               ret += "\tneeds_temporal_flush = false;\n";
+               ret += "\tneeds_temporal_flush = 0;\n";
        }
 
        //              Init temporal attributes referenced in select list
@@ -13031,35 +13042,39 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
 //                     set flush indicator and update stored GB vars if there is any change.
 
        if(uses_temporal_flush){
-               ret+= "\tif( !( (";
+               ret+= "\tif( ( (";
                bool first_one = true;
                for(g=0;g<gb_tbl.size();g++){
                        data_type *gdt = gb_tbl.get_data_type(g);
 
                        if(gdt->is_temporal()){
-                         sprintf(tmpstr,"last_gb%d",g);   string lhs_op = tmpstr;
+                         sprintf(tmpstr,"curr_gb%d",g);   string lhs_op = tmpstr;
                          sprintf(tmpstr,"gbval->gb_var%d",g);   string rhs_op = tmpstr;
                          if(first_one){first_one = false;} else {ret += ") && (";}
-                         ret += generate_equality_test(lhs_op, rhs_op, gdt);
+                         ret += generate_lt_test(lhs_op, rhs_op, gdt);
                        }
                }
                ret += ") ) ){\n";
                for(g=0;g<gb_tbl.size();g++){
                  data_type *gdt = gb_tbl.get_data_type(g);
                  if(gdt->is_temporal()){
-                         if(gdt->is_buffer_type()){
+                               temporal_gb = g;
+                         if(gdt->is_buffer_type()){    // TODO first, last?  or delete?
                                sprintf(tmpstr,"\t\t%s(&(gbval->gb_var%d),&last_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g);
                          }else{
-                               sprintf(tmpstr,"\t\tlast_flushed_gb%d = last_gb%d;\n",g,g);
-                               ret += tmpstr;
-                               sprintf(tmpstr,"\t\tlast_gb%d = gbval->gb_var%d;\n",g,g);
+                               ret += "\t\tif(curr_gb"+to_string(g)+"==0){\n";
+                               ret += "\t\t\tlast_gb"+to_string(g)+" = gbval->gb_var"+to_string(g)+";\n";
+                               ret += "\t\t}else{\n";
+                               ret += "\t\t\tlast_gb"+to_string(g)+" = curr_gb"+to_string(g)+";\n";
+                               ret += "\t\t}\n";
+                               sprintf(tmpstr,"\t\tcurr_gb%d = gbval->gb_var%d;\n",g,g);
                          }
                          ret += tmpstr;
                        }
                }
-               ret += "\t\tneeds_temporal_flush=true;\n";
+               ret += "\t\tneeds_temporal_flush = curr_gb"+to_string (temporal_gb)+" - last_gb"+to_string(temporal_gb)+";\n"; 
                ret += "\t\t}else{\n"
-                       "\t\t\tneeds_temporal_flush=false;\n"
+                       "\t\t\tneeds_temporal_flush=0;\n"
                        "\t\t}\n";
        }
 
@@ -13230,13 +13245,22 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
 //---------------------------------------------------
 //                     Flush test
 
-       ret += "\tbool flush_needed(){\n";
+       ret += "gs_int32_t flush_needed(){\n";
        if(uses_temporal_flush){
-               ret += "\t\treturn needs_temporal_flush;\n";
+               ret += "\treturn needs_temporal_flush;\n";
        }else{
-               ret += "\t\treturn false;\n";
+               ret += "\treturn 0;\n";
        }
-       ret += "\t};\n";
+       ret += "};\n";
+
+//------------------------------------------------
+//     time bucket management
+       ret += "void advance_last_tb(){\n";
+       ret += "\tlast_gb"+to_string(temporal_gb)+"++;\n";
+       ret += "}\n\n";
+       ret += "void reset_last_tb(){\n";
+       ret += "\tlast_gb"+to_string(temporal_gb)+" = curr_gb"+to_string(temporal_gb)+";\n";
+       ret += "}\n\n";
 
 //---------------------------------------------------
 //                     create output tuple
@@ -13469,7 +13493,7 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
                if(sdt->is_temporal()){
                        sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
                        ret += tmpstr;
-                       sprintf(tmpstr,"(flush_finished) ? %s : %s ", generate_se_code(select_list[s]->se,schema).c_str(), generate_se_code_fm_aggr(select_list[s]->se,"last_flushed_gb", "", schema).c_str());
+                       sprintf(tmpstr,"(flush_finished) ? %s : %s ", generate_se_code(select_list[s]->se,schema).c_str(), generate_se_code_fm_aggr(select_list[s]->se,"last_gb", "", schema).c_str());
                        ret += tmpstr;
                        ret += ";\n";
                }