1 /* ------------------------------------------------
\r
2 Copyright 2014 AT&T Intellectual Property
\r
3 Licensed under the Apache License, Version 2.0 (the "License");
\r
4 you may not use this file except in compliance with the License.
\r
5 You may obtain a copy of the License at
\r
7 http://www.apache.org/licenses/LICENSE-2.0
\r
9 Unless required by applicable law or agreed to in writing, software
\r
10 distributed under the License is distributed on an "AS IS" BASIS,
\r
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
12 See the License for the specific language governing permissions and
\r
13 limitations under the License.
\r
14 ------------------------------------------- */
\r
21 #include <sys/time.h>
\r
22 #include <sys/stat.h>
\r
23 #include <sys/types.h>
\r
24 #include <sys/socket.h>
\r
25 #include <netinet/in.h>
\r
27 #include "gsconfig.h"
\r
28 #include "gstypes.h"
\r
35 gs_sp_t schematext = 0;
\r
36 gs_int32_t schematextlen = 0;
\r
37 gs_sp_t schematmp = 0;
\r
38 gs_int32_t verbose=0;
\r
39 gs_uint32_t tcpport=0;
\r
44 gs_sp_t source_name;
\r
46 static void gs_write(gs_sp_t buffer, gs_uint32_t len)
\r
48 if (send(fd,buffer,len,0) != len) {
\r
49 fprintf(stderr,"could not write on stream socket");
\r
54 static void wait_for_feed() {
\r
55 struct sockaddr_in serv_addr,cli_addr;
\r
56 struct sockaddr_in sin;
\r
59 if (listensockfd==0) {
\r
63 fprintf(stderr,"Create listen socket for port %u\n",tcpport);
\r
65 listensockfd=socket(AF_INET, SOCK_STREAM, 0);
\r
66 if (listensockfd < 0) {
\r
67 fprintf(stderr,"Error:Could not create socket for tcp data stream");
\r
70 bzero((char *) &serv_addr, sizeof(serv_addr));
\r
71 serv_addr.sin_family = AF_INET;
\r
72 serv_addr.sin_addr.s_addr = INADDR_ANY;
\r
73 serv_addr.sin_port = htons(tcpport);
\r
75 /* make sure we can reuse the common port rapidly */
\r
76 if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEPORT,
\r
77 (gs_sp_t )&on, sizeof(on)) != 0) {
\r
78 fprintf(stderr,"Error::could not set socket option\n");
\r
82 if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEADDR,
\r
83 (gs_sp_t )&on, sizeof(on)) != 0) {
\r
84 fprintf(stderr,"Error::could not set socket option\n");
\r
88 if (bind(listensockfd, (struct sockaddr *) &serv_addr,
\r
89 sizeof(serv_addr)) < 0) {
\r
90 fprintf(stderr,"Error:Could not bind socket for tcp data stream");
\r
96 fprintf(stderr,"Socket created waiting for data producer\n");
\r
98 if (listen(listensockfd,5)< 0) {
\r
99 fprintf(stderr,"Error::could not listen to socket for port %u \n",ntohs(serv_addr.sin_port));
\r
100 close(listensockfd);
\r
103 sin_sz=sizeof(sin);
\r
104 if (getsockname(listensockfd, (struct sockaddr *) &sin, &sin_sz) < 0) {
\r
105 fprintf(stderr,"Error::could not get local port number of listen socket\n");
\r
108 ds.ip=htonl(127<<24|1);
\r
109 ds.port=sin.sin_port;
\r
110 if (set_streamsource(hub,source_name,ds)!=0) {
\r
111 fprintf(stderr,"Error::could not set source in GSHUB for %s source name\n",source_name);
\r
116 clilen = sizeof(cli_addr);
\r
117 fd=accept(listensockfd, (struct sockaddr *) &cli_addr, &clilen);
\r
119 fprintf(stderr,"Error:Could not accept connection on tcp socket\n");
\r
123 fprintf(stderr,"Sink found ready to rock!\n");
\r
129 static void do_file(gs_sp_t filename, gs_int32_t fnlen);
\r
131 int main(int argc, char** argv) {
\r
135 gs_int32_t endless=0; // repeats files forever
\r
136 gs_uint32_t tip1,tip2,tip3,tip4;
\r
137 while ((ch = getopt(argc, argv, "hxep:")) != -1) {
\r
140 fprintf(stderr,"%s::usage: %s -x -v -e -p <portnumber> <IP>:<port> <source_name> <gdatfiles...>\n",argv[0],argv[0]);
\r
144 tcpport=atoi(optarg);;
\r
161 fprintf(stderr,"Could not find hub and stream source name on command line\n");
\r
162 fprintf(stderr,"%s::usage: %s -x -v -e -p <portnumber> <IP>:<port> <source_name> <gdatfiles...>\n",argv[0],argv[0]);
\r
165 if (sscanf(argv[s],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(hub.port))!= 5 ) {
\r
166 fprintf(stderr,"Could not parse hub endpoint\n");
\r
167 fprintf(stderr,"%s::usage: %s -x -v -e -p <portnumber> <IP>:<port> <source_name> <gdatfiles...>\n",argv[0],argv[0]);
\r
170 hub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
\r
171 hub.port=htons(hub.port);
\r
172 source_name=strdup(argv[s+1]);
\r
176 for(x=s;x<argc;x++) {
\r
178 fprintf(stderr,"%s\n",argv[x]);
\r
180 do_file(argv[x], strlen(argv[x]));
\r
182 } while (endless !=0); // will run forever if endless option is set
\r
183 close(fd); // make sure we wait till buffers are empty
\r
189 * do_file: dump the file out
\r
192 static void do_file(gs_sp_t filename, gs_int32_t fnlen) {
\r
193 gs_int32_t pipe, parserversion, schemalen;
\r
195 gs_int8_t cmd2[4096 + 128];
\r
196 static gs_int8_t *dbuf;
\r
199 if (fnlen > 3 && filename[fnlen - 3] == '.' &&
\r
200 filename[fnlen - 2] == 'g' &&
\r
201 filename[fnlen - 1] == 'z') {
\r
203 snprintf(cmd2, sizeof(cmd2), "gzcat %s", filename);
\r
204 input = popen(cmd2, "r");
\r
206 if (fnlen > 3 && filename[fnlen - 3] == 'b' &&
\r
207 filename[fnlen - 2] == 'z' &&
\r
208 filename[fnlen - 1] == '2') {
\r
210 snprintf(cmd2, sizeof(cmd2), "bzcat %s", filename);
\r
211 input = popen(cmd2, "r");
\r
214 input = fopen(filename, "r");
\r
219 perror("stream open");
\r
220 fprintf(stderr, "%s: cannot open %s\n", me, filename);
\r
224 if (fscanf(input, "GDAT\nVERSION:%u\nSCHEMALENGTH:%u\n",
\r
225 &parserversion,&schemalen) != 2) {
\r
226 fprintf(stderr,"%s: cannot parse GDAT file header in '%s'\n",
\r
232 if (schematext == 0) {
\r
233 gs_uint8_t buf[1024];
\r
234 schematextlen = schemalen;
\r
235 schematext = malloc(schemalen);
\r
236 dbuf = malloc(CATBLOCKSZ);
\r
237 if (!schematext || !dbuf) {
\r
238 fprintf(stderr,"%s: malloc error reading GDAT file header in '%s'\n",
\r
242 if (fread(schematext, schemalen, 1, input) != 1) {
\r
243 fprintf(stderr,"%s: cannot parse-read GDAT file header in '%s'\n",
\r
247 sprintf((char *)buf,"GDAT\nVERSION:%u\nSCHEMALENGTH:%u\n", parserversion, schemalen);
\r
248 gs_write((gs_sp_t)buf,strlen((const char*)buf));
\r
249 gs_write(schematext, schemalen);
\r
251 schematmp = malloc(schemalen);
\r
253 fprintf(stderr,"%s: malloc error reading GDAT file header in '%s'\n",
\r
257 if (fread(schematmp, schemalen, 1, input) != 1) {
\r
258 fprintf(stderr,"%s: cannot parse-read GDAT file header in '%s'\n",
\r
263 // if (memcmp(schematext, schematmp, schematextlen)) {
\r
264 // fprintf(stderr,"%s: GDAT schema mis-match in file '%s'\n",
\r
270 while ((sz = fread(dbuf, 1, CATBLOCKSZ, input)) > 0) {
\r