Fixed newline characters throughout the code
[com/gs-lite.git] / src / tools / gsexit.c
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6  
7  http://www.apache.org/licenses/LICENSE-2.0
8  
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
15 #include <app.h>
16 #include <stdlib.h>
17 #include <string.h>
18 #include <stdio.h>
19 #include <unistd.h>
20 #include <signal.h>
21 #include <time.h>
22 #include <fcntl.h>
23 #include <sys/time.h>
24 #include <sys/stat.h>
25 #include <sys/types.h>
26 #include <sys/socket.h>
27 #include <netinet/in.h>
28 #include "errno.h"
29
30
31 #include "gsconfig.h"
32 #include "gstypes.h"
33 #include "gshub.h"
34 #include <schemaparser.h>
35
36
37
38 #define MAXSTRLEN 256
39 #define MAXNUMFIELDS 64
40 #define BUFSIZE 16*1024*1024
41
42
43 gs_sp_t me;     /* saved copy argv[0] */
44
45 gs_uint32_t socket_desc;
46 gs_uint32_t verbose=0;
47 gs_uint32_t parserversion;
48 gs_uint32_t withtrace=0;
49
50 struct FTA_state {
51     FTAID fta_id;
52     gs_schemahandle_t schema;
53     gs_sp_t asciischema;
54     gs_int32_t numfields;
55 };
56
57 struct FTA_state fs;
58
59 void hand(int iv) {
60     ftaapp_exit();
61     gslog(LOG_NOTICE, "exiting via signal handler %d...\n", iv);
62     exit(0);
63 }
64
65 void timeouthand(int iv) {
66     ftaapp_exit();
67     //  if (s->verbose!=0) fprintf(stderr, "exiting because of timeout...\n");
68     exit(0);
69 }
70
71
72 static void gs_write(gs_sp_t buffer, gs_uint32_t len)
73 {
74     if (send(socket_desc,buffer,len,0) != len) {
75         gslog(LOG_EMERG,"could not write on stream socket");
76         exit(0);
77     }
78 }
79
80 static void print_usage_exit(gs_sp_t reason) {
81     fprintf(stderr,
82             "%s::error: %s\n"
83             "%s::usage: %s -v -t -h <gshub-hostname>:<gshub-port> <gsinstance_name>  <query_name>  <data_sink_name>\n"
84             , me, reason, me, me);
85     exit(1);
86 }
87
88
89
90
91 static void init(gs_int32_t argc, gs_sp_t argv[]) {
92     void *pblk;
93     gs_int32_t x, y, schema, pblklen, lcv;
94     gs_sp_t c;
95     gs_int8_t name[1024];
96     gs_sp_t instance_name;
97     gs_sp_t data_sink_name;
98     gs_sp_t query_name;
99     endpoint gshub;
100     endpoint data_sink;
101     endpoint dummyep;
102     gs_uint32_t tip1,tip2,tip3,tip4;
103     struct sockaddr_in server;
104     
105     if( (argc!=4) ) {
106         print_usage_exit("Wrong number of paramters");
107     }
108     sprintf(name,"gsexit: %s %s %s %s ",argv[0],argv[1],argv[2],argv[3]);
109     
110     gsopenlog(name);
111     
112     
113     if (sscanf(argv[0],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(gshub.port))!= 5 ) {
114         gslog(LOG_EMERG,"HUB IP NOT DEFINED");
115         exit(1);
116     }
117     
118     gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
119     gshub.port=htons(gshub.port);
120     instance_name=strdup(argv[1]);
121     query_name=strdup(argv[2]);
122     data_sink_name=strdup(argv[3]);
123     
124     if (set_hub(gshub)!=0) {
125         gslog(LOG_EMERG,"Could not set hub");
126         exit(1);
127     }
128     if (set_instance_name(instance_name)!=0) {
129         gslog(LOG_EMERG,"Could not set instance name");
130         exit(1);
131     }
132     
133     if (get_initinstance(gshub,instance_name,&dummyep,1)!=0) {
134         gslog(LOG_EMERG,"Did not receive signal that GS is initiated\n");
135     }
136     
137     if (get_streamsink(gshub,data_sink_name,&data_sink,1) !=0 ) {
138         gslog(LOG_EMERG,"Could not find data sink");
139         exit(0);
140     }
141     
142     if (ftaapp_init(BUFSIZE)!=0) {
143         gslog(LOG_EMERG,"%s::error:could not initialize gscp\n", me);
144         exit(1);
145     }
146     
147     signal(SIGTERM, hand);
148     signal(SIGINT, hand);
149     signal(SIGPIPE, hand);
150     
151     if (verbose!=0) gslog(LOG_DEBUG,"Initializing FTAs\n");
152     
153     pblk = 0;
154     pblklen = 0;
155     
156     fs.fta_id=ftaapp_add_fta(query_name,pblk?0:1,pblk?0:1,0,pblklen,pblk);
157     if (fs.fta_id.streamid==0){
158         gslog(LOG_EMERG,"%s::error:could not initialize fta %s\n",
159               me,query_name);
160         exit(1);
161     }
162     
163     if ((c=ftaapp_get_fta_ascii_schema_by_name(query_name))==0){
164         gslog(LOG_EMERG,"%s::error:could not get ascii schema for %s\n",
165               me,query_name);
166         exit(1);
167     }
168     
169     //ftaapp_get_fta_ascii_schema_by_name uses static buffer so make a copy
170     fs.asciischema=strdup(c);
171     
172     
173     // Set parser version here
174     parserversion=get_schemaparser_version();
175     
176     if ((fs.schema=ftaapp_get_fta_schema(fs.fta_id))<0) {
177         gslog(LOG_EMERG,"%s::error:could not get schema for query\n",
178               me,query_name);
179         exit(1);
180     }
181     
182     // Use all available fields
183     if ((fs.numfields=ftaschema_tuple_len(fs.schema))<0) {
184         gslog(LOG_EMERG,"%s::error:could not get number of fields for query %s\n",
185               me,query_name);
186         exit(1);
187     }
188     
189     // Important that we only open the socket to the data sink AFTER we have subscribed to the output query as it uses it as a signal
190     
191     socket_desc = socket(AF_INET , SOCK_STREAM , 0);
192     if (socket_desc == -1)
193     {
194         gslog(LOG_EMERG,"ERROR:could not create socket for data stream");
195         exit(0);
196     }
197     server.sin_addr.s_addr = data_sink.ip;
198     server.sin_family = AF_INET;
199     server.sin_port = data_sink.port;
200     
201     if (connect(socket_desc , (struct sockaddr *)&server , sizeof(server)) < 0)
202     {
203         gslog(LOG_EMERG,"ERROR: could not open connection to data source");
204         exit(0);
205     }
206     if (set_streamsubscription(gshub,instance_name,data_sink_name) !=0 ) {
207         gslog(LOG_EMERG,"Could not announce streamsubscription for exit process");
208         exit(0);
209     }
210   
211     
212 }
213
214
215 static void process_data()
216 {
217     gs_uint32_t nsz;
218     FTAID rfta_id;
219     gs_uint32_t rsize;
220     gs_int32_t code;
221     gs_int8_t rbuf[2*MAXTUPLESZ];
222         gs_int8_t topb[1024];
223     
224     
225     if (verbose!=0) gslog(LOG_INFO,"Getting Data");
226     
227         sprintf(&topb[0],"GDAT\nVERSION:%u\nSCHEMALENGTH:%u\n",
228                         parserversion,(unsigned int)strlen(fs.asciischema)+1);
229     gs_write(&topb[0],strlen(topb));
230     gs_write(fs.asciischema,strlen(fs.asciischema)+1);
231     
232     
233     
234     while((code=ftaapp_get_tuple(&rfta_id,&rsize,rbuf,2*MAXTUPLESZ,0))>=0) {
235         nsz=htonl(rsize);
236         if ((withtrace==0)&&(code==2)) continue;
237         if (verbose) {
238             if (ftaschema_is_eof_tuple(fs.schema, rbuf)) {
239                 /* initiate shutdown or something of that nature */
240                 gslog(LOG_INFO,"gsexit::All data proccessed\n");
241             }
242         }
243         gs_write((gs_sp_t)&nsz,sizeof(gs_uint32_t));
244         gs_write(rbuf,rsize);
245         }
246 }
247
248
249
250 int main(int argc, char** argv) {
251     gs_int32_t ch;
252     me = argv[0];
253     
254     while ((ch = getopt(argc, argv, "hvt")) != -1) {
255         switch(ch) {
256             case 'h':
257                 print_usage_exit("help");
258                 break;
259             case 'v':
260                 verbose=1;
261                 break;
262             case 't':
263                 withtrace=1;
264                 break;
265             default:
266                 break;
267         }
268     }
269     
270     argc -= optind;
271     argv += optind;
272     
273     /* initialize host library and the sgroup  */
274     
275     if (argc<=1) {
276         print_usage_exit("Not enough arguments");
277     }
278     
279     init(argc, argv);
280     
281     process_data();
282     
283     gslog(LOG_EMERG,"%s::internal error reached unexpected end", me);
284 }
285