Allow user programme to set RMR verbosity level
[ric-plt/lib/rmr.git] / src / rmr / nng / src / mt_call_nng_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       mt_call_nng_static.c
23         Abstract:       Static funcitons related to the multi-threaded call feature
24                                 which are NNG specific.
25
26         Author:         E. Scott Daniels
27         Date:           20 May 2019
28 */
29
30 #ifndef _mtcall_nng_static_c
31 #define _mtcall_nng_static_c
32 #include <semaphore.h>
33
34 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
35         static  int warned = 0;
36         chute_t*        chute;
37
38         if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
39                 rmr_free_msg( mbuf );                                                           // drop if ring is full
40                 if( !warned ) {
41                         rmr_vlog( RMR_VL_WARN, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
42                         warned++;
43                 }
44
45                 return;
46         }
47
48         chute = &ctx->chutes[0];
49         sem_post( &chute->barrier );                                                            // tickle the ring monitor
50 }
51
52 /*
53         This is expected to execute in a separate thread. It is responsible for
54         _all_ receives and queues them on the appropriate ring, or chute.
55
56         The "state" of the message is checked which determines where the message
57         is delivered.
58
59                 Flags indicate that the message is a call generated message, then
60                 the message is queued on the normal receive ring.
61
62                 Chute ID is == 0, then the message is queued on the normal receive ring.
63
64                 The transaction ID in the message matches the expected ID in the chute,
65                 then the message is given to the chute and the chute's semaphore is tickled.
66
67                 If none are true, the message is queued on the normal receive queue and that
68                 related semaphore is tickeled.
69 */
70 static void* mt_receive( void* vctx ) {
71         uta_ctx_t*              ctx;
72         uta_mhdr_t*             hdr;            // header of the message received
73         rmr_mbuf_t*             mbuf;           // msg received
74         unsigned char*  d1;                     // pointer at d1 data ([0] is the call_id)
75         chute_t*                chute;
76         unsigned int    call_id;        // the id assigned to the call generated message
77
78         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
79                 if( DEBUG ) fprintf( stderr, "rmr mt_receive: bad parms, thread not started\n" );
80                 return NULL;
81         }
82
83         rmr_vlog( RMR_VL_INFO, "rmr mt_receiver is spinning\n" );
84
85         while( ! ctx->shutdown ) {
86                 mbuf = rcv_msg( ctx, NULL );
87
88                 if( mbuf != NULL && (hdr = (uta_mhdr_t *) mbuf->header) != NULL && mbuf->payload != NULL ) {
89                         if( hdr->flags & HFL_CALL_MSG ) {                                       // call generated message; ignore call-id etc and queue
90                                 queue_normal( ctx, mbuf );
91                         } else {
92                                 if( RMR_D1_LEN( hdr ) <= 0 ) {                                  // no call-id data; just queue
93                                         queue_normal( ctx, mbuf );
94                                 } else {
95                                         d1 = DATA1_ADDR( hdr );
96                                         if( (call_id = (unsigned int) d1[D1_CALLID_IDX]) == 0 ) {                                       // call_id not set, just queue
97                                                 queue_normal( ctx, mbuf );
98                                         } else {
99                                                 chute = &ctx->chutes[call_id];
100                                                 chute->mbuf = mbuf;
101                                                 sem_post( &chute->barrier );                            // the call function can vet xaction id in their own thread
102                                         }
103                                 }
104                         }
105                 } else {
106                         if( ! mbuf ) {                          // very very unlikely, but prevent leaks
107                                 rmr_free_msg( mbuf );
108                         }
109                 }
110         }
111
112         return NULL;
113 }
114
115 #endif