c117a5c72ae8fa206c993b715c594187aea7f5ad
[ric-plt/lib/rmr.git] / src / rmr / si / src / mt_call_si_static.c
1         // : vi ts=4 sw=4 noet2
2 /*
3 ==================================================================================
4         Copyright (c) 2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       mt_call_si static.c
23         Abstract:       Static funcitons related to the multi-threaded call feature
24                                 which are SI specific. The functions here also provide the
25                                 message construction functions which build a message that
26                                 might be split across multiple "datagrams" received from the
27                                 underlying transport.
28
29         Author:         E. Scott Daniels
30         Date:           20 May 2019
31 */
32
33 #ifndef _mtcall_si_static_c
34 #define _mtcall_si_static_c
35 #include <semaphore.h>
36
37 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
38         static  time_t last_warning = 0;
39         //static        long dcount = 0;
40
41         chute_t*        chute;
42
43         if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
44                 rmr_free_msg( mbuf );                                                           // drop if ring is full
45                 //dcount++;
46                 ctx->dcount++;
47                 if( time( NULL ) > last_warning + 60 ) {                        // issue warning no more frequently than every 60 sec
48                         rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %d msgs dropped since last warning\n", ctx->dcount );
49                         last_warning = time( NULL );
50                         ctx->dcount = 0;
51                 }
52
53                 return;
54         }
55
56         chute = &ctx->chutes[0];
57         sem_post( &chute->barrier );                                                            // tickle the ring monitor
58 }
59
60 /*
61         Allocate a message buffer, point it at the accumulated (raw) message,
62         call ref to point to all of the various bits and set real len etc,
63         then we queue it.  Raw_msg is expected to include the transport goo
64         placed in front of the RMR header and payload.
65 */
66 static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd ) {
67         rmr_mbuf_t*             mbuf;
68         uta_mhdr_t*             hdr;            // header of the message received
69         unsigned char*  d1;                     // pointer at d1 data ([0] is the call_id)
70         chute_t*                chute;
71         unsigned int    call_id;        // the id assigned to the call generated message
72
73         if( PARANOID_CHECKS ) {                                                                 // PARANOID mode is slower; off by default
74                 if( raw_msg == NULL || msg_size <= 0 ) {
75                         return;
76                 }
77         }
78
79         if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
80                 mbuf->tp_buf = raw_msg;
81                 mbuf->rts_fd = sender_fd;
82                 if( msg_size > ctx->max_ibm + 1024 ) {
83                         mbuf->flags |= MFL_HUGE;                                // prevent caching of oversized buffers
84                 }
85
86                 ref_tpbuf( mbuf, msg_size );                            // point mbuf at bits in the datagram
87                 hdr = mbuf->header;                                                     // convenience
88                 if( hdr->flags & HFL_CALL_MSG ) {                       // call generated message; ignore call-id etc and queue
89                         queue_normal( ctx, mbuf );
90                 } else {
91                         if( RMR_D1_LEN( hdr ) <= 0 ) {                                                                                  // no call-id data; just queue
92                                 queue_normal( ctx, mbuf );
93                         } else {
94                                 d1 = DATA1_ADDR( hdr );
95                                 if( (call_id = (unsigned int) d1[D1_CALLID_IDX]) == 0 ) {                       // call_id not set, just queue
96                                         queue_normal( ctx, mbuf );
97                                 } else {
98                                         chute = &ctx->chutes[call_id];
99                                         chute->mbuf = mbuf;
100                                         sem_post( &chute->barrier );                            // the call function can vet xaction id in their own thread
101                                 }
102                         }
103                 }
104         } else {
105                 free( raw_msg );
106         }
107 }
108
109 /*
110         Given a buffer, extract the size. We assume the buffer contains one of:
111                 <int1><int2><mark>
112                 <int1>
113
114         where <int1> is the size in native storage order (v1) and <int2>
115         is the size in network order. If <mark> is present then we assume
116         that <int2> is present and we use that after translating from net
117         byte order. If <mark> is not present, we use <int1>. This allows
118         old versions of RMR to continue to work with new versions that now
119         do the right thing with byte ordering.
120
121         If the receiver of a message is a backlevel RMR, and it uses RTS to
122         return a message, it will only update the old size, but when the
123         message is received back at a new RMR application it will appear that
124         the message came from a new instance.  Therefore, we must compare
125         the old and new sizes and if they are different we must use the old
126         size assuming that this is the case.
127 */
128 static inline uint32_t extract_mlen( unsigned char* buf ) {
129         uint32_t        size;           // adjusted (if needed) size for return
130         uint32_t        osize;          // old size
131         uint32_t*       blen;           // length in the buffer to extract
132
133         blen = (uint32_t *) buf;
134         if( *(buf + sizeof( int ) * 2 ) == TP_SZ_MARKER ) {
135                 osize = *blen;                                                  // old size
136                 size = ntohl( *(blen+1) );                              // pick up the second integer
137                 if( osize != size ) {                                   // assume back level return to sender
138                         size = osize;                                           // MUST use old size
139                 }
140                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len converted from net order to: %d\n", size );
141         } else {
142                 size = *blen;                                                   // old sender didn't encode size
143                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len no conversion: %d\n", size );
144         }
145
146         return size;
147 }
148
149 /*
150         This is the callback invoked when tcp data is received. It adds the data
151         to the buffer for the connection and if a complete message is received
152         then the message is queued onto the receive ring.
153
154         Return value indicates only that we handled the buffer and SI should continue
155         or that SI should terminate, so on error it's NOT wrong to return "ok".
156 */
157 static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
158         uta_ctx_t*              ctx;
159         river_t*                river;                  // river associated with the fd passed in
160         unsigned char*  old_accum;              // old accumulator reference should we need to realloc
161         int                             bidx = 0;               // transport buffer index
162         int                             remain;                 // bytes in transport buf that need to be moved
163         int*                    mlen;                   // pointer to spot in buffer for conversion to int
164         int                             need;                   // bytes needed for something
165         int                             i;
166
167         if( PARANOID_CHECKS ) {                                                                 // PARANOID mode is slower; off by default
168                 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
169                         return SI_RET_OK;
170                 }
171         } else {
172                 ctx = (uta_ctx_t *) vctx;
173         }
174
175         if( buflen <= 0 || fd < 0 ) {                   // no buffer or invalid fd
176                 return SI_RET_OK;
177         }
178
179         if( fd >= ctx->nrivers ) {
180                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
181                 if( (river = (river_t *) rmr_sym_pull( ctx->river_hash, (uint64_t) fd )) == NULL ) {
182                         river = (river_t *) malloc( sizeof( *river ) );
183                         memset( river, 0, sizeof( *river ) );
184                         rmr_sym_map( ctx->river_hash, (uint64_t) fd, river );
185                         river->state = RS_NEW;
186                 }
187         } else {
188                 river = &ctx->rivers[fd];                               // quick index for fd values < MAX_FD
189         }
190
191         if( river->state != RS_GOOD ) {                         // all states which aren't good require reset first
192                 if( river->state == RS_NEW ) {
193                         if( river->accum != NULL ) {
194                                 free( river->accum );
195                         }
196                         memset( river, 0, sizeof( *river ) );
197                         river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);         // start with what user said would be the "normal" max inbound msg size
198                         river->accum = (char *) malloc( river->nbytes );
199                         river->ipt = 0;
200                 } else {
201                         // future -- sync to next marker
202                         river->ipt = 0;                                         // insert point
203                 }
204         }
205
206         river->state = RS_GOOD;
207         remain = buflen;
208         while( remain > 0 ) {                                                           // until we've done something with all bytes passed in
209                 if( DEBUG )  rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
210
211                 if( river->msg_size <= 0 ) {                            // don't have a message length  yet
212                                                                                                         // FIX ME: we need a frame indicator to ensure alignment
213                         need = TP_SZFIELD_LEN - river->ipt;             // what we need to compute the total message length
214                         if( need > remain ) {                                                                           // the whole message len  information isn't in this transport buf
215                                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
216                                 memcpy( &river->accum[river->ipt], buf+bidx, remain );                  // grab what we can and depart
217                                 river->ipt += remain;
218                                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
219                                 return SI_RET_OK;
220                         }
221
222                         if( river->ipt > 0 ) {                                                                          // if we captured the start of size last go round
223                                 memcpy( &river->accum[river->ipt], buf + bidx, need );
224                                 river->ipt += need;
225                                 bidx += need;
226                                 remain -= need;
227                                 river->msg_size = extract_mlen( river->accum );
228                                 if( DEBUG ) {
229                                         rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
230                                         if( DEBUG > 1 ) {
231                                                 dump_40( river->accum, "from accumulator:"  );
232                                                 if( river->msg_size > 100 ) {
233                                                         dump_40( river->accum + 50, "from rmr buf:"  );
234                                                 }
235                                         }
236                                 }
237                         } else {
238                                 river->msg_size = extract_mlen( &buf[bidx] );                   // pull from buf as it's all there; it will copy later
239                         }
240                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
241
242                         if( river->msg_size > river->nbytes ) {                                         // message bigger than app max size; grab huge buffer
243                                 //river->flags |= RF_DROP;                                                              //  uncomment to drop large messages
244                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "received message is huge (%d) reallocating buffer\n", river->msg_size );
245                                 old_accum = river->accum;                                       // need to copy any bytes we snarfed getting the size, so hold
246                                 river->nbytes = river->msg_size + 128;                                  // buffer large enough with a bit of fudge room
247                                 river->accum = (char *) malloc( river->nbytes );
248                                 if( river->ipt > 0 ) {
249                                         memcpy( river->accum, old_accum, river->ipt + 1 );              // copy anything snarfed in getting the sie
250                                 }
251
252                                 free( old_accum );
253                         }
254                 }
255
256                 if( river->msg_size > (river->ipt + remain) ) {                                 // need more than is left in receive buffer
257                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
258                         if( (river->flags & RF_DROP) == 0  ) {                                                  // ok to keep this message; copy bytes
259                                 memcpy( &river->accum[river->ipt], buf+bidx, remain );          // grab what is in the rcv buffer and go wait for more
260                         }
261                         river->ipt += remain;
262                         remain = 0;
263                 } else {
264                         need = river->msg_size - river->ipt;                                            // bytes from transport we need to have complete message
265                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d flgs=%02x\n", river->msg_size, need, remain, river->flags );
266                         if( (river->flags & RF_DROP) == 0  ) {                                                                  // keeping this message, copy and pass it on
267                                 memcpy( &river->accum[river->ipt], buf+bidx, need );                            // grab just what is needed (might be more)
268                                 buf2mbuf( ctx, river->accum, river->nbytes, fd );                                       // build an RMR mbuf and queue
269                                 river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);                         // prevent huge size from persisting
270                                 river->accum = (char *) malloc( sizeof( char ) *  river->nbytes );      // fresh accumulator
271                         } else {
272                                 if( !(river->flags & RF_NOTIFIED) ) {                                                           // not keeping huge messages; notify once per stream
273                                         rmr_vlog( RMR_VL_WARN, "message larger than allocated buffer (%d) arrived on fd %d\n", river->nbytes, fd );
274                                         river->flags |= RF_NOTIFIED;
275                                 }
276                         }
277
278                         river->msg_size = -1;
279                         river->ipt = 0;
280                         bidx += need;
281                         remain -= need;
282                 }
283         }
284
285         if( DEBUG >2 ) rmr_vlog( RMR_VL_DEBUG, "##### data callback finished\n" );
286         return SI_RET_OK;
287 }
288
289 /*
290         Callback driven on a disconnect notification. We will attempt to find the related
291         endpoint via the fd2ep hash maintained in the context. If we find it, then we
292         remove it from the hash, and mark the endpoint as closed so that the next attempt
293         to send forces a reconnect attempt.
294
295         Future: put the ep on a queue to automatically attempt to reconnect.
296 */
297 static int mt_disc_cb( void* vctx, int fd ) {
298         uta_ctx_t*      ctx;
299         endpoint_t*     ep;
300         river_t*        river = NULL;
301
302         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
303                 return SI_RET_OK;
304         }
305
306         if( fd < ctx->nrivers && fd >= 0 ) {
307                 river = &ctx->rivers[fd];
308         } else {
309                 if( fd > 0 ) {
310                         river = rmr_sym_pull( ctx->river_hash, (uint64_t) fd );
311                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "river reset on disconnect: fd=%d\n", fd );
312                 }
313         }
314
315         if( river != NULL ) {
316                 river->state = RS_NEW;                  // if one connects here later; ensure it's new
317                 if( river->accum != NULL ) {
318                         free( river->accum );
319                         river->accum = NULL;
320                         river->state = RS_NEW;          // force realloc if the fd is used again
321                 }
322         }
323
324         ep = fd2ep_del( ctx, fd );              // find ep and remove the fd from the hash
325         if( ep != NULL ) {
326                 pthread_mutex_lock( &ep->gate );            // wise to lock this
327                 ep->open = FALSE;
328                 ep->nn_sock = -1;
329                 pthread_mutex_unlock( &ep->gate );
330         }
331
332         return SI_RET_OK;
333 }
334
335
336 /*
337         This is expected to execute in a separate thread. It is responsible for
338         _all_ receives and queues them on the appropriate ring, or chute.
339         It does this by registering the callback function above with the SI world
340         and then calling SIwait() to drive the callback when data has arrived.
341
342
343         The "state" of the message is checked which determines where the message
344         is delivered.
345
346                 Flags indicate that the message is a call generated message, then
347                 the message is queued on the normal receive ring.
348
349                 Chute ID is == 0, then the message is queued on the normal receive ring.
350
351                 The transaction ID in the message matches the expected ID in the chute,
352                 then the message is given to the chute and the chute's semaphore is tickled.
353
354                 If none are true, the message is dropped.
355 */
356 static void* mt_receive( void* vctx ) {
357         uta_ctx_t*      ctx;
358
359         if( (ctx = (uta_ctx_t*) vctx) == NULL ) {
360                 rmr_vlog( RMR_VL_CRIT, "unable to start mt-receive: ctx was nil\n" );
361                 return NULL;
362         }
363
364         rmr_vlog( RMR_VL_INFO, "mt_receive: pid=%lld  registering SI95 data callback and waiting\n", (long long) pthread_self() );
365
366         SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx );                  // our callback called only for "cooked" (tcp) data
367         SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx );                   // our callback for handling disconnects
368
369         SIwait( ctx->si_ctx );
370
371         return NULL;            // keep the compiler happy though never can be reached as SI wait doesn't return
372 }
373
374 #endif