5bb6205f9dd75b4adc4618c520e971646422647d
[com/gs-lite.git] / src / tools / gsprintconsole_ves.c
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6  
7  http://www.apache.org/licenses/LICENSE-2.0
8  
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
15
16
17 /*
18  * Print ves formatted records to the console.
19  * Each line is a json record.
20  * Based on gsprintconsole.c, just differences in formatting.
21 */
22
23
24 #include <app.h>
25 #include <stdlib.h>
26 #include <stdio.h>
27 #include <unistd.h>
28 #include <signal.h>
29 #include <time.h>
30 #include <string.h>
31 #include <sys/time.h>
32 #include <sys/stat.h>
33 #include <sys/types.h>
34 #include <sys/socket.h>
35 #include <netinet/in.h>
36
37
38 #include "gsconfig.h"
39 #include "gstypes.h"
40 #include "gshub.h"
41 #include "simple_http.h"
42
43 #include <schemaparser.h>
44
45 #define MAXLINE 100000
46 static unsigned tcpport=0;
47 static char linebuf[MAXLINE];
48 int listensockfd=0;
49 int fd=0;
50
51 // how frequently we will log stats (expressed in tuples posted)
52 #define STAT_FREQUENCY 5
53
54
55 // Not all systems have timersub defined so make sure its ther
56 #ifndef timersub
57
58 #define timersub(tvp, uvp, vvp)                                         \
59 do {                                                            \
60 (vvp)->tv_sec = (tvp)->tv_sec - (uvp)->tv_sec;          \
61 (vvp)->tv_usec = (tvp)->tv_usec - (uvp)->tv_usec;       \
62 if ((vvp)->tv_usec < 0) {                               \
63 (vvp)->tv_sec--;                                \
64 (vvp)->tv_usec += 1000000;                      \
65 }                                                       \
66 } while (0)
67
68 #endif
69
70 void hand(int iv) {
71     ftaapp_exit();
72     fprintf(stderr, "exiting via signal handler %d...\n", iv);
73     exit(1);
74 }
75
76 static void wait_for_client() {
77     struct sockaddr_in serv_addr,cli_addr;
78     socklen_t clilen;
79     if (listensockfd==0) {
80                 gs_int32_t on = 1;
81                 listensockfd=socket(AF_INET, SOCK_STREAM, 0);
82         if (listensockfd < 0) {
83                         gslog(LOG_EMERG,"Error:Could not create socket for tcp data stream");
84                         exit(1);
85                 }
86                 bzero((char *) &serv_addr, sizeof(serv_addr));
87                 serv_addr.sin_family = AF_INET;
88                 serv_addr.sin_addr.s_addr = INADDR_ANY;
89                 serv_addr.sin_port = htons(tcpport);
90 #ifndef __linux__
91         /* make sure we can reuse the common port rapidly */
92         if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEPORT,
93                        (gs_sp_t )&on, sizeof(on)) != 0) {
94             gslog(LOG_EMERG,"Error::could not set socket option");
95             exit(1);
96         }
97 #endif
98         if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEADDR,
99                        (gs_sp_t )&on, sizeof(on)) != 0) {
100             gslog(LOG_EMERG,"Error::could not set socket option");
101             exit(1);
102                 }
103         
104                 if (bind(listensockfd, (struct sockaddr *) &serv_addr,
105                  sizeof(serv_addr)) < 0) {
106                         gslog(LOG_EMERG,"Error:Could not bind socket for tcp data stream");
107             exit(1);
108         }
109         }
110     
111         do {
112                 listen(listensockfd,5);
113                 clilen = sizeof(cli_addr);
114                 fd=accept(listensockfd, (struct sockaddr *) &cli_addr, &clilen);
115                 if (fd<0) {
116             gslog(LOG_EMERG,"Error:Could not accept connection on tcp socket");
117                 }
118         } while (fd==0);
119 }
120
121
122 static void emit_socket() {
123         unsigned o,w,l;
124         o=0;
125         w=0;
126         l=strlen(linebuf);
127         do {
128                 if((w=write(fd,&linebuf[o],l))==0) {
129                         close(fd);
130                         wait_for_client();
131                 }
132                 o=o+w;
133         } while (o<l);
134 }
135
136 static void emit_line() {
137     
138     if (tcpport==0) {
139         printf("%s",linebuf);
140     } else {
141         emit_socket();
142     }
143     
144 }
145
146 int main(int argc, char* argv[]) {
147     gs_sp_t me = argv[0];
148     FTAID fta_id;
149     gs_int32_t schema, ch;
150     
151     FTAID rfta_id;
152     gs_uint32_t rsize;
153     gs_uint32_t bufsz=8*1024*1024;
154     gs_int8_t rbuf[2*MAXTUPLESZ];
155     
156     gs_int32_t numberoffields;
157     gs_int32_t verbose=0;
158     gs_int32_t y, lcv;
159     
160     void *pblk;
161     gs_int32_t pblklen;
162         gs_int32_t n_actual_param;
163         gs_int32_t n_expected_param;
164     gs_int32_t xit = 0;
165     gs_int32_t dump = 0;
166     struct timeval tvs, tve, tvd;
167     gs_retval_t code;
168     endpoint gshub;
169     endpoint dummyep;
170     gs_uint32_t tip1,tip2,tip3,tip4;
171     gs_sp_t instance_name;
172
173         gs_sp_t curl_address = NULL;
174         endpoint curl_endpoint;
175         gs_sp_t curl_url = NULL;
176         gs_sp_t curl_auth = NULL;
177         gs_uint32_t http_code;
178
179         gs_uint32_t ves_version=7;
180
181
182     gs_uint32_t tlimit = 0;     // time limit in seconds
183     time_t start_time, curr_time;
184
185     gs_uint64_t post_success_cnt = 0ULL;
186     gs_uint64_t post_failure_cnt = 0ULL;    
187     
188         gsopenlog(argv[0]);
189     
190     
191     while ((ch = getopt(argc, argv, "l:p:r:vXDC:U:A:V:")) != -1) {
192         switch (ch) {
193             case 'r':
194                 bufsz=atoi(optarg);
195                 break;
196             case 'p':
197                 tcpport=atoi(optarg);
198                 break;
199             case 'v':
200                 verbose++;
201                 break;
202             case 'X':
203                 xit++;
204                 break;
205             case 'D':
206                 dump++;
207                 break;
208             case 'l':
209                 tlimit = atoi(optarg);
210                 break;
211             case 'V':
212                 ves_version = atoi(optarg);
213                 break;
214                         case 'C':
215                                 curl_address = strdup(optarg);
216                         if (sscanf(curl_address,"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(curl_endpoint.port))!= 5 ) {
217                                 gslog(LOG_EMERG,"Curl IP NOT DEFINED");
218                                 exit(1);
219                         }
220                         curl_endpoint.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
221                         curl_endpoint.port=htons(curl_endpoint.port);
222                                 break;
223             case 'U':
224                                 curl_url = strdup(optarg);
225                 break;
226             case 'A':
227                                 curl_auth = strdup(optarg);
228                 break;
229             default:
230             usage:
231                 fprintf(stderr, "usage: %s [-r <bufsz>] [-p <port>] [-l <time_limit>] [-v] [-X] [-D] [-C <curl_dest>:<curl_port>] [-U <curl_url>] [-A <authentication_string>] [-V <ves_version>] <gshub-hostname>:<gshub-port> <gsinstance_name>  query param1 param2...\n",
232                         *argv);
233                 exit(1);
234         }
235     }
236     argc -= optind;
237     argv += optind;
238     if (argc<3) goto usage;
239     
240     if (sscanf(argv[0],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(gshub.port))!= 5 ) {
241         gslog(LOG_EMERG,"HUB IP NOT DEFINED");
242         exit(1);
243     }
244     gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
245     gshub.port=htons(gshub.port);
246     instance_name=strdup(argv[1]);
247     if (set_hub(gshub)!=0) {
248         gslog(LOG_EMERG,"Could not set hub");
249         exit(1);
250     }
251     if (set_instance_name(instance_name)!=0) {
252         gslog(LOG_EMERG,"Could not set instance name");
253         exit(1);
254     }
255     
256     if (get_initinstance(gshub,instance_name,&dummyep,1)!=0) {
257         gslog(LOG_EMERG,"Did not receive signal that GS is initiated");
258     }
259
260
261 //      If this uses curl output, ensure consistency in the curl args
262         if(curl_address != NULL){
263                 if(curl_url == NULL){
264                         gslog(LOG_EMERG,"Curl IP defined, but there is no url (e.g. /foo/bar");
265                         exit(1);
266                 }
267                 if(curl_auth==NULL){
268                         curl_auth = "";
269                 } 
270         }
271     
272     gettimeofday(&tvs, 0);
273     argc -=2;
274     argv +=2;
275     if (argc < 1)
276         goto usage;
277     
278     /* initialize host library and the sgroup  */
279     
280     if (verbose>=2) fprintf(stderr,"Initializing gscp\n");
281     
282     if (ftaapp_init(bufsz)!=0) {
283         fprintf(stderr,"%s::error:could not initialize gscp\n", me);
284         exit(1);
285     }
286     
287     signal(SIGTERM, hand);
288     signal(SIGINT, hand);
289     
290     schema = ftaapp_get_fta_schema_by_name(argv[0]);
291     if (schema < 0) {
292         fprintf(stderr,"%s::error:could not get fta '%s' schema\n",
293                 me ,argv[0]);
294         exit(1);
295     }
296         n_expected_param = ftaschema_parameter_len(schema);
297     if (n_expected_param == 0) {
298         pblk = 0;
299         pblklen = 0;
300     } else {
301                 n_actual_param = argc-1;
302                 if(n_actual_param < n_expected_param){
303                         fprintf(stderr,"Error, %d query parameters expected, %d provided.\n",n_expected_param, n_actual_param);
304                         exit(1);
305                 }
306         /* parse the params */
307         for (lcv = 1 ; lcv < argc ; lcv++) {
308             char *k, *e;
309             int rv;
310             k = argv[lcv];
311             e = k;
312             while (*e && *e != '=') e++;
313             if (*e == 0) {
314                 fprintf(stderr,"param parse error '%s' (fmt 'key=val')\n",
315                         argv[lcv]);
316                 exit(1);
317             }
318             *e = 0;
319             rv = ftaschema_setparam_by_name(schema, k, e+1, strlen(e+1));
320             *e = '=';
321             if (rv < 0) {
322                 fprintf(stderr,"param setparam error '%s' (fmt 'key=val')\n",
323                         argv[lcv]);
324                 exit(1);
325             }
326         }
327         if (ftaschema_create_param_block(schema, &pblk, &pblklen) < 0) {
328             fprintf(stderr, "ftaschema_create_param_block failed!\n");
329             exit(1);
330         }
331     }
332 //    ftaschema_free(schema); /* XXXCDC */ // the schema continues to be used
333     
334     
335     if (verbose>=2) fprintf(stderr,"Initalized FTA\n");
336     
337     fta_id=ftaapp_add_fta(argv[0],0,0,0,pblklen,pblk);
338     if (fta_id.streamid==0) {
339         fprintf(stderr,"%s::error:could not initialize fta %s\n",
340                 me, argv[0]);
341         exit(1);
342     }
343     /* XXXCDC: pblk is malloc'd, should we free it? */
344     
345     if (verbose>=2) fprintf(stderr,"Get schema handle\n");
346     
347     if ((schema=ftaapp_get_fta_schema(fta_id))<0) {
348         fprintf(stderr,"%s::error:could not get schema\n", me);
349         exit(1);
350     }
351     
352     if ((numberoffields=ftaschema_tuple_len(schema))<0) {
353         fprintf(stderr,"%s::error:could not get number of fields in schema\n",
354                 me);
355         exit(1);
356     }
357     
358     if (verbose>=1) {
359         for(y=0; y<numberoffields;y++) {
360             printf("%s",ftaschema_field_name(schema,y));
361             if (y<numberoffields-1) printf("|");
362         }
363         printf("\n");
364     }
365     if (xit) {  // -X in command line
366         gettimeofday(&tve, 0);
367         timersub(&tve, &tvs, &tvd);
368         printf("TIME= %ld%06d sec\n", tvd.tv_sec, tvd.tv_usec);
369         hand(0);        // effectively an exit
370     }
371     if (tcpport!=0) {
372         wait_for_client();
373     }
374
375     start_time = time(NULL);
376
377         int measurement_interval_pos = -1; // extract measurementInterval if present
378         char *field_names[numberoffields];
379     for(y=0; y<numberoffields;y++) {
380                 field_names[y] = strdup(ftaschema_field_name(schema,y));
381                 if(strcmp(field_names[y], "measurementInterval")==0)
382                         measurement_interval_pos = y;
383         }
384
385
386         struct timeval tsample;
387         gettimeofday(&tsample, 0);
388         char start_ts[100], curr_ts[100];
389         sprintf(start_ts,"%ld%06d", tsample.tv_sec, tsample.tv_usec);
390
391         long unsigned int lineno=0;
392         long unsigned int seqno=0;
393         double measurement_interval;
394     while((code=ftaapp_get_tuple(&rfta_id,&rsize,rbuf,2*MAXTUPLESZ,0))>=0) {
395                 lineno++;
396         if (dump)       // -D in command line
397             continue;
398         if (ftaschema_is_eof_tuple(schema, rbuf)) {
399             /* initiate shutdown or something of that nature */
400             printf("#All data proccessed\n");
401             exit(0);
402         }
403         if (!rsize)
404             continue;
405         if (verbose >=2) {
406             snprintf(linebuf,MAXLINE,"RESULT CODE => %u\n",code);
407             emit_line();
408         }
409         if ((code==0)&&(rfta_id.streamid == fta_id.streamid)) {
410                         seqno++;
411                         gettimeofday(&tsample, 0);
412                         sprintf(curr_ts,"%ld%06d", tsample.tv_sec, tsample.tv_usec);
413                         int pos;
414                         if(ves_version < 7){
415                                 pos = snprintf(linebuf, MAXLINE,
416   "{\"event\": { \"commonEventHeader\": { "
417         "\"domain\": \"measurementsForVfScaling\", "
418         "\"eventId\": \"%s%u\", "
419         "\"eventType\": \"%s\", "
420         "\"eventName\": \"Measurement_MC_%s\", "
421         "\"lastEpochMicrosec\": %s, "
422         "\"priority\": \"Normal\", "
423         "\"reportingEntityName\": \"GS-LITE MC\", "
424         "\"sequence\": %u, "
425         "\"sourceName\": \"meas_cmpgn_xapp\", "
426         "\"startEpochMicrosec\": %s, "
427         "\"version\": 5 "
428       "}, "
429       "\"measurementsForVfScalingFields\": { "
430                 "\"additionalFields\": ["
431                                 ,argv[0],lineno, argv[0], argv[0], curr_ts, seqno, start_ts
432                                 );
433                         }else{
434                                 pos = snprintf(linebuf, MAXLINE,
435   "{\"event\": { \"commonEventHeader\": { "
436         "\"domain\": \"measurement\", "
437         "\"eventId\": \"%s%u\", "
438         "\"eventType\": \"%s\", "
439         "\"eventName\": \"Measurement_MC_%s\", "
440         "\"lastEpochMicrosec\": %s, "
441         "\"priority\": \"Normal\", "
442         "\"reportingEntityName\": \"GS-LITE MC\", "
443         "\"sequence\": %u, "
444         "\"sourceName\": \"meas_cmpgn_xapp\", "
445         "\"startEpochMicrosec\": %s, "
446         "\"version\": \"4.0.1\", "
447         "\"vesEventListenerVersion\": \"7.0.1\" "
448       "}, "
449       "\"measurementFields\": { "
450                 "\"additionalFields\": {"
451                                 ,argv[0],lineno, argv[0], argv[0], curr_ts, seqno, start_ts
452                                 );
453                         }
454
455                         measurement_interval = 0;
456             for(y=0; y<numberoffields;y++) {
457                 struct access_result ar;
458 //                if (verbose>=2)
459 //                    printf("%s->",ftaschema_field_name(schema,y));
460                 if(y>0){
461                                         linebuf[pos]=',';
462                                         pos++;
463                                 }
464                 ar=ftaschema_get_field_by_index(schema,y,rbuf,rsize);
465                 switch (ar.field_data_type) {
466                     case INT_TYPE:
467                                                 if(ves_version < 7)
468                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%d\"}",field_names[y], ar.r.i);
469                                                 else
470                                 pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"%d\"",field_names[y], ar.r.i);
471                                                 if(y==measurement_interval_pos)
472                                                         measurement_interval = (double)ar.r.i;
473                         break;
474                     case UINT_TYPE:
475                                                 if(ves_version < 7)
476                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u\"}",field_names[y], ar.r.ui);
477                                                 else
478                                 pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"%u\"",field_names[y], ar.r.ui);
479                                                 if(y==measurement_interval_pos)
480                                                         measurement_interval = (double)ar.r.ui;
481                         break;
482                     case IP_TYPE:
483                                                 if(ves_version < 7)
484                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u.%u.%u.%u\"}",field_names[y], ar.r.ui>>24&0xff,
485                                  ar.r.ui>>16&0xff,
486                                  ar.r.ui>>8&0xff,
487                                  ar.r.ui&0xff);
488                                                 else
489                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%u.%u.%u.%u\"",field_names[y], ar.r.ui>>24&0xff,
490                                  ar.r.ui>>16&0xff,
491                                  ar.r.ui>>8&0xff,
492                                  ar.r.ui&0xff);
493                         break;
494                     case IPV6_TYPE:
495                     {
496                                                 if(ves_version < 7)
497                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"",field_names[y]);
498                                                 else
499                                 pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"",field_names[y]);
500                         unsigned x;
501                         unsigned zc=0;
502                         for(x=0;x<4;x++) { if (ar.r.ip6.v[x]==0) zc++;}
503                         if (zc!=4) {
504                             snprintf(linebuf,MAXLINE,"");
505                             for(x=0;x<8;x++) {
506                                 unsigned char * a = (unsigned char *)  &(ar.r.ip6.v[0]);
507                                 unsigned y;
508                                 y=((unsigned)a[2*x])<<8|((unsigned) a[2*x+1]);
509                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"%04x",y);
510                                 if (x<7){
511                                                                         pos += snprintf(linebuf+pos,MAXLINE-pos,":");
512                                                                 }
513                             }
514                         } else {
515                             pos+=snprintf(linebuf+pos,MAXLINE-pos,"::");
516                         }
517                                                 if(ves_version < 7)
518                                                         pos += snprintf(linebuf+pos, MAXLINE-pos,"\"}");
519                                                 else
520                                                         pos += snprintf(linebuf+pos, MAXLINE-pos,"\"");
521                     }
522                         break;
523                         
524                     case USHORT_TYPE:
525                                                 if(ves_version < 7)
526                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u\"}",field_names[y], ar.r.ui);
527                                                 else
528                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\":  \"%u\"}",field_names[y], ar.r.ui);
529                                                 if(y==measurement_interval_pos)
530                                                         measurement_interval = (double)ar.r.ui;
531                         break;
532                     case BOOL_TYPE:
533                                                 if(ves_version < 7){
534                                 if (ar.r.ui==0) {
535                                         pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"FALSE\"}",field_names[y]);
536                                      } else {
537                                         pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"TRUE\"}",field_names[y]);
538                                 }
539                                                 }else{
540                                 if (ar.r.ui==0) {
541                                         pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"FALSE\"",field_names[y]);
542                                      } else {
543                                         pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"TRUE\"",field_names[y]);
544                                 }
545                                                 }
546                         break;
547                     case ULLONG_TYPE:
548                                                 if(ves_version < 7)
549                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%llu\"}",field_names[y], ar.r.ul);
550                                                 else
551                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%llu\"",field_names[y], ar.r.ul);
552                                                 if(y==measurement_interval_pos)
553                                                         measurement_interval = (double)ar.r.ul;
554                         break;
555                     case LLONG_TYPE:
556                                                 if(ves_version < 7)
557                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%lld\"}",field_names[y], ar.r.l);
558                                                 else
559                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%lld\"",field_names[y], ar.r.l);
560                                                 if(y==measurement_interval_pos)
561                                                         measurement_interval = (double)ar.r.l;
562                         break;
563                     case FLOAT_TYPE:
564                                                 if(ves_version < 7)
565                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%f\"}",field_names[y], ar.r.f);
566                                                 else
567                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%f\"",field_names[y], ar.r.f);
568                                                 if(y==measurement_interval_pos)
569                                                         measurement_interval = (double)ar.r.f;                            
570                         break;
571                     case TIMEVAL_TYPE:
572                     {
573                         gs_float_t t;
574                         t= ar.r.t.tv_usec;
575                         t=t/1000000;
576                         t=t+ar.r.t.tv_sec;
577                                                 if(ves_version < 7)
578                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%f\"}",field_names[y], t);
579                                                 else
580                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%f\"",field_names[y], t);
581                     }
582                         break;
583                     case VSTR_TYPE:
584                     {
585                                                 if(ves_version < 7)
586                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"",field_names[y]);
587                                                 else
588                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"",field_names[y]);
589                         int x;
590                         int c;
591                         char * src;
592                         src=(char*)ar.r.vs.offset;
593                                                 for(x=0;x<ar.r.vs.length;x++){
594                             c=src[x];
595                             if ((c<='~') && (c>=' ')) {
596                                 if (pos<MAXLINE-1) {
597                                     linebuf[pos]=c;
598                                     pos++;
599                                 }
600                             } else {
601                                 if (pos<MAXLINE-1) {
602                                     linebuf[pos]='.';
603                                     pos++;
604                                 }
605                             }
606                                                 }
607                                                 if(ves_version < 7)
608                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"}");
609                                                 else
610                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"");
611                     }
612                         break;
613                     default:
614                         linebuf[0]=0;
615                         break;
616                 }
617                         }
618                         if(ves_version < 7){
619                                 snprintf(linebuf+pos, MAXLINE-pos,
620                 "], \"measurementInterval\": %f, \"measurementsForVfScalingVersion\": 1"
621                 "}}}\n", measurement_interval
622                                 );
623                         }else{
624                                 snprintf(linebuf+pos, MAXLINE-pos,
625                 "}, \"measurementInterval\": %f, \"measurementFieldsVersion\": \"4.0\""
626                 "}}}\n", measurement_interval
627                                 );
628                         }
629                         if(curl_address==NULL){
630                 emit_line();
631                         }else{
632                                 http_post_request_hdr(curl_endpoint, curl_url, linebuf, &http_code, curl_auth);
633                                 if(http_code != 200 && http_code != 202){
634                     post_failure_cnt++; 
635                                         gslog(LOG_WARNING, "http return code is %d",http_code);
636                                 } else {
637                     post_success_cnt++;   
638                 }  
639                 if (((post_success_cnt+post_failure_cnt) % STAT_FREQUENCY) == 0)
640                     gslog(LOG_WARNING, "%s: successful ves posts - %llu, failed ves posts - %llu", argv[0], post_success_cnt, post_failure_cnt);
641                         }
642             if (verbose!=0) fflush(stdout);
643         } else {
644             if (rfta_id.streamid != fta_id.streamid)
645                 fprintf(stderr,"Got unknown streamid %llu \n",rfta_id.streamid);
646         }
647
648         // whenever we receive a temp tuple check if we reached time limit
649         if ((code==2)  && tlimit && (time(NULL)-start_time)>=tlimit) {
650             fprintf(stderr,"Reached time limit of %d seconds\n",tlimit);    
651             ftaapp_exit();
652             exit(0);
653         }        
654     }
655 }
656