Added protobuf support
[com/gs-lite.git] / src / lib / gscprts / rts_env.c
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6  
7  http://www.apache.org/licenses/LICENSE-2.0
8  
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
15 #include "gsconfig.h"
16 #include "gstypes.h"
17 #include "lapp.h"
18 #include <ipcencoding.h>
19 #include <callbackregistries.h>
20 #include "fta.h"
21 #include "stdio.h"
22 #include "stdlib.h"
23 #include "rts.h"
24
25 #define  POLLING
26
27 static struct ringbuf * ru=0;
28
29 gs_retval_t print_error(gs_sp_t c) {
30     gslog(LOG_EMERG,"%s",c);
31     return 0;
32 }
33
34 void *fta_alloc(struct FTA * owner, gs_int32_t size)
35 {
36     gs_uint8_t * c;
37     gs_uint32_t x;
38     if ((c=(gs_uint8_t *)malloc(size))==0) return 0;
39     /* touch all memory once to map/reserve it now */
40     for(x=0;x<size;x=x+1024) {
41         c[x]=0;
42     }
43     
44     return (void *) c;
45 }
46
47 void fta_free(struct FTA * owner , void * mem) {
48     free(mem);
49 }
50
51
52 void fta_free_all(struct FTA * owner) {
53     gslog(LOG_ERR,"fta_free_all not available ");
54 }
55
56 /* It is assumed that there is no write activity on all ringbuffers between an alloccate_tuple and
57  * a post_tuple. If there is any the result is unpredictable
58  */
59
60 void * allocate_tuple(struct FTA * owner, gs_int32_t size)
61 {
62     gs_int32_t state;
63     if (ru!=0) {
64         gslog(LOG_ALERT,"Can't allocate multiple tuples at the same time before posting them");
65         return 0;
66     }
67     
68     if (size>MAXTUPLESZ) {
69         gslog(LOG_ALERT,"Maximum tuple size is %u",MAXTUPLESZ);
70         ru=0;
71         return 0;
72     }
73     
74     if (ftacallback_start_streamid(owner->ftaid.streamid)<0) {
75         gslog(LOG_ALERT,"Post for unkown streamid\n");
76         ru=0;
77         return 0;
78     }
79     
80     /* we grep memory in the first none suspended ringbuffer. Note that if there is no such ringbuffer we might
81      not find any memory*/
82     while ((ru=ftacallback_next_streamid(&state))!=0) {
83 #ifdef PRINTMSG
84         fprintf(stderr,"Allocating in ringpuffer %p [%p:%u]"
85                 "(%u %u %u) \n",ru,&ru->start,ru->end,ru->reader,ru->writer,
86                 ru->length);
87         fprintf(stderr,"Pointer to current writer %p\n",CURWRITE(ru));
88 #endif
89         if (state != LFTA_RINGBUF_SUSPEND) {
90 #ifdef BLOCKRINGBUFFER
91             if (state == LFTA_RINGBUF_ATOMIC) {
92                 while (!SPACETOWRITE(ru)) {
93                     usleep(100);
94                 }
95             }
96 #endif
97             if (SPACETOWRITE(ru)) {
98                 CURWRITE(ru)->f=owner->ftaid;
99                 CURWRITE(ru)->sz=size;
100                 return &(CURWRITE(ru)->data[0]);
101             } else {
102                 shared_memory_full_warning++;
103             }
104         }
105     }
106     ru=0;
107     outtupledrop++;
108         
109     return 0;
110 }
111
112 void free_tuple(void * data) {
113     ru=0;
114 }
115
116 gs_retval_t post_tuple(void * tuple) {
117     struct ringbuf * r;
118     FTAID * ftaidp;
119     gs_uint32_t stream_id;
120     struct wakeup_result a;
121     gs_int32_t state;
122     
123     if (ru==0) {
124         gslog(LOG_ALERT,"lfta post tuple posted tupple was never allocated\n");
125         return -1;
126     }
127     
128     
129     if (tuple != ((void*) &(CURWRITE(ru)->data[0]))) {
130         gslog(LOG_ALERT,"lfta post tuple posted tupple which was not allocated"
131               "immediatly before\n");
132         ru=0;
133         return -1;
134     }
135     
136     stream_id=CURWRITE(ru)->f.streamid;
137     
138     if (ftacallback_start_streamid(stream_id)<0) {
139         gslog(LOG_ALERT,"ERROR:Post for unkown streamid\n");
140         ru=0;
141         return -1;
142     }
143     /* now make sure we have space to write in all atomic ringbuffer */
144     while((r=ftacallback_next_streamid(&state))!=0) {
145         if ((state == LFTA_RINGBUF_ATOMIC) && (r!=ru)) {
146 #ifdef BLOCKRINGBUFFER
147             while (!SPACETOWRITE(r)) {
148                 usleep(10000);
149             }
150 #endif
151             if (! SPACETOWRITE(r)) {
152                 /* atomic ring buffer and no space so post nothing */
153                                 outtupledrop++;
154                                 ru=0;
155                                 return -1;
156             }
157         }
158     }
159     
160     if (ftacallback_start_streamid(stream_id)<0) {
161         gslog(LOG_ALERT,"Post for unkown streamid\n");
162         ru=0;
163         return -1;
164     }
165     
166     while((r=ftacallback_next_streamid(&state))!=0) {
167 #ifdef PRINTMSG
168         fprintf(stderr,"Found additional ring buffer make a copy to rb%p\n",
169                 r);
170 #endif
171         /* try to post in all none suspended ringbuffer for atomic once
172          * we know we will succeed
173          */
174         if ((r!=ru)&&(state != LFTA_RINGBUF_SUSPEND)) {
175             if (SPACETOWRITE(r)) {
176                 CURWRITE(r)->f=CURWRITE(ru)->f;
177                 CURWRITE(r)->sz=CURWRITE(ru)->sz;
178                 memcpy(&(CURWRITE(r)->data[0]),&(CURWRITE(ru)->data[0]),
179                        CURWRITE(ru)->sz);
180                 outtuple++;
181                 outbytes=outbytes+CURWRITE(ru)->sz;
182                 ADVANCEWRITE(r);
183 #ifdef PRINTMSG
184                 fprintf(stderr,"Wrote in ringpuffer %p [%p:%u]"
185                         "(%u %u %u) \n",r,&r->start,r->end,r->reader,r->writer,
186                         r->length);
187                 fprintf(stderr,"\t%u %u %u\n",CURREAD(r)->next,
188                         CURREAD(r)->f.streamid,CURREAD(r)->sz);
189 #endif
190             } else {
191                 outtupledrop++;
192             }
193         }
194         if (HOWFULL(r) > 500) {
195             // buffer is at least half full
196             shared_memory_full_warning++;
197 #ifdef PRINTMSG
198             fprintf(stderr,"\t\t buffer full\n");
199 #endif
200         }
201     }
202     
203     if (HOWFULL(ru) > 500) {
204         // buffer is at least half full
205         shared_memory_full_warning++;
206 #ifdef PRINTMSG
207         fprintf(stderr,"\t\t buffer full\n");
208 #endif
209     }
210     outtuple++;
211     outbytes=outbytes+CURWRITE(ru)->sz;
212     ADVANCEWRITE(ru);
213     ru=0;
214 #ifndef POLLING
215     if (ftacallback_start_wakeup(stream_id)<0) {
216         gslog(LOG_ALERT,"Wakeup for unkown streamid\n");
217         return -1;
218     }
219     a.h.callid=WAKEUP;
220     a.h.size=sizeof(struct wakeup_result);
221     while((ftaidp=ftacallback_next_wakeup())!=0) {
222         if (send_wakeup(*ftaidp)<0) {
223             gslog(LOG_ALERT,"Could not send wakeup\n");
224             return -1;
225         }
226     }
227 #endif
228     return 0;
229 }
230
231 gs_retval_t get_ringbuf_space(struct FTA * f, FTAID * r, gs_int32_t* space, gs_int32_t szr, gs_int32_t tuplesz)
232 {
233     gs_int32_t x=0;
234     gs_int32_t state;
235     struct ringbuf * ru;
236     if (ftacallback_start_streamid(f->ftaid.streamid)<0) {
237         gslog(LOG_ALERT,"Space check for unkown streamid\n");
238         return -1;
239     }
240     
241     while ((ru=ftacallback_next_streamid(&state))!=0) {
242         if (szr > x ) {
243             r[x]=ru->destid;
244             space[x]=TUPLEFIT(ru,tuplesz);
245         }
246         x++;
247     }
248     return x;
249 }
250
251 gs_retval_t set_ringbuf_type(struct FTA * f, FTAID process, gs_int32_t state)
252 {
253     
254     if (ftacallback_state_streamid(f->ftaid.streamid,process, state)<0) {
255                 gslog(LOG_ALERT,"state change for unkown streamid\n");
256                 return -1;
257     }
258     return 0;
259 }
260
261
262 gs_retval_t tpost_ready() {
263     return 1;
264 }
265