-/* ------------------------------------------------
-Copyright 2014 AT&T Intellectual Property
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ------------------------------------------- */
-
-#include<stdio.h>
-#include<stdlib.h>
-
-#include<string.h>
-#include<string>
-#include<vector>
-#include<map>
-#include<set>
-#include<list>
-#include<time.h>
-#include <unistd.h>
-
-#include"xml_t.h"
-
-#include"qnode.h"
-
-using namespace std;
-
-extern int xmlParserparse(void);
-extern FILE *xmlParserin;
-extern int xmlParserdebug;
-xml_t *xml_result;
-
// ---------------------------------------------------------------
//  Tunable parameters.  Where a command-line flag is listed,
//  main() overrides the default below from that flag.
// ---------------------------------------------------------------

// -i : initial discard from the resource log (handed to every qnode and
//      perf_struct constructed in main()).
int init_discard = 12;

// -l : number of past allocations blended into the per-slot load estimate.
int rts_load_history_len = 3;
// -m : weight of the most recent history entry; each older entry is
//      weighted by a further factor of hist_multiplier.
double hist_multiplier = 0.8;
// -U / -N : true  => all rts interface hashes are the same,
//           false => rts interface hashes are processed independently.
bool uniform_rts_alloc = true;

// Size of the line buffers used when reading the trace and log files.
#define LINEBUF 1000
// Maximum number of comma-separated fields extracted from one line.
#define SPLITBUF 20

// NOTE(review): not referenced in the visible part of the file; the names
// suggest lower bounds on hfta input size and cpu use -- confirm.
double min_hfta_insz = 1000000.0;
double min_hfta_cpu = 0.2;

// -u : max cpu utilization threshold, must be in (0,1].
double cpu_util_threshold = 0.9;
// -C : upper bound on collision rate.
double crate_hi = 0.01;
// -c : lower bound on collision rate; below it, increase HT size.
double crate_lo = 0.002;
// -E : upper bound on the eviction rate.
double erate_hi = 0.01;
// -s : default interface hash table length.
int htmax = 1000;
// -0 / -1 / -2 : communication cost multiplier for 0-, 1- and 2-distance
// processes.  Entry 3 has no flag; presumably it is the cost at distance 3,
// the largest value cpu_info_str::distance_from() returns -- confirm.
double xfer_costs[4] = {0.1, 0.1, 0.3, 1.0};
-
-
// Split a NUL-terminated string in place on every occurrence of `sep`.
//
//   instr     - buffer to split; each separator found is overwritten with
//               '\0', so the buffer must be writable.
//   sep       - separator character.  '\0' is not a usable separator (it
//               would match the terminator), so the string is then returned
//               as a single word.
//   words     - output array; receives pointers into `instr`, one per field.
//   max_words - capacity of `words`.
//
// Returns the number of entries stored in `words`, or 0 when nothing could
// be stored (NULL `instr`/`words`, or `max_words` <= 0).
//
// When the line holds more than `max_words` fields, an error is reported
// once on stderr, the first max_words-1 fields are kept, and the last slot
// ends up pointing at the final field of the line.
int split_string(char *instr,char sep, char **words,int max_words){
    // Without room for even one word, words[0] must not be written.
    if(instr == NULL || words == NULL || max_words <= 0)
        return 0;

    int nwords = 0;
    char *field = instr;
    words[nwords++] = field;

    // strchr() would match the terminating NUL and the scan would then run
    // off the end of the buffer.
    if(sep == '\0')
        return nwords;

    bool overflow_reported = false;
    for(char *loc = strchr(field,sep); loc != NULL; loc = strchr(field,sep)){
        *loc = '\0';
        field = loc+1;
        if(nwords >= max_words){
            if(!overflow_reported){
                fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
                overflow_reported = true;
            }
            // Reuse the last slot so that the array is never overrun.
            nwords = max_words-1;
        }
        words[nwords++] = field;
    }

    return nwords;
}
-
// Return the decimal text representation of i (with a leading '-' for
// negative values).
std::string int_to_string(int i){
    char digits[100];   // far larger than any int rendered with %d
    snprintf(digits,sizeof(digits),"%d",i);
    return std::string(digits);
}
-
-
-
-
-
// Address triple (ip, port, streamid).  main() uses it as the key of the
// map from addresses seen in the trace file to FTA names.
struct fta_addr{
    int ip;
    int port;
    int streamid;

    // i = ip, p = port, s = stream id.
    fta_addr(int i, int p, int s)
        : ip(i), port(p), streamid(s)
    {}
};
-
-struct cmpr_fta_addr{
- bool operator()(fta_addr const &a, fta_addr const &b) const{
- if(a.ip < b.ip)
- return true;
- if(a.ip > b.ip)
- return false;
- if(a.port < b.port)
- return true;
- if(a.port > b.port)
- return false;
- if(a.streamid < b.streamid)
- return true;
- return false;
- }
-};
-
-bool cmpr_parallel_idx(const qnode *a, const qnode *b){
- return a->par_index < b->par_index;
-}
-
// Description of one logical processor: its processor id, the socket and
// core it belongs to, and the load assigned to it so far.
struct cpu_info_str{
    int processor_id;
    int socket_id;
    int core_id;

    double assigned_load;   // starts at 0.0

    // p = processor id, s = socket id, c = core id.
    cpu_info_str(int p, int s, int c)
        : processor_id(p), socket_id(s), core_id(c), assigned_load(0.0)
    {}

    // Render as "processor_id,socket_id,core_id".  Does not modify the
    // object, so it is callable through const pointers and references.
    std::string to_csv() const{
        char buf[200];
        snprintf(buf,sizeof(buf),"%d,%d,%d",processor_id,socket_id,core_id);
        return std::string(buf);
    }

