1 // : vi ts=4 sw=4 noet:
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: mt_call_si static.c
23 Abstract: Static functions related to the multi-threaded call feature
24 which are SI specific.
26 Author: E. Scott Daniels
30 #ifndef _mtcall_si_static_c
31 #define _mtcall_si_static_c
32 #include <semaphore.h>
// Queue a received message on the normal receive ring (ctx->mring) for pickup
// by the application's rmr_mt_rcv/rmr_rcv calls.  If the ring is full the
// message is freed (dropped) and an error is logged.  Chute 0's semaphore is
// posted afterwards so a receiver blocked in a multi-threaded receive wakes
// and drains the ring.
// NOTE(review): interior lines are elided from this view -- 'warned' is
// presumably used to rate-limit the drop log message, and the declaration of
// 'chute' plus closing braces are not visible; confirm against the full file.
34 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
35 static int warned = 0;
38 if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
// ring insert failed: ownership of mbuf stays here, so it must be freed
39 rmr_free_msg( mbuf ); // drop if ring is full
41 rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
// wake any thread waiting on the "normal ring" chute (index 0 is reserved for it)
48 chute = &ctx->chutes[0];
49 sem_post( &chute->barrier ); // tickle the ring monitor
53 Allocate a message buffer, point it at the accumulated (raw) message,
54 call ref to point to all of the various bits and set real len etc,
55 then we queue it. Raw_msg is expected to include the transport goo
56 placed in front of the RMR header and payload.
// Wrap a raw (transport-framed) datagram in an RMR message buffer and route it:
//   - call-generated messages (HFL_CALL_MSG) and messages with no d1 data or a
//     zero call-id go onto the normal receive ring via queue_normal();
//   - messages carrying a non-zero call-id are handed to the matching chute and
//     that chute's semaphore is posted so the blocked rmr_mt_call() thread wakes.
// raw_msg ownership transfers to the mbuf (mbuf->tp_buf) on success.
// NOTE(review): the paranoid-check failure path, the else-arms, the 'mbuf' and
// 'chute' declarations, and the chute payload hand-off are in elided lines --
// verify against the full source before relying on this summary.
58 static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd ) {
60 uta_mhdr_t* hdr; // header of the message received
61 unsigned char* d1; // pointer at d1 data ([0] is the call_id)
63 unsigned int call_id; // the id assigned to the call generated message
65 if( PARINOID_CHECKS ) { // PARINOID mode is slower; off by default
66 if( raw_msg == NULL || msg_size <= 0 ) {
71 if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
72 mbuf->tp_buf = raw_msg;
73 mbuf->rts_fd = sender_fd;
// ref_tpbuf() points hdr/payload/len fields at the right offsets inside tp_buf
75 ref_tpbuf( mbuf, msg_size ); // point mbuf at bits in the datagram
76 hdr = mbuf->header; // convenience
77 if( hdr->flags & HFL_CALL_MSG ) { // call generated message; ignore call-id etc and queue
78 queue_normal( ctx, mbuf );
80 if( RMR_D1_LEN( hdr ) <= 0 ) { // no call-id data; just queue
81 queue_normal( ctx, mbuf );
83 d1 = DATA1_ADDR( hdr );
84 if( (call_id = (unsigned int) d1[D1_CALLID_IDX]) == 0 ) { // call_id not set, just queue
85 queue_normal( ctx, mbuf );
// a waiting rmr_mt_call() thread owns chutes[call_id]; hand it the message
// (chute payload assignment is presumably in the elided line between these two)
87 chute = &ctx->chutes[call_id];
89 sem_post( &chute->barrier ); // the call function can vet xaction id in their own thread
97 This is the callback invoked when tcp data is received. It adds the data
98 to the buffer for the connection and if a complete message is received
99 then the message is queued onto the receive ring.
101 Return value indicates only that we handled the buffer and SI should continue
102 or that SI should terminate, so on error it's NOT wrong to return "ok".
105 FUTURE: to do this better, SI needs to support a 'ready to read' callback
106 which allows us to do the actual receive directly into our buffer.
// SI95 "cooked data" callback: invoked whenever TCP bytes arrive on fd.
// Each fd has a 'river' (per-connection reassembly state) holding an
// accumulator buffer; this function stitches the byte stream back into
// length-prefixed messages (a native-endian int size precedes each message --
// see the FIX ME notes below) and queues each complete message via buf2mbuf().
// Oversized messages are consumed and discarded (RF_DROP) with a one-time
// warning per river (RF_NOTIFIED).
// Returns an SI continue/terminate indicator; errors still return "ok" since
// only the buffer was mishandled, not the SI session.
// NOTE(review): many interior lines (paranoid-check failure path, malloc
// failure handling, remain/bidx bookkeeping, loop close) are elided from this
// view; the comments below describe only what is visible.
108 static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
110 river_t* river; // river associated with the fd passed in
111 int bidx = 0; // transport buffer index
112 int remain; // bytes in transport buf that need to be moved
113 int* mlen; // pointer to spot in buffer for conversion to int
114 int need; // bytes needed for something
117 if( PARINOID_CHECKS ) { // PARINOID mode is slower; off by default
118 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
// fd indexes directly into ctx->rivers; reject anything out of range
122 if( fd >= ctx->nrivers || fd < 0 ) {
123 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
132 river = &ctx->rivers[fd];
133 if( river->state != RS_GOOD ) { // all states which aren't good require reset first
134 if( river->state == RS_NEW ) {
// first data on this connection: build a fresh accumulator sized for the
// largest inbound message plus header slop
135 memset( river, 0, sizeof( *river ) );
136 //river->nbytes = sizeof( char ) * (8 * 1024);
137 river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // max inbound message size
138 river->accum = (char *) malloc( river->nbytes );
141 // future -- sync to next marker
142 river->ipt = 0; // insert point
146 river->state = RS_GOOD;
148 while( remain > 0 ) { // until we've done something with all bytes passed in
149 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
151 // FIX ME: size in the message needs to be network byte order
152 if( river->msg_size <= 0 ) { // don't have a size yet
153 // FIX ME: we need a frame indicator to ensure alignment
154 need = sizeof( int ) - river->ipt; // what we need from transport buffer
155 if( need > remain ) { // the whole size isn't there
156 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
// stash the partial length prefix and wait for the next callback
157 memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what we can and depart
158 river->ipt += remain;
159 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
163 if( river->ipt > 0 ) { // if we captured the start of size last go round
// complete the length prefix in the accumulator, then read it from there
164 memcpy( &river->accum[river->ipt], buf + bidx, need );
// NOTE(review): reading the int through a cast assumes sender and receiver
// share endianness and that accum is suitably aligned (see FIX ME above)
168 river->msg_size = *((int *) river->accum);
170 rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
171 if( river->msg_size > 500 ) {
172 dump_40( river->accum, "msg size way too large accum:" );
// common case: whole length prefix is in this callback's buffer
176 river->msg_size = *((int *) &buf[bidx]); // snarf directly and copy with rest later
178 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
180 if( river->msg_size > river->nbytes ) { // message is too big, we will drop it
// can't enlarge mid-message; mark the river so the bytes are consumed but discarded
181 river->flags |= RF_DROP;
185 if( river->msg_size > (river->ipt + remain) ) { // need more than is left in buffer
186 if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
187 if( (river->flags & RF_DROP) == 0 ) {
188 memcpy( &river->accum[river->ipt], buf+bidx, remain ); // buffer and go wait for more
190 river->ipt += remain;
// enough bytes present to complete the current message
193 need = river->msg_size - river->ipt; // bytes from transport we need to have complete message
194 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
195 if( (river->flags & RF_DROP) == 0 ) {
196 memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more)
// hand the accumulator off (buf2mbuf takes ownership of it) and start a new one
// NOTE(review): nbytes (buffer capacity) is passed rather than msg_size --
// presumably ref_tpbuf() derives the true length from the header; confirm
197 buf2mbuf( ctx, river->accum, river->nbytes, fd ); // build an RMR mbuf and queue
198 river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator
// drop path: warn once per river that an oversized message arrived
200 if( !(river->flags & RF_NOTIFIED) ) {
201 rmr_vlog( RMR_VL_WARN, "message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd );
202 river->flags |= RF_NOTIFIED;
// reset size so the next loop iteration starts parsing a fresh length prefix
206 river->msg_size = -1;
213 if( DEBUG >2 ) rmr_vlog( RMR_VL_DEBUG, "##### data callback finished\n" );
218 Callback driven on a disconnect notification. We will attempt to find the related
219 endpoint via the fd2ep hash maintained in the context. If we find it, then we
220 remove it from the hash, and mark the endpoint as closed so that the next attempt
221 to send forces a reconnect attempt.
223 Future: put the ep on a queue to automatically attempt to reconnect.
// SI95 disconnect callback: look up (and remove) the endpoint mapped to fd in
// the ctx fd2ep hash, then, under the endpoint's gate mutex, mark it closed so
// the next send attempt triggers a reconnect (per the block comment above).
// NOTE(review): the nil-ep check after fd2ep_del(), the state update between
// lock/unlock, and the return value are in elided lines -- as shown,
// pthread_mutex_lock would dereference ep unchecked; confirm the full source
// guards ep == NULL.
225 static int mt_disc_cb( void* vctx, int fd ) {
229 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
233 ep = fd2ep_del( ctx, fd ); // find ep and remove the fd from the hash
235 pthread_mutex_lock( &ep->gate ); // wise to lock this
238 pthread_mutex_unlock( &ep->gate );
246 This is expected to execute in a separate thread. It is responsible for
247 _all_ receives and queues them on the appropriate ring, or chute.
248 It does this by registering the callback function above with the SI world
249 and then calling SIwait() to drive the callback when data has arrived.
252 The "state" of the message is checked which determines where the message
255 Flags indicate that the message is a call generated message, then
256 the message is queued on the normal receive ring.
258 Chute ID is == 0, then the message is queued on the normal receive ring.
260 The transaction ID in the message matches the expected ID in the chute,
261 then the message is given to the chute and the chute's semaphore is tickled.
263 If none are true, the message is dropped.
// Thread entry point for the receiver thread (see block comment above): it
// registers mt_data_cb for cooked TCP data and mt_disc_cb for disconnects with
// the SI95 layer, then parks in SIwait() which drives the callbacks for the
// life of the context.  Returns NULL only if the context pointer is nil;
// otherwise SIwait() is not expected to return.
265 static void* mt_receive( void* vctx ) {
268 if( (ctx = (uta_ctx_t*) vctx) == NULL ) {
269 rmr_vlog( RMR_VL_CRIT, "unable to start mt-receive: ctx was nil\n" );
273 if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_receive: registering SI95 data callback and waiting\n" );
275 SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx ); // our callback called only for "cooked" (tcp) data
276 SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx ); // our callback for handling disconnects
278 SIwait( ctx->si_ctx );
280 return NULL; // keep the compiler happy though never can be reached as SI wait doesn't return