X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fftacmp%2Fgenerate_lfta_code.cc;h=5ec41676fa5e8dd2f07c0715dc09f17201b4ebde;hb=9fd1eb03e66522e79c94dec7ed26f68c17018fc1;hp=d6026bbfe6cc6d79e6347ff91f19123dfef8c553;hpb=dec9c93423775db0f4783a93145f016d5d780ffd;p=com%2Fgs-lite.git diff --git a/src/ftacmp/generate_lfta_code.cc b/src/ftacmp/generate_lfta_code.cc index d6026bb..5ec4167 100644 --- a/src/ftacmp/generate_lfta_code.cc +++ b/src/ftacmp/generate_lfta_code.cc @@ -464,6 +464,7 @@ string generate_fta_struct(string node_name, gb_table *gb_tbl, ret+="\tgs_uint32_t *aggr_table_hashmap; // hash val, plus control info.\n"; // ret+="\tint bitmap_size;\n"; ret += "\tint n_aggrs; // # of non-empty slots in aggr_table\n"; + ret += "\tgs_int32_t n_ticks; // for limiting slow flush\n"; ret += "\tint max_aggrs; // size of aggr_table and its hashmap.\n"; ret += "\tint max_windows; // max number of open windows.\n"; ret += "\tunsigned int generation; // initially zero, increment on\n"; @@ -1371,7 +1372,7 @@ string generate_preamble(table_list *schema, //map &int_fcn_defs, // ret += "#include \"fta.h\"\n\n"); string ret = "#ifndef LFTA_IN_NIC\n"; - ret += "char *"+generate_schema_string_name(node_name)+" = " +schema_embed_str+";\n"; + ret += "const char *"+generate_schema_string_name(node_name)+" = " +schema_embed_str+";\n"; ret += "#include\n"; ret += "#include \n"; ret += "#include \n"; @@ -1460,7 +1461,7 @@ string generate_tuple_from_aggr(string node_name, table_list *schema, string idx ret += ";\n"; - ret += "\t\t\ttuple = allocate_tuple(f, tuple_size );\n"; + ret += "\t\t\ttuple = (struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n"; ret += "\t\t\tif( tuple != NULL){\n"; @@ -1903,7 +1904,7 @@ string generate_fta_control(string node_name, table_list *schema, bool is_aggr_q ret+="\tif(command == FTA_COMMAND_FLUSH){\n"; ret+="\t\tif (!t->n_aggrs) {\n"; - ret+="\t\t\ttuple = allocate_tuple(f, 0);\n"; + ret+="\t\t\ttuple = (struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, 0);\n"; ret+="\t\t\tif( tuple != NULL)\n"; ret+="\t\t\t\tpost_tuple(tuple);\n"; @@ -1954,7 +1955,7 @@ string generate_fta_control(string node_name, table_list *schema, bool is_aggr_q } ret += "\t\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+");\n"; - ret += "\t\ttuple = allocate_tuple(f, tuple_size );\n"; + ret += "\t\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n"; ret += "\t\tif( tuple == NULL)\n\t\treturn 1;\n"; /* mark tuple as EOF_TUPLE */ @@ -2139,12 +2140,21 @@ string generate_fta_clock(string node_name, table_list *schema, unsigned time_co sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g); ret += tmpstr; } } + ret += "\t\tt->n_ticks = 0; // reset clock tick counter, limit slow flush\n"; + ret += "\t}else{\n"; + ret += "//\tLimit slow flush, do a full flush at two clock ticks past the change in generation.\n"; + ret += "\t\tt->n_ticks++;\n"; + ret += "\t\tif(t->n_ticks == 2){\n"; + ret += "\t\t\tif(t->flush_posmax_aggrs) \n"; + ret += "\t\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n"; + ret += "\t\t}\n"; ret += "\t}\n\n"; + } ret += "\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t) + sizeof(struct fta_stat);\n"; - ret += "\ttuple = allocate_tuple(f, tuple_size );\n"; + ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+"*)allocate_tuple(f, tuple_size );\n"; ret += "\tif( tuple == NULL)\n\t\treturn 1;\n"; @@ -2210,7 +2220,8 @@ string generate_fta_clock(string node_name, table_list *schema, unsigned time_co ret+="\tpost_tuple(tuple);\n"; ret += "\n\t/* Send a heartbeat message to clearinghouse */\n"; - ret += "\tfta_heartbeat(f->ftaid, t->trace_id++, 1, &stats);\n"; + ret += "\n\t/* Disable heartbeats for now to avoid overloading clearinghouse */\n"; + ret += "\t/* fta_heartbeat(f->ftaid, t->trace_id++, 1, &stats); */\n"; ret += "\n\t/* Reset runtime stats */\n"; ret += "\tt->in_tuple_cnt = 0;\n"; @@ -2298,6 +2309,7 @@ string generate_aggr_accept_prelim(qp_node *fs, string node_name, table_list *sc temporal_flush+="\t\t/* \t\tmark all groups as old */\n"; temporal_flush+="\t\tt->generation++;\n"; temporal_flush+="\t\tt->flush_pos = 0;\n"; + temporal_flush+="\t\tt->n_ticks = 0; // reset clock tick counter, to limit slow flush\n"; // Now set the saved temporal value of the gb to the @@ -2446,7 +2458,7 @@ string ret; ret += ";\n"; - ret += "\ttuple = allocate_tuple(f, tuple_size );\n"; + ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n"; ret += "\tif( tuple == NULL)\n\t\tgoto end;\n"; // Test passed, make assignments to the tuple. @@ -3037,7 +3049,7 @@ printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=% ret += ";\n"; - ret += "\ttuple = allocate_tuple(f, tuple_size );\n"; + ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n"; ret += "\tif( tuple == NULL)\n\t\tgoto end;\n"; // Test passed, make assignments to the tuple. @@ -3494,7 +3506,7 @@ string ret; ret += ";\n"; - ret += "\ttuple = allocate_tuple(f, tuple_size );\n"; + ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n"; ret += "\tif( tuple == NULL)\n\t\tgoto end;\n"; // Test passed, make assignments to the tuple. @@ -4150,7 +4162,7 @@ string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, boo ret+="\tstruct "+generate_fta_name(node_name)+"* f;\n"; ret+="\tint i;\n"; ret += "\n"; - ret+="\tif((f=fta_alloc(0,sizeof(struct "+generate_fta_name(node_name)+")))==0){\n\t\treturn(0);\n\t}\n"; + ret+="\tif((f=(struct "+generate_fta_name(node_name)+" *)fta_alloc(0,sizeof(struct "+generate_fta_name(node_name)+")))==0){\n\t\treturn(0);\n\t}\n"; // assign a streamid to fta instance ret+="\t/* assign a streamid */\n"; @@ -4160,6 +4172,7 @@ string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, boo if(is_aggr_query){ ret += "\tf->n_aggrs = 0;\n"; + ret += "\tf->n_ticks = 0; // for limiting slow flush\n"; ret += "\tf->max_aggrs = "; @@ -4197,12 +4210,12 @@ string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, boo } ret += ";\n"; - ret+="\tif ((f->aggr_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_aggr_struct_name(node_name)+") * f->max_aggrs))==0) {\n"; + ret+="\tif ((f->aggr_table = (struct "+generate_aggr_struct_name(node_name)+" *)sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_aggr_struct_name(node_name)+") * f->max_aggrs))==0) {\n"; ret+="\t\treturn(0);\n"; ret+="\t}\n\n"; // ret+="/* compute how many integers we need to store the hashmap */\n"; // ret+="\tf->bitmap_size = (f->max_aggrs % (sizeof(gs_uint32_t) * 4)) ? (f->max_aggrs / (sizeof(gs_uint32_t) * 4) + 1) : (f->max_aggrs / (sizeof(gs_uint32_t) * 4));\n\n"; - ret+="\tif ((f->aggr_table_hashmap = sp_fta_alloc((struct FTA *)f,sizeof(gs_uint32_t) * f->max_aggrs))==0) {\n"; + ret+="\tif ((f->aggr_table_hashmap = (gs_uint32_t *)sp_fta_alloc((struct FTA *)f,sizeof(gs_uint32_t) * f->max_aggrs))==0) {\n"; ret+="\t\treturn(0);\n"; ret+="\t}\n"; ret+="/*\t\tfill bitmap with zero \t*/\n"; @@ -4239,7 +4252,7 @@ string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, boo int bf_byte_size = bf_bit_size / (8*sizeof(char)); int bf_tot = n_bloom*bf_byte_size; - ret+="\tif ((f->bf_table = sp_fta_alloc((struct FTA *)f,"+int_to_string(bf_tot)+"))==0) {\n"; + ret+="\tif ((f->bf_table = (unsigned char *)sp_fta_alloc((struct FTA *)f,"+int_to_string(bf_tot)+"))==0) {\n"; ret+="\t\treturn(0);\n"; ret+="\t}\n"; ret += @@ -4258,7 +4271,7 @@ string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, boo } ht_size = hs; } - ret+="\tif ((f->join_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_fj_struct_name(node_name)+") * "+int_to_string(ht_size)+"))==0) {\n"; + ret+="\tif ((f->join_table = (struct "+generate_fj_struct_name(node_name)+" *) sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_fj_struct_name(node_name)+") * "+int_to_string(ht_size)+"))==0) {\n"; ret+="\t\treturn(0);\n"; ret+="\t}\n\n"; ret += @@ -4501,7 +4514,7 @@ string generate_lfta_block(qp_node *fs, table_list *schema, int gid, } // Build list of "partial functions", by clause. -// NOTE : partial fcns are not handles well. +// NOTE : partial fcns are not handled well. // The act of searching for them associates the fcn call // in the SE with an index to an array. Refs to the // fcn value are replaced with refs to the variable they are @@ -4532,7 +4545,7 @@ string generate_lfta_block(qp_node *fs, table_list *schema, int gid, ag_fcns_start = gb_fcns_end = partial_fcns.size(); if(aggr_tbl != NULL){ for(i=0;isize();i++){ - find_partial_fcns(aggr_tbl->get_aggr_se(i), NULL, NULL, &is_partial_fcn, Ext_fcns); + find_partial_fcns(aggr_tbl->get_aggr_se(i), &partial_fcns, NULL, &is_partial_fcn, Ext_fcns); } } ag_fcns_end = partial_fcns.size();