1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
23 #include <sys/types.h>
24 #include <sys/socket.h>
25 #include <netinet/in.h>
27 #include <schemaparser.h>
/* Size in bytes of the file-scope line buffer declared below. */
32 #define MAXLINE 100000
/* TCP port the listen socket is bound to; main() assigns it from optarg
 * during option parsing (the usage text documents this as -p <port>). */
33 static unsigned tcpport=0;
/* File-scope scratch buffer of MAXLINE bytes.
 * NOTE(review): confirm where this buffer is actually used. */
34 static char linebuf[MAXLINE];
/*
 * wait_for_feed: prepare the TCP listen socket, announce this process as a
 * stream sink via set_streamsink(), and accept a connection from a data
 * producer. The accepted descriptor is stored in fd, which gs_read() and
 * gs_read_line() read from.
 *
 * Takes no arguments and returns nothing; failures are reported through
 * gslog(LOG_EMERG, ...) or stderr.
 */
43 static void wait_for_feed() {
44 struct sockaddr_in serv_addr,cli_addr;
46 struct sockaddr_in sin;
/* listensockfd==0 is treated as "listen socket not created yet". */
48 if (listensockfd==0) {
52 fprintf(stderr,"Create listen socket for port %u\n",tcpport);
/* IPv4 stream (TCP) socket. */
54 listensockfd=socket(AF_INET, SOCK_STREAM, 0);
55 if (listensockfd < 0) {
56 gslog(LOG_EMERG,"Error:Could not create socket for tcp data stream");
/* Bind address: any local IPv4 interface, port tcpport in network byte order. */
59 bzero((char *) &serv_addr, sizeof(serv_addr));
60 serv_addr.sin_family = AF_INET;
61 serv_addr.sin_addr.s_addr = INADDR_ANY;
62 serv_addr.sin_port = htons(tcpport);
64 /* make sure we can reuse the common port rapidly */
65 if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEPORT,
66 (gs_sp_t )&on, sizeof(on)) != 0) {
67 gslog(LOG_EMERG,"Error::could not set socket option\n");
/* Same option value is applied for SO_REUSEADDR as well. */
71 if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEADDR,
72 (gs_sp_t )&on, sizeof(on)) != 0) {
73 gslog(LOG_EMERG,"Error::could not set socket option\n");
77 if (bind(listensockfd, (struct sockaddr *) &serv_addr,
78 sizeof(serv_addr)) < 0) {
79 gslog(LOG_EMERG,"Error:Could not bind socket for tcp data stream");
85 fprintf(stderr,"Socket created waiting for data producer\n");
/* Start listening with a backlog of 5 pending connections. */
87 if (listen(listensockfd,5)< 0) {
88 fprintf(stderr,"Error::could not listen to socket for port %u \n",ntohs(serv_addr.sin_port));
/* Query the address actually bound to the listen socket into sin; the
 * error text indicates the local port number is what is wanted here. */
93 if (getsockname(listensockfd, (struct sockaddr *) &sin, &sin_sz) < 0) {
94 fprintf(stderr,"Error::could not get local port number of listen socket\n");
/* 127<<24|1 is 0x7f000001 (127.0.0.1); htonl() stores it in network byte order. */
97 ds.ip=htonl(127<<24|1);
/* Hand the endpoint in ds to set_streamsink() under sink_name; a nonzero
 * return is reported as a failure to set the sink. */
99 if (set_streamsink(hub,sink_name,ds)!=0) {
100 fprintf(stderr,"Error::could not set sink in GSHUB for %s source name\n",sink_name);
/* Accept a producer connection; the peer address is written to cli_addr. */
105 clilen = sizeof(cli_addr);
106 fd=accept(listensockfd, (struct sockaddr *) &cli_addr, &clilen);
108 gslog(LOG_EMERG,"Error:Could not accept connection on tcp socket");
112 fprintf(stderr,"Producer found ready to rock!\n");
/*
 * gs_read: fill buffer from the descriptor fd by calling read() repeatedly
 * while fewer than length bytes have been consumed.
 *
 *   buffer - destination for the data
 *   length - number of bytes requested
 *
 * Each read() is issued at offset `used` for the remaining length-used
 * bytes. A read() result <= 0 (end of stream or error) is logged via
 * gslog(LOG_EMERG, ...).
 * NOTE(review): `used` is presumably advanced by `cur` after each read - confirm.
 */
118 static void gs_read(gs_sp_t buffer, gs_uint32_t length){
122 while(used < length) {
124 fprintf(stderr,"\tread %u out of %u\n",used,length);
126 if ((cur=read(fd,&(buffer[used]),length-used))<=0) {
127 gslog(LOG_EMERG,"ERROR:could not read data from gdat stream");
/*
 * gs_read_line: read from the descriptor fd one byte at a time into buffer.
 *
 *   buffer - destination for the line
 *   length - capacity limit; the loop stops once length-1 bytes are stored
 *
 * The loop also stops as soon as the most recently stored byte is '\n', so
 * the newline itself is kept in buffer. A read() result <= 0 is logged via
 * gslog(LOG_EMERG, ...).
 * NOTE(review): if gs_uint32_t is unsigned, length==0 makes length-1 wrap to
 * a huge value; callers in this file pass 1024.
 */
134 static void gs_read_line(gs_sp_t buffer, gs_uint32_t length){
138 while((used < (length-1)) && ((used==0)|| (buffer[used-1]!='\n'))) {
139 if ((cur=read(fd,&(buffer[used]),1))<=0) {
140 gslog(LOG_EMERG,"ERROR:could not read data from gdat stream");
/*
 * get_stream_tuple: fetch the next tuple from the stream.
 *
 *   rsize - out parameter (presumably receives the tuple size - confirm)
 *   rbuf  - caller-supplied buffer (presumably receives the tuple bytes - confirm)
 *   bufsz - capacity of rbuf
 *
 * The tuple is preceded by a gs_uint32_t value that is read into nsz with
 * gs_read(). Two failure cases are logged via gslog(LOG_EMERG, ...): an
 * illegal tuple and a tuple size that is too long.
 * main() keeps looping for as long as this function returns 0.
 */
148 static gs_uint32_t get_stream_tuple(gs_uint32_t * rsize,gs_sp_t rbuf,gs_uint32_t bufsz){
151 gs_read((gs_sp_t)&nsz,sizeof(gs_uint32_t));
154 gslog(LOG_EMERG,"Error::Illegal tuple received");
159 gslog(LOG_EMERG,"Error::Illegal tuple size received -- to long");
/*
 * main: command-line entry point.
 *
 * Usage (from the usage message): -v -x -p <port> <IP>:<port> <source_name>
 *
 * Visible flow:
 *   1. Parse options with getopt ("hvxp:"; -p takes an argument) and the two
 *      positional arguments: the hub endpoint and the sink/source name.
 *   2. Read the GDAT header lines ("GDAT", "VERSION:<n>", "SCHEMALENGTH:<n>")
 *      with gs_read_line(), then read the schema text with gs_read().
 *   3. Parse the schema and print the field names separated by '|'.
 *   4. Loop over tuples from get_stream_tuple(), printing each field as
 *      "<name>-><value>" with '|' between fields.
 */
168 int main(int argc, char** argv) {
169 gs_uint32_t streamid;
170 gs_schemahandle_t schema;
172 gs_uint32_t rstreamid;
/* Tuple receive buffer; its size matches the bufsz passed to get_stream_tuple(). */
174 gs_int8_t rbuf[2*MAXTUPLESZ];
176 gs_int32_t numberoffields;
178 gs_int32_t parserversion;
179 gs_uint32_t schemalen;
/* The four dotted-quad octets of the hub address as parsed by sscanf. */
185 gs_uint32_t tip1,tip2,tip3,tip4;
187 /* initialize host library and the sgroup */
191 while ((ch = getopt(argc, argv, "hvxp:")) != -1) {
194 fprintf(stderr,"%s::usage: %s -v -x -p <port> <IP>:<port> <source_name> \n",argv[0],argv[0]);
198 tcpport=atoi(optarg);;
211 fprintf(stderr,"Could not find hub and stream source name on command line\n");
/* Hub endpoint must parse as a.b.c.d:port (all five conversions required). */
214 if (sscanf(argv[optind],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(hub.port))!= 5 ) {
215 fprintf(stderr,"Could not parse hub endpoint\n");
/* Pack the octets into one 32-bit value and store address and port in
 * network byte order.
 * NOTE(review): no range check on the octets is visible before the shifts. */
218 hub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
219 hub.port=htons(hub.port);
/* Keep a private heap copy of the second positional argument. */
220 sink_name=strdup(argv[optind+1]);
/* Header line 1: stream must start with the "GDAT" magic. */
224 gs_read_line(buf,1024);
225 if (strncmp(buf,"GDAT",4)!=0) {
226 gslog(LOG_EMERG,"ERROR: not a GDAT stream\n");
/* Header line 2: parser version.
 * NOTE(review): %u is used with a gs_int32_t; if that type is signed the
 * conversion specifier does not match - confirm. */
229 gs_read_line(buf,1024);
230 if (sscanf(buf,"VERSION:%u\n",&parserversion)!=1) {
231 gslog(LOG_EMERG,"ERROR: no GDAT VERSION given\n");
/* Header line 3: length in bytes of the schema text that follows. */
234 gs_read_line(buf,1024);
235 if (sscanf(buf,"SCHEMALENGTH:%u\n",&schemalen)!=1) {
236 gslog(LOG_EMERG,"ERROR: no GDAT SCHEMALENGTH given\n");
241 fprintf (stderr,"GDAT version %u schemalength %u\n",parserversion,schemalen);
/* Only proceed when the schema parser reports it accepts this version (returns 1). */
244 if (schemaparser_accepts_version(parserversion)!=1) {
245 fprintf(stderr,"%s::error: wrong parser version %u for file %s\n",
246 me,parserversion,argv[0]);
/* Buffer for the schema text, sized exactly schemalen bytes. */
250 if ((asciischema=malloc(schemalen))==0) {
251 fprintf(stderr,"%s::error: could not allocate schema buffer of sz %u "
253 me,schemalen,argv[0]);
258 fprintf (stderr,"Reading schema from scocket\n");
261 gs_read(asciischema,schemalen);
264 fprintf (stderr,"Received schema\n");
/* NOTE(review): printing with %s needs a NUL terminator inside the
 * schemalen-byte buffer - confirm the schema data or nearby code provides one. */
268 fprintf(stderr,"%s\n",asciischema);
/* A negative schema handle signals a parse failure. */
270 if ((schema=ftaschema_parse_string(asciischema))<0) {
271 fprintf(stderr,"%s::error: could not parse schema \n",
276 if ((numberoffields=ftaschema_tuple_len(schema))<0) {
277 fprintf(stderr,"%s::error:could not get number of fields in schema\n",
/* Header row: field names separated by '|' (no separator after the last one). */
282 for(y=0; y<numberoffields;y++) {
283 printf("%s",ftaschema_field_name(schema,y));
284 if (y<numberoffields-1) printf("|");
/* Main loop: runs for as long as get_stream_tuple() returns 0. */
288 while(get_stream_tuple(&rsize,rbuf,2*MAXTUPLESZ)==0) {
290 if (ftaschema_is_eof_tuple(schema, rbuf)) {
291 /* initiate shutdown or something of that nature */
292 fprintf(stderr,"%s::All data proccessed\n",me);
294 if (ftaschema_is_temporal_tuple(schema, rbuf)) {
295 /* NOTE(review): this comment was a copy of the EOF-branch comment; the visible code in this branch only reports a temporal tuple on stderr */
296 fprintf(stderr,"%s:: temporal tuple\n",me);
298 fprintf(stderr,"%s:: regular tuple\n",me);
/* Print every field of the tuple as "<name>-><value>". */
305 for(y=0; y<numberoffields;y++) {
306 struct access_result ar;
308 printf("%s->",ftaschema_field_name(schema,y));
309 ar=ftaschema_get_field_by_index(schema,y,rbuf,rsize);
/* Output format depends on the field's data type. */
310 switch (ar.field_data_type) {
315 printf("%u",ar.r.ui);
/* 32-bit value printed as a dotted quad, most significant byte first. */
318 printf("%u.%u.%u.%u",ar.r.ui>>24&0xff,
/* Count how many of the four words of the IPv6 value are zero. */
327 for(x=0;x<4;x++) { if (ar.r.ip6.v[x]==0) zc++;}
/* View the IPv6 value as raw bytes and combine byte pairs into 16-bit
 * groups, with ':' after each of the first seven groups.
 * NOTE(review): y is also the index of the enclosing field loop; unless a
 * narrower-scope y is declared here, this assignment changes that index - confirm. */
330 unsigned char * a = (unsigned char *) &(ar.r.ip6.v[0]);
332 y=((unsigned)a[2*x])<<8|((unsigned) a[2*x+1]);
334 if (x<7) printf(":");
342 printf("%hu",(unsigned short)ar.r.ui);
361 printf("%lld",ar.r.l);
/* Variable-length data: a trailing zero byte, if present, is excluded from
 * the length before the bytes are printed. */
380 src=(gs_sp_t)ar.r.vs.offset;
381 if ((ar.r.vs.length>0) && (src[ar.r.vs.length-1]==0)) {
382 ar.r.vs.length = ar.r.vs.length-1;
384 for(x=0;x<ar.r.vs.length;x++) {
/* Test for printable ASCII (' ' through '~') other than the '|' separator;
 * the (0x..) hex form below appears to be the fallback for other bytes. */
386 if (((c<='~') && (c>=' '))&&(c!='|')) {
389 printf("(0x%x)",(gs_uint8_t)c);
/* Field separator, omitted after the last field. */
397 if (y<numberoffields-1) printf("|");
/* Flush stdout here only when verbose is nonzero. */
400 if (verbose!=0) fflush(stdout);