Improvements to aggregation code and fucntion library
[com/gs-lite.git] / src / ftacmp / translate_fta.cc
index 41df197..19ff634 100644 (file)
@@ -55,6 +55,8 @@ Copyright 2014 AT&T Intellectual Property
 #include"xml_t.h"
 #include"field_list.h"
 
+#include "gsconfig.h"
+
 extern int xmlParserparse(void);
 extern FILE *xmlParserin;
 extern int xmlParserdebug;
@@ -164,9 +166,10 @@ int main(int argc, char **argv){
 
   set<int>::iterator si;
 
-  vector<string> query_names;                  // for lfta.c registration
+  vector<string> registration_query_names;                     // for lfta.c registration
   map<string, vector<int> > mach_query_names;  // list queries of machine
   vector<int> snap_lengths;                            // for lfta.c registration
+  vector<int> snap_position;                           // for lfta.c registration
   vector<string> interface_names;                      // for lfta.c registration
   vector<string> machine_names;                        // machine of interface
   vector<bool> lfta_reuse_options;                     // for lfta.c registration
@@ -395,24 +398,26 @@ int main(int argc, char **argv){
                while(fgets(tmpstr,TMPSTRLEN,osp_in)){
                        o_lineno++;
                        int nflds = split_string(tmpstr,',',flds,MAXFLDS);
-                       if(nflds == 7){
+                       if(tmpstr[0]!='\n' && tmpstr[0]!='\r' && tmpstr[0]!='\0' && tmpstr[0]!='#'){
+                               if(nflds == 7){
 //             make operator type lowercase
-                               char *tmpc;
-                               for(tmpc=flds[1];*tmpc!='\0';++tmpc)
-                                       *tmpc = tolower(*tmpc);
-
-                               ospec_str *tmp_ospec = new ospec_str();
-                               tmp_ospec->query = flds[0];
-                               tmp_ospec->operator_type = flds[1];
-                               tmp_ospec->operator_param = flds[2];
-                               tmp_ospec->output_directory = flds[3];
-                               tmp_ospec->bucketwidth = atoi(flds[4]);
-                               tmp_ospec->partitioning_flds = flds[5];
-                               tmp_ospec->n_partitions = atoi(flds[6]);
-                               qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
-                               output_specs.push_back(tmp_ospec);
-                       }else{
-                               fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
+                                       char *tmpc;
+                                       for(tmpc=flds[1];*tmpc!='\0';++tmpc)
+                                               *tmpc = tolower(*tmpc);
+       
+                                       ospec_str *tmp_ospec = new ospec_str();
+                                       tmp_ospec->query = flds[0];
+                                       tmp_ospec->operator_type = flds[1];
+                                       tmp_ospec->operator_param = flds[2];
+                                       tmp_ospec->output_directory = flds[3];
+                                       tmp_ospec->bucketwidth = atoi(flds[4]);
+                                       tmp_ospec->partitioning_flds = flds[5];
+                                       tmp_ospec->n_partitions = atoi(flds[6]);
+                                       qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
+                                       output_specs.push_back(tmp_ospec);
+                               }else{
+                                       fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
+                               }
                        }
                }
                fclose(osp_in);
@@ -633,24 +638,31 @@ int main(int argc, char **argv){
   map<string, string> lfta_prefilter_val;
 
   string lfta_header =
-"#include <limits.h>\n\n"
+"#include <limits.h>\n"
 "#include \"rts.h\"\n"
 "#include \"fta.h\"\n"
 "#include \"lapp.h\"\n"
-"#include \"rts_udaf.h\"\n\n"
+"#include \"rts_udaf.h\"\n"
+"#include<stdio.h>\n"
+"#include <float.h>\n"
+"#include \"rdtsc.h\"\n"
+"#include \"watchlist.h\"\n\n"
+
 ;
 // Get any locally defined parsing headers
     glob_t glob_result;
     memset(&glob_result, 0, sizeof(glob_result));
 
-    // do the glob operation
+    // do the glob operation TODO should be from GSROOT
     int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);
        if(return_value == 0){
+               lfta_header += "\n";
        for(size_t i = 0; i < glob_result.gl_pathc; ++i) {
                        char *flds[1000];
                        int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);
-                       lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n\n";
+                       lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n";
        }
+               lfta_header += "\n";
        }else{
                fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");
        }
@@ -1344,6 +1356,7 @@ fprintf(stderr,"Parsing file %s\n",qpathname.c_str());
        }
        if(dangling_ospecs!=""){
                fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
+               exit(1);
        }
 
        string dangling_par = "";
@@ -1895,8 +1908,20 @@ for(q=0;q<hfta_sets.size();++q){
           if(lfta_names.count(split_queries[l]->query_name) == 0){
 //                             Grab the lfta for global optimization.
                vector<tablevar_t *> tvec =  split_queries[l]->query_plan[0]->get_input_tbls();
-               string liface = tvec[0]->get_interface();       // iface queries have been resolved
-               string lmach = tvec[0]->get_machine();
+               string liface = "_local_";
+//             string lmach = "";
+               string lmach = hostname;
+               if(tvec.size()>0){
+                       liface = tvec[0]->get_interface();      // iface queries have been resolved
+                       if(tvec[0]->get_machine() != ""){
+                               lmach = tvec[0]->get_machine();
+                       }else{
+                               fprintf(stderr,"WARNING, lfta %s has empty machine name, using %s\n",  split_queries[l]->query_plan[0]->node_name.c_str(), hostname.c_str());
+                       }
+               } // else{
+                       interface_names.push_back(liface);
+                       machine_names.push_back(lmach);
+//             }
 
                vector<predicate_t *> schemaid_preds;
                for(int irv=0;irv<tvec.size();++irv){
@@ -1906,8 +1931,9 @@ for(q=0;q<hfta_sets.size();++q){
                        int schema_ref = tvec[irv]->get_schema_ref();
                        if (lmach == "")
                                lmach = hostname;
-                       interface_names.push_back(liface);
-                       machine_names.push_back(lmach);
+//                     interface_names.push_back(liface);
+//                     machine_names.push_back(lmach);
+
 //printf("Machine is %s\n",lmach.c_str());
 
 //                             Check if a schemaId constraint needs to be inserted.
@@ -1922,7 +1948,10 @@ for(q=0;q<hfta_sets.size();++q){
                                fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
                                exit(1);
                        }       
-                       if(iface->has_multiple_schemas()){
+
+
+                       if(tvec[irv]->get_interface() != "_local_"){
+                        if(iface->has_multiple_schemas()){
                                if(schema_id<0){        // invalid schema_id
                                        fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
                                        exit(1);
@@ -1945,7 +1974,7 @@ for(q=0;q<hfta_sets.size();++q){
                                                exit(1);
                                        }
                                }
-                       }else{  // single-schema interface
+                        }else{ // single-schema interface
                                schema_id = -1; // don't generate schema_id predicate
                                vector<string> iface_schemas = iface->get_property("Schemas");
                                if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
@@ -1956,7 +1985,10 @@ for(q=0;q<hfta_sets.size();++q){
                                        fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
                                        exit(1);
                                }
-                       }                       
+                        }                      
+                       }else{
+                               schema_id = -1;
+                       }
 
 // If we need to check the schema_id, insert a predicate into the lfta.
 //      TODO not just schema_id, the full all_schema_ids set.
@@ -1983,7 +2015,8 @@ for(q=0;q<hfta_sets.size();++q){
 // cnf built, now insert it.
                                split_queries[l]->query_plan[0]->append_to_where(clist[0]);
 
-// Specialized processing ... currently filter join
+// Specialized processing 
+//             filter join, get two schemaid preds
                                string node_type = split_queries[l]->query_plan[0]->node_type();
                                if(node_type == "filter_join"){
                                        filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
@@ -1994,10 +2027,18 @@ for(q=0;q<hfta_sets.size();++q){
                                        }
                                        schemaid_preds.push_back(schid_pr);
                                }
+//             watchlist join, get the first schemaid pred
+                               if(node_type == "watch_join"){
+                                       watch_join_qpn *fj = (watch_join_qpn *)split_queries[l]->query_plan[0];
+                                       if(irv==0){
+                                               fj->pred_t0.push_back(clist[0]);
+                                               schemaid_preds.push_back(schid_pr);
+                                       }
+                               }
                        }
                }
 // Specialized processing, currently filter join.
-               if(schemaid_preds.size()>0){
+               if(schemaid_preds.size()>1){
                        string node_type = split_queries[l]->query_plan[0]->node_type();
                        if(node_type == "filter_join"){
                                filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
@@ -2055,9 +2096,17 @@ for(q=0;q<hfta_sets.size();++q){
                lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
 */
 
-               snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
-               query_names.push_back(split_queries[l]->query_name);
-               mach_query_names[lmach].push_back(query_names.size()-1);
+               snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "snap"));
+               snap_position.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "index"));
+
+// STOPPED HERE need to figure out how to generate the code that Vlad needs
+//             from snap_postion
+
+// TODO NOTE : I'd like it to be the case that registration_query_names
+//     are the queries to be registered for subsciption.
+//     but there is some complex bookkeeping here.
+               registration_query_names.push_back(split_queries[l]->query_name);
+               mach_query_names[lmach].push_back(registration_query_names.size()-1);
 //                     NOTE: I will assume a 1-1 correspondance between
 //                     mach_query_names[lmach] and lfta_mach_lists[lmach]
 //                     where mach_query_names[lmach][i] contains the index into
@@ -2191,7 +2240,7 @@ if (partitioned_mode) {
 }
 
 
-print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);
+print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
 
 int num_hfta = hfta_list.size();
 for(i=0; i < hfta_list.size(); ++i){
@@ -2204,7 +2253,7 @@ for(i=num_hfta; i < hfta_list.size(); ++i){
                Schema->append_table(td);
 }
 
-print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);
+print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
 
 
 
@@ -2276,7 +2325,6 @@ for(i=0;i<hfta_list.size();++i){          // query also has an HFTA component
 //                     if(hfta_namespace == "")
 //                             warning_str += "\tnamespace not found.\n";
 
-// STOPPED HERE
 //     There is a get_tbl_keys method implemented for qp_nodes,
 //     integrate it into steam_query, then call it to find keys,
 //     and annotate feidls with their key-ness.
@@ -2341,7 +2389,7 @@ for(i=0;i<hfta_list.size();++i){          // query also has an HFTA component
                                missing_keys.push_back(hfta_keys[ki]);
                  }
                  if(missing_keys.size()>0){
-                       fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the outputg:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
+                       fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the output:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
                        for(int hi=0; hi<missing_keys.size(); ++hi){
                                fprintf(stderr," %s", missing_keys[hi].c_str());
                        }
@@ -2463,7 +2511,10 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e
                int li;
                for(li=0;li<mach_lftas.size();++li){
                        vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
-                       string lfta_iface = tvec[0]->get_interface();
+                       string lfta_iface = "_local_";
+                       if(tvec.size()>0){
+                               string lfta_iface = tvec[0]->get_interface();
+                       }
                        lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
                }
 
@@ -2506,7 +2557,10 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e
 
 
 //                     Find common filter predicates in the LFTAs.
-//                     in addition generate structs to store the temporal attributes unpacked by prefilter
+//                     in addition generate structs to store the
+//                     temporal attributes unpacked by prefilter
+//                     compute & provide interface for per-interface
+//                     record extraction properties
        
        map<string, vector<stream_query *> >::iterator ssqi;
        for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
@@ -2524,7 +2578,10 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e
                                                         // for fta_init
                for(li=0;li<mach_lftas.size();++li){
                        vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
-                       string lfta_iface = tvec[0]->get_interface();
+                       string lfta_iface = "_local_";
+                       if(tvec.size()>0){
+                               lfta_iface = tvec[0]->get_interface();
+                       }
                        lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
                        lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
                }
@@ -2535,11 +2592,14 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e
 //     But this is defunct code for gs-lite
                for(li=0;li<mach_lftas.size();++li){
                  vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
-                 string liface = tvec[0]->get_interface();
+                 string liface = "_local_";
+                 if(tvec.size()>0){
+                        liface = tvec[0]->get_interface();
+                 }
                  vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
                  if(iface_codegen_type.size()){
                        if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
-                         packed_return = true;
+                       packed_return = true;
                        }
                  }
                }
@@ -2603,10 +2663,13 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e
                        lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
 #endif
                map<string, vector<long long int> > lfta_sigs; // used again later
+               map<string, int> lfta_snap_pos; // optimize csv parsing
+                                                                       // compute now, use in get_iface_properties
                for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
                        string liface = (*mvsi).first;
                        vector<long long int> empty_list;
                        lfta_sigs[liface] = empty_list;
+                       lfta_snap_pos[liface] = -1;
 
                        vector<col_id_set> lfta_cols;
                        vector<int> lfta_snap_length;
@@ -2620,7 +2683,10 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e
                                }
                                lfta_sigs[liface].push_back(mask);
                                lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
-                               lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));
+                               lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "snap"));
+                               int this_snap_pos = compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "index");
+                               if(this_snap_pos > lfta_snap_pos[liface])
+                                       lfta_snap_pos[liface]  = this_snap_pos;
                        }
 
 //for(li=0;li<mach_lftas.size();++li){
