1 // : vi ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: mt_call_nng_static.c
Abstract: Static functions related to the multi-threaded call feature
24 which are NNG specific.
26 Author: E. Scott Daniels
30 #ifndef _mtcall_nng_static_c
31 #define _mtcall_nng_static_c
32 #include <semaphore.h>
34 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
35 static int warned = 0;
38 if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
39 rmr_free_msg( mbuf ); // drop if ring is full
41 rmr_vlog( RMR_VL_WARN, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
48 chute = &ctx->chutes[0];
49 sem_post( &chute->barrier ); // tickle the ring monitor
53 This is expected to execute in a separate thread. It is responsible for
54 _all_ receives and queues them on the appropriate ring, or chute.
The "state" of the message is checked, which determines where the message
is delivered. If:
59 Flags indicate that the message is a call generated message, then
60 the message is queued on the normal receive ring.
62 Chute ID is == 0, then the message is queued on the normal receive ring.
64 The transaction ID in the message matches the expected ID in the chute,
65 then the message is given to the chute and the chute's semaphore is tickled.
67 If none are true, the message is queued on the normal receive queue and that
related semaphore is tickled.
70 static void* mt_receive( void* vctx ) {
72 uta_mhdr_t* hdr; // header of the message received
73 rmr_mbuf_t* mbuf; // msg received
74 unsigned char* d1; // pointer at d1 data ([0] is the call_id)
76 unsigned int call_id; // the id assigned to the call generated message
78 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
79 if( DEBUG ) fprintf( stderr, "rmr mt_receive: bad parms, thread not started\n" );
83 rmr_vlog( RMR_VL_INFO, "rmr mt_receiver is spinning\n" );
85 while( ! ctx->shutdown ) {
86 mbuf = rcv_msg( ctx, NULL );
88 if( mbuf != NULL && (hdr = (uta_mhdr_t *) mbuf->header) != NULL && mbuf->payload != NULL ) {
89 if( hdr->flags & HFL_CALL_MSG ) { // call generated message; ignore call-id etc and queue
90 queue_normal( ctx, mbuf );
92 if( RMR_D1_LEN( hdr ) <= 0 ) { // no call-id data; just queue
93 queue_normal( ctx, mbuf );
95 d1 = DATA1_ADDR( hdr );
96 if( (call_id = (unsigned int) d1[D1_CALLID_IDX]) == 0 ) { // call_id not set, just queue
97 queue_normal( ctx, mbuf );
99 chute = &ctx->chutes[call_id];
101 sem_post( &chute->barrier ); // the call function can vet xaction id in their own thread
106 if( ! mbuf ) { // very very unlikely, but prevent leaks
107 rmr_free_msg( mbuf );