Fixed newline characters throughout the code
[com/gs-lite.git] / src / lib / gscphost / callbackregistries.c
index 228b781..8bd0957 100644 (file)
-/* ------------------------------------------------\r
- Copyright 2014 AT&T Intellectual Property\r
- Licensed under the Apache License, Version 2.0 (the "License");\r
- you may not use this file except in compliance with the License.\r
- You may obtain a copy of the License at\r
\r
- http://www.apache.org/licenses/LICENSE-2.0\r
\r
- Unless required by applicable law or agreed to in writing, software\r
- distributed under the License is distributed on an "AS IS" BASIS,\r
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- See the License for the specific language governing permissions and\r
- limitations under the License.\r
- ------------------------------------------- */\r
-\r
-#include "callbackregistries.h"\r
-#include "lapp.h"\r
-#include "gscpipc.h"\r
-#include "stdlib.h"\r
-#include "stdio.h"\r
-#include "string.h"\r
-#include <unistd.h>\r
-#include <signal.h>\r
-#include <sys/mman.h>\r
-#include "schemaparser.h"\r
-#include "errno.h"\r
-\r
-#include "gsconfig.h"\r
-#include "gstypes.h"\r
-\r
-struct ftacallbackalloc {\r
-    gs_int32_t used;\r
-    gs_sp_t name;\r
-    alloc_fta fta_alloc_functionptr;\r
-    gs_uint64_t prefilter;\r
-};\r
-\r
-static struct ftacallbackalloc * falloc =0;\r
-static gs_int32_t lalloc=0;\r
-\r
-struct ftacallbackstreamid {\r
-    gs_int32_t refcnt;\r
-    gs_uint32_t streamid;\r
-    gs_uint32_t state;\r
-    struct ringbuf * r;\r
-};\r
-\r
-static struct ftacallbackstreamid * fstreamid =0;\r
-static gs_int32_t lstreamid=0;\r
-static gs_uint32_t cstreamid;\r
-static gs_int32_t nstreamid;\r
-\r
-struct ftacallbackwakeup {\r
-    gs_int32_t used;\r
-    FTAID ftaid;\r
-    struct ringbuf * r;\r
-};\r
-\r
-static struct ftacallbackwakeup * fwakeup =0;\r
-static gs_int32_t lwakeup=0;\r
-static gs_uint32_t swakeup;\r
-static gs_int32_t nwakeup;\r
-\r
-/* XXX memory of fta list has no owner struct */\r
-\r
-struct fta_list {\r
-    struct fta_list * prev;\r
-    struct fta_list * next;\r
-    struct FTA * fta;\r
-};\r
-\r
-static struct fta_list * process=0;\r
-static struct fta_list * created=0;\r
-\r
-static struct fta_list * itteration=0;\r
-\r
-// Side queue datastructures\r
-\r
-struct sq {\r
-    FTAID from;\r
-    gs_int8_t buf[MAXMSGSZ];\r
-    gs_int32_t length;\r
-    struct sq * next;\r
-};\r
-\r
-static struct sq * sqtop=0;\r
-static struct sq * sqtail=0;\r
-\r
-\r
-/* HFTA internal print function*/\r
-\r
-gs_retval_t add_printfunction_to_stream( struct FTA  * ftaid, gs_sp_t schema, gs_sp_t path, gs_sp_t basename,\r
-                                        gs_sp_t temporal_field, gs_sp_t split_field, gs_uint32_t delta, gs_uint32_t split) {\r
-       gs_uint32_t parserversion;\r
-       gs_int32_t schemaid;\r
-       gs_uint32_t x;\r
-       gs_int8_t temp[50000];\r
-       if (ftaid->printfunc.in_use==1) {\r
-        gslog(LOG_EMERG,"ERROR:Printfunction::only allow one print function per HFTA instance\n");\r
-           return -1;\r
-       }\r
-       ftaid->printfunc.path=strdup(path);\r
-       ftaid->printfunc.basename=strdup(basename);\r
-       ftaid->printfunc.nexttime=0;\r
-       ftaid->printfunc.split=(split%1000);\r
-       ftaid->printfunc.itt=(split/1000)%1000;\r
-       ftaid->printfunc.base=(split/1000000)%1000;\r
-       ftaid->printfunc.delta=delta;\r
-       ftaid->printfunc.in_use=1;\r
-       if (ftaid->printfunc.split > MAXPRINTFILES) {\r
-        gslog(LOG_EMERG,"ERROR:Printfunction SPLIT to large\n");\r
-           return -1;\r
-       }\r
-       for (x=0;x<ftaid->printfunc.split;x++) ftaid->printfunc.fa[x]=0;\r
-    \r
-       if ((ftaid->printfunc.schemaid=ftaschema_parse_string(schema))<0) {\r
-               gslog(LOG_EMERG,"ERROR:could not parse schema in HFTA print function");\r
-               return -1;\r
-       }\r
-       if ((ftaid->printfunc.temporal_field=ftaschema_get_field_offset_by_name(\r
-                                                                            ftaid->printfunc.schemaid,temporal_field))<0) {\r
-               gslog(LOG_EMERG,"ERROR:could not get "\r
-              "offset for timefield %s in HFTA print function\n",\r
-              temporal_field);\r
-               return -1;\r
-       }\r
-    \r
-       if (ftaschema_get_field_type_by_name(\r
-                                         ftaid->printfunc.schemaid,temporal_field)!=UINT_TYPE) {\r
-           gslog(LOG_EMERG,"ERROR: illegal type for timefield "\r
-              "%s in HFTA print function UINT expected\n",\r
-              temporal_field);\r
-           return -1;\r
-       }\r
-       if ((ftaid->printfunc.split_field=ftaschema_get_field_offset_by_name(\r
-                                                                         ftaid->printfunc.schemaid,split_field))<0) {\r
-               gslog(LOG_EMERG,"ERROR:could not get "\r
-              "offset for splitfield %s in HFTA print function\n",\r
-              split_field);\r
-               return -1;\r
-       }\r
-    \r
-       if (ftaschema_get_field_type_by_name(\r
-                                         ftaid->printfunc.schemaid,split_field)!=UINT_TYPE) {\r
-           gslog(LOG_EMERG,"ERROR: illegal type for splitfield"\r
-              "%s in HFTA print function UINT expected\n",\r
-              split_field);\r
-           return -1;\r
-       }\r
-       parserversion=get_schemaparser_version();\r
-       sprintf(temp,"GDAT\nVERSION:%u\nSCHEMALENGTH:%lu\n%s",parserversion,strlen(schema)+1,schema);\r
-       ftaid->printfunc.header=strdup(temp);\r
-       gslog(LOG_INFO,"Established print function for %s",basename);\r
-       return 0;\r
-}\r
-\r
-gs_retval_t print_stream(struct FTA * self, gs_int32_t sz, void *tuple)\r
-{\r
-       gs_int32_t problem;\r
-       gs_uint32_t timeval;\r
-       gs_uint32_t splitval;\r
-       gs_uint32_t x;\r
-       gs_uint32_t nsz;\r
-       timeval=fta_unpack_uint(tuple,sz,self->printfunc.temporal_field,&problem);\r
-       if (timeval==0) return 0; // ignore heartbeats till we see a real timestamp\r
-       if (timeval>= self->printfunc.nexttime) {\r
-               gs_int8_t oldname[1024];\r
-               gs_int8_t newname[1024];\r
-               if (self->printfunc.split==0) {\r
-                       if (self->printfunc.fa[0] != 0) {\r
-                               sprintf(oldname,"%s/%u%s.tmp",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,self->printfunc.basename);\r
-                               sprintf(newname,"%s/%u%s",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,self->printfunc.basename);\r
-                               fclose(self->printfunc.fa[0]);\r
-                               rename(oldname,newname);\r
-                       }\r
-                       if (self->printfunc.nexttime==0) {\r
-                               self->printfunc.nexttime=(timeval/self->printfunc.delta)*self->printfunc.delta;\r
-                       }\r
-                       sprintf(oldname,"%s/%u%s.tmp",self->printfunc.path,self->printfunc.nexttime,self->printfunc.basename);\r
-                       if ((self->printfunc.fa[0]=fopen(oldname,"w"))==0) {\r
-                               gslog(LOG_EMERG,"ERROR:Could not open file in HFTA print function\n");\r
-                               return -1;\r
-                       }\r
-                       if (setvbuf(self->printfunc.fa[0],0,_IOFBF,16000000)!=0) {\r
-                               gslog(LOG_EMERG,"ERROR:Could not setvbuf\n");\r
-                       }\r
-                       if (fwrite(self->printfunc.header,strlen(self->printfunc.header)+1,1,self->printfunc.fa[0])!=1) {\r
-                               gslog(LOG_EMERG,"ERROR:fwrite:xfgh1:%s:%u",self->printfunc.basename,errno);\r
-                       }\r
-                       gslog(LOG_INFO,"Opened file %s",oldname);\r
-               } else {\r
-                       for(x=self->printfunc.base;x<self->printfunc.split;x=x+self->printfunc.itt) {\r
-                               if (self->printfunc.fa[x] != 0) {\r
-                                       sprintf(oldname,"%s/%u_s%u%s.tmp",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,x+1,self->printfunc.basename);\r
-                                       sprintf(newname,"%s/%u_s%u%s",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,x+1,self->printfunc.basename);\r
-                                       fclose(self->printfunc.fa[x]);\r
-                                       rename(oldname,newname);\r
-                               }\r
-                               if (self->printfunc.nexttime==0) {\r
-                                       self->printfunc.nexttime=(timeval/self->printfunc.delta)*self->printfunc.delta;\r
-                               }\r
-                               sprintf(oldname,"%s/%u_s%u%s.tmp",self->printfunc.path,self->printfunc.nexttime,x+1,self->printfunc.basename);\r
-                               if ((self->printfunc.fa[x]=fopen(oldname,"w"))==0) {\r
-                                       gslog(LOG_EMERG,"ERROR:Could not open file in HFTA print function\n");\r
-                                       return -1;\r
-                               }\r
-                if (setvbuf(self->printfunc.fa[x],0,_IOFBF,16000000)!=0) {\r
-                    gslog(LOG_EMERG,"ERROR:Could not setvbuf\n");\r
-                }\r
-                               if (fwrite(self->printfunc.header,strlen(self->printfunc.header)+1,1,self->printfunc.fa[x])!=1)  {\r
-                    gslog(LOG_EMERG,"ERROR:fwrite:xfgh2:%s:%u",self->printfunc.basename,errno);\r
-                }\r
-                gslog(LOG_INFO,"Opened file %s",oldname);\r
-                       }\r
-               }\r
-               self->printfunc.nexttime=self->printfunc.nexttime+self->printfunc.delta;\r
-       }\r
-    // don't write temporal tuples to file but use them to advance file name.\r
-       if (ftaschema_is_temporal_tuple(self->printfunc.schemaid, tuple)) return 0;\r
-       if (self->printfunc.split!=0) {\r
-               splitval=fta_unpack_uint(tuple,sz,self->printfunc.split_field,&problem)%(self->printfunc.split);\r
-               if (self->printfunc.fa[splitval]==0) {\r
-                       gslog(LOG_EMERG,"Inconsistent rangehash in print %u\n", splitval);\r
-                       exit(0);\r
-               }\r
-       } else {\r
-               splitval=0;\r
-       }\r
-       nsz=htonl(sz);\r
-       if (fwrite(&nsz,sizeof(gs_uint32_t),1,self->printfunc.fa[splitval])!=1) {\r
-               gslog(LOG_EMERG,"Could not write to output in HFTA print\"%s\":%u.. EXITING\n",\r
-              self->printfunc.basename,errno);\r
-               exit(0);\r
-       }\r
-       if (fwrite(tuple,sz,1,self->printfunc.fa[splitval])!=1) {\r
-               gslog(LOG_EMERG,"Could not write to output in HFTA print\"%s\"%u.. EXITING\n",\r
-              self->printfunc.basename,errno);\r
-               exit(0);\r
-       }\r
-       return 0;\r
-}\r
-\r
-/* registers an alloc function of an FTA and returns a unique index */\r
-gs_retval_t ftacallback_add_alloc(FTAname name, alloc_fta fta_alloc_functionptr,gs_uint64_t prefilter)\r
-{\r
-    gs_int32_t x;\r
-    gslog(LOG_INFO,"Register prefilter %llu for %s\n",prefilter,name);\r
-    if (lalloc == 0) {\r
-        if ((falloc = malloc(sizeof(struct ftacallbackalloc)*STARTSZ))==0) {\r
-            gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
-            return -1;\r
-        }\r
-        memset(falloc,0,sizeof(struct ftacallbackalloc)*STARTSZ);\r
-        lalloc = STARTSZ;\r
-    }\r
-    for(x=0;(x<lalloc)&&(falloc[x].used!=0);x++);\r
-    if (x == lalloc) {\r
-        gs_int32_t y;\r
-        lalloc = 2*lalloc;\r
-        if ((falloc = realloc(falloc,lalloc*sizeof(struct ftacallbackalloc)))==0) {\r
-            gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
-            return -1;\r
-        }\r
-        for (y=x;y<lalloc;y++)\r
-            falloc[y].used=0;\r
-    }\r
-    falloc[x].name=strdup(name);\r
-    falloc[x].fta_alloc_functionptr=fta_alloc_functionptr;\r
-    falloc[x].prefilter=prefilter;\r
-    falloc[x].used=1;\r
-    return x;\r
-}\r
-\r
-/* unregisters an alloc function of an FTA and makes the index available\r
- * for reuse\r
- */\r
-\r
-gs_retval_t ftacallback_rm_alloc(gs_uint32_t index)\r
-{\r
-    falloc[index].used=0;\r
-    free(falloc[index].name);\r
-    return 0;\r
-}\r
-/* returns the prefilter for a given\r
- * index\r
- */\r
-\r
-gs_uint64_t ftacallback_get_prefilter(gs_int32_t index)\r
-{\r
-    if ((index<lalloc) && (falloc[index].used!=0)) {\r
-        return falloc[index].prefilter;\r
-    }\r
-    return 0;\r
-}\r
-\r
-/* returns the function pointer of the callback function for a given\r
- * index\r
- */\r
-\r
-alloc_fta ftacallback_get_alloc(gs_int32_t index)\r
-{\r
-    if ((index<lalloc) && (falloc[index].used!=0)) {\r
-        return falloc[index].fta_alloc_functionptr;\r
-    }\r
-    return 0;\r
-}\r
-\r
-\r
-\r
-\r
-\r
-/* associate ringbuffer with streamid (using refcounting) */\r
-gs_retval_t ftacallback_add_streamid(struct ringbuf * r, gs_uint32_t streamid) {\r
-    gs_int32_t x;\r
-    if (lstreamid == 0) {\r
-        if ((fstreamid = malloc(sizeof(struct ftacallbackstreamid)*STARTSZ))==0) {\r
-            gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
-            return -1;\r
-        }\r
-        memset(fstreamid,0,sizeof(struct ftacallbackstreamid)*STARTSZ);\r
-        lstreamid = STARTSZ;\r
-    }\r
-    /* first try to increment refcnt */\r
-    for(x=0;(x<lstreamid)&&(\r
-                            (fstreamid[x].streamid!=streamid)\r
-                            ||(fstreamid[x].r!=r)\r
-                            ||(fstreamid[x].refcnt<=0)) ;x++);\r
-    if (x>=lstreamid) {\r
-        /* now try to find empty slot */\r
-        for(x=0;(x<lstreamid)&&(fstreamid[x].refcnt!=0);x++);\r
-        if (x >= lstreamid) {\r
-            gs_int32_t y;\r
-            lstreamid = 2*lstreamid;\r
-            if ((fstreamid =\r
-                 realloc(fstreamid,sizeof(struct ftacallbackstreamid)*lstreamid))==0) {\r
-                gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
-                return -1;\r
-            }\r
-            for (y=x;y<lstreamid;y++) {\r
-                fstreamid[y].refcnt=0;\r
-                fstreamid[y].streamid=0;\r
-            }\r
-        }\r
-        fstreamid[x].state=HFTA_RINGBUF_ATOMIC;\r
-       fstreamid[x].streamid=streamid;\r
-       fstreamid[x].r=r;\r
-    }\r
-    fstreamid[x].refcnt+=1;\r
-    return 0;\r
-}\r
-\r
-\r
-/* unassosciate a ringbuffer from a streamid */\r
-\r
-gs_retval_t ftacallback_rm_streamid(struct ringbuf * r, gs_uint32_t streamid)\r
-{\r
-    gs_int32_t x;\r
-    for(x=0;x<lstreamid;x++) {\r
-        if ((fstreamid[x].streamid == streamid)\r
-            && (fstreamid[x].r == r)\r
-            && (fstreamid[x].refcnt > 0))\r
-            fstreamid[x].refcnt--;\r
-    }\r
-    return 0;\r
-}\r
-\r
-/* set the state for a given streamid and destination process */\r
-gs_retval_t ftacallback_state_streamid(gs_int32_t streamid,FTAID process, gs_int32_t state)\r
-{\r
-    gs_int32_t x;\r
-    for(x=0;x<lstreamid;x++) {\r
-        if ((fstreamid[x].streamid == streamid)\r
-            && (fstreamid[x].r->destid.ip == process.ip )\r
-            && (fstreamid[x].r->destid.port == process.port )\r
-            && (fstreamid[x].refcnt > 0))\r
-            fstreamid[x].state=state;\r
-        return 0;\r
-    }\r
-    return -1;\r
-}\r
-\r
-/* starts an itteration through all ringbuffers for a particular streamid */\r
-\r
-gs_retval_t ftacallback_start_streamid(gs_int32_t streamid)\r
-{\r
-    cstreamid=streamid;\r
-    nstreamid=0;\r
-    return 0;\r
-}\r
-\r
-/* returns all the ringbuffer associated with the streamid passed in\r
- * ftacallback_start_streamid\r
- */\r
-struct ringbuf * ftacallback_next_streamid(gs_int32_t* state)\r
-{\r
-    for(;(nstreamid<lstreamid)\r
-           &&(fstreamid[nstreamid].streamid != cstreamid);\r
-        nstreamid++);\r
-    if (nstreamid<lstreamid) {\r
-        nstreamid++;\r
-        *state=fstreamid[nstreamid-1].state;\r
-        return fstreamid[nstreamid-1].r;\r
-    }\r
-    return 0;\r
-}\r
-\r
-\r
-/* associate msgid with ringbuf  */\r
-gs_retval_t ftacallback_add_wakeup(FTAID ftaid, struct ringbuf * r)\r
-{\r
-    gs_int32_t x;\r
-    if (lwakeup == 0) {\r
-        if ((fwakeup = malloc(sizeof(struct ftacallbackwakeup)*STARTSZ))==0) {\r
-            gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
-            return -1;\r
-        }\r
-        memset(fwakeup,0,sizeof(struct ftacallbackwakeup)*STARTSZ);\r
-        lwakeup = STARTSZ;\r
-    }\r
-    /* first try to find one for the same process */\r
-    for(x=0;(x<lwakeup)&&(\r
-                          ((fwakeup[x].ftaid.ip!=ftaid.ip)\r
-                           || (fwakeup[x].ftaid.port!=ftaid.port))\r
-                          ||(fwakeup[x].used==0)) ;x++);\r
-    if (x==lwakeup) {\r
-        /* now try to find empty slot */\r
-        for(x=0;(x<lwakeup)&&(fwakeup[x].used!=0);x++);\r
-        if (x == lwakeup) {\r
-            gs_int32_t y;\r
-            lwakeup = 2*lwakeup;\r
-            if ((fwakeup =\r
-                 realloc(fwakeup,sizeof(struct ftacallbackwakeup)*lwakeup))==0) {\r
-                gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
-                return -1;\r
-            }\r
-            for (y=x;y<lwakeup;y++)\r
-                fwakeup[y].used=0;\r
-        }\r
-    }\r
-    fwakeup[x].used=1;\r
-    fwakeup[x].ftaid=ftaid;\r
-    fwakeup[x].r=r;\r
-    return x;\r
-}\r
-\r
-/* starts an itteration through all msgids associated with\r
- a streamid. This also uses data kept in the fstreamid\r
- registry\r
- */\r
-gs_retval_t ftacallback_start_wakeup(gs_uint32_t streamid)\r
-{\r
-    swakeup=streamid;\r
-    nwakeup=0;\r
-    return 0;\r
-}\r
-\r
-/* returns all the msgid blocked on the streamid passed in\r
- * ftacallback_start_streamid and removes the msgid from\r
- * the wakeup list\r
- */\r
-FTAID * ftacallback_next_wakeup()\r
-{\r
-    for(;(nwakeup<lstreamid)\r
-           &&(fstreamid[nwakeup].streamid != swakeup);\r
-        nwakeup++);\r
-    if (nwakeup<lstreamid) {\r
-        gs_int32_t x;\r
-        for(x=0;x<lwakeup;x++)\r
-            if ((fwakeup[x].r==fstreamid[nwakeup].r) &&\r
-                (fwakeup[x].used==1)) {\r
-                fwakeup[x].used=0;\r
-                nwakeup++;\r
-                return & fwakeup[x].ftaid;\r
-            }\r
-    }\r
-    nwakeup ++;\r
-    return (FTAID *) 0;\r
-}\r
-\r
-\r
-\r
-static gs_retval_t fta_list_add(struct fta_list ** root, struct FTA * after,\r
-                                struct FTA * fta) {\r
-    struct fta_list * new;\r
-    struct fta_list * tmp;\r
-    \r
-    if ((new=(struct fta_list *)malloc(sizeof(struct fta_list)))==0) {\r
-        gslog(LOG_EMERG,"fta_list_add:: can't allocate memory\n");\r
-        return -1;\r
-    }\r
-    \r
-    new->fta=fta;\r
-    \r
-    \r
-    if (after==0) {\r
-        new->next=*root;\r
-        new->prev=0;\r
-        *root=new;\r
-        if (new->next) {\r
-            new->next->prev=new;\r
-        }\r
-    } else {\r
-        tmp=*root;\r
-        while((tmp)&&(tmp->fta!=after)) {\r
-            tmp=tmp->next;\r
-        }\r
-        if (tmp==0) {\r
-            gslog(LOG_EMERG,"fta_list_add:: can't find after fta\n");\r
-            return -1;\r
-        }\r
-        new->next=tmp->next;\r
-        new->prev=tmp;\r
-        tmp->next=new;\r
-        if (new->next) {\r
-            new->next->prev=new;\r
-        }\r
-    }\r
-    return 0;\r
-}\r
-\r
-static gs_retval_t fta_list_rm(struct fta_list ** root, struct FTA * fta) {\r
-    struct fta_list * tmp;\r
-    tmp=*root;\r
-    while((tmp)&&(tmp->fta!=fta)) {\r
-        tmp=tmp->next;\r
-    }\r
-    if (tmp==0) {\r
-        gslog(LOG_EMERG,"fta_list_rm:: can't find fta\n");\r
-        return -1;\r
-    }\r
-    if (tmp == (*root)) {\r
-        *root=tmp->next;\r
-        if (tmp->next) {\r
-            tmp->next->prev=0;\r
-        }\r
-    } else {\r
-        tmp->prev->next=tmp->next;\r
-        if (tmp->next) {\r
-            tmp->next->prev=tmp->prev;\r
-        }\r
-    }\r
-    \r
-    free(tmp);\r
-    return 0;\r
-}\r
-\r
-\r
-static gs_retval_t fta_list_check(struct fta_list ** root, struct FTA * fta) {\r
-    struct fta_list * tmp;\r
-    tmp=*root;\r
-    while((tmp)&&(tmp->fta!=fta)) {\r
-        tmp=tmp->next;\r
-    }\r
-    if (tmp==0) {\r
-        return -1;\r
-    }\r
-    return 0;\r
-}\r
-\r
-gs_retval_t ftaexec_insert(struct FTA * after, struct FTA * new)\r
-{\r
-    \r
-    if ((after!=0) && (fta_list_check(&process,after)<0)) {\r
-        gslog(LOG_EMERG,"fta_insert:: ilegal adapter for after\n");\r
-        return -1;\r
-    }\r
-    \r
-    if (fta_list_check(&created,new)<0) {\r
-        gslog(LOG_EMERG,"fta_insert:: ilegal adapter for new\n");\r
-        return -1;\r
-    }\r
-    \r
-    if (fta_list_check(&process,new)==0) {\r
-               new->runrefcnt++;\r
-        gslog(LOG_INFO,"fta_insert:: new already in process list reusing entry with streamid %d\n",new->ftaid.streamid);\r
-        return 0;\r
-    }\r
-    \r
-    if (fta_list_add(&process,after,new)<0) {\r
-        gslog(LOG_EMERG,"fta_insert:: new can not be added to process list\n");\r
-        return -1;\r
-    }\r
-    \r
-    new->runrefcnt++;\r
-    \r
-    return 0;\r
-    \r
-}\r
-\r
-gs_retval_t ftaexec_remove(struct FTA * id){\r
-    \r
-    if (fta_list_check(&process,id)<0) {\r
-        gslog(LOG_EMERG,"fta_remove:: id not in process list\n");\r
-        return -1;\r
-    }\r
-    id->runrefcnt--;\r
-    if (id->runrefcnt<=0) {\r
-        if (fta_list_rm(&process,id)<0) {\r
-            gslog(LOG_EMERG,"fta_remove:: id could not be removed from process list\n");\r
-            return -1;\r
-        }\r
-    }\r
-    return 0;\r
-}\r
-\r
-\r
-struct FTA * ftaexec_alloc_instance(gs_uint32_t index, struct FTA * reuse,\r
-                                    gs_uint32_t reusable,\r
-                                    gs_int32_t command, gs_int32_t sz, void *  data){\r
-    struct FTA * f;\r
-    FTAID ftaid;\r
-    ftaid=gscpipc_getftaid();\r
-    ftaid.index=index;\r
-    ftaid.streamid=0;\r
-    if (fta_list_check(&created,reuse)==0) {\r
-        reuse->refcnt++;\r
-        return reuse;\r
-    }\r
-    if (ftacallback_get_alloc(index)==0) return 0;\r
-    f=ftacallback_get_alloc(index)(ftaid,reusable,command, sz, data);\r
-    f->prefilter=ftacallback_get_prefilter(index);\r
-    gslog(LOG_INFO,"Using prefilter %llu for fta %x\n",f->prefilter,f);\r
-    if (fta_list_add(&created,0,f)<0) {\r
-        gslog(LOG_EMERG,"fta_alloc_instance:: new fta can not be added to created list\n");\r
-        return 0;\r
-    }\r
-    f->refcnt=1;\r
-    return f;\r
-}\r
-\r
-\r
-\r
-gs_retval_t ftaexec_free_instance(struct FTA * id, gs_uint32_t recursive){\r
-    id->refcnt --;\r
-    if (id->refcnt==0) {\r
-        if (fta_list_rm(&created,id)<0) {\r
-            gslog(LOG_EMERG,"fta_free_instance:: fta could not be removed from created list\n");\r
-            return -1;\r
-        }\r
-        /* just to make sure remove it form process list too */\r
-        if (fta_list_check(&process,id)>=0) {\r
-            fta_list_rm(&process,id);\r
-        }\r
-        \r
-        id->free_fta(id,recursive);\r
-    }\r
-    return 0;\r
-}\r
-\r
-gs_retval_t ftaexec_control(struct FTA * id, gs_int32_t command, gs_int32_t sz, void * value){\r
-    if (fta_list_check(&created,id)<0) {\r
-        gslog(LOG_EMERG,"fta_control:: id not found in adapter's created list\n");\r
-        return -1;\r
-    }\r
-    return id->control_fta(id,command,sz,value);\r
-}\r
-\r
-gs_retval_t ftaexec_process_control(gs_int32_t command, gs_int32_t sz, void * value){\r
-    struct FTA * f;\r
-    ftaexec_start();\r
-    while((f=ftaexec_next())!=0) {\r
-        f->control_fta(f,command,sz,value);\r
-    }\r
-    return 1;\r
-}\r
-\r
-\r
-/* Start itteration through list of active FTA */\r
-\r
-gs_retval_t ftaexec_start()\r
-{\r
-    itteration=process;\r
-    return 0;\r
-}\r
-\r
-/* get one FTA at a time */\r
-\r
-struct FTA * ftaexec_next()\r
-{\r
-    struct FTA * fta=0;\r
-    if (itteration) {\r
-        fta=itteration->fta;\r
-        itteration=itteration->next;\r
-    }\r
-    return fta;\r
-}\r
-\r
-\r
-/* adds a buffer to the end of the sidequeue*/\r
-gs_retval_t sidequeue_append(FTAID from, gs_sp_t buf, gs_int32_t length)\r
-{\r
-    struct sq * s;\r
-    if ((s=malloc(sizeof(struct sq)))==0) {\r
-        gslog(LOG_EMERG,"Could not allocate memory for sidequeue");\r
-        return -1;\r
-    }\r
-    s->from=from;\r
-    memcpy(&s->buf[0],buf,MAXMSGSZ);\r
-    s->length=length;\r
-    s->next=0;\r
-    if (sqtail) {\r
-        sqtail->next=s;\r
-        sqtail=s;\r
-    } else {\r
-        sqtop = s;\r
-        sqtail = s;\r
-    }\r
-    return 0;\r
-}\r
-\r
-/* removes a buffer from the top of the sidequeue*/\r
-gs_retval_t sidequeue_pop(FTAID * from, gs_sp_t buf, gs_int32_t* length)\r
-{\r
-    struct sq * s;\r
-    \r
-    if (sqtop) {\r
-        *from=sqtop->from;\r
-        memcpy(buf,&sqtop->buf[0],MAXMSGSZ);\r
-        *length=sqtop->length;\r
-        s=sqtop;\r
-        sqtop=sqtop->next;\r
-        if (sqtop==0) sqtail=0;\r
-        free(s);\r
-        return 0;\r
-    }\r
-    return -1;\r
-}\r
+/* ------------------------------------------------
+ Copyright 2014 AT&T Intellectual Property
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ------------------------------------------- */
+
+#include "callbackregistries.h"
+#include "lapp.h"
+#include "gscpipc.h"
+#include "stdlib.h"
+#include "stdio.h"
+#include "string.h"
+#include <unistd.h>
+#include <signal.h>
+#include <sys/mman.h>
+#include "schemaparser.h"
+#include "errno.h"
+
+#include "gsconfig.h"
+#include "gstypes.h"
+
+struct ftacallbackalloc {
+    gs_int32_t used;
+    gs_sp_t name;
+    alloc_fta fta_alloc_functionptr;
+    gs_uint64_t prefilter;
+};
+
+static struct ftacallbackalloc * falloc =0;
+static gs_int32_t lalloc=0;
+
+struct ftacallbackstreamid {
+    gs_int32_t refcnt;
+    gs_uint32_t streamid;
+    gs_uint32_t state;
+    struct ringbuf * r;
+};
+
+static struct ftacallbackstreamid * fstreamid =0;
+static gs_int32_t lstreamid=0;
+static gs_uint32_t cstreamid;
+static gs_int32_t nstreamid;
+
+struct ftacallbackwakeup {
+    gs_int32_t used;
+    FTAID ftaid;
+    struct ringbuf * r;
+};
+
+static struct ftacallbackwakeup * fwakeup =0;
+static gs_int32_t lwakeup=0;
+static gs_uint32_t swakeup;
+static gs_int32_t nwakeup;
+
+/* XXX memory of fta list has no owner struct */
+
+struct fta_list {
+    struct fta_list * prev;
+    struct fta_list * next;
+    struct FTA * fta;
+};
+
+static struct fta_list * process=0;
+static struct fta_list * created=0;
+
+static struct fta_list * itteration=0;
+
+// Side queue datastructures
+
+struct sq {
+    FTAID from;
+    gs_int8_t buf[MAXMSGSZ];
+    gs_int32_t length;
+    struct sq * next;
+};
+
+static struct sq * sqtop=0;
+static struct sq * sqtail=0;
+
+
+/* HFTA internal print function*/
+
+gs_retval_t add_printfunction_to_stream( struct FTA  * ftaid, gs_sp_t schema, gs_sp_t path, gs_sp_t basename,
+                                        gs_sp_t temporal_field, gs_sp_t split_field, gs_uint32_t delta, gs_uint32_t split) {
+       gs_uint32_t parserversion;
+       gs_int32_t schemaid;
+       gs_uint32_t x;
+       gs_int8_t temp[50000];
+       if (ftaid->printfunc.in_use==1) {
+        gslog(LOG_EMERG,"ERROR:Printfunction::only allow one print function per HFTA instance\n");
+           return -1;
+       }
+       ftaid->printfunc.path=strdup(path);
+       ftaid->printfunc.basename=strdup(basename);
+       ftaid->printfunc.nexttime=0;
+       ftaid->printfunc.split=(split%1000);
+       ftaid->printfunc.itt=(split/1000)%1000;
+       ftaid->printfunc.base=(split/1000000)%1000;
+       ftaid->printfunc.delta=delta;
+       ftaid->printfunc.in_use=1;
+       if (ftaid->printfunc.split > MAXPRINTFILES) {
+        gslog(LOG_EMERG,"ERROR:Printfunction SPLIT to large\n");
+           return -1;
+       }
+       for (x=0;x<ftaid->printfunc.split;x++) ftaid->printfunc.fa[x]=0;
+    
+       if ((ftaid->printfunc.schemaid=ftaschema_parse_string(schema))<0) {
+               gslog(LOG_EMERG,"ERROR:could not parse schema in HFTA print function");
+               return -1;
+       }
+       if ((ftaid->printfunc.temporal_field=ftaschema_get_field_offset_by_name(
+                                                                            ftaid->printfunc.schemaid,temporal_field))<0) {
+               gslog(LOG_EMERG,"ERROR:could not get "
+              "offset for timefield %s in HFTA print function\n",
+              temporal_field);
+               return -1;
+       }
+    
+       if (ftaschema_get_field_type_by_name(
+                                         ftaid->printfunc.schemaid,temporal_field)!=UINT_TYPE) {
+           gslog(LOG_EMERG,"ERROR: illegal type for timefield "
+              "%s in HFTA print function UINT expected\n",
+              temporal_field);
+           return -1;
+       }
+       if ((ftaid->printfunc.split_field=ftaschema_get_field_offset_by_name(
+                                                                         ftaid->printfunc.schemaid,split_field))<0) {
+               gslog(LOG_EMERG,"ERROR:could not get "
+              "offset for splitfield %s in HFTA print function\n",
+              split_field);
+               return -1;
+       }
+    
+       if (ftaschema_get_field_type_by_name(
+                                         ftaid->printfunc.schemaid,split_field)!=UINT_TYPE) {
+           gslog(LOG_EMERG,"ERROR: illegal type for splitfield"
+              "%s in HFTA print function UINT expected\n",
+              split_field);
+           return -1;
+       }
+       parserversion=get_schemaparser_version();
+       sprintf(temp,"GDAT\nVERSION:%u\nSCHEMALENGTH:%lu\n%s",parserversion,strlen(schema)+1,schema);
+       ftaid->printfunc.header=strdup(temp);
+       gslog(LOG_INFO,"Established print function for %s",basename);
+       return 0;
+}
+
+gs_retval_t print_stream(struct FTA * self, gs_int32_t sz, void *tuple)
+{
+       gs_int32_t problem;
+       gs_uint32_t timeval;
+       gs_uint32_t splitval;
+       gs_uint32_t x;
+       gs_uint32_t nsz;
+       timeval=fta_unpack_uint(tuple,sz,self->printfunc.temporal_field,&problem);
+       if (timeval==0) return 0; // ignore heartbeats till we see a real timestamp
+       if (timeval>= self->printfunc.nexttime) {
+               gs_int8_t oldname[1024];
+               gs_int8_t newname[1024];
+               if (self->printfunc.split==0) {
+                       if (self->printfunc.fa[0] != 0) {
+                               sprintf(oldname,"%s/%u%s.tmp",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,self->printfunc.basename);
+                               sprintf(newname,"%s/%u%s",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,self->printfunc.basename);
+                               fclose(self->printfunc.fa[0]);
+                               rename(oldname,newname);
+                       }
+                       if (self->printfunc.nexttime==0) {
+                               self->printfunc.nexttime=(timeval/self->printfunc.delta)*self->printfunc.delta;
+                       }
+                       sprintf(oldname,"%s/%u%s.tmp",self->printfunc.path,self->printfunc.nexttime,self->printfunc.basename);
+                       if ((self->printfunc.fa[0]=fopen(oldname,"w"))==0) {
+                               gslog(LOG_EMERG,"ERROR:Could not open file in HFTA print function\n");
+                               return -1;
+                       }
+                       if (setvbuf(self->printfunc.fa[0],0,_IOFBF,16000000)!=0) {
+                               gslog(LOG_EMERG,"ERROR:Could not setvbuf\n");
+                       }
+                       if (fwrite(self->printfunc.header,strlen(self->printfunc.header)+1,1,self->printfunc.fa[0])!=1) {
+                               gslog(LOG_EMERG,"ERROR:fwrite:xfgh1:%s:%u",self->printfunc.basename,errno);
+                       }
+                       gslog(LOG_INFO,"Opened file %s",oldname);
+               } else {
+                       for(x=self->printfunc.base;x<self->printfunc.split;x=x+self->printfunc.itt) {
+                               if (self->printfunc.fa[x] != 0) {
+                                       sprintf(oldname,"%s/%u_s%u%s.tmp",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,x+1,self->printfunc.basename);
+                                       sprintf(newname,"%s/%u_s%u%s",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,x+1,self->printfunc.basename);
+                                       fclose(self->printfunc.fa[x]);
+                                       rename(oldname,newname);
+                               }
+                               if (self->printfunc.nexttime==0) {
+                                       self->printfunc.nexttime=(timeval/self->printfunc.delta)*self->printfunc.delta;
+                               }
+                               sprintf(oldname,"%s/%u_s%u%s.tmp",self->printfunc.path,self->printfunc.nexttime,x+1,self->printfunc.basename);
+                               if ((self->printfunc.fa[x]=fopen(oldname,"w"))==0) {
+                                       gslog(LOG_EMERG,"ERROR:Could not open file in HFTA print function\n");
+                                       return -1;
+                               }
+                if (setvbuf(self->printfunc.fa[x],0,_IOFBF,16000000)!=0) {
+                    gslog(LOG_EMERG,"ERROR:Could not setvbuf\n");
+                }
+                               if (fwrite(self->printfunc.header,strlen(self->printfunc.header)+1,1,self->printfunc.fa[x])!=1)  {
+                    gslog(LOG_EMERG,"ERROR:fwrite:xfgh2:%s:%u",self->printfunc.basename,errno);
+                }
+                gslog(LOG_INFO,"Opened file %s",oldname);
+                       }
+               }
+               self->printfunc.nexttime=self->printfunc.nexttime+self->printfunc.delta;
+       }
+    // don't write temporal tuples to file but use them to advance file name.
+       if (ftaschema_is_temporal_tuple(self->printfunc.schemaid, tuple)) return 0;
+       if (self->printfunc.split!=0) {
+               splitval=fta_unpack_uint(tuple,sz,self->printfunc.split_field,&problem)%(self->printfunc.split);
+               if (self->printfunc.fa[splitval]==0) {
+                       gslog(LOG_EMERG,"Inconsistent rangehash in print %u\n", splitval);
+                       exit(0);
+               }
+       } else {
+               splitval=0;
+       }
+       nsz=htonl(sz);
+       if (fwrite(&nsz,sizeof(gs_uint32_t),1,self->printfunc.fa[splitval])!=1) {
+               gslog(LOG_EMERG,"Could not write to output in HFTA print\"%s\":%u.. EXITING\n",
+              self->printfunc.basename,errno);
+               exit(0);
+       }
+       if (fwrite(tuple,sz,1,self->printfunc.fa[splitval])!=1) {
+               gslog(LOG_EMERG,"Could not write to output in HFTA print\"%s\"%u.. EXITING\n",
+              self->printfunc.basename,errno);
+               exit(0);
+       }
+       return 0;
+}
+
+/* registers an alloc function of an FTA and returns a unique index */
+gs_retval_t ftacallback_add_alloc(FTAname name, alloc_fta fta_alloc_functionptr,gs_uint64_t prefilter)
+{
+    gs_int32_t x;
+    gslog(LOG_INFO,"Register prefilter %llu for %s\n",prefilter,name);
+    if (lalloc == 0) {
+        if ((falloc = malloc(sizeof(struct ftacallbackalloc)*STARTSZ))==0) {
+            gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
+            return -1;
+        }
+        memset(falloc,0,sizeof(struct ftacallbackalloc)*STARTSZ);
+        lalloc = STARTSZ;
+    }
+    for(x=0;(x<lalloc)&&(falloc[x].used!=0);x++);
+    if (x == lalloc) {
+        gs_int32_t y;
+        lalloc = 2*lalloc;
+        if ((falloc = realloc(falloc,lalloc*sizeof(struct ftacallbackalloc)))==0) {
+            gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
+            return -1;
+        }
+        for (y=x;y<lalloc;y++)
+            falloc[y].used=0;
+    }
+    falloc[x].name=strdup(name);
+    falloc[x].fta_alloc_functionptr=fta_alloc_functionptr;
+    falloc[x].prefilter=prefilter;
+    falloc[x].used=1;
+    return x;
+}
+
+/* unregisters an alloc function of an FTA and makes the index available
+ * for reuse
+ */
+
+gs_retval_t ftacallback_rm_alloc(gs_uint32_t index)
+{
+    falloc[index].used=0;
+    free(falloc[index].name);
+    return 0;
+}
+/* returns the prefilter for a given
+ * index
+ */
+
+gs_uint64_t ftacallback_get_prefilter(gs_int32_t index)
+{
+    if ((index<lalloc) && (falloc[index].used!=0)) {
+        return falloc[index].prefilter;
+    }
+    return 0;
+}
+
+/* returns the function pointer of the callback function for a given
+ * index
+ */
+
+alloc_fta ftacallback_get_alloc(gs_int32_t index)
+{
+    if ((index<lalloc) && (falloc[index].used!=0)) {
+        return falloc[index].fta_alloc_functionptr;
+    }
+    return 0;
+}
+
+
+
+
+
+/* associate ringbuffer with streamid (using refcounting) */
+gs_retval_t ftacallback_add_streamid(struct ringbuf * r, gs_uint32_t streamid) {
+    gs_int32_t x;
+    if (lstreamid == 0) {
+        if ((fstreamid = malloc(sizeof(struct ftacallbackstreamid)*STARTSZ))==0) {
+            gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
+            return -1;
+        }
+        memset(fstreamid,0,sizeof(struct ftacallbackstreamid)*STARTSZ);
+        lstreamid = STARTSZ;
+    }
+    /* first try to increment refcnt */
+    for(x=0;(x<lstreamid)&&(
+                            (fstreamid[x].streamid!=streamid)
+                            ||(fstreamid[x].r!=r)
+                            ||(fstreamid[x].refcnt<=0)) ;x++);
+    if (x>=lstreamid) {
+        /* now try to find empty slot */
+        for(x=0;(x<lstreamid)&&(fstreamid[x].refcnt!=0);x++);
+        if (x >= lstreamid) {
+            gs_int32_t y;
+            lstreamid = 2*lstreamid;
+            if ((fstreamid =
+                 realloc(fstreamid,sizeof(struct ftacallbackstreamid)*lstreamid))==0) {
+                gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
+                return -1;
+            }
+            for (y=x;y<lstreamid;y++) {
+                fstreamid[y].refcnt=0;
+                fstreamid[y].streamid=0;
+            }
+        }
+        fstreamid[x].state=HFTA_RINGBUF_ATOMIC;
+       fstreamid[x].streamid=streamid;
+       fstreamid[x].r=r;
+    }
+    fstreamid[x].refcnt+=1;
+    return 0;
+}
+
+
+/* unassosciate a ringbuffer from a streamid */
+
+gs_retval_t ftacallback_rm_streamid(struct ringbuf * r, gs_uint32_t streamid)
+{
+    gs_int32_t x;
+    for(x=0;x<lstreamid;x++) {
+        if ((fstreamid[x].streamid == streamid)
+            && (fstreamid[x].r == r)
+            && (fstreamid[x].refcnt > 0))
+            fstreamid[x].refcnt--;
+    }
+    return 0;
+}
+
+/* set the state for a given streamid and destination process */
+gs_retval_t ftacallback_state_streamid(gs_int32_t streamid,FTAID process, gs_int32_t state)
+{
+    gs_int32_t x;
+    for(x=0;x<lstreamid;x++) {
+        if ((fstreamid[x].streamid == streamid)
+            && (fstreamid[x].r->destid.ip == process.ip )
+            && (fstreamid[x].r->destid.port == process.port )
+            && (fstreamid[x].refcnt > 0))
+            fstreamid[x].state=state;
+        return 0;
+    }
+    return -1;
+}
+
+/* starts an itteration through all ringbuffers for a particular streamid */
+
+gs_retval_t ftacallback_start_streamid(gs_int32_t streamid)
+{
+    cstreamid=streamid;
+    nstreamid=0;
+    return 0;
+}
+
+/* returns all the ringbuffer associated with the streamid passed in
+ * ftacallback_start_streamid
+ */
+struct ringbuf * ftacallback_next_streamid(gs_int32_t* state)
+{
+    for(;(nstreamid<lstreamid)
+           &&(fstreamid[nstreamid].streamid != cstreamid);
+        nstreamid++);
+    if (nstreamid<lstreamid) {
+        nstreamid++;
+        *state=fstreamid[nstreamid-1].state;
+        return fstreamid[nstreamid-1].r;
+    }
+    return 0;
+}
+
+
+/* associate msgid with ringbuf  */
+gs_retval_t ftacallback_add_wakeup(FTAID ftaid, struct ringbuf * r)
+{
+    gs_int32_t x;
+    if (lwakeup == 0) {
+        if ((fwakeup = malloc(sizeof(struct ftacallbackwakeup)*STARTSZ))==0) {
+            gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
+            return -1;
+        }
+        memset(fwakeup,0,sizeof(struct ftacallbackwakeup)*STARTSZ);
+        lwakeup = STARTSZ;
+    }
+    /* first try to find one for the same process */
+    for(x=0;(x<lwakeup)&&(
+                          ((fwakeup[x].ftaid.ip!=ftaid.ip)
+                           || (fwakeup[x].ftaid.port!=ftaid.port))
+                          ||(fwakeup[x].used==0)) ;x++);
+    if (x==lwakeup) {
+        /* now try to find empty slot */
+        for(x=0;(x<lwakeup)&&(fwakeup[x].used!=0);x++);
+        if (x == lwakeup) {
+            gs_int32_t y;
+            lwakeup = 2*lwakeup;
+            if ((fwakeup =
+                 realloc(fwakeup,sizeof(struct ftacallbackwakeup)*lwakeup))==0) {
+                gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
+                return -1;
+            }
+            for (y=x;y<lwakeup;y++)
+                fwakeup[y].used=0;
+        }
+    }
+    fwakeup[x].used=1;
+    fwakeup[x].ftaid=ftaid;
+    fwakeup[x].r=r;
+    return x;
+}
+
+/* starts an itteration through all msgids associated with
+ a streamid. This also uses data kept in the fstreamid
+ registry
+ */
+gs_retval_t ftacallback_start_wakeup(gs_uint32_t streamid)
+{
+    swakeup=streamid;
+    nwakeup=0;
+    return 0;
+}
+
+/* returns all the msgid blocked on the streamid passed in
+ * ftacallback_start_streamid and removes the msgid from
+ * the wakeup list
+ */
+FTAID * ftacallback_next_wakeup()
+{
+    for(;(nwakeup<lstreamid)
+           &&(fstreamid[nwakeup].streamid != swakeup);
+        nwakeup++);
+    if (nwakeup<lstreamid) {
+        gs_int32_t x;
+        for(x=0;x<lwakeup;x++)
+            if ((fwakeup[x].r==fstreamid[nwakeup].r) &&
+                (fwakeup[x].used==1)) {
+                fwakeup[x].used=0;
+                nwakeup++;
+                return & fwakeup[x].ftaid;
+            }
+    }
+    nwakeup ++;
+    return (FTAID *) 0;
+}
+
+
+
+static gs_retval_t fta_list_add(struct fta_list ** root, struct FTA * after,
+                                struct FTA * fta) {
+    struct fta_list * new;
+    struct fta_list * tmp;
+    
+    if ((new=(struct fta_list *)malloc(sizeof(struct fta_list)))==0) {
+        gslog(LOG_EMERG,"fta_list_add:: can't allocate memory\n");
+        return -1;
+    }
+    
+    new->fta=fta;
+    
+    
+    if (after==0) {
+        new->next=*root;
+        new->prev=0;
+        *root=new;
+        if (new->next) {
+            new->next->prev=new;
+        }
+    } else {
+        tmp=*root;
+        while((tmp)&&(tmp->fta!=after)) {
+            tmp=tmp->next;
+        }
+        if (tmp==0) {
+            gslog(LOG_EMERG,"fta_list_add:: can't find after fta\n");
+            return -1;
+        }
+        new->next=tmp->next;
+        new->prev=tmp;
+        tmp->next=new;
+        if (new->next) {
+            new->next->prev=new;
+        }
+    }
+    return 0;
+}
+
+static gs_retval_t fta_list_rm(struct fta_list ** root, struct FTA * fta) {
+    struct fta_list * tmp;
+    tmp=*root;
+    while((tmp)&&(tmp->fta!=fta)) {
+        tmp=tmp->next;
+    }
+    if (tmp==0) {
+        gslog(LOG_EMERG,"fta_list_rm:: can't find fta\n");
+        return -1;
+    }
+    if (tmp == (*root)) {
+        *root=tmp->next;
+        if (tmp->next) {
+            tmp->next->prev=0;
+        }
+    } else {
+        tmp->prev->next=tmp->next;
+        if (tmp->next) {
+            tmp->next->prev=tmp->prev;
+        }
+    }
+    
+    free(tmp);
+    return 0;
+}
+
+
+static gs_retval_t fta_list_check(struct fta_list ** root, struct FTA * fta) {
+    struct fta_list * tmp;
+    tmp=*root;
+    while((tmp)&&(tmp->fta!=fta)) {
+        tmp=tmp->next;
+    }
+    if (tmp==0) {
+        return -1;
+    }
+    return 0;
+}
+
+gs_retval_t ftaexec_insert(struct FTA * after, struct FTA * new)
+{
+    
+    if ((after!=0) && (fta_list_check(&process,after)<0)) {
+        gslog(LOG_EMERG,"fta_insert:: ilegal adapter for after\n");
+        return -1;
+    }
+    
+    if (fta_list_check(&created,new)<0) {
+        gslog(LOG_EMERG,"fta_insert:: ilegal adapter for new\n");
+        return -1;
+    }
+    
+    if (fta_list_check(&process,new)==0) {
+               new->runrefcnt++;
+        gslog(LOG_INFO,"fta_insert:: new already in process list reusing entry with streamid %d\n",new->ftaid.streamid);
+        return 0;
+    }
+    
+    if (fta_list_add(&process,after,new)<0) {
+        gslog(LOG_EMERG,"fta_insert:: new can not be added to process list\n");
+        return -1;
+    }
+    
+    new->runrefcnt++;
+    
+    return 0;
+    
+}
+
+gs_retval_t ftaexec_remove(struct FTA * id){
+    
+    if (fta_list_check(&process,id)<0) {
+        gslog(LOG_EMERG,"fta_remove:: id not in process list\n");
+        return -1;
+    }
+    id->runrefcnt--;
+    if (id->runrefcnt<=0) {
+        if (fta_list_rm(&process,id)<0) {
+            gslog(LOG_EMERG,"fta_remove:: id could not be removed from process list\n");
+            return -1;
+        }
+    }
+    return 0;
+}
+
+
+struct FTA * ftaexec_alloc_instance(gs_uint32_t index, struct FTA * reuse,
+                                    gs_uint32_t reusable,
+                                    gs_int32_t command, gs_int32_t sz, void *  data){
+    struct FTA * f;
+    FTAID ftaid;
+    ftaid=gscpipc_getftaid();
+    ftaid.index=index;
+    ftaid.streamid=0;
+    if (fta_list_check(&created,reuse)==0) {
+        reuse->refcnt++;
+        return reuse;
+    }
+    if (ftacallback_get_alloc(index)==0) return 0;
+    f=ftacallback_get_alloc(index)(ftaid,reusable,command, sz, data);
+    f->prefilter=ftacallback_get_prefilter(index);
+    gslog(LOG_INFO,"Using prefilter %llu for fta %x\n",f->prefilter,f);
+    if (fta_list_add(&created,0,f)<0) {
+        gslog(LOG_EMERG,"fta_alloc_instance:: new fta can not be added to created list\n");
+        return 0;
+    }
+    f->refcnt=1;
+    return f;
+}
+
+
+
+gs_retval_t ftaexec_free_instance(struct FTA * id, gs_uint32_t recursive){
+    id->refcnt --;
+    if (id->refcnt==0) {
+        if (fta_list_rm(&created,id)<0) {
+            gslog(LOG_EMERG,"fta_free_instance:: fta could not be removed from created list\n");
+            return -1;
+        }
+        /* just to make sure remove it form process list too */
+        if (fta_list_check(&process,id)>=0) {
+            fta_list_rm(&process,id);
+        }
+        
+        id->free_fta(id,recursive);
+    }
+    return 0;
+}
+
+gs_retval_t ftaexec_control(struct FTA * id, gs_int32_t command, gs_int32_t sz, void * value){
+    if (fta_list_check(&created,id)<0) {
+        gslog(LOG_EMERG,"fta_control:: id not found in adapter's created list\n");
+        return -1;
+    }
+    return id->control_fta(id,command,sz,value);
+}
+
+gs_retval_t ftaexec_process_control(gs_int32_t command, gs_int32_t sz, void * value){
+    struct FTA * f;
+    ftaexec_start();
+    while((f=ftaexec_next())!=0) {
+        f->control_fta(f,command,sz,value);
+    }
+    return 1;
+}
+
+
+/* Start itteration through list of active FTA */
+
+gs_retval_t ftaexec_start()
+{
+    itteration=process;
+    return 0;
+}
+
+/* get one FTA at a time */
+
+struct FTA * ftaexec_next()
+{
+    struct FTA * fta=0;
+    if (itteration) {
+        fta=itteration->fta;
+        itteration=itteration->next;
+    }
+    return fta;
+}
+
+
+/* adds a buffer to the end of the sidequeue*/
+gs_retval_t sidequeue_append(FTAID from, gs_sp_t buf, gs_int32_t length)
+{
+    struct sq * s;
+    if ((s=malloc(sizeof(struct sq)))==0) {
+        gslog(LOG_EMERG,"Could not allocate memory for sidequeue");
+        return -1;
+    }
+    s->from=from;
+    memcpy(&s->buf[0],buf,MAXMSGSZ);
+    s->length=length;
+    s->next=0;
+    if (sqtail) {
+        sqtail->next=s;
+        sqtail=s;
+    } else {
+        sqtop = s;
+        sqtail = s;
+    }
+    return 0;
+}
+
+/* removes a buffer from the top of the sidequeue*/
+gs_retval_t sidequeue_pop(FTAID * from, gs_sp_t buf, gs_int32_t* length)
+{
+    struct sq * s;
+    
+    if (sqtop) {
+        *from=sqtop->from;
+        memcpy(buf,&sqtop->buf[0],MAXMSGSZ);
+        *length=sqtop->length;
+        s=sqtop;
+        sqtop=sqtop->next;
+        if (sqtop==0) sqtail=0;
+        free(s);
+        return 0;
+    }
+    return -1;
+}