1 // : vi ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: mt_call_nng_static.c
23 Abstract: Static funcitons related to the multi-threaded call feature
24 which are NNG specific.
26 Author: E. Scott Daniels
30 #ifndef _mtcall_nng_static_c
31 #define _mtcall_nng_static_c
32 #include <semaphore.h>
34 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
38 if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
39 rmr_free_msg( mbuf ); // drop if ring is full
42 chute = &ctx->chutes[0];
44 state = sem_post( &chute->barrier ); // tickle the ring monitor
48 This is expected to execute in a separate thread. It is responsible for
49 _all_ receives and queues them on the appropriate ring, or chute.
51 The "state" of the message is checked which determines where the message
54 Flags indicate that the message is a call generated message, then
55 the message is queued on the normal receive ring.
57 Chute ID is == 0, then the message is queued on the normal receive ring.
59 The transaction ID in the message matches the expected ID in the chute,
60 then the message is given to the chute and the chute's semaphore is tickled.
62 If none are true, the message is dropped.
64 static void* mt_receive( void* vctx ) {
66 uta_mhdr_t* hdr; // header of the message received
67 rmr_mbuf_t* mbuf; // msg received
68 unsigned char* d1; // pointer at d1 data ([0] is the call_id)
70 unsigned int call_id; // the id assigned to the call generated message
72 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
73 if( DEBUG ) fprintf( stderr, "rmr mt_receive: bad parms, thread not started\n" );
77 fprintf( stderr, "[INFO] rmr mt_receiver is spinning\n" );
79 while( ! ctx->shutdown ) {
80 mbuf = rcv_msg( ctx, NULL );
82 if( mbuf != NULL && (hdr = (uta_mhdr_t *) mbuf->header) != NULL ) {
83 if( hdr->flags & HFL_CALL_MSG ) { // call generated message; ignore call-id etc and queue
84 queue_normal( ctx, mbuf );
86 if( RMR_D1_LEN( hdr ) <= 0 ) { // no call-id data; just queue
87 queue_normal( ctx, mbuf );
89 d1 = DATA1_ADDR( hdr );
90 if( (call_id = (unsigned int) d1[D1_CALLID_IDX]) == 0 ) { // call_id not set, just queue
91 queue_normal( ctx, mbuf );
93 chute = &ctx->chutes[call_id];
95 sem_post( &chute->barrier ); // the call function can vet xaction id in their own thread
100 if( ! mbuf ) { // very very unlikely, but prevent leaks
101 rmr_free_msg( mbuf );