Fixed newline characters throughout the code
[com/gs-lite.git] / src / lib / gscphost / callbackregistries.c
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6  
7  http://www.apache.org/licenses/LICENSE-2.0
8  
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
15
16 #include "callbackregistries.h"
17 #include "lapp.h"
18 #include "gscpipc.h"
19 #include "stdlib.h"
20 #include "stdio.h"
21 #include "string.h"
22 #include <unistd.h>
23 #include <signal.h>
24 #include <sys/mman.h>
25 #include "schemaparser.h"
26 #include "errno.h"
27
28 #include "gsconfig.h"
29 #include "gstypes.h"
30
31 struct ftacallbackalloc {
32     gs_int32_t used;
33     gs_sp_t name;
34     alloc_fta fta_alloc_functionptr;
35     gs_uint64_t prefilter;
36 };
37
38 static struct ftacallbackalloc * falloc =0;
39 static gs_int32_t lalloc=0;
40
41 struct ftacallbackstreamid {
42     gs_int32_t refcnt;
43     gs_uint32_t streamid;
44     gs_uint32_t state;
45     struct ringbuf * r;
46 };
47
48 static struct ftacallbackstreamid * fstreamid =0;
49 static gs_int32_t lstreamid=0;
50 static gs_uint32_t cstreamid;
51 static gs_int32_t nstreamid;
52
53 struct ftacallbackwakeup {
54     gs_int32_t used;
55     FTAID ftaid;
56     struct ringbuf * r;
57 };
58
59 static struct ftacallbackwakeup * fwakeup =0;
60 static gs_int32_t lwakeup=0;
61 static gs_uint32_t swakeup;
62 static gs_int32_t nwakeup;
63
64 /* XXX memory of fta list has no owner struct */
65
66 struct fta_list {
67     struct fta_list * prev;
68     struct fta_list * next;
69     struct FTA * fta;
70 };
71
72 static struct fta_list * process=0;
73 static struct fta_list * created=0;
74
75 static struct fta_list * itteration=0;
76
77 // Side queue datastructures
78
79 struct sq {
80     FTAID from;
81     gs_int8_t buf[MAXMSGSZ];
82     gs_int32_t length;
83     struct sq * next;
84 };
85
86 static struct sq * sqtop=0;
87 static struct sq * sqtail=0;
88
89
90 /* HFTA internal print function*/
91
92 gs_retval_t add_printfunction_to_stream( struct FTA  * ftaid, gs_sp_t schema, gs_sp_t path, gs_sp_t basename,
93                                         gs_sp_t temporal_field, gs_sp_t split_field, gs_uint32_t delta, gs_uint32_t split) {
94         gs_uint32_t parserversion;
95         gs_int32_t schemaid;
96         gs_uint32_t x;
97         gs_int8_t temp[50000];
98         if (ftaid->printfunc.in_use==1) {
99         gslog(LOG_EMERG,"ERROR:Printfunction::only allow one print function per HFTA instance\n");
100             return -1;
101         }
102         ftaid->printfunc.path=strdup(path);
103         ftaid->printfunc.basename=strdup(basename);
104         ftaid->printfunc.nexttime=0;
105         ftaid->printfunc.split=(split%1000);
106         ftaid->printfunc.itt=(split/1000)%1000;
107         ftaid->printfunc.base=(split/1000000)%1000;
108         ftaid->printfunc.delta=delta;
109         ftaid->printfunc.in_use=1;
110         if (ftaid->printfunc.split > MAXPRINTFILES) {
111         gslog(LOG_EMERG,"ERROR:Printfunction SPLIT to large\n");
112             return -1;
113         }
114         for (x=0;x<ftaid->printfunc.split;x++) ftaid->printfunc.fa[x]=0;
115     
116         if ((ftaid->printfunc.schemaid=ftaschema_parse_string(schema))<0) {
117                 gslog(LOG_EMERG,"ERROR:could not parse schema in HFTA print function");
118                 return -1;
119         }
120         if ((ftaid->printfunc.temporal_field=ftaschema_get_field_offset_by_name(
121                                                                             ftaid->printfunc.schemaid,temporal_field))<0) {
122                 gslog(LOG_EMERG,"ERROR:could not get "
123               "offset for timefield %s in HFTA print function\n",
124               temporal_field);
125                 return -1;
126         }
127     
128         if (ftaschema_get_field_type_by_name(
129                                          ftaid->printfunc.schemaid,temporal_field)!=UINT_TYPE) {
130             gslog(LOG_EMERG,"ERROR: illegal type for timefield "
131               "%s in HFTA print function UINT expected\n",
132               temporal_field);
133             return -1;
134         }
135         if ((ftaid->printfunc.split_field=ftaschema_get_field_offset_by_name(
136                                                                          ftaid->printfunc.schemaid,split_field))<0) {
137                 gslog(LOG_EMERG,"ERROR:could not get "
138               "offset for splitfield %s in HFTA print function\n",
139               split_field);
140                 return -1;
141         }
142     
143         if (ftaschema_get_field_type_by_name(
144                                          ftaid->printfunc.schemaid,split_field)!=UINT_TYPE) {
145             gslog(LOG_EMERG,"ERROR: illegal type for splitfield"
146               "%s in HFTA print function UINT expected\n",
147               split_field);
148             return -1;
149         }
150         parserversion=get_schemaparser_version();
151         sprintf(temp,"GDAT\nVERSION:%u\nSCHEMALENGTH:%lu\n%s",parserversion,strlen(schema)+1,schema);
152         ftaid->printfunc.header=strdup(temp);
153         gslog(LOG_INFO,"Established print function for %s",basename);
154         return 0;
155 }
156
157 gs_retval_t print_stream(struct FTA * self, gs_int32_t sz, void *tuple)
158 {
159         gs_int32_t problem;
160         gs_uint32_t timeval;
161         gs_uint32_t splitval;
162         gs_uint32_t x;
163         gs_uint32_t nsz;
164         timeval=fta_unpack_uint(tuple,sz,self->printfunc.temporal_field,&problem);
165         if (timeval==0) return 0; // ignore heartbeats till we see a real timestamp
166         if (timeval>= self->printfunc.nexttime) {
167                 gs_int8_t oldname[1024];
168                 gs_int8_t newname[1024];
169                 if (self->printfunc.split==0) {
170                         if (self->printfunc.fa[0] != 0) {
171                                 sprintf(oldname,"%s/%u%s.tmp",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,self->printfunc.basename);
172                                 sprintf(newname,"%s/%u%s",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,self->printfunc.basename);
173                                 fclose(self->printfunc.fa[0]);
174                                 rename(oldname,newname);
175                         }
176                         if (self->printfunc.nexttime==0) {
177                                 self->printfunc.nexttime=(timeval/self->printfunc.delta)*self->printfunc.delta;
178                         }
179                         sprintf(oldname,"%s/%u%s.tmp",self->printfunc.path,self->printfunc.nexttime,self->printfunc.basename);
180                         if ((self->printfunc.fa[0]=fopen(oldname,"w"))==0) {
181                                 gslog(LOG_EMERG,"ERROR:Could not open file in HFTA print function\n");
182                                 return -1;
183                         }
184                         if (setvbuf(self->printfunc.fa[0],0,_IOFBF,16000000)!=0) {
185                                 gslog(LOG_EMERG,"ERROR:Could not setvbuf\n");
186                         }
187                         if (fwrite(self->printfunc.header,strlen(self->printfunc.header)+1,1,self->printfunc.fa[0])!=1) {
188                                 gslog(LOG_EMERG,"ERROR:fwrite:xfgh1:%s:%u",self->printfunc.basename,errno);
189                         }
190                         gslog(LOG_INFO,"Opened file %s",oldname);
191                 } else {
192                         for(x=self->printfunc.base;x<self->printfunc.split;x=x+self->printfunc.itt) {
193                                 if (self->printfunc.fa[x] != 0) {
194                                         sprintf(oldname,"%s/%u_s%u%s.tmp",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,x+1,self->printfunc.basename);
195                                         sprintf(newname,"%s/%u_s%u%s",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,x+1,self->printfunc.basename);
196                                         fclose(self->printfunc.fa[x]);
197                                         rename(oldname,newname);
198                                 }
199                                 if (self->printfunc.nexttime==0) {
200                                         self->printfunc.nexttime=(timeval/self->printfunc.delta)*self->printfunc.delta;
201                                 }
202                                 sprintf(oldname,"%s/%u_s%u%s.tmp",self->printfunc.path,self->printfunc.nexttime,x+1,self->printfunc.basename);
203                                 if ((self->printfunc.fa[x]=fopen(oldname,"w"))==0) {
204                                         gslog(LOG_EMERG,"ERROR:Could not open file in HFTA print function\n");
205                                         return -1;
206                                 }
207                 if (setvbuf(self->printfunc.fa[x],0,_IOFBF,16000000)!=0) {
208                     gslog(LOG_EMERG,"ERROR:Could not setvbuf\n");
209                 }
210                                 if (fwrite(self->printfunc.header,strlen(self->printfunc.header)+1,1,self->printfunc.fa[x])!=1)  {
211                     gslog(LOG_EMERG,"ERROR:fwrite:xfgh2:%s:%u",self->printfunc.basename,errno);
212                 }
213                 gslog(LOG_INFO,"Opened file %s",oldname);
214                         }
215                 }
216                 self->printfunc.nexttime=self->printfunc.nexttime+self->printfunc.delta;
217         }
218     // don't write temporal tuples to file but use them to advance file name.
219         if (ftaschema_is_temporal_tuple(self->printfunc.schemaid, tuple)) return 0;
220         if (self->printfunc.split!=0) {
221                 splitval=fta_unpack_uint(tuple,sz,self->printfunc.split_field,&problem)%(self->printfunc.split);
222                 if (self->printfunc.fa[splitval]==0) {
223                         gslog(LOG_EMERG,"Inconsistent rangehash in print %u\n", splitval);
224                         exit(0);
225                 }
226         } else {
227                 splitval=0;
228         }
229         nsz=htonl(sz);
230         if (fwrite(&nsz,sizeof(gs_uint32_t),1,self->printfunc.fa[splitval])!=1) {
231                 gslog(LOG_EMERG,"Could not write to output in HFTA print\"%s\":%u.. EXITING\n",
232               self->printfunc.basename,errno);
233                 exit(0);
234         }
235         if (fwrite(tuple,sz,1,self->printfunc.fa[splitval])!=1) {
236                 gslog(LOG_EMERG,"Could not write to output in HFTA print\"%s\"%u.. EXITING\n",
237               self->printfunc.basename,errno);
238                 exit(0);
239         }
240         return 0;
241 }
242
243 /* registers an alloc function of an FTA and returns a unique index */
244 gs_retval_t ftacallback_add_alloc(FTAname name, alloc_fta fta_alloc_functionptr,gs_uint64_t prefilter)
245 {
246     gs_int32_t x;
247     gslog(LOG_INFO,"Register prefilter %llu for %s\n",prefilter,name);
248     if (lalloc == 0) {
249         if ((falloc = malloc(sizeof(struct ftacallbackalloc)*STARTSZ))==0) {
250             gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
251             return -1;
252         }
253         memset(falloc,0,sizeof(struct ftacallbackalloc)*STARTSZ);
254         lalloc = STARTSZ;
255     }
256     for(x=0;(x<lalloc)&&(falloc[x].used!=0);x++);
257     if (x == lalloc) {
258         gs_int32_t y;
259         lalloc = 2*lalloc;
260         if ((falloc = realloc(falloc,lalloc*sizeof(struct ftacallbackalloc)))==0) {
261             gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
262             return -1;
263         }
264         for (y=x;y<lalloc;y++)
265             falloc[y].used=0;
266     }
267     falloc[x].name=strdup(name);
268     falloc[x].fta_alloc_functionptr=fta_alloc_functionptr;
269     falloc[x].prefilter=prefilter;
270     falloc[x].used=1;
271     return x;
272 }
273
274 /* unregisters an alloc function of an FTA and makes the index available
275  * for reuse
276  */
277
278 gs_retval_t ftacallback_rm_alloc(gs_uint32_t index)
279 {
280     falloc[index].used=0;
281     free(falloc[index].name);
282     return 0;
283 }
284 /* returns the prefilter for a given
285  * index
286  */
287
288 gs_uint64_t ftacallback_get_prefilter(gs_int32_t index)
289 {
290     if ((index<lalloc) && (falloc[index].used!=0)) {
291         return falloc[index].prefilter;
292     }
293     return 0;
294 }
295
296 /* returns the function pointer of the callback function for a given
297  * index
298  */
299
300 alloc_fta ftacallback_get_alloc(gs_int32_t index)
301 {
302     if ((index<lalloc) && (falloc[index].used!=0)) {
303         return falloc[index].fta_alloc_functionptr;
304     }
305     return 0;
306 }
307
308
309
310
311
312 /* associate ringbuffer with streamid (using refcounting) */
313 gs_retval_t ftacallback_add_streamid(struct ringbuf * r, gs_uint32_t streamid) {
314     gs_int32_t x;
315     if (lstreamid == 0) {
316         if ((fstreamid = malloc(sizeof(struct ftacallbackstreamid)*STARTSZ))==0) {
317             gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
318             return -1;
319         }
320         memset(fstreamid,0,sizeof(struct ftacallbackstreamid)*STARTSZ);
321         lstreamid = STARTSZ;
322     }
323     /* first try to increment refcnt */
324     for(x=0;(x<lstreamid)&&(
325                             (fstreamid[x].streamid!=streamid)
326                             ||(fstreamid[x].r!=r)
327                             ||(fstreamid[x].refcnt<=0)) ;x++);
328     if (x>=lstreamid) {
329         /* now try to find empty slot */
330         for(x=0;(x<lstreamid)&&(fstreamid[x].refcnt!=0);x++);
331         if (x >= lstreamid) {
332             gs_int32_t y;
333             lstreamid = 2*lstreamid;
334             if ((fstreamid =
335                  realloc(fstreamid,sizeof(struct ftacallbackstreamid)*lstreamid))==0) {
336                 gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
337                 return -1;
338             }
339             for (y=x;y<lstreamid;y++) {
340                 fstreamid[y].refcnt=0;
341                 fstreamid[y].streamid=0;
342             }
343         }
344         fstreamid[x].state=HFTA_RINGBUF_ATOMIC;
345         fstreamid[x].streamid=streamid;
346         fstreamid[x].r=r;
347     }
348     fstreamid[x].refcnt+=1;
349     return 0;
350 }
351
352
353 /* unassosciate a ringbuffer from a streamid */
354
355 gs_retval_t ftacallback_rm_streamid(struct ringbuf * r, gs_uint32_t streamid)
356 {
357     gs_int32_t x;
358     for(x=0;x<lstreamid;x++) {
359         if ((fstreamid[x].streamid == streamid)
360             && (fstreamid[x].r == r)
361             && (fstreamid[x].refcnt > 0))
362             fstreamid[x].refcnt--;
363     }
364     return 0;
365 }
366
367 /* set the state for a given streamid and destination process */
368 gs_retval_t ftacallback_state_streamid(gs_int32_t streamid,FTAID process, gs_int32_t state)
369 {
370     gs_int32_t x;
371     for(x=0;x<lstreamid;x++) {
372         if ((fstreamid[x].streamid == streamid)
373             && (fstreamid[x].r->destid.ip == process.ip )
374             && (fstreamid[x].r->destid.port == process.port )
375             && (fstreamid[x].refcnt > 0))
376             fstreamid[x].state=state;
377         return 0;
378     }
379     return -1;
380 }
381
382 /* starts an itteration through all ringbuffers for a particular streamid */
383
384 gs_retval_t ftacallback_start_streamid(gs_int32_t streamid)
385 {
386     cstreamid=streamid;
387     nstreamid=0;
388     return 0;
389 }
390
391 /* returns all the ringbuffer associated with the streamid passed in
392  * ftacallback_start_streamid
393  */
394 struct ringbuf * ftacallback_next_streamid(gs_int32_t* state)
395 {
396     for(;(nstreamid<lstreamid)
397             &&(fstreamid[nstreamid].streamid != cstreamid);
398         nstreamid++);
399     if (nstreamid<lstreamid) {
400         nstreamid++;
401         *state=fstreamid[nstreamid-1].state;
402         return fstreamid[nstreamid-1].r;
403     }
404     return 0;
405 }
406
407
408 /* associate msgid with ringbuf  */
409 gs_retval_t ftacallback_add_wakeup(FTAID ftaid, struct ringbuf * r)
410 {
411     gs_int32_t x;
412     if (lwakeup == 0) {
413         if ((fwakeup = malloc(sizeof(struct ftacallbackwakeup)*STARTSZ))==0) {
414             gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
415             return -1;
416         }
417         memset(fwakeup,0,sizeof(struct ftacallbackwakeup)*STARTSZ);
418         lwakeup = STARTSZ;
419     }
420     /* first try to find one for the same process */
421     for(x=0;(x<lwakeup)&&(
422                           ((fwakeup[x].ftaid.ip!=ftaid.ip)
423                            || (fwakeup[x].ftaid.port!=ftaid.port))
424                           ||(fwakeup[x].used==0)) ;x++);
425     if (x==lwakeup) {
426         /* now try to find empty slot */
427         for(x=0;(x<lwakeup)&&(fwakeup[x].used!=0);x++);
428         if (x == lwakeup) {
429             gs_int32_t y;
430             lwakeup = 2*lwakeup;
431             if ((fwakeup =
432                  realloc(fwakeup,sizeof(struct ftacallbackwakeup)*lwakeup))==0) {
433                 gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
434                 return -1;
435             }
436             for (y=x;y<lwakeup;y++)
437                 fwakeup[y].used=0;
438         }
439     }
440     fwakeup[x].used=1;
441     fwakeup[x].ftaid=ftaid;
442     fwakeup[x].r=r;
443     return x;
444 }
445
446 /* starts an itteration through all msgids associated with
447  a streamid. This also uses data kept in the fstreamid
448  registry
449  */
450 gs_retval_t ftacallback_start_wakeup(gs_uint32_t streamid)
451 {
452     swakeup=streamid;
453     nwakeup=0;
454     return 0;
455 }
456
457 /* returns all the msgid blocked on the streamid passed in
458  * ftacallback_start_streamid and removes the msgid from
459  * the wakeup list
460  */
461 FTAID * ftacallback_next_wakeup()
462 {
463     for(;(nwakeup<lstreamid)
464             &&(fstreamid[nwakeup].streamid != swakeup);
465         nwakeup++);
466     if (nwakeup<lstreamid) {
467         gs_int32_t x;
468         for(x=0;x<lwakeup;x++)
469             if ((fwakeup[x].r==fstreamid[nwakeup].r) &&
470                 (fwakeup[x].used==1)) {
471                 fwakeup[x].used=0;
472                 nwakeup++;
473                 return & fwakeup[x].ftaid;
474             }
475     }
476     nwakeup ++;
477     return (FTAID *) 0;
478 }
479
480
481
482 static gs_retval_t fta_list_add(struct fta_list ** root, struct FTA * after,
483                                 struct FTA * fta) {
484     struct fta_list * new;
485     struct fta_list * tmp;
486     
487     if ((new=(struct fta_list *)malloc(sizeof(struct fta_list)))==0) {
488         gslog(LOG_EMERG,"fta_list_add:: can't allocate memory\n");
489         return -1;
490     }
491     
492     new->fta=fta;
493     
494     
495     if (after==0) {
496         new->next=*root;
497         new->prev=0;
498         *root=new;
499         if (new->next) {
500             new->next->prev=new;
501         }
502     } else {
503         tmp=*root;
504         while((tmp)&&(tmp->fta!=after)) {
505             tmp=tmp->next;
506         }
507         if (tmp==0) {
508             gslog(LOG_EMERG,"fta_list_add:: can't find after fta\n");
509             return -1;
510         }
511         new->next=tmp->next;
512         new->prev=tmp;
513         tmp->next=new;
514         if (new->next) {
515             new->next->prev=new;
516         }
517     }
518     return 0;
519 }
520
521 static gs_retval_t fta_list_rm(struct fta_list ** root, struct FTA * fta) {
522     struct fta_list * tmp;
523     tmp=*root;
524     while((tmp)&&(tmp->fta!=fta)) {
525         tmp=tmp->next;
526     }
527     if (tmp==0) {
528         gslog(LOG_EMERG,"fta_list_rm:: can't find fta\n");
529         return -1;
530     }
531     if (tmp == (*root)) {
532         *root=tmp->next;
533         if (tmp->next) {
534             tmp->next->prev=0;
535         }
536     } else {
537         tmp->prev->next=tmp->next;
538         if (tmp->next) {
539             tmp->next->prev=tmp->prev;
540         }
541     }
542     
543     free(tmp);
544     return 0;
545 }
546
547
548 static gs_retval_t fta_list_check(struct fta_list ** root, struct FTA * fta) {
549     struct fta_list * tmp;
550     tmp=*root;
551     while((tmp)&&(tmp->fta!=fta)) {
552         tmp=tmp->next;
553     }
554     if (tmp==0) {
555         return -1;
556     }
557     return 0;
558 }
559
560 gs_retval_t ftaexec_insert(struct FTA * after, struct FTA * new)
561 {
562     
563     if ((after!=0) && (fta_list_check(&process,after)<0)) {
564         gslog(LOG_EMERG,"fta_insert:: ilegal adapter for after\n");
565         return -1;
566     }
567     
568     if (fta_list_check(&created,new)<0) {
569         gslog(LOG_EMERG,"fta_insert:: ilegal adapter for new\n");
570         return -1;
571     }
572     
573     if (fta_list_check(&process,new)==0) {
574                 new->runrefcnt++;
575         gslog(LOG_INFO,"fta_insert:: new already in process list reusing entry with streamid %d\n",new->ftaid.streamid);
576         return 0;
577     }
578     
579     if (fta_list_add(&process,after,new)<0) {
580         gslog(LOG_EMERG,"fta_insert:: new can not be added to process list\n");
581         return -1;
582     }
583     
584     new->runrefcnt++;
585     
586     return 0;
587     
588 }
589
590 gs_retval_t ftaexec_remove(struct FTA * id){
591     
592     if (fta_list_check(&process,id)<0) {
593         gslog(LOG_EMERG,"fta_remove:: id not in process list\n");
594         return -1;
595     }
596     id->runrefcnt--;
597     if (id->runrefcnt<=0) {
598         if (fta_list_rm(&process,id)<0) {
599             gslog(LOG_EMERG,"fta_remove:: id could not be removed from process list\n");
600             return -1;
601         }
602     }
603     return 0;
604 }
605
606
607 struct FTA * ftaexec_alloc_instance(gs_uint32_t index, struct FTA * reuse,
608                                     gs_uint32_t reusable,
609                                     gs_int32_t command, gs_int32_t sz, void *  data){
610     struct FTA * f;
611     FTAID ftaid;
612     ftaid=gscpipc_getftaid();
613     ftaid.index=index;
614     ftaid.streamid=0;
615     if (fta_list_check(&created,reuse)==0) {
616         reuse->refcnt++;
617         return reuse;
618     }
619     if (ftacallback_get_alloc(index)==0) return 0;
620     f=ftacallback_get_alloc(index)(ftaid,reusable,command, sz, data);
621     f->prefilter=ftacallback_get_prefilter(index);
622     gslog(LOG_INFO,"Using prefilter %llu for fta %x\n",f->prefilter,f);
623     if (fta_list_add(&created,0,f)<0) {
624         gslog(LOG_EMERG,"fta_alloc_instance:: new fta can not be added to created list\n");
625         return 0;
626     }
627     f->refcnt=1;
628     return f;
629 }
630
631
632
633 gs_retval_t ftaexec_free_instance(struct FTA * id, gs_uint32_t recursive){
634     id->refcnt --;
635     if (id->refcnt==0) {
636         if (fta_list_rm(&created,id)<0) {
637             gslog(LOG_EMERG,"fta_free_instance:: fta could not be removed from created list\n");
638             return -1;
639         }
640         /* just to make sure remove it form process list too */
641         if (fta_list_check(&process,id)>=0) {
642             fta_list_rm(&process,id);
643         }
644         
645         id->free_fta(id,recursive);
646     }
647     return 0;
648 }
649
650 gs_retval_t ftaexec_control(struct FTA * id, gs_int32_t command, gs_int32_t sz, void * value){
651     if (fta_list_check(&created,id)<0) {
652         gslog(LOG_EMERG,"fta_control:: id not found in adapter's created list\n");
653         return -1;
654     }
655     return id->control_fta(id,command,sz,value);
656 }
657
658 gs_retval_t ftaexec_process_control(gs_int32_t command, gs_int32_t sz, void * value){
659     struct FTA * f;
660     ftaexec_start();
661     while((f=ftaexec_next())!=0) {
662         f->control_fta(f,command,sz,value);
663     }
664     return 1;
665 }
666
667
668 /* Start itteration through list of active FTA */
669
670 gs_retval_t ftaexec_start()
671 {
672     itteration=process;
673     return 0;
674 }
675
676 /* get one FTA at a time */
677
678 struct FTA * ftaexec_next()
679 {
680     struct FTA * fta=0;
681     if (itteration) {
682         fta=itteration->fta;
683         itteration=itteration->next;
684     }
685     return fta;
686 }
687
688
689 /* adds a buffer to the end of the sidequeue*/
690 gs_retval_t sidequeue_append(FTAID from, gs_sp_t buf, gs_int32_t length)
691 {
692     struct sq * s;
693     if ((s=malloc(sizeof(struct sq)))==0) {
694         gslog(LOG_EMERG,"Could not allocate memory for sidequeue");
695         return -1;
696     }
697     s->from=from;
698     memcpy(&s->buf[0],buf,MAXMSGSZ);
699     s->length=length;
700     s->next=0;
701     if (sqtail) {
702         sqtail->next=s;
703         sqtail=s;
704     } else {
705         sqtop = s;
706         sqtail = s;
707     }
708     return 0;
709 }
710
711 /* removes a buffer from the top of the sidequeue*/
712 gs_retval_t sidequeue_pop(FTAID * from, gs_sp_t buf, gs_int32_t* length)
713 {
714     struct sq * s;
715     
716     if (sqtop) {
717         *from=sqtop->from;
718         memcpy(buf,&sqtop->buf[0],MAXMSGSZ);
719         *length=sqtop->length;
720         s=sqtop;
721         sqtop=sqtop->next;
722         if (sqtop==0) sqtail=0;
723         free(s);
724         return 0;
725     }
726     return -1;
727 }