X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fftacmp%2Ftranslate_fta.cc;h=19ff6349ee040838f2820da8ad8b33dffd7e732e;hb=dec9c93423775db0f4783a93145f016d5d780ffd;hp=41df1972ce5188ccac55fd74bd6c9389f1244ac2;hpb=804ea15b01566ac0de58781ca61870b4824d0e02;p=com%2Fgs-lite.git diff --git a/src/ftacmp/translate_fta.cc b/src/ftacmp/translate_fta.cc index 41df197..19ff634 100644 --- a/src/ftacmp/translate_fta.cc +++ b/src/ftacmp/translate_fta.cc @@ -55,6 +55,8 @@ Copyright 2014 AT&T Intellectual Property #include"xml_t.h" #include"field_list.h" +#include "gsconfig.h" + extern int xmlParserparse(void); extern FILE *xmlParserin; extern int xmlParserdebug; @@ -164,9 +166,10 @@ int main(int argc, char **argv){ set::iterator si; - vector query_names; // for lfta.c registration + vector registration_query_names; // for lfta.c registration map > mach_query_names; // list queries of machine vector snap_lengths; // for lfta.c registration + vector snap_position; // for lfta.c registration vector interface_names; // for lfta.c registration vector machine_names; // machine of interface vector lfta_reuse_options; // for lfta.c registration @@ -395,24 +398,26 @@ int main(int argc, char **argv){ while(fgets(tmpstr,TMPSTRLEN,osp_in)){ o_lineno++; int nflds = split_string(tmpstr,',',flds,MAXFLDS); - if(nflds == 7){ + if(tmpstr[0]!='\n' && tmpstr[0]!='\r' && tmpstr[0]!='\0' && tmpstr[0]!='#'){ + if(nflds == 7){ // make operator type lowercase - char *tmpc; - for(tmpc=flds[1];*tmpc!='\0';++tmpc) - *tmpc = tolower(*tmpc); - - ospec_str *tmp_ospec = new ospec_str(); - tmp_ospec->query = flds[0]; - tmp_ospec->operator_type = flds[1]; - tmp_ospec->operator_param = flds[2]; - tmp_ospec->output_directory = flds[3]; - tmp_ospec->bucketwidth = atoi(flds[4]); - tmp_ospec->partitioning_flds = flds[5]; - tmp_ospec->n_partitions = atoi(flds[6]); - qname_to_ospec.insert(pair(tmp_ospec->query,output_specs.size())); - output_specs.push_back(tmp_ospec); - }else{ - fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds); + char *tmpc; + for(tmpc=flds[1];*tmpc!='\0';++tmpc) + *tmpc = tolower(*tmpc); + + ospec_str *tmp_ospec = new ospec_str(); + tmp_ospec->query = flds[0]; + tmp_ospec->operator_type = flds[1]; + tmp_ospec->operator_param = flds[2]; + tmp_ospec->output_directory = flds[3]; + tmp_ospec->bucketwidth = atoi(flds[4]); + tmp_ospec->partitioning_flds = flds[5]; + tmp_ospec->n_partitions = atoi(flds[6]); + qname_to_ospec.insert(pair(tmp_ospec->query,output_specs.size())); + output_specs.push_back(tmp_ospec); + }else{ + fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds); + } } } fclose(osp_in); @@ -633,24 +638,31 @@ int main(int argc, char **argv){ map lfta_prefilter_val; string lfta_header = -"#include \n\n" +"#include \n" "#include \"rts.h\"\n" "#include \"fta.h\"\n" "#include \"lapp.h\"\n" -"#include \"rts_udaf.h\"\n\n" +"#include \"rts_udaf.h\"\n" +"#include\n" +"#include \n" +"#include \"rdtsc.h\"\n" +"#include \"watchlist.h\"\n\n" + ; // Get any locally defined parsing headers glob_t glob_result; memset(&glob_result, 0, sizeof(glob_result)); - // do the glob operation + // do the glob operation TODO should be from GSROOT int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result); if(return_value == 0){ + lfta_header += "\n"; for(size_t i = 0; i < glob_result.gl_pathc; ++i) { char *flds[1000]; int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000); - lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n\n"; + lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n"; } + lfta_header += "\n"; }else{ fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n"); } @@ -1344,6 +1356,7 @@ fprintf(stderr,"Parsing file %s\n",qpathname.c_str()); } if(dangling_ospecs!=""){ fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str()); + exit(1); } string dangling_par = ""; @@ -1895,8 +1908,20 @@ for(q=0;qquery_name) == 0){ // Grab the lfta for global optimization. vector tvec = split_queries[l]->query_plan[0]->get_input_tbls(); - string liface = tvec[0]->get_interface(); // iface queries have been resolved - string lmach = tvec[0]->get_machine(); + string liface = "_local_"; +// string lmach = ""; + string lmach = hostname; + if(tvec.size()>0){ + liface = tvec[0]->get_interface(); // iface queries have been resolved + if(tvec[0]->get_machine() != ""){ + lmach = tvec[0]->get_machine(); + }else{ + fprintf(stderr,"WARNING, lfta %s has empty machine name, using %s\n", split_queries[l]->query_plan[0]->node_name.c_str(), hostname.c_str()); + } + } // else{ + interface_names.push_back(liface); + machine_names.push_back(lmach); +// } vector schemaid_preds; for(int irv=0;irvget_schema_ref(); if (lmach == "") lmach = hostname; - interface_names.push_back(liface); - machine_names.push_back(lmach); +// interface_names.push_back(liface); +// machine_names.push_back(lmach); + //printf("Machine is %s\n",lmach.c_str()); // Check if a schemaId constraint needs to be inserted. @@ -1922,7 +1948,10 @@ for(q=0;qhas_multiple_schemas()){ + + + if(tvec[irv]->get_interface() != "_local_"){ + if(iface->has_multiple_schemas()){ if(schema_id<0){ // invalid schema_id fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str()); exit(1); @@ -1945,7 +1974,7 @@ for(q=0;q iface_schemas = iface->get_property("Schemas"); if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){ @@ -1956,7 +1985,10 @@ for(q=0;qquery_plan[0]->append_to_where(clist[0]); -// Specialized processing ... currently filter join +// Specialized processing +// filter join, get two schemaid preds string node_type = split_queries[l]->query_plan[0]->node_type(); if(node_type == "filter_join"){ filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0]; @@ -1994,10 +2027,18 @@ for(q=0;qquery_plan[0]; + if(irv==0){ + fj->pred_t0.push_back(clist[0]); + schemaid_preds.push_back(schid_pr); + } + } } } // Specialized processing, currently filter join. - if(schemaid_preds.size()>0){ + if(schemaid_preds.size()>1){ string node_type = split_queries[l]->query_plan[0]->node_type(); if(node_type == "filter_join"){ filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0]; @@ -2055,9 +2096,17 @@ for(q=0;qquery_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop); */ - snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema)); - query_names.push_back(split_queries[l]->query_name); - mach_query_names[lmach].push_back(query_names.size()-1); + snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "snap")); + snap_position.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "index")); + +// STOPPED HERE need to figure out how to generate the code that Vlad needs +// from snap_postion + +// TODO NOTE : I'd like it to be the case that registration_query_names +// are the queries to be registered for subsciption. +// but there is some complex bookkeeping here. + registration_query_names.push_back(split_queries[l]->query_name); + mach_query_names[lmach].push_back(registration_query_names.size()-1); // NOTE: I will assume a 1-1 correspondance between // mach_query_names[lmach] and lfta_mach_lists[lmach] // where mach_query_names[lmach][i] contains the index into @@ -2191,7 +2240,7 @@ if (partitioned_mode) { } -print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names); +print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names); int num_hfta = hfta_list.size(); for(i=0; i < hfta_list.size(); ++i){ @@ -2204,7 +2253,7 @@ for(i=num_hfta; i < hfta_list.size(); ++i){ Schema->append_table(td); } -print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names); +print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names); @@ -2276,7 +2325,6 @@ for(i=0;i0){ - fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the outputg:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str()); + fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the output:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str()); for(int hi=0; hi tvec = mach_lftas[li]->query_plan[0]->get_input_tbls(); - string lfta_iface = tvec[0]->get_interface(); + string lfta_iface = "_local_"; + if(tvec.size()>0){ + string lfta_iface = tvec[0]->get_interface(); + } lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]); } @@ -2506,7 +2557,10 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e // Find common filter predicates in the LFTAs. -// in addition generate structs to store the temporal attributes unpacked by prefilter +// in addition generate structs to store the +// temporal attributes unpacked by prefilter +// compute & provide interface for per-interface +// record extraction properties map >::iterator ssqi; for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){ @@ -2524,7 +2578,10 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e // for fta_init for(li=0;li tvec = mach_lftas[li]->query_plan[0]->get_input_tbls(); - string lfta_iface = tvec[0]->get_interface(); + string lfta_iface = "_local_"; + if(tvec.size()>0){ + lfta_iface = tvec[0]->get_interface(); + } lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]); lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]); } @@ -2535,11 +2592,14 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e // But this is defunct code for gs-lite for(li=0;li tvec = mach_lftas[li]->query_plan[0]->get_input_tbls(); - string liface = tvec[0]->get_interface(); + string liface = "_local_"; + if(tvec.size()>0){ + liface = tvec[0]->get_interface(); + } vector iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str); if(iface_codegen_type.size()){ if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){ - packed_return = true; + packed_return = true; } } } @@ -2603,10 +2663,13 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n"; #endif map > lfta_sigs; // used again later + map lfta_snap_pos; // optimize csv parsing + // compute now, use in get_iface_properties for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){ string liface = (*mvsi).first; vector empty_list; lfta_sigs[liface] = empty_list; + lfta_snap_pos[liface] = -1; vector lfta_cols; vector lfta_snap_length; @@ -2620,7 +2683,10 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e } lfta_sigs[liface].push_back(mask); lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema)); - lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema)); + lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "snap")); + int this_snap_pos = compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "index"); + if(this_snap_pos > lfta_snap_pos[liface]) + lfta_snap_pos[liface] = this_snap_pos; } //for(li=0;li iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str); + vector iface_properties; + if(*sir!="_local_"){ // dummy watchlist interface, don't process. + iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str); + } if (erri) { fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str()); exit(1); @@ -2694,6 +2763,8 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e } lfta_val[lmach] += "\";\n"; } + lfta_val[lmach] += "\t\t}else if(!strcmp(property_name, \"_max_csv_pos\")){\n"; + lfta_val[lmach] += "\t\t\treturn \""+int_to_string(lfta_snap_pos[(*sir)])+"\";\n"; lfta_val[lmach] += "\t\t} else\n"; lfta_val[lmach] += "\t\t\treturn NULL;\n"; } @@ -2707,11 +2778,25 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e lfta_val[lmach] += "// list of FTAs clearinghouse expects to register themselves\n"; lfta_val[lmach] += "const gs_sp_t fta_names[] = {"; - for (i = 0; i < query_names.size(); ++i) { - if (i) - lfta_val[lmach] += ", "; - lfta_val[lmach] += "\"" + query_names[i] + "\""; + bool first = true; + for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){ + string liface = (*mvsi).first; + if(liface != "_local_"){ // these don't register themselves + vector lfta_list = (*mvsi).second; + for(i=0;iquery_name + "\""; } @@ -2723,6 +2808,7 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e // set the prefilter function accordingly. // see the example in demo/err2 lfta_val[lmach] += "void fta_init(gs_sp_t device){\n"; + lfta_val[lmach] += "// note: the last parameter in fta_register is the prefilter signature\n"; // for(i=0;i\n",query_names[mi].c_str()); - if(lfta_comment != "") - fprintf(qtree_output,"\t\t\n",lfta_comment.c_str()); - if(lfta_title != "") - fprintf(qtree_output,"\t\t\n",lfta_title.c_str()); - if(lfta_namespace != "") - fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str()); - if(lfta_ht_size != "") - fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str()); + fprintf(qtree_output,"\t<LFTA name='%s' >\n",registration_query_names[mi].c_str()); + if(lfta_comment != "") + fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str()); + if(lfta_title != "") + fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str()); + if(lfta_namespace != "") + fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str()); + if(lfta_ht_size != "") + fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str()); if(lmach != "") fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str()); else fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str()); fprintf(qtree_output,"\t\t<Interface value='%s' />\n",interface_names[mi].c_str()); - fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str()); + std::vector<tablevar_t *> itbls = lfta_sq->get_input_tables(); + for(int t=0;t<itbls.size();++t){ + fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",itbls[t]->get_schema_name().c_str()); + } +// fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str()); fprintf(qtree_output,"\t\t<Rate value='100' />\n"); fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]); // write info about fields to qtree.xml @@ -2996,6 +3093,9 @@ void generate_makefile(vector<string> &input_file_names, int nfiles, // for an interface type bool use_proto = false; + bool use_bsa = false; + bool use_kafka = false; + bool use_ssl = false; int erri; string err_str; for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){ @@ -3003,9 +3103,47 @@ void generate_makefile(vector<string> &input_file_names, int nfiles, vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str); for(int ift_i=0;ift_i<ift.size();ift_i++){ if(ift[ift_i]=="PROTO"){ +#ifdef PROTO_ENABLED use_proto = true; +#else + fprintf(stderr,"Runtime libraries built without PROTO support. Rebuild with PROTO_ENABLED defined in gsoptions.h\n"); + exit(0); +#endif } } + ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "BSA", erri, err_str); + for(int ift_i=0;ift_i<ift.size();ift_i++){ + if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){ +#ifdef BSA_ENABLED + use_bsa = true; +#else + fprintf(stderr,"Runtime libraries built without BSA support. Rebuild with BSA_ENABLED defined in gsoptions.h\n"); + exit(0); +#endif + } + } + ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "KAFKA", erri, err_str); + for(int ift_i=0;ift_i<ift.size();ift_i++){ + if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){ +#ifdef KAFKA_ENABLED + use_kafka = true; +#else + fprintf(stderr,"Runtime libraries built without KAFKA support. Rebuild with KAFKA_ENABLED defined in gsoptions.h\n"); + exit(0); +#endif + } + } + ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "ENCRYPTED", erri, err_str); + for(int ift_i=0;ift_i<ift.size();ift_i++){ + if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){ +#ifdef SSL_ENABLED + use_ssl = true; +#else + fprintf(stderr,"Runtime libraries built without SSL support. Rebuild with SSL_ENABLED defined in gsoptions.h\n"); + exit(0); +#endif + } + } } fprintf(outfl, @@ -3026,15 +3164,26 @@ void generate_makefile(vector<string> &input_file_names, int nfiles, if(use_pads) fprintf(outfl,"-lgscppads -lpads "); fprintf(outfl, -"-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz"); +"-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz -lrt"); if(use_pads) fprintf(outfl, " -lpz -lz -lbz "); if(libz_exists && libast_exists) fprintf(outfl," -last "); if(use_pads) fprintf(outfl, " -ldll -ldl "); - if(use_proto) - fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c "); + +#ifdef PROTO_ENABLED + fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c "); +#endif +#ifdef BSA_ENABLED + fprintf(outfl, " -lbsa_stream "); +#endif +#ifdef KAFKA_ENABLED + fprintf(outfl, " -lrdkafka "); +#endif +#ifdef SSL_ENABLED + fprintf(outfl, " -lssl -lcrypto "); +#endif fprintf(outfl," -lgscpaux"); #ifdef GCOV fprintf(outfl," -fprofile-arcs"); @@ -3117,6 +3266,9 @@ void generate_makefile(vector<string> &input_file_names, int nfiles, // string err_str; for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){ string ifnm = (*ssi); + // suppress internal _local_ interface + if (ifnm == "_local_") + continue; fprintf(outfl, "%s ",ifnm.c_str()); vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str); for(j=0;j<ifv.size();++j)