-/* ------------------------------------------------\r
- Copyright 2014 AT&T Intellectual Property\r
- Licensed under the Apache License, Version 2.0 (the "License");\r
- you may not use this file except in compliance with the License.\r
- You may obtain a copy of the License at\r
- \r
- http://www.apache.org/licenses/LICENSE-2.0\r
- \r
- Unless required by applicable law or agreed to in writing, software\r
- distributed under the License is distributed on an "AS IS" BASIS,\r
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- See the License for the specific language governing permissions and\r
- limitations under the License.\r
- ------------------------------------------- */\r
-\r
-#include "callbackregistries.h"\r
-#include "lapp.h"\r
-#include "gscpipc.h"\r
-#include "stdlib.h"\r
-#include "stdio.h"\r
-#include "string.h"\r
-#include <unistd.h>\r
-#include <signal.h>\r
-#include <sys/mman.h>\r
-#include "schemaparser.h"\r
-#include "errno.h"\r
-\r
-#include "gsconfig.h"\r
-#include "gstypes.h"\r
-\r
-struct ftacallbackalloc {\r
- gs_int32_t used;\r
- gs_sp_t name;\r
- alloc_fta fta_alloc_functionptr;\r
- gs_uint64_t prefilter;\r
-};\r
-\r
-static struct ftacallbackalloc * falloc =0;\r
-static gs_int32_t lalloc=0;\r
-\r
-struct ftacallbackstreamid {\r
- gs_int32_t refcnt;\r
- gs_uint32_t streamid;\r
- gs_uint32_t state;\r
- struct ringbuf * r;\r
-};\r
-\r
-static struct ftacallbackstreamid * fstreamid =0;\r
-static gs_int32_t lstreamid=0;\r
-static gs_uint32_t cstreamid;\r
-static gs_int32_t nstreamid;\r
-\r
-struct ftacallbackwakeup {\r
- gs_int32_t used;\r
- FTAID ftaid;\r
- struct ringbuf * r;\r
-};\r
-\r
-static struct ftacallbackwakeup * fwakeup =0;\r
-static gs_int32_t lwakeup=0;\r
-static gs_uint32_t swakeup;\r
-static gs_int32_t nwakeup;\r
-\r
-/* XXX memory of fta list has no owner struct */\r
-\r
-struct fta_list {\r
- struct fta_list * prev;\r
- struct fta_list * next;\r
- struct FTA * fta;\r
-};\r
-\r
-static struct fta_list * process=0;\r
-static struct fta_list * created=0;\r
-\r
-static struct fta_list * itteration=0;\r
-\r
-// Side queue datastructures\r
-\r
-struct sq {\r
- FTAID from;\r
- gs_int8_t buf[MAXMSGSZ];\r
- gs_int32_t length;\r
- struct sq * next;\r
-};\r
-\r
-static struct sq * sqtop=0;\r
-static struct sq * sqtail=0;\r
-\r
-\r
-/* HFTA internal print function*/\r
-\r
-gs_retval_t add_printfunction_to_stream( struct FTA * ftaid, gs_sp_t schema, gs_sp_t path, gs_sp_t basename,\r
- gs_sp_t temporal_field, gs_sp_t split_field, gs_uint32_t delta, gs_uint32_t split) {\r
- gs_uint32_t parserversion;\r
- gs_int32_t schemaid;\r
- gs_uint32_t x;\r
- gs_int8_t temp[50000];\r
- if (ftaid->printfunc.in_use==1) {\r
- gslog(LOG_EMERG,"ERROR:Printfunction::only allow one print function per HFTA instance\n");\r
- return -1;\r
- }\r
- ftaid->printfunc.path=strdup(path);\r
- ftaid->printfunc.basename=strdup(basename);\r
- ftaid->printfunc.nexttime=0;\r
- ftaid->printfunc.split=(split%1000);\r
- ftaid->printfunc.itt=(split/1000)%1000;\r
- ftaid->printfunc.base=(split/1000000)%1000;\r
- ftaid->printfunc.delta=delta;\r
- ftaid->printfunc.in_use=1;\r
- if (ftaid->printfunc.split > MAXPRINTFILES) {\r
- gslog(LOG_EMERG,"ERROR:Printfunction SPLIT to large\n");\r
- return -1;\r
- }\r
- for (x=0;x<ftaid->printfunc.split;x++) ftaid->printfunc.fa[x]=0;\r
- \r
- if ((ftaid->printfunc.schemaid=ftaschema_parse_string(schema))<0) {\r
- gslog(LOG_EMERG,"ERROR:could not parse schema in HFTA print function");\r
- return -1;\r
- }\r
- if ((ftaid->printfunc.temporal_field=ftaschema_get_field_offset_by_name(\r
- ftaid->printfunc.schemaid,temporal_field))<0) {\r
- gslog(LOG_EMERG,"ERROR:could not get "\r
- "offset for timefield %s in HFTA print function\n",\r
- temporal_field);\r
- return -1;\r
- }\r
- \r
- if (ftaschema_get_field_type_by_name(\r
- ftaid->printfunc.schemaid,temporal_field)!=UINT_TYPE) {\r
- gslog(LOG_EMERG,"ERROR: illegal type for timefield "\r
- "%s in HFTA print function UINT expected\n",\r
- temporal_field);\r
- return -1;\r
- }\r
- if ((ftaid->printfunc.split_field=ftaschema_get_field_offset_by_name(\r
- ftaid->printfunc.schemaid,split_field))<0) {\r
- gslog(LOG_EMERG,"ERROR:could not get "\r
- "offset for splitfield %s in HFTA print function\n",\r
- split_field);\r
- return -1;\r
- }\r
- \r
- if (ftaschema_get_field_type_by_name(\r
- ftaid->printfunc.schemaid,split_field)!=UINT_TYPE) {\r
- gslog(LOG_EMERG,"ERROR: illegal type for splitfield"\r
- "%s in HFTA print function UINT expected\n",\r
- split_field);\r
- return -1;\r
- }\r
- parserversion=get_schemaparser_version();\r
- sprintf(temp,"GDAT\nVERSION:%u\nSCHEMALENGTH:%lu\n%s",parserversion,strlen(schema)+1,schema);\r
- ftaid->printfunc.header=strdup(temp);\r
- gslog(LOG_INFO,"Established print function for %s",basename);\r
- return 0;\r
-}\r
-\r
-gs_retval_t print_stream(struct FTA * self, gs_int32_t sz, void *tuple)\r
-{\r
- gs_int32_t problem;\r
- gs_uint32_t timeval;\r
- gs_uint32_t splitval;\r
- gs_uint32_t x;\r
- gs_uint32_t nsz;\r
- timeval=fta_unpack_uint(tuple,sz,self->printfunc.temporal_field,&problem);\r
- if (timeval==0) return 0; // ignore heartbeats till we see a real timestamp\r
- if (timeval>= self->printfunc.nexttime) {\r
- gs_int8_t oldname[1024];\r
- gs_int8_t newname[1024];\r
- if (self->printfunc.split==0) {\r
- if (self->printfunc.fa[0] != 0) {\r
- sprintf(oldname,"%s/%u%s.tmp",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,self->printfunc.basename);\r
- sprintf(newname,"%s/%u%s",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,self->printfunc.basename);\r
- fclose(self->printfunc.fa[0]);\r
- rename(oldname,newname);\r
- }\r
- if (self->printfunc.nexttime==0) {\r
- self->printfunc.nexttime=(timeval/self->printfunc.delta)*self->printfunc.delta;\r
- }\r
- sprintf(oldname,"%s/%u%s.tmp",self->printfunc.path,self->printfunc.nexttime,self->printfunc.basename);\r
- if ((self->printfunc.fa[0]=fopen(oldname,"w"))==0) {\r
- gslog(LOG_EMERG,"ERROR:Could not open file in HFTA print function\n");\r
- return -1;\r
- }\r
- if (setvbuf(self->printfunc.fa[0],0,_IOFBF,16000000)!=0) {\r
- gslog(LOG_EMERG,"ERROR:Could not setvbuf\n");\r
- }\r
- if (fwrite(self->printfunc.header,strlen(self->printfunc.header)+1,1,self->printfunc.fa[0])!=1) {\r
- gslog(LOG_EMERG,"ERROR:fwrite:xfgh1:%s:%u",self->printfunc.basename,errno);\r
- }\r
- gslog(LOG_INFO,"Opened file %s",oldname);\r
- } else {\r
- for(x=self->printfunc.base;x<self->printfunc.split;x=x+self->printfunc.itt) {\r
- if (self->printfunc.fa[x] != 0) {\r
- sprintf(oldname,"%s/%u_s%u%s.tmp",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,x+1,self->printfunc.basename);\r
- sprintf(newname,"%s/%u_s%u%s",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,x+1,self->printfunc.basename);\r
- fclose(self->printfunc.fa[x]);\r
- rename(oldname,newname);\r
- }\r
- if (self->printfunc.nexttime==0) {\r
- self->printfunc.nexttime=(timeval/self->printfunc.delta)*self->printfunc.delta;\r
- }\r
- sprintf(oldname,"%s/%u_s%u%s.tmp",self->printfunc.path,self->printfunc.nexttime,x+1,self->printfunc.basename);\r
- if ((self->printfunc.fa[x]=fopen(oldname,"w"))==0) {\r
- gslog(LOG_EMERG,"ERROR:Could not open file in HFTA print function\n");\r
- return -1;\r
- }\r
- if (setvbuf(self->printfunc.fa[x],0,_IOFBF,16000000)!=0) {\r
- gslog(LOG_EMERG,"ERROR:Could not setvbuf\n");\r
- }\r
- if (fwrite(self->printfunc.header,strlen(self->printfunc.header)+1,1,self->printfunc.fa[x])!=1) {\r
- gslog(LOG_EMERG,"ERROR:fwrite:xfgh2:%s:%u",self->printfunc.basename,errno);\r
- }\r
- gslog(LOG_INFO,"Opened file %s",oldname);\r
- }\r
- }\r
- self->printfunc.nexttime=self->printfunc.nexttime+self->printfunc.delta;\r
- }\r
- // don't write temporal tuples to file but use them to advance file name.\r
- if (ftaschema_is_temporal_tuple(self->printfunc.schemaid, tuple)) return 0;\r
- if (self->printfunc.split!=0) {\r
- splitval=fta_unpack_uint(tuple,sz,self->printfunc.split_field,&problem)%(self->printfunc.split);\r
- if (self->printfunc.fa[splitval]==0) {\r
- gslog(LOG_EMERG,"Inconsistent rangehash in print %u\n", splitval);\r
- exit(0);\r
- }\r
- } else {\r
- splitval=0;\r
- }\r
- nsz=htonl(sz);\r
- if (fwrite(&nsz,sizeof(gs_uint32_t),1,self->printfunc.fa[splitval])!=1) {\r
- gslog(LOG_EMERG,"Could not write to output in HFTA print\"%s\":%u.. EXITING\n",\r
- self->printfunc.basename,errno);\r
- exit(0);\r
- }\r
- if (fwrite(tuple,sz,1,self->printfunc.fa[splitval])!=1) {\r
- gslog(LOG_EMERG,"Could not write to output in HFTA print\"%s\"%u.. EXITING\n",\r
- self->printfunc.basename,errno);\r
- exit(0);\r
- }\r
- return 0;\r
-}\r
-\r
-/* registers an alloc function of an FTA and returns a unique index */\r
-gs_retval_t ftacallback_add_alloc(FTAname name, alloc_fta fta_alloc_functionptr,gs_uint64_t prefilter)\r
-{\r
- gs_int32_t x;\r
- gslog(LOG_INFO,"Register prefilter %llu for %s\n",prefilter,name);\r
- if (lalloc == 0) {\r
- if ((falloc = malloc(sizeof(struct ftacallbackalloc)*STARTSZ))==0) {\r
- gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
- return -1;\r
- }\r
- memset(falloc,0,sizeof(struct ftacallbackalloc)*STARTSZ);\r
- lalloc = STARTSZ;\r
- }\r
- for(x=0;(x<lalloc)&&(falloc[x].used!=0);x++);\r
- if (x == lalloc) {\r
- gs_int32_t y;\r
- lalloc = 2*lalloc;\r
- if ((falloc = realloc(falloc,lalloc*sizeof(struct ftacallbackalloc)))==0) {\r
- gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
- return -1;\r
- }\r
- for (y=x;y<lalloc;y++)\r
- falloc[y].used=0;\r
- }\r
- falloc[x].name=strdup(name);\r
- falloc[x].fta_alloc_functionptr=fta_alloc_functionptr;\r
- falloc[x].prefilter=prefilter;\r
- falloc[x].used=1;\r
- return x;\r
-}\r
-\r
-/* unregisters an alloc function of an FTA and makes the index available\r
- * for reuse\r
- */\r
-\r
-gs_retval_t ftacallback_rm_alloc(gs_uint32_t index)\r
-{\r
- falloc[index].used=0;\r
- free(falloc[index].name);\r
- return 0;\r
-}\r
-/* returns the prefilter for a given\r
- * index\r
- */\r
-\r
-gs_uint64_t ftacallback_get_prefilter(gs_int32_t index)\r
-{\r
- if ((index<lalloc) && (falloc[index].used!=0)) {\r
- return falloc[index].prefilter;\r
- }\r
- return 0;\r
-}\r
-\r
-/* returns the function pointer of the callback function for a given\r
- * index\r
- */\r
-\r
-alloc_fta ftacallback_get_alloc(gs_int32_t index)\r
-{\r
- if ((index<lalloc) && (falloc[index].used!=0)) {\r
- return falloc[index].fta_alloc_functionptr;\r
- }\r
- return 0;\r
-}\r
-\r
-\r
-\r
-\r
-\r
-/* associate ringbuffer with streamid (using refcounting) */\r
-gs_retval_t ftacallback_add_streamid(struct ringbuf * r, gs_uint32_t streamid) {\r
- gs_int32_t x;\r
- if (lstreamid == 0) {\r
- if ((fstreamid = malloc(sizeof(struct ftacallbackstreamid)*STARTSZ))==0) {\r
- gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
- return -1;\r
- }\r
- memset(fstreamid,0,sizeof(struct ftacallbackstreamid)*STARTSZ);\r
- lstreamid = STARTSZ;\r
- }\r
- /* first try to increment refcnt */\r
- for(x=0;(x<lstreamid)&&(\r
- (fstreamid[x].streamid!=streamid)\r
- ||(fstreamid[x].r!=r)\r
- ||(fstreamid[x].refcnt<=0)) ;x++);\r
- if (x>=lstreamid) {\r
- /* now try to find empty slot */\r
- for(x=0;(x<lstreamid)&&(fstreamid[x].refcnt!=0);x++);\r
- if (x >= lstreamid) {\r
- gs_int32_t y;\r
- lstreamid = 2*lstreamid;\r
- if ((fstreamid =\r
- realloc(fstreamid,sizeof(struct ftacallbackstreamid)*lstreamid))==0) {\r
- gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
- return -1;\r
- }\r
- for (y=x;y<lstreamid;y++) {\r
- fstreamid[y].refcnt=0;\r
- fstreamid[y].streamid=0;\r
- }\r
- }\r
- fstreamid[x].state=HFTA_RINGBUF_ATOMIC;\r
- fstreamid[x].streamid=streamid;\r
- fstreamid[x].r=r;\r
- }\r
- fstreamid[x].refcnt+=1;\r
- return 0;\r
-}\r
-\r
-\r
-/* unassosciate a ringbuffer from a streamid */\r
-\r
-gs_retval_t ftacallback_rm_streamid(struct ringbuf * r, gs_uint32_t streamid)\r
-{\r
- gs_int32_t x;\r
- for(x=0;x<lstreamid;x++) {\r
- if ((fstreamid[x].streamid == streamid)\r
- && (fstreamid[x].r == r)\r
- && (fstreamid[x].refcnt > 0))\r
- fstreamid[x].refcnt--;\r
- }\r
- return 0;\r
-}\r
-\r
-/* set the state for a given streamid and destination process */\r
-gs_retval_t ftacallback_state_streamid(gs_int32_t streamid,FTAID process, gs_int32_t state)\r
-{\r
- gs_int32_t x;\r
- for(x=0;x<lstreamid;x++) {\r
- if ((fstreamid[x].streamid == streamid)\r
- && (fstreamid[x].r->destid.ip == process.ip )\r
- && (fstreamid[x].r->destid.port == process.port )\r
- && (fstreamid[x].refcnt > 0))\r
- fstreamid[x].state=state;\r
- return 0;\r
- }\r
- return -1;\r
-}\r
-\r
-/* starts an itteration through all ringbuffers for a particular streamid */\r
-\r
-gs_retval_t ftacallback_start_streamid(gs_int32_t streamid)\r
-{\r
- cstreamid=streamid;\r
- nstreamid=0;\r
- return 0;\r
-}\r
-\r
-/* returns all the ringbuffer associated with the streamid passed in\r
- * ftacallback_start_streamid\r
- */\r
-struct ringbuf * ftacallback_next_streamid(gs_int32_t* state)\r
-{\r
- for(;(nstreamid<lstreamid)\r
- &&(fstreamid[nstreamid].streamid != cstreamid);\r
- nstreamid++);\r
- if (nstreamid<lstreamid) {\r
- nstreamid++;\r
- *state=fstreamid[nstreamid-1].state;\r
- return fstreamid[nstreamid-1].r;\r
- }\r
- return 0;\r
-}\r
-\r
-\r
-/* associate msgid with ringbuf */\r
-gs_retval_t ftacallback_add_wakeup(FTAID ftaid, struct ringbuf * r)\r
-{\r
- gs_int32_t x;\r
- if (lwakeup == 0) {\r
- if ((fwakeup = malloc(sizeof(struct ftacallbackwakeup)*STARTSZ))==0) {\r
- gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
- return -1;\r
- }\r
- memset(fwakeup,0,sizeof(struct ftacallbackwakeup)*STARTSZ);\r
- lwakeup = STARTSZ;\r
- }\r
- /* first try to find one for the same process */\r
- for(x=0;(x<lwakeup)&&(\r
- ((fwakeup[x].ftaid.ip!=ftaid.ip)\r
- || (fwakeup[x].ftaid.port!=ftaid.port))\r
- ||(fwakeup[x].used==0)) ;x++);\r
- if (x==lwakeup) {\r
- /* now try to find empty slot */\r
- for(x=0;(x<lwakeup)&&(fwakeup[x].used!=0);x++);\r
- if (x == lwakeup) {\r
- gs_int32_t y;\r
- lwakeup = 2*lwakeup;\r
- if ((fwakeup =\r
- realloc(fwakeup,sizeof(struct ftacallbackwakeup)*lwakeup))==0) {\r
- gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
- return -1;\r
- }\r
- for (y=x;y<lwakeup;y++)\r
- fwakeup[y].used=0;\r
- }\r
- }\r
- fwakeup[x].used=1;\r
- fwakeup[x].ftaid=ftaid;\r
- fwakeup[x].r=r;\r
- return x;\r
-}\r
-\r
-/* starts an itteration through all msgids associated with\r
- a streamid. This also uses data kept in the fstreamid\r
- registry\r
- */\r
-gs_retval_t ftacallback_start_wakeup(gs_uint32_t streamid)\r
-{\r
- swakeup=streamid;\r
- nwakeup=0;\r
- return 0;\r
-}\r
-\r
-/* returns all the msgid blocked on the streamid passed in\r
- * ftacallback_start_streamid and removes the msgid from\r
- * the wakeup list\r
- */\r
-FTAID * ftacallback_next_wakeup()\r
-{\r
- for(;(nwakeup<lstreamid)\r
- &&(fstreamid[nwakeup].streamid != swakeup);\r
- nwakeup++);\r
- if (nwakeup<lstreamid) {\r
- gs_int32_t x;\r
- for(x=0;x<lwakeup;x++)\r
- if ((fwakeup[x].r==fstreamid[nwakeup].r) &&\r
- (fwakeup[x].used==1)) {\r
- fwakeup[x].used=0;\r
- nwakeup++;\r
- return & fwakeup[x].ftaid;\r
- }\r
- }\r
- nwakeup ++;\r
- return (FTAID *) 0;\r
-}\r
-\r
-\r
-\r
-static gs_retval_t fta_list_add(struct fta_list ** root, struct FTA * after,\r
- struct FTA * fta) {\r
- struct fta_list * new;\r
- struct fta_list * tmp;\r
- \r
- if ((new=(struct fta_list *)malloc(sizeof(struct fta_list)))==0) {\r
- gslog(LOG_EMERG,"fta_list_add:: can't allocate memory\n");\r
- return -1;\r
- }\r
- \r
- new->fta=fta;\r
- \r
- \r
- if (after==0) {\r
- new->next=*root;\r
- new->prev=0;\r
- *root=new;\r
- if (new->next) {\r
- new->next->prev=new;\r
- }\r
- } else {\r
- tmp=*root;\r
- while((tmp)&&(tmp->fta!=after)) {\r
- tmp=tmp->next;\r
- }\r
- if (tmp==0) {\r
- gslog(LOG_EMERG,"fta_list_add:: can't find after fta\n");\r
- return -1;\r
- }\r
- new->next=tmp->next;\r
- new->prev=tmp;\r
- tmp->next=new;\r
- if (new->next) {\r
- new->next->prev=new;\r
- }\r
- }\r
- return 0;\r
-}\r
-\r
-static gs_retval_t fta_list_rm(struct fta_list ** root, struct FTA * fta) {\r
- struct fta_list * tmp;\r
- tmp=*root;\r
- while((tmp)&&(tmp->fta!=fta)) {\r
- tmp=tmp->next;\r
- }\r
- if (tmp==0) {\r
- gslog(LOG_EMERG,"fta_list_rm:: can't find fta\n");\r
- return -1;\r
- }\r
- if (tmp == (*root)) {\r
- *root=tmp->next;\r
- if (tmp->next) {\r
- tmp->next->prev=0;\r
- }\r
- } else {\r
- tmp->prev->next=tmp->next;\r
- if (tmp->next) {\r
- tmp->next->prev=tmp->prev;\r
- }\r
- }\r
- \r
- free(tmp);\r
- return 0;\r
-}\r
-\r
-\r
-static gs_retval_t fta_list_check(struct fta_list ** root, struct FTA * fta) {\r
- struct fta_list * tmp;\r
- tmp=*root;\r
- while((tmp)&&(tmp->fta!=fta)) {\r
- tmp=tmp->next;\r
- }\r
- if (tmp==0) {\r
- return -1;\r
- }\r
- return 0;\r
-}\r
-\r
-gs_retval_t ftaexec_insert(struct FTA * after, struct FTA * new)\r
-{\r
- \r
- if ((after!=0) && (fta_list_check(&process,after)<0)) {\r
- gslog(LOG_EMERG,"fta_insert:: ilegal adapter for after\n");\r
- return -1;\r
- }\r
- \r
- if (fta_list_check(&created,new)<0) {\r
- gslog(LOG_EMERG,"fta_insert:: ilegal adapter for new\n");\r
- return -1;\r
- }\r
- \r
- if (fta_list_check(&process,new)==0) {\r
- new->runrefcnt++;\r
- gslog(LOG_INFO,"fta_insert:: new already in process list reusing entry with streamid %d\n",new->ftaid.streamid);\r
- return 0;\r
- }\r
- \r
- if (fta_list_add(&process,after,new)<0) {\r
- gslog(LOG_EMERG,"fta_insert:: new can not be added to process list\n");\r
- return -1;\r
- }\r
- \r
- new->runrefcnt++;\r
- \r
- return 0;\r
- \r
-}\r
-\r
-gs_retval_t ftaexec_remove(struct FTA * id){\r
- \r
- if (fta_list_check(&process,id)<0) {\r
- gslog(LOG_EMERG,"fta_remove:: id not in process list\n");\r
- return -1;\r
- }\r
- id->runrefcnt--;\r
- if (id->runrefcnt<=0) {\r
- if (fta_list_rm(&process,id)<0) {\r
- gslog(LOG_EMERG,"fta_remove:: id could not be removed from process list\n");\r
- return -1;\r
- }\r
- }\r
- return 0;\r
-}\r
-\r
-\r
-struct FTA * ftaexec_alloc_instance(gs_uint32_t index, struct FTA * reuse,\r
- gs_uint32_t reusable,\r
- gs_int32_t command, gs_int32_t sz, void * data){\r
- struct FTA * f;\r
- FTAID ftaid;\r
- ftaid=gscpipc_getftaid();\r
- ftaid.index=index;\r
- ftaid.streamid=0;\r
- if (fta_list_check(&created,reuse)==0) {\r
- reuse->refcnt++;\r
- return reuse;\r
- }\r
- if (ftacallback_get_alloc(index)==0) return 0;\r
- f=ftacallback_get_alloc(index)(ftaid,reusable,command, sz, data);\r
- f->prefilter=ftacallback_get_prefilter(index);\r
- gslog(LOG_INFO,"Using prefilter %llu for fta %x\n",f->prefilter,f);\r
- if (fta_list_add(&created,0,f)<0) {\r
- gslog(LOG_EMERG,"fta_alloc_instance:: new fta can not be added to created list\n");\r
- return 0;\r
- }\r
- f->refcnt=1;\r
- return f;\r
-}\r
-\r
-\r
-\r
-gs_retval_t ftaexec_free_instance(struct FTA * id, gs_uint32_t recursive){\r
- id->refcnt --;\r
- if (id->refcnt==0) {\r
- if (fta_list_rm(&created,id)<0) {\r
- gslog(LOG_EMERG,"fta_free_instance:: fta could not be removed from created list\n");\r
- return -1;\r
- }\r
- /* just to make sure remove it form process list too */\r
- if (fta_list_check(&process,id)>=0) {\r
- fta_list_rm(&process,id);\r
- }\r
- \r
- id->free_fta(id,recursive);\r
- }\r
- return 0;\r
-}\r
-\r
-gs_retval_t ftaexec_control(struct FTA * id, gs_int32_t command, gs_int32_t sz, void * value){\r
- if (fta_list_check(&created,id)<0) {\r
- gslog(LOG_EMERG,"fta_control:: id not found in adapter's created list\n");\r
- return -1;\r
- }\r
- return id->control_fta(id,command,sz,value);\r
-}\r
-\r
-gs_retval_t ftaexec_process_control(gs_int32_t command, gs_int32_t sz, void * value){\r
- struct FTA * f;\r
- ftaexec_start();\r
- while((f=ftaexec_next())!=0) {\r
- f->control_fta(f,command,sz,value);\r
- }\r
- return 1;\r
-}\r
-\r
-\r
-/* Start itteration through list of active FTA */\r
-\r
-gs_retval_t ftaexec_start()\r
-{\r
- itteration=process;\r
- return 0;\r
-}\r
-\r
-/* get one FTA at a time */\r
-\r
-struct FTA * ftaexec_next()\r
-{\r
- struct FTA * fta=0;\r
- if (itteration) {\r
- fta=itteration->fta;\r
- itteration=itteration->next;\r
- }\r
- return fta;\r
-}\r
-\r
-\r
-/* adds a buffer to the end of the sidequeue*/\r
-gs_retval_t sidequeue_append(FTAID from, gs_sp_t buf, gs_int32_t length)\r
-{\r
- struct sq * s;\r
- if ((s=malloc(sizeof(struct sq)))==0) {\r
- gslog(LOG_EMERG,"Could not allocate memory for sidequeue");\r
- return -1;\r
- }\r
- s->from=from;\r
- memcpy(&s->buf[0],buf,MAXMSGSZ);\r
- s->length=length;\r
- s->next=0;\r
- if (sqtail) {\r
- sqtail->next=s;\r
- sqtail=s;\r
- } else {\r
- sqtop = s;\r
- sqtail = s;\r
- }\r
- return 0;\r
-}\r
-\r
-/* removes a buffer from the top of the sidequeue*/\r
-gs_retval_t sidequeue_pop(FTAID * from, gs_sp_t buf, gs_int32_t* length)\r
-{\r
- struct sq * s;\r
- \r
- if (sqtop) {\r
- *from=sqtop->from;\r
- memcpy(buf,&sqtop->buf[0],MAXMSGSZ);\r
- *length=sqtop->length;\r
- s=sqtop;\r
- sqtop=sqtop->next;\r
- if (sqtop==0) sqtail=0;\r
- free(s);\r
- return 0;\r
- }\r
- return -1;\r
-}\r
+/* ------------------------------------------------
+ Copyright 2014 AT&T Intellectual Property
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ------------------------------------------- */
+
+#include "callbackregistries.h"
+#include "lapp.h"
+#include "gscpipc.h"
+#include "stdlib.h"
+#include "stdio.h"
+#include "string.h"
+#include <unistd.h>
+#include <signal.h>
+#include <sys/mman.h>
+#include "schemaparser.h"
+#include "errno.h"
+
+#include "gsconfig.h"
+#include "gstypes.h"
+
+struct ftacallbackalloc {
+ gs_int32_t used;
+ gs_sp_t name;
+ alloc_fta fta_alloc_functionptr;
+ gs_uint64_t prefilter;
+};
+
+static struct ftacallbackalloc * falloc =0;
+static gs_int32_t lalloc=0;
+
+struct ftacallbackstreamid {
+ gs_int32_t refcnt;
+ gs_uint32_t streamid;
+ gs_uint32_t state;
+ struct ringbuf * r;
+};
+
+static struct ftacallbackstreamid * fstreamid =0;
+static gs_int32_t lstreamid=0;
+static gs_uint32_t cstreamid;
+static gs_int32_t nstreamid;
+
+struct ftacallbackwakeup {
+ gs_int32_t used;
+ FTAID ftaid;
+ struct ringbuf * r;
+};
+
+static struct ftacallbackwakeup * fwakeup =0;
+static gs_int32_t lwakeup=0;
+static gs_uint32_t swakeup;
+static gs_int32_t nwakeup;
+
+/* XXX memory of fta list has no owner struct */
+
+struct fta_list {
+ struct fta_list * prev;
+ struct fta_list * next;
+ struct FTA * fta;
+};
+
+static struct fta_list * process=0;
+static struct fta_list * created=0;
+
+static struct fta_list * itteration=0;
+
+// Side queue datastructures
+
+struct sq {
+ FTAID from;
+ gs_int8_t buf[MAXMSGSZ];
+ gs_int32_t length;
+ struct sq * next;
+};
+
+static struct sq * sqtop=0;
+static struct sq * sqtail=0;
+
+
+/* HFTA internal print function*/
+
+gs_retval_t add_printfunction_to_stream( struct FTA * ftaid, gs_sp_t schema, gs_sp_t path, gs_sp_t basename,
+ gs_sp_t temporal_field, gs_sp_t split_field, gs_uint32_t delta, gs_uint32_t split) {
+ gs_uint32_t parserversion;
+ gs_int32_t schemaid;
+ gs_uint32_t x;
+ gs_int8_t temp[50000];
+ if (ftaid->printfunc.in_use==1) {
+ gslog(LOG_EMERG,"ERROR:Printfunction::only allow one print function per HFTA instance\n");
+ return -1;
+ }
+ ftaid->printfunc.path=strdup(path);
+ ftaid->printfunc.basename=strdup(basename);
+ ftaid->printfunc.nexttime=0;
+ ftaid->printfunc.split=(split%1000);
+ ftaid->printfunc.itt=(split/1000)%1000;
+ ftaid->printfunc.base=(split/1000000)%1000;
+ ftaid->printfunc.delta=delta;
+ ftaid->printfunc.in_use=1;
+ if (ftaid->printfunc.split > MAXPRINTFILES) {
+ gslog(LOG_EMERG,"ERROR:Printfunction SPLIT to large\n");
+ return -1;
+ }
+ for (x=0;x<ftaid->printfunc.split;x++) ftaid->printfunc.fa[x]=0;
+
+ if ((ftaid->printfunc.schemaid=ftaschema_parse_string(schema))<0) {
+ gslog(LOG_EMERG,"ERROR:could not parse schema in HFTA print function");
+ return -1;
+ }
+ if ((ftaid->printfunc.temporal_field=ftaschema_get_field_offset_by_name(
+ ftaid->printfunc.schemaid,temporal_field))<0) {
+ gslog(LOG_EMERG,"ERROR:could not get "
+ "offset for timefield %s in HFTA print function\n",
+ temporal_field);
+ return -1;
+ }
+
+ if (ftaschema_get_field_type_by_name(
+ ftaid->printfunc.schemaid,temporal_field)!=UINT_TYPE) {
+ gslog(LOG_EMERG,"ERROR: illegal type for timefield "
+ "%s in HFTA print function UINT expected\n",
+ temporal_field);
+ return -1;
+ }
+ if ((ftaid->printfunc.split_field=ftaschema_get_field_offset_by_name(
+ ftaid->printfunc.schemaid,split_field))<0) {
+ gslog(LOG_EMERG,"ERROR:could not get "
+ "offset for splitfield %s in HFTA print function\n",
+ split_field);
+ return -1;
+ }
+
+ if (ftaschema_get_field_type_by_name(
+ ftaid->printfunc.schemaid,split_field)!=UINT_TYPE) {
+ gslog(LOG_EMERG,"ERROR: illegal type for splitfield"
+ "%s in HFTA print function UINT expected\n",
+ split_field);
+ return -1;
+ }
+ parserversion=get_schemaparser_version();
+ sprintf(temp,"GDAT\nVERSION:%u\nSCHEMALENGTH:%lu\n%s",parserversion,strlen(schema)+1,schema);
+ ftaid->printfunc.header=strdup(temp);
+ gslog(LOG_INFO,"Established print function for %s",basename);
+ return 0;
+}
+
+gs_retval_t print_stream(struct FTA * self, gs_int32_t sz, void *tuple)
+{
+ gs_int32_t problem;
+ gs_uint32_t timeval;
+ gs_uint32_t splitval;
+ gs_uint32_t x;
+ gs_uint32_t nsz;
+ timeval=fta_unpack_uint(tuple,sz,self->printfunc.temporal_field,&problem);
+ if (timeval==0) return 0; // ignore heartbeats till we see a real timestamp
+ if (timeval>= self->printfunc.nexttime) {
+ gs_int8_t oldname[1024];
+ gs_int8_t newname[1024];
+ if (self->printfunc.split==0) {
+ if (self->printfunc.fa[0] != 0) {
+ sprintf(oldname,"%s/%u%s.tmp",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,self->printfunc.basename);
+ sprintf(newname,"%s/%u%s",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,self->printfunc.basename);
+ fclose(self->printfunc.fa[0]);
+ rename(oldname,newname);
+ }
+ if (self->printfunc.nexttime==0) {
+ self->printfunc.nexttime=(timeval/self->printfunc.delta)*self->printfunc.delta;
+ }
+ sprintf(oldname,"%s/%u%s.tmp",self->printfunc.path,self->printfunc.nexttime,self->printfunc.basename);
+ if ((self->printfunc.fa[0]=fopen(oldname,"w"))==0) {
+ gslog(LOG_EMERG,"ERROR:Could not open file in HFTA print function\n");
+ return -1;
+ }
+ if (setvbuf(self->printfunc.fa[0],0,_IOFBF,16000000)!=0) {
+ gslog(LOG_EMERG,"ERROR:Could not setvbuf\n");
+ }
+ if (fwrite(self->printfunc.header,strlen(self->printfunc.header)+1,1,self->printfunc.fa[0])!=1) {
+ gslog(LOG_EMERG,"ERROR:fwrite:xfgh1:%s:%u",self->printfunc.basename,errno);
+ }
+ gslog(LOG_INFO,"Opened file %s",oldname);
+ } else {
+ for(x=self->printfunc.base;x<self->printfunc.split;x=x+self->printfunc.itt) {
+ if (self->printfunc.fa[x] != 0) {
+ sprintf(oldname,"%s/%u_s%u%s.tmp",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,x+1,self->printfunc.basename);
+ sprintf(newname,"%s/%u_s%u%s",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,x+1,self->printfunc.basename);
+ fclose(self->printfunc.fa[x]);
+ rename(oldname,newname);
+ }
+ if (self->printfunc.nexttime==0) {
+ self->printfunc.nexttime=(timeval/self->printfunc.delta)*self->printfunc.delta;
+ }
+ sprintf(oldname,"%s/%u_s%u%s.tmp",self->printfunc.path,self->printfunc.nexttime,x+1,self->printfunc.basename);
+ if ((self->printfunc.fa[x]=fopen(oldname,"w"))==0) {
+ gslog(LOG_EMERG,"ERROR:Could not open file in HFTA print function\n");
+ return -1;
+ }
+ if (setvbuf(self->printfunc.fa[x],0,_IOFBF,16000000)!=0) {
+ gslog(LOG_EMERG,"ERROR:Could not setvbuf\n");
+ }
+ if (fwrite(self->printfunc.header,strlen(self->printfunc.header)+1,1,self->printfunc.fa[x])!=1) {
+ gslog(LOG_EMERG,"ERROR:fwrite:xfgh2:%s:%u",self->printfunc.basename,errno);
+ }
+ gslog(LOG_INFO,"Opened file %s",oldname);
+ }
+ }
+ self->printfunc.nexttime=self->printfunc.nexttime+self->printfunc.delta;
+ }
+ // don't write temporal tuples to file but use them to advance file name.
+ if (ftaschema_is_temporal_tuple(self->printfunc.schemaid, tuple)) return 0;
+ if (self->printfunc.split!=0) {
+ splitval=fta_unpack_uint(tuple,sz,self->printfunc.split_field,&problem)%(self->printfunc.split);
+ if (self->printfunc.fa[splitval]==0) {
+ gslog(LOG_EMERG,"Inconsistent rangehash in print %u\n", splitval);
+ exit(0);
+ }
+ } else {
+ splitval=0;
+ }
+ nsz=htonl(sz);
+ if (fwrite(&nsz,sizeof(gs_uint32_t),1,self->printfunc.fa[splitval])!=1) {
+ gslog(LOG_EMERG,"Could not write to output in HFTA print\"%s\":%u.. EXITING\n",
+ self->printfunc.basename,errno);
+ exit(0);
+ }
+ if (fwrite(tuple,sz,1,self->printfunc.fa[splitval])!=1) {
+ gslog(LOG_EMERG,"Could not write to output in HFTA print\"%s\"%u.. EXITING\n",
+ self->printfunc.basename,errno);
+ exit(0);
+ }
+ return 0;
+}
+
+/* registers an alloc function of an FTA and returns a unique index */
+gs_retval_t ftacallback_add_alloc(FTAname name, alloc_fta fta_alloc_functionptr,gs_uint64_t prefilter)
+{
+ gs_int32_t x;
+ gslog(LOG_INFO,"Register prefilter %llu for %s\n",prefilter,name);
+ if (lalloc == 0) {
+ if ((falloc = malloc(sizeof(struct ftacallbackalloc)*STARTSZ))==0) {
+ gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
+ return -1;
+ }
+ memset(falloc,0,sizeof(struct ftacallbackalloc)*STARTSZ);
+ lalloc = STARTSZ;
+ }
+ for(x=0;(x<lalloc)&&(falloc[x].used!=0);x++);
+ if (x == lalloc) {
+ gs_int32_t y;
+ lalloc = 2*lalloc;
+ if ((falloc = realloc(falloc,lalloc*sizeof(struct ftacallbackalloc)))==0) {
+ gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
+ return -1;
+ }
+ for (y=x;y<lalloc;y++)
+ falloc[y].used=0;
+ }
+ falloc[x].name=strdup(name);
+ falloc[x].fta_alloc_functionptr=fta_alloc_functionptr;
+ falloc[x].prefilter=prefilter;
+ falloc[x].used=1;
+ return x;
+}
+
+/* unregisters an alloc function of an FTA and makes the index available
+ * for reuse
+ */
+
+gs_retval_t ftacallback_rm_alloc(gs_uint32_t index)
+{
+ falloc[index].used=0;
+ free(falloc[index].name);
+ return 0;
+}
+/* returns the prefilter for a given
+ * index
+ */
+
+gs_uint64_t ftacallback_get_prefilter(gs_int32_t index)
+{
+ if ((index<lalloc) && (falloc[index].used!=0)) {
+ return falloc[index].prefilter;
+ }
+ return 0;
+}
+
+/* returns the function pointer of the callback function for a given
+ * index
+ */
+
+alloc_fta ftacallback_get_alloc(gs_int32_t index)
+{
+ if ((index<lalloc) && (falloc[index].used!=0)) {
+ return falloc[index].fta_alloc_functionptr;
+ }
+ return 0;
+}
+
+
+
+
+
+/* associate ringbuffer with streamid (using refcounting) */
+gs_retval_t ftacallback_add_streamid(struct ringbuf * r, gs_uint32_t streamid) {
+ gs_int32_t x;
+ if (lstreamid == 0) {
+ if ((fstreamid = malloc(sizeof(struct ftacallbackstreamid)*STARTSZ))==0) {
+ gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
+ return -1;
+ }
+ memset(fstreamid,0,sizeof(struct ftacallbackstreamid)*STARTSZ);
+ lstreamid = STARTSZ;
+ }
+ /* first try to increment refcnt */
+ for(x=0;(x<lstreamid)&&(
+ (fstreamid[x].streamid!=streamid)
+ ||(fstreamid[x].r!=r)
+ ||(fstreamid[x].refcnt<=0)) ;x++);
+ if (x>=lstreamid) {
+ /* now try to find empty slot */
+ for(x=0;(x<lstreamid)&&(fstreamid[x].refcnt!=0);x++);
+ if (x >= lstreamid) {
+ gs_int32_t y;
+ lstreamid = 2*lstreamid;
+ if ((fstreamid =
+ realloc(fstreamid,sizeof(struct ftacallbackstreamid)*lstreamid))==0) {
+ gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
+ return -1;
+ }
+ for (y=x;y<lstreamid;y++) {
+ fstreamid[y].refcnt=0;
+ fstreamid[y].streamid=0;
+ }
+ }
+ fstreamid[x].state=HFTA_RINGBUF_ATOMIC;
+ fstreamid[x].streamid=streamid;
+ fstreamid[x].r=r;
+ }
+ fstreamid[x].refcnt+=1;
+ return 0;
+}
+
+
+/* unassosciate a ringbuffer from a streamid */
+
+gs_retval_t ftacallback_rm_streamid(struct ringbuf * r, gs_uint32_t streamid)
+{
+ gs_int32_t x;
+ for(x=0;x<lstreamid;x++) {
+ if ((fstreamid[x].streamid == streamid)
+ && (fstreamid[x].r == r)
+ && (fstreamid[x].refcnt > 0))
+ fstreamid[x].refcnt--;
+ }
+ return 0;
+}
+
+/* set the state for a given streamid and destination process */
+gs_retval_t ftacallback_state_streamid(gs_int32_t streamid,FTAID process, gs_int32_t state)
+{
+ gs_int32_t x;
+ for(x=0;x<lstreamid;x++) {
+ if ((fstreamid[x].streamid == streamid)
+ && (fstreamid[x].r->destid.ip == process.ip )
+ && (fstreamid[x].r->destid.port == process.port )
+ && (fstreamid[x].refcnt > 0))
+ fstreamid[x].state=state;
+ return 0;
+ }
+ return -1;
+}
+
+/* starts an itteration through all ringbuffers for a particular streamid */
+
+gs_retval_t ftacallback_start_streamid(gs_int32_t streamid)
+{
+ cstreamid=streamid;
+ nstreamid=0;
+ return 0;
+}
+
+/* returns all the ringbuffer associated with the streamid passed in
+ * ftacallback_start_streamid
+ */
+struct ringbuf * ftacallback_next_streamid(gs_int32_t* state)
+{
+ for(;(nstreamid<lstreamid)
+ &&(fstreamid[nstreamid].streamid != cstreamid);
+ nstreamid++);
+ if (nstreamid<lstreamid) {
+ nstreamid++;
+ *state=fstreamid[nstreamid-1].state;
+ return fstreamid[nstreamid-1].r;
+ }
+ return 0;
+}
+
+
+/* associate msgid with ringbuf */
+gs_retval_t ftacallback_add_wakeup(FTAID ftaid, struct ringbuf * r)
+{
+ gs_int32_t x;
+ if (lwakeup == 0) {
+ if ((fwakeup = malloc(sizeof(struct ftacallbackwakeup)*STARTSZ))==0) {
+ gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
+ return -1;
+ }
+ memset(fwakeup,0,sizeof(struct ftacallbackwakeup)*STARTSZ);
+ lwakeup = STARTSZ;
+ }
+ /* first try to find one for the same process */
+ for(x=0;(x<lwakeup)&&(
+ ((fwakeup[x].ftaid.ip!=ftaid.ip)
+ || (fwakeup[x].ftaid.port!=ftaid.port))
+ ||(fwakeup[x].used==0)) ;x++);
+ if (x==lwakeup) {
+ /* now try to find empty slot */
+ for(x=0;(x<lwakeup)&&(fwakeup[x].used!=0);x++);
+ if (x == lwakeup) {
+ gs_int32_t y;
+ lwakeup = 2*lwakeup;
+ if ((fwakeup =
+ realloc(fwakeup,sizeof(struct ftacallbackwakeup)*lwakeup))==0) {
+ gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
+ return -1;
+ }
+ for (y=x;y<lwakeup;y++)
+ fwakeup[y].used=0;
+ }
+ }
+ fwakeup[x].used=1;
+ fwakeup[x].ftaid=ftaid;
+ fwakeup[x].r=r;
+ return x;
+}
+
+/* starts an itteration through all msgids associated with
+ a streamid. This also uses data kept in the fstreamid
+ registry
+ */
+gs_retval_t ftacallback_start_wakeup(gs_uint32_t streamid)
+{
+ swakeup=streamid;
+ nwakeup=0;
+ return 0;
+}
+
+/* returns all the msgid blocked on the streamid passed in
+ * ftacallback_start_streamid and removes the msgid from
+ * the wakeup list
+ */
+FTAID * ftacallback_next_wakeup()
+{
+ for(;(nwakeup<lstreamid)
+ &&(fstreamid[nwakeup].streamid != swakeup);
+ nwakeup++);
+ if (nwakeup<lstreamid) {
+ gs_int32_t x;
+ for(x=0;x<lwakeup;x++)
+ if ((fwakeup[x].r==fstreamid[nwakeup].r) &&
+ (fwakeup[x].used==1)) {
+ fwakeup[x].used=0;
+ nwakeup++;
+ return & fwakeup[x].ftaid;
+ }
+ }
+ nwakeup ++;
+ return (FTAID *) 0;
+}
+
+
+
+static gs_retval_t fta_list_add(struct fta_list ** root, struct FTA * after,
+ struct FTA * fta) {
+ struct fta_list * new;
+ struct fta_list * tmp;
+
+ if ((new=(struct fta_list *)malloc(sizeof(struct fta_list)))==0) {
+ gslog(LOG_EMERG,"fta_list_add:: can't allocate memory\n");
+ return -1;
+ }
+
+ new->fta=fta;
+
+
+ if (after==0) {
+ new->next=*root;
+ new->prev=0;
+ *root=new;
+ if (new->next) {
+ new->next->prev=new;
+ }
+ } else {
+ tmp=*root;
+ while((tmp)&&(tmp->fta!=after)) {
+ tmp=tmp->next;
+ }
+ if (tmp==0) {
+ gslog(LOG_EMERG,"fta_list_add:: can't find after fta\n");
+ return -1;
+ }
+ new->next=tmp->next;
+ new->prev=tmp;
+ tmp->next=new;
+ if (new->next) {
+ new->next->prev=new;
+ }
+ }
+ return 0;
+}
+
+static gs_retval_t fta_list_rm(struct fta_list ** root, struct FTA * fta) {
+ struct fta_list * tmp;
+ tmp=*root;
+ while((tmp)&&(tmp->fta!=fta)) {
+ tmp=tmp->next;
+ }
+ if (tmp==0) {
+ gslog(LOG_EMERG,"fta_list_rm:: can't find fta\n");
+ return -1;
+ }
+ if (tmp == (*root)) {
+ *root=tmp->next;
+ if (tmp->next) {
+ tmp->next->prev=0;
+ }
+ } else {
+ tmp->prev->next=tmp->next;
+ if (tmp->next) {
+ tmp->next->prev=tmp->prev;
+ }
+ }
+
+ free(tmp);
+ return 0;
+}
+
+
+static gs_retval_t fta_list_check(struct fta_list ** root, struct FTA * fta) {
+ struct fta_list * tmp;
+ tmp=*root;
+ while((tmp)&&(tmp->fta!=fta)) {
+ tmp=tmp->next;
+ }
+ if (tmp==0) {
+ return -1;
+ }
+ return 0;
+}
+
+gs_retval_t ftaexec_insert(struct FTA * after, struct FTA * new)
+{
+
+ if ((after!=0) && (fta_list_check(&process,after)<0)) {
+ gslog(LOG_EMERG,"fta_insert:: ilegal adapter for after\n");
+ return -1;
+ }
+
+ if (fta_list_check(&created,new)<0) {
+ gslog(LOG_EMERG,"fta_insert:: ilegal adapter for new\n");
+ return -1;
+ }
+
+ if (fta_list_check(&process,new)==0) {
+ new->runrefcnt++;
+ gslog(LOG_INFO,"fta_insert:: new already in process list reusing entry with streamid %d\n",new->ftaid.streamid);
+ return 0;
+ }
+
+ if (fta_list_add(&process,after,new)<0) {
+ gslog(LOG_EMERG,"fta_insert:: new can not be added to process list\n");
+ return -1;
+ }
+
+ new->runrefcnt++;
+
+ return 0;
+
+}
+
+gs_retval_t ftaexec_remove(struct FTA * id){
+
+ if (fta_list_check(&process,id)<0) {
+ gslog(LOG_EMERG,"fta_remove:: id not in process list\n");
+ return -1;
+ }
+ id->runrefcnt--;
+ if (id->runrefcnt<=0) {
+ if (fta_list_rm(&process,id)<0) {
+ gslog(LOG_EMERG,"fta_remove:: id could not be removed from process list\n");
+ return -1;
+ }
+ }
+ return 0;
+}
+
+
+struct FTA * ftaexec_alloc_instance(gs_uint32_t index, struct FTA * reuse,
+ gs_uint32_t reusable,
+ gs_int32_t command, gs_int32_t sz, void * data){
+ struct FTA * f;
+ FTAID ftaid;
+ ftaid=gscpipc_getftaid();
+ ftaid.index=index;
+ ftaid.streamid=0;
+ if (fta_list_check(&created,reuse)==0) {
+ reuse->refcnt++;
+ return reuse;
+ }
+ if (ftacallback_get_alloc(index)==0) return 0;
+ f=ftacallback_get_alloc(index)(ftaid,reusable,command, sz, data);
+ f->prefilter=ftacallback_get_prefilter(index);
+ gslog(LOG_INFO,"Using prefilter %llu for fta %x\n",f->prefilter,f);
+ if (fta_list_add(&created,0,f)<0) {
+ gslog(LOG_EMERG,"fta_alloc_instance:: new fta can not be added to created list\n");
+ return 0;
+ }
+ f->refcnt=1;
+ return f;
+}
+
+
+
+gs_retval_t ftaexec_free_instance(struct FTA * id, gs_uint32_t recursive){
+ id->refcnt --;
+ if (id->refcnt==0) {
+ if (fta_list_rm(&created,id)<0) {
+ gslog(LOG_EMERG,"fta_free_instance:: fta could not be removed from created list\n");
+ return -1;
+ }
+ /* just to make sure remove it form process list too */
+ if (fta_list_check(&process,id)>=0) {
+ fta_list_rm(&process,id);
+ }
+
+ id->free_fta(id,recursive);
+ }
+ return 0;
+}
+
+gs_retval_t ftaexec_control(struct FTA * id, gs_int32_t command, gs_int32_t sz, void * value){
+ if (fta_list_check(&created,id)<0) {
+ gslog(LOG_EMERG,"fta_control:: id not found in adapter's created list\n");
+ return -1;
+ }
+ return id->control_fta(id,command,sz,value);
+}
+
+gs_retval_t ftaexec_process_control(gs_int32_t command, gs_int32_t sz, void * value){
+ struct FTA * f;
+ ftaexec_start();
+ while((f=ftaexec_next())!=0) {
+ f->control_fta(f,command,sz,value);
+ }
+ return 1;
+}
+
+
+/* Start itteration through list of active FTA */
+
+gs_retval_t ftaexec_start()
+{
+ itteration=process;
+ return 0;
+}
+
+/* get one FTA at a time */
+
+struct FTA * ftaexec_next()
+{
+ struct FTA * fta=0;
+ if (itteration) {
+ fta=itteration->fta;
+ itteration=itteration->next;
+ }
+ return fta;
+}
+
+
+/* adds a buffer to the end of the sidequeue*/
+gs_retval_t sidequeue_append(FTAID from, gs_sp_t buf, gs_int32_t length)
+{
+ struct sq * s;
+ if ((s=malloc(sizeof(struct sq)))==0) {
+ gslog(LOG_EMERG,"Could not allocate memory for sidequeue");
+ return -1;
+ }
+ s->from=from;
+ memcpy(&s->buf[0],buf,MAXMSGSZ);
+ s->length=length;
+ s->next=0;
+ if (sqtail) {
+ sqtail->next=s;
+ sqtail=s;
+ } else {
+ sqtop = s;
+ sqtail = s;
+ }
+ return 0;
+}
+
+/* removes a buffer from the top of the sidequeue*/
+gs_retval_t sidequeue_pop(FTAID * from, gs_sp_t buf, gs_int32_t* length)
+{
+ struct sq * s;
+
+ if (sqtop) {
+ *from=sqtop->from;
+ memcpy(buf,&sqtop->buf[0],MAXMSGSZ);
+ *length=sqtop->length;
+ s=sqtop;
+ sqtop=sqtop->next;
+ if (sqtop==0) sqtail=0;
+ free(s);
+ return 0;
+ }
+ return -1;
+}