1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
25 #include <sys/types.h>
26 #include <sys/socket.h>
27 #include <netinet/in.h>
34 #include <schemaparser.h>
/* Field-count limit. NOTE(review): no use of this macro is visible in this excerpt - confirm it is still needed. */
39 #define MAXNUMFIELDS 64
/* Size handed to ftaapp_init() during start-up.
   NOTE(review): the expansion is not parenthesized, so BUFSIZE inside a larger
   expression (for example x / BUFSIZE) would not group as intended; it is only
   safe as a bare argument, which is how init() uses it. */
40 #define BUFSIZE 16*1024*1024
43 gs_sp_t me; /* saved copy of argv[0] */
/* Descriptor of the stream socket to the data sink; assigned in init() and used by gs_write(). */
45 gs_uint32_t socket_desc;
/* Non-zero enables the additional progress log messages in init() and process_data(). */
46 gs_uint32_t verbose=0;
/* Value of get_schemaparser_version(); written into the stream header by process_data(). */
47 gs_uint32_t parserversion;
/* When zero, process_data() skips tuples for which ftaapp_get_tuple() returned code 2. */
48 gs_uint32_t withtrace=0;
/* NOTE(review): init() also declares a local named `schema`, which hides this global inside that function. */
52 gs_schemahandle_t schema;
/* Logs an "exiting" notice that includes the signal number iv.
   NOTE(review): the enclosing function header is not visible in this excerpt;
   presumably this is the body of hand(), the handler that init() installs for
   SIGTERM, SIGINT and SIGPIPE - confirm. */
61 gslog(LOG_NOTICE, "exiting via signal handler %d...\n", iv);
/* Handler taking a signal number iv; presumably used for a timeout signal (by name - confirm).
   Only a disabled verbose message is visible here; the rest of the body lies outside this excerpt. */
