Updates to examples queries. Improve real-timeness of lfta aggregation
[com/gs-lite.git] / src / ftacmp / generate_lfta_code.cc
index 55227b1..5ec4167 100644 (file)
@@ -464,6 +464,7 @@ string generate_fta_struct(string node_name, gb_table *gb_tbl,
                ret+="\tgs_uint32_t *aggr_table_hashmap; // hash val, plus control info.\n";
 //             ret+="\tint bitmap_size;\n";
                ret += "\tint n_aggrs; // # of non-empty slots in aggr_table\n";
+               ret += "\tgs_int32_t n_ticks; // for limiting slow flush\n";
                ret += "\tint max_aggrs; // size of aggr_table and its hashmap.\n";
                ret += "\tint max_windows; // max number of open windows.\n";
                ret += "\tunsigned int generation; // initially zero, increment on\n";
@@ -1053,7 +1054,7 @@ static string generate_predicate_code(predicate_t *pr,table_list *schema){
                        ret += "( ";
 
                if(ldt->complex_comparison(ldt) ){
-                               ret +=  ldt->get_comparison_fcn(ldt) ;
+                               ret +=  ldt->get_equals_fcn(ldt) ;
                                ret += "( ";
                                if(ldt->is_buffer_type() ) ret += "&";
                                ret += generate_se_code(pr->get_left_se(), schema);
@@ -1083,6 +1084,7 @@ static string generate_predicate_code(predicate_t *pr,table_list *schema){
 
                ret += "( ";
         if(ldt->complex_comparison(rdt) ){
+// TODO can use get_equals_fcn if op is "=" ?
                        ret += ldt->get_comparison_fcn(rdt);
                        ret += "(";
                        if(ldt->is_buffer_type() ) ret += "&";
@@ -1153,7 +1155,7 @@ static string generate_equality_test(string &lhs_op, string &rhs_op, data_type *
        string ret;
 
     if(dt->complex_comparison(dt) ){
-               ret += dt->get_comparison_fcn(dt);
+               ret += dt->get_equals_fcn(dt);
                ret += "(";
                        if(dt->is_buffer_type() ) ret += "&";
                ret += lhs_op;
@@ -1170,26 +1172,26 @@ static string generate_equality_test(string &lhs_op, string &rhs_op, data_type *
        return(ret);
 }
 
-static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
-       string ret;
-
-    if(dt->complex_comparison(dt) ){
-               ret += dt->get_comparison_fcn(dt);
-               ret += "(";
-                       if(dt->is_buffer_type() ) ret += "&";
-               ret += lhs_op;
-               ret += ", ";
-                       if(dt->is_buffer_type() ) ret += "&";
-               ret += rhs_op;
-               ret += ") == 0";
-       }else{
-               ret += lhs_op;
-               ret += " == ";
-               ret += rhs_op;
-       }
-
-       return(ret);
-}
+//static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
+//     string ret;
+//
//   if(dt->complex_comparison(dt) ){
+//             ret += dt->get_equals_fcn(dt);
+//             ret += "(";
+//                     if(dt->is_buffer_type() ) ret += "&";
+//             ret += lhs_op;
+//             ret += ", ";
+//                     if(dt->is_buffer_type() ) ret += "&";
+//             ret += rhs_op;
+//             ret += ") == 0";
+//     }else{
+//             ret += lhs_op;
+//             ret += " == ";
+//             ret += rhs_op;
+//     }
+//
+//     return(ret);
+//}
 
 //             Here I assume that only MIN and MAX aggregates can be computed
 //             over BUFFER data types.
@@ -1370,7 +1372,7 @@ string generate_preamble(table_list *schema, //map<string,string> &int_fcn_defs,
 //     ret +=  "#include \"fta.h\"\n\n");
 
        string ret = "#ifndef LFTA_IN_NIC\n";
-       ret += "char *"+generate_schema_string_name(node_name)+" = " +schema_embed_str+";\n";
+       ret += "const char *"+generate_schema_string_name(node_name)+" = " +schema_embed_str+";\n";
        ret += "#include<stdio.h>\n";
        ret += "#include <limits.h>\n";
        ret += "#include <float.h>\n";
@@ -1459,7 +1461,7 @@ string generate_tuple_from_aggr(string node_name, table_list *schema, string idx
          ret += ";\n";
 
 
-         ret += "\t\t\ttuple = allocate_tuple(f, tuple_size );\n";
+         ret += "\t\t\ttuple = (struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
          ret += "\t\t\tif( tuple != NULL){\n";
 
 
@@ -1902,7 +1904,7 @@ string generate_fta_control(string node_name, table_list *schema, bool is_aggr_q
                ret+="\tif(command == FTA_COMMAND_FLUSH){\n";
 
                ret+="\t\tif (!t->n_aggrs) {\n";
-               ret+="\t\t\ttuple = allocate_tuple(f, 0);\n";
+               ret+="\t\t\ttuple = (struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, 0);\n";
                ret+="\t\t\tif( tuple != NULL)\n";
                ret+="\t\t\t\tpost_tuple(tuple);\n";
 
@@ -1953,7 +1955,7 @@ string generate_fta_control(string node_name, table_list *schema, bool is_aggr_q
        }
 
        ret += "\t\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+");\n";
-       ret += "\t\ttuple = allocate_tuple(f, tuple_size );\n";
+       ret += "\t\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
        ret += "\t\tif( tuple == NULL)\n\t\treturn 1;\n";
 
        /* mark tuple as EOF_TUPLE */
@@ -2138,12 +2140,21 @@ string generate_fta_clock(string node_name, table_list *schema, unsigned time_co
                        sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g);                        ret += tmpstr;
                      }
                 }
+               ret += "\t\tt->n_ticks = 0; // reset clock tick counter, limit slow flush\n";
+               ret += "\t}else{\n";
+               ret += "//\tLimit slow flush, do a full flush at two clock ticks past the change in generation.\n";
+               ret += "\t\tt->n_ticks++;\n";
+               ret += "\t\tif(t->n_ticks == 2){\n";
+               ret += "\t\t\tif(t->flush_pos<t->max_aggrs) \n";
+               ret += "\t\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
+               ret += "\t\t}\n";
                ret += "\t}\n\n";
+               
 
        }
 
        ret += "\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t) + sizeof(struct fta_stat);\n";
-       ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
+       ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+"*)allocate_tuple(f, tuple_size );\n";
        ret += "\tif( tuple == NULL)\n\t\treturn 1;\n";
 
 
@@ -2209,7 +2220,8 @@ string generate_fta_clock(string node_name, table_list *schema, unsigned time_co
        ret+="\tpost_tuple(tuple);\n";
 
        ret += "\n\t/* Send a heartbeat message to clearinghouse */\n";
-       ret += "\tfta_heartbeat(f->ftaid, t->trace_id++, 1, &stats);\n";
+       ret += "\n\t/* Disable heartbeats for now to avoid overloading clearinghouse */\n";     
+       ret += "\t/* fta_heartbeat(f->ftaid, t->trace_id++, 1, &stats); */\n";
 
        ret += "\n\t/* Reset runtime stats */\n";
        ret += "\tt->in_tuple_cnt = 0;\n";
@@ -2297,6 +2309,7 @@ string generate_aggr_accept_prelim(qp_node *fs, string node_name, table_list *sc
                  temporal_flush+="\t\t/* \t\tmark all groups as old */\n";
                  temporal_flush+="\t\tt->generation++;\n";
                  temporal_flush+="\t\tt->flush_pos = 0;\n";
+                 temporal_flush+="\t\tt->n_ticks = 0; // reset clock tick counter, to limit slow flush\n";
 
 
 //                             Now set the saved temporal value of the gb to the
@@ -2445,7 +2458,7 @@ string ret;
          ret += ";\n";
 
 
-         ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
+         ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
          ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
 
 //                     Test passed, make assignments to the tuple.
@@ -3036,7 +3049,7 @@ printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=%
          ret += ";\n";
 
 
-         ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
+         ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
          ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
 
 //                     Test passed, make assignments to the tuple.
@@ -3493,7 +3506,7 @@ string ret;
          ret += ";\n";
 
 
-         ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
+         ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
          ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
 
 //                     Test passed, make assignments to the tuple.
@@ -4149,7 +4162,7 @@ string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, boo
        ret+="\tstruct "+generate_fta_name(node_name)+"* f;\n";
        ret+="\tint i;\n";
        ret += "\n";
-       ret+="\tif((f=fta_alloc(0,sizeof(struct "+generate_fta_name(node_name)+")))==0){\n\t\treturn(0);\n\t}\n";
+       ret+="\tif((f=(struct "+generate_fta_name(node_name)+" *)fta_alloc(0,sizeof(struct "+generate_fta_name(node_name)+")))==0){\n\t\treturn(0);\n\t}\n";
 
 //                             assign a streamid to fta instance
        ret+="\t/* assign a streamid */\n";
@@ -4159,6 +4172,7 @@ string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, boo
 
        if(is_aggr_query){
                ret += "\tf->n_aggrs = 0;\n";
+               ret += "\tf->n_ticks = 0; // for limiting slow flush\n";
 
                ret += "\tf->max_aggrs = ";
 
@@ -4196,12 +4210,12 @@ string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, boo
                }
                ret += ";\n";
 
-               ret+="\tif ((f->aggr_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_aggr_struct_name(node_name)+") * f->max_aggrs))==0) {\n";
+               ret+="\tif ((f->aggr_table = (struct "+generate_aggr_struct_name(node_name)+" *)sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_aggr_struct_name(node_name)+") * f->max_aggrs))==0) {\n";
                ret+="\t\treturn(0);\n";
                ret+="\t}\n\n";
 //             ret+="/* compute how many integers we need to store the hashmap */\n";
 //             ret+="\tf->bitmap_size = (f->max_aggrs % (sizeof(gs_uint32_t) * 4)) ? (f->max_aggrs / (sizeof(gs_uint32_t) * 4) + 1) : (f->max_aggrs / (sizeof(gs_uint32_t) * 4));\n\n";
-               ret+="\tif ((f->aggr_table_hashmap = sp_fta_alloc((struct FTA *)f,sizeof(gs_uint32_t) * f->max_aggrs))==0) {\n";
+               ret+="\tif ((f->aggr_table_hashmap = (gs_uint32_t *)sp_fta_alloc((struct FTA *)f,sizeof(gs_uint32_t) * f->max_aggrs))==0) {\n";
                ret+="\t\treturn(0);\n";
                ret+="\t}\n";
                ret+="/*\t\tfill bitmap with zero \t*/\n";
@@ -4238,7 +4252,7 @@ string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, boo
                        int bf_byte_size = bf_bit_size / (8*sizeof(char));
 
                        int bf_tot = n_bloom*bf_byte_size;
-                       ret+="\tif ((f->bf_table = sp_fta_alloc((struct FTA *)f,"+int_to_string(bf_tot)+"))==0) {\n";
+                       ret+="\tif ((f->bf_table = (unsigned char *)sp_fta_alloc((struct FTA *)f,"+int_to_string(bf_tot)+"))==0) {\n";
                        ret+="\t\treturn(0);\n";
                        ret+="\t}\n";
                        ret +=
@@ -4257,7 +4271,7 @@ string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, boo
                                }
                                ht_size = hs;
                        }
