6e2e8aa5efb4b236aefdbb906801c377b4d1649a
[ric-plt/lib/rmr.git] / src / rmr / si / src / mt_call_si_static.c
1 // : vi ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       mt_call_si static.c
23         Abstract:       Static funcitons related to the multi-threaded call feature
24                                 which are SI specific.
25
26         Author:         E. Scott Daniels
27         Date:           20 May 2019
28 */
29
30 #ifndef _mtcall_si_static_c
31 #define _mtcall_si_static_c
32 #include <semaphore.h>
33
34 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
35         static  int warned = 0;
36         chute_t*        chute;
37
38         if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
39                 rmr_free_msg( mbuf );                                                           // drop if ring is full
40                 if( !warned ) {
41                         fprintf( stderr, "[WARN] rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
42                         warned++;
43                 }
44
45                 return;
46         }
47
48         chute = &ctx->chutes[0];
49         sem_post( &chute->barrier );                                                            // tickle the ring monitor
50 }
51
52 /*
53         Allocate a message buffer, point it at the accumulated (raw) message,
54         call ref to point to all of the various bits and set real len etc,
55         then we queue it.  Raw_msg is expected to include the transport goo
56         placed in front of the RMR header and payload.
57
58         done -- FIX ME?? can we eliminate the buffer copy here?
59 */
60 static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd ) {
61         rmr_mbuf_t*             mbuf;
62         uta_mhdr_t*             hdr;            // header of the message received
63         unsigned char*  d1;                     // pointer at d1 data ([0] is the call_id)
64         chute_t*                chute;
65         unsigned int    call_id;        // the id assigned to the call generated message
66
67         if( PARINOID_CHECKS ) {                                                                 // PARINOID mode is slower; off by default
68                 if( raw_msg == NULL || msg_size <= 0 ) {
69                         return;
70                 }
71         }
72
73 /*
74         if( (mbuf = (rmr_mbuf_t *) malloc( sizeof( *mbuf ))) != NULL ) {                // alloc mbuf and point at various bits of payload
75                 memset( mbuf, 0, sizeof( *mbuf ) );
76                 mbuf->tp_buf = raw_msg;
77                 mbuf->ring = ctx->zcb_mring;
78 */
79         if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
80                 mbuf->tp_buf = raw_msg;
81                 mbuf->rts_fd = sender_fd;
82
83                 // eliminated :)   memcpy( mbuf->tp_buf, river->accum + offset, river->msg_size );
84
85                 ref_tpbuf( mbuf, msg_size );                            // point mbuf at bits in the datagram
86                 hdr = mbuf->header;                                                     // convenience
87                 if( hdr->flags & HFL_CALL_MSG ) {                       // call generated message; ignore call-id etc and queue
88                         queue_normal( ctx, mbuf );
89                 } else {
90                         if( RMR_D1_LEN( hdr ) <= 0 ) {                                                                                  // no call-id data; just queue
91                                 queue_normal( ctx, mbuf );
92                         } else {
93                                 d1 = DATA1_ADDR( hdr );
94                                 if( (call_id = (unsigned int) d1[D1_CALLID_IDX]) == 0 ) {                       // call_id not set, just queue
95                                         queue_normal( ctx, mbuf );
96                                 } else {
97                                         chute = &ctx->chutes[call_id];
98                                         chute->mbuf = mbuf;
99                                         sem_post( &chute->barrier );                            // the call function can vet xaction id in their own thread
100                                 }
101                         }
102                 }
103         }
104 }
105
106 /*
107         This is the callback invoked when tcp data is received. It adds the data
108         to the buffer for the connection and if a complete message is received
109         then the message is queued onto the receive ring.
110
111         Return value indicates only that we handled the buffer and SI should continue
112         or that SI should terminate, so on error it's NOT wrong to return "ok".
113
114
115         FUTURE: to do this better, SI needs to support a 'ready to read' callback
116         which allows us to to the actual receive directly into our buffer.
117 */
118 static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
119         uta_ctx_t*              ctx;
120         river_t*                river;                  // river associated with the fd passed in
121         int                             bidx = 0;               // transport buffer index
122         int                             remain;                 // bytes in transport buf that need to be moved
123         int*                    mlen;                   // pointer to spot in buffer for conversion to int
124         int                             need;                   // bytes needed for something
125         int                             i;
126
127         // for speed these checks should be enabled only in debug mode and assume we always get a good context
128         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
129                 return SI_RET_OK;
130         }
131
132         if( fd >= ctx->nrivers || fd < 0 ) {
133                 if( DEBUG ) fprintf( stderr, "[DBUG] callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
134                 return SI_RET_OK;
135         }
136
137         // -------- end debug checks -----------------
138
139         if( buflen <= 0 ) {
140                 return SI_RET_OK;
141         }
142
143         river = &ctx->rivers[fd];
144         if( river->state != RS_GOOD ) {                         // all states which aren't good require reset first
145                 if( river->state == RS_NEW ) {
146                         memset( river, 0, sizeof( *river ) );
147                         //river->nbytes = sizeof( char ) * (8 * 1024);
148                         river->nbytes = sizeof( char ) * ctx->max_ibm;                  // max inbound message size
149                         river->accum = (char *) malloc( river->nbytes );
150                         river->ipt = 0;
151                 } else {
152                         // future -- sync to next marker
153                         river->ipt = 0;                                         // insert point
154                 }
155         }
156
157         river->state = RS_GOOD;
158
159 /*
160 fprintf( stderr, "\n>>>>> data callback for %d bytes from %d\n", buflen, fd );
161 for( i = 0; i < 40; i++ ) {
162 fprintf( stderr, "%02x ", (unsigned char) *(buf+i) );
163 }
164 fprintf( stderr, "\n" );
165 */
166
167         remain = buflen;
168         while( remain > 0 ) {                                                           // until we've done something with all bytes passed in
169                 if( DEBUG )  fprintf( stderr, "[DBUG] ====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
170
171                 // FIX ME: size in the message  needs to be network byte order  
172                 if( river->msg_size <= 0 ) {                            // don't have a size yet
173                                                                                                         // FIX ME: we need a frame indicator to ensure alignment
174                         need = sizeof( int ) - river->ipt;                                                      // what we need from transport buffer
175                         if( need > remain ) {                                                                           // the whole size isn't there
176                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
177                                 memcpy( &river->accum[river->ipt], buf+bidx, remain );                  // grab what we can and depart
178                                 river->ipt += remain;
179                                 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
180                                 return SI_RET_OK;
181                         }
182
183                         if( river->ipt > 0 ) {                                                                          // if we captured the start of size last go round
184                                 memcpy( &river->accum[river->ipt], buf + bidx, need );
185                                 river->ipt += need;
186                                 bidx += need;
187                                 remain -= need;
188                                 river->msg_size = *((int *) river->accum);                              
189                                 if( DEBUG > 1 ) {
190                                         fprintf( stderr, "[DBUG] size from accumulator =%d\n", river->msg_size );
191                                         if( river->msg_size > 500 ) {
192                                                 dump_40( river->accum, "msg size way too large accum:"  );
193                                         }
194                                 }
195                         } else {
196                                 river->msg_size = *((int *) &buf[bidx]);                                        // snarf directly and copy with rest later
197                         }
198                         if( DEBUG ) fprintf( stderr, "[DBUG] data callback setting msg size: %d\n", river->msg_size );
199                 }
200
201                 if( river->msg_size > (river->ipt + remain) ) {                                 // need more than is left in buffer
202                         if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
203                         memcpy( &river->accum[river->ipt], buf+bidx, remain );          // buffer and go wait for more
204                         river->ipt += remain;
205                         remain = 0;
206                 } else {
207                         need = river->msg_size - river->ipt;                                            // bytes from transport we need to have complete message
208                         if( DEBUG ) fprintf( stderr, "[DBUG] data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
209                         memcpy( &river->accum[river->ipt], buf+bidx, need );            // grab just what is needed (might be more)
210                         buf2mbuf( ctx, river->accum, river->msg_size, fd );                             // build an RMR mbuf and queue
211
212                         river->accum = (char *) malloc( sizeof( char ) *  river->nbytes );      // fresh accumulator
213                         river->msg_size = -1;
214                         river->ipt = 0;
215                         bidx += need;
216                         remain -= need; 
217                 }
218         }
219
220         if( DEBUG >2 ) fprintf( stderr, "[DBUG] ##### data callback finished\n" );
221         return SI_RET_OK;
222 }
223
224
225 /*
226         This is expected to execute in a separate thread. It is responsible for
227         _all_ receives and queues them on the appropriate ring, or chute.
228         It does this by registering the callback function above with the SI world
229         and then caling SIwait() to drive the callback when data has arrived.
230
231
232         The "state" of the message is checked which determines where the message
233         is delivered.
234
235                 Flags indicate that the message is a call generated message, then
236                 the message is queued on the normal receive ring.
237
238                 Chute ID is == 0, then the message is queued on the normal receive ring.
239
240                 The transaction ID in the message matches the expected ID in the chute,
241                 then the message is given to the chute and the chute's semaphore is tickled.
242
243                 If none are true, the message is dropped.
244 */
245 static void* mt_receive( void* vctx ) {
246         uta_ctx_t*      ctx;
247
248         if( (ctx = (uta_ctx_t*) vctx) == NULL ) {
249                 fprintf( stderr, "[CRI], unable to start mt-receive: ctx was nil\n" );
250                 return NULL;
251         }
252
253         if( DEBUG ) fprintf( stderr, "[DBUG] mt_receive: registering SI95 data callback and waiting\n" );
254         SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx );                  // our callback called only for "cooked" (tcp) data
255         SIwait( ctx->si_ctx );
256
257         return NULL;            // keep the compiler happy though never can be reached as SI wait doesn't return
258 }
259
260 #endif