0ddb8fceefbe05fea586ade0b342ff2144e00c4c
[com/gs-lite.git] / src / tools / gssource.c
1 /* ------------------------------------------------\r
2  Copyright 2014 AT&T Intellectual Property\r
3  Licensed under the Apache License, Version 2.0 (the "License");\r
4  you may not use this file except in compliance with the License.\r
5  You may obtain a copy of the License at\r
6  \r
7  http://www.apache.org/licenses/LICENSE-2.0\r
8  \r
9  Unless required by applicable law or agreed to in writing, software\r
10  distributed under the License is distributed on an "AS IS" BASIS,\r
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12  See the License for the specific language governing permissions and\r
13  limitations under the License.\r
14  ------------------------------------------- */\r
15 #include <stdlib.h>\r
16 #include <stdio.h>\r
17 #include <unistd.h>\r
18 #include <signal.h>\r
19 #include <time.h>\r
20 #include <string.h>\r
21 #include <sys/time.h>\r
22 #include <sys/stat.h>\r
23 #include <sys/types.h>\r
24 #include <sys/socket.h>\r
25 #include <netinet/in.h>\r
26 \r
27 #include "gsconfig.h"\r
28 #include "gstypes.h"\r
29 #include "gshub.h"\r
30 \r
31 \r
32 \r
33 \r
34 gs_sp_t me = 0;\r
35 gs_sp_t schematext = 0;\r
36 gs_int32_t schematextlen = 0;\r
37 gs_sp_t schematmp = 0;\r
38 gs_int32_t verbose=0;\r
39 gs_uint32_t tcpport=0;\r
40 int listensockfd=0;\r
41 int fd=0;\r
42 endpoint hub;\r
43 endpoint ds;\r
44 gs_sp_t source_name;\r
45 \r
46 static void gs_write(gs_sp_t buffer, gs_uint32_t len)\r
47 {\r
48     if (send(fd,buffer,len,0) != len) {\r
49         fprintf(stderr,"could not write on stream socket");\r
50         exit(0);\r
51     }\r
52 }\r
53 \r
54 static void wait_for_feed() {\r
55     struct sockaddr_in serv_addr,cli_addr;\r
56     struct sockaddr_in sin;\r
57     socklen_t clilen;\r
58     socklen_t sin_sz;\r
59     if (listensockfd==0) {\r
60                 gs_int32_t on = 1;\r
61         \r
62         if (verbose) {\r
63             fprintf(stderr,"Create listen socket for port %u\n",tcpport);\r
64         }\r
65                 listensockfd=socket(AF_INET, SOCK_STREAM, 0);\r
66         if (listensockfd < 0) {\r
67                         fprintf(stderr,"Error:Could not create socket for tcp data stream");\r
68                         exit(1);\r
69                 }\r
70                 bzero((char *) &serv_addr, sizeof(serv_addr));\r
71                 serv_addr.sin_family = AF_INET;\r
72                 serv_addr.sin_addr.s_addr = INADDR_ANY;\r
73                 serv_addr.sin_port = htons(tcpport);\r
74 #ifndef __linux__\r
75         /* make sure we can reuse the common port rapidly */\r
76         if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEPORT,\r
77                        (gs_sp_t )&on, sizeof(on)) != 0) {\r
78             fprintf(stderr,"Error::could not set socket option\n");\r
79             exit(1);\r
80         }\r
81 #endif\r
82         if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEADDR,\r
83                        (gs_sp_t )&on, sizeof(on)) != 0) {\r
84             fprintf(stderr,"Error::could not set socket option\n");\r
85             exit(1);\r
86                 }\r
87         \r
88                 if (bind(listensockfd, (struct sockaddr *) &serv_addr,\r
89                  sizeof(serv_addr)) < 0) {\r
90                         fprintf(stderr,"Error:Could not bind socket for tcp data stream");\r
91             exit(1);\r
92         }\r
93         }\r
94     \r
95     if (verbose) {\r
96         fprintf(stderr,"Socket created waiting for data producer\n");\r
97     }\r
98     if (listen(listensockfd,5)< 0) {\r
99         fprintf(stderr,"Error::could not listen to socket for port %u \n",ntohs(serv_addr.sin_port));\r
100         close(listensockfd);\r
101         exit(1);\r
102     }\r
103     sin_sz=sizeof(sin);\r
104     if (getsockname(listensockfd, (struct sockaddr *) &sin, &sin_sz) < 0) {\r
105         fprintf(stderr,"Error::could not get local port number of listen socket\n");\r
106         exit(1);\r
107     }\r
108     ds.ip=htonl(127<<24|1);\r
109     ds.port=sin.sin_port;\r
110     if (set_streamsource(hub,source_name,ds)!=0) {\r
111         fprintf(stderr,"Error::could not set source in GSHUB for %s source name\n",source_name);\r
112         exit(1);\r
113     }\r
114     \r
115         do {\r
116                 clilen = sizeof(cli_addr);\r
117                 fd=accept(listensockfd, (struct sockaddr *) &cli_addr, &clilen);\r
118                 if (fd<0) {\r
119             fprintf(stderr,"Error:Could not accept connection on tcp socket\n");\r
120                 }\r
121         } while (fd==0);\r
122     if (verbose) {\r
123         fprintf(stderr,"Sink found ready to rock!\n");\r
124     }\r
125     \r
126 }\r
127 \r
128 \r
129 static void do_file(gs_sp_t filename, gs_int32_t fnlen);\r
130 \r
131 int main(int argc, char** argv) {\r
132     gs_int32_t x;\r
133     gs_int32_t s=0;\r
134     gs_int32_t ch;\r
135     gs_int32_t endless=0; // repeats files forever\r
136     gs_uint32_t tip1,tip2,tip3,tip4;\r
137     while ((ch = getopt(argc, argv, "hxep:")) != -1) {\r
138         switch(ch) {\r
139             case 'h':\r
140                 fprintf(stderr,"%s::usage: %s -x -v -e -p <portnumber> <IP>:<port> <source_name> <gdatfiles...>\n",argv[0],argv[0]);\r
141                 exit(0);\r
142                 break;\r
143             case 'p':\r
144                 tcpport=atoi(optarg);;\r
145                 break;\r
146             case 'v':\r
147                 verbose=1;\r
148                 break;\r
149             case 'x':\r
150                 verbose=2;\r
151                 break;\r
152             case 'e':\r
153                 endless=1;\r
154                 break;\r
155             default:\r
156                 break;\r
157         }\r
158     }\r
159     s+=optind;\r
160     if (s+2>argc) {\r
161         fprintf(stderr,"Could not find hub and stream source name on command line\n");\r
162         fprintf(stderr,"%s::usage: %s -x -v -e -p <portnumber> <IP>:<port> <source_name> <gdatfiles...>\n",argv[0],argv[0]);\r
163         exit(1);\r
164     }\r
165     if (sscanf(argv[s],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(hub.port))!= 5 ) {\r
166         fprintf(stderr,"Could not parse hub endpoint\n");\r
167         fprintf(stderr,"%s::usage: %s -x -v -e -p <portnumber> <IP>:<port> <source_name> <gdatfiles...>\n",argv[0],argv[0]);\r
168         exit(1);\r
169     }\r
170     hub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);\r
171     hub.port=htons(hub.port);\r
172     source_name=strdup(argv[s+1]);\r
173     s+=2;\r
174     wait_for_feed();\r
175     do {\r
176         for(x=s;x<argc;x++) {\r
177             if (verbose) {\r
178                 fprintf(stderr,"%s\n",argv[x]);\r
179             }\r
180             do_file(argv[x], strlen(argv[x]));\r
181         }\r
182     } while (endless !=0); // will run forever if endless option is set\r
183     close(fd); // make sure we wait till buffers are empty\r
184     return 0;\r
185 }\r
186 \r
187 \r
188 /*\r
189  * do_file: dump the file out\r
190  */\r
191 \r
192 static void do_file(gs_sp_t filename, gs_int32_t fnlen) {\r
193     gs_int32_t pipe, parserversion, schemalen;\r
194     FILE *input;\r
195     gs_int8_t cmd2[4096 + 128];\r
196     static gs_int8_t *dbuf;\r
197     size_t sz;\r
198     \r
199     if (fnlen > 3 && filename[fnlen - 3] == '.' &&\r
200         filename[fnlen - 2] == 'g' &&\r
201         filename[fnlen - 1] == 'z') {\r
202         pipe = 1;\r
203         snprintf(cmd2, sizeof(cmd2), "gzcat %s", filename);\r
204         input = popen(cmd2, "r");\r
205     } else {\r
206         if (fnlen > 3 && filename[fnlen - 3] == 'b' &&\r
207             filename[fnlen - 2] == 'z' &&\r
208             filename[fnlen - 1] == '2') {\r
209             pipe = 1;\r
210             snprintf(cmd2, sizeof(cmd2), "bzcat %s", filename);\r
211             input = popen(cmd2, "r");\r
212         } else {\r
213             pipe=0;\r
214             input = fopen(filename, "r");\r
215         }\r
216     }\r
217     \r
218     if (!input) {\r
219         perror("stream open");\r
220         fprintf(stderr, "%s: cannot open %s\n", me, filename);\r
221         return;\r
222     }\r
223     \r
224     if (fscanf(input, "GDAT\nVERSION:%u\nSCHEMALENGTH:%u\n",\r
225                &parserversion,&schemalen) != 2) {\r
226         fprintf(stderr,"%s: cannot parse GDAT file header in '%s'\n",\r
227                 me, filename);\r
228         exit(1);\r
229     }\r
230     \r
231     /* first time ? */\r
232     if (schematext == 0) {\r
233         gs_uint8_t buf[1024];\r
234         schematextlen = schemalen;\r
235         schematext = malloc(schemalen);\r
236         dbuf = malloc(CATBLOCKSZ);\r
237         if (!schematext  || !dbuf) {\r
238             fprintf(stderr,"%s: malloc error reading GDAT file header in '%s'\n",\r
239                     me, filename);\r
240             exit(1);\r
241         }\r
242         if (fread(schematext, schemalen, 1, input) != 1) {\r
243             fprintf(stderr,"%s: cannot parse-read GDAT file header in '%s'\n",\r
244                     me, filename);\r
245             exit(1);\r
246         }\r
247         sprintf((char *)buf,"GDAT\nVERSION:%u\nSCHEMALENGTH:%u\n", parserversion, schemalen);\r
248         gs_write((gs_sp_t)buf,strlen((const char*)buf));\r
249         gs_write(schematext, schemalen);\r
250     } else {\r
251         schematmp = malloc(schemalen);\r
252         if (!schematmp ) {\r
253             fprintf(stderr,"%s: malloc error reading GDAT file header in '%s'\n",\r
254                     me, filename);\r
255             exit(1);\r
256         }\r
257         if (fread(schematmp, schemalen, 1, input) != 1) {\r
258             fprintf(stderr,"%s: cannot parse-read GDAT file header in '%s'\n",\r
259                     me, filename);\r
260             exit(1);\r
261         }\r
262         free(schematmp);\r
263         //   if (memcmp(schematext, schematmp, schematextlen)) {\r
264         //     fprintf(stderr,"%s: GDAT schema mis-match in file '%s'\n",\r
265         //             me, filename);\r
266         //     exit(1);\r
267         //   }\r
268     }\r
269     \r
270     while ((sz = fread(dbuf, 1, CATBLOCKSZ, input)) > 0) {\r
271         gs_write(dbuf,sz);\r
272     }\r
273     \r
274     if (pipe) {\r
275         pclose(input);\r
276     } else {\r
277         fclose(input);\r
278     }\r
279     \r
280 }\r