Add new udafs and RMR support to gsprintconsole_ves
[com/gs-lite.git] / src / tools / gsprintconsole_ves.c
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6  
7  http://www.apache.org/licenses/LICENSE-2.0
8  
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
15
16
17 /*
18  * Print ves formatted records to the console.
19  * Each line is a json record.
20  * Based on gsprintconsole.c, just differences in formatting.
21 */
22
23
24 #include <app.h>
25 #include <stdlib.h>
26 #include <stdio.h>
27 #include <unistd.h>
28 #include <signal.h>
29 #include <time.h>
30 #include <string.h>
31 #include <sys/time.h>
32 #include <sys/stat.h>
33 #include <sys/types.h>
34 #include <sys/socket.h>
35 #include <netinet/in.h>
36
37 #include <errno.h>
38 #include <string.h>
39
40 #ifdef RMR_ENABLED
41 #include <rmr/rmr.h>
42 #include <rmr/RIC_message_types.h>
43 #include <sys/epoll.h>
44 #endif
45
46 #include "gsconfig.h"
47 #include "gstypes.h"
48 #include "gshub.h"
49 #include "simple_http.h"
50
51 #include <schemaparser.h>
52
53 #define MAXLINE 100000
54 static unsigned tcpport=0;
55 static char linebuf[MAXLINE];
56 int listensockfd=0;
57 int fd=0;
58
59 // how frequently we will log stats (expressed in tuples posted)
60 #define STAT_FREQUENCY 5
61
62
63 // Not all systems have timersub defined so make sure its ther
64 #ifndef timersub
65
66 #define timersub(tvp, uvp, vvp)                                         \
67 do {                                                            \
68 (vvp)->tv_sec = (tvp)->tv_sec - (uvp)->tv_sec;          \
69 (vvp)->tv_usec = (tvp)->tv_usec - (uvp)->tv_usec;       \
70 if ((vvp)->tv_usec < 0) {                               \
71 (vvp)->tv_sec--;                                \
72 (vvp)->tv_usec += 1000000;                      \
73 }                                                       \
74 } while (0)
75
76 #endif
77
78 void hand(int iv) {
79     ftaapp_exit();
80     fprintf(stderr, "exiting via signal handler %d...\n", iv);
81     exit(1);
82 }
83
84 static void wait_for_client() {
85     struct sockaddr_in serv_addr,cli_addr;
86     socklen_t clilen;
87     if (listensockfd==0) {
88                 gs_int32_t on = 1;
89                 listensockfd=socket(AF_INET, SOCK_STREAM, 0);
90         if (listensockfd < 0) {
91                         gslog(LOG_EMERG,"Error:Could not create socket for tcp data stream");
92                         exit(1);
93                 }
94                 bzero((char *) &serv_addr, sizeof(serv_addr));
95                 serv_addr.sin_family = AF_INET;
96                 serv_addr.sin_addr.s_addr = INADDR_ANY;
97                 serv_addr.sin_port = htons(tcpport);
98 #ifndef __linux__
99         /* make sure we can reuse the common port rapidly */
100         if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEPORT,
101                        (gs_sp_t )&on, sizeof(on)) != 0) {
102             gslog(LOG_EMERG,"Error::could not set socket option");
103             exit(1);
104         }
105 #endif
106         if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEADDR,
107                        (gs_sp_t )&on, sizeof(on)) != 0) {
108             gslog(LOG_EMERG,"Error::could not set socket option");
109             exit(1);
110                 }
111         
112                 if (bind(listensockfd, (struct sockaddr *) &serv_addr,
113                  sizeof(serv_addr)) < 0) {
114                         gslog(LOG_EMERG,"Error:Could not bind socket for tcp data stream");
115             exit(1);
116         }
117         }
118     
119         do {
120                 listen(listensockfd,5);
121                 clilen = sizeof(cli_addr);
122                 fd=accept(listensockfd, (struct sockaddr *) &cli_addr, &clilen);
123                 if (fd<0) {
124             gslog(LOG_EMERG,"Error:Could not accept connection on tcp socket");
125                 }
126         } while (fd==0);
127 }
128
129
130 static void emit_socket() {
131         unsigned o,w,l;
132         o=0;
133         w=0;
134         l=strlen(linebuf);
135         do {
136                 if((w=write(fd,&linebuf[o],l))==0) {
137                         close(fd);
138                         wait_for_client();
139                 }
140                 o=o+w;
141         } while (o<l);
142 }
143
144 static void emit_line() {
145     
146     if (tcpport==0) {
147         printf("%s",linebuf);
148     } else {
149         emit_socket();
150     }
151     
152 }
153
154 int main(int argc, char* argv[]) {
155     gs_sp_t me = argv[0];
156     FTAID fta_id;
157     gs_int32_t schema, ch;
158     
159     FTAID rfta_id;
160     gs_uint32_t rsize;
161     gs_uint32_t bufsz=8*1024*1024;
162     gs_int8_t rbuf[2*MAXTUPLESZ];
163     
164     gs_int32_t numberoffields;
165     gs_int32_t verbose=0;
166     gs_int32_t y, lcv;
167     
168     void *pblk;
169     gs_int32_t pblklen;
170         gs_int32_t n_actual_param;
171         gs_int32_t n_expected_param;
172     gs_int32_t xit = 0;
173     gs_int32_t dump = 0;
174     struct timeval tvs, tve, tvd;
175     gs_retval_t code;
176     endpoint gshub;
177     endpoint dummyep;
178     gs_uint32_t tip1,tip2,tip3,tip4;
179     gs_sp_t instance_name;
180     gs_sp_t     rmr_port = NULL;    
181
182 #ifdef RMR_ENABLED
183     // RMR-related parameters
184     gs_int32_t  rmr_mtype = MC_REPORT;
185 #endif
186
187         gs_sp_t curl_address = NULL;
188         endpoint curl_endpoint;
189         gs_sp_t curl_url = NULL;
190         gs_sp_t curl_auth = NULL;
191         gs_uint32_t http_code;
192
193         gs_uint32_t ves_version=7;
194
195     gs_uint32_t tlimit = 0;     // time limit in seconds
196     time_t start_time, curr_time;
197
198     gs_uint64_t post_success_cnt = 0ULL;
199     gs_uint64_t post_failure_cnt = 0ULL;    
200
201 #ifdef RMR_ENABLED
202         void* mrc;                                                      //msg router context
203         struct epoll_event events[1];                   // list of events to give to epoll
204         struct epoll_event epe;                 // event definition for event to listen to
205         gs_int32_t     ep_fd = -1;                                              // epoll's file des (given to epoll_wait)
206         gs_int32_t rcv_fd;                                              // file des that NNG tickles -- give this to epoll to listen on
207         gs_int32_t nready;                                                              // number of events ready for receive
208         rmr_mbuf_t*             rmr_sbuf;                                       // send buffer
209         rmr_mbuf_t*             rmr_rbuf;                                       // received buffer
210
211     gs_uint64_t rmr_post_success_cnt = 0ULL;
212     gs_uint64_t rmr_post_failure_cnt = 0ULL;        
213 #endif
214     
215         gsopenlog(argv[0]);
216     
217     while ((ch = getopt(argc, argv, "l:p:r:vXDC:U:R:A:V:")) != -1) {
218         switch (ch) {
219             case 'r':
220                 bufsz=atoi(optarg);
221                 break;
222             case 'p':
223                 tcpport=atoi(optarg);
224                 break;
225             case 'v':
226                 verbose++;
227                 break;
228             case 'X':
229                 xit++;
230                 break;
231             case 'D':
232                 dump++;
233                 break;
234             case 'l':
235                 tlimit = atoi(optarg);
236                 break;
237             case 'V':
238                 ves_version = atoi(optarg);
239                 break;
240                         case 'C':
241                                 curl_address = strdup(optarg);
242                         if (sscanf(curl_address,"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(curl_endpoint.port))!= 5 ) {
243                                 gslog(LOG_EMERG,"Curl IP NOT DEFINED");
244                                 exit(1);
245                         }
246                         curl_endpoint.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
247                         curl_endpoint.port=htons(curl_endpoint.port);
248                                 break;
249             case 'R':
250                 rmr_port=strdup(optarg);
251                 break;
252             case 'U':
253                                 curl_url = strdup(optarg);
254                 break;
255             case 'A':
256                                 curl_auth = strdup(optarg);
257                 break;
258             default:
259             usage:
260                 fprintf(stderr, "usage: %s [-r <bufsz>] [-p <port>] [-l <time_limit>] [-v] [-X] [-D] [-C <curl_dest>:<curl_port>] [-U <curl_url>] [-A <authentication_string>] [-V <ves_version>] [-R <rmr_port>] <gshub-hostname>:<gshub-port> <gsinstance_name>  query param1 param2...\n",
261                         *argv);
262                 exit(1);
263         }
264     }
265     argc -= optind;
266     argv += optind;
267     if (argc<3) goto usage;
268     
269     if (sscanf(argv[0],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(gshub.port))!= 5 ) {
270         gslog(LOG_EMERG,"HUB IP NOT DEFINED");
271         exit(1);
272     }
273     gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
274     gshub.port=htons(gshub.port);
275     instance_name=strdup(argv[1]);
276     if (set_hub(gshub)!=0) {
277         gslog(LOG_EMERG,"Could not set hub");
278         exit(1);
279     }
280     if (set_instance_name(instance_name)!=0) {
281         gslog(LOG_EMERG,"Could not set instance name");
282         exit(1);
283     }
284     
285     if (get_initinstance(gshub,instance_name,&dummyep,1)!=0) {
286         gslog(LOG_EMERG,"Did not receive signal that GS is initiated");
287     }
288
289
290 //      If this uses curl output, ensure consistency in the curl args
291         if(curl_address != NULL){
292                 if(curl_url == NULL){
293                         gslog(LOG_EMERG,"Curl IP defined, but there is no url (e.g. /foo/bar");
294                         exit(1);
295                 }
296                 if(curl_auth==NULL){
297                         curl_auth = "";
298                 } 
299         }
300     
301     gettimeofday(&tvs, 0);
302     argc -=2;
303     argv +=2;
304     if (argc < 1)
305         goto usage;
306
307     if (rmr_port) {
308 #ifdef RMR_ENABLED        
309         /* initialize RMR library */
310         if( (mrc = rmr_init( rmr_port, 1400, RMRFL_NONE )) == NULL ) {
311             fprintf(stderr, "%s::error:unable to initialise RMR\n", me);
312             exit( 1 );
313         }
314
315         rcv_fd = rmr_get_rcvfd( mrc );                                  // set up epoll things, start by getting the FD from MRr
316         if( rcv_fd < 0 ) {
317             fprintf(stderr, "%s::error:unable to set up polling fd\n", me);
318             exit( 1 );
319         }
320         if( (ep_fd = epoll_create1( 0 )) < 0 ) {
321             fprintf(stderr, "%s::error:unable to create epoll fd: %d\n", me, errno);
322             exit( 1 );
323         }
324         epe.events = EPOLLIN;
325         epe.data.fd = rcv_fd;
326
327         if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
328             fprintf(stderr, "%s::error:epoll_ctl status not 0 : %s\n", me, strerror(errno));
329             exit( 1 );
330         }
331
332         rmr_sbuf = rmr_alloc_msg( mrc, MAXLINE );       // alloc first send buffer; subsequent buffers allcoated on send
333         rmr_rbuf = NULL;                                                // don't need to alloc receive buffer
334
335         while( ! rmr_ready( mrc ) ) {           // must have a route table before we can send; wait til RMr say it has one
336             sleep( 10 );
337         }
338         fprintf( stderr, "%s: RMR is ready\n", argv[0]);       
339 #else
340                 fprintf(stderr,"Runtime libraries built without RMR support. Rebuild with RMR_ENABLED defined in gsoptions.h\n");
341                 exit(0);
342 #endif
343     }
344         
345     /* initialize host library and the sgroup  */
346     
347     if (verbose>=2) fprintf(stderr,"Initializing gscp\n");
348     
349     if (ftaapp_init(bufsz)!=0) {
350         fprintf(stderr,"%s::error:could not initialize gscp\n", me);
351         exit(1);
352     }
353     
354     signal(SIGTERM, hand);
355     signal(SIGINT, hand);
356     
357     schema = ftaapp_get_fta_schema_by_name(argv[0]);
358     if (schema < 0) {
359         fprintf(stderr,"%s::error:could not get fta '%s' schema\n",
360                 me ,argv[0]);
361         exit(1);
362     }
363         n_expected_param = ftaschema_parameter_len(schema);
364     if (n_expected_param == 0) {
365         pblk = 0;
366         pblklen = 0;
367     } else {
368                 n_actual_param = argc-1;
369                 if(n_actual_param < n_expected_param){
370                         fprintf(stderr,"Error, %d query parameters expected, %d provided.\n",n_expected_param, n_actual_param);
371                         exit(1);
372                 }
373         /* parse the params */
374         for (lcv = 1 ; lcv < argc ; lcv++) {
375             char *k, *e;
376             int rv;
377             k = argv[lcv];
378             e = k;
379             while (*e && *e != '=') e++;
380             if (*e == 0) {
381                 fprintf(stderr,"param parse error '%s' (fmt 'key=val')\n",
382                         argv[lcv]);
383                 exit(1);
384             }
385             *e = 0;
386             rv = ftaschema_setparam_by_name(schema, k, e+1, strlen(e+1));
387             *e = '=';
388             if (rv < 0) {
389                 fprintf(stderr,"param setparam error '%s' (fmt 'key=val')\n",
390                         argv[lcv]);
391                 exit(1);
392             }
393         }
394         if (ftaschema_create_param_block(schema, &pblk, &pblklen) < 0) {
395             fprintf(stderr, "ftaschema_create_param_block failed!\n");
396             exit(1);
397         }
398     }
399 //    ftaschema_free(schema); /* XXXCDC */ // the schema continues to be used
400     
401     
402     if (verbose>=2) fprintf(stderr,"Initalized FTA\n");
403     
404     fta_id=ftaapp_add_fta(argv[0],0,0,0,pblklen,pblk);
405     if (fta_id.streamid==0) {
406         fprintf(stderr,"%s::error:could not initialize fta %s\n",
407                 me, argv[0]);
408         exit(1);
409     }
410     /* XXXCDC: pblk is malloc'd, should we free it? */
411     
412     if (verbose>=2) fprintf(stderr,"Get schema handle\n");
413     
414     if ((schema=ftaapp_get_fta_schema(fta_id))<0) {
415         fprintf(stderr,"%s::error:could not get schema\n", me);
416         exit(1);
417     }
418     
419     if ((numberoffields=ftaschema_tuple_len(schema))<0) {
420         fprintf(stderr,"%s::error:could not get number of fields in schema\n",
421                 me);
422         exit(1);
423     }
424     
425     if (verbose>=1) {
426         for(y=0; y<numberoffields;y++) {
427             printf("%s",ftaschema_field_name(schema,y));
428             if (y<numberoffields-1) printf("|");
429         }
430         printf("\n");
431     }
432     if (xit) {  // -X in command line
433         gettimeofday(&tve, 0);
434         timersub(&tve, &tvs, &tvd);
435         printf("TIME= %ld%06d sec\n", tvd.tv_sec, tvd.tv_usec);
436         hand(0);        // effectively an exit
437     }
438     if (tcpport!=0) {
439         wait_for_client();
440     }
441
442     start_time = time(NULL);
443
444         int measurement_interval_pos = -1; // extract measurementInterval if present
445         char *field_names[numberoffields];
446     for(y=0; y<numberoffields;y++) {
447                 field_names[y] = strdup(ftaschema_field_name(schema,y));
448                 if(strcmp(field_names[y], "measurementInterval")==0)
449                         measurement_interval_pos = y;
450         }
451
452
453         struct timeval tsample;
454         gettimeofday(&tsample, 0);
455         char start_ts[100], curr_ts[100];
456         sprintf(start_ts,"%ld%06d", tsample.tv_sec, tsample.tv_usec);
457
458         long unsigned int lineno=0;
459         long unsigned int seqno=0;
460         double measurement_interval;
461     while((code=ftaapp_get_tuple(&rfta_id,&rsize,rbuf,2*MAXTUPLESZ,0))>=0) {
462                 lineno++;
463         if (dump)       // -D in command line
464             continue;
465         if (ftaschema_is_eof_tuple(schema, rbuf)) {
466             /* initiate shutdown or something of that nature */
467             printf("#All data proccessed\n");
468             exit(0);
469         }
470         if (!rsize)
471             continue;
472         if (verbose >=2) {
473             snprintf(linebuf,MAXLINE,"RESULT CODE => %u\n",code);
474             emit_line();
475         }
476         if ((code==0)&&(rfta_id.streamid == fta_id.streamid)) {
477                         seqno++;
478                         gettimeofday(&tsample, 0);
479                         sprintf(curr_ts,"%ld%06d", tsample.tv_sec, tsample.tv_usec);
480                         int pos;
481                         if(ves_version < 7){
482                                 pos = snprintf(linebuf, MAXLINE,
483   "{\"event\": { \"commonEventHeader\": { "
484         "\"domain\": \"measurementsForVfScaling\", "
485         "\"eventId\": \"%s%u\", "
486         "\"eventType\": \"%s\", "
487         "\"eventName\": \"Measurement_MC_%s\", "
488         "\"lastEpochMicrosec\": %s, "
489         "\"priority\": \"Normal\", "
490         "\"reportingEntityName\": \"GS-LITE MC\", "
491         "\"sequence\": %u, "
492         "\"sourceName\": \"meas_cmpgn_xapp\", "
493         "\"startEpochMicrosec\": %s, "
494         "\"version\": 5 "
495       "}, "
496       "\"measurementsForVfScalingFields\": { "
497                 "\"additionalFields\": ["
498                                 ,argv[0],lineno, argv[0], argv[0], curr_ts, seqno, start_ts
499                                 );
500                         }else{
501                                 pos = snprintf(linebuf, MAXLINE,
502   "{\"event\": { \"commonEventHeader\": { "
503         "\"domain\": \"measurement\", "
504         "\"eventId\": \"%s%u\", "
505         "\"eventType\": \"%s\", "
506         "\"eventName\": \"Measurement_MC_%s\", "
507         "\"lastEpochMicrosec\": %s, "
508         "\"priority\": \"Normal\", "
509         "\"reportingEntityName\": \"GS-LITE MC\", "
510         "\"sequence\": %u, "
511         "\"sourceName\": \"meas_cmpgn_xapp\", "
512         "\"startEpochMicrosec\": %s, "
513         "\"version\": \"4.0.1\", "
514         "\"vesEventListenerVersion\": \"7.0.1\" "
515       "}, "
516       "\"measurementFields\": { "
517                 "\"additionalFields\": {"
518                                 ,argv[0],lineno, argv[0], argv[0], curr_ts, seqno, start_ts
519                                 );
520                         }
521
522                         measurement_interval = 0;
523             for(y=0; y<numberoffields;y++) {
524                 struct access_result ar;
525 //                if (verbose>=2)
526 //                    printf("%s->",ftaschema_field_name(schema,y));
527                 if(y>0){
528                                         linebuf[pos]=',';
529                                         pos++;
530                                 }
531                 ar=ftaschema_get_field_by_index(schema,y,rbuf,rsize);
532                 switch (ar.field_data_type) {
533                     case INT_TYPE:
534                                                 if(ves_version < 7)
535                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%d\"}",field_names[y], ar.r.i);
536                                                 else
537                                 pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"%d\"",field_names[y], ar.r.i);
538                                                 if(y==measurement_interval_pos)
539                                                         measurement_interval = (double)ar.r.i;
540                         break;
541                     case UINT_TYPE:
542                                                 if(ves_version < 7)
543                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u\"}",field_names[y], ar.r.ui);
544                                                 else
545                                 pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"%u\"",field_names[y], ar.r.ui);
546                                                 if(y==measurement_interval_pos)
547                                                         measurement_interval = (double)ar.r.ui;
548                         break;
549                     case IP_TYPE:
550                                                 if(ves_version < 7)
551                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u.%u.%u.%u\"}",field_names[y], ar.r.ui>>24&0xff,
552                                  ar.r.ui>>16&0xff,
553                                  ar.r.ui>>8&0xff,
554                                  ar.r.ui&0xff);
555                                                 else
556                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%u.%u.%u.%u\"",field_names[y], ar.r.ui>>24&0xff,
557                                  ar.r.ui>>16&0xff,
558                                  ar.r.ui>>8&0xff,
559                                  ar.r.ui&0xff);
560                         break;
561                     case IPV6_TYPE:
562                     {
563                                                 if(ves_version < 7)
564                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"",field_names[y]);
565                                                 else
566                                 pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"",field_names[y]);
567                         unsigned x;
568                         unsigned zc=0;
569                         for(x=0;x<4;x++) { if (ar.r.ip6.v[x]==0) zc++;}
570                         if (zc!=4) {
571                             snprintf(linebuf,MAXLINE,"");
572                             for(x=0;x<8;x++) {
573                                 unsigned char * a = (unsigned char *)  &(ar.r.ip6.v[0]);
574                                 unsigned y;
575                                 y=((unsigned)a[2*x])<<8|((unsigned) a[2*x+1]);
576                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"%04x",y);
577                                 if (x<7){
578                                                                         pos += snprintf(linebuf+pos,MAXLINE-pos,":");
579                                                                 }
580                             }
581                         } else {
582                             pos+=snprintf(linebuf+pos,MAXLINE-pos,"::");
583                         }
584                                                 if(ves_version < 7)
585                                                         pos += snprintf(linebuf+pos, MAXLINE-pos,"\"}");
586                                                 else
587                                                         pos += snprintf(linebuf+pos, MAXLINE-pos,"\"");
588                     }
589                         break;
590                         
591                     case USHORT_TYPE:
592                                                 if(ves_version < 7)
593                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u\"}",field_names[y], ar.r.ui);
594                                                 else
595                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\":  \"%u\"}",field_names[y], ar.r.ui);
596                                                 if(y==measurement_interval_pos)
597                                                         measurement_interval = (double)ar.r.ui;
598                         break;
599                     case BOOL_TYPE:
600                                                 if(ves_version < 7){
601                                 if (ar.r.ui==0) {
602                                         pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"FALSE\"}",field_names[y]);
603                                      } else {
604                                         pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"TRUE\"}",field_names[y]);
605                                 }
606                                                 }else{
607                                 if (ar.r.ui==0) {
608                                         pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"FALSE\"",field_names[y]);
609                                      } else {
610                                         pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"TRUE\"",field_names[y]);
611                                 }
612                                                 }
613                         break;
614                     case ULLONG_TYPE:
615                                                 if(ves_version < 7)
616                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%llu\"}",field_names[y], ar.r.ul);
617                                                 else
618                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%llu\"",field_names[y], ar.r.ul);
619                                                 if(y==measurement_interval_pos)
620                                                         measurement_interval = (double)ar.r.ul;
621                         break;
622                     case LLONG_TYPE:
623                                                 if(ves_version < 7)
624                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%lld\"}",field_names[y], ar.r.l);
625                                                 else
626                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%lld\"",field_names[y], ar.r.l);
627                                                 if(y==measurement_interval_pos)
628                                                         measurement_interval = (double)ar.r.l;
629                         break;
630                     case FLOAT_TYPE:
631                                                 if(ves_version < 7)
632                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%f\"}",field_names[y], ar.r.f);
633                                                 else
634                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%f\"",field_names[y], ar.r.f);
635                                                 if(y==measurement_interval_pos)
636                                                         measurement_interval = (double)ar.r.f;                            
637                         break;
638                     case TIMEVAL_TYPE:
639                     {
640                         gs_float_t t;
641                         t= ar.r.t.tv_usec;
642                         t=t/1000000;
643                         t=t+ar.r.t.tv_sec;
644                                                 if(ves_version < 7)
645                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%f\"}",field_names[y], t);
646                                                 else
647                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%f\"",field_names[y], t);
648                     }
649                         break;
650                     case VSTR_TYPE:
651                     {
652                                                 if(ves_version < 7)
653                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"",field_names[y]);
654                                                 else
655                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"",field_names[y]);
656                         int x;
657                         int c;
658                         char * src;
659                         src=(char*)ar.r.vs.offset;
660                                                 for(x=0;x<ar.r.vs.length;x++){
661                             c=src[x];
662                             if ((c<='~') && (c>=' ')) {
663                                 if (pos<MAXLINE-1) {
664                                     linebuf[pos]=c;
665                                     pos++;
666                                 }
667                             } else {
668                                 if (pos<MAXLINE-1) {
669                                     linebuf[pos]='.';
670                                     pos++;
671                                 }
672                             }
673                                                 }
674                                                 if(ves_version < 7)
675                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"}");
676                                                 else
677                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"");
678                     }
679                         break;
680                     default:
681                         linebuf[0]=0;
682                         break;
683                 }
684                         }
685                         if(ves_version < 7){
686                                 snprintf(linebuf+pos, MAXLINE-pos,
687                 "], \"measurementInterval\": %f, \"measurementsForVfScalingVersion\": 1"
688                 "}}}\n", measurement_interval
689                                 );
690                         }else{
691                                 snprintf(linebuf+pos, MAXLINE-pos,
692                 "}, \"measurementInterval\": %f, \"measurementFieldsVersion\": \"4.0\""
693                 "}}}\n", measurement_interval
694                                 );
695                         }
696
697 #ifdef RMR_ENABLED
698             if (rmr_port) {                
699                 rmr_sbuf->mtype = rmr_mtype;                                                    // fill in the message bits
700                         rmr_sbuf->len =  strlen(linebuf) + 1;           // our receiver likely wants a nice acsii-z string
701                         memcpy(rmr_sbuf->payload, linebuf, rmr_sbuf->len);
702                 rmr_sbuf->state = 0;
703                 rmr_sbuf = rmr_send_msg( mrc, rmr_sbuf);                                // send it (send returns an empty payload on success, or the original payload on fail/retry)
704                 while( rmr_sbuf->state == RMR_ERR_RETRY ) {                     // soft failure (device busy?) retry
705                     rmr_sbuf = rmr_send_msg( mrc, rmr_sbuf);                    // retry send until it's good (simple test; real programmes should do better)
706                 }
707                 if(rmr_sbuf->state != RMR_OK) {
708                     gslog(LOG_WARNING, "rmr_send_msg() failure, strerror(errno) is %s", strerror(errno));
709                     rmr_post_failure_cnt++;
710                 } else
711                     rmr_post_success_cnt++;
712                 if (((rmr_post_success_cnt+rmr_post_failure_cnt) % STAT_FREQUENCY) == 0)
713                     gslog(LOG_WARNING, "%s: successful RMR posts - %llu, failed RMR posts - %llu", argv[0], rmr_post_success_cnt, rmr_post_failure_cnt);
714             }
715 #endif            
716
717                         if(curl_address==NULL){
718                 if (!rmr_port)  // if neither VES collector nor RMR is specified print to standard output
719                     emit_line();
720                         }else{
721                                 http_post_request_hdr(curl_endpoint, curl_url, linebuf, &http_code, curl_auth);
722                                 if(http_code != 200 && http_code != 202){
723                     post_failure_cnt++; 
724                                         gslog(LOG_WARNING, "http return code is %d",http_code);
725                                 } else {
726                     post_success_cnt++;   
727                 }  
728                 if (((post_success_cnt+post_failure_cnt) % STAT_FREQUENCY) == 0)
729                     gslog(LOG_WARNING, "%s: successful ves posts - %llu, failed ves posts - %llu", argv[0], post_success_cnt, post_failure_cnt);
730                         }
731             if (verbose!=0) fflush(stdout);
732         } else {
733             if (rfta_id.streamid != fta_id.streamid)
734                 fprintf(stderr,"Got unknown streamid %llu \n",rfta_id.streamid);
735         }
736
737         // whenever we receive a temp tuple check if we reached time limit
738         if ((code==2)  && tlimit && (time(NULL)-start_time)>=tlimit) {
739             fprintf(stderr,"Reached time limit of %d seconds\n",tlimit);    
740             ftaapp_exit();
741             exit(0);
742         }        
743     }
744 }
745