2d422bb2a6892787eeae1ae64fea9523f5be0f2e
[ric-plt/lib/rmr.git] / src / rmr / si / src / mt_call_si_static.c
1 // : vi ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       mt_call_si static.c
23         Abstract:       Static funcitons related to the multi-threaded call feature
24                                 which are SI specific. The functions here also provide the
25                                 message construction functions which build a message that
26                                 might be split across multiple "datagrams" received from the
27                                 underlying transport.
28
29         Author:         E. Scott Daniels
30         Date:           20 May 2019
31 */
32
33 #ifndef _mtcall_si_static_c
34 #define _mtcall_si_static_c
35 #include <semaphore.h>
36
37 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
38         static  int warned = 0;
39         chute_t*        chute;
40
41         if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
42                 rmr_free_msg( mbuf );                                                           // drop if ring is full
43                 if( !warned ) {
44                         rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
45                         warned++;
46                 }
47
48                 return;
49         }
50
51         chute = &ctx->chutes[0];
52         sem_post( &chute->barrier );                                                            // tickle the ring monitor
53 }
54
55 /*
56         Allocate a message buffer, point it at the accumulated (raw) message,
57         call ref to point to all of the various bits and set real len etc,
58         then we queue it.  Raw_msg is expected to include the transport goo
59         placed in front of the RMR header and payload.
60 */
61 static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd ) {
62         rmr_mbuf_t*             mbuf;
63         uta_mhdr_t*             hdr;            // header of the message received
64         unsigned char*  d1;                     // pointer at d1 data ([0] is the call_id)
65         chute_t*                chute;
66         unsigned int    call_id;        // the id assigned to the call generated message
67
68         if( PARANOID_CHECKS ) {                                                                 // PARANOID mode is slower; off by default
69                 if( raw_msg == NULL || msg_size <= 0 ) {
70                         return;
71                 }
72         }
73
74         if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
75                 mbuf->tp_buf = raw_msg;
76                 mbuf->rts_fd = sender_fd;
77                 if( msg_size > ctx->max_ibm + 1024 ) {
78                         mbuf->flags |= MFL_HUGE;                                // prevent caching of oversized buffers
79                 }
80
81                 ref_tpbuf( mbuf, msg_size );                            // point mbuf at bits in the datagram
82                 hdr = mbuf->header;                                                     // convenience
83                 if( hdr->flags & HFL_CALL_MSG ) {                       // call generated message; ignore call-id etc and queue
84                         queue_normal( ctx, mbuf );
85                 } else {
86                         if( RMR_D1_LEN( hdr ) <= 0 ) {                                                                                  // no call-id data; just queue
87                                 queue_normal( ctx, mbuf );
88                         } else {
89                                 d1 = DATA1_ADDR( hdr );
90                                 if( (call_id = (unsigned int) d1[D1_CALLID_IDX]) == 0 ) {                       // call_id not set, just queue
91                                         queue_normal( ctx, mbuf );
92                                 } else {
93                                         chute = &ctx->chutes[call_id];
94                                         chute->mbuf = mbuf;
95                                         sem_post( &chute->barrier );                            // the call function can vet xaction id in their own thread
96                                 }
97                         }
98                 }
99         }
100 }
101
102 /*
103         Given a buffer, extract the size. We assume the buffer contains one of:
104                 <int1><int2><mark>
105                 <int1>
106
107         where <int1> is the size in native storage order (v1) and <int2>
108         is the size in network order. If <mark> is present then we assume
109         that <int2> is present and we use that after translating from net
110         byte order. If <mark> is not present, we use <int1>. This allows
111         old versions of RMR to continue to work with new versions that now
112         do the right thing with byte ordering.
113 */
114 static inline uint32_t extract_mlen( unsigned char* buf ) {
115         uint32_t        size;           // adjusted (if needed) size for return
116         uint32_t*       blen;           // length in the buffer to extract
117
118         blen = (uint32_t *) buf;
119         if( *(buf + sizeof( int ) * 2 ) == TP_SZ_MARKER ) {
120                 size = ntohl( *(blen+1) );                              // pick up the second integer
121                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len converted from net order to: %d\n", size );
122         } else {
123                 size = *blen;                                                   // old sender didn't encode size
124                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len no conversion: %d\n", size );
125         }
126
127         return size;
128 }
129
130 /*
131         This is the callback invoked when tcp data is received. It adds the data
132         to the buffer for the connection and if a complete message is received
133         then the message is queued onto the receive ring.
134
135         Return value indicates only that we handled the buffer and SI should continue
136         or that SI should terminate, so on error it's NOT wrong to return "ok".
137 */
138 static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
139         uta_ctx_t*              ctx;
140         river_t*                river;                  // river associated with the fd passed in
141         unsigned char*  old_accum;              // old accumulator reference should we need to realloc
142         int                             bidx = 0;               // transport buffer index
143         int                             remain;                 // bytes in transport buf that need to be moved
144         int*                    mlen;                   // pointer to spot in buffer for conversion to int
145         int                             need;                   // bytes needed for something
146         int                             i;
147
148         if( PARANOID_CHECKS ) {                                                                 // PARANOID mode is slower; off by default
149                 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
150                         return SI_RET_OK;
151                 }
152
153                 if( fd >= ctx->nrivers || fd < 0 ) {
154                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
155                         return SI_RET_OK;
156                 }
157         } else {
158                 ctx = (uta_ctx_t *) vctx;
159         }
160
161         if( buflen <= 0 ) {
162                 return SI_RET_OK;
163         }
164
165         river = &ctx->rivers[fd];
166         if( river->state != RS_GOOD ) {                         // all states which aren't good require reset first
167                 if( river->state == RS_NEW ) {
168                         memset( river, 0, sizeof( *river ) );
169                         river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);         // max inbound message size
170                         river->accum = (char *) malloc( river->nbytes );
171                         river->ipt = 0;
172                 } else {
173                         // future -- sync to next marker
174                         river->ipt = 0;                                         // insert point
175                 }
176         }
177
178         river->state = RS_GOOD;
179         remain = buflen;
180         while( remain > 0 ) {                                                           // until we've done something with all bytes passed in
181                 if( DEBUG )  rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
182
183                 if( river->msg_size <= 0 ) {                            // don't have a size yet
184                                                                                                         // FIX ME: we need a frame indicator to ensure alignment
185                         need = TP_SZFIELD_LEN - river->ipt;             // what we need to compute length
186                         if( need > remain ) {                                                                           // the whole size isn't there
187                                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
188                                 memcpy( &river->accum[river->ipt], buf+bidx, remain );                  // grab what we can and depart
189                                 river->ipt += remain;
190                                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
191                                 return SI_RET_OK;
192                         }
193
194                         if( river->ipt > 0 ) {                                                                          // if we captured the start of size last go round
195                                 memcpy( &river->accum[river->ipt], buf + bidx, need );
196                                 river->ipt += need;
197                                 bidx += need;
198                                 remain -= need;
199                                 river->msg_size = extract_mlen( river->accum );
200                                 if( DEBUG ) {
201                                         rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
202                                         if( DEBUG > 1 ) {
203                                                 dump_40( river->accum, "from accumulator:"  );
204                                                 if( river->msg_size > 100 ) {
205                                                         dump_40( river->accum + 50, "from rmr buf:"  );
206                                                 }
207                                         }
208                                 }
209                         } else {
210                                 river->msg_size = extract_mlen( &buf[bidx] );                   // pull from buf as it's all there; it will copy later
211                         }
212                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
213
214                         if( river->msg_size > river->nbytes ) {                                         // message bigger than app max size; grab huge buffer
215                                 //river->flags |= RF_DROP;
216                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "received message is huge (%d) reallocating buffer\n", river->msg_size );
217                                 old_accum = river->accum;                                       // need to copy any bytes we snarfed getting the size, so hold
218                                 river->nbytes = river->msg_size + 128;                                  // buffer large enough with a bit of fudge room
219                                 river->accum = (char *) malloc( river->nbytes );
220                                 if( river->ipt > 0 ) {
221                                         memcpy( river->accum, old_accum, river->ipt + 1 );              // copy anything snarfed in getting the sie
222                                 }
223
224                                 free( old_accum );
225                         }
226                 }
227
228                 if( river->msg_size > (river->ipt + remain) ) {                                 // need more than is left in buffer
229                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
230                         if( (river->flags & RF_DROP) == 0  ) {
231                                 memcpy( &river->accum[river->ipt], buf+bidx, remain );          // buffer and go wait for more
232                         }
233                         river->ipt += remain;
234                         remain = 0;
235                 } else {
236                         need = river->msg_size - river->ipt;                                            // bytes from transport we need to have complete message
237                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
238                         if( (river->flags & RF_DROP) == 0  ) {
239                                 memcpy( &river->accum[river->ipt], buf+bidx, need );                            // grab just what is needed (might be more)
240                                 buf2mbuf( ctx, river->accum, river->nbytes, fd );                                       // build an RMR mbuf and queue
241                                 river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);                         // prevent huge size from persisting
242                                 river->accum = (char *) malloc( sizeof( char ) *  river->nbytes );      // fresh accumulator
243                         } else {
244                                 if( !(river->flags & RF_NOTIFIED) ) {
245                                         rmr_vlog( RMR_VL_WARN, "message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd );
246                                         river->flags |= RF_NOTIFIED;
247                                 }
248                         }
249
250                         river->msg_size = -1;
251                         river->ipt = 0;
252                         bidx += need;
253                         remain -= need;
254                 }
255         }
256
257         if( DEBUG >2 ) rmr_vlog( RMR_VL_DEBUG, "##### data callback finished\n" );
258         return SI_RET_OK;
259 }
260
261 /*
262         Callback driven on a disconnect notification. We will attempt to find the related
263         endpoint via the fd2ep hash maintained in the context. If we find it, then we
264         remove it from the hash, and mark the endpoint as closed so that the next attempt
265         to send forces a reconnect attempt.
266
267         Future: put the ep on a queue to automatically attempt to reconnect.
268 */
269 static int mt_disc_cb( void* vctx, int fd ) {
270         uta_ctx_t*      ctx;
271         endpoint_t*     ep;
272
273         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
274                 return SI_RET_OK;
275         }
276
277         ep = fd2ep_del( ctx, fd );              // find ep and remove the fd from the hash
278         if( ep != NULL ) {
279         pthread_mutex_lock( &ep->gate );            // wise to lock this
280                 ep->open = FALSE;
281                 ep->nn_sock = -1;
282         pthread_mutex_unlock( &ep->gate );
283         }
284
285         return SI_RET_OK;
286 }
287
288
289 /*
290         This is expected to execute in a separate thread. It is responsible for
291         _all_ receives and queues them on the appropriate ring, or chute.
292         It does this by registering the callback function above with the SI world
293         and then calling SIwait() to drive the callback when data has arrived.
294
295
296         The "state" of the message is checked which determines where the message
297         is delivered.
298
299                 Flags indicate that the message is a call generated message, then
300                 the message is queued on the normal receive ring.
301
302                 Chute ID is == 0, then the message is queued on the normal receive ring.
303
304                 The transaction ID in the message matches the expected ID in the chute,
305                 then the message is given to the chute and the chute's semaphore is tickled.
306
307                 If none are true, the message is dropped.
308 */
309 static void* mt_receive( void* vctx ) {
310         uta_ctx_t*      ctx;
311
312         if( (ctx = (uta_ctx_t*) vctx) == NULL ) {
313                 rmr_vlog( RMR_VL_CRIT, "unable to start mt-receive: ctx was nil\n" );
314                 return NULL;
315         }
316
317         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_receive: registering SI95 data callback and waiting\n" );
318
319         SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx );                  // our callback called only for "cooked" (tcp) data
320         SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx );                   // our callback for handling disconnects
321
322         SIwait( ctx->si_ctx );
323
324         return NULL;            // keep the compiler happy though never can be reached as SI wait doesn't return
325 }
326
327 #endif