    // Distance between this processor and *other:
    //   3 - different socket
    //   2 - same socket, different core
    //   1 - same socket and core, different processor id
    //   0 - all three ids are equal
    // `other` must not be NULL.
    int distance_from(const cpu_info_str *other) const{
        if(socket_id != other->socket_id)
            return 3;
        if(core_id != other->core_id)
            return 2;
        if(processor_id != other->processor_id)
            return 1;
        return 0;
    }
};
-
-bool cmpr_cpu_info(cpu_info_str const *a, cpu_info_str const *b){
- if(a->socket_id < b->socket_id)
- return true;
- if(a->socket_id > b->socket_id)
- return false;
- if(a->core_id < b->core_id)
- return true;
- if(a->core_id > b->core_id)
- return false;
- if(a->processor_id < b->processor_id)
- return true;
- return false;
-}
-
-
-
-
-int main(int argc, char **argv){
- int i,j,s;
-
- time_t now = time(NULL);
- tm *now_tm = localtime(&now);
- int year=now_tm->tm_year;
-
-
-// Options
- string src_dir="";
- string trace_file="";
- string resource_log_file = "resource_log.csv";
-
- const char *optstr = "d:r:i:l:m:UNu:s:C:c:E:0:1:2:";
- const char *usage_str =
-"Usage: %s [options] trace_file\n"
-"\t-d source_directory\n"
-"\t-r resource_log_file\n"
-"\t-i initial discard from the resource log\n"
-"\t-s default interface hash table length.\n"
-"\t-l rts load history length\n"
-"\t-m rts load history multiplier\n"
-"\t-U All rts interface hashes are the same.\n"
-"\t-N rts interface hashes are processed independently.\n"
-"\t-u max cpu utilization threshold\n"
-"\t-C upper bound on collision rate.\n"
-"\t-c lower bound on collision rate.\n"
-"\t-E upper bound on the eviction rate.\n"
-"\t-0 communication cost multiplier for 0-distance processes.\n"
-"\t-1 communication cost multiplier for 1-distance processes.\n"
-"\t-2 communication cost multiplier for 2-distance processes.\n"
-;
-
- char chopt;
- while((chopt = getopt(argc,argv,optstr)) != -1){
- switch(chopt){
- case '0':
- xfer_costs[0] = atof(optarg);
- if(xfer_costs[0] < 0 || xfer_costs[0] > 1){
- fprintf(stderr,"ERROR, 0-distance xfer costs (%f) must be in [0,1].\n",xfer_costs[0]);
- exit(1);
- }
- break;
- case '1':
- xfer_costs[1] = atof(optarg);
- if(xfer_costs[1] < 0 || xfer_costs[1] > 1){
- fprintf(stderr,"ERROR, 1-distance xfer costs (%f) must be in [0,1].\n",xfer_costs[1]);
- exit(1);
- }
- break;
- case '2':
- xfer_costs[2] = atof(optarg);
- if(xfer_costs[2] < 0 || xfer_costs[2] > 1){
- fprintf(stderr,"ERROR, 2-distance xfer costs (%f) must be in [0,1].\n",xfer_costs[2]);
- exit(1);
- }
- break;
- case 'C':
- crate_hi = atof(optarg);
- if(crate_hi < 0 || crate_hi>1){
- fprintf(stderr,"ERROR, request to set crate_hi to %f\n must be in [0,1].\n",hist_multiplier);
- exit(1);
- }
- break;
- case 'c':
- crate_lo = atof(optarg);
- if(crate_lo < 0 || crate_lo>1){
- fprintf(stderr,"ERROR, request to set crate_lo to %f\n must be in [0,1].\n",hist_multiplier);
- exit(1);
- }
- break;
- case 'E':
- erate_hi = atof(optarg);
- if(erate_hi < 0 || erate_hi>1){
- fprintf(stderr,"ERROR, request to set erate_hi to %f\n must be in [0,1].\n",hist_multiplier);
- exit(1);
- }
- break;
- case 's':
- htmax = atoi(optarg);
- if(htmax <= 0){
- fprintf(stderr,"ERROPR, htmax set to %d, must be positive nonzero.\n",htmax);
- exit(1);
- }
- break;
- case 'm':
- hist_multiplier = atof(optarg);
- if(hist_multiplier <= 0){
- fprintf(stderr,"ERROR, request to set hist_multiplier to %f\n must be positive nonzero.\n",hist_multiplier);
- exit(1);
- }
- break;
- case 'u':
- cpu_util_threshold = atof(optarg);
- if(cpu_util_threshold<=0 || cpu_util_threshold>1){
- fprintf(stderr,"ERROR, cpu_threshold set to %f, must be in (0,1].\n",cpu_util_threshold);
- exit(1);
- }
- break;
- case 'U':
- uniform_rts_alloc=true;
- break;
- case 'N':
- uniform_rts_alloc=false;
- break;
- case 'd':
- src_dir = optarg;
- break;
- case 'r':
- resource_log_file = optarg;
- break;
- case 'i':
- init_discard = atoi(optarg);
- if(init_discard < 0){
- init_discard=0;
- fprintf(stderr,"ERROR, atttempting to set init_discard to a negative value (%d), setting to zero.\n",init_discard);
- }
- break;
- case 'l':
- rts_load_history_len = atoi(optarg);
- if(rts_load_history_len < 0){
- rts_load_history_len=0;
- fprintf(stderr,"ERROR, atttempting to set rts_load_history_len to a negative value (%d), setting to zero.\n",rts_load_history_len);
- }
- break;
- case '?':
- fprintf(stderr,"Error, argument %c not recognized.\n",optopt);
- fprintf(stderr,"%s\n", usage_str);
- exit(1);
- default:
- fprintf(stderr,"Invalid arguments\n");
- fprintf(stderr,"%s\n", usage_str);
- exit(1);
- }
- }
- argc -= optind;
- argv += optind;
- if (argc > 0)
- trace_file = argv[0];
-
- if(trace_file == ""){
- fprintf(stderr, usage_str, argv[0]);
- exit(1);
- }
-
-// for month string-to-int conversion
- const char *months_str[12] = {"Jan","Feb","Mar","Apr","May","Jun","Jul","Aug","Sep","Oct","Nov","Dec"};
- map<string, int> month_str_to_int;
- for(i=0;i<12;++i)
- month_str_to_int[months_str[i]] = i;
-
-
- FILE *qtree_fl = NULL;
- string qtree_flname = src_dir + "/" + "qtree.xml";
- string actual_qtree_flname;
- if(src_dir != ""){
- qtree_fl = fopen(qtree_flname.c_str(),"r");
- actual_qtree_flname = qtree_flname;
- }
- if(qtree_fl == NULL){
- qtree_fl = fopen("qtree.xml","r");
- actual_qtree_flname = "qtree.xml";
- }
- if(qtree_fl == NULL){
- fprintf(stderr,"ERROR, can't open ");
- if(src_dir != ""){
- fprintf(stderr,"%s or ",qtree_flname.c_str());
- }
- fprintf(stderr,"qtree.xml, exiting.\n");
- exit(1);
- }
-
-
-// Parse the qtree.xml file
- xmlParser_setfileinput(qtree_fl);
- if(xmlParserparse()){
- fprintf(stderr,"ERROR, could not parse query tree file %s\n",actual_qtree_flname.c_str());
- }
-
-// Get the lfta, hfta nodes
- xml_t *xroot = xml_result;
- vector<xml_t *> xqnodes;
- xroot->get_roots("HFTA",xqnodes);
- xroot->get_roots("LFTA",xqnodes);
-
- map<string,int> qname_to_idx;
- map<string,int> exe_to_idx;
- vector<qnode *> qnode_list;
-
-// Build the qnodes
- for(i=0;i<xqnodes.size();++i){
- string qname;
- if(xqnodes[i]->get_attrib_val("name",qname)){
-//printf("node type = %s, name=%s\n",xqnodes[i]->name.c_str(),qname.c_str());
- qnode *qn = new qnode(xqnodes[i]->name,qname,init_discard);
- for(s=0;s<xqnodes[i]->subtrees.size();++s){
- xml_t *xsub = xqnodes[i]->subtrees[s];
- if(xsub->name == "Field"){
- string fname;
- bool nret = xsub->get_attrib_val("name",fname);
- string fpos;
- bool pret = xsub->get_attrib_val("pos",fpos);
- string ftype;
- bool tret = xsub->get_attrib_val("type",ftype);
- string fmods;
- bool mret = xsub->get_attrib_val("mods",fmods);
- if(nret && pret && tret){
- field_str *fld = new field_str(fname,atoi(fpos.c_str()),ftype,fmods);
- qn->add_field(fld);
- }else{
- fprintf(stderr,"---> subtree %d of FTA %s has an malformed field.\n",s,qname.c_str());
- }
- }
- if(xsub->name == "HtSize"){
- string src;
- bool sret = xsub->get_attrib_val("value",src);
- if(sret){
- int htsize = atoi(src.c_str());
- if(htsize > 0){
- unsigned int naggrs = 1; // make it power of 2
- unsigned int nones = 0;
- while(htsize){
- if(htsize&1)
- nones++;
- naggrs = naggrs << 1;
- htsize = htsize >> 1;
- }
- if(nones==1) // in case it was already a power of 2.
- naggrs/=2;
- qn->aggr_tbl_size = naggrs;
- }else{
- fprintf(stderr,"---> subtree %d of FTA %s has an invalid HtSize (%s).\n",s,qname.c_str(),src.c_str());
- }
- }else{
- fprintf(stderr,"---> subtree %d of FTA %s has an malformed HtSize.\n",s,qname.c_str());
- }
- }
- if(xsub->name == "ReadsFrom"){
- string src;
- bool sret = xsub->get_attrib_val("value",src);
- if(sret){
- qn->add_source(src);
- }else{
- fprintf(stderr,"---> subtree %d of FTA %s has an malformed ReadsFrom.\n",s,qname.c_str());
- }
- }
- if(xsub->name == "Interface"){
- string iface;
- bool sret = xsub->get_attrib_val("value",iface);
- if(sret){
- qn->src_interface = iface;
- }
- }
- if(xsub->name == "FileName"){
- string full_fname;
- bool sret = xsub->get_attrib_val("value",full_fname);
- if(sret){
- size_t dotpos = full_fname.find_first_of('.');
- if(dotpos != string::npos){
- qn->executable_name = full_fname.substr(0,dotpos);
- }
- }
- }
- }
- qname_to_idx[qname] = qnode_list.size();
- if(qn->executable_name != "" && qn->executable_name != "rts")
- exe_to_idx[qn->executable_name] = qnode_list.size();
- qnode_list.push_back(qn);
- }else{
- fprintf(stderr,"---> node type %s, no name.\n",xqnodes[i]->name.c_str());
- }
- }
-
-
- bool error = false;
- for(i=0;i<qnode_list.size();++i){
- if(qnode_list[i]->qnode_type == "HFTA"){
- for(s=0;s<qnode_list[i]->reads_from.size();++s){
- if(qname_to_idx.count(qnode_list[i]->reads_from[s])>0){
- int src_id = qname_to_idx[qnode_list[i]->reads_from[s]];
- qnode_list[i]->reads_from_idx.push_back(src_id);
- qnode_list[src_id]->sources_to_idx.push_back(i);
- }else{
- fprintf(stderr,"ERROR, hfta %s reads_from %s, but its not in qtree.xml.\n",qnode_list[i]->name.c_str(),qnode_list[i]->reads_from[s].c_str());
- error = true;
- }
- }
- }
- }
-
- if(error)
- exit(1);
-
-/*
- for(i=0;i<qnode_list.size();++i){
- printf("node %s reads_from:",qnode_list[i]->name.c_str());
- for(s=0;s<qnode_list[i]->reads_from.size();++s){
- printf(" %s",qnode_list[i]->reads_from[s].c_str());
- }
- printf("\nand sources to:\n");
- for(s=0;s<qnode_list[i]->sources_to_idx.size();++s){
- printf(" %s",qnode_list[qnode_list[i]->sources_to_idx[s]]->name.c_str());
- }
- printf("\n\n");
- }
-*/
-
- string tracefilename = trace_file;
- if(src_dir != ""){
- tracefilename = src_dir + "/" + trace_file;
- }
- FILE *trace_fl = NULL;
- if((trace_fl = fopen(tracefilename.c_str(),"r"))==NULL){
- fprintf(stderr,"ERROR, can't open trace file %s\n",tracefilename.c_str());
- exit(1);
- }
-
- map<fta_addr,string, cmpr_fta_addr> qnode_map;
- map<int, perf_struct *> rts_perf_map;
- map<int, string> rts_iface_map;
- map<string, int> pid_iface_map;
-
- tm time_str;
- char inp[LINEBUF],line[LINEBUF],*saveptr;
- unsigned int hbeat_ip, hbeat_port, hbeat_index, hbeat_streamid, hbeat_trace_id,hbeat_ntraces;
- while(fgets(inp,LINEBUF,trace_fl)){
-
-// Try to grab the timestamp
- time_t tick;
- int pid;
- strncpy(line,inp,LINEBUF);
- char mon_str[4];
- int mon=0,day,hr,mn,sec;
- int nret = sscanf(line,"%c%c%c %d %d:%d:%d",mon_str,mon_str+1,mon_str+2,&day,&hr,&mn,&sec);
- if(nret >= 7){
- mon_str[3] = '\0';
- if(month_str_to_int.count(mon_str)>0){
- mon = month_str_to_int[mon_str];
- }else{
- fprintf(stderr,"Warning, %s not recognized as a month string.\n",mon_str);
- }
- time_str.tm_sec = sec;
- time_str.tm_min = mn;
- time_str.tm_hour = hr;
- time_str.tm_mday = day;
- time_str.tm_mon = mon;
- time_str.tm_year = year;
- tick = mktime(&time_str);
-//printf("mon=%d, day=%d, hr=%d, mn=%d, sec=%d, tick=%d\n",mon,day,hr,mn,sec,tick);
- }
-
-// Grab the process ID
- strncpy(line,inp,LINEBUF);
- int tmp_pid;
- pid = -1;
- char *segment = strtok_r(line,"[",&saveptr);
- if(segment!=NULL){
- segment = strtok_r(NULL,"[",&saveptr);
- nret = sscanf(segment,"%d]",&tmp_pid);
- if(nret>=1){
- pid=tmp_pid;
- }
- }
-
-// Grab address-to-hfta mappings
- strncpy(line,inp,LINEBUF);
- segment = strtok_r(line,"]",&saveptr);
- if(segment != NULL){
- segment = strtok_r(NULL," ",&saveptr);
- segment = strtok_r(NULL," ",&saveptr);
- }
-//printf("segmetn=<%s>, comparison=%d\n",segment,strcmp(segment,"Lookup"));
- if(segment!=NULL && strcmp(segment,"Lookup")==0){
- int pos=0;
- char fta_name[LINEBUF];
- int ip;
- int port;
- int index;
- int streamid;
- int nret = 0;
- while(segment != NULL){
-//printf("pos=%d, segment = %s\n",pos, segment);
-
- if(pos==3){
- strncpy(fta_name,segment,LINEBUF);
- }
-
- if(pos==6)
- nret = sscanf(segment,"{ip=%d,port=%d,index=%d,streamid=%d}",&ip,&port,&index,&streamid);
-
- pos++;
- segment = strtok_r(NULL," ",&saveptr);
- }
- if(nret>0){
-//printf("nret=%d, fta_name=%s,ip=%d,port=%d,index=%d,streamid=%d\n",nret,fta_name,ip,port,index,streamid);
- fta_addr addr(ip,port,streamid);
- qnode_map[addr] = fta_name;
-//printf("Adding fta %s, (%d %d %d) to qnode_map\n",fta_name,addr.ip,addr.port,addr.streamid);
- continue;
- }
- }
- if(segment!=NULL && strcmp(segment,"Lfta")==0){
- int pos=0;
- char fta_name[LINEBUF];
- int ip;
- int port;
- int index;
- int streamid;
- int nret = 0;
- while(segment != NULL){
-//printf("pos=%d, segment = %s\n",pos, segment);
-
- if(pos==1){
- strncpy(fta_name,segment,LINEBUF);
- }
-
- if(pos==4)
- nret = sscanf(segment,"{ip=%d,port=%d,index=%d,streamid=%d}",&ip,&port,&index,&streamid);
-
- pos++;
- segment = strtok_r(NULL," ",&saveptr);
- }
- if(nret>0){
-//printf("nret=%d, fta_name=%s,ip=%d,port=%d,index=%d,streamid=%d\n",nret,fta_name,ip,port,index,streamid);
- fta_addr addr(ip,port,streamid);
- qnode_map[addr] = fta_name;
-//printf("Adding lfta %s, (%d %d %d) to qnode_map\n",fta_name,addr.ip,addr.port,addr.streamid);
- continue;
- }
- }
- if(segment!=NULL && strcmp(segment,"Init")==0){
- int pos=0;
- string iface = "";
- string keywd = "";
- while(segment != NULL){
- if(pos==1)
- keywd = segment;
- if(pos==3){
- char *cc;
- for(cc=segment;*cc!='\0';++cc){
- if(*cc == '\n'){
- *cc = '\0';
- break;
- }
- }
- iface = segment;
- }
- pos++;
- segment = strtok_r(NULL," ",&saveptr);
- }
- if(iface!="" && keywd == "LFTAs"){
- rts_perf_map[pid] = new perf_struct(init_discard);
- rts_iface_map[pid] = iface;
- pid_iface_map[iface] = pid;
- }
- }
- if(segment!=NULL && strcmp(segment,"Heartbeat")==0){
- int pos=0;
- int nret=0,nret2=0,nret3=0;
- unsigned int tmp_hbeat_ip, tmp_hbeat_port, tmp_hbeat_index, tmp_hbeat_streamid, tmp_hbeat_trace_id,tmp_hbeat_ntraces;
- while(segment != NULL){
-//printf("pos=%d, segment = %s\n",pos, segment);
- if(pos==4)
- nret = sscanf(segment,"{ip=%d,port=%d,index=%d,streamid=%d}",&tmp_hbeat_ip,&tmp_hbeat_port,&tmp_hbeat_index,&tmp_hbeat_streamid);
- if(pos==5)
- nret2 = sscanf(segment,"trace_id=%d",&tmp_hbeat_trace_id);
- if(pos==6)
- nret3 = sscanf(segment,"ntrace=%d",&tmp_hbeat_ntraces);
- pos++;
- segment = strtok_r(NULL," ",&saveptr);
- }
- if(nret>=4 && nret2 >= 1 && nret3 == 1){
- hbeat_ip = tmp_hbeat_ip;
- hbeat_port = tmp_hbeat_port;
- hbeat_index = tmp_hbeat_index;
- hbeat_streamid = tmp_hbeat_streamid;
- hbeat_trace_id = tmp_hbeat_trace_id;
- hbeat_ntraces = tmp_hbeat_ntraces;
-// printf("ip=%d, port=%d, index=%d, streamid=%d hbeat_trace_id=%d\n",hbeat_ip,hbeat_port,hbeat_index,hbeat_streamid,hbeat_trace_id);
- fta_addr hb_addr(hbeat_ip,hbeat_port,tmp_hbeat_streamid);
- if(qnode_map.count(hb_addr) == 0){
- hb_addr.streamid = 0; // maybe an hfta?
- if(qnode_map.count(hb_addr) == 0){
- hbeat_port = 0;
-//printf("Hbeat streamid=%d no match (%d,%d), hbeat_trace_id=%d\n",hbeat_streamid,hbeat_ip,hbeat_port,hbeat_trace_id);
- }
- } else{
-//printf("Hbeat streamid=%d matches %s, hbeat_trace_id=%d\n",hbeat_streamid,qnode_map[hb_addr].c_str(),hbeat_trace_id);
- }
- }else{
- printf("Couldn't parse as hearbeat %s\n",inp);
- }
- }
- if(segment!=NULL && strncmp(segment,"trace_id=",8)==0){
- int pos=0;
- int nret=0,nret2=0,nret3=0;
- unsigned long long int trace_id;
- unsigned int tr_pos,tr_ip,tr_port,tr_index,tr_streamid;
- unsigned int tr_intup,tr_outtup,tr_outsz,tr_acctup,tr_cycles;
- unsigned int tr_evictions,tr_collisions;
- double tr_sample;
- nret = sscanf(segment,"trace_id=%llu",&trace_id);
- while(segment != NULL){
-//printf("pos=%d, segment = %s\n",pos, segment);
- if(pos==0)
- nret = sscanf(segment,"trace_id=%llu",&trace_id);
- if(pos==1)
- nret2 = sscanf(segment,"trace[%d].ftaid={ip=%u,port=%u,index=%u,streamid=%u}",&tr_pos,&tr_ip,&tr_port,&tr_index,&tr_streamid);
- if(pos==2)
- nret3 = sscanf(segment,"fta_stat={in_tuple_cnt=%u,out_tuple_cnt=%u,out_tuple_sz=%u,accepted_tuple_cnt=%u,cycle_cnt=%u,collision_cnt=%u,eviction_cnt=%u,sampling_rate=%lf}",
- &tr_intup, &tr_outtup, &tr_outsz, &tr_acctup,&tr_cycles,
- &tr_collisions,&tr_evictions,&tr_sample);
- pos++;
- segment = strtok_r(NULL," ",&saveptr);
- }
- if(nret>=1 && nret2>=5 && nret3>=7){
- fta_addr tr_addr(tr_ip,tr_port,tr_streamid);
- if(qnode_map.count(tr_addr)==0)
- tr_addr.streamid = 0; // maybe an hfta?
- if(qnode_map.count(tr_addr)>0){
-//printf("Trace idx=%d streamid=%d matches %s, trace_id=%d, hbeat_trace_id=%d\n",tr_pos,tr_streamid,qnode_map[tr_addr].c_str(),trace_id,hbeat_trace_id);
- if(tr_pos+1 == hbeat_ntraces){
- string qname = qnode_map[tr_addr];
- int qidx = qname_to_idx[qname];
- if(qnode_list[qidx]->start_tick < 0)
- qnode_list[qidx]->start_tick = tick;
- if(qnode_list[qidx]->end_tick < tick)
- qnode_list[qidx]->end_tick = tick;
- qnode_list[qidx]->in_tup += tr_intup;
- qnode_list[qidx]->out_tup += tr_outtup;
- qnode_list[qidx]->out_sz += tr_outsz;
- qnode_list[qidx]->accepted_tup += tr_acctup;
- qnode_list[qidx]->cycles += tr_cycles;
- qnode_list[qidx]->collisions += tr_collisions;
- qnode_list[qidx]->evictions += tr_evictions;
- }
- }
-//else{
-//printf("Trace idx=%d streamid=%d no match (%d,%d), trace_id=%d, hbeat_trace_id=%d\n",tr_pos,tr_streamid,tr_ip,tr_port,trace_id,hbeat_trace_id);
-//}
- }
- }
- }
-
-//printf("qnode_map has %d entries\n",qnode_map.size());
-
-// Open and process performance log info, if any.
- if(src_dir != ""){
- resource_log_file = src_dir + "/" + resource_log_file;
- }
- FILE *res_fl = NULL;
- if((res_fl = fopen(resource_log_file.c_str(),"r"))==NULL){
- fprintf(stderr,"ERROR, can't open trace file %s\n",resource_log_file.c_str());
- exit(1);
- }
-
- char *flds[SPLITBUF];
- int lineno = 0;
- while(fgets(inp,LINEBUF,res_fl)){
- int nflds = split_string(inp,',',flds,SPLITBUF);
- lineno++;
- if(nflds >= 8){
- int ts = atoi(flds[0]);
- string procname = flds[1];
- int pid = atoi(flds[2]);
- unsigned long long int utime = atoll(flds[3]);
- unsigned long long int stime = atoll(flds[4]);
- unsigned long long int vm_size = atoll(flds[5]);
- unsigned long long int rss_size = atoll(flds[6]);
- int pagesize = atoi(flds[7]);
-
- if(procname == "rts"){
- if(rts_perf_map.count(pid)>0){
- if(rts_perf_map[pid]->update(ts,utime,stime,vm_size,rss_size)){
- fprintf(stderr,"Resource log file is corrupted, line %d\n",lineno);
- exit(1);
- }
- }
- }else{
- if(exe_to_idx.count(procname)>0){
- perf_struct *p = qnode_list[exe_to_idx[procname]]->perf;
- if(p->update(ts,utime,stime,vm_size,rss_size)){
- fprintf(stderr,"Resource log file is corrupted, line %d\n",lineno);
- exit(1);
- }
- }
- }
- }
- }
-
-
-
- FILE *rpt_fl = fopen("performance_report.csv","w");
- if(rpt_fl == NULL){
- fprintf(stderr,"Warning, can't open performance_report.csv, can't save the performance report.\n");
- }
-
- char tmpstr[10000];
- printf("Performance report:\n");
- if(rpt_fl) fprintf(rpt_fl,"query_name,process_name,in_tup,out_tup,out_sz,accepted_tup,cycles,collisions,evictions,in_tup_rate,out_tup_rate,out_bytes_rate,cycle_consumption_rate");
- if(rpt_fl) fprintf(rpt_fl,",%s",perf_struct::to_csv_hdr().c_str());
- if(rpt_fl) fprintf(rpt_fl,",packets_sent_to_query,fraction_intup_lost,inferred_read_rate");
- if(rpt_fl) fprintf(rpt_fl,"\n");
-
- map<fta_addr,string>::iterator mpiisi;
- int n_output = 0;
- set<string> found_names;
- for(mpiisi=qnode_map.begin();mpiisi!=qnode_map.end();++mpiisi){
- string qname = (*mpiisi).second;
- if(found_names.count(qname)==0){
- found_names.insert(qname);
- int qidx = qname_to_idx[qname];
- string executable = qnode_list[qidx]->executable_name;
- if(executable == "")
- executable="rts";
-// printf("(%d,%d,%d) -> %s, %d reads-from\n",(*mpiisi).first.ip,(*mpiisi).first.port,(*mpiisi).first.streamid,qname.c_str(),qnode_list[qidx]->reads_from_idx.size());
- printf("query=%s, executable=%s\tintup=%llu, out_tup=%llu, out_sz=%llu, accepted_tup=%llu, cycles=%llu, collisions=%llu, evictions=%llu\n",
- qname.c_str(),qnode_list[qidx]->executable_name.c_str(),
- qnode_list[qidx]->in_tup,
- qnode_list[qidx]->out_tup,
- qnode_list[qidx]->out_sz,
- qnode_list[qidx]->accepted_tup,
- qnode_list[qidx]->cycles,
- qnode_list[qidx]->collisions,
- qnode_list[qidx]->evictions
- );
- if(rpt_fl) fprintf(rpt_fl,"%s,%s,%llu,%llu,%llu,%llu,%llu,%llu,%llu",
- qname.c_str(),qnode_list[qidx]->executable_name.c_str(),
- qnode_list[qidx]->in_tup,
- qnode_list[qidx]->out_tup,
- qnode_list[qidx]->out_sz,
- qnode_list[qidx]->accepted_tup,
- qnode_list[qidx]->cycles,
- qnode_list[qidx]->collisions,
- qnode_list[qidx]->evictions
- );
- double duration = 1.0*(qnode_list[qidx]->end_tick - qnode_list[qidx]->start_tick);
-
- printf("\tin_tup_rate=%f, out_tup_rate=%f, out_byte_rate=%f, cycle_rate=%f\n",qnode_list[qidx]->in_tup/duration,qnode_list[qidx]->out_tup/duration,qnode_list[qidx]->out_sz/duration,qnode_list[qidx]->cycles/duration);
- if(rpt_fl) fprintf(rpt_fl,",%f,%f,%f,%f",qnode_list[qidx]->in_tup/duration,qnode_list[qidx]->out_tup/duration,qnode_list[qidx]->out_sz/duration,qnode_list[qidx]->cycles/duration);
- printf("\t%s\n",qnode_list[qidx]->perf->to_string().c_str());
- if(rpt_fl) fprintf(rpt_fl,",%s",qnode_list[qidx]->perf->to_csv().c_str());
-//if(qnode_list[qidx]->aggr_tbl_size>0){
-//printf("\taggregate table size is %d\n",qnode_list[qidx]->aggr_tbl_size);
-//}
-//if(qnode_list[qidx]->src_interface != ""){
-//printf("\tSource interface is %s\n",qnode_list[qidx]->src_interface.c_str());
-//}
- if(qnode_list[qidx]->reads_from_idx.size()>0){
- unsigned long long int total_sent = 0;
- for(i=0;i<qnode_list[qidx]->reads_from_idx.size();++i){
- total_sent += qnode_list[qnode_list[qidx]->reads_from_idx[i]]->out_tup;
- qnode_list[qidx]->inferred_in_sz += qnode_list[qnode_list[qidx]->reads_from_idx[i]]->out_sz / (qnode_list[qnode_list[qidx]->reads_from_idx[i]]->end_tick-qnode_list[qnode_list[qidx]->reads_from_idx[i]]->start_tick);
- }
- printf("\t%llu packets sent, %f lost, inferred read rate is %f\n",total_sent,(1.0*total_sent-qnode_list[qidx]->in_tup)/total_sent, qnode_list[qidx]->inferred_in_sz);
- if(rpt_fl) fprintf(rpt_fl,",%llu,%f,%f",total_sent,(1.0*total_sent-qnode_list[qidx]->in_tup)/total_sent, qnode_list[qidx]->inferred_in_sz);
- }
- else{
- if(rpt_fl) fprintf(rpt_fl,",,,");
- }
-
- if(rpt_fl) fprintf(rpt_fl,"\n");
- n_output++;
- }
- }
-
-
-
-// Collect performance info about RTSs and determine a better hash partitioning.
-
-// First, grab any existing balancing information
- map<string, vector<int> > prev_rts_loads;
- FILE *rload_fl = NULL;
- rload_fl = fopen("rts_load.cfg","r");
- lineno = 0;
- if(rload_fl != NULL){
- while(fgets(line,LINEBUF,rload_fl)){
- lineno++;
- int nflds = split_string(line,',',flds,SPLITBUF);
- if(nflds>1){
- vector<int> hbounds;
- bool invalid_line=false;
- int prev_val = 0;
- for(i=1;i<nflds;++i){
- int new_val = atoi(flds[i]);
- if(new_val < prev_val)
- invalid_line = true;
- hbounds.push_back(new_val);
- }
- if(! invalid_line){
- prev_rts_loads[flds[0]] = hbounds;
- }else{
- printf("Warning, rts_load.cfg has an invalid entry on line %d, skipping\n",lineno);
- }
- }else{
- printf("Warning, rts_load.cfg has an invalid entry on line %d, skipping\n",lineno);
- }
- }
- }else{
- fprintf(rload_fl,"Warning, can't open rts_load.cfg, skipping and using defualt allocation estimate.\n");
- }
- map<string, vector<int> > new_rts_loads = prev_rts_loads;
- fclose(rload_fl);
-
-// Next, try to grab a history of allocations and resulting cpu loads
- FILE *rtrace_fl = NULL;
- rtrace_fl = fopen("rts_load.trace.txt","r");
- lineno = 0;
- map<string, vector< vector<int> > > iface_alloc_history;
- map<string, vector< vector<double> > > iface_load_history;
- if(rtrace_fl != NULL){
- vector<int> curr_allocation;
- vector<double> curr_load;
- while(fgets(line,LINEBUF,rtrace_fl)){
- int nflds = split_string(line,',',flds,SPLITBUF);
- if(nflds > 2){
- string iface = flds[0];
- string entry = flds[1];
- if(entry == "Previous allocation"){
- curr_allocation.clear();
- for(i=2;i<nflds;++i){
- curr_allocation.push_back(atoi(flds[i]));
- }
- }
- if(entry == "Previous cpu loads"){
- curr_load.clear();
- for(i=2;i<nflds;++i){
- curr_load.push_back(atof(flds[i]));
- }
- if(curr_allocation.size() == curr_load.size()){
- iface_alloc_history[iface].push_back(curr_allocation);
- iface_load_history[iface].push_back(curr_load);
- }
- }
- }
- }
- }
-
-/*
-map<string, vector< vector<int> > >::iterator msvvi;
-for(msvvi=iface_alloc_history.begin();msvvi!=iface_alloc_history.end();++msvvi){
-string iface = (*msvvi).first;
-printf("iface %s past allocations:\n",iface.c_str());
-vector<vector<int> > &alloc = iface_alloc_history[iface];
-printf("alloc size is %d\n",alloc.size());
-for(i=0;i<alloc.size();++i){
-for(j=0;j<alloc[i].size();++j){
-printf("%d ",alloc[i][j]);
-}
-printf("\n");
-}
-printf("\niface %s past loads:\n",iface.c_str());
-vector<vector<double> > &load = iface_load_history[iface];
-printf("load size is %d\n",load.size());
-for(i=0;i<load.size();++i){
-for(j=0;j<load[i].size();++j){
-printf("%f ",load[i][j]);
-}
-printf("\n");
-}
-printf("\n");
-}
-*/
-
-
- map<int, string>::iterator misi;
- map<string, vector<int> > rts_iface_indices;
- map<string, vector<double> > rts_iface_cpu_load;
- for(misi=rts_iface_map.begin();misi!=rts_iface_map.end();++misi){
- int rpid = (*misi).first;
- string riface = (*misi).second;
- size_t Xpos = riface.find_last_of("X");
- if(Xpos!=string::npos){
- string iface = riface.substr(0,Xpos);
-// ifaces_found.insert(iface);
- string ifcopy = riface.substr(Xpos+1);
- int ifidx = atoi(ifcopy.c_str());
- rts_iface_indices[iface].push_back(ifidx);
- }
- printf("pid=%d, rts %s, %s\n",rpid,riface.c_str(),rts_perf_map[rpid]->to_string().c_str());
- if(rpt_fl){
- fprintf(rpt_fl,",rts %s,,,,,,,,,,,,%s,,,,\n",riface.c_str(),rts_perf_map[rpid]->to_csv().c_str());
-
- }
- }
- map<string, vector<int> >::iterator msvi;
- set<string> ifaces_found;
- map<string, vector<double> > ht_cpu_allocs;
- map<string, int> total_ht_sizes;
- for(msvi=rts_iface_indices.begin();msvi!=rts_iface_indices.end();++msvi){
- string iface = (*msvi).first;
- vector<int> &ifindices = (*msvi).second;
- sort(ifindices.begin(),ifindices.end());
-
- double total_cpu = 0.0;
- vector<double> if_cpu;
- for(i=0;i<ifindices.size();++i){
- string full_iface = iface + "X" + int_to_string(ifindices[i]);
- int pid = pid_iface_map[full_iface];
- total_cpu += rts_perf_map[pid]->avg_cpu_time();
- if_cpu.push_back(rts_perf_map[pid]->avg_cpu_time());
- }
- rts_iface_cpu_load[iface] = if_cpu;
-
- vector<int> current_allocation;
- if(new_rts_loads.count(iface) == 0 || new_rts_loads[iface].size() != ifindices.size()){
- int cumm_cpu = 0;
- for(i=0;i<ifindices.size();++i){
- int new_cumm = ((i+1)*htmax)/ifindices.size();
- current_allocation.push_back(new_cumm-cumm_cpu);
- cumm_cpu=new_cumm;
- }
- }else{
- current_allocation = new_rts_loads[iface];
- }
- int total_ht_allocation = 0;
-//printf("Current allocation is:");
- for(i=0;i<current_allocation.size();++i){
- total_ht_allocation += current_allocation[i];
-//printf(" %d",current_allocation[i]);
- }
-
- vector<double> local_alloc(total_ht_allocation,0.0); // estimated cpu per HT slot.
- int ht_ptr = 0;
- for(i=0;i<if_cpu.size();++i){
- double rate = if_cpu[i]/(current_allocation[i]*total_cpu);
- for(j=0;j<current_allocation[i];++j){
- local_alloc[ht_ptr++] = rate;
- }
- }
-
- if(iface_alloc_history.count(iface)>0){
- vector<vector<int> > &alloc = iface_alloc_history[iface];
- vector<vector<double> > &load = iface_load_history[iface];
- int n_remaining = rts_load_history_len;
- double multiplier = hist_multiplier;
- double normalizer = 1.0;
- for(i=alloc.size()-1;i>=0 && n_remaining>0;i--){
- int hist_ht_size = 0;
- for(j=0;j<alloc[i].size();++j)
- hist_ht_size+=alloc[i][j];
- double hist_cpu = 0.0;
- for(j=0;j<load[i].size();++j)
- hist_cpu += load[i][j];
- if(hist_ht_size == total_ht_allocation){
- int ht_ptr = 0;
- for(j=0;j<alloc[i].size();++j){
- double rate = (multiplier*load[i][j])/(alloc[i][j]*hist_cpu);
- int k;
- for(k=0;k<alloc[i][j];k++){
- local_alloc[ht_ptr++] += rate;
- }
- }
- normalizer += multiplier;
- multiplier *= hist_multiplier;
- n_remaining--;
- }
- }
- for(i=0;i<total_ht_allocation;i++){
- local_alloc[i] /= normalizer;
- }
- }
-
-
- total_ht_sizes[iface] = total_ht_allocation;
- ht_cpu_allocs[iface] = local_alloc;
- }
-
- if(uniform_rts_alloc){
-// I will require that if this option is true, all HTs
-// the same size.
- bool same_sizes = true;
- int total_alloc = -1;
- map<string, int >::iterator msi;
- for(msi=total_ht_sizes.begin();msi!=total_ht_sizes.end();++msi){
- if(total_alloc<0){
- total_alloc=(*msi).second;
- }else{
- if(total_alloc != (*msi).second){
- same_sizes = false;
- }
- }
- }
- if(same_sizes){
- vector<double> local_alloc(total_alloc,0.0);
- double normalizer = 0.0;
- map<string, vector<double> >::iterator msvdi;
- for(msvdi=ht_cpu_allocs.begin();msvdi!=ht_cpu_allocs.end();++msvdi){
- string iface = (*msvdi).first;
- for(i=0;i<total_alloc;++i){
- local_alloc[i] += ht_cpu_allocs[iface][i];
- }
- normalizer += 1.0;
- }
- for(i=0;i<total_alloc;++i)
- local_alloc[i] /= normalizer;
- for(msvdi=ht_cpu_allocs.begin();msvdi!=ht_cpu_allocs.end();++msvdi){
- (*msvdi).second = local_alloc;
- }
- }
- }
-
- for(msvi=rts_iface_indices.begin();msvi!=rts_iface_indices.end();++msvi){
- string iface = (*msvi).first;
- vector<int> &ifindices = (*msvi).second;
- sort(ifindices.begin(),ifindices.end());
-
- int cumm_ht_alloc = 0; // ht allocated thus far
- int current_pos = 0; // idx to the current_allocation, if_cpu arrays
- vector<int> new_allocation;
- for(i=0;i<ifindices.size()-1;++i){
- double slot_cpu = 1.0/ifindices.size(); // amount of cpu to allocate
- int current_alloc = 0; // ht allocated in the curr_pos slot
- while(slot_cpu > 0.0){
- slot_cpu -= ht_cpu_allocs[iface][current_pos];
- current_pos++;
- current_alloc++;
- }
- // try to make the allocations even
- if(current_alloc>1 && (-slot_cpu) > (ht_cpu_allocs[iface][current_pos-1]/2.0)){
- current_pos--;
- current_alloc--;
- }
- new_allocation.push_back(current_alloc);
- }
- new_allocation.push_back(total_ht_sizes[iface]-current_pos);
-
-/*
- int cumm_ht_alloc = 0; // ht allocated thus far
- int current_pos = 0; // idx to the current_allocation, if_cpu arrays
- int current_alloc = 0; // ht allocated in the curr_pos slot
- vector<int> new_allocation;
- for(i=0;i<ifindices.size()-1;++i){
- double slot_cpu = total_cpu/ifindices.size(); // amount of cpu to allocate
- int slot_ht = 0;
- while(slot_cpu > 0.0){
- double cpu_rate = if_cpu[current_pos] / current_allocation[current_pos];
- double cpu_remaining = cpu_rate*(current_allocation[current_pos] - current_alloc);
- if(slot_cpu <= cpu_remaining){
- double this_cpu_alloc = slot_cpu;
- int this_ht_alloc = (int)(this_cpu_alloc / cpu_rate);
- slot_ht += this_ht_alloc;
- slot_cpu = 0.0;
- cumm_ht_alloc += this_ht_alloc;
- current_alloc += this_ht_alloc;
- if(current_alloc >= current_allocation[current_pos]){
- current_pos++;
- current_alloc = 0;
- }
- }else{
- slot_cpu -= cpu_remaining;
- slot_ht += current_allocation[current_pos] - current_alloc;
- cumm_ht_alloc += current_allocation[current_pos] - current_alloc;
- current_pos++;
- current_alloc = 0;
- }
- }
- new_allocation.push_back(slot_ht);
- }
- new_allocation.push_back(total_ht_allocation - cumm_ht_alloc);
-*/
-
- new_rts_loads[iface] = new_allocation;
-
-
-/*
-printf("Interface %s:",iface.c_str());
-for(i=0;i<ifindices.size();++i){
-string full_iface = iface + "X" + int_to_string(ifindices[i]);
-int pid = pid_iface_map[full_iface];
-printf(" %f",rts_perf_map[pid]->avg_cpu_time()/total_cpu);
-}
-printf("\n\t");
-for(i=0;i<ifindices.size();++i){
-printf(" %d",prev_rts_loads[iface][i]);
-}
-printf("\n");
-*/
- }
-
-
- FILE *rrec_fl = fopen("rts_load.cfg.recommended","w");
- if(rrec_fl == NULL){
- fprintf(stderr,"Warning, can't open rts_load.cfg.recommended for write, skipping.\n");
- }
-
- printf("Recommended virtual interface hash allocation:\n");
- map<string, vector<int> >::iterator msvii;
- for(msvii=new_rts_loads.begin();msvii!=new_rts_loads.end();++msvii){
- string iface_name = (*msvii).first;
- vector<int> iface_alloc = (*msvii).second;
- printf("%s",iface_name.c_str());
- if(rrec_fl!=NULL) fprintf(rrec_fl,"%s",iface_name.c_str());
- for(i=0;i<iface_alloc.size();++i){
- printf(",%d",iface_alloc[i]);
- if(rrec_fl!=NULL) fprintf(rrec_fl,",%d",iface_alloc[i]);
- }
- printf("\n");
- if(rrec_fl!=NULL) fprintf(rrec_fl,"\n");
- }
- fclose(rrec_fl);
-
-
- rrec_fl = fopen("rts_load.trace.txt","a");
- if(rrec_fl != NULL){
- fprintf(rrec_fl,"\n");
- for(msvii=new_rts_loads.begin();msvii!=new_rts_loads.end();++msvii){
- string iface_name = (*msvii).first;
- vector<int> iface_alloc = (*msvii).second;
- vector<double> iface_cpu_loads = rts_iface_cpu_load[iface_name];
- vector<int> prev_iface_alloc = prev_rts_loads[iface_name];
-
- fprintf(rrec_fl,"Interface %s:\n",iface_name.c_str());
- fprintf(rrec_fl,"%s,Previous allocation,",iface_name.c_str());
- for(i=0;i<prev_iface_alloc.size();++i){
- if(i>0) fprintf(rrec_fl,",");
- fprintf(rrec_fl,"%d",prev_iface_alloc[i]);
- }
- fprintf(rrec_fl,"\n%s,Previous cpu loads,",iface_name.c_str());
- for(i=0;i<iface_cpu_loads.size();++i){
- if(i>0) fprintf(rrec_fl,",");
- fprintf(rrec_fl,"%f",iface_cpu_loads[i]);
- }
- fprintf(rrec_fl,"\n%s,New allocation,",iface_name.c_str());
- for(i=0;i<iface_alloc.size();++i){
- if(i>0) fprintf(rrec_fl,",");
- fprintf(rrec_fl,"%d",iface_alloc[i]);
- }
- fprintf(rrec_fl,"\n\n");
- }
- }
- fclose(rrec_fl);
-
-
-// ----------------------------------------------------------------
-// Make an hfta parallelism analysis. Start by collecting hftas and grouping
-// them by their copies. Count on the __copy%d name mangling.
-
- set<string>::iterator ssi;
- map<string, vector<int> > par_hfta_map;
- for(ssi=found_names.begin();ssi!=found_names.end();++ssi){
- string base = (*ssi);
- int qidx = qname_to_idx[base];
- if(qnode_list[qidx]->qnode_type == "HFTA"){
- size_t cpos = (*ssi).find("__copy");
- if(cpos!=string::npos){
- base = (*ssi).substr(0,cpos);
- string idx_str = (*ssi).substr(cpos+6);
- int pidx = atoi(idx_str.c_str());
- qnode_list[qidx]->par_index = pidx;
- }
- par_hfta_map[base].push_back(qidx);
- }
- }
-
-// Coalesce or split hftas. Reduce parallelism until the max cpu utilization
-// is in [cpu_util_threshold/2, cpu_util_threshold]. Double parallelism
-// if max cpu utilization is > cpu_util_threshold.
-// Only recommend parallelism if a resource utilization file was found.
- if(res_fl!=NULL){
- map<string, int> recommended_parallelism;
- map<string, vector<int> >::iterator msvii;
- for(msvii=par_hfta_map.begin();msvii!=par_hfta_map.end();++msvii){
- vector<int> buddy_indices = (*msvii).second;
- vector<qnode *> buddies;
- int n_valid = 0;
- for(i=0;i<buddy_indices.size();i++){
- buddies.push_back(qnode_list[buddy_indices[i]]);
- if(qnode_list[buddy_indices[i]]->perf->is_valid())
- n_valid++;
- }
-
- if(n_valid>0){
- sort(buddies.begin(),buddies.end(),cmpr_parallel_idx);
-
- int level=1;
- double max_util = 0.0;
- while(level<=buddies.size()){
- for(i=0;i<buddies.size();i+=level){
- double this_util = 0.0;
- for(j=0;j<level;j++){
- this_util += buddies[i+j]->perf->avg_cpu_time();
- }
- if(this_util > max_util)
- max_util = this_util;
- }
- if(max_util >= cpu_util_threshold/2)
- break;
- level *= 2;
- }
- int npar = buddies.size();
- if(max_util > cpu_util_threshold)
- level/=2;
- if(level>buddies.size())
- level/=2;
- if(level==0)
- npar *= 2;
- else
- npar /= level;
-
- recommended_parallelism[(*msvii).first] = npar;
- }else{
- printf("Warning, no resource usage information for %s, skipping.\n",(*msvii).first.c_str());
- }
- }
-
- FILE *hpar_fl = NULL;
- hpar_fl=fopen("hfta_parallelism.cfg","r");
- if(hpar_fl==NULL){
- fprintf(stderr,"Warning, can't open hfta_parallelism.cfg, ignoring.\n");
- }else{
- while(fgets(line,LINEBUF,hpar_fl)){
- int nflds = split_string(line,',',flds,SPLITBUF);
- if(nflds==2){
- int npar = atoi(flds[1]);
- if(npar>0 && recommended_parallelism.count(flds[0])==0){
- recommended_parallelism[flds[0]] = npar;
- }
- }
- }
- fclose(hpar_fl);
- }
-
- FILE *recpar_fl = NULL;
- recpar_fl=fopen("hfta_parallelism.cfg.recommended","w");
- if(recpar_fl==NULL){
- fprintf(stderr,"Warning, can't open hfta_parallelism.cfg.recommended, can't write the file.\n");
- }
- printf("Recommended parallelism:\n");
- map<string, int>::iterator msii;
- for(msii=recommended_parallelism.begin();msii!=recommended_parallelism.end();++msii){
- if(recpar_fl!=NULL)
- fprintf(recpar_fl,"%s,%d\n",(*msii).first.c_str(),(*msii).second);
- printf("%s,%d\n",(*msii).first.c_str(),(*msii).second);
- }
- fclose(recpar_fl);
- }else{
- printf("Can't recommend hfta parallelism, no resource utilization file found.\n");
- }
-
- FILE *rec_ht_fl = NULL;
- rec_ht_fl=fopen("lfta_htsize.cfg.recommended","w");
- if(rec_ht_fl==NULL){
- fprintf(stderr,"Warning, can't open lfta_htsize.cfg.recommended, can't write the file.\n");
- }
- printf("Recommended LFTA hash table sizes:\n");
- for(i=0;i<qnode_list.size();++i){
- qnode *this_qn = qnode_list[i];
- if(this_qn->qnode_type=="LFTA" && this_qn->aggr_tbl_size>0 && this_qn->accepted_tup>0){
- int ht_size = this_qn->aggr_tbl_size;
- double collision_rate = ((double)this_qn->collisions)/this_qn->accepted_tup;
- double eviction_rate = ((double)this_qn->evictions)/this_qn->accepted_tup;
- printf("%s htsize=%d collision=%f evictions=%f",this_qn->name.c_str(),ht_size,collision_rate,eviction_rate);
-//printf("%d,%f,%f\n",ht_size,collision_rate,eviction_rate);
- if(eviction_rate >= erate_hi){
- ht_size /= 2;
- }else if(collision_rate >= crate_hi){
- ht_size *= 2;
- }else if(collision_rate < crate_lo){
- ht_size /= 2;
- }
-
- printf(" rec ht_size=%d\n",ht_size);
- fprintf(rec_ht_fl,"%s,%u\n",this_qn->name.c_str(),ht_size);
-
- }
- }
- fclose(rec_ht_fl);
-
-
-// Try to load the cpu configuration info
-
- vector <cpu_info_str *> cpu_info_list;
- FILE *cinfo_fl = NULL;
- cinfo_fl=fopen("cpu_info.csv","r");
- if(cinfo_fl==NULL){
- fprintf(stderr,"Warning, can't open cpu_info.csv, skipping process pinning optimization. Run gscpv3/bin/parse_cpuinfo.pl to generate a cpu_info.csv file.\n");
- }else{
- int lineno = 0;
- while(fgets(inp,LINEBUF,cinfo_fl)){
- int nflds = split_string(inp,',',flds,SPLITBUF);
- lineno++;
- if(nflds >= 3){
- cpu_info_list.push_back(new cpu_info_str(atoi(flds[0]),atoi(flds[1]),atoi(flds[2])));
- }
- }
-
- sort(cpu_info_list.begin(),cpu_info_list.end(),cmpr_cpu_info);
-
-// Spread the LFTAs among the cores.
- vector<string> iface_names;
- for(msvi=rts_iface_indices.begin();msvi!=rts_iface_indices.end();++msvi){
- string iface = (*msvi).first;
- vector<int> ifindices = (*msvi).second;
- sort(ifindices.begin(),ifindices.end());
-
- for(i=0;i<ifindices.size();++i){
- string full_iface = iface + "X" + int_to_string(ifindices[i]);
- iface_names.push_back(full_iface);
- }
- }
-
-
- map<string, int> rts_assignment;
- double stride = ((double)(cpu_info_list.size()))/((double)(iface_names.size()));
- double rtspos_f = 0.0;
- for(i=0;i<iface_names.size();++i){
- int rtspos = (int)rtspos_f;
- rts_assignment[iface_names[i]] = rtspos;
- cpu_info_list[rtspos]->assigned_load += rts_perf_map[pid_iface_map[iface_names[i]]]->avg_cpu_time();
- rtspos_f += stride;
- }
-
-//for(i=0;i<iface_names.size();++i){
-//printf("Placing %s at %d\n",iface_names[i].c_str(), rts_assignment[iface_names[i]]);
-//}
-
- set<string> eligible_hftas;
- map<string, int> hfta_assignment;
- set<string>::iterator ssi;
- for(ssi = found_names.begin();ssi!=found_names.end();++ssi){
- int qidx = qname_to_idx[(*ssi)];
-//printf("Considering %s (%d), sz=%f, cpu=%f\n",(*ssi).c_str(),qidx,qnode_list[qidx]->inferred_in_sz,qnode_list[qidx]->perf->avg_cpu_time());
- if(qnode_list[qidx]->inferred_in_sz >= min_hfta_insz || qnode_list[qidx]->perf->avg_cpu_time() > min_hfta_cpu){
-//printf("\tAdding to eligible list\n");
- eligible_hftas.insert((*ssi));
- }
- }
-
- while(eligible_hftas.size()>0){
- int chosen_hfta = -1;
- double max_assigned_rate = 0.0;
- for(ssi=eligible_hftas.begin();ssi!=eligible_hftas.end();++ssi){
- double assigned_rate = 0.0;
- string qname = (*ssi);
- int qidx = qname_to_idx[qname];
- vector<int> reads_from = qnode_list[qidx]->reads_from_idx;
- for(i=0;i<reads_from.size();++i){
- if(qnode_list[reads_from[i]]->qnode_type == "LFTA" || (qnode_list[reads_from[i]]->qnode_type == "HFTA" && hfta_assignment.count(qnode_list[reads_from[i]]->name) > 0))
- assigned_rate += qnode_list[reads_from[i]]->output_rate();
- }
-//printf("hfta %s, assigned rate=%f\n",qname.c_str(),assigned_rate);
- if(assigned_rate >= max_assigned_rate){
-//printf("\t picking %s\n",qname.c_str());
- max_assigned_rate = assigned_rate;
- chosen_hfta = qidx;
- }
- }
- if(chosen_hfta >= 0){
- vector<int> reads_from = qnode_list[chosen_hfta]->reads_from_idx;
- vector<int> src_location;
- vector<double> src_volume;
- for(i=0;i<reads_from.size();++i){
- int qidx = reads_from[i];
- if(qnode_list[qidx]->qnode_type == "HFTA"){
- if(hfta_assignment.count(qnode_list[qidx]->name)>0){
- src_location.push_back(hfta_assignment[qnode_list[qidx]->name]);
- src_volume.push_back(qnode_list[qidx]->output_rate());
- }
- }
- if(qnode_list[qidx]->qnode_type == "LFTA"){
- if(rts_assignment.count(qnode_list[qidx]->src_interface)>0){
- src_location.push_back(rts_assignment[qnode_list[qidx]->src_interface]);
- src_volume.push_back(qnode_list[qidx]->output_rate());
- }
- }
- }
-//printf("chosen hfta is %d (%s), sources are:\n",chosen_hfta,qnode_list[chosen_hfta]->name.c_str());
-//for(i=0;i<src_location.size();++i){
-//printf("\tloc=%d, (%s) volume=%f\n",src_location[i],cpu_info_list[src_location[i]]->to_csv().c_str(),src_volume[i]);
-//}
-
- double hfta_cpu_usage = qnode_list[chosen_hfta]->perf->avg_cpu_time();
- if(hfta_cpu_usage > cpu_util_threshold) // hack for overloaded hftas.
- hfta_cpu_usage = cpu_util_threshold * .9999;
-printf("hfta %d (%s) has cpu usage %f\n",chosen_hfta,qnode_list[chosen_hfta]->name.c_str(),hfta_cpu_usage);
- int best_cpu = -1;
- double lowest_cost = 0.0;
- for(i=0;i<cpu_info_list.size();++i){
- double curr_cost = 0.0;
- for(j=0;j<src_location.size();++j){
- int dist = cpu_info_list[i]->distance_from(cpu_info_list[src_location[j]]);
- curr_cost += src_volume[j]*xfer_costs[dist];
- }
-//printf("Cpu %s, cost=%f\n",cpu_info_list[i]->to_csv().c_str(),curr_cost);
- if((cpu_info_list[i]->assigned_load+hfta_cpu_usage < cpu_util_threshold) && (best_cpu<0 || curr_cost <= lowest_cost)){
- best_cpu = i;
- lowest_cost = curr_cost;
-//printf("\tpicking %s\n",cpu_info_list[i]->to_csv().c_str());
- }
- }
-
- if(best_cpu>=0)
- cpu_info_list[best_cpu]->assigned_load += hfta_cpu_usage;
- hfta_assignment[qnode_list[chosen_hfta]->name] = best_cpu;
- eligible_hftas.erase(qnode_list[chosen_hfta]->name);
-
- }else{
- fprintf(stderr,"ERROR, chosen_hfta=-1, bailing out.\n");
- exit(1);
- }
- }
-
- FILE *pin_fl = fopen("pinning_info.csv","w");
- if(pin_fl==NULL){
- fprintf(stderr,"Warning, can't open pinning_info.csv, can't write the file.\n");
- }
- printf("RTS assignments:\n");
- for(i=0;i<iface_names.size();++i){
- int assigned_cpu = rts_assignment[iface_names[i]];
- if(assigned_cpu>=0){
- printf("Place %s at %d (%s)\n",iface_names[i].c_str(), assigned_cpu, cpu_info_list[assigned_cpu]->to_csv().c_str());
- if(pin_fl != NULL){
- fprintf(pin_fl,"rts %s,%d\n",iface_names[i].c_str(), assigned_cpu);
- }
- }
- }
-
- printf("HFTA assignments:\n");
- map<string, int>::iterator msii;
- for(msii=hfta_assignment.begin();msii!=hfta_assignment.end();++msii){
- int assigned_cpu = (*msii).second;
- string qname = (*msii).first;
- if(assigned_cpu>=0){
- printf("Place %s (%s) at %d (%s)\n",qname.c_str(),qnode_list[qname_to_idx[qname]]->executable_name.c_str(),assigned_cpu,cpu_info_list[assigned_cpu]->to_csv().c_str());
- if(pin_fl != NULL){
- fprintf(pin_fl,"%s,%d\n",qnode_list[qname_to_idx[qname]]->executable_name.c_str(), assigned_cpu);
- }
- }
- }
-
- }
-
-//for(i=0;i<cpu_info_list.size();++i){
-//printf("Cpu %d assigned load %f\n",i,cpu_info_list[i]->assigned_load);
-//}
-
-return 0;
-}
-
-
-
+/* ------------------------------------------------\r
+Copyright 2014 AT&T Intellectual Property\r
+ Licensed under the Apache License, Version 2.0 (the "License");\r
+ you may not use this file except in compliance with the License.\r
+ You may obtain a copy of the License at\r
+\r
+ http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+ Unless required by applicable law or agreed to in writing, software\r
+ distributed under the License is distributed on an "AS IS" BASIS,\r
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ See the License for the specific language governing permissions and\r
+ limitations under the License.\r
+ ------------------------------------------- */\r
+\r
+#include<stdio.h>\r
+#include<stdlib.h>\r
+\r
+#include<string.h>\r
+#include<string>\r
+#include<vector>\r
+#include<map>\r
+#include<set>\r
+#include<list>\r
+#include<time.h>\r
+#include <unistd.h>\r
+\r
+#include"xml_t.h"\r
+\r
+#include"qnode.h"\r
+\r
+using namespace std;\r
+\r
+extern int xmlParserparse(void);\r
+extern FILE *xmlParserin;\r
+extern int xmlParserdebug;\r
+xml_t *xml_result;\r
+\r
+// Initial discard from the resource log (-i); passed to the qnode and\r
+// perf_struct constructors in main().\r
+int init_discard = 12;\r
+\r
+// rts load history length (-l) and rts load history multiplier (-m).\r
+int rts_load_history_len = 3;\r
+double hist_multiplier = 0.8;\r
+// true (-U): all rts interface hashes are the same; false (-N): rts interface\r
+// hashes are processed independently.\r
+bool uniform_rts_alloc = true;\r
+\r
+// LINEBUF is the size of the line buffers handed to fgets; SPLITBUF is the\r
+// max_words limit handed to split_string.\r
+#define LINEBUF 1000\r
+#define SPLITBUF 20\r
+\r
+// An hfta is considered for cpu placement when its inferred input size reaches\r
+// min_hfta_insz or its average cpu time exceeds min_hfta_cpu.\r
+double min_hfta_insz = 1000000.0;\r
+double min_hfta_cpu = 0.2;\r
+\r
+// Max cpu utilization threshold (-u); main() requires it to be in (0,1].\r
+double cpu_util_threshold = 0.9;\r
+double crate_hi=.01; // upper bound on collision rate (-C)\r
+double crate_lo=.002; // lower bound on collision rate (-c); NOTE(review): the sizing code appears to halve, not increase, the HT below this bound -- confirm\r
+double erate_hi=.01; // upper bound on eviction rate (-E)\r
+// Default interface hash table length (-s).\r
+int htmax = 1000;\r
+// Communication cost multipliers indexed by the value returned from\r
+// cpu_info_str::distance_from() (0..3); entries 0-2 are settable with -0/-1/-2.\r
+double xfer_costs[4] = {.1, .1, .3, 1.0};\r
+\r
+\r
+// Split instr in place on every occurrence of sep.\r
+//\r
+// Each separator found is overwritten with '\0', and a pointer to the start of\r
+// every resulting field is stored in words[0..n-1]; the field count n is\r
+// returned.  words must have room for max_words pointers.  If the input holds\r
+// more than max_words fields, an error is printed to stderr for each extra\r
+// field and the last slot is reused, so words[max_words-1] ends up pointing at\r
+// the final field and max_words is returned.\r
+// Returns 0 without touching instr or words when instr or words is NULL or\r
+// max_words is not positive, since no slot may legally be written then.\r
+int split_string(char *instr,char sep, char **words,int max_words){\r
+	if(instr == NULL || words == NULL || max_words <= 0)\r
+		return 0;\r
+\r
+	int nwords = 0;\r
+	char *str = instr;\r
+	words[nwords++] = str;\r
+\r
+	char *loc;\r
+	while( (loc = strchr(str,sep)) != NULL){\r
+		*loc = '\0';\r
+		str = loc+1;\r
+		if(nwords >= max_words){\r
+			fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);\r
+			nwords = max_words-1; // clamp: reuse the last slot instead of overflowing\r
+		}\r
+		words[nwords++] = str;\r
+	}\r
+\r
+	return(nwords);\r
+}\r
+\r
+// Render a signed integer as its decimal text form.\r
+std::string int_to_string(int i){\r
+	char digits[100]; // far larger than any "%d" rendering of an int\r
+	sprintf(digits,"%d",i);\r
+	return std::string(digits);\r
+}\r
+\r
+\r
+\r
+\r
+\r
+// Address triple (ip, port, streamid); used as the key type of qnode_map in\r
+// main().\r
+struct fta_addr{\r
+	int ip;\r
+	int port;\r
+	int streamid;\r
+\r
+	// Store the three components exactly as given.\r
+	fta_addr(int i, int p, int s)\r
+		: ip(i), port(p), streamid(s){\r
+	}\r
+};\r
+\r
+// Ordering functor for fta_addr: compares ip first, then port, then streamid,\r
+// each ascending.  Returns true only when a sorts strictly before b.\r
+struct cmpr_fta_addr{\r
+	bool operator()(fta_addr const &a, fta_addr const &b) const{\r
+		if(a.ip != b.ip)\r
+			return a.ip < b.ip;\r
+		if(a.port != b.port)\r
+			return a.port < b.port;\r
+		return a.streamid < b.streamid;\r
+	}\r
+};\r
+\r
+// Ordering predicate for qnode pointers: true when a's par_index is smaller\r
+// than b's.\r
+bool cmpr_parallel_idx(const qnode *a, const qnode *b){\r
+	if(a->par_index < b->par_index)\r
+		return true;\r
+	return false;\r
+}\r
+\r
+// One processor entry (processor id, socket id, core id) plus the cpu load\r
+// that callers have assigned to it so far.\r
+struct cpu_info_str{\r
+	int processor_id;\r
+	int socket_id;\r
+	int core_id;\r
+\r
+	double assigned_load; // accumulated by callers; starts at 0\r
+\r
+	// p = processor id, s = socket id, c = core id.\r
+	cpu_info_str(int p, int s, int c)\r
+		: processor_id(p), socket_id(s), core_id(c), assigned_load(0.0){\r
+	}\r
+\r
+	// Return "processor_id,socket_id,core_id".  Const so it can be called\r
+	// through a pointer or reference to const.\r
+	std::string to_csv() const{\r
+		char buf[200];\r
+		sprintf(buf,"%d,%d,%d",processor_id,socket_id,core_id);\r
+		return std::string(buf);\r
+	}\r
+\r
+	// Distance class between this entry and other:\r
+	//   3 = different socket,\r
+	//   2 = same socket, different core,\r
+	//   1 = same socket and core, different processor id,\r
+	//   0 = identical ids.\r
+	// other must not be NULL.  Neither object is modified.\r
+	int distance_from(const cpu_info_str *other) const{\r
+		if(socket_id != other->socket_id)\r
+			return 3;\r
+		if(core_id != other->core_id)\r
+			return 2;\r
+		if(processor_id != other->processor_id)\r
+			return 1;\r
+		return 0;\r
+	}\r
+};\r
+\r
+// Ordering predicate for cpu_info_str pointers: ascending by socket_id, then\r
+// core_id, then processor_id.  Returns true only when a sorts strictly before b.\r
+bool cmpr_cpu_info(cpu_info_str const *a, cpu_info_str const *b){\r
+	if(a->socket_id != b->socket_id)\r
+		return a->socket_id < b->socket_id;\r
+	if(a->core_id != b->core_id)\r
+		return a->core_id < b->core_id;\r
+	return a->processor_id < b->processor_id;\r
+}\r
+\r
+\r
+\r
+\r
+int main(int argc, char **argv){\r
+ int i,j,s;\r
+\r
+ time_t now = time(NULL);\r
+ tm *now_tm = localtime(&now);\r
+ int year=now_tm->tm_year;\r
+\r
+\r
+// Options\r
+ string src_dir="";\r
+ string trace_file="";\r
+ string resource_log_file = "resource_log.csv";\r
+\r
+ const char *optstr = "d:r:i:l:m:UNu:s:C:c:E:0:1:2:";\r
+ const char *usage_str =\r
+"Usage: %s [options] trace_file\n"\r
+"\t-d source_directory\n"\r
+"\t-r resource_log_file\n"\r
+"\t-i initial discard from the resource log\n"\r
+"\t-s default interface hash table length.\n"\r
+"\t-l rts load history length\n"\r
+"\t-m rts load history multiplier\n"\r
+"\t-U All rts interface hashes are the same.\n"\r
+"\t-N rts interface hashes are processed independently.\n"\r
+"\t-u max cpu utilization threshold\n"\r
+"\t-C upper bound on collision rate.\n"\r
+"\t-c lower bound on collision rate.\n"\r
+"\t-E upper bound on the eviction rate.\n"\r
+"\t-0 communication cost multiplier for 0-distance processes.\n"\r
+"\t-1 communication cost multiplier for 1-distance processes.\n"\r
+"\t-2 communication cost multiplier for 2-distance processes.\n"\r
+;\r
+\r
+ char chopt;\r
+ while((chopt = getopt(argc,argv,optstr)) != -1){\r
+ switch(chopt){\r
+ case '0':\r
+ xfer_costs[0] = atof(optarg);\r
+ if(xfer_costs[0] < 0 || xfer_costs[0] > 1){\r
+ fprintf(stderr,"ERROR, 0-distance xfer costs (%f) must be in [0,1].\n",xfer_costs[0]);\r
+ exit(1);\r
+ }\r
+ break;\r
+ case '1':\r
+ xfer_costs[1] = atof(optarg);\r
+ if(xfer_costs[1] < 0 || xfer_costs[1] > 1){\r
+ fprintf(stderr,"ERROR, 1-distance xfer costs (%f) must be in [0,1].\n",xfer_costs[1]);\r
+ exit(1);\r
+ }\r
+ break;\r
+ case '2':\r
+ xfer_costs[2] = atof(optarg);\r
+ if(xfer_costs[2] < 0 || xfer_costs[2] > 1){\r
+ fprintf(stderr,"ERROR, 2-distance xfer costs (%f) must be in [0,1].\n",xfer_costs[2]);\r
+ exit(1);\r
+ }\r
+ break;\r
+ case 'C':\r
+ crate_hi = atof(optarg);\r
+ if(crate_hi < 0 || crate_hi>1){\r
+ fprintf(stderr,"ERROR, request to set crate_hi to %f\n must be in [0,1].\n",hist_multiplier);\r
+ exit(1);\r
+ }\r
+ break;\r
+ case 'c':\r
+ crate_lo = atof(optarg);\r
+ if(crate_lo < 0 || crate_lo>1){\r
+ fprintf(stderr,"ERROR, request to set crate_lo to %f\n must be in [0,1].\n",hist_multiplier);\r
+ exit(1);\r
+ }\r
+ break;\r
+ case 'E':\r
+ erate_hi = atof(optarg);\r
+ if(erate_hi < 0 || erate_hi>1){\r
+ fprintf(stderr,"ERROR, request to set erate_hi to %f\n must be in [0,1].\n",hist_multiplier);\r
+ exit(1);\r
+ }\r
+ break;\r
+ case 's':\r
+ htmax = atoi(optarg);\r
+ if(htmax <= 0){\r
+ fprintf(stderr,"ERROPR, htmax set to %d, must be positive nonzero.\n",htmax);\r
+ exit(1);\r
+ }\r
+ break;\r
+ case 'm':\r
+ hist_multiplier = atof(optarg);\r
+ if(hist_multiplier <= 0){\r
+ fprintf(stderr,"ERROR, request to set hist_multiplier to %f\n must be positive nonzero.\n",hist_multiplier);\r
+ exit(1);\r
+ }\r
+ break;\r
+ case 'u':\r
+ cpu_util_threshold = atof(optarg);\r
+ if(cpu_util_threshold<=0 || cpu_util_threshold>1){\r
+ fprintf(stderr,"ERROR, cpu_threshold set to %f, must be in (0,1].\n",cpu_util_threshold);\r
+ exit(1);\r
+ }\r
+ break;\r
+ case 'U':\r
+ uniform_rts_alloc=true;\r
+ break;\r
+ case 'N':\r
+ uniform_rts_alloc=false;\r
+ break;\r
+ case 'd':\r
+ src_dir = optarg;\r
+ break;\r
+ case 'r':\r
+ resource_log_file = optarg;\r
+ break;\r
+ case 'i':\r
+ init_discard = atoi(optarg);\r
+ if(init_discard < 0){\r
+ init_discard=0;\r
+ fprintf(stderr,"ERROR, atttempting to set init_discard to a negative value (%d), setting to zero.\n",init_discard);\r
+ }\r
+ break;\r
+ case 'l':\r
+ rts_load_history_len = atoi(optarg);\r
+ if(rts_load_history_len < 0){\r
+ rts_load_history_len=0;\r
+ fprintf(stderr,"ERROR, atttempting to set rts_load_history_len to a negative value (%d), setting to zero.\n",rts_load_history_len);\r
+ }\r
+ break;\r
+ case '?':\r
+ fprintf(stderr,"Error, argument %c not recognized.\n",optopt);\r
+ fprintf(stderr,"%s\n", usage_str);\r
+ exit(1);\r
+ default:\r
+ fprintf(stderr,"Invalid arguments\n");\r
+ fprintf(stderr,"%s\n", usage_str);\r
+ exit(1);\r
+ }\r
+ }\r
+ argc -= optind;\r
+ argv += optind;\r
+ if (argc > 0)\r
+ trace_file = argv[0];\r
+\r
+ if(trace_file == ""){\r
+ fprintf(stderr, usage_str, argv[0]);\r
+ exit(1);\r
+ }\r
+\r
+// for month string-to-int conversion\r
+ const char *months_str[12] = {"Jan","Feb","Mar","Apr","May","Jun","Jul","Aug","Sep","Oct","Nov","Dec"};\r
+ map<string, int> month_str_to_int;\r
+ for(i=0;i<12;++i)\r
+ month_str_to_int[months_str[i]] = i;\r
+\r
+\r
+ FILE *qtree_fl = NULL;\r
+ string qtree_flname = src_dir + "/" + "qtree.xml";\r
+ string actual_qtree_flname;\r
+ if(src_dir != ""){\r
+ qtree_fl = fopen(qtree_flname.c_str(),"r");\r
+ actual_qtree_flname = qtree_flname;\r
+ }\r
+ if(qtree_fl == NULL){\r
+ qtree_fl = fopen("qtree.xml","r");\r
+ actual_qtree_flname = "qtree.xml";\r
+ }\r
+ if(qtree_fl == NULL){\r
+ fprintf(stderr,"ERROR, can't open ");\r
+ if(src_dir != ""){\r
+ fprintf(stderr,"%s or ",qtree_flname.c_str());\r
+ }\r
+ fprintf(stderr,"qtree.xml, exiting.\n");\r
+ exit(1);\r
+ }\r
+\r
+\r
+// Parse the qtree.xml file\r
+ xmlParser_setfileinput(qtree_fl);\r
+ if(xmlParserparse()){\r
+ fprintf(stderr,"ERROR, could not parse query tree file %s\n",actual_qtree_flname.c_str());\r
+ }\r
+\r
+// Get the lfta, hfta nodes\r
+ xml_t *xroot = xml_result;\r
+ vector<xml_t *> xqnodes;\r
+ xroot->get_roots("HFTA",xqnodes);\r
+ xroot->get_roots("LFTA",xqnodes);\r
+\r
+ map<string,int> qname_to_idx;\r
+ map<string,int> exe_to_idx;\r
+ vector<qnode *> qnode_list;\r
+\r
+// Build the qnodes\r
+ for(i=0;i<xqnodes.size();++i){\r
+ string qname;\r
+ if(xqnodes[i]->get_attrib_val("name",qname)){\r
+//printf("node type = %s, name=%s\n",xqnodes[i]->name.c_str(),qname.c_str());\r
+ qnode *qn = new qnode(xqnodes[i]->name,qname,init_discard);\r
+ for(s=0;s<xqnodes[i]->subtrees.size();++s){\r
+ xml_t *xsub = xqnodes[i]->subtrees[s];\r
+ if(xsub->name == "Field"){\r
+ string fname;\r
+ bool nret = xsub->get_attrib_val("name",fname);\r
+ string fpos;\r
+ bool pret = xsub->get_attrib_val("pos",fpos);\r
+ string ftype;\r
+ bool tret = xsub->get_attrib_val("type",ftype);\r
+ string fmods;\r
+ bool mret = xsub->get_attrib_val("mods",fmods);\r
+ if(nret && pret && tret){\r
+ field_str *fld = new field_str(fname,atoi(fpos.c_str()),ftype,fmods);\r
+ qn->add_field(fld);\r
+ }else{\r
+ fprintf(stderr,"---> subtree %d of FTA %s has an malformed field.\n",s,qname.c_str());\r
+ }\r
+ }\r
+ if(xsub->name == "HtSize"){\r
+ string src;\r
+ bool sret = xsub->get_attrib_val("value",src);\r
+ if(sret){\r
+ int htsize = atoi(src.c_str());\r
+ if(htsize > 0){\r
+ unsigned int naggrs = 1; // make it power of 2\r
+ unsigned int nones = 0;\r
+ while(htsize){\r
+ if(htsize&1)\r
+ nones++;\r
+ naggrs = naggrs << 1;\r
+ htsize = htsize >> 1;\r
+ }\r
+ if(nones==1) // in case it was already a power of 2.\r
+ naggrs/=2;\r
+ qn->aggr_tbl_size = naggrs;\r
+ }else{\r
+ fprintf(stderr,"---> subtree %d of FTA %s has an invalid HtSize (%s).\n",s,qname.c_str(),src.c_str());\r
+ }\r
+ }else{\r
+ fprintf(stderr,"---> subtree %d of FTA %s has an malformed HtSize.\n",s,qname.c_str());\r
+ }\r
+ }\r
+ if(xsub->name == "ReadsFrom"){\r
+ string src;\r
+ bool sret = xsub->get_attrib_val("value",src);\r
+ if(sret){\r
+ qn->add_source(src);\r
+ }else{\r
+ fprintf(stderr,"---> subtree %d of FTA %s has an malformed ReadsFrom.\n",s,qname.c_str());\r
+ }\r
+ }\r
+ if(xsub->name == "Interface"){\r
+ string iface;\r
+ bool sret = xsub->get_attrib_val("value",iface);\r
+ if(sret){\r
+ qn->src_interface = iface;\r
+ }\r
+ }\r
+ if(xsub->name == "FileName"){\r
+ string full_fname;\r
+ bool sret = xsub->get_attrib_val("value",full_fname);\r
+ if(sret){\r
+ size_t dotpos = full_fname.find_first_of('.');\r
+ if(dotpos != string::npos){\r
+ qn->executable_name = full_fname.substr(0,dotpos);\r
+ }\r
+ }\r
+ }\r
+ }\r
+ qname_to_idx[qname] = qnode_list.size();\r
+ if(qn->executable_name != "" && qn->executable_name != "rts")\r
+ exe_to_idx[qn->executable_name] = qnode_list.size();\r
+ qnode_list.push_back(qn);\r
+ }else{\r
+ fprintf(stderr,"---> node type %s, no name.\n",xqnodes[i]->name.c_str());\r
+ }\r
+ }\r
+\r
+\r
+ bool error = false;\r
+ for(i=0;i<qnode_list.size();++i){\r
+ if(qnode_list[i]->qnode_type == "HFTA"){\r
+ for(s=0;s<qnode_list[i]->reads_from.size();++s){\r
+ if(qname_to_idx.count(qnode_list[i]->reads_from[s])>0){\r
+ int src_id = qname_to_idx[qnode_list[i]->reads_from[s]];\r
+ qnode_list[i]->reads_from_idx.push_back(src_id);\r
+ qnode_list[src_id]->sources_to_idx.push_back(i);\r
+ }else{\r
+ fprintf(stderr,"ERROR, hfta %s reads_from %s, but its not in qtree.xml.\n",qnode_list[i]->name.c_str(),qnode_list[i]->reads_from[s].c_str());\r
+ error = true;\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+ if(error)\r
+ exit(1);\r
+\r
+/*\r
+ for(i=0;i<qnode_list.size();++i){\r
+ printf("node %s reads_from:",qnode_list[i]->name.c_str());\r
+ for(s=0;s<qnode_list[i]->reads_from.size();++s){\r
+ printf(" %s",qnode_list[i]->reads_from[s].c_str());\r
+ }\r
+ printf("\nand sources to:\n");\r
+ for(s=0;s<qnode_list[i]->sources_to_idx.size();++s){\r
+ printf(" %s",qnode_list[qnode_list[i]->sources_to_idx[s]]->name.c_str());\r
+ }\r
+ printf("\n\n");\r
+ }\r
+*/\r
+\r
+ string tracefilename = trace_file;\r
+ if(src_dir != ""){\r
+ tracefilename = src_dir + "/" + trace_file;\r
+ }\r
+ FILE *trace_fl = NULL;\r
+ if((trace_fl = fopen(tracefilename.c_str(),"r"))==NULL){\r
+ fprintf(stderr,"ERROR, can't open trace file %s\n",tracefilename.c_str());\r
+ exit(1);\r
+ }\r
+\r
+ map<fta_addr,string, cmpr_fta_addr> qnode_map;\r
+ map<int, perf_struct *> rts_perf_map;\r
+ map<int, string> rts_iface_map;\r
+ map<string, int> pid_iface_map;\r
+\r
+ tm time_str;\r
+ char inp[LINEBUF],line[LINEBUF],*saveptr;\r
+ unsigned int hbeat_ip, hbeat_port, hbeat_index, hbeat_streamid, hbeat_trace_id,hbeat_ntraces;\r
+ while(fgets(inp,LINEBUF,trace_fl)){\r
+\r
+// Try to grab the timestamp\r
+ time_t tick;\r
+ int pid;\r
+ strncpy(line,inp,LINEBUF);\r
+ char mon_str[4];\r
+ int mon=0,day,hr,mn,sec;\r
+ int nret = sscanf(line,"%c%c%c %d %d:%d:%d",mon_str,mon_str+1,mon_str+2,&day,&hr,&mn,&sec);\r
+ if(nret >= 7){\r
+ mon_str[3] = '\0';\r
+ if(month_str_to_int.count(mon_str)>0){\r
+ mon = month_str_to_int[mon_str];\r
+ }else{\r
+ fprintf(stderr,"Warning, %s not recognized as a month string.\n",mon_str);\r
+ }\r
+ time_str.tm_sec = sec;\r
+ time_str.tm_min = mn;\r
+ time_str.tm_hour = hr;\r
+ time_str.tm_mday = day;\r
+ time_str.tm_mon = mon;\r
+ time_str.tm_year = year;\r
+ tick = mktime(&time_str);\r
+//printf("mon=%d, day=%d, hr=%d, mn=%d, sec=%d, tick=%d\n",mon,day,hr,mn,sec,tick);\r
+ }\r
+\r
+// Grab the process ID\r
+ strncpy(line,inp,LINEBUF);\r
+ int tmp_pid;\r
+ pid = -1;\r
+ char *segment = strtok_r(line,"[",&saveptr);\r
+ if(segment!=NULL){\r
+ segment = strtok_r(NULL,"[",&saveptr);\r
+ nret = sscanf(segment,"%d]",&tmp_pid);\r
+ if(nret>=1){\r
+ pid=tmp_pid;\r
+ }\r
+ }\r
+\r
+// Grab address-to-hfta mappings\r
+ strncpy(line,inp,LINEBUF);\r
+ segment = strtok_r(line,"]",&saveptr);\r
+ if(segment != NULL){\r
+ segment = strtok_r(NULL," ",&saveptr);\r
+ segment = strtok_r(NULL," ",&saveptr);\r
+ }\r
+//printf("segmetn=<%s>, comparison=%d\n",segment,strcmp(segment,"Lookup"));\r
+ if(segment!=NULL && strcmp(segment,"Lookup")==0){\r
+ int pos=0;\r
+ char fta_name[LINEBUF];\r
+ int ip;\r
+ int port;\r
+ int index;\r
+ int streamid;\r
+ int nret = 0;\r
+ while(segment != NULL){\r
+//printf("pos=%d, segment = %s\n",pos, segment);\r
+\r
+ if(pos==3){\r
+ strncpy(fta_name,segment,LINEBUF);\r
+ }\r
+\r
+ if(pos==6)\r
+ nret = sscanf(segment,"{ip=%d,port=%d,index=%d,streamid=%d}",&ip,&port,&index,&streamid);\r
+\r
+ pos++;\r
+ segment = strtok_r(NULL," ",&saveptr);\r
+ }\r
+ if(nret>0){\r
+//printf("nret=%d, fta_name=%s,ip=%d,port=%d,index=%d,streamid=%d\n",nret,fta_name,ip,port,index,streamid);\r
+ fta_addr addr(ip,port,streamid);\r
+ qnode_map[addr] = fta_name;\r
+//printf("Adding fta %s, (%d %d %d) to qnode_map\n",fta_name,addr.ip,addr.port,addr.streamid);\r
+ continue;\r
+ }\r
+ }\r
+ if(segment!=NULL && strcmp(segment,"Lfta")==0){\r
+ int pos=0;\r
+ char fta_name[LINEBUF];\r
+ int ip;\r
+ int port;\r
+ int index;\r
+ int streamid;\r
+ int nret = 0;\r
+ while(segment != NULL){\r
+//printf("pos=%d, segment = %s\n",pos, segment);\r
+\r
+ if(pos==1){\r
+ strncpy(fta_name,segment,LINEBUF);\r
+ }\r
+\r
+ if(pos==4)\r
+ nret = sscanf(segment,"{ip=%d,port=%d,index=%d,streamid=%d}",&ip,&port,&index,&streamid);\r
+\r
+ pos++;\r
+ segment = strtok_r(NULL," ",&saveptr);\r
+ }\r
+ if(nret>0){\r
+//printf("nret=%d, fta_name=%s,ip=%d,port=%d,index=%d,streamid=%d\n",nret,fta_name,ip,port,index,streamid);\r
+ fta_addr addr(ip,port,streamid);\r
+ qnode_map[addr] = fta_name;\r
+//printf("Adding lfta %s, (%d %d %d) to qnode_map\n",fta_name,addr.ip,addr.port,addr.streamid);\r
+ continue;\r
+ }\r
+ }\r
+ if(segment!=NULL && strcmp(segment,"Init")==0){\r
+ int pos=0;\r
+ string iface = "";\r
+ string keywd = "";\r
+ while(segment != NULL){\r
+ if(pos==1)\r
+ keywd = segment;\r
+ if(pos==3){\r
+ char *cc;\r
+ for(cc=segment;*cc!='\0';++cc){\r
+ if(*cc == '\n'){\r
+ *cc = '\0';\r
+ break;\r
+ }\r
+ }\r
+ iface = segment;\r
+ }\r
+ pos++;\r
+ segment = strtok_r(NULL," ",&saveptr);\r
+ }\r
+ if(iface!="" && keywd == "LFTAs"){\r
+ rts_perf_map[pid] = new perf_struct(init_discard);\r
+ rts_iface_map[pid] = iface;\r
+ pid_iface_map[iface] = pid;\r
+ }\r
+ }\r
+ if(segment!=NULL && strcmp(segment,"Heartbeat")==0){\r
+ int pos=0;\r
+ int nret=0,nret2=0,nret3=0;\r
+ unsigned int tmp_hbeat_ip, tmp_hbeat_port, tmp_hbeat_index, tmp_hbeat_streamid, tmp_hbeat_trace_id,tmp_hbeat_ntraces;\r
+ while(segment != NULL){\r
+//printf("pos=%d, segment = %s\n",pos, segment);\r
+ if(pos==4)\r
+ nret = sscanf(segment,"{ip=%d,port=%d,index=%d,streamid=%d}",&tmp_hbeat_ip,&tmp_hbeat_port,&tmp_hbeat_index,&tmp_hbeat_streamid);\r
+ if(pos==5)\r
+ nret2 = sscanf(segment,"trace_id=%d",&tmp_hbeat_trace_id);\r
+ if(pos==6)\r
+ nret3 = sscanf(segment,"ntrace=%d",&tmp_hbeat_ntraces);\r
+ pos++;\r
+ segment = strtok_r(NULL," ",&saveptr);\r
+ }\r
+ if(nret>=4 && nret2 >= 1 && nret3 == 1){\r
+ hbeat_ip = tmp_hbeat_ip;\r
+ hbeat_port = tmp_hbeat_port;\r
+ hbeat_index = tmp_hbeat_index;\r
+ hbeat_streamid = tmp_hbeat_streamid;\r
+ hbeat_trace_id = tmp_hbeat_trace_id;\r
+ hbeat_ntraces = tmp_hbeat_ntraces;\r
+// printf("ip=%d, port=%d, index=%d, streamid=%d hbeat_trace_id=%d\n",hbeat_ip,hbeat_port,hbeat_index,hbeat_streamid,hbeat_trace_id);\r
+ fta_addr hb_addr(hbeat_ip,hbeat_port,tmp_hbeat_streamid);\r
+ if(qnode_map.count(hb_addr) == 0){\r
+ hb_addr.streamid = 0; // maybe an hfta?\r
+ if(qnode_map.count(hb_addr) == 0){\r
+ hbeat_port = 0;\r
+//printf("Hbeat streamid=%d no match (%d,%d), hbeat_trace_id=%d\n",hbeat_streamid,hbeat_ip,hbeat_port,hbeat_trace_id);\r
+ }\r
+ } else{\r
+//printf("Hbeat streamid=%d matches %s, hbeat_trace_id=%d\n",hbeat_streamid,qnode_map[hb_addr].c_str(),hbeat_trace_id);\r
+ }\r
+ }else{\r
+ printf("Couldn't parse as hearbeat %s\n",inp);\r
+ }\r
+ }\r
+ if(segment!=NULL && strncmp(segment,"trace_id=",8)==0){\r
+ int pos=0;\r
+ int nret=0,nret2=0,nret3=0;\r
+ unsigned long long int trace_id;\r
+ unsigned int tr_pos,tr_ip,tr_port,tr_index,tr_streamid;\r
+ unsigned int tr_intup,tr_outtup,tr_outsz,tr_acctup,tr_cycles;\r
+ unsigned int tr_evictions,tr_collisions;\r
+ double tr_sample;\r
+ nret = sscanf(segment,"trace_id=%llu",&trace_id);\r
+ while(segment != NULL){\r
+//printf("pos=%d, segment = %s\n",pos, segment);\r
+ if(pos==0)\r
+ nret = sscanf(segment,"trace_id=%llu",&trace_id);\r
+ if(pos==1)\r
+ nret2 = sscanf(segment,"trace[%d].ftaid={ip=%u,port=%u,index=%u,streamid=%u}",&tr_pos,&tr_ip,&tr_port,&tr_index,&tr_streamid);\r
+ if(pos==2)\r
+ nret3 = sscanf(segment,"fta_stat={in_tuple_cnt=%u,out_tuple_cnt=%u,out_tuple_sz=%u,accepted_tuple_cnt=%u,cycle_cnt=%u,collision_cnt=%u,eviction_cnt=%u,sampling_rate=%lf}",\r
+ &tr_intup, &tr_outtup, &tr_outsz, &tr_acctup,&tr_cycles,\r
+ &tr_collisions,&tr_evictions,&tr_sample);\r
+ pos++;\r
+ segment = strtok_r(NULL," ",&saveptr);\r
+ }\r
+ if(nret>=1 && nret2>=5 && nret3>=7){\r
+ fta_addr tr_addr(tr_ip,tr_port,tr_streamid);\r
+ if(qnode_map.count(tr_addr)==0)\r
+ tr_addr.streamid = 0; // maybe an hfta?\r
+ if(qnode_map.count(tr_addr)>0){\r
+//printf("Trace idx=%d streamid=%d matches %s, trace_id=%d, hbeat_trace_id=%d\n",tr_pos,tr_streamid,qnode_map[tr_addr].c_str(),trace_id,hbeat_trace_id);\r
+ if(tr_pos+1 == hbeat_ntraces){\r
+ string qname = qnode_map[tr_addr];\r
+ int qidx = qname_to_idx[qname];\r
+ if(qnode_list[qidx]->start_tick < 0)\r
+ qnode_list[qidx]->start_tick = tick;\r
+ if(qnode_list[qidx]->end_tick < tick)\r
+ qnode_list[qidx]->end_tick = tick;\r
+ qnode_list[qidx]->in_tup += tr_intup;\r
+ qnode_list[qidx]->out_tup += tr_outtup;\r
+ qnode_list[qidx]->out_sz += tr_outsz;\r
+ qnode_list[qidx]->accepted_tup += tr_acctup;\r
+ qnode_list[qidx]->cycles += tr_cycles;\r
+ qnode_list[qidx]->collisions += tr_collisions;\r
+ qnode_list[qidx]->evictions += tr_evictions;\r
+ }\r
+ }\r
+//else{\r
+//printf("Trace idx=%d streamid=%d no match (%d,%d), trace_id=%d, hbeat_trace_id=%d\n",tr_pos,tr_streamid,tr_ip,tr_port,trace_id,hbeat_trace_id);\r
+//}\r
+ }\r
+ }\r
+ }\r
+\r
+//printf("qnode_map has %d entries\n",qnode_map.size());\r
+\r
+// Open and process performance log info, if any.\r
+ if(src_dir != ""){\r
+ resource_log_file = src_dir + "/" + resource_log_file;\r
+ }\r
+ FILE *res_fl = NULL;\r
+ if((res_fl = fopen(resource_log_file.c_str(),"r"))==NULL){\r
+ fprintf(stderr,"ERROR, can't open trace file %s\n",resource_log_file.c_str());\r
+ exit(1);\r
+ }\r
+\r
+ char *flds[SPLITBUF];\r
+ int lineno = 0;\r
+ while(fgets(inp,LINEBUF,res_fl)){\r
+ int nflds = split_string(inp,',',flds,SPLITBUF);\r
+ lineno++;\r
+ if(nflds >= 8){\r
+ int ts = atoi(flds[0]);\r
+ string procname = flds[1];\r
+ int pid = atoi(flds[2]);\r
+ unsigned long long int utime = atoll(flds[3]);\r
+ unsigned long long int stime = atoll(flds[4]);\r
+ unsigned long long int vm_size = atoll(flds[5]);\r
+ unsigned long long int rss_size = atoll(flds[6]);\r
+ int pagesize = atoi(flds[7]);\r
+\r
+ if(procname == "rts"){\r
+ if(rts_perf_map.count(pid)>0){\r
+ if(rts_perf_map[pid]->update(ts,utime,stime,vm_size,rss_size)){\r
+ fprintf(stderr,"Resource log file is corrupted, line %d\n",lineno);\r
+ exit(1);\r
+ }\r
+ }\r
+ }else{\r
+ if(exe_to_idx.count(procname)>0){\r
+ perf_struct *p = qnode_list[exe_to_idx[procname]]->perf;\r
+ if(p->update(ts,utime,stime,vm_size,rss_size)){\r
+ fprintf(stderr,"Resource log file is corrupted, line %d\n",lineno);\r
+ exit(1);\r
+ }\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+\r
+\r
+ FILE *rpt_fl = fopen("performance_report.csv","w");\r
+ if(rpt_fl == NULL){\r
+ fprintf(stderr,"Warning, can't open performance_report.csv, can't save the performance report.\n");\r
+ }\r
+\r
+ char tmpstr[10000];\r
+ printf("Performance report:\n");\r
+ if(rpt_fl) fprintf(rpt_fl,"query_name,process_name,in_tup,out_tup,out_sz,accepted_tup,cycles,collisions,evictions,in_tup_rate,out_tup_rate,out_bytes_rate,cycle_consumption_rate");\r
+ if(rpt_fl) fprintf(rpt_fl,",%s",perf_struct::to_csv_hdr().c_str());\r
+ if(rpt_fl) fprintf(rpt_fl,",packets_sent_to_query,fraction_intup_lost,inferred_read_rate");\r
+ if(rpt_fl) fprintf(rpt_fl,"\n");\r
+\r
+ map<fta_addr,string>::iterator mpiisi;\r
+ int n_output = 0;\r
+ set<string> found_names;\r
+ for(mpiisi=qnode_map.begin();mpiisi!=qnode_map.end();++mpiisi){\r
+ string qname = (*mpiisi).second;\r
+ if(found_names.count(qname)==0){\r
+ found_names.insert(qname);\r
+ int qidx = qname_to_idx[qname];\r
+ string executable = qnode_list[qidx]->executable_name;\r
+ if(executable == "")\r
+ executable="rts";\r
+// printf("(%d,%d,%d) -> %s, %d reads-from\n",(*mpiisi).first.ip,(*mpiisi).first.port,(*mpiisi).first.streamid,qname.c_str(),qnode_list[qidx]->reads_from_idx.size());\r
+ printf("query=%s, executable=%s\tintup=%llu, out_tup=%llu, out_sz=%llu, accepted_tup=%llu, cycles=%llu, collisions=%llu, evictions=%llu\n",\r
+ qname.c_str(),qnode_list[qidx]->executable_name.c_str(),\r
+ qnode_list[qidx]->in_tup,\r
+ qnode_list[qidx]->out_tup,\r
+ qnode_list[qidx]->out_sz,\r
+ qnode_list[qidx]->accepted_tup,\r
+ qnode_list[qidx]->cycles,\r
+ qnode_list[qidx]->collisions,\r
+ qnode_list[qidx]->evictions\r
+ );\r
+ if(rpt_fl) fprintf(rpt_fl,"%s,%s,%llu,%llu,%llu,%llu,%llu,%llu,%llu",\r
+ qname.c_str(),qnode_list[qidx]->executable_name.c_str(),\r
+ qnode_list[qidx]->in_tup,\r
+ qnode_list[qidx]->out_tup,\r
+ qnode_list[qidx]->out_sz,\r
+ qnode_list[qidx]->accepted_tup,\r
+ qnode_list[qidx]->cycles,\r
+ qnode_list[qidx]->collisions,\r
+ qnode_list[qidx]->evictions\r
+ );\r
+ double duration = 1.0*(qnode_list[qidx]->end_tick - qnode_list[qidx]->start_tick);\r
+\r
+ printf("\tin_tup_rate=%f, out_tup_rate=%f, out_byte_rate=%f, cycle_rate=%f\n",qnode_list[qidx]->in_tup/duration,qnode_list[qidx]->out_tup/duration,qnode_list[qidx]->out_sz/duration,qnode_list[qidx]->cycles/duration);\r
+ if(rpt_fl) fprintf(rpt_fl,",%f,%f,%f,%f",qnode_list[qidx]->in_tup/duration,qnode_list[qidx]->out_tup/duration,qnode_list[qidx]->out_sz/duration,qnode_list[qidx]->cycles/duration);\r
+ printf("\t%s\n",qnode_list[qidx]->perf->to_string().c_str());\r
+ if(rpt_fl) fprintf(rpt_fl,",%s",qnode_list[qidx]->perf->to_csv().c_str());\r
+//if(qnode_list[qidx]->aggr_tbl_size>0){\r
+//printf("\taggregate table size is %d\n",qnode_list[qidx]->aggr_tbl_size);\r
+//}\r
+//if(qnode_list[qidx]->src_interface != ""){\r
+//printf("\tSource interface is %s\n",qnode_list[qidx]->src_interface.c_str());\r
+//}\r
+ if(qnode_list[qidx]->reads_from_idx.size()>0){\r
+ unsigned long long int total_sent = 0;\r
+ for(i=0;i<qnode_list[qidx]->reads_from_idx.size();++i){\r
+ total_sent += qnode_list[qnode_list[qidx]->reads_from_idx[i]]->out_tup;\r
+ qnode_list[qidx]->inferred_in_sz += qnode_list[qnode_list[qidx]->reads_from_idx[i]]->out_sz / (qnode_list[qnode_list[qidx]->reads_from_idx[i]]->end_tick-qnode_list[qnode_list[qidx]->reads_from_idx[i]]->start_tick);\r
+ }\r
+ printf("\t%llu packets sent, %f lost, inferred read rate is %f\n",total_sent,(1.0*total_sent-qnode_list[qidx]->in_tup)/total_sent, qnode_list[qidx]->inferred_in_sz);\r
+ if(rpt_fl) fprintf(rpt_fl,",%llu,%f,%f",total_sent,(1.0*total_sent-qnode_list[qidx]->in_tup)/total_sent, qnode_list[qidx]->inferred_in_sz);\r
+ }\r
+ else{\r
+ if(rpt_fl) fprintf(rpt_fl,",,,");\r
+ }\r
+\r
+ if(rpt_fl) fprintf(rpt_fl,"\n");\r
+ n_output++;\r
+ }\r
+ }\r
+\r
+\r
+\r
+// Collect performance info about RTSs and determine a better hash partitioning.\r
+\r
+// First, grab any existing balancing information\r
+ map<string, vector<int> > prev_rts_loads;\r
+ FILE *rload_fl = NULL;\r
+ rload_fl = fopen("rts_load.cfg","r");\r
+ lineno = 0;\r
+ if(rload_fl != NULL){\r
+ while(fgets(line,LINEBUF,rload_fl)){\r
+ lineno++;\r
+ int nflds = split_string(line,',',flds,SPLITBUF);\r
+ if(nflds>1){\r
+ vector<int> hbounds;\r
+ bool invalid_line=false;\r
+ int prev_val = 0;\r
+ for(i=1;i<nflds;++i){\r
+ int new_val = atoi(flds[i]);\r
+ if(new_val < prev_val)\r
+ invalid_line = true;\r
+ hbounds.push_back(new_val);\r
+ }\r
+ if(! invalid_line){\r
+ prev_rts_loads[flds[0]] = hbounds;\r
+ }else{\r
+ printf("Warning, rts_load.cfg has an invalid entry on line %d, skipping\n",lineno);\r
+ }\r
+ }else{\r
+ printf("Warning, rts_load.cfg has an invalid entry on line %d, skipping\n",lineno);\r
+ }\r
+ }\r
+ }else{\r
+ fprintf(rload_fl,"Warning, can't open rts_load.cfg, skipping and using defualt allocation estimate.\n");\r
+ }\r
+ map<string, vector<int> > new_rts_loads = prev_rts_loads;\r
+ fclose(rload_fl);\r
+\r
+// Next, try to grab a history of allocations and resulting cpu loads\r
+ FILE *rtrace_fl = NULL;\r
+ rtrace_fl = fopen("rts_load.trace.txt","r");\r
+ lineno = 0;\r
+ map<string, vector< vector<int> > > iface_alloc_history;\r
+ map<string, vector< vector<double> > > iface_load_history;\r
+ if(rtrace_fl != NULL){\r
+ vector<int> curr_allocation;\r
+ vector<double> curr_load;\r
+ while(fgets(line,LINEBUF,rtrace_fl)){\r
+ int nflds = split_string(line,',',flds,SPLITBUF);\r
+ if(nflds > 2){\r
+ string iface = flds[0];\r
+ string entry = flds[1];\r
+ if(entry == "Previous allocation"){\r
+ curr_allocation.clear();\r
+ for(i=2;i<nflds;++i){\r
+ curr_allocation.push_back(atoi(flds[i]));\r
+ }\r
+ }\r
+ if(entry == "Previous cpu loads"){\r
+ curr_load.clear();\r
+ for(i=2;i<nflds;++i){\r
+ curr_load.push_back(atof(flds[i]));\r
+ }\r
+ if(curr_allocation.size() == curr_load.size()){\r
+ iface_alloc_history[iface].push_back(curr_allocation);\r
+ iface_load_history[iface].push_back(curr_load);\r
+ }\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+/*\r
+map<string, vector< vector<int> > >::iterator msvvi;\r
+for(msvvi=iface_alloc_history.begin();msvvi!=iface_alloc_history.end();++msvvi){\r
+string iface = (*msvvi).first;\r
+printf("iface %s past allocations:\n",iface.c_str());\r
+vector<vector<int> > &alloc = iface_alloc_history[iface];\r
+printf("alloc size is %d\n",alloc.size());\r
+for(i=0;i<alloc.size();++i){\r
+for(j=0;j<alloc[i].size();++j){\r
+printf("%d ",alloc[i][j]);\r
+}\r
+printf("\n");\r
+}\r
+printf("\niface %s past loads:\n",iface.c_str());\r
+vector<vector<double> > &load = iface_load_history[iface];\r
+printf("load size is %d\n",load.size());\r
+for(i=0;i<load.size();++i){\r
+for(j=0;j<load[i].size();++j){\r
+printf("%f ",load[i][j]);\r
+}\r
+printf("\n");\r
+}\r
+printf("\n");\r
+}\r
+*/\r
+\r
+\r
+ map<int, string>::iterator misi;\r
+ map<string, vector<int> > rts_iface_indices;\r
+ map<string, vector<double> > rts_iface_cpu_load;\r
+ for(misi=rts_iface_map.begin();misi!=rts_iface_map.end();++misi){\r
+ int rpid = (*misi).first;\r
+ string riface = (*misi).second;\r
+ size_t Xpos = riface.find_last_of("X");\r
+ if(Xpos!=string::npos){\r
+ string iface = riface.substr(0,Xpos);\r
+// ifaces_found.insert(iface);\r
+ string ifcopy = riface.substr(Xpos+1);\r
+ int ifidx = atoi(ifcopy.c_str());\r
+ rts_iface_indices[iface].push_back(ifidx);\r
+ }\r
+ printf("pid=%d, rts %s, %s\n",rpid,riface.c_str(),rts_perf_map[rpid]->to_string().c_str());\r
+ if(rpt_fl){\r
+ fprintf(rpt_fl,",rts %s,,,,,,,,,,,,%s,,,,\n",riface.c_str(),rts_perf_map[rpid]->to_csv().c_str());\r
+\r
+ }\r
+ }\r
+ map<string, vector<int> >::iterator msvi;\r
+ set<string> ifaces_found;\r
+ map<string, vector<double> > ht_cpu_allocs;\r
+ map<string, int> total_ht_sizes;\r
+ for(msvi=rts_iface_indices.begin();msvi!=rts_iface_indices.end();++msvi){\r
+ string iface = (*msvi).first;\r
+ vector<int> &ifindices = (*msvi).second;\r
+ sort(ifindices.begin(),ifindices.end());\r
+\r
+ double total_cpu = 0.0;\r
+ vector<double> if_cpu;\r
+ for(i=0;i<ifindices.size();++i){\r
+ string full_iface = iface + "X" + int_to_string(ifindices[i]);\r
+ int pid = pid_iface_map[full_iface];\r
+ total_cpu += rts_perf_map[pid]->avg_cpu_time();\r
+ if_cpu.push_back(rts_perf_map[pid]->avg_cpu_time());\r
+ }\r
+ rts_iface_cpu_load[iface] = if_cpu;\r
+\r
+ vector<int> current_allocation;\r
+ if(new_rts_loads.count(iface) == 0 || new_rts_loads[iface].size() != ifindices.size()){\r
+ int cumm_cpu = 0;\r
+ for(i=0;i<ifindices.size();++i){\r
+ int new_cumm = ((i+1)*htmax)/ifindices.size();\r
+ current_allocation.push_back(new_cumm-cumm_cpu);\r
+ cumm_cpu=new_cumm;\r
+ }\r
+ }else{\r
+ current_allocation = new_rts_loads[iface];\r
+ }\r
+ int total_ht_allocation = 0;\r
+//printf("Current allocation is:");\r
+ for(i=0;i<current_allocation.size();++i){\r
+ total_ht_allocation += current_allocation[i];\r
+//printf(" %d",current_allocation[i]);\r
+ }\r
+\r
+ vector<double> local_alloc(total_ht_allocation,0.0); // estimated cpu per HT slot.\r
+ int ht_ptr = 0;\r
+ for(i=0;i<if_cpu.size();++i){\r
+ double rate = if_cpu[i]/(current_allocation[i]*total_cpu);\r
+ for(j=0;j<current_allocation[i];++j){\r
+ local_alloc[ht_ptr++] = rate;\r
+ }\r
+ }\r
+\r
+ if(iface_alloc_history.count(iface)>0){\r
+ vector<vector<int> > &alloc = iface_alloc_history[iface];\r
+ vector<vector<double> > &load = iface_load_history[iface];\r
+ int n_remaining = rts_load_history_len;\r
+ double multiplier = hist_multiplier;\r
+ double normalizer = 1.0;\r
+ for(i=alloc.size()-1;i>=0 && n_remaining>0;i--){\r
+ int hist_ht_size = 0;\r
+ for(j=0;j<alloc[i].size();++j)\r
+ hist_ht_size+=alloc[i][j];\r
+ double hist_cpu = 0.0;\r
+ for(j=0;j<load[i].size();++j)\r
+ hist_cpu += load[i][j];\r
+ if(hist_ht_size == total_ht_allocation){\r
+ int ht_ptr = 0;\r
+ for(j=0;j<alloc[i].size();++j){\r
+ double rate = (multiplier*load[i][j])/(alloc[i][j]*hist_cpu);\r
+ int k;\r
+ for(k=0;k<alloc[i][j];k++){\r
+ local_alloc[ht_ptr++] += rate;\r
+ }\r
+ }\r
+ normalizer += multiplier;\r
+ multiplier *= hist_multiplier;\r
+ n_remaining--;\r
+ }\r
+ }\r
+ for(i=0;i<total_ht_allocation;i++){\r
+ local_alloc[i] /= normalizer;\r
+ }\r
+ }\r
+\r
+\r
+ total_ht_sizes[iface] = total_ht_allocation;\r
+ ht_cpu_allocs[iface] = local_alloc;\r
+ }\r
+\r
+ if(uniform_rts_alloc){\r
+// If this option is true, all HTs are required to be\r
+// the same size; otherwise the averaging below is skipped.\r
+ bool same_sizes = true;\r
+ int total_alloc = -1;\r
+ map<string, int >::iterator msi;\r
+ for(msi=total_ht_sizes.begin();msi!=total_ht_sizes.end();++msi){\r
+ if(total_alloc<0){\r
+ total_alloc=(*msi).second;\r
+ }else{\r
+ if(total_alloc != (*msi).second){\r
+ same_sizes = false;\r
+ }\r
+ }\r
+ }\r
+ if(same_sizes){\r
+ vector<double> local_alloc(total_alloc,0.0);\r
+ double normalizer = 0.0;\r
+ map<string, vector<double> >::iterator msvdi;\r
+ for(msvdi=ht_cpu_allocs.begin();msvdi!=ht_cpu_allocs.end();++msvdi){\r
+ string iface = (*msvdi).first;\r
+ for(i=0;i<total_alloc;++i){\r
+ local_alloc[i] += ht_cpu_allocs[iface][i];\r
+ }\r
+ normalizer += 1.0;\r
+ }\r
+ for(i=0;i<total_alloc;++i)\r
+ local_alloc[i] /= normalizer;\r
+ for(msvdi=ht_cpu_allocs.begin();msvdi!=ht_cpu_allocs.end();++msvdi){\r
+ (*msvdi).second = local_alloc;\r
+ }\r
+ }\r
+ }\r
+\r
+ for(msvi=rts_iface_indices.begin();msvi!=rts_iface_indices.end();++msvi){\r
+ string iface = (*msvi).first;\r
+ vector<int> &ifindices = (*msvi).second;\r
+ sort(ifindices.begin(),ifindices.end());\r
+\r
+ int cumm_ht_alloc = 0; // ht allocated thus far\r
+ int current_pos = 0; // idx to the current_allocation, if_cpu arrays\r
+ vector<int> new_allocation;\r
+ for(i=0;i<ifindices.size()-1;++i){\r
+ double slot_cpu = 1.0/ifindices.size(); // amount of cpu to allocate\r
+ int current_alloc = 0; // ht allocated in the curr_pos slot\r
+ while(slot_cpu > 0.0){\r
+ slot_cpu -= ht_cpu_allocs[iface][current_pos];\r
+ current_pos++;\r
+ current_alloc++;\r
+ }\r
+ // try to make the allocations even\r
+ if(current_alloc>1 && (-slot_cpu) > (ht_cpu_allocs[iface][current_pos-1]/2.0)){\r
+ current_pos--;\r
+ current_alloc--;\r
+ }\r
+ new_allocation.push_back(current_alloc);\r
+ }\r
+ new_allocation.push_back(total_ht_sizes[iface]-current_pos);\r
+\r
+/*\r
+ int cumm_ht_alloc = 0; // ht allocated thus far\r
+ int current_pos = 0; // idx to the current_allocation, if_cpu arrays\r
+ int current_alloc = 0; // ht allocated in the curr_pos slot\r
+ vector<int> new_allocation;\r
+ for(i=0;i<ifindices.size()-1;++i){\r
+ double slot_cpu = total_cpu/ifindices.size(); // amount of cpu to allocate\r
+ int slot_ht = 0;\r
+ while(slot_cpu > 0.0){\r
+ double cpu_rate = if_cpu[current_pos] / current_allocation[current_pos];\r
+ double cpu_remaining = cpu_rate*(current_allocation[current_pos] - current_alloc);\r
+ if(slot_cpu <= cpu_remaining){\r
+ double this_cpu_alloc = slot_cpu;\r
+ int this_ht_alloc = (int)(this_cpu_alloc / cpu_rate);\r
+ slot_ht += this_ht_alloc;\r
+ slot_cpu = 0.0;\r
+ cumm_ht_alloc += this_ht_alloc;\r
+ current_alloc += this_ht_alloc;\r
+ if(current_alloc >= current_allocation[current_pos]){\r
+ current_pos++;\r
+ current_alloc = 0;\r
+ }\r
+ }else{\r
+ slot_cpu -= cpu_remaining;\r
+ slot_ht += current_allocation[current_pos] - current_alloc;\r
+ cumm_ht_alloc += current_allocation[current_pos] - current_alloc;\r
+ current_pos++;\r
+ current_alloc = 0;\r
+ }\r
+ }\r
+ new_allocation.push_back(slot_ht);\r
+ }\r
+ new_allocation.push_back(total_ht_allocation - cumm_ht_alloc);\r
+*/\r
+\r
+ new_rts_loads[iface] = new_allocation;\r
+\r
+\r
+/*\r
+printf("Interface %s:",iface.c_str());\r
+for(i=0;i<ifindices.size();++i){\r
+string full_iface = iface + "X" + int_to_string(ifindices[i]);\r
+int pid = pid_iface_map[full_iface];\r
+printf(" %f",rts_perf_map[pid]->avg_cpu_time()/total_cpu);\r
+}\r
+printf("\n\t");\r
+for(i=0;i<ifindices.size();++i){\r
+printf(" %d",prev_rts_loads[iface][i]);\r
+}\r
+printf("\n");\r
+*/\r
+ }\r
+\r
+\r
+ FILE *rrec_fl = fopen("rts_load.cfg.recommended","w");\r
+ if(rrec_fl == NULL){\r
+ fprintf(stderr,"Warning, can't open rts_load.cfg.recommended for write, skipping.\n");\r
+ }\r
+\r
+ printf("Recommended virtual interface hash allocation:\n");\r
+ map<string, vector<int> >::iterator msvii;\r
+ for(msvii=new_rts_loads.begin();msvii!=new_rts_loads.end();++msvii){\r
+ string iface_name = (*msvii).first;\r
+ vector<int> iface_alloc = (*msvii).second;\r
+ printf("%s",iface_name.c_str());\r
+ if(rrec_fl!=NULL) fprintf(rrec_fl,"%s",iface_name.c_str());\r
+ for(i=0;i<iface_alloc.size();++i){\r
+ printf(",%d",iface_alloc[i]);\r
+ if(rrec_fl!=NULL) fprintf(rrec_fl,",%d",iface_alloc[i]);\r
+ }\r
+ printf("\n");\r
+ if(rrec_fl!=NULL) fprintf(rrec_fl,"\n");\r
+ }\r
+ fclose(rrec_fl);\r
+\r
+\r
+ rrec_fl = fopen("rts_load.trace.txt","a");\r
+ if(rrec_fl != NULL){\r
+ fprintf(rrec_fl,"\n");\r
+ for(msvii=new_rts_loads.begin();msvii!=new_rts_loads.end();++msvii){\r
+ string iface_name = (*msvii).first;\r
+ vector<int> iface_alloc = (*msvii).second;\r
+ vector<double> iface_cpu_loads = rts_iface_cpu_load[iface_name];\r
+ vector<int> prev_iface_alloc = prev_rts_loads[iface_name];\r
+\r
+ fprintf(rrec_fl,"Interface %s:\n",iface_name.c_str());\r
+ fprintf(rrec_fl,"%s,Previous allocation,",iface_name.c_str());\r
+ for(i=0;i<prev_iface_alloc.size();++i){\r
+ if(i>0) fprintf(rrec_fl,",");\r
+ fprintf(rrec_fl,"%d",prev_iface_alloc[i]);\r
+ }\r
+ fprintf(rrec_fl,"\n%s,Previous cpu loads,",iface_name.c_str());\r
+ for(i=0;i<iface_cpu_loads.size();++i){\r
+ if(i>0) fprintf(rrec_fl,",");\r
+ fprintf(rrec_fl,"%f",iface_cpu_loads[i]);\r
+ }\r
+ fprintf(rrec_fl,"\n%s,New allocation,",iface_name.c_str());\r
+ for(i=0;i<iface_alloc.size();++i){\r
+ if(i>0) fprintf(rrec_fl,",");\r
+ fprintf(rrec_fl,"%d",iface_alloc[i]);\r
+ }\r
+ fprintf(rrec_fl,"\n\n");\r
+ }\r
+ }\r
+ fclose(rrec_fl);\r
+\r
+\r
+// ----------------------------------------------------------------\r
+// Make an hfta parallelism analysis. Start by collecting hftas and grouping\r
+// them by their copies. Count on the __copy%d name mangling.\r
+\r
+ set<string>::iterator ssi;\r
+ map<string, vector<int> > par_hfta_map;\r
+ for(ssi=found_names.begin();ssi!=found_names.end();++ssi){\r
+ string base = (*ssi);\r
+ int qidx = qname_to_idx[base];\r
+ if(qnode_list[qidx]->qnode_type == "HFTA"){\r
+ size_t cpos = (*ssi).find("__copy");\r
+ if(cpos!=string::npos){\r
+ base = (*ssi).substr(0,cpos);\r
+ string idx_str = (*ssi).substr(cpos+6);\r
+ int pidx = atoi(idx_str.c_str());\r
+ qnode_list[qidx]->par_index = pidx;\r
+ }\r
+ par_hfta_map[base].push_back(qidx);\r
+ }\r
+ }\r
+\r
+// Coalesce or split hftas. Reduce parallelism until the max cpu utilization\r
+// is in [cpu_util_threshold/2, cpu_util_threshold]. Double parallelism\r
+// if max cpu utilization is > cpu_util_threshold.\r
+// Only recommend parallelism if a resource utilization file was found.\r
+ if(res_fl!=NULL){\r
+ map<string, int> recommended_parallelism;\r
+ map<string, vector<int> >::iterator msvii;\r
+ for(msvii=par_hfta_map.begin();msvii!=par_hfta_map.end();++msvii){\r
+ vector<int> buddy_indices = (*msvii).second;\r
+ vector<qnode *> buddies;\r
+ int n_valid = 0;\r
+ for(i=0;i<buddy_indices.size();i++){\r
+ buddies.push_back(qnode_list[buddy_indices[i]]);\r
+ if(qnode_list[buddy_indices[i]]->perf->is_valid())\r
+ n_valid++;\r
+ }\r
+\r
+ if(n_valid>0){\r
+ sort(buddies.begin(),buddies.end(),cmpr_parallel_idx);\r
+\r
+ int level=1;\r
+ double max_util = 0.0;\r
+ while(level<=buddies.size()){\r
+ for(i=0;i<buddies.size();i+=level){\r
+ double this_util = 0.0;\r
+ for(j=0;j<level;j++){\r
+ this_util += buddies[i+j]->perf->avg_cpu_time();\r
+ }\r
+ if(this_util > max_util)\r
+ max_util = this_util;\r
+ }\r
+ if(max_util >= cpu_util_threshold/2)\r
+ break;\r
+ level *= 2;\r
+ }\r
+ int npar = buddies.size();\r
+ if(max_util > cpu_util_threshold)\r
+ level/=2;\r
+ if(level>buddies.size())\r
+ level/=2;\r
+ if(level==0)\r
+ npar *= 2;\r
+ else\r
+ npar /= level;\r
+\r
+ recommended_parallelism[(*msvii).first] = npar;\r
+ }else{\r
+ printf("Warning, no resource usage information for %s, skipping.\n",(*msvii).first.c_str());\r
+ }\r
+ }\r
+\r
+ FILE *hpar_fl = NULL;\r
+ hpar_fl=fopen("hfta_parallelism.cfg","r");\r
+ if(hpar_fl==NULL){\r
+ fprintf(stderr,"Warning, can't open hfta_parallelism.cfg, ignoring.\n");\r
+ }else{\r
+ while(fgets(line,LINEBUF,hpar_fl)){\r
+ int nflds = split_string(line,',',flds,SPLITBUF);\r
+ if(nflds==2){\r
+ int npar = atoi(flds[1]);\r
+ if(npar>0 && recommended_parallelism.count(flds[0])==0){\r
+ recommended_parallelism[flds[0]] = npar;\r
+ }\r
+ }\r
+ }\r
+ fclose(hpar_fl);\r
+ }\r
+\r
+ FILE *recpar_fl = NULL;\r
+ recpar_fl=fopen("hfta_parallelism.cfg.recommended","w");\r
+ if(recpar_fl==NULL){\r
+ fprintf(stderr,"Warning, can't open hfta_parallelism.cfg.recommended, can't write the file.\n");\r
+ }\r
+ printf("Recommended parallelism:\n");\r
+ map<string, int>::iterator msii;\r
+ for(msii=recommended_parallelism.begin();msii!=recommended_parallelism.end();++msii){\r
+ if(recpar_fl!=NULL)\r
+ fprintf(recpar_fl,"%s,%d\n",(*msii).first.c_str(),(*msii).second);\r
+ printf("%s,%d\n",(*msii).first.c_str(),(*msii).second);\r
+ }\r
+ fclose(recpar_fl);\r
+ }else{\r
+ printf("Can't recommend hfta parallelism, no resource utilization file found.\n");\r
+ }\r
+\r
+ FILE *rec_ht_fl = NULL;\r
+ rec_ht_fl=fopen("lfta_htsize.cfg.recommended","w");\r
+ if(rec_ht_fl==NULL){\r
+ fprintf(stderr,"Warning, can't open lfta_htsize.cfg.recommended, can't write the file.\n");\r
+ }\r
+ printf("Recommended LFTA hash table sizes:\n");\r
+ for(i=0;i<qnode_list.size();++i){\r
+ qnode *this_qn = qnode_list[i];\r
+ if(this_qn->qnode_type=="LFTA" && this_qn->aggr_tbl_size>0 && this_qn->accepted_tup>0){\r
+ int ht_size = this_qn->aggr_tbl_size;\r
+ double collision_rate = ((double)this_qn->collisions)/this_qn->accepted_tup;\r
+ double eviction_rate = ((double)this_qn->evictions)/this_qn->accepted_tup;\r
+ printf("%s htsize=%d collision=%f evictions=%f",this_qn->name.c_str(),ht_size,collision_rate,eviction_rate);\r
+//printf("%d,%f,%f\n",ht_size,collision_rate,eviction_rate);\r
+ if(eviction_rate >= erate_hi){\r
+ ht_size /= 2;\r
+ }else if(collision_rate >= crate_hi){\r
+ ht_size *= 2;\r
+ }else if(collision_rate < crate_lo){\r
+ ht_size /= 2;\r
+ }\r
+\r
+ printf(" rec ht_size=%d\n",ht_size);\r
+ fprintf(rec_ht_fl,"%s,%u\n",this_qn->name.c_str(),ht_size);\r
+\r
+ }\r
+ }\r
+ fclose(rec_ht_fl);\r
+\r
+\r
+// Try to load the cpu configuration info\r
+\r
+ vector <cpu_info_str *> cpu_info_list;\r
+ FILE *cinfo_fl = NULL;\r
+ cinfo_fl=fopen("cpu_info.csv","r");\r
+ if(cinfo_fl==NULL){\r
+ fprintf(stderr,"Warning, can't open cpu_info.csv, skipping process pinning optimization. Run gscpv3/bin/parse_cpuinfo.pl to generate a cpu_info.csv file.\n");\r
+ }else{\r
+ int lineno = 0;\r
+ while(fgets(inp,LINEBUF,cinfo_fl)){\r
+ int nflds = split_string(inp,',',flds,SPLITBUF);\r
+ lineno++;\r
+ if(nflds >= 3){\r
+ cpu_info_list.push_back(new cpu_info_str(atoi(flds[0]),atoi(flds[1]),atoi(flds[2])));\r
+ }\r
+ }\r
+\r
+ sort(cpu_info_list.begin(),cpu_info_list.end(),cmpr_cpu_info);\r
+\r
+// Spread the LFTAs among the cores.\r
+ vector<string> iface_names;\r
+ for(msvi=rts_iface_indices.begin();msvi!=rts_iface_indices.end();++msvi){\r
+ string iface = (*msvi).first;\r
+ vector<int> ifindices = (*msvi).second;\r
+ sort(ifindices.begin(),ifindices.end());\r
+\r
+ for(i=0;i<ifindices.size();++i){\r
+ string full_iface = iface + "X" + int_to_string(ifindices[i]);\r
+ iface_names.push_back(full_iface);\r
+ }\r
+ }\r
+\r
+\r
+ map<string, int> rts_assignment;\r
+ double stride = ((double)(cpu_info_list.size()))/((double)(iface_names.size()));\r
+ double rtspos_f = 0.0;\r
+ for(i=0;i<iface_names.size();++i){\r
+ int rtspos = (int)rtspos_f;\r
+ rts_assignment[iface_names[i]] = rtspos;\r
+ cpu_info_list[rtspos]->assigned_load += rts_perf_map[pid_iface_map[iface_names[i]]]->avg_cpu_time();\r
+ rtspos_f += stride;\r
+ }\r
+\r
+//for(i=0;i<iface_names.size();++i){\r
+//printf("Placing %s at %d\n",iface_names[i].c_str(), rts_assignment[iface_names[i]]);\r
+//}\r
+\r
+ set<string> eligible_hftas;\r
+ map<string, int> hfta_assignment;\r
+ set<string>::iterator ssi;\r
+ for(ssi = found_names.begin();ssi!=found_names.end();++ssi){\r
+ int qidx = qname_to_idx[(*ssi)];\r
+//printf("Considering %s (%d), sz=%f, cpu=%f\n",(*ssi).c_str(),qidx,qnode_list[qidx]->inferred_in_sz,qnode_list[qidx]->perf->avg_cpu_time());\r
+ if(qnode_list[qidx]->inferred_in_sz >= min_hfta_insz || qnode_list[qidx]->perf->avg_cpu_time() > min_hfta_cpu){\r
+//printf("\tAdding to eligible list\n");\r
+ eligible_hftas.insert((*ssi));\r
+ }\r
+ }\r
+\r
+ while(eligible_hftas.size()>0){\r
+ int chosen_hfta = -1;\r
+ double max_assigned_rate = 0.0;\r
+ for(ssi=eligible_hftas.begin();ssi!=eligible_hftas.end();++ssi){\r
+ double assigned_rate = 0.0;\r
+ string qname = (*ssi);\r
+ int qidx = qname_to_idx[qname];\r
+ vector<int> reads_from = qnode_list[qidx]->reads_from_idx;\r
+ for(i=0;i<reads_from.size();++i){\r
+ if(qnode_list[reads_from[i]]->qnode_type == "LFTA" || (qnode_list[reads_from[i]]->qnode_type == "HFTA" && hfta_assignment.count(qnode_list[reads_from[i]]->name) > 0))\r
+ assigned_rate += qnode_list[reads_from[i]]->output_rate();\r
+ }\r
+//printf("hfta %s, assigned rate=%f\n",qname.c_str(),assigned_rate);\r
+ if(assigned_rate >= max_assigned_rate){\r
+//printf("\t picking %s\n",qname.c_str());\r
+ max_assigned_rate = assigned_rate;\r
+ chosen_hfta = qidx;\r
+ }\r
+ }\r
+ if(chosen_hfta >= 0){\r
+ vector<int> reads_from = qnode_list[chosen_hfta]->reads_from_idx;\r
+ vector<int> src_location;\r
+ vector<double> src_volume;\r
+ for(i=0;i<reads_from.size();++i){\r
+ int qidx = reads_from[i];\r
+ if(qnode_list[qidx]->qnode_type == "HFTA"){\r
+ if(hfta_assignment.count(qnode_list[qidx]->name)>0){\r
+ src_location.push_back(hfta_assignment[qnode_list[qidx]->name]);\r
+ src_volume.push_back(qnode_list[qidx]->output_rate());\r
+ }\r
+ }\r
+ if(qnode_list[qidx]->qnode_type == "LFTA"){\r
+ if(rts_assignment.count(qnode_list[qidx]->src_interface)>0){\r
+ src_location.push_back(rts_assignment[qnode_list[qidx]->src_interface]);\r
+ src_volume.push_back(qnode_list[qidx]->output_rate());\r
+ }\r
+ }\r
+ }\r
+//printf("chosen hfta is %d (%s), sources are:\n",chosen_hfta,qnode_list[chosen_hfta]->name.c_str());\r
+//for(i=0;i<src_location.size();++i){\r
+//printf("\tloc=%d, (%s) volume=%f\n",src_location[i],cpu_info_list[src_location[i]]->to_csv().c_str(),src_volume[i]);\r
+//}\r
+\r
+ double hfta_cpu_usage = qnode_list[chosen_hfta]->perf->avg_cpu_time();\r
+ if(hfta_cpu_usage > cpu_util_threshold) // hack for overloaded hftas.\r
+ hfta_cpu_usage = cpu_util_threshold * .9999;\r
+printf("hfta %d (%s) has cpu usage %f\n",chosen_hfta,qnode_list[chosen_hfta]->name.c_str(),hfta_cpu_usage);\r
+ int best_cpu = -1;\r
+ double lowest_cost = 0.0;\r
+ for(i=0;i<cpu_info_list.size();++i){\r
+ double curr_cost = 0.0;\r
+ for(j=0;j<src_location.size();++j){\r
+ int dist = cpu_info_list[i]->distance_from(cpu_info_list[src_location[j]]);\r
+ curr_cost += src_volume[j]*xfer_costs[dist];\r
+ }\r
+//printf("Cpu %s, cost=%f\n",cpu_info_list[i]->to_csv().c_str(),curr_cost);\r
+ if((cpu_info_list[i]->assigned_load+hfta_cpu_usage < cpu_util_threshold) && (best_cpu<0 || curr_cost <= lowest_cost)){\r
+ best_cpu = i;\r
+ lowest_cost = curr_cost;\r
+//printf("\tpicking %s\n",cpu_info_list[i]->to_csv().c_str());\r
+ }\r
+ }\r
+\r
+ if(best_cpu>=0)\r
+ cpu_info_list[best_cpu]->assigned_load += hfta_cpu_usage;\r
+ hfta_assignment[qnode_list[chosen_hfta]->name] = best_cpu;\r
+ eligible_hftas.erase(qnode_list[chosen_hfta]->name);\r
+\r
+ }else{\r
+ fprintf(stderr,"ERROR, chosen_hfta=-1, bailing out.\n");\r
+ exit(1);\r
+ }\r
+ }\r
+\r
+ FILE *pin_fl = fopen("pinning_info.csv","w");\r
+ if(pin_fl==NULL){\r
+ fprintf(stderr,"Warning, can't open pinning_info.csv, can't write the file.\n");\r
+ }\r
+ printf("RTS assignments:\n");\r
+ for(i=0;i<iface_names.size();++i){\r
+ int assigned_cpu = rts_assignment[iface_names[i]];\r
+ if(assigned_cpu>=0){\r
+ printf("Place %s at %d (%s)\n",iface_names[i].c_str(), assigned_cpu, cpu_info_list[assigned_cpu]->to_csv().c_str());\r
+ if(pin_fl != NULL){\r
+ fprintf(pin_fl,"rts %s,%d\n",iface_names[i].c_str(), assigned_cpu);\r
+ }\r
+ }\r
+ }\r
+\r
+ printf("HFTA assignments:\n");\r
+ map<string, int>::iterator msii;\r
+ for(msii=hfta_assignment.begin();msii!=hfta_assignment.end();++msii){\r
+ int assigned_cpu = (*msii).second;\r
+ string qname = (*msii).first;\r
+ if(assigned_cpu>=0){\r
+ printf("Place %s (%s) at %d (%s)\n",qname.c_str(),qnode_list[qname_to_idx[qname]]->executable_name.c_str(),assigned_cpu,cpu_info_list[assigned_cpu]->to_csv().c_str());\r
+ if(pin_fl != NULL){\r
+ fprintf(pin_fl,"%s,%d\n",qnode_list[qname_to_idx[qname]]->executable_name.c_str(), assigned_cpu);\r
+ }\r
+ }\r
+ }\r
+\r
+ }\r
+\r
+//for(i=0;i<cpu_info_list.size();++i){\r
+//printf("Cpu %d assigned load %f\n",i,cpu_info_list[i]->assigned_load);\r
+//}\r
+\r
+return 0;\r
+}\r
+\r
+\r
+\r