Fixed newline characters throughout the code
[com/gs-lite.git] / src / lib / gscprts / rts_env.c
index f84ac7f..b674ae8 100644 (file)
-/* ------------------------------------------------\r
- Copyright 2014 AT&T Intellectual Property\r
- Licensed under the Apache License, Version 2.0 (the "License");\r
- you may not use this file except in compliance with the License.\r
- You may obtain a copy of the License at\r
\r
- http://www.apache.org/licenses/LICENSE-2.0\r
\r
- Unless required by applicable law or agreed to in writing, software\r
- distributed under the License is distributed on an "AS IS" BASIS,\r
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- See the License for the specific language governing permissions and\r
- limitations under the License.\r
- ------------------------------------------- */\r
-#include "gsconfig.h"\r
-#include "gstypes.h"\r
-#include "lapp.h"\r
-#include <ipcencoding.h>\r
-#include <callbackregistries.h>\r
-#include "fta.h"\r
-#include "stdio.h"\r
-#include "stdlib.h"\r
-#include "rts.h"\r
-\r
-#define  POLLING\r
-\r
-static struct ringbuf * ru=0;\r
-\r
-gs_retval_t print_error(gs_sp_t c) {\r
-    gslog(LOG_EMERG,"%s",c);\r
-    return 0;\r
-}\r
-\r
-void *fta_alloc(struct FTA * owner, gs_int32_t size)\r
-{\r
-    gs_uint8_t * c;\r
-    gs_uint32_t x;\r
-    if ((c=(gs_uint8_t *)malloc(size))==0) return 0;\r
-    /* touch all memory once to map/reserve it now */\r
-    for(x=0;x<size;x=x+1024) {\r
-        c[x]=0;\r
-    }\r
-    \r
-    return (void *) c;\r
-}\r
-\r
-void fta_free(struct FTA * owner , void * mem) {\r
-    free(mem);\r
-}\r
-\r
-\r
-void fta_free_all(struct FTA * owner) {\r
-    gslog(LOG_ERR,"fta_free_all not available ");\r
-}\r
-\r
-/* It is assumed that there is no write activity on all ringbuffers between an alloccate_tuple and\r
- * a post_tuple. If there is any the result is unpredictable\r
- */\r
-\r
-void * allocate_tuple(struct FTA * owner, gs_int32_t size)\r
-{\r
-    gs_int32_t state;\r
-    if (ru!=0) {\r
-        gslog(LOG_ALERT,"Can't allocate multiple tuples at the same time before posting them");\r
-        return 0;\r
-    }\r
-    \r
-    if (size>MAXTUPLESZ) {\r
-        gslog(LOG_ALERT,"Maximum tuple size is %u",MAXTUPLESZ);\r
-        ru=0;\r
-        return 0;\r
-    }\r
-    \r
-    if (ftacallback_start_streamid(owner->ftaid.streamid)<0) {\r
-        gslog(LOG_ALERT,"Post for unkown streamid\n");\r
-        ru=0;\r
-        return 0;\r
-    }\r
-    \r
-    /* we grep memory in the first none suspended ringbuffer. Note that if there is no such ringbuffer we might\r
-     not find any memory*/\r
-    while ((ru=ftacallback_next_streamid(&state))!=0) {\r
-#ifdef PRINTMSG\r
-        fprintf(stderr,"Allocating in ringpuffer %p [%p:%u]"\r
-                "(%u %u %u) \n",ru,&ru->start,ru->end,ru->reader,ru->writer,\r
-                ru->length);\r
-        fprintf(stderr,"Pointer to current writer %p\n",CURWRITE(ru));\r
-#endif\r
-        if (state != LFTA_RINGBUF_SUSPEND) {\r
-#ifdef BLOCKRINGBUFFER\r
-            if (state == LFTA_RINGBUF_ATOMIC) {\r
-                while (!SPACETOWRITE(ru)) {\r
-                    usleep(100);\r
-                }\r
-            }\r
-#endif\r
-            if (SPACETOWRITE(ru)) {\r
-                CURWRITE(ru)->f=owner->ftaid;\r
-                CURWRITE(ru)->sz=size;\r
-                return &(CURWRITE(ru)->data[0]);\r
-            } else {\r
-                shared_memory_full_warning++;\r
-            }\r
-        }\r
-    }\r
-    ru=0;\r
-    outtupledrop++;\r
-       \r
-    return 0;\r
-}\r
-\r
-void free_tuple(void * data) {\r
-    ru=0;\r
-}\r
-\r
-gs_retval_t post_tuple(void * tuple) {\r
-    struct ringbuf * r;\r
-    FTAID * ftaidp;\r
-    gs_uint32_t stream_id;\r
-    struct wakeup_result a;\r
-    gs_int32_t state;\r
-    \r
-    if (ru==0) {\r
-        gslog(LOG_ALERT,"lfta post tuple posted tupple was never allocated\n");\r
-        return -1;\r
-    }\r
-    \r
-    \r
-    if (tuple != ((void*) &(CURWRITE(ru)->data[0]))) {\r
-        gslog(LOG_ALERT,"lfta post tuple posted tupple which was not allocated"\r
-              "immediatly before\n");\r
-        ru=0;\r
-        return -1;\r
-    }\r
-    \r
-    stream_id=CURWRITE(ru)->f.streamid;\r
-    \r
-    if (ftacallback_start_streamid(stream_id)<0) {\r
-        gslog(LOG_ALERT,"ERROR:Post for unkown streamid\n");\r
-        ru=0;\r
-        return -1;\r
-    }\r
-    /* now make sure we have space to write in all atomic ringbuffer */\r
-    while((r=ftacallback_next_streamid(&state))!=0) {\r
-        if ((state == LFTA_RINGBUF_ATOMIC) && (r!=ru)) {\r
-#ifdef BLOCKRINGBUFFER\r
-            while (!SPACETOWRITE(r)) {\r
-                usleep(10000);\r
-            }\r
-#endif\r
-            if (! SPACETOWRITE(r)) {\r
-                /* atomic ring buffer and no space so post nothing */\r
-                               outtupledrop++;\r
-                               ru=0;\r
-                               return -1;\r
-            }\r
-        }\r
-    }\r
-    \r
-    if (ftacallback_start_streamid(stream_id)<0) {\r
-        gslog(LOG_ALERT,"Post for unkown streamid\n");\r
-        ru=0;\r
-        return -1;\r
-    }\r
-    \r
-    while((r=ftacallback_next_streamid(&state))!=0) {\r
-#ifdef PRINTMSG\r
-        fprintf(stderr,"Found additional ring buffer make a copy to rb%p\n",\r
-                r);\r
-#endif\r
-        /* try to post in all none suspended ringbuffer for atomic once\r
-         * we know we will succeed\r
-         */\r
-        if ((r!=ru)&&(state != LFTA_RINGBUF_SUSPEND)) {\r
-            if (SPACETOWRITE(r)) {\r
-                CURWRITE(r)->f=CURWRITE(ru)->f;\r
-                CURWRITE(r)->sz=CURWRITE(ru)->sz;\r
-                memcpy(&(CURWRITE(r)->data[0]),&(CURWRITE(ru)->data[0]),\r
-                       CURWRITE(ru)->sz);\r
-                outtuple++;\r
-                outbytes=outbytes+CURWRITE(ru)->sz;\r
-                ADVANCEWRITE(r);\r
-#ifdef PRINTMSG\r
-                fprintf(stderr,"Wrote in ringpuffer %p [%p:%u]"\r
-                        "(%u %u %u) \n",r,&r->start,r->end,r->reader,r->writer,\r
-                        r->length);\r
-                fprintf(stderr,"\t%u %u %u\n",CURREAD(r)->next,\r
-                        CURREAD(r)->f.streamid,CURREAD(r)->sz);\r
-#endif\r
-            } else {\r
-                outtupledrop++;\r
-            }\r
-        }\r
-        if (HOWFULL(r) > 500) {\r
-            // buffer is at least half full\r
-            shared_memory_full_warning++;\r
-#ifdef PRINTMSG\r
-            fprintf(stderr,"\t\t buffer full\n");\r
-#endif\r
-        }\r
-    }\r
-    \r
-    if (HOWFULL(ru) > 500) {\r
-        // buffer is at least half full\r
-        shared_memory_full_warning++;\r
-#ifdef PRINTMSG\r
-        fprintf(stderr,"\t\t buffer full\n");\r
-#endif\r
-    }\r
-    outtuple++;\r
-    outbytes=outbytes+CURWRITE(ru)->sz;\r
-    ADVANCEWRITE(ru);\r
-    ru=0;\r
-#ifndef POLLING\r
-    if (ftacallback_start_wakeup(stream_id)<0) {\r
-        gslog(LOG_ALERT,"Wakeup for unkown streamid\n");\r
-        return -1;\r
-    }\r
-    a.h.callid=WAKEUP;\r
-    a.h.size=sizeof(struct wakeup_result);\r
-    while((ftaidp=ftacallback_next_wakeup())!=0) {\r
-        if (send_wakeup(*ftaidp)<0) {\r
-            gslog(LOG_ALERT,"Could not send wakeup\n");\r
-            return -1;\r
-        }\r
-    }\r
-#endif\r
-    return 0;\r
-}\r
-\r
-gs_retval_t get_ringbuf_space(struct FTA * f, FTAID * r, gs_int32_t* space, gs_int32_t szr, gs_int32_t tuplesz)\r
-{\r
-    gs_int32_t x=0;\r
-    gs_int32_t state;\r
-    struct ringbuf * ru;\r
-    if (ftacallback_start_streamid(f->ftaid.streamid)<0) {\r
-        gslog(LOG_ALERT,"Space check for unkown streamid\n");\r
-        return -1;\r
-    }\r
-    \r
-    while ((ru=ftacallback_next_streamid(&state))!=0) {\r
-        if (szr > x ) {\r
-            r[x]=ru->destid;\r
-            space[x]=TUPLEFIT(ru,tuplesz);\r
-        }\r
-        x++;\r
-    }\r
-    return x;\r
-}\r
-\r
-gs_retval_t set_ringbuf_type(struct FTA * f, FTAID process, gs_int32_t state)\r
-{\r
-    \r
-    if (ftacallback_state_streamid(f->ftaid.streamid,process, state)<0) {\r
-               gslog(LOG_ALERT,"state change for unkown streamid\n");\r
-               return -1;\r
-    }\r
-    return 0;\r
-}\r
-\r
-\r
-gs_retval_t tpost_ready() {\r
-    return 1;\r
-}\r
-\r
+/* ------------------------------------------------
+ Copyright 2014 AT&T Intellectual Property
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ------------------------------------------- */
+#include "gsconfig.h"
+#include "gstypes.h"
+#include "lapp.h"
+#include <ipcencoding.h>
+#include <callbackregistries.h>
+#include "fta.h"
+#include "stdio.h"
+#include "stdlib.h"
+#include "rts.h"
+
+#define  POLLING
+
+static struct ringbuf * ru=0;
+
+gs_retval_t print_error(gs_sp_t c) {
+    gslog(LOG_EMERG,"%s",c);
+    return 0;
+}
+
+void *fta_alloc(struct FTA * owner, gs_int32_t size)
+{
+    gs_uint8_t * c;
+    gs_uint32_t x;
+    if ((c=(gs_uint8_t *)malloc(size))==0) return 0;
+    /* touch all memory once to map/reserve it now */
+    for(x=0;x<size;x=x+1024) {
+        c[x]=0;
+    }
+    
+    return (void *) c;
+}
+
+void fta_free(struct FTA * owner , void * mem) {
+    free(mem);
+}
+
+
+void fta_free_all(struct FTA * owner) {
+    gslog(LOG_ERR,"fta_free_all not available ");
+}
+
+/* It is assumed that there is no write activity on all ringbuffers between an alloccate_tuple and
+ * a post_tuple. If there is any the result is unpredictable
+ */
+
+void * allocate_tuple(struct FTA * owner, gs_int32_t size)
+{
+    gs_int32_t state;
+    if (ru!=0) {
+        gslog(LOG_ALERT,"Can't allocate multiple tuples at the same time before posting them");
+        return 0;
+    }
+    
+    if (size>MAXTUPLESZ) {
+        gslog(LOG_ALERT,"Maximum tuple size is %u",MAXTUPLESZ);
+        ru=0;
+        return 0;
+    }
+    
+    if (ftacallback_start_streamid(owner->ftaid.streamid)<0) {
+        gslog(LOG_ALERT,"Post for unkown streamid\n");
+        ru=0;
+        return 0;
+    }
+    
+    /* we grep memory in the first none suspended ringbuffer. Note that if there is no such ringbuffer we might
+     not find any memory*/
+    while ((ru=ftacallback_next_streamid(&state))!=0) {
+#ifdef PRINTMSG
+        fprintf(stderr,"Allocating in ringpuffer %p [%p:%u]"
+                "(%u %u %u) \n",ru,&ru->start,ru->end,ru->reader,ru->writer,
+                ru->length);
+        fprintf(stderr,"Pointer to current writer %p\n",CURWRITE(ru));
+#endif
+        if (state != LFTA_RINGBUF_SUSPEND) {
+#ifdef BLOCKRINGBUFFER
+            if (state == LFTA_RINGBUF_ATOMIC) {
+                while (!SPACETOWRITE(ru)) {
+                    usleep(100);
+                }
+            }
+#endif
+            if (SPACETOWRITE(ru)) {
+                CURWRITE(ru)->f=owner->ftaid;
+                CURWRITE(ru)->sz=size;
+                return &(CURWRITE(ru)->data[0]);
+            } else {
+                shared_memory_full_warning++;
+            }
+        }
+    }
+    ru=0;
+    outtupledrop++;
+       
+    return 0;
+}
+
+void free_tuple(void * data) {
+    ru=0;
+}
+
+gs_retval_t post_tuple(void * tuple) {
+    struct ringbuf * r;
+    FTAID * ftaidp;
+    gs_uint32_t stream_id;
+    struct wakeup_result a;
+    gs_int32_t state;
+    
+    if (ru==0) {
+        gslog(LOG_ALERT,"lfta post tuple posted tupple was never allocated\n");
+        return -1;
+    }
+    
+    
+    if (tuple != ((void*) &(CURWRITE(ru)->data[0]))) {
+        gslog(LOG_ALERT,"lfta post tuple posted tupple which was not allocated"
+              "immediatly before\n");
+        ru=0;
+        return -1;
+    }
+    
+    stream_id=CURWRITE(ru)->f.streamid;
+    
+    if (ftacallback_start_streamid(stream_id)<0) {
+        gslog(LOG_ALERT,"ERROR:Post for unkown streamid\n");
+        ru=0;
+        return -1;
+    }
+    /* now make sure we have space to write in all atomic ringbuffer */
+    while((r=ftacallback_next_streamid(&state))!=0) {
+        if ((state == LFTA_RINGBUF_ATOMIC) && (r!=ru)) {
+#ifdef BLOCKRINGBUFFER
+            while (!SPACETOWRITE(r)) {
+                usleep(10000);
+            }
+#endif
+            if (! SPACETOWRITE(r)) {
+                /* atomic ring buffer and no space so post nothing */
+                               outtupledrop++;
+                               ru=0;
+                               return -1;
+            }
+        }
+    }
+    
+    if (ftacallback_start_streamid(stream_id)<0) {
+        gslog(LOG_ALERT,"Post for unkown streamid\n");
+        ru=0;
+        return -1;
+    }
+    
+    while((r=ftacallback_next_streamid(&state))!=0) {
+#ifdef PRINTMSG
+        fprintf(stderr,"Found additional ring buffer make a copy to rb%p\n",
+                r);
+#endif
+        /* try to post in all none suspended ringbuffer for atomic once
+         * we know we will succeed
+         */
+        if ((r!=ru)&&(state != LFTA_RINGBUF_SUSPEND)) {
+            if (SPACETOWRITE(r)) {
+                CURWRITE(r)->f=CURWRITE(ru)->f;
+                CURWRITE(r)->sz=CURWRITE(ru)->sz;
+                memcpy(&(CURWRITE(r)->data[0]),&(CURWRITE(ru)->data[0]),
+                       CURWRITE(ru)->sz);
+                outtuple++;
+                outbytes=outbytes+CURWRITE(ru)->sz;
+                ADVANCEWRITE(r);
+#ifdef PRINTMSG
+                fprintf(stderr,"Wrote in ringpuffer %p [%p:%u]"
+                        "(%u %u %u) \n",r,&r->start,r->end,r->reader,r->writer,
+                        r->length);
+                fprintf(stderr,"\t%u %u %u\n",CURREAD(r)->next,
+                        CURREAD(r)->f.streamid,CURREAD(r)->sz);
+#endif
+            } else {
+                outtupledrop++;
+            }
+        }
+        if (HOWFULL(r) > 500) {
+            // buffer is at least half full
+            shared_memory_full_warning++;
+#ifdef PRINTMSG
+            fprintf(stderr,"\t\t buffer full\n");
+#endif
+        }
+    }
+    
+    if (HOWFULL(ru) > 500) {
+        // buffer is at least half full
+        shared_memory_full_warning++;
+#ifdef PRINTMSG
+        fprintf(stderr,"\t\t buffer full\n");
+#endif
+    }
+    outtuple++;
+    outbytes=outbytes+CURWRITE(ru)->sz;
+    ADVANCEWRITE(ru);
+    ru=0;
+#ifndef POLLING
+    if (ftacallback_start_wakeup(stream_id)<0) {
+        gslog(LOG_ALERT,"Wakeup for unkown streamid\n");
+        return -1;
+    }
+    a.h.callid=WAKEUP;
+    a.h.size=sizeof(struct wakeup_result);
+    while((ftaidp=ftacallback_next_wakeup())!=0) {
+        if (send_wakeup(*ftaidp)<0) {
+            gslog(LOG_ALERT,"Could not send wakeup\n");
+            return -1;
+        }
+    }
+#endif
+    return 0;
+}
+
+gs_retval_t get_ringbuf_space(struct FTA * f, FTAID * r, gs_int32_t* space, gs_int32_t szr, gs_int32_t tuplesz)
+{
+    gs_int32_t x=0;
+    gs_int32_t state;
+    struct ringbuf * ru;
+    if (ftacallback_start_streamid(f->ftaid.streamid)<0) {
+        gslog(LOG_ALERT,"Space check for unkown streamid\n");
+        return -1;
+    }
+    
+    while ((ru=ftacallback_next_streamid(&state))!=0) {
+        if (szr > x ) {
+            r[x]=ru->destid;
+            space[x]=TUPLEFIT(ru,tuplesz);
+        }
+        x++;
+    }
+    return x;
+}
+
+gs_retval_t set_ringbuf_type(struct FTA * f, FTAID process, gs_int32_t state)
+{
+    
+    if (ftacallback_state_streamid(f->ftaid.streamid,process, state)<0) {
+               gslog(LOG_ALERT,"state change for unkown streamid\n");
+               return -1;
+    }
+    return 0;
+}
+
+
+gs_retval_t tpost_ready() {
+    return 1;
+}
+