Fixed newline characters throughout the code
[com/gs-lite.git] / src / lib / gscphost / callbackinterface.c
index 500a74a..9d21d73 100644 (file)
-/* ------------------------------------------------\r
- Copyright 2014 AT&T Intellectual Property\r
- Licensed under the Apache License, Version 2.0 (the "License");\r
- you may not use this file except in compliance with the License.\r
- You may obtain a copy of the License at\r
\r
- http://www.apache.org/licenses/LICENSE-2.0\r
\r
- Unless required by applicable law or agreed to in writing, software\r
- distributed under the License is distributed on an "AS IS" BASIS,\r
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- See the License for the specific language governing permissions and\r
- limitations under the License.\r
- ------------------------------------------- */\r
-#include "gsconfig.h"\r
-#include "gstypes.h"\r
-#include <gscpipc.h>\r
-#include <ipcencoding.h>\r
-#include <callbackregistries.h>\r
-#include <clearinghouseregistries.h>\r
-#include <lappregistries.h>\r
-#include <stdlib.h>\r
-#include <stdio.h>\r
-#include <lapp.h>\r
-#include <string.h>\r
-#include "rdtsc.h"\r
-\r
-gs_uint64_t shared_memory_full_warning =0;\r
-\r
-static gs_int32_t maxsnaplen = 0;\r
-\r
-struct FTAID clearinghouseftaid;\r
-\r
-gs_uint64_t intupledrop=0;\r
-gs_uint64_t outtupledrop=0;\r
-gs_uint64_t intuple=0;\r
-gs_uint64_t outtuple=0;\r
-gs_uint64_t inbytes=0;\r
-gs_uint64_t outbytes=0;\r
-gs_uint64_t cycles=0;\r
-\r
-\r
-\r
-/* following function is internal and defined in lappinterface.c */\r
-gs_retval_t  ipc_call_and_wait(FTAID f, gs_sp_t msg, gs_sp_t result);\r
-\r
-gs_retval_t  gscp_blocking_mode() {\r
-#ifdef BLOCKRINGBUFFER\r
-       return 1;\r
-#else\r
-       return 0;\r
-#endif\r
-}\r
-\r
-static void clock_signal_check() {\r
-    struct FTA * fa;\r
-    static gs_int32_t  t=0;\r
-    if (t==0) {\r
-        t=time(0);\r
-    } else {\r
-       if (time(0)>t) {\r
-            if (ftaexec_start()<0) {\r
-                gslog(LOG_EMERG,"GSCPRTS::error::could not init check of "\r
-                        "FTA list\n");\r
-                return;\r
-            }\r
-            while ((fa=ftaexec_next())!=0) {\r
-                if (fa->clock_fta!=0) {\r
-                    fa->clock_fta(fa);\r
-                }\r
-            }\r
-            t=time(0);\r
-               if (t%GSLOGINTERVAL==0) gsstats();// log all the stats\r
-        }\r
-    }\r
-}\r
-\r
-static gs_retval_t  send_standard_reply(FTAID f, gs_int32_t  result) {\r
-    struct standard_result r;\r
-    r.h.callid=STANDARD_RESULT;\r
-    r.h.size=sizeof(struct standard_result);\r
-    r.result=result;\r
-    if (gscpipc_send(f, FTACALLBACK, (gs_sp_t) &r,r.h.size,1)<0) {\r
-       gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");\r
-       return -1;\r
-    }\r
-    return 0 ;\r
-}\r
-\r
-static gs_retval_t  send_lookup_reply(FTAID f, gs_int32_t  result,\r
-                            FTAID * ftaid,\r
-                            gs_sp_t* schema) {\r
-    struct ftafind_result r;\r
-    r.h.callid=FTAFIND_RESULT;\r
-    r.h.size=sizeof(struct ftafind_result);\r
-    r.result=result;\r
-    r.f=*ftaid;\r
-    if (result >=0) {\r
-       if (strlen(*schema)>=(MAXSCHEMASZ-1)) {\r
-           gslog(LOG_EMERG,"ERROR:FTA schema (%s) to large\n",(unsigned char *)schema);\r
-           r.result=-1;\r
-       } else {\r
-           strcpy(r.schema,*schema);\r
-       }\r
-    }\r
-    if (gscpipc_send(f, FTACALLBACK, (gs_sp_t) &r,r.h.size,1)<0) {\r
-       gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");\r
-       return -1;\r
-    }\r
-    return 0 ;\r
-}\r
-\r
-\r
-static gs_retval_t  send_fta_result(FTAID f,\r
-                          FTAID * ftaid, gs_int32_t  result) {\r
-    struct fta_result r;\r
-    r.h.callid=FTA_RESULT;\r
-    r.h.size=sizeof(struct fta_result);\r
-    r.result=result;\r
-    if (ftaid!=0) {\r
-      r.f=*ftaid;\r
-    }\r
-    if (gscpipc_send(f, FTACALLBACK,(gs_sp_t)&r,r.h.size,1)<0) {\r
-       gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");\r
-       return -1;\r
-    }\r
-    return 0 ;\r
-}\r
-\r
-\r
-// Is also used by the lfta rts enviroment on a post. So make it none\r
-// static.\r
-gs_retval_t  send_wakeup(FTAID f)\r
-{\r
-    struct wakeup_result a;\r
-\r
-    a.h.callid=WAKEUP;\r
-    a.h.size=sizeof(struct wakeup_result);\r
-    if (gscpipc_send(f, FTACALLBACK, (gs_sp_t)&a,a.h.size,0)<0) {\r
-       gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");\r
-       return -1;\r
-    }\r
-    return 0;\r
-}\r
-\r
-static gs_retval_t  fta_register_instance(FTAID subscriber,\r
-                                FTAID f,gs_uint32_t   reusable,\r
-                                FTAname name,\r
-                                gs_csp_t  schema) {\r
-  gs_int8_t  rb[MAXRES];\r
-  struct fta_register_arg a;\r
-  struct standard_result * sr = (struct standard_result *)rb;\r
-\r
-  if (curprocess.type != CLEARINGHOUSE) {\r
-    a.h.callid = FTA_REGISTER;\r
-    a.h.size = sizeof(struct fta_register_arg);\r
-    if (strlen(name)>=(MAXFTANAME-1)) {\r
-      gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);\r
-      return -1;\r
-    }\r
-    if (strlen(schema)>=(MAXSCHEMASZ-1)) {\r
-      gslog(LOG_EMERG,"ERROR:FTA schema (%s) to large\n",schema);\r
-      return -1;\r
-    }\r
-    strcpy(a.name,name);\r
-    strcpy(a.schema,schema);\r
-    a.f=f;\r
-    a.subscriber=subscriber; /* consumer is the same as f for an FTA*/\r
-    a.reusable=reusable;\r
-    ipc_call_and_wait(clearinghouseftaid,(gs_sp_t )&a,rb);\r
-    if (sr->h.callid != STANDARD_RESULT) {\r
-      gslog(LOG_EMERG,"ERROR:Wrong result code received\n");\r
-      return -1;\r
-    }\r
-    if (sr->result != 0) {\r
-      return -1;\r
-    }\r
-  } else {\r
-    if (ftalookup_register_fta(subscriber,f,name,reusable,schema)<0) {\r
-      return -1;\r
-    }\r
-  }\r
-  return 0;\r
-}\r
-\r
-static gs_retval_t  fta_unregister_instance(FTAID subscriber,\r
-                                  FTAID f) {\r
-  gs_int8_t  rb[MAXRES];\r
-  struct fta_unregister_arg a;\r
-  struct standard_result * sr = (struct standard_result *)rb;\r
-\r
-  if (curprocess.type != CLEARINGHOUSE) {\r
-    a.h.callid = FTA_UNREGISTER;\r
-    a.h.size = sizeof(struct fta_register_arg);\r
-    a.f=f;\r
-    a.subscriber=subscriber; /* consumer is the same as f for an FTA*/\r
-    ipc_call_and_wait(clearinghouseftaid,(gs_sp_t )&a,rb);\r
-    if (sr->h.callid != STANDARD_RESULT) {\r
-      gslog(LOG_EMERG,"ERROR:Wrong result code received\n");\r
-      return -1;\r
-    }\r
-    return -1;\r
-  } else {\r
-    if (ftalookup_unregister_fta(f,subscriber)<0) {\r
-      return -1;\r
-    }\r
-  }\r
-  return 0;\r
-}\r
-\r
-gs_retval_t  fta_start_service(gs_int32_t  number)\r
-{\r
-    gs_int8_t   buf[MAXMSGSZ];\r
-    FTAID from;\r
-    gs_int32_t  length;\r
-    struct hostcall * h= (struct hostcall *) buf;\r
-    gs_int32_t  forever=0;\r
-    gs_int32_t  endtime=0;\r
-    gs_int32_t  block=1;\r
-    gs_int32_t  res;\r
-    struct ringbuf * r;\r
-    FTAID ftaid;\r
-    FTAID * ftaidp;\r
-    struct FTA * fta;\r
-    gs_int32_t  lopp;\r
-       gs_int32_t  preemptq;\r
-       gs_int32_t  endq;\r
-       gs_uint64_t s1;\r
-       gs_uint64_t s2;\r
-\r
-    if (number == 0) {\r
-       block=0;\r
-       forever=1;\r
-    }\r
-    if (number < 0) {\r
-       forever=1;\r
-    }\r
-    if (number > 0) {\r
-        block=1;\r
-       endtime=time(0)+number;\r
-    }\r
-\r
-    while((forever!=0)||\r
-          (endtime==0)||(endtime>time(0))) {\r
-        /* check if we need to give the FTAs there clock signal */\r
-        if ((curprocess.type == LFTA)\r
-            ||(curprocess.type == HFTA)) {\r
-            clock_signal_check();\r
-        }\r
-\r
-#ifdef POLLING\r
-       poll:\r
-#endif\r
-    preemptq=0;\r
-       /* first empty out sidequeu then read from messagequeue */\r
-       if (sidequeue_pop(&from,buf,&length)<0) {\r
-         /* empty out the sidequeue before processing the shared\r
-            memory */\r
-         if (curprocess.type == HFTA) {\r
-           /* process all the shared memory regions and register\r
-              for callbacks */\r
-               s1=rdtsc();\r
-               endq=time(0)+2;\r
-           streamregistry_getactiveringbuf_reset();\r
-           while ((r=streamregistry_getactiveringbuf())>0) {\r
-             while (UNREAD(r)) {\r
-                       struct FTA * fa;\r
-                       if (ftaexec_start()<0) {\r
-                       gslog(LOG_EMERG,"GSCPRTS::error::could not init check of "\r
-                         "FTA list\n");\r
-                       return -1;\r
-                       }\r
-                       while ((fa=ftaexec_next())!=0) {\r
-                       gs_int32_t  x;\r
-                       for(x=0;x<fa->stream_subscribed_cnt;x++) {\r
-                               if ((fa->stream_subscribed[x].streamid\r
-                                       ==CURREAD(r)->f.streamid)\r
-                                       && (fa->stream_subscribed[x].ip\r
-                                               ==CURREAD(r)->f.ip)\r
-                                       && (fa->stream_subscribed[x].port\r
-                                               ==CURREAD(r)->f.port)) {\r
-                                       fa->accept_packet(fa,&(CURREAD(r)->f),\r
-                                               &(CURREAD(r)->data[0]),\r
-                                               CURREAD(r)->sz);\r
-                               }\r
-                       }\r
-                       }\r
-                       intuple++;\r
-                       inbytes+=CURREAD(r)->sz;\r
-                       ADVANCEREAD(r);\r
-                       if (endq <= time(0)) {\r
-                               preemptq=1;\r
-                               goto processmsg;\r
-                       }\r
-             }\r
-           }\r
-               s2=rdtsc();\r
-               cycles+=(s2-s1);\r
-#ifndef POLLING\r
-           /* register wakeups all arround to make sure we don't sleep\r
-            * for ever,\r
-            */\r
-           streamregistry_getactiveftaid_reset();\r
-           while ((ftaidp=streamregistry_getactiveftaid())>0) {\r
-             struct gscp_get_buffer_arg a;\r
-             a.h.callid = GSCP_GET_BUFFER;\r
-             a.h.size = sizeof(struct gscp_get_buffer_arg);\r
-             a.timeout = 0;\r
-             if (gscpipc_send(*ftaidp,FTACALLBACK,(gs_sp_t )&a,a.h.size,1)<0) {\r
-               return -1;\r
-             }\r
-           }\r
-#endif\r
-         }\r
-         processmsg:\r
-         do {\r
-                       /* even if we block we return every 100msec to be able to generate the clock signal to\r
-                        * the HFTAs\r
-             */\r
-            if ((res=gscpipc_read(&from,&lopp,buf,&length,((block==1)&&(preemptq==0))?2:0))<0) {\r
-                gslog(LOG_EMERG,"GSCPRTS::error::reading from messagequeue\n");\r
-                return -1;\r
-            }\r
-            /* check if we need to give the FTAs there clock signal */\r
-            if ((curprocess.type == LFTA)\r
-                ||(curprocess.type == HFTA)) {\r
-                clock_signal_check();\r
-            }\r
-            if ((res==0) && (block==0)) {\r
-                /* nonblocking and nothing to do so return */\r
-                return 0;\r
-            }\r
-            if ((res==0) && (endtime!=0) && (endtime<time(0))) {\r
-                /* timeout reached so return */\r
-                return 0;\r
-            }\r
-#ifdef POLLING\r
-           if ((res==0)&&(curprocess.type == HFTA)) {\r
-                goto poll;\r
-           }\r
-#endif\r
-         } while (res==0);\r
-         if ((lopp)!=FTACALLBACK) {\r
-           gslog(LOG_EMERG,"GSCPRTS::error::unknown lowlevel opp\n");\r
-           return -1;\r
-         }\r
-       }\r
-\r
-#ifdef PRINTMSG\r
-       gslog(LOG_EMERG, "HFTA message from %u of type %u of length %u\n",from,\r
-       h->callid,h->size);\r
-#endif\r
-       switch (h->callid) {\r
-\r
-       case FTA_LOOKUP: {\r
-         if (curprocess.type == CLEARINGHOUSE) {\r
-           struct fta_find_arg * n;\r
-           FTAID rf;\r
-           gs_csp_t  schema;\r
-           n = (struct fta_find_arg *) buf;\r
-           /* Note: Side effect of ftalookup_lookup_fta_index is to\r
-              fill msgid and index */\r
-                 if (send_lookup_reply(from,\r
-                                       ftalookup_lookup_fta_index(from,\r
-                                                                  n->name,\r
-                                                                  n->reuse,\r
-                                                                  &rf,\r
-                                                                  &schema),\r
-                                       &rf,\r
-                                       (gs_sp_t *)&schema)<0) {\r
-                   gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");\r
-                   return -1;\r
-                 }\r
-         } else {\r
-           if (send_standard_reply(from,-1)<0) {\r
-             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");\r
-             return -1;\r
-           }\r
-           gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse proccess got"\r
-                   "contacted for clearinghouse processing\n");\r
-         }\r
-       }\r
-         break;\r
-       case FTA_REGISTER: {\r
-         if (curprocess.type == CLEARINGHOUSE) {\r
-           struct fta_register_arg * n;\r
-           n = (struct fta_register_arg *) buf;\r
-           if (send_standard_reply(from,\r
-                                   ftalookup_register_fta(n->subscriber,\r
-                                                          n->f,\r
-                                                          n->name,\r
-                                                          n->reusable,\r
-                                                          n->schema))<0) {\r
-             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");\r
-             return -1;\r
-           }\r
-         } else {\r
-           if (send_standard_reply(from,-1)<0) {\r
-             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");\r
-             return -1;\r
-           }\r
-           gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse proccess got"\r
-                   "contacted for clearinghouse processing\n");\r
-         }\r
-       }\r
-         break;\r
-       case FTA_UNREGISTER: {\r
-         if (curprocess.type == CLEARINGHOUSE) {\r
-           struct fta_unregister_arg * n;\r
-           n = (struct fta_unregister_arg *) buf;\r
-           if (send_standard_reply(from,\r
-                                   ftalookup_unregister_fta(n->subscriber,\r
-                                                            n->f))<0) {\r
-             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");\r
-             return -1;\r
-           }\r
-         } else {\r
-           if (send_standard_reply(from,-1)<0) {\r
-             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");\r
-             return -1;\r
-           }\r
-           gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse proccess got"\r
-                   "contacted for clearinghouse processing\n");\r
-         }\r
-       }\r
-         break;\r
-       case FTA_ALLOC_INSTANCE:\r
-       case FTA_ALLOC_PRINT_INSTANCE:\r
-         if ((curprocess.type == LFTA)\r
-             ||(curprocess.type == HFTA)) {\r
-           struct ringbuf * r;\r
-           struct fta_alloc_instance_arg * n;\r
-           n = (struct fta_alloc_instance_arg *) buf;\r
-           if ((fta=ftaexec_alloc_instance(n->f.index,\r
-                                           (struct FTA *)n->f.streamid,\r
-                                           n->reusable,\r
-                                           n->command,\r
-                                           n->sz,\r
-                                           &(n->data[0])))==0) {\r
-             gslog(LOG_EMERG,"GSCPRTS::warning::could not allocate"\r
-                     "FTA\n");\r
-             if (send_fta_result(from,0,-1)<0) {\r
-               gslog(LOG_EMERG,"GSCPRTS::error::send standard "\r
-                       "reply faild\n");\r
-               return -1;\r
-             }\r
-           } else {\r
-             /* shared memory is only required if data is beeing transfered */\r
-             if ((h->callid!=FTA_ALLOC_PRINT_INSTANCE)&&\r
-                 ((r=gscpipc_getshm(from))==0)) {\r
-                               gslog(LOG_EMERG,"GSCPRTS::warning::could not get"\r
-                               "shared memory\n");\r
-                               ftaexec_remove(fta);\r
-                               if (send_fta_result(from,0,-1)<0) {\r
-                                       gslog(LOG_EMERG,"GSCPRTS::error::send standard "\r
-                                       "reply faild\n");\r
-                                       return -1;\r
-                               }\r
-               } else {\r
-                       /* no callback to register for print function */\r
-                       if ((h->callid!=FTA_ALLOC_PRINT_INSTANCE)&&(ftacallback_add_streamid(r,fta->ftaid.streamid)!=0)) {\r
-                               gslog(LOG_EMERG,"GSCPRTS::warning::could not add"\r
-                                       "streamid to ringbuffer\n");\r
-                               ftaexec_free_instance(fta,1);\r
-                               if (send_fta_result(from,0,-1)<0) {\r
-                                       gslog(LOG_EMERG,"GSCPRTS::error::send standard "\r
-                                       "reply faild\n");\r
-                                       return -1;\r
-                               }\r
-               } else {\r
-                 if (ftaexec_insert(0,fta)<0) {\r
-                   gslog(LOG_EMERG,"GSCPRTS::warning::could not"\r
-                           "insert FTA\n");\r
-                   ftacallback_rm_streamid(r,fta->ftaid.streamid);\r
-                   ftaexec_free_instance(fta,1);\r
-                   if (send_fta_result(from,0,-1)<0) {\r
-                     gslog(LOG_EMERG,"GSCPRTS::error::send standard "\r
-                             "reply faild\n");\r
-                     return -1;\r
-                   }\r
-                 } else {\r
-                   if (fta_register_instance(n->subscriber,fta->ftaid,\r
-                                             n->reusable,\r
-                                             n->name,\r
-                                             n->schema)!=0) {\r
-                     gslog(LOG_EMERG,"GSCPRTS::warning::could not register"\r
-                       " instance\n");\r
-                     ftaexec_remove(fta);\r
-                     ftacallback_rm_streamid(r,fta->ftaid.streamid);\r
-                     ftaexec_free_instance(fta,1);\r
-                     if (send_fta_result(from,0,-1)<0) {\r
-                       gslog(LOG_EMERG,"GSCPRTS::error::send standard "\r
-                               "reply faild\n");\r
-                       return -1;\r
-                     }\r
-                   }  else {\r
-                     if (h->callid==FTA_ALLOC_PRINT_INSTANCE) {\r
-                               if (curprocess.type == LFTA) {\r
-                                       gslog(LOG_EMERG,"GSCPRTS::error:: alloc print instance not "\r
-                                               "implemented for LFTA.\n");\r
-                                       ftaexec_remove(fta);\r
-                                       ftaexec_free_instance(fta,1);\r
-                                       if (send_fta_result(from,0,-1)<0) {\r
-                                               gslog(LOG_EMERG,"GSCPRTS::error::send standard "\r
-                                               "reply faild\n");\r
-                                               return -1;\r
-                                       }\r
-                               } else {\r
-                                       if (add_printfunction_to_stream(fta, n->schema, n->path, n->basename,\r
-                                               n->temporal_field, n->split_field, n->delta, n->split) < 0) {\r
-                                               ftaexec_remove(fta);\r
-                                               ftaexec_free_instance(fta,1);\r
-                                               if (send_fta_result(from,0,-1)<0) {\r
-                                                       gslog(LOG_EMERG,"GSCPRTS::error::send standard "\r
-                                                       "reply faild\n");\r
-                                                       return -1;\r
-                                               }\r
-                               }\r
-                         }\r
-                       }\r
-                     if (send_fta_result(from,&fta->ftaid,0)<0) {\r
-                                       gslog(LOG_EMERG,"GSCPRTS::error::send standard "\r
-                               "reply faild\n");\r
-                               return -1;\r
-                     }\r
-                   }\r
-                 }\r
-               }\r
-             }\r
-           }\r
-         } else {\r
-           if (send_fta_result(from,0,-1)<0) {\r
-             gslog(LOG_EMERG,"GSCPRTS::error::send standard "\r
-                     "reply faild\n");\r
-             return -1;\r
-           }\r
-         }\r
-         break;\r
-       case FTA_FREE_INSTANCE:{\r
-         if ((curprocess.type == LFTA)\r
-             ||(curprocess.type == HFTA)) {\r
-           struct fta_free_instance_arg * n;\r
-           n = (struct fta_free_instance_arg *) buf;\r
-           if (((r=gscpipc_getshm(from))!=0)\r
-               && ( ftaexec_remove((struct FTA *) n->f.streamid)==0)\r
-               && ( ftacallback_rm_streamid(r,n->f.streamid)==0)\r
-               && (ftaexec_free_instance((struct FTA *)n->f.streamid,\r
-                                         n->recursive)\r
-                   ==0)\r
-               && (fta_unregister_instance(n->subscriber,n->f)<0)) {\r
-             if (send_standard_reply(from,0)<0) {\r
-               gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");\r
-               return -1;\r
-             }\r
-           } else {\r
-             if (send_standard_reply(from,-1)<0) {\r
-               gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");\r
-               return -1;\r
-             }\r
-           }\r
-         } else {\r
-           if (send_standard_reply(from,-1)<0) {\r
-             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");\r
-             return -1;\r
-           }\r
-         }\r
-       }\r
-          break;\r
-       case FTA_CONTROL:{\r
-         if ((curprocess.type == LFTA)\r
-             ||(curprocess.type == HFTA)) {\r
-           struct fta_control_arg * n;\r
-           n = (struct fta_control_arg *) buf;\r
-           if (send_standard_reply(from,\r
-                                   ftaexec_control((struct FTA *)\r
-                                                   n->f.streamid,\r
-                                                   n->command,\r
-                                                   n->sz,\r
-                                                   &(n->data[0])))<0) {\r
-             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");\r
-             return -1;\r
-           }\r
-         } else {\r
-           if (send_standard_reply(from,-1)<0) {\r
-             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");\r
-             return -1;\r
-           }\r
-           gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse or HFTA proccess got"\r
-                   "contacted for clearinghouse or HFTA processing\n");\r
-         }\r
-       }\r
-          break;\r
-       case FTA_PRODUCER_FAILURE: {\r
-         if (curprocess.type == CLEARINGHOUSE) {\r
-           struct fta_notify_producer_failure_arg * n;\r
-           n = (struct fta_notify_producer_failure_arg *) buf;\r
-           ftalookup_producer_failure(n->sender,n->producer);\r
-         }\r
-       }\r
-         break;\r
-       case FTA_HEARTBEAT: {\r
-         if (curprocess.type == CLEARINGHOUSE) {\r
-           struct fta_heartbeat_arg * n;\r
-           n = (struct fta_heartbeat_arg *) buf;\r
-           ftalookup_heartbeat(n->sender,n->trace_id,\r
-                               n->sz,&(n->data[0]));\r
-         }\r
-       }\r
-         break;\r
-       case GSCP_GET_BUFFER:{\r
-         struct sgroup_get_buffer_arg * n;\r
-         gs_int32_t  res;\r
-         struct ringbuf * r;\r
-         n = (struct sgroup_get_buffer_arg *) buf;\r
-\r
-         if ((r=gscpipc_getshm(from))==0) {\r
-           gslog(LOG_EMERG,"GSCPRTS::error::proccess blocked without"\r
-                   "sharedmemory\n");\r
-         } else {\r
-           if (UNREAD(r)) {\r
-             /* something arrived in the meantime so wakeup\r
-                right away */\r
-               if (send_wakeup(from)<0) {\r
-                 gslog(LOG_EMERG,"ERROR:Could not send wakeup\n");\r
-                 return -1;\r
-               }\r
-           } else {\r
-#ifndef POLLING\r
-             if (ftacallback_add_wakeup(from,r)<0) {\r
-               gslog(LOG_EMERG,"ERROR:Could not add wakeup\n");\r
-               return -1;\r
-             }\r
-#else\r
-               gslog(LOG_EMERG,"Received wakeup request on polling systems\n");\r
-#endif\r
-           }\r
-         }\r
-       }\r
-         break;\r
-       case PROCESS_CONTROL:{\r
-         if ((curprocess.type == LFTA)\r
-             ||(curprocess.type == HFTA)) {\r
-           struct process_control_arg * n;\r
-           n = (struct process_control_arg *) buf;\r
-           if (send_standard_reply(from,\r
-                                   ftaexec_process_control(n->command,\r
-                                                           n->sz,\r
-                                                           &(n->data[0])))<0) {\r
-             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");\r
-             return -1;\r
-           }\r
-         } else {\r
-           if (send_standard_reply(from,-1)<0) {\r
-             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");\r
-             return -1;\r
-           }\r
-           gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse or HFTA proccess got"\r
-                   "contacted for clearinghouse or HFTA processing\n");\r
-         }\r
-       }\r
-          break;\r
-       case WAKEUP:\r
-       case TIMEOUT:\r
-          break;\r
-       default:\r
-         gslog(LOG_EMERG,"GSCPRTS::error::illegal message queue type %u\n",h->callid);\r
-         return -1;\r
-        }\r
-       /* use this occation to cleanup the messagequeue we can't afford\r
-          a backlog in the real message queue since it is limited in\r
-          size */\r
-       while (gscpipc_read(&from,&lopp,buf,&length,0)>0) {\r
-#ifdef PRINTMSG\r
-         gslog(LOG_EMERG, "request from %u of type %u with length %u\n",from,\r
-                 h->callid,h->size);\r
-#endif\r
-         if ((lopp == FTACALLBACK) && (h->callid < RESULT_OPCODE_IGNORE)) {\r
-           if (sidequeue_append(from,buf,length)<0) {\r
-             gslog(LOG_EMERG,"ERROR:: Could not add to sidequeue\n");\r
-             return -1;\r
-           }\r
-         }\r
-\r
-       }\r
-    }\r
-    return 0;\r
-}\r
-\r
-static gs_retval_t  map_match(gs_csp_t  dev) {\r
-  gs_int32_t  x;\r
-  for(x=0;x<curprocess.mapcnt;x++){\r
-    if (strcmp(dev,curprocess.map[x])==0) {\r
-      return 1;\r
-    }\r
-  }\r
-  return 0;\r
-}\r
-\r
-gs_retval_t  fta_max_snaplen()\r
-{\r
-  return maxsnaplen;\r
-}\r
-\r
-FTAID fta_register(FTAname name,gs_uint32_t   reusable, DEVname dev,\r
-                    alloc_fta fta_alloc_functionptr,\r
-                    gs_csp_t  schema, gs_int32_t  snaplen, gs_uint64_t prefilter)\r
-{\r
-  gs_int8_t  rb[MAXRES];\r
-  struct fta_register_arg a;\r
-  struct standard_result * sr = (struct standard_result *)rb;\r
-  gs_int32_t  index;\r
-  FTAID res;\r
-  FTAID reserr;\r
-  res.ip=0;\r
-  res.port=0;\r
-  res.index=0;\r
-  res.streamid=0;\r
-  reserr=res;\r
-\r
-  /* check if the device matches for the registration */\r
-  if (((dev==0) && (curprocess.deviceid==0))\r
-      || ((dev!=0)&&(map_match(dev)==1))) {\r
-    if (dev!=0) {\r
-      gslog(LOG_INFO,"Register %s on device %s\n",name,dev);\r
-    } else {\r
-      gslog(LOG_INFO,"Register %s on default device\n",name);\r
-    }\r
-    if ((index=ftacallback_add_alloc(name,fta_alloc_functionptr,prefilter))<0) {\r
-      gslog(LOG_EMERG,"ERROR could not register callback\n");\r
-      return res;\r
-    }\r
-\r
-    if (snaplen<0) {\r
-      maxsnaplen=-1;\r
-    } else {\r
-      if (maxsnaplen!=-1) {\r
-       maxsnaplen=(snaplen>maxsnaplen)?snaplen:maxsnaplen;\r
-      }\r
-    }\r
-\r
-    res=gscpipc_getftaid();\r
-    res.index=index;\r
-    if (curprocess.type != CLEARINGHOUSE) {\r
-               a.h.callid = FTA_REGISTER;\r
-               a.h.size = sizeof(struct fta_register_arg);\r
-               if (strlen(name)>=(MAXFTANAME-1)) {\r
-                       gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);\r
-                       return reserr;\r
-               }\r
-               if (strlen(schema)>=(MAXSCHEMASZ-1)) {\r
-                       gslog(LOG_EMERG,"ERROR:FTA schema (%s) to large\n",schema);\r
-                       return reserr;\r
-               }\r
-               strcpy(a.name,name);\r
-               strcpy(a.schema,schema);\r
-               a.f=res;\r
-               a.subscriber=res; /* consumer is the same as f for an FTA*/\r
-               a.reusable=reusable;\r
-               ipc_call_and_wait(clearinghouseftaid,(gs_sp_t )&a,rb);\r
-               if (sr->h.callid != STANDARD_RESULT) {\r
-                       gslog(LOG_EMERG,"ERROR:Wrong result code received\n");\r
-                       return reserr;\r
-               }\r
-               if (sr->result != 0 )  {\r
-                       gslog(LOG_EMERG,"ERROR:Error in registration\n");\r
-                       return reserr;\r
-               }\r
-    } else {\r
-      if (ftalookup_register_fta(gscpipc_getftaid(),\r
-                       res,name,reusable,schema)!=0) {\r
-                       return res;\r
-      }\r
-    }\r
-  }\r
-  return res;\r
-}\r
-\r
-gs_retval_t  fta_unregister(FTAID ftaid)\r
-{\r
-  gs_int8_t  rb[MAXRES];\r
-  struct fta_unregister_arg a;\r
-  struct standard_result * sr = (struct standard_result *)rb;\r
-  if (ftacallback_rm_alloc(ftaid.index)<0) {\r
-    gslog(LOG_EMERG,"ERROR could not unregister callback\n");\r
-    return -1;\r
-  }\r
-\r
-  if (curprocess.type != CLEARINGHOUSE) {\r
-    a.h.callid = FTA_UNREGISTER;\r
-    a.h.size = sizeof(struct fta_register_arg);\r
-    a.f=ftaid;\r
-    ipc_call_and_wait(clearinghouseftaid,(gs_sp_t) &a,rb);\r
-    if (sr->h.callid != STANDARD_RESULT) {\r
-      gslog(LOG_EMERG,"ERROR:Wrong result code received\n");\r
-      return -1;\r
-    }\r
-    return sr->result;\r
-  } else {\r
-    return ftalookup_unregister_fta(gscpipc_getftaid(),ftaid);\r
-  }\r
-  return 0;\r
-}\r
-\r
-gs_retval_t  hfta_post_tuple(struct FTA * self, gs_int32_t  sz, void *tuple)\r
-{\r
-  struct ringbuf * r;\r
-  gs_uint32_t   msgid;\r
-  struct wakeup_result a;\r
-  FTAID * f;\r
-  gs_int32_t  state;\r
-\r
-    if (sz>MAXTUPLESZ) {\r
-        gslog(LOG_EMERG,"Maximum tuple size is %u\n",MAXTUPLESZ);\r
-        return -1;\r
-    }\r
-\r
-    if (self->printfunc.in_use==1) {\r
-               return print_stream(self,sz,tuple);\r
-       }\r
-\r
-    if (ftacallback_start_streamid((gs_p_t  )self)<0) {\r
-       gslog(LOG_EMERG,"ERROR:Post for unkown streamid\n");\r
-       return -1;\r
-    }\r
-    /* now make sure we have space to write in all atomic ringbuffer */\r
-    while((r=ftacallback_next_streamid(&state))!=0) {\r
-        if (state == HFTA_RINGBUF_ATOMIC) {\r
-#ifdef BLOCKRINGBUFFER\r
-            while (!SPACETOWRITE(r)) {\r
-                usleep(100);\r
-            }\r
-#endif\r
-            if (! SPACETOWRITE(r)) {\r
-                /* atomic ring buffer and no space so post nothing */\r
-                return -1;\r
-            }\r
-        }\r
-    }\r
-\r
-\r
-    if (ftacallback_start_streamid((gs_p_t  )self)<0) {\r
-        gslog(LOG_EMERG,"ERROR:Post for unkown streamid\n");\r
-        return -1;\r
-    }\r
-\r
-    while((r=ftacallback_next_streamid(&state))!=0) {\r
-        if (state != HFTA_RINGBUF_SUSPEND) {\r
-            if (!SPACETOWRITE(r)) {\r
-                //since memory is full we set a warning\r
-                shared_memory_full_warning++;\r
-                // give receiver a chance to clean up\r
-                usleep(0);\r
-            }\r
-            if (SPACETOWRITE(r)) {\r
-                CURWRITE(r)->f=self->ftaid;\r
-                CURWRITE(r)->sz=sz;\r
-                memcpy(&(CURWRITE(r)->data[0]),tuple,sz);\r
-                               outtuple++;\r
-                               outbytes=outbytes+CURWRITE(r)->sz;\r
-                ADVANCEWRITE(r);\r
-#ifdef PRINTMSG\r
-               gslog(LOG_EMERG,"Wrote in ringpuffer %p [%p:%u]"\r
-                    "(%u %u) \n",r,&r->start,r->end,r->reader,r->writer);\r
-                gslog(LOG_EMERG,"\t%u  %u\n",CURREAD(r)->next,\r
-                    CURREAD(r)->sz);\r
-#endif\r
-            } else {\r
-                outtupledrop++;\r
-            }\r
-            if (HOWFULL(r) > 500) {\r
-      // buffer is at least half full\r
-                shared_memory_full_warning++;\r
-#ifdef PRINTMSG\r
-                gslog(LOG_EMERG,"\t\t buffer full\n");\r
-#endif\r
-            }\r
-        }\r
-    }\r
-#ifndef POLLING\r
-    if (ftacallback_start_wakeup((gs_p_t  ) self)<0) {\r
-        gslog(LOG_EMERG,"ERROR:Wakeup for unkown streamid\n");\r
-        return -1;\r
-    }\r
-    a.h.callid=WAKEUP;\r
-    a.h.size=sizeof(struct wakeup_result);\r
-    while((f=ftacallback_next_wakeup())!=0) {\r
-        if (send_wakeup(*f)<0) {\r
-            gslog(LOG_EMERG,"ERROR:Could not send wakeup\n");\r
-            return -1;\r
-        }\r
-    }\r
-#endif\r
-    return 0;\r
-}\r
-\r
-gs_retval_t  hfta_get_ringbuf_space(struct FTA * f, FTAID * r, gs_int32_t  * space, gs_int32_t  szr, gs_int32_t  tuplesz)\r
-{\r
-    gs_int32_t  x=0;\r
-    gs_int32_t  state;\r
-    struct ringbuf * ru;\r
-\r
-       if (f->printfunc.in_use==1) {\r
-       // XXX WHAT TO DO???\r
-               gslog(LOG_INFO,"Checking space for printfunc");\r
-               return 0;\r
-       }\r
-\r
-       if (ftacallback_start_streamid(f->ftaid.streamid)<0) {\r
-        gslog(LOG_EMERG,"ERROR:Space check for unkown streamid in HFTA\n");\r
-        return -1;\r
-    }\r
-\r
-    while ((ru=ftacallback_next_streamid(&state))!=0) {\r
-        if (szr > x ) {\r
-            r[x]=ru->destid;\r
-            space[x]=TUPLEFIT(ru,tuplesz);\r
-        }\r
-        x++;\r
-    }\r
-    return x;\r
-}\r
-\r
-\r
-gs_retval_t  hfta_set_ringbuf_type(struct FTA * f, FTAID process, gs_int32_t  state)\r
-{\r
-\r
-    if (ftacallback_state_streamid(f->ftaid.streamid,process,state)<0) {\r
-       gslog(LOG_EMERG,"ERROR:state change for unkown streamid\n");\r
-       return -1;\r
-    }\r
-    return 0;\r
-}\r
+/* ------------------------------------------------
+ Copyright 2014 AT&T Intellectual Property
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ------------------------------------------- */
+#include "gsconfig.h"
+#include "gstypes.h"
+#include <gscpipc.h>
+#include <ipcencoding.h>
+#include <callbackregistries.h>
+#include <clearinghouseregistries.h>
+#include <lappregistries.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <lapp.h>
+#include <string.h>
+#include "rdtsc.h"
+
+gs_uint64_t shared_memory_full_warning =0;
+
+static gs_int32_t maxsnaplen = 0;
+
+struct FTAID clearinghouseftaid;
+
+gs_uint64_t intupledrop=0;
+gs_uint64_t outtupledrop=0;
+gs_uint64_t intuple=0;
+gs_uint64_t outtuple=0;
+gs_uint64_t inbytes=0;
+gs_uint64_t outbytes=0;
+gs_uint64_t cycles=0;
+
+
+
+/* following function is internal and defined in lappinterface.c */
+gs_retval_t  ipc_call_and_wait(FTAID f, gs_sp_t msg, gs_sp_t result);
+
+gs_retval_t  gscp_blocking_mode() {
+#ifdef BLOCKRINGBUFFER
+       return 1;
+#else
+       return 0;
+#endif
+}
+
+static void clock_signal_check() {
+    struct FTA * fa;
+    static gs_int32_t  t=0;
+    if (t==0) {
+        t=time(0);
+    } else {
+       if (time(0)>t) {
+            if (ftaexec_start()<0) {
+                gslog(LOG_EMERG,"GSCPRTS::error::could not init check of "
+                        "FTA list\n");
+                return;
+            }
+            while ((fa=ftaexec_next())!=0) {
+                if (fa->clock_fta!=0) {
+                    fa->clock_fta(fa);
+                }
+            }
+            t=time(0);
+               if (t%GSLOGINTERVAL==0) gsstats();// log all the stats
+        }
+    }
+}
+
+static gs_retval_t  send_standard_reply(FTAID f, gs_int32_t  result) {
+    struct standard_result r;
+    r.h.callid=STANDARD_RESULT;
+    r.h.size=sizeof(struct standard_result);
+    r.result=result;
+    if (gscpipc_send(f, FTACALLBACK, (gs_sp_t) &r,r.h.size,1)<0) {
+       gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
+       return -1;
+    }
+    return 0 ;
+}
+
+static gs_retval_t  send_lookup_reply(FTAID f, gs_int32_t  result,
+                            FTAID * ftaid,
+                            gs_sp_t* schema) {
+    struct ftafind_result r;
+    r.h.callid=FTAFIND_RESULT;
+    r.h.size=sizeof(struct ftafind_result);
+    r.result=result;
+    r.f=*ftaid;
+    if (result >=0) {
+       if (strlen(*schema)>=(MAXSCHEMASZ-1)) {
+           gslog(LOG_EMERG,"ERROR:FTA schema (%s) to large\n",(unsigned char *)schema);
+           r.result=-1;
+       } else {
+           strcpy(r.schema,*schema);
+       }
+    }
+    if (gscpipc_send(f, FTACALLBACK, (gs_sp_t) &r,r.h.size,1)<0) {
+       gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
+       return -1;
+    }
+    return 0 ;
+}
+
+
+static gs_retval_t  send_fta_result(FTAID f,
+                          FTAID * ftaid, gs_int32_t  result) {
+    struct fta_result r;
+    r.h.callid=FTA_RESULT;
+    r.h.size=sizeof(struct fta_result);
+    r.result=result;
+    if (ftaid!=0) {
+      r.f=*ftaid;
+    }
+    if (gscpipc_send(f, FTACALLBACK,(gs_sp_t)&r,r.h.size,1)<0) {
+       gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
+       return -1;
+    }
+    return 0 ;
+}
+
+
+// Is also used by the lfta rts enviroment on a post. So make it none
+// static.
+gs_retval_t  send_wakeup(FTAID f)
+{
+    struct wakeup_result a;
+
+    a.h.callid=WAKEUP;
+    a.h.size=sizeof(struct wakeup_result);
+    if (gscpipc_send(f, FTACALLBACK, (gs_sp_t)&a,a.h.size,0)<0) {
+       gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
+       return -1;
+    }
+    return 0;
+}
+
+static gs_retval_t  fta_register_instance(FTAID subscriber,
+                                FTAID f,gs_uint32_t   reusable,
+                                FTAname name,
+                                gs_csp_t  schema) {
+  gs_int8_t  rb[MAXRES];
+  struct fta_register_arg a;
+  struct standard_result * sr = (struct standard_result *)rb;
+
+  if (curprocess.type != CLEARINGHOUSE) {
+    a.h.callid = FTA_REGISTER;
+    a.h.size = sizeof(struct fta_register_arg);
+    if (strlen(name)>=(MAXFTANAME-1)) {
+      gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
+      return -1;
+    }
+    if (strlen(schema)>=(MAXSCHEMASZ-1)) {
+      gslog(LOG_EMERG,"ERROR:FTA schema (%s) to large\n",schema);
+      return -1;
+    }
+    strcpy(a.name,name);
+    strcpy(a.schema,schema);
+    a.f=f;
+    a.subscriber=subscriber; /* consumer is the same as f for an FTA*/
+    a.reusable=reusable;
+    ipc_call_and_wait(clearinghouseftaid,(gs_sp_t )&a,rb);
+    if (sr->h.callid != STANDARD_RESULT) {
+      gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
+      return -1;
+    }
+    if (sr->result != 0) {
+      return -1;
+    }
+  } else {
+    if (ftalookup_register_fta(subscriber,f,name,reusable,schema)<0) {
+      return -1;
+    }
+  }
+  return 0;
+}
+
+static gs_retval_t  fta_unregister_instance(FTAID subscriber,
+                                  FTAID f) {
+  gs_int8_t  rb[MAXRES];
+  struct fta_unregister_arg a;
+  struct standard_result * sr = (struct standard_result *)rb;
+
+  if (curprocess.type != CLEARINGHOUSE) {
+    a.h.callid = FTA_UNREGISTER;
+    a.h.size = sizeof(struct fta_register_arg);
+    a.f=f;
+    a.subscriber=subscriber; /* consumer is the same as f for an FTA*/
+    ipc_call_and_wait(clearinghouseftaid,(gs_sp_t )&a,rb);
+    if (sr->h.callid != STANDARD_RESULT) {
+      gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
+      return -1;
+    }
+    return -1;
+  } else {
+    if (ftalookup_unregister_fta(f,subscriber)<0) {
+      return -1;
+    }
+  }
+  return 0;
+}
+
+gs_retval_t  fta_start_service(gs_int32_t  number)
+{
+    gs_int8_t   buf[MAXMSGSZ];
+    FTAID from;
+    gs_int32_t  length;
+    struct hostcall * h= (struct hostcall *) buf;
+    gs_int32_t  forever=0;
+    gs_int32_t  endtime=0;
+    gs_int32_t  block=1;
+    gs_int32_t  res;
+    struct ringbuf * r;
+    FTAID ftaid;
+    FTAID * ftaidp;
+    struct FTA * fta;
+    gs_int32_t  lopp;
+       gs_int32_t  preemptq;
+       gs_int32_t  endq;
+       gs_uint64_t s1;
+       gs_uint64_t s2;
+
+    if (number == 0) {
+       block=0;
+       forever=1;
+    }
+    if (number < 0) {
+       forever=1;
+    }
+    if (number > 0) {
+        block=1;
+       endtime=time(0)+number;
+    }
+
+    while((forever!=0)||
+          (endtime==0)||(endtime>time(0))) {
+        /* check if we need to give the FTAs there clock signal */
+        if ((curprocess.type == LFTA)
+            ||(curprocess.type == HFTA)) {
+            clock_signal_check();
+        }
+
+#ifdef POLLING
+       poll:
+#endif
+    preemptq=0;
+       /* first empty out sidequeu then read from messagequeue */
+       if (sidequeue_pop(&from,buf,&length)<0) {
+         /* empty out the sidequeue before processing the shared
+            memory */
+         if (curprocess.type == HFTA) {
+           /* process all the shared memory regions and register
+              for callbacks */
+               s1=rdtsc();
+               endq=time(0)+2;
+           streamregistry_getactiveringbuf_reset();
+           while ((r=streamregistry_getactiveringbuf())>0) {
+             while (UNREAD(r)) {
+                       struct FTA * fa;
+                       if (ftaexec_start()<0) {
+                       gslog(LOG_EMERG,"GSCPRTS::error::could not init check of "
+                         "FTA list\n");
+                       return -1;
+                       }
+                       while ((fa=ftaexec_next())!=0) {
+                       gs_int32_t  x;
+                       for(x=0;x<fa->stream_subscribed_cnt;x++) {
+                               if ((fa->stream_subscribed[x].streamid
+                                       ==CURREAD(r)->f.streamid)
+                                       && (fa->stream_subscribed[x].ip
+                                               ==CURREAD(r)->f.ip)
+                                       && (fa->stream_subscribed[x].port
+                                               ==CURREAD(r)->f.port)) {
+                                       fa->accept_packet(fa,&(CURREAD(r)->f),
+                                               &(CURREAD(r)->data[0]),
+                                               CURREAD(r)->sz);
+                               }
+                       }
+                       }
+                       intuple++;
+                       inbytes+=CURREAD(r)->sz;
+                       ADVANCEREAD(r);
+                       if (endq <= time(0)) {
+                               preemptq=1;
+                               goto processmsg;
+                       }
+             }
+           }
+               s2=rdtsc();
+               cycles+=(s2-s1);
+#ifndef POLLING
+           /* register wakeups all arround to make sure we don't sleep
+            * for ever,
+            */
+           streamregistry_getactiveftaid_reset();
+           while ((ftaidp=streamregistry_getactiveftaid())>0) {
+             struct gscp_get_buffer_arg a;
+             a.h.callid = GSCP_GET_BUFFER;
+             a.h.size = sizeof(struct gscp_get_buffer_arg);
+             a.timeout = 0;
+             if (gscpipc_send(*ftaidp,FTACALLBACK,(gs_sp_t )&a,a.h.size,1)<0) {
+               return -1;
+             }
+           }
+#endif
+         }
+         processmsg:
+         do {
+                       /* even if we block we return every 100msec to be able to generate the clock signal to
+                        * the HFTAs
+             */
+            if ((res=gscpipc_read(&from,&lopp,buf,&length,((block==1)&&(preemptq==0))?2:0))<0) {
+                gslog(LOG_EMERG,"GSCPRTS::error::reading from messagequeue\n");
+                return -1;
+            }
+            /* check if we need to give the FTAs there clock signal */
+            if ((curprocess.type == LFTA)
+                ||(curprocess.type == HFTA)) {
+                clock_signal_check();
+            }
+            if ((res==0) && (block==0)) {
+                /* nonblocking and nothing to do so return */
+                return 0;
+            }
+            if ((res==0) && (endtime!=0) && (endtime<time(0))) {
+                /* timeout reached so return */
+                return 0;
+            }
+#ifdef POLLING
+           if ((res==0)&&(curprocess.type == HFTA)) {
+                goto poll;
+           }
+#endif
+         } while (res==0);
+         if ((lopp)!=FTACALLBACK) {
+           gslog(LOG_EMERG,"GSCPRTS::error::unknown lowlevel opp\n");
+           return -1;
+         }
+       }
+
+#ifdef PRINTMSG
+       gslog(LOG_EMERG, "HFTA message from %u of type %u of length %u\n",from,
+       h->callid,h->size);
+#endif
+       switch (h->callid) {
+
+       case FTA_LOOKUP: {
+         if (curprocess.type == CLEARINGHOUSE) {
+           struct fta_find_arg * n;
+           FTAID rf;
+           gs_csp_t  schema;
+           n = (struct fta_find_arg *) buf;
+           /* Note: Side effect of ftalookup_lookup_fta_index is to
+              fill msgid and index */
+                 if (send_lookup_reply(from,
+                                       ftalookup_lookup_fta_index(from,
+                                                                  n->name,
+                                                                  n->reuse,
+                                                                  &rf,
+                                                                  &schema),
+                                       &rf,
+                                       (gs_sp_t *)&schema)<0) {
+                   gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
+                   return -1;
+                 }
+         } else {
+           if (send_standard_reply(from,-1)<0) {
+             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
+             return -1;
+           }
+           gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse proccess got"
+                   "contacted for clearinghouse processing\n");
+         }
+       }
+         break;
+       case FTA_REGISTER: {
+         if (curprocess.type == CLEARINGHOUSE) {
+           struct fta_register_arg * n;
+           n = (struct fta_register_arg *) buf;
+           if (send_standard_reply(from,
+                                   ftalookup_register_fta(n->subscriber,
+                                                          n->f,
+                                                          n->name,
+                                                          n->reusable,
+                                                          n->schema))<0) {
+             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
+             return -1;
+           }
+         } else {
+           if (send_standard_reply(from,-1)<0) {
+             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
+             return -1;
+           }
+           gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse proccess got"
+                   "contacted for clearinghouse processing\n");
+         }
+       }
+         break;
+       case FTA_UNREGISTER: {
+         if (curprocess.type == CLEARINGHOUSE) {
+           struct fta_unregister_arg * n;
+           n = (struct fta_unregister_arg *) buf;
+           if (send_standard_reply(from,
+                                   ftalookup_unregister_fta(n->subscriber,
+                                                            n->f))<0) {
+             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
+             return -1;
+           }
+         } else {
+           if (send_standard_reply(from,-1)<0) {
+             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
+             return -1;
+           }
+           gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse proccess got"
+                   "contacted for clearinghouse processing\n");
+         }
+       }
+         break;
+       case FTA_ALLOC_INSTANCE:
+       case FTA_ALLOC_PRINT_INSTANCE:
+         if ((curprocess.type == LFTA)
+             ||(curprocess.type == HFTA)) {
+           struct ringbuf * r;
+           struct fta_alloc_instance_arg * n;
+           n = (struct fta_alloc_instance_arg *) buf;
+           if ((fta=ftaexec_alloc_instance(n->f.index,
+                                           (struct FTA *)n->f.streamid,
+                                           n->reusable,
+                                           n->command,
+                                           n->sz,
+                                           &(n->data[0])))==0) {
+             gslog(LOG_EMERG,"GSCPRTS::warning::could not allocate"
+                     "FTA\n");
+             if (send_fta_result(from,0,-1)<0) {
+               gslog(LOG_EMERG,"GSCPRTS::error::send standard "
+                       "reply faild\n");
+               return -1;
+             }
+           } else {
+             /* shared memory is only required if data is beeing transfered */
+             if ((h->callid!=FTA_ALLOC_PRINT_INSTANCE)&&
+                 ((r=gscpipc_getshm(from))==0)) {
+                               gslog(LOG_EMERG,"GSCPRTS::warning::could not get"
+                               "shared memory\n");
+                               ftaexec_remove(fta);
+                               if (send_fta_result(from,0,-1)<0) {
+                                       gslog(LOG_EMERG,"GSCPRTS::error::send standard "
+                                       "reply faild\n");
+                                       return -1;
+                               }
+               } else {
+                       /* no callback to register for print function */
+                       if ((h->callid!=FTA_ALLOC_PRINT_INSTANCE)&&(ftacallback_add_streamid(r,fta->ftaid.streamid)!=0)) {
+                               gslog(LOG_EMERG,"GSCPRTS::warning::could not add"
+                                       "streamid to ringbuffer\n");
+                               ftaexec_free_instance(fta,1);
+                               if (send_fta_result(from,0,-1)<0) {
+                                       gslog(LOG_EMERG,"GSCPRTS::error::send standard "
+                                       "reply faild\n");
+                                       return -1;
+                               }
+               } else {
+                 if (ftaexec_insert(0,fta)<0) {
+                   gslog(LOG_EMERG,"GSCPRTS::warning::could not"
+                           "insert FTA\n");
+                   ftacallback_rm_streamid(r,fta->ftaid.streamid);
+                   ftaexec_free_instance(fta,1);
+                   if (send_fta_result(from,0,-1)<0) {
+                     gslog(LOG_EMERG,"GSCPRTS::error::send standard "
+                             "reply faild\n");
+                     return -1;
+                   }
+                 } else {
+                   if (fta_register_instance(n->subscriber,fta->ftaid,
+                                             n->reusable,
+                                             n->name,
+                                             n->schema)!=0) {
+                     gslog(LOG_EMERG,"GSCPRTS::warning::could not register"
+                       " instance\n");
+                     ftaexec_remove(fta);
+                     ftacallback_rm_streamid(r,fta->ftaid.streamid);
+                     ftaexec_free_instance(fta,1);
+                     if (send_fta_result(from,0,-1)<0) {
+                       gslog(LOG_EMERG,"GSCPRTS::error::send standard "
+                               "reply faild\n");
+                       return -1;
+                     }
+                   }  else {
+                     if (h->callid==FTA_ALLOC_PRINT_INSTANCE) {
+                               if (curprocess.type == LFTA) {
+                                       gslog(LOG_EMERG,"GSCPRTS::error:: alloc print instance not "
+                                               "implemented for LFTA.\n");
+                                       ftaexec_remove(fta);
+                                       ftaexec_free_instance(fta,1);
+                                       if (send_fta_result(from,0,-1)<0) {
+                                               gslog(LOG_EMERG,"GSCPRTS::error::send standard "
+                                               "reply faild\n");
+                                               return -1;
+                                       }
+                               } else {
+                                       if (add_printfunction_to_stream(fta, n->schema, n->path, n->basename,
+                                               n->temporal_field, n->split_field, n->delta, n->split) < 0) {
+                                               ftaexec_remove(fta);
+                                               ftaexec_free_instance(fta,1);
+                                               if (send_fta_result(from,0,-1)<0) {
+                                                       gslog(LOG_EMERG,"GSCPRTS::error::send standard "
+                                                       "reply faild\n");
+                                                       return -1;
+                                               }
+                               }
+                         }
+                       }
+                     if (send_fta_result(from,&fta->ftaid,0)<0) {
+                                       gslog(LOG_EMERG,"GSCPRTS::error::send standard "
+                               "reply faild\n");
+                               return -1;
+                     }
+                   }
+                 }
+               }
+             }
+           }
+         } else {
+           if (send_fta_result(from,0,-1)<0) {
+             gslog(LOG_EMERG,"GSCPRTS::error::send standard "
+                     "reply faild\n");
+             return -1;
+           }
+         }
+         break;
+       case FTA_FREE_INSTANCE:{
+         if ((curprocess.type == LFTA)
+             ||(curprocess.type == HFTA)) {
+           struct fta_free_instance_arg * n;
+           n = (struct fta_free_instance_arg *) buf;
+           if (((r=gscpipc_getshm(from))!=0)
+               && ( ftaexec_remove((struct FTA *) n->f.streamid)==0)
+               && ( ftacallback_rm_streamid(r,n->f.streamid)==0)
+               && (ftaexec_free_instance((struct FTA *)n->f.streamid,
+                                         n->recursive)
+                   ==0)
+               && (fta_unregister_instance(n->subscriber,n->f)<0)) {
+             if (send_standard_reply(from,0)<0) {
+               gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
+               return -1;
+             }
+           } else {
+             if (send_standard_reply(from,-1)<0) {
+               gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
+               return -1;
+             }
+           }
+         } else {
+           if (send_standard_reply(from,-1)<0) {
+             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
+             return -1;
+           }
+         }
+       }
+          break;
+       case FTA_CONTROL:{
+         if ((curprocess.type == LFTA)
+             ||(curprocess.type == HFTA)) {
+           struct fta_control_arg * n;
+           n = (struct fta_control_arg *) buf;
+           if (send_standard_reply(from,
+                                   ftaexec_control((struct FTA *)
+                                                   n->f.streamid,
+                                                   n->command,
+                                                   n->sz,
+                                                   &(n->data[0])))<0) {
+             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
+             return -1;
+           }
+         } else {
+           if (send_standard_reply(from,-1)<0) {
+             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
+             return -1;
+           }
+           gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse or HFTA proccess got"
+                   "contacted for clearinghouse or HFTA processing\n");
+         }
+       }
+          break;
+       case FTA_PRODUCER_FAILURE: {
+         if (curprocess.type == CLEARINGHOUSE) {
+           struct fta_notify_producer_failure_arg * n;
+           n = (struct fta_notify_producer_failure_arg *) buf;
+           ftalookup_producer_failure(n->sender,n->producer);
+         }
+       }
+         break;
+       case FTA_HEARTBEAT: {
+         if (curprocess.type == CLEARINGHOUSE) {
+           struct fta_heartbeat_arg * n;
+           n = (struct fta_heartbeat_arg *) buf;
+           ftalookup_heartbeat(n->sender,n->trace_id,
+                               n->sz,&(n->data[0]));
+         }
+       }
+         break;
+       case GSCP_GET_BUFFER:{
+         struct sgroup_get_buffer_arg * n;
+         gs_int32_t  res;
+         struct ringbuf * r;
+         n = (struct sgroup_get_buffer_arg *) buf;
+
+         if ((r=gscpipc_getshm(from))==0) {
+           gslog(LOG_EMERG,"GSCPRTS::error::proccess blocked without"
+                   "sharedmemory\n");
+         } else {
+           if (UNREAD(r)) {
+             /* something arrived in the meantime so wakeup
+                right away */
+               if (send_wakeup(from)<0) {
+                 gslog(LOG_EMERG,"ERROR:Could not send wakeup\n");
+                 return -1;
+               }
+           } else {
+#ifndef POLLING
+             if (ftacallback_add_wakeup(from,r)<0) {
+               gslog(LOG_EMERG,"ERROR:Could not add wakeup\n");
+               return -1;
+             }
+#else
+               gslog(LOG_EMERG,"Received wakeup request on polling systems\n");
+#endif
+           }
+         }
+       }
+         break;
+       case PROCESS_CONTROL:{
+         if ((curprocess.type == LFTA)
+             ||(curprocess.type == HFTA)) {
+           struct process_control_arg * n;
+           n = (struct process_control_arg *) buf;
+           if (send_standard_reply(from,
+                                   ftaexec_process_control(n->command,
+                                                           n->sz,
+                                                           &(n->data[0])))<0) {
+             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
+             return -1;
+           }
+         } else {
+           if (send_standard_reply(from,-1)<0) {
+             gslog(LOG_EMERG,"GSCPRTS::error::send standard reply faild\n");
+             return -1;
+           }
+           gslog(LOG_EMERG,"GSCPRTS::error: Non clearinghouse or HFTA proccess got"
+                   "contacted for clearinghouse or HFTA processing\n");
+         }
+       }
+          break;
+       case WAKEUP:
+       case TIMEOUT:
+          break;
+       default:
+         gslog(LOG_EMERG,"GSCPRTS::error::illegal message queue type %u\n",h->callid);
+         return -1;
+        }
+       /* use this occation to cleanup the messagequeue we can't afford
+          a backlog in the real message queue since it is limited in
+          size */
+       while (gscpipc_read(&from,&lopp,buf,&length,0)>0) {
+#ifdef PRINTMSG
+         gslog(LOG_EMERG, "request from %u of type %u with length %u\n",from,
+                 h->callid,h->size);
+#endif
+         if ((lopp == FTACALLBACK) && (h->callid < RESULT_OPCODE_IGNORE)) {
+           if (sidequeue_append(from,buf,length)<0) {
+             gslog(LOG_EMERG,"ERROR:: Could not add to sidequeue\n");
+             return -1;
+           }
+         }
+
+       }
+    }
+    return 0;
+}
+
+static gs_retval_t  map_match(gs_csp_t  dev) {
+  gs_int32_t  x;
+  for(x=0;x<curprocess.mapcnt;x++){
+    if (strcmp(dev,curprocess.map[x])==0) {
+      return 1;
+    }
+  }
+  return 0;
+}
+
+gs_retval_t  fta_max_snaplen()
+{
+  return maxsnaplen;
+}
+
+FTAID fta_register(FTAname name,gs_uint32_t   reusable, DEVname dev,
+                    alloc_fta fta_alloc_functionptr,
+                    gs_csp_t  schema, gs_int32_t  snaplen, gs_uint64_t prefilter)
+{
+  gs_int8_t  rb[MAXRES];
+  struct fta_register_arg a;
+  struct standard_result * sr = (struct standard_result *)rb;
+  gs_int32_t  index;
+  FTAID res;
+  FTAID reserr;
+  res.ip=0;
+  res.port=0;
+  res.index=0;
+  res.streamid=0;
+  reserr=res;
+
+  /* check if the device matches for the registration */
+  if (((dev==0) && (curprocess.deviceid==0))
+      || ((dev!=0)&&(map_match(dev)==1))) {
+    if (dev!=0) {
+      gslog(LOG_INFO,"Register %s on device %s\n",name,dev);
+    } else {
+      gslog(LOG_INFO,"Register %s on default device\n",name);
+    }
+    if ((index=ftacallback_add_alloc(name,fta_alloc_functionptr,prefilter))<0) {
+      gslog(LOG_EMERG,"ERROR could not register callback\n");
+      return res;
+    }
+
+    if (snaplen<0) {
+      maxsnaplen=-1;
+    } else {
+      if (maxsnaplen!=-1) {
+       maxsnaplen=(snaplen>maxsnaplen)?snaplen:maxsnaplen;
+      }
+    }
+
+    res=gscpipc_getftaid();
+    res.index=index;
+    if (curprocess.type != CLEARINGHOUSE) {
+               a.h.callid = FTA_REGISTER;
+               a.h.size = sizeof(struct fta_register_arg);
+               if (strlen(name)>=(MAXFTANAME-1)) {
+                       gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
+                       return reserr;
+               }
+               if (strlen(schema)>=(MAXSCHEMASZ-1)) {
+                       gslog(LOG_EMERG,"ERROR:FTA schema (%s) to large\n",schema);
+                       return reserr;
+               }
+               strcpy(a.name,name);
+               strcpy(a.schema,schema);
+               a.f=res;
+               a.subscriber=res; /* consumer is the same as f for an FTA*/
+               a.reusable=reusable;
+               ipc_call_and_wait(clearinghouseftaid,(gs_sp_t )&a,rb);
+               if (sr->h.callid != STANDARD_RESULT) {
+                       gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
+                       return reserr;
+               }
+               if (sr->result != 0 )  {
+                       gslog(LOG_EMERG,"ERROR:Error in registration\n");
+                       return reserr;
+               }
+    } else {
+      if (ftalookup_register_fta(gscpipc_getftaid(),
+                       res,name,reusable,schema)!=0) {
+                       return res;
+      }
+    }
+  }
+  return res;
+}
+
+gs_retval_t  fta_unregister(FTAID ftaid)
+{
+  gs_int8_t  rb[MAXRES];
+  struct fta_unregister_arg a;
+  struct standard_result * sr = (struct standard_result *)rb;
+  if (ftacallback_rm_alloc(ftaid.index)<0) {
+    gslog(LOG_EMERG,"ERROR could not unregister callback\n");
+    return -1;
+  }
+
+  if (curprocess.type != CLEARINGHOUSE) {
+    a.h.callid = FTA_UNREGISTER;
+    a.h.size = sizeof(struct fta_register_arg);
+    a.f=ftaid;
+    ipc_call_and_wait(clearinghouseftaid,(gs_sp_t) &a,rb);
+    if (sr->h.callid != STANDARD_RESULT) {
+      gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
+      return -1;
+    }
+    return sr->result;
+  } else {
+    return ftalookup_unregister_fta(gscpipc_getftaid(),ftaid);
+  }
+  return 0;
+}
+
+gs_retval_t  hfta_post_tuple(struct FTA * self, gs_int32_t  sz, void *tuple)
+{
+  struct ringbuf * r;
+  gs_uint32_t   msgid;
+  struct wakeup_result a;
+  FTAID * f;
+  gs_int32_t  state;
+
+    if (sz>MAXTUPLESZ) {
+        gslog(LOG_EMERG,"Maximum tuple size is %u\n",MAXTUPLESZ);
+        return -1;
+    }
+
+    if (self->printfunc.in_use==1) {
+               return print_stream(self,sz,tuple);
+       }
+
+    if (ftacallback_start_streamid((gs_p_t  )self)<0) {
+       gslog(LOG_EMERG,"ERROR:Post for unkown streamid\n");
+       return -1;
+    }
+    /* now make sure we have space to write in all atomic ringbuffer */
+    while((r=ftacallback_next_streamid(&state))!=0) {
+        if (state == HFTA_RINGBUF_ATOMIC) {
+#ifdef BLOCKRINGBUFFER
+            while (!SPACETOWRITE(r)) {
+                usleep(100);
+            }
+#endif
+            if (! SPACETOWRITE(r)) {
+                /* atomic ring buffer and no space so post nothing */
+                return -1;
+            }
+        }
+    }
+
+
+    if (ftacallback_start_streamid((gs_p_t  )self)<0) {
+        gslog(LOG_EMERG,"ERROR:Post for unkown streamid\n");
+        return -1;
+    }
+
+    while((r=ftacallback_next_streamid(&state))!=0) {
+        if (state != HFTA_RINGBUF_SUSPEND) {
+            if (!SPACETOWRITE(r)) {
+                //since memory is full we set a warning
+                shared_memory_full_warning++;
+                // give receiver a chance to clean up
+                usleep(0);
+            }
+            if (SPACETOWRITE(r)) {
+                CURWRITE(r)->f=self->ftaid;
+                CURWRITE(r)->sz=sz;
+                memcpy(&(CURWRITE(r)->data[0]),tuple,sz);
+                               outtuple++;
+                               outbytes=outbytes+CURWRITE(r)->sz;
+                ADVANCEWRITE(r);
+#ifdef PRINTMSG
+               gslog(LOG_EMERG,"Wrote in ringpuffer %p [%p:%u]"
+                    "(%u %u) \n",r,&r->start,r->end,r->reader,r->writer);
+                gslog(LOG_EMERG,"\t%u  %u\n",CURREAD(r)->next,
+                    CURREAD(r)->sz);
+#endif
+            } else {
+                outtupledrop++;
+            }
+            if (HOWFULL(r) > 500) {
+      // buffer is at least half full
+                shared_memory_full_warning++;
+#ifdef PRINTMSG
+                gslog(LOG_EMERG,"\t\t buffer full\n");
+#endif
+            }
+        }
+    }
+#ifndef POLLING
+    if (ftacallback_start_wakeup((gs_p_t  ) self)<0) {
+        gslog(LOG_EMERG,"ERROR:Wakeup for unkown streamid\n");
+        return -1;
+    }
+    a.h.callid=WAKEUP;
+    a.h.size=sizeof(struct wakeup_result);
+    while((f=ftacallback_next_wakeup())!=0) {
+        if (send_wakeup(*f)<0) {
+            gslog(LOG_EMERG,"ERROR:Could not send wakeup\n");
+            return -1;
+        }
+    }
+#endif
+    return 0;
+}
+
+gs_retval_t  hfta_get_ringbuf_space(struct FTA * f, FTAID * r, gs_int32_t  * space, gs_int32_t  szr, gs_int32_t  tuplesz)
+{
+    gs_int32_t  x=0;
+    gs_int32_t  state;
+    struct ringbuf * ru;
+
+       if (f->printfunc.in_use==1) {
+       // XXX WHAT TO DO???
+               gslog(LOG_INFO,"Checking space for printfunc");
+               return 0;
+       }
+
+       if (ftacallback_start_streamid(f->ftaid.streamid)<0) {
+        gslog(LOG_EMERG,"ERROR:Space check for unkown streamid in HFTA\n");
+        return -1;
+    }
+
+    while ((ru=ftacallback_next_streamid(&state))!=0) {
+        if (szr > x ) {
+            r[x]=ru->destid;
+            space[x]=TUPLEFIT(ru,tuplesz);
+        }
+        x++;
+    }
+    return x;
+}
+
+
+gs_retval_t  hfta_set_ringbuf_type(struct FTA * f, FTAID process, gs_int32_t  state)
+{
+
+    if (ftacallback_state_streamid(f->ftaid.streamid,process,state)<0) {
+       gslog(LOG_EMERG,"ERROR:state change for unkown streamid\n");
+       return -1;
+    }
+    return 0;
+}