1 // : vi ts=4 sw=4 noet:
3 ==================================================================================
4 Copyright (c) 2020-2021 Nokia
5 Copyright (c) 2018-2021 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: mt_call_si static.c
Abstract: Static functions related to the multi-threaded call feature
24 which are SI specific. The functions here also provide the
25 message construction functions which build a message that
might be split across multiple "datagrams" received from the underlying transport layer.
29 Author: E. Scott Daniels
33 #ifndef _mtcall_si_static_c
34 #define _mtcall_si_static_c
35 #include <semaphore.h>
/*
	Place a completed message (mbuf) onto the normal receive ring (ctx->mring)
	for pickup by rmr_mt_receive(). If the ring is full, the message is dropped
	(freed) and an error is logged no more than once per 60 seconds. The chute[0]
	semaphore is posted to tickle the ring monitor.

	NOTE(review): several lines of this function (declaration of chute, drop
	counting, closing braces) are elided from this view -- confirm against the
	full source before modifying.
*/
static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
static time_t last_warning = 0;		// timestamp of last drop warning; rate limits the error log below
//static long dcount = 0;
if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
rmr_free_msg( mbuf ); // drop if ring is full
if( time( NULL ) > last_warning + 60 ) { // issue warning no more frequently than every 60 sec
rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %d msgs dropped since last warning\n", ctx->dcount );
last_warning = time( NULL );
// NOTE(review): chute is not declared in the visible lines; presumably declared on an elided line -- confirm
chute = &ctx->chutes[0];
sem_post( &chute->barrier ); // tickle the ring monitor
62 Allocate a message buffer, point it at the accumulated (raw) message,
63 call ref to point to all of the various bits and set real len etc,
64 then we queue it. Raw_msg is expected to include the transport goo
65 placed in front of the RMR header and payload.
/*
	See the description above: wrap the accumulated raw datagram (which includes
	the transport header) in an rmr mbuf and route it either to the normal
	receive ring or, when a call-id is present in the d1 data, to the matching
	chute for a thread blocked in an mt-call.

	NOTE(review): mbuf and chute are used below but their declarations, along
	with several closing braces and error paths, are on lines elided from this
	view -- confirm against the full source.
*/
static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd ) {
uta_mhdr_t* hdr; // header of the message received
unsigned char* d1; // pointer at d1 data ([0] is the call_id)
unsigned int call_id; // the id assigned to the call generated message
if( PARANOID_CHECKS ) { // PARANOID mode is slower; off by default
if( raw_msg == NULL || msg_size <= 0 ) {
// cross-check that header length indicators are not longer than actual message
uta_mhdr_t* hdr_check = (uta_mhdr_t*)(((char *) raw_msg) + TP_HDR_LEN);
uint32_t header_len=(uint32_t)RMR_HDR_LEN(hdr_check);
uint32_t payload_len=(uint32_t)ntohl(hdr_check->plen);
// NOTE(review): the log below prints header_len, payload_len, TP_HDR_LEN but the
// test adds header_len + TP_HDR_LEN + payload_len; harmless, yet the printed
// expression order does not match the comparison.
if (header_len+TP_HDR_LEN+payload_len> msg_size) {
rmr_vlog( RMR_VL_ERR, "Message dropped because %u + %u + %u > %u\n", header_len, payload_len, TP_HDR_LEN, msg_size);
if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
mbuf->tp_buf = raw_msg;		// mbuf takes ownership of the raw transport buffer
mbuf->rts_fd = sender_fd;	// remember the fd so return-to-sender can use the same connection
if( msg_size > ctx->max_ibm + 1024 ) {
mbuf->flags |= MFL_HUGE; // prevent caching of oversized buffers
ref_tpbuf( mbuf, msg_size ); // point mbuf at bits in the datagram
hdr = mbuf->header; // convenience
if( hdr->flags & HFL_CALL_MSG ) { // call generated message; ignore call-id etc and queue
queue_normal( ctx, mbuf );
if( RMR_D1_LEN( hdr ) <= 0 ) { // no call-id data; just queue
queue_normal( ctx, mbuf );
d1 = DATA1_ADDR( hdr );
if( (call_id = (unsigned int) d1[D1_CALLID_IDX]) == 0 ) { // call_id not set, just queue
queue_normal( ctx, mbuf );
// call-id present: hand the message to the chute the calling thread waits on
chute = &ctx->chutes[call_id];
sem_post( &chute->barrier ); // the call function can vet xaction id in their own thread
122 Given a buffer, extract the size. We assume the buffer contains one of:
126 where <int1> is the size in native storage order (v1) and <int2>
127 is the size in network order. If <mark> is present then we assume
128 that <int2> is present and we use that after translating from net
129 byte order. If <mark> is not present, we use <int1>. This allows
130 old versions of RMR to continue to work with new versions that now
131 do the right thing with byte ordering.
133 If the receiver of a message is a backlevel RMR, and it uses RTS to
134 return a message, it will only update the old size, but when the
135 message is received back at a new RMR application it will appear that
136 the message came from a new instance. Therefore, we must compare
137 the old and new sizes and if they are different we must use the old
138 size assuming that this is the case.
/*
	See the description above: pull the message length from the front of a raw
	buffer, using the network-order copy when the TP_SZ_MARKER is present and
	falling back to the old native-order value when it is not, or when the two
	disagree (back-level return-to-sender case).

	NOTE(review): the else branch and return statement are on lines elided from
	this view; the cast of buf to uint32_t* also assumes the buffer is suitably
	aligned -- confirm callers guarantee alignment.
*/
static inline uint32_t extract_mlen( unsigned char* buf ) {
uint32_t size; // adjusted (if needed) size for return
uint32_t osize; // old size
uint32_t* blen; // length in the buffer to extract
blen = (uint32_t *) buf;
if( *(buf + sizeof( int ) * 2 ) == TP_SZ_MARKER ) {	// marker follows the two ints when new-style encoding used
osize = *blen; // old size
size = ntohl( *(blen+1) ); // pick up the second integer
if( osize != size ) { // assume back level return to sender
size = osize; // MUST use old size
if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len converted from net order to: %d\n", size );
size = *blen; // old sender didn't encode size
if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len no conversion: %d\n", size );
162 This is the callback invoked when tcp data is received. It adds the data
163 to the buffer for the connection and if a complete message is received
164 then the message is queued onto the receive ring.
166 Return value indicates only that we handled the buffer and SI should continue
167 or that SI should terminate, so on error it's NOT wrong to return "ok".
/*
	SI95 "cooked" (tcp) data callback (registered via SIcbreg with SI_CB_CDATA
	in mt_receive below). Accumulates bytes for the connection's river until a
	complete message is present, then hands it to buf2mbuf() for queuing.

	NOTE(review): this view elides many lines (ctx declaration, initialization
	of remain from buflen, closing braces, frees of old_accum, return
	statements) -- confirm against the full source before modifying.
*/
static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
river_t* river; // river associated with the fd passed in
unsigned char* old_accum; // old accumulator reference should we need to realloc
int bidx = 0; // transport buffer index
int remain; // bytes in transport buf that need to be moved
int* mlen; // pointer to spot in buffer for conversion to int
int need; // bytes needed for something
if( PARANOID_CHECKS ) { // PARANOID mode is slower; off by default
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
ctx = (uta_ctx_t *) vctx;
if( buflen <= 0 || fd < 0 ) { // no buffer or invalid fd
// large fds don't fit the direct river array; fall back to a hash keyed by fd
if( fd >= ctx->nrivers ) {
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
if( (river = (river_t *) rmr_sym_pull( ctx->river_hash, (uint64_t) fd )) == NULL ) {
// NOTE(review): malloc result is not checked before use -- confirm policy
river = (river_t *) malloc( sizeof( *river ) );
memset( river, 0, sizeof( *river ) );
rmr_sym_map( ctx->river_hash, (uint64_t) fd, river );
river->state = RS_NEW;
river = &ctx->rivers[fd]; // quick index for fd values < MAX_FD
if( river->state != RS_GOOD ) { // all states which aren't good require reset first
if( river->state == RS_NEW ) {
if( river->accum != NULL ) {
free( river->accum );
memset( river, 0, sizeof( *river ) );
river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // start with what user said would be the "normal" max inbound msg size
river->accum = (char *) malloc( river->nbytes );
if( river->state == RS_RESET ) {
// future -- reset not implemented
// future -- sync to next marker
river->ipt = 0; // insert point
river->state = RS_GOOD;
// NOTE(review): remain is presumably set from buflen on an elided line -- confirm
while( remain > 0 ) { // until we've done something with all bytes passed in
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
if( river->msg_size <= 0 ) { // don't have a message length yet
// FIX ME: we need a frame indicator to ensure alignment
need = TP_SZFIELD_LEN - river->ipt; // what we need to compute the total message length
if( need > remain ) { // the whole message len information isn't in this transport buf
if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what we can and depart
river->ipt += remain;
if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
if( river->ipt > 0 ) { // if we captured the start of size last go round
memcpy( &river->accum[river->ipt], buf + bidx, need );
river->msg_size = extract_mlen( river->accum );
rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
dump_40( river->accum, "from accumulator:" );
if( river->msg_size > 100 ) {
dump_40( river->accum + 50, "from rmr buf:" );
river->msg_size = extract_mlen( &buf[bidx] ); // pull from buf as it's all there; it will copy later
// NOTE(review): msg_size holds the uint32_t returned by extract_mlen; the
// < 0 test relies on msg_size being a signed field in river_t -- confirm
if( river->msg_size < 0) { // addressing RIC-989
river->state=RS_RESET;
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
if( river->msg_size > river->nbytes ) { // message bigger than app max size; grab huge buffer
//river->flags |= RF_DROP; // uncomment to drop large messages
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "received message is huge (%d) reallocating buffer\n", river->msg_size );
old_accum = river->accum; // need to copy any bytes we snarfed getting the size, so hold
river->nbytes = river->msg_size + 128; // buffer large enough with a bit of fudge room
river->accum = (char *) malloc( river->nbytes );
if( river->ipt > 0 ) {
memcpy( river->accum, old_accum, river->ipt + 1 ); // copy anything snarfed in getting the size
// NOTE(review): old_accum is presumably freed on an elided line -- confirm no leak
if( river->msg_size > (river->ipt + remain) ) { // need more than is left in receive buffer
if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
if( (river->flags & RF_DROP) == 0 ) { // ok to keep this message; copy bytes
memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what is in the rcv buffer and go wait for more
river->ipt += remain;
need = river->msg_size - river->ipt; // bytes from transport we need to have complete message
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d flgs=%02x\n", river->msg_size, need, remain, river->flags );
if( (river->flags & RF_DROP) == 0 ) { // keeping this message, copy and pass it on
memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more)
buf2mbuf( ctx, river->accum, river->nbytes, fd ); // build an RMR mbuf and queue; mbuf now owns accum
river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // prevent huge size from persisting
river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator
if( !(river->flags & RF_NOTIFIED) ) { // not keeping huge messages; notify once per stream
rmr_vlog( RMR_VL_WARN, "message larger than allocated buffer (%d) arrived on fd %d\n", river->nbytes, fd );
river->flags |= RF_NOTIFIED;
river->msg_size = -1;	// reset so the next loop pass computes the next message's size
if( DEBUG >2 ) rmr_vlog( RMR_VL_DEBUG, "##### data callback finished\n" );
313 Callback driven on a disconnect notification. We will attempt to find the related
314 endpoint via the fd2ep hash maintained in the context. If we find it, then we
315 remove it from the hash, and mark the endpoint as closed so that the next attempt
316 to send forces a reconnect attempt.
318 Future: put the ep on a queue to automatically attempt to reconnect.
/*
	SI95 disconnect callback (registered via SIcbreg with SI_CB_DISC in
	mt_receive below). Resets the river for the fd and marks the related
	endpoint (if any) so the next send attempt forces a reconnect.

	NOTE(review): the declaration of ep, the NULL check on the fd2ep_del()
	result before locking, and several closing braces / return statements are
	on lines elided from this view -- confirm against the full source.
*/
static int mt_disc_cb( void* vctx, int fd ) {
river_t* river = NULL;
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
if( fd < ctx->nrivers && fd >= 0 ) {
river = &ctx->rivers[fd]; // direct index for small fd values
river = rmr_sym_pull( ctx->river_hash, (uint64_t) fd ); // large fds live in the hash
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "river reset on disconnect: fd=%d\n", fd );
if( river != NULL ) {
river->state = RS_NEW; // if one connects here later; ensure it's new
if( river->accum != NULL ) {
free( river->accum );
// NOTE(review): river->accum is presumably set to NULL on an elided line -- confirm
river->state = RS_NEW; // force realloc if the fd is used again
ep = fd2ep_del( ctx, fd ); // find ep and remove the fd from the hash
pthread_mutex_lock( &ep->gate ); // wise to lock this
pthread_mutex_unlock( &ep->gate );
360 This is expected to execute in a separate thread. It is responsible for
361 _all_ receives and queues them on the appropriate ring, or chute.
362 It does this by registering the callback function above with the SI world
363 and then calling SIwait() to drive the callback when data has arrived.
366 The "state" of the message is checked which determines where the message
369 Flags indicate that the message is a call generated message, then
370 the message is queued on the normal receive ring.
372 Chute ID is == 0, then the message is queued on the normal receive ring.
374 The transaction ID in the message matches the expected ID in the chute,
375 then the message is given to the chute and the chute's semaphore is tickled.
377 If none are true, the message is dropped.
/*
	Receiver thread entry point (see the description above). Registers the data
	and disconnect callbacks with SI95 and then blocks in SIwait() which drives
	them as events arrive; SIwait() is not expected to return.

	NOTE(review): the declaration of ctx and the early-return/closing-brace
	lines are elided from this view -- confirm against the full source.
*/
static void* mt_receive( void* vctx ) {
if( (ctx = (uta_ctx_t*) vctx) == NULL ) {
rmr_vlog( RMR_VL_CRIT, "unable to start mt-receive: ctx was nil\n" );
// NOTE(review): pthread_self() returns an opaque pthread_t; the cast to
// long long for logging is not portable in general -- confirm acceptable here
rmr_vlog( RMR_VL_INFO, "mt_receive: pid=%lld registering SI95 data callback and waiting\n", (long long) pthread_self() );
SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx ); // our callback called only for "cooked" (tcp) data
SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx ); // our callback for handling disconnects
SIwait( ctx->si_ctx );
return NULL; // keep the compiler happy though never can be reached as SI wait doesn't return