Added quantiling UDAFs
[com/gs-lite.git] / src / tools / gsexit.c
1 /* ------------------------------------------------\r
2  Copyright 2014 AT&T Intellectual Property\r
3  Licensed under the Apache License, Version 2.0 (the "License");\r
4  you may not use this file except in compliance with the License.\r
5  You may obtain a copy of the License at\r
6  \r
7  http://www.apache.org/licenses/LICENSE-2.0\r
8  \r
9  Unless required by applicable law or agreed to in writing, software\r
10  distributed under the License is distributed on an "AS IS" BASIS,\r
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12  See the License for the specific language governing permissions and\r
13  limitations under the License.\r
14  ------------------------------------------- */\r
15 #include <app.h>\r
16 #include <stdlib.h>\r
17 #include <string.h>\r
18 #include <stdio.h>\r
19 #include <unistd.h>\r
20 #include <signal.h>\r
21 #include <time.h>\r
22 #include <fcntl.h>\r
23 #include <sys/time.h>\r
24 #include <sys/stat.h>\r
25 #include <sys/types.h>\r
26 #include <sys/socket.h>\r
27 #include <netinet/in.h>\r
28 #include "errno.h"\r
29 \r
30 \r
31 #include "gsconfig.h"\r
32 #include "gstypes.h"\r
33 #include "gshub.h"\r
34 #include <schemaparser.h>\r
35 \r
36 \r
37 \r
38 #define MAXSTRLEN 256\r
39 #define MAXNUMFIELDS 64\r
40 #define BUFSIZE 16*1024*1024\r
41 \r
42 \r
43 gs_sp_t me;     /* saved copy argv[0] */\r
44 \r
45 gs_uint32_t socket_desc;\r
46 gs_uint32_t verbose=0;\r
47 gs_uint32_t parserversion;\r
48 gs_uint32_t withtrace=0;\r
49 \r
50 struct FTA_state {\r
51     FTAID fta_id;\r
52     gs_schemahandle_t schema;\r
53     gs_sp_t asciischema;\r
54     gs_int32_t numfields;\r
55 };\r
56 \r
57 struct FTA_state fs;\r
58 \r
59 void hand(int iv) {\r
60     ftaapp_exit();\r
61     gslog(LOG_NOTICE, "exiting via signal handler %d...\n", iv);\r
62     exit(0);\r
63 }\r
64 \r
65 void timeouthand(int iv) {\r
66     ftaapp_exit();\r
67     //  if (s->verbose!=0) fprintf(stderr, "exiting because of timeout...\n");\r
68     exit(0);\r
69 }\r
70 \r
71 \r
72 static void gs_write(gs_sp_t buffer, gs_uint32_t len)\r
73 {\r
74     if (send(socket_desc,buffer,len,0) != len) {\r
75         gslog(LOG_EMERG,"could not write on stream socket");\r
76         exit(0);\r
77     }\r
78 }\r
79 \r
80 static void print_usage_exit(gs_sp_t reason) {\r
81     fprintf(stderr,\r
82             "%s::error: %s\n"\r
83             "%s::usage: %s -v -t -h <gshub-hostname>:<gshub-port> <gsinstance_name>  <query_name>  <data_sink_name>\n"\r
84             , me, reason, me, me);\r
85     exit(1);\r
86 }\r
87 \r
88 \r
89 \r
90 \r
91 static void init(gs_int32_t argc, gs_sp_t argv[]) {\r
92     void *pblk;\r
93     gs_int32_t x, y, schema, pblklen, lcv;\r
94     gs_sp_t c;\r
95     gs_int8_t name[1024];\r
96     gs_sp_t instance_name;\r
97     gs_sp_t data_sink_name;\r
98     gs_sp_t query_name;\r
99     endpoint gshub;\r
100     endpoint data_sink;\r
101     endpoint dummyep;\r
102     gs_uint32_t tip1,tip2,tip3,tip4;\r
103     struct sockaddr_in server;\r
104     \r
105     if( (argc!=4) ) {\r
106         print_usage_exit("Wrong number of paramters");\r
107     }\r
108     sprintf(name,"gsexit: %s %s %s %s ",argv[0],argv[1],argv[2],argv[3]);\r
109     \r
110     gsopenlog(name);\r
111     \r
112     \r
113     if (sscanf(argv[0],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(gshub.port))!= 5 ) {\r
114         gslog(LOG_EMERG,"HUB IP NOT DEFINED");\r
115         exit(1);\r
116     }\r
117     \r
118     gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);\r
119     gshub.port=htons(gshub.port);\r
120     instance_name=strdup(argv[1]);\r
121     query_name=strdup(argv[2]);\r
122     data_sink_name=strdup(argv[3]);\r
123     \r
124     if (set_hub(gshub)!=0) {\r
125         gslog(LOG_EMERG,"Could not set hub");\r
126         exit(1);\r
127     }\r
128     if (set_instance_name(instance_name)!=0) {\r
129         gslog(LOG_EMERG,"Could not set instance name");\r
130         exit(1);\r
131     }\r
132     \r
133     if (get_initinstance(gshub,instance_name,&dummyep,1)!=0) {\r
134         gslog(LOG_EMERG,"Did not receive signal that GS is initiated\n");\r
135     }\r
136     \r
137     if (get_streamsink(gshub,data_sink_name,&data_sink,1) !=0 ) {\r
138         gslog(LOG_EMERG,"Could not find data sink");\r
139         exit(0);\r
140     }\r
141     \r
142     if (ftaapp_init(BUFSIZE)!=0) {\r
143         gslog(LOG_EMERG,"%s::error:could not initialize gscp\n", me);\r
144         exit(1);\r
145     }\r
146     \r
147     signal(SIGTERM, hand);\r
148     signal(SIGINT, hand);\r
149     signal(SIGPIPE, hand);\r
150     \r
151     if (verbose!=0) gslog(LOG_DEBUG,"Initializing FTAs\n");\r
152     \r
153     pblk = 0;\r
154     pblklen = 0;\r
155     \r
156     fs.fta_id=ftaapp_add_fta(query_name,pblk?0:1,pblk?0:1,0,pblklen,pblk);\r
157     if (fs.fta_id.streamid==0){\r
158         gslog(LOG_EMERG,"%s::error:could not initialize fta %s\n",\r
159               me,query_name);\r
160         exit(1);\r
161     }\r
162     \r
163     if ((c=ftaapp_get_fta_ascii_schema_by_name(query_name))==0){\r
164         gslog(LOG_EMERG,"%s::error:could not get ascii schema for %s\n",\r
165               me,query_name);\r
166         exit(1);\r
167     }\r
168     \r
169     //ftaapp_get_fta_ascii_schema_by_name uses static buffer so make a copy\r
170     fs.asciischema=strdup(c);\r
171     \r
172     \r
173     // Set parser version here\r
174     parserversion=get_schemaparser_version();\r
175     \r
176     if ((fs.schema=ftaapp_get_fta_schema(fs.fta_id))<0) {\r
177         gslog(LOG_EMERG,"%s::error:could not get schema for query\n",\r
178               me,query_name);\r
179         exit(1);\r
180     }\r
181     \r
182     // Use all available fields\r
183     if ((fs.numfields=ftaschema_tuple_len(fs.schema))<0) {\r
184         gslog(LOG_EMERG,"%s::error:could not get number of fields for query %s\n",\r
185               me,query_name);\r
186         exit(1);\r
187     }\r
188     \r
189     // Important that we only open the socket to the data sink AFTER we have subscribed to the output query as it uses it as a signal\r
190     \r
191     socket_desc = socket(AF_INET , SOCK_STREAM , 0);\r
192     if (socket_desc == -1)\r
193     {\r
194         gslog(LOG_EMERG,"ERROR:could not create socket for data stream");\r
195         exit(0);\r
196     }\r
197     server.sin_addr.s_addr = data_sink.ip;\r
198     server.sin_family = AF_INET;\r
199     server.sin_port = data_sink.port;\r
200     \r
201     if (connect(socket_desc , (struct sockaddr *)&server , sizeof(server)) < 0)\r
202     {\r
203         gslog(LOG_EMERG,"ERROR: could not open connection to data source");\r
204         exit(0);\r
205     }\r
206     if (set_streamsubscription(gshub,instance_name,data_sink_name) !=0 ) {\r
207         gslog(LOG_EMERG,"Could not announce streamsubscription for exit process");\r
208         exit(0);\r
209     }\r
210   \r
211     \r
212 }\r
213 \r
214 \r
215 static void process_data()\r
216 {\r
217     gs_uint32_t nsz;\r
218     FTAID rfta_id;\r
219     gs_uint32_t rsize;\r
220     gs_int32_t code;\r
221     gs_int8_t rbuf[2*MAXTUPLESZ];\r
222         gs_int8_t topb[1024];\r
223     \r
224     \r
225     if (verbose!=0) gslog(LOG_INFO,"Getting Data");\r
226     \r
227         sprintf(&topb[0],"GDAT\nVERSION:%u\nSCHEMALENGTH:%u\n",\r
228                         parserversion,(unsigned int)strlen(fs.asciischema)+1);\r
229     gs_write(&topb[0],strlen(topb));\r
230     gs_write(fs.asciischema,strlen(fs.asciischema)+1);\r
231     \r
232     \r
233     \r
234     while((code=ftaapp_get_tuple(&rfta_id,&rsize,rbuf,2*MAXTUPLESZ,0))>=0) {\r
235         nsz=htonl(rsize);\r
236         if ((withtrace==0)&&(code==2)) continue;\r
237         if (verbose) {\r
238             if (ftaschema_is_eof_tuple(fs.schema, rbuf)) {\r
239                 /* initiate shutdown or something of that nature */\r
240                 gslog(LOG_INFO,"gsexit::All data proccessed\n");\r
241             }\r
242         }\r
243         gs_write((gs_sp_t)&nsz,sizeof(gs_uint32_t));\r
244         gs_write(rbuf,rsize);\r
245         }\r
246 }\r
247 \r
248 \r
249 \r
250 int main(int argc, char** argv) {\r
251     gs_int32_t ch;\r
252     me = argv[0];\r
253     \r
254     while ((ch = getopt(argc, argv, "hvt")) != -1) {\r
255         switch(ch) {\r
256             case 'h':\r
257                 print_usage_exit("help");\r
258                 break;\r
259             case 'v':\r
260                 verbose=1;\r
261                 break;\r
262             case 't':\r
263                 withtrace=1;\r
264                 break;\r
265             default:\r
266                 break;\r
267         }\r
268     }\r
269     \r
270     argc -= optind;\r
271     argv += optind;\r
272     \r
273     /* initialize host library and the sgroup  */\r
274     \r
275     if (argc<=1) {\r
276         print_usage_exit("Not enough arguments");\r
277     }\r
278     \r
279     init(argc, argv);\r
280     \r
281     process_data();\r
282     \r
283     gslog(LOG_EMERG,"%s::internal error reached unexpected end", me);\r
284 }\r
285 \r