Fixed newline characters throughout the code
[com/gs-lite.git] / src / lib / gscphost / callbackinterface.c
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6  
7  http://www.apache.org/licenses/LICENSE-2.0
8  
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
15 #include "gsconfig.h"
16 #include "gstypes.h"
17 #include <gscpipc.h>
18 #include <ipcencoding.h>
19 #include <callbackregistries.h>
20 #include <clearinghouseregistries.h>
21 #include <lappregistries.h>
22 #include <stdlib.h>
23 #include <stdio.h>
24 #include <lapp.h>
25 #include <string.h>
26 #include "rdtsc.h"
27
28 gs_uint64_t shared_memory_full_warning =0;
29
30 static gs_int32_t maxsnaplen = 0;
31
32 struct FTAID clearinghouseftaid;
33
34 gs_uint64_t intupledrop=0;
35 gs_uint64_t outtupledrop=0;
36 gs_uint64_t intuple=0;
37 gs_uint64_t outtuple=0;
38 gs_uint64_t inbytes=0;
39 gs_uint64_t outbytes=0;
40 gs_uint64_t cycles=0;
41
42
43
44 /* following function is internal and defined in lappinterface.c */
45 gs_retval_t  ipc_call_and_wait(FTAID f, gs_sp_t msg, gs_sp_t result);
46
47 gs_retval_t  gscp_blocking_mode() {
48 #ifdef BLOCKRINGBUFFER
49         return 1;
50 #else
51         return 0;
52 #endif
53 }
54
55 static void clock_signal_check() {
56     struct FTA * fa;
57     static gs_int32_t  t=0;
58     if (t==0) {
59         t=time(0);
60     } else {
61        if (time(0)>t) {
62             if (ftaexec_start()<0) {
63                 gslog(LOG_EMERG,"GSCPRTS::error::could not init check of "
64                         "FTA list\n");
65                 return;
66             }
67             while ((fa=ftaexec_next())!=0) {
68                 if (fa->clock_fta!=0) {
69                     fa->clock_fta(fa);
70                 }
71             }
72             t=time(0);
73                 if (t%GSLOGINTERVAL==0) gsstats();// log all the stats
74         }
75     }
76 }
77
78 static gs_retval_t  send_standard_reply(FTAID f, gs_int32_t  result) {
79     struct standard_result r;
80     r.h.callid=STANDARD_RESULT;
81     r.h.size=sizeof(struct standard_result);
82     r.result=result;
83     if (gscpipc_send(f, FTACALLBACK, (gs_sp_t) &r,r.h.size,1)<0) {
84         gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
85         return -1;
86     }
87     return 0 ;
88 }
89
90 static gs_retval_t  send_lookup_reply(FTAID f, gs_int32_t  result,
91                              FTAID * ftaid,
92                              gs_sp_t* schema) {
93     struct ftafind_result r;
94     r.h.callid=FTAFIND_RESULT;
95     r.h.size=sizeof(struct ftafind_result);
96     r.result=result;
97     r.f=*ftaid;
98     if (result >=0) {
99         if (strlen(*schema)>=(MAXSCHEMASZ-1)) {
100             gslog(LOG_EMERG,"ERROR:FTA schema (%s) to large\n",(unsigned char *)schema);
101             r.result=-1;
102         } else {
103             strcpy(r.schema,*schema);
104         }
105     }
106     if (gscpipc_send(f, FTACALLBACK, (gs_sp_t) &r,r.h.size,1)<0) {
107         gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
108         return -1;
109     }
110     return 0 ;
111 }
112
113
114 static gs_retval_t  send_fta_result(FTAID f,
115                            FTAID * ftaid, gs_int32_t  result) {
116     struct fta_result r;
117     r.h.callid=FTA_RESULT;
118     r.h.size=sizeof(struct fta_result);
119     r.result=result;
120     if (ftaid!=0) {
121       r.f=*ftaid;
122     }
123     if (gscpipc_send(f, FTACALLBACK,(gs_sp_t)&r,r.h.size,1)<0) {
124         gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
125         return -1;
126     }
127     return 0 ;
128 }
129
130
131 // Is also used by the lfta rts enviroment on a post. So make it none
132 // static.
133 gs_retval_t  send_wakeup(FTAID f)
134 {
135     struct wakeup_result a;
136
137     a.h.callid=WAKEUP;
138     a.h.size=sizeof(struct wakeup_result);
139     if (gscpipc_send(f, FTACALLBACK, (gs_sp_t)&a,a.h.size,0)<0) {
140         gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
141         return -1;
142     }
143     return 0;
144 }
145
146 static gs_retval_t  fta_register_instance(FTAID subscriber,
147                                  FTAID f,gs_uint32_t   reusable,
148                                  FTAname name,
149                                  gs_csp_t  schema) {
150   gs_int8_t  rb[MAXRES];
151   struct fta_register_arg a;
152   struct standard_result * sr = (struct standard_result *)rb;
153
154   if (curprocess.type != CLEARINGHOUSE) {
155     a.h.callid = FTA_REGISTER;
156     a.h.size = sizeof(struct fta_register_arg);
157     if (strlen(name)>=(MAXFTANAME-1)) {
158       gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
159       return -1;
160     }
161     if (strlen(schema)>=(MAXSCHEMASZ-1)) {
162       gslog(LOG_EMERG,"ERROR:FTA schema (%s) to large\n",schema);
163       return -1;
164     }
165     strcpy(a.name,name);
166     strcpy(a.schema,schema);
167     a.f=f;
168     a.subscriber=subscriber; /* consumer is the same as f for an FTA*/
169     a.reusable=reusable;
170     ipc_call_and_wait(clearinghouseftaid,(gs_sp_t )&a,rb);
171     if (sr->h.callid != STANDARD_RESULT) {
172       gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
173       return -1;
174     }
175     if (sr->result != 0) {
176       return -1;
177     }
178   } else {
179     if (ftalookup_register_fta(subscriber,f,name,reusable,schema)<0) {
180       return -1;
181     }
182   }
183   return 0;
184 }
185
186 static gs_retval_t  fta_unregister_instance(FTAID subscriber,
187                                    FTAID f) {
188   gs_int8_t  rb[MAXRES];
189   struct fta_unregister_arg a;
190   struct standard_result * sr = (struct standard_result *)rb;
191
192   if (curprocess.type != CLEARINGHOUSE) {
193     a.h.callid = FTA_UNREGISTER;
194     a.h.size = sizeof(struct fta_register_arg);
195     a.f=f;
196     a.subscriber=subscriber; /* consumer is the same as f for an FTA*/
197     ipc_call_and_wait(clearinghouseftaid,(gs_sp_t )&a,rb);
198     if (sr->h.callid != STANDARD_RESULT) {
199       gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
200       return -1;
201     }
202     return -1;
203   } else {
204     if (ftalookup_unregister_fta(f,subscriber)<0) {
205       return -1;
206     }
207   }
208   return 0;
209 }
210
211 gs_retval_t  fta_start_service(gs_int32_t  number)
212 {
213     gs_int8_t   buf[MAXMSGSZ];
214     FTAID from;
215     gs_int32_t  length;
216     struct hostcall * h= (struct hostcall *) buf;
217     gs_int32_t  forever=0;
218     gs_int32_t  endtime=0;
219     gs_int32_t  block=1;
220     gs_int32_t  res;
221     struct ringbuf * r;
222     FTAID ftaid;
223     FTAID * ftaidp;
224     struct FTA * fta;
225     gs_int32_t  lopp;
226         gs_int32_t  preemptq;
227         gs_int32_t  endq;
228         gs_uint64_t s1;
229         gs_uint64_t s2;
230
231     if (number == 0) {
232         block=0;
233         forever=1;
234     }
235     if (number < 0) {
236         forever=1;
237     }
238     if (number > 0) {
239         block=1;
240         endtime=time(0)+number;
241     }
242
243     while((forever!=0)||
244           (endtime==0)||(endtime>time(0))) {
245         /* check if we need to give the FTAs there clock signal */
246         if ((curprocess.type == LFTA)
247             ||(curprocess.type == HFTA)) {
248             clock_signal_check();
249         }
250
251 #ifdef POLLING
252         poll:
253 #endif
254     preemptq=0;
255         /* first empty out sidequeu then read from messagequeue */
256         if (sidequeue_pop(&from,buf,&length)<0) {
257           /* empty out the sidequeue before processing the shared
258              memory */
259           if (curprocess.type == HFTA) {
260             /* process all the shared memory regions and register
261                for callbacks */
262                 s1=rdtsc();
263                 endq=time(0)+2;
264             streamregistry_getactiveringbuf_reset();
265             while ((r=streamregistry_getactiveringbuf())>0) {
266               while (UNREAD(r)) {
267                         struct FTA * fa;
268                         if (ftaexec_start()<0) {
269                         gslog(LOG_EMERG,"GSCPRTS::error::could not init check of "
270                           "FTA list\n");
271                         return -1;
272                         }
273                         while ((fa=ftaexec_next())!=0) {
274                         gs_int32_t  x;
275                         for(x=0;x<fa->stream_subscribed_cnt;x++) {
276                                 if ((fa->stream_subscribed[x].streamid
277                                         ==CURREAD(r)->f.streamid)
278                                         && (fa->stream_subscribed[x].ip
279                                                 ==CURREAD(r)->f.ip)
280                                         && (fa->stream_subscribed[x].port
281                                                 ==CURREAD(r)->f.port)) {
282                                         fa->accept_packet(fa,&(CURREAD(r)->f),
283                                                 &(CURREAD(r)->data[0]),
284                                                 CURREAD(r)->sz);
285                                 }
286                         }
287                         }
288                         intuple++;
289                         inbytes+=CURREAD(r)->sz;
290                         ADVANCEREAD(r);
291                         if (endq <= time(0)) {
292                                 preemptq=1;
293                                 goto processmsg;
294                         }
295               }
296             }
297                 s2=rdtsc();
298                 cycles+=(s2-s1);
299 #ifndef POLLING
300             /* register wakeups all arround to make sure we don't sleep
301              * for ever,
302              */
303             streamregistry_getactiveftaid_reset();
304             while ((ftaidp=streamregistry_getactiveftaid())>0) {
305               struct gscp_get_buffer_arg a;
306               a.h.callid = GSCP_GET_BUFFER;
307               a.h.size = sizeof(struct gscp_get_buffer_arg);
308               a.timeout = 0;
309               if (gscpipc_send(*ftaidp,FTACALLBACK,(gs_sp_t )&a,a.h.size,1)<0) {
310                 return -1;
311               }
312             }
313 #endif
314           }
315           processmsg:
316           do {
317                         /* even if we block we return every 100msec to be able to generate the clock signal to
318                          * the HFTAs
319              */
320             if ((res=gscpipc_read(&from,&lopp,buf,&length,((block==1)&&(preemptq==0))?2:0))<0) {
321                 gslog(LOG_EMERG,"GSCPRTS::error::reading from messagequeue\n");
322                 return -1;
323             }
324             /* check if we need to give the FTAs there clock signal */
325             if ((curprocess.type == LFTA)
326                 ||(curprocess.type == HFTA)) {
327                 clock_signal_check();
328             }
329             if ((res==0) && (block==0)) {
330                 /* nonblocking and nothing to do so return */
331                 return 0;
332             }
333             if ((res==0) && (endtime!=0) && (endtime<time(0))) {
334                 /* timeout reached so return */
335                 return 0;
336             }
337 #ifdef POLLING
338             if ((res==0)&&(curprocess.type == HFTA)) {
339                 goto poll;
340             }
341 #endif
342           } while (res==0);
343           if ((lopp)!=FTACALLBACK) {
344             gslog(LOG_EMERG,"GSCPRTS::error::unknown lowlevel opp\n");
345             return -1;
346           }
347         }
348
349 #ifdef PRINTMSG
350         gslog(LOG_EMERG, "HFTA message from %u of type %u of length %u\n",from,
351         h->callid,h->size);
352 #endif
353         switch (h->callid) {
354
355         case FTA_LOOKUP: {
356           if (curprocess.type == CLEARINGHOUSE) {
357             struct fta_find_arg * n;
358             FTAID rf;
359             gs_csp_t  schema;
360             n = (struct fta_find_arg *) buf;
361             /* Note: Side effect of ftalookup_lookup_fta_index is to
362                fill msgid and index */
363                   if (send_lookup_reply(from,
364                                         ftalookup_lookup_fta_index(from,
365                                                                    n->name,
366                                                                    n->reuse,
367                                                                    &rf,
368                                                                    &schema),
369                                         &rf,
370                                         (gs_sp_t *)&schema)<0) {
371                     gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
372                     return -1;
373                   }
374           } else {
375             if (send_standard_reply(from,-1)<0) {
376               gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
377               return -1;
378             }
379             gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse proccess got"
380                     "contacted for clearinghouse processing\n");
381           }
382         }
383           break;
384         case FTA_REGISTER: {
385           if (curprocess.type == CLEARINGHOUSE) {
386             struct fta_register_arg * n;
387             n = (struct fta_register_arg *) buf;
388             if (send_standard_reply(from,
389                                     ftalookup_register_fta(n->subscriber,
390                                                            n->f,
391                                                            n->name,
392                                                            n->reusable,
393                                                            n->schema))<0) {
394               gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
395               return -1;
396             }
397           } else {
398             if (send_standard_reply(from,-1)<0) {
399               gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
400               return -1;
401             }
402             gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse proccess got"
403                     "contacted for clearinghouse processing\n");
404           }
405         }
406           break;
407         case FTA_UNREGISTER: {
408           if (curprocess.type == CLEARINGHOUSE) {
409             struct fta_unregister_arg * n;
410             n = (struct fta_unregister_arg *) buf;
411             if (send_standard_reply(from,
412                                     ftalookup_unregister_fta(n->subscriber,
413                                                              n->f))<0) {
414               gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
415               return -1;
416             }
417           } else {
418             if (send_standard_reply(from,-1)<0) {
419               gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
420               return -1;
421             }
422             gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse proccess got"
423                     "contacted for clearinghouse processing\n");
424           }
425         }
426           break;
427         case FTA_ALLOC_INSTANCE:
428         case FTA_ALLOC_PRINT_INSTANCE:
429           if ((curprocess.type == LFTA)
430               ||(curprocess.type == HFTA)) {
431             struct ringbuf * r;
432             struct fta_alloc_instance_arg * n;
433             n = (struct fta_alloc_instance_arg *) buf;
434             if ((fta=ftaexec_alloc_instance(n->f.index,
435                                             (struct FTA *)n->f.streamid,
436                                             n->reusable,
437                                             n->command,
438                                             n->sz,
439                                             &(n->data[0])))==0) {
440               gslog(LOG_EMERG,"GSCPRTS::warning::could not allocate"
441                       "FTA\n");
442               if (send_fta_result(from,0,-1)<0) {
443                 gslog(LOG_EMERG,"GSCPRTS::error::send standard "
444                         "reply faild\n");
445                 return -1;
446               }
447             } else {
448               /* shared memory is only required if data is beeing transfered */
449               if ((h->callid!=FTA_ALLOC_PRINT_INSTANCE)&&
450                   ((r=gscpipc_getshm(from))==0)) {
451                                 gslog(LOG_EMERG,"GSCPRTS::warning::could not get"
452                                 "shared memory\n");
453                                 ftaexec_remove(fta);
454                                 if (send_fta_result(from,0,-1)<0) {
455                                         gslog(LOG_EMERG,"GSCPRTS::error::send standard "
456                                         "reply faild\n");
457                                         return -1;
458                                 }
459                 } else {
460                         /* no callback to register for print function */
461                         if ((h->callid!=FTA_ALLOC_PRINT_INSTANCE)&&(ftacallback_add_streamid(r,fta->ftaid.streamid)!=0)) {
462                                 gslog(LOG_EMERG,"GSCPRTS::warning::could not add"
463                                         "streamid to ringbuffer\n");
464                                 ftaexec_free_instance(fta,1);
465                                 if (send_fta_result(from,0,-1)<0) {
466                                         gslog(LOG_EMERG,"GSCPRTS::error::send standard "
467                                         "reply faild\n");
468                                         return -1;
469                                 }
470                 } else {
471                   if (ftaexec_insert(0,fta)<0) {
472                     gslog(LOG_EMERG,"GSCPRTS::warning::could not"
473                             "insert FTA\n");
474                     ftacallback_rm_streamid(r,fta->ftaid.streamid);
475                     ftaexec_free_instance(fta,1);
476                     if (send_fta_result(from,0,-1)<0) {
477                       gslog(LOG_EMERG,"GSCPRTS::error::send standard "
478                               "reply faild\n");
479                       return -1;
480                     }
481                   } else {
482                     if (fta_register_instance(n->subscriber,fta->ftaid,
483                                               n->reusable,
484                                               n->name,
485                                               n->schema)!=0) {
486                       gslog(LOG_EMERG,"GSCPRTS::warning::could not register"
487                         " instance\n");
488                       ftaexec_remove(fta);
489                       ftacallback_rm_streamid(r,fta->ftaid.streamid);
490                       ftaexec_free_instance(fta,1);
491                       if (send_fta_result(from,0,-1)<0) {
492                         gslog(LOG_EMERG,"GSCPRTS::error::send standard "
493                                 "reply faild\n");
494                         return -1;
495                       }
496                     }  else {
497                       if (h->callid==FTA_ALLOC_PRINT_INSTANCE) {
498                                 if (curprocess.type == LFTA) {
499                                         gslog(LOG_EMERG,"GSCPRTS::error:: alloc print instance not "
500                                                 "implemented for LFTA.\n");
501                                         ftaexec_remove(fta);
502                                         ftaexec_free_instance(fta,1);
503                                         if (send_fta_result(from,0,-1)<0) {
504                                                 gslog(LOG_EMERG,"GSCPRTS::error::send standard "
505                                                 "reply faild\n");
506                                                 return -1;
507                                         }
508                                 } else {
509                                         if (add_printfunction_to_stream(fta, n->schema, n->path, n->basename,
510                                                 n->temporal_field, n->split_field, n->delta, n->split) < 0) {
511                                                 ftaexec_remove(fta);
512                                                 ftaexec_free_instance(fta,1);
513                                                 if (send_fta_result(from,0,-1)<0) {
514                                                         gslog(LOG_EMERG,"GSCPRTS::error::send standard "
515                                                         "reply faild\n");
516                                                         return -1;
517                                                 }
518                                 }
519                           }
520                         }
521                       if (send_fta_result(from,&fta->ftaid,0)<0) {
522                                         gslog(LOG_EMERG,"GSCPRTS::error::send standard "
523                                 "reply faild\n");
524                                 return -1;
525                       }
526                     }
527                   }
528                 }
529               }
530             }
531           } else {
532             if (send_fta_result(from,0,-1)<0) {
533               gslog(LOG_EMERG,"GSCPRTS::error::send standard "
534                       "reply faild\n");
535               return -1;
536             }
537           }
538           break;
539         case FTA_FREE_INSTANCE:{
540           if ((curprocess.type == LFTA)
541               ||(curprocess.type == HFTA)) {
542             struct fta_free_instance_arg * n;
543             n = (struct fta_free_instance_arg *) buf;
544             if (((r=gscpipc_getshm(from))!=0)
545                 && ( ftaexec_remove((struct FTA *) n->f.streamid)==0)
546                 && ( ftacallback_rm_streamid(r,n->f.streamid)==0)
547                 && (ftaexec_free_instance((struct FTA *)n->f.streamid,
548                                           n->recursive)
549                     ==0)
550                 && (fta_unregister_instance(n->subscriber,n->f)<0)) {
551               if (send_standard_reply(from,0)<0) {
552                 gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
553                 return -1;
554               }
555             } else {
556               if (send_standard_reply(from,-1)<0) {
557                 gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
558                 return -1;
559               }
560             }
561           } else {
562             if (send_standard_reply(from,-1)<0) {
563               gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
564               return -1;
565             }
566           }
567         }
568           break;
569         case FTA_CONTROL:{
570           if ((curprocess.type == LFTA)
571               ||(curprocess.type == HFTA)) {
572             struct fta_control_arg * n;
573             n = (struct fta_control_arg *) buf;
574             if (send_standard_reply(from,
575                                     ftaexec_control((struct FTA *)
576                                                     n->f.streamid,
577                                                     n->command,
578                                                     n->sz,
579                                                     &(n->data[0])))<0) {
580               gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
581               return -1;
582             }
583           } else {
584             if (send_standard_reply(from,-1)<0) {
585               gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
586               return -1;
587             }
588             gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse or HFTA proccess got"
589                     "contacted for clearinghouse or HFTA processing\n");
590           }
591         }
592           break;
593         case FTA_PRODUCER_FAILURE: {
594           if (curprocess.type == CLEARINGHOUSE) {
595             struct fta_notify_producer_failure_arg * n;
596             n = (struct fta_notify_producer_failure_arg *) buf;
597             ftalookup_producer_failure(n->sender,n->producer);
598           }
599         }
600           break;
601         case FTA_HEARTBEAT: {
602           if (curprocess.type == CLEARINGHOUSE) {
603             struct fta_heartbeat_arg * n;
604             n = (struct fta_heartbeat_arg *) buf;
605             ftalookup_heartbeat(n->sender,n->trace_id,
606                                 n->sz,&(n->data[0]));
607           }
608         }
609           break;
610         case GSCP_GET_BUFFER:{
611           struct sgroup_get_buffer_arg * n;
612           gs_int32_t  res;
613           struct ringbuf * r;
614           n = (struct sgroup_get_buffer_arg *) buf;
615
616           if ((r=gscpipc_getshm(from))==0) {
617             gslog(LOG_EMERG,"GSCPRTS::error::proccess blocked without"
618                     "sharedmemory\n");
619           } else {
620             if (UNREAD(r)) {
621               /* something arrived in the meantime so wakeup
622                  right away */
623                 if (send_wakeup(from)<0) {
624                   gslog(LOG_EMERG,"ERROR:Could not send wakeup\n");
625                   return -1;
626                 }
627             } else {
628 #ifndef POLLING
629               if (ftacallback_add_wakeup(from,r)<0) {
630                 gslog(LOG_EMERG,"ERROR:Could not add wakeup\n");
631                 return -1;
632               }
633 #else
634                 gslog(LOG_EMERG,"Received wakeup request on polling systems\n");
635 #endif
636             }
637           }
638         }
639           break;
640         case PROCESS_CONTROL:{
641           if ((curprocess.type == LFTA)
642               ||(curprocess.type == HFTA)) {
643             struct process_control_arg * n;
644             n = (struct process_control_arg *) buf;
645             if (send_standard_reply(from,
646                                     ftaexec_process_control(n->command,
647                                                             n->sz,
648                                                             &(n->data[0])))<0) {
649               gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
650               return -1;
651             }
652           } else {
653             if (send_standard_reply(from,-1)<0) {
654               gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
655               return -1;
656             }
657             gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse or HFTA proccess got"
658                     "contacted for clearinghouse or HFTA processing\n");
659           }
660         }
661           break;
662         case WAKEUP:
663         case TIMEOUT:
664           break;
665         default:
666           gslog(LOG_EMERG,"GSCPRTS::error::illegal message queue type %u\n",h->callid);
667           return -1;
668         }
669         /* use this occation to cleanup the messagequeue we can't afford
670            a backlog in the real message queue since it is limited in
671            size */
672         while (gscpipc_read(&from,&lopp,buf,&length,0)>0) {
673 #ifdef PRINTMSG
674           gslog(LOG_EMERG, "request from %u of type %u with length %u\n",from,
675                   h->callid,h->size);
676 #endif
677           if ((lopp == FTACALLBACK) && (h->callid < RESULT_OPCODE_IGNORE)) {
678             if (sidequeue_append(from,buf,length)<0) {
679               gslog(LOG_EMERG,"ERROR:: Could not add to sidequeue\n");
680               return -1;
681             }
682           }
683
684         }
685     }
686     return 0;
687 }
688
689 static gs_retval_t  map_match(gs_csp_t  dev) {
690   gs_int32_t  x;
691   for(x=0;x<curprocess.mapcnt;x++){
692     if (strcmp(dev,curprocess.map[x])==0) {
693       return 1;
694     }
695   }
696   return 0;
697 }
698
699 gs_retval_t  fta_max_snaplen()
700 {
701   return maxsnaplen;
702 }
703
704 FTAID fta_register(FTAname name,gs_uint32_t   reusable, DEVname dev,
705                      alloc_fta fta_alloc_functionptr,
706                      gs_csp_t  schema, gs_int32_t  snaplen, gs_uint64_t prefilter)
707 {
708   gs_int8_t  rb[MAXRES];
709   struct fta_register_arg a;
710   struct standard_result * sr = (struct standard_result *)rb;
711   gs_int32_t  index;
712   FTAID res;
713   FTAID reserr;
714   res.ip=0;
715   res.port=0;
716   res.index=0;
717   res.streamid=0;
718   reserr=res;
719
720   /* check if the device matches for the registration */
721   if (((dev==0) && (curprocess.deviceid==0))
722       || ((dev!=0)&&(map_match(dev)==1))) {
723     if (dev!=0) {
724       gslog(LOG_INFO,"Register %s on device %s\n",name,dev);
725     } else {
726       gslog(LOG_INFO,"Register %s on default device\n",name);
727     }
728     if ((index=ftacallback_add_alloc(name,fta_alloc_functionptr,prefilter))<0) {
729       gslog(LOG_EMERG,"ERROR could not register callback\n");
730       return res;
731     }
732
733     if (snaplen<0) {
734       maxsnaplen=-1;
735     } else {
736       if (maxsnaplen!=-1) {
737         maxsnaplen=(snaplen>maxsnaplen)?snaplen:maxsnaplen;
738       }
739     }
740
741     res=gscpipc_getftaid();
742     res.index=index;
743     if (curprocess.type != CLEARINGHOUSE) {
744                 a.h.callid = FTA_REGISTER;
745                 a.h.size = sizeof(struct fta_register_arg);
746                 if (strlen(name)>=(MAXFTANAME-1)) {
747                         gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
748                         return reserr;
749                 }
750                 if (strlen(schema)>=(MAXSCHEMASZ-1)) {
751                         gslog(LOG_EMERG,"ERROR:FTA schema (%s) to large\n",schema);
752                         return reserr;
753                 }
754                 strcpy(a.name,name);
755                 strcpy(a.schema,schema);
756                 a.f=res;
757                 a.subscriber=res; /* consumer is the same as f for an FTA*/
758                 a.reusable=reusable;
759                 ipc_call_and_wait(clearinghouseftaid,(gs_sp_t )&a,rb);
760                 if (sr->h.callid != STANDARD_RESULT) {
761                         gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
762                         return reserr;
763                 }
764                 if (sr->result != 0 )  {
765                         gslog(LOG_EMERG,"ERROR:Error in registration\n");
766                         return reserr;
767                 }
768     } else {
769       if (ftalookup_register_fta(gscpipc_getftaid(),
770                         res,name,reusable,schema)!=0) {
771                         return res;
772       }
773     }
774   }
775   return res;
776 }
777
778 gs_retval_t  fta_unregister(FTAID ftaid)
779 {
780   gs_int8_t  rb[MAXRES];
781   struct fta_unregister_arg a;
782   struct standard_result * sr = (struct standard_result *)rb;
783   if (ftacallback_rm_alloc(ftaid.index)<0) {
784     gslog(LOG_EMERG,"ERROR could not unregister callback\n");
785     return -1;
786   }
787
788   if (curprocess.type != CLEARINGHOUSE) {
789     a.h.callid = FTA_UNREGISTER;
790     a.h.size = sizeof(struct fta_register_arg);
791     a.f=ftaid;
792     ipc_call_and_wait(clearinghouseftaid,(gs_sp_t) &a,rb);
793     if (sr->h.callid != STANDARD_RESULT) {
794       gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
795       return -1;
796     }
797     return sr->result;
798   } else {
799     return ftalookup_unregister_fta(gscpipc_getftaid(),ftaid);
800   }
801   return 0;
802 }
803
804 gs_retval_t  hfta_post_tuple(struct FTA * self, gs_int32_t  sz, void *tuple)
805 {
806   struct ringbuf * r;
807   gs_uint32_t   msgid;
808   struct wakeup_result a;
809   FTAID * f;
810   gs_int32_t  state;
811
812     if (sz>MAXTUPLESZ) {
813         gslog(LOG_EMERG,"Maximum tuple size is %u\n",MAXTUPLESZ);
814         return -1;
815     }
816
817     if (self->printfunc.in_use==1) {
818                 return print_stream(self,sz,tuple);
819         }
820
821     if (ftacallback_start_streamid((gs_p_t  )self)<0) {
822         gslog(LOG_EMERG,"ERROR:Post for unkown streamid\n");
823         return -1;
824     }
825     /* now make sure we have space to write in all atomic ringbuffer */
826     while((r=ftacallback_next_streamid(&state))!=0) {
827         if (state == HFTA_RINGBUF_ATOMIC) {
828 #ifdef BLOCKRINGBUFFER
829             while (!SPACETOWRITE(r)) {
830                 usleep(100);
831             }
832 #endif
833             if (! SPACETOWRITE(r)) {
834                 /* atomic ring buffer and no space so post nothing */
835                 return -1;
836             }
837         }
838     }
839
840
841     if (ftacallback_start_streamid((gs_p_t  )self)<0) {
842         gslog(LOG_EMERG,"ERROR:Post for unkown streamid\n");
843         return -1;
844     }
845
846     while((r=ftacallback_next_streamid(&state))!=0) {
847         if (state != HFTA_RINGBUF_SUSPEND) {
848             if (!SPACETOWRITE(r)) {
849                 //since memory is full we set a warning
850                 shared_memory_full_warning++;
851                 // give receiver a chance to clean up
852                 usleep(0);
853             }
854             if (SPACETOWRITE(r)) {
855                 CURWRITE(r)->f=self->ftaid;
856                 CURWRITE(r)->sz=sz;
857                 memcpy(&(CURWRITE(r)->data[0]),tuple,sz);
858                                 outtuple++;
859                                 outbytes=outbytes+CURWRITE(r)->sz;
860                 ADVANCEWRITE(r);
861 #ifdef PRINTMSG
862                gslog(LOG_EMERG,"Wrote in ringpuffer %p [%p:%u]"
863                     "(%u %u) \n",r,&r->start,r->end,r->reader,r->writer);
864                 gslog(LOG_EMERG,"\t%u  %u\n",CURREAD(r)->next,
865                     CURREAD(r)->sz);
866 #endif
867             } else {
868                 outtupledrop++;
869             }
870             if (HOWFULL(r) > 500) {
871       // buffer is at least half full
872                 shared_memory_full_warning++;
873 #ifdef PRINTMSG
874                 gslog(LOG_EMERG,"\t\t buffer full\n");
875 #endif
876             }
877         }
878     }
879 #ifndef POLLING
880     if (ftacallback_start_wakeup((gs_p_t  ) self)<0) {
881         gslog(LOG_EMERG,"ERROR:Wakeup for unkown streamid\n");
882         return -1;
883     }
884     a.h.callid=WAKEUP;
885     a.h.size=sizeof(struct wakeup_result);
886     while((f=ftacallback_next_wakeup())!=0) {
887         if (send_wakeup(*f)<0) {
888             gslog(LOG_EMERG,"ERROR:Could not send wakeup\n");
889             return -1;
890         }
891     }
892 #endif
893     return 0;
894 }
895
896 gs_retval_t  hfta_get_ringbuf_space(struct FTA * f, FTAID * r, gs_int32_t  * space, gs_int32_t  szr, gs_int32_t  tuplesz)
897 {
898     gs_int32_t  x=0;
899     gs_int32_t  state;
900     struct ringbuf * ru;
901
902         if (f->printfunc.in_use==1) {
903         // XXX WHAT TO DO???
904                 gslog(LOG_INFO,"Checking space for printfunc");
905                 return 0;
906         }
907
908         if (ftacallback_start_streamid(f->ftaid.streamid)<0) {
909         gslog(LOG_EMERG,"ERROR:Space check for unkown streamid in HFTA\n");
910         return -1;
911     }
912
913     while ((ru=ftacallback_next_streamid(&state))!=0) {
914         if (szr > x ) {
915             r[x]=ru->destid;
916             space[x]=TUPLEFIT(ru,tuplesz);
917         }
918         x++;
919     }
920     return x;
921 }
922
923
924 gs_retval_t  hfta_set_ringbuf_type(struct FTA * f, FTAID process, gs_int32_t  state)
925 {
926
927     if (ftacallback_state_streamid(f->ftaid.streamid,process,state)<0) {
928         gslog(LOG_EMERG,"ERROR:state change for unkown streamid\n");
929         return -1;
930     }
931     return 0;
932 }