X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Flib%2Fgscphost%2Fcallbackregistries.c;h=8bd0957443a17a6a95299baf3b0f68ccc6eb4c22;hb=e981e864b812c938d3df8b555b6bb98bb89273e7;hp=228b781a75c51bc9bdf3a0317c95f1a016aaf2e7;hpb=44ea17511358ebc75952066580e31cba8b38ddb8;p=com%2Fgs-lite.git diff --git a/src/lib/gscphost/callbackregistries.c b/src/lib/gscphost/callbackregistries.c index 228b781..8bd0957 100644 --- a/src/lib/gscphost/callbackregistries.c +++ b/src/lib/gscphost/callbackregistries.c @@ -1,727 +1,727 @@ -/* ------------------------------------------------ - Copyright 2014 AT&T Intellectual Property - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - ------------------------------------------- */ - -#include "callbackregistries.h" -#include "lapp.h" -#include "gscpipc.h" -#include "stdlib.h" -#include "stdio.h" -#include "string.h" -#include -#include -#include -#include "schemaparser.h" -#include "errno.h" - -#include "gsconfig.h" -#include "gstypes.h" - -struct ftacallbackalloc { - gs_int32_t used; - gs_sp_t name; - alloc_fta fta_alloc_functionptr; - gs_uint64_t prefilter; -}; - -static struct ftacallbackalloc * falloc =0; -static gs_int32_t lalloc=0; - -struct ftacallbackstreamid { - gs_int32_t refcnt; - gs_uint32_t streamid; - gs_uint32_t state; - struct ringbuf * r; -}; - -static struct ftacallbackstreamid * fstreamid =0; -static gs_int32_t lstreamid=0; -static gs_uint32_t cstreamid; -static gs_int32_t nstreamid; - -struct ftacallbackwakeup { - gs_int32_t used; - FTAID ftaid; - struct ringbuf * r; -}; - -static struct ftacallbackwakeup * fwakeup =0; -static gs_int32_t lwakeup=0; -static gs_uint32_t swakeup; -static gs_int32_t nwakeup; - -/* XXX memory of fta list has no owner struct */ - -struct fta_list { - struct fta_list * prev; - struct fta_list * next; - struct FTA * fta; -}; - -static struct fta_list * process=0; -static struct fta_list * created=0; - -static struct fta_list * itteration=0; - -// Side queue datastructures - -struct sq { - FTAID from; - gs_int8_t buf[MAXMSGSZ]; - gs_int32_t length; - struct sq * next; -}; - -static struct sq * sqtop=0; -static struct sq * sqtail=0; - - -/* HFTA internal print function*/ - -gs_retval_t add_printfunction_to_stream( struct FTA * ftaid, gs_sp_t schema, gs_sp_t path, gs_sp_t basename, - gs_sp_t temporal_field, gs_sp_t split_field, gs_uint32_t delta, gs_uint32_t split) { - gs_uint32_t parserversion; - gs_int32_t schemaid; - gs_uint32_t x; - gs_int8_t temp[50000]; - if (ftaid->printfunc.in_use==1) { - gslog(LOG_EMERG,"ERROR:Printfunction::only allow one print function per HFTA instance\n"); - return -1; - } - ftaid->printfunc.path=strdup(path); - ftaid->printfunc.basename=strdup(basename); - ftaid->printfunc.nexttime=0; - ftaid->printfunc.split=(split%1000); - ftaid->printfunc.itt=(split/1000)%1000; - ftaid->printfunc.base=(split/1000000)%1000; - ftaid->printfunc.delta=delta; - ftaid->printfunc.in_use=1; - if (ftaid->printfunc.split > MAXPRINTFILES) { - gslog(LOG_EMERG,"ERROR:Printfunction SPLIT to large\n"); - return -1; - } - for (x=0;xprintfunc.split;x++) ftaid->printfunc.fa[x]=0; - - if ((ftaid->printfunc.schemaid=ftaschema_parse_string(schema))<0) { - gslog(LOG_EMERG,"ERROR:could not parse schema in HFTA print function"); - return -1; - } - if ((ftaid->printfunc.temporal_field=ftaschema_get_field_offset_by_name( - ftaid->printfunc.schemaid,temporal_field))<0) { - gslog(LOG_EMERG,"ERROR:could not get " - "offset for timefield %s in HFTA print function\n", - temporal_field); - return -1; - } - - if (ftaschema_get_field_type_by_name( - ftaid->printfunc.schemaid,temporal_field)!=UINT_TYPE) { - gslog(LOG_EMERG,"ERROR: illegal type for timefield " - "%s in HFTA print function UINT expected\n", - temporal_field); - return -1; - } - if ((ftaid->printfunc.split_field=ftaschema_get_field_offset_by_name( - ftaid->printfunc.schemaid,split_field))<0) { - gslog(LOG_EMERG,"ERROR:could not get " - "offset for splitfield %s in HFTA print function\n", - split_field); - return -1; - } - - if (ftaschema_get_field_type_by_name( - ftaid->printfunc.schemaid,split_field)!=UINT_TYPE) { - gslog(LOG_EMERG,"ERROR: illegal type for splitfield" - "%s in HFTA print function UINT expected\n", - split_field); - return -1; - } - parserversion=get_schemaparser_version(); - sprintf(temp,"GDAT\nVERSION:%u\nSCHEMALENGTH:%lu\n%s",parserversion,strlen(schema)+1,schema); - ftaid->printfunc.header=strdup(temp); - gslog(LOG_INFO,"Established print function for %s",basename); - return 0; -} - -gs_retval_t print_stream(struct FTA * self, gs_int32_t sz, void *tuple) -{ - gs_int32_t problem; - gs_uint32_t timeval; - gs_uint32_t splitval; - gs_uint32_t x; - gs_uint32_t nsz; - timeval=fta_unpack_uint(tuple,sz,self->printfunc.temporal_field,&problem); - if (timeval==0) return 0; // ignore heartbeats till we see a real timestamp - if (timeval>= self->printfunc.nexttime) { - gs_int8_t oldname[1024]; - gs_int8_t newname[1024]; - if (self->printfunc.split==0) { - if (self->printfunc.fa[0] != 0) { - sprintf(oldname,"%s/%u%s.tmp",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,self->printfunc.basename); - sprintf(newname,"%s/%u%s",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,self->printfunc.basename); - fclose(self->printfunc.fa[0]); - rename(oldname,newname); - } - if (self->printfunc.nexttime==0) { - self->printfunc.nexttime=(timeval/self->printfunc.delta)*self->printfunc.delta; - } - sprintf(oldname,"%s/%u%s.tmp",self->printfunc.path,self->printfunc.nexttime,self->printfunc.basename); - if ((self->printfunc.fa[0]=fopen(oldname,"w"))==0) { - gslog(LOG_EMERG,"ERROR:Could not open file in HFTA print function\n"); - return -1; - } - if (setvbuf(self->printfunc.fa[0],0,_IOFBF,16000000)!=0) { - gslog(LOG_EMERG,"ERROR:Could not setvbuf\n"); - } - if (fwrite(self->printfunc.header,strlen(self->printfunc.header)+1,1,self->printfunc.fa[0])!=1) { - gslog(LOG_EMERG,"ERROR:fwrite:xfgh1:%s:%u",self->printfunc.basename,errno); - } - gslog(LOG_INFO,"Opened file %s",oldname); - } else { - for(x=self->printfunc.base;xprintfunc.split;x=x+self->printfunc.itt) { - if (self->printfunc.fa[x] != 0) { - sprintf(oldname,"%s/%u_s%u%s.tmp",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,x+1,self->printfunc.basename); - sprintf(newname,"%s/%u_s%u%s",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,x+1,self->printfunc.basename); - fclose(self->printfunc.fa[x]); - rename(oldname,newname); - } - if (self->printfunc.nexttime==0) { - self->printfunc.nexttime=(timeval/self->printfunc.delta)*self->printfunc.delta; - } - sprintf(oldname,"%s/%u_s%u%s.tmp",self->printfunc.path,self->printfunc.nexttime,x+1,self->printfunc.basename); - if ((self->printfunc.fa[x]=fopen(oldname,"w"))==0) { - gslog(LOG_EMERG,"ERROR:Could not open file in HFTA print function\n"); - return -1; - } - if (setvbuf(self->printfunc.fa[x],0,_IOFBF,16000000)!=0) { - gslog(LOG_EMERG,"ERROR:Could not setvbuf\n"); - } - if (fwrite(self->printfunc.header,strlen(self->printfunc.header)+1,1,self->printfunc.fa[x])!=1) { - gslog(LOG_EMERG,"ERROR:fwrite:xfgh2:%s:%u",self->printfunc.basename,errno); - } - gslog(LOG_INFO,"Opened file %s",oldname); - } - } - self->printfunc.nexttime=self->printfunc.nexttime+self->printfunc.delta; - } - // don't write temporal tuples to file but use them to advance file name. - if (ftaschema_is_temporal_tuple(self->printfunc.schemaid, tuple)) return 0; - if (self->printfunc.split!=0) { - splitval=fta_unpack_uint(tuple,sz,self->printfunc.split_field,&problem)%(self->printfunc.split); - if (self->printfunc.fa[splitval]==0) { - gslog(LOG_EMERG,"Inconsistent rangehash in print %u\n", splitval); - exit(0); - } - } else { - splitval=0; - } - nsz=htonl(sz); - if (fwrite(&nsz,sizeof(gs_uint32_t),1,self->printfunc.fa[splitval])!=1) { - gslog(LOG_EMERG,"Could not write to output in HFTA print\"%s\":%u.. EXITING\n", - self->printfunc.basename,errno); - exit(0); - } - if (fwrite(tuple,sz,1,self->printfunc.fa[splitval])!=1) { - gslog(LOG_EMERG,"Could not write to output in HFTA print\"%s\"%u.. EXITING\n", - self->printfunc.basename,errno); - exit(0); - } - return 0; -} - -/* registers an alloc function of an FTA and returns a unique index */ -gs_retval_t ftacallback_add_alloc(FTAname name, alloc_fta fta_alloc_functionptr,gs_uint64_t prefilter) -{ - gs_int32_t x; - gslog(LOG_INFO,"Register prefilter %llu for %s\n",prefilter,name); - if (lalloc == 0) { - if ((falloc = malloc(sizeof(struct ftacallbackalloc)*STARTSZ))==0) { - gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n"); - return -1; - } - memset(falloc,0,sizeof(struct ftacallbackalloc)*STARTSZ); - lalloc = STARTSZ; - } - for(x=0;(x=lstreamid) { - /* now try to find empty slot */ - for(x=0;(x= lstreamid) { - gs_int32_t y; - lstreamid = 2*lstreamid; - if ((fstreamid = - realloc(fstreamid,sizeof(struct ftacallbackstreamid)*lstreamid))==0) { - gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n"); - return -1; - } - for (y=x;y 0)) - fstreamid[x].refcnt--; - } - return 0; -} - -/* set the state for a given streamid and destination process */ -gs_retval_t ftacallback_state_streamid(gs_int32_t streamid,FTAID process, gs_int32_t state) -{ - gs_int32_t x; - for(x=0;xdestid.ip == process.ip ) - && (fstreamid[x].r->destid.port == process.port ) - && (fstreamid[x].refcnt > 0)) - fstreamid[x].state=state; - return 0; - } - return -1; -} - -/* starts an itteration through all ringbuffers for a particular streamid */ - -gs_retval_t ftacallback_start_streamid(gs_int32_t streamid) -{ - cstreamid=streamid; - nstreamid=0; - return 0; -} - -/* returns all the ringbuffer associated with the streamid passed in - * ftacallback_start_streamid - */ -struct ringbuf * ftacallback_next_streamid(gs_int32_t* state) -{ - for(;(nstreamidfta=fta; - - - if (after==0) { - new->next=*root; - new->prev=0; - *root=new; - if (new->next) { - new->next->prev=new; - } - } else { - tmp=*root; - while((tmp)&&(tmp->fta!=after)) { - tmp=tmp->next; - } - if (tmp==0) { - gslog(LOG_EMERG,"fta_list_add:: can't find after fta\n"); - return -1; - } - new->next=tmp->next; - new->prev=tmp; - tmp->next=new; - if (new->next) { - new->next->prev=new; - } - } - return 0; -} - -static gs_retval_t fta_list_rm(struct fta_list ** root, struct FTA * fta) { - struct fta_list * tmp; - tmp=*root; - while((tmp)&&(tmp->fta!=fta)) { - tmp=tmp->next; - } - if (tmp==0) { - gslog(LOG_EMERG,"fta_list_rm:: can't find fta\n"); - return -1; - } - if (tmp == (*root)) { - *root=tmp->next; - if (tmp->next) { - tmp->next->prev=0; - } - } else { - tmp->prev->next=tmp->next; - if (tmp->next) { - tmp->next->prev=tmp->prev; - } - } - - free(tmp); - return 0; -} - - -static gs_retval_t fta_list_check(struct fta_list ** root, struct FTA * fta) { - struct fta_list * tmp; - tmp=*root; - while((tmp)&&(tmp->fta!=fta)) { - tmp=tmp->next; - } - if (tmp==0) { - return -1; - } - return 0; -} - -gs_retval_t ftaexec_insert(struct FTA * after, struct FTA * new) -{ - - if ((after!=0) && (fta_list_check(&process,after)<0)) { - gslog(LOG_EMERG,"fta_insert:: ilegal adapter for after\n"); - return -1; - } - - if (fta_list_check(&created,new)<0) { - gslog(LOG_EMERG,"fta_insert:: ilegal adapter for new\n"); - return -1; - } - - if (fta_list_check(&process,new)==0) { - new->runrefcnt++; - gslog(LOG_INFO,"fta_insert:: new already in process list reusing entry with streamid %d\n",new->ftaid.streamid); - return 0; - } - - if (fta_list_add(&process,after,new)<0) { - gslog(LOG_EMERG,"fta_insert:: new can not be added to process list\n"); - return -1; - } - - new->runrefcnt++; - - return 0; - -} - -gs_retval_t ftaexec_remove(struct FTA * id){ - - if (fta_list_check(&process,id)<0) { - gslog(LOG_EMERG,"fta_remove:: id not in process list\n"); - return -1; - } - id->runrefcnt--; - if (id->runrefcnt<=0) { - if (fta_list_rm(&process,id)<0) { - gslog(LOG_EMERG,"fta_remove:: id could not be removed from process list\n"); - return -1; - } - } - return 0; -} - - -struct FTA * ftaexec_alloc_instance(gs_uint32_t index, struct FTA * reuse, - gs_uint32_t reusable, - gs_int32_t command, gs_int32_t sz, void * data){ - struct FTA * f; - FTAID ftaid; - ftaid=gscpipc_getftaid(); - ftaid.index=index; - ftaid.streamid=0; - if (fta_list_check(&created,reuse)==0) { - reuse->refcnt++; - return reuse; - } - if (ftacallback_get_alloc(index)==0) return 0; - f=ftacallback_get_alloc(index)(ftaid,reusable,command, sz, data); - f->prefilter=ftacallback_get_prefilter(index); - gslog(LOG_INFO,"Using prefilter %llu for fta %x\n",f->prefilter,f); - if (fta_list_add(&created,0,f)<0) { - gslog(LOG_EMERG,"fta_alloc_instance:: new fta can not be added to created list\n"); - return 0; - } - f->refcnt=1; - return f; -} - - - -gs_retval_t ftaexec_free_instance(struct FTA * id, gs_uint32_t recursive){ - id->refcnt --; - if (id->refcnt==0) { - if (fta_list_rm(&created,id)<0) { - gslog(LOG_EMERG,"fta_free_instance:: fta could not be removed from created list\n"); - return -1; - } - /* just to make sure remove it form process list too */ - if (fta_list_check(&process,id)>=0) { - fta_list_rm(&process,id); - } - - id->free_fta(id,recursive); - } - return 0; -} - -gs_retval_t ftaexec_control(struct FTA * id, gs_int32_t command, gs_int32_t sz, void * value){ - if (fta_list_check(&created,id)<0) { - gslog(LOG_EMERG,"fta_control:: id not found in adapter's created list\n"); - return -1; - } - return id->control_fta(id,command,sz,value); -} - -gs_retval_t ftaexec_process_control(gs_int32_t command, gs_int32_t sz, void * value){ - struct FTA * f; - ftaexec_start(); - while((f=ftaexec_next())!=0) { - f->control_fta(f,command,sz,value); - } - return 1; -} - - -/* Start itteration through list of active FTA */ - -gs_retval_t ftaexec_start() -{ - itteration=process; - return 0; -} - -/* get one FTA at a time */ - -struct FTA * ftaexec_next() -{ - struct FTA * fta=0; - if (itteration) { - fta=itteration->fta; - itteration=itteration->next; - } - return fta; -} - - -/* adds a buffer to the end of the sidequeue*/ -gs_retval_t sidequeue_append(FTAID from, gs_sp_t buf, gs_int32_t length) -{ - struct sq * s; - if ((s=malloc(sizeof(struct sq)))==0) { - gslog(LOG_EMERG,"Could not allocate memory for sidequeue"); - return -1; - } - s->from=from; - memcpy(&s->buf[0],buf,MAXMSGSZ); - s->length=length; - s->next=0; - if (sqtail) { - sqtail->next=s; - sqtail=s; - } else { - sqtop = s; - sqtail = s; - } - return 0; -} - -/* removes a buffer from the top of the sidequeue*/ -gs_retval_t sidequeue_pop(FTAID * from, gs_sp_t buf, gs_int32_t* length) -{ - struct sq * s; - - if (sqtop) { - *from=sqtop->from; - memcpy(buf,&sqtop->buf[0],MAXMSGSZ); - *length=sqtop->length; - s=sqtop; - sqtop=sqtop->next; - if (sqtop==0) sqtail=0; - free(s); - return 0; - } - return -1; -} +/* ------------------------------------------------ + Copyright 2014 AT&T Intellectual Property + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ------------------------------------------- */ + +#include "callbackregistries.h" +#include "lapp.h" +#include "gscpipc.h" +#include "stdlib.h" +#include "stdio.h" +#include "string.h" +#include +#include +#include +#include "schemaparser.h" +#include "errno.h" + +#include "gsconfig.h" +#include "gstypes.h" + +struct ftacallbackalloc { + gs_int32_t used; + gs_sp_t name; + alloc_fta fta_alloc_functionptr; + gs_uint64_t prefilter; +}; + +static struct ftacallbackalloc * falloc =0; +static gs_int32_t lalloc=0; + +struct ftacallbackstreamid { + gs_int32_t refcnt; + gs_uint32_t streamid; + gs_uint32_t state; + struct ringbuf * r; +}; + +static struct ftacallbackstreamid * fstreamid =0; +static gs_int32_t lstreamid=0; +static gs_uint32_t cstreamid; +static gs_int32_t nstreamid; + +struct ftacallbackwakeup { + gs_int32_t used; + FTAID ftaid; + struct ringbuf * r; +}; + +static struct ftacallbackwakeup * fwakeup =0; +static gs_int32_t lwakeup=0; +static gs_uint32_t swakeup; +static gs_int32_t nwakeup; + +/* XXX memory of fta list has no owner struct */ + +struct fta_list { + struct fta_list * prev; + struct fta_list * next; + struct FTA * fta; +}; + +static struct fta_list * process=0; +static struct fta_list * created=0; + +static struct fta_list * itteration=0; + +// Side queue datastructures + +struct sq { + FTAID from; + gs_int8_t buf[MAXMSGSZ]; + gs_int32_t length; + struct sq * next; +}; + +static struct sq * sqtop=0; +static struct sq * sqtail=0; + + +/* HFTA internal print function*/ + +gs_retval_t add_printfunction_to_stream( struct FTA * ftaid, gs_sp_t schema, gs_sp_t path, gs_sp_t basename, + gs_sp_t temporal_field, gs_sp_t split_field, gs_uint32_t delta, gs_uint32_t split) { + gs_uint32_t parserversion; + gs_int32_t schemaid; + gs_uint32_t x; + gs_int8_t temp[50000]; + if (ftaid->printfunc.in_use==1) { + gslog(LOG_EMERG,"ERROR:Printfunction::only allow one print function per HFTA instance\n"); + return -1; + } + ftaid->printfunc.path=strdup(path); + ftaid->printfunc.basename=strdup(basename); + ftaid->printfunc.nexttime=0; + ftaid->printfunc.split=(split%1000); + ftaid->printfunc.itt=(split/1000)%1000; + ftaid->printfunc.base=(split/1000000)%1000; + ftaid->printfunc.delta=delta; + ftaid->printfunc.in_use=1; + if (ftaid->printfunc.split > MAXPRINTFILES) { + gslog(LOG_EMERG,"ERROR:Printfunction SPLIT to large\n"); + return -1; + } + for (x=0;xprintfunc.split;x++) ftaid->printfunc.fa[x]=0; + + if ((ftaid->printfunc.schemaid=ftaschema_parse_string(schema))<0) { + gslog(LOG_EMERG,"ERROR:could not parse schema in HFTA print function"); + return -1; + } + if ((ftaid->printfunc.temporal_field=ftaschema_get_field_offset_by_name( + ftaid->printfunc.schemaid,temporal_field))<0) { + gslog(LOG_EMERG,"ERROR:could not get " + "offset for timefield %s in HFTA print function\n", + temporal_field); + return -1; + } + + if (ftaschema_get_field_type_by_name( + ftaid->printfunc.schemaid,temporal_field)!=UINT_TYPE) { + gslog(LOG_EMERG,"ERROR: illegal type for timefield " + "%s in HFTA print function UINT expected\n", + temporal_field); + return -1; + } + if ((ftaid->printfunc.split_field=ftaschema_get_field_offset_by_name( + ftaid->printfunc.schemaid,split_field))<0) { + gslog(LOG_EMERG,"ERROR:could not get " + "offset for splitfield %s in HFTA print function\n", + split_field); + return -1; + } + + if (ftaschema_get_field_type_by_name( + ftaid->printfunc.schemaid,split_field)!=UINT_TYPE) { + gslog(LOG_EMERG,"ERROR: illegal type for splitfield" + "%s in HFTA print function UINT expected\n", + split_field); + return -1; + } + parserversion=get_schemaparser_version(); + sprintf(temp,"GDAT\nVERSION:%u\nSCHEMALENGTH:%lu\n%s",parserversion,strlen(schema)+1,schema); + ftaid->printfunc.header=strdup(temp); + gslog(LOG_INFO,"Established print function for %s",basename); + return 0; +} + +gs_retval_t print_stream(struct FTA * self, gs_int32_t sz, void *tuple) +{ + gs_int32_t problem; + gs_uint32_t timeval; + gs_uint32_t splitval; + gs_uint32_t x; + gs_uint32_t nsz; + timeval=fta_unpack_uint(tuple,sz,self->printfunc.temporal_field,&problem); + if (timeval==0) return 0; // ignore heartbeats till we see a real timestamp + if (timeval>= self->printfunc.nexttime) { + gs_int8_t oldname[1024]; + gs_int8_t newname[1024]; + if (self->printfunc.split==0) { + if (self->printfunc.fa[0] != 0) { + sprintf(oldname,"%s/%u%s.tmp",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,self->printfunc.basename); + sprintf(newname,"%s/%u%s",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,self->printfunc.basename); + fclose(self->printfunc.fa[0]); + rename(oldname,newname); + } + if (self->printfunc.nexttime==0) { + self->printfunc.nexttime=(timeval/self->printfunc.delta)*self->printfunc.delta; + } + sprintf(oldname,"%s/%u%s.tmp",self->printfunc.path,self->printfunc.nexttime,self->printfunc.basename); + if ((self->printfunc.fa[0]=fopen(oldname,"w"))==0) { + gslog(LOG_EMERG,"ERROR:Could not open file in HFTA print function\n"); + return -1; + } + if (setvbuf(self->printfunc.fa[0],0,_IOFBF,16000000)!=0) { + gslog(LOG_EMERG,"ERROR:Could not setvbuf\n"); + } + if (fwrite(self->printfunc.header,strlen(self->printfunc.header)+1,1,self->printfunc.fa[0])!=1) { + gslog(LOG_EMERG,"ERROR:fwrite:xfgh1:%s:%u",self->printfunc.basename,errno); + } + gslog(LOG_INFO,"Opened file %s",oldname); + } else { + for(x=self->printfunc.base;xprintfunc.split;x=x+self->printfunc.itt) { + if (self->printfunc.fa[x] != 0) { + sprintf(oldname,"%s/%u_s%u%s.tmp",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,x+1,self->printfunc.basename); + sprintf(newname,"%s/%u_s%u%s",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,x+1,self->printfunc.basename); + fclose(self->printfunc.fa[x]); + rename(oldname,newname); + } + if (self->printfunc.nexttime==0) { + self->printfunc.nexttime=(timeval/self->printfunc.delta)*self->printfunc.delta; + } + sprintf(oldname,"%s/%u_s%u%s.tmp",self->printfunc.path,self->printfunc.nexttime,x+1,self->printfunc.basename); + if ((self->printfunc.fa[x]=fopen(oldname,"w"))==0) { + gslog(LOG_EMERG,"ERROR:Could not open file in HFTA print function\n"); + return -1; + } + if (setvbuf(self->printfunc.fa[x],0,_IOFBF,16000000)!=0) { + gslog(LOG_EMERG,"ERROR:Could not setvbuf\n"); + } + if (fwrite(self->printfunc.header,strlen(self->printfunc.header)+1,1,self->printfunc.fa[x])!=1) { + gslog(LOG_EMERG,"ERROR:fwrite:xfgh2:%s:%u",self->printfunc.basename,errno); + } + gslog(LOG_INFO,"Opened file %s",oldname); + } + } + self->printfunc.nexttime=self->printfunc.nexttime+self->printfunc.delta; + } + // don't write temporal tuples to file but use them to advance file name. + if (ftaschema_is_temporal_tuple(self->printfunc.schemaid, tuple)) return 0; + if (self->printfunc.split!=0) { + splitval=fta_unpack_uint(tuple,sz,self->printfunc.split_field,&problem)%(self->printfunc.split); + if (self->printfunc.fa[splitval]==0) { + gslog(LOG_EMERG,"Inconsistent rangehash in print %u\n", splitval); + exit(0); + } + } else { + splitval=0; + } + nsz=htonl(sz); + if (fwrite(&nsz,sizeof(gs_uint32_t),1,self->printfunc.fa[splitval])!=1) { + gslog(LOG_EMERG,"Could not write to output in HFTA print\"%s\":%u.. EXITING\n", + self->printfunc.basename,errno); + exit(0); + } + if (fwrite(tuple,sz,1,self->printfunc.fa[splitval])!=1) { + gslog(LOG_EMERG,"Could not write to output in HFTA print\"%s\"%u.. EXITING\n", + self->printfunc.basename,errno); + exit(0); + } + return 0; +} + +/* registers an alloc function of an FTA and returns a unique index */ +gs_retval_t ftacallback_add_alloc(FTAname name, alloc_fta fta_alloc_functionptr,gs_uint64_t prefilter) +{ + gs_int32_t x; + gslog(LOG_INFO,"Register prefilter %llu for %s\n",prefilter,name); + if (lalloc == 0) { + if ((falloc = malloc(sizeof(struct ftacallbackalloc)*STARTSZ))==0) { + gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n"); + return -1; + } + memset(falloc,0,sizeof(struct ftacallbackalloc)*STARTSZ); + lalloc = STARTSZ; + } + for(x=0;(x=lstreamid) { + /* now try to find empty slot */ + for(x=0;(x= lstreamid) { + gs_int32_t y; + lstreamid = 2*lstreamid; + if ((fstreamid = + realloc(fstreamid,sizeof(struct ftacallbackstreamid)*lstreamid))==0) { + gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n"); + return -1; + } + for (y=x;y 0)) + fstreamid[x].refcnt--; + } + return 0; +} + +/* set the state for a given streamid and destination process */ +gs_retval_t ftacallback_state_streamid(gs_int32_t streamid,FTAID process, gs_int32_t state) +{ + gs_int32_t x; + for(x=0;xdestid.ip == process.ip ) + && (fstreamid[x].r->destid.port == process.port ) + && (fstreamid[x].refcnt > 0)) + fstreamid[x].state=state; + return 0; + } + return -1; +} + +/* starts an itteration through all ringbuffers for a particular streamid */ + +gs_retval_t ftacallback_start_streamid(gs_int32_t streamid) +{ + cstreamid=streamid; + nstreamid=0; + return 0; +} + +/* returns all the ringbuffer associated with the streamid passed in + * ftacallback_start_streamid + */ +struct ringbuf * ftacallback_next_streamid(gs_int32_t* state) +{ + for(;(nstreamidfta=fta; + + + if (after==0) { + new->next=*root; + new->prev=0; + *root=new; + if (new->next) { + new->next->prev=new; + } + } else { + tmp=*root; + while((tmp)&&(tmp->fta!=after)) { + tmp=tmp->next; + } + if (tmp==0) { + gslog(LOG_EMERG,"fta_list_add:: can't find after fta\n"); + return -1; + } + new->next=tmp->next; + new->prev=tmp; + tmp->next=new; + if (new->next) { + new->next->prev=new; + } + } + return 0; +} + +static gs_retval_t fta_list_rm(struct fta_list ** root, struct FTA * fta) { + struct fta_list * tmp; + tmp=*root; + while((tmp)&&(tmp->fta!=fta)) { + tmp=tmp->next; + } + if (tmp==0) { + gslog(LOG_EMERG,"fta_list_rm:: can't find fta\n"); + return -1; + } + if (tmp == (*root)) { + *root=tmp->next; + if (tmp->next) { + tmp->next->prev=0; + } + } else { + tmp->prev->next=tmp->next; + if (tmp->next) { + tmp->next->prev=tmp->prev; + } + } + + free(tmp); + return 0; +} + + +static gs_retval_t fta_list_check(struct fta_list ** root, struct FTA * fta) { + struct fta_list * tmp; + tmp=*root; + while((tmp)&&(tmp->fta!=fta)) { + tmp=tmp->next; + } + if (tmp==0) { + return -1; + } + return 0; +} + +gs_retval_t ftaexec_insert(struct FTA * after, struct FTA * new) +{ + + if ((after!=0) && (fta_list_check(&process,after)<0)) { + gslog(LOG_EMERG,"fta_insert:: ilegal adapter for after\n"); + return -1; + } + + if (fta_list_check(&created,new)<0) { + gslog(LOG_EMERG,"fta_insert:: ilegal adapter for new\n"); + return -1; + } + + if (fta_list_check(&process,new)==0) { + new->runrefcnt++; + gslog(LOG_INFO,"fta_insert:: new already in process list reusing entry with streamid %d\n",new->ftaid.streamid); + return 0; + } + + if (fta_list_add(&process,after,new)<0) { + gslog(LOG_EMERG,"fta_insert:: new can not be added to process list\n"); + return -1; + } + + new->runrefcnt++; + + return 0; + +} + +gs_retval_t ftaexec_remove(struct FTA * id){ + + if (fta_list_check(&process,id)<0) { + gslog(LOG_EMERG,"fta_remove:: id not in process list\n"); + return -1; + } + id->runrefcnt--; + if (id->runrefcnt<=0) { + if (fta_list_rm(&process,id)<0) { + gslog(LOG_EMERG,"fta_remove:: id could not be removed from process list\n"); + return -1; + } + } + return 0; +} + + +struct FTA * ftaexec_alloc_instance(gs_uint32_t index, struct FTA * reuse, + gs_uint32_t reusable, + gs_int32_t command, gs_int32_t sz, void * data){ + struct FTA * f; + FTAID ftaid; + ftaid=gscpipc_getftaid(); + ftaid.index=index; + ftaid.streamid=0; + if (fta_list_check(&created,reuse)==0) { + reuse->refcnt++; + return reuse; + } + if (ftacallback_get_alloc(index)==0) return 0; + f=ftacallback_get_alloc(index)(ftaid,reusable,command, sz, data); + f->prefilter=ftacallback_get_prefilter(index); + gslog(LOG_INFO,"Using prefilter %llu for fta %x\n",f->prefilter,f); + if (fta_list_add(&created,0,f)<0) { + gslog(LOG_EMERG,"fta_alloc_instance:: new fta can not be added to created list\n"); + return 0; + } + f->refcnt=1; + return f; +} + + + +gs_retval_t ftaexec_free_instance(struct FTA * id, gs_uint32_t recursive){ + id->refcnt --; + if (id->refcnt==0) { + if (fta_list_rm(&created,id)<0) { + gslog(LOG_EMERG,"fta_free_instance:: fta could not be removed from created list\n"); + return -1; + } + /* just to make sure remove it form process list too */ + if (fta_list_check(&process,id)>=0) { + fta_list_rm(&process,id); + } + + id->free_fta(id,recursive); + } + return 0; +} + +gs_retval_t ftaexec_control(struct FTA * id, gs_int32_t command, gs_int32_t sz, void * value){ + if (fta_list_check(&created,id)<0) { + gslog(LOG_EMERG,"fta_control:: id not found in adapter's created list\n"); + return -1; + } + return id->control_fta(id,command,sz,value); +} + +gs_retval_t ftaexec_process_control(gs_int32_t command, gs_int32_t sz, void * value){ + struct FTA * f; + ftaexec_start(); + while((f=ftaexec_next())!=0) { + f->control_fta(f,command,sz,value); + } + return 1; +} + + +/* Start itteration through list of active FTA */ + +gs_retval_t ftaexec_start() +{ + itteration=process; + return 0; +} + +/* get one FTA at a time */ + +struct FTA * ftaexec_next() +{ + struct FTA * fta=0; + if (itteration) { + fta=itteration->fta; + itteration=itteration->next; + } + return fta; +} + + +/* adds a buffer to the end of the sidequeue*/ +gs_retval_t sidequeue_append(FTAID from, gs_sp_t buf, gs_int32_t length) +{ + struct sq * s; + if ((s=malloc(sizeof(struct sq)))==0) { + gslog(LOG_EMERG,"Could not allocate memory for sidequeue"); + return -1; + } + s->from=from; + memcpy(&s->buf[0],buf,MAXMSGSZ); + s->length=length; + s->next=0; + if (sqtail) { + sqtail->next=s; + sqtail=s; + } else { + sqtop = s; + sqtail = s; + } + return 0; +} + +/* removes a buffer from the top of the sidequeue*/ +gs_retval_t sidequeue_pop(FTAID * from, gs_sp_t buf, gs_int32_t* length) +{ + struct sq * s; + + if (sqtop) { + *from=sqtop->from; + memcpy(buf,&sqtop->buf[0],MAXMSGSZ); + *length=sqtop->length; + s=sqtop; + sqtop=sqtop->next; + if (sqtop==0) sqtail=0; + free(s); + return 0; + } + return -1; +}