Added quantiling UDAFs
[com/gs-lite.git] / src / lib / gscphost / callbackregistries.c
index 8bd0957..228b781 100644 (file)
-/* ------------------------------------------------
- Copyright 2014 AT&T Intellectual Property
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ------------------------------------------- */
-
-#include "callbackregistries.h"
-#include "lapp.h"
-#include "gscpipc.h"
-#include "stdlib.h"
-#include "stdio.h"
-#include "string.h"
-#include <unistd.h>
-#include <signal.h>
-#include <sys/mman.h>
-#include "schemaparser.h"
-#include "errno.h"
-
-#include "gsconfig.h"
-#include "gstypes.h"
-
-struct ftacallbackalloc {
-    gs_int32_t used;
-    gs_sp_t name;
-    alloc_fta fta_alloc_functionptr;
-    gs_uint64_t prefilter;
-};
-
-static struct ftacallbackalloc * falloc =0;
-static gs_int32_t lalloc=0;
-
-struct ftacallbackstreamid {
-    gs_int32_t refcnt;
-    gs_uint32_t streamid;
-    gs_uint32_t state;
-    struct ringbuf * r;
-};
-
-static struct ftacallbackstreamid * fstreamid =0;
-static gs_int32_t lstreamid=0;
-static gs_uint32_t cstreamid;
-static gs_int32_t nstreamid;
-
-struct ftacallbackwakeup {
-    gs_int32_t used;
-    FTAID ftaid;
-    struct ringbuf * r;
-};
-
-static struct ftacallbackwakeup * fwakeup =0;
-static gs_int32_t lwakeup=0;
-static gs_uint32_t swakeup;
-static gs_int32_t nwakeup;
-
-/* XXX memory of fta list has no owner struct */
-
-struct fta_list {
-    struct fta_list * prev;
-    struct fta_list * next;
-    struct FTA * fta;
-};
-
-static struct fta_list * process=0;
-static struct fta_list * created=0;
-
-static struct fta_list * itteration=0;
-
-// Side queue datastructures
-
-struct sq {
-    FTAID from;
-    gs_int8_t buf[MAXMSGSZ];
-    gs_int32_t length;
-    struct sq * next;
-};
-
-static struct sq * sqtop=0;
-static struct sq * sqtail=0;
-
-
-/* HFTA internal print function*/
-
-gs_retval_t add_printfunction_to_stream( struct FTA  * ftaid, gs_sp_t schema, gs_sp_t path, gs_sp_t basename,
-                                        gs_sp_t temporal_field, gs_sp_t split_field, gs_uint32_t delta, gs_uint32_t split) {
-       gs_uint32_t parserversion;
-       gs_int32_t schemaid;
-       gs_uint32_t x;
-       gs_int8_t temp[50000];
-       if (ftaid->printfunc.in_use==1) {
-        gslog(LOG_EMERG,"ERROR:Printfunction::only allow one print function per HFTA instance\n");
-           return -1;
-       }
-       ftaid->printfunc.path=strdup(path);
-       ftaid->printfunc.basename=strdup(basename);
-       ftaid->printfunc.nexttime=0;
-       ftaid->printfunc.split=(split%1000);
-       ftaid->printfunc.itt=(split/1000)%1000;
-       ftaid->printfunc.base=(split/1000000)%1000;
-       ftaid->printfunc.delta=delta;
-       ftaid->printfunc.in_use=1;
-       if (ftaid->printfunc.split > MAXPRINTFILES) {
-        gslog(LOG_EMERG,"ERROR:Printfunction SPLIT to large\n");
-           return -1;
-       }
-       for (x=0;x<ftaid->printfunc.split;x++) ftaid->printfunc.fa[x]=0;
-    
-       if ((ftaid->printfunc.schemaid=ftaschema_parse_string(schema))<0) {
-               gslog(LOG_EMERG,"ERROR:could not parse schema in HFTA print function");
-               return -1;
-       }
-       if ((ftaid->printfunc.temporal_field=ftaschema_get_field_offset_by_name(
-                                                                            ftaid->printfunc.schemaid,temporal_field))<0) {
-               gslog(LOG_EMERG,"ERROR:could not get "
-              "offset for timefield %s in HFTA print function\n",
-              temporal_field);
-               return -1;
-       }
-    
-       if (ftaschema_get_field_type_by_name(
-                                         ftaid->printfunc.schemaid,temporal_field)!=UINT_TYPE) {
-           gslog(LOG_EMERG,"ERROR: illegal type for timefield "
-              "%s in HFTA print function UINT expected\n",
-              temporal_field);
-           return -1;
-       }
-       if ((ftaid->printfunc.split_field=ftaschema_get_field_offset_by_name(
-                                                                         ftaid->printfunc.schemaid,split_field))<0) {
-               gslog(LOG_EMERG,"ERROR:could not get "
-              "offset for splitfield %s in HFTA print function\n",
-              split_field);
-               return -1;
-       }
-    
-       if (ftaschema_get_field_type_by_name(
-                                         ftaid->printfunc.schemaid,split_field)!=UINT_TYPE) {
-           gslog(LOG_EMERG,"ERROR: illegal type for splitfield"
-              "%s in HFTA print function UINT expected\n",
-              split_field);
-           return -1;
-       }
-       parserversion=get_schemaparser_version();
-       sprintf(temp,"GDAT\nVERSION:%u\nSCHEMALENGTH:%lu\n%s",parserversion,strlen(schema)+1,schema);
-       ftaid->printfunc.header=strdup(temp);
-       gslog(LOG_INFO,"Established print function for %s",basename);
-       return 0;
-}
-
-gs_retval_t print_stream(struct FTA * self, gs_int32_t sz, void *tuple)
-{
-       gs_int32_t problem;
-       gs_uint32_t timeval;
-       gs_uint32_t splitval;
-       gs_uint32_t x;
-       gs_uint32_t nsz;
-       timeval=fta_unpack_uint(tuple,sz,self->printfunc.temporal_field,&problem);
-       if (timeval==0) return 0; // ignore heartbeats till we see a real timestamp
-       if (timeval>= self->printfunc.nexttime) {
-               gs_int8_t oldname[1024];
-               gs_int8_t newname[1024];
-               if (self->printfunc.split==0) {
-                       if (self->printfunc.fa[0] != 0) {
-                               sprintf(oldname,"%s/%u%s.tmp",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,self->printfunc.basename);
-                               sprintf(newname,"%s/%u%s",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,self->printfunc.basename);
-                               fclose(self->printfunc.fa[0]);
-                               rename(oldname,newname);
-                       }
-                       if (self->printfunc.nexttime==0) {
-                               self->printfunc.nexttime=(timeval/self->printfunc.delta)*self->printfunc.delta;
-                       }
-                       sprintf(oldname,"%s/%u%s.tmp",self->printfunc.path,self->printfunc.nexttime,self->printfunc.basename);
-                       if ((self->printfunc.fa[0]=fopen(oldname,"w"))==0) {
-                               gslog(LOG_EMERG,"ERROR:Could not open file in HFTA print function\n");
-                               return -1;
-                       }
-                       if (setvbuf(self->printfunc.fa[0],0,_IOFBF,16000000)!=0) {
-                               gslog(LOG_EMERG,"ERROR:Could not setvbuf\n");
-                       }
-                       if (fwrite(self->printfunc.header,strlen(self->printfunc.header)+1,1,self->printfunc.fa[0])!=1) {
-                               gslog(LOG_EMERG,"ERROR:fwrite:xfgh1:%s:%u",self->printfunc.basename,errno);
-                       }
-                       gslog(LOG_INFO,"Opened file %s",oldname);
-               } else {
-                       for(x=self->printfunc.base;x<self->printfunc.split;x=x+self->printfunc.itt) {
-                               if (self->printfunc.fa[x] != 0) {
-                                       sprintf(oldname,"%s/%u_s%u%s.tmp",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,x+1,self->printfunc.basename);
-                                       sprintf(newname,"%s/%u_s%u%s",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,x+1,self->printfunc.basename);
-                                       fclose(self->printfunc.fa[x]);
-                                       rename(oldname,newname);
-                               }
-                               if (self->printfunc.nexttime==0) {
-                                       self->printfunc.nexttime=(timeval/self->printfunc.delta)*self->printfunc.delta;
-                               }
-                               sprintf(oldname,"%s/%u_s%u%s.tmp",self->printfunc.path,self->printfunc.nexttime,x+1,self->printfunc.basename);
-                               if ((self->printfunc.fa[x]=fopen(oldname,"w"))==0) {
-                                       gslog(LOG_EMERG,"ERROR:Could not open file in HFTA print function\n");
-                                       return -1;
-                               }
-                if (setvbuf(self->printfunc.fa[x],0,_IOFBF,16000000)!=0) {
-                    gslog(LOG_EMERG,"ERROR:Could not setvbuf\n");
-                }
-                               if (fwrite(self->printfunc.header,strlen(self->printfunc.header)+1,1,self->printfunc.fa[x])!=1)  {
-                    gslog(LOG_EMERG,"ERROR:fwrite:xfgh2:%s:%u",self->printfunc.basename,errno);
-                }
-                gslog(LOG_INFO,"Opened file %s",oldname);
-                       }
-               }
-               self->printfunc.nexttime=self->printfunc.nexttime+self->printfunc.delta;
-       }
-    // don't write temporal tuples to file but use them to advance file name.
-       if (ftaschema_is_temporal_tuple(self->printfunc.schemaid, tuple)) return 0;
-       if (self->printfunc.split!=0) {
-               splitval=fta_unpack_uint(tuple,sz,self->printfunc.split_field,&problem)%(self->printfunc.split);
-               if (self->printfunc.fa[splitval]==0) {
-                       gslog(LOG_EMERG,"Inconsistent rangehash in print %u\n", splitval);
-                       exit(0);
-               }
-       } else {
-               splitval=0;
-       }
-       nsz=htonl(sz);
-       if (fwrite(&nsz,sizeof(gs_uint32_t),1,self->printfunc.fa[splitval])!=1) {
-               gslog(LOG_EMERG,"Could not write to output in HFTA print\"%s\":%u.. EXITING\n",
-              self->printfunc.basename,errno);
-               exit(0);
-       }
-       if (fwrite(tuple,sz,1,self->printfunc.fa[splitval])!=1) {
-               gslog(LOG_EMERG,"Could not write to output in HFTA print\"%s\"%u.. EXITING\n",
-              self->printfunc.basename,errno);
-               exit(0);
-       }
-       return 0;
-}
-
-/* registers an alloc function of an FTA and returns a unique index */
-gs_retval_t ftacallback_add_alloc(FTAname name, alloc_fta fta_alloc_functionptr,gs_uint64_t prefilter)
-{
-    gs_int32_t x;
-    gslog(LOG_INFO,"Register prefilter %llu for %s\n",prefilter,name);
-    if (lalloc == 0) {
-        if ((falloc = malloc(sizeof(struct ftacallbackalloc)*STARTSZ))==0) {
-            gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
-            return -1;
-        }
-        memset(falloc,0,sizeof(struct ftacallbackalloc)*STARTSZ);
-        lalloc = STARTSZ;
-    }
-    for(x=0;(x<lalloc)&&(falloc[x].used!=0);x++);
-    if (x == lalloc) {
-        gs_int32_t y;
-        lalloc = 2*lalloc;
-        if ((falloc = realloc(falloc,lalloc*sizeof(struct ftacallbackalloc)))==0) {
-            gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
-            return -1;
-        }
-        for (y=x;y<lalloc;y++)
-            falloc[y].used=0;
-    }
-    falloc[x].name=strdup(name);
-    falloc[x].fta_alloc_functionptr=fta_alloc_functionptr;
-    falloc[x].prefilter=prefilter;
-    falloc[x].used=1;
-    return x;
-}
-
-/* unregisters an alloc function of an FTA and makes the index available
- * for reuse
- */
-
-gs_retval_t ftacallback_rm_alloc(gs_uint32_t index)
-{
-    falloc[index].used=0;
-    free(falloc[index].name);
-    return 0;
-}
-/* returns the prefilter for a given
- * index
- */
-
-gs_uint64_t ftacallback_get_prefilter(gs_int32_t index)
-{
-    if ((index<lalloc) && (falloc[index].used!=0)) {
-        return falloc[index].prefilter;
-    }
-    return 0;
-}
-
-/* returns the function pointer of the callback function for a given
- * index
- */
-
-alloc_fta ftacallback_get_alloc(gs_int32_t index)
-{
-    if ((index<lalloc) && (falloc[index].used!=0)) {
-        return falloc[index].fta_alloc_functionptr;
-    }
-    return 0;
-}
-
-
-
-
-
-/* associate ringbuffer with streamid (using refcounting) */
-gs_retval_t ftacallback_add_streamid(struct ringbuf * r, gs_uint32_t streamid) {
-    gs_int32_t x;
-    if (lstreamid == 0) {
-        if ((fstreamid = malloc(sizeof(struct ftacallbackstreamid)*STARTSZ))==0) {
-            gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
-            return -1;
-        }
-        memset(fstreamid,0,sizeof(struct ftacallbackstreamid)*STARTSZ);
-        lstreamid = STARTSZ;
-    }
-    /* first try to increment refcnt */
-    for(x=0;(x<lstreamid)&&(
-                            (fstreamid[x].streamid!=streamid)
-                            ||(fstreamid[x].r!=r)
-                            ||(fstreamid[x].refcnt<=0)) ;x++);
-    if (x>=lstreamid) {
-        /* now try to find empty slot */
-        for(x=0;(x<lstreamid)&&(fstreamid[x].refcnt!=0);x++);
-        if (x >= lstreamid) {
-            gs_int32_t y;
-            lstreamid = 2*lstreamid;
-            if ((fstreamid =
-                 realloc(fstreamid,sizeof(struct ftacallbackstreamid)*lstreamid))==0) {
-                gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
-                return -1;
-            }
-            for (y=x;y<lstreamid;y++) {
-                fstreamid[y].refcnt=0;
-                fstreamid[y].streamid=0;
-            }
-        }
-        fstreamid[x].state=HFTA_RINGBUF_ATOMIC;
-       fstreamid[x].streamid=streamid;
-       fstreamid[x].r=r;
-    }
-    fstreamid[x].refcnt+=1;
-    return 0;
-}
-
-
-/* unassosciate a ringbuffer from a streamid */
-
-gs_retval_t ftacallback_rm_streamid(struct ringbuf * r, gs_uint32_t streamid)
-{
-    gs_int32_t x;
-    for(x=0;x<lstreamid;x++) {
-        if ((fstreamid[x].streamid == streamid)
-            && (fstreamid[x].r == r)
-            && (fstreamid[x].refcnt > 0))
-            fstreamid[x].refcnt--;
-    }
-    return 0;
-}
-
-/* set the state for a given streamid and destination process */
-gs_retval_t ftacallback_state_streamid(gs_int32_t streamid,FTAID process, gs_int32_t state)
-{
-    gs_int32_t x;
-    for(x=0;x<lstreamid;x++) {
-        if ((fstreamid[x].streamid == streamid)
-            && (fstreamid[x].r->destid.ip == process.ip )
-            && (fstreamid[x].r->destid.port == process.port )
-            && (fstreamid[x].refcnt > 0))
-            fstreamid[x].state=state;
-        return 0;
-    }
-    return -1;
-}
-
-/* starts an itteration through all ringbuffers for a particular streamid */
-
-gs_retval_t ftacallback_start_streamid(gs_int32_t streamid)
-{
-    cstreamid=streamid;
-    nstreamid=0;
-    return 0;
-}
-
-/* returns all the ringbuffer associated with the streamid passed in
- * ftacallback_start_streamid
- */
-struct ringbuf * ftacallback_next_streamid(gs_int32_t* state)
-{
-    for(;(nstreamid<lstreamid)
-           &&(fstreamid[nstreamid].streamid != cstreamid);
-        nstreamid++);
-    if (nstreamid<lstreamid) {
-        nstreamid++;
-        *state=fstreamid[nstreamid-1].state;
-        return fstreamid[nstreamid-1].r;
-    }
-    return 0;
-}
-
-
-/* associate msgid with ringbuf  */
-gs_retval_t ftacallback_add_wakeup(FTAID ftaid, struct ringbuf * r)
-{
-    gs_int32_t x;
-    if (lwakeup == 0) {
-        if ((fwakeup = malloc(sizeof(struct ftacallbackwakeup)*STARTSZ))==0) {
-            gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
-            return -1;
-        }
-        memset(fwakeup,0,sizeof(struct ftacallbackwakeup)*STARTSZ);
-        lwakeup = STARTSZ;
-    }
-    /* first try to find one for the same process */
-    for(x=0;(x<lwakeup)&&(
-                          ((fwakeup[x].ftaid.ip!=ftaid.ip)
-                           || (fwakeup[x].ftaid.port!=ftaid.port))
-                          ||(fwakeup[x].used==0)) ;x++);
-    if (x==lwakeup) {
-        /* now try to find empty slot */
-        for(x=0;(x<lwakeup)&&(fwakeup[x].used!=0);x++);
-        if (x == lwakeup) {
-            gs_int32_t y;
-            lwakeup = 2*lwakeup;
-            if ((fwakeup =
-                 realloc(fwakeup,sizeof(struct ftacallbackwakeup)*lwakeup))==0) {
-                gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
-                return -1;
-            }
-            for (y=x;y<lwakeup;y++)
-                fwakeup[y].used=0;
-        }
-    }
-    fwakeup[x].used=1;
-    fwakeup[x].ftaid=ftaid;
-    fwakeup[x].r=r;
-    return x;
-}
-
-/* starts an itteration through all msgids associated with
- a streamid. This also uses data kept in the fstreamid
- registry
- */
-gs_retval_t ftacallback_start_wakeup(gs_uint32_t streamid)
-{
-    swakeup=streamid;
-    nwakeup=0;
-    return 0;
-}
-
-/* returns all the msgid blocked on the streamid passed in
- * ftacallback_start_streamid and removes the msgid from
- * the wakeup list
- */
-FTAID * ftacallback_next_wakeup()
-{
-    for(;(nwakeup<lstreamid)
-           &&(fstreamid[nwakeup].streamid != swakeup);
-        nwakeup++);
-    if (nwakeup<lstreamid) {
-        gs_int32_t x;
-        for(x=0;x<lwakeup;x++)
-            if ((fwakeup[x].r==fstreamid[nwakeup].r) &&
-                (fwakeup[x].used==1)) {
-                fwakeup[x].used=0;
-                nwakeup++;
-                return & fwakeup[x].ftaid;
-            }
-    }
-    nwakeup ++;
-    return (FTAID *) 0;
-}
-
-
-
-static gs_retval_t fta_list_add(struct fta_list ** root, struct FTA * after,
-                                struct FTA * fta) {
-    struct fta_list * new;
-    struct fta_list * tmp;
-    
-    if ((new=(struct fta_list *)malloc(sizeof(struct fta_list)))==0) {
-        gslog(LOG_EMERG,"fta_list_add:: can't allocate memory\n");
-        return -1;
-    }
-    
-    new->fta=fta;
-    
-    
-    if (after==0) {
-        new->next=*root;
-        new->prev=0;
-        *root=new;
-        if (new->next) {
-            new->next->prev=new;
-        }
-    } else {
-        tmp=*root;
-        while((tmp)&&(tmp->fta!=after)) {
-            tmp=tmp->next;
-        }
-        if (tmp==0) {
-            gslog(LOG_EMERG,"fta_list_add:: can't find after fta\n");
-            return -1;
-        }
-        new->next=tmp->next;
-        new->prev=tmp;
-        tmp->next=new;
-        if (new->next) {
-            new->next->prev=new;
-        }
-    }
-    return 0;
-}
-
-static gs_retval_t fta_list_rm(struct fta_list ** root, struct FTA * fta) {
-    struct fta_list * tmp;
-    tmp=*root;
-    while((tmp)&&(tmp->fta!=fta)) {
-        tmp=tmp->next;
-    }
-    if (tmp==0) {
-        gslog(LOG_EMERG,"fta_list_rm:: can't find fta\n");
-        return -1;
-    }
-    if (tmp == (*root)) {
-        *root=tmp->next;
-        if (tmp->next) {
-            tmp->next->prev=0;
-        }
-    } else {
-        tmp->prev->next=tmp->next;
-        if (tmp->next) {
-            tmp->next->prev=tmp->prev;
-        }
-    }
-    
-    free(tmp);
-    return 0;
-}
-
-
-static gs_retval_t fta_list_check(struct fta_list ** root, struct FTA * fta) {
-    struct fta_list * tmp;
-    tmp=*root;
-    while((tmp)&&(tmp->fta!=fta)) {
-        tmp=tmp->next;
-    }
-    if (tmp==0) {
-        return -1;
-    }
-    return 0;
-}
-
-gs_retval_t ftaexec_insert(struct FTA * after, struct FTA * new)
-{
-    
-    if ((after!=0) && (fta_list_check(&process,after)<0)) {
-        gslog(LOG_EMERG,"fta_insert:: ilegal adapter for after\n");
-        return -1;
-    }
-    
-    if (fta_list_check(&created,new)<0) {
-        gslog(LOG_EMERG,"fta_insert:: ilegal adapter for new\n");
-        return -1;
-    }
-    
-    if (fta_list_check(&process,new)==0) {
-               new->runrefcnt++;
-        gslog(LOG_INFO,"fta_insert:: new already in process list reusing entry with streamid %d\n",new->ftaid.streamid);
-        return 0;
-    }
-    
-    if (fta_list_add(&process,after,new)<0) {
-        gslog(LOG_EMERG,"fta_insert:: new can not be added to process list\n");
-        return -1;
-    }
-    
-    new->runrefcnt++;
-    
-    return 0;
-    
-}
-
-gs_retval_t ftaexec_remove(struct FTA * id){
-    
-    if (fta_list_check(&process,id)<0) {
-        gslog(LOG_EMERG,"fta_remove:: id not in process list\n");
-        return -1;
-    }
-    id->runrefcnt--;
-    if (id->runrefcnt<=0) {
-        if (fta_list_rm(&process,id)<0) {
-            gslog(LOG_EMERG,"fta_remove:: id could not be removed from process list\n");
-            return -1;
-        }
-    }
-    return 0;
-}
-
-
-struct FTA * ftaexec_alloc_instance(gs_uint32_t index, struct FTA * reuse,
-                                    gs_uint32_t reusable,
-                                    gs_int32_t command, gs_int32_t sz, void *  data){
-    struct FTA * f;
-    FTAID ftaid;
-    ftaid=gscpipc_getftaid();
-    ftaid.index=index;
-    ftaid.streamid=0;
-    if (fta_list_check(&created,reuse)==0) {
-        reuse->refcnt++;
-        return reuse;
-    }
-    if (ftacallback_get_alloc(index)==0) return 0;
-    f=ftacallback_get_alloc(index)(ftaid,reusable,command, sz, data);
-    f->prefilter=ftacallback_get_prefilter(index);
-    gslog(LOG_INFO,"Using prefilter %llu for fta %x\n",f->prefilter,f);
-    if (fta_list_add(&created,0,f)<0) {
-        gslog(LOG_EMERG,"fta_alloc_instance:: new fta can not be added to created list\n");
-        return 0;
-    }
-    f->refcnt=1;
-    return f;
-}
-
-
-
-gs_retval_t ftaexec_free_instance(struct FTA * id, gs_uint32_t recursive){
-    id->refcnt --;
-    if (id->refcnt==0) {
-        if (fta_list_rm(&created,id)<0) {
-            gslog(LOG_EMERG,"fta_free_instance:: fta could not be removed from created list\n");
-            return -1;
-        }
-        /* just to make sure remove it form process list too */
-        if (fta_list_check(&process,id)>=0) {
-            fta_list_rm(&process,id);
-        }
-        
-        id->free_fta(id,recursive);
-    }
-    return 0;
-}
-
-gs_retval_t ftaexec_control(struct FTA * id, gs_int32_t command, gs_int32_t sz, void * value){
-    if (fta_list_check(&created,id)<0) {
-        gslog(LOG_EMERG,"fta_control:: id not found in adapter's created list\n");
-        return -1;
-    }
-    return id->control_fta(id,command,sz,value);
-}
-
-gs_retval_t ftaexec_process_control(gs_int32_t command, gs_int32_t sz, void * value){
-    struct FTA * f;
-    ftaexec_start();
-    while((f=ftaexec_next())!=0) {
-        f->control_fta(f,command,sz,value);
-    }
-    return 1;
-}
-
-
-/* Start itteration through list of active FTA */
-
-gs_retval_t ftaexec_start()
-{
-    itteration=process;
-    return 0;
-}
-
-/* get one FTA at a time */
-
-struct FTA * ftaexec_next()
-{
-    struct FTA * fta=0;
-    if (itteration) {
-        fta=itteration->fta;
-        itteration=itteration->next;
-    }
-    return fta;
-}
-
-
-/* adds a buffer to the end of the sidequeue*/
-gs_retval_t sidequeue_append(FTAID from, gs_sp_t buf, gs_int32_t length)
-{
-    struct sq * s;
-    if ((s=malloc(sizeof(struct sq)))==0) {
-        gslog(LOG_EMERG,"Could not allocate memory for sidequeue");
-        return -1;
-    }
-    s->from=from;
-    memcpy(&s->buf[0],buf,MAXMSGSZ);
-    s->length=length;
-    s->next=0;
-    if (sqtail) {
-        sqtail->next=s;
-        sqtail=s;
-    } else {
-        sqtop = s;
-        sqtail = s;
-    }
-    return 0;
-}
-
-/* removes a buffer from the top of the sidequeue*/
-gs_retval_t sidequeue_pop(FTAID * from, gs_sp_t buf, gs_int32_t* length)
-{
-    struct sq * s;
-    
-    if (sqtop) {
-        *from=sqtop->from;
-        memcpy(buf,&sqtop->buf[0],MAXMSGSZ);
-        *length=sqtop->length;
-        s=sqtop;
-        sqtop=sqtop->next;
-        if (sqtop==0) sqtail=0;
-        free(s);
-        return 0;
-    }
-    return -1;
-}
+/* ------------------------------------------------\r
+ Copyright 2014 AT&T Intellectual Property\r
+ Licensed under the Apache License, Version 2.0 (the "License");\r
+ you may not use this file except in compliance with the License.\r
+ You may obtain a copy of the License at\r
\r
+ http://www.apache.org/licenses/LICENSE-2.0\r
\r
+ Unless required by applicable law or agreed to in writing, software\r
+ distributed under the License is distributed on an "AS IS" BASIS,\r
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ See the License for the specific language governing permissions and\r
+ limitations under the License.\r
+ ------------------------------------------- */\r
+\r
+#include "callbackregistries.h"\r
+#include "lapp.h"\r
+#include "gscpipc.h"\r
+#include "stdlib.h"\r
+#include "stdio.h"\r
+#include "string.h"\r
+#include <unistd.h>\r
+#include <signal.h>\r
+#include <sys/mman.h>\r
+#include "schemaparser.h"\r
+#include "errno.h"\r
+\r
+#include "gsconfig.h"\r
+#include "gstypes.h"\r
+\r
+struct ftacallbackalloc {\r
+    gs_int32_t used;\r
+    gs_sp_t name;\r
+    alloc_fta fta_alloc_functionptr;\r
+    gs_uint64_t prefilter;\r
+};\r
+\r
+static struct ftacallbackalloc * falloc =0;\r
+static gs_int32_t lalloc=0;\r
+\r
+struct ftacallbackstreamid {\r
+    gs_int32_t refcnt;\r
+    gs_uint32_t streamid;\r
+    gs_uint32_t state;\r
+    struct ringbuf * r;\r
+};\r
+\r
+static struct ftacallbackstreamid * fstreamid =0;\r
+static gs_int32_t lstreamid=0;\r
+static gs_uint32_t cstreamid;\r
+static gs_int32_t nstreamid;\r
+\r
+struct ftacallbackwakeup {\r
+    gs_int32_t used;\r
+    FTAID ftaid;\r
+    struct ringbuf * r;\r
+};\r
+\r
+static struct ftacallbackwakeup * fwakeup =0;\r
+static gs_int32_t lwakeup=0;\r
+static gs_uint32_t swakeup;\r
+static gs_int32_t nwakeup;\r
+\r
+/* XXX memory of fta list has no owner struct */\r
+\r
+struct fta_list {\r
+    struct fta_list * prev;\r
+    struct fta_list * next;\r
+    struct FTA * fta;\r
+};\r
+\r
+static struct fta_list * process=0;\r
+static struct fta_list * created=0;\r
+\r
+static struct fta_list * itteration=0;\r
+\r
+// Side queue datastructures\r
+\r
+struct sq {\r
+    FTAID from;\r
+    gs_int8_t buf[MAXMSGSZ];\r
+    gs_int32_t length;\r
+    struct sq * next;\r
+};\r
+\r
+static struct sq * sqtop=0;\r
+static struct sq * sqtail=0;\r
+\r
+\r
+/* HFTA internal print function*/\r
+\r
+gs_retval_t add_printfunction_to_stream( struct FTA  * ftaid, gs_sp_t schema, gs_sp_t path, gs_sp_t basename,\r
+                                        gs_sp_t temporal_field, gs_sp_t split_field, gs_uint32_t delta, gs_uint32_t split) {\r
+       gs_uint32_t parserversion;\r
+       gs_int32_t schemaid;\r
+       gs_uint32_t x;\r
+       gs_int8_t temp[50000];\r
+       if (ftaid->printfunc.in_use==1) {\r
+        gslog(LOG_EMERG,"ERROR:Printfunction::only allow one print function per HFTA instance\n");\r
+           return -1;\r
+       }\r
+       ftaid->printfunc.path=strdup(path);\r
+       ftaid->printfunc.basename=strdup(basename);\r
+       ftaid->printfunc.nexttime=0;\r
+       ftaid->printfunc.split=(split%1000);\r
+       ftaid->printfunc.itt=(split/1000)%1000;\r
+       ftaid->printfunc.base=(split/1000000)%1000;\r
+       ftaid->printfunc.delta=delta;\r
+       ftaid->printfunc.in_use=1;\r
+       if (ftaid->printfunc.split > MAXPRINTFILES) {\r
+        gslog(LOG_EMERG,"ERROR:Printfunction SPLIT to large\n");\r
+           return -1;\r
+       }\r
+       for (x=0;x<ftaid->printfunc.split;x++) ftaid->printfunc.fa[x]=0;\r
+    \r
+       if ((ftaid->printfunc.schemaid=ftaschema_parse_string(schema))<0) {\r
+               gslog(LOG_EMERG,"ERROR:could not parse schema in HFTA print function");\r
+               return -1;\r
+       }\r
+       if ((ftaid->printfunc.temporal_field=ftaschema_get_field_offset_by_name(\r
+                                                                            ftaid->printfunc.schemaid,temporal_field))<0) {\r
+               gslog(LOG_EMERG,"ERROR:could not get "\r
+              "offset for timefield %s in HFTA print function\n",\r
+              temporal_field);\r
+               return -1;\r
+       }\r
+    \r
+       if (ftaschema_get_field_type_by_name(\r
+                                         ftaid->printfunc.schemaid,temporal_field)!=UINT_TYPE) {\r
+           gslog(LOG_EMERG,"ERROR: illegal type for timefield "\r
+              "%s in HFTA print function UINT expected\n",\r
+              temporal_field);\r
+           return -1;\r
+       }\r
+       if ((ftaid->printfunc.split_field=ftaschema_get_field_offset_by_name(\r
+                                                                         ftaid->printfunc.schemaid,split_field))<0) {\r
+               gslog(LOG_EMERG,"ERROR:could not get "\r
+              "offset for splitfield %s in HFTA print function\n",\r
+              split_field);\r
+               return -1;\r
+       }\r
+    \r
+       if (ftaschema_get_field_type_by_name(\r
+                                         ftaid->printfunc.schemaid,split_field)!=UINT_TYPE) {\r
+           gslog(LOG_EMERG,"ERROR: illegal type for splitfield"\r
+              "%s in HFTA print function UINT expected\n",\r
+              split_field);\r
+           return -1;\r
+       }\r
+       parserversion=get_schemaparser_version();\r
+       sprintf(temp,"GDAT\nVERSION:%u\nSCHEMALENGTH:%lu\n%s",parserversion,strlen(schema)+1,schema);\r
+       ftaid->printfunc.header=strdup(temp);\r
+       gslog(LOG_INFO,"Established print function for %s",basename);\r
+       return 0;\r
+}\r
+\r
+gs_retval_t print_stream(struct FTA * self, gs_int32_t sz, void *tuple)\r
+{\r
+       gs_int32_t problem;\r
+       gs_uint32_t timeval;\r
+       gs_uint32_t splitval;\r
+       gs_uint32_t x;\r
+       gs_uint32_t nsz;\r
+       timeval=fta_unpack_uint(tuple,sz,self->printfunc.temporal_field,&problem);\r
+       if (timeval==0) return 0; // ignore heartbeats till we see a real timestamp\r
+       if (timeval>= self->printfunc.nexttime) {\r
+               gs_int8_t oldname[1024];\r
+               gs_int8_t newname[1024];\r
+               if (self->printfunc.split==0) {\r
+                       if (self->printfunc.fa[0] != 0) {\r
+                               sprintf(oldname,"%s/%u%s.tmp",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,self->printfunc.basename);\r
+                               sprintf(newname,"%s/%u%s",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,self->printfunc.basename);\r
+                               fclose(self->printfunc.fa[0]);\r
+                               rename(oldname,newname);\r
+                       }\r
+                       if (self->printfunc.nexttime==0) {\r
+                               self->printfunc.nexttime=(timeval/self->printfunc.delta)*self->printfunc.delta;\r
+                       }\r
+                       sprintf(oldname,"%s/%u%s.tmp",self->printfunc.path,self->printfunc.nexttime,self->printfunc.basename);\r
+                       if ((self->printfunc.fa[0]=fopen(oldname,"w"))==0) {\r
+                               gslog(LOG_EMERG,"ERROR:Could not open file in HFTA print function\n");\r
+                               return -1;\r
+                       }\r
+                       if (setvbuf(self->printfunc.fa[0],0,_IOFBF,16000000)!=0) {\r
+                               gslog(LOG_EMERG,"ERROR:Could not setvbuf\n");\r
+                       }\r
+                       if (fwrite(self->printfunc.header,strlen(self->printfunc.header)+1,1,self->printfunc.fa[0])!=1) {\r
+                               gslog(LOG_EMERG,"ERROR:fwrite:xfgh1:%s:%u",self->printfunc.basename,errno);\r
+                       }\r
+                       gslog(LOG_INFO,"Opened file %s",oldname);\r
+               } else {\r
+                       for(x=self->printfunc.base;x<self->printfunc.split;x=x+self->printfunc.itt) {\r
+                               if (self->printfunc.fa[x] != 0) {\r
+                                       sprintf(oldname,"%s/%u_s%u%s.tmp",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,x+1,self->printfunc.basename);\r
+                                       sprintf(newname,"%s/%u_s%u%s",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,x+1,self->printfunc.basename);\r
+                                       fclose(self->printfunc.fa[x]);\r
+                                       rename(oldname,newname);\r
+                               }\r
+                               if (self->printfunc.nexttime==0) {\r
+                                       self->printfunc.nexttime=(timeval/self->printfunc.delta)*self->printfunc.delta;\r
+                               }\r
+                               sprintf(oldname,"%s/%u_s%u%s.tmp",self->printfunc.path,self->printfunc.nexttime,x+1,self->printfunc.basename);\r
+                               if ((self->printfunc.fa[x]=fopen(oldname,"w"))==0) {\r
+                                       gslog(LOG_EMERG,"ERROR:Could not open file in HFTA print function\n");\r
+                                       return -1;\r
+                               }\r
+                if (setvbuf(self->printfunc.fa[x],0,_IOFBF,16000000)!=0) {\r
+                    gslog(LOG_EMERG,"ERROR:Could not setvbuf\n");\r
+                }\r
+                               if (fwrite(self->printfunc.header,strlen(self->printfunc.header)+1,1,self->printfunc.fa[x])!=1)  {\r
+                    gslog(LOG_EMERG,"ERROR:fwrite:xfgh2:%s:%u",self->printfunc.basename,errno);\r
+                }\r
+                gslog(LOG_INFO,"Opened file %s",oldname);\r
+                       }\r
+               }\r
+               self->printfunc.nexttime=self->printfunc.nexttime+self->printfunc.delta;\r
+       }\r
+    // don't write temporal tuples to file but use them to advance file name.\r
+       if (ftaschema_is_temporal_tuple(self->printfunc.schemaid, tuple)) return 0;\r
+       if (self->printfunc.split!=0) {\r
+               splitval=fta_unpack_uint(tuple,sz,self->printfunc.split_field,&problem)%(self->printfunc.split);\r
+               if (self->printfunc.fa[splitval]==0) {\r
+                       gslog(LOG_EMERG,"Inconsistent rangehash in print %u\n", splitval);\r
+                       exit(0);\r
+               }\r
+       } else {\r
+               splitval=0;\r
+       }\r
+       nsz=htonl(sz);\r
+       if (fwrite(&nsz,sizeof(gs_uint32_t),1,self->printfunc.fa[splitval])!=1) {\r
+               gslog(LOG_EMERG,"Could not write to output in HFTA print\"%s\":%u.. EXITING\n",\r
+              self->printfunc.basename,errno);\r
+               exit(0);\r
+       }\r
+       if (fwrite(tuple,sz,1,self->printfunc.fa[splitval])!=1) {\r
+               gslog(LOG_EMERG,"Could not write to output in HFTA print\"%s\"%u.. EXITING\n",\r
+              self->printfunc.basename,errno);\r
+               exit(0);\r
+       }\r
+       return 0;\r
+}\r
+\r
+/* registers an alloc function of an FTA and returns a unique index */\r
+gs_retval_t ftacallback_add_alloc(FTAname name, alloc_fta fta_alloc_functionptr,gs_uint64_t prefilter)\r
+{\r
+    gs_int32_t x;\r
+    gslog(LOG_INFO,"Register prefilter %llu for %s\n",prefilter,name);\r
+    if (lalloc == 0) {\r
+        if ((falloc = malloc(sizeof(struct ftacallbackalloc)*STARTSZ))==0) {\r
+            gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
+            return -1;\r
+        }\r
+        memset(falloc,0,sizeof(struct ftacallbackalloc)*STARTSZ);\r
+        lalloc = STARTSZ;\r
+    }\r
+    for(x=0;(x<lalloc)&&(falloc[x].used!=0);x++);\r
+    if (x == lalloc) {\r
+        gs_int32_t y;\r
+        lalloc = 2*lalloc;\r
+        if ((falloc = realloc(falloc,lalloc*sizeof(struct ftacallbackalloc)))==0) {\r
+            gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
+            return -1;\r
+        }\r
+        for (y=x;y<lalloc;y++)\r
+            falloc[y].used=0;\r
+    }\r
+    falloc[x].name=strdup(name);\r
+    falloc[x].fta_alloc_functionptr=fta_alloc_functionptr;\r
+    falloc[x].prefilter=prefilter;\r
+    falloc[x].used=1;\r
+    return x;\r
+}\r
+\r
+/* unregisters an alloc function of an FTA and makes the index available\r
+ * for reuse\r
+ */\r
+\r
+gs_retval_t ftacallback_rm_alloc(gs_uint32_t index)\r
+{\r
+    falloc[index].used=0;\r
+    free(falloc[index].name);\r
+    return 0;\r
+}\r
+/* returns the prefilter for a given\r
+ * index\r
+ */\r
+\r
+gs_uint64_t ftacallback_get_prefilter(gs_int32_t index)\r
+{\r
+    if ((index<lalloc) && (falloc[index].used!=0)) {\r
+        return falloc[index].prefilter;\r
+    }\r
+    return 0;\r
+}\r
+\r
+/* returns the function pointer of the callback function for a given\r
+ * index\r
+ */\r
+\r
+alloc_fta ftacallback_get_alloc(gs_int32_t index)\r
+{\r
+    if ((index<lalloc) && (falloc[index].used!=0)) {\r
+        return falloc[index].fta_alloc_functionptr;\r
+    }\r
+    return 0;\r
+}\r
+\r
+\r
+\r
+\r
+\r
+/* associate ringbuffer with streamid (using refcounting) */\r
+gs_retval_t ftacallback_add_streamid(struct ringbuf * r, gs_uint32_t streamid) {\r
+    gs_int32_t x;\r
+    if (lstreamid == 0) {\r
+        if ((fstreamid = malloc(sizeof(struct ftacallbackstreamid)*STARTSZ))==0) {\r
+            gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
+            return -1;\r
+        }\r
+        memset(fstreamid,0,sizeof(struct ftacallbackstreamid)*STARTSZ);\r
+        lstreamid = STARTSZ;\r
+    }\r
+    /* first try to increment refcnt */\r
+    for(x=0;(x<lstreamid)&&(\r
+                            (fstreamid[x].streamid!=streamid)\r
+                            ||(fstreamid[x].r!=r)\r
+                            ||(fstreamid[x].refcnt<=0)) ;x++);\r
+    if (x>=lstreamid) {\r
+        /* now try to find empty slot */\r
+        for(x=0;(x<lstreamid)&&(fstreamid[x].refcnt!=0);x++);\r
+        if (x >= lstreamid) {\r
+            gs_int32_t y;\r
+            lstreamid = 2*lstreamid;\r
+            if ((fstreamid =\r
+                 realloc(fstreamid,sizeof(struct ftacallbackstreamid)*lstreamid))==0) {\r
+                gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
+                return -1;\r
+            }\r
+            for (y=x;y<lstreamid;y++) {\r
+                fstreamid[y].refcnt=0;\r
+                fstreamid[y].streamid=0;\r
+            }\r
+        }\r
+        fstreamid[x].state=HFTA_RINGBUF_ATOMIC;\r
+       fstreamid[x].streamid=streamid;\r
+       fstreamid[x].r=r;\r
+    }\r
+    fstreamid[x].refcnt+=1;\r
+    return 0;\r
+}\r
+\r
+\r
+/* unassosciate a ringbuffer from a streamid */\r
+\r
+gs_retval_t ftacallback_rm_streamid(struct ringbuf * r, gs_uint32_t streamid)\r
+{\r
+    gs_int32_t x;\r
+    for(x=0;x<lstreamid;x++) {\r
+        if ((fstreamid[x].streamid == streamid)\r
+            && (fstreamid[x].r == r)\r
+            && (fstreamid[x].refcnt > 0))\r
+            fstreamid[x].refcnt--;\r
+    }\r
+    return 0;\r
+}\r
+\r
+/* set the state for a given streamid and destination process */\r
+gs_retval_t ftacallback_state_streamid(gs_int32_t streamid,FTAID process, gs_int32_t state)\r
+{\r
+    gs_int32_t x;\r
+    for(x=0;x<lstreamid;x++) {\r
+        if ((fstreamid[x].streamid == streamid)\r
+            && (fstreamid[x].r->destid.ip == process.ip )\r
+            && (fstreamid[x].r->destid.port == process.port )\r
+            && (fstreamid[x].refcnt > 0))\r
+            fstreamid[x].state=state;\r
+        return 0;\r
+    }\r
+    return -1;\r
+}\r
+\r
+/* starts an itteration through all ringbuffers for a particular streamid */\r
+\r
+gs_retval_t ftacallback_start_streamid(gs_int32_t streamid)\r
+{\r
+    cstreamid=streamid;\r
+    nstreamid=0;\r
+    return 0;\r
+}\r
+\r
+/* returns all the ringbuffer associated with the streamid passed in\r
+ * ftacallback_start_streamid\r
+ */\r
+struct ringbuf * ftacallback_next_streamid(gs_int32_t* state)\r
+{\r
+    for(;(nstreamid<lstreamid)\r
+           &&(fstreamid[nstreamid].streamid != cstreamid);\r
+        nstreamid++);\r
+    if (nstreamid<lstreamid) {\r
+        nstreamid++;\r
+        *state=fstreamid[nstreamid-1].state;\r
+        return fstreamid[nstreamid-1].r;\r
+    }\r
+    return 0;\r
+}\r
+\r
+\r
+/* associate msgid with ringbuf  */\r
+gs_retval_t ftacallback_add_wakeup(FTAID ftaid, struct ringbuf * r)\r
+{\r
+    gs_int32_t x;\r
+    if (lwakeup == 0) {\r
+        if ((fwakeup = malloc(sizeof(struct ftacallbackwakeup)*STARTSZ))==0) {\r
+            gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
+            return -1;\r
+        }\r
+        memset(fwakeup,0,sizeof(struct ftacallbackwakeup)*STARTSZ);\r
+        lwakeup = STARTSZ;\r
+    }\r
+    /* first try to find one for the same process */\r
+    for(x=0;(x<lwakeup)&&(\r
+                          ((fwakeup[x].ftaid.ip!=ftaid.ip)\r
+                           || (fwakeup[x].ftaid.port!=ftaid.port))\r
+                          ||(fwakeup[x].used==0)) ;x++);\r
+    if (x==lwakeup) {\r
+        /* now try to find empty slot */\r
+        for(x=0;(x<lwakeup)&&(fwakeup[x].used!=0);x++);\r
+        if (x == lwakeup) {\r
+            gs_int32_t y;\r
+            lwakeup = 2*lwakeup;\r
+            if ((fwakeup =\r
+                 realloc(fwakeup,sizeof(struct ftacallbackwakeup)*lwakeup))==0) {\r
+                gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
+                return -1;\r
+            }\r
+            for (y=x;y<lwakeup;y++)\r
+                fwakeup[y].used=0;\r
+        }\r
+    }\r
+    fwakeup[x].used=1;\r
+    fwakeup[x].ftaid=ftaid;\r
+    fwakeup[x].r=r;\r
+    return x;\r
+}\r
+\r
+/* starts an itteration through all msgids associated with\r
+ a streamid. This also uses data kept in the fstreamid\r
+ registry\r
+ */\r
+gs_retval_t ftacallback_start_wakeup(gs_uint32_t streamid)\r
+{\r
+    swakeup=streamid;\r
+    nwakeup=0;\r
+    return 0;\r
+}\r
+\r
+/* returns all the msgid blocked on the streamid passed in\r
+ * ftacallback_start_streamid and removes the msgid from\r
+ * the wakeup list\r
+ */\r
+FTAID * ftacallback_next_wakeup()\r
+{\r
+    for(;(nwakeup<lstreamid)\r
+           &&(fstreamid[nwakeup].streamid != swakeup);\r
+        nwakeup++);\r
+    if (nwakeup<lstreamid) {\r
+        gs_int32_t x;\r
+        for(x=0;x<lwakeup;x++)\r
+            if ((fwakeup[x].r==fstreamid[nwakeup].r) &&\r
+                (fwakeup[x].used==1)) {\r
+                fwakeup[x].used=0;\r
+                nwakeup++;\r
+                return & fwakeup[x].ftaid;\r
+            }\r
+    }\r
+    nwakeup ++;\r
+    return (FTAID *) 0;\r
+}\r
+\r
+\r
+\r
+static gs_retval_t fta_list_add(struct fta_list ** root, struct FTA * after,\r
+                                struct FTA * fta) {\r
+    struct fta_list * new;\r
+    struct fta_list * tmp;\r
+    \r
+    if ((new=(struct fta_list *)malloc(sizeof(struct fta_list)))==0) {\r
+        gslog(LOG_EMERG,"fta_list_add:: can't allocate memory\n");\r
+        return -1;\r
+    }\r
+    \r
+    new->fta=fta;\r
+    \r
+    \r
+    if (after==0) {\r
+        new->next=*root;\r
+        new->prev=0;\r
+        *root=new;\r
+        if (new->next) {\r
+            new->next->prev=new;\r
+        }\r
+    } else {\r
+        tmp=*root;\r
+        while((tmp)&&(tmp->fta!=after)) {\r
+            tmp=tmp->next;\r
+        }\r
+        if (tmp==0) {\r
+            gslog(LOG_EMERG,"fta_list_add:: can't find after fta\n");\r
+            return -1;\r
+        }\r
+        new->next=tmp->next;\r
+        new->prev=tmp;\r
+        tmp->next=new;\r
+        if (new->next) {\r
+            new->next->prev=new;\r
+        }\r
+    }\r
+    return 0;\r
+}\r
+\r
+static gs_retval_t fta_list_rm(struct fta_list ** root, struct FTA * fta) {\r
+    struct fta_list * tmp;\r
+    tmp=*root;\r
+    while((tmp)&&(tmp->fta!=fta)) {\r
+        tmp=tmp->next;\r
+    }\r
+    if (tmp==0) {\r
+        gslog(LOG_EMERG,"fta_list_rm:: can't find fta\n");\r
+        return -1;\r
+    }\r
+    if (tmp == (*root)) {\r
+        *root=tmp->next;\r
+        if (tmp->next) {\r
+            tmp->next->prev=0;\r
+        }\r
+    } else {\r
+        tmp->prev->next=tmp->next;\r
+        if (tmp->next) {\r
+            tmp->next->prev=tmp->prev;\r
+        }\r
+    }\r
+    \r
+    free(tmp);\r
+    return 0;\r
+}\r
+\r
+\r
+static gs_retval_t fta_list_check(struct fta_list ** root, struct FTA * fta) {\r
+    struct fta_list * tmp;\r
+    tmp=*root;\r
+    while((tmp)&&(tmp->fta!=fta)) {\r
+        tmp=tmp->next;\r
+    }\r
+    if (tmp==0) {\r
+        return -1;\r
+    }\r
+    return 0;\r
+}\r
+\r
+gs_retval_t ftaexec_insert(struct FTA * after, struct FTA * new)\r
+{\r
+    \r
+    if ((after!=0) && (fta_list_check(&process,after)<0)) {\r
+        gslog(LOG_EMERG,"fta_insert:: ilegal adapter for after\n");\r
+        return -1;\r
+    }\r
+    \r
+    if (fta_list_check(&created,new)<0) {\r
+        gslog(LOG_EMERG,"fta_insert:: ilegal adapter for new\n");\r
+        return -1;\r
+    }\r
+    \r
+    if (fta_list_check(&process,new)==0) {\r
+               new->runrefcnt++;\r
+        gslog(LOG_INFO,"fta_insert:: new already in process list reusing entry with streamid %d\n",new->ftaid.streamid);\r
+        return 0;\r
+    }\r
+    \r
+    if (fta_list_add(&process,after,new)<0) {\r
+        gslog(LOG_EMERG,"fta_insert:: new can not be added to process list\n");\r
+        return -1;\r
+    }\r
+    \r
+    new->runrefcnt++;\r
+    \r
+    return 0;\r
+    \r
+}\r
+\r
+gs_retval_t ftaexec_remove(struct FTA * id){\r
+    \r
+    if (fta_list_check(&process,id)<0) {\r
+        gslog(LOG_EMERG,"fta_remove:: id not in process list\n");\r
+        return -1;\r
+    }\r
+    id->runrefcnt--;\r
+    if (id->runrefcnt<=0) {\r
+        if (fta_list_rm(&process,id)<0) {\r
+            gslog(LOG_EMERG,"fta_remove:: id could not be removed from process list\n");\r
+            return -1;\r
+        }\r
+    }\r
+    return 0;\r
+}\r
+\r
+\r
+struct FTA * ftaexec_alloc_instance(gs_uint32_t index, struct FTA * reuse,\r
+                                    gs_uint32_t reusable,\r
+                                    gs_int32_t command, gs_int32_t sz, void *  data){\r
+    struct FTA * f;\r
+    FTAID ftaid;\r
+    ftaid=gscpipc_getftaid();\r
+    ftaid.index=index;\r
+    ftaid.streamid=0;\r
+    if (fta_list_check(&created,reuse)==0) {\r
+        reuse->refcnt++;\r
+        return reuse;\r
+    }\r
+    if (ftacallback_get_alloc(index)==0) return 0;\r
+    f=ftacallback_get_alloc(index)(ftaid,reusable,command, sz, data);\r
+    f->prefilter=ftacallback_get_prefilter(index);\r
+    gslog(LOG_INFO,"Using prefilter %llu for fta %x\n",f->prefilter,f);\r
+    if (fta_list_add(&created,0,f)<0) {\r
+        gslog(LOG_EMERG,"fta_alloc_instance:: new fta can not be added to created list\n");\r
+        return 0;\r
+    }\r
+    f->refcnt=1;\r
+    return f;\r
+}\r
+\r
+\r
+\r
+gs_retval_t ftaexec_free_instance(struct FTA * id, gs_uint32_t recursive){\r
+    id->refcnt --;\r
+    if (id->refcnt==0) {\r
+        if (fta_list_rm(&created,id)<0) {\r
+            gslog(LOG_EMERG,"fta_free_instance:: fta could not be removed from created list\n");\r
+            return -1;\r
+        }\r
+        /* just to make sure remove it form process list too */\r
+        if (fta_list_check(&process,id)>=0) {\r
+            fta_list_rm(&process,id);\r
+        }\r
+        \r
+        id->free_fta(id,recursive);\r
+    }\r
+    return 0;\r
+}\r
+\r
+gs_retval_t ftaexec_control(struct FTA * id, gs_int32_t command, gs_int32_t sz, void * value){\r
+    if (fta_list_check(&created,id)<0) {\r
+        gslog(LOG_EMERG,"fta_control:: id not found in adapter's created list\n");\r
+        return -1;\r
+    }\r
+    return id->control_fta(id,command,sz,value);\r
+}\r
+\r
+gs_retval_t ftaexec_process_control(gs_int32_t command, gs_int32_t sz, void * value){\r
+    struct FTA * f;\r
+    ftaexec_start();\r
+    while((f=ftaexec_next())!=0) {\r
+        f->control_fta(f,command,sz,value);\r
+    }\r
+    return 1;\r
+}\r
+\r
+\r
+/* Start itteration through list of active FTA */\r
+\r
+gs_retval_t ftaexec_start()\r
+{\r
+    itteration=process;\r
+    return 0;\r
+}\r
+\r
+/* get one FTA at a time */\r
+\r
+struct FTA * ftaexec_next()\r
+{\r
+    struct FTA * fta=0;\r
+    if (itteration) {\r
+        fta=itteration->fta;\r
+        itteration=itteration->next;\r
+    }\r
+    return fta;\r
+}\r
+\r
+\r
+/* adds a buffer to the end of the sidequeue*/\r
+gs_retval_t sidequeue_append(FTAID from, gs_sp_t buf, gs_int32_t length)\r
+{\r
+    struct sq * s;\r
+    if ((s=malloc(sizeof(struct sq)))==0) {\r
+        gslog(LOG_EMERG,"Could not allocate memory for sidequeue");\r
+        return -1;\r
+    }\r
+    s->from=from;\r
+    memcpy(&s->buf[0],buf,MAXMSGSZ);\r
+    s->length=length;\r
+    s->next=0;\r
+    if (sqtail) {\r
+        sqtail->next=s;\r
+        sqtail=s;\r
+    } else {\r
+        sqtop = s;\r
+        sqtail = s;\r
+    }\r
+    return 0;\r
+}\r
+\r
+/* removes a buffer from the top of the sidequeue*/\r
+gs_retval_t sidequeue_pop(FTAID * from, gs_sp_t buf, gs_int32_t* length)\r
+{\r
+    struct sq * s;\r
+    \r
+    if (sqtop) {\r
+        *from=sqtop->from;\r
+        memcpy(buf,&sqtop->buf[0],MAXMSGSZ);\r
+        *length=sqtop->length;\r
+        s=sqtop;\r
+        sqtop=sqtop->next;\r
+        if (sqtop==0) sqtail=0;\r
+        free(s);\r
+        return 0;\r
+    }\r
+    return -1;\r
+}\r