-/* ------------------------------------------------
- Copyright 2014 AT&T Intellectual Property
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ------------------------------------------- */
-
-#include "callbackregistries.h"
-#include "lapp.h"
-#include "gscpipc.h"
-#include "stdlib.h"
-#include "stdio.h"
-#include "string.h"
-#include <unistd.h>
-#include <signal.h>
-#include <sys/mman.h>
-#include "schemaparser.h"
-#include "errno.h"
-
-#include "gsconfig.h"
-#include "gstypes.h"
-
-struct ftacallbackalloc {
- gs_int32_t used;
- gs_sp_t name;
- alloc_fta fta_alloc_functionptr;
- gs_uint64_t prefilter;
-};
-
-static struct ftacallbackalloc * falloc =0;
-static gs_int32_t lalloc=0;
-
-struct ftacallbackstreamid {
- gs_int32_t refcnt;
- gs_uint32_t streamid;
- gs_uint32_t state;
- struct ringbuf * r;
-};
-
-static struct ftacallbackstreamid * fstreamid =0;
-static gs_int32_t lstreamid=0;
-static gs_uint32_t cstreamid;
-static gs_int32_t nstreamid;
-
-struct ftacallbackwakeup {
- gs_int32_t used;
- FTAID ftaid;
- struct ringbuf * r;
-};
-
-static struct ftacallbackwakeup * fwakeup =0;
-static gs_int32_t lwakeup=0;
-static gs_uint32_t swakeup;
-static gs_int32_t nwakeup;
-
-/* XXX memory of fta list has no owner struct */
-
-struct fta_list {
- struct fta_list * prev;
- struct fta_list * next;
- struct FTA * fta;
-};
-
-static struct fta_list * process=0;
-static struct fta_list * created=0;
-
-static struct fta_list * itteration=0;
-
-// Side queue datastructures
-
-struct sq {
- FTAID from;
- gs_int8_t buf[MAXMSGSZ];
- gs_int32_t length;
- struct sq * next;
-};
-
-static struct sq * sqtop=0;
-static struct sq * sqtail=0;
-
-
-/* HFTA internal print function*/
-
-gs_retval_t add_printfunction_to_stream( struct FTA * ftaid, gs_sp_t schema, gs_sp_t path, gs_sp_t basename,
- gs_sp_t temporal_field, gs_sp_t split_field, gs_uint32_t delta, gs_uint32_t split) {
- gs_uint32_t parserversion;
- gs_int32_t schemaid;
- gs_uint32_t x;
- gs_int8_t temp[50000];
- if (ftaid->printfunc.in_use==1) {
- gslog(LOG_EMERG,"ERROR:Printfunction::only allow one print function per HFTA instance\n");
- return -1;
- }
- ftaid->printfunc.path=strdup(path);
- ftaid->printfunc.basename=strdup(basename);
- ftaid->printfunc.nexttime=0;
- ftaid->printfunc.split=(split%1000);
- ftaid->printfunc.itt=(split/1000)%1000;
- ftaid->printfunc.base=(split/1000000)%1000;
- ftaid->printfunc.delta=delta;
- ftaid->printfunc.in_use=1;
- if (ftaid->printfunc.split > MAXPRINTFILES) {
- gslog(LOG_EMERG,"ERROR:Printfunction SPLIT to large\n");
- return -1;
- }
- for (x=0;x<ftaid->printfunc.split;x++) ftaid->printfunc.fa[x]=0;
-
- if ((ftaid->printfunc.schemaid=ftaschema_parse_string(schema))<0) {
- gslog(LOG_EMERG,"ERROR:could not parse schema in HFTA print function");
- return -1;
- }
- if ((ftaid->printfunc.temporal_field=ftaschema_get_field_offset_by_name(
- ftaid->printfunc.schemaid,temporal_field))<0) {
- gslog(LOG_EMERG,"ERROR:could not get "
- "offset for timefield %s in HFTA print function\n",
- temporal_field);
- return -1;
- }
-
- if (ftaschema_get_field_type_by_name(
- ftaid->printfunc.schemaid,temporal_field)!=UINT_TYPE) {
- gslog(LOG_EMERG,"ERROR: illegal type for timefield "
- "%s in HFTA print function UINT expected\n",
- temporal_field);
- return -1;
- }
- if ((ftaid->printfunc.split_field=ftaschema_get_field_offset_by_name(
- ftaid->printfunc.schemaid,split_field))<0) {
- gslog(LOG_EMERG,"ERROR:could not get "
- "offset for splitfield %s in HFTA print function\n",
- split_field);
- return -1;
- }
-
- if (ftaschema_get_field_type_by_name(
- ftaid->printfunc.schemaid,split_field)!=UINT_TYPE) {
- gslog(LOG_EMERG,"ERROR: illegal type for splitfield"
- "%s in HFTA print function UINT expected\n",
- split_field);
- return -1;
- }
- parserversion=get_schemaparser_version();
- sprintf(temp,"GDAT\nVERSION:%u\nSCHEMALENGTH:%lu\n%s",parserversion,strlen(schema)+1,schema);
- ftaid->printfunc.header=strdup(temp);
- gslog(LOG_INFO,"Established print function for %s",basename);
- return 0;
-}
-
-gs_retval_t print_stream(struct FTA * self, gs_int32_t sz, void *tuple)
-{
- gs_int32_t problem;
- gs_uint32_t timeval;
- gs_uint32_t splitval;
- gs_uint32_t x;
- gs_uint32_t nsz;
- timeval=fta_unpack_uint(tuple,sz,self->printfunc.temporal_field,&problem);
- if (timeval==0) return 0; // ignore heartbeats till we see a real timestamp
- if (timeval>= self->printfunc.nexttime) {
- gs_int8_t oldname[1024];
- gs_int8_t newname[1024];
- if (self->printfunc.split==0) {
- if (self->printfunc.fa[0] != 0) {
- sprintf(oldname,"%s/%u%s.tmp",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,self->printfunc.basename);
- sprintf(newname,"%s/%u%s",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,self->printfunc.basename);
- fclose(self->printfunc.fa[0]);
- rename(oldname,newname);
- }
- if (self->printfunc.nexttime==0) {
- self->printfunc.nexttime=(timeval/self->printfunc.delta)*self->printfunc.delta;
- }
- sprintf(oldname,"%s/%u%s.tmp",self->printfunc.path,self->printfunc.nexttime,self->printfunc.basename);
- if ((self->printfunc.fa[0]=fopen(oldname,"w"))==0) {
- gslog(LOG_EMERG,"ERROR:Could not open file in HFTA print function\n");
- return -1;
- }
- if (setvbuf(self->printfunc.fa[0],0,_IOFBF,16000000)!=0) {
- gslog(LOG_EMERG,"ERROR:Could not setvbuf\n");
- }
- if (fwrite(self->printfunc.header,strlen(self->printfunc.header)+1,1,self->printfunc.fa[0])!=1) {
- gslog(LOG_EMERG,"ERROR:fwrite:xfgh1:%s:%u",self->printfunc.basename,errno);
- }
- gslog(LOG_INFO,"Opened file %s",oldname);
- } else {
- for(x=self->printfunc.base;x<self->printfunc.split;x=x+self->printfunc.itt) {
- if (self->printfunc.fa[x] != 0) {
- sprintf(oldname,"%s/%u_s%u%s.tmp",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,x+1,self->printfunc.basename);
- sprintf(newname,"%s/%u_s%u%s",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,x+1,self->printfunc.basename);
- fclose(self->printfunc.fa[x]);
- rename(oldname,newname);
- }
- if (self->printfunc.nexttime==0) {
- self->printfunc.nexttime=(timeval/self->printfunc.delta)*self->printfunc.delta;
- }
- sprintf(oldname,"%s/%u_s%u%s.tmp",self->printfunc.path,self->printfunc.nexttime,x+1,self->printfunc.basename);
- if ((self->printfunc.fa[x]=fopen(oldname,"w"))==0) {
- gslog(LOG_EMERG,"ERROR:Could not open file in HFTA print function\n");
- return -1;
- }
- if (setvbuf(self->printfunc.fa[x],0,_IOFBF,16000000)!=0) {
- gslog(LOG_EMERG,"ERROR:Could not setvbuf\n");
- }
- if (fwrite(self->printfunc.header,strlen(self->printfunc.header)+1,1,self->printfunc.fa[x])!=1) {
- gslog(LOG_EMERG,"ERROR:fwrite:xfgh2:%s:%u",self->printfunc.basename,errno);
- }
- gslog(LOG_INFO,"Opened file %s",oldname);
- }
- }
- self->printfunc.nexttime=self->printfunc.nexttime+self->printfunc.delta;
- }
- // don't write temporal tuples to file but use them to advance file name.
- if (ftaschema_is_temporal_tuple(self->printfunc.schemaid, tuple)) return 0;
- if (self->printfunc.split!=0) {
- splitval=fta_unpack_uint(tuple,sz,self->printfunc.split_field,&problem)%(self->printfunc.split);
- if (self->printfunc.fa[splitval]==0) {
- gslog(LOG_EMERG,"Inconsistent rangehash in print %u\n", splitval);
- exit(0);
- }
- } else {
- splitval=0;
- }
- nsz=htonl(sz);
- if (fwrite(&nsz,sizeof(gs_uint32_t),1,self->printfunc.fa[splitval])!=1) {
- gslog(LOG_EMERG,"Could not write to output in HFTA print\"%s\":%u.. EXITING\n",
- self->printfunc.basename,errno);
- exit(0);
- }
- if (fwrite(tuple,sz,1,self->printfunc.fa[splitval])!=1) {
- gslog(LOG_EMERG,"Could not write to output in HFTA print\"%s\"%u.. EXITING\n",
- self->printfunc.basename,errno);
- exit(0);
- }
- return 0;
-}
-
-/* registers an alloc function of an FTA and returns a unique index */
-gs_retval_t ftacallback_add_alloc(FTAname name, alloc_fta fta_alloc_functionptr,gs_uint64_t prefilter)
-{
- gs_int32_t x;
- gslog(LOG_INFO,"Register prefilter %llu for %s\n",prefilter,name);
- if (lalloc == 0) {
- if ((falloc = malloc(sizeof(struct ftacallbackalloc)*STARTSZ))==0) {
- gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
- return -1;
- }
- memset(falloc,0,sizeof(struct ftacallbackalloc)*STARTSZ);
- lalloc = STARTSZ;
- }
- for(x=0;(x<lalloc)&&(falloc[x].used!=0);x++);
- if (x == lalloc) {
- gs_int32_t y;
- lalloc = 2*lalloc;
- if ((falloc = realloc(falloc,lalloc*sizeof(struct ftacallbackalloc)))==0) {
- gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
- return -1;
- }
- for (y=x;y<lalloc;y++)
- falloc[y].used=0;
- }
- falloc[x].name=strdup(name);
- falloc[x].fta_alloc_functionptr=fta_alloc_functionptr;
- falloc[x].prefilter=prefilter;
- falloc[x].used=1;
- return x;
-}
-
-/* unregisters an alloc function of an FTA and makes the index available
- * for reuse
- */
-
-gs_retval_t ftacallback_rm_alloc(gs_uint32_t index)
-{
- falloc[index].used=0;
- free(falloc[index].name);
- return 0;
-}
-/* returns the prefilter for a given
- * index
- */
-
-gs_uint64_t ftacallback_get_prefilter(gs_int32_t index)
-{
- if ((index<lalloc) && (falloc[index].used!=0)) {
- return falloc[index].prefilter;
- }
- return 0;
-}
-
-/* returns the function pointer of the callback function for a given
- * index
- */
-
-alloc_fta ftacallback_get_alloc(gs_int32_t index)
-{
- if ((index<lalloc) && (falloc[index].used!=0)) {
- return falloc[index].fta_alloc_functionptr;
- }
- return 0;
-}
-
-
-
-
-
-/* associate ringbuffer with streamid (using refcounting) */
-gs_retval_t ftacallback_add_streamid(struct ringbuf * r, gs_uint32_t streamid) {
- gs_int32_t x;
- if (lstreamid == 0) {
- if ((fstreamid = malloc(sizeof(struct ftacallbackstreamid)*STARTSZ))==0) {
- gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
- return -1;
- }
- memset(fstreamid,0,sizeof(struct ftacallbackstreamid)*STARTSZ);
- lstreamid = STARTSZ;
- }
- /* first try to increment refcnt */
- for(x=0;(x<lstreamid)&&(
- (fstreamid[x].streamid!=streamid)
- ||(fstreamid[x].r!=r)
- ||(fstreamid[x].refcnt<=0)) ;x++);
- if (x>=lstreamid) {
- /* now try to find empty slot */
- for(x=0;(x<lstreamid)&&(fstreamid[x].refcnt!=0);x++);
- if (x >= lstreamid) {
- gs_int32_t y;
- lstreamid = 2*lstreamid;
- if ((fstreamid =
- realloc(fstreamid,sizeof(struct ftacallbackstreamid)*lstreamid))==0) {
- gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
- return -1;
- }
- for (y=x;y<lstreamid;y++) {
- fstreamid[y].refcnt=0;
- fstreamid[y].streamid=0;
- }
- }
- fstreamid[x].state=HFTA_RINGBUF_ATOMIC;
- fstreamid[x].streamid=streamid;
- fstreamid[x].r=r;
- }
- fstreamid[x].refcnt+=1;
- return 0;
-}
-
-
-/* unassosciate a ringbuffer from a streamid */
-
-gs_retval_t ftacallback_rm_streamid(struct ringbuf * r, gs_uint32_t streamid)
-{
- gs_int32_t x;
- for(x=0;x<lstreamid;x++) {
- if ((fstreamid[x].streamid == streamid)
- && (fstreamid[x].r == r)
- && (fstreamid[x].refcnt > 0))
- fstreamid[x].refcnt--;
- }
- return 0;
-}
-
-/* set the state for a given streamid and destination process */
-gs_retval_t ftacallback_state_streamid(gs_int32_t streamid,FTAID process, gs_int32_t state)
-{
- gs_int32_t x;
- for(x=0;x<lstreamid;x++) {
- if ((fstreamid[x].streamid == streamid)
- && (fstreamid[x].r->destid.ip == process.ip )
- && (fstreamid[x].r->destid.port == process.port )
- && (fstreamid[x].refcnt > 0))
- fstreamid[x].state=state;
- return 0;
- }
- return -1;
-}
-
-/* starts an itteration through all ringbuffers for a particular streamid */
-
-gs_retval_t ftacallback_start_streamid(gs_int32_t streamid)
-{
- cstreamid=streamid;
- nstreamid=0;
- return 0;
-}
-
-/* returns all the ringbuffer associated with the streamid passed in
- * ftacallback_start_streamid
- */
-struct ringbuf * ftacallback_next_streamid(gs_int32_t* state)
-{
- for(;(nstreamid<lstreamid)
- &&(fstreamid[nstreamid].streamid != cstreamid);
- nstreamid++);
- if (nstreamid<lstreamid) {
- nstreamid++;
- *state=fstreamid[nstreamid-1].state;
- return fstreamid[nstreamid-1].r;
- }
- return 0;
-}
-
-
-/* associate msgid with ringbuf */
-gs_retval_t ftacallback_add_wakeup(FTAID ftaid, struct ringbuf * r)
-{
- gs_int32_t x;
- if (lwakeup == 0) {
- if ((fwakeup = malloc(sizeof(struct ftacallbackwakeup)*STARTSZ))==0) {
- gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
- return -1;
- }
- memset(fwakeup,0,sizeof(struct ftacallbackwakeup)*STARTSZ);
- lwakeup = STARTSZ;
- }
- /* first try to find one for the same process */
- for(x=0;(x<lwakeup)&&(
- ((fwakeup[x].ftaid.ip!=ftaid.ip)
- || (fwakeup[x].ftaid.port!=ftaid.port))
- ||(fwakeup[x].used==0)) ;x++);
- if (x==lwakeup) {
- /* now try to find empty slot */
- for(x=0;(x<lwakeup)&&(fwakeup[x].used!=0);x++);
- if (x == lwakeup) {
- gs_int32_t y;
- lwakeup = 2*lwakeup;
- if ((fwakeup =
- realloc(fwakeup,sizeof(struct ftacallbackwakeup)*lwakeup))==0) {
- gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
- return -1;
- }
- for (y=x;y<lwakeup;y++)
- fwakeup[y].used=0;
- }
- }
- fwakeup[x].used=1;
- fwakeup[x].ftaid=ftaid;
- fwakeup[x].r=r;
- return x;
-}
-
-/* starts an itteration through all msgids associated with
- a streamid. This also uses data kept in the fstreamid
- registry
- */
-gs_retval_t ftacallback_start_wakeup(gs_uint32_t streamid)
-{
- swakeup=streamid;
- nwakeup=0;
- return 0;
-}
-
-/* returns all the msgid blocked on the streamid passed in
- * ftacallback_start_streamid and removes the msgid from
- * the wakeup list
- */
-FTAID * ftacallback_next_wakeup()
-{
- for(;(nwakeup<lstreamid)
- &&(fstreamid[nwakeup].streamid != swakeup);
- nwakeup++);
- if (nwakeup<lstreamid) {
- gs_int32_t x;
- for(x=0;x<lwakeup;x++)
- if ((fwakeup[x].r==fstreamid[nwakeup].r) &&
- (fwakeup[x].used==1)) {
- fwakeup[x].used=0;
- nwakeup++;
- return & fwakeup[x].ftaid;
- }
- }
- nwakeup ++;
- return (FTAID *) 0;
-}
-
-
-
-static gs_retval_t fta_list_add(struct fta_list ** root, struct FTA * after,
- struct FTA * fta) {
- struct fta_list * new;
- struct fta_list * tmp;
-
- if ((new=(struct fta_list *)malloc(sizeof(struct fta_list)))==0) {
- gslog(LOG_EMERG,"fta_list_add:: can't allocate memory\n");
- return -1;
- }
-
- new->fta=fta;
-
-
- if (after==0) {
- new->next=*root;
- new->prev=0;
- *root=new;
- if (new->next) {
- new->next->prev=new;
- }
- } else {
- tmp=*root;
- while((tmp)&&(tmp->fta!=after)) {
- tmp=tmp->next;
- }
- if (tmp==0) {
- gslog(LOG_EMERG,"fta_list_add:: can't find after fta\n");
- return -1;
- }
- new->next=tmp->next;
- new->prev=tmp;
- tmp->next=new;
- if (new->next) {
- new->next->prev=new;
- }
- }
- return 0;
-}
-
-static gs_retval_t fta_list_rm(struct fta_list ** root, struct FTA * fta) {
- struct fta_list * tmp;
- tmp=*root;
- while((tmp)&&(tmp->fta!=fta)) {
- tmp=tmp->next;
- }
- if (tmp==0) {
- gslog(LOG_EMERG,"fta_list_rm:: can't find fta\n");
- return -1;
- }
- if (tmp == (*root)) {
- *root=tmp->next;
- if (tmp->next) {
- tmp->next->prev=0;
- }
- } else {
- tmp->prev->next=tmp->next;
- if (tmp->next) {
- tmp->next->prev=tmp->prev;
- }
- }
-
- free(tmp);
- return 0;
-}
-
-
-static gs_retval_t fta_list_check(struct fta_list ** root, struct FTA * fta) {
- struct fta_list * tmp;
- tmp=*root;
- while((tmp)&&(tmp->fta!=fta)) {
- tmp=tmp->next;
- }
- if (tmp==0) {
- return -1;
- }
- return 0;
-}
-
-gs_retval_t ftaexec_insert(struct FTA * after, struct FTA * new)
-{
-
- if ((after!=0) && (fta_list_check(&process,after)<0)) {
- gslog(LOG_EMERG,"fta_insert:: ilegal adapter for after\n");
- return -1;
- }
-
- if (fta_list_check(&created,new)<0) {
- gslog(LOG_EMERG,"fta_insert:: ilegal adapter for new\n");
- return -1;
- }
-
- if (fta_list_check(&process,new)==0) {
- new->runrefcnt++;
- gslog(LOG_INFO,"fta_insert:: new already in process list reusing entry with streamid %d\n",new->ftaid.streamid);
- return 0;
- }
-
- if (fta_list_add(&process,after,new)<0) {
- gslog(LOG_EMERG,"fta_insert:: new can not be added to process list\n");
- return -1;
- }
-
- new->runrefcnt++;
-
- return 0;
-
-}
-
-gs_retval_t ftaexec_remove(struct FTA * id){
-
- if (fta_list_check(&process,id)<0) {
- gslog(LOG_EMERG,"fta_remove:: id not in process list\n");
- return -1;
- }
- id->runrefcnt--;
- if (id->runrefcnt<=0) {
- if (fta_list_rm(&process,id)<0) {
- gslog(LOG_EMERG,"fta_remove:: id could not be removed from process list\n");
- return -1;
- }
- }
- return 0;
-}
-
-
-struct FTA * ftaexec_alloc_instance(gs_uint32_t index, struct FTA * reuse,
- gs_uint32_t reusable,
- gs_int32_t command, gs_int32_t sz, void * data){
- struct FTA * f;
- FTAID ftaid;
- ftaid=gscpipc_getftaid();
- ftaid.index=index;
- ftaid.streamid=0;
- if (fta_list_check(&created,reuse)==0) {
- reuse->refcnt++;
- return reuse;
- }
- if (ftacallback_get_alloc(index)==0) return 0;
- f=ftacallback_get_alloc(index)(ftaid,reusable,command, sz, data);
- f->prefilter=ftacallback_get_prefilter(index);
- gslog(LOG_INFO,"Using prefilter %llu for fta %x\n",f->prefilter,f);
- if (fta_list_add(&created,0,f)<0) {
- gslog(LOG_EMERG,"fta_alloc_instance:: new fta can not be added to created list\n");
- return 0;
- }
- f->refcnt=1;
- return f;
-}
-
-
-
-gs_retval_t ftaexec_free_instance(struct FTA * id, gs_uint32_t recursive){
- id->refcnt --;
- if (id->refcnt==0) {
- if (fta_list_rm(&created,id)<0) {
- gslog(LOG_EMERG,"fta_free_instance:: fta could not be removed from created list\n");
- return -1;
- }
- /* just to make sure remove it form process list too */
- if (fta_list_check(&process,id)>=0) {
- fta_list_rm(&process,id);
- }
-
- id->free_fta(id,recursive);
- }
- return 0;
-}
-
-gs_retval_t ftaexec_control(struct FTA * id, gs_int32_t command, gs_int32_t sz, void * value){
- if (fta_list_check(&created,id)<0) {
- gslog(LOG_EMERG,"fta_control:: id not found in adapter's created list\n");
- return -1;
- }
- return id->control_fta(id,command,sz,value);
-}
-
-gs_retval_t ftaexec_process_control(gs_int32_t command, gs_int32_t sz, void * value){
- struct FTA * f;
- ftaexec_start();
- while((f=ftaexec_next())!=0) {
- f->control_fta(f,command,sz,value);
- }
- return 1;
-}
-
-
-/* Start itteration through list of active FTA */
-
-gs_retval_t ftaexec_start()
-{
- itteration=process;
- return 0;
-}
-
-/* get one FTA at a time */
-
-struct FTA * ftaexec_next()
-{
- struct FTA * fta=0;
- if (itteration) {
- fta=itteration->fta;
- itteration=itteration->next;
- }
- return fta;
-}
-
-
-/* adds a buffer to the end of the sidequeue*/
-gs_retval_t sidequeue_append(FTAID from, gs_sp_t buf, gs_int32_t length)
-{
- struct sq * s;
- if ((s=malloc(sizeof(struct sq)))==0) {
- gslog(LOG_EMERG,"Could not allocate memory for sidequeue");
- return -1;
- }
- s->from=from;
- memcpy(&s->buf[0],buf,MAXMSGSZ);
- s->length=length;
- s->next=0;
- if (sqtail) {
- sqtail->next=s;
- sqtail=s;
- } else {
- sqtop = s;
- sqtail = s;
- }
- return 0;
-}
-
-/* removes a buffer from the top of the sidequeue*/
-gs_retval_t sidequeue_pop(FTAID * from, gs_sp_t buf, gs_int32_t* length)
-{
- struct sq * s;
-
- if (sqtop) {
- *from=sqtop->from;
- memcpy(buf,&sqtop->buf[0],MAXMSGSZ);
- *length=sqtop->length;
- s=sqtop;
- sqtop=sqtop->next;
- if (sqtop==0) sqtail=0;
- free(s);
- return 0;
- }
- return -1;
-}
+/* ------------------------------------------------\r
+ Copyright 2014 AT&T Intellectual Property\r
+ Licensed under the Apache License, Version 2.0 (the "License");\r
+ you may not use this file except in compliance with the License.\r
+ You may obtain a copy of the License at\r
+ \r
+ http://www.apache.org/licenses/LICENSE-2.0\r
+ \r
+ Unless required by applicable law or agreed to in writing, software\r
+ distributed under the License is distributed on an "AS IS" BASIS,\r
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ See the License for the specific language governing permissions and\r
+ limitations under the License.\r
+ ------------------------------------------- */\r
+\r
+#include "callbackregistries.h"\r
+#include "lapp.h"\r
+#include "gscpipc.h"\r
+#include "stdlib.h"\r
+#include "stdio.h"\r
+#include "string.h"\r
+#include <unistd.h>\r
+#include <signal.h>\r
+#include <sys/mman.h>\r
+#include "schemaparser.h"\r
+#include "errno.h"\r
+\r
+#include "gsconfig.h"\r
+#include "gstypes.h"\r
+\r
+struct ftacallbackalloc {\r
+ gs_int32_t used;\r
+ gs_sp_t name;\r
+ alloc_fta fta_alloc_functionptr;\r
+ gs_uint64_t prefilter;\r
+};\r
+\r
+static struct ftacallbackalloc * falloc =0;\r
+static gs_int32_t lalloc=0;\r
+\r
+struct ftacallbackstreamid {\r
+ gs_int32_t refcnt;\r
+ gs_uint32_t streamid;\r
+ gs_uint32_t state;\r
+ struct ringbuf * r;\r
+};\r
+\r
+static struct ftacallbackstreamid * fstreamid =0;\r
+static gs_int32_t lstreamid=0;\r
+static gs_uint32_t cstreamid;\r
+static gs_int32_t nstreamid;\r
+\r
+struct ftacallbackwakeup {\r
+ gs_int32_t used;\r
+ FTAID ftaid;\r
+ struct ringbuf * r;\r
+};\r
+\r
+static struct ftacallbackwakeup * fwakeup =0;\r
+static gs_int32_t lwakeup=0;\r
+static gs_uint32_t swakeup;\r
+static gs_int32_t nwakeup;\r
+\r
+/* XXX memory of fta list has no owner struct */\r
+\r
+struct fta_list {\r
+ struct fta_list * prev;\r
+ struct fta_list * next;\r
+ struct FTA * fta;\r
+};\r
+\r
+static struct fta_list * process=0;\r
+static struct fta_list * created=0;\r
+\r
+static struct fta_list * itteration=0;\r
+\r
+// Side queue datastructures\r
+\r
+struct sq {\r
+ FTAID from;\r
+ gs_int8_t buf[MAXMSGSZ];\r
+ gs_int32_t length;\r
+ struct sq * next;\r
+};\r
+\r
+static struct sq * sqtop=0;\r
+static struct sq * sqtail=0;\r
+\r
+\r
+/* HFTA internal print function*/\r
+\r
+gs_retval_t add_printfunction_to_stream( struct FTA * ftaid, gs_sp_t schema, gs_sp_t path, gs_sp_t basename,\r
+ gs_sp_t temporal_field, gs_sp_t split_field, gs_uint32_t delta, gs_uint32_t split) {\r
+ gs_uint32_t parserversion;\r
+ gs_int32_t schemaid;\r
+ gs_uint32_t x;\r
+ gs_int8_t temp[50000];\r
+ if (ftaid->printfunc.in_use==1) {\r
+ gslog(LOG_EMERG,"ERROR:Printfunction::only allow one print function per HFTA instance\n");\r
+ return -1;\r
+ }\r
+ ftaid->printfunc.path=strdup(path);\r
+ ftaid->printfunc.basename=strdup(basename);\r
+ ftaid->printfunc.nexttime=0;\r
+ ftaid->printfunc.split=(split%1000);\r
+ ftaid->printfunc.itt=(split/1000)%1000;\r
+ ftaid->printfunc.base=(split/1000000)%1000;\r
+ ftaid->printfunc.delta=delta;\r
+ ftaid->printfunc.in_use=1;\r
+ if (ftaid->printfunc.split > MAXPRINTFILES) {\r
+ gslog(LOG_EMERG,"ERROR:Printfunction SPLIT to large\n");\r
+ return -1;\r
+ }\r
+ for (x=0;x<ftaid->printfunc.split;x++) ftaid->printfunc.fa[x]=0;\r
+ \r
+ if ((ftaid->printfunc.schemaid=ftaschema_parse_string(schema))<0) {\r
+ gslog(LOG_EMERG,"ERROR:could not parse schema in HFTA print function");\r
+ return -1;\r
+ }\r
+ if ((ftaid->printfunc.temporal_field=ftaschema_get_field_offset_by_name(\r
+ ftaid->printfunc.schemaid,temporal_field))<0) {\r
+ gslog(LOG_EMERG,"ERROR:could not get "\r
+ "offset for timefield %s in HFTA print function\n",\r
+ temporal_field);\r
+ return -1;\r
+ }\r
+ \r
+ if (ftaschema_get_field_type_by_name(\r
+ ftaid->printfunc.schemaid,temporal_field)!=UINT_TYPE) {\r
+ gslog(LOG_EMERG,"ERROR: illegal type for timefield "\r
+ "%s in HFTA print function UINT expected\n",\r
+ temporal_field);\r
+ return -1;\r
+ }\r
+ if ((ftaid->printfunc.split_field=ftaschema_get_field_offset_by_name(\r
+ ftaid->printfunc.schemaid,split_field))<0) {\r
+ gslog(LOG_EMERG,"ERROR:could not get "\r
+ "offset for splitfield %s in HFTA print function\n",\r
+ split_field);\r
+ return -1;\r
+ }\r
+ \r
+ if (ftaschema_get_field_type_by_name(\r
+ ftaid->printfunc.schemaid,split_field)!=UINT_TYPE) {\r
+ gslog(LOG_EMERG,"ERROR: illegal type for splitfield"\r
+ "%s in HFTA print function UINT expected\n",\r
+ split_field);\r
+ return -1;\r
+ }\r
+ parserversion=get_schemaparser_version();\r
+ sprintf(temp,"GDAT\nVERSION:%u\nSCHEMALENGTH:%lu\n%s",parserversion,strlen(schema)+1,schema);\r
+ ftaid->printfunc.header=strdup(temp);\r
+ gslog(LOG_INFO,"Established print function for %s",basename);\r
+ return 0;\r
+}\r
+\r
+gs_retval_t print_stream(struct FTA * self, gs_int32_t sz, void *tuple)\r
+{\r
+ gs_int32_t problem;\r
+ gs_uint32_t timeval;\r
+ gs_uint32_t splitval;\r
+ gs_uint32_t x;\r
+ gs_uint32_t nsz;\r
+ timeval=fta_unpack_uint(tuple,sz,self->printfunc.temporal_field,&problem);\r
+ if (timeval==0) return 0; // ignore heartbeats till we see a real timestamp\r
+ if (timeval>= self->printfunc.nexttime) {\r
+ gs_int8_t oldname[1024];\r
+ gs_int8_t newname[1024];\r
+ if (self->printfunc.split==0) {\r
+ if (self->printfunc.fa[0] != 0) {\r
+ sprintf(oldname,"%s/%u%s.tmp",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,self->printfunc.basename);\r
+ sprintf(newname,"%s/%u%s",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,self->printfunc.basename);\r
+ fclose(self->printfunc.fa[0]);\r
+ rename(oldname,newname);\r
+ }\r
+ if (self->printfunc.nexttime==0) {\r
+ self->printfunc.nexttime=(timeval/self->printfunc.delta)*self->printfunc.delta;\r
+ }\r
+ sprintf(oldname,"%s/%u%s.tmp",self->printfunc.path,self->printfunc.nexttime,self->printfunc.basename);\r
+ if ((self->printfunc.fa[0]=fopen(oldname,"w"))==0) {\r
+ gslog(LOG_EMERG,"ERROR:Could not open file in HFTA print function\n");\r
+ return -1;\r
+ }\r
+ if (setvbuf(self->printfunc.fa[0],0,_IOFBF,16000000)!=0) {\r
+ gslog(LOG_EMERG,"ERROR:Could not setvbuf\n");\r
+ }\r
+ if (fwrite(self->printfunc.header,strlen(self->printfunc.header)+1,1,self->printfunc.fa[0])!=1) {\r
+ gslog(LOG_EMERG,"ERROR:fwrite:xfgh1:%s:%u",self->printfunc.basename,errno);\r
+ }\r
+ gslog(LOG_INFO,"Opened file %s",oldname);\r
+ } else {\r
+ for(x=self->printfunc.base;x<self->printfunc.split;x=x+self->printfunc.itt) {\r
+ if (self->printfunc.fa[x] != 0) {\r
+ sprintf(oldname,"%s/%u_s%u%s.tmp",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,x+1,self->printfunc.basename);\r
+ sprintf(newname,"%s/%u_s%u%s",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,x+1,self->printfunc.basename);\r
+ fclose(self->printfunc.fa[x]);\r
+ rename(oldname,newname);\r
+ }\r
+ if (self->printfunc.nexttime==0) {\r
+ self->printfunc.nexttime=(timeval/self->printfunc.delta)*self->printfunc.delta;\r
+ }\r
+ sprintf(oldname,"%s/%u_s%u%s.tmp",self->printfunc.path,self->printfunc.nexttime,x+1,self->printfunc.basename);\r
+ if ((self->printfunc.fa[x]=fopen(oldname,"w"))==0) {\r
+ gslog(LOG_EMERG,"ERROR:Could not open file in HFTA print function\n");\r
+ return -1;\r
+ }\r
+ if (setvbuf(self->printfunc.fa[x],0,_IOFBF,16000000)!=0) {\r
+ gslog(LOG_EMERG,"ERROR:Could not setvbuf\n");\r
+ }\r
+ if (fwrite(self->printfunc.header,strlen(self->printfunc.header)+1,1,self->printfunc.fa[x])!=1) {\r
+ gslog(LOG_EMERG,"ERROR:fwrite:xfgh2:%s:%u",self->printfunc.basename,errno);\r
+ }\r
+ gslog(LOG_INFO,"Opened file %s",oldname);\r
+ }\r
+ }\r
+ self->printfunc.nexttime=self->printfunc.nexttime+self->printfunc.delta;\r
+ }\r
+ // don't write temporal tuples to file but use them to advance file name.\r
+ if (ftaschema_is_temporal_tuple(self->printfunc.schemaid, tuple)) return 0;\r
+ if (self->printfunc.split!=0) {\r
+ splitval=fta_unpack_uint(tuple,sz,self->printfunc.split_field,&problem)%(self->printfunc.split);\r
+ if (self->printfunc.fa[splitval]==0) {\r
+ gslog(LOG_EMERG,"Inconsistent rangehash in print %u\n", splitval);\r
+ exit(0);\r
+ }\r
+ } else {\r
+ splitval=0;\r
+ }\r
+ nsz=htonl(sz);\r
+ if (fwrite(&nsz,sizeof(gs_uint32_t),1,self->printfunc.fa[splitval])!=1) {\r
+ gslog(LOG_EMERG,"Could not write to output in HFTA print\"%s\":%u.. EXITING\n",\r
+ self->printfunc.basename,errno);\r
+ exit(0);\r
+ }\r
+ if (fwrite(tuple,sz,1,self->printfunc.fa[splitval])!=1) {\r
+ gslog(LOG_EMERG,"Could not write to output in HFTA print\"%s\"%u.. EXITING\n",\r
+ self->printfunc.basename,errno);\r
+ exit(0);\r
+ }\r
+ return 0;\r
+}\r
+\r
+/* registers an alloc function of an FTA and returns a unique index */\r
+gs_retval_t ftacallback_add_alloc(FTAname name, alloc_fta fta_alloc_functionptr,gs_uint64_t prefilter)\r
+{\r
+ gs_int32_t x;\r
+ gslog(LOG_INFO,"Register prefilter %llu for %s\n",prefilter,name);\r
+ if (lalloc == 0) {\r
+ if ((falloc = malloc(sizeof(struct ftacallbackalloc)*STARTSZ))==0) {\r
+ gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
+ return -1;\r
+ }\r
+ memset(falloc,0,sizeof(struct ftacallbackalloc)*STARTSZ);\r
+ lalloc = STARTSZ;\r
+ }\r
+ for(x=0;(x<lalloc)&&(falloc[x].used!=0);x++);\r
+ if (x == lalloc) {\r
+ gs_int32_t y;\r
+ lalloc = 2*lalloc;\r
+ if ((falloc = realloc(falloc,lalloc*sizeof(struct ftacallbackalloc)))==0) {\r
+ gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
+ return -1;\r
+ }\r
+ for (y=x;y<lalloc;y++)\r
+ falloc[y].used=0;\r
+ }\r
+ falloc[x].name=strdup(name);\r
+ falloc[x].fta_alloc_functionptr=fta_alloc_functionptr;\r
+ falloc[x].prefilter=prefilter;\r
+ falloc[x].used=1;\r
+ return x;\r
+}\r
+\r
+/* unregisters an alloc function of an FTA and makes the index available\r
+ * for reuse\r
+ */\r
+\r
+gs_retval_t ftacallback_rm_alloc(gs_uint32_t index)\r
+{\r
+ falloc[index].used=0;\r
+ free(falloc[index].name);\r
+ return 0;\r
+}\r
+/* returns the prefilter for a given\r
+ * index\r
+ */\r
+\r
+gs_uint64_t ftacallback_get_prefilter(gs_int32_t index)\r
+{\r
+ if ((index<lalloc) && (falloc[index].used!=0)) {\r
+ return falloc[index].prefilter;\r
+ }\r
+ return 0;\r
+}\r
+\r
+/* returns the function pointer of the callback function for a given\r
+ * index\r
+ */\r
+\r
+alloc_fta ftacallback_get_alloc(gs_int32_t index)\r
+{\r
+ if ((index<lalloc) && (falloc[index].used!=0)) {\r
+ return falloc[index].fta_alloc_functionptr;\r
+ }\r
+ return 0;\r
+}\r
+\r
+\r
+\r
+\r
+\r
+/* associate ringbuffer with streamid (using refcounting) */\r
+gs_retval_t ftacallback_add_streamid(struct ringbuf * r, gs_uint32_t streamid) {\r
+ gs_int32_t x;\r
+ if (lstreamid == 0) {\r
+ if ((fstreamid = malloc(sizeof(struct ftacallbackstreamid)*STARTSZ))==0) {\r
+ gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
+ return -1;\r
+ }\r
+ memset(fstreamid,0,sizeof(struct ftacallbackstreamid)*STARTSZ);\r
+ lstreamid = STARTSZ;\r
+ }\r
+ /* first try to increment refcnt */\r
+ for(x=0;(x<lstreamid)&&(\r
+ (fstreamid[x].streamid!=streamid)\r
+ ||(fstreamid[x].r!=r)\r
+ ||(fstreamid[x].refcnt<=0)) ;x++);\r
+ if (x>=lstreamid) {\r
+ /* now try to find empty slot */\r
+ for(x=0;(x<lstreamid)&&(fstreamid[x].refcnt!=0);x++);\r
+ if (x >= lstreamid) {\r
+ gs_int32_t y;\r
+ lstreamid = 2*lstreamid;\r
+ if ((fstreamid =\r
+ realloc(fstreamid,sizeof(struct ftacallbackstreamid)*lstreamid))==0) {\r
+ gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
+ return -1;\r
+ }\r
+ for (y=x;y<lstreamid;y++) {\r
+ fstreamid[y].refcnt=0;\r
+ fstreamid[y].streamid=0;\r
+ }\r
+ }\r
+ fstreamid[x].state=HFTA_RINGBUF_ATOMIC;\r
+ fstreamid[x].streamid=streamid;\r
+ fstreamid[x].r=r;\r
+ }\r
+ fstreamid[x].refcnt+=1;\r
+ return 0;\r
+}\r
+\r
+\r
+/* unassosciate a ringbuffer from a streamid */\r
+\r
+gs_retval_t ftacallback_rm_streamid(struct ringbuf * r, gs_uint32_t streamid)\r
+{\r
+ gs_int32_t x;\r
+ for(x=0;x<lstreamid;x++) {\r
+ if ((fstreamid[x].streamid == streamid)\r
+ && (fstreamid[x].r == r)\r
+ && (fstreamid[x].refcnt > 0))\r
+ fstreamid[x].refcnt--;\r
+ }\r
+ return 0;\r
+}\r
+\r
+/* set the state for a given streamid and destination process */\r
+gs_retval_t ftacallback_state_streamid(gs_int32_t streamid,FTAID process, gs_int32_t state)\r
+{\r
+ gs_int32_t x;\r
+ for(x=0;x<lstreamid;x++) {\r
+ if ((fstreamid[x].streamid == streamid)\r
+ && (fstreamid[x].r->destid.ip == process.ip )\r
+ && (fstreamid[x].r->destid.port == process.port )\r
+ && (fstreamid[x].refcnt > 0))\r
+ fstreamid[x].state=state;\r
+ return 0;\r
+ }\r
+ return -1;\r
+}\r
+\r
+/* starts an itteration through all ringbuffers for a particular streamid */\r
+\r
+gs_retval_t ftacallback_start_streamid(gs_int32_t streamid)\r
+{\r
+ cstreamid=streamid;\r
+ nstreamid=0;\r
+ return 0;\r
+}\r
+\r
+/* returns all the ringbuffer associated with the streamid passed in\r
+ * ftacallback_start_streamid\r
+ */\r
+struct ringbuf * ftacallback_next_streamid(gs_int32_t* state)\r
+{\r
+ for(;(nstreamid<lstreamid)\r
+ &&(fstreamid[nstreamid].streamid != cstreamid);\r
+ nstreamid++);\r
+ if (nstreamid<lstreamid) {\r
+ nstreamid++;\r
+ *state=fstreamid[nstreamid-1].state;\r
+ return fstreamid[nstreamid-1].r;\r
+ }\r
+ return 0;\r
+}\r
+\r
+\r
+/* associate msgid with ringbuf */\r
+gs_retval_t ftacallback_add_wakeup(FTAID ftaid, struct ringbuf * r)\r
+{\r
+ gs_int32_t x;\r
+ if (lwakeup == 0) {\r
+ if ((fwakeup = malloc(sizeof(struct ftacallbackwakeup)*STARTSZ))==0) {\r
+ gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
+ return -1;\r
+ }\r
+ memset(fwakeup,0,sizeof(struct ftacallbackwakeup)*STARTSZ);\r
+ lwakeup = STARTSZ;\r
+ }\r
+ /* first try to find one for the same process */\r
+ for(x=0;(x<lwakeup)&&(\r
+ ((fwakeup[x].ftaid.ip!=ftaid.ip)\r
+ || (fwakeup[x].ftaid.port!=ftaid.port))\r
+ ||(fwakeup[x].used==0)) ;x++);\r
+ if (x==lwakeup) {\r
+ /* now try to find empty slot */\r
+ for(x=0;(x<lwakeup)&&(fwakeup[x].used!=0);x++);\r
+ if (x == lwakeup) {\r
+ gs_int32_t y;\r
+ lwakeup = 2*lwakeup;\r
+ if ((fwakeup =\r
+ realloc(fwakeup,sizeof(struct ftacallbackwakeup)*lwakeup))==0) {\r
+ gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
+ return -1;\r
+ }\r
+ for (y=x;y<lwakeup;y++)\r
+ fwakeup[y].used=0;\r
+ }\r
+ }\r
+ fwakeup[x].used=1;\r
+ fwakeup[x].ftaid=ftaid;\r
+ fwakeup[x].r=r;\r
+ return x;\r
+}\r
+\r
+/* starts an itteration through all msgids associated with\r
+ a streamid. This also uses data kept in the fstreamid\r
+ registry\r
+ */\r
+gs_retval_t ftacallback_start_wakeup(gs_uint32_t streamid)\r
+{\r
+ swakeup=streamid;\r
+ nwakeup=0;\r
+ return 0;\r
+}\r
+\r
+/* returns all the msgid blocked on the streamid passed in\r
+ * ftacallback_start_streamid and removes the msgid from\r
+ * the wakeup list\r
+ */\r
+FTAID * ftacallback_next_wakeup()\r
+{\r
+ for(;(nwakeup<lstreamid)\r
+ &&(fstreamid[nwakeup].streamid != swakeup);\r
+ nwakeup++);\r
+ if (nwakeup<lstreamid) {\r
+ gs_int32_t x;\r
+ for(x=0;x<lwakeup;x++)\r
+ if ((fwakeup[x].r==fstreamid[nwakeup].r) &&\r
+ (fwakeup[x].used==1)) {\r
+ fwakeup[x].used=0;\r
+ nwakeup++;\r
+ return & fwakeup[x].ftaid;\r
+ }\r
+ }\r
+ nwakeup ++;\r
+ return (FTAID *) 0;\r
+}\r
+\r
+\r
+\r
+static gs_retval_t fta_list_add(struct fta_list ** root, struct FTA * after,\r
+ struct FTA * fta) {\r
+ struct fta_list * new;\r
+ struct fta_list * tmp;\r
+ \r
+ if ((new=(struct fta_list *)malloc(sizeof(struct fta_list)))==0) {\r
+ gslog(LOG_EMERG,"fta_list_add:: can't allocate memory\n");\r
+ return -1;\r
+ }\r
+ \r
+ new->fta=fta;\r
+ \r
+ \r
+ if (after==0) {\r
+ new->next=*root;\r
+ new->prev=0;\r
+ *root=new;\r
+ if (new->next) {\r
+ new->next->prev=new;\r
+ }\r
+ } else {\r
+ tmp=*root;\r
+ while((tmp)&&(tmp->fta!=after)) {\r
+ tmp=tmp->next;\r
+ }\r
+ if (tmp==0) {\r
+ gslog(LOG_EMERG,"fta_list_add:: can't find after fta\n");\r
+ return -1;\r
+ }\r
+ new->next=tmp->next;\r
+ new->prev=tmp;\r
+ tmp->next=new;\r
+ if (new->next) {\r
+ new->next->prev=new;\r
+ }\r
+ }\r
+ return 0;\r
+}\r
+\r
+static gs_retval_t fta_list_rm(struct fta_list ** root, struct FTA * fta) {\r
+ struct fta_list * tmp;\r
+ tmp=*root;\r
+ while((tmp)&&(tmp->fta!=fta)) {\r
+ tmp=tmp->next;\r
+ }\r
+ if (tmp==0) {\r
+ gslog(LOG_EMERG,"fta_list_rm:: can't find fta\n");\r
+ return -1;\r
+ }\r
+ if (tmp == (*root)) {\r
+ *root=tmp->next;\r
+ if (tmp->next) {\r
+ tmp->next->prev=0;\r
+ }\r
+ } else {\r
+ tmp->prev->next=tmp->next;\r
+ if (tmp->next) {\r
+ tmp->next->prev=tmp->prev;\r
+ }\r
+ }\r
+ \r
+ free(tmp);\r
+ return 0;\r
+}\r
+\r
+\r
+static gs_retval_t fta_list_check(struct fta_list ** root, struct FTA * fta) {\r
+ struct fta_list * tmp;\r
+ tmp=*root;\r
+ while((tmp)&&(tmp->fta!=fta)) {\r
+ tmp=tmp->next;\r
+ }\r
+ if (tmp==0) {\r
+ return -1;\r
+ }\r
+ return 0;\r
+}\r
+\r
+gs_retval_t ftaexec_insert(struct FTA * after, struct FTA * new)\r
+{\r
+ \r
+ if ((after!=0) && (fta_list_check(&process,after)<0)) {\r
+ gslog(LOG_EMERG,"fta_insert:: ilegal adapter for after\n");\r
+ return -1;\r
+ }\r
+ \r
+ if (fta_list_check(&created,new)<0) {\r
+ gslog(LOG_EMERG,"fta_insert:: ilegal adapter for new\n");\r
+ return -1;\r
+ }\r
+ \r
+ if (fta_list_check(&process,new)==0) {\r
+ new->runrefcnt++;\r
+ gslog(LOG_INFO,"fta_insert:: new already in process list reusing entry with streamid %d\n",new->ftaid.streamid);\r
+ return 0;\r
+ }\r
+ \r
+ if (fta_list_add(&process,after,new)<0) {\r
+ gslog(LOG_EMERG,"fta_insert:: new can not be added to process list\n");\r
+ return -1;\r
+ }\r
+ \r
+ new->runrefcnt++;\r
+ \r
+ return 0;\r
+ \r
+}\r
+\r
+gs_retval_t ftaexec_remove(struct FTA * id){\r
+ \r
+ if (fta_list_check(&process,id)<0) {\r
+ gslog(LOG_EMERG,"fta_remove:: id not in process list\n");\r
+ return -1;\r
+ }\r
+ id->runrefcnt--;\r
+ if (id->runrefcnt<=0) {\r
+ if (fta_list_rm(&process,id)<0) {\r
+ gslog(LOG_EMERG,"fta_remove:: id could not be removed from process list\n");\r
+ return -1;\r
+ }\r
+ }\r
+ return 0;\r
+}\r
+\r
+\r
+struct FTA * ftaexec_alloc_instance(gs_uint32_t index, struct FTA * reuse,\r
+ gs_uint32_t reusable,\r
+ gs_int32_t command, gs_int32_t sz, void * data){\r
+ struct FTA * f;\r
+ FTAID ftaid;\r
+ ftaid=gscpipc_getftaid();\r
+ ftaid.index=index;\r
+ ftaid.streamid=0;\r
+ if (fta_list_check(&created,reuse)==0) {\r
+ reuse->refcnt++;\r
+ return reuse;\r
+ }\r
+ if (ftacallback_get_alloc(index)==0) return 0;\r
+ f=ftacallback_get_alloc(index)(ftaid,reusable,command, sz, data);\r
+ f->prefilter=ftacallback_get_prefilter(index);\r
+ gslog(LOG_INFO,"Using prefilter %llu for fta %x\n",f->prefilter,f);\r
+ if (fta_list_add(&created,0,f)<0) {\r
+ gslog(LOG_EMERG,"fta_alloc_instance:: new fta can not be added to created list\n");\r
+ return 0;\r
+ }\r
+ f->refcnt=1;\r
+ return f;\r
+}\r
+\r
+\r
+\r
+gs_retval_t ftaexec_free_instance(struct FTA * id, gs_uint32_t recursive){\r
+ id->refcnt --;\r
+ if (id->refcnt==0) {\r
+ if (fta_list_rm(&created,id)<0) {\r
+ gslog(LOG_EMERG,"fta_free_instance:: fta could not be removed from created list\n");\r
+ return -1;\r
+ }\r
+ /* just to make sure remove it form process list too */\r
+ if (fta_list_check(&process,id)>=0) {\r
+ fta_list_rm(&process,id);\r
+ }\r
+ \r
+ id->free_fta(id,recursive);\r
+ }\r
+ return 0;\r
+}\r
+\r
+gs_retval_t ftaexec_control(struct FTA * id, gs_int32_t command, gs_int32_t sz, void * value){\r
+ if (fta_list_check(&created,id)<0) {\r
+ gslog(LOG_EMERG,"fta_control:: id not found in adapter's created list\n");\r
+ return -1;\r
+ }\r
+ return id->control_fta(id,command,sz,value);\r
+}\r
+\r
+gs_retval_t ftaexec_process_control(gs_int32_t command, gs_int32_t sz, void * value){\r
+ struct FTA * f;\r
+ ftaexec_start();\r
+ while((f=ftaexec_next())!=0) {\r
+ f->control_fta(f,command,sz,value);\r
+ }\r
+ return 1;\r
+}\r
+\r
+\r
+/* Start itteration through list of active FTA */\r
+\r
+gs_retval_t ftaexec_start()\r
+{\r
+ itteration=process;\r
+ return 0;\r
+}\r
+\r
+/* get one FTA at a time */\r
+\r
+struct FTA * ftaexec_next()\r
+{\r
+ struct FTA * fta=0;\r
+ if (itteration) {\r
+ fta=itteration->fta;\r
+ itteration=itteration->next;\r
+ }\r
+ return fta;\r
+}\r
+\r
+\r
+/* adds a buffer to the end of the sidequeue*/\r
+gs_retval_t sidequeue_append(FTAID from, gs_sp_t buf, gs_int32_t length)\r
+{\r
+ struct sq * s;\r
+ if ((s=malloc(sizeof(struct sq)))==0) {\r
+ gslog(LOG_EMERG,"Could not allocate memory for sidequeue");\r
+ return -1;\r
+ }\r
+ s->from=from;\r
+ memcpy(&s->buf[0],buf,MAXMSGSZ);\r
+ s->length=length;\r
+ s->next=0;\r
+ if (sqtail) {\r
+ sqtail->next=s;\r
+ sqtail=s;\r
+ } else {\r
+ sqtop = s;\r
+ sqtail = s;\r
+ }\r
+ return 0;\r
+}\r
+\r
+/* removes a buffer from the top of the sidequeue*/\r
+gs_retval_t sidequeue_pop(FTAID * from, gs_sp_t buf, gs_int32_t* length)\r
+{\r
+ struct sq * s;\r
+ \r
+ if (sqtop) {\r
+ *from=sqtop->from;\r
+ memcpy(buf,&sqtop->buf[0],MAXMSGSZ);\r
+ *length=sqtop->length;\r
+ s=sqtop;\r
+ sqtop=sqtop->next;\r
+ if (sqtop==0) sqtail=0;\r
+ free(s);\r
+ return 0;\r
+ }\r
+ return -1;\r
+}\r