Fixed newline characters throughout the code
[com/gs-lite.git] / src / lib / gscphost / lappinterface.c
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6  
7  http://www.apache.org/licenses/LICENSE-2.0
8  
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
15 #include <gscpipc.h>
16 #include <ipcencoding.h>
17 #include <stdlib.h>
18 #include <stdio.h>
19 #include <lapp.h>
20 #include <lappregistries.h>
21 #include <string.h>
22 #include <unistd.h>
23 #include <signal.h>
24 #include <sys/mman.h>
25 #include "rdtsc.h"
26 // If POLLING is defined applications poll every 100 msec instead of blocking
27 #define POLLING
28
29 struct processtate curprocess = {0,0,0,255,0};
30 struct FTAID clearinghouseftaid = {0,0,0,0};
31
32 /*
33  * sends the message passed in buf and waits for a result
34  * if a message returned is not a result it is put in the
35  * request queue. The resultsbuf has to be large enough
36  * for the largest result
37  */
38 gs_retval_t ipc_call_and_wait(FTAID f, gs_sp_t  msg, gs_sp_t  result)
39 {
40     struct hostcall * h = (struct hostcall *) msg;
41     gs_int8_t  buf[MAXMSGSZ];
42     FTAID from;
43     gs_int32_t length;
44     gs_int32_t lowop;
45 #ifdef PRINTMSG
46     fprintf(stderr, "HOST sending to %u.%u.%u.%u:%u of "
47             "type %u with length %u\n",
48             (f.ip>>24)&0xff,
49             (f.ip>>16)&0xff,
50             (f.ip>>8)&0xff,
51             (f.ip)&0xff,
52             f.port,h->callid,h->size);
53 #endif
54     if (gscpipc_send(f,FTACALLBACK,msg,h->size,1)<0) {
55         gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
56         return -1;
57     }
58     h=(struct hostcall *) buf;
59     while (gscpipc_read(&from,&lowop,buf,&length,1)>0) {
60 #ifdef PRINTMSG
61         fprintf(stderr, "HOST response from %u.%u.%u.%u:%u"
62                 " of type %u with length %u\n",
63                 (from.ip>>24)&0xff,
64                 (from.ip>>16)&0xff,
65                 (from.ip>>8)&0xff,
66                 (from.ip)&0xff,
67                 from.port,
68                 h->callid,h->size);
69 #endif
70         if ((lowop == FTACALLBACK) && (h->callid < RESULT_OPCODE_IGNORE)) {
71             h=(struct hostcall *) buf;
72             if (h->callid > RESULT_OPCODE_BASE) {
73                 memcpy(result,buf,length);
74                 return 0;
75             }
76             if (sidequeue_append(from,buf,length)<0) {
77                 gslog(LOG_EMERG,"ERROR:: Could not add to sidequeue\n");
78                 return -1;
79             }
80         }
81     }
82     gslog(LOG_EMERG, "ERROR::gscipc_read failed in ipc_call_and_wait\n");
83     return -1;
84 }
85
86
87 gs_retval_t hostlib_init(gs_int32_t type, gs_int32_t buffersize, gs_int32_t deviceid, gs_int32_t mapcnt, gs_sp_t map[])
88 {
89     FILE * f;
90     
91     if (curprocess.active != 0 ) {
92         return -1;
93     }
94     
95     switch (type) {
96         case CLEARINGHOUSE:
97             if (gscpipc_init(1) < 0) {
98                 gslog(LOG_EMERG,"ERROR:Could not initalize comm layer for "
99                       "clearinghouse process\n");
100                 return -1;
101             }
102             break;
103         case LFTA:
104 #ifdef __linux__
105             mlockall(MCL_CURRENT|MCL_FUTURE);
106 #endif
107         case APP:
108         case HFTA:
109             if (gscpipc_init(0) < 0) {
110                 gslog(LOG_EMERG,"ERROR:Could not initalize comm layer for "
111                       "non clearinghouse process\n");
112                 return -1;
113             }
114             break;
115         default:
116             gslog(LOG_EMERG,"ERROR:Unknown process type\n");
117             return -1;
118     }
119     
120     // if the buffersize is zero then allocating shared memory
121     // will fail. So only use it for the clearinghouse and LFTAs
122     if ((buffersize<(4*MAXTUPLESZ)) && (buffersize!=0)) {
123         gslog(LOG_EMERG,
124               "ERROR:buffersize in hostlib_init has to "
125               "be at least %u Bytes long\n",
126               4*MAXTUPLESZ);
127         return -1;
128     }
129     
130     curprocess.type=type;
131     curprocess.buffersize=buffersize;
132     curprocess.active = 1;
133     curprocess.deviceid=deviceid;
134     curprocess.mapcnt=mapcnt;
135     curprocess.map=map;
136     return 0;
137 }
138
139 void hostlib_free()
140 {
141     if (curprocess.active != 1 ) {
142         return;
143     }
144     curprocess.active = 0;
145     gscpipc_free();
146 }
147
148
149 gs_retval_t fta_find(FTAname name, gs_uint32_t  reuse, FTAID *ftaid,
150                      gs_sp_t  schema, gs_int32_t schemasz)
151 {
152     gs_int8_t  rb[MAXRES];
153     struct fta_find_arg a;
154     struct ftafind_result * sr = (struct ftafind_result *)rb;
155     
156     a.h.callid = FTA_LOOKUP;
157     a.h.size = sizeof(struct fta_find_arg);
158     a.reuse=reuse;
159     if (strlen(name)>=(MAXFTANAME-1)) {
160         gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
161         return -1;
162     }
163     strcpy(a.name,name);
164     ipc_call_and_wait(clearinghouseftaid,(gs_sp_t )&a,rb);
165     if (sr->h.callid != FTAFIND_RESULT) {
166         gslog(LOG_EMERG,"ERROR:Wrong result code received in fta_find\n");
167         return -1;
168     }
169     if (sr->result >= 0) {
170         if (schema !=0) {
171             if (strlen(sr->schema) >= schemasz) {
172                 gslog(LOG_EMERG,"Could not fit schema into schema buffer fta_find\n");
173                 return -1;
174             } else {
175                 strcpy(schema,sr->schema);
176             }
177         }
178         *ftaid=sr->f;
179     }
180     return sr->result;
181 }
182
183 gs_retval_t fta_alloc_instance(FTAID subscriber,
184                                FTAID * ftaid, FTAname name, gs_sp_t schema,
185                                gs_uint32_t  reusable,
186                                gs_int32_t command, gs_int32_t sz, void *  data)
187 {
188     gs_int8_t  rb[MAXRES];
189     struct fta_alloc_instance_arg * a;
190     struct fta_result * fr = (struct fta_result *)rb;
191     struct ringbuf *r;
192     
193     /* make sure we have the share memory required */
194     if ((r=gscpipc_createshm(*ftaid,curprocess.buffersize))==0) {
195         gslog(LOG_EMERG,"ERROR:could not allocate shared memory"
196               "for FTA %s\n",name);
197         return -1;
198     }
199     
200     if (strlen(name)>=(MAXFTANAME-1)) {
201         gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
202         return -1;
203     }
204     
205     if (strlen(schema)>=(MAXSCHEMASZ-1)) {
206         gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
207         return -1;
208     }
209     
210     a = alloca(sizeof(struct fta_alloc_instance_arg) + sz);
211     
212     a->h.callid = FTA_ALLOC_INSTANCE;
213     a->h.size = sizeof(struct fta_alloc_instance_arg)+ sz;
214     a->f=*ftaid;
215     a->subscriber=subscriber;
216     a->reusable=reusable;
217     a->command = command;
218     a->sz = sz;
219     memcpy(&a->data[0],data,sz);
220     strcpy(a->name,name);
221     strcpy(a->schema,schema);
222     
223     ipc_call_and_wait(*ftaid,(gs_sp_t )a,rb);
224     
225     if (fr->h.callid != FTA_RESULT) {
226         gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
227         return -1;
228     }
229     
230     *ftaid=fr->f;
231     
232     if (fr->result==0) {
233         gslog(LOG_INFO,"Allocated fta instance %s with FTAID {ip=%u,port=%u,index=%u,streamid=%u}\n",name,ftaid->ip,ftaid->port,ftaid->index,ftaid->streamid);
234         return streamregistry_add(*ftaid,r);
235     }
236     
237     return fr->result;
238 }
239
240 gs_retval_t fta_alloc_print_instance(FTAID subscriber,
241                                      FTAID * ftaid,
242                                      FTAname name, gs_sp_t schema, gs_uint32_t  reusable,
243                                      gs_int32_t command, gs_int32_t sz, void *  data,
244                                      gs_sp_t  path,gs_sp_t  basename,
245                                      gs_sp_t  temporal_field, gs_sp_t  split_field,
246                                      gs_uint32_t  delta, gs_uint32_t  split)
247 {
248     gs_int8_t  rb[MAXRES];
249     struct fta_alloc_instance_arg * a;
250     struct fta_result * fr = (struct fta_result *)rb;
251     
252     if ((strlen(path)>=MAXPRINTSTRING-1)
253         || (strlen(basename)>=MAXPRINTSTRING-1)
254         || (strlen(temporal_field)>=MAXPRINTSTRING-1)) {
255         gslog(LOG_EMERG,"INTERNAL ERROR:fta_alloc_print_instance string"
256               " arguments to long\n");
257         return -1;
258     }
259     if (strlen(name)>=(MAXFTANAME-1)) {
260         gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
261         return -1;
262     }
263     
264     if (strlen(schema)>=(MAXSCHEMASZ-1)) {
265         gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
266         return -1;
267     }
268     
269     a = alloca(sizeof(struct fta_alloc_instance_arg) + sz);
270     
271     a->h.callid = FTA_ALLOC_PRINT_INSTANCE;
272     a->h.size = sizeof(struct fta_alloc_instance_arg)+ sz;
273     a->f=*ftaid;
274     a->subscriber=subscriber;
275     a->reusable=reusable;
276     a->split=split;
277     strcpy(a->name,name);
278     strcpy(a->schema,schema);
279     a->command = command;
280     a->sz = sz;
281     strcpy(a->path,path);
282     strcpy(a->basename,basename);
283     strcpy(a->temporal_field,temporal_field);
284     strcpy(a->split_field,split_field);
285     a->delta=delta;
286     memcpy(&a->data[0],data,sz);
287     
288     ipc_call_and_wait(*ftaid,(gs_sp_t )a,rb);
289     
290     if (fr->h.callid != FTA_RESULT) {
291         gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
292         return -1;
293     }
294     
295     *ftaid=fr->f;
296     
297     return fr->result;
298 }
299
300 gs_retval_t fta_free_instance(FTAID subscriber, FTAID ftaid, gs_uint32_t  recursive)
301 {
302     gs_int8_t  rb[MAXRES];
303     struct fta_free_instance_arg a;
304     struct standard_result * sr = (struct standard_result *)rb;
305     struct ringbuf *r;
306     
307     a.h.callid = FTA_FREE_INSTANCE;
308     a.h.size = sizeof(struct fta_free_instance_arg);
309     a.subscriber=subscriber;
310     a.f=ftaid;
311     a.recursive=recursive;
312     ipc_call_and_wait(ftaid,(gs_sp_t )&a,rb);
313     if (sr->h.callid != STANDARD_RESULT) {
314         gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
315         return -1;
316     }
317     
318     /* make sure we remove the mapping*/
319     streamregistry_remove(ftaid);
320     
321     return sr->result;
322 }
323
324 gs_retval_t fta_control(FTAID subscriber,
325                         FTAID ftaid, gs_int32_t command, gs_int32_t sz, void * value)
326 {
327     gs_int8_t  rb[MAXRES];
328     struct fta_control_arg * a;
329     struct standard_result * sr = (struct standard_result *)rb;
330     
331     a = alloca(sizeof(struct fta_control_arg) + sz);
332     
333     a->h.callid = FTA_CONTROL;
334     a->h.size = sizeof(struct fta_control_arg)+ sz;
335     a->subscriber=subscriber;
336     a->f=ftaid;
337     a->command = command;
338     a->sz = sz;
339     memcpy(&a->data[0],value,sz);
340     
341     ipc_call_and_wait(ftaid,(gs_sp_t )a,rb);
342     
343     if (sr->h.callid != STANDARD_RESULT) {
344         gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
345         return -1;
346     }
347     
348     return sr->result;
349 }
350
351 gs_retval_t fta_heartbeat(FTAID self,gs_uint64_t trace_id,
352                           gs_uint32_t  sz, fta_stat * trace){
353 #ifdef CLEARINGHOUSE_HEARTBEAT
354     struct fta_heartbeat_arg  * a;
355     a = alloca(sizeof(struct fta_heartbeat_arg) + (sz*sizeof(fta_stat)));
356     a->h.callid = FTA_HEARTBEAT;
357     a->h.size = sizeof(struct fta_heartbeat_arg)+(sz*sizeof(fta_stat));
358     a->sender=self;
359     a->trace_id=trace_id;
360     a->sz=sz;
361     if (sz!=0) {
362         memcpy(&a->data[0],trace,(sz*sizeof(fta_stat)));
363     }
364 #ifdef PRINTMSG
365     fprintf(stderr, "HOST sending heartbeat to %u.%u.%u.%u:%u of "
366             "type %u with length %u\n",
367             (clearinghouseftaid.ip>>24)&0xff,
368             (clearinghouseftaid.ip>>16)&0xff,
369             (clearinghouseftaid.ip>>8)&0xff,
370             (clearinghouseftaid.ip)&0xff,
371             clearinghouseftaid.port,a->h.callid,a->h.size);
372 #endif
373     if (gscpipc_send(clearinghouseftaid,FTACALLBACK,(gs_sp_t)a,a->h.size,1)<0) {
374         gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
375         return -1;
376     }
377 #endif
378     return 0;
379 }
380
381 gs_retval_t fta_notify_producer_failure(FTAID self, FTAID producer){
382     struct fta_notify_producer_failure_arg  a;
383     a.h.callid = FTA_PRODUCER_FAILURE;
384     a.h.size = sizeof(struct fta_notify_producer_failure_arg);
385     a.sender=self;
386     a.producer=producer;
387 #ifdef PRINTMSG
388     fprintf(stderr, "HOST sending producer failure to %u.%u.%u.%u:%u of "
389             "type %u with length %u\n",
390             (clearinghouseftaid.ip>>24)&0xff,
391             (clearinghouseftaid.ip>>16)&0xff,
392             (clearinghouseftaid.ip>>8)&0xff,
393             (clearinghouseftaid.ip)&0xff,
394             clearinghouseftaid.port,a.h.callid,a.h.size);
395 #endif
396     if (gscpipc_send(clearinghouseftaid,FTACALLBACK,(gs_sp_t)&a,a.h.size,1)<0) {
397         gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
398         return -1;
399     }
400     return 0;
401 }
402
403 gs_retval_t process_control(FTAID ftaid, gs_int32_t command, gs_int32_t sz, void * value)
404 {
405     gs_int8_t  rb[MAXRES];
406     struct process_control_arg * a;
407     struct standard_result * sr = (struct standard_result *)rb;
408     
409     
410     a = alloca(sizeof(struct process_control_arg) + sz);
411     
412     a->h.callid = PROCESS_CONTROL;
413     a->h.size = sizeof(struct process_control_arg)+ sz;
414     a->command = command;
415     a->sz = sz;
416     memcpy(&a->data[0],value,sz);
417     
418     ipc_call_and_wait(ftaid,(gs_sp_t )a,rb);
419     
420     if (sr->h.callid != STANDARD_RESULT) {
421         gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
422         return -1;
423     }
424     
425     return sr->result;
426 }
427
428
429 static void timeouthandler ()
430 {
431     struct timeout_result a;
432     
433     a.h.callid=TIMEOUT;
434     a.h.size=sizeof(struct timeout_result);
435     if (gscpipc_send(gscpipc_getftaid(), FTACALLBACK, (gs_sp_t )&a,a.h.size,1)<0) {
436         gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
437     }
438 }
439
440 gs_retval_t gscp_get_buffer(FTAID * ftaid, gs_int32_t * size, void *tbuffer,
441                             gs_int32_t tbuf_size, gs_int32_t timeout)
442 {
443     struct ringbuf * r;
444     FTAID from;
445     gs_int32_t length;
446     gs_int8_t  buf[MAXMSGSZ];
447     gs_int32_t lopp;
448     FTAID * f;
449     static      gs_uint64_t s1=0;
450     static      gs_uint64_t s2;
451     if (s1==0) {
452         s1=rdtsc();
453     }
454     s2=rdtsc();
455     cycles+=(s2-s1);
456 start:
457 #ifdef PRINTMSG
458     fprintf(stderr,"CHECK RINGBUFS\n");
459 #endif
460 #ifndef POLLING
461     /* use chance to cleanout message queue no reason
462      to keep anything else */
463     while (gscpipc_read(&from,&lopp,buf,&length,0)>0);
464 #endif
465     
466     streamregistry_getactiveringbuf_reset();
467     while ((r=streamregistry_getactiveringbuf())>0) {
468 #ifdef PRINTMSG
469             fprintf(stderr,"Reading from ringpuffer %p [%p:%u]"
470                 "(%u %u %u) \n",r,&r->start,r->end,r->reader,r->writer,
471                 r->length);
472             if (UNREAD(r)) {
473             fprintf(stderr,"\t%u %u.%u.%u.%u:%u-%p %u\n",CURREAD(r)->next,
474                     (CURREAD(r)->f.ip>>24)&0xff,
475                     (CURREAD(r)->f.ip>>16)&0xff,
476                     (CURREAD(r)->f.ip>>8)&0xff,
477                     (CURREAD(r)->f.ip)&0xff,
478                     CURREAD(r)->f.port,
479                     CURREAD(r)->f.streamid,
480                     CURREAD(r)->sz);
481             }
482         
483 #endif
484         if (UNREAD(r)) {
485             *ftaid=(CURREAD(r)->f);
486             *size=CURREAD(r)->sz;
487             memcpy(tbuffer,&(CURREAD(r)->data[0]),(tbuf_size>(*size))?(*size):tbuf_size);
488             intuple++;
489             inbytes+=CURREAD(r)->sz;
490             ADVANCEREAD(r);
491             s1=rdtsc();
492             return 0;
493         }
494     }
495     if (timeout == -1) {
496         *size=0;
497         s1=rdtsc();
498         return 1;
499     }
500     if (timeout !=0) {
501         signal(SIGALRM, timeouthandler);
502         alarm(timeout);
503     }
504     
505 #ifndef POLLING
506 #ifdef PRINTMSG
507     fprintf(stderr,"START BLOCKCALLS\n");
508 #endif
509     streamregistry_getactiveftaid_reset();
510     while ((f=streamregistry_getactiveftaid())!=0) {
511         struct gscp_get_buffer_arg a;
512         a.h.callid = GSCP_GET_BUFFER;
513         a.h.size = sizeof(struct gscp_get_buffer_arg);
514         a.timeout = timeout;
515 #ifdef PRINTMSG
516         fprintf(stderr,"Waiting for  %u.%u.%u.%u:%u\n",
517                 (f->ip>>24)&0xff,
518                 (f->ip>>16)&0xff,
519                 (f->ip>>8)&0xff,
520                 (f->ip)&0xff,
521                 f->port
522                 );
523 #endif
524         if (gscpipc_send(*f,FTACALLBACK,(gs_sp_t )&a,a.h.size,1)<0) {
525             s1=rdtsc();
526             return -1;
527         }
528     }
529 #ifdef PRINTMSG
530     fprintf(stderr,"BLOCK\n");
531 #endif
532     while (gscpipc_read(&from,&lopp,buf,&length,1)>0) {
533 #else  // If we poll we return after 100 msec
534     sleepagain:
535         while (gscpipc_read(&from,&lopp,buf,&length,2)>0) {
536 #endif
537             struct standard_result * sr = (struct standard_result *) buf;
538 #ifdef PRINTMSG
539             fprintf(stderr,"Got return code %u\n",sr->h.callid);
540 #endif
541             if (lopp==FTACALLBACK) {
542                 if (timeout != 0) {
543                     signal(SIGALRM, SIG_IGN);
544                 }
545                 if (sr->h.callid == WAKEUP) {
546                     /* use chance to cleanout message queue no reason
547                      to keep anything else */
548                     while (gscpipc_read(&from,&lopp,buf,&length,0)>0);
549                     goto start;
550                 }
551                 if (sr->h.callid == TIMEOUT) {
552                     /* use chance to cleanout message queue no reason
553                      to keep anything else */
554                     while (gscpipc_read(&from,&lopp,buf,&length,0)>0);
555                     *size=0;
556                     s1=rdtsc();
557                     return 1;
558                 }
559                 if (sidequeue_append(from,buf,length)<0) {
560                     gslog(LOG_EMERG,"ERROR:: Could not add to sidequeue\n");
561                     s1=rdtsc();
562                     return -1;
563                 }
564             }
565         }
566 #ifdef POLLING
567         streamregistry_getactiveringbuf_reset();
568         while ((r=streamregistry_getactiveringbuf())>0) {
569 #ifdef PRINTMSG
570             fprintf(stderr,"Reading from ringpuffer %p [%p:%u]"
571                     "(%u %u %u) \n",r,&r->start,r->end,r->reader,r->writer,
572                     r->length);
573             if (UNREAD(r)) {
574                 fprintf(stderr,"\t%u %u.%u.%u.%u:%u-%p %u\n",CURREAD(r)->next,
575                         (CURREAD(r)->f.ip>>24)&0xff,
576                         (CURREAD(r)->f.ip>>16)&0xff,
577                         (CURREAD(r)->f.ip>>8)&0xff,
578                         (CURREAD(r)->f.ip)&0xff,
579                         CURREAD(r)->f.port,
580                         CURREAD(r)->f.streamid,
581                         CURREAD(r)->sz);
582             }
583             
584 #endif
585             if (UNREAD(r)) {
586                 *ftaid=(CURREAD(r)->f);
587                 *size=CURREAD(r)->sz;
588                 memcpy(tbuffer,&(CURREAD(r)->data[0]),(tbuf_size>(*size))?(*size):tbuf_size);
589                 intuple++;
590                 inbytes+=CURREAD(r)->sz;
591                 ADVANCEREAD(r);
592                 if (timeout != 0) {
593                     signal(SIGALRM, SIG_IGN);
594                 }
595                 s1=rdtsc();
596                 return 0;
597             }
598         }
599         goto sleepagain; // Try again
600 #endif
601         gslog(LOG_EMERG,"Unexpected code reached in: gscp_get_buffer \n");
602         /* we should never get here */
603         s1=rdtsc();
604         return -1;
605     }
606     
607     
608