X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Ftools%2Fgssinksim.c;h=89a5f1855ae4b5383813a7731ddde09e2b55df12;hb=07495effe193ca3f73c3bf0ce417068f9ac9dcdd;hp=9334ee8e28b26b38db0c0614630a1012a5afa24a;hpb=93d248304a68de7a8f9daf4aa74f9ee4cd27410c;p=com%2Fgs-lite.git diff --git a/src/tools/gssinksim.c b/src/tools/gssinksim.c index 9334ee8..89a5f18 100644 --- a/src/tools/gssinksim.c +++ b/src/tools/gssinksim.c @@ -1,404 +1,404 @@ -/* ------------------------------------------------ - Copyright 2014 AT&T Intellectual Property - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - ------------------------------------------- */ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include "gsconfig.h" -#include "gstypes.h" -#include "gshub.h" - -#define MAXLINE 100000 -static unsigned tcpport=0; -static char linebuf[MAXLINE]; -int listensockfd=0; -int fd=0; -gs_int32_t verbose=0; -endpoint hub; -endpoint ds; -gs_sp_t sink_name; - - -static void wait_for_feed() { - struct sockaddr_in serv_addr,cli_addr; - socklen_t clilen; - struct sockaddr_in sin; - socklen_t sin_sz; - if (listensockfd==0) { - gs_int32_t on = 1; - - if (verbose==2) { - fprintf(stderr,"Create listen socket for port %u\n",tcpport); - } - listensockfd=socket(AF_INET, SOCK_STREAM, 0); - if (listensockfd < 0) { - gslog(LOG_EMERG,"Error:Could not create socket for tcp data stream"); - exit(1); - } - bzero((char *) &serv_addr, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_addr.s_addr = INADDR_ANY; - serv_addr.sin_port = htons(tcpport); -#ifndef __linux__ - /* make sure we can reuse the common port rapidly */ - if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEPORT, - (gs_sp_t )&on, sizeof(on)) != 0) { - gslog(LOG_EMERG,"Error::could not set socket option\n"); - exit(1); - } -#endif - if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEADDR, - (gs_sp_t )&on, sizeof(on)) != 0) { - gslog(LOG_EMERG,"Error::could not set socket option\n"); - exit(1); - } - - if (bind(listensockfd, (struct sockaddr *) &serv_addr, - sizeof(serv_addr)) < 0) { - gslog(LOG_EMERG,"Error:Could not bind socket for tcp data stream"); - exit(1); - } - } - - if (verbose==2) { - fprintf(stderr,"Socket created waiting for data producer\n"); - } - if (listen(listensockfd,5)< 0) { - fprintf(stderr,"Error::could not listen to socket for port %u \n",ntohs(serv_addr.sin_port)); - close(listensockfd); - exit(1); - } - sin_sz=sizeof(sin); - if (getsockname(listensockfd, (struct sockaddr *) &sin, &sin_sz) < 0) { - fprintf(stderr,"Error::could not get local port number of listen socket\n"); - exit(1); - } - ds.ip=htonl(127<<24|1); - ds.port=sin.sin_port; - if (set_streamsink(hub,sink_name,ds)!=0) { - fprintf(stderr,"Error::could not set sink in GSHUB for %s source name\n",sink_name); - exit(1); - } - - do { - clilen = sizeof(cli_addr); - fd=accept(listensockfd, (struct sockaddr *) &cli_addr, &clilen); - if (fd<0) { - gslog(LOG_EMERG,"Error:Could not accept connection on tcp socket"); - } - } while (fd==0); - if (verbose) { - fprintf(stderr,"Producer found ready to rock!\n"); - } - -} - - -static void gs_read(gs_sp_t buffer, gs_uint32_t length){ - gs_uint32_t used=0; - gs_uint32_t cur; - - while(used < length) { - if (verbose==2) { - fprintf(stderr,"\tread %u out of %u\n",used,length); - } - if ((cur=read(fd,&(buffer[used]),length-used))<=0) { - gslog(LOG_EMERG,"ERROR:could not read data from gdat stream"); - exit(0); - } - used+=cur; - } -} - -static void gs_read_line(gs_sp_t buffer, gs_uint32_t length){ - gs_uint32_t used=0; - gs_uint32_t cur; - - while((used < (length-1)) && ((used==0)|| (buffer[used-1]!='\n'))) { - if ((cur=read(fd,&(buffer[used]),1))<=0) { - gslog(LOG_EMERG,"ERROR:could not read data from gdat stream"); - exit(0); - } - used+=cur; - } - buffer[used]=0; -} - -static gs_uint32_t get_stream_tuple(gs_uint32_t * rsize,gs_sp_t rbuf,gs_uint32_t bufsz){ - - gs_uint32_t nsz,sz; - gs_read((gs_sp_t)&nsz,sizeof(gs_uint32_t)); - sz=ntohl(nsz); - if (sz>MAXTUPLESZ) { - gslog(LOG_EMERG,"Error::Illegal tuple received"); - exit(0); - } - - if (sz>bufsz) { - gslog(LOG_EMERG,"Error::Illegal tuple size received -- to long"); - exit(0); - } - *rsize=sz; - - gs_read(rbuf,(sz)); - return 0; -} - -int main(int argc, char** argv) { - gs_uint32_t streamid; - gs_schemahandle_t schema; - - gs_uint32_t rstreamid; - gs_uint32_t rsize; - gs_int8_t rbuf[2*MAXTUPLESZ]; - - gs_int32_t numberoffields; - gs_int32_t y; - gs_int32_t parserversion; - gs_uint32_t schemalen; - gs_sp_t asciischema; - gs_sp_t me; - gs_int32_t ch; - - gs_int8_t buf[1024]; - gs_uint32_t tip1,tip2,tip3,tip4; - - /* initialize host library and the sgroup */ - - me=argv[0]; - - while ((ch = getopt(argc, argv, "hvxp:")) != -1) { - switch(ch) { - case 'h': - fprintf(stderr,"%s::usage: %s -v -x -p : \n",argv[0],argv[0]); - exit(0); - break; - case 'p': - tcpport=atoi(optarg);; - break; - case 'v': - verbose=1; - break; - case 'x': - verbose=2; - break; - default: - break; - } - } - if (optind+2>argc) { - fprintf(stderr,"Could not find hub and stream source name on command line\n"); - exit(1); - } - if (sscanf(argv[optind],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(hub.port))!= 5 ) { - fprintf(stderr,"Could not parse hub endpoint\n"); - exit(1); - } - hub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4); - hub.port=htons(hub.port); - sink_name=strdup(argv[optind+1]); - - wait_for_feed(); - - gs_read_line(buf,1024); - if (strncmp(buf,"GDAT",4)!=0) { - gslog(LOG_EMERG,"ERROR: not a GDAT stream\n"); - exit(0); - } - gs_read_line(buf,1024); - if (sscanf(buf,"VERSION:%u\n",&parserversion)!=1) { - gslog(LOG_EMERG,"ERROR: no GDAT VERSION given\n"); - exit(0); - } - gs_read_line(buf,1024); - if (sscanf(buf,"SCHEMALENGTH:%u\n",&schemalen)!=1) { - gslog(LOG_EMERG,"ERROR: no GDAT SCHEMALENGTH given\n"); - exit(0); - } - - if(verbose==2) { - fprintf (stderr,"GDAT version %u schemalength %u\n",parserversion,schemalen); - } - - if (schemaparser_accepts_version(parserversion)!=1) { - fprintf(stderr,"%s::error: wrong parser version %u for file %s\n", - me,parserversion,argv[0]); - exit(1); - } - - if ((asciischema=malloc(schemalen))==0) { - fprintf(stderr,"%s::error: could not allocate schema buffer of sz %u " - "for file %s\n", - me,schemalen,argv[0]); - exit(1); - } - - if(verbose==2) { - fprintf (stderr,"Reading schema from scocket\n"); - } - - gs_read(asciischema,schemalen); - - if(verbose==2) { - fprintf (stderr,"Received schema\n"); - } - - if (verbose==2) { - fprintf(stderr,"%s\n",asciischema); - } - if ((schema=ftaschema_parse_string(asciischema))<0) { - fprintf(stderr,"%s::error: could not parse schema \n", - me); - exit(1); - } - - if ((numberoffields=ftaschema_tuple_len(schema))<0) { - fprintf(stderr,"%s::error:could not get number of fields in schema\n", - me); - exit(1); - } - if (verbose==1) { - for(y=0; y",ftaschema_field_name(schema,y)); - ar=ftaschema_get_field_by_index(schema,y,rbuf,rsize); - switch (ar.field_data_type) { - case INT_TYPE: - printf("%d",ar.r.i); - break; - case UINT_TYPE: - printf("%u",ar.r.ui); - break; - case IP_TYPE: - printf("%u.%u.%u.%u",ar.r.ui>>24&0xff, - ar.r.ui>>16&0xff, - ar.r.ui>>8&0xff, - ar.r.ui&0xff); - break; - case IPV6_TYPE: - { - unsigned x; - unsigned zc=0; - for(x=0;x<4;x++) { if (ar.r.ip6.v[x]==0) zc++;} - if (zc!=4) { - for(x=0;x<8;x++) { - unsigned char * a = (unsigned char *) &(ar.r.ip6.v[0]); - unsigned y; - y=((unsigned)a[2*x])<<8|((unsigned) a[2*x+1]); - printf("%04x",y); - if (x<7) printf(":"); - } - } else { - printf("::"); - } - } - break; - case USHORT_TYPE: - printf("%hu",(unsigned short)ar.r.ui); - break; - case BOOL_TYPE: - if (ar.r.ui==0) { - printf("FALSE"); - } else { - printf("TRUE"); - } - break; - case ULLONG_TYPE: - { - gs_uint64_t ul; - gs_uint64_t t1; - gs_uint64_t t2; - ul=ar.r.ul; - printf("%llu",ul); - } - break; - case LLONG_TYPE: - printf("%lld",ar.r.l); - break; - case FLOAT_TYPE: - printf("%f",ar.r.f); - break; - case TIMEVAL_TYPE: - { - gs_float_t t; - t= ar.r.t.tv_usec; - t=t/1000000; - t=t+ar.r.t.tv_sec; - printf("%lf sec",t); - } - break; - case VSTR_TYPE: - { - int x; - int c; - char * src; - src=(gs_sp_t)ar.r.vs.offset; - if ((ar.r.vs.length>0) && (src[ar.r.vs.length-1]==0)) { - ar.r.vs.length = ar.r.vs.length-1; - } - for(x=0;x=' '))&&(c!='|')) { - printf("%c",c); - } else { - printf("(0x%x)",(gs_uint8_t)c); - } - } - } - break; - default: - break; - } - if (y +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include "gsconfig.h" +#include "gstypes.h" +#include "gshub.h" + +#define MAXLINE 100000 +static unsigned tcpport=0; +static char linebuf[MAXLINE]; +int listensockfd=0; +int fd=0; +gs_int32_t verbose=0; +endpoint hub; +endpoint ds; +gs_sp_t sink_name; + + +static void wait_for_feed() { + struct sockaddr_in serv_addr,cli_addr; + socklen_t clilen; + struct sockaddr_in sin; + socklen_t sin_sz; + if (listensockfd==0) { + gs_int32_t on = 1; + + if (verbose==2) { + fprintf(stderr,"Create listen socket for port %u\n",tcpport); + } + listensockfd=socket(AF_INET, SOCK_STREAM, 0); + if (listensockfd < 0) { + gslog(LOG_EMERG,"Error:Could not create socket for tcp data stream"); + exit(1); + } + bzero((char *) &serv_addr, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = INADDR_ANY; + serv_addr.sin_port = htons(tcpport); +#ifndef __linux__ + /* make sure we can reuse the common port rapidly */ + if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEPORT, + (gs_sp_t )&on, sizeof(on)) != 0) { + gslog(LOG_EMERG,"Error::could not set socket option\n"); + exit(1); + } +#endif + if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEADDR, + (gs_sp_t )&on, sizeof(on)) != 0) { + gslog(LOG_EMERG,"Error::could not set socket option\n"); + exit(1); + } + + if (bind(listensockfd, (struct sockaddr *) &serv_addr, + sizeof(serv_addr)) < 0) { + gslog(LOG_EMERG,"Error:Could not bind socket for tcp data stream"); + exit(1); + } + } + + if (verbose==2) { + fprintf(stderr,"Socket created waiting for data producer\n"); + } + if (listen(listensockfd,5)< 0) { + fprintf(stderr,"Error::could not listen to socket for port %u \n",ntohs(serv_addr.sin_port)); + close(listensockfd); + exit(1); + } + sin_sz=sizeof(sin); + if (getsockname(listensockfd, (struct sockaddr *) &sin, &sin_sz) < 0) { + fprintf(stderr,"Error::could not get local port number of listen socket\n"); + exit(1); + } + ds.ip=htonl(127<<24|1); + ds.port=sin.sin_port; + if (set_streamsink(hub,sink_name,ds)!=0) { + fprintf(stderr,"Error::could not set sink in GSHUB for %s source name\n",sink_name); + exit(1); + } + + do { + clilen = sizeof(cli_addr); + fd=accept(listensockfd, (struct sockaddr *) &cli_addr, &clilen); + if (fd<0) { + gslog(LOG_EMERG,"Error:Could not accept connection on tcp socket"); + } + } while (fd==0); + if (verbose) { + fprintf(stderr,"Producer found ready to rock!\n"); + } + +} + + +static void gs_read(gs_sp_t buffer, gs_uint32_t length){ + gs_uint32_t used=0; + gs_uint32_t cur; + + while(used < length) { + if (verbose==2) { + fprintf(stderr,"\tread %u out of %u\n",used,length); + } + if ((cur=read(fd,&(buffer[used]),length-used))<=0) { + gslog(LOG_EMERG,"ERROR:could not read data from gdat stream"); + exit(0); + } + used+=cur; + } +} + +static void gs_read_line(gs_sp_t buffer, gs_uint32_t length){ + gs_uint32_t used=0; + gs_uint32_t cur; + + while((used < (length-1)) && ((used==0)|| (buffer[used-1]!='\n'))) { + if ((cur=read(fd,&(buffer[used]),1))<=0) { + gslog(LOG_EMERG,"ERROR:could not read data from gdat stream"); + exit(0); + } + used+=cur; + } + buffer[used]=0; +} + +static gs_uint32_t get_stream_tuple(gs_uint32_t * rsize,gs_sp_t rbuf,gs_uint32_t bufsz){ + + gs_uint32_t nsz,sz; + gs_read((gs_sp_t)&nsz,sizeof(gs_uint32_t)); + sz=ntohl(nsz); + if (sz>MAXTUPLESZ) { + gslog(LOG_EMERG,"Error::Illegal tuple received"); + exit(0); + } + + if (sz>bufsz) { + gslog(LOG_EMERG,"Error::Illegal tuple size received -- to long"); + exit(0); + } + *rsize=sz; + + gs_read(rbuf,(sz)); + return 0; +} + +int main(int argc, char** argv) { + gs_uint32_t streamid; + gs_schemahandle_t schema; + + gs_uint32_t rstreamid; + gs_uint32_t rsize; + gs_int8_t rbuf[2*MAXTUPLESZ]; + + gs_int32_t numberoffields; + gs_int32_t y; + gs_int32_t parserversion; + gs_uint32_t schemalen; + gs_sp_t asciischema; + gs_sp_t me; + gs_int32_t ch; + + gs_int8_t buf[1024]; + gs_uint32_t tip1,tip2,tip3,tip4; + + /* initialize host library and the sgroup */ + + me=argv[0]; + + while ((ch = getopt(argc, argv, "hvxp:")) != -1) { + switch(ch) { + case 'h': + fprintf(stderr,"%s::usage: %s -v -x -p : \n",argv[0],argv[0]); + exit(0); + break; + case 'p': + tcpport=atoi(optarg);; + break; + case 'v': + verbose=1; + break; + case 'x': + verbose=2; + break; + default: + break; + } + } + if (optind+2>argc) { + fprintf(stderr,"Could not find hub and stream source name on command line\n"); + exit(1); + } + if (sscanf(argv[optind],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(hub.port))!= 5 ) { + fprintf(stderr,"Could not parse hub endpoint\n"); + exit(1); + } + hub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4); + hub.port=htons(hub.port); + sink_name=strdup(argv[optind+1]); + + wait_for_feed(); + + gs_read_line(buf,1024); + if (strncmp(buf,"GDAT",4)!=0) { + gslog(LOG_EMERG,"ERROR: not a GDAT stream\n"); + exit(0); + } + gs_read_line(buf,1024); + if (sscanf(buf,"VERSION:%u\n",&parserversion)!=1) { + gslog(LOG_EMERG,"ERROR: no GDAT VERSION given\n"); + exit(0); + } + gs_read_line(buf,1024); + if (sscanf(buf,"SCHEMALENGTH:%u\n",&schemalen)!=1) { + gslog(LOG_EMERG,"ERROR: no GDAT SCHEMALENGTH given\n"); + exit(0); + } + + if(verbose==2) { + fprintf (stderr,"GDAT version %u schemalength %u\n",parserversion,schemalen); + } + + if (schemaparser_accepts_version(parserversion)!=1) { + fprintf(stderr,"%s::error: wrong parser version %u for file %s\n", + me,parserversion,argv[0]); + exit(1); + } + + if ((asciischema=malloc(schemalen))==0) { + fprintf(stderr,"%s::error: could not allocate schema buffer of sz %u " + "for file %s\n", + me,schemalen,argv[0]); + exit(1); + } + + if(verbose==2) { + fprintf (stderr,"Reading schema from scocket\n"); + } + + gs_read(asciischema,schemalen); + + if(verbose==2) { + fprintf (stderr,"Received schema\n"); + } + + if (verbose==2) { + fprintf(stderr,"%s\n",asciischema); + } + if ((schema=ftaschema_parse_string(asciischema))<0) { + fprintf(stderr,"%s::error: could not parse schema \n", + me); + exit(1); + } + + if ((numberoffields=ftaschema_tuple_len(schema))<0) { + fprintf(stderr,"%s::error:could not get number of fields in schema\n", + me); + exit(1); + } + if (verbose==1) { + for(y=0; y",ftaschema_field_name(schema,y)); + ar=ftaschema_get_field_by_index(schema,y,rbuf,rsize); + switch (ar.field_data_type) { + case INT_TYPE: + printf("%d",ar.r.i); + break; + case UINT_TYPE: + printf("%u",ar.r.ui); + break; + case IP_TYPE: + printf("%u.%u.%u.%u",ar.r.ui>>24&0xff, + ar.r.ui>>16&0xff, + ar.r.ui>>8&0xff, + ar.r.ui&0xff); + break; + case IPV6_TYPE: + { + unsigned x; + unsigned zc=0; + for(x=0;x<4;x++) { if (ar.r.ip6.v[x]==0) zc++;} + if (zc!=4) { + for(x=0;x<8;x++) { + unsigned char * a = (unsigned char *) &(ar.r.ip6.v[0]); + unsigned y; + y=((unsigned)a[2*x])<<8|((unsigned) a[2*x+1]); + printf("%04x",y); + if (x<7) printf(":"); + } + } else { + printf("::"); + } + } + break; + case USHORT_TYPE: + printf("%hu",(unsigned short)ar.r.ui); + break; + case BOOL_TYPE: + if (ar.r.ui==0) { + printf("FALSE"); + } else { + printf("TRUE"); + } + break; + case ULLONG_TYPE: + { + gs_uint64_t ul; + gs_uint64_t t1; + gs_uint64_t t2; + ul=ar.r.ul; + printf("%llu",ul); + } + break; + case LLONG_TYPE: + printf("%lld",ar.r.l); + break; + case FLOAT_TYPE: + printf("%f",ar.r.f); + break; + case TIMEVAL_TYPE: + { + gs_float_t t; + t= ar.r.t.tv_usec; + t=t/1000000; + t=t+ar.r.t.tv_sec; + printf("%lf sec",t); + } + break; + case VSTR_TYPE: + { + int x; + int c; + char * src; + src=(gs_sp_t)ar.r.vs.offset; + if ((ar.r.vs.length>0) && (src[ar.r.vs.length-1]==0)) { + ar.r.vs.length = ar.r.vs.length-1; + } + for(x=0;x=' '))&&(c!='|')) { + printf("%c",c); + } else { + printf("(0x%x)",(gs_uint8_t)c); + } + } + } + break; + default: + break; + } + if (y