Added support for publishing metrics using RMR and new descriptor file format
[ric-app/mc.git] / mc-core / mc / mcnib / gsprintconsole_ves.c
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6  
7  http://www.apache.org/licenses/LICENSE-2.0
8  
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
15
16
17 /*
18  * Print ves formatted records to the console.
19  * Each line is a json record.
20  * Based on gsprintconsole.c, just differences in formatting.
21 */
22
23
24 #include <app.h>
25 #include <stdlib.h>
26 #include <stdio.h>
27 #include <unistd.h>
28 #include <signal.h>
29 #include <time.h>
30 #include <string.h>
31 #include <sys/time.h>
32 #include <sys/stat.h>
33 #include <sys/types.h>
34 #include <sys/socket.h>
35 #include <netinet/in.h>
36
37 #include <errno.h>
38 #include <string.h>
39 #include <sys/epoll.h>
40
41 #include <rmr/rmr.h>
42 #include <rmr/RIC_message_types.h>
43
44
45 #include "gsconfig.h"
46 #include "gstypes.h"
47 #include "gshub.h"
48 #include "simple_http.h"
49
50 #include <schemaparser.h>
51
52 #define MAXLINE 100000
53 static unsigned tcpport=0;
54 static char linebuf[MAXLINE];
55 int listensockfd=0;
56 int fd=0;
57
58 // how frequently we will log stats (expressed in tuples posted)
59 #define STAT_FREQUENCY 5
60
61
62 // Not all systems have timersub defined so make sure its ther
63 #ifndef timersub
64
65 #define timersub(tvp, uvp, vvp)                                         \
66 do {                                                            \
67 (vvp)->tv_sec = (tvp)->tv_sec - (uvp)->tv_sec;          \
68 (vvp)->tv_usec = (tvp)->tv_usec - (uvp)->tv_usec;       \
69 if ((vvp)->tv_usec < 0) {                               \
70 (vvp)->tv_sec--;                                \
71 (vvp)->tv_usec += 1000000;                      \
72 }                                                       \
73 } while (0)
74
75 #endif
76
77 void hand(int iv) {
78     ftaapp_exit();
79     fprintf(stderr, "exiting via signal handler %d...\n", iv);
80     exit(1);
81 }
82
83 static void wait_for_client() {
84     struct sockaddr_in serv_addr,cli_addr;
85     socklen_t clilen;
86     if (listensockfd==0) {
87                 gs_int32_t on = 1;
88                 listensockfd=socket(AF_INET, SOCK_STREAM, 0);
89         if (listensockfd < 0) {
90                         gslog(LOG_EMERG,"Error:Could not create socket for tcp data stream");
91                         exit(1);
92                 }
93                 bzero((char *) &serv_addr, sizeof(serv_addr));
94                 serv_addr.sin_family = AF_INET;
95                 serv_addr.sin_addr.s_addr = INADDR_ANY;
96                 serv_addr.sin_port = htons(tcpport);
97 #ifndef __linux__
98         /* make sure we can reuse the common port rapidly */
99         if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEPORT,
100                        (gs_sp_t )&on, sizeof(on)) != 0) {
101             gslog(LOG_EMERG,"Error::could not set socket option");
102             exit(1);
103         }
104 #endif
105         if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEADDR,
106                        (gs_sp_t )&on, sizeof(on)) != 0) {
107             gslog(LOG_EMERG,"Error::could not set socket option");
108             exit(1);
109                 }
110         
111                 if (bind(listensockfd, (struct sockaddr *) &serv_addr,
112                  sizeof(serv_addr)) < 0) {
113                         gslog(LOG_EMERG,"Error:Could not bind socket for tcp data stream");
114             exit(1);
115         }
116         }
117     
118         do {
119                 listen(listensockfd,5);
120                 clilen = sizeof(cli_addr);
121                 fd=accept(listensockfd, (struct sockaddr *) &cli_addr, &clilen);
122                 if (fd<0) {
123             gslog(LOG_EMERG,"Error:Could not accept connection on tcp socket");
124                 }
125         } while (fd==0);
126 }
127
128
129 static void emit_socket() {
130         unsigned o,w,l;
131         o=0;
132         w=0;
133         l=strlen(linebuf);
134         do {
135                 if((w=write(fd,&linebuf[o],l))==0) {
136                         close(fd);
137                         wait_for_client();
138                 }
139                 o=o+w;
140         } while (o<l);
141 }
142
143 static void emit_line() {
144     
145     if (tcpport==0) {
146         printf("%s",linebuf);
147     } else {
148         emit_socket();
149     }
150     
151 }
152
153 int main(int argc, char* argv[]) {
154     gs_sp_t me = argv[0];
155     FTAID fta_id;
156     gs_int32_t schema, ch;
157     
158     FTAID rfta_id;
159     gs_uint32_t rsize;
160     gs_uint32_t bufsz=8*1024*1024;
161     gs_int8_t rbuf[2*MAXTUPLESZ];
162     
163     gs_int32_t numberoffields;
164     gs_int32_t verbose=0;
165     gs_int32_t y, lcv;
166     
167     void *pblk;
168     gs_int32_t pblklen;
169         gs_int32_t n_actual_param;
170         gs_int32_t n_expected_param;
171     gs_int32_t xit = 0;
172     gs_int32_t dump = 0;
173     struct timeval tvs, tve, tvd;
174     gs_retval_t code;
175     endpoint gshub;
176     endpoint dummyep;
177     gs_uint32_t tip1,tip2,tip3,tip4;
178     gs_sp_t instance_name;
179
180     // RMR-related parameters
181     gs_sp_t     rmr_port = NULL;
182     gs_int32_t  rmr_mtype = MC_REPORT;    
183
184         gs_sp_t curl_address = NULL;
185         endpoint curl_endpoint;
186         gs_sp_t curl_url = NULL;
187         gs_sp_t curl_auth = NULL;
188         gs_uint32_t http_code;
189
190         gs_uint32_t ves_version=7;
191
192         void* mrc;                                                      //msg router context
193         struct epoll_event events[1];                   // list of events to give to epoll
194         struct epoll_event epe;                 // event definition for event to listen to
195         gs_int32_t     ep_fd = -1;                                              // epoll's file des (given to epoll_wait)
196         gs_int32_t rcv_fd;                                              // file des that NNG tickles -- give this to epoll to listen on
197         gs_int32_t nready;                                                              // number of events ready for receive
198         rmr_mbuf_t*             rmr_sbuf;                                       // send buffer
199         rmr_mbuf_t*             rmr_rbuf;                                       // received buffer
200
201     gs_uint32_t tlimit = 0;     // time limit in seconds
202     time_t start_time, curr_time;
203
204     gs_uint64_t post_success_cnt = 0ULL;
205     gs_uint64_t post_failure_cnt = 0ULL;    
206
207     gs_uint64_t rmr_post_success_cnt = 0ULL;
208     gs_uint64_t rmr_post_failure_cnt = 0ULL;    
209
210     
211         gsopenlog(argv[0]);
212     
213     while ((ch = getopt(argc, argv, "l:p:r:vXDC:U:R:A:V:")) != -1) {
214         switch (ch) {
215             case 'r':
216                 bufsz=atoi(optarg);
217                 break;
218             case 'p':
219                 tcpport=atoi(optarg);
220                 break;
221             case 'v':
222                 verbose++;
223                 break;
224             case 'X':
225                 xit++;
226                 break;
227             case 'D':
228                 dump++;
229                 break;
230             case 'l':
231                 tlimit = atoi(optarg);
232                 break;
233             case 'V':
234                 ves_version = atoi(optarg);
235                 break;
236                         case 'C':
237                                 curl_address = strdup(optarg);
238                         if (sscanf(curl_address,"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(curl_endpoint.port))!= 5 ) {
239                                 gslog(LOG_EMERG,"Curl IP NOT DEFINED");
240                                 exit(1);
241                         }
242                         curl_endpoint.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
243                         curl_endpoint.port=htons(curl_endpoint.port);
244                                 break;
245                         case 'R':
246                 rmr_port=strdup(optarg);
247                 break;
248             case 'U':
249                                 curl_url = strdup(optarg);
250                 break;
251             case 'A':
252                                 curl_auth = strdup(optarg);
253                 break;
254             default:
255             usage:
256                 fprintf(stderr, "usage: %s [-r <bufsz>] [-p <port>] [-l <time_limit>] [-v] [-X] [-D] [-C <curl_dest>:<curl_port>] [-U <curl_url>] [-A <authentication_string>] [-V <ves_version>] [-R <rmr_port>] <gshub-hostname>:<gshub-port> <gsinstance_name>  query param1 param2...\n",
257                         *argv);
258                 exit(1);
259         }
260     }
261     argc -= optind;
262     argv += optind;
263     if (argc<3) goto usage;
264     
265     if (sscanf(argv[0],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(gshub.port))!= 5 ) {
266         gslog(LOG_EMERG,"HUB IP NOT DEFINED");
267         exit(1);
268     }
269     gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
270     gshub.port=htons(gshub.port);
271     instance_name=strdup(argv[1]);
272     if (set_hub(gshub)!=0) {
273         gslog(LOG_EMERG,"Could not set hub");
274         exit(1);
275     }
276     if (set_instance_name(instance_name)!=0) {
277         gslog(LOG_EMERG,"Could not set instance name");
278         exit(1);
279     }
280     
281     if (get_initinstance(gshub,instance_name,&dummyep,1)!=0) {
282         gslog(LOG_EMERG,"Did not receive signal that GS is initiated");
283     }
284
285
286 //      If this uses curl output, ensure consistency in the curl args
287         if(curl_address != NULL){
288                 if(curl_url == NULL){
289                         gslog(LOG_EMERG,"Curl IP defined, but there is no url (e.g. /foo/bar");
290                         exit(1);
291                 }
292                 if(curl_auth==NULL){
293                         curl_auth = "";
294                 } 
295         }
296     
297     gettimeofday(&tvs, 0);
298     argc -=2;
299     argv +=2;
300     if (argc < 1)
301         goto usage;
302
303     if (rmr_port) {
304         /* initialize RMR library */
305         if( (mrc = rmr_init( rmr_port, 1400, RMRFL_NONE )) == NULL ) {
306             fprintf(stderr, "%s::error:unable to initialise RMR\n", me);
307             exit( 1 );
308         }
309
310         rcv_fd = rmr_get_rcvfd( mrc );                                  // set up epoll things, start by getting the FD from MRr
311         if( rcv_fd < 0 ) {
312             fprintf(stderr, "%s::error:unable to set up polling fd\n", me);
313             exit( 1 );
314         }
315         if( (ep_fd = epoll_create1( 0 )) < 0 ) {
316             fprintf(stderr, "%s::error:unable to create epoll fd: %d\n", me, errno);
317             exit( 1 );
318         }
319         epe.events = EPOLLIN;
320         epe.data.fd = rcv_fd;
321
322         if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
323             fprintf(stderr, "%s::error:epoll_ctl status not 0 : %s\n", me, strerror(errno));
324             exit( 1 );
325         }
326
327         rmr_sbuf = rmr_alloc_msg( mrc, MAXLINE );       // alloc first send buffer; subsequent buffers allcoated on send
328         rmr_rbuf = NULL;                                                // don't need to alloc receive buffer
329
330         while( ! rmr_ready( mrc ) ) {           // must have a route table before we can send; wait til RMr say it has one
331             sleep( 1 );
332         }
333         fprintf( stderr, "RMR is ready\n" );        
334     }
335         
336     /* initialize host library and the sgroup  */
337     
338     if (verbose>=2) fprintf(stderr,"Initializing gscp\n");
339     
340     if (ftaapp_init(bufsz)!=0) {
341         fprintf(stderr,"%s::error:could not initialize gscp\n", me);
342         exit(1);
343     }
344     
345     signal(SIGTERM, hand);
346     signal(SIGINT, hand);
347     
348     schema = ftaapp_get_fta_schema_by_name(argv[0]);
349     if (schema < 0) {
350         fprintf(stderr,"%s::error:could not get fta '%s' schema\n",
351                 me ,argv[0]);
352         exit(1);
353     }
354         n_expected_param = ftaschema_parameter_len(schema);
355     if (n_expected_param == 0) {
356         pblk = 0;
357         pblklen = 0;
358     } else {
359                 n_actual_param = argc-1;
360                 if(n_actual_param < n_expected_param){
361                         fprintf(stderr,"Error, %d query parameters expected, %d provided.\n",n_expected_param, n_actual_param);
362                         exit(1);
363                 }
364         /* parse the params */
365         for (lcv = 1 ; lcv < argc ; lcv++) {
366             char *k, *e;
367             int rv;
368             k = argv[lcv];
369             e = k;
370             while (*e && *e != '=') e++;
371             if (*e == 0) {
372                 fprintf(stderr,"param parse error '%s' (fmt 'key=val')\n",
373                         argv[lcv]);
374                 exit(1);
375             }
376             *e = 0;
377             rv = ftaschema_setparam_by_name(schema, k, e+1, strlen(e+1));
378             *e = '=';
379             if (rv < 0) {
380                 fprintf(stderr,"param setparam error '%s' (fmt 'key=val')\n",
381                         argv[lcv]);
382                 exit(1);
383             }
384         }
385         if (ftaschema_create_param_block(schema, &pblk, &pblklen) < 0) {
386             fprintf(stderr, "ftaschema_create_param_block failed!\n");
387             exit(1);
388         }
389     }
390 //    ftaschema_free(schema); /* XXXCDC */ // the schema continues to be used
391     
392     
393     if (verbose>=2) fprintf(stderr,"Initalized FTA\n");
394     
395     fta_id=ftaapp_add_fta(argv[0],0,0,0,pblklen,pblk);
396     if (fta_id.streamid==0) {
397         fprintf(stderr,"%s::error:could not initialize fta %s\n",
398                 me, argv[0]);
399         exit(1);
400     }
401     /* XXXCDC: pblk is malloc'd, should we free it? */
402     
403     if (verbose>=2) fprintf(stderr,"Get schema handle\n");
404     
405     if ((schema=ftaapp_get_fta_schema(fta_id))<0) {
406         fprintf(stderr,"%s::error:could not get schema\n", me);
407         exit(1);
408     }
409     
410     if ((numberoffields=ftaschema_tuple_len(schema))<0) {
411         fprintf(stderr,"%s::error:could not get number of fields in schema\n",
412                 me);
413         exit(1);
414     }
415     
416     if (verbose>=1) {
417         for(y=0; y<numberoffields;y++) {
418             printf("%s",ftaschema_field_name(schema,y));
419             if (y<numberoffields-1) printf("|");
420         }
421         printf("\n");
422     }
423     if (xit) {  // -X in command line
424         gettimeofday(&tve, 0);
425         timersub(&tve, &tvs, &tvd);
426         printf("TIME= %ld%06d sec\n", tvd.tv_sec, tvd.tv_usec);
427         hand(0);        // effectively an exit
428     }
429     if (tcpport!=0) {
430         wait_for_client();
431     }
432
433     start_time = time(NULL);
434
435         int measurement_interval_pos = -1; // extract measurementInterval if present
436         char *field_names[numberoffields];
437     for(y=0; y<numberoffields;y++) {
438                 field_names[y] = strdup(ftaschema_field_name(schema,y));
439                 if(strcmp(field_names[y], "measurementInterval")==0)
440                         measurement_interval_pos = y;
441         }
442
443
444         struct timeval tsample;
445         gettimeofday(&tsample, 0);
446         char start_ts[100], curr_ts[100];
447         sprintf(start_ts,"%ld%06d", tsample.tv_sec, tsample.tv_usec);
448
449         long unsigned int lineno=0;
450         long unsigned int seqno=0;
451         double measurement_interval;
452     while((code=ftaapp_get_tuple(&rfta_id,&rsize,rbuf,2*MAXTUPLESZ,0))>=0) {
453                 lineno++;
454         if (dump)       // -D in command line
455             continue;
456         if (ftaschema_is_eof_tuple(schema, rbuf)) {
457             /* initiate shutdown or something of that nature */
458             printf("#All data proccessed\n");
459             exit(0);
460         }
461         if (!rsize)
462             continue;
463         if (verbose >=2) {
464             snprintf(linebuf,MAXLINE,"RESULT CODE => %u\n",code);
465             emit_line();
466         }
467         if ((code==0)&&(rfta_id.streamid == fta_id.streamid)) {
468                         seqno++;
469                         gettimeofday(&tsample, 0);
470                         sprintf(curr_ts,"%ld%06d", tsample.tv_sec, tsample.tv_usec);
471                         int pos;
472                         if(ves_version < 7){
473                                 pos = snprintf(linebuf, MAXLINE,
474   "{\"event\": { \"commonEventHeader\": { "
475         "\"domain\": \"measurementsForVfScaling\", "
476         "\"eventId\": \"%s%u\", "
477         "\"eventType\": \"%s\", "
478         "\"eventName\": \"Measurement_MC_%s\", "
479         "\"lastEpochMicrosec\": %s, "
480         "\"priority\": \"Normal\", "
481         "\"reportingEntityName\": \"GS-LITE MC\", "
482         "\"sequence\": %u, "
483         "\"sourceName\": \"meas_cmpgn_xapp\", "
484         "\"startEpochMicrosec\": %s, "
485         "\"version\": 5 "
486       "}, "
487       "\"measurementsForVfScalingFields\": { "
488                 "\"additionalFields\": ["
489                                 ,argv[0],lineno, argv[0], argv[0], curr_ts, seqno, start_ts
490                                 );
491                         }else{
492                                 pos = snprintf(linebuf, MAXLINE,
493   "{\"event\": { \"commonEventHeader\": { "
494         "\"domain\": \"measurement\", "
495         "\"eventId\": \"%s%u\", "
496         "\"eventType\": \"%s\", "
497         "\"eventName\": \"Measurement_MC_%s\", "
498         "\"lastEpochMicrosec\": %s, "
499         "\"priority\": \"Normal\", "
500         "\"reportingEntityName\": \"GS-LITE MC\", "
501         "\"sequence\": %u, "
502         "\"sourceName\": \"meas_cmpgn_xapp\", "
503         "\"startEpochMicrosec\": %s, "
504         "\"version\": \"4.0.1\", "
505         "\"vesEventListenerVersion\": \"7.0.1\" "
506       "}, "
507       "\"measurementFields\": { "
508                 "\"additionalFields\": {"
509                                 ,argv[0],lineno, argv[0], argv[0], curr_ts, seqno, start_ts
510                                 );
511                         }
512
513                         measurement_interval = 0;
514             for(y=0; y<numberoffields;y++) {
515                 struct access_result ar;
516 //                if (verbose>=2)
517 //                    printf("%s->",ftaschema_field_name(schema,y));
518                 if(y>0){
519                                         linebuf[pos]=',';
520                                         pos++;
521                                 }
522                 ar=ftaschema_get_field_by_index(schema,y,rbuf,rsize);
523                 switch (ar.field_data_type) {
524                     case INT_TYPE:
525                                                 if(ves_version < 7)
526                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%d\"}",field_names[y], ar.r.i);
527                                                 else
528                                 pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"%d\"",field_names[y], ar.r.i);
529                                                 if(y==measurement_interval_pos)
530                                                         measurement_interval = (double)ar.r.i;
531                         break;
532                     case UINT_TYPE:
533                                                 if(ves_version < 7)
534                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u\"}",field_names[y], ar.r.ui);
535                                                 else
536                                 pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"%u\"",field_names[y], ar.r.ui);
537                                                 if(y==measurement_interval_pos)
538                                                         measurement_interval = (double)ar.r.ui;
539                         break;
540                     case IP_TYPE:
541                                                 if(ves_version < 7)
542                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u.%u.%u.%u\"}",field_names[y], ar.r.ui>>24&0xff,
543                                  ar.r.ui>>16&0xff,
544                                  ar.r.ui>>8&0xff,
545                                  ar.r.ui&0xff);
546                                                 else
547                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%u.%u.%u.%u\"",field_names[y], ar.r.ui>>24&0xff,
548                                  ar.r.ui>>16&0xff,
549                                  ar.r.ui>>8&0xff,
550                                  ar.r.ui&0xff);
551                         break;
552                     case IPV6_TYPE:
553                     {
554                                                 if(ves_version < 7)
555                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"",field_names[y]);
556                                                 else
557                                 pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"",field_names[y]);
558                         unsigned x;
559                         unsigned zc=0;
560                         for(x=0;x<4;x++) { if (ar.r.ip6.v[x]==0) zc++;}
561                         if (zc!=4) {
562                             snprintf(linebuf,MAXLINE,"");
563                             for(x=0;x<8;x++) {
564                                 unsigned char * a = (unsigned char *)  &(ar.r.ip6.v[0]);
565                                 unsigned y;
566                                 y=((unsigned)a[2*x])<<8|((unsigned) a[2*x+1]);
567                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"%04x",y);
568                                 if (x<7){
569                                                                         pos += snprintf(linebuf+pos,MAXLINE-pos,":");
570                                                                 }
571                             }
572                         } else {
573                             pos+=snprintf(linebuf+pos,MAXLINE-pos,"::");
574                         }
575                                                 if(ves_version < 7)
576                                                         pos += snprintf(linebuf+pos, MAXLINE-pos,"\"}");
577                                                 else
578                                                         pos += snprintf(linebuf+pos, MAXLINE-pos,"\"");
579                     }
580                         break;
581                         
582                     case USHORT_TYPE:
583                                                 if(ves_version < 7)
584                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u\"}",field_names[y], ar.r.ui);
585                                                 else
586                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\":  \"%u\"}",field_names[y], ar.r.ui);
587                                                 if(y==measurement_interval_pos)
588                                                         measurement_interval = (double)ar.r.ui;
589                         break;
590                     case BOOL_TYPE:
591                                                 if(ves_version < 7){
592                                 if (ar.r.ui==0) {
593                                         pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"FALSE\"}",field_names[y]);
594                                      } else {
595                                         pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"TRUE\"}",field_names[y]);
596                                 }
597                                                 }else{
598                                 if (ar.r.ui==0) {
599                                         pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"FALSE\"",field_names[y]);
600                                      } else {
601                                         pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"TRUE\"",field_names[y]);
602                                 }
603                                                 }
604                         break;
605                     case ULLONG_TYPE:
606                                                 if(ves_version < 7)
607                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%llu\"}",field_names[y], ar.r.ul);
608                                                 else
609                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%llu\"",field_names[y], ar.r.ul);
610                                                 if(y==measurement_interval_pos)
611                                                         measurement_interval = (double)ar.r.ul;
612                         break;
613                     case LLONG_TYPE:
614                                                 if(ves_version < 7)
615                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%lld\"}",field_names[y], ar.r.l);
616                                                 else
617                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%lld\"",field_names[y], ar.r.l);
618                                                 if(y==measurement_interval_pos)
619                                                         measurement_interval = (double)ar.r.l;
620                         break;
621                     case FLOAT_TYPE:
622                                                 if(ves_version < 7)
623                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%f\"}",field_names[y], ar.r.f);
624                                                 else
625                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%f\"",field_names[y], ar.r.f);
626                                                 if(y==measurement_interval_pos)
627                                                         measurement_interval = (double)ar.r.f;                            
628                         break;
629                     case TIMEVAL_TYPE:
630                     {
631                         gs_float_t t;
632                         t= ar.r.t.tv_usec;
633                         t=t/1000000;
634                         t=t+ar.r.t.tv_sec;
635                                                 if(ves_version < 7)
636                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%f\"}",field_names[y], t);
637                                                 else
638                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%f\"",field_names[y], t);
639                     }
640                         break;
641                     case VSTR_TYPE:
642                     {
643                                                 if(ves_version < 7)
644                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"",field_names[y]);
645                                                 else
646                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"",field_names[y]);
647                         int x;
648                         int c;
649                         char * src;
650                         src=(char*)ar.r.vs.offset;
651                                                 for(x=0;x<ar.r.vs.length;x++){
652                             c=src[x];
653                             if ((c<='~') && (c>=' ')) {
654                                 if (pos<MAXLINE-1) {
655                                     linebuf[pos]=c;
656                                     pos++;
657                                 }
658                             } else {
659                                 if (pos<MAXLINE-1) {
660                                     linebuf[pos]='.';
661                                     pos++;
662                                 }
663                             }
664                                                 }
665                                                 if(ves_version < 7)
666                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"}");
667                                                 else
668                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"");
669                     }
670                         break;
671                     default:
672                         linebuf[0]=0;
673                         break;
674                 }
675                         }
676                         if(ves_version < 7){
677                                 snprintf(linebuf+pos, MAXLINE-pos,
678                 "], \"measurementInterval\": %f, \"measurementsForVfScalingVersion\": 1"
679                 "}}}\n", measurement_interval
680                                 );
681                         }else{
682                                 snprintf(linebuf+pos, MAXLINE-pos,
683                 "}, \"measurementInterval\": %f, \"measurementFieldsVersion\": \"4.0\""
684                 "}}}\n", measurement_interval
685                                 );
686                         }
687
688             if (rmr_port) {
689                 rmr_sbuf->mtype = rmr_mtype;                                                    // fill in the message bits
690                 rmr_sbuf->len =  strlen(linebuf) + 1;           // our receiver likely wants a nice acsii-z string
691                 rmr_sbuf->state = 0;
692                 rmr_sbuf = rmr_send_msg( mrc, rmr_sbuf);                                // send it (send returns an empty payload on success, or the original payload on fail/retry)
693                 while( rmr_sbuf->state == RMR_ERR_RETRY ) {                     // soft failure (device busy?) retry
694                     rmr_sbuf = rmr_send_msg( mrc, rmr_sbuf);                    // retry send until it's good (simple test; real programmes should do better)
695                 }
696                 rmr_post_success_cnt++;
697
698             }
699
700                         if(curl_address==NULL){
701                 emit_line();
702                         }else{
703                                 http_post_request_hdr(curl_endpoint, curl_url, linebuf, &http_code, curl_auth);
704                                 if(http_code != 200 && http_code != 202){
705                     post_failure_cnt++; 
706                                         gslog(LOG_WARNING, "http return code is %d",http_code);
707                                 } else {
708                     post_success_cnt++;   
709                 }  
710                 if (((post_success_cnt+post_failure_cnt) % STAT_FREQUENCY) == 0)
711                     gslog(LOG_WARNING, "%s: successful ves posts - %llu, failed ves posts - %llu", argv[0], post_success_cnt, post_failure_cnt);
712                         }
713             if (verbose!=0) fflush(stdout);
714         } else {
715             if (rfta_id.streamid != fta_id.streamid)
716                 fprintf(stderr,"Got unknown streamid %llu \n",rfta_id.streamid);
717         }
718
719         // whenever we receive a temp tuple check if we reached time limit
720         if ((code==2)  && tlimit && (time(NULL)-start_time)>=tlimit) {
721             fprintf(stderr,"Reached time limit of %d seconds\n",tlimit);    
722             ftaapp_exit();
723             exit(0);
724         }        
725     }
726 }
727