-                       ret+="\tif ((f->join_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_fj_struct_name(node_name)+") * "+int_to_string(ht_size)+"))==0) {\n";
+                       ret+="\tif ((f->join_table = (struct "+generate_fj_struct_name(node_name)+" *) sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_fj_struct_name(node_name)+") * "+int_to_string(ht_size)+"))==0) {\n";
                        ret+="\t\treturn(0);\n";
                        ret+="\t}\n\n";
                        ret +=
@@ -4500,7 +4514,7 @@ string generate_lfta_block(qp_node *fs, table_list *schema, int gid,
        }
 
 //                     Build list of "partial functions", by clause.
-//                     NOTE : partial fcns are not handles well.
+//                     NOTE : partial fcns are not handled well.
 //                     The act of searching for them associates the fcn call
 //                     in the SE with an index to an array.  Refs to the
 //                     fcn value are replaced with refs to the variable they are
@@ -4531,7 +4545,7 @@ string generate_lfta_block(qp_node *fs, table_list *schema, int gid,
        ag_fcns_start = gb_fcns_end = partial_fcns.size();
        if(aggr_tbl != NULL){
                for(i=0;i<aggr_tbl->size();i++){
-                       find_partial_fcns(aggr_tbl->get_aggr_se(i), NULL, NULL, &is_partial_fcn, Ext_fcns);
+                       find_partial_fcns(aggr_tbl->get_aggr_se(i), &partial_fcns, NULL, &is_partial_fcn, Ext_fcns);
                }
        }
        ag_fcns_end = partial_fcns.size();
@@ -4630,7 +4644,7 @@ string generate_lfta_block(qp_node *fs, table_list *schema, int gid,
 
 
 
-int compute_snap_len(qp_node *fs, table_list *schema){
+int compute_snap_len(qp_node *fs, table_list *schema, string snap_type){
 
 //             Initialize global vars
        gb_tbl = NULL;
@@ -4691,15 +4705,21 @@ int compute_snap_len(qp_node *fs, table_list *schema){
        int tblref = (*csi).tblvar_ref;
     string field = (*csi).field;
 
-       param_list *field_params = schema->get_modifier_list(schref, field);
-       if(field_params->contains_key("snap_len")){
-               string fld_snap_str = field_params->val_of("snap_len");
-               int fld_snap;
-               if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){
-                       if(fld_snap > snap_len) snap_len = fld_snap;
-                       n_snap++;
-               }else{
-                       fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() );
+       if(snap_type == "index"){
+               int pos = schema->get_field_idx(schref, field);
+               if(pos>snap_len) snap_len = pos;
+               n_snap++;
+       }else{
+               param_list *field_params = schema->get_modifier_list(schref, field);
+               if(field_params->contains_key("snap_len")){
+                       string fld_snap_str = field_params->val_of("snap_len");
+                       int fld_snap;
+                       if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){
+                               if(fld_snap > snap_len) snap_len = fld_snap;
+                               n_snap++;
+                       }else{
+                               fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() );
+                       }
                }
        }
   }