Added quantiling UDAFs
[com/gs-lite.git] / src / lib / gscphost / callbackinterface.c
index 9d21d73..500a74a 100644 (file)
-/* ------------------------------------------------
- Copyright 2014 AT&T Intellectual Property
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ------------------------------------------- */
-#include "gsconfig.h"
-#include "gstypes.h"
-#include <gscpipc.h>
-#include <ipcencoding.h>
-#include <callbackregistries.h>
-#include <clearinghouseregistries.h>
-#include <lappregistries.h>
-#include <stdlib.h>
-#include <stdio.h>
-#include <lapp.h>
-#include <string.h>
-#include "rdtsc.h"
-
-gs_uint64_t shared_memory_full_warning =0;
-
-static gs_int32_t maxsnaplen = 0;
-
-struct FTAID clearinghouseftaid;
-
-gs_uint64_t intupledrop=0;
-gs_uint64_t outtupledrop=0;
-gs_uint64_t intuple=0;
-gs_uint64_t outtuple=0;
-gs_uint64_t inbytes=0;
-gs_uint64_t outbytes=0;
-gs_uint64_t cycles=0;
-
-
-
-/* following function is internal and defined in lappinterface.c */
-gs_retval_t  ipc_call_and_wait(FTAID f, gs_sp_t msg, gs_sp_t result);
-
-gs_retval_t  gscp_blocking_mode() {
-#ifdef BLOCKRINGBUFFER
-       return 1;
-#else
-       return 0;
-#endif
-}
-
-static void clock_signal_check() {
-    struct FTA * fa;
-    static gs_int32_t  t=0;
-    if (t==0) {
-        t=time(0);
-    } else {
-       if (time(0)>t) {
-            if (ftaexec_start()<0) {
-                gslog(LOG_EMERG,"GSCPRTS::error::could not init check of "
-                        "FTA list\n");
-                return;
-            }
-            while ((fa=ftaexec_next())!=0) {
-                if (fa->clock_fta!=0) {
-                    fa->clock_fta(fa);
-                }
-            }
-            t=time(0);
-               if (t%GSLOGINTERVAL==0) gsstats();// log all the stats
-        }
-    }
-}
-
-static gs_retval_t  send_standard_reply(FTAID f, gs_int32_t  result) {
-    struct standard_result r;
-    r.h.callid=STANDARD_RESULT;
-    r.h.size=sizeof(struct standard_result);
-    r.result=result;
-    if (gscpipc_send(f, FTACALLBACK, (gs_sp_t) &r,r.h.size,1)<0) {
-       gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
-       return -1;
-    }
-    return 0 ;
-}
-
-static gs_retval_t  send_lookup_reply(FTAID f, gs_int32_t  result,
-                            FTAID * ftaid,
-                            gs_sp_t* schema) {
-    struct ftafind_result r;
-    r.h.callid=FTAFIND_RESULT;
-    r.h.size=sizeof(struct ftafind_result);
-    r.result=result;
-    r.f=*ftaid;
-    if (result >=0) {
-       if (strlen(*schema)>=(MAXSCHEMASZ-1)) {
-           gslog(LOG_EMERG,"ERROR:FTA schema (%s) to large\n",(unsigned char *)schema);
-           r.result=-1;
-       } else {
-           strcpy(r.schema,*schema);
-       }
-    }
-    if (gscpipc_send(f, FTACALLBACK, (gs_sp_t) &r,r.h.size,1)<0) {
-       gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
-       return -1;
-    }
-    return 0 ;
-}
-
-
-static gs_retval_t  send_fta_result(FTAID f,
-                          FTAID * ftaid, gs_int32_t  result) {
-    struct fta_result r;
-    r.h.callid=FTA_RESULT;
-    r.h.size=sizeof(struct fta_result);
-    r.result=result;
-    if (ftaid!=0) {
-      r.f=*ftaid;
-    }
-    if (gscpipc_send(f, FTACALLBACK,(gs_sp_t)&r,r.h.size,1)<0) {
-       gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
-       return -1;
-    }
-    return 0 ;
-}
-
-
-// Is also used by the lfta rts enviroment on a post. So make it none
-// static.
-gs_retval_t  send_wakeup(FTAID f)
-{
-    struct wakeup_result a;
-
-    a.h.callid=WAKEUP;
-    a.h.size=sizeof(struct wakeup_result);
-    if (gscpipc_send(f, FTACALLBACK, (gs_sp_t)&a,a.h.size,0)<0) {
-       gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
-       return -1;
-    }
-    return 0;
-}
-
-static gs_retval_t  fta_register_instance(FTAID subscriber,
-                                FTAID f,gs_uint32_t   reusable,
-                                FTAname name,
-                                gs_csp_t  schema) {
-  gs_int8_t  rb[MAXRES];
-  struct fta_register_arg a;
-  struct standard_result * sr = (struct standard_result *)rb;
-
-  if (curprocess.type != CLEARINGHOUSE) {
-    a.h.callid = FTA_REGISTER;
-    a.h.size = sizeof(struct fta_register_arg);
-    if (strlen(name)>=(MAXFTANAME-1)) {
-      gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
-      return -1;
-    }
-    if (strlen(schema)>=(MAXSCHEMASZ-1)) {
-      gslog(LOG_EMERG,"ERROR:FTA schema (%s) to large\n",schema);
-      return -1;
-    }
-    strcpy(a.name,name);
-    strcpy(a.schema,schema);
-    a.f=f;
-    a.subscriber=subscriber; /* consumer is the same as f for an FTA*/
-    a.reusable=reusable;
-    ipc_call_and_wait(clearinghouseftaid,(gs_sp_t )&a,rb);
-    if (sr->h.callid != STANDARD_RESULT) {
-      gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
-      return -1;
-    }
-    if (sr->result != 0) {
-      return -1;
-    }
-  } else {
-    if (ftalookup_register_fta(subscriber,f,name,reusable,schema)<0) {
-      return -1;
-    }
-  }
-  return 0;
-}
-
-static gs_retval_t  fta_unregister_instance(FTAID subscriber,
-                                  FTAID f) {
-  gs_int8_t  rb[MAXRES];
-  struct fta_unregister_arg a;
-  struct standard_result * sr = (struct standard_result *)rb;
-
-  if (curprocess.type != CLEARINGHOUSE) {
-    a.h.callid = FTA_UNREGISTER;
-    a.h.size = sizeof(struct fta_register_arg);
-    a.f=f;
-    a.subscriber=subscriber; /* consumer is the same as f for an FTA*/
-    ipc_call_and_wait(clearinghouseftaid,(gs_sp_t )&a,rb);
-    if (sr->h.callid != STANDARD_RESULT) {
-      gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
-      return -1;
-    }
-    return -1;
-  } else {
-    if (ftalookup_unregister_fta(f,subscriber)<0) {
-      return -1;
-    }
-  }
-  return 0;
-}
-
-gs_retval_t  fta_start_service(gs_int32_t  number)
-{
-    gs_int8_t   buf[MAXMSGSZ];
-    FTAID from;
-    gs_int32_t  length;
-    struct hostcall * h= (struct hostcall *) buf;
-    gs_int32_t  forever=0;
-    gs_int32_t  endtime=0;
-    gs_int32_t  block=1;
-    gs_int32_t  res;
-    struct ringbuf * r;
-    FTAID ftaid;
-    FTAID * ftaidp;
-    struct FTA * fta;
-    gs_int32_t  lopp;
-       gs_int32_t  preemptq;
-       gs_int32_t  endq;
-       gs_uint64_t s1;
-       gs_uint64_t s2;
-
-    if (number == 0) {
-       block=0;
-       forever=1;
-    }
-    if (number < 0) {
-       forever=1;
-    }
-    if (number > 0) {
-        block=1;
-       endtime=time(0)+number;
-    }
-
-    while((forever!=0)||
-          (endtime==0)||(endtime>time(0))) {
-        /* check if we need to give the FTAs there clock signal */
-        if ((curprocess.type == LFTA)
-            ||(curprocess.type == HFTA)) {
-            clock_signal_check();
-        }
-
-#ifdef POLLING
-       poll:
-#endif
-    preemptq=0;
-       /* first empty out sidequeu then read from messagequeue */
-       if (sidequeue_pop(&from,buf,&length)<0) {
-         /* empty out the sidequeue before processing the shared
-            memory */
-         if (curprocess.type == HFTA) {
-           /* process all the shared memory regions and register
-              for callbacks */
-               s1=rdtsc();
-               endq=time(0)+2;
-           streamregistry_getactiveringbuf_reset();
-           while ((r=streamregistry_getactiveringbuf())>0) {
-             while (UNREAD(r)) {
-                       struct FTA * fa;
-                       if (ftaexec_start()<0) {
-                       gslog(LOG_EMERG,"GSCPRTS::error::could not init check of "
-                         "FTA list\n");
-                       return -1;
-                       }
-                       while ((fa=ftaexec_next())!=0) {
-                       gs_int32_t  x;
-                       for(x=0;x<fa->stream_subscribed_cnt;x++) {
-                               if ((fa->stream_subscribed[x].streamid
-                                       ==CURREAD(r)->f.streamid)
-                                       && (fa->stream_subscribed[x].ip
-                                               ==CURREAD(r)->f.ip)
-                                       && (fa->stream_subscribed[x].port
-                                               ==CURREAD(r)->f.port)) {
-                                       fa->accept_packet(fa,&(CURREAD(r)->f),
-                                               &(CURREAD(r)->data[0]),
-                                               CURREAD(r)->sz);
-                               }
-                       }
-                       }
-                       intuple++;
-                       inbytes+=CURREAD(r)->sz;
-                       ADVANCEREAD(r);
-                       if (endq <= time(0)) {
-                               preemptq=1;
-                               goto processmsg;
-                       }
-             }
-           }
-               s2=rdtsc();
-               cycles+=(s2-s1);
-#ifndef POLLING
-           /* register wakeups all arround to make sure we don't sleep
-            * for ever,
-            */
-           streamregistry_getactiveftaid_reset();
-           while ((ftaidp=streamregistry_getactiveftaid())>0) {
-             struct gscp_get_buffer_arg a;
-             a.h.callid = GSCP_GET_BUFFER;
-             a.h.size = sizeof(struct gscp_get_buffer_arg);
-             a.timeout = 0;
-             if (gscpipc_send(*ftaidp,FTACALLBACK,(gs_sp_t )&a,a.h.size,1)<0) {
-               return -1;
-             }
-           }
-#endif
-         }
-         processmsg:
-         do {
-                       /* even if we block we return every 100msec to be able to generate the clock signal to
-                        * the HFTAs
-             */
-            if ((res=gscpipc_read(&from,&lopp,buf,&length,((block==1)&&(preemptq==0))?2:0))<0) {
-                gslog(LOG_EMERG,"GSCPRTS::error::reading from messagequeue\n");
-                return -1;
-            }
-            /* check if we need to give the FTAs there clock signal */
-            if ((curprocess.type == LFTA)
-                ||(curprocess.type == HFTA)) {
-                clock_signal_check();
-            }
-            if ((res==0) && (block==0)) {
-                /* nonblocking and nothing to do so return */
-                return 0;
-            }
-            if ((res==0) && (endtime!=0) && (endtime<time(0))) {
-                /* timeout reached so return */
-                return 0;
-            }
-#ifdef POLLING
-           if ((res==0)&&(curprocess.type == HFTA)) {
-                goto poll;
-           }
-#endif
-         } while (res==0);
-         if ((lopp)!=FTACALLBACK) {
-           gslog(LOG_EMERG,"GSCPRTS::error::unknown lowlevel opp\n");
-           return -1;
-         }
-       }
-
-#ifdef PRINTMSG
-       gslog(LOG_EMERG, "HFTA message from %u of type %u of length %u\n",from,
-       h->callid,h->size);
-#endif
-       switch (h->callid) {
-
-       case FTA_LOOKUP: {
-         if (curprocess.type == CLEARINGHOUSE) {
-           struct fta_find_arg * n;
-           FTAID rf;
-           gs_csp_t  schema;
-           n = (struct fta_find_arg *) buf;
-           /* Note: Side effect of ftalookup_lookup_fta_index is to
-              fill msgid and index */
-                 if (send_lookup_reply(from,
-                                       ftalookup_lookup_fta_index(from,
-                                                                  n->name,
-                                                                  n->reuse,
-                                                                  &rf,
-                                                                  &schema),
-                                       &rf,
-                                       (gs_sp_t *)&schema)<0) {
-                   gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
-                   return -1;
-                 }
-         } else {
-           if (send_standard_reply(from,-1)<0) {
-             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
-             return -1;
-           }
-           gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse proccess got"
-                   "contacted for clearinghouse processing\n");
-         }
-       }
-         break;
-       case FTA_REGISTER: {
-         if (curprocess.type == CLEARINGHOUSE) {
-           struct fta_register_arg * n;
-           n = (struct fta_register_arg *) buf;
-           if (send_standard_reply(from,
-                                   ftalookup_register_fta(n->subscriber,
-                                                          n->f,
-                                                          n->name,
-                                                          n->reusable,
-                                                          n->schema))<0) {
-             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
-             return -1;
-           }
-         } else {
-           if (send_standard_reply(from,-1)<0) {
-             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
-             return -1;
-           }
-           gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse proccess got"
-                   "contacted for clearinghouse processing\n");
-         }
-       }
-         break;
-       case FTA_UNREGISTER: {
-         if (curprocess.type == CLEARINGHOUSE) {
-           struct fta_unregister_arg * n;
-           n = (struct fta_unregister_arg *) buf;
-           if (send_standard_reply(from,
-                                   ftalookup_unregister_fta(n->subscriber,
-                                                            n->f))<0) {
-             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
-             return -1;
-           }
-         } else {
-           if (send_standard_reply(from,-1)<0) {
-             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
-             return -1;
-           }
-           gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse proccess got"
-                   "contacted for clearinghouse processing\n");
-         }
-       }
-         break;
-       case FTA_ALLOC_INSTANCE:
-       case FTA_ALLOC_PRINT_INSTANCE:
-         if ((curprocess.type == LFTA)
-             ||(curprocess.type == HFTA)) {
-           struct ringbuf * r;
-           struct fta_alloc_instance_arg * n;
-           n = (struct fta_alloc_instance_arg *) buf;
-           if ((fta=ftaexec_alloc_instance(n->f.index,
-                                           (struct FTA *)n->f.streamid,
-                                           n->reusable,
-                                           n->command,
-                                           n->sz,
-                                           &(n->data[0])))==0) {
-             gslog(LOG_EMERG,"GSCPRTS::warning::could not allocate"
-                     "FTA\n");
-             if (send_fta_result(from,0,-1)<0) {
-               gslog(LOG_EMERG,"GSCPRTS::error::send standard "
-                       "reply faild\n");
-               return -1;
-             }
-           } else {
-             /* shared memory is only required if data is beeing transfered */
-             if ((h->callid!=FTA_ALLOC_PRINT_INSTANCE)&&
-                 ((r=gscpipc_getshm(from))==0)) {
-                               gslog(LOG_EMERG,"GSCPRTS::warning::could not get"
-                               "shared memory\n");
-                               ftaexec_remove(fta);
-                               if (send_fta_result(from,0,-1)<0) {
-                                       gslog(LOG_EMERG,"GSCPRTS::error::send standard "
-                                       "reply faild\n");
-                                       return -1;
-                               }
-               } else {
-                       /* no callback to register for print function */
-                       if ((h->callid!=FTA_ALLOC_PRINT_INSTANCE)&&(ftacallback_add_streamid(r,fta->ftaid.streamid)!=0)) {
-                               gslog(LOG_EMERG,"GSCPRTS::warning::could not add"
-                                       "streamid to ringbuffer\n");
-                               ftaexec_free_instance(fta,1);
-                               if (send_fta_result(from,0,-1)<0) {
-                                       gslog(LOG_EMERG,"GSCPRTS::error::send standard "
-                                       "reply faild\n");
-                                       return -1;
-                               }
-               } else {
-                 if (ftaexec_insert(0,fta)<0) {
-                   gslog(LOG_EMERG,"GSCPRTS::warning::could not"
-                           "insert FTA\n");
-                   ftacallback_rm_streamid(r,fta->ftaid.streamid);
-                   ftaexec_free_instance(fta,1);
-                   if (send_fta_result(from,0,-1)<0) {
-                     gslog(LOG_EMERG,"GSCPRTS::error::send standard "
-                             "reply faild\n");
-                     return -1;
-                   }
-                 } else {
-                   if (fta_register_instance(n->subscriber,fta->ftaid,
-                                             n->reusable,
-                                             n->name,
-                                             n->schema)!=0) {
-                     gslog(LOG_EMERG,"GSCPRTS::warning::could not register"
-                       " instance\n");
-                     ftaexec_remove(fta);
-                     ftacallback_rm_streamid(r,fta->ftaid.streamid);
-                     ftaexec_free_instance(fta,1);
-                     if (send_fta_result(from,0,-1)<0) {
-                       gslog(LOG_EMERG,"GSCPRTS::error::send standard "
-                               "reply faild\n");
-                       return -1;
-                     }
-                   }  else {
-                     if (h->callid==FTA_ALLOC_PRINT_INSTANCE) {
-                               if (curprocess.type == LFTA) {
-                                       gslog(LOG_EMERG,"GSCPRTS::error:: alloc print instance not "
-                                               "implemented for LFTA.\n");
-                                       ftaexec_remove(fta);
-                                       ftaexec_free_instance(fta,1);
-                                       if (send_fta_result(from,0,-1)<0) {
-                                               gslog(LOG_EMERG,"GSCPRTS::error::send standard "
-                                               "reply faild\n");
-                                               return -1;
-                                       }
-                               } else {
-                                       if (add_printfunction_to_stream(fta, n->schema, n->path, n->basename,
-                                               n->temporal_field, n->split_field, n->delta, n->split) < 0) {
-                                               ftaexec_remove(fta);
-                                               ftaexec_free_instance(fta,1);
-                                               if (send_fta_result(from,0,-1)<0) {
-                                                       gslog(LOG_EMERG,"GSCPRTS::error::send standard "
-                                                       "reply faild\n");
-                                                       return -1;
-                                               }
-                               }
-                         }
-                       }
-                     if (send_fta_result(from,&fta->ftaid,0)<0) {
-                                       gslog(LOG_EMERG,"GSCPRTS::error::send standard "
-                               "reply faild\n");
-                               return -1;
-                     }
-                   }
-                 }
-               }
-             }
-           }
-         } else {
-           if (send_fta_result(from,0,-1)<0) {
-             gslog(LOG_EMERG,"GSCPRTS::error::send standard "
-                     "reply faild\n");
-             return -1;
-           }
-         }
-         break;
-       case FTA_FREE_INSTANCE:{
-         if ((curprocess.type == LFTA)
-             ||(curprocess.type == HFTA)) {
-           struct fta_free_instance_arg * n;
-           n = (struct fta_free_instance_arg *) buf;
-           if (((r=gscpipc_getshm(from))!=0)
-               && ( ftaexec_remove((struct FTA *) n->f.streamid)==0)
-               && ( ftacallback_rm_streamid(r,n->f.streamid)==0)
-               && (ftaexec_free_instance((struct FTA *)n->f.streamid,
-                                         n->recursive)
-                   ==0)
-               && (fta_unregister_instance(n->subscriber,n->f)<0)) {
-             if (send_standard_reply(from,0)<0) {
-               gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
-               return -1;
-             }
-           } else {
-             if (send_standard_reply(from,-1)<0) {
-               gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
-               return -1;
-             }
-           }
-         } else {
-           if (send_standard_reply(from,-1)<0) {
-             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
-             return -1;
-           }
-         }
-       }
-          break;
-       case FTA_CONTROL:{
-         if ((curprocess.type == LFTA)
-             ||(curprocess.type == HFTA)) {
-           struct fta_control_arg * n;
-           n = (struct fta_control_arg *) buf;
-           if (send_standard_reply(from,
-                                   ftaexec_control((struct FTA *)
-                                                   n->f.streamid,
-                                                   n->command,
-                                                   n->sz,
-                                                   &(n->data[0])))<0) {
-             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
-             return -1;
-           }
-         } else {
-           if (send_standard_reply(from,-1)<0) {
-             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
-             return -1;
-           }
-           gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse or HFTA proccess got"
-                   "contacted for clearinghouse or HFTA processing\n");
-         }
-       }
-          break;
-       case FTA_PRODUCER_FAILURE: {
-         if (curprocess.type == CLEARINGHOUSE) {
-           struct fta_notify_producer_failure_arg * n;
-           n = (struct fta_notify_producer_failure_arg *) buf;
-           ftalookup_producer_failure(n->sender,n->producer);
-         }
-       }
-         break;
-       case FTA_HEARTBEAT: {
-         if (curprocess.type == CLEARINGHOUSE) {
-           struct fta_heartbeat_arg * n;
-           n = (struct fta_heartbeat_arg *) buf;
-           ftalookup_heartbeat(n->sender,n->trace_id,
-                               n->sz,&(n->data[0]));
-         }
-       }
-         break;
-       case GSCP_GET_BUFFER:{
-         struct sgroup_get_buffer_arg * n;
-         gs_int32_t  res;
-         struct ringbuf * r;
-         n = (struct sgroup_get_buffer_arg *) buf;
-
-         if ((r=gscpipc_getshm(from))==0) {
-           gslog(LOG_EMERG,"GSCPRTS::error::proccess blocked without"
-                   "sharedmemory\n");
-         } else {
-           if (UNREAD(r)) {
-             /* something arrived in the meantime so wakeup
-                right away */
-               if (send_wakeup(from)<0) {
-                 gslog(LOG_EMERG,"ERROR:Could not send wakeup\n");
-                 return -1;
-               }
-           } else {
-#ifndef POLLING
-             if (ftacallback_add_wakeup(from,r)<0) {
-               gslog(LOG_EMERG,"ERROR:Could not add wakeup\n");
-               return -1;
-             }
-#else
-               gslog(LOG_EMERG,"Received wakeup request on polling systems\n");
-#endif
-           }
-         }
-       }
-         break;
-       case PROCESS_CONTROL:{
-         if ((curprocess.type == LFTA)
-             ||(curprocess.type == HFTA)) {
-           struct process_control_arg * n;
-           n = (struct process_control_arg *) buf;
-           if (send_standard_reply(from,
-                                   ftaexec_process_control(n->command,
-                                                           n->sz,
-                                                           &(n->data[0])))<0) {
-             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
-             return -1;
-           }
-         } else {
-           if (send_standard_reply(from,-1)<0) {
-             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
-             return -1;
-           }
-           gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse or HFTA proccess got"
-                   "contacted for clearinghouse or HFTA processing\n");
-         }
-       }
-          break;
-       case WAKEUP:
-       case TIMEOUT:
-          break;
-       default:
-         gslog(LOG_EMERG,"GSCPRTS::error::illegal message queue type %u\n",h->callid);
-         return -1;
-        }
-       /* use this occation to cleanup the messagequeue we can't afford
-          a backlog in the real message queue since it is limited in
-          size */
-       while (gscpipc_read(&from,&lopp,buf,&length,0)>0) {
-#ifdef PRINTMSG
-         gslog(LOG_EMERG, "request from %u of type %u with length %u\n",from,
-                 h->callid,h->size);
-#endif
-         if ((lopp == FTACALLBACK) && (h->callid < RESULT_OPCODE_IGNORE)) {
-           if (sidequeue_append(from,buf,length)<0) {
-             gslog(LOG_EMERG,"ERROR:: Could not add to sidequeue\n");
-             return -1;
-           }
-         }
-
-       }
-    }
-    return 0;
-}
-
-static gs_retval_t  map_match(gs_csp_t  dev) {
-  gs_int32_t  x;
-  for(x=0;x<curprocess.mapcnt;x++){
-    if (strcmp(dev,curprocess.map[x])==0) {
-      return 1;
-    }
-  }
-  return 0;
-}
-
-gs_retval_t  fta_max_snaplen()
-{
-  return maxsnaplen;
-}
-
-FTAID fta_register(FTAname name,gs_uint32_t   reusable, DEVname dev,
-                    alloc_fta fta_alloc_functionptr,
-                    gs_csp_t  schema, gs_int32_t  snaplen, gs_uint64_t prefilter)
-{
-  gs_int8_t  rb[MAXRES];
-  struct fta_register_arg a;
-  struct standard_result * sr = (struct standard_result *)rb;
-  gs_int32_t  index;
-  FTAID res;
-  FTAID reserr;
-  res.ip=0;
-  res.port=0;
-  res.index=0;
-  res.streamid=0;
-  reserr=res;
-
-  /* check if the device matches for the registration */
-  if (((dev==0) && (curprocess.deviceid==0))
-      || ((dev!=0)&&(map_match(dev)==1))) {
-    if (dev!=0) {
-      gslog(LOG_INFO,"Register %s on device %s\n",name,dev);
-    } else {
-      gslog(LOG_INFO,"Register %s on default device\n",name);
-    }
-    if ((index=ftacallback_add_alloc(name,fta_alloc_functionptr,prefilter))<0) {
-      gslog(LOG_EMERG,"ERROR could not register callback\n");
-      return res;
-    }
-
-    if (snaplen<0) {
-      maxsnaplen=-1;
-    } else {
-      if (maxsnaplen!=-1) {
-       maxsnaplen=(snaplen>maxsnaplen)?snaplen:maxsnaplen;
-      }
-    }
-
-    res=gscpipc_getftaid();
-    res.index=index;
-    if (curprocess.type != CLEARINGHOUSE) {
-               a.h.callid = FTA_REGISTER;
-               a.h.size = sizeof(struct fta_register_arg);
-               if (strlen(name)>=(MAXFTANAME-1)) {
-                       gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
-                       return reserr;
-               }
-               if (strlen(schema)>=(MAXSCHEMASZ-1)) {
-                       gslog(LOG_EMERG,"ERROR:FTA schema (%s) to large\n",schema);
-                       return reserr;
-               }
-               strcpy(a.name,name);
-               strcpy(a.schema,schema);
-               a.f=res;
-               a.subscriber=res; /* consumer is the same as f for an FTA*/
-               a.reusable=reusable;
-               ipc_call_and_wait(clearinghouseftaid,(gs_sp_t )&a,rb);
-               if (sr->h.callid != STANDARD_RESULT) {
-                       gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
-                       return reserr;
-               }
-               if (sr->result != 0 )  {
-                       gslog(LOG_EMERG,"ERROR:Error in registration\n");
-                       return reserr;
-               }
-    } else {
-      if (ftalookup_register_fta(gscpipc_getftaid(),
-                       res,name,reusable,schema)!=0) {
-                       return res;
-      }
-    }
-  }
-  return res;
-}
-
-gs_retval_t  fta_unregister(FTAID ftaid)
-{
-  gs_int8_t  rb[MAXRES];
-  struct fta_unregister_arg a;
-  struct standard_result * sr = (struct standard_result *)rb;
-  if (ftacallback_rm_alloc(ftaid.index)<0) {
-    gslog(LOG_EMERG,"ERROR could not unregister callback\n");
-    return -1;
-  }
-
-  if (curprocess.type != CLEARINGHOUSE) {
-    a.h.callid = FTA_UNREGISTER;
-    a.h.size = sizeof(struct fta_register_arg);
-    a.f=ftaid;
-    ipc_call_and_wait(clearinghouseftaid,(gs_sp_t) &a,rb);
-    if (sr->h.callid != STANDARD_RESULT) {
-      gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
-      return -1;
-    }
-    return sr->result;
-  } else {
-    return ftalookup_unregister_fta(gscpipc_getftaid(),ftaid);
-  }
-  return 0;
-}
-
-gs_retval_t  hfta_post_tuple(struct FTA * self, gs_int32_t  sz, void *tuple)
-{
-  struct ringbuf * r;
-  gs_uint32_t   msgid;
-  struct wakeup_result a;
-  FTAID * f;
-  gs_int32_t  state;
-
-    if (sz>MAXTUPLESZ) {
-        gslog(LOG_EMERG,"Maximum tuple size is %u\n",MAXTUPLESZ);
-        return -1;
-    }
-
-    if (self->printfunc.in_use==1) {
-               return print_stream(self,sz,tuple);
-       }
-
-    if (ftacallback_start_streamid((gs_p_t  )self)<0) {
-       gslog(LOG_EMERG,"ERROR:Post for unkown streamid\n");
-       return -1;
-    }
-    /* now make sure we have space to write in all atomic ringbuffer */
-    while((r=ftacallback_next_streamid(&state))!=0) {
-        if (state == HFTA_RINGBUF_ATOMIC) {
-#ifdef BLOCKRINGBUFFER
-            while (!SPACETOWRITE(r)) {
-                usleep(100);
-            }
-#endif
-            if (! SPACETOWRITE(r)) {
-                /* atomic ring buffer and no space so post nothing */
-                return -1;
-            }
-        }
-    }
-
-
-    if (ftacallback_start_streamid((gs_p_t  )self)<0) {
-        gslog(LOG_EMERG,"ERROR:Post for unkown streamid\n");
-        return -1;
-    }
-
-    while((r=ftacallback_next_streamid(&state))!=0) {
-        if (state != HFTA_RINGBUF_SUSPEND) {
-            if (!SPACETOWRITE(r)) {
-                //since memory is full we set a warning
-                shared_memory_full_warning++;
-                // give receiver a chance to clean up
-                usleep(0);
-            }
-            if (SPACETOWRITE(r)) {
-                CURWRITE(r)->f=self->ftaid;
-                CURWRITE(r)->sz=sz;
-                memcpy(&(CURWRITE(r)->data[0]),tuple,sz);
-                               outtuple++;
-                               outbytes=outbytes+CURWRITE(r)->sz;
-                ADVANCEWRITE(r);
-#ifdef PRINTMSG
-               gslog(LOG_EMERG,"Wrote in ringpuffer %p [%p:%u]"
-                    "(%u %u) \n",r,&r->start,r->end,r->reader,r->writer);
-                gslog(LOG_EMERG,"\t%u  %u\n",CURREAD(r)->next,
-                    CURREAD(r)->sz);
-#endif
-            } else {
-                outtupledrop++;
-            }
-            if (HOWFULL(r) > 500) {
-      // buffer is at least half full
-                shared_memory_full_warning++;
-#ifdef PRINTMSG
-                gslog(LOG_EMERG,"\t\t buffer full\n");
-#endif
-            }
-        }
-    }
-#ifndef POLLING
-    if (ftacallback_start_wakeup((gs_p_t  ) self)<0) {
-        gslog(LOG_EMERG,"ERROR:Wakeup for unkown streamid\n");
-        return -1;
-    }
-    a.h.callid=WAKEUP;
-    a.h.size=sizeof(struct wakeup_result);
-    while((f=ftacallback_next_wakeup())!=0) {
-        if (send_wakeup(*f)<0) {
-            gslog(LOG_EMERG,"ERROR:Could not send wakeup\n");
-            return -1;
-        }
-    }
-#endif
-    return 0;
-}
-
-gs_retval_t  hfta_get_ringbuf_space(struct FTA * f, FTAID * r, gs_int32_t  * space, gs_int32_t  szr, gs_int32_t  tuplesz)
-{
-    gs_int32_t  x=0;
-    gs_int32_t  state;
-    struct ringbuf * ru;
-
-       if (f->printfunc.in_use==1) {
-       // XXX WHAT TO DO???
-               gslog(LOG_INFO,"Checking space for printfunc");
-               return 0;
-       }
-
-       if (ftacallback_start_streamid(f->ftaid.streamid)<0) {
-        gslog(LOG_EMERG,"ERROR:Space check for unkown streamid in HFTA\n");
-        return -1;
-    }
-
-    while ((ru=ftacallback_next_streamid(&state))!=0) {
-        if (szr > x ) {
-            r[x]=ru->destid;
-            space[x]=TUPLEFIT(ru,tuplesz);
-        }
-        x++;
-    }
-    return x;
-}
-
-
-gs_retval_t  hfta_set_ringbuf_type(struct FTA * f, FTAID process, gs_int32_t  state)
-{
-
-    if (ftacallback_state_streamid(f->ftaid.streamid,process,state)<0) {
-       gslog(LOG_EMERG,"ERROR:state change for unkown streamid\n");
-       return -1;
-    }
-    return 0;
-}
+/* ------------------------------------------------\r
+ Copyright 2014 AT&T Intellectual Property\r
+ Licensed under the Apache License, Version 2.0 (the "License");\r
+ you may not use this file except in compliance with the License.\r
+ You may obtain a copy of the License at\r
\r
+ http://www.apache.org/licenses/LICENSE-2.0\r
\r
+ Unless required by applicable law or agreed to in writing, software\r
+ distributed under the License is distributed on an "AS IS" BASIS,\r
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ See the License for the specific language governing permissions and\r
+ limitations under the License.\r
+ ------------------------------------------- */\r
+#include "gsconfig.h"\r
+#include "gstypes.h"\r
+#include <gscpipc.h>\r
+#include <ipcencoding.h>\r
+#include <callbackregistries.h>\r
+#include <clearinghouseregistries.h>\r
+#include <lappregistries.h>\r
+#include <stdlib.h>\r
+#include <stdio.h>\r
+#include <lapp.h>\r
+#include <string.h>\r
+#include "rdtsc.h"\r
+\r
+gs_uint64_t shared_memory_full_warning =0;\r
+\r
+static gs_int32_t maxsnaplen = 0;\r
+\r
+struct FTAID clearinghouseftaid;\r
+\r
+gs_uint64_t intupledrop=0;\r
+gs_uint64_t outtupledrop=0;\r
+gs_uint64_t intuple=0;\r
+gs_uint64_t outtuple=0;\r
+gs_uint64_t inbytes=0;\r
+gs_uint64_t outbytes=0;\r
+gs_uint64_t cycles=0;\r
+\r
+\r
+\r
+/* following function is internal and defined in lappinterface.c */\r
+gs_retval_t  ipc_call_and_wait(FTAID f, gs_sp_t msg, gs_sp_t result);\r
+\r
+gs_retval_t  gscp_blocking_mode() {\r
+#ifdef BLOCKRINGBUFFER\r
+       return 1;\r
+#else\r
+       return 0;\r
+#endif\r
+}\r
+\r
+static void clock_signal_check() {\r
+    struct FTA * fa;\r
+    static gs_int32_t  t=0;\r
+    if (t==0) {\r
+        t=time(0);\r
+    } else {\r
+       if (time(0)>t) {\r
+            if (ftaexec_start()<0) {\r
+                gslog(LOG_EMERG,"GSCPRTS::error::could not init check of "\r
+                        "FTA list\n");\r
+                return;\r
+            }\r
+            while ((fa=ftaexec_next())!=0) {\r
+                if (fa->clock_fta!=0) {\r
+                    fa->clock_fta(fa);\r
+                }\r
+            }\r
+            t=time(0);\r
+               if (t%GSLOGINTERVAL==0) gsstats();// log all the stats\r
+        }\r
+    }\r
+}\r
+\r
+static gs_retval_t  send_standard_reply(FTAID f, gs_int32_t  result) {\r
+    struct standard_result r;\r
+    r.h.callid=STANDARD_RESULT;\r
+    r.h.size=sizeof(struct standard_result);\r
+    r.result=result;\r
+    if (gscpipc_send(f, FTACALLBACK, (gs_sp_t) &r,r.h.size,1)<0) {\r
+       gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");\r
+       return -1;\r
+    }\r
+    return 0 ;\r
+}\r
+\r
+static gs_retval_t  send_lookup_reply(FTAID f, gs_int32_t  result,\r
+                            FTAID * ftaid,\r
+                            gs_sp_t* schema) {\r
+    struct ftafind_result r;\r
+    r.h.callid=FTAFIND_RESULT;\r
+    r.h.size=sizeof(struct ftafind_result);\r
+    r.result=result;\r
+    r.f=*ftaid;\r
+    if (result >=0) {\r
+       if (strlen(*schema)>=(MAXSCHEMASZ-1)) {\r
+           gslog(LOG_EMERG,"ERROR:FTA schema (%s) to large\n",(unsigned char *)schema);\r
+           r.result=-1;\r
+       } else {\r
+           strcpy(r.schema,*schema);\r
+       }\r
+    }\r
+    if (gscpipc_send(f, FTACALLBACK, (gs_sp_t) &r,r.h.size,1)<0) {\r
+       gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");\r
+       return -1;\r
+    }\r
+    return 0 ;\r
+}\r
+\r
+\r
+static gs_retval_t  send_fta_result(FTAID f,\r
+                          FTAID * ftaid, gs_int32_t  result) {\r
+    struct fta_result r;\r
+    r.h.callid=FTA_RESULT;\r
+    r.h.size=sizeof(struct fta_result);\r
+    r.result=result;\r
+    if (ftaid!=0) {\r
+      r.f=*ftaid;\r
+    }\r
+    if (gscpipc_send(f, FTACALLBACK,(gs_sp_t)&r,r.h.size,1)<0) {\r
+       gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");\r
+       return -1;\r
+    }\r
+    return 0 ;\r
+}\r
+\r
+\r
+// Is also used by the lfta rts enviroment on a post. So make it none\r
+// static.\r
+gs_retval_t  send_wakeup(FTAID f)\r
+{\r
+    struct wakeup_result a;\r
+\r
+    a.h.callid=WAKEUP;\r
+    a.h.size=sizeof(struct wakeup_result);\r
+    if (gscpipc_send(f, FTACALLBACK, (gs_sp_t)&a,a.h.size,0)<0) {\r
+       gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");\r
+       return -1;\r
+    }\r
+    return 0;\r
+}\r
+\r
+static gs_retval_t  fta_register_instance(FTAID subscriber,\r
+                                FTAID f,gs_uint32_t   reusable,\r
+                                FTAname name,\r
+                                gs_csp_t  schema) {\r
+  gs_int8_t  rb[MAXRES];\r
+  struct fta_register_arg a;\r
+  struct standard_result * sr = (struct standard_result *)rb;\r
+\r
+  if (curprocess.type != CLEARINGHOUSE) {\r
+    a.h.callid = FTA_REGISTER;\r
+    a.h.size = sizeof(struct fta_register_arg);\r
+    if (strlen(name)>=(MAXFTANAME-1)) {\r
+      gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);\r
+      return -1;\r
+    }\r
+    if (strlen(schema)>=(MAXSCHEMASZ-1)) {\r
+      gslog(LOG_EMERG,"ERROR:FTA schema (%s) to large\n",schema);\r
+      return -1;\r
+    }\r
+    strcpy(a.name,name);\r
+    strcpy(a.schema,schema);\r
+    a.f=f;\r
+    a.subscriber=subscriber; /* consumer is the same as f for an FTA*/\r
+    a.reusable=reusable;\r
+    ipc_call_and_wait(clearinghouseftaid,(gs_sp_t )&a,rb);\r
+    if (sr->h.callid != STANDARD_RESULT) {\r
+      gslog(LOG_EMERG,"ERROR:Wrong result code received\n");\r
+      return -1;\r
+    }\r
+    if (sr->result != 0) {\r
+      return -1;\r
+    }\r
+  } else {\r
+    if (ftalookup_register_fta(subscriber,f,name,reusable,schema)<0) {\r
+      return -1;\r
+    }\r
+  }\r
+  return 0;\r
+}\r
+\r
+static gs_retval_t  fta_unregister_instance(FTAID subscriber,\r
+                                  FTAID f) {\r
+  gs_int8_t  rb[MAXRES];\r
+  struct fta_unregister_arg a;\r
+  struct standard_result * sr = (struct standard_result *)rb;\r
+\r
+  if (curprocess.type != CLEARINGHOUSE) {\r
+    a.h.callid = FTA_UNREGISTER;\r
+    a.h.size = sizeof(struct fta_register_arg);\r
+    a.f=f;\r
+    a.subscriber=subscriber; /* consumer is the same as f for an FTA*/\r
+    ipc_call_and_wait(clearinghouseftaid,(gs_sp_t )&a,rb);\r
+    if (sr->h.callid != STANDARD_RESULT) {\r
+      gslog(LOG_EMERG,"ERROR:Wrong result code received\n");\r
+      return -1;\r
+    }\r
+    return -1;\r
+  } else {\r
+    if (ftalookup_unregister_fta(f,subscriber)<0) {\r
+      return -1;\r
+    }\r
+  }\r
+  return 0;\r
+}\r
+\r
+gs_retval_t  fta_start_service(gs_int32_t  number)\r
+{\r
+    gs_int8_t   buf[MAXMSGSZ];\r
+    FTAID from;\r
+    gs_int32_t  length;\r
+    struct hostcall * h= (struct hostcall *) buf;\r
+    gs_int32_t  forever=0;\r
+    gs_int32_t  endtime=0;\r
+    gs_int32_t  block=1;\r
+    gs_int32_t  res;\r
+    struct ringbuf * r;\r
+    FTAID ftaid;\r
+    FTAID * ftaidp;\r
+    struct FTA * fta;\r
+    gs_int32_t  lopp;\r
+       gs_int32_t  preemptq;\r
+       gs_int32_t  endq;\r
+       gs_uint64_t s1;\r
+       gs_uint64_t s2;\r
+\r
+    if (number == 0) {\r
+       block=0;\r
+       forever=1;\r
+    }\r
+    if (number < 0) {\r
+       forever=1;\r
+    }\r
+    if (number > 0) {\r
+        block=1;\r
+       endtime=time(0)+number;\r
+    }\r
+\r
+    while((forever!=0)||\r
+          (endtime==0)||(endtime>time(0))) {\r
+        /* check if we need to give the FTAs there clock signal */\r
+        if ((curprocess.type == LFTA)\r
+            ||(curprocess.type == HFTA)) {\r
+            clock_signal_check();\r
+        }\r
+\r
+#ifdef POLLING\r
+       poll:\r
+#endif\r
+    preemptq=0;\r
+       /* first empty out sidequeu then read from messagequeue */\r
+       if (sidequeue_pop(&from,buf,&length)<0) {\r
+         /* empty out the sidequeue before processing the shared\r
+            memory */\r
+         if (curprocess.type == HFTA) {\r
+           /* process all the shared memory regions and register\r
+              for callbacks */\r
+               s1=rdtsc();\r
+               endq=time(0)+2;\r
+           streamregistry_getactiveringbuf_reset();\r
+           while ((r=streamregistry_getactiveringbuf())>0) {\r
+             while (UNREAD(r)) {\r
+                       struct FTA * fa;\r
+                       if (ftaexec_start()<0) {\r
+                       gslog(LOG_EMERG,"GSCPRTS::error::could not init check of "\r
+                         "FTA list\n");\r
+                       return -1;\r
+                       }\r
+                       while ((fa=ftaexec_next())!=0) {\r
+                       gs_int32_t  x;\r
+                       for(x=0;x<fa->stream_subscribed_cnt;x++) {\r
+                               if ((fa->stream_subscribed[x].streamid\r
+                                       ==CURREAD(r)->f.streamid)\r
+                                       && (fa->stream_subscribed[x].ip\r
+                                               ==CURREAD(r)->f.ip)\r
+                                       && (fa->stream_subscribed[x].port\r
+                                               ==CURREAD(r)->f.port)) {\r
+                                       fa->accept_packet(fa,&(CURREAD(r)->f),\r
+                                               &(CURREAD(r)->data[0]),\r
+                                               CURREAD(r)->sz);\r
+                               }\r
+                       }\r
+                       }\r
+                       intuple++;\r
+                       inbytes+=CURREAD(r)->sz;\r
+                       ADVANCEREAD(r);\r
+                       if (endq <= time(0)) {\r
+                               preemptq=1;\r
+                               goto processmsg;\r
+                       }\r
+             }\r
+           }\r
+               s2=rdtsc();\r
+               cycles+=(s2-s1);\r
+#ifndef POLLING\r
+           /* register wakeups all arround to make sure we don't sleep\r
+            * for ever,\r
+            */\r
+           streamregistry_getactiveftaid_reset();\r
+           while ((ftaidp=streamregistry_getactiveftaid())>0) {\r
+             struct gscp_get_buffer_arg a;\r
+             a.h.callid = GSCP_GET_BUFFER;\r
+             a.h.size = sizeof(struct gscp_get_buffer_arg);\r
+             a.timeout = 0;\r
+             if (gscpipc_send(*ftaidp,FTACALLBACK,(gs_sp_t )&a,a.h.size,1)<0) {\r
+               return -1;\r
+             }\r
+           }\r
+#endif\r
+         }\r
+         processmsg:\r
+         do {\r
+                       /* even if we block we return every 100msec to be able to generate the clock signal to\r
+                        * the HFTAs\r
+             */\r
+            if ((res=gscpipc_read(&from,&lopp,buf,&length,((block==1)&&(preemptq==0))?2:0))<0) {\r
+                gslog(LOG_EMERG,"GSCPRTS::error::reading from messagequeue\n");\r
+                return -1;\r
+            }\r
+            /* check if we need to give the FTAs there clock signal */\r
+            if ((curprocess.type == LFTA)\r
+                ||(curprocess.type == HFTA)) {\r
+                clock_signal_check();\r
+            }\r
+            if ((res==0) && (block==0)) {\r
+                /* nonblocking and nothing to do so return */\r
+                return 0;\r
+            }\r
+            if ((res==0) && (endtime!=0) && (endtime<time(0))) {\r
+                /* timeout reached so return */\r
+                return 0;\r
+            }\r
+#ifdef POLLING\r
+           if ((res==0)&&(curprocess.type == HFTA)) {\r
+                goto poll;\r
+           }\r
+#endif\r
+         } while (res==0);\r
+         if ((lopp)!=FTACALLBACK) {\r
+           gslog(LOG_EMERG,"GSCPRTS::error::unknown lowlevel opp\n");\r
+           return -1;\r
+         }\r
+       }\r
+\r
+#ifdef PRINTMSG\r
+       gslog(LOG_EMERG, "HFTA message from %u of type %u of length %u\n",from,\r
+       h->callid,h->size);\r
+#endif\r
+       switch (h->callid) {\r
+\r
+       case FTA_LOOKUP: {\r
+         if (curprocess.type == CLEARINGHOUSE) {\r
+           struct fta_find_arg * n;\r
+           FTAID rf;\r
+           gs_csp_t  schema;\r
+           n = (struct fta_find_arg *) buf;\r
+           /* Note: Side effect of ftalookup_lookup_fta_index is to\r
+              fill msgid and index */\r
+                 if (send_lookup_reply(from,\r
+                                       ftalookup_lookup_fta_index(from,\r
+                                                                  n->name,\r
+                                                                  n->reuse,\r
+                                                                  &rf,\r
+                                                                  &schema),\r
+                                       &rf,\r
+                                       (gs_sp_t *)&schema)<0) {\r
+                   gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");\r
+                   return -1;\r
+                 }\r
+         } else {\r
+           if (send_standard_reply(from,-1)<0) {\r
+             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");\r
+             return -1;\r
+           }\r
+           gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse proccess got"\r
+                   "contacted for clearinghouse processing\n");\r
+         }\r
+       }\r
+         break;\r
+       case FTA_REGISTER: {\r
+         if (curprocess.type == CLEARINGHOUSE) {\r
+           struct fta_register_arg * n;\r
+           n = (struct fta_register_arg *) buf;\r
+           if (send_standard_reply(from,\r
+                                   ftalookup_register_fta(n->subscriber,\r
+                                                          n->f,\r
+                                                          n->name,\r
+                                                          n->reusable,\r
+                                                          n->schema))<0) {\r
+             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");\r
+             return -1;\r
+           }\r
+         } else {\r
+           if (send_standard_reply(from,-1)<0) {\r
+             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");\r
+             return -1;\r
+           }\r
+           gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse proccess got"\r
+                   "contacted for clearinghouse processing\n");\r
+         }\r
+       }\r
+         break;\r
+       case FTA_UNREGISTER: {\r
+         if (curprocess.type == CLEARINGHOUSE) {\r
+           struct fta_unregister_arg * n;\r
+           n = (struct fta_unregister_arg *) buf;\r
+           if (send_standard_reply(from,\r
+                                   ftalookup_unregister_fta(n->subscriber,\r
+                                                            n->f))<0) {\r
+             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");\r
+             return -1;\r
+           }\r
+         } else {\r
+           if (send_standard_reply(from,-1)<0) {\r
+             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");\r
+             return -1;\r
+           }\r
+           gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse proccess got"\r
+                   "contacted for clearinghouse processing\n");\r
+         }\r
+       }\r
+         break;\r
+       case FTA_ALLOC_INSTANCE:\r
+       case FTA_ALLOC_PRINT_INSTANCE:\r
+         if ((curprocess.type == LFTA)\r
+             ||(curprocess.type == HFTA)) {\r
+           struct ringbuf * r;\r
+           struct fta_alloc_instance_arg * n;\r
+           n = (struct fta_alloc_instance_arg *) buf;\r
+           if ((fta=ftaexec_alloc_instance(n->f.index,\r
+                                           (struct FTA *)n->f.streamid,\r
+                                           n->reusable,\r
+                                           n->command,\r
+                                           n->sz,\r
+                                           &(n->data[0])))==0) {\r
+             gslog(LOG_EMERG,"GSCPRTS::warning::could not allocate"\r
+                     "FTA\n");\r
+             if (send_fta_result(from,0,-1)<0) {\r
+               gslog(LOG_EMERG,"GSCPRTS::error::send standard "\r
+                       "reply faild\n");\r
+               return -1;\r
+             }\r
+           } else {\r
+             /* shared memory is only required if data is beeing transfered */\r
+             if ((h->callid!=FTA_ALLOC_PRINT_INSTANCE)&&\r
+                 ((r=gscpipc_getshm(from))==0)) {\r
+                               gslog(LOG_EMERG,"GSCPRTS::warning::could not get"\r
+                               "shared memory\n");\r
+                               ftaexec_remove(fta);\r
+                               if (send_fta_result(from,0,-1)<0) {\r
+                                       gslog(LOG_EMERG,"GSCPRTS::error::send standard "\r
+                                       "reply faild\n");\r
+                                       return -1;\r
+                               }\r
+               } else {\r
+                       /* no callback to register for print function */\r
+                       if ((h->callid!=FTA_ALLOC_PRINT_INSTANCE)&&(ftacallback_add_streamid(r,fta->ftaid.streamid)!=0)) {\r
+                               gslog(LOG_EMERG,"GSCPRTS::warning::could not add"\r
+                                       "streamid to ringbuffer\n");\r
+                               ftaexec_free_instance(fta,1);\r
+                               if (send_fta_result(from,0,-1)<0) {\r
+                                       gslog(LOG_EMERG,"GSCPRTS::error::send standard "\r
+                                       "reply faild\n");\r
+                                       return -1;\r
+                               }\r
+               } else {\r
+                 if (ftaexec_insert(0,fta)<0) {\r
+                   gslog(LOG_EMERG,"GSCPRTS::warning::could not"\r
+                           "insert FTA\n");\r
+                   ftacallback_rm_streamid(r,fta->ftaid.streamid);\r
+                   ftaexec_free_instance(fta,1);\r
+                   if (send_fta_result(from,0,-1)<0) {\r
+                     gslog(LOG_EMERG,"GSCPRTS::error::send standard "\r
+                             "reply faild\n");\r
+                     return -1;\r
+                   }\r
+                 } else {\r
+                   if (fta_register_instance(n->subscriber,fta->ftaid,\r
+                                             n->reusable,\r
+                                             n->name,\r
+                                             n->schema)!=0) {\r
+                     gslog(LOG_EMERG,"GSCPRTS::warning::could not register"\r
+                       " instance\n");\r
+                     ftaexec_remove(fta);\r
+                     ftacallback_rm_streamid(r,fta->ftaid.streamid);\r
+                     ftaexec_free_instance(fta,1);\r
+                     if (send_fta_result(from,0,-1)<0) {\r
+                       gslog(LOG_EMERG,"GSCPRTS::error::send standard "\r
+                               "reply faild\n");\r
+                       return -1;\r
+                     }\r
+                   }  else {\r
+                     if (h->callid==FTA_ALLOC_PRINT_INSTANCE) {\r
+                               if (curprocess.type == LFTA) {\r
+                                       gslog(LOG_EMERG,"GSCPRTS::error:: alloc print instance not "\r
+                                               "implemented for LFTA.\n");\r
+                                       ftaexec_remove(fta);\r
+                                       ftaexec_free_instance(fta,1);\r
+                                       if (send_fta_result(from,0,-1)<0) {\r
+                                               gslog(LOG_EMERG,"GSCPRTS::error::send standard "\r
+                                               "reply faild\n");\r
+                                               return -1;\r
+                                       }\r
+                               } else {\r
+                                       if (add_printfunction_to_stream(fta, n->schema, n->path, n->basename,\r
+                                               n->temporal_field, n->split_field, n->delta, n->split) < 0) {\r
+                                               ftaexec_remove(fta);\r
+                                               ftaexec_free_instance(fta,1);\r
+                                               if (send_fta_result(from,0,-1)<0) {\r
+                                                       gslog(LOG_EMERG,"GSCPRTS::error::send standard "\r
+                                                       "reply faild\n");\r
+                                                       return -1;\r
+                                               }\r
+                               }\r
+                         }\r
+                       }\r
+                     if (send_fta_result(from,&fta->ftaid,0)<0) {\r
+                                       gslog(LOG_EMERG,"GSCPRTS::error::send standard "\r
+                               "reply faild\n");\r
+                               return -1;\r
+                     }\r
+                   }\r
+                 }\r
+               }\r
+             }\r
+           }\r
+         } else {\r
+           if (send_fta_result(from,0,-1)<0) {\r
+             gslog(LOG_EMERG,"GSCPRTS::error::send standard "\r
+                     "reply faild\n");\r
+             return -1;\r
+           }\r
+         }\r
+         break;\r
+       case FTA_FREE_INSTANCE:{\r
+         if ((curprocess.type == LFTA)\r
+             ||(curprocess.type == HFTA)) {\r
+           struct fta_free_instance_arg * n;\r
+           n = (struct fta_free_instance_arg *) buf;\r
+           if (((r=gscpipc_getshm(from))!=0)\r
+               && ( ftaexec_remove((struct FTA *) n->f.streamid)==0)\r
+               && ( ftacallback_rm_streamid(r,n->f.streamid)==0)\r
+               && (ftaexec_free_instance((struct FTA *)n->f.streamid,\r
+                                         n->recursive)\r
+                   ==0)\r
+               && (fta_unregister_instance(n->subscriber,n->f)<0)) {\r
+             if (send_standard_reply(from,0)<0) {\r
+               gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");\r
+               return -1;\r
+             }\r
+           } else {\r
+             if (send_standard_reply(from,-1)<0) {\r
+               gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");\r
+               return -1;\r
+             }\r
+           }\r
+         } else {\r
+           if (send_standard_reply(from,-1)<0) {\r
+             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");\r
+             return -1;\r
+           }\r
+         }\r
+       }\r
+          break;\r
+       case FTA_CONTROL:{\r
+         if ((curprocess.type == LFTA)\r
+             ||(curprocess.type == HFTA)) {\r
+           struct fta_control_arg * n;\r
+           n = (struct fta_control_arg *) buf;\r
+           if (send_standard_reply(from,\r
+                                   ftaexec_control((struct FTA *)\r
+                                                   n->f.streamid,\r
+                                                   n->command,\r
+                                                   n->sz,\r
+                                                   &(n->data[0])))<0) {\r
+             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");\r
+             return -1;\r
+           }\r
+         } else {\r
+           if (send_standard_reply(from,-1)<0) {\r
+             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");\r
+             return -1;\r
+           }\r
+           gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse or HFTA proccess got"\r
+                   "contacted for clearinghouse or HFTA processing\n");\r
+         }\r
+       }\r
+          break;\r
+       case FTA_PRODUCER_FAILURE: {\r
+         if (curprocess.type == CLEARINGHOUSE) {\r
+           struct fta_notify_producer_failure_arg * n;\r
+           n = (struct fta_notify_producer_failure_arg *) buf;\r
+           ftalookup_producer_failure(n->sender,n->producer);\r
+         }\r
+       }\r
+         break;\r
+       case FTA_HEARTBEAT: {\r
+         if (curprocess.type == CLEARINGHOUSE) {\r
+           struct fta_heartbeat_arg * n;\r
+           n = (struct fta_heartbeat_arg *) buf;\r
+           ftalookup_heartbeat(n->sender,n->trace_id,\r
+                               n->sz,&(n->data[0]));\r
+         }\r
+       }\r
+         break;\r
+       case GSCP_GET_BUFFER:{\r
+         struct sgroup_get_buffer_arg * n;\r
+         gs_int32_t  res;\r
+         struct ringbuf * r;\r
+         n = (struct sgroup_get_buffer_arg *) buf;\r
+\r
+         if ((r=gscpipc_getshm(from))==0) {\r
+           gslog(LOG_EMERG,"GSCPRTS::error::proccess blocked without"\r
+                   "sharedmemory\n");\r
+         } else {\r
+           if (UNREAD(r)) {\r
+             /* something arrived in the meantime so wakeup\r
+                right away */\r
+               if (send_wakeup(from)<0) {\r
+                 gslog(LOG_EMERG,"ERROR:Could not send wakeup\n");\r
+                 return -1;\r
+               }\r
+           } else {\r
+#ifndef POLLING\r
+             if (ftacallback_add_wakeup(from,r)<0) {\r
+               gslog(LOG_EMERG,"ERROR:Could not add wakeup\n");\r
+               return -1;\r
+             }\r
+#else\r
+               gslog(LOG_EMERG,"Received wakeup request on polling systems\n");\r
+#endif\r
+           }\r
+         }\r
+       }\r
+         break;\r
+       case PROCESS_CONTROL:{\r
+         if ((curprocess.type == LFTA)\r
+             ||(curprocess.type == HFTA)) {\r
+           struct process_control_arg * n;\r
+           n = (struct process_control_arg *) buf;\r
+           if (send_standard_reply(from,\r
+                                   ftaexec_process_control(n->command,\r
+                                                           n->sz,\r
+                                                           &(n->data[0])))<0) {\r
+             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");\r
+             return -1;\r
+           }\r
+         } else {\r
+           if (send_standard_reply(from,-1)<0) {\r
+             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");\r
+             return -1;\r
+           }\r
+           gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse or HFTA proccess got"\r
+                   "contacted for clearinghouse or HFTA processing\n");\r
+         }\r
+       }\r
+          break;\r
+       case WAKEUP:\r
+       case TIMEOUT:\r
+          break;\r
+       default:\r
+         gslog(LOG_EMERG,"GSCPRTS::error::illegal message queue type %u\n",h->callid);\r
+         return -1;\r
+        }\r
+       /* use this occation to cleanup the messagequeue we can't afford\r
+          a backlog in the real message queue since it is limited in\r
+          size */\r
+       while (gscpipc_read(&from,&lopp,buf,&length,0)>0) {\r
+#ifdef PRINTMSG\r
+         gslog(LOG_EMERG, "request from %u of type %u with length %u\n",from,\r
+                 h->callid,h->size);\r
+#endif\r
+         if ((lopp == FTACALLBACK) && (h->callid < RESULT_OPCODE_IGNORE)) {\r
+           if (sidequeue_append(from,buf,length)<0) {\r
+             gslog(LOG_EMERG,"ERROR:: Could not add to sidequeue\n");\r
+             return -1;\r
+           }\r
+         }\r
+\r
+       }\r
+    }\r
+    return 0;\r
+}\r
+\r
+static gs_retval_t  map_match(gs_csp_t  dev) {\r
+  gs_int32_t  x;\r
+  for(x=0;x<curprocess.mapcnt;x++){\r
+    if (strcmp(dev,curprocess.map[x])==0) {\r
+      return 1;\r
+    }\r
+  }\r
+  return 0;\r
+}\r
+\r
+gs_retval_t  fta_max_snaplen()\r
+{\r
+  return maxsnaplen;\r
+}\r
+\r
+FTAID fta_register(FTAname name,gs_uint32_t   reusable, DEVname dev,\r
+                    alloc_fta fta_alloc_functionptr,\r
+                    gs_csp_t  schema, gs_int32_t  snaplen, gs_uint64_t prefilter)\r
+{\r
+  gs_int8_t  rb[MAXRES];\r
+  struct fta_register_arg a;\r
+  struct standard_result * sr = (struct standard_result *)rb;\r
+  gs_int32_t  index;\r
+  FTAID res;\r
+  FTAID reserr;\r
+  res.ip=0;\r
+  res.port=0;\r
+  res.index=0;\r
+  res.streamid=0;\r
+  reserr=res;\r
+\r
+  /* check if the device matches for the registration */\r
+  if (((dev==0) && (curprocess.deviceid==0))\r
+      || ((dev!=0)&&(map_match(dev)==1))) {\r
+    if (dev!=0) {\r
+      gslog(LOG_INFO,"Register %s on device %s\n",name,dev);\r
+    } else {\r
+      gslog(LOG_INFO,"Register %s on default device\n",name);\r
+    }\r
+    if ((index=ftacallback_add_alloc(name,fta_alloc_functionptr,prefilter))<0) {\r
+      gslog(LOG_EMERG,"ERROR could not register callback\n");\r
+      return res;\r
+    }\r
+\r
+    if (snaplen<0) {\r
+      maxsnaplen=-1;\r
+    } else {\r
+      if (maxsnaplen!=-1) {\r
+       maxsnaplen=(snaplen>maxsnaplen)?snaplen:maxsnaplen;\r
+      }\r
+    }\r
+\r
+    res=gscpipc_getftaid();\r
+    res.index=index;\r
+    if (curprocess.type != CLEARINGHOUSE) {\r
+               a.h.callid = FTA_REGISTER;\r
+               a.h.size = sizeof(struct fta_register_arg);\r
+               if (strlen(name)>=(MAXFTANAME-1)) {\r
+                       gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);\r
+                       return reserr;\r
+               }\r
+               if (strlen(schema)>=(MAXSCHEMASZ-1)) {\r
+                       gslog(LOG_EMERG,"ERROR:FTA schema (%s) to large\n",schema);\r
+                       return reserr;\r
+               }\r
+               strcpy(a.name,name);\r
+               strcpy(a.schema,schema);\r
+               a.f=res;\r
+               a.subscriber=res; /* consumer is the same as f for an FTA*/\r
+               a.reusable=reusable;\r
+               ipc_call_and_wait(clearinghouseftaid,(gs_sp_t )&a,rb);\r
+               if (sr->h.callid != STANDARD_RESULT) {\r
+                       gslog(LOG_EMERG,"ERROR:Wrong result code received\n");\r
+                       return reserr;\r
+               }\r
+               if (sr->result != 0 )  {\r
+                       gslog(LOG_EMERG,"ERROR:Error in registration\n");\r
+                       return reserr;\r
+               }\r
+    } else {\r
+      if (ftalookup_register_fta(gscpipc_getftaid(),\r
+                       res,name,reusable,schema)!=0) {\r
+                       return res;\r
+      }\r
+    }\r
+  }\r
+  return res;\r
+}\r
+\r
+gs_retval_t  fta_unregister(FTAID ftaid)\r
+{\r
+  gs_int8_t  rb[MAXRES];\r
+  struct fta_unregister_arg a;\r
+  struct standard_result * sr = (struct standard_result *)rb;\r
+  if (ftacallback_rm_alloc(ftaid.index)<0) {\r
+    gslog(LOG_EMERG,"ERROR could not unregister callback\n");\r
+    return -1;\r
+  }\r
+\r
+  if (curprocess.type != CLEARINGHOUSE) {\r
+    a.h.callid = FTA_UNREGISTER;\r
+    a.h.size = sizeof(struct fta_register_arg);\r
+    a.f=ftaid;\r
+    ipc_call_and_wait(clearinghouseftaid,(gs_sp_t) &a,rb);\r
+    if (sr->h.callid != STANDARD_RESULT) {\r
+      gslog(LOG_EMERG,"ERROR:Wrong result code received\n");\r
+      return -1;\r
+    }\r
+    return sr->result;\r
+  } else {\r
+    return ftalookup_unregister_fta(gscpipc_getftaid(),ftaid);\r
+  }\r
+  return 0;\r
+}\r
+\r
+gs_retval_t  hfta_post_tuple(struct FTA * self, gs_int32_t  sz, void *tuple)\r
+{\r
+  struct ringbuf * r;\r
+  gs_uint32_t   msgid;\r
+  struct wakeup_result a;\r
+  FTAID * f;\r
+  gs_int32_t  state;\r
+\r
+    if (sz>MAXTUPLESZ) {\r
+        gslog(LOG_EMERG,"Maximum tuple size is %u\n",MAXTUPLESZ);\r
+        return -1;\r
+    }\r
+\r
+    if (self->printfunc.in_use==1) {\r
+               return print_stream(self,sz,tuple);\r
+       }\r
+\r
+    if (ftacallback_start_streamid((gs_p_t  )self)<0) {\r
+       gslog(LOG_EMERG,"ERROR:Post for unkown streamid\n");\r
+       return -1;\r
+    }\r
+    /* now make sure we have space to write in all atomic ringbuffer */\r
+    while((r=ftacallback_next_streamid(&state))!=0) {\r
+        if (state == HFTA_RINGBUF_ATOMIC) {\r
+#ifdef BLOCKRINGBUFFER\r
+            while (!SPACETOWRITE(r)) {\r
+                usleep(100);\r
+            }\r
+#endif\r
+            if (! SPACETOWRITE(r)) {\r
+                /* atomic ring buffer and no space so post nothing */\r
+                return -1;\r
+            }\r
+        }\r
+    }\r
+\r
+\r
+    if (ftacallback_start_streamid((gs_p_t  )self)<0) {\r
+        gslog(LOG_EMERG,"ERROR:Post for unkown streamid\n");\r
+        return -1;\r
+    }\r
+\r
+    while((r=ftacallback_next_streamid(&state))!=0) {\r
+        if (state != HFTA_RINGBUF_SUSPEND) {\r
+            if (!SPACETOWRITE(r)) {\r
+                //since memory is full we set a warning\r
+                shared_memory_full_warning++;\r
+                // give receiver a chance to clean up\r
+                usleep(0);\r
+            }\r
+            if (SPACETOWRITE(r)) {\r
+                CURWRITE(r)->f=self->ftaid;\r
+                CURWRITE(r)->sz=sz;\r
+                memcpy(&(CURWRITE(r)->data[0]),tuple,sz);\r
+                               outtuple++;\r
+                               outbytes=outbytes+CURWRITE(r)->sz;\r
+                ADVANCEWRITE(r);\r
+#ifdef PRINTMSG\r
+               gslog(LOG_EMERG,"Wrote in ringpuffer %p [%p:%u]"\r
+                    "(%u %u) \n",r,&r->start,r->end,r->reader,r->writer);\r
+                gslog(LOG_EMERG,"\t%u  %u\n",CURREAD(r)->next,\r
+                    CURREAD(r)->sz);\r
+#endif\r
+            } else {\r
+                outtupledrop++;\r
+            }\r
+            if (HOWFULL(r) > 500) {\r
+      // buffer is at least half full\r
+                shared_memory_full_warning++;\r
+#ifdef PRINTMSG\r
+                gslog(LOG_EMERG,"\t\t buffer full\n");\r
+#endif\r
+            }\r
+        }\r
+    }\r
+#ifndef POLLING\r
+    if (ftacallback_start_wakeup((gs_p_t  ) self)<0) {\r
+        gslog(LOG_EMERG,"ERROR:Wakeup for unkown streamid\n");\r
+        return -1;\r
+    }\r
+    a.h.callid=WAKEUP;\r
+    a.h.size=sizeof(struct wakeup_result);\r
+    while((f=ftacallback_next_wakeup())!=0) {\r
+        if (send_wakeup(*f)<0) {\r
+            gslog(LOG_EMERG,"ERROR:Could not send wakeup\n");\r
+            return -1;\r
+        }\r
+    }\r
+#endif\r
+    return 0;\r
+}\r
+\r
+gs_retval_t  hfta_get_ringbuf_space(struct FTA * f, FTAID * r, gs_int32_t  * space, gs_int32_t  szr, gs_int32_t  tuplesz)\r
+{\r
+    gs_int32_t  x=0;\r
+    gs_int32_t  state;\r
+    struct ringbuf * ru;\r
+\r
+       if (f->printfunc.in_use==1) {\r
+       // XXX WHAT TO DO???\r
+               gslog(LOG_INFO,"Checking space for printfunc");\r
+               return 0;\r
+       }\r
+\r
+       if (ftacallback_start_streamid(f->ftaid.streamid)<0) {\r
+        gslog(LOG_EMERG,"ERROR:Space check for unkown streamid in HFTA\n");\r
+        return -1;\r
+    }\r
+\r
+    while ((ru=ftacallback_next_streamid(&state))!=0) {\r
+        if (szr > x ) {\r
+            r[x]=ru->destid;\r
+            space[x]=TUPLEFIT(ru,tuplesz);\r
+        }\r
+        x++;\r
+    }\r
+    return x;\r
+}\r
+\r
+\r
+gs_retval_t  hfta_set_ringbuf_type(struct FTA * f, FTAID process, gs_int32_t  state)\r
+{\r
+\r
+    if (ftacallback_state_streamid(f->ftaid.streamid,process,state)<0) {\r
+       gslog(LOG_EMERG,"ERROR:state change for unkown streamid\n");\r
+       return -1;\r
+    }\r
+    return 0;\r
+}\r