Added quantiling UDAFs
[com/gs-lite.git] / src / lib / gscprts / rts_env.c
1 /* ------------------------------------------------\r
2  Copyright 2014 AT&T Intellectual Property\r
3  Licensed under the Apache License, Version 2.0 (the "License");\r
4  you may not use this file except in compliance with the License.\r
5  You may obtain a copy of the License at\r
6  \r
7  http://www.apache.org/licenses/LICENSE-2.0\r
8  \r
9  Unless required by applicable law or agreed to in writing, software\r
10  distributed under the License is distributed on an "AS IS" BASIS,\r
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12  See the License for the specific language governing permissions and\r
13  limitations under the License.\r
14  ------------------------------------------- */\r
15 #include "gsconfig.h"\r
16 #include "gstypes.h"\r
17 #include "lapp.h"\r
18 #include <ipcencoding.h>\r
19 #include <callbackregistries.h>\r
20 #include "fta.h"\r
21 #include "stdio.h"\r
22 #include "stdlib.h"\r
23 #include "rts.h"\r
24 \r
25 #define  POLLING\r
26 \r
27 static struct ringbuf * ru=0;\r
28 \r
29 gs_retval_t print_error(gs_sp_t c) {\r
30     gslog(LOG_EMERG,"%s",c);\r
31     return 0;\r
32 }\r
33 \r
34 void *fta_alloc(struct FTA * owner, gs_int32_t size)\r
35 {\r
36     gs_uint8_t * c;\r
37     gs_uint32_t x;\r
38     if ((c=(gs_uint8_t *)malloc(size))==0) return 0;\r
39     /* touch all memory once to map/reserve it now */\r
40     for(x=0;x<size;x=x+1024) {\r
41         c[x]=0;\r
42     }\r
43     \r
44     return (void *) c;\r
45 }\r
46 \r
47 void fta_free(struct FTA * owner , void * mem) {\r
48     free(mem);\r
49 }\r
50 \r
51 \r
52 void fta_free_all(struct FTA * owner) {\r
53     gslog(LOG_ERR,"fta_free_all not available ");\r
54 }\r
55 \r
56 /* It is assumed that there is no write activity on all ringbuffers between an alloccate_tuple and\r
57  * a post_tuple. If there is any the result is unpredictable\r
58  */\r
59 \r
60 void * allocate_tuple(struct FTA * owner, gs_int32_t size)\r
61 {\r
62     gs_int32_t state;\r
63     if (ru!=0) {\r
64         gslog(LOG_ALERT,"Can't allocate multiple tuples at the same time before posting them");\r
65         return 0;\r
66     }\r
67     \r
68     if (size>MAXTUPLESZ) {\r
69         gslog(LOG_ALERT,"Maximum tuple size is %u",MAXTUPLESZ);\r
70         ru=0;\r
71         return 0;\r
72     }\r
73     \r
74     if (ftacallback_start_streamid(owner->ftaid.streamid)<0) {\r
75         gslog(LOG_ALERT,"Post for unkown streamid\n");\r
76         ru=0;\r
77         return 0;\r
78     }\r
79     \r
80     /* we grep memory in the first none suspended ringbuffer. Note that if there is no such ringbuffer we might\r
81      not find any memory*/\r
82     while ((ru=ftacallback_next_streamid(&state))!=0) {\r
83 #ifdef PRINTMSG\r
84         fprintf(stderr,"Allocating in ringpuffer %p [%p:%u]"\r
85                 "(%u %u %u) \n",ru,&ru->start,ru->end,ru->reader,ru->writer,\r
86                 ru->length);\r
87         fprintf(stderr,"Pointer to current writer %p\n",CURWRITE(ru));\r
88 #endif\r
89         if (state != LFTA_RINGBUF_SUSPEND) {\r
90 #ifdef BLOCKRINGBUFFER\r
91             if (state == LFTA_RINGBUF_ATOMIC) {\r
92                 while (!SPACETOWRITE(ru)) {\r
93                     usleep(100);\r
94                 }\r
95             }\r
96 #endif\r
97             if (SPACETOWRITE(ru)) {\r
98                 CURWRITE(ru)->f=owner->ftaid;\r
99                 CURWRITE(ru)->sz=size;\r
100                 return &(CURWRITE(ru)->data[0]);\r
101             } else {\r
102                 shared_memory_full_warning++;\r
103             }\r
104         }\r
105     }\r
106     ru=0;\r
107     outtupledrop++;\r
108         \r
109     return 0;\r
110 }\r
111 \r
112 void free_tuple(void * data) {\r
113     ru=0;\r
114 }\r
115 \r
116 gs_retval_t post_tuple(void * tuple) {\r
117     struct ringbuf * r;\r
118     FTAID * ftaidp;\r
119     gs_uint32_t stream_id;\r
120     struct wakeup_result a;\r
121     gs_int32_t state;\r
122     \r
123     if (ru==0) {\r
124         gslog(LOG_ALERT,"lfta post tuple posted tupple was never allocated\n");\r
125         return -1;\r
126     }\r
127     \r
128     \r
129     if (tuple != ((void*) &(CURWRITE(ru)->data[0]))) {\r
130         gslog(LOG_ALERT,"lfta post tuple posted tupple which was not allocated"\r
131               "immediatly before\n");\r
132         ru=0;\r
133         return -1;\r
134     }\r
135     \r
136     stream_id=CURWRITE(ru)->f.streamid;\r
137     \r
138     if (ftacallback_start_streamid(stream_id)<0) {\r
139         gslog(LOG_ALERT,"ERROR:Post for unkown streamid\n");\r
140         ru=0;\r
141         return -1;\r
142     }\r
143     /* now make sure we have space to write in all atomic ringbuffer */\r
144     while((r=ftacallback_next_streamid(&state))!=0) {\r
145         if ((state == LFTA_RINGBUF_ATOMIC) && (r!=ru)) {\r
146 #ifdef BLOCKRINGBUFFER\r
147             while (!SPACETOWRITE(r)) {\r
148                 usleep(10000);\r
149             }\r
150 #endif\r
151             if (! SPACETOWRITE(r)) {\r
152                 /* atomic ring buffer and no space so post nothing */\r
153                                 outtupledrop++;\r
154                                 ru=0;\r
155                                 return -1;\r
156             }\r
157         }\r
158     }\r
159     \r
160     if (ftacallback_start_streamid(stream_id)<0) {\r
161         gslog(LOG_ALERT,"Post for unkown streamid\n");\r
162         ru=0;\r
163         return -1;\r
164     }\r
165     \r
166     while((r=ftacallback_next_streamid(&state))!=0) {\r
167 #ifdef PRINTMSG\r
168         fprintf(stderr,"Found additional ring buffer make a copy to rb%p\n",\r
169                 r);\r
170 #endif\r
171         /* try to post in all none suspended ringbuffer for atomic once\r
172          * we know we will succeed\r
173          */\r
174         if ((r!=ru)&&(state != LFTA_RINGBUF_SUSPEND)) {\r
175             if (SPACETOWRITE(r)) {\r
176                 CURWRITE(r)->f=CURWRITE(ru)->f;\r
177                 CURWRITE(r)->sz=CURWRITE(ru)->sz;\r
178                 memcpy(&(CURWRITE(r)->data[0]),&(CURWRITE(ru)->data[0]),\r
179                        CURWRITE(ru)->sz);\r
180                 outtuple++;\r
181                 outbytes=outbytes+CURWRITE(ru)->sz;\r
182                 ADVANCEWRITE(r);\r
183 #ifdef PRINTMSG\r
184                 fprintf(stderr,"Wrote in ringpuffer %p [%p:%u]"\r
185                         "(%u %u %u) \n",r,&r->start,r->end,r->reader,r->writer,\r
186                         r->length);\r
187                 fprintf(stderr,"\t%u %u %u\n",CURREAD(r)->next,\r
188                         CURREAD(r)->f.streamid,CURREAD(r)->sz);\r
189 #endif\r
190             } else {\r
191                 outtupledrop++;\r
192             }\r
193         }\r
194         if (HOWFULL(r) > 500) {\r
195             // buffer is at least half full\r
196             shared_memory_full_warning++;\r
197 #ifdef PRINTMSG\r
198             fprintf(stderr,"\t\t buffer full\n");\r
199 #endif\r
200         }\r
201     }\r
202     \r
203     if (HOWFULL(ru) > 500) {\r
204         // buffer is at least half full\r
205         shared_memory_full_warning++;\r
206 #ifdef PRINTMSG\r
207         fprintf(stderr,"\t\t buffer full\n");\r
208 #endif\r
209     }\r
210     outtuple++;\r
211     outbytes=outbytes+CURWRITE(ru)->sz;\r
212     ADVANCEWRITE(ru);\r
213     ru=0;\r
214 #ifndef POLLING\r
215     if (ftacallback_start_wakeup(stream_id)<0) {\r
216         gslog(LOG_ALERT,"Wakeup for unkown streamid\n");\r
217         return -1;\r
218     }\r
219     a.h.callid=WAKEUP;\r
220     a.h.size=sizeof(struct wakeup_result);\r
221     while((ftaidp=ftacallback_next_wakeup())!=0) {\r
222         if (send_wakeup(*ftaidp)<0) {\r
223             gslog(LOG_ALERT,"Could not send wakeup\n");\r
224             return -1;\r
225         }\r
226     }\r
227 #endif\r
228     return 0;\r
229 }\r
230 \r
231 gs_retval_t get_ringbuf_space(struct FTA * f, FTAID * r, gs_int32_t* space, gs_int32_t szr, gs_int32_t tuplesz)\r
232 {\r
233     gs_int32_t x=0;\r
234     gs_int32_t state;\r
235     struct ringbuf * ru;\r
236     if (ftacallback_start_streamid(f->ftaid.streamid)<0) {\r
237         gslog(LOG_ALERT,"Space check for unkown streamid\n");\r
238         return -1;\r
239     }\r
240     \r
241     while ((ru=ftacallback_next_streamid(&state))!=0) {\r
242         if (szr > x ) {\r
243             r[x]=ru->destid;\r
244             space[x]=TUPLEFIT(ru,tuplesz);\r
245         }\r
246         x++;\r
247     }\r
248     return x;\r
249 }\r
250 \r
251 gs_retval_t set_ringbuf_type(struct FTA * f, FTAID process, gs_int32_t state)\r
252 {\r
253     \r
254     if (ftacallback_state_streamid(f->ftaid.streamid,process, state)<0) {\r
255                 gslog(LOG_ALERT,"state change for unkown streamid\n");\r
256                 return -1;\r
257     }\r
258     return 0;\r
259 }\r
260 \r
261 \r
262 gs_retval_t tpost_ready() {\r
263     return 1;\r
264 }\r
265 \r