325b313cd0a60e951fe4f64e9bf6425fad61d8b0
[ric-plt/lib/rmr.git] / src / rmr / nng / src / mt_call_nng_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       mt_call_nng_static.c
23         Abstract:       Static funcitons related to the multi-threaded call feature
24                                 which are NNG specific.
25
26         Author:         E. Scott Daniels
27         Date:           20 May 2019
28 */
29
30 #ifndef _mtcall_nng_static_c
31 #define _mtcall_nng_static_c
32 #include <semaphore.h>
33
34 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
35         chute_t*        chute;
36         int                     state;
37
38         if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
39                 rmr_free_msg( mbuf );                                                           // drop if ring is full
40         }
41
42         chute = &ctx->chutes[0];
43         chute->mbuf = mbuf;
44         state = sem_post( &chute->barrier );                                                            // tickle the ring monitor
45 }
46
47 /*
48         This is expected to execute in a separate thread. It is responsible for
49         _all_ receives and queues them on the appropriate ring, or chute.
50
51         The "state" of the message is checked which determines where the message
52         is delivered.
53
54                 Flags indicate that the message is a call generated message, then
55                 the message is queued on the normal receive ring.
56
57                 Chute ID is == 0, then the message is queued on the normal receive ring.
58
59                 The transaction ID in the message matches the expected ID in the chute,
60                 then the message is given to the chute and the chute's semaphore is tickled.
61
62                 If none are true, the message is dropped.
63 */
64 static void* mt_receive( void* vctx ) {
65         uta_ctx_t*              ctx;
66         uta_mhdr_t*             hdr;            // header of the message received
67         rmr_mbuf_t*             mbuf;           // msg received
68         unsigned char*  d1;                     // pointer at d1 data ([0] is the call_id)
69         chute_t*                chute;
70         unsigned int    call_id;        // the id assigned to the call generated message
71
72         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
73                 if( DEBUG ) fprintf( stderr, "rmr mt_receive: bad parms, thread not started\n" );
74                 return NULL;
75         }
76
77         fprintf( stderr, "[INFO] rmr mt_receiver is spinning\n" );
78
79         while( ! ctx->shutdown ) {
80                 mbuf = rcv_msg( ctx, NULL );
81
82                 if( mbuf != NULL && (hdr = (uta_mhdr_t *) mbuf->header) != NULL ) {
83                         if( hdr->flags & HFL_CALL_MSG ) {                                       // call generated message; ignore call-id etc and queue
84                                 queue_normal( ctx, mbuf );
85                         } else {
86                                 if( RMR_D1_LEN( hdr ) <= 0 ) {                                  // no call-id data; just queue
87                                         queue_normal( ctx, mbuf );
88                                 } else {
89                                         d1 = DATA1_ADDR( hdr );
90                                         if( (call_id = (unsigned int) d1[D1_CALLID_IDX]) == 0 ) {                                       // call_id not set, just queue
91                                                 queue_normal( ctx, mbuf );
92                                         } else {
93                                                 chute = &ctx->chutes[call_id];
94                                                 chute->mbuf = mbuf;
95                                                 sem_post( &chute->barrier );                            // the call function can vet xaction id in their own thread
96                                         }
97                                 }
98                         }
99                 } else {
100                         if( ! mbuf ) {                          // very very unlikely, but prevent leaks
101                                 rmr_free_msg( mbuf );
102                         }
103                 }
104         }
105
106         return NULL;
107 }
108
109 #endif