1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
24 #include <sys/types.h>
25 #include <sys/socket.h>
26 #include <netinet/in.h>
33 #include <schemaparser.h>
35 #include <sdl/syncstorage.hpp>
39 // data type definitions from sdl
40 using Namespace = std::string;
41 using Key = std::string;
42 using Data = std::vector<uint8_t>;
43 using DataMap = std::map<Key, Data>;
44 using Keys = std::set<Key>;
48 #define MAXLINE 100000
49 static unsigned tcpport=0;
50 static char linebuf[MAXLINE];
56 // Not all systems have timersub defined so make sure its ther
59 #define timersub(tvp, uvp, vvp) \
61 (vvp)->tv_sec = (tvp)->tv_sec - (uvp)->tv_sec; \
62 (vvp)->tv_usec = (tvp)->tv_usec - (uvp)->tv_usec; \
63 if ((vvp)->tv_usec < 0) { \
65 (vvp)->tv_usec += 1000000; \
73 fprintf(stderr, "exiting via signal handler %d...\n", iv);
77 static void wait_for_client() {
78 struct sockaddr_in serv_addr,cli_addr;
80 if (listensockfd==0) {
82 listensockfd=socket(AF_INET, SOCK_STREAM, 0);
83 if (listensockfd < 0) {
84 gslog(LOG_EMERG,"Error:Could not create socket for tcp data stream");
87 bzero((char *) &serv_addr, sizeof(serv_addr));
88 serv_addr.sin_family = AF_INET;
89 serv_addr.sin_addr.s_addr = INADDR_ANY;
90 serv_addr.sin_port = htons(tcpport);
92 /* make sure we can reuse the common port rapidly */
93 if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEPORT,
94 (gs_sp_t )&on, sizeof(on)) != 0) {
95 gslog(LOG_EMERG,"Error::could not set socket option\n");
99 if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEADDR,
100 (gs_sp_t )&on, sizeof(on)) != 0) {
101 gslog(LOG_EMERG,"Error::could not set socket option\n");
105 if (bind(listensockfd, (struct sockaddr *) &serv_addr,
106 sizeof(serv_addr)) < 0) {
107 gslog(LOG_EMERG,"Error:Could not bind socket for tcp data stream");
113 listen(listensockfd,5);
114 clilen = sizeof(cli_addr);
115 fd=accept(listensockfd, (struct sockaddr *) &cli_addr, &clilen);
117 gslog(LOG_EMERG,"Error:Could not accept connection on tcp socket");
123 static void emit_socket() {
129 if((w=write(fd,&linebuf[o],l))==0) {
137 static void emit_line() {
140 fprintf(outf,"%s",linebuf);
147 int split_string(char *instr,char sep, char **words,int max_words){
153 words[nwords++] = str;
154 while( (loc = strchr(str,sep)) != NULL){
157 if(nwords >= max_words){
158 fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
159 nwords = max_words-1;
161 words[nwords++] = str;
167 std::vector<string> split_string(const string &str, char sep){
168 char *instr = strdup(str.c_str());
170 int nwords = split_string(instr, sep, words, 1000);
172 for(int i=0;i<nwords;++i){
173 ret.push_back(words[i]);
179 vector<uint8_t> packData(const char *d, int len){
180 const uint8_t *d8 = (const uint8_t *)d;
181 return Data(d8, d8+len+1);
186 int main(int argc, char* argv[]) {
187 gs_sp_t me = argv[0];
189 gs_int32_t schema, ch;
193 gs_uint32_t bufsz=8*1024*1024;
194 gs_int8_t rbuf[2*MAXTUPLESZ];
196 gs_int32_t numberoffields;
197 gs_int32_t verbose=0;
202 gs_int32_t n_actual_param;
203 gs_int32_t n_expected_param;
206 struct timeval tvs, tve, tvd;
210 gs_uint32_t tip1,tip2,tip3,tip4;
211 gs_sp_t instance_name;
214 vector<string> keys_v;
217 gs_uint32_t tlimit = 0; // time limit in seconds
218 time_t start_time, curr_time;
222 // by default the output will go to stdout
225 while ((ch = getopt(argc, argv, "l:p:r:veXDK:")) != -1) {
231 tcpport=atoi(optarg);
246 tlimit = atoi(optarg);
250 keys_v = split_string(keys_s, ',');
254 fprintf(stderr, "usage: %s [-r <bufsz>] [-e] [-p <port>] [-l <time_limit>] [-v] [-X] [-D] [-K comma_separated_key_fields] <gshub-hostname>:<gshub-port> <gsinstance_name> query param1 param2...\n", *argv);
260 if (argc<3) goto usage;
262 if (sscanf(argv[0],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(gshub.port))!= 5 ) {
263 gslog(LOG_EMERG,"HUB IP NOT DEFINED");
266 gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
267 gshub.port=htons(gshub.port);
268 instance_name=strdup(argv[1]);
269 if (set_hub(gshub)!=0) {
270 gslog(LOG_EMERG,"Could not set hub");
273 if (set_instance_name(instance_name)!=0) {
274 gslog(LOG_EMERG,"Could not set instance name");
278 if (get_initinstance(gshub,instance_name,&dummyep,1)!=0) {
279 gslog(LOG_EMERG,"Did not receive signal that GS is initiated\n");
282 gettimeofday(&tvs, 0);
288 /* initialize host library and the sgroup */
290 if (verbose>=2) fprintf(stderr,"Inializin gscp\n");
292 if (ftaapp_init(bufsz)!=0) {
293 fprintf(stderr,"%s::error:could not initialize gscp\n", me);
297 signal(SIGTERM, hand);
298 signal(SIGINT, hand);
300 Namespace ns("mcnib");
301 string key_base = argv[0];
303 schema = ftaapp_get_fta_schema_by_name(argv[0]);
305 fprintf(stderr,"%s::error:could not get fta '%s' schema\n",
309 n_expected_param = ftaschema_parameter_len(schema);
310 if (n_expected_param == 0) {
314 /* parse the params */
315 n_actual_param = argc-1;
316 if(n_actual_param < n_expected_param){
317 fprintf(stderr,"Error, %d query parameters expected, %d provided.\n",n_expected_param, n_actual_param);
320 for (lcv = 1 ; lcv < argc ; lcv++) {
325 while (*e && *e != '=') e++;
327 fprintf(stderr,"param parse error '%s' (fmt 'key=val')\n",
332 rv = ftaschema_setparam_by_name(schema, k, e+1, strlen(e+1));
335 fprintf(stderr,"param setparam error '%s' (fmt 'key=val')\n",
340 if (ftaschema_create_param_block(schema, &pblk, &pblklen) < 0) {
341 fprintf(stderr, "ftaschema_create_param_block failed!\n");
345 ftaschema_free(schema); /* XXXCDC */
348 if (verbose>=2) fprintf(stderr,"Initalize FTA\n");
350 fta_id=ftaapp_add_fta(argv[0],0,0,0,pblklen,pblk);
351 if (fta_id.streamid==0) {
352 fprintf(stderr,"%s::error:could not initialize fta %s\n",
356 /* XXXCDC: pblk is malloc'd, should we free it? */
358 if (verbose>=2) fprintf(stderr,"Get schema handle\n");
360 if ((schema=ftaapp_get_fta_schema(fta_id))<0) {
361 fprintf(stderr,"%s::error:could not get schema\n", me);
365 if ((numberoffields=ftaschema_tuple_len(schema))<0) {
366 fprintf(stderr,"%s::error:could not get number of fields in schema\n",
372 for(y=0; y<numberoffields;y++) {
373 printf("%s",ftaschema_field_name(schema,y));
374 if (y<numberoffields-1) printf("|");
379 gettimeofday(&tve, 0);
380 timersub(&tve, &tvs, &tvd);
381 printf("TIME= %ld.%06ld sec\n", tvd.tv_sec, tvd.tv_usec);
388 // Get the vector of keys, bail out if there is a mismatch
389 for(int ki=0;ki<keys_v.size();++ki){
391 for(fi=0; fi<numberoffields;fi++) {
392 if(ftaschema_field_name(schema, fi) == keys_v[ki]){
393 keys_i.push_back(fi);
397 if(fi>=numberoffields){
398 fprintf(stderr,"ERROR key field %s is not in the schema.\n",keys_v[ki].c_str());
404 std::unique_ptr<shareddatalayer::SyncStorage> sdl(shareddatalayer::SyncStorage::create());
406 start_time = time(NULL);
408 while((code=ftaapp_get_tuple(&rfta_id,&rsize,rbuf,2*MAXTUPLESZ,0))>=0) {
411 if (ftaschema_is_eof_tuple(schema, rbuf)) {
412 /* initiate shutdown or something of that nature */
413 printf("#All data proccessed\n");
419 snprintf(linebuf,MAXLINE,"RESULT CODE => %u\n",code);
423 if ((code==0)&&(rfta_id.streamid == fta_id.streamid)) {
424 string key = key_base;
425 for(int fi=0;fi<keys_i.size();++fi){
426 y = keys_i[fi]; //n match gsprintconsole
427 access_result ar=ftaschema_get_field_by_index(schema,y,rbuf,rsize);
428 switch (ar.field_data_type) {
430 key += ":"+to_string(ar.r.i);
433 key += ":"+to_string(ar.r.ui);
436 snprintf(linebuf,MAXLINE,"%u.%u.%u.%u",ar.r.ui>>24&0xff,
440 key += ":"+string(linebuf);
446 for(x=0;x<4;x++) { if (ar.r.ip6.v[x]==0) zc++;}
449 unsigned char * a = (unsigned char *) &(ar.r.ip6.v[0]);
451 y=((unsigned)a[2*x])<<8|((unsigned) a[2*x+1]);
452 snprintf(&linebuf[strlen(linebuf)],MAXLINE,"%04x",y);
453 if (x<7) snprintf(&linebuf[strlen(linebuf)],MAXLINE,":");
456 snprintf(linebuf,MAXLINE,"::");
458 key += ":"+string(linebuf);
462 key += ":"+to_string(ar.r.ui);
472 key += ":"+to_string(ar.r.ul);
475 key += ":"+to_string(ar.r.l);
478 key += ":"+to_string(ar.r.f);
486 snprintf(linebuf,MAXLINE,"%f sec",t);
487 key += ":"+string(linebuf);
496 src=(char*)ar.r.vs.offset;
500 for(x=0;x<ar.r.vs.length;x++) {
502 if ((c<='~') && (c>=' ')) {
516 key += ":"+string(linebuf);
524 if(keys_i.size()==0){
528 D[key] = packData(rbuf, rsize);
532 if (rfta_id.streamid != fta_id.streamid)
533 fprintf(stderr,"Got unkown streamid %llu \n",rfta_id.streamid);
536 // whenever we receive a temp tuple check if we reached time limit
537 if ((code==2) && tlimit && (time(NULL)-start_time)>=tlimit) {
538 fprintf(stderr,"Reached time limit of %d seconds\n",tlimit);