// :vi sw=4 ts=4 noet:
/*
==================================================================================
- Copyright (c) 2019 Nokia
- Copyright (c) 2018-2019 AT&T Intellectual Property.
+ Copyright (c) 2019-2020 Nokia
+ Copyright (c) 2018-2020 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
#include <unistd.h>
#include <netdb.h>
+#include <RIC_message_types.h> // needed for route manager messages
+
/*
Passed to a symtab foreach callback to construct a list of pointers from
rmr_sym_foreach_class( rt->hash, 1, ep_counts, id ); // run endpoints in the active table
}
+// ------------ route manager communication -------------------------------------------------
+/*
+ Send a request for a table update to the route manager. Updates come in
+ async, so send and go.
+
+ pctx is the private context for the thread; ctx is the application context
+ that we need to be able to send the application ID in case rt mgr needs to
+ use it to idenfity us.
+
+ Returns 0 if we were not able to send a request.
+*/
+static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
+ rmr_mbuf_t* smsg;
+ int state = 0;
+
+ if( ctx->rtg_whid < 0 ) {
+ return state;
+ }
+
+ smsg = rmr_alloc_msg( pctx, 1024 );
+ if( smsg != NULL ) {
+ smsg->mtype = RMRRM_REQ_TABLE;
+ smsg->sub_id = 0;
+ snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, (long) time( NULL ) );
+ rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
+ smsg->len = strlen( smsg->payload ) + 1;
+
+ smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
+ if( (state = smsg->state) != RMR_OK ) {
+ rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
+ rmr_wh_close( ctx, ctx->rtg_whid ); // send failed, assume connection lost
+ ctx->rtg_whid = -1;
+ }
+
+ rmr_free_msg( smsg );
+ }
+
+ return state;
+}
+
+/*
+ Send an ack to the route table manager for a table ID that we are
+ processing. State is 1 for OK, and 0 for failed. Reason might
+ be populated if we know why there was a failure.
+
+ Context should be the PRIVATE context that we use for messages
+ to route manger and NOT the user's context.
+
+ If a message buffere is passed we use that and use return to sender
+ assuming that this might be a response to a call and that is needed
+ to send back to the proper calling thread. If msg is nil, we allocate
+ and use it.
+*/
+static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
+ int use_rts = 1;
+ int payload_size = 1024;
+
+ if( ctx == NULL || ctx->rtg_whid < 0 ) {
+ return;
+ }
+
+ if( ctx->flags & CFL_NO_RTACK ) { // don't ack if reading from file etc
+ return;
+ }
+
+ if( smsg != NULL ) {
+ smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE ); // ensure it's large enough to send a response
+ } else {
+ use_rts = 0;
+ smsg = rmr_alloc_msg( ctx, payload_size );
+ }
+
+ if( smsg != NULL ) {
+ smsg->mtype = RMRRM_TABLE_STATE;
+ smsg->sub_id = -1;
+ snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
+ table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
+
+ smsg->len = strlen( smsg->payload ) + 1;
+
+ rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d\n", smsg->payload, smsg->state, ctx->rtg_whid );
+ if( use_rts ) {
+ smsg = rmr_rts_msg( ctx, smsg );
+ } else {
+ smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
+ }
+ if( (state = smsg->state) != RMR_OK ) {
+ rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
+ rmr_wh_close( ctx, ctx->rtg_whid ); // send failed, assume connection lost
+ ctx->rtg_whid = -1;
+ }
+
+ if( ! use_rts ) {
+ rmr_free_msg( smsg ); // if not our message we must free the leftovers
+ }
+ }
+}
// ------------------------------------------------------------------------------------------------
/*
/*
Parse a partially parsed meid record. Tokens[0] should be one of:
meid_map, mme_ar, mme_del.
+
+ pctx is the private context needed to return an ack/nack using the provided
+ message buffer with the route managers address info.
*/
-static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel ) {
+static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) {
+ char wbuf[1024];
+
if( tokens == NULL || ntoks < 1 ) {
return; // silent but should never happen
}
if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
uta_rt_drop( ctx->new_rtable );
+ send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it
}
+ if( ctx->table_id != NULL ) {
+ free( ctx->table_id );
+ }
+ if( ntoks >2 ) {
+ ctx->table_id = strdup( clip( tokens[2] ) );
+ } else {
+ ctx->table_id = NULL;
+ }
ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (mtype, endpoint refs and meid)
ctx->new_rtable->mupdates = 0;
if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
if( ntoks > 2 ) { // meid_map | end | <count> |??? given
if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) { // count they added didn't match what we received
rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n", ctx->new_rtable->mupdates, tokens[2] );
+ snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
+ send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
uta_rt_drop( ctx->new_rtable );
ctx->new_rtable = NULL;
return;
ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
ctx->new_rtable = NULL;
if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
+ send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
if( vlevel > 0 ) {
rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
For a RT update, we expect:
- newrt|{start|end [hold]}
+ newrt | start | <table-id>
+ newrt | end | <count>
rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
mse| <mtype>[,sender] | <sub-id> | %meid
For a meid map update we expect:
- meid_map | start
+ meid_map | start | <table-id>
meid_map | end | <count> | <md5-hash>
mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
mme_del | <meid0> <meid1>...<meidn>
+
+ The pctx is our private context that must be used to send acks/status
+ messages back to the route manager. The regular ctx is the ctx that
+ the user has been given and thus that's where we have to hang the route
+ table we're working with.
+
+ If mbuf is given, and we need to ack, then we ack using the mbuf and a
+ return to sender call (allows route manager to use wh_call() to send
+ an update and rts is required to get that back to the right thread).
+ If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
+ will be used.
*/
-static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
+static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
int i;
int ntoks; // number of tokens found in something
int ngtoks;
rtable_ent_t* rte; // route table entry added
char* tokens[128];
char* tok; // pointer into a token or string
+ char wbuf[1024];
if( ! buf ) {
return;
case 'n': // newrt|{start|end}
tokens[1] = clip( tokens[1] );
if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
+ if( ntoks >2 ) {
+ if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
+ rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
+ ctx->new_rtable->updates, tokens[2] );
+ snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
+ send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
+ uta_rt_drop( ctx->new_rtable );
+ ctx->new_rtable = NULL;
+ break;
+ }
+ }
+
if( ctx->new_rtable ) {
uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
rt_stats( ctx->rtable );
}
+
+ send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
} else {
if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
ctx->new_rtable = NULL;
}
- } else { // start a new table.
- if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
+ } else { // start a new table.
+ if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
+ send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it
+
if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
uta_rt_drop( ctx->new_rtable );
}
+ if( ctx->table_id != NULL ) {
+ free( ctx->table_id );
+ }
+ if( ntoks >2 ) {
+ ctx->table_id = strdup( clip( tokens[2] ) );
+ } else {
+ ctx->table_id = NULL;
+ }
+
ctx->new_rtable = NULL;
ctx->new_rtable = uta_rt_clone( ctx->rtable ); // create by cloning endpoint and meidtentries from active table
+ ctx->new_rtable->updates = 0; // init count of entries received
if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
}
break;
- case 'm': // mse entry or one of the meid_ records
+ case 'm': // mse entry or one of the meid_ records
if( strcmp( tokens[0], "mse" ) == 0 ) {
if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
break;
build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
ctx->new_rtable->updates++;
} else {
- meid_parser( ctx, tokens, ntoks, vlevel );
+ meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel );
}
break;
uta_rt_drop( ctx->new_rtable );
}
+ if( ntoks >2 ) {
+ if( ctx->table_id != NULL ) {
+ free( ctx->table_id );
+ }
+ ctx->table_id = strdup( clip( tokens[2] ) );
+ }
+
ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (endpts and entries)
ctx->new_rtable->updates = 0; // init count of updates received
if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
}
}
- for( rec = fbuf; rec && *rec; rec = eor+1 ) {
+ rec = fbuf;
+ while( rec && *rec ) {
rcount++;
if( (eor = strchr( rec, '\n' )) != NULL ) {
*eor = 0;
return;
}
- parse_rt_rec( ctx, rec, vlevel );
+ parse_rt_rec( ctx, NULL, rec, vlevel, NULL ); // no pvt context as we can't ack
+
+ rec = eor+1;
}
if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static: seed route table successfully parsed: %d records\n", rcount );
return key;
}
+/*
+ Given a route table and meid string, find the owner (if known). Returns a pointer to
+ the endpoint struct or nil.
+*/
+static inline endpoint_t* get_meid_owner( route_table_t *rt, char* meid ) {
+ endpoint_t* ep; // the ep we found in the hash
+
+ if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
+ return NULL;
+ }
+
+ return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );
+}
#endif