-/* ------------------------------------------------\r
- Copyright 2014 AT&T Intellectual Property\r
- Licensed under the Apache License, Version 2.0 (the "License");\r
- you may not use this file except in compliance with the License.\r
- You may obtain a copy of the License at\r
- \r
- http://www.apache.org/licenses/LICENSE-2.0\r
- \r
- Unless required by applicable law or agreed to in writing, software\r
- distributed under the License is distributed on an "AS IS" BASIS,\r
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- See the License for the specific language governing permissions and\r
- limitations under the License.\r
- ------------------------------------------- */\r
-#include "gsconfig.h"\r
-#include "gstypes.h"\r
-#include "lapp.h"\r
-#include <ipcencoding.h>\r
-#include <callbackregistries.h>\r
-#include "fta.h"\r
-#include "stdio.h"\r
-#include "stdlib.h"\r
-#include "rts.h"\r
-\r
-#define POLLING\r
-\r
-static struct ringbuf * ru=0;\r
-\r
-gs_retval_t print_error(gs_sp_t c) {\r
- gslog(LOG_EMERG,"%s",c);\r
- return 0;\r
-}\r
-\r
-void *fta_alloc(struct FTA * owner, gs_int32_t size)\r
-{\r
- gs_uint8_t * c;\r
- gs_uint32_t x;\r
- if ((c=(gs_uint8_t *)malloc(size))==0) return 0;\r
- /* touch all memory once to map/reserve it now */\r
- for(x=0;x<size;x=x+1024) {\r
- c[x]=0;\r
- }\r
- \r
- return (void *) c;\r
-}\r
-\r
-void fta_free(struct FTA * owner , void * mem) {\r
- free(mem);\r
-}\r
-\r
-\r
-void fta_free_all(struct FTA * owner) {\r
- gslog(LOG_ERR,"fta_free_all not available ");\r
-}\r
-\r
-/* It is assumed that there is no write activity on all ringbuffers between an alloccate_tuple and\r
- * a post_tuple. If there is any the result is unpredictable\r
- */\r
-\r
-void * allocate_tuple(struct FTA * owner, gs_int32_t size)\r
-{\r
- gs_int32_t state;\r
- if (ru!=0) {\r
- gslog(LOG_ALERT,"Can't allocate multiple tuples at the same time before posting them");\r
- return 0;\r
- }\r
- \r
- if (size>MAXTUPLESZ) {\r
- gslog(LOG_ALERT,"Maximum tuple size is %u",MAXTUPLESZ);\r
- ru=0;\r
- return 0;\r
- }\r
- \r
- if (ftacallback_start_streamid(owner->ftaid.streamid)<0) {\r
- gslog(LOG_ALERT,"Post for unkown streamid\n");\r
- ru=0;\r
- return 0;\r
- }\r
- \r
- /* we grep memory in the first none suspended ringbuffer. Note that if there is no such ringbuffer we might\r
- not find any memory*/\r
- while ((ru=ftacallback_next_streamid(&state))!=0) {\r
-#ifdef PRINTMSG\r
- fprintf(stderr,"Allocating in ringpuffer %p [%p:%u]"\r
- "(%u %u %u) \n",ru,&ru->start,ru->end,ru->reader,ru->writer,\r
- ru->length);\r
- fprintf(stderr,"Pointer to current writer %p\n",CURWRITE(ru));\r
-#endif\r
- if (state != LFTA_RINGBUF_SUSPEND) {\r
-#ifdef BLOCKRINGBUFFER\r
- if (state == LFTA_RINGBUF_ATOMIC) {\r
- while (!SPACETOWRITE(ru)) {\r
- usleep(100);\r
- }\r
- }\r
-#endif\r
- if (SPACETOWRITE(ru)) {\r
- CURWRITE(ru)->f=owner->ftaid;\r
- CURWRITE(ru)->sz=size;\r
- return &(CURWRITE(ru)->data[0]);\r
- } else {\r
- shared_memory_full_warning++;\r
- }\r
- }\r
- }\r
- ru=0;\r
- outtupledrop++;\r
- \r
- return 0;\r
-}\r
-\r
-void free_tuple(void * data) {\r
- ru=0;\r
-}\r
-\r
-gs_retval_t post_tuple(void * tuple) {\r
- struct ringbuf * r;\r
- FTAID * ftaidp;\r
- gs_uint32_t stream_id;\r
- struct wakeup_result a;\r
- gs_int32_t state;\r
- \r
- if (ru==0) {\r
- gslog(LOG_ALERT,"lfta post tuple posted tupple was never allocated\n");\r
- return -1;\r
- }\r
- \r
- \r
- if (tuple != ((void*) &(CURWRITE(ru)->data[0]))) {\r
- gslog(LOG_ALERT,"lfta post tuple posted tupple which was not allocated"\r
- "immediatly before\n");\r
- ru=0;\r
- return -1;\r
- }\r
- \r
- stream_id=CURWRITE(ru)->f.streamid;\r
- \r
- if (ftacallback_start_streamid(stream_id)<0) {\r
- gslog(LOG_ALERT,"ERROR:Post for unkown streamid\n");\r
- ru=0;\r
- return -1;\r
- }\r
- /* now make sure we have space to write in all atomic ringbuffer */\r
- while((r=ftacallback_next_streamid(&state))!=0) {\r
- if ((state == LFTA_RINGBUF_ATOMIC) && (r!=ru)) {\r
-#ifdef BLOCKRINGBUFFER\r
- while (!SPACETOWRITE(r)) {\r
- usleep(10000);\r
- }\r
-#endif\r
- if (! SPACETOWRITE(r)) {\r
- /* atomic ring buffer and no space so post nothing */\r
- outtupledrop++;\r
- ru=0;\r
- return -1;\r
- }\r
- }\r
- }\r
- \r
- if (ftacallback_start_streamid(stream_id)<0) {\r
- gslog(LOG_ALERT,"Post for unkown streamid\n");\r
- ru=0;\r
- return -1;\r
- }\r
- \r
- while((r=ftacallback_next_streamid(&state))!=0) {\r
-#ifdef PRINTMSG\r
- fprintf(stderr,"Found additional ring buffer make a copy to rb%p\n",\r
- r);\r
-#endif\r
- /* try to post in all none suspended ringbuffer for atomic once\r
- * we know we will succeed\r
- */\r
- if ((r!=ru)&&(state != LFTA_RINGBUF_SUSPEND)) {\r
- if (SPACETOWRITE(r)) {\r
- CURWRITE(r)->f=CURWRITE(ru)->f;\r
- CURWRITE(r)->sz=CURWRITE(ru)->sz;\r
- memcpy(&(CURWRITE(r)->data[0]),&(CURWRITE(ru)->data[0]),\r
- CURWRITE(ru)->sz);\r
- outtuple++;\r
- outbytes=outbytes+CURWRITE(ru)->sz;\r
- ADVANCEWRITE(r);\r
-#ifdef PRINTMSG\r
- fprintf(stderr,"Wrote in ringpuffer %p [%p:%u]"\r
- "(%u %u %u) \n",r,&r->start,r->end,r->reader,r->writer,\r
- r->length);\r
- fprintf(stderr,"\t%u %u %u\n",CURREAD(r)->next,\r
- CURREAD(r)->f.streamid,CURREAD(r)->sz);\r
-#endif\r
- } else {\r
- outtupledrop++;\r
- }\r
- }\r
- if (HOWFULL(r) > 500) {\r
- // buffer is at least half full\r
- shared_memory_full_warning++;\r
-#ifdef PRINTMSG\r
- fprintf(stderr,"\t\t buffer full\n");\r
-#endif\r
- }\r
- }\r
- \r
- if (HOWFULL(ru) > 500) {\r
- // buffer is at least half full\r
- shared_memory_full_warning++;\r
-#ifdef PRINTMSG\r
- fprintf(stderr,"\t\t buffer full\n");\r
-#endif\r
- }\r
- outtuple++;\r
- outbytes=outbytes+CURWRITE(ru)->sz;\r
- ADVANCEWRITE(ru);\r
- ru=0;\r
-#ifndef POLLING\r
- if (ftacallback_start_wakeup(stream_id)<0) {\r
- gslog(LOG_ALERT,"Wakeup for unkown streamid\n");\r
- return -1;\r
- }\r
- a.h.callid=WAKEUP;\r
- a.h.size=sizeof(struct wakeup_result);\r
- while((ftaidp=ftacallback_next_wakeup())!=0) {\r
- if (send_wakeup(*ftaidp)<0) {\r
- gslog(LOG_ALERT,"Could not send wakeup\n");\r
- return -1;\r
- }\r
- }\r
-#endif\r
- return 0;\r
-}\r
-\r
-gs_retval_t get_ringbuf_space(struct FTA * f, FTAID * r, gs_int32_t* space, gs_int32_t szr, gs_int32_t tuplesz)\r
-{\r
- gs_int32_t x=0;\r
- gs_int32_t state;\r
- struct ringbuf * ru;\r
- if (ftacallback_start_streamid(f->ftaid.streamid)<0) {\r
- gslog(LOG_ALERT,"Space check for unkown streamid\n");\r
- return -1;\r
- }\r
- \r
- while ((ru=ftacallback_next_streamid(&state))!=0) {\r
- if (szr > x ) {\r
- r[x]=ru->destid;\r
- space[x]=TUPLEFIT(ru,tuplesz);\r
- }\r
- x++;\r
- }\r
- return x;\r
-}\r
-\r
-gs_retval_t set_ringbuf_type(struct FTA * f, FTAID process, gs_int32_t state)\r
-{\r
- \r
- if (ftacallback_state_streamid(f->ftaid.streamid,process, state)<0) {\r
- gslog(LOG_ALERT,"state change for unkown streamid\n");\r
- return -1;\r
- }\r
- return 0;\r
-}\r
-\r
-\r
-gs_retval_t tpost_ready() {\r
- return 1;\r
-}\r
-\r
+/* ------------------------------------------------
+ Copyright 2014 AT&T Intellectual Property
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ------------------------------------------- */
+#include "gsconfig.h"
+#include "gstypes.h"
+#include "lapp.h"
+#include <ipcencoding.h>
+#include <callbackregistries.h>
+#include "fta.h"
+#include "stdio.h"
+#include "stdlib.h"
+#include "rts.h"
+
+#define POLLING
+
+static struct ringbuf * ru=0;
+
+gs_retval_t print_error(gs_sp_t c) {
+ gslog(LOG_EMERG,"%s",c);
+ return 0;
+}
+
+void *fta_alloc(struct FTA * owner, gs_int32_t size)
+{
+ gs_uint8_t * c;
+ gs_uint32_t x;
+ if ((c=(gs_uint8_t *)malloc(size))==0) return 0;
+ /* touch all memory once to map/reserve it now */
+ for(x=0;x<size;x=x+1024) {
+ c[x]=0;
+ }
+
+ return (void *) c;
+}
+
+void fta_free(struct FTA * owner , void * mem) {
+ free(mem);
+}
+
+
+void fta_free_all(struct FTA * owner) {
+ gslog(LOG_ERR,"fta_free_all not available ");
+}
+
+/* It is assumed that there is no write activity on all ringbuffers between an alloccate_tuple and
+ * a post_tuple. If there is any the result is unpredictable
+ */
+
+void * allocate_tuple(struct FTA * owner, gs_int32_t size)
+{
+ gs_int32_t state;
+ if (ru!=0) {
+ gslog(LOG_ALERT,"Can't allocate multiple tuples at the same time before posting them");
+ return 0;
+ }
+
+ if (size>MAXTUPLESZ) {
+ gslog(LOG_ALERT,"Maximum tuple size is %u",MAXTUPLESZ);
+ ru=0;
+ return 0;
+ }
+
+ if (ftacallback_start_streamid(owner->ftaid.streamid)<0) {
+ gslog(LOG_ALERT,"Post for unkown streamid\n");
+ ru=0;
+ return 0;
+ }
+
+ /* we grep memory in the first none suspended ringbuffer. Note that if there is no such ringbuffer we might
+ not find any memory*/
+ while ((ru=ftacallback_next_streamid(&state))!=0) {
+#ifdef PRINTMSG
+ fprintf(stderr,"Allocating in ringpuffer %p [%p:%u]"
+ "(%u %u %u) \n",ru,&ru->start,ru->end,ru->reader,ru->writer,
+ ru->length);
+ fprintf(stderr,"Pointer to current writer %p\n",CURWRITE(ru));
+#endif
+ if (state != LFTA_RINGBUF_SUSPEND) {
+#ifdef BLOCKRINGBUFFER
+ if (state == LFTA_RINGBUF_ATOMIC) {
+ while (!SPACETOWRITE(ru)) {
+ usleep(100);
+ }
+ }
+#endif
+ if (SPACETOWRITE(ru)) {
+ CURWRITE(ru)->f=owner->ftaid;
+ CURWRITE(ru)->sz=size;
+ return &(CURWRITE(ru)->data[0]);
+ } else {
+ shared_memory_full_warning++;
+ }
+ }
+ }
+ ru=0;
+ outtupledrop++;
+
+ return 0;
+}
+
+void free_tuple(void * data) {
+ ru=0;
+}
+
+gs_retval_t post_tuple(void * tuple) {
+ struct ringbuf * r;
+ FTAID * ftaidp;
+ gs_uint32_t stream_id;
+ struct wakeup_result a;
+ gs_int32_t state;
+
+ if (ru==0) {
+ gslog(LOG_ALERT,"lfta post tuple posted tupple was never allocated\n");
+ return -1;
+ }
+
+
+ if (tuple != ((void*) &(CURWRITE(ru)->data[0]))) {
+ gslog(LOG_ALERT,"lfta post tuple posted tupple which was not allocated"
+ "immediatly before\n");
+ ru=0;
+ return -1;
+ }
+
+ stream_id=CURWRITE(ru)->f.streamid;
+
+ if (ftacallback_start_streamid(stream_id)<0) {
+ gslog(LOG_ALERT,"ERROR:Post for unkown streamid\n");
+ ru=0;
+ return -1;
+ }
+ /* now make sure we have space to write in all atomic ringbuffer */
+ while((r=ftacallback_next_streamid(&state))!=0) {
+ if ((state == LFTA_RINGBUF_ATOMIC) && (r!=ru)) {
+#ifdef BLOCKRINGBUFFER
+ while (!SPACETOWRITE(r)) {
+ usleep(10000);
+ }
+#endif
+ if (! SPACETOWRITE(r)) {
+ /* atomic ring buffer and no space so post nothing */
+ outtupledrop++;
+ ru=0;
+ return -1;
+ }
+ }
+ }
+
+ if (ftacallback_start_streamid(stream_id)<0) {
+ gslog(LOG_ALERT,"Post for unkown streamid\n");
+ ru=0;
+ return -1;
+ }
+
+ while((r=ftacallback_next_streamid(&state))!=0) {
+#ifdef PRINTMSG
+ fprintf(stderr,"Found additional ring buffer make a copy to rb%p\n",
+ r);
+#endif
+ /* try to post in all none suspended ringbuffer for atomic once
+ * we know we will succeed
+ */
+ if ((r!=ru)&&(state != LFTA_RINGBUF_SUSPEND)) {
+ if (SPACETOWRITE(r)) {
+ CURWRITE(r)->f=CURWRITE(ru)->f;
+ CURWRITE(r)->sz=CURWRITE(ru)->sz;
+ memcpy(&(CURWRITE(r)->data[0]),&(CURWRITE(ru)->data[0]),
+ CURWRITE(ru)->sz);
+ outtuple++;
+ outbytes=outbytes+CURWRITE(ru)->sz;
+ ADVANCEWRITE(r);
+#ifdef PRINTMSG
+ fprintf(stderr,"Wrote in ringpuffer %p [%p:%u]"
+ "(%u %u %u) \n",r,&r->start,r->end,r->reader,r->writer,
+ r->length);
+ fprintf(stderr,"\t%u %u %u\n",CURREAD(r)->next,
+ CURREAD(r)->f.streamid,CURREAD(r)->sz);
+#endif
+ } else {
+ outtupledrop++;
+ }
+ }
+ if (HOWFULL(r) > 500) {
+ // buffer is at least half full
+ shared_memory_full_warning++;
+#ifdef PRINTMSG
+ fprintf(stderr,"\t\t buffer full\n");
+#endif
+ }
+ }
+
+ if (HOWFULL(ru) > 500) {
+ // buffer is at least half full
+ shared_memory_full_warning++;
+#ifdef PRINTMSG
+ fprintf(stderr,"\t\t buffer full\n");
+#endif
+ }
+ outtuple++;
+ outbytes=outbytes+CURWRITE(ru)->sz;
+ ADVANCEWRITE(ru);
+ ru=0;
+#ifndef POLLING
+ if (ftacallback_start_wakeup(stream_id)<0) {
+ gslog(LOG_ALERT,"Wakeup for unkown streamid\n");
+ return -1;
+ }
+ a.h.callid=WAKEUP;
+ a.h.size=sizeof(struct wakeup_result);
+ while((ftaidp=ftacallback_next_wakeup())!=0) {
+ if (send_wakeup(*ftaidp)<0) {
+ gslog(LOG_ALERT,"Could not send wakeup\n");
+ return -1;
+ }
+ }
+#endif
+ return 0;
+}
+
+gs_retval_t get_ringbuf_space(struct FTA * f, FTAID * r, gs_int32_t* space, gs_int32_t szr, gs_int32_t tuplesz)
+{
+ gs_int32_t x=0;
+ gs_int32_t state;
+ struct ringbuf * ru;
+ if (ftacallback_start_streamid(f->ftaid.streamid)<0) {
+ gslog(LOG_ALERT,"Space check for unkown streamid\n");
+ return -1;
+ }
+
+ while ((ru=ftacallback_next_streamid(&state))!=0) {
+ if (szr > x ) {
+ r[x]=ru->destid;
+ space[x]=TUPLEFIT(ru,tuplesz);
+ }
+ x++;
+ }
+ return x;
+}
+
+gs_retval_t set_ringbuf_type(struct FTA * f, FTAID process, gs_int32_t state)
+{
+
+ if (ftacallback_state_streamid(f->ftaid.streamid,process, state)<0) {
+ gslog(LOG_ALERT,"state change for unkown streamid\n");
+ return -1;
+ }
+ return 0;
+}
+
+
+gs_retval_t tpost_ready() {
+ return 1;
+}
+