X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Flib%2Fgscprts%2Frts_env.c;h=b674ae81c4b2957126d70cd74359ec4b3a240825;hb=HEAD;hp=f84ac7f67c2d3391b324fa60ae5ac517935e2b3d;hpb=07495effe193ca3f73c3bf0ce417068f9ac9dcdd;p=com%2Fgs-lite.git diff --git a/src/lib/gscprts/rts_env.c b/src/lib/gscprts/rts_env.c index f84ac7f..b674ae8 100644 --- a/src/lib/gscprts/rts_env.c +++ b/src/lib/gscprts/rts_env.c @@ -1,265 +1,265 @@ -/* ------------------------------------------------ - Copyright 2014 AT&T Intellectual Property - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - ------------------------------------------- */ -#include "gsconfig.h" -#include "gstypes.h" -#include "lapp.h" -#include -#include -#include "fta.h" -#include "stdio.h" -#include "stdlib.h" -#include "rts.h" - -#define POLLING - -static struct ringbuf * ru=0; - -gs_retval_t print_error(gs_sp_t c) { - gslog(LOG_EMERG,"%s",c); - return 0; -} - -void *fta_alloc(struct FTA * owner, gs_int32_t size) -{ - gs_uint8_t * c; - gs_uint32_t x; - if ((c=(gs_uint8_t *)malloc(size))==0) return 0; - /* touch all memory once to map/reserve it now */ - for(x=0;xMAXTUPLESZ) { - gslog(LOG_ALERT,"Maximum tuple size is %u",MAXTUPLESZ); - ru=0; - return 0; - } - - if (ftacallback_start_streamid(owner->ftaid.streamid)<0) { - gslog(LOG_ALERT,"Post for unkown streamid\n"); - ru=0; - return 0; - } - - /* we grep memory in the first none suspended ringbuffer. Note that if there is no such ringbuffer we might - not find any memory*/ - while ((ru=ftacallback_next_streamid(&state))!=0) { -#ifdef PRINTMSG - fprintf(stderr,"Allocating in ringpuffer %p [%p:%u]" - "(%u %u %u) \n",ru,&ru->start,ru->end,ru->reader,ru->writer, - ru->length); - fprintf(stderr,"Pointer to current writer %p\n",CURWRITE(ru)); -#endif - if (state != LFTA_RINGBUF_SUSPEND) { -#ifdef BLOCKRINGBUFFER - if (state == LFTA_RINGBUF_ATOMIC) { - while (!SPACETOWRITE(ru)) { - usleep(100); - } - } -#endif - if (SPACETOWRITE(ru)) { - CURWRITE(ru)->f=owner->ftaid; - CURWRITE(ru)->sz=size; - return &(CURWRITE(ru)->data[0]); - } else { - shared_memory_full_warning++; - } - } - } - ru=0; - outtupledrop++; - - return 0; -} - -void free_tuple(void * data) { - ru=0; -} - -gs_retval_t post_tuple(void * tuple) { - struct ringbuf * r; - FTAID * ftaidp; - gs_uint32_t stream_id; - struct wakeup_result a; - gs_int32_t state; - - if (ru==0) { - gslog(LOG_ALERT,"lfta post tuple posted tupple was never allocated\n"); - return -1; - } - - - if (tuple != ((void*) &(CURWRITE(ru)->data[0]))) { - gslog(LOG_ALERT,"lfta post tuple posted tupple which was not allocated" - "immediatly before\n"); - ru=0; - return -1; - } - - stream_id=CURWRITE(ru)->f.streamid; - - if (ftacallback_start_streamid(stream_id)<0) { - gslog(LOG_ALERT,"ERROR:Post for unkown streamid\n"); - ru=0; - return -1; - } - /* now make sure we have space to write in all atomic ringbuffer */ - while((r=ftacallback_next_streamid(&state))!=0) { - if ((state == LFTA_RINGBUF_ATOMIC) && (r!=ru)) { -#ifdef BLOCKRINGBUFFER - while (!SPACETOWRITE(r)) { - usleep(10000); - } -#endif - if (! SPACETOWRITE(r)) { - /* atomic ring buffer and no space so post nothing */ - outtupledrop++; - ru=0; - return -1; - } - } - } - - if (ftacallback_start_streamid(stream_id)<0) { - gslog(LOG_ALERT,"Post for unkown streamid\n"); - ru=0; - return -1; - } - - while((r=ftacallback_next_streamid(&state))!=0) { -#ifdef PRINTMSG - fprintf(stderr,"Found additional ring buffer make a copy to rb%p\n", - r); -#endif - /* try to post in all none suspended ringbuffer for atomic once - * we know we will succeed - */ - if ((r!=ru)&&(state != LFTA_RINGBUF_SUSPEND)) { - if (SPACETOWRITE(r)) { - CURWRITE(r)->f=CURWRITE(ru)->f; - CURWRITE(r)->sz=CURWRITE(ru)->sz; - memcpy(&(CURWRITE(r)->data[0]),&(CURWRITE(ru)->data[0]), - CURWRITE(ru)->sz); - outtuple++; - outbytes=outbytes+CURWRITE(ru)->sz; - ADVANCEWRITE(r); -#ifdef PRINTMSG - fprintf(stderr,"Wrote in ringpuffer %p [%p:%u]" - "(%u %u %u) \n",r,&r->start,r->end,r->reader,r->writer, - r->length); - fprintf(stderr,"\t%u %u %u\n",CURREAD(r)->next, - CURREAD(r)->f.streamid,CURREAD(r)->sz); -#endif - } else { - outtupledrop++; - } - } - if (HOWFULL(r) > 500) { - // buffer is at least half full - shared_memory_full_warning++; -#ifdef PRINTMSG - fprintf(stderr,"\t\t buffer full\n"); -#endif - } - } - - if (HOWFULL(ru) > 500) { - // buffer is at least half full - shared_memory_full_warning++; -#ifdef PRINTMSG - fprintf(stderr,"\t\t buffer full\n"); -#endif - } - outtuple++; - outbytes=outbytes+CURWRITE(ru)->sz; - ADVANCEWRITE(ru); - ru=0; -#ifndef POLLING - if (ftacallback_start_wakeup(stream_id)<0) { - gslog(LOG_ALERT,"Wakeup for unkown streamid\n"); - return -1; - } - a.h.callid=WAKEUP; - a.h.size=sizeof(struct wakeup_result); - while((ftaidp=ftacallback_next_wakeup())!=0) { - if (send_wakeup(*ftaidp)<0) { - gslog(LOG_ALERT,"Could not send wakeup\n"); - return -1; - } - } -#endif - return 0; -} - -gs_retval_t get_ringbuf_space(struct FTA * f, FTAID * r, gs_int32_t* space, gs_int32_t szr, gs_int32_t tuplesz) -{ - gs_int32_t x=0; - gs_int32_t state; - struct ringbuf * ru; - if (ftacallback_start_streamid(f->ftaid.streamid)<0) { - gslog(LOG_ALERT,"Space check for unkown streamid\n"); - return -1; - } - - while ((ru=ftacallback_next_streamid(&state))!=0) { - if (szr > x ) { - r[x]=ru->destid; - space[x]=TUPLEFIT(ru,tuplesz); - } - x++; - } - return x; -} - -gs_retval_t set_ringbuf_type(struct FTA * f, FTAID process, gs_int32_t state) -{ - - if (ftacallback_state_streamid(f->ftaid.streamid,process, state)<0) { - gslog(LOG_ALERT,"state change for unkown streamid\n"); - return -1; - } - return 0; -} - - -gs_retval_t tpost_ready() { - return 1; -} - +/* ------------------------------------------------ + Copyright 2014 AT&T Intellectual Property + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ------------------------------------------- */ +#include "gsconfig.h" +#include "gstypes.h" +#include "lapp.h" +#include +#include +#include "fta.h" +#include "stdio.h" +#include "stdlib.h" +#include "rts.h" + +#define POLLING + +static struct ringbuf * ru=0; + +gs_retval_t print_error(gs_sp_t c) { + gslog(LOG_EMERG,"%s",c); + return 0; +} + +void *fta_alloc(struct FTA * owner, gs_int32_t size) +{ + gs_uint8_t * c; + gs_uint32_t x; + if ((c=(gs_uint8_t *)malloc(size))==0) return 0; + /* touch all memory once to map/reserve it now */ + for(x=0;xMAXTUPLESZ) { + gslog(LOG_ALERT,"Maximum tuple size is %u",MAXTUPLESZ); + ru=0; + return 0; + } + + if (ftacallback_start_streamid(owner->ftaid.streamid)<0) { + gslog(LOG_ALERT,"Post for unkown streamid\n"); + ru=0; + return 0; + } + + /* we grep memory in the first none suspended ringbuffer. Note that if there is no such ringbuffer we might + not find any memory*/ + while ((ru=ftacallback_next_streamid(&state))!=0) { +#ifdef PRINTMSG + fprintf(stderr,"Allocating in ringpuffer %p [%p:%u]" + "(%u %u %u) \n",ru,&ru->start,ru->end,ru->reader,ru->writer, + ru->length); + fprintf(stderr,"Pointer to current writer %p\n",CURWRITE(ru)); +#endif + if (state != LFTA_RINGBUF_SUSPEND) { +#ifdef BLOCKRINGBUFFER + if (state == LFTA_RINGBUF_ATOMIC) { + while (!SPACETOWRITE(ru)) { + usleep(100); + } + } +#endif + if (SPACETOWRITE(ru)) { + CURWRITE(ru)->f=owner->ftaid; + CURWRITE(ru)->sz=size; + return &(CURWRITE(ru)->data[0]); + } else { + shared_memory_full_warning++; + } + } + } + ru=0; + outtupledrop++; + + return 0; +} + +void free_tuple(void * data) { + ru=0; +} + +gs_retval_t post_tuple(void * tuple) { + struct ringbuf * r; + FTAID * ftaidp; + gs_uint32_t stream_id; + struct wakeup_result a; + gs_int32_t state; + + if (ru==0) { + gslog(LOG_ALERT,"lfta post tuple posted tupple was never allocated\n"); + return -1; + } + + + if (tuple != ((void*) &(CURWRITE(ru)->data[0]))) { + gslog(LOG_ALERT,"lfta post tuple posted tupple which was not allocated" + "immediatly before\n"); + ru=0; + return -1; + } + + stream_id=CURWRITE(ru)->f.streamid; + + if (ftacallback_start_streamid(stream_id)<0) { + gslog(LOG_ALERT,"ERROR:Post for unkown streamid\n"); + ru=0; + return -1; + } + /* now make sure we have space to write in all atomic ringbuffer */ + while((r=ftacallback_next_streamid(&state))!=0) { + if ((state == LFTA_RINGBUF_ATOMIC) && (r!=ru)) { +#ifdef BLOCKRINGBUFFER + while (!SPACETOWRITE(r)) { + usleep(10000); + } +#endif + if (! SPACETOWRITE(r)) { + /* atomic ring buffer and no space so post nothing */ + outtupledrop++; + ru=0; + return -1; + } + } + } + + if (ftacallback_start_streamid(stream_id)<0) { + gslog(LOG_ALERT,"Post for unkown streamid\n"); + ru=0; + return -1; + } + + while((r=ftacallback_next_streamid(&state))!=0) { +#ifdef PRINTMSG + fprintf(stderr,"Found additional ring buffer make a copy to rb%p\n", + r); +#endif + /* try to post in all none suspended ringbuffer for atomic once + * we know we will succeed + */ + if ((r!=ru)&&(state != LFTA_RINGBUF_SUSPEND)) { + if (SPACETOWRITE(r)) { + CURWRITE(r)->f=CURWRITE(ru)->f; + CURWRITE(r)->sz=CURWRITE(ru)->sz; + memcpy(&(CURWRITE(r)->data[0]),&(CURWRITE(ru)->data[0]), + CURWRITE(ru)->sz); + outtuple++; + outbytes=outbytes+CURWRITE(ru)->sz; + ADVANCEWRITE(r); +#ifdef PRINTMSG + fprintf(stderr,"Wrote in ringpuffer %p [%p:%u]" + "(%u %u %u) \n",r,&r->start,r->end,r->reader,r->writer, + r->length); + fprintf(stderr,"\t%u %u %u\n",CURREAD(r)->next, + CURREAD(r)->f.streamid,CURREAD(r)->sz); +#endif + } else { + outtupledrop++; + } + } + if (HOWFULL(r) > 500) { + // buffer is at least half full + shared_memory_full_warning++; +#ifdef PRINTMSG + fprintf(stderr,"\t\t buffer full\n"); +#endif + } + } + + if (HOWFULL(ru) > 500) { + // buffer is at least half full + shared_memory_full_warning++; +#ifdef PRINTMSG + fprintf(stderr,"\t\t buffer full\n"); +#endif + } + outtuple++; + outbytes=outbytes+CURWRITE(ru)->sz; + ADVANCEWRITE(ru); + ru=0; +#ifndef POLLING + if (ftacallback_start_wakeup(stream_id)<0) { + gslog(LOG_ALERT,"Wakeup for unkown streamid\n"); + return -1; + } + a.h.callid=WAKEUP; + a.h.size=sizeof(struct wakeup_result); + while((ftaidp=ftacallback_next_wakeup())!=0) { + if (send_wakeup(*ftaidp)<0) { + gslog(LOG_ALERT,"Could not send wakeup\n"); + return -1; + } + } +#endif + return 0; +} + +gs_retval_t get_ringbuf_space(struct FTA * f, FTAID * r, gs_int32_t* space, gs_int32_t szr, gs_int32_t tuplesz) +{ + gs_int32_t x=0; + gs_int32_t state; + struct ringbuf * ru; + if (ftacallback_start_streamid(f->ftaid.streamid)<0) { + gslog(LOG_ALERT,"Space check for unkown streamid\n"); + return -1; + } + + while ((ru=ftacallback_next_streamid(&state))!=0) { + if (szr > x ) { + r[x]=ru->destid; + space[x]=TUPLEFIT(ru,tuplesz); + } + x++; + } + return x; +} + +gs_retval_t set_ringbuf_type(struct FTA * f, FTAID process, gs_int32_t state) +{ + + if (ftacallback_state_streamid(f->ftaid.streamid,process, state)<0) { + gslog(LOG_ALERT,"state change for unkown streamid\n"); + return -1; + } + return 0; +} + + +gs_retval_t tpost_ready() { + return 1; +} +