@@ -2670,7 +2736,10 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e
                        lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
 
                // iterate through interface properties
-               vector<string> iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
+               vector<string> iface_properties;
+               if(*sir!="_local_"){    // dummy watchlist interface, don't process.
+                       iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
+               }
                if (erri) {
                        fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
                        exit(1);
@@ -2694,6 +2763,8 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e
                                }
                                lfta_val[lmach] += "\";\n";
                        }
+                       lfta_val[lmach] += "\t\t}else if(!strcmp(property_name, \"_max_csv_pos\")){\n";
+                       lfta_val[lmach] += "\t\t\treturn \""+int_to_string(lfta_snap_pos[(*sir)])+"\";\n";
                        lfta_val[lmach] += "\t\t} else\n";
                        lfta_val[lmach] += "\t\t\treturn NULL;\n";
                }
@@ -2707,11 +2778,25 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e
          lfta_val[lmach] += "// list of FTAs clearinghouse expects to register themselves\n";
          lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
 
-         for (i = 0; i < query_names.size(); ++i) {
-                  if (i)
-                         lfta_val[lmach] += ", ";
-                  lfta_val[lmach] += "\"" + query_names[i] + "\"";
+         bool first = true;
+         for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
+               string liface = (*mvsi).first;
+               if(liface != "_local_"){        // these don't register themselves
+                       vector<stream_query *> lfta_list = (*mvsi).second;
+                       for(i=0;i<lfta_list.size();i++){
+                               int mi = lfta_iface_qname_ix[liface][i];
+                               if(first) first = false;
+                               else      lfta_val[lmach] += ", ";
+                               lfta_val[lmach] += "\"" + registration_query_names[mi] + "\"";
+                       }
+               }
          }
