X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Ftools%2Fgssource.c;h=0ddb8fceefbe05fea586ade0b342ff2144e00c4c;hb=07495effe193ca3f73c3bf0ce417068f9ac9dcdd;hp=e7adf8fdfcf0847a03a9d30a3dc83d55025b5a55;hpb=c9783d8ea8b85d810483559e50dbf2297109e349;p=com%2Fgs-lite.git diff --git a/src/tools/gssource.c b/src/tools/gssource.c index e7adf8f..0ddb8fc 100644 --- a/src/tools/gssource.c +++ b/src/tools/gssource.c @@ -1,280 +1,280 @@ -/* ------------------------------------------------ - Copyright 2014 AT&T Intellectual Property - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - ------------------------------------------- */ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "gsconfig.h" -#include "gstypes.h" -#include "gshub.h" - - - - -gs_sp_t me = 0; -gs_sp_t schematext = 0; -gs_int32_t schematextlen = 0; -gs_sp_t schematmp = 0; -gs_int32_t verbose=0; -gs_uint32_t tcpport=0; -int listensockfd=0; -int fd=0; -endpoint hub; -endpoint ds; -gs_sp_t source_name; - -static void gs_write(gs_sp_t buffer, gs_uint32_t len) -{ - if (send(fd,buffer,len,0) != len) { - fprintf(stderr,"could not write on stream socket"); - exit(0); - } -} - -static void wait_for_feed() { - struct sockaddr_in serv_addr,cli_addr; - struct sockaddr_in sin; - socklen_t clilen; - socklen_t sin_sz; - if (listensockfd==0) { - gs_int32_t on = 1; - - if (verbose) { - fprintf(stderr,"Create listen socket for port %u\n",tcpport); - } - listensockfd=socket(AF_INET, SOCK_STREAM, 0); - if (listensockfd < 0) { - fprintf(stderr,"Error:Could not create socket for tcp data stream"); - exit(1); - } - bzero((char *) &serv_addr, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_addr.s_addr = INADDR_ANY; - serv_addr.sin_port = htons(tcpport); -#ifndef __linux__ - /* make sure we can reuse the common port rapidly */ - if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEPORT, - (gs_sp_t )&on, sizeof(on)) != 0) { - fprintf(stderr,"Error::could not set socket option\n"); - exit(1); - } -#endif - if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEADDR, - (gs_sp_t )&on, sizeof(on)) != 0) { - fprintf(stderr,"Error::could not set socket option\n"); - exit(1); - } - - if (bind(listensockfd, (struct sockaddr *) &serv_addr, - sizeof(serv_addr)) < 0) { - fprintf(stderr,"Error:Could not bind socket for tcp data stream"); - exit(1); - } - } - - if (verbose) { - fprintf(stderr,"Socket created waiting for data producer\n"); - } - if (listen(listensockfd,5)< 0) { - fprintf(stderr,"Error::could not listen to socket for port %u \n",ntohs(serv_addr.sin_port)); - close(listensockfd); - exit(1); - } - sin_sz=sizeof(sin); - if (getsockname(listensockfd, (struct sockaddr *) &sin, &sin_sz) < 0) { - fprintf(stderr,"Error::could not get local port number of listen socket\n"); - exit(1); - } - ds.ip=htonl(127<<24|1); - ds.port=sin.sin_port; - if (set_streamsource(hub,source_name,ds)!=0) { - fprintf(stderr,"Error::could not set source in GSHUB for %s source name\n",source_name); - exit(1); - } - - do { - clilen = sizeof(cli_addr); - fd=accept(listensockfd, (struct sockaddr *) &cli_addr, &clilen); - if (fd<0) { - fprintf(stderr,"Error:Could not accept connection on tcp socket\n"); - } - } while (fd==0); - if (verbose) { - fprintf(stderr,"Sink found ready to rock!\n"); - } - -} - - -static void do_file(gs_sp_t filename, gs_int32_t fnlen); - -int main(int argc, char** argv) { - gs_int32_t x; - gs_int32_t s=0; - gs_int32_t ch; - gs_int32_t endless=0; // repeats files forever - gs_uint32_t tip1,tip2,tip3,tip4; - while ((ch = getopt(argc, argv, "hxep:")) != -1) { - switch(ch) { - case 'h': - fprintf(stderr,"%s::usage: %s -x -v -e -p : \n",argv[0],argv[0]); - exit(0); - break; - case 'p': - tcpport=atoi(optarg);; - break; - case 'v': - verbose=1; - break; - case 'x': - verbose=2; - break; - case 'e': - endless=1; - break; - default: - break; - } - } - s+=optind; - if (s+2>argc) { - fprintf(stderr,"Could not find hub and stream source name on command line\n"); - fprintf(stderr,"%s::usage: %s -x -v -e -p : \n",argv[0],argv[0]); - exit(1); - } - if (sscanf(argv[s],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(hub.port))!= 5 ) { - fprintf(stderr,"Could not parse hub endpoint\n"); - fprintf(stderr,"%s::usage: %s -x -v -e -p : \n",argv[0],argv[0]); - exit(1); - } - hub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4); - hub.port=htons(hub.port); - source_name=strdup(argv[s+1]); - s+=2; - wait_for_feed(); - do { - for(x=s;x 3 && filename[fnlen - 3] == '.' && - filename[fnlen - 2] == 'g' && - filename[fnlen - 1] == 'z') { - pipe = 1; - snprintf(cmd2, sizeof(cmd2), "gzcat %s", filename); - input = popen(cmd2, "r"); - } else { - if (fnlen > 3 && filename[fnlen - 3] == 'b' && - filename[fnlen - 2] == 'z' && - filename[fnlen - 1] == '2') { - pipe = 1; - snprintf(cmd2, sizeof(cmd2), "bzcat %s", filename); - input = popen(cmd2, "r"); - } else { - pipe=0; - input = fopen(filename, "r"); - } - } - - if (!input) { - perror("stream open"); - fprintf(stderr, "%s: cannot open %s\n", me, filename); - return; - } - - if (fscanf(input, "GDAT\nVERSION:%u\nSCHEMALENGTH:%u\n", - &parserversion,&schemalen) != 2) { - fprintf(stderr,"%s: cannot parse GDAT file header in '%s'\n", - me, filename); - exit(1); - } - - /* first time ? */ - if (schematext == 0) { - gs_uint8_t buf[1024]; - schematextlen = schemalen; - schematext = malloc(schemalen); - dbuf = malloc(CATBLOCKSZ); - if (!schematext || !dbuf) { - fprintf(stderr,"%s: malloc error reading GDAT file header in '%s'\n", - me, filename); - exit(1); - } - if (fread(schematext, schemalen, 1, input) != 1) { - fprintf(stderr,"%s: cannot parse-read GDAT file header in '%s'\n", - me, filename); - exit(1); - } - sprintf((char *)buf,"GDAT\nVERSION:%u\nSCHEMALENGTH:%u\n", parserversion, schemalen); - gs_write((gs_sp_t)buf,strlen((const char*)buf)); - gs_write(schematext, schemalen); - } else { - schematmp = malloc(schemalen); - if (!schematmp ) { - fprintf(stderr,"%s: malloc error reading GDAT file header in '%s'\n", - me, filename); - exit(1); - } - if (fread(schematmp, schemalen, 1, input) != 1) { - fprintf(stderr,"%s: cannot parse-read GDAT file header in '%s'\n", - me, filename); - exit(1); - } - free(schematmp); - // if (memcmp(schematext, schematmp, schematextlen)) { - // fprintf(stderr,"%s: GDAT schema mis-match in file '%s'\n", - // me, filename); - // exit(1); - // } - } - - while ((sz = fread(dbuf, 1, CATBLOCKSZ, input)) > 0) { - gs_write(dbuf,sz); - } - - if (pipe) { - pclose(input); - } else { - fclose(input); - } - -} +/* ------------------------------------------------ + Copyright 2014 AT&T Intellectual Property + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ------------------------------------------- */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "gsconfig.h" +#include "gstypes.h" +#include "gshub.h" + + + + +gs_sp_t me = 0; +gs_sp_t schematext = 0; +gs_int32_t schematextlen = 0; +gs_sp_t schematmp = 0; +gs_int32_t verbose=0; +gs_uint32_t tcpport=0; +int listensockfd=0; +int fd=0; +endpoint hub; +endpoint ds; +gs_sp_t source_name; + +static void gs_write(gs_sp_t buffer, gs_uint32_t len) +{ + if (send(fd,buffer,len,0) != len) { + fprintf(stderr,"could not write on stream socket"); + exit(0); + } +} + +static void wait_for_feed() { + struct sockaddr_in serv_addr,cli_addr; + struct sockaddr_in sin; + socklen_t clilen; + socklen_t sin_sz; + if (listensockfd==0) { + gs_int32_t on = 1; + + if (verbose) { + fprintf(stderr,"Create listen socket for port %u\n",tcpport); + } + listensockfd=socket(AF_INET, SOCK_STREAM, 0); + if (listensockfd < 0) { + fprintf(stderr,"Error:Could not create socket for tcp data stream"); + exit(1); + } + bzero((char *) &serv_addr, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = INADDR_ANY; + serv_addr.sin_port = htons(tcpport); +#ifndef __linux__ + /* make sure we can reuse the common port rapidly */ + if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEPORT, + (gs_sp_t )&on, sizeof(on)) != 0) { + fprintf(stderr,"Error::could not set socket option\n"); + exit(1); + } +#endif + if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEADDR, + (gs_sp_t )&on, sizeof(on)) != 0) { + fprintf(stderr,"Error::could not set socket option\n"); + exit(1); + } + + if (bind(listensockfd, (struct sockaddr *) &serv_addr, + sizeof(serv_addr)) < 0) { + fprintf(stderr,"Error:Could not bind socket for tcp data stream"); + exit(1); + } + } + + if (verbose) { + fprintf(stderr,"Socket created waiting for data producer\n"); + } + if (listen(listensockfd,5)< 0) { + fprintf(stderr,"Error::could not listen to socket for port %u \n",ntohs(serv_addr.sin_port)); + close(listensockfd); + exit(1); + } + sin_sz=sizeof(sin); + if (getsockname(listensockfd, (struct sockaddr *) &sin, &sin_sz) < 0) { + fprintf(stderr,"Error::could not get local port number of listen socket\n"); + exit(1); + } + ds.ip=htonl(127<<24|1); + ds.port=sin.sin_port; + if (set_streamsource(hub,source_name,ds)!=0) { + fprintf(stderr,"Error::could not set source in GSHUB for %s source name\n",source_name); + exit(1); + } + + do { + clilen = sizeof(cli_addr); + fd=accept(listensockfd, (struct sockaddr *) &cli_addr, &clilen); + if (fd<0) { + fprintf(stderr,"Error:Could not accept connection on tcp socket\n"); + } + } while (fd==0); + if (verbose) { + fprintf(stderr,"Sink found ready to rock!\n"); + } + +} + + +static void do_file(gs_sp_t filename, gs_int32_t fnlen); + +int main(int argc, char** argv) { + gs_int32_t x; + gs_int32_t s=0; + gs_int32_t ch; + gs_int32_t endless=0; // repeats files forever + gs_uint32_t tip1,tip2,tip3,tip4; + while ((ch = getopt(argc, argv, "hxep:")) != -1) { + switch(ch) { + case 'h': + fprintf(stderr,"%s::usage: %s -x -v -e -p : \n",argv[0],argv[0]); + exit(0); + break; + case 'p': + tcpport=atoi(optarg);; + break; + case 'v': + verbose=1; + break; + case 'x': + verbose=2; + break; + case 'e': + endless=1; + break; + default: + break; + } + } + s+=optind; + if (s+2>argc) { + fprintf(stderr,"Could not find hub and stream source name on command line\n"); + fprintf(stderr,"%s::usage: %s -x -v -e -p : \n",argv[0],argv[0]); + exit(1); + } + if (sscanf(argv[s],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(hub.port))!= 5 ) { + fprintf(stderr,"Could not parse hub endpoint\n"); + fprintf(stderr,"%s::usage: %s -x -v -e -p : \n",argv[0],argv[0]); + exit(1); + } + hub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4); + hub.port=htons(hub.port); + source_name=strdup(argv[s+1]); + s+=2; + wait_for_feed(); + do { + for(x=s;x 3 && filename[fnlen - 3] == '.' && + filename[fnlen - 2] == 'g' && + filename[fnlen - 1] == 'z') { + pipe = 1; + snprintf(cmd2, sizeof(cmd2), "gzcat %s", filename); + input = popen(cmd2, "r"); + } else { + if (fnlen > 3 && filename[fnlen - 3] == 'b' && + filename[fnlen - 2] == 'z' && + filename[fnlen - 1] == '2') { + pipe = 1; + snprintf(cmd2, sizeof(cmd2), "bzcat %s", filename); + input = popen(cmd2, "r"); + } else { + pipe=0; + input = fopen(filename, "r"); + } + } + + if (!input) { + perror("stream open"); + fprintf(stderr, "%s: cannot open %s\n", me, filename); + return; + } + + if (fscanf(input, "GDAT\nVERSION:%u\nSCHEMALENGTH:%u\n", + &parserversion,&schemalen) != 2) { + fprintf(stderr,"%s: cannot parse GDAT file header in '%s'\n", + me, filename); + exit(1); + } + + /* first time ? */ + if (schematext == 0) { + gs_uint8_t buf[1024]; + schematextlen = schemalen; + schematext = malloc(schemalen); + dbuf = malloc(CATBLOCKSZ); + if (!schematext || !dbuf) { + fprintf(stderr,"%s: malloc error reading GDAT file header in '%s'\n", + me, filename); + exit(1); + } + if (fread(schematext, schemalen, 1, input) != 1) { + fprintf(stderr,"%s: cannot parse-read GDAT file header in '%s'\n", + me, filename); + exit(1); + } + sprintf((char *)buf,"GDAT\nVERSION:%u\nSCHEMALENGTH:%u\n", parserversion, schemalen); + gs_write((gs_sp_t)buf,strlen((const char*)buf)); + gs_write(schematext, schemalen); + } else { + schematmp = malloc(schemalen); + if (!schematmp ) { + fprintf(stderr,"%s: malloc error reading GDAT file header in '%s'\n", + me, filename); + exit(1); + } + if (fread(schematmp, schemalen, 1, input) != 1) { + fprintf(stderr,"%s: cannot parse-read GDAT file header in '%s'\n", + me, filename); + exit(1); + } + free(schematmp); + // if (memcmp(schematext, schematmp, schematextlen)) { + // fprintf(stderr,"%s: GDAT schema mis-match in file '%s'\n", + // me, filename); + // exit(1); + // } + } + + while ((sz = fread(dbuf, 1, CATBLOCKSZ, input)) > 0) { + gs_write(dbuf,sz); + } + + if (pipe) { + pclose(input); + } else { + fclose(input); + } + +}