X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=src%2Frmr%2Fcommon%2Fsrc%2Frt_generic_static.c;h=185dbb1f178a407ca816076c6d8ab266c68d7b4b;hb=33a2b86357426380076a408761169f4b698e8f3d;hp=245c9b533d24163c3ff392ea9dd5802ecb9a953e;hpb=c1658934f329e02a704dd5ec94b38dff293b09ee;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/common/src/rt_generic_static.c b/src/rmr/common/src/rt_generic_static.c index 245c9b5..185dbb1 100644 --- a/src/rmr/common/src/rt_generic_static.c +++ b/src/rmr/common/src/rt_generic_static.c @@ -1,8 +1,8 @@ // :vi sw=4 ts=4 noet: /* ================================================================================== - Copyright (c) 2019 Nokia - Copyright (c) 2018-2019 AT&T Intellectual Property. + Copyright (c) 2019-2020 Nokia + Copyright (c) 2018-2020 AT&T Intellectual Property. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -46,6 +46,8 @@ #include #include +#include // needed for route manager messages + /* Passed to a symtab foreach callback to construct a list of pointers from @@ -192,6 +194,103 @@ static void rt_epcounts( route_table_t* rt, char* id ) { rmr_sym_foreach_class( rt->hash, 1, ep_counts, id ); // run endpoints in the active table } +// ------------ route manager communication ------------------------------------------------- +/* + Send a request for a table update to the route manager. Updates come in + async, so send and go. + + pctx is the private context for the thread; ctx is the application context + that we need to be able to send the application ID in case rt mgr needs to + use it to idenfity us. + + Returns 0 if we were not able to send a request. +*/ +static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) { + rmr_mbuf_t* smsg; + int state = 0; + + if( ctx->rtg_whid < 0 ) { + return state; + } + + smsg = rmr_alloc_msg( pctx, 1024 ); + if( smsg != NULL ) { + smsg->mtype = RMRRM_REQ_TABLE; + smsg->sub_id = 0; + snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, (long) time( NULL ) ); + rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid ); + smsg->len = strlen( smsg->payload ) + 1; + + smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg ); + if( (state = smsg->state) != RMR_OK ) { + rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid ); + rmr_wh_close( ctx, ctx->rtg_whid ); // send failed, assume connection lost + ctx->rtg_whid = -1; + } + + rmr_free_msg( smsg ); + } + + return state; +} + +/* + Send an ack to the route table manager for a table ID that we are + processing. State is 1 for OK, and 0 for failed. Reason might + be populated if we know why there was a failure. + + Context should be the PRIVATE context that we use for messages + to route manger and NOT the user's context. + + If a message buffere is passed we use that and use return to sender + assuming that this might be a response to a call and that is needed + to send back to the proper calling thread. If msg is nil, we allocate + and use it. +*/ +static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) { + int use_rts = 1; + int payload_size = 1024; + + if( ctx == NULL || ctx->rtg_whid < 0 ) { + return; + } + + if( ctx->flags & CFL_NO_RTACK ) { // don't ack if reading from file etc + return; + } + + if( smsg != NULL ) { + smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE ); // ensure it's large enough to send a response + } else { + use_rts = 0; + smsg = rmr_alloc_msg( ctx, payload_size ); + } + + if( smsg != NULL ) { + smsg->mtype = RMRRM_TABLE_STATE; + smsg->sub_id = -1; + snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR", + table_id == NULL ? "" : table_id, reason == NULL ? "" : reason ); + + smsg->len = strlen( smsg->payload ) + 1; + + rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d\n", smsg->payload, smsg->state, ctx->rtg_whid ); + if( use_rts ) { + smsg = rmr_rts_msg( ctx, smsg ); + } else { + smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg ); + } + if( (state = smsg->state) != RMR_OK ) { + rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state ); + rmr_wh_close( ctx, ctx->rtg_whid ); // send failed, assume connection lost + ctx->rtg_whid = -1; + } + + if( ! use_rts ) { + rmr_free_msg( smsg ); // if not our message we must free the leftovers + } + } +} // ------------------------------------------------------------------------------------------------ /* @@ -469,8 +568,13 @@ static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) { /* Parse a partially parsed meid record. Tokens[0] should be one of: meid_map, mme_ar, mme_del. + + pctx is the private context needed to return an ack/nack using the provided + message buffer with the route managers address info. */ -static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel ) { +static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) { + char wbuf[1024]; + if( tokens == NULL || ntoks < 1 ) { return; // silent but should never happen } @@ -486,8 +590,17 @@ static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel ) if( ctx->new_rtable != NULL ) { // one in progress? this forces it out if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" ); uta_rt_drop( ctx->new_rtable ); + send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it } + if( ctx->table_id != NULL ) { + free( ctx->table_id ); + } + if( ntoks >2 ) { + ctx->table_id = strdup( clip( tokens[2] ) ); + } else { + ctx->table_id = NULL; + } ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (mtype, endpoint refs and meid) ctx->new_rtable->mupdates = 0; if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" ); @@ -496,6 +609,8 @@ static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel ) if( ntoks > 2 ) { // meid_map | end | |??? given if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) { // count they added didn't match what we received rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n", ctx->new_rtable->mupdates, tokens[2] ); + snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates ); + send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf ); uta_rt_drop( ctx->new_rtable ); ctx->new_rtable = NULL; return; @@ -510,6 +625,7 @@ static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel ) ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active ctx->new_rtable = NULL; if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" ); + send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL ); if( vlevel > 0 ) { rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" ); @@ -573,20 +689,32 @@ static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel ) For a RT update, we expect: - newrt|{start|end [hold]} + newrt | start | + newrt | end | rte|[,sender]|[;,...] mse|[,sender]||[;,...] mse| [,sender] | | %meid For a meid map update we expect: - meid_map | start + meid_map | start | meid_map | end | | mme_ar | | ... mme_del | ... + + The pctx is our private context that must be used to send acks/status + messages back to the route manager. The regular ctx is the ctx that + the user has been given and thus that's where we have to hang the route + table we're working with. + + If mbuf is given, and we need to ack, then we ack using the mbuf and a + return to sender call (allows route manager to use wh_call() to send + an update and rts is required to get that back to the right thread). + If mbuf is nil, then one will be allocated (in ack) and a normal wh_send + will be used. */ -static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { +static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) { int i; int ntoks; // number of tokens found in something int ngtoks; @@ -594,6 +722,7 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { rtable_ent_t* rte; // route table entry added char* tokens[128]; char* tok; // pointer into a token or string + char wbuf[1024]; if( ! buf ) { return; @@ -630,6 +759,18 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { case 'n': // newrt|{start|end} tokens[1] = clip( tokens[1] ); if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building + if( ntoks >2 ) { + if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received + rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n", + ctx->new_rtable->updates, tokens[2] ); + snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates ); + send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf ); + uta_rt_drop( ctx->new_rtable ); + ctx->new_rtable = NULL; + break; + } + } + if( ctx->new_rtable ) { uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain' @@ -643,23 +784,37 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" ); rt_stats( ctx->rtable ); } + + send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL ); } else { if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" ); ctx->new_rtable = NULL; } - } else { // start a new table. - if( ctx->new_rtable != NULL ) { // one in progress? this forces it out + } else { // start a new table. + if( ctx->new_rtable != NULL ) { // one in progress? this forces it out + send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it + if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" ); uta_rt_drop( ctx->new_rtable ); } + if( ctx->table_id != NULL ) { + free( ctx->table_id ); + } + if( ntoks >2 ) { + ctx->table_id = strdup( clip( tokens[2] ) ); + } else { + ctx->table_id = NULL; + } + ctx->new_rtable = NULL; ctx->new_rtable = uta_rt_clone( ctx->rtable ); // create by cloning endpoint and meidtentries from active table + ctx->new_rtable->updates = 0; // init count of entries received if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" ); } break; - case 'm': // mse entry or one of the meid_ records + case 'm': // mse entry or one of the meid_ records if( strcmp( tokens[0], "mse" ) == 0 ) { if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently break; @@ -673,7 +828,7 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel ); ctx->new_rtable->updates++; } else { - meid_parser( ctx, tokens, ntoks, vlevel ); + meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel ); } break; @@ -730,6 +885,13 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { uta_rt_drop( ctx->new_rtable ); } + if( ntoks >2 ) { + if( ctx->table_id != NULL ) { + free( ctx->table_id ); + } + ctx->table_id = strdup( clip( tokens[2] ) ); + } + ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (endpts and entries) ctx->new_rtable->updates = 0; // init count of updates received if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" ); @@ -776,7 +938,8 @@ static void read_static_rt( uta_ctx_t* ctx, int vlevel ) { } } - for( rec = fbuf; rec && *rec; rec = eor+1 ) { + rec = fbuf; + while( rec && *rec ) { rcount++; if( (eor = strchr( rec, '\n' )) != NULL ) { *eor = 0; @@ -787,7 +950,9 @@ static void read_static_rt( uta_ctx_t* ctx, int vlevel ) { return; } - parse_rt_rec( ctx, rec, vlevel ); + parse_rt_rec( ctx, NULL, rec, vlevel, NULL ); // no pvt context as we can't ack + + rec = eor+1; } if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static: seed route table successfully parsed: %d records\n", rcount ); @@ -1114,4 +1279,18 @@ static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) { return key; } +/* + Given a route table and meid string, find the owner (if known). Returns a pointer to + the endpoint struct or nil. +*/ +static inline endpoint_t* get_meid_owner( route_table_t *rt, char* meid ) { + endpoint_t* ep; // the ep we found in the hash + + if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) { + return NULL; + } + + return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE ); +} + #endif