65 void timeouthand(int iv) {
67 // if (s->verbose!=0) fprintf(stderr, "exiting because of timeout...\n");
/* Sends len bytes starting at buffer on the global stream socket socket_desc.
   buffer: data to transmit.
   len:    number of bytes to transmit. */
72 static void gs_write(gs_sp_t buffer, gs_uint32_t len)
/* Any send() result other than len - an error or a short send - takes the failure branch.
   NOTE(review): send() returns a signed ssize_t and len is unsigned, so this is a
   mixed-sign comparison; a short send is not retried. */
74 if (send(socket_desc,buffer,len,0) != len) {
75 gslog(LOG_EMERG,"could not write on stream socket");
/* Reports a usage message that includes the caller-supplied reason and the program name me.
   reason: short text explaining why usage is being shown.
   NOTE(review): the start of the format string is not visible in this excerpt;
   confirm that its number of %s conversions matches the four arguments passed. */
80 static void print_usage_exit(gs_sp_t reason) {
83 "%s::usage: %s -v -t -h <gshub-hostname>:<gshub-port> <gsinstance_name> <query_name> <data_sink_name>\n"
84 , me, reason, me, me);
/* Start-up sequence: parses the positional arguments, registers with the hub,
   subscribes to the query, fetches its schema, and connects the stream socket
   to the data sink.
   argc/argv: positional arguments; argv[0] is "<a.b.c.d>:<port>" of the hub,
   argv[1] the instance name, argv[2] the query name, argv[3] the data sink name. */
91 static void init(gs_int32_t argc, gs_sp_t argv[]) {
/* NOTE(review): the local `schema` below hides the file-scope variable of the same name. */
93 gs_int32_t x, y, schema, pblklen, lcv;
96 gs_sp_t instance_name;
97 gs_sp_t data_sink_name;
/* The four dotted-quad octets of the hub address, filled in by sscanf(). */
102 gs_uint32_t tip1,tip2,tip3,tip4;
103 struct sockaddr_in server;
106 print_usage_exit("Wrong number of paramters");
/* NOTE(review): unbounded sprintf; the declaration and size of `name` are not visible here - confirm it cannot overflow. */
108 sprintf(name,"gsexit: %s %s %s %s ",argv[0],argv[1],argv[2],argv[3]);
/* All five conversions (four octets and the port) must match, otherwise the hub address is rejected. */
113 if (sscanf(argv[0],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(gshub.port))!= 5 ) {
114 gslog(LOG_EMERG,"HUB IP NOT DEFINED");
/* Pack the octets into one 32-bit value and convert address and port with htonl()/htons().
   NOTE(review): the octets are parsed with %u and used without a visible 0-255 range check. */
118 gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
119 gshub.port=htons(gshub.port);
/* Private copies of the remaining arguments. NOTE(review): no NULL check on strdup() is visible. */
120 instance_name=strdup(argv[1]);
121 query_name=strdup(argv[2]);
122 data_sink_name=strdup(argv[3]);
/* Register the hub endpoint and this instance's name with the library. */
124 if (set_hub(gshub)!=0) {
125 gslog(LOG_EMERG,"Could not set hub");
128 if (set_instance_name(instance_name)!=0) {
129 gslog(LOG_EMERG,"Could not set instance name");
/* Ask the hub for the instance's init signal; a non-zero result is logged as "not initiated". */
133 if (get_initinstance(gshub,instance_name,&dummyep,1)!=0) {
134 gslog(LOG_EMERG,"Did not receive signal that GS is initiated\n");
/* Look up the endpoint of the named data sink; the result lands in data_sink. */
137 if (get_streamsink(gshub,data_sink_name,&data_sink,1) !=0 ) {
138 gslog(LOG_EMERG,"Could not find data sink");
142 if (ftaapp_init(BUFSIZE)!=0) {
143 gslog(LOG_EMERG,"%s::error:could not initialize gscp\n", me);
/* Route SIGTERM, SIGINT and SIGPIPE to hand(). */
147 signal(SIGTERM, hand);
148 signal(SIGINT, hand);
149 signal(SIGPIPE, hand);
151 if (verbose!=0) gslog(LOG_DEBUG,"Initializing FTAs\n");
/* Subscribe to the query. The second and third arguments are 1 only when no
   parameter block (pblk) is present; a streamid of 0 is treated as failure. */
156 fs.fta_id=ftaapp_add_fta(query_name,pblk?0:1,pblk?0:1,0,pblklen,pblk);
157 if (fs.fta_id.streamid==0){
158 gslog(LOG_EMERG,"%s::error:could not initialize fta %s\n",
163 if ((c=ftaapp_get_fta_ascii_schema_by_name(query_name))==0){
164 gslog(LOG_EMERG,"%s::error:could not get ascii schema for %s\n",
169 //ftaapp_get_fta_ascii_schema_by_name uses static buffer so make a copy
170 fs.asciischema=strdup(c);
173 // Set parser version here
174 parserversion=get_schemaparser_version();
/* A negative schema handle is treated as failure. */
176 if ((fs.schema=ftaapp_get_fta_schema(fs.fta_id))<0) {
177 gslog(LOG_EMERG,"%s::error:could not get schema for query\n",
182 // Use all available fields
183 if ((fs.numfields=ftaschema_tuple_len(fs.schema))<0) {
184 gslog(LOG_EMERG,"%s::error:could not get number of fields for query %s\n",
189 // Important that we only open the socket to the data sink AFTER we have subscribed to the output query as it uses it as a signal
/* NOTE(review): socket_desc is declared gs_uint32_t, so the -1 check below relies on
   the conversion of -1 to that type - confirm gs_uint32_t is an unsigned 32-bit type. */
191 socket_desc = socket(AF_INET , SOCK_STREAM , 0);
192 if (socket_desc == -1)
194 gslog(LOG_EMERG,"ERROR:could not create socket for data stream");
/* The sink's ip and port are copied unchanged; presumably get_streamsink() already
   returns them in network byte order - confirm. */
197 server.sin_addr.s_addr = data_sink.ip;
198 server.sin_family = AF_INET;
199 server.sin_port = data_sink.port;
201 if (connect(socket_desc , (struct sockaddr *)&server , sizeof(server)) < 0)
203 gslog(LOG_EMERG,"ERROR: could not open connection to data source");
/* Finally announce to the hub that this instance is subscribed to the data sink. */
206 if (set_streamsubscription(gshub,instance_name,data_sink_name) !=0 ) {
207 gslog(LOG_EMERG,"Could not announce streamsubscription for exit process");
/* Streams the query output to the data sink: first a text header and the
   ASCII schema, then one length-prefixed record per tuple. */
215 static void process_data()
/* Receive buffer for one tuple, sized at twice MAXTUPLESZ. */
221 gs_int8_t rbuf[2*MAXTUPLESZ];
/* Scratch buffer for the text header. */
222 gs_int8_t topb[1024];
225 if (verbose!=0) gslog(LOG_INFO,"Getting Data");
/* Header: "GDAT", the parser version, and the schema length, which counts the terminating NUL. */
227 sprintf(&topb[0],"GDAT\nVERSION:%u\nSCHEMALENGTH:%u\n",
228 parserversion,(unsigned int)strlen(fs.asciischema)+1);
/* The header goes out without its NUL; the schema goes out with its NUL, matching SCHEMALENGTH. */
229 gs_write(&topb[0],strlen(topb));
230 gs_write(fs.asciischema,strlen(fs.asciischema)+1);
/* Keep fetching tuples for as long as ftaapp_get_tuple() returns a non-negative code. */
234 while((code=ftaapp_get_tuple(&rfta_id,&rsize,rbuf,2*MAXTUPLESZ,0))>=0) {
/* Code 2 tuples are dropped unless tracing is on; presumably these are trace tuples - confirm. */
236 if ((withtrace==0)&&(code==2)) continue;
238 if (ftaschema_is_eof_tuple(fs.schema, rbuf)) {
239 /* initiate shutdown or something of that nature */
240 gslog(LOG_INFO,"gsexit::All data proccessed\n");
/* Record framing: a 4-byte value nsz followed by rsize bytes of tuple data.
   NOTE(review): the assignment of nsz is not visible here; presumably it is the
   tuple size in network byte order - confirm. */
243 gs_write((gs_sp_t)&nsz,sizeof(gs_uint32_t));
244 gs_write(rbuf,rsize);
/* Program entry point: parses the -h, -v and -t options with getopt(), then
   validates the remaining arguments. The calls that follow are not visible in
   this excerpt. */
250 int main(int argc, char** argv) {
/* NOTE(review): the case bodies are not visible; presumably -v sets verbose and -t sets withtrace - confirm. */
254 while ((ch = getopt(argc, argv, "hvt")) != -1) {
257 print_usage_exit("help");
273 /* initialize host library and the sgroup */
276 print_usage_exit("Not enough arguments");
/* Logged if control reaches the end of main(), which the message describes as unexpected. */
283 gslog(LOG_EMERG,"%s::internal error reached unexpected end", me);