Initial commit
[com/gs-lite.git] / src / lib / gscprts / rts_env.c
diff --git a/src/lib/gscprts/rts_env.c b/src/lib/gscprts/rts_env.c
new file mode 100644 (file)
index 0000000..b674ae8
--- /dev/null
@@ -0,0 +1,265 @@
+/* ------------------------------------------------
+ Copyright 2014 AT&T Intellectual Property
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ------------------------------------------- */
+#include "gsconfig.h"
+#include "gstypes.h"
+#include "lapp.h"
+#include <ipcencoding.h>
+#include <callbackregistries.h>
+#include "fta.h"
+#include "stdio.h"
+#include "stdlib.h"
+#include "rts.h"
+
+#define  POLLING
+
+static struct ringbuf * ru=0;
+
+gs_retval_t print_error(gs_sp_t c) {
+    gslog(LOG_EMERG,"%s",c);
+    return 0;
+}
+
+void *fta_alloc(struct FTA * owner, gs_int32_t size)
+{
+    gs_uint8_t * c;
+    gs_uint32_t x;
+    if ((c=(gs_uint8_t *)malloc(size))==0) return 0;
+    /* touch all memory once to map/reserve it now */
+    for(x=0;x<size;x=x+1024) {
+        c[x]=0;
+    }
+    
+    return (void *) c;
+}
+
+void fta_free(struct FTA * owner , void * mem) {
+    free(mem);
+}
+
+
+void fta_free_all(struct FTA * owner) {
+    gslog(LOG_ERR,"fta_free_all not available ");
+}
+
+/* It is assumed that there is no write activity on all ringbuffers between an alloccate_tuple and
+ * a post_tuple. If there is any the result is unpredictable
+ */
+
+void * allocate_tuple(struct FTA * owner, gs_int32_t size)
+{
+    gs_int32_t state;
+    if (ru!=0) {
+        gslog(LOG_ALERT,"Can't allocate multiple tuples at the same time before posting them");
+        return 0;
+    }
+    
+    if (size>MAXTUPLESZ) {
+        gslog(LOG_ALERT,"Maximum tuple size is %u",MAXTUPLESZ);
+        ru=0;
+        return 0;
+    }
+    
+    if (ftacallback_start_streamid(owner->ftaid.streamid)<0) {
+        gslog(LOG_ALERT,"Post for unkown streamid\n");
+        ru=0;
+        return 0;
+    }
+    
+    /* we grep memory in the first none suspended ringbuffer. Note that if there is no such ringbuffer we might
+     not find any memory*/
+    while ((ru=ftacallback_next_streamid(&state))!=0) {
+#ifdef PRINTMSG
+        fprintf(stderr,"Allocating in ringpuffer %p [%p:%u]"
+                "(%u %u %u) \n",ru,&ru->start,ru->end,ru->reader,ru->writer,
+                ru->length);
+        fprintf(stderr,"Pointer to current writer %p\n",CURWRITE(ru));
+#endif
+        if (state != LFTA_RINGBUF_SUSPEND) {
+#ifdef BLOCKRINGBUFFER
+            if (state == LFTA_RINGBUF_ATOMIC) {
+                while (!SPACETOWRITE(ru)) {
+                    usleep(100);
+                }
+            }
+#endif
+            if (SPACETOWRITE(ru)) {
+                CURWRITE(ru)->f=owner->ftaid;
+                CURWRITE(ru)->sz=size;
+                return &(CURWRITE(ru)->data[0]);
+            } else {
+                shared_memory_full_warning++;
+            }
+        }
+    }
+    ru=0;
+    outtupledrop++;
+       
+    return 0;
+}
+
+void free_tuple(void * data) {
+    ru=0;
+}
+
+gs_retval_t post_tuple(void * tuple) {
+    struct ringbuf * r;
+    FTAID * ftaidp;
+    gs_uint32_t stream_id;
+    struct wakeup_result a;
+    gs_int32_t state;
+    
+    if (ru==0) {
+        gslog(LOG_ALERT,"lfta post tuple posted tupple was never allocated\n");
+        return -1;
+    }
+    
+    
+    if (tuple != ((void*) &(CURWRITE(ru)->data[0]))) {
+        gslog(LOG_ALERT,"lfta post tuple posted tupple which was not allocated"
+              "immediatly before\n");
+        ru=0;
+        return -1;
+    }
+    
+    stream_id=CURWRITE(ru)->f.streamid;
+    
+    if (ftacallback_start_streamid(stream_id)<0) {
+        gslog(LOG_ALERT,"ERROR:Post for unkown streamid\n");
+        ru=0;
+        return -1;
+    }
+    /* now make sure we have space to write in all atomic ringbuffer */
+    while((r=ftacallback_next_streamid(&state))!=0) {
+        if ((state == LFTA_RINGBUF_ATOMIC) && (r!=ru)) {
+#ifdef BLOCKRINGBUFFER
+            while (!SPACETOWRITE(r)) {
+                usleep(10000);
+            }
+#endif
+            if (! SPACETOWRITE(r)) {
+                /* atomic ring buffer and no space so post nothing */
+                               outtupledrop++;
+                               ru=0;
+                               return -1;
+            }
+        }
+    }
+    
+    if (ftacallback_start_streamid(stream_id)<0) {
+        gslog(LOG_ALERT,"Post for unkown streamid\n");
+        ru=0;
+        return -1;
+    }
+    
+    while((r=ftacallback_next_streamid(&state))!=0) {
+#ifdef PRINTMSG
+        fprintf(stderr,"Found additional ring buffer make a copy to rb%p\n",
+                r);
+#endif
+        /* try to post in all none suspended ringbuffer for atomic once
+         * we know we will succeed
+         */
+        if ((r!=ru)&&(state != LFTA_RINGBUF_SUSPEND)) {
+            if (SPACETOWRITE(r)) {
+                CURWRITE(r)->f=CURWRITE(ru)->f;
+                CURWRITE(r)->sz=CURWRITE(ru)->sz;
+                memcpy(&(CURWRITE(r)->data[0]),&(CURWRITE(ru)->data[0]),
+                       CURWRITE(ru)->sz);
+                outtuple++;
+                outbytes=outbytes+CURWRITE(ru)->sz;
+                ADVANCEWRITE(r);
+#ifdef PRINTMSG
+                fprintf(stderr,"Wrote in ringpuffer %p [%p:%u]"
+                        "(%u %u %u) \n",r,&r->start,r->end,r->reader,r->writer,
+                        r->length);
+                fprintf(stderr,"\t%u %u %u\n",CURREAD(r)->next,
+                        CURREAD(r)->f.streamid,CURREAD(r)->sz);
+#endif
+            } else {
+                outtupledrop++;
+            }
+        }
+        if (HOWFULL(r) > 500) {
+            // buffer is at least half full
+            shared_memory_full_warning++;
+#ifdef PRINTMSG
+            fprintf(stderr,"\t\t buffer full\n");
+#endif
+        }
+    }
+    
+    if (HOWFULL(ru) > 500) {
+        // buffer is at least half full
+        shared_memory_full_warning++;
+#ifdef PRINTMSG
+        fprintf(stderr,"\t\t buffer full\n");
+#endif
+    }
+    outtuple++;
+    outbytes=outbytes+CURWRITE(ru)->sz;
+    ADVANCEWRITE(ru);
+    ru=0;
+#ifndef POLLING
+    if (ftacallback_start_wakeup(stream_id)<0) {
+        gslog(LOG_ALERT,"Wakeup for unkown streamid\n");
+        return -1;
+    }
+    a.h.callid=WAKEUP;
+    a.h.size=sizeof(struct wakeup_result);
+    while((ftaidp=ftacallback_next_wakeup())!=0) {
+        if (send_wakeup(*ftaidp)<0) {
+            gslog(LOG_ALERT,"Could not send wakeup\n");
+            return -1;
+        }
+    }
+#endif
+    return 0;
+}
+
+gs_retval_t get_ringbuf_space(struct FTA * f, FTAID * r, gs_int32_t* space, gs_int32_t szr, gs_int32_t tuplesz)
+{
+    gs_int32_t x=0;
+    gs_int32_t state;
+    struct ringbuf * ru;
+    if (ftacallback_start_streamid(f->ftaid.streamid)<0) {
+        gslog(LOG_ALERT,"Space check for unkown streamid\n");
+        return -1;
+    }
+    
+    while ((ru=ftacallback_next_streamid(&state))!=0) {
+        if (szr > x ) {
+            r[x]=ru->destid;
+            space[x]=TUPLEFIT(ru,tuplesz);
+        }
+        x++;
+    }
+    return x;
+}
+
+gs_retval_t set_ringbuf_type(struct FTA * f, FTAID process, gs_int32_t state)
+{
+    
+    if (ftacallback_state_streamid(f->ftaid.streamid,process, state)<0) {
+               gslog(LOG_ALERT,"state change for unkown streamid\n");
+               return -1;
+    }
+    return 0;
+}
+
+
+gs_retval_t tpost_ready() {
+    return 1;
+}
+