X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=src%2Frmr%2Fcommon%2Fsrc%2Fring_static.c;h=99fdea8b828f327c21a7f8b8f047c1f600fde1c0;hb=11838bcf76f3614384459cb56e2ce80dea788cef;hp=dafc9467e437a9cd30585c2aec02a7514a27d367;hpb=68c1ab2191d9959fde0bd275a560f7c9cf6df485;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/common/src/ring_static.c b/src/rmr/common/src/ring_static.c index dafc946..99fdea8 100644 --- a/src/rmr/common/src/ring_static.c +++ b/src/rmr/common/src/ring_static.c @@ -35,20 +35,52 @@ #include #include #include +#include #define RING_FAST 1 // when set we skip nil pointer checks on the ring pointer /* - Make a new ring. + This returns the ring's pollable file descriptor. If one does not exist, then + it is created. +*/ +static int uta_ring_getpfd( void* vr ) { + ring_t* r; + + if( !RING_FAST ) { // compiler should drop the conditional when always false + if( (r = (ring_t*) vr) == NULL ) { + return 0; + } + } else { + r = (ring_t*) vr; + } + + if( r->pfd < 0 ) { + r->pfd = eventfd( 0, EFD_SEMAPHORE | EFD_NONBLOCK ); + } + + return r->pfd; +} + +/* + Make a new ring. The default is to NOT create a lock; if the user + wants read locking then uta_config_ring() can be used to setup the + mutex. (We use several rings internally and the assumption is that + there is no locking for these.) */ static void* uta_mk_ring( int size ) { ring_t* r; uint16_t max; - if( size <= 0 || (r = (ring_t *) malloc( sizeof( *r ) )) == NULL ) { + if( size <= 0 ) { + return NULL; + } + if( (r = (ring_t *) malloc( sizeof( *r ) )) == NULL ) { return NULL; } + r->flags = 0; + r->rgate = NULL; + r->wgate = NULL; r->head = r->tail = 0; max = (r->head - 1); @@ -57,15 +89,60 @@ static void* uta_mk_ring( int size ) { } r->nelements = size; // because we always have an empty element when full - if( (r->data = (void **) malloc( sizeof( void** ) * (r->nelements + 1) )) == NULL ) { + if( (r->data = (void **) malloc( sizeof( void* ) * (r->nelements + 1) )) == NULL ) { free( r ); return NULL; } - memset( r->data, 0, sizeof( void** ) * r->nelements ); + memset( r->data, 0, sizeof( void* ) * r->nelements ); + r->pfd = eventfd( 0, EFD_SEMAPHORE | EFD_NONBLOCK ); // in semaphore mode counter is maintained with each insert/extract return (void *) r; } +/* + Allows for configuration of a ring after it has been allocated. + Options are RING_* options that allow for things like setting/clearing + read locking. Returns 0 for failure 1 on success. + + Options can be ORd together and all made effective at the same time, but + it will be impossible to determine a specific failure if invoked this + way. Control is returned on the first error, and no provision is made + to "undo" previously set options if an error occurs. +*/ +static int uta_ring_config( void* vr, int options ) { + ring_t* r; + + if( (r = (ring_t*) vr) == NULL ) { + errno = EINVAL; + return 0; + } + + if( options & RING_WLOCK && r->wgate == NULL ) { // don't realloc if we have one + r->wgate = (pthread_mutex_t *) malloc( sizeof( *r->wgate ) ); + if( r->wgate == NULL ) { + return 0; + } + + pthread_mutex_init( r->wgate, NULL ); + } + + if( options & (RING_RLOCK | RING_FRLOCK) ) { // read locking + if( r->rgate == NULL ) { // don't realloc + r->rgate = (pthread_mutex_t *) malloc( sizeof( *r->rgate ) ); + if( r->rgate == NULL ) { + return 0; + } + + pthread_mutex_init( r->rgate, NULL ); + } + if( options & RING_FRLOCK ) { + r->flags |= RING_FL_FLOCK; + } + } + + return 1; +} + /* Ditch the ring. The caller is responsible for extracting any remaining pointers and freeing them as needed. @@ -76,7 +153,15 @@ static void uta_ring_free( void* vr ) { if( (r = (ring_t*) vr) == NULL ) { return; } - + if( r->data ){ + free( r->data ); + } + if( r->rgate ){ + free( r->rgate ); + } + if( r->wgate ){ + free( r->wgate ); + } free( r ); } @@ -84,10 +169,17 @@ static void uta_ring_free( void* vr ) { /* Pull the next data pointer from the ring; null if there isn't anything to be pulled. + + If the read lock exists for the ring, then this will BLOCK until + it gets the lock. There is always a chance that once the lock + is obtained that the ring is empty, so the caller MUST handle + a nil pointer as the return. */ static inline void* uta_ring_extract( void* vr ) { ring_t* r; uint16_t ti; // real index in data + int64_t ctr; // pfd counter + void* data; if( !RING_FAST ) { // compiler should drop the conditional when always false if( (r = (ring_t*) vr) == NULL ) { @@ -97,25 +189,60 @@ static inline void* uta_ring_extract( void* vr ) { r = (ring_t*) vr; } - if( r->tail == r->head ) { // empty ring + if( r->tail == r->head ) { // empty ring we can bail out quickly return NULL; } + if( r->rgate != NULL ) { // if lock exists we must honour it + if( r->flags & RING_FL_FLOCK ) { // fast read locking try once and return nil if we cant lock + if( pthread_mutex_trylock( r->rgate ) != 0 ) { // quick fail if not able to get a lock + return NULL; + } + } else { + if( pthread_mutex_lock( r->rgate ) != 0 ) { + return NULL; + } + } + if( r->tail == r->head ) { // ensure ring didn't go empty while waiting + pthread_mutex_unlock( r->rgate ); + return NULL; + } + } + ti = r->tail; r->tail++; if( r->tail >= r->nelements ) { r->tail = 0; } - return r->data[ti]; + read( r->pfd, &ctr, sizeof( ctr ) ); // when not in semaphore, this zeros the counter and value is meaningless +/* +future -- investigate if it's possible only to set/clear when empty or going to empty + if( r->tail == r->head ) { // if this emptied the ring, turn off ready + } +*/ + + data = r->data[ti]; // secure data and clear before letting go of the lock + r->data[ti] = NULL; + + if( r->rgate != NULL ) { // if locked above... + pthread_mutex_unlock( r->rgate ); + } + + return data; } + /* Insert the pointer at the next open space in the ring. - Returns 1 if the inert was ok, and 0 if the ring is full. + Returns 1 if the inert was ok, and 0 if there is an error; + errno will be set to EXFULL if the ring is full, if the attempt + fails with anyt other error that indicates the inability to obtain + a lock on the ring. */ static inline int uta_ring_insert( void* vr, void* new_data ) { ring_t* r; + int64_t inc = 1; // used to set the counter in the pfd if( !RING_FAST ) { // compiler should drop the conditional when always false if( (r = (ring_t*) vr) == NULL ) { @@ -125,7 +252,17 @@ static inline int uta_ring_insert( void* vr, void* new_data ) { r = (ring_t*) vr; } + if( r->wgate != NULL ) { // if lock exists we must honour it + if( pthread_mutex_lock( r->wgate ) != 0 ) { + return 0; // leave mutex reason in place + } + } + if( r->head+1 == r->tail || (r->head+1 >= r->nelements && !r->tail) ) { // ring is full + if( r->wgate != NULL ) { // ensure released if needed + pthread_mutex_unlock( r->wgate ); + } + errno = EXFULL; return 0; } @@ -135,6 +272,16 @@ static inline int uta_ring_insert( void* vr, void* new_data ) { r->head = 0; } + write( r->pfd, &inc, sizeof( inc ) ); +/* +future -- investigate if it's possible only to set/clear when empty or going to empty + if( r->tail == r->head ) { // turn on ready if ring was empty + } +*/ + + if( r->wgate != NULL ) { // if lock exists we must unlock before going + pthread_mutex_unlock( r->wgate ); + } return 1; }