Adding MC-NIB support
[ric-app/mc.git] / mc-core / mc / mcnib / gsmcnib.cc
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6  
7  http://www.apache.org/licenses/LICENSE-2.0
8  
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
15 #include <app.h>
16 #include <stdlib.h>
17 #include <stdio.h>
18 #include <unistd.h>
19 #include <signal.h>
20 #include <time.h>
21 #include <string.h>
22 #include <sys/time.h>
23 #include <sys/stat.h>
24 #include <sys/types.h>
25 #include <sys/socket.h>
26 #include <netinet/in.h>
27
28
29 #include "gsconfig.h"
30 #include "gstypes.h"
31 #include "gshub.h"
32
33 #include <schemaparser.h>
34
35 #include <sdl/syncstorage.hpp>
36
37 using namespace std;
38
39 //      data type definitions from sdl
40 using Namespace = std::string;
41 using Key = std::string;
42 using Data = std::vector<uint8_t>;
43 using DataMap = std::map<Key, Data>;
44 using Keys = std::set<Key>;
45
46
47
48 #define MAXLINE 100000
49 static unsigned tcpport=0;
50 static char linebuf[MAXLINE];
51 int listensockfd=0;
52 int fd=0;
53
54 FILE* outf;
55
56 // Not all systems have timersub defined so make sure its ther
57 #ifndef timersub
58
59 #define timersub(tvp, uvp, vvp)                                         \
60 do {                                                            \
61 (vvp)->tv_sec = (tvp)->tv_sec - (uvp)->tv_sec;          \
62 (vvp)->tv_usec = (tvp)->tv_usec - (uvp)->tv_usec;       \
63 if ((vvp)->tv_usec < 0) {                               \
64 (vvp)->tv_sec--;                                \
65 (vvp)->tv_usec += 1000000;                      \
66 }                                                       \
67 } while (0)
68
69 #endif
70
71 void hand(int iv) {
72     ftaapp_exit();
73     fprintf(stderr, "exiting via signal handler %d...\n", iv);
74     exit(1);
75 }
76
77 static void wait_for_client() {
78     struct sockaddr_in serv_addr,cli_addr;
79     socklen_t clilen;
80     if (listensockfd==0) {
81                 gs_int32_t on = 1;
82                 listensockfd=socket(AF_INET, SOCK_STREAM, 0);
83         if (listensockfd < 0) {
84                         gslog(LOG_EMERG,"Error:Could not create socket for tcp data stream");
85                         exit(1);
86                 }
87                 bzero((char *) &serv_addr, sizeof(serv_addr));
88                 serv_addr.sin_family = AF_INET;
89                 serv_addr.sin_addr.s_addr = INADDR_ANY;
90                 serv_addr.sin_port = htons(tcpport);
91 #ifndef __linux__
92         /* make sure we can reuse the common port rapidly */
93         if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEPORT,
94                        (gs_sp_t )&on, sizeof(on)) != 0) {
95             gslog(LOG_EMERG,"Error::could not set socket option\n");
96             exit(1);
97         }
98 #endif
99         if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEADDR,
100                        (gs_sp_t )&on, sizeof(on)) != 0) {
101             gslog(LOG_EMERG,"Error::could not set socket option\n");
102             exit(1);
103                 }
104         
105                 if (bind(listensockfd, (struct sockaddr *) &serv_addr,
106                  sizeof(serv_addr)) < 0) {
107                         gslog(LOG_EMERG,"Error:Could not bind socket for tcp data stream");
108             exit(1);
109         }
110         }
111     
112         do {
113                 listen(listensockfd,5);
114                 clilen = sizeof(cli_addr);
115                 fd=accept(listensockfd, (struct sockaddr *) &cli_addr, &clilen);
116                 if (fd<0) {
117             gslog(LOG_EMERG,"Error:Could not accept connection on tcp socket");
118                 }
119         } while (fd==0);
120 }
121
122
123 static void emit_socket() {
124         unsigned o,w,l;
125         o=0;
126         w=0;
127         l=strlen(linebuf);
128         do {
129                 if((w=write(fd,&linebuf[o],l))==0) {
130                         close(fd);
131                         wait_for_client();
132                 }
133                 o=o+w;
134         } while (o<l);
135 }
136
137 static void emit_line() {
138     
139     if (tcpport==0) {
140         fprintf(outf,"%s",linebuf);
141     } else {
142         emit_socket();
143     }
144     
145 }
146
147 int split_string(char *instr,char sep, char **words,int max_words){
148    char *loc;
149    char *str;
150    int nwords = 0;
151
152    str = instr;
153    words[nwords++] = str;
154    while( (loc = strchr(str,sep)) != NULL){
155         *loc = '\0';
156         str = loc+1;
157         if(nwords >= max_words){
158                 fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
159                 nwords = max_words-1;
160         }
161         words[nwords++] = str;
162    }
163
164    return(nwords);
165 }
166
167 std::vector<string> split_string(const string &str, char sep){
168         char *instr = strdup(str.c_str());
169         char *words[1000];
170         int nwords = split_string(instr, sep, words, 1000);
171         vector<string> ret;
172         for(int i=0;i<nwords;++i){
173                 ret.push_back(words[i]);
174         }
175         delete instr;
176         return ret;
177 }
178
179 vector<uint8_t> packData(const char *d, int len){
180         const uint8_t *d8 = (const uint8_t *)d;
181         return Data(d8, d8+len+1);
182 }
183
184
185
186 int main(int argc, char* argv[]) {
187     gs_sp_t me = argv[0];
188     FTAID fta_id;
189     gs_int32_t schema, ch;
190     
191     FTAID rfta_id;
192     gs_uint32_t rsize;
193     gs_uint32_t bufsz=8*1024*1024;
194     gs_int8_t rbuf[2*MAXTUPLESZ];
195     
196     gs_int32_t numberoffields;
197     gs_int32_t verbose=0;
198     gs_int32_t y, lcv;
199     
200     void *pblk;
201     gs_int32_t pblklen;
202         gs_int32_t n_actual_param;
203         gs_int32_t n_expected_param;
204     gs_int32_t xit = 0;
205     gs_int32_t dump = 0;
206     struct timeval tvs, tve, tvd;
207     gs_retval_t code;
208     endpoint gshub;
209     endpoint dummyep;
210     gs_uint32_t tip1,tip2,tip3,tip4;
211     gs_sp_t instance_name;
212
213         string keys_s;
214         vector<string> keys_v;
215         vector<int> keys_i;
216
217     gs_uint32_t tlimit = 0;     // time limit in seconds
218     time_t start_time, curr_time;
219     
220         gsopenlog(argv[0]);
221
222     // by default the output will go to stdout
223     outf = stdout;
224     
225     while ((ch = getopt(argc, argv, "l:p:r:veXDK:")) != -1) {
226         switch (ch) {
227             case 'r':
228                 bufsz=atoi(optarg);
229                 break;
230             case 'p':
231                 tcpport=atoi(optarg);
232                 break;
233             case 'v':
234                 verbose++;
235                 break;
236             case 'e':
237                 outf = stderr;
238                 break;
239             case 'X':
240                 xit++;
241                 break;
242             case 'D':
243                 dump++;
244                 break;
245             case 'l':
246                 tlimit = atoi(optarg);
247                 break;
248                         case 'K':
249                                 keys_s = optarg;
250                                 keys_v = split_string(keys_s, ',');
251                                 break;
252             default:
253             usage:
254                 fprintf(stderr, "usage: %s [-r <bufsz>] [-e] [-p <port>] [-l <time_limit>] [-v] [-X] [-D] [-K comma_separated_key_fields] <gshub-hostname>:<gshub-port> <gsinstance_name>  query param1 param2...\n", *argv);
255                 exit(1);
256         }
257     }
258     argc -= optind;
259     argv += optind;
260     if (argc<3) goto usage;
261     
262     if (sscanf(argv[0],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(gshub.port))!= 5 ) {
263         gslog(LOG_EMERG,"HUB IP NOT DEFINED");
264         exit(1);
265     }
266     gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
267     gshub.port=htons(gshub.port);
268     instance_name=strdup(argv[1]);
269     if (set_hub(gshub)!=0) {
270         gslog(LOG_EMERG,"Could not set hub");
271         exit(1);
272     }
273     if (set_instance_name(instance_name)!=0) {
274         gslog(LOG_EMERG,"Could not set instance name");
275         exit(1);
276     }
277     
278     if (get_initinstance(gshub,instance_name,&dummyep,1)!=0) {
279         gslog(LOG_EMERG,"Did not receive signal that GS is initiated\n");
280     }
281     
282     gettimeofday(&tvs, 0);
283     argc -=2;
284     argv +=2;
285     if (argc < 1)
286         goto usage;
287     
288     /* initialize host library and the sgroup  */
289     
290     if (verbose>=2) fprintf(stderr,"Inializin gscp\n");
291     
292     if (ftaapp_init(bufsz)!=0) {
293         fprintf(stderr,"%s::error:could not initialize gscp\n", me);
294         exit(1);
295     }
296     
297     signal(SIGTERM, hand);
298     signal(SIGINT, hand);
299
300         Namespace ns("mcnib");  
301         string key_base = argv[0];
302     
303     schema = ftaapp_get_fta_schema_by_name(argv[0]);
304     if (schema < 0) {
305         fprintf(stderr,"%s::error:could not get fta '%s' schema\n",
306                 me ,argv[0]);
307         exit(1);
308     }
309         n_expected_param = ftaschema_parameter_len(schema);
310     if (n_expected_param == 0) {
311         pblk = 0;
312         pblklen = 0;
313     } else {
314         /* parse the params */
315                 n_actual_param = argc-1;
316                 if(n_actual_param < n_expected_param){
317                         fprintf(stderr,"Error, %d query parameters expected, %d provided.\n",n_expected_param, n_actual_param);
318                         exit(1);
319                 }
320         for (lcv = 1 ; lcv < argc ; lcv++) {
321             char *k, *e;
322             int rv;
323             k = argv[lcv];
324             e = k;
325             while (*e && *e != '=') e++;
326             if (*e == 0) {
327                 fprintf(stderr,"param parse error '%s' (fmt 'key=val')\n",
328                         argv[lcv]);
329                 exit(1);
330             }
331             *e = 0;
332             rv = ftaschema_setparam_by_name(schema, k, e+1, strlen(e+1));
333             *e = '=';
334             if (rv < 0) {
335                 fprintf(stderr,"param setparam error '%s' (fmt 'key=val')\n",
336                         argv[lcv]);
337                 exit(1);
338             }
339         }
340         if (ftaschema_create_param_block(schema, &pblk, &pblklen) < 0) {
341             fprintf(stderr, "ftaschema_create_param_block failed!\n");
342             exit(1);
343         }
344     }
345     ftaschema_free(schema); /* XXXCDC */
346     
347     
348     if (verbose>=2) fprintf(stderr,"Initalize FTA\n");
349     
350     fta_id=ftaapp_add_fta(argv[0],0,0,0,pblklen,pblk);
351     if (fta_id.streamid==0) {
352         fprintf(stderr,"%s::error:could not initialize fta %s\n",
353                 me, argv[0]);
354         exit(1);
355     }
356     /* XXXCDC: pblk is malloc'd, should we free it? */
357     
358     if (verbose>=2) fprintf(stderr,"Get schema handle\n");
359     
360     if ((schema=ftaapp_get_fta_schema(fta_id))<0) {
361         fprintf(stderr,"%s::error:could not get schema\n", me);
362         exit(1);
363     }
364     
365     if ((numberoffields=ftaschema_tuple_len(schema))<0) {
366         fprintf(stderr,"%s::error:could not get number of fields in schema\n",
367                 me);
368         exit(1);
369     }
370     
371     if (verbose>=1) {
372         for(y=0; y<numberoffields;y++) {
373             printf("%s",ftaschema_field_name(schema,y));
374             if (y<numberoffields-1) printf("|");
375         }
376         printf("\n");
377     }
378     if (xit) {
379         gettimeofday(&tve, 0);
380         timersub(&tve, &tvs, &tvd);
381         printf("TIME= %ld.%06ld sec\n", tvd.tv_sec, tvd.tv_usec);
382         hand(0);
383     }
384     if (tcpport!=0) {
385         wait_for_client();
386     }
387
388 // Get the vector of keys, bail out if there is a mismatch
389         for(int ki=0;ki<keys_v.size();++ki){
390                 int fi;
391                 for(fi=0; fi<numberoffields;fi++) {
392                         if(ftaschema_field_name(schema, fi) == keys_v[ki]){
393                                 keys_i.push_back(fi);
394                                 break;
395                         }
396                 }
397                 if(fi>=numberoffields){
398                         fprintf(stderr,"ERROR key field %s is not in the schema.\n",keys_v[ki].c_str());
399                         exit(1);
400                 }
401         }
402
403 //              Get SDL handle
404         std::unique_ptr<shareddatalayer::SyncStorage> sdl(shareddatalayer::SyncStorage::create());
405
406     start_time = time(NULL);
407
408     while((code=ftaapp_get_tuple(&rfta_id,&rsize,rbuf,2*MAXTUPLESZ,0))>=0) {
409         if (dump)
410             continue;
411         if (ftaschema_is_eof_tuple(schema, rbuf)) {
412             /* initiate shutdown or something of that nature */
413             printf("#All data proccessed\n");
414             exit(0);
415         }
416         if (!rsize)
417             continue;
418         if (verbose >=2) {
419             snprintf(linebuf,MAXLINE,"RESULT CODE => %u\n",code);
420             emit_line();
421         }
422
423         if ((code==0)&&(rfta_id.streamid == fta_id.streamid)) {
424                         string key = key_base;
425                         for(int fi=0;fi<keys_i.size();++fi){
426                                 y = keys_i[fi]; //n match gsprintconsole
427                 access_result ar=ftaschema_get_field_by_index(schema,y,rbuf,rsize);
428                 switch (ar.field_data_type) {
429                     case INT_TYPE:
430                                                 key += ":"+to_string(ar.r.i);
431                         break;
432                     case UINT_TYPE:
433                                                 key += ":"+to_string(ar.r.ui);
434                         break;
435                     case IP_TYPE:
436                         snprintf(linebuf,MAXLINE,"%u.%u.%u.%u",ar.r.ui>>24&0xff,
437                                  ar.r.ui>>16&0xff,
438                                  ar.r.ui>>8&0xff,
439                                  ar.r.ui&0xff);
440                                                 key += ":"+string(linebuf);
441                         break;
442                     case IPV6_TYPE:
443                     {
444                         unsigned x;
445                         unsigned zc=0;
446                         for(x=0;x<4;x++) { if (ar.r.ip6.v[x]==0) zc++;}
447                         if (zc!=4) {
448                             for(x=0;x<8;x++) {
449                                 unsigned char * a = (unsigned char *)  &(ar.r.ip6.v[0]);
450                                 unsigned y;
451                                 y=((unsigned)a[2*x])<<8|((unsigned) a[2*x+1]);
452                                 snprintf(&linebuf[strlen(linebuf)],MAXLINE,"%04x",y);
453                                 if (x<7) snprintf(&linebuf[strlen(linebuf)],MAXLINE,":");
454                             }
455                         } else {
456                             snprintf(linebuf,MAXLINE,"::");
457                         }
458                                                 key += ":"+string(linebuf);
459                     }
460                     break;
461                     case USHORT_TYPE:
462                                                 key += ":"+to_string(ar.r.ui);
463                         break;
464                     case BOOL_TYPE:
465                         if (ar.r.ui==0) {
466                                                         key += ":FALSE";
467                         } else {
468                                                         key += ":TRUE";
469                         }
470                         break;
471                     case ULLONG_TYPE:
472                                                 key += ":"+to_string(ar.r.ul);
473                         break;
474                     case LLONG_TYPE:
475                                                 key += ":"+to_string(ar.r.l);
476                         break;
477                     case FLOAT_TYPE:
478                                                 key += ":"+to_string(ar.r.f);
479                         break;
480                     case TIMEVAL_TYPE:
481                     {
482                         gs_float_t t;
483                         t= ar.r.t.tv_usec;
484                         t=t/1000000;
485                         t=t+ar.r.t.tv_sec;
486                         snprintf(linebuf,MAXLINE,"%f sec",t);
487                                                 key += ":"+string(linebuf);
488                     }
489                         break;
490                     case VSTR_TYPE:
491                     {
492                         int x;
493                         int c;
494                         int d=0;
495                         char * src;
496                         src=(char*)ar.r.vs.offset;
497                         if(d<MAXLINE){
498                             linebuf[d] = '\0';
499                         }
500                         for(x=0;x<ar.r.vs.length;x++) {
501                             c=src[x];
502                             if ((c<='~') && (c>=' ')) {
503                                 if (d<MAXLINE-1) {
504                                     linebuf[d]=c;
505                                     linebuf[d+1]=0;
506                                     d++;
507                                 }
508                             } else {
509                                 if (d<MAXLINE-1) {
510                                     linebuf[d]='.';
511                                     linebuf[d+1]=0;
512                                     d++;
513                                 }
514                             }
515                         }
516                                                 key += ":"+string(linebuf);
517                     }
518                         break;
519                     default:
520                         linebuf[0]=0;
521                         break;
522                 }
523             }
524                         if(keys_i.size()==0){
525                                 key += ":";
526                         }
527                         DataMap D;
528                         D[key] = packData(rbuf, rsize);
529                         sdl->set(ns, D);
530
531         } else {
532             if (rfta_id.streamid != fta_id.streamid)
533                 fprintf(stderr,"Got unkown streamid %llu \n",rfta_id.streamid);
534         }
535
536         // whenever we receive a temp tuple check if we reached time limit
537         if ((code==2)  && tlimit && (time(NULL)-start_time)>=tlimit) {
538             fprintf(stderr,"Reached time limit of %d seconds\n",tlimit);    
539             ftaapp_exit();
540             exit(0);
541         }        
542     }
543 }
544