+//       for (i = 0; i < registration_query_names.size(); ++i) {
+//                if (i)
+//                       lfta_val[lmach] += ", ";
+//                lfta_val[lmach] += "\"" + registration_query_names[i] + "\"";
+//       }
+
          for (i = 0; i < hfta_list.size(); ++i) {
                   lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
          }
@@ -2723,6 +2808,7 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e
 //     set the prefilter function accordingly.
 //     see the example in demo/err2
          lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
+         lfta_val[lmach] += "//        note: the last parameter in fta_register is the prefilter signature\n";
 
 //       for(i=0;i<mach_query_names[lmach].size();i++)
 //             int mi = mach_query_names[lmach][i];
@@ -2734,6 +2820,13 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e
                for(i=0;i<lfta_list.size();i++){
                        stream_query *lfta_sq = lfta_list[i];
                        int mi = lfta_iface_qname_ix[liface][i];
+
+                       if(liface == "_local_"){
+//  Don't register an init function, do the init code inline
+                               lfta_val[lmach] += "// initialization code for "+registration_query_names[mi]+";\n";
+                               lfta_val[lmach] += "\t"+generate_watchlist_name(registration_query_names[mi])+".filename = "+generate_watchlist_name(registration_query_names[mi])+"__fstr;\n";
+                               continue;
+                       }
                
                        fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
 
@@ -2741,7 +2834,7 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e
                        if(interface_names[mi]!="")
                                this_iface = '"'+interface_names[mi]+'"';
                        lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";
-               lfta_val[lmach] += "\t\tfta_register(\""+query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
+               lfta_val[lmach] += "\t\tfta_register(\""+registration_query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
 //             if(interface_names[mi]=="")
 //                             lfta_val[lmach]+="DEFAULTDEV";
 //             else
@@ -2749,8 +2842,8 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e
                        lfta_val[lmach] += this_iface;
 
 
-               lfta_val[lmach] += ", "+generate_alloc_name(query_names[mi])
-                       +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(query_names[mi])
+               lfta_val[lmach] += ", "+generate_alloc_name(registration_query_names[mi])
+                       +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(registration_query_names[mi])
                        +"\n#endif\n";
                                sprintf(tmpstr,",%d",snap_lengths[mi]);
                        lfta_val[lmach] += tmpstr;
@@ -2812,7 +2905,7 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e
 
 //                     NOTE : I'm assuming that visible lftas do not start with _fta.
 //                             -- will fail for non-visible simple selection queries.
-               if(field_verifier != NULL && query_names[mi].substr(0,1) != "_"){
+               if(field_verifier != NULL && registration_query_names[mi].substr(0,1) != "_"){
                        string warning_str;
                        if(lfta_comment == "")
                                warning_str += "\tcomment not found.\n";
@@ -2829,26 +2922,30 @@ for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_e
                        }
                        if(warning_str != "")
                                fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
-                                       query_names[mi].c_str(),warning_str.c_str());
+                                       registration_query_names[mi].c_str(),warning_str.c_str());
                }
 
 
 //                     Create qtree output
