1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
18 #include <ipcencoding.h>
19 #include <callbackregistries.h>
// File-scope pointer to a ring buffer, initialised to 0.
// NOTE(review): allocate_tuple and post_tuple both reference a variable named
// ru; presumably it is this one, carrying the ring buffer chosen at allocation
// time over to the matching post - confirm no local declaration shadows it.
27 static struct ringbuf * ru=0;
// Logs the message c through gslog at LOG_EMERG severity, handing it over as
// the argument of a %s format rather than as the format itself.
// NOTE(review): the remainder of the body, including the returned value, is
// not visible in this excerpt.
29 gs_retval_t print_error(gs_sp_t c) {
30 gslog(LOG_EMERG,"%s",c);
// Allocates size bytes with malloc on behalf of an FTA.
// Returns 0 when malloc fails. After a successful allocation the buffer is
// walked in steps of 1024 bytes; per the existing comment this touches the
// memory once so that it is mapped/reserved immediately.
// NOTE(review): owner is not referenced in the visible lines. The local
// declarations, the loop body and the final return are not visible in this
// excerpt.
34 void *fta_alloc(struct FTA * owner, gs_int32_t size)
38 if ((c=(gs_uint8_t *)malloc(size))==0) return 0;
39 /* touch all memory once to map/reserve it now */
40 for(x=0;x<size;x=x+1024) {
// Takes the owning FTA and a memory pointer.
// NOTE(review): presumably releases memory obtained from fta_alloc; the body
// is not visible in this excerpt, so this cannot be confirmed here.
47 void fta_free(struct FTA * owner , void * mem) {
// Reports through gslog at LOG_ERR that fta_free_all is not available.
// NOTE(review): owner is not used in the visible line and no other statement
// of the body is visible in this excerpt.
52 void fta_free_all(struct FTA * owner) {
53 gslog(LOG_ERR,"fta_free_all not available ");
56 /* It is assumed that there is no write activity on any ring buffer between an allocate_tuple and
57 * a post_tuple. If there is any, the result is unpredictable.
// Reserves room for one tuple of size bytes in a ring buffer registered for
// the stream id of owner, and returns a pointer to the data area of the
// current write slot, i.e. CURWRITE(ru)->data.
//
// Visible behaviour:
//  - logs LOG_ALERT when a tuple is already allocated but not yet posted
//    (only the message is visible, not the guarding condition)
//  - logs LOG_ALERT when size exceeds MAXTUPLESZ
//  - logs LOG_ALERT when ftacallback_start_streamid returns a negative value
//  - walks the ring buffers handed out by ftacallback_next_streamid and only
//    considers those whose state is not LFTA_RINGBUF_SUSPEND
//  - with BLOCKRINGBUFFER defined, loops while a ring buffer in
//    LFTA_RINGBUF_ATOMIC state has no space to write
//  - once SPACETOWRITE holds, stamps the write slot with owner->ftaid and
//    size and returns the address of its data area
//  - increments shared_memory_full_warning (the branch this belongs to is not
//    visible in this excerpt)
//
// NOTE(review): the local declarations, the values returned on the error
// paths and the closing preprocessor directive are not visible in this
// excerpt.
60 void * allocate_tuple(struct FTA * owner, gs_int32_t size)
64 gslog(LOG_ALERT,"Can't allocate multiple tuples at the same time before posting them");
// Upper bound on the payload of a single tuple.
68 if (size>MAXTUPLESZ) {
69 gslog(LOG_ALERT,"Maximum tuple size is %u",MAXTUPLESZ);
// Begin iterating over the ring buffers registered for this stream id.
74 if (ftacallback_start_streamid(owner->ftaid.streamid)<0) {
75 gslog(LOG_ALERT,"Post for unkown streamid\n");
80 /* we grep memory in the first none suspended ringbuffer. Note that if there is no such ringbuffer we might
82 while ((ru=ftacallback_next_streamid(&state))!=0) {
84 fprintf(stderr,"Allocating in ringpuffer %p [%p:%u]"
85 "(%u %u %u) \n",ru,&ru->start,ru->end,ru->reader,ru->writer,
87 fprintf(stderr,"Pointer to current writer %p\n",CURWRITE(ru));
89 if (state != LFTA_RINGBUF_SUSPEND) {
90 #ifdef BLOCKRINGBUFFER
91 if (state == LFTA_RINGBUF_ATOMIC) {
92 while (!SPACETOWRITE(ru)) {
97 if (SPACETOWRITE(ru)) {
98 CURWRITE(ru)->f=owner->ftaid;
99 CURWRITE(ru)->sz=size;
100 return &(CURWRITE(ru)->data[0]);
102 shared_memory_full_warning++;
// Takes a pointer to tuple data.
// NOTE(review): presumably abandons a tuple handed out by allocate_tuple
// without posting it; the body is not visible in this excerpt, so this cannot
// be confirmed here.
112 void free_tuple(void * data) {
// Publishes a tuple whose storage was handed out by allocate_tuple.
//
// Visible steps:
//  1. Logs LOG_ALERT for a tuple that was never allocated (condition not
//     visible) and for a tuple pointer that is not the data area of
//     CURWRITE(ru).
//  2. Reads the stream id from the write slot of ru and starts an iteration
//     over the ring buffers of that stream.
//  3. First pass: for every ring buffer other than ru that is in
//     LFTA_RINGBUF_ATOMIC state, checks for space to write. With
//     BLOCKRINGBUFFER defined it loops until space is available; the other
//     check is annotated as posting nothing when there is no space.
//  4. Second pass: for every ring buffer other than ru that is not suspended
//     and has space, copies the header fields f and sz and the tuple data
//     from the write slot of ru, adds sz to outbytes, and increments
//     shared_memory_full_warning when HOWFULL exceeds 500.
//  5. Applies the same fullness check to ru and adds its sz to outbytes.
//  6. Starts a wakeup iteration for the stream id and calls send_wakeup for
//     every FTAID it yields, logging LOG_ALERT when a wakeup cannot be sent.
//
// NOTE(review): the statements that advance writer positions, the use of the
// wakeup_result a after its size field is set, and all returned values are
// not visible in this excerpt.
// NOTE(review): the inline comments call HOWFULL > 500 at least half full, so
// HOWFULL is presumably scaled to the range 0..1000 - confirm against the
// macro definition.
116 gs_retval_t post_tuple(void * tuple) {
119 gs_uint32_t stream_id;
120 struct wakeup_result a;
124 gslog(LOG_ALERT,"lfta post tuple posted tupple was never allocated\n");
// Only the most recently allocated slot of ru may be posted.
129 if (tuple != ((void*) &(CURWRITE(ru)->data[0]))) {
130 gslog(LOG_ALERT,"lfta post tuple posted tupple which was not allocated"
131 "immediatly before\n");
136 stream_id=CURWRITE(ru)->f.streamid;
138 if (ftacallback_start_streamid(stream_id)<0) {
139 gslog(LOG_ALERT,"ERROR:Post for unkown streamid\n");
143 /* now make sure we have space to write in all atomic ringbuffer */
144 while((r=ftacallback_next_streamid(&state))!=0) {
145 if ((state == LFTA_RINGBUF_ATOMIC) && (r!=ru)) {
146 #ifdef BLOCKRINGBUFFER
147 while (!SPACETOWRITE(r)) {
151 if (! SPACETOWRITE(r)) {
152 /* atomic ring buffer and no space so post nothing */
// Restart the iteration for the copy pass.
160 if (ftacallback_start_streamid(stream_id)<0) {
161 gslog(LOG_ALERT,"Post for unkown streamid\n");
166 while((r=ftacallback_next_streamid(&state))!=0) {
168 fprintf(stderr,"Found additional ring buffer make a copy to rb%p\n",
171 /* try to post in all none suspended ringbuffer for atomic once
172 * we know we will succeed
174 if ((r!=ru)&&(state != LFTA_RINGBUF_SUSPEND)) {
175 if (SPACETOWRITE(r)) {
176 CURWRITE(r)->f=CURWRITE(ru)->f;
177 CURWRITE(r)->sz=CURWRITE(ru)->sz;
178 memcpy(&(CURWRITE(r)->data[0]),&(CURWRITE(ru)->data[0]),
181 outbytes=outbytes+CURWRITE(ru)->sz;
184 fprintf(stderr,"Wrote in ringpuffer %p [%p:%u]"
185 "(%u %u %u) \n",r,&r->start,r->end,r->reader,r->writer,
187 fprintf(stderr,"\t%u %u %u\n",CURREAD(r)->next,
188 CURREAD(r)->f.streamid,CURREAD(r)->sz);
194 if (HOWFULL(r) > 500) {
195 // buffer is at least half full
196 shared_memory_full_warning++;
198 fprintf(stderr,"\t\t buffer full\n");
203 if (HOWFULL(ru) > 500) {
204 // buffer is at least half full
205 shared_memory_full_warning++;
207 fprintf(stderr,"\t\t buffer full\n");
211 outbytes=outbytes+CURWRITE(ru)->sz;
215 if (ftacallback_start_wakeup(stream_id)<0) {
216 gslog(LOG_ALERT,"Wakeup for unkown streamid\n");
220 a.h.size=sizeof(struct wakeup_result);
221 while((ftaidp=ftacallback_next_wakeup())!=0) {
222 if (send_wakeup(*ftaidp)<0) {
223 gslog(LOG_ALERT,"Could not send wakeup\n");
// Walks the ring buffers registered for the stream id of f and stores
// TUPLEFIT(ru,tuplesz) into space[x] for the ring buffers it visits.
// Logs LOG_ALERT when ftacallback_start_streamid returns a negative value.
// NOTE(review): r and szr are not referenced in the visible lines; presumably
// r receives the corresponding FTAIDs and szr bounds both arrays - confirm.
// Any filter on the ring buffer state, the way x advances and the returned
// value are not visible in this excerpt.
231 gs_retval_t get_ringbuf_space(struct FTA * f, FTAID * r, gs_int32_t* space, gs_int32_t szr, gs_int32_t tuplesz)
236 if (ftacallback_start_streamid(f->ftaid.streamid)<0) {
237 gslog(LOG_ALERT,"Space check for unkown streamid\n");
241 while ((ru=ftacallback_next_streamid(&state))!=0) {
244 space[x]=TUPLEFIT(ru,tuplesz);
// Forwards the stream id of f together with process and state to
// ftacallback_state_streamid, and logs LOG_ALERT when that call returns a
// negative value.
// NOTE(review): the returned values are not visible in this excerpt.
251 gs_retval_t set_ringbuf_type(struct FTA * f, FTAID process, gs_int32_t state)
254 if (ftacallback_state_streamid(f->ftaid.streamid,process, state)<0) {
255 gslog(LOG_ALERT,"state change for unkown streamid\n");
262 gs_retval_t tpost_ready() {