1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
// Entry point and state of the XML parser, defined elsewhere.
// main() calls xmlParserparse() on qtree.xml and treats a nonzero
// return as a parse failure.
34 extern int xmlParserparse(void);
35 extern FILE *xmlParserin;
36 extern int xmlParserdebug;
// Tunables. Most can be overridden on the command line (see usage_str in main).
// -i: initial discard from the resource log; also passed to the qnode and
// perf_struct constructors.
39 int init_discard = 12;
// -l: rts load history length; initial budget of history entries examined.
41 int rts_load_history_len = 3;
// -m: rts load history multiplier; the weight is multiplied by this again
// for each further history entry that is folded in.
42 double hist_multiplier = 0.8;
// -U sets this to true (all rts interface hashes are the same), -N sets it
// to false (rts interface hashes are processed independently).
43 bool uniform_rts_alloc = true;
// NOTE(review): neither value is referenced in the visible part of the file;
// presumably lower limits on hfta input size and cpu use -- confirm.
48 double min_hfta_insz = 1000000.0;
49 double min_hfta_cpu = 0.2;
// -u: max cpu utilization threshold; option parsing requires (0,1].
51 double cpu_util_threshold = 0.9;
// -C, -c, -E: option parsing requires each of these to be in [0,1].
52 double crate_hi=.01; // upper bound on collision rate
53 double crate_lo=.002; // lower bound, increase HT size
54 double erate_hi=.01; // upper bound on eviction rate
// Communication cost multipliers indexed by process distance; -0, -1 and -2
// set entries 0..2 (each must be in [0,1]). No option for entry 3 is visible.
56 double xfer_costs[4] = {.1, .1, .3, 1.0};
// Splits instr on the separator character sep, storing a pointer to the
// start of each piece in words[]. The first piece always starts at the
// beginning of the string; a further entry is added after every separator
// found by strchr(). When the number of pieces reaches max_words an error
// is printed to stderr.
// NOTE(review): the lines that advance str, handle the overflow case and
// return the count are not part of this excerpt -- presumably the string is
// cut in place and the number of words is returned; confirm in the full file.
59 int split_string(char *instr,char sep, char **words,int max_words){
65 words[nwords++] = str;
66 while( (loc = strchr(str,sep)) != NULL){
69 if(nwords >= max_words){
70 fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
73 words[nwords++] = str;
// Formats the integer i in decimal ("%d") into the buffer tmpstr.
// NOTE(review): the declaration of tmpstr and the return statement are not
// visible here; confirm the buffer can hold the longest int plus the
// terminating NUL, since sprintf does no bounds checking.
79 string int_to_string(int i){
82 sprintf(tmpstr,"%d",i);
// Constructor; main() builds these as fta_addr(ip, port, streamid).
// NOTE(review): the enclosing struct header and the constructor body are
// outside this excerpt.
96 fta_addr(int i, int p, int s){
// Ordering functor for fta_addr; main() uses it as the comparator of the
// map from fta_addr to query name (qnode_map).
// NOTE(review): only the streamid comparison is visible; the preceding
// comparisons (presumably ip, then port) are not in this excerpt.
103 struct cmpr_fta_addr{
104 bool operator()(fta_addr const &a, fta_addr const &b) const{
113 if(a.streamid < b.streamid)
// Sort predicate: true when qnode a has a smaller par_index than qnode b.
// main() passes it to sort() to order the parallel copies of an hfta.
119 bool cmpr_parallel_idx(const qnode *a, const qnode *b){
120 return a->par_index < b->par_index;
// NOTE(review): these are members of a struct whose header lies outside
// this excerpt.
128 double assigned_load;
// Constructor taking (p, s, c). Given the fields used below these are
// presumably the processor, socket and core ids -- confirm.
130 cpu_info_str(int p, int s, int c){
// Renders the ids into buf as "processor_id,socket_id,core_id".
139 sprintf(buf,"%d,%d,%d",processor_id,socket_id,core_id);
// Compares this cpu with other, testing socket first, then core, then
// processor. NOTE(review): the value returned by each test is not visible.
143 int distance_from(cpu_info_str *other){
144 if(socket_id != other->socket_id)
146 if(core_id != other->core_id)
148 if(processor_id != other->processor_id)
// Ordering predicate for cpu_info_str pointers. Keys are examined in the
// order socket_id, core_id, processor_id; a later key is only consulted
// after both the "<" and ">" tests on the earlier key have been made.
// NOTE(review): the return statements are not part of this excerpt.
154 bool cmpr_cpu_info(cpu_info_str const *a, cpu_info_str const *b){
155 if(a->socket_id < b->socket_id)
157 if(a->socket_id > b->socket_id)
159 if(a->core_id < b->core_id)
161 if(a->core_id > b->core_id)
163 if(a->processor_id < b->processor_id)
// Program entry. The visible part parses options, loads qtree.xml, scans a
// trace file and a resource log, and prints/writes performance and
// hash-allocation reports.
171 int main(int argc, char **argv){
// Remember the current year (as tm_year): the timestamps parsed from the
// trace file carry month, day and time only, and this value is used to
// fill in tm_year for them.
174 time_t now = time(NULL);
175 tm *now_tm = localtime(&now);
176 int year=now_tm->tm_year;
// Defaults; resource_log_file can be replaced from an option argument.
181 string trace_file="";
182 string resource_log_file = "resource_log.csv";
// getopt() option string; a trailing ':' marks an option taking an argument.
184 const char *optstr = "d:r:i:l:m:UNu:s:C:c:E:0:1:2:";
// Usage text. It contains one "%s" placeholder (the program name), so it is
// meant to be used as a printf format string.
185 const char *usage_str =
186 "Usage: %s [options] trace_file\n"
187 "\t-d source_directory\n"
188 "\t-r resource_log_file\n"
189 "\t-i initial discard from the resource log\n"
190 "\t-s default interface hash table length.\n"
191 "\t-l rts load history length\n"
192 "\t-m rts load history multiplier\n"
193 "\t-U All rts interface hashes are the same.\n"
194 "\t-N rts interface hashes are processed independently.\n"
195 "\t-u max cpu utilization threshold\n"
196 "\t-C upper bound on collision rate.\n"
197 "\t-c lower bound on collision rate.\n"
198 "\t-E upper bound on the eviction rate.\n"
199 "\t-0 communication cost multiplier for 0-distance processes.\n"
200 "\t-1 communication cost multiplier for 1-distance processes.\n"
201 "\t-2 communication cost multiplier for 2-distance processes.\n"
// Option parsing. Each fragment below converts optarg, range-checks the
// result and reports a rejected value on stderr.
// NOTE(review): the switch/case labels and the statements following each
// error message are not part of this excerpt.
205 while((chopt = getopt(argc,argv,optstr)) != -1){
// Communication cost multipliers for 0-, 1- and 2-distance processes.
208 xfer_costs[0] = atof(optarg);
209 if(xfer_costs[0] < 0 || xfer_costs[0] > 1){
210 fprintf(stderr,"ERROR, 0-distance xfer costs (%f) must be in [0,1].\n",xfer_costs[0]);
215 xfer_costs[1] = atof(optarg);
216 if(xfer_costs[1] < 0 || xfer_costs[1] > 1){
217 fprintf(stderr,"ERROR, 1-distance xfer costs (%f) must be in [0,1].\n",xfer_costs[1]);
222 xfer_costs[2] = atof(optarg);
223 if(xfer_costs[2] < 0 || xfer_costs[2] > 1){
224 fprintf(stderr,"ERROR, 2-distance xfer costs (%f) must be in [0,1].\n",xfer_costs[2]);
// Collision / eviction rate bounds. Each message reports the value that was
// actually rejected (the original printed hist_multiplier in all three).
229 crate_hi = atof(optarg);
230 if(crate_hi < 0 || crate_hi>1){
231 fprintf(stderr,"ERROR, request to set crate_hi to %f\n must be in [0,1].\n",crate_hi);
236 crate_lo = atof(optarg);
237 if(crate_lo < 0 || crate_lo>1){
238 fprintf(stderr,"ERROR, request to set crate_lo to %f\n must be in [0,1].\n",crate_lo);
243 erate_hi = atof(optarg);
244 if(erate_hi < 0 || erate_hi>1){
245 fprintf(stderr,"ERROR, request to set erate_hi to %f\n must be in [0,1].\n",erate_hi);
// Interface hash table length; tagged "ERROR" like every other diagnostic.
250 htmax = atoi(optarg);
252 fprintf(stderr,"ERROR, htmax set to %d, must be positive nonzero.\n",htmax);
257 hist_multiplier = atof(optarg);
258 if(hist_multiplier <= 0){
259 fprintf(stderr,"ERROR, request to set hist_multiplier to %f\n must be positive nonzero.\n",hist_multiplier);
264 cpu_util_threshold = atof(optarg);
265 if(cpu_util_threshold<=0 || cpu_util_threshold>1){
266 fprintf(stderr,"ERROR, cpu_threshold set to %f, must be in (0,1].\n",cpu_util_threshold);
271 uniform_rts_alloc=true;
274 uniform_rts_alloc=false;
280 resource_log_file = optarg;
283 init_discard = atoi(optarg);
284 if(init_discard < 0){
// NOTE(review): if the line missing before this message resets init_discard,
// the message reports 0 instead of the rejected value -- confirm in the
// full file.
286 fprintf(stderr,"ERROR, atttempting to set init_discard to a negative value (%d), setting to zero.\n",init_discard);
290 rts_load_history_len = atoi(optarg);
291 if(rts_load_history_len < 0){
// Report the rejected value first, then clamp it; the original clamped
// first and therefore always printed 0.
292 fprintf(stderr,"ERROR, atttempting to set rts_load_history_len to a negative value (%d), setting to zero.\n",rts_load_history_len);
293 rts_load_history_len=0;
297 fprintf(stderr,"Error, argument %c not recognized.\n",optopt);
// usage_str is itself a format string with one %s for the program name;
// printing it through "%s\n" showed the placeholder literally.
298 fprintf(stderr, usage_str, argv[0]);
301 fprintf(stderr,"Invalid arguments\n");
302 fprintf(stderr, usage_str, argv[0]);
// The trace file name is taken from argv[0].
// NOTE(review): this only makes sense if argv has been advanced past the
// parsed options on a line not shown here -- confirm.
309 trace_file = argv[0];
// No trace file given: print the usage text.
311 if(trace_file == ""){
312 fprintf(stderr, usage_str, argv[0]);
316 // for month string-to-int conversion
// Month abbreviations map to 0..11, the range used by tm_mon.
317 const char *months_str[12] = {"Jan","Feb","Mar","Apr","May","Jun","Jul","Aug","Sep","Oct","Nov","Dec"};
318 map<string, int> month_str_to_int;
320 month_str_to_int[months_str[i]] = i;
// Locate qtree.xml: first under src_dir, then in the current directory.
// actual_qtree_flname records which path was tried last, for error messages.
323 FILE *qtree_fl = NULL;
324 string qtree_flname = src_dir + "/" + "qtree.xml";
325 string actual_qtree_flname;
327 qtree_fl = fopen(qtree_flname.c_str(),"r");
328 actual_qtree_flname = qtree_flname;
330 if(qtree_fl == NULL){
331 qtree_fl = fopen("qtree.xml","r");
332 actual_qtree_flname = "qtree.xml";
// Neither location could be opened; the message is assembled piecewise.
334 if(qtree_fl == NULL){
335 fprintf(stderr,"ERROR, can't open ");
337 fprintf(stderr,"%s or ",qtree_flname.c_str());
339 fprintf(stderr,"qtree.xml, exiting.\n");
344 // Parse the qtree.xml file
// A nonzero return from xmlParserparse() is reported as a parse failure.
345 xmlParser_setfileinput(qtree_fl);
346 if(xmlParserparse()){
347 fprintf(stderr,"ERROR, could not parse query tree file %s\n",actual_qtree_flname.c_str());
350 // Get the lfta, hfta nodes
// Both calls append to xqnodes, so HFTA entries precede LFTA entries.
351 xml_t *xroot = xml_result;
352 vector<xml_t *> xqnodes;
353 xroot->get_roots("HFTA",xqnodes);
354 xroot->get_roots("LFTA",xqnodes);
// qname_to_idx: query name -> index into qnode_list.
// exe_to_idx: executable name -> index into qnode_list.
356 map<string,int> qname_to_idx;
357 map<string,int> exe_to_idx;
358 vector<qnode *> qnode_list;
// Build one qnode per named HFTA/LFTA element; unnamed elements are only
// reported on stderr.
361 for(i=0;i<xqnodes.size();++i){
363 if(xqnodes[i]->get_attrib_val("name",qname)){
364 //printf("node type = %s, name=%s\n",xqnodes[i]->name.c_str(),qname.c_str());
365 qnode *qn = new qnode(xqnodes[i]->name,qname,init_discard);
// Each child element is dispatched on its tag name.
366 for(s=0;s<xqnodes[i]->subtrees.size();++s){
367 xml_t *xsub = xqnodes[i]->subtrees[s];
// <Field>: name, pos and type are required; mods is optional (mret is not
// part of the validity test below).
368 if(xsub->name == "Field"){
370 bool nret = xsub->get_attrib_val("name",fname);
372 bool pret = xsub->get_attrib_val("pos",fpos);
374 bool tret = xsub->get_attrib_val("type",ftype);
376 bool mret = xsub->get_attrib_val("mods",fmods);
377 if(nret && pret && tret){
378 field_str *fld = new field_str(fname,atoi(fpos.c_str()),ftype,fmods);
381 fprintf(stderr,"---> subtree %d of FTA %s has an malformed field.\n",s,qname.c_str());
// <HtSize>: the value is converted to a power of two by shifting naggrs left
// while shifting htsize right; nones is used to detect a value that already
// was a power of two.
// NOTE(review): the loop condition and the nones==1 adjustment are not in
// this excerpt -- presumably this rounds up to the next power of two.
384 if(xsub->name == "HtSize"){
386 bool sret = xsub->get_attrib_val("value",src);
388 int htsize = atoi(src.c_str());
390 unsigned int naggrs = 1; // make it power of 2
391 unsigned int nones = 0;
395 naggrs = naggrs << 1;
396 htsize = htsize >> 1;
398 if(nones==1) // in case it was already a power of 2.
400 qn->aggr_tbl_size = naggrs;
402 fprintf(stderr,"---> subtree %d of FTA %s has an invalid HtSize (%s).\n",s,qname.c_str(),src.c_str());
405 fprintf(stderr,"---> subtree %d of FTA %s has an malformed HtSize.\n",s,qname.c_str());
// <ReadsFrom>: names a query this node reads from.
408 if(xsub->name == "ReadsFrom"){
410 bool sret = xsub->get_attrib_val("value",src);
414 fprintf(stderr,"---> subtree %d of FTA %s has an malformed ReadsFrom.\n",s,qname.c_str());
// <Interface>: source interface of the node.
417 if(xsub->name == "Interface"){
419 bool sret = xsub->get_attrib_val("value",iface);
421 qn->src_interface = iface;
// <FileName>: the executable name is the file name up to its first '.'.
424 if(xsub->name == "FileName"){
426 bool sret = xsub->get_attrib_val("value",full_fname);
428 size_t dotpos = full_fname.find_first_of('.');
429 if(dotpos != string::npos){
430 qn->executable_name = full_fname.substr(0,dotpos);
// Register the node. Executables named "rts" are not indexed by executable
// name (rts processes are tracked per pid elsewhere in main).
435 qname_to_idx[qname] = qnode_list.size();
436 if(qn->executable_name != "" && qn->executable_name != "rts")
437 exe_to_idx[qn->executable_name] = qnode_list.size();
438 qnode_list.push_back(qn);
440 fprintf(stderr,"---> node type %s, no name.\n",xqnodes[i]->name.c_str());
// Resolve the reads_from names of every HFTA into indices, recording the
// edge in both directions: reads_from_idx on the reader and sources_to_idx
// on the node being read. Names missing from qtree.xml are reported.
446 for(i=0;i<qnode_list.size();++i){
447 if(qnode_list[i]->qnode_type == "HFTA"){
448 for(s=0;s<qnode_list[i]->reads_from.size();++s){
449 if(qname_to_idx.count(qnode_list[i]->reads_from[s])>0){
450 int src_id = qname_to_idx[qnode_list[i]->reads_from[s]];
451 qnode_list[i]->reads_from_idx.push_back(src_id);
452 qnode_list[src_id]->sources_to_idx.push_back(i);
454 fprintf(stderr,"ERROR, hfta %s reads_from %s, but its not in qtree.xml.\n",qnode_list[i]->name.c_str(),qnode_list[i]->reads_from[s].c_str());
// Dump the resulting graph: for each node, the names it reads from and the
// names of the nodes it feeds.
465 for(i=0;i<qnode_list.size();++i){
466 printf("node %s reads_from:",qnode_list[i]->name.c_str());
467 for(s=0;s<qnode_list[i]->reads_from.size();++s){
468 printf(" %s",qnode_list[i]->reads_from[s].c_str());
470 printf("\nand sources to:\n");
471 for(s=0;s<qnode_list[i]->sources_to_idx.size();++s){
472 printf(" %s",qnode_list[qnode_list[i]->sources_to_idx[s]]->name.c_str());
478 string tracefilename = trace_file;
480 tracefilename = src_dir + "/" + trace_file;
482 FILE *trace_fl = NULL;
483 if((trace_fl = fopen(tracefilename.c_str(),"r"))==NULL){
484 fprintf(stderr,"ERROR, can't open trace file %s\n",tracefilename.c_str());
488 map<fta_addr,string, cmpr_fta_addr> qnode_map;
489 map<int, perf_struct *> rts_perf_map;
490 map<int, string> rts_iface_map;
491 map<string, int> pid_iface_map;
494 char inp[LINEBUF],line[LINEBUF],*saveptr;
495 unsigned int hbeat_ip, hbeat_port, hbeat_index, hbeat_streamid, hbeat_trace_id,hbeat_ntraces;
496 while(fgets(inp,LINEBUF,trace_fl)){
498 // Try to grab the timestamp
501 strncpy(line,inp,LINEBUF);
503 int mon=0,day,hr,mn,sec;
504 int nret = sscanf(line,"%c%c%c %d %d:%d:%d",mon_str,mon_str+1,mon_str+2,&day,&hr,&mn,&sec);
507 if(month_str_to_int.count(mon_str)>0){
508 mon = month_str_to_int[mon_str];
510 fprintf(stderr,"Warning, %s not recognized as a month string.\n",mon_str);
512 time_str.tm_sec = sec;
513 time_str.tm_min = mn;
514 time_str.tm_hour = hr;
515 time_str.tm_mday = day;
516 time_str.tm_mon = mon;
517 time_str.tm_year = year;
518 tick = mktime(&time_str);
519 //printf("mon=%d, day=%d, hr=%d, mn=%d, sec=%d, tick=%d\n",mon,day,hr,mn,sec,tick);
522 // Grab the process ID
523 strncpy(line,inp,LINEBUF);
526 char *segment = strtok_r(line,"[",&saveptr);
528 segment = strtok_r(NULL,"[",&saveptr);
529 nret = sscanf(segment,"%d]",&tmp_pid);
535 // Grab address-to-hfta mappings
536 strncpy(line,inp,LINEBUF);
537 segment = strtok_r(line,"]",&saveptr);
539 segment = strtok_r(NULL," ",&saveptr);
540 segment = strtok_r(NULL," ",&saveptr);
542 //printf("segmetn=<%s>, comparison=%d\n",segment,strcmp(segment,"Lookup"));
543 if(segment!=NULL && strcmp(segment,"Lookup")==0){
545 char fta_name[LINEBUF];
551 while(segment != NULL){
552 //printf("pos=%d, segment = %s\n",pos, segment);
555 strncpy(fta_name,segment,LINEBUF);
559 nret = sscanf(segment,"{ip=%d,port=%d,index=%d,streamid=%d}",&ip,&port,&index,&streamid);
562 segment = strtok_r(NULL," ",&saveptr);
565 //printf("nret=%d, fta_name=%s,ip=%d,port=%d,index=%d,streamid=%d\n",nret,fta_name,ip,port,index,streamid);
566 fta_addr addr(ip,port,streamid);
567 qnode_map[addr] = fta_name;
568 //printf("Adding fta %s, (%d %d %d) to qnode_map\n",fta_name,addr.ip,addr.port,addr.streamid);
572 if(segment!=NULL && strcmp(segment,"Lfta")==0){
574 char fta_name[LINEBUF];
580 while(segment != NULL){
581 //printf("pos=%d, segment = %s\n",pos, segment);
584 strncpy(fta_name,segment,LINEBUF);
588 nret = sscanf(segment,"{ip=%d,port=%d,index=%d,streamid=%d}",&ip,&port,&index,&streamid);
591 segment = strtok_r(NULL," ",&saveptr);
594 //printf("nret=%d, fta_name=%s,ip=%d,port=%d,index=%d,streamid=%d\n",nret,fta_name,ip,port,index,streamid);
595 fta_addr addr(ip,port,streamid);
596 qnode_map[addr] = fta_name;
597 //printf("Adding lfta %s, (%d %d %d) to qnode_map\n",fta_name,addr.ip,addr.port,addr.streamid);
601 if(segment!=NULL && strcmp(segment,"Init")==0){
605 while(segment != NULL){
610 for(cc=segment;*cc!='\0';++cc){
619 segment = strtok_r(NULL," ",&saveptr);
621 if(iface!="" && keywd == "LFTAs"){
622 rts_perf_map[pid] = new perf_struct(init_discard);
623 rts_iface_map[pid] = iface;
624 pid_iface_map[iface] = pid;
627 if(segment!=NULL && strcmp(segment,"Heartbeat")==0){
// Scan the tokens of a Heartbeat line for the fta id, the trace id and the
// trace count. Values are parsed into temporaries so that a malformed line
// does not disturb the last good heartbeat.
629 int nret=0,nret2=0,nret3=0;
630 unsigned int tmp_hbeat_ip, tmp_hbeat_port, tmp_hbeat_index, tmp_hbeat_streamid, tmp_hbeat_trace_id,tmp_hbeat_ntraces;
631 while(segment != NULL){
632 //printf("pos=%d, segment = %s\n",pos, segment);
// All targets are unsigned int, so the matching conversion is %u (as in the
// trace-record parser); %d requires a pointer to int.
634 nret = sscanf(segment,"{ip=%u,port=%u,index=%u,streamid=%u}",&tmp_hbeat_ip,&tmp_hbeat_port,&tmp_hbeat_index,&tmp_hbeat_streamid);
636 nret2 = sscanf(segment,"trace_id=%u",&tmp_hbeat_trace_id);
638 nret3 = sscanf(segment,"ntrace=%u",&tmp_hbeat_ntraces);
640 segment = strtok_r(NULL," ",&saveptr);
642 if(nret>=4 && nret2 >= 1 && nret3 == 1){
643 hbeat_ip = tmp_hbeat_ip;
644 hbeat_port = tmp_hbeat_port;
645 hbeat_index = tmp_hbeat_index;
646 hbeat_streamid = tmp_hbeat_streamid;
647 hbeat_trace_id = tmp_hbeat_trace_id;
648 hbeat_ntraces = tmp_hbeat_ntraces;
649 // printf("ip=%d, port=%d, index=%d, streamid=%d hbeat_trace_id=%d\n",hbeat_ip,hbeat_port,hbeat_index,hbeat_streamid,hbeat_trace_id);
650 fta_addr hb_addr(hbeat_ip,hbeat_port,tmp_hbeat_streamid);
651 if(qnode_map.count(hb_addr) == 0){
652 hb_addr.streamid = 0; // maybe an hfta?
653 if(qnode_map.count(hb_addr) == 0){
655 //printf("Hbeat streamid=%d no match (%d,%d), hbeat_trace_id=%d\n",hbeat_streamid,hbeat_ip,hbeat_port,hbeat_trace_id);
658 //printf("Hbeat streamid=%d matches %s, hbeat_trace_id=%d\n",hbeat_streamid,qnode_map[hb_addr].c_str(),hbeat_trace_id);
661 printf("Couldn't parse as hearbeat %s\n",inp);
// A trace record starts with a "trace_id=" token. Compare all 9 characters
// of that prefix (the original compared only 8, i.e. without the '=').
664 if(segment!=NULL && strncmp(segment,"trace_id=",9)==0){
666 int nret=0,nret2=0,nret3=0;
667 unsigned long long int trace_id;
668 unsigned int tr_pos,tr_ip,tr_port,tr_index,tr_streamid;
669 unsigned int tr_intup,tr_outtup,tr_outsz,tr_acctup,tr_cycles;
670 unsigned int tr_evictions,tr_collisions;
672 nret = sscanf(segment,"trace_id=%llu",&trace_id);
// Walk the remaining tokens, picking up the fta id and its statistics.
673 while(segment != NULL){
674 //printf("pos=%d, segment = %s\n",pos, segment);
676 nret = sscanf(segment,"trace_id=%llu",&trace_id);
// tr_pos is unsigned int, so the trace index is read with %u like the
// other fields of this format.
678 nret2 = sscanf(segment,"trace[%u].ftaid={ip=%u,port=%u,index=%u,streamid=%u}",&tr_pos,&tr_ip,&tr_port,&tr_index,&tr_streamid);
680 nret3 = sscanf(segment,"fta_stat={in_tuple_cnt=%u,out_tuple_cnt=%u,out_tuple_sz=%u,accepted_tuple_cnt=%u,cycle_cnt=%u,collision_cnt=%u,eviction_cnt=%u,sampling_rate=%lf}",
681 &tr_intup, &tr_outtup, &tr_outsz, &tr_acctup,&tr_cycles,
682 &tr_collisions,&tr_evictions,&tr_sample);
684 segment = strtok_r(NULL," ",&saveptr);
686 if(nret>=1 && nret2>=5 && nret3>=7){
687 fta_addr tr_addr(tr_ip,tr_port,tr_streamid);
688 if(qnode_map.count(tr_addr)==0)
689 tr_addr.streamid = 0; // maybe an hfta?
690 if(qnode_map.count(tr_addr)>0){
691 //printf("Trace idx=%d streamid=%d matches %s, trace_id=%d, hbeat_trace_id=%d\n",tr_pos,tr_streamid,qnode_map[tr_addr].c_str(),trace_id,hbeat_trace_id);
692 if(tr_pos+1 == hbeat_ntraces){
693 string qname = qnode_map[tr_addr];
694 int qidx = qname_to_idx[qname];
695 if(qnode_list[qidx]->start_tick < 0)
696 qnode_list[qidx]->start_tick = tick;
697 if(qnode_list[qidx]->end_tick < tick)
698 qnode_list[qidx]->end_tick = tick;
699 qnode_list[qidx]->in_tup += tr_intup;
700 qnode_list[qidx]->out_tup += tr_outtup;
701 qnode_list[qidx]->out_sz += tr_outsz;
702 qnode_list[qidx]->accepted_tup += tr_acctup;
703 qnode_list[qidx]->cycles += tr_cycles;
704 qnode_list[qidx]->collisions += tr_collisions;
705 qnode_list[qidx]->evictions += tr_evictions;
709 //printf("Trace idx=%d streamid=%d no match (%d,%d), trace_id=%d, hbeat_trace_id=%d\n",tr_pos,tr_streamid,tr_ip,tr_port,trace_id,hbeat_trace_id);
715 //printf("qnode_map has %d entries\n",qnode_map.size());
717 // Open and process performance log info, if any.
719 resource_log_file = src_dir + "/" + resource_log_file;
// Open the resource log; the message names the resource log (the original
// text called it the trace file, which is a different input).
722 if((res_fl = fopen(resource_log_file.c_str(),"r"))==NULL){
723 fprintf(stderr,"ERROR, can't open resource log file %s\n",resource_log_file.c_str());
727 char *flds[SPLITBUF];
729 while(fgets(inp,LINEBUF,res_fl)){
730 int nflds = split_string(inp,',',flds,SPLITBUF);
733 int ts = atoi(flds[0]);
734 string procname = flds[1];
735 int pid = atoi(flds[2]);
736 unsigned long long int utime = atoll(flds[3]);
737 unsigned long long int stime = atoll(flds[4]);
738 unsigned long long int vm_size = atoll(flds[5]);
739 unsigned long long int rss_size = atoll(flds[6]);
740 int pagesize = atoi(flds[7]);
742 if(procname == "rts"){
743 if(rts_perf_map.count(pid)>0){
744 if(rts_perf_map[pid]->update(ts,utime,stime,vm_size,rss_size)){
745 fprintf(stderr,"Resource log file is corrupted, line %d\n",lineno);
750 if(exe_to_idx.count(procname)>0){
751 perf_struct *p = qnode_list[exe_to_idx[procname]]->perf;
752 if(p->update(ts,utime,stime,vm_size,rss_size)){
753 fprintf(stderr,"Resource log file is corrupted, line %d\n",lineno);
763 FILE *rpt_fl = fopen("performance_report.csv","w");
765 fprintf(stderr,"Warning, can't open performance_report.csv, can't save the performance report.\n");
769 printf("Performance report:\n");
770 if(rpt_fl) fprintf(rpt_fl,"query_name,process_name,in_tup,out_tup,out_sz,accepted_tup,cycles,collisions,evictions,in_tup_rate,out_tup_rate,out_bytes_rate,cycle_consumption_rate");
771 if(rpt_fl) fprintf(rpt_fl,",%s",perf_struct::to_csv_hdr().c_str());
772 if(rpt_fl) fprintf(rpt_fl,",packets_sent_to_query,fraction_intup_lost,inferred_read_rate");
773 if(rpt_fl) fprintf(rpt_fl,"\n");
775 map<fta_addr,string>::iterator mpiisi;
777 set<string> found_names;
778 for(mpiisi=qnode_map.begin();mpiisi!=qnode_map.end();++mpiisi){
779 string qname = (*mpiisi).second;
780 if(found_names.count(qname)==0){
781 found_names.insert(qname);
782 int qidx = qname_to_idx[qname];
783 string executable = qnode_list[qidx]->executable_name;
786 // printf("(%d,%d,%d) -> %s, %d reads-from\n",(*mpiisi).first.ip,(*mpiisi).first.port,(*mpiisi).first.streamid,qname.c_str(),qnode_list[qidx]->reads_from_idx.size());
787 printf("query=%s, executable=%s\tintup=%llu, out_tup=%llu, out_sz=%llu, accepted_tup=%llu, cycles=%llu, collisions=%llu, evictions=%llu\n",
788 qname.c_str(),qnode_list[qidx]->executable_name.c_str(),
789 qnode_list[qidx]->in_tup,
790 qnode_list[qidx]->out_tup,
791 qnode_list[qidx]->out_sz,
792 qnode_list[qidx]->accepted_tup,
793 qnode_list[qidx]->cycles,
794 qnode_list[qidx]->collisions,
795 qnode_list[qidx]->evictions
797 if(rpt_fl) fprintf(rpt_fl,"%s,%s,%llu,%llu,%llu,%llu,%llu,%llu,%llu",
798 qname.c_str(),qnode_list[qidx]->executable_name.c_str(),
799 qnode_list[qidx]->in_tup,
800 qnode_list[qidx]->out_tup,
801 qnode_list[qidx]->out_sz,
802 qnode_list[qidx]->accepted_tup,
803 qnode_list[qidx]->cycles,
804 qnode_list[qidx]->collisions,
805 qnode_list[qidx]->evictions
807 double duration = 1.0*(qnode_list[qidx]->end_tick - qnode_list[qidx]->start_tick);
809 printf("\tin_tup_rate=%f, out_tup_rate=%f, out_byte_rate=%f, cycle_rate=%f\n",qnode_list[qidx]->in_tup/duration,qnode_list[qidx]->out_tup/duration,qnode_list[qidx]->out_sz/duration,qnode_list[qidx]->cycles/duration);
810 if(rpt_fl) fprintf(rpt_fl,",%f,%f,%f,%f",qnode_list[qidx]->in_tup/duration,qnode_list[qidx]->out_tup/duration,qnode_list[qidx]->out_sz/duration,qnode_list[qidx]->cycles/duration);
811 printf("\t%s\n",qnode_list[qidx]->perf->to_string().c_str());
812 if(rpt_fl) fprintf(rpt_fl,",%s",qnode_list[qidx]->perf->to_csv().c_str());
813 //if(qnode_list[qidx]->aggr_tbl_size>0){
814 //printf("\taggregate table size is %d\n",qnode_list[qidx]->aggr_tbl_size);
816 //if(qnode_list[qidx]->src_interface != ""){
817 //printf("\tSource interface is %s\n",qnode_list[qidx]->src_interface.c_str());
819 if(qnode_list[qidx]->reads_from_idx.size()>0){
820 unsigned long long int total_sent = 0;
821 for(i=0;i<qnode_list[qidx]->reads_from_idx.size();++i){
822 total_sent += qnode_list[qnode_list[qidx]->reads_from_idx[i]]->out_tup;
823 qnode_list[qidx]->inferred_in_sz += qnode_list[qnode_list[qidx]->reads_from_idx[i]]->out_sz / (qnode_list[qnode_list[qidx]->reads_from_idx[i]]->end_tick-qnode_list[qnode_list[qidx]->reads_from_idx[i]]->start_tick);
825 printf("\t%llu packets sent, %f lost, inferred read rate is %f\n",total_sent,(1.0*total_sent-qnode_list[qidx]->in_tup)/total_sent, qnode_list[qidx]->inferred_in_sz);
826 if(rpt_fl) fprintf(rpt_fl,",%llu,%f,%f",total_sent,(1.0*total_sent-qnode_list[qidx]->in_tup)/total_sent, qnode_list[qidx]->inferred_in_sz);
829 if(rpt_fl) fprintf(rpt_fl,",,,");
832 if(rpt_fl) fprintf(rpt_fl,"\n");
839 // Collect performance info about RTSs and determine a better hash partitioning.
841 // First, grab any existing balancing information
// Previous hash allocation per interface, read from rts_load.cfg. Each
// line is "name,v1,v2,..."; the values are collected into hbounds and a
// line is rejected when a value is smaller than prev_val.
// NOTE(review): how prev_val and invalid_line are updated is not visible
// in this excerpt.
842 map<string, vector<int> > prev_rts_loads;
843 FILE *rload_fl = NULL;
844 rload_fl = fopen("rts_load.cfg","r");
846 if(rload_fl != NULL){
847 while(fgets(line,LINEBUF,rload_fl)){
849 int nflds = split_string(line,',',flds,SPLITBUF);
852 bool invalid_line=false;
854 for(i=1;i<nflds;++i){
855 int new_val = atoi(flds[i]);
856 if(new_val < prev_val)
858 hbounds.push_back(new_val);
861 prev_rts_loads[flds[0]] = hbounds;
863 printf("Warning, rts_load.cfg has an invalid entry on line %d, skipping\n",lineno);
866 printf("Warning, rts_load.cfg has an invalid entry on line %d, skipping\n",lineno);
// The file could not be opened, so rload_fl is not a usable stream here;
// the warning goes to stderr (the original passed rload_fl to fprintf).
870 fprintf(stderr,"Warning, can't open rts_load.cfg, skipping and using defualt allocation estimate.\n");
// Start the new allocation from the previous one.
872 map<string, vector<int> > new_rts_loads = prev_rts_loads;
875 // Next, try to grab a history of allocations and resulting cpu loads
876 FILE *rtrace_fl = NULL;
877 rtrace_fl = fopen("rts_load.trace.txt","r");
879 map<string, vector< vector<int> > > iface_alloc_history;
880 map<string, vector< vector<double> > > iface_load_history;
881 if(rtrace_fl != NULL){
882 vector<int> curr_allocation;
883 vector<double> curr_load;
884 while(fgets(line,LINEBUF,rtrace_fl)){
885 int nflds = split_string(line,',',flds,SPLITBUF);
887 string iface = flds[0];
888 string entry = flds[1];
889 if(entry == "Previous allocation"){
890 curr_allocation.clear();
891 for(i=2;i<nflds;++i){
892 curr_allocation.push_back(atoi(flds[i]));
895 if(entry == "Previous cpu loads"){
897 for(i=2;i<nflds;++i){
898 curr_load.push_back(atof(flds[i]));
900 if(curr_allocation.size() == curr_load.size()){
901 iface_alloc_history[iface].push_back(curr_allocation);
902 iface_load_history[iface].push_back(curr_load);
// Print, per interface, the allocation history and the matching cpu load
// history collected from rts_load.trace.txt.
910 map<string, vector< vector<int> > >::iterator msvvi;
911 for(msvvi=iface_alloc_history.begin();msvvi!=iface_alloc_history.end();++msvvi){
912 string iface = (*msvvi).first;
913 printf("iface %s past allocations:\n",iface.c_str());
914 vector<vector<int> > &alloc = iface_alloc_history[iface];
// size() returns size_t; convert explicitly so the argument matches %d.
915 printf("alloc size is %d\n",static_cast<int>(alloc.size()));
916 for(i=0;i<alloc.size();++i){
917 for(j=0;j<alloc[i].size();++j){
918 printf("%d ",alloc[i][j]);
922 printf("\niface %s past loads:\n",iface.c_str());
923 vector<vector<double> > &load = iface_load_history[iface];
924 printf("load size is %d\n",static_cast<int>(load.size()));
925 for(i=0;i<load.size();++i){
926 for(j=0;j<load[i].size();++j){
927 printf("%f ",load[i][j]);
936 map<int, string>::iterator misi;
937 map<string, vector<int> > rts_iface_indices;
938 map<string, vector<double> > rts_iface_cpu_load;
939 for(misi=rts_iface_map.begin();misi!=rts_iface_map.end();++misi){
940 int rpid = (*misi).first;
941 string riface = (*misi).second;
942 size_t Xpos = riface.find_last_of("X");
943 if(Xpos!=string::npos){
944 string iface = riface.substr(0,Xpos);
945 // ifaces_found.insert(iface);
946 string ifcopy = riface.substr(Xpos+1);
947 int ifidx = atoi(ifcopy.c_str());
948 rts_iface_indices[iface].push_back(ifidx);
950 printf("pid=%d, rts %s, %s\n",rpid,riface.c_str(),rts_perf_map[rpid]->to_string().c_str());
952 fprintf(rpt_fl,",rts %s,,,,,,,,,,,,%s,,,,\n",riface.c_str(),rts_perf_map[rpid]->to_csv().c_str());
956 map<string, vector<int> >::iterator msvi;
957 set<string> ifaces_found;
958 map<string, vector<double> > ht_cpu_allocs;
959 map<string, int> total_ht_sizes;
960 for(msvi=rts_iface_indices.begin();msvi!=rts_iface_indices.end();++msvi){
961 string iface = (*msvi).first;
962 vector<int> &ifindices = (*msvi).second;
963 sort(ifindices.begin(),ifindices.end());
965 double total_cpu = 0.0;
966 vector<double> if_cpu;
967 for(i=0;i<ifindices.size();++i){
968 string full_iface = iface + "X" + int_to_string(ifindices[i]);
969 int pid = pid_iface_map[full_iface];
970 total_cpu += rts_perf_map[pid]->avg_cpu_time();
971 if_cpu.push_back(rts_perf_map[pid]->avg_cpu_time());
973 rts_iface_cpu_load[iface] = if_cpu;
975 vector<int> current_allocation;
976 if(new_rts_loads.count(iface) == 0 || new_rts_loads[iface].size() != ifindices.size()){
978 for(i=0;i<ifindices.size();++i){
979 int new_cumm = ((i+1)*htmax)/ifindices.size();
980 current_allocation.push_back(new_cumm-cumm_cpu);
984 current_allocation = new_rts_loads[iface];
986 int total_ht_allocation = 0;
987 //printf("Current allocation is:");
988 for(i=0;i<current_allocation.size();++i){
989 total_ht_allocation += current_allocation[i];
990 //printf(" %d",current_allocation[i]);
993 vector<double> local_alloc(total_ht_allocation,0.0); // estimated cpu per HT slot.
995 for(i=0;i<if_cpu.size();++i){
996 double rate = if_cpu[i]/(current_allocation[i]*total_cpu);
997 for(j=0;j<current_allocation[i];++j){
998 local_alloc[ht_ptr++] = rate;
1002 if(iface_alloc_history.count(iface)>0){
1003 vector<vector<int> > &alloc = iface_alloc_history[iface];
1004 vector<vector<double> > &load = iface_load_history[iface];
1005 int n_remaining = rts_load_history_len;
1006 double multiplier = hist_multiplier;
1007 double normalizer = 1.0;
1008 for(i=alloc.size()-1;i>=0 && n_remaining>0;i--){
1009 int hist_ht_size = 0;
1010 for(j=0;j<alloc[i].size();++j)
1011 hist_ht_size+=alloc[i][j];
1012 double hist_cpu = 0.0;
1013 for(j=0;j<load[i].size();++j)
1014 hist_cpu += load[i][j];
1015 if(hist_ht_size == total_ht_allocation){
1017 for(j=0;j<alloc[i].size();++j){
1018 double rate = (multiplier*load[i][j])/(alloc[i][j]*hist_cpu);
1020 for(k=0;k<alloc[i][j];k++){
1021 local_alloc[ht_ptr++] += rate;
1024 normalizer += multiplier;
1025 multiplier *= hist_multiplier;
1029 for(i=0;i<total_ht_allocation;i++){
1030 local_alloc[i] /= normalizer;
1035 total_ht_sizes[iface] = total_ht_allocation;
1036 ht_cpu_allocs[iface] = local_alloc;
1039 if(uniform_rts_alloc){
1040 // I will require that if this option is true, all HTs
1042 bool same_sizes = true;
1043 int total_alloc = -1;
1044 map<string, int >::iterator msi;
1045 for(msi=total_ht_sizes.begin();msi!=total_ht_sizes.end();++msi){
1047 total_alloc=(*msi).second;
1049 if(total_alloc != (*msi).second){
1055 vector<double> local_alloc(total_alloc,0.0);
1056 double normalizer = 0.0;
1057 map<string, vector<double> >::iterator msvdi;
1058 for(msvdi=ht_cpu_allocs.begin();msvdi!=ht_cpu_allocs.end();++msvdi){
1059 string iface = (*msvdi).first;
1060 for(i=0;i<total_alloc;++i){
1061 local_alloc[i] += ht_cpu_allocs[iface][i];
1065 for(i=0;i<total_alloc;++i)
1066 local_alloc[i] /= normalizer;
1067 for(msvdi=ht_cpu_allocs.begin();msvdi!=ht_cpu_allocs.end();++msvdi){
1068 (*msvdi).second = local_alloc;
1073 for(msvi=rts_iface_indices.begin();msvi!=rts_iface_indices.end();++msvi){
1074 string iface = (*msvi).first;
1075 vector<int> &ifindices = (*msvi).second;
1076 sort(ifindices.begin(),ifindices.end());
1078 int cumm_ht_alloc = 0; // ht allocated thus far
1079 int current_pos = 0; // idx to the current_allocation, if_cpu arrays
1080 vector<int> new_allocation;
1081 for(i=0;i<ifindices.size()-1;++i){
1082 double slot_cpu = 1.0/ifindices.size(); // amount of cpu to allocate
1083 int current_alloc = 0; // ht allocated in the curr_pos slot
1084 while(slot_cpu > 0.0){
1085 slot_cpu -= ht_cpu_allocs[iface][current_pos];
1089 // try to make the allocations even
1090 if(current_alloc>1 && (-slot_cpu) > (ht_cpu_allocs[iface][current_pos-1]/2.0)){
1094 new_allocation.push_back(current_alloc);
1096 new_allocation.push_back(total_ht_sizes[iface]-current_pos);
1099 int cumm_ht_alloc = 0; // ht allocated thus far
1100 int current_pos = 0; // idx to the current_allocation, if_cpu arrays
1101 int current_alloc = 0; // ht allocated in the curr_pos slot
1102 vector<int> new_allocation;
1103 for(i=0;i<ifindices.size()-1;++i){
1104 double slot_cpu = total_cpu/ifindices.size(); // amount of cpu to allocate
1106 while(slot_cpu > 0.0){
1107 double cpu_rate = if_cpu[current_pos] / current_allocation[current_pos];
1108 double cpu_remaining = cpu_rate*(current_allocation[current_pos] - current_alloc);
1109 if(slot_cpu <= cpu_remaining){
1110 double this_cpu_alloc = slot_cpu;
1111 int this_ht_alloc = (int)(this_cpu_alloc / cpu_rate);
1112 slot_ht += this_ht_alloc;
1114 cumm_ht_alloc += this_ht_alloc;
1115 current_alloc += this_ht_alloc;
1116 if(current_alloc >= current_allocation[current_pos]){
1121 slot_cpu -= cpu_remaining;
1122 slot_ht += current_allocation[current_pos] - current_alloc;
1123 cumm_ht_alloc += current_allocation[current_pos] - current_alloc;
1128 new_allocation.push_back(slot_ht);
1130 new_allocation.push_back(total_ht_allocation - cumm_ht_alloc);
1133 new_rts_loads[iface] = new_allocation;
1137 printf("Interface %s:",iface.c_str());
1138 for(i=0;i<ifindices.size();++i){
1139 string full_iface = iface + "X" + int_to_string(ifindices[i]);
1140 int pid = pid_iface_map[full_iface];
1141 printf(" %f",rts_perf_map[pid]->avg_cpu_time()/total_cpu);
1144 for(i=0;i<ifindices.size();++i){
1145 printf(" %d",prev_rts_loads[iface][i]);
1152 FILE *rrec_fl = fopen("rts_load.cfg.recommended","w");
1153 if(rrec_fl == NULL){
1154 fprintf(stderr,"Warning, can't open rts_load.cfg.recommended for write, skipping.\n");
1157 printf("Recommended virtual interface hash allocation:\n");
1158 map<string, vector<int> >::iterator msvii;
1159 for(msvii=new_rts_loads.begin();msvii!=new_rts_loads.end();++msvii){
1160 string iface_name = (*msvii).first;
1161 vector<int> iface_alloc = (*msvii).second;
1162 printf("%s",iface_name.c_str());
1163 if(rrec_fl!=NULL) fprintf(rrec_fl,"%s",iface_name.c_str());
1164 for(i=0;i<iface_alloc.size();++i){
1165 printf(",%d",iface_alloc[i]);
1166 if(rrec_fl!=NULL) fprintf(rrec_fl,",%d",iface_alloc[i]);
1169 if(rrec_fl!=NULL) fprintf(rrec_fl,"\n");
1174 rrec_fl = fopen("rts_load.trace.txt","a");
1175 if(rrec_fl != NULL){
1176 fprintf(rrec_fl,"\n");
1177 for(msvii=new_rts_loads.begin();msvii!=new_rts_loads.end();++msvii){
1178 string iface_name = (*msvii).first;
1179 vector<int> iface_alloc = (*msvii).second;
1180 vector<double> iface_cpu_loads = rts_iface_cpu_load[iface_name];
1181 vector<int> prev_iface_alloc = prev_rts_loads[iface_name];
1183 fprintf(rrec_fl,"Interface %s:\n",iface_name.c_str());
1184 fprintf(rrec_fl,"%s,Previous allocation,",iface_name.c_str());
1185 for(i=0;i<prev_iface_alloc.size();++i){
1186 if(i>0) fprintf(rrec_fl,",");
1187 fprintf(rrec_fl,"%d",prev_iface_alloc[i]);
1189 fprintf(rrec_fl,"\n%s,Previous cpu loads,",iface_name.c_str());
1190 for(i=0;i<iface_cpu_loads.size();++i){
1191 if(i>0) fprintf(rrec_fl,",");
1192 fprintf(rrec_fl,"%f",iface_cpu_loads[i]);
1194 fprintf(rrec_fl,"\n%s,New allocation,",iface_name.c_str());
1195 for(i=0;i<iface_alloc.size();++i){
1196 if(i>0) fprintf(rrec_fl,",");
1197 fprintf(rrec_fl,"%d",iface_alloc[i]);
1199 fprintf(rrec_fl,"\n\n");
1205 // ----------------------------------------------------------------
1206 // Make an hfta parallelism analysis. Start by collecting hftas and grouping
1207 // them by their copies. Count on the __copy%d name mangling.
1209 set<string>::iterator ssi;
1210 map<string, vector<int> > par_hfta_map;
1211 for(ssi=found_names.begin();ssi!=found_names.end();++ssi){
1212 string base = (*ssi);
1213 int qidx = qname_to_idx[base];
1214 if(qnode_list[qidx]->qnode_type == "HFTA"){
1215 size_t cpos = (*ssi).find("__copy");
1216 if(cpos!=string::npos){
1217 base = (*ssi).substr(0,cpos);
1218 string idx_str = (*ssi).substr(cpos+6);
1219 int pidx = atoi(idx_str.c_str());
1220 qnode_list[qidx]->par_index = pidx;
1222 par_hfta_map[base].push_back(qidx);
1226 // Coalesce or split hftas. Reduce parallelism until the max cpu utilization
1227 // is in [cpu_util_threshold/2, cpu_util_threshold]. Double parallelism
1228 // if max cpu utilization is > cpu_util_threshold.
1229 // Only recommend parallelism if a resource utilization file was found.
1231 map<string, int> recommended_parallelism;
1232 map<string, vector<int> >::iterator msvii;
1233 for(msvii=par_hfta_map.begin();msvii!=par_hfta_map.end();++msvii){
1234 vector<int> buddy_indices = (*msvii).second;
1235 vector<qnode *> buddies;
1237 for(i=0;i<buddy_indices.size();i++){
1238 buddies.push_back(qnode_list[buddy_indices[i]]);
1239 if(qnode_list[buddy_indices[i]]->perf->is_valid())
1244 sort(buddies.begin(),buddies.end(),cmpr_parallel_idx);
1247 double max_util = 0.0;
1248 while(level<=buddies.size()){
1249 for(i=0;i<buddies.size();i+=level){
1250 double this_util = 0.0;
1251 for(j=0;j<level;j++){
1252 this_util += buddies[i+j]->perf->avg_cpu_time();
1254 if(this_util > max_util)
1255 max_util = this_util;
1257 if(max_util >= cpu_util_threshold/2)
1261 int npar = buddies.size();
1262 if(max_util > cpu_util_threshold)
1264 if(level>buddies.size())
1271 recommended_parallelism[(*msvii).first] = npar;
1273 printf("Warning, no resource usage information for %s, skipping.\n",(*msvii).first.c_str());
1277 FILE *hpar_fl = NULL;
1278 hpar_fl=fopen("hfta_parallelism.cfg","r");
1280 fprintf(stderr,"Warning, can't open hfta_parallelism.cfg, ignoring.\n");
1282 while(fgets(line,LINEBUF,hpar_fl)){
1283 int nflds = split_string(line,',',flds,SPLITBUF);
1285 int npar = atoi(flds[1]);
1286 if(npar>0 && recommended_parallelism.count(flds[0])==0){
1287 recommended_parallelism[flds[0]] = npar;
1294 FILE *recpar_fl = NULL;
1295 recpar_fl=fopen("hfta_parallelism.cfg.recommended","w");
1296 if(recpar_fl==NULL){
1297 fprintf(stderr,"Warning, can't open hfta_parallelism.cfg.recommended, can't write the file.\n");
1299 printf("Recommended parallelism:\n");
1300 map<string, int>::iterator msii;
1301 for(msii=recommended_parallelism.begin();msii!=recommended_parallelism.end();++msii){
1303 fprintf(recpar_fl,"%s,%d\n",(*msii).first.c_str(),(*msii).second);
1304 printf("%s,%d\n",(*msii).first.c_str(),(*msii).second);
1308 printf("Can't recommend hfta parallelism, no resource utilization file found.\n");
1311 FILE *rec_ht_fl = NULL;
1312 rec_ht_fl=fopen("lfta_htsize.cfg.recommended","w");
1313 if(rec_ht_fl==NULL){
1314 fprintf(stderr,"Warning, can't open lfta_htsize.cfg.recommended, can't write the file.\n");
1316 printf("Recommended LFTA hash table sizes:\n");
1317 for(i=0;i<qnode_list.size();++i){
1318 qnode *this_qn = qnode_list[i];
1319 if(this_qn->qnode_type=="LFTA" && this_qn->aggr_tbl_size>0 && this_qn->accepted_tup>0){
1320 int ht_size = this_qn->aggr_tbl_size;
1321 double collision_rate = ((double)this_qn->collisions)/this_qn->accepted_tup;
1322 double eviction_rate = ((double)this_qn->evictions)/this_qn->accepted_tup;
1323 printf("%s htsize=%d collision=%f evictions=%f",this_qn->name.c_str(),ht_size,collision_rate,eviction_rate);
1324 //printf("%d,%f,%f\n",ht_size,collision_rate,eviction_rate);
1325 if(eviction_rate >= erate_hi){
1327 }else if(collision_rate >= crate_hi){
1329 }else if(collision_rate < crate_lo){
1333 printf(" rec ht_size=%d\n",ht_size);
1334 fprintf(rec_ht_fl,"%s,%u\n",this_qn->name.c_str(),ht_size);
1341 // Try to load the cpu configuration info
1343 vector <cpu_info_str *> cpu_info_list;
1344 FILE *cinfo_fl = NULL;
1345 cinfo_fl=fopen("cpu_info.csv","r");
1347 fprintf(stderr,"Warning, can't open cpu_info.csv, skipping process pinning optimization. Run gscpv3/bin/parse_cpuinfo.pl to generate a cpu_info.csv file.\n");
1350 while(fgets(inp,LINEBUF,cinfo_fl)){
1351 int nflds = split_string(inp,',',flds,SPLITBUF);
1354 cpu_info_list.push_back(new cpu_info_str(atoi(flds[0]),atoi(flds[1]),atoi(flds[2])));
1358 sort(cpu_info_list.begin(),cpu_info_list.end(),cmpr_cpu_info);
1360 // Spread the LFTAs among the cores.
1361 vector<string> iface_names;
1362 for(msvi=rts_iface_indices.begin();msvi!=rts_iface_indices.end();++msvi){
1363 string iface = (*msvi).first;
1364 vector<int> ifindices = (*msvi).second;
1365 sort(ifindices.begin(),ifindices.end());
1367 for(i=0;i<ifindices.size();++i){
1368 string full_iface = iface + "X" + int_to_string(ifindices[i]);
1369 iface_names.push_back(full_iface);
1374 map<string, int> rts_assignment;
1375 double stride = ((double)(cpu_info_list.size()))/((double)(iface_names.size()));
1376 double rtspos_f = 0.0;
1377 for(i=0;i<iface_names.size();++i){
1378 int rtspos = (int)rtspos_f;
1379 rts_assignment[iface_names[i]] = rtspos;
1380 cpu_info_list[rtspos]->assigned_load += rts_perf_map[pid_iface_map[iface_names[i]]]->avg_cpu_time();
1384 //for(i=0;i<iface_names.size();++i){
1385 //printf("Placing %s at %d\n",iface_names[i].c_str(), rts_assignment[iface_names[i]]);
1388 set<string> eligible_hftas;
1389 map<string, int> hfta_assignment;
1390 set<string>::iterator ssi;
1391 for(ssi = found_names.begin();ssi!=found_names.end();++ssi){
1392 int qidx = qname_to_idx[(*ssi)];
1393 //printf("Considering %s (%d), sz=%f, cpu=%f\n",(*ssi).c_str(),qidx,qnode_list[qidx]->inferred_in_sz,qnode_list[qidx]->perf->avg_cpu_time());
1394 if(qnode_list[qidx]->inferred_in_sz >= min_hfta_insz || qnode_list[qidx]->perf->avg_cpu_time() > min_hfta_cpu){
1395 //printf("\tAdding to eligible list\n");
1396 eligible_hftas.insert((*ssi));
1400 while(eligible_hftas.size()>0){
1401 int chosen_hfta = -1;
1402 double max_assigned_rate = 0.0;
1403 for(ssi=eligible_hftas.begin();ssi!=eligible_hftas.end();++ssi){
1404 double assigned_rate = 0.0;
1405 string qname = (*ssi);
1406 int qidx = qname_to_idx[qname];
1407 vector<int> reads_from = qnode_list[qidx]->reads_from_idx;
1408 for(i=0;i<reads_from.size();++i){
1409 if(qnode_list[reads_from[i]]->qnode_type == "LFTA" || (qnode_list[reads_from[i]]->qnode_type == "HFTA" && hfta_assignment.count(qnode_list[reads_from[i]]->name) > 0))
1410 assigned_rate += qnode_list[reads_from[i]]->output_rate();
1412 //printf("hfta %s, assigned rate=%f\n",qname.c_str(),assigned_rate);
1413 if(assigned_rate >= max_assigned_rate){
1414 //printf("\t picking %s\n",qname.c_str());
1415 max_assigned_rate = assigned_rate;
1419 if(chosen_hfta >= 0){
1420 vector<int> reads_from = qnode_list[chosen_hfta]->reads_from_idx;
1421 vector<int> src_location;
1422 vector<double> src_volume;
1423 for(i=0;i<reads_from.size();++i){
1424 int qidx = reads_from[i];
1425 if(qnode_list[qidx]->qnode_type == "HFTA"){
1426 if(hfta_assignment.count(qnode_list[qidx]->name)>0){
1427 src_location.push_back(hfta_assignment[qnode_list[qidx]->name]);
1428 src_volume.push_back(qnode_list[qidx]->output_rate());
1431 if(qnode_list[qidx]->qnode_type == "LFTA"){
1432 if(rts_assignment.count(qnode_list[qidx]->src_interface)>0){
1433 src_location.push_back(rts_assignment[qnode_list[qidx]->src_interface]);
1434 src_volume.push_back(qnode_list[qidx]->output_rate());
1438 //printf("chosen hfta is %d (%s), sources are:\n",chosen_hfta,qnode_list[chosen_hfta]->name.c_str());
1439 //for(i=0;i<src_location.size();++i){
1440 //printf("\tloc=%d, (%s) volume=%f\n",src_location[i],cpu_info_list[src_location[i]]->to_csv().c_str(),src_volume[i]);
1443 double hfta_cpu_usage = qnode_list[chosen_hfta]->perf->avg_cpu_time();
1444 if(hfta_cpu_usage > cpu_util_threshold) // hack for overloaded hftas.
1445 hfta_cpu_usage = cpu_util_threshold * .9999;
1446 printf("hfta %d (%s) has cpu usage %f\n",chosen_hfta,qnode_list[chosen_hfta]->name.c_str(),hfta_cpu_usage);
1448 double lowest_cost = 0.0;
1449 for(i=0;i<cpu_info_list.size();++i){
1450 double curr_cost = 0.0;
1451 for(j=0;j<src_location.size();++j){
1452 int dist = cpu_info_list[i]->distance_from(cpu_info_list[src_location[j]]);
1453 curr_cost += src_volume[j]*xfer_costs[dist];
1455 //printf("Cpu %s, cost=%f\n",cpu_info_list[i]->to_csv().c_str(),curr_cost);
1456 if((cpu_info_list[i]->assigned_load+hfta_cpu_usage < cpu_util_threshold) && (best_cpu<0 || curr_cost <= lowest_cost)){
1458 lowest_cost = curr_cost;
1459 //printf("\tpicking %s\n",cpu_info_list[i]->to_csv().c_str());
1464 cpu_info_list[best_cpu]->assigned_load += hfta_cpu_usage;
1465 hfta_assignment[qnode_list[chosen_hfta]->name] = best_cpu;
1466 eligible_hftas.erase(qnode_list[chosen_hfta]->name);
1469 fprintf(stderr,"ERROR, chosen_hfta=-1, bailing out.\n");
1474 FILE *pin_fl = fopen("pinning_info.csv","w");
1476 fprintf(stderr,"Warning, can't open pinning_info.csv, can't write the file.\n");
1478 printf("RTS assignments:\n");
1479 for(i=0;i<iface_names.size();++i){
1480 int assigned_cpu = rts_assignment[iface_names[i]];
1481 if(assigned_cpu>=0){
1482 printf("Place %s at %d (%s)\n",iface_names[i].c_str(), assigned_cpu, cpu_info_list[assigned_cpu]->to_csv().c_str());
1484 fprintf(pin_fl,"rts %s,%d\n",iface_names[i].c_str(), assigned_cpu);
1489 printf("HFTA assignments:\n");
1490 map<string, int>::iterator msii;
1491 for(msii=hfta_assignment.begin();msii!=hfta_assignment.end();++msii){
1492 int assigned_cpu = (*msii).second;
1493 string qname = (*msii).first;
1494 if(assigned_cpu>=0){
1495 printf("Place %s (%s) at %d (%s)\n",qname.c_str(),qnode_list[qname_to_idx[qname]]->executable_name.c_str(),assigned_cpu,cpu_info_list[assigned_cpu]->to_csv().c_str());
1497 fprintf(pin_fl,"%s,%d\n",qnode_list[qname_to_idx[qname]]->executable_name.c_str(), assigned_cpu);
1504 //for(i=0;i<cpu_info_list.size();++i){
1505 //printf("Cpu %d assigned load %f\n",i,cpu_info_list[i]->assigned_load);