Fixed newline characters throughout the code
[com/gs-lite.git] / src / tools / process_logs.cc
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #include<stdio.h>
17 #include<stdlib.h>
18
19 #include<string.h>
20 #include<string>
21 #include<vector>
22 #include<map>
23 #include<set>
24 #include<list>
25 #include<time.h>
26 #include <unistd.h>
27
28 #include"xml_t.h"
29
30 #include"qnode.h"
31
32 using namespace std;
33
34 extern int xmlParserparse(void);
35 extern FILE *xmlParserin;
36 extern int xmlParserdebug;
37 xml_t *xml_result;
38
39 int init_discard = 12;
40
41 int rts_load_history_len = 3;
42 double hist_multiplier = 0.8;
43 bool uniform_rts_alloc = true;
44
45 #define LINEBUF 1000
46 #define SPLITBUF 20
47
48 double min_hfta_insz = 1000000.0;
49 double min_hfta_cpu = 0.2;
50
51 double cpu_util_threshold = 0.9;
52 double crate_hi=.01;    // upper bound on collision rate
53 double crate_lo=.002;   // lower bound, increase HT size
54 double erate_hi=.01;    // upper bound on eviction ratemsvii
55 int htmax = 1000;
56 double xfer_costs[4] = {.1, .1, .3, 1.0};
57
58
59 int split_string(char *instr,char sep, char **words,int max_words){
60    char *loc;
61    char *str;
62    int nwords = 0;
63
64    str = instr;
65    words[nwords++] = str;
66    while( (loc = strchr(str,sep)) != NULL){
67         *loc = '\0';
68         str = loc+1;
69         if(nwords >= max_words){
70                 fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
71                 nwords = max_words-1;
72         }
73         words[nwords++] = str;
74    }
75
76    return(nwords);
77 }
78
79 string int_to_string(int i){
80     string ret;
81     char tmpstr[100];
82     sprintf(tmpstr,"%d",i);
83     ret=tmpstr;
84     return(ret);
85 }
86
87
88
89
90
91 struct fta_addr{
92         int ip;
93         int port;
94         int streamid;
95
96         fta_addr(int i, int p, int s){
97                 ip=i;
98                 port=p;
99                 streamid=s;
100         }
101 };
102
103 struct cmpr_fta_addr{
104         bool operator()(fta_addr const &a, fta_addr const &b) const{
105                 if(a.ip < b.ip)
106                         return true;
107                 if(a.ip > b.ip)
108                         return false;
109                 if(a.port < b.port)
110                         return true;
111                 if(a.port > b.port)
112                         return false;
113                 if(a.streamid < b.streamid)
114                         return true;
115                 return false;
116         }
117 };
118
119 bool cmpr_parallel_idx(const qnode *a, const qnode *b){
120         return a->par_index < b->par_index;
121 }
122
123 struct cpu_info_str{
124         int processor_id;
125         int socket_id;
126         int core_id;
127
128         double assigned_load;
129
130         cpu_info_str(int p, int s, int c){
131                 processor_id=p;
132                 socket_id=s;
133                 core_id=c;
134                 assigned_load = 0.0;
135         }
136
137         string to_csv(){
138                 char buf[200];
139                 sprintf(buf,"%d,%d,%d",processor_id,socket_id,core_id);
140                 return string(buf);
141         }
142
143         int distance_from(cpu_info_str *other){
144                 if(socket_id != other->socket_id)
145                         return 3;
146                 if(core_id != other->core_id)
147                         return 2;
148                 if(processor_id != other->processor_id)
149                         return 1;
150                 return 0;
151         }
152 };
153
154 bool cmpr_cpu_info(cpu_info_str const *a, cpu_info_str const *b){
155         if(a->socket_id < b->socket_id)
156                 return true;
157         if(a->socket_id > b->socket_id)
158                 return false;
159         if(a->core_id < b->core_id)
160                 return true;
161         if(a->core_id > b->core_id)
162                 return false;
163         if(a->processor_id < b->processor_id)
164                 return true;
165         return false;
166 }
167
168
169
170
171 int main(int argc, char **argv){
172         int i,j,s;
173
174         time_t now = time(NULL);
175         tm *now_tm = localtime(&now);
176         int year=now_tm->tm_year;
177
178
179 //              Options
180         string src_dir="";
181         string trace_file="";
182         string resource_log_file = "resource_log.csv";
183
184         const char *optstr = "d:r:i:l:m:UNu:s:C:c:E:0:1:2:";
185         const char *usage_str =
186 "Usage: %s [options]  trace_file\n"
187 "\t-d source_directory\n"
188 "\t-r resource_log_file\n"
189 "\t-i initial discard from the resource log\n"
190 "\t-s default interface hash table length.\n"
191 "\t-l rts load history length\n"
192 "\t-m rts load history multiplier\n"
193 "\t-U All rts interface hashes are the same.\n"
194 "\t-N rts interface hashes are processed independently.\n"
195 "\t-u max cpu utilization threshold\n"
196 "\t-C upper bound on collision rate.\n"
197 "\t-c lower bound on collision rate.\n"
198 "\t-E upper bound on the eviction rate.\n"
199 "\t-0 communication cost multiplier for 0-distance processes.\n"
200 "\t-1 communication cost multiplier for 1-distance processes.\n"
201 "\t-2 communication cost multiplier for 2-distance processes.\n"
202 ;
203
204    char chopt;
205    while((chopt = getopt(argc,argv,optstr)) != -1){
206                 switch(chopt){
207                 case '0':
208                         xfer_costs[0] = atof(optarg);
209                         if(xfer_costs[0] < 0 || xfer_costs[0] > 1){
210                                 fprintf(stderr,"ERROR, 0-distance xfer costs (%f) must be in [0,1].\n",xfer_costs[0]);
211                                 exit(1);
212                         }
213                 break;
214                 case '1':
215                         xfer_costs[1] = atof(optarg);
216                         if(xfer_costs[1] < 0 || xfer_costs[1] > 1){
217                                 fprintf(stderr,"ERROR, 1-distance xfer costs (%f) must be in [0,1].\n",xfer_costs[1]);
218                                 exit(1);
219                         }
220                 break;
221                 case '2':
222                         xfer_costs[2] = atof(optarg);
223                         if(xfer_costs[2] < 0 || xfer_costs[2] > 1){
224                                 fprintf(stderr,"ERROR, 2-distance xfer costs (%f) must be in [0,1].\n",xfer_costs[2]);
225                                 exit(1);
226                         }
227                 break;
228                 case 'C':
229                         crate_hi = atof(optarg);
230                         if(crate_hi < 0 || crate_hi>1){
231                                 fprintf(stderr,"ERROR, request to set crate_hi to %f\n must be in [0,1].\n",hist_multiplier);
232                                 exit(1);
233                         }
234                 break;
235                 case 'c':
236                         crate_lo = atof(optarg);
237                         if(crate_lo < 0 || crate_lo>1){
238                                 fprintf(stderr,"ERROR, request to set crate_lo to %f\n must be in [0,1].\n",hist_multiplier);
239                                 exit(1);
240                         }
241                 break;
242                 case 'E':
243                         erate_hi = atof(optarg);
244                         if(erate_hi < 0 || erate_hi>1){
245                                 fprintf(stderr,"ERROR, request to set erate_hi to %f\n must be in [0,1].\n",hist_multiplier);
246                                 exit(1);
247                         }
248                 break;
249                 case 's':
250                         htmax = atoi(optarg);
251                         if(htmax <= 0){
252                                 fprintf(stderr,"ERROPR, htmax set to %d, must be positive nonzero.\n",htmax);
253                                 exit(1);
254                         }
255                 break;
256                 case 'm':
257                         hist_multiplier = atof(optarg);
258                         if(hist_multiplier <= 0){
259                                 fprintf(stderr,"ERROR, request to set hist_multiplier to %f\n must be positive nonzero.\n",hist_multiplier);
260                                 exit(1);
261                         }
262                 break;
263                 case 'u':
264                         cpu_util_threshold = atof(optarg);
265                         if(cpu_util_threshold<=0 || cpu_util_threshold>1){
266                                 fprintf(stderr,"ERROR, cpu_threshold set to %f, must be in (0,1].\n",cpu_util_threshold);
267                                 exit(1);
268                         }
269                         break;
270                 case 'U':
271                         uniform_rts_alloc=true;
272                 break;
273                 case 'N':
274                         uniform_rts_alloc=false;
275                 break;
276                 case 'd':
277                         src_dir = optarg;
278                 break;
279                 case 'r':
280                         resource_log_file = optarg;
281                 break;
282                 case 'i':
283                         init_discard = atoi(optarg);
284                         if(init_discard < 0){
285                                 init_discard=0;
286                                 fprintf(stderr,"ERROR, atttempting to set init_discard to a negative value (%d), setting to zero.\n",init_discard);
287                         }
288                 break;
289                 case 'l':
290                         rts_load_history_len = atoi(optarg);
291                         if(rts_load_history_len < 0){
292                                 rts_load_history_len=0;
293                                 fprintf(stderr,"ERROR, atttempting to set rts_load_history_len to a negative value (%d), setting to zero.\n",rts_load_history_len);
294                         }
295                 break;
296                 case '?':
297                         fprintf(stderr,"Error, argument %c not recognized.\n",optopt);
298                         fprintf(stderr,"%s\n", usage_str);
299                         exit(1);
300                 default:
301                         fprintf(stderr,"Invalid arguments\n");
302                         fprintf(stderr,"%s\n", usage_str);
303                         exit(1);
304                 }
305         }
306         argc -= optind;
307         argv += optind;
308         if (argc > 0)
309                 trace_file = argv[0];
310
311         if(trace_file == ""){
312                 fprintf(stderr, usage_str, argv[0]);
313                 exit(1);
314         }
315
316 //                      for month string-to-int conversion
317         const char *months_str[12] = {"Jan","Feb","Mar","Apr","May","Jun","Jul","Aug","Sep","Oct","Nov","Dec"};
318         map<string, int> month_str_to_int;
319         for(i=0;i<12;++i)
320                 month_str_to_int[months_str[i]] = i;
321
322
323         FILE *qtree_fl = NULL;
324         string qtree_flname = src_dir + "/" + "qtree.xml";
325         string actual_qtree_flname;
326         if(src_dir != ""){
327                 qtree_fl = fopen(qtree_flname.c_str(),"r");
328                 actual_qtree_flname = qtree_flname;
329         }
330         if(qtree_fl == NULL){
331                 qtree_fl = fopen("qtree.xml","r");
332                 actual_qtree_flname = "qtree.xml";
333         }
334         if(qtree_fl == NULL){
335                 fprintf(stderr,"ERROR, can't open ");
336                 if(src_dir != ""){
337                         fprintf(stderr,"%s or ",qtree_flname.c_str());
338                 }
339                 fprintf(stderr,"qtree.xml, exiting.\n");
340                 exit(1);
341         }
342
343
344 //              Parse the qtree.xml file
345         xmlParser_setfileinput(qtree_fl);
346         if(xmlParserparse()){
347                 fprintf(stderr,"ERROR, could not parse query tree file %s\n",actual_qtree_flname.c_str());
348         }
349
350 //              Get the lfta, hfta nodes
351         xml_t *xroot = xml_result;
352         vector<xml_t *> xqnodes;
353         xroot->get_roots("HFTA",xqnodes);
354         xroot->get_roots("LFTA",xqnodes);
355
356         map<string,int> qname_to_idx;
357         map<string,int> exe_to_idx;
358         vector<qnode *> qnode_list;
359
360 //              Build the qnodes
361         for(i=0;i<xqnodes.size();++i){
362                 string qname;
363                 if(xqnodes[i]->get_attrib_val("name",qname)){
364 //printf("node type = %s, name=%s\n",xqnodes[i]->name.c_str(),qname.c_str());
365                         qnode *qn = new qnode(xqnodes[i]->name,qname,init_discard);
366                         for(s=0;s<xqnodes[i]->subtrees.size();++s){
367                                 xml_t *xsub = xqnodes[i]->subtrees[s];
368                                 if(xsub->name == "Field"){
369                                         string fname;
370                                         bool nret = xsub->get_attrib_val("name",fname);
371                                         string fpos;
372                                         bool pret = xsub->get_attrib_val("pos",fpos);
373                                         string ftype;
374                                         bool tret = xsub->get_attrib_val("type",ftype);
375                                         string fmods;
376                                         bool mret = xsub->get_attrib_val("mods",fmods);
377                                         if(nret && pret && tret){
378                                                 field_str *fld = new field_str(fname,atoi(fpos.c_str()),ftype,fmods);
379                                                 qn->add_field(fld);
380                                         }else{
381                                                 fprintf(stderr,"---> subtree %d of FTA %s has an malformed field.\n",s,qname.c_str());
382                                         }
383                                 }
384                                 if(xsub->name == "HtSize"){
385                                         string src;
386                                         bool sret = xsub->get_attrib_val("value",src);
387                                         if(sret){
388                                                 int htsize = atoi(src.c_str());
389                                                 if(htsize > 0){
390                                                         unsigned int naggrs = 1;                // make it power of 2
391                                                         unsigned int nones = 0;
392                                                         while(htsize){
393                                                                 if(htsize&1)
394                                                                         nones++;
395                                                                 naggrs = naggrs << 1;
396                                                                 htsize = htsize >> 1;
397                                                         }
398                                                         if(nones==1)            // in case it was already a power of 2.
399                                                                 naggrs/=2;
400                                                         qn->aggr_tbl_size = naggrs;
401                                                 }else{
402                                                         fprintf(stderr,"---> subtree %d of FTA %s has an invalid HtSize (%s).\n",s,qname.c_str(),src.c_str());
403                                                 }
404                                         }else{
405                                                 fprintf(stderr,"---> subtree %d of FTA %s has an malformed HtSize.\n",s,qname.c_str());
406                                         }
407                                 }
408                                 if(xsub->name == "ReadsFrom"){
409                                         string src;
410                                         bool sret = xsub->get_attrib_val("value",src);
411                                         if(sret){
412                                                 qn->add_source(src);
413                                         }else{
414                                                 fprintf(stderr,"---> subtree %d of FTA %s has an malformed ReadsFrom.\n",s,qname.c_str());
415                                         }
416                                 }
417                                 if(xsub->name == "Interface"){
418                                         string iface;
419                                         bool sret = xsub->get_attrib_val("value",iface);
420                                         if(sret){
421                                                 qn->src_interface = iface;
422                                         }
423                                 }
424                                 if(xsub->name == "FileName"){
425                                         string full_fname;
426                                         bool sret = xsub->get_attrib_val("value",full_fname);
427                                         if(sret){
428                                                 size_t dotpos = full_fname.find_first_of('.');
429                                                 if(dotpos != string::npos){
430                                                         qn->executable_name = full_fname.substr(0,dotpos);
431                                                 }
432                                         }
433                                 }
434                         }
435                         qname_to_idx[qname] = qnode_list.size();
436                         if(qn->executable_name != "" && qn->executable_name != "rts")
437                                 exe_to_idx[qn->executable_name] = qnode_list.size();
438                         qnode_list.push_back(qn);
439                 }else{
440                         fprintf(stderr,"---> node type %s, no name.\n",xqnodes[i]->name.c_str());
441                 }
442         }
443
444
445         bool error = false;
446         for(i=0;i<qnode_list.size();++i){
447                 if(qnode_list[i]->qnode_type == "HFTA"){
448                         for(s=0;s<qnode_list[i]->reads_from.size();++s){
449                                 if(qname_to_idx.count(qnode_list[i]->reads_from[s])>0){
450                                         int src_id = qname_to_idx[qnode_list[i]->reads_from[s]];
451                                         qnode_list[i]->reads_from_idx.push_back(src_id);
452                                         qnode_list[src_id]->sources_to_idx.push_back(i);
453                                 }else{
454                                         fprintf(stderr,"ERROR, hfta %s reads_from %s, but its not in qtree.xml.\n",qnode_list[i]->name.c_str(),qnode_list[i]->reads_from[s].c_str());
455                                         error = true;
456                                 }
457                         }
458                 }
459         }
460
461         if(error)
462                 exit(1);
463
464 /*
465         for(i=0;i<qnode_list.size();++i){
466                 printf("node %s reads_from:",qnode_list[i]->name.c_str());
467                 for(s=0;s<qnode_list[i]->reads_from.size();++s){
468                         printf(" %s",qnode_list[i]->reads_from[s].c_str());
469                 }
470                 printf("\nand sources to:\n");
471                 for(s=0;s<qnode_list[i]->sources_to_idx.size();++s){
472                         printf(" %s",qnode_list[qnode_list[i]->sources_to_idx[s]]->name.c_str());
473                 }
474                 printf("\n\n");
475         }
476 */
477
478         string tracefilename = trace_file;
479         if(src_dir != ""){
480                 tracefilename = src_dir + "/" + trace_file;
481         }
482         FILE *trace_fl = NULL;
483         if((trace_fl = fopen(tracefilename.c_str(),"r"))==NULL){
484                 fprintf(stderr,"ERROR, can't open trace file %s\n",tracefilename.c_str());
485                 exit(1);
486         }
487
488         map<fta_addr,string, cmpr_fta_addr> qnode_map;
489         map<int, perf_struct *> rts_perf_map;
490         map<int, string> rts_iface_map;
491         map<string, int> pid_iface_map;
492
493         tm time_str;
494         char inp[LINEBUF],line[LINEBUF],*saveptr;
495         unsigned int hbeat_ip, hbeat_port, hbeat_index, hbeat_streamid, hbeat_trace_id,hbeat_ntraces;
496         while(fgets(inp,LINEBUF,trace_fl)){
497
498 //                      Try to grab the timestamp
499                 time_t tick;
500                 int pid;
501                 strncpy(line,inp,LINEBUF);
502                 char mon_str[4];
503                 int mon=0,day,hr,mn,sec;
504                 int nret = sscanf(line,"%c%c%c %d %d:%d:%d",mon_str,mon_str+1,mon_str+2,&day,&hr,&mn,&sec);
505                 if(nret >= 7){
506                         mon_str[3] = '\0';
507                         if(month_str_to_int.count(mon_str)>0){
508                                 mon = month_str_to_int[mon_str];
509                         }else{
510                                 fprintf(stderr,"Warning, %s not recognized as a month string.\n",mon_str);
511                         }
512                         time_str.tm_sec = sec;
513                         time_str.tm_min = mn;
514                         time_str.tm_hour = hr;
515                         time_str.tm_mday = day;
516                         time_str.tm_mon = mon;
517                         time_str.tm_year = year;
518                         tick = mktime(&time_str);
519 //printf("mon=%d, day=%d, hr=%d, mn=%d, sec=%d, tick=%d\n",mon,day,hr,mn,sec,tick);
520                 }
521
522 //                      Grab the process ID
523                 strncpy(line,inp,LINEBUF);
524                 int tmp_pid;
525                 pid = -1;
526                 char *segment = strtok_r(line,"[",&saveptr);
527                 if(segment!=NULL){
528                         segment = strtok_r(NULL,"[",&saveptr);
529                         nret = sscanf(segment,"%d]",&tmp_pid);
530                         if(nret>=1){
531                                 pid=tmp_pid;
532                         }
533                 }
534
535 //                      Grab address-to-hfta mappings
536                 strncpy(line,inp,LINEBUF);
537                 segment = strtok_r(line,"]",&saveptr);
538                 if(segment != NULL){
539                         segment = strtok_r(NULL," ",&saveptr);
540                         segment = strtok_r(NULL," ",&saveptr);
541                 }
542 //printf("segmetn=<%s>, comparison=%d\n",segment,strcmp(segment,"Lookup"));
543                 if(segment!=NULL && strcmp(segment,"Lookup")==0){
544                         int pos=0;
545                         char fta_name[LINEBUF];
546                         int ip;
547                         int port;
548                         int index;
549                         int streamid;
550                         int nret = 0;
551                         while(segment != NULL){
552 //printf("pos=%d, segment = %s\n",pos, segment);
553
554                                 if(pos==3){
555                                         strncpy(fta_name,segment,LINEBUF);
556                                 }
557
558                                 if(pos==6)
559                                         nret = sscanf(segment,"{ip=%d,port=%d,index=%d,streamid=%d}",&ip,&port,&index,&streamid);
560
561                                 pos++;
562                                 segment = strtok_r(NULL," ",&saveptr);
563                         }
564                         if(nret>0){
565 //printf("nret=%d, fta_name=%s,ip=%d,port=%d,index=%d,streamid=%d\n",nret,fta_name,ip,port,index,streamid);
566                                 fta_addr addr(ip,port,streamid);
567                                 qnode_map[addr] = fta_name;
568 //printf("Adding fta %s, (%d %d %d) to qnode_map\n",fta_name,addr.ip,addr.port,addr.streamid);
569                                 continue;
570                         }
571                 }
572                 if(segment!=NULL && strcmp(segment,"Lfta")==0){
573                         int pos=0;
574                         char fta_name[LINEBUF];
575                         int ip;
576                         int port;
577                         int index;
578                         int streamid;
579                         int nret = 0;
580                         while(segment != NULL){
581 //printf("pos=%d, segment = %s\n",pos, segment);
582
583                                 if(pos==1){
584                                         strncpy(fta_name,segment,LINEBUF);
585                                 }
586
587                                 if(pos==4)
588                                         nret = sscanf(segment,"{ip=%d,port=%d,index=%d,streamid=%d}",&ip,&port,&index,&streamid);
589
590                                 pos++;
591                                 segment = strtok_r(NULL," ",&saveptr);
592                         }
593                         if(nret>0){
594 //printf("nret=%d, fta_name=%s,ip=%d,port=%d,index=%d,streamid=%d\n",nret,fta_name,ip,port,index,streamid);
595                                 fta_addr addr(ip,port,streamid);
596                                 qnode_map[addr] = fta_name;
597 //printf("Adding lfta %s, (%d %d %d) to qnode_map\n",fta_name,addr.ip,addr.port,addr.streamid);
598                                 continue;
599                         }
600                 }
601                 if(segment!=NULL && strcmp(segment,"Init")==0){
602                         int pos=0;
603                         string iface = "";
604                         string keywd = "";
605                         while(segment != NULL){
606                                 if(pos==1)
607                                         keywd = segment;
608                                 if(pos==3){
609                                         char *cc;
610                                         for(cc=segment;*cc!='\0';++cc){
611                                                 if(*cc == '\n'){
612                                                         *cc = '\0';
613                                                         break;
614                                                 }
615                                         }
616                                         iface = segment;
617                                 }
618                                 pos++;
619                                 segment = strtok_r(NULL," ",&saveptr);
620                         }
621                         if(iface!="" && keywd == "LFTAs"){
622                                 rts_perf_map[pid] = new perf_struct(init_discard);
623                                 rts_iface_map[pid] = iface;
624                                 pid_iface_map[iface] = pid;
625                         }
626                 }
627                 if(segment!=NULL && strcmp(segment,"Heartbeat")==0){
628                         int pos=0;
629                         int nret=0,nret2=0,nret3=0;
630                         unsigned int tmp_hbeat_ip, tmp_hbeat_port, tmp_hbeat_index, tmp_hbeat_streamid, tmp_hbeat_trace_id,tmp_hbeat_ntraces;
631                         while(segment != NULL){
632 //printf("pos=%d, segment = %s\n",pos, segment);
633                                 if(pos==4)
634                                         nret = sscanf(segment,"{ip=%d,port=%d,index=%d,streamid=%d}",&tmp_hbeat_ip,&tmp_hbeat_port,&tmp_hbeat_index,&tmp_hbeat_streamid);
635                                 if(pos==5)
636                                         nret2 = sscanf(segment,"trace_id=%d",&tmp_hbeat_trace_id);
637                                 if(pos==6)
638                                         nret3 = sscanf(segment,"ntrace=%d",&tmp_hbeat_ntraces);
639                                 pos++;
640                                 segment = strtok_r(NULL," ",&saveptr);
641                         }
642                         if(nret>=4 && nret2 >= 1 && nret3 == 1){
643                                 hbeat_ip = tmp_hbeat_ip;
644                                 hbeat_port = tmp_hbeat_port;
645                                 hbeat_index = tmp_hbeat_index;
646                                 hbeat_streamid = tmp_hbeat_streamid;
647                                 hbeat_trace_id = tmp_hbeat_trace_id;
648                                 hbeat_ntraces = tmp_hbeat_ntraces;
649 //                              printf("ip=%d, port=%d, index=%d, streamid=%d hbeat_trace_id=%d\n",hbeat_ip,hbeat_port,hbeat_index,hbeat_streamid,hbeat_trace_id);
650                                 fta_addr hb_addr(hbeat_ip,hbeat_port,tmp_hbeat_streamid);
651                                 if(qnode_map.count(hb_addr) == 0){
652                                         hb_addr.streamid = 0;   // maybe an hfta?
653                                         if(qnode_map.count(hb_addr) == 0){
654                                                 hbeat_port = 0;
655 //printf("Hbeat streamid=%d no match (%d,%d), hbeat_trace_id=%d\n",hbeat_streamid,hbeat_ip,hbeat_port,hbeat_trace_id);
656                                         }
657                                 } else{
658 //printf("Hbeat streamid=%d matches %s, hbeat_trace_id=%d\n",hbeat_streamid,qnode_map[hb_addr].c_str(),hbeat_trace_id);
659                                 }
660                         }else{
661                                 printf("Couldn't parse as hearbeat %s\n",inp);
662                         }
663                 }
664                 if(segment!=NULL && strncmp(segment,"trace_id=",8)==0){
665                         int pos=0;
666                         int nret=0,nret2=0,nret3=0;
667                         unsigned long long int trace_id;
668                         unsigned int tr_pos,tr_ip,tr_port,tr_index,tr_streamid;
669                         unsigned int tr_intup,tr_outtup,tr_outsz,tr_acctup,tr_cycles;
670                         unsigned int tr_evictions,tr_collisions;
671                         double tr_sample;
672                         nret = sscanf(segment,"trace_id=%llu",&trace_id);
673                         while(segment != NULL){
674 //printf("pos=%d, segment = %s\n",pos, segment);
675                                 if(pos==0)
676                                         nret = sscanf(segment,"trace_id=%llu",&trace_id);
677                                 if(pos==1)
678                                         nret2 = sscanf(segment,"trace[%d].ftaid={ip=%u,port=%u,index=%u,streamid=%u}",&tr_pos,&tr_ip,&tr_port,&tr_index,&tr_streamid);
679                                 if(pos==2)
680                                         nret3 = sscanf(segment,"fta_stat={in_tuple_cnt=%u,out_tuple_cnt=%u,out_tuple_sz=%u,accepted_tuple_cnt=%u,cycle_cnt=%u,collision_cnt=%u,eviction_cnt=%u,sampling_rate=%lf}",
681                 &tr_intup, &tr_outtup, &tr_outsz, &tr_acctup,&tr_cycles,
682                 &tr_collisions,&tr_evictions,&tr_sample);
683                                 pos++;
684                                 segment = strtok_r(NULL," ",&saveptr);
685                         }
686                         if(nret>=1 && nret2>=5 && nret3>=7){
687                                 fta_addr tr_addr(tr_ip,tr_port,tr_streamid);
688                                 if(qnode_map.count(tr_addr)==0)
689                                         tr_addr.streamid = 0;           // maybe an hfta?
690                                 if(qnode_map.count(tr_addr)>0){
691 //printf("Trace idx=%d streamid=%d matches %s, trace_id=%d, hbeat_trace_id=%d\n",tr_pos,tr_streamid,qnode_map[tr_addr].c_str(),trace_id,hbeat_trace_id);
692                                         if(tr_pos+1 == hbeat_ntraces){
693                                                 string qname = qnode_map[tr_addr];
694                                                 int qidx = qname_to_idx[qname];
695                                                 if(qnode_list[qidx]->start_tick < 0)
696                                                         qnode_list[qidx]->start_tick = tick;
697                                                 if(qnode_list[qidx]->end_tick < tick)
698                                                         qnode_list[qidx]->end_tick = tick;
699                                                 qnode_list[qidx]->in_tup += tr_intup;
700                                                 qnode_list[qidx]->out_tup += tr_outtup;
701                                                 qnode_list[qidx]->out_sz += tr_outsz;
702                                                 qnode_list[qidx]->accepted_tup += tr_acctup;
703                                                 qnode_list[qidx]->cycles += tr_cycles;
704                                                 qnode_list[qidx]->collisions += tr_collisions;
705                                                 qnode_list[qidx]->evictions += tr_evictions;
706                                         }
707                                 }
708 //else{
709 //printf("Trace idx=%d streamid=%d no match (%d,%d), trace_id=%d, hbeat_trace_id=%d\n",tr_pos,tr_streamid,tr_ip,tr_port,trace_id,hbeat_trace_id);
710 //}
711                         }
712                 }
713         }
714
715 //printf("qnode_map has %d entries\n",qnode_map.size());
716
717 //                      Open and process performance log info, if any.
718         if(src_dir != ""){
719                 resource_log_file = src_dir + "/" + resource_log_file;
720         }
721         FILE *res_fl = NULL;
722         if((res_fl = fopen(resource_log_file.c_str(),"r"))==NULL){
723                 fprintf(stderr,"ERROR, can't open trace file %s\n",resource_log_file.c_str());
724                 exit(1);
725         }
726
727         char *flds[SPLITBUF];
728         int lineno = 0;
729         while(fgets(inp,LINEBUF,res_fl)){
730                 int nflds = split_string(inp,',',flds,SPLITBUF);
731                 lineno++;
732                 if(nflds >= 8){
733                         int ts = atoi(flds[0]);
734                         string procname = flds[1];
735                         int pid = atoi(flds[2]);
736                         unsigned long long int utime = atoll(flds[3]);
737                         unsigned long long int stime = atoll(flds[4]);
738                         unsigned long long int vm_size = atoll(flds[5]);
739                         unsigned long long int rss_size = atoll(flds[6]);
740                         int pagesize = atoi(flds[7]);
741
742                         if(procname == "rts"){
743                                 if(rts_perf_map.count(pid)>0){
744                                         if(rts_perf_map[pid]->update(ts,utime,stime,vm_size,rss_size)){
745                                                 fprintf(stderr,"Resource log file is corrupted, line %d\n",lineno);
746                                                 exit(1);
747                                         }
748                                 }
749                         }else{
750                                 if(exe_to_idx.count(procname)>0){
751                                         perf_struct *p = qnode_list[exe_to_idx[procname]]->perf;
752                                         if(p->update(ts,utime,stime,vm_size,rss_size)){
753                                                 fprintf(stderr,"Resource log file is corrupted, line %d\n",lineno);
754                                                 exit(1);
755                                         }
756                                 }
757                         }
758                 }
759         }
760
761
762
763         FILE *rpt_fl = fopen("performance_report.csv","w");
764         if(rpt_fl == NULL){
765                 fprintf(stderr,"Warning, can't open performance_report.csv, can't save the performance report.\n");
766         }
767
768         char tmpstr[10000];
769         printf("Performance report:\n");
770         if(rpt_fl) fprintf(rpt_fl,"query_name,process_name,in_tup,out_tup,out_sz,accepted_tup,cycles,collisions,evictions,in_tup_rate,out_tup_rate,out_bytes_rate,cycle_consumption_rate");
771         if(rpt_fl) fprintf(rpt_fl,",%s",perf_struct::to_csv_hdr().c_str());
772         if(rpt_fl) fprintf(rpt_fl,",packets_sent_to_query,fraction_intup_lost,inferred_read_rate");
773         if(rpt_fl) fprintf(rpt_fl,"\n");
774
775         map<fta_addr,string>::iterator mpiisi;
776         int n_output = 0;
777         set<string> found_names;
778         for(mpiisi=qnode_map.begin();mpiisi!=qnode_map.end();++mpiisi){
779                 string qname = (*mpiisi).second;
780                 if(found_names.count(qname)==0){
781                         found_names.insert(qname);
782                         int qidx = qname_to_idx[qname];
783                         string executable = qnode_list[qidx]->executable_name;
784                         if(executable == "")
785                                 executable="rts";
786 //                      printf("(%d,%d,%d) -> %s, %d reads-from\n",(*mpiisi).first.ip,(*mpiisi).first.port,(*mpiisi).first.streamid,qname.c_str(),qnode_list[qidx]->reads_from_idx.size());
787                         printf("query=%s, executable=%s\tintup=%llu, out_tup=%llu, out_sz=%llu, accepted_tup=%llu, cycles=%llu, collisions=%llu, evictions=%llu\n",
788                                                 qname.c_str(),qnode_list[qidx]->executable_name.c_str(),
789                                                 qnode_list[qidx]->in_tup,
790                                                 qnode_list[qidx]->out_tup,
791                                                 qnode_list[qidx]->out_sz,
792                                                 qnode_list[qidx]->accepted_tup,
793                                                 qnode_list[qidx]->cycles,
794                                                 qnode_list[qidx]->collisions,
795                                                 qnode_list[qidx]->evictions
796                         );
797                         if(rpt_fl) fprintf(rpt_fl,"%s,%s,%llu,%llu,%llu,%llu,%llu,%llu,%llu",
798                                                 qname.c_str(),qnode_list[qidx]->executable_name.c_str(),
799                                                 qnode_list[qidx]->in_tup,
800                                                 qnode_list[qidx]->out_tup,
801                                                 qnode_list[qidx]->out_sz,
802                                                 qnode_list[qidx]->accepted_tup,
803                                                 qnode_list[qidx]->cycles,
804                                                 qnode_list[qidx]->collisions,
805                                                 qnode_list[qidx]->evictions
806                         );
807                         double duration = 1.0*(qnode_list[qidx]->end_tick - qnode_list[qidx]->start_tick);
808
809                         printf("\tin_tup_rate=%f, out_tup_rate=%f, out_byte_rate=%f, cycle_rate=%f\n",qnode_list[qidx]->in_tup/duration,qnode_list[qidx]->out_tup/duration,qnode_list[qidx]->out_sz/duration,qnode_list[qidx]->cycles/duration);
810                         if(rpt_fl) fprintf(rpt_fl,",%f,%f,%f,%f",qnode_list[qidx]->in_tup/duration,qnode_list[qidx]->out_tup/duration,qnode_list[qidx]->out_sz/duration,qnode_list[qidx]->cycles/duration);
811                         printf("\t%s\n",qnode_list[qidx]->perf->to_string().c_str());
812                         if(rpt_fl) fprintf(rpt_fl,",%s",qnode_list[qidx]->perf->to_csv().c_str());
813 //if(qnode_list[qidx]->aggr_tbl_size>0){
814 //printf("\taggregate table size is %d\n",qnode_list[qidx]->aggr_tbl_size);
815 //}
816 //if(qnode_list[qidx]->src_interface != ""){
817 //printf("\tSource interface is %s\n",qnode_list[qidx]->src_interface.c_str());
818 //}
819                         if(qnode_list[qidx]->reads_from_idx.size()>0){
820                                 unsigned long long int total_sent = 0;
821                                 for(i=0;i<qnode_list[qidx]->reads_from_idx.size();++i){
822                                         total_sent += qnode_list[qnode_list[qidx]->reads_from_idx[i]]->out_tup;
823                                         qnode_list[qidx]->inferred_in_sz += qnode_list[qnode_list[qidx]->reads_from_idx[i]]->out_sz / (qnode_list[qnode_list[qidx]->reads_from_idx[i]]->end_tick-qnode_list[qnode_list[qidx]->reads_from_idx[i]]->start_tick);
824                                 }
825                                 printf("\t%llu packets sent, %f lost, inferred read rate is %f\n",total_sent,(1.0*total_sent-qnode_list[qidx]->in_tup)/total_sent, qnode_list[qidx]->inferred_in_sz);
826                                 if(rpt_fl) fprintf(rpt_fl,",%llu,%f,%f",total_sent,(1.0*total_sent-qnode_list[qidx]->in_tup)/total_sent, qnode_list[qidx]->inferred_in_sz);
827                         }
828                         else{
829                                 if(rpt_fl) fprintf(rpt_fl,",,,");
830                         }
831
832                         if(rpt_fl) fprintf(rpt_fl,"\n");
833                         n_output++;
834                 }
835         }
836
837
838
839 //              Collect performance info about RTSs and determine a better hash partitioning.
840
841 //              First, grab any existing balancing information
842         map<string, vector<int> > prev_rts_loads;
843         FILE *rload_fl = NULL;
844         rload_fl = fopen("rts_load.cfg","r");
845         lineno = 0;
846         if(rload_fl != NULL){
847                 while(fgets(line,LINEBUF,rload_fl)){
848                         lineno++;
849                         int nflds = split_string(line,',',flds,SPLITBUF);
850                         if(nflds>1){
851                                 vector<int> hbounds;
852                                 bool invalid_line=false;
853                                 int prev_val = 0;
854                                 for(i=1;i<nflds;++i){
855                                         int new_val = atoi(flds[i]);
856                                         if(new_val < prev_val)
857                                                 invalid_line = true;
858                                         hbounds.push_back(new_val);
859                                 }
860                                 if(! invalid_line){
861                                         prev_rts_loads[flds[0]] = hbounds;
862                                 }else{
863                                         printf("Warning, rts_load.cfg has an invalid entry on line %d, skipping\n",lineno);
864                                 }
865                         }else{
866                                 printf("Warning, rts_load.cfg has an invalid entry on line %d, skipping\n",lineno);
867                         }
868                 }
869         }else{
870                 fprintf(rload_fl,"Warning, can't open rts_load.cfg, skipping and using defualt allocation estimate.\n");
871         }
872         map<string, vector<int> > new_rts_loads = prev_rts_loads;
873         fclose(rload_fl);
874
875 //              Next, try to grab a history of allocations and resulting cpu loads
876         FILE *rtrace_fl = NULL;
877         rtrace_fl = fopen("rts_load.trace.txt","r");
878         lineno = 0;
879         map<string, vector< vector<int> > > iface_alloc_history;
880         map<string, vector< vector<double> > > iface_load_history;
881         if(rtrace_fl != NULL){
882                 vector<int> curr_allocation;
883                 vector<double> curr_load;
884                 while(fgets(line,LINEBUF,rtrace_fl)){
885                         int nflds = split_string(line,',',flds,SPLITBUF);
886                         if(nflds > 2){
887                                 string iface = flds[0];
888                                 string entry = flds[1];
889                                 if(entry == "Previous allocation"){
890                                         curr_allocation.clear();
891                                         for(i=2;i<nflds;++i){
892                                                 curr_allocation.push_back(atoi(flds[i]));
893                                         }
894                                 }
895                                 if(entry == "Previous cpu loads"){
896                                         curr_load.clear();
897                                         for(i=2;i<nflds;++i){
898                                                 curr_load.push_back(atof(flds[i]));
899                                         }
900                                         if(curr_allocation.size() == curr_load.size()){
901                                                 iface_alloc_history[iface].push_back(curr_allocation);
902                                                 iface_load_history[iface].push_back(curr_load);
903                                         }
904                                 }
905                         }
906                 }
907         }
908
909 /*
910 map<string, vector< vector<int> > >::iterator msvvi;
911 for(msvvi=iface_alloc_history.begin();msvvi!=iface_alloc_history.end();++msvvi){
912 string iface = (*msvvi).first;
913 printf("iface %s past allocations:\n",iface.c_str());
914 vector<vector<int> > &alloc = iface_alloc_history[iface];
915 printf("alloc size is %d\n",alloc.size());
916 for(i=0;i<alloc.size();++i){
917 for(j=0;j<alloc[i].size();++j){
918 printf("%d ",alloc[i][j]);
919 }
920 printf("\n");
921 }
922 printf("\niface %s past loads:\n",iface.c_str());
923 vector<vector<double> > &load = iface_load_history[iface];
924 printf("load size is %d\n",load.size());
925 for(i=0;i<load.size();++i){
926 for(j=0;j<load[i].size();++j){
927 printf("%f ",load[i][j]);
928 }
929 printf("\n");
930 }
931 printf("\n");
932 }
933 */
934
935
936         map<int, string>::iterator misi;
937         map<string, vector<int> > rts_iface_indices;
938         map<string, vector<double> > rts_iface_cpu_load;
939         for(misi=rts_iface_map.begin();misi!=rts_iface_map.end();++misi){
940                 int rpid = (*misi).first;
941                 string riface = (*misi).second;
942                 size_t Xpos = riface.find_last_of("X");
943                 if(Xpos!=string::npos){
944                         string iface = riface.substr(0,Xpos);
945 //                      ifaces_found.insert(iface);
946                         string ifcopy = riface.substr(Xpos+1);
947                         int ifidx = atoi(ifcopy.c_str());
948                         rts_iface_indices[iface].push_back(ifidx);
949                 }
950                 printf("pid=%d, rts %s, %s\n",rpid,riface.c_str(),rts_perf_map[rpid]->to_string().c_str());
951                 if(rpt_fl){
952                         fprintf(rpt_fl,",rts %s,,,,,,,,,,,,%s,,,,\n",riface.c_str(),rts_perf_map[rpid]->to_csv().c_str());
953
954                 }
955         }
956         map<string, vector<int> >::iterator msvi;
957         set<string> ifaces_found;
958         map<string, vector<double> > ht_cpu_allocs;
959         map<string, int> total_ht_sizes;
960         for(msvi=rts_iface_indices.begin();msvi!=rts_iface_indices.end();++msvi){
961                 string iface = (*msvi).first;
962                 vector<int> &ifindices = (*msvi).second;
963                 sort(ifindices.begin(),ifindices.end());
964
965                 double total_cpu = 0.0;
966                 vector<double> if_cpu;
967                 for(i=0;i<ifindices.size();++i){
968                         string full_iface = iface + "X" + int_to_string(ifindices[i]);
969                         int pid = pid_iface_map[full_iface];
970                         total_cpu += rts_perf_map[pid]->avg_cpu_time();
971                         if_cpu.push_back(rts_perf_map[pid]->avg_cpu_time());
972                 }
973                 rts_iface_cpu_load[iface] = if_cpu;
974
975                 vector<int> current_allocation;
976                 if(new_rts_loads.count(iface) == 0 || new_rts_loads[iface].size() != ifindices.size()){
977                         int cumm_cpu = 0;
978                         for(i=0;i<ifindices.size();++i){
979                                 int new_cumm = ((i+1)*htmax)/ifindices.size();
980                                 current_allocation.push_back(new_cumm-cumm_cpu);
981                                 cumm_cpu=new_cumm;
982                         }
983                 }else{
984                         current_allocation = new_rts_loads[iface];
985                 }
986                 int total_ht_allocation = 0;
987 //printf("Current allocation is:");
988                 for(i=0;i<current_allocation.size();++i){
989                         total_ht_allocation += current_allocation[i];
990 //printf(" %d",current_allocation[i]);
991                 }
992
993                 vector<double> local_alloc(total_ht_allocation,0.0); // estimated cpu per HT slot.
994                 int ht_ptr = 0;
995                 for(i=0;i<if_cpu.size();++i){
996                         double rate = if_cpu[i]/(current_allocation[i]*total_cpu);
997                         for(j=0;j<current_allocation[i];++j){
998                                 local_alloc[ht_ptr++] = rate;
999                         }
1000                 }
1001
1002                 if(iface_alloc_history.count(iface)>0){
1003                         vector<vector<int> > &alloc = iface_alloc_history[iface];
1004                         vector<vector<double> > &load = iface_load_history[iface];
1005                         int n_remaining = rts_load_history_len;
1006                         double multiplier = hist_multiplier;
1007                         double normalizer = 1.0;
1008                         for(i=alloc.size()-1;i>=0 && n_remaining>0;i--){
1009                                 int hist_ht_size = 0;
1010                                 for(j=0;j<alloc[i].size();++j)
1011                                         hist_ht_size+=alloc[i][j];
1012                                 double hist_cpu = 0.0;
1013                                 for(j=0;j<load[i].size();++j)
1014                                         hist_cpu += load[i][j];
1015                                 if(hist_ht_size == total_ht_allocation){
1016                                         int ht_ptr = 0;
1017                                         for(j=0;j<alloc[i].size();++j){
1018                                                 double rate = (multiplier*load[i][j])/(alloc[i][j]*hist_cpu);
1019                                                 int k;
1020                                                 for(k=0;k<alloc[i][j];k++){
1021                                                         local_alloc[ht_ptr++] += rate;
1022                                                 }
1023                                         }
1024                                         normalizer += multiplier;
1025                                         multiplier *= hist_multiplier;
1026                                         n_remaining--;
1027                                 }
1028                         }
1029                         for(i=0;i<total_ht_allocation;i++){
1030                                 local_alloc[i] /= normalizer;
1031                         }
1032                 }
1033
1034
1035                 total_ht_sizes[iface] = total_ht_allocation;
1036                 ht_cpu_allocs[iface] = local_alloc;
1037         }
1038
1039         if(uniform_rts_alloc){
1040 //                      I will require that if this option is true, all HTs
1041 //                      the same size.
1042                 bool same_sizes = true;
1043                 int total_alloc = -1;
1044                 map<string, int >::iterator msi;
1045                 for(msi=total_ht_sizes.begin();msi!=total_ht_sizes.end();++msi){
1046                         if(total_alloc<0){
1047                                 total_alloc=(*msi).second;
1048                         }else{
1049                                 if(total_alloc != (*msi).second){
1050                                         same_sizes = false;
1051                                 }
1052                         }
1053                 }
1054                 if(same_sizes){
1055                         vector<double> local_alloc(total_alloc,0.0);
1056                         double normalizer = 0.0;
1057                         map<string, vector<double> >::iterator msvdi;
1058                         for(msvdi=ht_cpu_allocs.begin();msvdi!=ht_cpu_allocs.end();++msvdi){
1059                                 string iface = (*msvdi).first;
1060                                 for(i=0;i<total_alloc;++i){
1061                                         local_alloc[i] += ht_cpu_allocs[iface][i];
1062                                 }
1063                                 normalizer += 1.0;
1064                         }
1065                         for(i=0;i<total_alloc;++i)
1066                                 local_alloc[i] /= normalizer;
1067                         for(msvdi=ht_cpu_allocs.begin();msvdi!=ht_cpu_allocs.end();++msvdi){
1068                                 (*msvdi).second = local_alloc;
1069                         }
1070                 }
1071         }
1072
1073         for(msvi=rts_iface_indices.begin();msvi!=rts_iface_indices.end();++msvi){
1074                 string iface = (*msvi).first;
1075                 vector<int> &ifindices = (*msvi).second;
1076                 sort(ifindices.begin(),ifindices.end());
1077
1078                 int cumm_ht_alloc = 0;          // ht allocated thus far
1079                 int current_pos = 0;    // idx to the current_allocation, if_cpu arrays
1080                 vector<int> new_allocation;
1081                 for(i=0;i<ifindices.size()-1;++i){
1082                         double slot_cpu = 1.0/ifindices.size(); // amount of cpu to allocate
1083                         int current_alloc = 0;  // ht allocated in the curr_pos slot
1084                         while(slot_cpu > 0.0){
1085                                 slot_cpu -= ht_cpu_allocs[iface][current_pos];
1086                                 current_pos++;
1087                                 current_alloc++;
1088                         }
1089         //                              try to make the allocations even
1090                         if(current_alloc>1 && (-slot_cpu) > (ht_cpu_allocs[iface][current_pos-1]/2.0)){
1091                                 current_pos--;
1092                                 current_alloc--;
1093                         }
1094                         new_allocation.push_back(current_alloc);
1095                 }
1096                 new_allocation.push_back(total_ht_sizes[iface]-current_pos);
1097
1098 /*
1099                 int cumm_ht_alloc = 0;          // ht allocated thus far
1100                 int current_pos = 0;    // idx to the current_allocation, if_cpu arrays
1101                 int current_alloc = 0;  // ht allocated in the curr_pos slot
1102                 vector<int> new_allocation;
1103                 for(i=0;i<ifindices.size()-1;++i){
1104                         double slot_cpu = total_cpu/ifindices.size(); // amount of cpu to allocate
1105                         int slot_ht = 0;
1106                         while(slot_cpu > 0.0){
1107                                 double cpu_rate = if_cpu[current_pos] / current_allocation[current_pos];
1108                                 double cpu_remaining = cpu_rate*(current_allocation[current_pos] - current_alloc);
1109                                 if(slot_cpu <= cpu_remaining){
1110                                         double this_cpu_alloc = slot_cpu;
1111                                         int this_ht_alloc = (int)(this_cpu_alloc / cpu_rate);
1112                                         slot_ht += this_ht_alloc;
1113                                         slot_cpu = 0.0;
1114                                         cumm_ht_alloc += this_ht_alloc;
1115                                         current_alloc += this_ht_alloc;
1116                                         if(current_alloc >= current_allocation[current_pos]){
1117                                                 current_pos++;
1118                                                 current_alloc = 0;
1119                                         }
1120                                 }else{
1121                                         slot_cpu -= cpu_remaining;
1122                                         slot_ht += current_allocation[current_pos] - current_alloc;
1123                                         cumm_ht_alloc += current_allocation[current_pos] - current_alloc;
1124                                         current_pos++;
1125                                         current_alloc = 0;
1126                                 }
1127                         }
1128                         new_allocation.push_back(slot_ht);
1129                 }
1130                 new_allocation.push_back(total_ht_allocation - cumm_ht_alloc);
1131 */
1132
1133                 new_rts_loads[iface] = new_allocation;
1134
1135
1136 /*
1137 printf("Interface %s:",iface.c_str());
1138 for(i=0;i<ifindices.size();++i){
1139 string full_iface = iface + "X" + int_to_string(ifindices[i]);
1140 int pid = pid_iface_map[full_iface];
1141 printf(" %f",rts_perf_map[pid]->avg_cpu_time()/total_cpu);
1142 }
1143 printf("\n\t");
1144 for(i=0;i<ifindices.size();++i){
1145 printf(" %d",prev_rts_loads[iface][i]);
1146 }
1147 printf("\n");
1148 */
1149         }
1150
1151
1152         FILE *rrec_fl = fopen("rts_load.cfg.recommended","w");
1153         if(rrec_fl == NULL){
1154                 fprintf(stderr,"Warning, can't open rts_load.cfg.recommended for write, skipping.\n");
1155         }
1156
1157         printf("Recommended virtual interface hash allocation:\n");
1158         map<string, vector<int> >::iterator msvii;
1159         for(msvii=new_rts_loads.begin();msvii!=new_rts_loads.end();++msvii){
1160                 string iface_name = (*msvii).first;
1161                 vector<int> iface_alloc = (*msvii).second;
1162                 printf("%s",iface_name.c_str());
1163                 if(rrec_fl!=NULL) fprintf(rrec_fl,"%s",iface_name.c_str());
1164                 for(i=0;i<iface_alloc.size();++i){
1165                         printf(",%d",iface_alloc[i]);
1166                         if(rrec_fl!=NULL) fprintf(rrec_fl,",%d",iface_alloc[i]);
1167                 }
1168                 printf("\n");
1169                 if(rrec_fl!=NULL) fprintf(rrec_fl,"\n");
1170         }
1171         fclose(rrec_fl);
1172
1173
1174         rrec_fl = fopen("rts_load.trace.txt","a");
1175         if(rrec_fl != NULL){
1176                 fprintf(rrec_fl,"\n");
1177                 for(msvii=new_rts_loads.begin();msvii!=new_rts_loads.end();++msvii){
1178                         string iface_name = (*msvii).first;
1179                         vector<int> iface_alloc = (*msvii).second;
1180                         vector<double> iface_cpu_loads = rts_iface_cpu_load[iface_name];
1181                         vector<int> prev_iface_alloc = prev_rts_loads[iface_name];
1182
1183                         fprintf(rrec_fl,"Interface %s:\n",iface_name.c_str());
1184                         fprintf(rrec_fl,"%s,Previous allocation,",iface_name.c_str());
1185                         for(i=0;i<prev_iface_alloc.size();++i){
1186                                 if(i>0) fprintf(rrec_fl,",");
1187                                 fprintf(rrec_fl,"%d",prev_iface_alloc[i]);
1188                         }
1189                         fprintf(rrec_fl,"\n%s,Previous cpu loads,",iface_name.c_str());
1190                         for(i=0;i<iface_cpu_loads.size();++i){
1191                                 if(i>0) fprintf(rrec_fl,",");
1192                                 fprintf(rrec_fl,"%f",iface_cpu_loads[i]);
1193                         }
1194                         fprintf(rrec_fl,"\n%s,New allocation,",iface_name.c_str());
1195                         for(i=0;i<iface_alloc.size();++i){
1196                                 if(i>0) fprintf(rrec_fl,",");
1197                                 fprintf(rrec_fl,"%d",iface_alloc[i]);
1198                         }
1199                         fprintf(rrec_fl,"\n\n");
1200                 }
1201         }
1202         fclose(rrec_fl);
1203
1204
1205 //      ----------------------------------------------------------------
1206 //              Make an hfta parallelism analysis.  Start by collecting hftas and grouping
1207 //              them by their copies.  Count on the __copy%d name mangling.
1208
1209         set<string>::iterator ssi;
1210         map<string, vector<int> > par_hfta_map;
1211         for(ssi=found_names.begin();ssi!=found_names.end();++ssi){
1212                 string base = (*ssi);
1213                 int qidx = qname_to_idx[base];
1214                 if(qnode_list[qidx]->qnode_type == "HFTA"){
1215                         size_t cpos = (*ssi).find("__copy");
1216                         if(cpos!=string::npos){
1217                                 base = (*ssi).substr(0,cpos);
1218                                 string idx_str = (*ssi).substr(cpos+6);
1219                                 int pidx = atoi(idx_str.c_str());
1220                                 qnode_list[qidx]->par_index = pidx;
1221                         }
1222                         par_hfta_map[base].push_back(qidx);
1223                 }
1224         }
1225
1226 //              Coalesce or split hftas.  Reduce parallelism until the max cpu utilization
1227 //              is in [cpu_util_threshold/2, cpu_util_threshold].  Double parallelism
1228 //              if max cpu utilization is > cpu_util_threshold.
1229 //              Only recommend parallelism if a resource utilization file was found.
1230   if(res_fl!=NULL){
1231         map<string, int> recommended_parallelism;
1232         map<string, vector<int> >::iterator msvii;
1233         for(msvii=par_hfta_map.begin();msvii!=par_hfta_map.end();++msvii){
1234                 vector<int> buddy_indices = (*msvii).second;
1235                 vector<qnode *> buddies;
1236                 int n_valid = 0;
1237                 for(i=0;i<buddy_indices.size();i++){
1238                         buddies.push_back(qnode_list[buddy_indices[i]]);
1239                         if(qnode_list[buddy_indices[i]]->perf->is_valid())
1240                                 n_valid++;
1241                 }
1242
1243                 if(n_valid>0){
1244                         sort(buddies.begin(),buddies.end(),cmpr_parallel_idx);
1245
1246                         int level=1;
1247                         double max_util = 0.0;
1248                         while(level<=buddies.size()){
1249                                 for(i=0;i<buddies.size();i+=level){
1250                                         double this_util = 0.0;
1251                                         for(j=0;j<level;j++){
1252                                                 this_util += buddies[i+j]->perf->avg_cpu_time();
1253                                         }
1254                                         if(this_util > max_util)
1255                                                 max_util = this_util;
1256                                 }
1257                                 if(max_util >= cpu_util_threshold/2)
1258                                         break;
1259                                 level *= 2;
1260                         }
1261                         int npar = buddies.size();
1262                         if(max_util > cpu_util_threshold)
1263                                 level/=2;
1264                         if(level>buddies.size())
1265                                 level/=2;
1266                         if(level==0)
1267                                 npar *= 2;
1268                         else
1269                                 npar /= level;
1270
1271                         recommended_parallelism[(*msvii).first] = npar;
1272                 }else{
1273                         printf("Warning, no resource usage information for %s, skipping.\n",(*msvii).first.c_str());
1274                 }
1275         }
1276
1277         FILE *hpar_fl = NULL;
1278         hpar_fl=fopen("hfta_parallelism.cfg","r");
1279         if(hpar_fl==NULL){
1280                 fprintf(stderr,"Warning, can't open hfta_parallelism.cfg, ignoring.\n");
1281         }else{
1282                 while(fgets(line,LINEBUF,hpar_fl)){
1283                         int nflds = split_string(line,',',flds,SPLITBUF);
1284                         if(nflds==2){
1285                                 int npar = atoi(flds[1]);
1286                                 if(npar>0 && recommended_parallelism.count(flds[0])==0){
1287                                         recommended_parallelism[flds[0]] = npar;
1288                                 }
1289                         }
1290                 }
1291                 fclose(hpar_fl);
1292         }
1293
1294         FILE *recpar_fl = NULL;
1295         recpar_fl=fopen("hfta_parallelism.cfg.recommended","w");
1296         if(recpar_fl==NULL){
1297                 fprintf(stderr,"Warning, can't open hfta_parallelism.cfg.recommended, can't write the file.\n");
1298         }
1299         printf("Recommended parallelism:\n");
1300         map<string, int>::iterator msii;
1301         for(msii=recommended_parallelism.begin();msii!=recommended_parallelism.end();++msii){
1302                 if(recpar_fl!=NULL)
1303                         fprintf(recpar_fl,"%s,%d\n",(*msii).first.c_str(),(*msii).second);
1304                 printf("%s,%d\n",(*msii).first.c_str(),(*msii).second);
1305         }
1306         fclose(recpar_fl);
1307   }else{
1308         printf("Can't recommend hfta parallelism, no resource utilization file found.\n");
1309   }
1310
1311         FILE *rec_ht_fl = NULL;
1312         rec_ht_fl=fopen("lfta_htsize.cfg.recommended","w");
1313         if(rec_ht_fl==NULL){
1314                 fprintf(stderr,"Warning, can't open lfta_htsize.cfg.recommended, can't write the file.\n");
1315         }
1316         printf("Recommended LFTA hash table sizes:\n");
1317         for(i=0;i<qnode_list.size();++i){
1318                 qnode *this_qn = qnode_list[i];
1319                 if(this_qn->qnode_type=="LFTA" && this_qn->aggr_tbl_size>0 && this_qn->accepted_tup>0){
1320                         int ht_size = this_qn->aggr_tbl_size;
1321                         double collision_rate = ((double)this_qn->collisions)/this_qn->accepted_tup;
1322                         double eviction_rate = ((double)this_qn->evictions)/this_qn->accepted_tup;
1323                         printf("%s htsize=%d collision=%f evictions=%f",this_qn->name.c_str(),ht_size,collision_rate,eviction_rate);
1324 //printf("%d,%f,%f\n",ht_size,collision_rate,eviction_rate);
1325                         if(eviction_rate >= erate_hi){
1326                                 ht_size /= 2;
1327                         }else if(collision_rate >= crate_hi){
1328                                 ht_size *= 2;
1329                         }else if(collision_rate < crate_lo){
1330                                 ht_size /= 2;
1331                         }
1332
1333                         printf(" rec ht_size=%d\n",ht_size);
1334                         fprintf(rec_ht_fl,"%s,%u\n",this_qn->name.c_str(),ht_size);
1335
1336                 }
1337         }
1338         fclose(rec_ht_fl);
1339
1340
1341 //              Try to load the cpu configuration info
1342
1343         vector <cpu_info_str *> cpu_info_list;
1344         FILE *cinfo_fl = NULL;
1345         cinfo_fl=fopen("cpu_info.csv","r");
1346         if(cinfo_fl==NULL){
1347                 fprintf(stderr,"Warning, can't open cpu_info.csv, skipping process pinning optimization.  Run gscpv3/bin/parse_cpuinfo.pl to generate a cpu_info.csv file.\n");
1348         }else{
1349                 int lineno = 0;
1350                 while(fgets(inp,LINEBUF,cinfo_fl)){
1351                         int nflds = split_string(inp,',',flds,SPLITBUF);
1352                         lineno++;
1353                         if(nflds >= 3){
1354                                 cpu_info_list.push_back(new cpu_info_str(atoi(flds[0]),atoi(flds[1]),atoi(flds[2])));
1355                         }
1356                 }
1357
1358                 sort(cpu_info_list.begin(),cpu_info_list.end(),cmpr_cpu_info);
1359
1360 //                      Spread the LFTAs among the cores.
1361                 vector<string> iface_names;
1362                 for(msvi=rts_iface_indices.begin();msvi!=rts_iface_indices.end();++msvi){
1363                         string iface = (*msvi).first;
1364                         vector<int> ifindices = (*msvi).second;
1365                         sort(ifindices.begin(),ifindices.end());
1366
1367                         for(i=0;i<ifindices.size();++i){
1368                                 string full_iface = iface + "X" + int_to_string(ifindices[i]);
1369                                 iface_names.push_back(full_iface);
1370                         }
1371                 }
1372
1373
1374                 map<string, int> rts_assignment;
1375                 double stride = ((double)(cpu_info_list.size()))/((double)(iface_names.size()));
1376                 double rtspos_f = 0.0;
1377                 for(i=0;i<iface_names.size();++i){
1378                         int rtspos = (int)rtspos_f;
1379                         rts_assignment[iface_names[i]] = rtspos;
1380                         cpu_info_list[rtspos]->assigned_load += rts_perf_map[pid_iface_map[iface_names[i]]]->avg_cpu_time();
1381                         rtspos_f += stride;
1382                 }
1383
1384 //for(i=0;i<iface_names.size();++i){
1385 //printf("Placing %s at %d\n",iface_names[i].c_str(), rts_assignment[iface_names[i]]);
1386 //}
1387
1388                 set<string> eligible_hftas;
1389                 map<string, int> hfta_assignment;
1390                 set<string>::iterator ssi;
1391                 for(ssi = found_names.begin();ssi!=found_names.end();++ssi){
1392                         int qidx = qname_to_idx[(*ssi)];
1393 //printf("Considering %s (%d), sz=%f, cpu=%f\n",(*ssi).c_str(),qidx,qnode_list[qidx]->inferred_in_sz,qnode_list[qidx]->perf->avg_cpu_time());
1394                         if(qnode_list[qidx]->inferred_in_sz >= min_hfta_insz || qnode_list[qidx]->perf->avg_cpu_time() > min_hfta_cpu){
1395 //printf("\tAdding to eligible list\n");
1396                                 eligible_hftas.insert((*ssi));
1397                         }
1398                 }
1399
1400                 while(eligible_hftas.size()>0){
1401                         int chosen_hfta = -1;
1402                         double max_assigned_rate = 0.0;
1403                         for(ssi=eligible_hftas.begin();ssi!=eligible_hftas.end();++ssi){
1404                                 double assigned_rate = 0.0;
1405                                 string qname = (*ssi);
1406                                 int qidx = qname_to_idx[qname];
1407                                 vector<int> reads_from = qnode_list[qidx]->reads_from_idx;
1408                                 for(i=0;i<reads_from.size();++i){
1409                                         if(qnode_list[reads_from[i]]->qnode_type == "LFTA" || (qnode_list[reads_from[i]]->qnode_type == "HFTA" && hfta_assignment.count(qnode_list[reads_from[i]]->name) > 0))
1410                                                 assigned_rate += qnode_list[reads_from[i]]->output_rate();
1411                                 }
1412 //printf("hfta %s, assigned rate=%f\n",qname.c_str(),assigned_rate);
1413                                 if(assigned_rate >= max_assigned_rate){
1414 //printf("\t picking %s\n",qname.c_str());
1415                                         max_assigned_rate = assigned_rate;
1416                                         chosen_hfta = qidx;
1417                                 }
1418                         }
1419                         if(chosen_hfta >= 0){
1420                                 vector<int> reads_from = qnode_list[chosen_hfta]->reads_from_idx;
1421                                 vector<int> src_location;
1422                                 vector<double> src_volume;
1423                                 for(i=0;i<reads_from.size();++i){
1424                                         int qidx = reads_from[i];
1425                                         if(qnode_list[qidx]->qnode_type == "HFTA"){
1426                                                 if(hfta_assignment.count(qnode_list[qidx]->name)>0){
1427                                                         src_location.push_back(hfta_assignment[qnode_list[qidx]->name]);
1428                                                         src_volume.push_back(qnode_list[qidx]->output_rate());
1429                                                 }
1430                                         }
1431                                         if(qnode_list[qidx]->qnode_type == "LFTA"){
1432                                                 if(rts_assignment.count(qnode_list[qidx]->src_interface)>0){
1433                                                         src_location.push_back(rts_assignment[qnode_list[qidx]->src_interface]);
1434                                                         src_volume.push_back(qnode_list[qidx]->output_rate());
1435                                                 }
1436                                         }
1437                                 }
1438 //printf("chosen hfta is %d (%s), sources are:\n",chosen_hfta,qnode_list[chosen_hfta]->name.c_str());
1439 //for(i=0;i<src_location.size();++i){
1440 //printf("\tloc=%d, (%s) volume=%f\n",src_location[i],cpu_info_list[src_location[i]]->to_csv().c_str(),src_volume[i]);
1441 //}
1442
1443                                 double hfta_cpu_usage = qnode_list[chosen_hfta]->perf->avg_cpu_time();
1444                                 if(hfta_cpu_usage > cpu_util_threshold) // hack for overloaded hftas.
1445                                         hfta_cpu_usage = cpu_util_threshold * .9999;
1446 printf("hfta %d (%s) has cpu usage %f\n",chosen_hfta,qnode_list[chosen_hfta]->name.c_str(),hfta_cpu_usage);
1447                                 int best_cpu = -1;
1448                                 double lowest_cost = 0.0;
1449                                 for(i=0;i<cpu_info_list.size();++i){
1450                                         double curr_cost = 0.0;
1451                                         for(j=0;j<src_location.size();++j){
1452                                                 int dist = cpu_info_list[i]->distance_from(cpu_info_list[src_location[j]]);
1453                                                 curr_cost += src_volume[j]*xfer_costs[dist];
1454                                         }
1455 //printf("Cpu %s, cost=%f\n",cpu_info_list[i]->to_csv().c_str(),curr_cost);
1456                                         if((cpu_info_list[i]->assigned_load+hfta_cpu_usage < cpu_util_threshold) && (best_cpu<0 || curr_cost <= lowest_cost)){
1457                                                 best_cpu = i;
1458                                                 lowest_cost = curr_cost;
1459 //printf("\tpicking %s\n",cpu_info_list[i]->to_csv().c_str());
1460                                         }
1461                                 }
1462
1463                                 if(best_cpu>=0)
1464                                         cpu_info_list[best_cpu]->assigned_load += hfta_cpu_usage;
1465                                 hfta_assignment[qnode_list[chosen_hfta]->name] = best_cpu;
1466                                 eligible_hftas.erase(qnode_list[chosen_hfta]->name);
1467
1468                         }else{
1469                                 fprintf(stderr,"ERROR, chosen_hfta=-1, bailing out.\n");
1470                                 exit(1);
1471                         }
1472                 }
1473
1474                 FILE *pin_fl = fopen("pinning_info.csv","w");
1475                 if(pin_fl==NULL){
1476                         fprintf(stderr,"Warning, can't open pinning_info.csv, can't write the file.\n");
1477                 }
1478                 printf("RTS assignments:\n");
1479                 for(i=0;i<iface_names.size();++i){
1480                         int assigned_cpu = rts_assignment[iface_names[i]];
1481                         if(assigned_cpu>=0){
1482                                 printf("Place %s at %d (%s)\n",iface_names[i].c_str(), assigned_cpu, cpu_info_list[assigned_cpu]->to_csv().c_str());
1483                                 if(pin_fl != NULL){
1484                                         fprintf(pin_fl,"rts %s,%d\n",iface_names[i].c_str(), assigned_cpu);
1485                                 }
1486                         }
1487                 }
1488
1489                 printf("HFTA assignments:\n");
1490                 map<string, int>::iterator msii;
1491                 for(msii=hfta_assignment.begin();msii!=hfta_assignment.end();++msii){
1492                         int assigned_cpu = (*msii).second;
1493                         string qname = (*msii).first;
1494                         if(assigned_cpu>=0){
1495                                 printf("Place %s (%s) at %d (%s)\n",qname.c_str(),qnode_list[qname_to_idx[qname]]->executable_name.c_str(),assigned_cpu,cpu_info_list[assigned_cpu]->to_csv().c_str());
1496                                 if(pin_fl != NULL){
1497                                         fprintf(pin_fl,"%s,%d\n",qnode_list[qname_to_idx[qname]]->executable_name.c_str(), assigned_cpu);
1498                                 }
1499                         }
1500                 }
1501
1502         }
1503
1504 //for(i=0;i<cpu_info_list.size();++i){
1505 //printf("Cpu %d assigned load %f\n",i,cpu_info_list[i]->assigned_load);
1506 //}
1507
1508 return 0;
1509 }
1510
1511
1512