Correct excessive TCP connection bug
[ric-plt/lib/rmr.git] / src / rmr / si / src / mt_call_si_static.c
1         // : vi ts=4 sw=4 noet2
2 /*
3 ==================================================================================
4         Copyright (c) 2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       mt_call_si static.c
23         Abstract:       Static funcitons related to the multi-threaded call feature
24                                 which are SI specific. The functions here also provide the
25                                 message construction functions which build a message that
26                                 might be split across multiple "datagrams" received from the
27                                 underlying transport.
28
29         Author:         E. Scott Daniels
30         Date:           20 May 2019
31 */
32
33 #ifndef _mtcall_si_static_c
34 #define _mtcall_si_static_c
35 #include <semaphore.h>
36
37 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
38         static  time_t last_warning = 0;
39         static  long dcount = 0;
40
41         chute_t*        chute;
42
43         if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
44                 rmr_free_msg( mbuf );                                                           // drop if ring is full
45                 dcount++;
46                 if( time( NULL ) > last_warning + 60 ) {                        // issue warning no more frequently than every 60 sec
47                         rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %ld msgs dropped since last warning\n", dcount );
48                         last_warning = time( NULL );
49                         dcount = 0;
50                 }
51
52                 return;
53         }
54
55         chute = &ctx->chutes[0];
56         sem_post( &chute->barrier );                                                            // tickle the ring monitor
57 }
58
59 /*
60         Allocate a message buffer, point it at the accumulated (raw) message,
61         call ref to point to all of the various bits and set real len etc,
62         then we queue it.  Raw_msg is expected to include the transport goo
63         placed in front of the RMR header and payload.
64 */
65 static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd ) {
66         rmr_mbuf_t*             mbuf;
67         uta_mhdr_t*             hdr;            // header of the message received
68         unsigned char*  d1;                     // pointer at d1 data ([0] is the call_id)
69         chute_t*                chute;
70         unsigned int    call_id;        // the id assigned to the call generated message
71
72         if( PARANOID_CHECKS ) {                                                                 // PARANOID mode is slower; off by default
73                 if( raw_msg == NULL || msg_size <= 0 ) {
74                         return;
75                 }
76         }
77
78         if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
79                 mbuf->tp_buf = raw_msg;
80                 mbuf->rts_fd = sender_fd;
81                 if( msg_size > ctx->max_ibm + 1024 ) {
82                         mbuf->flags |= MFL_HUGE;                                // prevent caching of oversized buffers
83                 }
84
85                 ref_tpbuf( mbuf, msg_size );                            // point mbuf at bits in the datagram
86                 hdr = mbuf->header;                                                     // convenience
87                 if( hdr->flags & HFL_CALL_MSG ) {                       // call generated message; ignore call-id etc and queue
88                         queue_normal( ctx, mbuf );
89                 } else {
90                         if( RMR_D1_LEN( hdr ) <= 0 ) {                                                                                  // no call-id data; just queue
91                                 queue_normal( ctx, mbuf );
92                         } else {
93                                 d1 = DATA1_ADDR( hdr );
94                                 if( (call_id = (unsigned int) d1[D1_CALLID_IDX]) == 0 ) {                       // call_id not set, just queue
95                                         queue_normal( ctx, mbuf );
96                                 } else {
97                                         chute = &ctx->chutes[call_id];
98                                         chute->mbuf = mbuf;
99                                         sem_post( &chute->barrier );                            // the call function can vet xaction id in their own thread
100                                 }
101                         }
102                 }
103         } else {
104                 free( raw_msg );
105         }
106 }
107
108 /*
109         Given a buffer, extract the size. We assume the buffer contains one of:
110                 <int1><int2><mark>
111                 <int1>
112
113         where <int1> is the size in native storage order (v1) and <int2>
114         is the size in network order. If <mark> is present then we assume
115         that <int2> is present and we use that after translating from net
116         byte order. If <mark> is not present, we use <int1>. This allows
117         old versions of RMR to continue to work with new versions that now
118         do the right thing with byte ordering.
119
120         If the receiver of a message is a backlevel RMR, and it uses RTS to
121         return a message, it will only update the old size, but when the
122         message is received back at a new RMR application it will appear that
123         the message came from a new instance.  Therefore, we must compare
124         the old and new sizes and if they are different we must use the old
125         size assuming that this is the case.
126 */
127 static inline uint32_t extract_mlen( unsigned char* buf ) {
128         uint32_t        size;           // adjusted (if needed) size for return
129         uint32_t        osize;          // old size
130         uint32_t*       blen;           // length in the buffer to extract
131
132         blen = (uint32_t *) buf;
133         if( *(buf + sizeof( int ) * 2 ) == TP_SZ_MARKER ) {
134                 osize = *blen;                                                  // old size
135                 size = ntohl( *(blen+1) );                              // pick up the second integer
136                 if( osize != size ) {                                   // assume back level return to sender
137                         size = osize;                                           // MUST use old size
138                 }
139                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len converted from net order to: %d\n", size );
140         } else {
141                 size = *blen;                                                   // old sender didn't encode size
142                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len no conversion: %d\n", size );
143         }
144
145         return size;
146 }
147
148 /*
149         This is the callback invoked when tcp data is received. It adds the data
150         to the buffer for the connection and if a complete message is received
151         then the message is queued onto the receive ring.
152
153         Return value indicates only that we handled the buffer and SI should continue
154         or that SI should terminate, so on error it's NOT wrong to return "ok".
155 */
156 static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
157         uta_ctx_t*              ctx;
158         river_t*                river;                  // river associated with the fd passed in
159         unsigned char*  old_accum;              // old accumulator reference should we need to realloc
160         int                             bidx = 0;               // transport buffer index
161         int                             remain;                 // bytes in transport buf that need to be moved
162         int*                    mlen;                   // pointer to spot in buffer for conversion to int
163         int                             need;                   // bytes needed for something
164         int                             i;
165
166         if( PARANOID_CHECKS ) {                                                                 // PARANOID mode is slower; off by default
167                 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
168                         return SI_RET_OK;
169                 }
170         } else {
171                 ctx = (uta_ctx_t *) vctx;
172         }
173
174         if( buflen <= 0 || fd < 0 ) {                   // no buffer or invalid fd
175                 return SI_RET_OK;
176         }
177
178         if( fd >= ctx->nrivers ) {
179                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
180                 if( (river = (river_t *) rmr_sym_pull( ctx->river_hash, (uint64_t) fd )) == NULL ) {
181                         river = (river_t *) malloc( sizeof( *river ) );
182                         memset( river, 0, sizeof( *river ) );
183                         rmr_sym_map( ctx->river_hash, (uint64_t) fd, river );
184                         river->state = RS_NEW;
185                 }
186         } else {
187                 river = &ctx->rivers[fd];                               // quick index for fd values < MAX_FD
188         }
189
190         if( river->state != RS_GOOD ) {                         // all states which aren't good require reset first
191                 if( river->state == RS_NEW ) {
192                         if( river->accum != NULL ) {
193                                 free( river->accum );
194                         }
195                         memset( river, 0, sizeof( *river ) );
196                         river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);         // start with what user said would be the "normal" max inbound msg size
197                         river->accum = (char *) malloc( river->nbytes );
198                         river->ipt = 0;
199                 } else {
200                         // future -- sync to next marker
201                         river->ipt = 0;                                         // insert point
202                 }
203         }
204
205         river->state = RS_GOOD;
206         remain = buflen;
207         while( remain > 0 ) {                                                           // until we've done something with all bytes passed in
208                 if( DEBUG )  rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
209
210                 if( river->msg_size <= 0 ) {                            // don't have a message length  yet
211                                                                                                         // FIX ME: we need a frame indicator to ensure alignment
212                         need = TP_SZFIELD_LEN - river->ipt;             // what we need to compute the total message length
213                         if( need > remain ) {                                                                           // the whole message len  information isn't in this transport buf
214                                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
215                                 memcpy( &river->accum[river->ipt], buf+bidx, remain );                  // grab what we can and depart
216                                 river->ipt += remain;
217                                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
218                                 return SI_RET_OK;
219                         }
220
221                         if( river->ipt > 0 ) {                                                                          // if we captured the start of size last go round
222                                 memcpy( &river->accum[river->ipt], buf + bidx, need );
223                                 river->ipt += need;
224                                 bidx += need;
225                                 remain -= need;
226                                 river->msg_size = extract_mlen( river->accum );
227                                 if( DEBUG ) {
228                                         rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
229                                         if( DEBUG > 1 ) {
230                                                 dump_40( river->accum, "from accumulator:"  );
231                                                 if( river->msg_size > 100 ) {
232                                                         dump_40( river->accum + 50, "from rmr buf:"  );
233                                                 }
234                                         }
235                                 }
236                         } else {
237                                 river->msg_size = extract_mlen( &buf[bidx] );                   // pull from buf as it's all there; it will copy later
238                         }
239                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
240
241                         if( river->msg_size > river->nbytes ) {                                         // message bigger than app max size; grab huge buffer
242                                 //river->flags |= RF_DROP;                                                              //  uncomment to drop large messages
243                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "received message is huge (%d) reallocating buffer\n", river->msg_size );
244                                 old_accum = river->accum;                                       // need to copy any bytes we snarfed getting the size, so hold
245                                 river->nbytes = river->msg_size + 128;                                  // buffer large enough with a bit of fudge room
246                                 river->accum = (char *) malloc( river->nbytes );
247                                 if( river->ipt > 0 ) {
248                                         memcpy( river->accum, old_accum, river->ipt + 1 );              // copy anything snarfed in getting the sie
249                                 }
250
251                                 free( old_accum );
252                         }
253                 }
254
255                 if( river->msg_size > (river->ipt + remain) ) {                                 // need more than is left in receive buffer
256                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
257                         if( (river->flags & RF_DROP) == 0  ) {                                                  // ok to keep this message; copy bytes
258                                 memcpy( &river->accum[river->ipt], buf+bidx, remain );          // grab what is in the rcv buffer and go wait for more
259                         }
260                         river->ipt += remain;
261                         remain = 0;
262                 } else {
263                         need = river->msg_size - river->ipt;                                            // bytes from transport we need to have complete message
264                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d flgs=%02x\n", river->msg_size, need, remain, river->flags );
265                         if( (river->flags & RF_DROP) == 0  ) {                                                                  // keeping this message, copy and pass it on
266                                 memcpy( &river->accum[river->ipt], buf+bidx, need );                            // grab just what is needed (might be more)
267                                 buf2mbuf( ctx, river->accum, river->nbytes, fd );                                       // build an RMR mbuf and queue
268                                 river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);                         // prevent huge size from persisting
269                                 river->accum = (char *) malloc( sizeof( char ) *  river->nbytes );      // fresh accumulator
270                         } else {
271                                 if( !(river->flags & RF_NOTIFIED) ) {                                                           // not keeping huge messages; notify once per stream
272                                         rmr_vlog( RMR_VL_WARN, "message larger than allocated buffer (%d) arrived on fd %d\n", river->nbytes, fd );
273                                         river->flags |= RF_NOTIFIED;
274                                 }
275                         }
276
277                         river->msg_size = -1;
278                         river->ipt = 0;
279                         bidx += need;
280                         remain -= need;
281                 }
282         }
283
284         if( DEBUG >2 ) rmr_vlog( RMR_VL_DEBUG, "##### data callback finished\n" );
285         return SI_RET_OK;
286 }
287
288 /*
289         Callback driven on a disconnect notification. We will attempt to find the related
290         endpoint via the fd2ep hash maintained in the context. If we find it, then we
291         remove it from the hash, and mark the endpoint as closed so that the next attempt
292         to send forces a reconnect attempt.
293
294         Future: put the ep on a queue to automatically attempt to reconnect.
295 */
296 static int mt_disc_cb( void* vctx, int fd ) {
297         uta_ctx_t*      ctx;
298         endpoint_t*     ep;
299         river_t*        river = NULL;
300
301         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
302                 return SI_RET_OK;
303         }
304
305         if( fd < ctx->nrivers && fd >= 0 ) {
306                 river = &ctx->rivers[fd];
307         } else {
308                 if( fd > 0 ) {
309                         river = rmr_sym_pull( ctx->river_hash, (uint64_t) fd );
310                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "river reset on disconnect: fd=%d\n", fd );
311                 }
312         }
313
314         if( river != NULL ) {
315                 river->state = RS_NEW;                  // if one connects here later; ensure it's new
316                 if( river->accum != NULL ) {
317                         free( river->accum );
318                         river->accum = NULL;
319                         river->state = RS_NEW;          // force realloc if the fd is used again
320                 }
321         }
322
323         ep = fd2ep_del( ctx, fd );              // find ep and remove the fd from the hash
324         if( ep != NULL ) {
325                 pthread_mutex_lock( &ep->gate );            // wise to lock this
326                 ep->open = FALSE;
327                 ep->nn_sock = -1;
328                 pthread_mutex_unlock( &ep->gate );
329         }
330
331         return SI_RET_OK;
332 }
333
334
335 /*
336         This is expected to execute in a separate thread. It is responsible for
337         _all_ receives and queues them on the appropriate ring, or chute.
338         It does this by registering the callback function above with the SI world
339         and then calling SIwait() to drive the callback when data has arrived.
340
341
342         The "state" of the message is checked which determines where the message
343         is delivered.
344
345                 Flags indicate that the message is a call generated message, then
346                 the message is queued on the normal receive ring.
347
348                 Chute ID is == 0, then the message is queued on the normal receive ring.
349
350                 The transaction ID in the message matches the expected ID in the chute,
351                 then the message is given to the chute and the chute's semaphore is tickled.
352
353                 If none are true, the message is dropped.
354 */
355 static void* mt_receive( void* vctx ) {
356         uta_ctx_t*      ctx;
357
358         if( (ctx = (uta_ctx_t*) vctx) == NULL ) {
359                 rmr_vlog( RMR_VL_CRIT, "unable to start mt-receive: ctx was nil\n" );
360                 return NULL;
361         }
362
363         rmr_vlog( RMR_VL_INFO, "mt_receive: pid=%lld  registering SI95 data callback and waiting\n", (long long) pthread_self() );
364
365         SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx );                  // our callback called only for "cooked" (tcp) data
366         SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx );                   // our callback for handling disconnects
367
368         SIwait( ctx->si_ctx );
369
370         return NULL;            // keep the compiler happy though never can be reached as SI wait doesn't return
371 }
372
373 #endif