1 // : vi ts=4 sw=4 noet:
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: mt_call_si static.c
23 Abstract: Static funcitons related to the multi-threaded call feature
24 which are SI specific.
26 Author: E. Scott Daniels
30 #ifndef _mtcall_si_static_c
31 #define _mtcall_si_static_c
32 #include <semaphore.h>
34 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
35 static int warned = 0;
38 if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
39 rmr_free_msg( mbuf ); // drop if ring is full
41 rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
48 chute = &ctx->chutes[0];
49 sem_post( &chute->barrier ); // tickle the ring monitor
53 Allocate a message buffer, point it at the accumulated (raw) message,
54 call ref to point to all of the various bits and set real len etc,
55 then we queue it. Raw_msg is expected to include the transport goo
56 placed in front of the RMR header and payload.
58 static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd ) {
60 uta_mhdr_t* hdr; // header of the message received
61 unsigned char* d1; // pointer at d1 data ([0] is the call_id)
63 unsigned int call_id; // the id assigned to the call generated message
65 if( PARINOID_CHECKS ) { // PARINOID mode is slower; off by default
66 if( raw_msg == NULL || msg_size <= 0 ) {
71 if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
72 mbuf->tp_buf = raw_msg;
73 mbuf->rts_fd = sender_fd;
75 ref_tpbuf( mbuf, msg_size ); // point mbuf at bits in the datagram
76 hdr = mbuf->header; // convenience
77 if( hdr->flags & HFL_CALL_MSG ) { // call generated message; ignore call-id etc and queue
78 queue_normal( ctx, mbuf );
80 if( RMR_D1_LEN( hdr ) <= 0 ) { // no call-id data; just queue
81 queue_normal( ctx, mbuf );
83 d1 = DATA1_ADDR( hdr );
84 if( (call_id = (unsigned int) d1[D1_CALLID_IDX]) == 0 ) { // call_id not set, just queue
85 queue_normal( ctx, mbuf );
87 chute = &ctx->chutes[call_id];
89 sem_post( &chute->barrier ); // the call function can vet xaction id in their own thread
97 This is the callback invoked when tcp data is received. It adds the data
98 to the buffer for the connection and if a complete message is received
99 then the message is queued onto the receive ring.
101 Return value indicates only that we handled the buffer and SI should continue
102 or that SI should terminate, so on error it's NOT wrong to return "ok".
105 FUTURE: to do this better, SI needs to support a 'ready to read' callback
106 which allows us to to the actual receive directly into our buffer.
108 static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
110 river_t* river; // river associated with the fd passed in
111 int bidx = 0; // transport buffer index
112 int remain; // bytes in transport buf that need to be moved
113 int* mlen; // pointer to spot in buffer for conversion to int
114 int need; // bytes needed for something
117 if( PARINOID_CHECKS ) { // PARINOID mode is slower; off by default
118 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
122 if( fd >= ctx->nrivers || fd < 0 ) {
123 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
132 river = &ctx->rivers[fd];
133 if( river->state != RS_GOOD ) { // all states which aren't good require reset first
134 if( river->state == RS_NEW ) {
135 memset( river, 0, sizeof( *river ) );
136 //river->nbytes = sizeof( char ) * (8 * 1024);
137 river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // max inbound message size
138 river->accum = (char *) malloc( river->nbytes );
141 // future -- sync to next marker
142 river->ipt = 0; // insert point
146 river->state = RS_GOOD;
148 while( remain > 0 ) { // until we've done something with all bytes passed in
149 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
151 // FIX ME: size in the message needs to be network byte order
152 if( river->msg_size <= 0 ) { // don't have a size yet
153 // FIX ME: we need a frame indicator to ensure alignment
154 need = sizeof( int ) - river->ipt; // what we need from transport buffer
155 if( need > remain ) { // the whole size isn't there
156 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
157 memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what we can and depart
158 river->ipt += remain;
159 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
163 if( river->ipt > 0 ) { // if we captured the start of size last go round
164 memcpy( &river->accum[river->ipt], buf + bidx, need );
168 river->msg_size = *((int *) river->accum);
170 rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
172 dump_40( river->accum, "from accumulator:" );
173 if( river->msg_size > 100 ) {
174 dump_40( river->accum + 50, "from rmr buf:" );
179 river->msg_size = *((int *) &buf[bidx]); // snarf directly and copy with rest later
181 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
183 if( river->msg_size > river->nbytes ) { // message is too big, we will drop it
184 river->flags |= RF_DROP;
188 if( river->msg_size > (river->ipt + remain) ) { // need more than is left in buffer
189 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
190 if( (river->flags & RF_DROP) == 0 ) {
191 memcpy( &river->accum[river->ipt], buf+bidx, remain ); // buffer and go wait for more
193 river->ipt += remain;
196 need = river->msg_size - river->ipt; // bytes from transport we need to have complete message
197 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
198 if( (river->flags & RF_DROP) == 0 ) {
199 memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more)
200 buf2mbuf( ctx, river->accum, river->nbytes, fd ); // build an RMR mbuf and queue
201 river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator
203 if( !(river->flags & RF_NOTIFIED) ) {
204 rmr_vlog( RMR_VL_WARN, "message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd );
205 river->flags |= RF_NOTIFIED;
209 river->msg_size = -1;
216 if( DEBUG >2 ) rmr_vlog( RMR_VL_DEBUG, "##### data callback finished\n" );
221 Callback driven on a disconnect notification. We will attempt to find the related
222 endpoint via the fd2ep hash maintained in the context. If we find it, then we
223 remove it from the hash, and mark the endpoint as closed so that the next attempt
224 to send forces a reconnect attempt.
226 Future: put the ep on a queue to automatically attempt to reconnect.
228 static int mt_disc_cb( void* vctx, int fd ) {
232 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
236 ep = fd2ep_del( ctx, fd ); // find ep and remove the fd from the hash
238 pthread_mutex_lock( &ep->gate ); // wise to lock this
241 pthread_mutex_unlock( &ep->gate );
249 This is expected to execute in a separate thread. It is responsible for
250 _all_ receives and queues them on the appropriate ring, or chute.
251 It does this by registering the callback function above with the SI world
252 and then calling SIwait() to drive the callback when data has arrived.
255 The "state" of the message is checked which determines where the message
258 Flags indicate that the message is a call generated message, then
259 the message is queued on the normal receive ring.
261 Chute ID is == 0, then the message is queued on the normal receive ring.
263 The transaction ID in the message matches the expected ID in the chute,
264 then the message is given to the chute and the chute's semaphore is tickled.
266 If none are true, the message is dropped.
268 static void* mt_receive( void* vctx ) {
271 if( (ctx = (uta_ctx_t*) vctx) == NULL ) {
272 rmr_vlog( RMR_VL_CRIT, "unable to start mt-receive: ctx was nil\n" );
276 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_receive: registering SI95 data callback and waiting\n" );
278 SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx ); // our callback called only for "cooked" (tcp) data
279 SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx ); // our callback for handling disconnects
281 SIwait( ctx->si_ctx );
283 return NULL; // keep the compiler happy though never can be reached as SI wait doesn't return