1 // : vi ts=4 sw=4 noet2
3 ==================================================================================
4 Copyright (c) 2020 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: mt_call_si static.c
23 Abstract: Static funcitons related to the multi-threaded call feature
24 which are SI specific. The functions here also provide the
25 message construction functions which build a message that
26 might be split across multiple "datagrams" received from the
29 Author: E. Scott Daniels
33 #ifndef _mtcall_si_static_c
34 #define _mtcall_si_static_c
35 #include <semaphore.h>
37 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
38 static time_t last_warning = 0;
39 //static long dcount = 0;
43 if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
44 rmr_free_msg( mbuf ); // drop if ring is full
47 if( time( NULL ) > last_warning + 60 ) { // issue warning no more frequently than every 60 sec
48 rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %ld msgs dropped since last warning\n", ctx->dcount );
49 last_warning = time( NULL );
56 chute = &ctx->chutes[0];
57 sem_post( &chute->barrier ); // tickle the ring monitor
61 Allocate a message buffer, point it at the accumulated (raw) message,
62 call ref to point to all of the various bits and set real len etc,
63 then we queue it. Raw_msg is expected to include the transport goo
64 placed in front of the RMR header and payload.
66 static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd ) {
68 uta_mhdr_t* hdr; // header of the message received
69 unsigned char* d1; // pointer at d1 data ([0] is the call_id)
71 unsigned int call_id; // the id assigned to the call generated message
73 if( PARANOID_CHECKS ) { // PARANOID mode is slower; off by default
74 if( raw_msg == NULL || msg_size <= 0 ) {
79 if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
80 mbuf->tp_buf = raw_msg;
81 mbuf->rts_fd = sender_fd;
82 if( msg_size > ctx->max_ibm + 1024 ) {
83 mbuf->flags |= MFL_HUGE; // prevent caching of oversized buffers
86 ref_tpbuf( mbuf, msg_size ); // point mbuf at bits in the datagram
87 hdr = mbuf->header; // convenience
88 if( hdr->flags & HFL_CALL_MSG ) { // call generated message; ignore call-id etc and queue
89 queue_normal( ctx, mbuf );
91 if( RMR_D1_LEN( hdr ) <= 0 ) { // no call-id data; just queue
92 queue_normal( ctx, mbuf );
94 d1 = DATA1_ADDR( hdr );
95 if( (call_id = (unsigned int) d1[D1_CALLID_IDX]) == 0 ) { // call_id not set, just queue
96 queue_normal( ctx, mbuf );
98 chute = &ctx->chutes[call_id];
100 sem_post( &chute->barrier ); // the call function can vet xaction id in their own thread
110 Given a buffer, extract the size. We assume the buffer contains one of:
114 where <int1> is the size in native storage order (v1) and <int2>
115 is the size in network order. If <mark> is present then we assume
116 that <int2> is present and we use that after translating from net
117 byte order. If <mark> is not present, we use <int1>. This allows
118 old versions of RMR to continue to work with new versions that now
119 do the right thing with byte ordering.
121 If the receiver of a message is a backlevel RMR, and it uses RTS to
122 return a message, it will only update the old size, but when the
123 message is received back at a new RMR application it will appear that
124 the message came from a new instance. Therefore, we must compare
125 the old and new sizes and if they are different we must use the old
126 size assuming that this is the case.
128 static inline uint32_t extract_mlen( unsigned char* buf ) {
129 uint32_t size; // adjusted (if needed) size for return
130 uint32_t osize; // old size
131 uint32_t* blen; // length in the buffer to extract
133 blen = (uint32_t *) buf;
134 if( *(buf + sizeof( int ) * 2 ) == TP_SZ_MARKER ) {
135 osize = *blen; // old size
136 size = ntohl( *(blen+1) ); // pick up the second integer
137 if( osize != size ) { // assume back level return to sender
138 size = osize; // MUST use old size
140 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len converted from net order to: %d\n", size );
142 size = *blen; // old sender didn't encode size
143 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len no conversion: %d\n", size );
150 This is the callback invoked when tcp data is received. It adds the data
151 to the buffer for the connection and if a complete message is received
152 then the message is queued onto the receive ring.
154 Return value indicates only that we handled the buffer and SI should continue
155 or that SI should terminate, so on error it's NOT wrong to return "ok".
157 static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
159 river_t* river; // river associated with the fd passed in
160 unsigned char* old_accum; // old accumulator reference should we need to realloc
161 int bidx = 0; // transport buffer index
162 int remain; // bytes in transport buf that need to be moved
163 int* mlen; // pointer to spot in buffer for conversion to int
164 int need; // bytes needed for something
167 if( PARANOID_CHECKS ) { // PARANOID mode is slower; off by default
168 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
172 ctx = (uta_ctx_t *) vctx;
175 if( buflen <= 0 || fd < 0 ) { // no buffer or invalid fd
179 if( fd >= ctx->nrivers ) {
180 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
181 if( (river = (river_t *) rmr_sym_pull( ctx->river_hash, (uint64_t) fd )) == NULL ) {
182 river = (river_t *) malloc( sizeof( *river ) );
183 memset( river, 0, sizeof( *river ) );
184 rmr_sym_map( ctx->river_hash, (uint64_t) fd, river );
185 river->state = RS_NEW;
188 river = &ctx->rivers[fd]; // quick index for fd values < MAX_FD
191 if( river->state != RS_GOOD ) { // all states which aren't good require reset first
192 if( river->state == RS_NEW ) {
193 if( river->accum != NULL ) {
194 free( river->accum );
196 memset( river, 0, sizeof( *river ) );
197 river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // start with what user said would be the "normal" max inbound msg size
198 river->accum = (char *) malloc( river->nbytes );
201 // future -- sync to next marker
202 river->ipt = 0; // insert point
206 river->state = RS_GOOD;
208 while( remain > 0 ) { // until we've done something with all bytes passed in
209 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
211 if( river->msg_size <= 0 ) { // don't have a message length yet
212 // FIX ME: we need a frame indicator to ensure alignment
213 need = TP_SZFIELD_LEN - river->ipt; // what we need to compute the total message length
214 if( need > remain ) { // the whole message len information isn't in this transport buf
215 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
216 memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what we can and depart
217 river->ipt += remain;
218 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
222 if( river->ipt > 0 ) { // if we captured the start of size last go round
223 memcpy( &river->accum[river->ipt], buf + bidx, need );
227 river->msg_size = extract_mlen( river->accum );
229 rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
231 dump_40( river->accum, "from accumulator:" );
232 if( river->msg_size > 100 ) {
233 dump_40( river->accum + 50, "from rmr buf:" );
238 river->msg_size = extract_mlen( &buf[bidx] ); // pull from buf as it's all there; it will copy later
240 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
242 if( river->msg_size > river->nbytes ) { // message bigger than app max size; grab huge buffer
243 //river->flags |= RF_DROP; // uncomment to drop large messages
244 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "received message is huge (%d) reallocating buffer\n", river->msg_size );
245 old_accum = river->accum; // need to copy any bytes we snarfed getting the size, so hold
246 river->nbytes = river->msg_size + 128; // buffer large enough with a bit of fudge room
247 river->accum = (char *) malloc( river->nbytes );
248 if( river->ipt > 0 ) {
249 memcpy( river->accum, old_accum, river->ipt + 1 ); // copy anything snarfed in getting the sie
256 if( river->msg_size > (river->ipt + remain) ) { // need more than is left in receive buffer
257 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
258 if( (river->flags & RF_DROP) == 0 ) { // ok to keep this message; copy bytes
259 memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what is in the rcv buffer and go wait for more
261 river->ipt += remain;
264 need = river->msg_size - river->ipt; // bytes from transport we need to have complete message
265 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d flgs=%02x\n", river->msg_size, need, remain, river->flags );
266 if( (river->flags & RF_DROP) == 0 ) { // keeping this message, copy and pass it on
267 memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more)
268 buf2mbuf( ctx, river->accum, river->nbytes, fd ); // build an RMR mbuf and queue
269 river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // prevent huge size from persisting
270 river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator
272 if( !(river->flags & RF_NOTIFIED) ) { // not keeping huge messages; notify once per stream
273 rmr_vlog( RMR_VL_WARN, "message larger than allocated buffer (%d) arrived on fd %d\n", river->nbytes, fd );
274 river->flags |= RF_NOTIFIED;
278 river->msg_size = -1;
285 if( DEBUG >2 ) rmr_vlog( RMR_VL_DEBUG, "##### data callback finished\n" );
290 Callback driven on a disconnect notification. We will attempt to find the related
291 endpoint via the fd2ep hash maintained in the context. If we find it, then we
292 remove it from the hash, and mark the endpoint as closed so that the next attempt
293 to send forces a reconnect attempt.
295 Future: put the ep on a queue to automatically attempt to reconnect.
297 static int mt_disc_cb( void* vctx, int fd ) {
300 river_t* river = NULL;
302 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
306 if( fd < ctx->nrivers && fd >= 0 ) {
307 river = &ctx->rivers[fd];
310 river = rmr_sym_pull( ctx->river_hash, (uint64_t) fd );
311 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "river reset on disconnect: fd=%d\n", fd );
315 if( river != NULL ) {
316 river->state = RS_NEW; // if one connects here later; ensure it's new
317 if( river->accum != NULL ) {
318 free( river->accum );
320 river->state = RS_NEW; // force realloc if the fd is used again
324 ep = fd2ep_del( ctx, fd ); // find ep and remove the fd from the hash
326 pthread_mutex_lock( &ep->gate ); // wise to lock this
329 pthread_mutex_unlock( &ep->gate );
337 This is expected to execute in a separate thread. It is responsible for
338 _all_ receives and queues them on the appropriate ring, or chute.
339 It does this by registering the callback function above with the SI world
340 and then calling SIwait() to drive the callback when data has arrived.
343 The "state" of the message is checked which determines where the message
346 Flags indicate that the message is a call generated message, then
347 the message is queued on the normal receive ring.
349 Chute ID is == 0, then the message is queued on the normal receive ring.
351 The transaction ID in the message matches the expected ID in the chute,
352 then the message is given to the chute and the chute's semaphore is tickled.
354 If none are true, the message is dropped.
356 static void* mt_receive( void* vctx ) {
359 if( (ctx = (uta_ctx_t*) vctx) == NULL ) {
360 rmr_vlog( RMR_VL_CRIT, "unable to start mt-receive: ctx was nil\n" );
364 rmr_vlog( RMR_VL_INFO, "mt_receive: pid=%lld registering SI95 data callback and waiting\n", (long long) pthread_self() );
366 SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx ); // our callback called only for "cooked" (tcp) data
367 SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx ); // our callback for handling disconnects
369 SIwait( ctx->si_ctx );
371 return NULL; // keep the compiler happy though never can be reached as SI wait doesn't return