78a10393ba3a6539c5eaea998f158c09404a8eed
[ric-plt/lib/rmr.git] / src / rmr / si / src / mt_call_si_static.c
1 // : vi ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2020-2021 Nokia
5         Copyright (c) 2018-2021 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       mt_call_si static.c
23         Abstract:       Static funcitons related to the multi-threaded call feature
24                                 which are SI specific. The functions here also provide the
25                                 message construction functions which build a message that
26                                 might be split across multiple "datagrams" received from the
27                                 underlying transport.
28
29         Author:         E. Scott Daniels
30         Date:           20 May 2019
31 */
32
33 #ifndef _mtcall_si_static_c
34 #define _mtcall_si_static_c
35 #include <semaphore.h>
36
37 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
38         static  time_t last_warning = 0;
39         //static        long dcount = 0;
40
41         chute_t*        chute;
42
43         if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
44                 rmr_free_msg( mbuf );                                                           // drop if ring is full
45                 //dcount++;
46                 ctx->dcount++;
47                 ctx->acc_dcount++;
48                 if( time( NULL ) > last_warning + 60 ) {                        // issue warning no more frequently than every 60 sec
49                         rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %d msgs dropped since last warning\n", ctx->dcount );
50                         last_warning = time( NULL );
51                         ctx->dcount = 0;
52                 }
53
54                 return;
55         }
56         ctx->acc_ecount++;
57         chute = &ctx->chutes[0];
58         sem_post( &chute->barrier );                                                            // tickle the ring monitor
59 }
60
61 /*
62         Allocate a message buffer, point it at the accumulated (raw) message,
63         call ref to point to all of the various bits and set real len etc,
64         then we queue it.  Raw_msg is expected to include the transport goo
65         placed in front of the RMR header and payload.
66 */
67 static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd ) {
68         rmr_mbuf_t*             mbuf;
69         uta_mhdr_t*             hdr;            // header of the message received
70         unsigned char*  d1;                     // pointer at d1 data ([0] is the call_id)
71         chute_t*                chute;
72         unsigned int    call_id;        // the id assigned to the call generated message
73
74         if( PARANOID_CHECKS ) {                                                                 // PARANOID mode is slower; off by default
75                 if( raw_msg == NULL || msg_size <= 0 ) {
76                         return;
77                 }
78         }
79
80         if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
81                 mbuf->tp_buf = raw_msg;
82                 mbuf->rts_fd = sender_fd;
83                 if( msg_size > ctx->max_ibm + 1024 ) {
84                         mbuf->flags |= MFL_HUGE;                                // prevent caching of oversized buffers
85                 }
86
87                 ref_tpbuf( mbuf, msg_size );                            // point mbuf at bits in the datagram
88                 hdr = mbuf->header;                                                     // convenience
89                 if( hdr->flags & HFL_CALL_MSG ) {                       // call generated message; ignore call-id etc and queue
90                         queue_normal( ctx, mbuf );
91                 } else {
92                         if( RMR_D1_LEN( hdr ) <= 0 ) {                                                                                  // no call-id data; just queue
93                                 queue_normal( ctx, mbuf );
94                         } else {
95                                 d1 = DATA1_ADDR( hdr );
96                                 if( (call_id = (unsigned int) d1[D1_CALLID_IDX]) == 0 ) {                       // call_id not set, just queue
97                                         queue_normal( ctx, mbuf );
98                                 } else {
99                                         chute = &ctx->chutes[call_id];
100                                         chute->mbuf = mbuf;
101                                         sem_post( &chute->barrier );                            // the call function can vet xaction id in their own thread
102                                 }
103                         }
104                 }
105         } else {
106                 free( raw_msg );
107         }
108 }
109
110 /*
111         Given a buffer, extract the size. We assume the buffer contains one of:
112                 <int1><int2><mark>
113                 <int1>
114
115         where <int1> is the size in native storage order (v1) and <int2>
116         is the size in network order. If <mark> is present then we assume
117         that <int2> is present and we use that after translating from net
118         byte order. If <mark> is not present, we use <int1>. This allows
119         old versions of RMR to continue to work with new versions that now
120         do the right thing with byte ordering.
121
122         If the receiver of a message is a backlevel RMR, and it uses RTS to
123         return a message, it will only update the old size, but when the
124         message is received back at a new RMR application it will appear that
125         the message came from a new instance.  Therefore, we must compare
126         the old and new sizes and if they are different we must use the old
127         size assuming that this is the case.
128 */
129 static inline uint32_t extract_mlen( unsigned char* buf ) {
130         uint32_t        size;           // adjusted (if needed) size for return
131         uint32_t        osize;          // old size
132         uint32_t*       blen;           // length in the buffer to extract
133
134         blen = (uint32_t *) buf;
135         if( *(buf + sizeof( int ) * 2 ) == TP_SZ_MARKER ) {
136                 osize = *blen;                                                  // old size
137                 size = ntohl( *(blen+1) );                              // pick up the second integer
138                 if( osize != size ) {                                   // assume back level return to sender
139                         size = osize;                                           // MUST use old size
140                 }
141                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len converted from net order to: %d\n", size );
142         } else {
143                 size = *blen;                                                   // old sender didn't encode size
144                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len no conversion: %d\n", size );
145         }
146
147         return size;
148 }
149
150 /*
151         This is the callback invoked when tcp data is received. It adds the data
152         to the buffer for the connection and if a complete message is received
153         then the message is queued onto the receive ring.
154
155         Return value indicates only that we handled the buffer and SI should continue
156         or that SI should terminate, so on error it's NOT wrong to return "ok".
157 */
158 static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
159         uta_ctx_t*              ctx;
160         river_t*                river;                  // river associated with the fd passed in
161         unsigned char*  old_accum;              // old accumulator reference should we need to realloc
162         int                             bidx = 0;               // transport buffer index
163         int                             remain;                 // bytes in transport buf that need to be moved
164         int*                    mlen;                   // pointer to spot in buffer for conversion to int
165         int                             need;                   // bytes needed for something
166         int                             i;
167
168         if( PARANOID_CHECKS ) {                                                                 // PARANOID mode is slower; off by default
169                 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
170                         return SI_RET_OK;
171                 }
172         } else {
173                 ctx = (uta_ctx_t *) vctx;
174         }
175
176         if( buflen <= 0 || fd < 0 ) {                   // no buffer or invalid fd
177                 return SI_RET_OK;
178         }
179
180         if( fd >= ctx->nrivers ) {
181                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
182                 if( (river = (river_t *) rmr_sym_pull( ctx->river_hash, (uint64_t) fd )) == NULL ) {
183                         river = (river_t *) malloc( sizeof( *river ) );
184                         memset( river, 0, sizeof( *river ) );
185                         rmr_sym_map( ctx->river_hash, (uint64_t) fd, river );
186                         river->state = RS_NEW;
187                 }
188         } else {
189                 river = &ctx->rivers[fd];                               // quick index for fd values < MAX_FD
190         }
191
192         if( river->state != RS_GOOD ) {                         // all states which aren't good require reset first
193                 if( river->state == RS_NEW ) {
194                         if( river->accum != NULL ) {
195                                 free( river->accum );
196                         }
197                         memset( river, 0, sizeof( *river ) );
198                         river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);         // start with what user said would be the "normal" max inbound msg size
199                         river->accum = (char *) malloc( river->nbytes );
200                         river->ipt = 0;
201                 } else {
202                         if( river->state == RS_RESET ) {
203                                 // future -- reset not implemented
204                                 return SI_RET_OK;
205                         } else {
206                                 // future -- sync to next marker
207                                 river->ipt = 0;                                         // insert point
208                         }
209                 }
210         }
211
212         river->state = RS_GOOD;
213         remain = buflen;
214         while( remain > 0 ) {                                                           // until we've done something with all bytes passed in
215                 if( DEBUG )  rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
216
217                 if( river->msg_size <= 0 ) {                            // don't have a message length  yet
218                                                                                                         // FIX ME: we need a frame indicator to ensure alignment
219                         need = TP_SZFIELD_LEN - river->ipt;             // what we need to compute the total message length
220                         if( need > remain ) {                                                                           // the whole message len  information isn't in this transport buf
221                                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
222                                 memcpy( &river->accum[river->ipt], buf+bidx, remain );                  // grab what we can and depart
223                                 river->ipt += remain;
224                                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
225                                 return SI_RET_OK;
226                         }
227
228                         if( river->ipt > 0 ) {                                                                          // if we captured the start of size last go round
229                                 memcpy( &river->accum[river->ipt], buf + bidx, need );
230                                 river->ipt += need;
231                                 bidx += need;
232                                 remain -= need;
233                                 river->msg_size = extract_mlen( river->accum );
234                                 if( DEBUG ) {
235                                         rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
236                                         if( DEBUG > 1 ) {
237                                                 dump_40( river->accum, "from accumulator:"  );
238                                                 if( river->msg_size > 100 ) {
239                                                         dump_40( river->accum + 50, "from rmr buf:"  );
240                                                 }
241                                         }
242                                 }
243                         } else {
244                                 river->msg_size = extract_mlen( &buf[bidx] );                   // pull from buf as it's all there; it will copy later
245                         }
246
247                         if( river->msg_size < 0) { // addressing RIC-989
248                                 river->state=RS_RESET;
249                                 return SI_RET_OK;
250                         }
251
252                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
253
254                         if( river->msg_size > river->nbytes ) {                                         // message bigger than app max size; grab huge buffer
255                                 //river->flags |= RF_DROP;                                                              //  uncomment to drop large messages
256                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "received message is huge (%d) reallocating buffer\n", river->msg_size );
257                                 old_accum = river->accum;                                       // need to copy any bytes we snarfed getting the size, so hold
258                                 river->nbytes = river->msg_size + 128;                                  // buffer large enough with a bit of fudge room
259                                 river->accum = (char *) malloc( river->nbytes );
260                                 if( river->ipt > 0 ) {
261                                         memcpy( river->accum, old_accum, river->ipt + 1 );              // copy anything snarfed in getting the sie
262                                 }
263
264                                 free( old_accum );
265                         }
266                 }
267
268                 if( river->msg_size > (river->ipt + remain) ) {                                 // need more than is left in receive buffer
269                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
270                         if( (river->flags & RF_DROP) == 0  ) {                                                  // ok to keep this message; copy bytes
271                                 memcpy( &river->accum[river->ipt], buf+bidx, remain );          // grab what is in the rcv buffer and go wait for more
272                         }
273                         river->ipt += remain;
274                         remain = 0;
275                 } else {
276                         need = river->msg_size - river->ipt;                                            // bytes from transport we need to have complete message
277                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d flgs=%02x\n", river->msg_size, need, remain, river->flags );
278                         if( (river->flags & RF_DROP) == 0  ) {                                                                  // keeping this message, copy and pass it on
279                                 memcpy( &river->accum[river->ipt], buf+bidx, need );                            // grab just what is needed (might be more)
280                                 buf2mbuf( ctx, river->accum, river->nbytes, fd );                                       // build an RMR mbuf and queue
281                                 river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);                         // prevent huge size from persisting
282                                 river->accum = (char *) malloc( sizeof( char ) *  river->nbytes );      // fresh accumulator
283                         } else {
284                                 if( !(river->flags & RF_NOTIFIED) ) {                                                           // not keeping huge messages; notify once per stream
285                                         rmr_vlog( RMR_VL_WARN, "message larger than allocated buffer (%d) arrived on fd %d\n", river->nbytes, fd );
286                                         river->flags |= RF_NOTIFIED;
287                                 }
288                         }
289
290                         river->msg_size = -1;
291                         river->ipt = 0;
292                         bidx += need;
293                         remain -= need;
294                 }
295         }
296
297         if( DEBUG >2 ) rmr_vlog( RMR_VL_DEBUG, "##### data callback finished\n" );
298         return SI_RET_OK;
299 }
300
301 /*
302         Callback driven on a disconnect notification. We will attempt to find the related
303         endpoint via the fd2ep hash maintained in the context. If we find it, then we
304         remove it from the hash, and mark the endpoint as closed so that the next attempt
305         to send forces a reconnect attempt.
306
307         Future: put the ep on a queue to automatically attempt to reconnect.
308 */
309 static int mt_disc_cb( void* vctx, int fd ) {
310         uta_ctx_t*      ctx;
311         endpoint_t*     ep;
312         river_t*        river = NULL;
313
314         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
315                 return SI_RET_OK;
316         }
317
318         if( fd < ctx->nrivers && fd >= 0 ) {
319                 river = &ctx->rivers[fd];
320         } else {
321                 if( fd > 0 ) {
322                         river = rmr_sym_pull( ctx->river_hash, (uint64_t) fd );
323                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "river reset on disconnect: fd=%d\n", fd );
324                 }
325         }
326
327         if( river != NULL ) {
328                 river->state = RS_NEW;                  // if one connects here later; ensure it's new
329                 if( river->accum != NULL ) {
330                         free( river->accum );
331                         river->accum = NULL;
332                         river->state = RS_NEW;          // force realloc if the fd is used again
333                 }
334         }
335
336         ep = fd2ep_del( ctx, fd );              // find ep and remove the fd from the hash
337         if( ep != NULL ) {
338                 pthread_mutex_lock( &ep->gate );            // wise to lock this
339                 ep->open = FALSE;
340                 ep->nn_sock = -1;
341                 pthread_mutex_unlock( &ep->gate );
342         }
343
344         return SI_RET_OK;
345 }
346
347
348 /*
349         This is expected to execute in a separate thread. It is responsible for
350         _all_ receives and queues them on the appropriate ring, or chute.
351         It does this by registering the callback function above with the SI world
352         and then calling SIwait() to drive the callback when data has arrived.
353
354
355         The "state" of the message is checked which determines where the message
356         is delivered.
357
358                 Flags indicate that the message is a call generated message, then
359                 the message is queued on the normal receive ring.
360
361                 Chute ID is == 0, then the message is queued on the normal receive ring.
362
363                 The transaction ID in the message matches the expected ID in the chute,
364                 then the message is given to the chute and the chute's semaphore is tickled.
365
366                 If none are true, the message is dropped.
367 */
368 static void* mt_receive( void* vctx ) {
369         uta_ctx_t*      ctx;
370
371         if( (ctx = (uta_ctx_t*) vctx) == NULL ) {
372                 rmr_vlog( RMR_VL_CRIT, "unable to start mt-receive: ctx was nil\n" );
373                 return NULL;
374         }
375
376         rmr_vlog( RMR_VL_INFO, "mt_receive: pid=%lld  registering SI95 data callback and waiting\n", (long long) pthread_self() );
377
378         SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx );                  // our callback called only for "cooked" (tcp) data
379         SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx );                   // our callback for handling disconnects
380
381         SIwait( ctx->si_ctx );
382
383         return NULL;            // keep the compiler happy though never can be reached as SI wait doesn't return
384 }
385
386 #endif