92083169f23c43cddc9e90712967baeb85cdd111
[com/gs-lite.git] / src / tools / gsprintconsole_ves.c
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6  
7  http://www.apache.org/licenses/LICENSE-2.0
8  
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
15
16
17 /*
18  * Print ves formatted records to the console.
19  * Each line is a json record.
20  * Based on gsprintconsole.c, just differences in formatting.
21 */
22
23
24 #include <app.h>
25 #include <stdlib.h>
26 #include <stdio.h>
27 #include <unistd.h>
28 #include <signal.h>
29 #include <time.h>
30 #include <string.h>
31 #include <sys/time.h>
32 #include <sys/stat.h>
33 #include <sys/types.h>
34 #include <sys/socket.h>
35 #include <netinet/in.h>
36
37
38 #include "gsconfig.h"
39 #include "gstypes.h"
40 #include "gshub.h"
41 #include "simple_http.h"
42
43 #include <schemaparser.h>
44
45 #define MAXLINE 100000
46 static unsigned tcpport=0;
47 static char linebuf[MAXLINE];
48 int listensockfd=0;
49 int fd=0;
50
51
52 // Not all systems have timersub defined so make sure its ther
53 #ifndef timersub
54
55 #define timersub(tvp, uvp, vvp)                                         \
56 do {                                                            \
57 (vvp)->tv_sec = (tvp)->tv_sec - (uvp)->tv_sec;          \
58 (vvp)->tv_usec = (tvp)->tv_usec - (uvp)->tv_usec;       \
59 if ((vvp)->tv_usec < 0) {                               \
60 (vvp)->tv_sec--;                                \
61 (vvp)->tv_usec += 1000000;                      \
62 }                                                       \
63 } while (0)
64
65 #endif
66
67 void hand(int iv) {
68     ftaapp_exit();
69     fprintf(stderr, "exiting via signal handler %d...\n", iv);
70     exit(1);
71 }
72
73 static void wait_for_client() {
74     struct sockaddr_in serv_addr,cli_addr;
75     socklen_t clilen;
76     if (listensockfd==0) {
77                 gs_int32_t on = 1;
78                 listensockfd=socket(AF_INET, SOCK_STREAM, 0);
79         if (listensockfd < 0) {
80                         gslog(LOG_EMERG,"Error:Could not create socket for tcp data stream");
81                         exit(1);
82                 }
83                 bzero((char *) &serv_addr, sizeof(serv_addr));
84                 serv_addr.sin_family = AF_INET;
85                 serv_addr.sin_addr.s_addr = INADDR_ANY;
86                 serv_addr.sin_port = htons(tcpport);
87 #ifndef __linux__
88         /* make sure we can reuse the common port rapidly */
89         if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEPORT,
90                        (gs_sp_t )&on, sizeof(on)) != 0) {
91             gslog(LOG_EMERG,"Error::could not set socket option\n");
92             exit(1);
93         }
94 #endif
95         if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEADDR,
96                        (gs_sp_t )&on, sizeof(on)) != 0) {
97             gslog(LOG_EMERG,"Error::could not set socket option\n");
98             exit(1);
99                 }
100         
101                 if (bind(listensockfd, (struct sockaddr *) &serv_addr,
102                  sizeof(serv_addr)) < 0) {
103                         gslog(LOG_EMERG,"Error:Could not bind socket for tcp data stream");
104             exit(1);
105         }
106         }
107     
108         do {
109                 listen(listensockfd,5);
110                 clilen = sizeof(cli_addr);
111                 fd=accept(listensockfd, (struct sockaddr *) &cli_addr, &clilen);
112                 if (fd<0) {
113             gslog(LOG_EMERG,"Error:Could not accept connection on tcp socket");
114                 }
115         } while (fd==0);
116 }
117
118
119 static void emit_socket() {
120         unsigned o,w,l;
121         o=0;
122         w=0;
123         l=strlen(linebuf);
124         do {
125                 if((w=write(fd,&linebuf[o],l))==0) {
126                         close(fd);
127                         wait_for_client();
128                 }
129                 o=o+w;
130         } while (o<l);
131 }
132
133 static void emit_line() {
134     
135     if (tcpport==0) {
136         printf("%s",linebuf);
137     } else {
138         emit_socket();
139     }
140     
141 }
142
143 int main(int argc, char* argv[]) {
144     gs_sp_t me = argv[0];
145     FTAID fta_id;
146     gs_int32_t schema, ch;
147     
148     FTAID rfta_id;
149     gs_uint32_t rsize;
150     gs_uint32_t bufsz=8*1024*1024;
151     gs_int8_t rbuf[2*MAXTUPLESZ];
152     
153     gs_int32_t numberoffields;
154     gs_int32_t verbose=0;
155     gs_int32_t y, lcv;
156     
157     void *pblk;
158     gs_int32_t pblklen;
159         gs_int32_t n_actual_param;
160         gs_int32_t n_expected_param;
161     gs_int32_t xit = 0;
162     gs_int32_t dump = 0;
163     struct timeval tvs, tve, tvd;
164     gs_retval_t code;
165     endpoint gshub;
166     endpoint dummyep;
167     gs_uint32_t tip1,tip2,tip3,tip4;
168     gs_sp_t instance_name;
169
170         gs_sp_t curl_address = NULL;
171         endpoint curl_endpoint;
172         gs_sp_t curl_url = NULL;
173         gs_sp_t curl_auth = NULL;
174         gs_uint32_t http_code;
175
176         gs_uint32_t ves_version=5;
177
178
179     gs_uint32_t tlimit = 0;     // time limit in seconds
180     time_t start_time, curr_time;
181     
182         gsopenlog(argv[0]);
183     
184     
185     while ((ch = getopt(argc, argv, "l:p:r:vXDC:U:A:V:")) != -1) {
186         switch (ch) {
187             case 'r':
188                 bufsz=atoi(optarg);
189                 break;
190             case 'p':
191                 tcpport=atoi(optarg);
192                 break;
193             case 'v':
194                 verbose++;
195                 break;
196             case 'X':
197                 xit++;
198                 break;
199             case 'D':
200                 dump++;
201                 break;
202             case 'l':
203                 tlimit = atoi(optarg);
204                 break;
205             case 'V':
206                 ves_version = atoi(optarg);
207                 break;
208                         case 'C':
209                                 curl_address = strdup(optarg);
210                         if (sscanf(curl_address,"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(curl_endpoint.port))!= 5 ) {
211                                 gslog(LOG_EMERG,"Curl IP NOT DEFINED");
212                                 exit(1);
213                         }
214                         curl_endpoint.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
215                         curl_endpoint.port=htons(curl_endpoint.port);
216                                 break;
217             case 'U':
218                                 curl_url = strdup(optarg);
219                 break;
220             case 'A':
221                                 curl_auth = strdup(optarg);
222                 break;
223             default:
224             usage:
225                 fprintf(stderr, "usage: %s [-r <bufsz>] [-p <port>] [-l <time_limit>] [-v] [-X] [-D] [-C <curl_dest>:<curl_port>] [-U <curl_url>] [-A <authentication_string>] [-V <ves_version>] <gshub-hostname>:<gshub-port> <gsinstance_name>  query param1 param2...\n",
226                         *argv);
227                 exit(1);
228         }
229     }
230     argc -= optind;
231     argv += optind;
232     if (argc<3) goto usage;
233     
234     if (sscanf(argv[0],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(gshub.port))!= 5 ) {
235         gslog(LOG_EMERG,"HUB IP NOT DEFINED");
236         exit(1);
237     }
238     gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
239     gshub.port=htons(gshub.port);
240     instance_name=strdup(argv[1]);
241     if (set_hub(gshub)!=0) {
242         gslog(LOG_EMERG,"Could not set hub");
243         exit(1);
244     }
245     if (set_instance_name(instance_name)!=0) {
246         gslog(LOG_EMERG,"Could not set instance name");
247         exit(1);
248     }
249     
250     if (get_initinstance(gshub,instance_name,&dummyep,1)!=0) {
251         gslog(LOG_EMERG,"Did not receive signal that GS is initiated\n");
252     }
253
254
255 //      If this uses curl output, ensure consistency in the curl args
256         if(curl_address != NULL){
257                 if(curl_url == NULL){
258                         gslog(LOG_EMERG,"Curl IP defined, but there is no url (e.g. /foo/bar");
259                         exit(1);
260                 }
261                 if(curl_auth==NULL){
262                         curl_auth = "";
263                 } 
264         }
265     
266     gettimeofday(&tvs, 0);
267     argc -=2;
268     argv +=2;
269     if (argc < 1)
270         goto usage;
271     
272     /* initialize host library and the sgroup  */
273     
274     if (verbose>=2) fprintf(stderr,"Initializing gscp\n");
275     
276     if (ftaapp_init(bufsz)!=0) {
277         fprintf(stderr,"%s::error:could not initialize gscp\n", me);
278         exit(1);
279     }
280     
281     signal(SIGTERM, hand);
282     signal(SIGINT, hand);
283     
284     schema = ftaapp_get_fta_schema_by_name(argv[0]);
285     if (schema < 0) {
286         fprintf(stderr,"%s::error:could not get fta '%s' schema\n",
287                 me ,argv[0]);
288         exit(1);
289     }
290         n_expected_param = ftaschema_parameter_len(schema);
291     if (n_expected_param == 0) {
292         pblk = 0;
293         pblklen = 0;
294     } else {
295                 n_actual_param = argc-1;
296                 if(n_actual_param < n_expected_param){
297                         fprintf(stderr,"Error, %d query parameters expected, %d provided.\n",n_expected_param, n_actual_param);
298                         exit(1);
299                 }
300         /* parse the params */
301         for (lcv = 1 ; lcv < argc ; lcv++) {
302             char *k, *e;
303             int rv;
304             k = argv[lcv];
305             e = k;
306             while (*e && *e != '=') e++;
307             if (*e == 0) {
308                 fprintf(stderr,"param parse error '%s' (fmt 'key=val')\n",
309                         argv[lcv]);
310                 exit(1);
311             }
312             *e = 0;
313             rv = ftaschema_setparam_by_name(schema, k, e+1, strlen(e+1));
314             *e = '=';
315             if (rv < 0) {
316                 fprintf(stderr,"param setparam error '%s' (fmt 'key=val')\n",
317                         argv[lcv]);
318                 exit(1);
319             }
320         }
321         if (ftaschema_create_param_block(schema, &pblk, &pblklen) < 0) {
322             fprintf(stderr, "ftaschema_create_param_block failed!\n");
323             exit(1);
324         }
325     }
326 //    ftaschema_free(schema); /* XXXCDC */ // the schema continues to be used
327     
328     
329     if (verbose>=2) fprintf(stderr,"Initalized FTA\n");
330     
331     fta_id=ftaapp_add_fta(argv[0],0,0,0,pblklen,pblk);
332     if (fta_id.streamid==0) {
333         fprintf(stderr,"%s::error:could not initialize fta %s\n",
334                 me, argv[0]);
335         exit(1);
336     }
337     /* XXXCDC: pblk is malloc'd, should we free it? */
338     
339     if (verbose>=2) fprintf(stderr,"Get schema handle\n");
340     
341     if ((schema=ftaapp_get_fta_schema(fta_id))<0) {
342         fprintf(stderr,"%s::error:could not get schema\n", me);
343         exit(1);
344     }
345     
346     if ((numberoffields=ftaschema_tuple_len(schema))<0) {
347         fprintf(stderr,"%s::error:could not get number of fields in schema\n",
348                 me);
349         exit(1);
350     }
351     
352     if (verbose>=1) {
353         for(y=0; y<numberoffields;y++) {
354             printf("%s",ftaschema_field_name(schema,y));
355             if (y<numberoffields-1) printf("|");
356         }
357         printf("\n");
358     }
359     if (xit) {  // -X in command line
360         gettimeofday(&tve, 0);
361         timersub(&tve, &tvs, &tvd);
362         printf("TIME= %ld%06d sec\n", tvd.tv_sec, tvd.tv_usec);
363         hand(0);        // effectively an exit
364     }
365     if (tcpport!=0) {
366         wait_for_client();
367     }
368
369     start_time = time(NULL);
370
371         int measurement_interval_pos = -1; // extract measurementInterval if present
372         char *field_names[numberoffields];
373     for(y=0; y<numberoffields;y++) {
374                 field_names[y] = strdup(ftaschema_field_name(schema,y));
375                 if(strcmp(field_names[y], "measurementInterval")==0)
376                         measurement_interval_pos = y;
377         }
378
379
380         struct timeval tsample;
381         gettimeofday(&tsample, 0);
382         char start_ts[100], curr_ts[100];
383         sprintf(start_ts,"%ld%06d", tsample.tv_sec, tsample.tv_usec);
384
385         long unsigned int lineno=0;
386         long unsigned int seqno=0;
387         unsigned int measurement_interval;
388     while((code=ftaapp_get_tuple(&rfta_id,&rsize,rbuf,2*MAXTUPLESZ,0))>=0) {
389                 lineno++;
390         if (dump)       // -D in command line
391             continue;
392         if (ftaschema_is_eof_tuple(schema, rbuf)) {
393             /* initiate shutdown or something of that nature */
394             printf("#All data proccessed\n");
395             exit(0);
396         }
397         if (!rsize)
398             continue;
399         if (verbose >=2) {
400             snprintf(linebuf,MAXLINE,"RESULT CODE => %u\n",code);
401             emit_line();
402         }
403         if ((code==0)&&(rfta_id.streamid == fta_id.streamid)) {
404                         seqno++;
405                         gettimeofday(&tsample, 0);
406                         sprintf(curr_ts,"%ld%06d", tsample.tv_sec, tsample.tv_usec);
407                         int pos;
408                         if(ves_version < 7){
409                                 pos = snprintf(linebuf, MAXLINE,
410   "{\"event\": { \"commonEventHeader\": { "
411         "\"domain\": \"measurementsForVfScaling\", "
412         "\"eventId\": \"%s%u\", "
413         "\"eventType\": \"%s\", "
414         "\"eventName\": \"Measurement_MC_%s\", "
415         "\"lastEpochMicrosec\": %s, "
416         "\"priority\": \"Normal\", "
417         "\"reportingEntityName\": \"GS-LITE MC\", "
418         "\"sequence\": %u, "
419         "\"sourceName\": \"meas_cmpgn_xapp\", "
420         "\"startEpochMicrosec\": %s, "
421         "\"version\": 5 "
422       "}, "
423       "\"measurementsForVfScalingFields\": { "
424                 "\"additionalFields\": ["
425                                 ,argv[0],lineno, argv[0], argv[0], curr_ts, seqno, start_ts
426                                 );
427                         }else{
428                                 pos = snprintf(linebuf, MAXLINE,
429   "{\"event\": { \"commonEventHeader\": { "
430         "\"domain\": \"measurement\", "
431         "\"eventId\": \"%s%u\", "
432         "\"eventType\": \"%s\", "
433         "\"eventName\": \"Measurement_MC_%s\", "
434         "\"lastEpochMicrosec\": %s, "
435         "\"priority\": \"Normal\", "
436         "\"reportingEntityName\": \"GS-LITE MC\", "
437         "\"sequence\": %u, "
438         "\"sourceName\": \"meas_cmpgn_xapp\", "
439         "\"startEpochMicrosec\": %s, "
440         "\"version\": \"4.0.1\", "
441         "\"vesEventListenerVersion\": \"7.0.1\" "
442       "}, "
443       "\"measurementFields\": { "
444                 "\"additionalFields\": {"
445                                 ,argv[0],lineno, argv[0], argv[0], curr_ts, seqno, start_ts
446                                 );
447                         }
448
449                         measurement_interval = 0;
450             for(y=0; y<numberoffields;y++) {
451                 struct access_result ar;
452 //                if (verbose>=2)
453 //                    printf("%s->",ftaschema_field_name(schema,y));
454                 if(y>0){
455                                         linebuf[pos]=',';
456                                         pos++;
457                                 }
458                 ar=ftaschema_get_field_by_index(schema,y,rbuf,rsize);
459                 switch (ar.field_data_type) {
460                     case INT_TYPE:
461                                                 if(ves_version < 7)
462                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%d\"}",field_names[y], ar.r.i);
463                                                 else
464                                 pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"%d\"",field_names[y], ar.r.i);
465                                                 if(y==measurement_interval_pos)
466                                                         measurement_interval = (unsigned int)ar.r.i;
467                         break;
468                     case UINT_TYPE:
469                                                 if(ves_version < 7)
470                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u\"}",field_names[y], ar.r.ui);
471                                                 else
472                                 pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"%u\"",field_names[y], ar.r.ui);
473                                                 if(y==measurement_interval_pos)
474                                                         measurement_interval = (unsigned int)ar.r.ui;
475                         break;
476                     case IP_TYPE:
477                                                 if(ves_version < 7)
478                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u.%u.%u.%u\"}",field_names[y], ar.r.ui>>24&0xff,
479                                  ar.r.ui>>16&0xff,
480                                  ar.r.ui>>8&0xff,
481                                  ar.r.ui&0xff);
482                                                 else
483                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%u.%u.%u.%u\"",field_names[y], ar.r.ui>>24&0xff,
484                                  ar.r.ui>>16&0xff,
485                                  ar.r.ui>>8&0xff,
486                                  ar.r.ui&0xff);
487                         break;
488                     case IPV6_TYPE:
489                     {
490                                                 if(ves_version < 7)
491                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"",field_names[y]);
492                                                 else
493                                 pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"",field_names[y]);
494                         unsigned x;
495                         unsigned zc=0;
496                         for(x=0;x<4;x++) { if (ar.r.ip6.v[x]==0) zc++;}
497                         if (zc!=4) {
498                             snprintf(linebuf,MAXLINE,"");
499                             for(x=0;x<8;x++) {
500                                 unsigned char * a = (unsigned char *)  &(ar.r.ip6.v[0]);
501                                 unsigned y;
502                                 y=((unsigned)a[2*x])<<8|((unsigned) a[2*x+1]);
503                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"%04x",y);
504                                 if (x<7){
505                                                                         pos += snprintf(linebuf+pos,MAXLINE-pos,":");
506                                                                 }
507                             }
508                         } else {
509                             pos+=snprintf(linebuf+pos,MAXLINE-pos,"::");
510                         }
511                                                 if(ves_version < 7)
512                                                         pos += snprintf(linebuf+pos, MAXLINE-pos,"\"}");
513                                                 else
514                                                         pos += snprintf(linebuf+pos, MAXLINE-pos,"\"");
515                     }
516                         break;
517                         
518                     case USHORT_TYPE:
519                                                 if(ves_version < 7)
520                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u\"}",field_names[y], ar.r.ui);
521                                                 else
522                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\":  \"%u\"}",field_names[y], ar.r.ui);
523                                                 if(y==measurement_interval_pos)
524                                                         measurement_interval = (unsigned int)ar.r.ui;
525                         break;
526                     case BOOL_TYPE:
527                                                 if(ves_version < 7){
528                                 if (ar.r.ui==0) {
529                                         pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"FALSE\"}",field_names[y]);
530                                      } else {
531                                         pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"TRUE\"}",field_names[y]);
532                                 }
533                                                 }else{
534                                 if (ar.r.ui==0) {
535                                         pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"FALSE\"",field_names[y]);
536                                      } else {
537                                         pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"TRUE\"",field_names[y]);
538                                 }
539                                                 }
540                         break;
541                     case ULLONG_TYPE:
542                                                 if(ves_version < 7)
543                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%llu\"}",field_names[y], ar.r.ul);
544                                                 else
545                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%llu\"",field_names[y], ar.r.ul);
546                                                 if(y==measurement_interval_pos)
547                                                         measurement_interval = (unsigned int)ar.r.ul;
548                         break;
549                     case LLONG_TYPE:
550                                                 if(ves_version < 7)
551                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%lld\"}",field_names[y], ar.r.l);
552                                                 else
553                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%lld\"",field_names[y], ar.r.l);
554                                                 if(y==measurement_interval_pos)
555                                                         measurement_interval = (unsigned int)ar.r.l;
556                         break;
557                     case FLOAT_TYPE:
558                                                 if(ves_version < 7)
559                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%f\"}",field_names[y], ar.r.f);
560                                                 else
561                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%f\"",field_names[y], ar.r.f);
562                         break;
563                     case TIMEVAL_TYPE:
564                     {
565                         gs_float_t t;
566                         t= ar.r.t.tv_usec;
567                         t=t/1000000;
568                         t=t+ar.r.t.tv_sec;
569                                                 if(ves_version < 7)
570                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%f\"}",field_names[y], t);
571                                                 else
572                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%f\"",field_names[y], t);
573                     }
574                         break;
575                     case VSTR_TYPE:
576                     {
577                                                 if(ves_version < 7)
578                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"",field_names[y]);
579                                                 else
580                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"",field_names[y]);
581                         int x;
582                         int c;
583                         char * src;
584                         src=(char*)ar.r.vs.offset;
585                                                 for(x=0;x<ar.r.vs.length;x++){
586                             c=src[x];
587                             if ((c<='~') && (c>=' ')) {
588                                 if (pos<MAXLINE-1) {
589                                     linebuf[pos]=c;
590                                     pos++;
591                                 }
592                             } else {
593                                 if (pos<MAXLINE-1) {
594                                     linebuf[pos]='.';
595                                     pos++;
596                                 }
597                             }
598                                                 }
599                                                 if(ves_version < 7)
600                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"}");
601                                                 else
602                                 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"");
603                     }
604                         break;
605                     default:
606                         linebuf[0]=0;
607                         break;
608                 }
609                         }
610                         if(ves_version < 7){
611                                 snprintf(linebuf+pos, MAXLINE-pos,
612                 "], \"measurementInterval\": %u, \"measurementsForVfScalingVersion\": 1"
613                 "}}}\n", measurement_interval
614                                 );
615                         }else{
616                                 snprintf(linebuf+pos, MAXLINE-pos,
617                 "}, \"measurementInterval\": %u, \"measurementFieldsVersion\": \"4.0\""
618                 "}}}\n", measurement_interval
619                                 );
620                         }
621                         if(curl_address==NULL){
622                 emit_line();
623                         }else{
624                                 http_post_request_hdr(curl_endpoint, curl_url, linebuf, &http_code, curl_auth);
625                                 if(http_code != 200 && http_code != 202){
626                                         gslog(LOG_WARNING, "http return code is %d\n",http_code);
627                                 }
628                         }
629             if (verbose!=0) fflush(stdout);
630         } else {
631             if (rfta_id.streamid != fta_id.streamid)
632                 fprintf(stderr,"Got unkown streamid %llu \n",rfta_id.streamid);
633         }
634
635         // whenever we receive a temp tuple check if we reached time limit
636         if ((code==2)  && tlimit && (time(NULL)-start_time)>=tlimit) {
637             fprintf(stderr,"Reached time limit of %d seconds\n",tlimit);    
638             ftaapp_exit();
639             exit(0);
640         }        
641     }
642 }
643