The change to fix a bug2
[ric-plt/lib/rmr.git] / src / rmr / nng / src / mt_call_nng_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       mt_call_nng_static.c
23         Abstract:       Static funcitons related to the multi-threaded call feature
24                                 which are NNG specific.
25
26         Author:         E. Scott Daniels
27         Date:           20 May 2019
28 */
29
30 #ifndef _mtcall_nng_static_c
31 #define _mtcall_nng_static_c
32 #include <semaphore.h>
33
34 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
35         static  int warned = 0;
36         chute_t*        chute;
37
38         if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
39                 rmr_free_msg( mbuf );                                                           // drop if ring is full
40                 if( !warned ) {
41                         fprintf( stderr, "[WARN] rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
42                         warned++;
43                 }
44
45                 return;
46         }
47
48         chute = &ctx->chutes[0];
49         sem_post( &chute->barrier );                                                            // tickle the ring monitor
50 }
51
52 /*
53         This is expected to execute in a separate thread. It is responsible for
54         _all_ receives and queues them on the appropriate ring, or chute.
55
56         The "state" of the message is checked which determines where the message
57         is delivered.
58
59                 Flags indicate that the message is a call generated message, then
60                 the message is queued on the normal receive ring.
61
62                 Chute ID is == 0, then the message is queued on the normal receive ring.
63
64                 The transaction ID in the message matches the expected ID in the chute,
65                 then the message is given to the chute and the chute's semaphore is tickled.
66
67                 If none are true, the message is dropped.
68 */
69 static void* mt_receive( void* vctx ) {
70         uta_ctx_t*              ctx;
71         uta_mhdr_t*             hdr;            // header of the message received
72         rmr_mbuf_t*             mbuf;           // msg received
73         unsigned char*  d1;                     // pointer at d1 data ([0] is the call_id)
74         chute_t*                chute;
75         unsigned int    call_id;        // the id assigned to the call generated message
76
77         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
78                 if( DEBUG ) fprintf( stderr, "rmr mt_receive: bad parms, thread not started\n" );
79                 return NULL;
80         }
81
82         fprintf( stderr, "[INFO] rmr mt_receiver is spinning\n" );
83
84         while( ! ctx->shutdown ) {
85                 mbuf = rcv_msg( ctx, NULL );
86
87                 if( mbuf != NULL && (hdr = (uta_mhdr_t *) mbuf->header) != NULL ) {
88                         if( hdr->flags & HFL_CALL_MSG ) {                                       // call generated message; ignore call-id etc and queue
89                                 queue_normal( ctx, mbuf );
90                         } else {
91                                 if( RMR_D1_LEN( hdr ) <= 0 ) {                                  // no call-id data; just queue
92                                         queue_normal( ctx, mbuf );
93                                 } else {
94                                         d1 = DATA1_ADDR( hdr );
95                                         if( (call_id = (unsigned int) d1[D1_CALLID_IDX]) == 0 ) {                                       // call_id not set, just queue
96                                                 queue_normal( ctx, mbuf );
97                                         } else {
98                                                 chute = &ctx->chutes[call_id];
99                                                 chute->mbuf = mbuf;
100                                                 sem_post( &chute->barrier );                            // the call function can vet xaction id in their own thread
101                                         }
102                                 }
103                         }
104                 } else {
105                         if( ! mbuf ) {                          // very very unlikely, but prevent leaks
106                                 rmr_free_msg( mbuf );
107                         }
108                 }
109         }
110
111         return NULL;
112 }
113
114 #endif