2894cdb609d968d8b7ada4c7f4233b7ac80d2233
[ric-plt/lib/rmr.git] / src / rmr / si / src / mt_call_si_static.c
1 // : vi ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       mt_call_si static.c
23         Abstract:       Static funcitons related to the multi-threaded call feature
24                                 which are SI specific.
25
26         Author:         E. Scott Daniels
27         Date:           20 May 2019
28 */
29
30 #ifndef _mtcall_si_static_c
31 #define _mtcall_si_static_c
32 #include <semaphore.h>
33
34 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
35         static  int warned = 0;
36         chute_t*        chute;
37
38         if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
39                 rmr_free_msg( mbuf );                                                           // drop if ring is full
40                 if( !warned ) {
41                         rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
42                         warned++;
43                 }
44
45                 return;
46         }
47
48         chute = &ctx->chutes[0];
49         sem_post( &chute->barrier );                                                            // tickle the ring monitor
50 }
51
52 /*
53         Allocate a message buffer, point it at the accumulated (raw) message,
54         call ref to point to all of the various bits and set real len etc,
55         then we queue it.  Raw_msg is expected to include the transport goo
56         placed in front of the RMR header and payload.
57 */
58 static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd ) {
59         rmr_mbuf_t*             mbuf;
60         uta_mhdr_t*             hdr;            // header of the message received
61         unsigned char*  d1;                     // pointer at d1 data ([0] is the call_id)
62         chute_t*                chute;
63         unsigned int    call_id;        // the id assigned to the call generated message
64
65         if( PARINOID_CHECKS ) {                                                                 // PARINOID mode is slower; off by default
66                 if( raw_msg == NULL || msg_size <= 0 ) {
67                         return;
68                 }
69         }
70
71         if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
72                 mbuf->tp_buf = raw_msg;
73                 mbuf->rts_fd = sender_fd;
74
75                 ref_tpbuf( mbuf, msg_size );                            // point mbuf at bits in the datagram
76                 hdr = mbuf->header;                                                     // convenience
77                 if( hdr->flags & HFL_CALL_MSG ) {                       // call generated message; ignore call-id etc and queue
78                         queue_normal( ctx, mbuf );
79                 } else {
80                         if( RMR_D1_LEN( hdr ) <= 0 ) {                                                                                  // no call-id data; just queue
81                                 queue_normal( ctx, mbuf );
82                         } else {
83                                 d1 = DATA1_ADDR( hdr );
84                                 if( (call_id = (unsigned int) d1[D1_CALLID_IDX]) == 0 ) {                       // call_id not set, just queue
85                                         queue_normal( ctx, mbuf );
86                                 } else {
87                                         chute = &ctx->chutes[call_id];
88                                         chute->mbuf = mbuf;
89                                         sem_post( &chute->barrier );                            // the call function can vet xaction id in their own thread
90                                 }
91                         }
92                 }
93         }
94 }
95
96 /*
97         This is the callback invoked when tcp data is received. It adds the data
98         to the buffer for the connection and if a complete message is received
99         then the message is queued onto the receive ring.
100
101         Return value indicates only that we handled the buffer and SI should continue
102         or that SI should terminate, so on error it's NOT wrong to return "ok".
103
104
105         FUTURE: to do this better, SI needs to support a 'ready to read' callback
106         which allows us to to the actual receive directly into our buffer.
107 */
108 static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
109         uta_ctx_t*              ctx;
110         river_t*                river;                  // river associated with the fd passed in
111         int                             bidx = 0;               // transport buffer index
112         int                             remain;                 // bytes in transport buf that need to be moved
113         int*                    mlen;                   // pointer to spot in buffer for conversion to int
114         int                             need;                   // bytes needed for something
115         int                             i;
116
117         if( PARINOID_CHECKS ) {                                                                 // PARINOID mode is slower; off by default
118                 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
119                         return SI_RET_OK;
120                 }
121
122                 if( fd >= ctx->nrivers || fd < 0 ) {
123                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
124                         return SI_RET_OK;
125                 }
126         }
127
128         if( buflen <= 0 ) {
129                 return SI_RET_OK;
130         }
131
132         river = &ctx->rivers[fd];
133         if( river->state != RS_GOOD ) {                         // all states which aren't good require reset first
134                 if( river->state == RS_NEW ) {
135                         memset( river, 0, sizeof( *river ) );
136                         //river->nbytes = sizeof( char ) * (8 * 1024);
137                         river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);         // max inbound message size
138                         river->accum = (char *) malloc( river->nbytes );
139                         river->ipt = 0;
140                 } else {
141                         // future -- sync to next marker
142                         river->ipt = 0;                                         // insert point
143                 }
144         }
145
146         river->state = RS_GOOD;
147         remain = buflen;
148         while( remain > 0 ) {                                                           // until we've done something with all bytes passed in
149                 if( DEBUG )  rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
150
151                 // FIX ME: size in the message  needs to be network byte order
152                 if( river->msg_size <= 0 ) {                            // don't have a size yet
153                                                                                                         // FIX ME: we need a frame indicator to ensure alignment
154                         need = sizeof( int ) - river->ipt;                                                      // what we need from transport buffer
155                         if( need > remain ) {                                                                           // the whole size isn't there
156                                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
157                                 memcpy( &river->accum[river->ipt], buf+bidx, remain );                  // grab what we can and depart
158                                 river->ipt += remain;
159                                 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
160                                 return SI_RET_OK;
161                         }
162
163                         if( river->ipt > 0 ) {                                                                          // if we captured the start of size last go round
164                                 memcpy( &river->accum[river->ipt], buf + bidx, need );
165                                 river->ipt += need;
166                                 bidx += need;
167                                 remain -= need;
168                                 river->msg_size = *((int *) river->accum);              
169                                 if( DEBUG > 1 ) {
170                                         rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
171                                         if( river->msg_size > 500 ) {
172                                                 dump_40( river->accum, "msg size way too large accum:"  );
173                                         }
174                                 }
175                         } else {
176                                 river->msg_size = *((int *) &buf[bidx]);                                        // snarf directly and copy with rest later
177                         }
178                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
179
180                         if( river->msg_size > river->nbytes ) {                         // message is too big, we will drop it
181                                 river->flags |= RF_DROP;
182                         }
183                 }
184
185                 if( river->msg_size > (river->ipt + remain) ) {                                 // need more than is left in buffer
186                         if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
187                         if( (river->flags & RF_DROP) == 0  ) {
188                                 memcpy( &river->accum[river->ipt], buf+bidx, remain );          // buffer and go wait for more
189                         }
190                         river->ipt += remain;
191                         remain = 0;
192                 } else {
193                         need = river->msg_size - river->ipt;                                            // bytes from transport we need to have complete message
194                         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
195                         if( (river->flags & RF_DROP) == 0  ) {
196                                 memcpy( &river->accum[river->ipt], buf+bidx, need );                            // grab just what is needed (might be more)
197                                 buf2mbuf( ctx, river->accum, river->nbytes, fd );                                       // build an RMR mbuf and queue
198                                 river->accum = (char *) malloc( sizeof( char ) *  river->nbytes );      // fresh accumulator
199                         } else {
200                                 if( !(river->flags & RF_NOTIFIED) ) {
201                                         rmr_vlog( RMR_VL_WARN, "message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd );
202                                         river->flags |= RF_NOTIFIED;
203                                 }
204                         }
205
206                         river->msg_size = -1;
207                         river->ipt = 0;
208                         bidx += need;
209                         remain -= need;
210                 }
211         }
212
213         if( DEBUG >2 ) rmr_vlog( RMR_VL_DEBUG, "##### data callback finished\n" );
214         return SI_RET_OK;
215 }
216
217 /*
218         Callback driven on a disconnect notification. We will attempt to find the related
219         endpoint via the fd2ep hash maintained in the context. If we find it, then we
220         remove it from the hash, and mark the endpoint as closed so that the next attempt
221         to send forces a reconnect attempt.
222
223         Future: put the ep on a queue to automatically attempt to reconnect.
224 */
225 static int mt_disc_cb( void* vctx, int fd ) {
226         uta_ctx_t*      ctx;
227         endpoint_t*     ep;
228
229         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
230                 return SI_RET_OK;
231         }
232
233         ep = fd2ep_del( ctx, fd );              // find ep and remove the fd from the hash
234         if( ep != NULL ) {
235         pthread_mutex_lock( &ep->gate );            // wise to lock this
236                 ep->open = FALSE;
237                 ep->nn_sock = -1;
238         pthread_mutex_unlock( &ep->gate );
239         }
240
241         return SI_RET_OK;
242 }
243
244
245 /*
246         This is expected to execute in a separate thread. It is responsible for
247         _all_ receives and queues them on the appropriate ring, or chute.
248         It does this by registering the callback function above with the SI world
249         and then calling SIwait() to drive the callback when data has arrived.
250
251
252         The "state" of the message is checked which determines where the message
253         is delivered.
254
255                 Flags indicate that the message is a call generated message, then
256                 the message is queued on the normal receive ring.
257
258                 Chute ID is == 0, then the message is queued on the normal receive ring.
259
260                 The transaction ID in the message matches the expected ID in the chute,
261                 then the message is given to the chute and the chute's semaphore is tickled.
262
263                 If none are true, the message is dropped.
264 */
265 static void* mt_receive( void* vctx ) {
266         uta_ctx_t*      ctx;
267
268         if( (ctx = (uta_ctx_t*) vctx) == NULL ) {
269                 rmr_vlog( RMR_VL_CRIT, "unable to start mt-receive: ctx was nil\n" );
270                 return NULL;
271         }
272
273         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_receive: registering SI95 data callback and waiting\n" );
274
275         SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx );                  // our callback called only for "cooked" (tcp) data
276         SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx );                   // our callback for handling disconnects
277
278         SIwait( ctx->si_ctx );
279
280         return NULL;            // keep the compiler happy though never can be reached as SI wait doesn't return
281 }
282
283 #endif