76b56b21a0f04437ded7c7d157870f9704c05238
[ric-plt/lib/rmr.git] / src / rmr / si / src / mt_call_si_static.c
1 // : vi ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       mt_call_si static.c
23         Abstract:       Static funcitons related to the multi-threaded call feature
24                                 which are SI specific. The functions here also provide the
25                                 message construction functions which build a message that
26                                 might be split across multiple "datagrams" received from the
27                                 underlying transport.
28
29         Author:         E. Scott Daniels
30         Date:           20 May 2019
31 */
32
33 #ifndef _mtcall_si_static_c
34 #define _mtcall_si_static_c
35 #include <semaphore.h>
36
37 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
38         static  int warned = 0;
39         chute_t*        chute;
40
41         if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
42                 rmr_free_msg( mbuf );                                                           // drop if ring is full
43                 if( !warned ) {
44                         rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
45                         warned++;
46                 }
47
48                 return;
49         }
50
51         chute = &ctx->chutes[0];
52         sem_post( &chute->barrier );                                                            // tickle the ring monitor
53 }
54
55 /*
56         Allocate a message buffer, point it at the accumulated (raw) message,
57         call ref to point to all of the various bits and set real len etc,
58         then we queue it.  Raw_msg is expected to include the transport goo
59         placed in front of the RMR header and payload.
60 */
61 static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd ) {
62         rmr_mbuf_t*             mbuf;
63         uta_mhdr_t*             hdr;            // header of the message received
64         unsigned char*  d1;                     // pointer at d1 data ([0] is the call_id)
65         chute_t*                chute;
66         unsigned int    call_id;        // the id assigned to the call generated message
67
68         if( PARANOID_CHECKS ) {                                                                 // PARANOID mode is slower; off by default
69                 if( raw_msg == NULL || msg_size <= 0 ) {
70                         return;
71                 }
72         }
73
74         if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
75                 mbuf->tp_buf = raw_msg;
76                 mbuf->rts_fd = sender_fd;
77                 if( msg_size > ctx->max_ibm + 1024 ) {
78                         mbuf->flags |= MFL_HUGE;                                // prevent caching of oversized buffers
79                 }
80
81                 ref_tpbuf( mbuf, msg_size );                            // point mbuf at bits in the datagram
82                 hdr = mbuf->header;                                                     // convenience
83                 if( hdr->flags & HFL_CALL_MSG ) {                       // call generated message; ignore call-id etc and queue
84                         queue_normal( ctx, mbuf );
85                 } else {
86                         if( RMR_D1_LEN( hdr ) <= 0 ) {                                                                                  // no call-id data; just queue
87                                 queue_normal( ctx, mbuf );
88                         } else {
89                                 d1 = DATA1_ADDR( hdr );
90                                 if( (call_id = (unsigned int) d1[D1_CALLID_IDX]) == 0 ) {                       // call_id not set, just queue
91                                         queue_normal( ctx, mbuf );
92                                 } else {
93                                         chute = &ctx->chutes[call_id];
94                                         chute->mbuf = mbuf;
95                                         sem_post( &chute->barrier );                            // the call function can vet xaction id in their own thread
96                                 }
97                         }
98                 }
99         }
100 }
101
102 /*
103         Given a buffer, extract the size. We assume the buffer contains one of:
104                 <int1><int2><mark>
105                 <int1>
106
107         where <int1> is the size in native storage order (v1) and <int2>
108         is the size in network order. If <mark> is present then we assume
109         that <int2> is present and we use that after translating from net
110         byte order. If <mark> is not present, we use <int1>. This allows
111         old versions of RMR to continue to work with new versions that now
112         do the right thing with byte ordering.
113
114         If the receiver of a message is a backlevel RMR, and it uses RTS to
115         return a message, it will only update the old size, but when the
116         message is received back at a new RMR application it will appear that
117         the message came from a new instance.  Therefore, we must compare
118         the old and new sizes and if they are different we must use the old
119         size assuming that this is the case.
120 */
121 static inline uint32_t extract_mlen( unsigned char* buf ) {
122         uint32_t        size;           // adjusted (if needed) size for return
123         uint32_t        osize;          // old size
124         uint32_t*       blen;           // length in the buffer to extract
125
126         blen = (uint32_t *) buf;
127         if( *(buf + sizeof( int ) * 2 ) == TP_SZ_MARKER ) {
128                 osize = *blen;                                                  // old size
129                 size = ntohl( *(blen+1) );                              // pick up the second integer
130                 if( osize != size ) {                                   // assume back level return to sender
131                         size = osize;                                           // MUST use old size
132                 }
133                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len converted from net order to: %d\n", size );
134         } else {
135                 size = *blen;                                                   // old sender didn't encode size
136                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len no conversion: %d\n", size );
137         }
138
139         return size;
140 }
141
142 /*
143         This is the callback invoked when tcp data is received. It adds the data
144         to the buffer for the connection and if a complete message is received
145         then the message is queued onto the receive ring.
146
147         Return value indicates only that we handled the buffer and SI should continue
148         or that SI should terminate, so on error it's NOT wrong to return "ok".
149 */
150 static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
151         uta_ctx_t*              ctx;
152         river_t*                river;                  // river associated with the fd passed in
153         unsigned char*  old_accum;              // old accumulator reference should we need to realloc
154         int                             bidx = 0;               // transport buffer index
155         int                             remain;                 // bytes in transport buf that need to be moved
156         int*                    mlen;                   // pointer to spot in buffer for conversion to int
157         int                             need;                   // bytes needed for something
158         int                             i;
159
160         if( PARANOID_CHECKS ) {                                                                 // PARANOID mode is slower; off by default
161                 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
162                         return SI_RET_OK;
163                 }
164
165                 if( fd >= ctx->nrivers || fd < 0 ) {
166                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
167                         return SI_RET_OK;
168                 }
169         } else {
170                 ctx = (uta_ctx_t *) vctx;
171         }
172
173         if( buflen <= 0 ) {
174                 return SI_RET_OK;
175         }
176
177         river = &ctx->rivers[fd];
178         if( river->state != RS_GOOD ) {                         // all states which aren't good require reset first
179                 if( river->state == RS_NEW ) {
180                         memset( river, 0, sizeof( *river ) );
181                         river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);         // max inbound message size
182                         river->accum = (char *) malloc( river->nbytes );
183                         river->ipt = 0;
184                 } else {
185                         // future -- sync to next marker
186                         river->ipt = 0;                                         // insert point
187                 }
188         }
189
190         river->state = RS_GOOD;
191         remain = buflen;
192         while( remain > 0 ) {                                                           // until we've done something with all bytes passed in
193                 if( DEBUG )  rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
194
195                 if( river->msg_size <= 0 ) {                            // don't have a size yet
196                                                                                                         // FIX ME: we need a frame indicator to ensure alignment
197                         need = TP_SZFIELD_LEN - river->ipt;             // what we need to compute length
198                         if( need > remain ) {                                                                           // the whole size isn't there
199                                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
200                                 memcpy( &river->accum[river->ipt], buf+bidx, remain );                  // grab what we can and depart
201                                 river->ipt += remain;
202                                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
203                                 return SI_RET_OK;
204                         }
205
206                         if( river->ipt > 0 ) {                                                                          // if we captured the start of size last go round
207                                 memcpy( &river->accum[river->ipt], buf + bidx, need );
208                                 river->ipt += need;
209                                 bidx += need;
210                                 remain -= need;
211                                 river->msg_size = extract_mlen( river->accum );
212                                 if( DEBUG ) {
213                                         rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
214                                         if( DEBUG > 1 ) {
215                                                 dump_40( river->accum, "from accumulator:"  );
216                                                 if( river->msg_size > 100 ) {
217                                                         dump_40( river->accum + 50, "from rmr buf:"  );
218                                                 }
219                                         }
220                                 }
221                         } else {
222                                 river->msg_size = extract_mlen( &buf[bidx] );                   // pull from buf as it's all there; it will copy later
223                         }
224                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
225
226                         if( river->msg_size > river->nbytes ) {                                         // message bigger than app max size; grab huge buffer
227                                 //river->flags |= RF_DROP;
228                                 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "received message is huge (%d) reallocating buffer\n", river->msg_size );
229                                 old_accum = river->accum;                                       // need to copy any bytes we snarfed getting the size, so hold
230                                 river->nbytes = river->msg_size + 128;                                  // buffer large enough with a bit of fudge room
231                                 river->accum = (char *) malloc( river->nbytes );
232                                 if( river->ipt > 0 ) {
233                                         memcpy( river->accum, old_accum, river->ipt + 1 );              // copy anything snarfed in getting the sie
234                                 }
235
236                                 free( old_accum );
237                         }
238                 }
239
240                 if( river->msg_size > (river->ipt + remain) ) {                                 // need more than is left in buffer
241                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
242                         if( (river->flags & RF_DROP) == 0  ) {
243                                 memcpy( &river->accum[river->ipt], buf+bidx, remain );          // buffer and go wait for more
244                         }
245                         river->ipt += remain;
246                         remain = 0;
247                 } else {
248                         need = river->msg_size - river->ipt;                                            // bytes from transport we need to have complete message
249                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d flgs=%02x\n", river->msg_size, need, remain, river->flags );
250                         if( (river->flags & RF_DROP) == 0  ) {
251                                 memcpy( &river->accum[river->ipt], buf+bidx, need );                            // grab just what is needed (might be more)
252                                 buf2mbuf( ctx, river->accum, river->nbytes, fd );                                       // build an RMR mbuf and queue
253                                 river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);                         // prevent huge size from persisting
254                                 river->accum = (char *) malloc( sizeof( char ) *  river->nbytes );      // fresh accumulator
255                         } else {
256                                 if( !(river->flags & RF_NOTIFIED) ) {
257                                         rmr_vlog( RMR_VL_WARN, "message larger than allocated buffer (%d) arrived on fd %d\n", river->nbytes, fd );
258                                         river->flags |= RF_NOTIFIED;
259                                 }
260                         }
261
262                         river->msg_size = -1;
263                         river->ipt = 0;
264                         bidx += need;
265                         remain -= need;
266                 }
267         }
268
269         if( DEBUG >2 ) rmr_vlog( RMR_VL_DEBUG, "##### data callback finished\n" );
270         return SI_RET_OK;
271 }
272
273 /*
274         Callback driven on a disconnect notification. We will attempt to find the related
275         endpoint via the fd2ep hash maintained in the context. If we find it, then we
276         remove it from the hash, and mark the endpoint as closed so that the next attempt
277         to send forces a reconnect attempt.
278
279         Future: put the ep on a queue to automatically attempt to reconnect.
280 */
281 static int mt_disc_cb( void* vctx, int fd ) {
282         uta_ctx_t*      ctx;
283         endpoint_t*     ep;
284
285         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
286                 return SI_RET_OK;
287         }
288
289         ep = fd2ep_del( ctx, fd );              // find ep and remove the fd from the hash
290         if( ep != NULL ) {
291         pthread_mutex_lock( &ep->gate );            // wise to lock this
292                 ep->open = FALSE;
293                 ep->nn_sock = -1;
294         pthread_mutex_unlock( &ep->gate );
295         }
296
297         return SI_RET_OK;
298 }
299
300
301 /*
302         This is expected to execute in a separate thread. It is responsible for
303         _all_ receives and queues them on the appropriate ring, or chute.
304         It does this by registering the callback function above with the SI world
305         and then calling SIwait() to drive the callback when data has arrived.
306
307
308         The "state" of the message is checked which determines where the message
309         is delivered.
310
311                 Flags indicate that the message is a call generated message, then
312                 the message is queued on the normal receive ring.
313
314                 Chute ID is == 0, then the message is queued on the normal receive ring.
315
316                 The transaction ID in the message matches the expected ID in the chute,
317                 then the message is given to the chute and the chute's semaphore is tickled.
318
319                 If none are true, the message is dropped.
320 */
321 static void* mt_receive( void* vctx ) {
322         uta_ctx_t*      ctx;
323
324         if( (ctx = (uta_ctx_t*) vctx) == NULL ) {
325                 rmr_vlog( RMR_VL_CRIT, "unable to start mt-receive: ctx was nil\n" );
326                 return NULL;
327         }
328
329         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_receive: registering SI95 data callback and waiting\n" );
330
331         SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx );                  // our callback called only for "cooked" (tcp) data
332         SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx );                   // our callback for handling disconnects
333
334         SIwait( ctx->si_ctx );
335
336         return NULL;            // keep the compiler happy though never can be reached as SI wait doesn't return
337 }
338
339 #endif