1 // : vi ts=4 sw=4 noet:
3 ==================================================================================
4 Copyright (c) 2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: mt_call_si static.c
23 Abstract: Static funcitons related to the multi-threaded call feature
24 which are SI specific. The functions here also provide the
25 message construction functions which build a message that
26 might be split across multiple "datagrams" received from the
29 Author: E. Scott Daniels
33 #ifndef _mtcall_si_static_c
34 #define _mtcall_si_static_c
35 #include <semaphore.h>
37 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
38 static int warned = 0;
41 if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
42 rmr_free_msg( mbuf ); // drop if ring is full
44 rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
51 chute = &ctx->chutes[0];
52 sem_post( &chute->barrier ); // tickle the ring monitor
56 Allocate a message buffer, point it at the accumulated (raw) message,
57 call ref to point to all of the various bits and set real len etc,
58 then we queue it. Raw_msg is expected to include the transport goo
59 placed in front of the RMR header and payload.
61 static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd ) {
63 uta_mhdr_t* hdr; // header of the message received
64 unsigned char* d1; // pointer at d1 data ([0] is the call_id)
66 unsigned int call_id; // the id assigned to the call generated message
68 if( PARANOID_CHECKS ) { // PARANOID mode is slower; off by default
69 if( raw_msg == NULL || msg_size <= 0 ) {
74 if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
75 mbuf->tp_buf = raw_msg;
76 mbuf->rts_fd = sender_fd;
77 if( msg_size > ctx->max_ibm + 1024 ) {
78 mbuf->flags |= MFL_HUGE; // prevent caching of oversized buffers
81 ref_tpbuf( mbuf, msg_size ); // point mbuf at bits in the datagram
82 hdr = mbuf->header; // convenience
83 if( hdr->flags & HFL_CALL_MSG ) { // call generated message; ignore call-id etc and queue
84 queue_normal( ctx, mbuf );
86 if( RMR_D1_LEN( hdr ) <= 0 ) { // no call-id data; just queue
87 queue_normal( ctx, mbuf );
89 d1 = DATA1_ADDR( hdr );
90 if( (call_id = (unsigned int) d1[D1_CALLID_IDX]) == 0 ) { // call_id not set, just queue
91 queue_normal( ctx, mbuf );
93 chute = &ctx->chutes[call_id];
95 sem_post( &chute->barrier ); // the call function can vet xaction id in their own thread
103 Given a buffer, extract the size. We assume the buffer contains one of:
107 where <int1> is the size in native storage order (v1) and <int2>
108 is the size in network order. If <mark> is present then we assume
109 that <int2> is present and we use that after translating from net
110 byte order. If <mark> is not present, we use <int1>. This allows
111 old versions of RMR to continue to work with new versions that now
112 do the right thing with byte ordering.
114 static inline uint32_t extract_mlen( unsigned char* buf ) {
115 uint32_t size; // adjusted (if needed) size for return
116 uint32_t* blen; // length in the buffer to extract
118 blen = (uint32_t *) buf;
119 if( *(buf + sizeof( int ) * 2 ) == TP_SZ_MARKER ) {
120 size = ntohl( *(blen+1) ); // pick up the second integer
121 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len converted from net order to: %d\n", size );
123 size = *blen; // old sender didn't encode size
124 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len no conversion: %d\n", size );
131 This is the callback invoked when tcp data is received. It adds the data
132 to the buffer for the connection and if a complete message is received
133 then the message is queued onto the receive ring.
135 Return value indicates only that we handled the buffer and SI should continue
136 or that SI should terminate, so on error it's NOT wrong to return "ok".
138 static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
140 river_t* river; // river associated with the fd passed in
141 unsigned char* old_accum; // old accumulator reference should we need to realloc
142 int bidx = 0; // transport buffer index
143 int remain; // bytes in transport buf that need to be moved
144 int* mlen; // pointer to spot in buffer for conversion to int
145 int need; // bytes needed for something
148 if( PARANOID_CHECKS ) { // PARANOID mode is slower; off by default
149 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
153 if( fd >= ctx->nrivers || fd < 0 ) {
154 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
158 ctx = (uta_ctx_t *) vctx;
165 river = &ctx->rivers[fd];
166 if( river->state != RS_GOOD ) { // all states which aren't good require reset first
167 if( river->state == RS_NEW ) {
168 memset( river, 0, sizeof( *river ) );
169 river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // max inbound message size
170 river->accum = (char *) malloc( river->nbytes );
173 // future -- sync to next marker
174 river->ipt = 0; // insert point
178 river->state = RS_GOOD;
180 while( remain > 0 ) { // until we've done something with all bytes passed in
181 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
183 if( river->msg_size <= 0 ) { // don't have a size yet
184 // FIX ME: we need a frame indicator to ensure alignment
185 need = TP_SZFIELD_LEN - river->ipt; // what we need to compute length
186 if( need > remain ) { // the whole size isn't there
187 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
188 memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what we can and depart
189 river->ipt += remain;
190 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
194 if( river->ipt > 0 ) { // if we captured the start of size last go round
195 memcpy( &river->accum[river->ipt], buf + bidx, need );
199 river->msg_size = extract_mlen( river->accum );
201 rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
203 dump_40( river->accum, "from accumulator:" );
204 if( river->msg_size > 100 ) {
205 dump_40( river->accum + 50, "from rmr buf:" );
210 river->msg_size = extract_mlen( &buf[bidx] ); // pull from buf as it's all there; it will copy later
212 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
214 if( river->msg_size > river->nbytes ) { // message bigger than app max size; grab huge buffer
215 //river->flags |= RF_DROP;
216 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "received message is huge (%d) reallocating buffer\n", river->msg_size );
217 old_accum = river->accum; // need to copy any bytes we snarfed getting the size, so hold
218 river->nbytes = river->msg_size + 128; // buffer large enough with a bit of fudge room
219 river->accum = (char *) malloc( river->nbytes );
220 if( river->ipt > 0 ) {
221 memcpy( river->accum, old_accum, river->ipt + 1 ); // copy anything snarfed in getting the sie
228 if( river->msg_size > (river->ipt + remain) ) { // need more than is left in buffer
229 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
230 if( (river->flags & RF_DROP) == 0 ) {
231 memcpy( &river->accum[river->ipt], buf+bidx, remain ); // buffer and go wait for more
233 river->ipt += remain;
236 need = river->msg_size - river->ipt; // bytes from transport we need to have complete message
237 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d flgs=%02x\n", river->msg_size, need, remain, river->flags );
238 if( (river->flags & RF_DROP) == 0 ) {
239 memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more)
240 buf2mbuf( ctx, river->accum, river->nbytes, fd ); // build an RMR mbuf and queue
241 river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // prevent huge size from persisting
242 river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator
244 if( !(river->flags & RF_NOTIFIED) ) {
245 rmr_vlog( RMR_VL_WARN, "message larger than allocated buffer (%d) arrived on fd %d\n", river->nbytes, fd );
246 river->flags |= RF_NOTIFIED;
250 river->msg_size = -1;
257 if( DEBUG >2 ) rmr_vlog( RMR_VL_DEBUG, "##### data callback finished\n" );
262 Callback driven on a disconnect notification. We will attempt to find the related
263 endpoint via the fd2ep hash maintained in the context. If we find it, then we
264 remove it from the hash, and mark the endpoint as closed so that the next attempt
265 to send forces a reconnect attempt.
267 Future: put the ep on a queue to automatically attempt to reconnect.
269 static int mt_disc_cb( void* vctx, int fd ) {
273 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
277 ep = fd2ep_del( ctx, fd ); // find ep and remove the fd from the hash
279 pthread_mutex_lock( &ep->gate ); // wise to lock this
282 pthread_mutex_unlock( &ep->gate );
290 This is expected to execute in a separate thread. It is responsible for
291 _all_ receives and queues them on the appropriate ring, or chute.
292 It does this by registering the callback function above with the SI world
293 and then calling SIwait() to drive the callback when data has arrived.
296 The "state" of the message is checked which determines where the message
299 Flags indicate that the message is a call generated message, then
300 the message is queued on the normal receive ring.
302 Chute ID is == 0, then the message is queued on the normal receive ring.
304 The transaction ID in the message matches the expected ID in the chute,
305 then the message is given to the chute and the chute's semaphore is tickled.
307 If none are true, the message is dropped.
309 static void* mt_receive( void* vctx ) {
312 if( (ctx = (uta_ctx_t*) vctx) == NULL ) {
313 rmr_vlog( RMR_VL_CRIT, "unable to start mt-receive: ctx was nil\n" );
317 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_receive: registering SI95 data callback and waiting\n" );
319 SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx ); // our callback called only for "cooked" (tcp) data
320 SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx ); // our callback for handling disconnects
322 SIwait( ctx->si_ctx );
324 return NULL; // keep the compiler happy though never can be reached as SI wait doesn't return