1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
16 #include "callbackregistries.h"
25 #include "schemaparser.h"
31 struct ftacallbackalloc {
34 alloc_fta fta_alloc_functionptr;
35 gs_uint64_t prefilter;
38 static struct ftacallbackalloc * falloc =0;
39 static gs_int32_t lalloc=0;
41 struct ftacallbackstreamid {
48 static struct ftacallbackstreamid * fstreamid =0;
49 static gs_int32_t lstreamid=0;
50 static gs_uint32_t cstreamid;
51 static gs_int32_t nstreamid;
53 struct ftacallbackwakeup {
59 static struct ftacallbackwakeup * fwakeup =0;
60 static gs_int32_t lwakeup=0;
61 static gs_uint32_t swakeup;
62 static gs_int32_t nwakeup;
64 /* XXX memory of fta list has no owner struct */
67 struct fta_list * prev;
68 struct fta_list * next;
72 static struct fta_list * process=0;
73 static struct fta_list * created=0;
75 static struct fta_list * itteration=0;
77 // Side queue datastructures
81 gs_int8_t buf[MAXMSGSZ];
86 static struct sq * sqtop=0;
87 static struct sq * sqtail=0;
90 /* HFTA internal print function*/
92 gs_retval_t add_printfunction_to_stream( struct FTA * ftaid, gs_sp_t schema, gs_sp_t path, gs_sp_t basename,
93 gs_sp_t temporal_field, gs_sp_t split_field, gs_uint32_t delta, gs_uint32_t split) {
94 gs_uint32_t parserversion;
97 gs_int8_t temp[50000];
98 if (ftaid->printfunc.in_use==1) {
99 gslog(LOG_EMERG,"ERROR:Printfunction::only allow one print function per HFTA instance\n");
102 ftaid->printfunc.path=strdup(path);
103 ftaid->printfunc.basename=strdup(basename);
104 ftaid->printfunc.nexttime=0;
105 ftaid->printfunc.split=(split%1000);
106 ftaid->printfunc.itt=(split/1000)%1000;
107 ftaid->printfunc.base=(split/1000000)%1000;
108 ftaid->printfunc.delta=delta;
109 ftaid->printfunc.in_use=1;
110 if (ftaid->printfunc.split > MAXPRINTFILES) {
111 gslog(LOG_EMERG,"ERROR:Printfunction SPLIT to large\n");
114 for (x=0;x<ftaid->printfunc.split;x++) ftaid->printfunc.fa[x]=0;
116 if ((ftaid->printfunc.schemaid=ftaschema_parse_string(schema))<0) {
117 gslog(LOG_EMERG,"ERROR:could not parse schema in HFTA print function");
120 if ((ftaid->printfunc.temporal_field=ftaschema_get_field_offset_by_name(
121 ftaid->printfunc.schemaid,temporal_field))<0) {
122 gslog(LOG_EMERG,"ERROR:could not get "
123 "offset for timefield %s in HFTA print function\n",
128 if (ftaschema_get_field_type_by_name(
129 ftaid->printfunc.schemaid,temporal_field)!=UINT_TYPE) {
130 gslog(LOG_EMERG,"ERROR: illegal type for timefield "
131 "%s in HFTA print function UINT expected\n",
135 if ((ftaid->printfunc.split_field=ftaschema_get_field_offset_by_name(
136 ftaid->printfunc.schemaid,split_field))<0) {
137 gslog(LOG_EMERG,"ERROR:could not get "
138 "offset for splitfield %s in HFTA print function\n",
143 if (ftaschema_get_field_type_by_name(
144 ftaid->printfunc.schemaid,split_field)!=UINT_TYPE) {
145 gslog(LOG_EMERG,"ERROR: illegal type for splitfield"
146 "%s in HFTA print function UINT expected\n",
150 parserversion=get_schemaparser_version();
151 sprintf(temp,"GDAT\nVERSION:%u\nSCHEMALENGTH:%lu\n%s",parserversion,strlen(schema)+1,schema);
152 ftaid->printfunc.header=strdup(temp);
153 gslog(LOG_INFO,"Established print function for %s",basename);
157 gs_retval_t print_stream(struct FTA * self, gs_int32_t sz, void *tuple)
161 gs_uint32_t splitval;
164 timeval=fta_unpack_uint(tuple,sz,self->printfunc.temporal_field,&problem);
165 if (timeval==0) return 0; // ignore heartbeats till we see a real timestamp
166 if (timeval>= self->printfunc.nexttime) {
167 gs_int8_t oldname[1024];
168 gs_int8_t newname[1024];
169 if (self->printfunc.split==0) {
170 if (self->printfunc.fa[0] != 0) {
171 sprintf(oldname,"%s/%u%s.tmp",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,self->printfunc.basename);
172 sprintf(newname,"%s/%u%s",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,self->printfunc.basename);
173 fclose(self->printfunc.fa[0]);
174 rename(oldname,newname);
176 if (self->printfunc.nexttime==0) {
177 self->printfunc.nexttime=(timeval/self->printfunc.delta)*self->printfunc.delta;
179 sprintf(oldname,"%s/%u%s.tmp",self->printfunc.path,self->printfunc.nexttime,self->printfunc.basename);
180 if ((self->printfunc.fa[0]=fopen(oldname,"w"))==0) {
181 gslog(LOG_EMERG,"ERROR:Could not open file in HFTA print function\n");
184 if (setvbuf(self->printfunc.fa[0],0,_IOFBF,16000000)!=0) {
185 gslog(LOG_EMERG,"ERROR:Could not setvbuf\n");
187 if (fwrite(self->printfunc.header,strlen(self->printfunc.header)+1,1,self->printfunc.fa[0])!=1) {
188 gslog(LOG_EMERG,"ERROR:fwrite:xfgh1:%s:%u",self->printfunc.basename,errno);
190 gslog(LOG_INFO,"Opened file %s",oldname);
192 for(x=self->printfunc.base;x<self->printfunc.split;x=x+self->printfunc.itt) {
193 if (self->printfunc.fa[x] != 0) {
194 sprintf(oldname,"%s/%u_s%u%s.tmp",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,x+1,self->printfunc.basename);
195 sprintf(newname,"%s/%u_s%u%s",self->printfunc.path,self->printfunc.nexttime-self->printfunc.delta,x+1,self->printfunc.basename);
196 fclose(self->printfunc.fa[x]);
197 rename(oldname,newname);
199 if (self->printfunc.nexttime==0) {
200 self->printfunc.nexttime=(timeval/self->printfunc.delta)*self->printfunc.delta;
202 sprintf(oldname,"%s/%u_s%u%s.tmp",self->printfunc.path,self->printfunc.nexttime,x+1,self->printfunc.basename);
203 if ((self->printfunc.fa[x]=fopen(oldname,"w"))==0) {
204 gslog(LOG_EMERG,"ERROR:Could not open file in HFTA print function\n");
207 if (setvbuf(self->printfunc.fa[x],0,_IOFBF,16000000)!=0) {
208 gslog(LOG_EMERG,"ERROR:Could not setvbuf\n");
210 if (fwrite(self->printfunc.header,strlen(self->printfunc.header)+1,1,self->printfunc.fa[x])!=1) {
211 gslog(LOG_EMERG,"ERROR:fwrite:xfgh2:%s:%u",self->printfunc.basename,errno);
213 gslog(LOG_INFO,"Opened file %s",oldname);
216 self->printfunc.nexttime=self->printfunc.nexttime+self->printfunc.delta;
218 // don't write temporal tuples to file but use them to advance file name.
219 if (ftaschema_is_temporal_tuple(self->printfunc.schemaid, tuple)) return 0;
220 if (self->printfunc.split!=0) {
221 splitval=fta_unpack_uint(tuple,sz,self->printfunc.split_field,&problem)%(self->printfunc.split);
222 if (self->printfunc.fa[splitval]==0) {
223 gslog(LOG_EMERG,"Inconsistent rangehash in print %u\n", splitval);
230 if (fwrite(&nsz,sizeof(gs_uint32_t),1,self->printfunc.fa[splitval])!=1) {
231 gslog(LOG_EMERG,"Could not write to output in HFTA print\"%s\":%u.. EXITING\n",
232 self->printfunc.basename,errno);
235 if (fwrite(tuple,sz,1,self->printfunc.fa[splitval])!=1) {
236 gslog(LOG_EMERG,"Could not write to output in HFTA print\"%s\"%u.. EXITING\n",
237 self->printfunc.basename,errno);
243 /* registers an alloc function of an FTA and returns a unique index */
244 gs_retval_t ftacallback_add_alloc(FTAname name, alloc_fta fta_alloc_functionptr,gs_uint64_t prefilter)
247 gslog(LOG_INFO,"Register prefilter %llu for %s\n",prefilter,name);
249 if ((falloc = malloc(sizeof(struct ftacallbackalloc)*STARTSZ))==0) {
250 gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
253 memset(falloc,0,sizeof(struct ftacallbackalloc)*STARTSZ);
256 for(x=0;(x<lalloc)&&(falloc[x].used!=0);x++);
260 if ((falloc = realloc(falloc,lalloc*sizeof(struct ftacallbackalloc)))==0) {
261 gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
264 for (y=x;y<lalloc;y++)
267 falloc[x].name=strdup(name);
268 falloc[x].fta_alloc_functionptr=fta_alloc_functionptr;
269 falloc[x].prefilter=prefilter;
274 /* unregisters an alloc function of an FTA and makes the index available
278 gs_retval_t ftacallback_rm_alloc(gs_uint32_t index)
280 falloc[index].used=0;
281 free(falloc[index].name);
284 /* returns the prefilter for a given
288 gs_uint64_t ftacallback_get_prefilter(gs_int32_t index)
290 if ((index<lalloc) && (falloc[index].used!=0)) {
291 return falloc[index].prefilter;
296 /* returns the function pointer of the callback function for a given
300 alloc_fta ftacallback_get_alloc(gs_int32_t index)
302 if ((index<lalloc) && (falloc[index].used!=0)) {
303 return falloc[index].fta_alloc_functionptr;
312 /* associate ringbuffer with streamid (using refcounting) */
313 gs_retval_t ftacallback_add_streamid(struct ringbuf * r, gs_uint32_t streamid) {
315 if (lstreamid == 0) {
316 if ((fstreamid = malloc(sizeof(struct ftacallbackstreamid)*STARTSZ))==0) {
317 gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
320 memset(fstreamid,0,sizeof(struct ftacallbackstreamid)*STARTSZ);
323 /* first try to increment refcnt */
324 for(x=0;(x<lstreamid)&&(
325 (fstreamid[x].streamid!=streamid)
326 ||(fstreamid[x].r!=r)
327 ||(fstreamid[x].refcnt<=0)) ;x++);
329 /* now try to find empty slot */
330 for(x=0;(x<lstreamid)&&(fstreamid[x].refcnt!=0);x++);
331 if (x >= lstreamid) {
333 lstreamid = 2*lstreamid;
335 realloc(fstreamid,sizeof(struct ftacallbackstreamid)*lstreamid))==0) {
336 gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
339 for (y=x;y<lstreamid;y++) {
340 fstreamid[y].refcnt=0;
341 fstreamid[y].streamid=0;
344 fstreamid[x].state=HFTA_RINGBUF_ATOMIC;
345 fstreamid[x].streamid=streamid;
348 fstreamid[x].refcnt+=1;
353 /* unassosciate a ringbuffer from a streamid */
355 gs_retval_t ftacallback_rm_streamid(struct ringbuf * r, gs_uint32_t streamid)
358 for(x=0;x<lstreamid;x++) {
359 if ((fstreamid[x].streamid == streamid)
360 && (fstreamid[x].r == r)
361 && (fstreamid[x].refcnt > 0))
362 fstreamid[x].refcnt--;
367 /* set the state for a given streamid and destination process */
368 gs_retval_t ftacallback_state_streamid(gs_int32_t streamid,FTAID process, gs_int32_t state)
371 for(x=0;x<lstreamid;x++) {
372 if ((fstreamid[x].streamid == streamid)
373 && (fstreamid[x].r->destid.ip == process.ip )
374 && (fstreamid[x].r->destid.port == process.port )
375 && (fstreamid[x].refcnt > 0))
376 fstreamid[x].state=state;
382 /* starts an itteration through all ringbuffers for a particular streamid */
384 gs_retval_t ftacallback_start_streamid(gs_int32_t streamid)
391 /* returns all the ringbuffer associated with the streamid passed in
392 * ftacallback_start_streamid
394 struct ringbuf * ftacallback_next_streamid(gs_int32_t* state)
396 for(;(nstreamid<lstreamid)
397 &&(fstreamid[nstreamid].streamid != cstreamid);
399 if (nstreamid<lstreamid) {
401 *state=fstreamid[nstreamid-1].state;
402 return fstreamid[nstreamid-1].r;
408 /* associate msgid with ringbuf */
409 gs_retval_t ftacallback_add_wakeup(FTAID ftaid, struct ringbuf * r)
413 if ((fwakeup = malloc(sizeof(struct ftacallbackwakeup)*STARTSZ))==0) {
414 gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
417 memset(fwakeup,0,sizeof(struct ftacallbackwakeup)*STARTSZ);
420 /* first try to find one for the same process */
421 for(x=0;(x<lwakeup)&&(
422 ((fwakeup[x].ftaid.ip!=ftaid.ip)
423 || (fwakeup[x].ftaid.port!=ftaid.port))
424 ||(fwakeup[x].used==0)) ;x++);
426 /* now try to find empty slot */
427 for(x=0;(x<lwakeup)&&(fwakeup[x].used!=0);x++);
432 realloc(fwakeup,sizeof(struct ftacallbackwakeup)*lwakeup))==0) {
433 gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
436 for (y=x;y<lwakeup;y++)
441 fwakeup[x].ftaid=ftaid;
446 /* starts an itteration through all msgids associated with
447 a streamid. This also uses data kept in the fstreamid
450 gs_retval_t ftacallback_start_wakeup(gs_uint32_t streamid)
457 /* returns all the msgid blocked on the streamid passed in
458 * ftacallback_start_streamid and removes the msgid from
461 FTAID * ftacallback_next_wakeup()
463 for(;(nwakeup<lstreamid)
464 &&(fstreamid[nwakeup].streamid != swakeup);
466 if (nwakeup<lstreamid) {
468 for(x=0;x<lwakeup;x++)
469 if ((fwakeup[x].r==fstreamid[nwakeup].r) &&
470 (fwakeup[x].used==1)) {
473 return & fwakeup[x].ftaid;
482 static gs_retval_t fta_list_add(struct fta_list ** root, struct FTA * after,
484 struct fta_list * new;
485 struct fta_list * tmp;
487 if ((new=(struct fta_list *)malloc(sizeof(struct fta_list)))==0) {
488 gslog(LOG_EMERG,"fta_list_add:: can't allocate memory\n");
504 while((tmp)&&(tmp->fta!=after)) {
508 gslog(LOG_EMERG,"fta_list_add:: can't find after fta\n");
521 static gs_retval_t fta_list_rm(struct fta_list ** root, struct FTA * fta) {
522 struct fta_list * tmp;
524 while((tmp)&&(tmp->fta!=fta)) {
528 gslog(LOG_EMERG,"fta_list_rm:: can't find fta\n");
531 if (tmp == (*root)) {
537 tmp->prev->next=tmp->next;
539 tmp->next->prev=tmp->prev;
548 static gs_retval_t fta_list_check(struct fta_list ** root, struct FTA * fta) {
549 struct fta_list * tmp;
551 while((tmp)&&(tmp->fta!=fta)) {
560 gs_retval_t ftaexec_insert(struct FTA * after, struct FTA * new)
563 if ((after!=0) && (fta_list_check(&process,after)<0)) {
564 gslog(LOG_EMERG,"fta_insert:: ilegal adapter for after\n");
568 if (fta_list_check(&created,new)<0) {
569 gslog(LOG_EMERG,"fta_insert:: ilegal adapter for new\n");
573 if (fta_list_check(&process,new)==0) {
575 gslog(LOG_INFO,"fta_insert:: new already in process list reusing entry with streamid %d\n",new->ftaid.streamid);
579 if (fta_list_add(&process,after,new)<0) {
580 gslog(LOG_EMERG,"fta_insert:: new can not be added to process list\n");
590 gs_retval_t ftaexec_remove(struct FTA * id){
592 if (fta_list_check(&process,id)<0) {
593 gslog(LOG_EMERG,"fta_remove:: id not in process list\n");
597 if (id->runrefcnt<=0) {
598 if (fta_list_rm(&process,id)<0) {
599 gslog(LOG_EMERG,"fta_remove:: id could not be removed from process list\n");
607 struct FTA * ftaexec_alloc_instance(gs_uint32_t index, struct FTA * reuse,
608 gs_uint32_t reusable,
609 gs_int32_t command, gs_int32_t sz, void * data){
612 ftaid=gscpipc_getftaid();
615 if (fta_list_check(&created,reuse)==0) {
619 if (ftacallback_get_alloc(index)==0) return 0;
620 f=ftacallback_get_alloc(index)(ftaid,reusable,command, sz, data);
621 f->prefilter=ftacallback_get_prefilter(index);
622 gslog(LOG_INFO,"Using prefilter %llu for fta %x\n",f->prefilter,f);
623 if (fta_list_add(&created,0,f)<0) {
624 gslog(LOG_EMERG,"fta_alloc_instance:: new fta can not be added to created list\n");
633 gs_retval_t ftaexec_free_instance(struct FTA * id, gs_uint32_t recursive){
636 if (fta_list_rm(&created,id)<0) {
637 gslog(LOG_EMERG,"fta_free_instance:: fta could not be removed from created list\n");
640 /* just to make sure remove it form process list too */
641 if (fta_list_check(&process,id)>=0) {
642 fta_list_rm(&process,id);
645 id->free_fta(id,recursive);
650 gs_retval_t ftaexec_control(struct FTA * id, gs_int32_t command, gs_int32_t sz, void * value){
651 if (fta_list_check(&created,id)<0) {
652 gslog(LOG_EMERG,"fta_control:: id not found in adapter's created list\n");
655 return id->control_fta(id,command,sz,value);
658 gs_retval_t ftaexec_process_control(gs_int32_t command, gs_int32_t sz, void * value){
661 while((f=ftaexec_next())!=0) {
662 f->control_fta(f,command,sz,value);
668 /* Start itteration through list of active FTA */
670 gs_retval_t ftaexec_start()
676 /* get one FTA at a time */
678 struct FTA * ftaexec_next()
683 itteration=itteration->next;
689 /* adds a buffer to the end of the sidequeue*/
690 gs_retval_t sidequeue_append(FTAID from, gs_sp_t buf, gs_int32_t length)
693 if ((s=malloc(sizeof(struct sq)))==0) {
694 gslog(LOG_EMERG,"Could not allocate memory for sidequeue");
698 memcpy(&s->buf[0],buf,MAXMSGSZ);
711 /* removes a buffer from the top of the sidequeue*/
712 gs_retval_t sidequeue_pop(FTAID * from, gs_sp_t buf, gs_int32_t* length)
718 memcpy(buf,&sqtop->buf[0],MAXMSGSZ);
719 *length=sqtop->length;
722 if (sqtop==0) sqtail=0;