1 // : vi ts=4 sw=4 noet:
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: mt_call_si static.c
23 Abstract: Static functions related to the multi-threaded call feature
24 which are SI specific.
26 Author: E. Scott Daniels
30 #ifndef _mtcall_si_static_c
31 #define _mtcall_si_static_c
32 #include <semaphore.h>
// Place a received message buffer on the main receive ring (ctx->mring) and
// post the barrier semaphore of chute 0 so that a waiting receiver is woken.
// If uta_ring_insert() reports failure, the message is released with
// rmr_free_msg() and a warning is written to stderr.
//
//   ctx  - context supplying the ring (mring) and the chutes array
//   mbuf - message to queue; it is freed here when the insert fails
//
// NOTE(review): the listing visible here looks incomplete. `chute` is used
// without a visible declaration, `warned` is declared but never tested or
// updated, and neither an early return after the warning nor the closing
// braces can be seen. Confirm the failure-path flow against the complete file.
34 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
// static, so the value persists across calls; presumably limits the warning
// below to a single emission -- TODO confirm, the test is not visible here
35 static int warned = 0;
38 if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
39 rmr_free_msg( mbuf ); // drop if ring is full
41 fprintf( stderr, "[WARN] rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
// chute 0 is the one whose barrier is posted for normally queued messages
48 chute = &ctx->chutes[0];
49 sem_post( &chute->barrier ); // tickle the ring monitor
53 Allocate a message buffer, point it at the accumulated (raw) message,
54 call ref to point to all of the various bits and set real len etc,
55 then we queue it. Raw_msg is expected to include the transport goo
56 placed in front of the RMR header and payload.
58 done -- FIX ME?? can we eliminate the buffer copy here?
// Wrap a fully accumulated raw message in an rmr_mbuf_t and route it.
//
//   ctx       - context supplying the rings and chutes used for delivery
//   raw_msg   - accumulated message bytes; assigned to mbuf->tp_buf directly,
//               no copy is made in this function
//   msg_size  - size handed to ref_tpbuf() when the mbuf is pointed at the data
//   sender_fd - stored in mbuf->rts_fd
//
// Routing visible in this function:
//   - HFL_CALL_MSG set in the header flags  -> queue_normal()
//   - RMR_D1_LEN( hdr ) <= 0                -> queue_normal()
//   - call-id byte in the d1 data is 0      -> queue_normal()
//   - otherwise the chute indexed by call_id is selected and its barrier posted
//
// NOTE(review): the listing visible here looks incomplete. `mbuf` and `chute`
// are used without visible declarations, and the else branches and closing
// braces that separate the cases above cannot be seen.
60 static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd ) {
62 uta_mhdr_t* hdr; // header of the message received
63 unsigned char* d1; // pointer at d1 data ([0] is the call_id)
65 unsigned int call_id; // the id assigned to the call generated message
67 if( PARINOID_CHECKS ) { // PARINOID mode is slower; off by default
// reject a missing buffer or a non-positive size; the action taken is not
// visible here (presumably an early return -- TODO confirm)
68 if( raw_msg == NULL || msg_size <= 0 ) {
// NOTE(review): this malloc based allocation and the alloc_mbuf() call a few
// lines below both assign mbuf. One of the two is presumably disabled
// (commented out) in the complete file -- confirm which one is live.
74 if( (mbuf = (rmr_mbuf_t *) malloc( sizeof( *mbuf ))) != NULL ) { // alloc mbuf and point at various bits of payload
75 memset( mbuf, 0, sizeof( *mbuf ) );
76 mbuf->tp_buf = raw_msg;
77 mbuf->ring = ctx->zcb_mring;
// nothing is queued when the allocation fails; raw_msg is not released in
// any line visible here -- TODO confirm who owns it on that path
79 if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
80 mbuf->tp_buf = raw_msg;
81 mbuf->rts_fd = sender_fd;
83 // eliminated :) memcpy( mbuf->tp_buf, river->accum + offset, river->msg_size );
85 ref_tpbuf( mbuf, msg_size ); // point mbuf at bits in the datagram
86 hdr = mbuf->header; // convenience
87 if( hdr->flags & HFL_CALL_MSG ) { // call generated message; ignore call-id etc and queue
88 queue_normal( ctx, mbuf );
90 if( RMR_D1_LEN( hdr ) <= 0 ) { // no call-id data; just queue
91 queue_normal( ctx, mbuf );
93 d1 = DATA1_ADDR( hdr );
// the call id is a single byte taken from the received d1 data
94 if( (call_id = (unsigned int) d1[D1_CALLID_IDX]) == 0 ) { // call_id not set, just queue
95 queue_normal( ctx, mbuf );
// NOTE(review): call_id comes straight from the received message and no check
// against the number of entries in ctx->chutes is visible -- confirm the array
// covers every value a single byte can hold.
97 chute = &ctx->chutes[call_id];
// NOTE(review): no statement handing mbuf to the chute is visible before the
// semaphore is posted -- confirm the assignment exists in the complete file.
99 sem_post( &chute->barrier ); // the call function can vet xaction id in their own thread
107 This is the callback invoked when tcp data is received. It adds the data
108 to the buffer for the connection and if a complete message is received
109 then the message is queued onto the receive ring.
111 Return value indicates only that we handled the buffer and SI should continue
112 or that SI should terminate, so on error it's NOT wrong to return "ok".
115 FUTURE: to do this better, SI needs to support a 'ready to read' callback
116 which allows us to do the actual receive directly into our buffer.
// Data callback: accumulate the bytes delivered for a connection into that
// connection's river and hand each complete message to buf2mbuf().
//
//   vctx   - the uta_ctx_t for the instance, passed as void*
//   fd     - descriptor the data arrived on; used as the index into ctx->rivers
//   buf    - bytes delivered by the transport
//   buflen - byte count reported for buf (see the debug output below)
//
// Framing as implemented here: the first sizeof( int ) bytes of a message are
// read as an int, in host byte order, and taken as the total number of bytes
// to accumulate (size field included) before the message is passed on.
//
// NOTE(review): the listing visible here looks incomplete. `ctx` and `i` are
// used without visible declarations, the initialisation of `remain`, the
// advancing of bidx/remain, the return statements and the closing braces
// cannot be seen. The notes below flag what could not be confirmed.
118 static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
120 river_t* river; // river associated with the fd passed in
121 int bidx = 0; // transport buffer index
122 int remain; // bytes in transport buf that need to be moved
123 int* mlen; // pointer to spot in buffer for conversion to int
124 int need; // bytes needed for something
127 // for speed these checks should be enabled only in debug mode and assume we always get a good context
128 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
// fd doubles as the river index, so it must fall inside [0, nrivers)
132 if( fd >= ctx->nrivers || fd < 0 ) {
133 if( DEBUG ) fprintf( stderr, "[DBUG] callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
137 // -------- end debug checks -----------------
143 river = &ctx->rivers[fd];
144 if( river->state != RS_GOOD ) { // all states which aren't good require reset first
// a new river is zeroed and given an accumulator of ctx->max_ibm bytes
145 if( river->state == RS_NEW ) {
146 memset( river, 0, sizeof( *river ) );
147 //river->nbytes = sizeof( char ) * (8 * 1024);
148 river->nbytes = sizeof( char ) * ctx->max_ibm; // max inbound message size
// NOTE(review): the malloc() result is not checked in any line visible here
149 river->accum = (char *) malloc( river->nbytes );
152 // future -- sync to next marker
153 river->ipt = 0; // insert point
157 river->state = RS_GOOD;
// NOTE(review): this dump reads 40 bytes from buf regardless of buflen and no
// guard is visible -- presumably wrapped in a debug conditional, confirm.
160 fprintf( stderr, "\n>>>>> data callback for %d bytes from %d\n", buflen, fd );
161 for( i = 0; i < 40; i++ ) {
162 fprintf( stderr, "%02x ", (unsigned char) *(buf+i) );
164 fprintf( stderr, "\n" );
// NOTE(review): remain is read here but its initialisation is not visible;
// presumably it starts at buflen -- confirm.
168 while( remain > 0 ) { // until we've done something with all bytes passed in
169 if( DEBUG ) fprintf( stderr, "[DBUG] ====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
171 // FIX ME: size in the message needs to be network byte order
172 if( river->msg_size <= 0 ) { // don't have a size yet
173 // FIX ME: we need a frame indicator to ensure alignment
// ipt counts bytes already in the accumulator, so this is the part of the
// size field that is still missing
174 need = sizeof( int ) - river->ipt; // what we need from transport buffer
175 if( need > remain ) { // the whole size isn't there
176 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
177 memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what we can and depart
178 river->ipt += remain;
179 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
// size field was split across callbacks: finish it in the accumulator and
// read the size from there
183 if( river->ipt > 0 ) { // if we captured the start of size last go round
184 memcpy( &river->accum[river->ipt], buf + bidx, need );
188 river->msg_size = *((int *) river->accum);
// debug aid: a size above 500 triggers a dump of the accumulator
190 fprintf( stderr, "[DBUG] size from accumulator =%d\n", river->msg_size );
191 if( river->msg_size > 500 ) {
192 dump_40( river->accum, "msg size way too large accum:" );
// NOTE(review): buf + bidx is read through an int pointer; nothing visible
// here guarantees that address is suitably aligned for an int -- confirm.
196 river->msg_size = *((int *) &buf[bidx]); // snarf directly and copy with rest later
198 if( DEBUG ) fprintf( stderr, "[DBUG] data callback setting msg size: %d\n", river->msg_size );
// NOTE(review): msg_size was taken from the received bytes and no check
// against river->nbytes is visible before the copies into accum below; a size
// larger than the accumulator would overrun it -- confirm a bound exists.
201 if( river->msg_size > (river->ipt + remain) ) { // need more than is left in buffer
202 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
203 memcpy( &river->accum[river->ipt], buf+bidx, remain ); // buffer and go wait for more
204 river->ipt += remain;
207 need = river->msg_size - river->ipt; // bytes from transport we need to have complete message
208 if( DEBUG ) fprintf( stderr, "[DBUG] data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
209 memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more)
// the accumulator itself is passed on (buf2mbuf stores the pointer as the
// tp_buf of the mbuf), which is why a replacement is allocated right after
210 buf2mbuf( ctx, river->accum, river->msg_size, fd ); // build an RMR mbuf and queue
// NOTE(review): the malloc() result is not checked in any line visible here
212 river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator
// a non-positive msg_size makes the next pass parse a new size field
213 river->msg_size = -1;
220 if( DEBUG >2 ) fprintf( stderr, "[DBUG] ##### data callback finished\n" );
226 This is expected to execute in a separate thread. It is responsible for
227 _all_ receives and queues them on the appropriate ring, or chute.
228 It does this by registering the callback function above with the SI world
229 and then calling SIwait() to drive the callback when data has arrived.
232 The "state" of the message is checked which determines where the message
235 Flags indicate that the message is a call generated message, then
236 the message is queued on the normal receive ring.
238 Chute ID is == 0, then the message is queued on the normal receive ring.
240 The transaction ID in the message matches the expected ID in the chute,
241 then the message is given to the chute and the chute's semaphore is tickled.
243 If none are true, the message is dropped.
// Receiver thread entry point: registers mt_data_cb as the SI_CB_CDATA
// callback on ctx->si_ctx and then calls SIwait() so that the callback is
// driven as data arrives.
//
//   vctx - the uta_ctx_t for the instance, passed as void*; it is also handed
//          to SIcbreg() as the data given back to the callback
//
// Returns NULL. A nil vctx is reported on stderr.
//
// NOTE(review): the declaration of `ctx` and the action taken after the nil
// context message (presumably an early return) are not visible here -- confirm
// against the complete file.
245 static void* mt_receive( void* vctx ) {
248 if( (ctx = (uta_ctx_t*) vctx) == NULL ) {
249 fprintf( stderr, "[CRI], unable to start mt-receive: ctx was nil\n" );
253 if( DEBUG ) fprintf( stderr, "[DBUG] mt_receive: registering SI95 data callback and waiting\n" );
254 SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx ); // our callback called only for "cooked" (tcp) data
255 SIwait( ctx->si_ctx );
257 return NULL; // keep the compiler happy though never can be reached as SI wait doesn't return