7170a9d0795caadaa191673891f8f67a699f5ffa
[ric-plt/lib/rmr.git] / src / rmr / common / src / ring_static.c
1 // :vi sw=4 ts=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20 /*
21         Mnemonic:       ring_static.c
22         Abstract:       Implements a ring of information (probably to act as a
23                                 message queue).
24         Author:         E. Scott Daniels
25         Date:           31 August 2017
26 */
27
28 #ifndef _ring_static_c
29 #define _ring_static_c
30
31 #include <unistd.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <strings.h>
35 #include <errno.h>
36 #include <string.h>
37 #include <stdint.h>
38 #include <sys/eventfd.h>
39
40 #define RING_FAST 1                     // when set we skip nil pointer checks on the ring pointer
41
42 /*
43         This returns the ring's pollable file descriptor. If one does not exist, then
44         it is created.
45 */
46 static int uta_ring_getpfd( void* vr ) {
47         ring_t*         r;
48
49         if( !RING_FAST ) {                                                              // compiler should drop the conditional when always false
50                 if( (r = (ring_t*) vr) == NULL ) {
51                         return 0;
52                 }
53         } else {
54                 r = (ring_t*) vr;
55         }
56
57         if( r->pfd < 0 ) {
58                 r->pfd = eventfd( 0, EFD_SEMAPHORE | EFD_NONBLOCK );
59         }
60
61         return r->pfd;
62 }
63
64 /*
65         Make a new ring. The default is to NOT create a lock; if the user 
66         wants read locking then uta_config_ring() can be used to setup the
67         mutex. (We use several rings internally and the assumption is that
68         there is no locking for these.)
69 */
70 static void* uta_mk_ring( int size ) {
71         ring_t* r;
72         uint16_t max;
73
74         if( size <= 0 || (r = (ring_t *) malloc( sizeof( *r ) )) == NULL ) {
75                 return NULL;
76         }
77
78         r->flags = 0;
79         r->rgate = NULL;
80         r->wgate = NULL;
81         r->head = r->tail = 0;
82
83         max = (r->head - 1);
84         if( size >= max ) {
85                 size--;
86         }
87
88         r->nelements = size;            // because we always have an empty element when full
89         if( (r->data = (void **) malloc( sizeof( void* ) * (r->nelements + 1) )) == NULL ) {
90                 free( r );
91                 return NULL;
92         }
93
94         memset( r->data, 0, sizeof( void* ) * r->nelements );
95         r->pfd = eventfd( 0, EFD_SEMAPHORE | EFD_NONBLOCK );            // in semaphore mode counter is maintained with each insert/extract
96         return (void *) r;
97 }
98
99 /*
100         Allows for configuration of a ring after it has been allocated.
101         Options are RING_* options that allow for things like setting/clearing
102         read locking. Returns 0 for failure 1 on success.
103
104         Options can be ORd together and all made effective at the same time, but
105         it will be impossible to determine a specific failure if invoked this
106         way.  Control is returned on the first error, and no provision is made
107         to "undo" previously set options if an error occurs.
108 */
109 static int uta_ring_config( void* vr, int options ) {
110         ring_t* r;
111
112         if( (r = (ring_t*) vr) == NULL ) {
113                 errno = EINVAL;
114                 return 0;
115         }
116
117         if( options & RING_WLOCK ) {
118                 if( r->wgate == NULL ) {                // don't realloc
119                         r->wgate = (pthread_mutex_t *) malloc( sizeof( *r->wgate ) );
120                         if( r->wgate == NULL ) {
121                                 return 0;
122                         }
123         
124                         pthread_mutex_init( r->wgate, NULL );
125                 }
126         }
127
128         if( options & (RING_RLOCK | RING_FRLOCK) ) {                            // read locking
129                 if( r->rgate == NULL ) {                // don't realloc
130                         r->rgate = (pthread_mutex_t *) malloc( sizeof( *r->rgate ) );
131                         if( r->rgate == NULL ) {
132                                 return 0;
133                         }
134         
135                         pthread_mutex_init( r->rgate, NULL );
136                 }
137                 if( options & RING_FRLOCK ) {
138                         r->flags |= RING_FL_FLOCK;
139                 }
140         }
141
142         return 1;
143 }
144
145 /*
146         Ditch the ring. The caller is responsible for extracting any remaining
147         pointers and freeing them as needed.
148 */
149 static void uta_ring_free( void* vr ) {
150         ring_t* r;
151
152         if( (r = (ring_t*) vr) == NULL ) {
153                 return;
154         }
155         if( r->data ){
156                 free( r->data );
157         }
158         if( r->rgate ){
159                 free( r->rgate );
160         }
161         if( r->wgate ){
162                 free( r->wgate );
163         }
164         free( r );
165 }
166
167
168 /*
169         Pull the next data pointer from the ring; null if there isn't
170         anything to be pulled.
171
172         If the read lock exists for the ring, then this will BLOCK until
173         it gets the lock.  There is always a chance that once the lock
174         is obtained that the ring is empty, so the caller MUST handle
175         a nil pointer as the return.
176 */
177 static inline void* uta_ring_extract( void* vr ) {
178         ring_t*         r;
179         uint16_t        ti;             // real index in data
180         int64_t ctr;            // pfd counter
181         void*           data;
182
183         if( !RING_FAST ) {                                                              // compiler should drop the conditional when always false
184                 if( (r = (ring_t*) vr) == NULL ) {
185                         return 0;
186                 }
187         } else {
188                 r = (ring_t*) vr;
189         }
190
191         if( r->tail == r->head ) {                                              // empty ring we can bail out quickly
192                 return NULL;
193         }
194
195         if( r->rgate != NULL ) {                                                        // if lock exists we must honour it
196                 if( r->flags & RING_FL_FLOCK ) {                                // fast read locking try once and return nil if we cant lock
197                         if( pthread_mutex_trylock( r->rgate ) != 0 ) {  // quick fail if not able to get a lock
198                                 return NULL;
199                         }
200                 } else {
201                         if( pthread_mutex_lock( r->rgate ) != 0 ) {
202                                 return NULL;
203                         }
204                 }
205                 if( r->tail == r->head ) {                                      // ensure ring didn't go empty while waiting
206                         pthread_mutex_unlock( r->rgate );
207                         return NULL;
208                 }
209         }
210
211         ti = r->tail;
212         r->tail++;
213         if( r->tail >= r->nelements ) {
214                 r->tail = 0;
215         }
216
217         read( r->pfd, &ctr, sizeof( ctr )  );                           // when not in semaphore, this zeros the counter and value is meaningless
218 /*
219 future -- investigate if it's possible only to set/clear when empty or going to empty
220         if( r->tail == r->head ) {                                                              // if this emptied the ring, turn off ready
221         }
222 */
223
224         data = r->data[ti];                                             // secure data and clear before letting go of the lock
225         r->data[ti] = NULL;
226
227         if( r->rgate != NULL ) {                                                        // if locked above...
228                 pthread_mutex_unlock( r->rgate );
229         }
230
231         return data;
232 }
233
234
235 /*
236         Insert the pointer at the next open space in the ring.
237         Returns 1 if the inert was ok, and 0 if there is an error;
238         errno will be set to EXFULL if  the ring is full, if the attempt
239         fails with anyt other error that indicates the inability to obtain
240         a lock on the ring.
241 */
242 static inline int uta_ring_insert( void* vr, void* new_data ) {
243         ring_t*         r;
244         int64_t inc = 1;                                // used to set the counter in the pfd
245
246         if( !RING_FAST ) {                                                              // compiler should drop the conditional when always false
247                 if( (r = (ring_t*) vr) == NULL ) {
248                         return 0;
249                 }
250         } else {
251                 r = (ring_t*) vr;
252         }
253
254         if( r->wgate != NULL ) {                                                // if lock exists we must honour it
255                 if( pthread_mutex_lock( r->wgate ) != 0 ) {
256                         return 0;                                                               // leave mutex reason in place
257                 }
258         }
259
260         if( r->head+1 == r->tail || (r->head+1 >= r->nelements && !r->tail) ) {         // ring is full
261                 if( r->wgate != NULL ) {                                                                                                // ensure released if needed
262                         pthread_mutex_unlock( r->wgate );
263                 }
264                 errno = EXFULL;
265                 return 0;
266         }
267
268         r->data[r->head] = new_data;
269         r->head++;
270         if( r->head >= r->nelements ) {
271                 r->head = 0;
272         }
273
274         write( r->pfd, &inc, sizeof( inc ) );
275 /*
276 future -- investigate if it's possible only to set/clear when empty or going to empty
277         if( r->tail == r->head ) {                                                              // turn on ready if ring was empty
278         }
279 */
280
281         if( r->wgate != NULL ) {                                                // if lock exists we must unlock before going
282                 pthread_mutex_unlock( r->wgate );
283         }
284         return 1;
285 }
286
287
288
289 #endif