Fixing handling of invalid header size
[ric-plt/lib/rmr.git] / src / rmr / si / src / mt_call_si_static.c
1 // : vi ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2020-2021 Nokia
5         Copyright (c) 2018-2021 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       mt_call_si static.c
23         Abstract:       Static funcitons related to the multi-threaded call feature
24                                 which are SI specific. The functions here also provide the
25                                 message construction functions which build a message that
26                                 might be split across multiple "datagrams" received from the
27                                 underlying transport.
28
29         Author:         E. Scott Daniels
30         Date:           20 May 2019
31 */
32
33 #ifndef _mtcall_si_static_c
34 #define _mtcall_si_static_c
35 #include <semaphore.h>
36
37 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
38         static  time_t last_warning = 0;
39         //static        long dcount = 0;
40
41         chute_t*        chute;
42
43         if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
44                 rmr_free_msg( mbuf );                                                           // drop if ring is full
45                 //dcount++;
46                 ctx->dcount++;
47                 ctx->acc_dcount++;
48                 if( time( NULL ) > last_warning + 60 ) {                        // issue warning no more frequently than every 60 sec
49                         rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %d msgs dropped since last warning\n", ctx->dcount );
50                         last_warning = time( NULL );
51                         ctx->dcount = 0;
52                 }
53
54                 return;
55         }
56         ctx->acc_ecount++;
57         chute = &ctx->chutes[0];
58         sem_post( &chute->barrier );                                                            // tickle the ring monitor
59 }
60
61 /*
62         Allocate a message buffer, point it at the accumulated (raw) message,
63         call ref to point to all of the various bits and set real len etc,
64         then we queue it.  Raw_msg is expected to include the transport goo
65         placed in front of the RMR header and payload.
66 */
67 static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd ) {
68         rmr_mbuf_t*             mbuf;
69         uta_mhdr_t*             hdr;            // header of the message received
70         unsigned char*  d1;                     // pointer at d1 data ([0] is the call_id)
71         chute_t*                chute;
72         unsigned int    call_id;        // the id assigned to the call generated message
73
74         if( PARANOID_CHECKS ) {                                                                 // PARANOID mode is slower; off by default
75                 if( raw_msg == NULL || msg_size <= 0 ) {
76                         return;
77                 }
78         }
79
80         // cross-check that header length indicators are not longer than actual message
81         uta_mhdr_t* hdr_check = (uta_mhdr_t*)(((char *) raw_msg) + TP_HDR_LEN);
82         uint32_t header_len=(uint32_t)RMR_HDR_LEN(hdr_check);
83         uint32_t payload_len=(uint32_t)ntohl(hdr_check->plen);
84         if (header_len+TP_HDR_LEN+payload_len> msg_size) {
85                 rmr_vlog( RMR_VL_ERR, "Message dropped because %u + %u + %u > %u\n", header_len, payload_len, TP_HDR_LEN, msg_size);
86                 free (raw_msg);
87                 return;
88         }
89
90
91         if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
92                 mbuf->tp_buf = raw_msg;
93                 mbuf->rts_fd = sender_fd;
94                 if( msg_size > ctx->max_ibm + 1024 ) {
95                         mbuf->flags |= MFL_HUGE;                                // prevent caching of oversized buffers
96                 }
97
98                 ref_tpbuf( mbuf, msg_size );                            // point mbuf at bits in the datagram
99                 hdr = mbuf->header;                                                     // convenience
100                 if( hdr->flags & HFL_CALL_MSG ) {                       // call generated message; ignore call-id etc and queue
101                         queue_normal( ctx, mbuf );
102                 } else {
103                         if( RMR_D1_LEN( hdr ) <= 0 ) {                                                                                  // no call-id data; just queue
104                                 queue_normal( ctx, mbuf );
105                         } else {
106                                 d1 = DATA1_ADDR( hdr );
107                                 if( (call_id = (unsigned int) d1[D1_CALLID_IDX]) == 0 ) {                       // call_id not set, just queue
108                                         queue_normal( ctx, mbuf );
109                                 } else {
110                                         chute = &ctx->chutes[call_id];
111                                         chute->mbuf = mbuf;
112                                         sem_post( &chute->barrier );                            // the call function can vet xaction id in their own thread
113                                 }
114                         }
115                 }
116         } else {
117                 free( raw_msg );
118         }
119 }
120
121 /*
122         Given a buffer, extract the size. We assume the buffer contains one of:
123                 <int1><int2><mark>
124                 <int1>
125
126         where <int1> is the size in native storage order (v1) and <int2>
127         is the size in network order. If <mark> is present then we assume
128         that <int2> is present and we use that after translating from net
129         byte order. If <mark> is not present, we use <int1>. This allows
130         old versions of RMR to continue to work with new versions that now
131         do the right thing with byte ordering.
132
133         If the receiver of a message is a backlevel RMR, and it uses RTS to
134         return a message, it will only update the old size, but when the
135         message is received back at a new RMR application it will appear that
136         the message came from a new instance.  Therefore, we must compare
137         the old and new sizes and if they are different we must use the old
138         size assuming that this is the case.
139 */
140 static inline uint32_t extract_mlen( unsigned char* buf ) {
141         uint32_t        size;           // adjusted (if needed) size for return
142         uint32_t        osize;          // old size
143         uint32_t*       blen;           // length in the buffer to extract
144
145         blen = (uint32_t *) buf;
146         if( *(buf + sizeof( int ) * 2 ) == TP_SZ_MARKER ) {
147                 osize = *blen;                                                  // old size
148                 size = ntohl( *(blen+1) );                              // pick up the second integer
149                 if( osize != size ) {                                   // assume back level return to sender
150                         size = osize;                                           // MUST use old size
151                 }
152                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len converted from net order to: %d\n", size );
153         } else {
154                 size = *blen;                                                   // old sender didn't encode size
155                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len no conversion: %d\n", size );
156         }
157
158         return size;
159 }
160
161 /*
162         This is the callback invoked when tcp data is received. It adds the data
163         to the buffer for the connection and if a complete message is received
164         then the message is queued onto the receive ring.
165
166         Return value indicates only that we handled the buffer and SI should continue
167         or that SI should terminate, so on error it's NOT wrong to return "ok".
168 */
169 static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
170         uta_ctx_t*              ctx;
171         river_t*                river;                  // river associated with the fd passed in
172         unsigned char*  old_accum;              // old accumulator reference should we need to realloc
173         int                             bidx = 0;               // transport buffer index
174         int                             remain;                 // bytes in transport buf that need to be moved
175         int*                    mlen;                   // pointer to spot in buffer for conversion to int
176         int                             need;                   // bytes needed for something
177         int                             i;
178
179         if( PARANOID_CHECKS ) {                                                                 // PARANOID mode is slower; off by default
180                 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
181                         return SI_RET_OK;
182                 }
183         } else {
184                 ctx = (uta_ctx_t *) vctx;
185         }
186
187         if( buflen <= 0 || fd < 0 ) {                   // no buffer or invalid fd
188                 return SI_RET_OK;
189         }
190
191         if( fd >= ctx->nrivers ) {
192                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
193                 if( (river = (river_t *) rmr_sym_pull( ctx->river_hash, (uint64_t) fd )) == NULL ) {
194                         river = (river_t *) malloc( sizeof( *river ) );
195                         memset( river, 0, sizeof( *river ) );
196                         rmr_sym_map( ctx->river_hash, (uint64_t) fd, river );
197                         river->state = RS_NEW;
198                 }
199         } else {
200                 river = &ctx->rivers[fd];                               // quick index for fd values < MAX_FD
201         }
202
203         if( river->state != RS_GOOD ) {                         // all states which aren't good require reset first
204                 if( river->state == RS_NEW ) {
205                         if( river->accum != NULL ) {
206                                 free( river->accum );
207                         }
208                         memset( river, 0, sizeof( *river ) );
209                         river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);         // start with what user said would be the "normal" max inbound msg size
210                         river->accum = (char *) malloc( river->nbytes );
211                         river->ipt = 0;
212                 } else {
213                         if( river->state == RS_RESET ) {
214                                 // future -- reset not implemented
215                                 return SI_RET_OK;
216                         } else {
217                                 // future -- sync to next marker
218                                 river->ipt = 0;                                         // insert point
219                         }
220                 }
221         }
222
223         river->state = RS_GOOD;
224         remain = buflen;
225         while( remain > 0 ) {                                                           // until we've done something with all bytes passed in
226                 if( DEBUG )  rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
227
228                 if( river->msg_size <= 0 ) {                            // don't have a message length  yet
229                                                                                                         // FIX ME: we need a frame indicator to ensure alignment
230                         need = TP_SZFIELD_LEN - river->ipt;             // what we need to compute the total message length
231                         if( need > remain ) {                                                                           // the whole message len  information isn't in this transport buf
232                                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
233                                 memcpy( &river->accum[river->ipt], buf+bidx, remain );                  // grab what we can and depart
234                                 river->ipt += remain;
235                                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
236                                 return SI_RET_OK;
237                         }
238
239                         if( river->ipt > 0 ) {                                                                          // if we captured the start of size last go round
240                                 memcpy( &river->accum[river->ipt], buf + bidx, need );
241                                 river->ipt += need;
242                                 bidx += need;
243                                 remain -= need;
244                                 river->msg_size = extract_mlen( river->accum );
245                                 if( DEBUG ) {
246                                         rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
247                                         if( DEBUG > 1 ) {
248                                                 dump_40( river->accum, "from accumulator:"  );
249                                                 if( river->msg_size > 100 ) {
250                                                         dump_40( river->accum + 50, "from rmr buf:"  );
251                                                 }
252                                         }
253                                 }
254                         } else {
255                                 river->msg_size = extract_mlen( &buf[bidx] );                   // pull from buf as it's all there; it will copy later
256                         }
257
258                         if( river->msg_size < 0) { // addressing RIC-989
259                                 river->state=RS_RESET;
260                                 return SI_RET_OK;
261                         }
262
263                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
264
265                         if( river->msg_size > river->nbytes ) {                                         // message bigger than app max size; grab huge buffer
266                                 //river->flags |= RF_DROP;                                                              //  uncomment to drop large messages
267                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "received message is huge (%d) reallocating buffer\n", river->msg_size );
268                                 old_accum = river->accum;                                       // need to copy any bytes we snarfed getting the size, so hold
269                                 river->nbytes = river->msg_size + 128;                                  // buffer large enough with a bit of fudge room
270                                 river->accum = (char *) malloc( river->nbytes );
271                                 if( river->ipt > 0 ) {
272                                         memcpy( river->accum, old_accum, river->ipt + 1 );              // copy anything snarfed in getting the sie
273                                 }
274
275                                 free( old_accum );
276                         }
277                 }
278
279                 if( river->msg_size > (river->ipt + remain) ) {                                 // need more than is left in receive buffer
280                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
281                         if( (river->flags & RF_DROP) == 0  ) {                                                  // ok to keep this message; copy bytes
282                                 memcpy( &river->accum[river->ipt], buf+bidx, remain );          // grab what is in the rcv buffer and go wait for more
283                         }
284                         river->ipt += remain;
285                         remain = 0;
286                 } else {
287                         need = river->msg_size - river->ipt;                                            // bytes from transport we need to have complete message
288                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d flgs=%02x\n", river->msg_size, need, remain, river->flags );
289                         if( (river->flags & RF_DROP) == 0  ) {                                                                  // keeping this message, copy and pass it on
290                                 memcpy( &river->accum[river->ipt], buf+bidx, need );                            // grab just what is needed (might be more)
291                                 buf2mbuf( ctx, river->accum, river->nbytes, fd );                                       // build an RMR mbuf and queue
292                                 river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);                         // prevent huge size from persisting
293                                 river->accum = (char *) malloc( sizeof( char ) *  river->nbytes );      // fresh accumulator
294                         } else {
295                                 if( !(river->flags & RF_NOTIFIED) ) {                                                           // not keeping huge messages; notify once per stream
296                                         rmr_vlog( RMR_VL_WARN, "message larger than allocated buffer (%d) arrived on fd %d\n", river->nbytes, fd );
297                                         river->flags |= RF_NOTIFIED;
298                                 }
299                         }
300
301                         river->msg_size = -1;
302                         river->ipt = 0;
303                         bidx += need;
304                         remain -= need;
305                 }
306         }
307
308         if( DEBUG >2 ) rmr_vlog( RMR_VL_DEBUG, "##### data callback finished\n" );
309         return SI_RET_OK;
310 }
311
312 /*
313         Callback driven on a disconnect notification. We will attempt to find the related
314         endpoint via the fd2ep hash maintained in the context. If we find it, then we
315         remove it from the hash, and mark the endpoint as closed so that the next attempt
316         to send forces a reconnect attempt.
317
318         Future: put the ep on a queue to automatically attempt to reconnect.
319 */
320 static int mt_disc_cb( void* vctx, int fd ) {
321         uta_ctx_t*      ctx;
322         endpoint_t*     ep;
323         river_t*        river = NULL;
324
325         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
326                 return SI_RET_OK;
327         }
328
329         if( fd < ctx->nrivers && fd >= 0 ) {
330                 river = &ctx->rivers[fd];
331         } else {
332                 if( fd > 0 ) {
333                         river = rmr_sym_pull( ctx->river_hash, (uint64_t) fd );
334                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "river reset on disconnect: fd=%d\n", fd );
335                 }
336         }
337
338         if( river != NULL ) {
339                 river->state = RS_NEW;                  // if one connects here later; ensure it's new
340                 if( river->accum != NULL ) {
341                         free( river->accum );
342                         river->accum = NULL;
343                         river->state = RS_NEW;          // force realloc if the fd is used again
344                 }
345         }
346
347         ep = fd2ep_del( ctx, fd );              // find ep and remove the fd from the hash
348         if( ep != NULL ) {
349                 pthread_mutex_lock( &ep->gate );            // wise to lock this
350                 ep->open = FALSE;
351                 ep->nn_sock = -1;
352                 pthread_mutex_unlock( &ep->gate );
353         }
354
355         return SI_RET_OK;
356 }
357
358
359 /*
360         This is expected to execute in a separate thread. It is responsible for
361         _all_ receives and queues them on the appropriate ring, or chute.
362         It does this by registering the callback function above with the SI world
363         and then calling SIwait() to drive the callback when data has arrived.
364
365
366         The "state" of the message is checked which determines where the message
367         is delivered.
368
369                 Flags indicate that the message is a call generated message, then
370                 the message is queued on the normal receive ring.
371
372                 Chute ID is == 0, then the message is queued on the normal receive ring.
373
374                 The transaction ID in the message matches the expected ID in the chute,
375                 then the message is given to the chute and the chute's semaphore is tickled.
376
377                 If none are true, the message is dropped.
378 */
379 static void* mt_receive( void* vctx ) {
380         uta_ctx_t*      ctx;
381
382         if( (ctx = (uta_ctx_t*) vctx) == NULL ) {
383                 rmr_vlog( RMR_VL_CRIT, "unable to start mt-receive: ctx was nil\n" );
384                 return NULL;
385         }
386
387         rmr_vlog( RMR_VL_INFO, "mt_receive: pid=%lld  registering SI95 data callback and waiting\n", (long long) pthread_self() );
388
389         SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx );                  // our callback called only for "cooked" (tcp) data
390         SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx );                   // our callback for handling disconnects
391
392         SIwait( ctx->si_ctx );
393
394         return NULL;            // keep the compiler happy though never can be reached as SI wait doesn't return
395 }
396
397 #endif