1 // : vi ts=4 sw=4 noet:
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
22 Mnemonic: mt_call_si static.c
23 Abstract: Static functions related to the multi-threaded call feature
24 which are SI specific.
26 Author: E. Scott Daniels
30 #ifndef _mtcall_si_static_c
31 #define _mtcall_si_static_c
32 #include <semaphore.h>
/*
	Queue a received message buffer on the normal receive ring (ctx->mring).

	ctx   - context supplying the ring (mring) and the chute array (chutes).
	mbuf  - message to queue; it is released with rmr_free_msg() when
	        uta_ring_insert() reports failure (ring full).

	Returns nothing.  The barrier semaphore of chute 0 is posted to tickle
	the ring monitor.

	NOTE(review): the declaration of `chute`, the test on `warned`, and the
	exit taken after a dropped message are not among the lines visible here.
	Presumably the warning is written only once and the semaphore is not
	posted for a dropped message -- confirm.
*/
34 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
// function-static flag, so its value persists across calls
35 static int warned = 0;
// a zero/false result from the insert is treated as "ring full"
38 if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
39 rmr_free_msg( mbuf ); // drop if ring is full
41 fprintf( stderr, "[WARN] rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
// chute 0 is the one whose barrier is posted for normally queued messages
48 chute = &ctx->chutes[0];
49 sem_post( &chute->barrier ); // tickle the ring monitor
53 Allocate a message buffer, point it at the accumulated (raw) message,
54 call ref to point to all of the various bits and set real len etc,
55 then we queue it. Raw_msg is expected to include the transport goo
56 placed in front of the RMR header and payload.
/*
	Wrap a fully accumulated raw message in an RMR mbuf and route it.

	ctx        - context passed to alloc_mbuf() and queue_normal(); also
	             supplies the chute array indexed by call-id.
	raw_msg    - accumulated message; stored as the mbuf's tp_buf, so the
	             mbuf refers to this memory after the call (no copy is made).
	msg_size   - size handed to ref_tpbuf() when the mbuf is pointed at raw_msg.
	sender_fd  - stored in the mbuf as rts_fd.

	Routing as visible below:
		- header flag HFL_CALL_MSG set    -> queue_normal()
		- RMR_D1_LEN( hdr ) <= 0          -> queue_normal()
		- call-id byte in d1 is zero      -> queue_normal()
		- otherwise                       -> chute[call_id] barrier is posted

	Returns nothing.

	NOTE(review): nothing visible here releases raw_msg when alloc_mbuf()
	returns NULL -- confirm whether the caller or a line not shown handles it.
*/
58 static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd ) {
60 uta_mhdr_t* hdr; // header of the message received
61 unsigned char* d1; // pointer at d1 data ([0] is the call_id)
63 unsigned int call_id; // the id assigned to the call generated message
65 if( PARINOID_CHECKS ) { // PARINOID mode is slower; off by default
// sanity check on the inputs: a nil buffer or non-positive size is rejected
66 if( raw_msg == NULL || msg_size <= 0 ) {
71 if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
// the mbuf takes a reference to the raw buffer rather than copying it
72 mbuf->tp_buf = raw_msg;
73 mbuf->rts_fd = sender_fd;
75 ref_tpbuf( mbuf, msg_size ); // point mbuf at bits in the datagram
76 hdr = mbuf->header; // convenience
77 if( hdr->flags & HFL_CALL_MSG ) { // call generated message; ignore call-id etc and queue
78 queue_normal( ctx, mbuf );
80 if( RMR_D1_LEN( hdr ) <= 0 ) { // no call-id data; just queue
81 queue_normal( ctx, mbuf );
83 d1 = DATA1_ADDR( hdr );
// the call-id is a single unsigned byte read from d1, so its value is 0..255
84 if( (call_id = (unsigned int) d1[D1_CALLID_IDX]) == 0 ) { // call_id not set, just queue
85 queue_normal( ctx, mbuf );
// NOTE(review): call_id is used as an index with no visible range check;
// confirm ctx->chutes has an entry for every possible byte value
87 chute = &ctx->chutes[call_id];
89 sem_post( &chute->barrier ); // the call function can vet xaction id in their own thread
97 This is the callback invoked when tcp data is received. It adds the data
98 to the buffer for the connection and if a complete message is received
99 then the message is queued onto the receive ring.
101 Return value indicates only that we handled the buffer and SI should continue
102 or that SI should terminate, so on error it's NOT wrong to return "ok".
105 FUTURE: to do this better, SI needs to support a 'ready to read' callback
106 which allows us to do the actual receive directly into our buffer.
/*
	SI data callback: reassemble length-prefixed messages from a TCP byte
	stream and hand each complete message to buf2mbuf().

	vctx    - opaque pointer which is cast to uta_ctx_t*.
	fd      - descriptor the data arrived on; used as the index into
	          ctx->rivers after being checked against ctx->nrivers.
	buf     - bytes delivered by the transport for this callback.
	buflen  - number of bytes in buf (reported in the diagnostic output).

	Per-fd state is kept in a river: accum (accumulation buffer of nbytes),
	ipt (insert point into accum) and msg_size (size of the message being
	assembled; a value <= 0 means the size has not been determined yet).
	The leading sizeof( int ) bytes of each message are read as the message
	size.

	Returns an int to SI.
	NOTE(review): the return statements, the initial value of `remain`, and
	the statements which advance `bidx`/`remain` are not among the lines
	visible here -- confirm against the complete function.
	NOTE(review): `mlen` is not referenced in the visible lines; possibly unused.
*/
108 static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
110 river_t* river; // river associated with the fd passed in
111 int bidx = 0; // transport buffer index
112 int remain; // bytes in transport buf that need to be moved
113 int* mlen; // pointer to spot in buffer for conversion to int
114 int need; // bytes needed for something
117 if( PARINOID_CHECKS ) { // PARINOID mode is slower; off by default
118 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
// fd is used directly as the rivers[] index, so it must be within 0..nrivers-1
122 if( fd >= ctx->nrivers || fd < 0 ) {
123 if( DEBUG ) fprintf( stderr, "[DBUG] callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
132 river = &ctx->rivers[fd];
133 if( river->state != RS_GOOD ) { // all states which aren't good require reset first
// a new river is zeroed and given an accumulator sized to ctx->max_ibm
134 if( river->state == RS_NEW ) {
135 memset( river, 0, sizeof( *river ) );
136 //river->nbytes = sizeof( char ) * (8 * 1024);
137 river->nbytes = sizeof( char ) * ctx->max_ibm; // max inbound message size
// NOTE(review): the malloc result is not checked in the visible lines
138 river->accum = (char *) malloc( river->nbytes );
141 // future -- sync to next marker
142 river->ipt = 0; // insert point
146 river->state = RS_GOOD;
// diagnostic hex dump of the head of the transport buffer
// NOTE(review): the loop always reads 40 bytes regardless of buflen, and any
// DEBUG guard around it is not visible here -- confirm it cannot over-read
149 fprintf( stderr, "\n>>>>> data callback for %d bytes from %d\n", buflen, fd );
150 for( i = 0; i < 40; i++ ) {
151 fprintf( stderr, "%02x ", (unsigned char) *(buf+i) );
153 fprintf( stderr, "\n" );
157 while( remain > 0 ) { // until we've done something with all bytes passed in
158 if( DEBUG ) fprintf( stderr, "[DBUG] ====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
160 // FIX ME: size in the message needs to be network byte order
161 if( river->msg_size <= 0 ) { // don't have a size yet
162 // FIX ME: we need a frame indicator to ensure alignment
// ipt counts size bytes already captured by an earlier callback
163 need = sizeof( int ) - river->ipt; // what we need from transport buffer
164 if( need > remain ) { // the whole size isn't there
165 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
166 memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what we can and depart
167 river->ipt += remain;
168 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
172 if( river->ipt > 0 ) { // if we captured the start of size last go round
// complete the partially captured size in the accumulator, then read it from there
173 memcpy( &river->accum[river->ipt], buf + bidx, need );
177 river->msg_size = *((int *) river->accum);
179 fprintf( stderr, "[DBUG] size from accumulator =%d\n", river->msg_size );
// 500 is a hard-coded threshold which triggers a diagnostic dump of the accumulator
180 if( river->msg_size > 500 ) {
181 dump_40( river->accum, "msg size way too large accum:" );
// NOTE(review): reading through an int* cast assumes buf+bidx is suitably
// aligned for int and that the sender uses the same byte order (see FIX ME above)
185 river->msg_size = *((int *) &buf[bidx]); // snarf directly and copy with rest later
187 if( DEBUG ) fprintf( stderr, "[DBUG] data callback setting msg size: %d\n", river->msg_size );
// NOTE(review): no comparison of msg_size with river->nbytes is visible before
// the copies into accum below -- confirm an oversized size cannot overrun accum
190 if( river->msg_size > (river->ipt + remain) ) { // need more than is left in buffer
191 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
192 memcpy( &river->accum[river->ipt], buf+bidx, remain ); // buffer and go wait for more
193 river->ipt += remain;
196 need = river->msg_size - river->ipt; // bytes from transport we need to have complete message
197 if( DEBUG ) fprintf( stderr, "[DBUG] data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
198 memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more)
// the accumulator is handed to buf2mbuf (which stores the pointer in the mbuf),
// so a new accumulator is allocated for the next message rather than reusing it
199 buf2mbuf( ctx, river->accum, river->msg_size, fd ); // build an RMR mbuf and queue
// NOTE(review): this malloc result is also unchecked in the visible lines
201 river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator
// a non-positive size makes the next pass through the loop read a new size
202 river->msg_size = -1;
209 if( DEBUG >2 ) fprintf( stderr, "[DBUG] ##### data callback finished\n" );
215 This is expected to execute in a separate thread. It is responsible for
216 _all_ receives and queues them on the appropriate ring, or chute.
217 It does this by registering the callback function above with the SI world
218 and then calling SIwait() to drive the callback when data has arrived.
221 The "state" of the message is checked, which determines where the message is delivered:
224 Flags indicate that the message is a call generated message, then
225 the message is queued on the normal receive ring.
227 Chute ID is == 0, then the message is queued on the normal receive ring.
229 The transaction ID in the message matches the expected ID in the chute,
230 then the message is given to the chute and the chute's semaphore is tickled.
232 If none are true, the message is dropped.
/*
	Receiver entry point: registers mt_data_cb as the SI data callback and
	then blocks in SIwait() so that SI drives the callback as data arrives.

	vctx  - opaque pointer which is cast to uta_ctx_t*; a nil pointer is
	        reported on stderr.  The same pointer is given to SIcbreg() as the
	        data passed to the callback.

	Returns NULL.  The void* (void*) signature has the shape of a thread
	start routine.

	NOTE(review): the declaration of `ctx` and the exit taken after the nil
	check are not among the lines visible here -- presumably an early
	return; confirm.
*/
234 static void* mt_receive( void* vctx ) {
237 if( (ctx = (uta_ctx_t*) vctx) == NULL ) {
238 fprintf( stderr, "[CRI], unable to start mt-receive: ctx was nil\n" );
242 if( DEBUG ) fprintf( stderr, "[DBUG] mt_receive: registering SI95 data callback and waiting\n" );
// SI_CB_CDATA selects the connection-data callback slot; vctx comes back as the callback's first argument
243 SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx ); // our callback called only for "cooked" (tcp) data
// blocks here; all message handling happens inside the callback
244 SIwait( ctx->si_ctx );
246 return NULL; // keep the compiler happy though never can be reached as SI wait doesn't return