-               fprintf(qtree_output,"\t<LFTA name='%s' >\n",query_names[mi].c_str());
-                  if(lfta_comment != "")
-                        fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
-                  if(lfta_title != "")
-                        fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
-                  if(lfta_namespace != "")
-                        fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
-                  if(lfta_ht_size != "")
-                        fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
+               fprintf(qtree_output,"\t<LFTA name='%s' >\n",registration_query_names[mi].c_str());
+        if(lfta_comment != "")
+              fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
+        if(lfta_title != "")
+              fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
+        if(lfta_namespace != "")
+              fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
+        if(lfta_ht_size != "")
+              fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
                if(lmach != "")
                  fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
                else
                  fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
                fprintf(qtree_output,"\t\t<Interface  value='%s' />\n",interface_names[mi].c_str());
-               fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
+               std::vector<tablevar_t *> itbls = lfta_sq->get_input_tables();
+               for(int t=0;t<itbls.size();++t){
+                       fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",itbls[t]->get_schema_name().c_str());
+               }
+//             fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
                fprintf(qtree_output,"\t\t<Rate value='100' />\n");
                fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
 //                             write info about fields to qtree.xml
@@ -2996,6 +3093,9 @@ void generate_makefile(vector<string> &input_file_names, int nfiles,
 //             for an interface type
 
        bool use_proto = false;
+       bool use_bsa = false;
+       bool use_kafka = false;
+       bool use_ssl = false;   
        int erri;
        string err_str;
        for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
@@ -3003,9 +3103,47 @@ void generate_makefile(vector<string> &input_file_names, int nfiles,
                vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);
                for(int ift_i=0;ift_i<ift.size();ift_i++){
                        if(ift[ift_i]=="PROTO"){
+#ifdef PROTO_ENABLED                           
                                use_proto = true;
+#else
+                               fprintf(stderr,"Runtime libraries built without PROTO support. Rebuild with PROTO_ENABLED defined in gsoptions.h\n");
+                               exit(0);
+#endif
                        }
                }
+               ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "BSA", erri, err_str);
+               for(int ift_i=0;ift_i<ift.size();ift_i++){
+                       if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){       
+#ifdef BSA_ENABLED                             
+                               use_bsa = true;
+#else
+                               fprintf(stderr,"Runtime libraries built without BSA support. Rebuild with BSA_ENABLED defined in gsoptions.h\n");
+                               exit(0);
+#endif                                 
+                       }
+               }
+               ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "KAFKA", erri, err_str);
+               for(int ift_i=0;ift_i<ift.size();ift_i++){
+                       if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){       
+#ifdef KAFKA_ENABLED                           
+                               use_kafka = true;
+#else
+                               fprintf(stderr,"Runtime libraries built without KAFKA support. Rebuild with KAFKA_ENABLED defined in gsoptions.h\n");
+                               exit(0);
+#endif 
+                       }
+               }
+               ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "ENCRYPTED", erri, err_str);
+               for(int ift_i=0;ift_i<ift.size();ift_i++){
+                       if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){       
+#ifdef SSL_ENABLED                             
+                               use_ssl = true;
+#else
+                               fprintf(stderr,"Runtime libraries built without SSL support. Rebuild with SSL_ENABLED defined in gsoptions.h\n");
+                               exit(0);
+#endif 
+                       }
+               }               
        }
 
        fprintf(outfl,
@@ -3026,15 +3164,26 @@ void generate_makefile(vector<string> &input_file_names, int nfiles,
        if(use_pads)
                fprintf(outfl,"-lgscppads -lpads ");
        fprintf(outfl,
-"-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux  -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
+"-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux  -lclearinghouse -lresolv -lpthread -lgscpinterface -lz -lrt");
        if(use_pads)
                fprintf(outfl, " -lpz -lz -lbz ");
        if(libz_exists && libast_exists)
                fprintf(outfl," -last ");
        if(use_pads)
                fprintf(outfl, " -ldll -ldl ");
-       if(use_proto)
-               fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
+
+#ifdef PROTO_ENABLED   
+       fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
+#endif
+#ifdef BSA_ENABLED     
+       fprintf(outfl, " -lbsa_stream ");
+#endif
+#ifdef KAFKA_ENABLED   
+       fprintf(outfl, " -lrdkafka ");
+#endif
+#ifdef SSL_ENABLED     
+       fprintf(outfl, " -lssl -lcrypto ");
+#endif
        fprintf(outfl," -lgscpaux");
 #ifdef GCOV
        fprintf(outfl," -fprofile-arcs");
@@ -3117,6 +3266,9 @@ void generate_makefile(vector<string> &input_file_names, int nfiles,
 //     string err_str;
        for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
                string ifnm = (*ssi);
+               // suppress internal _local_ interface
+               if (ifnm == "_local_")
+                       continue;
                fprintf(outfl, "%s ",ifnm.c_str());
                vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
                for(j=0;j<ifv.size();++j)