2 ==================================================================================
4 Copyright (c) 2019-2020 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
21 #include "xapp_rmr.hpp"
// Byte count used below in xapp_rmr_send() as the snprintf bound for the
// generated transaction id and as the memcpy length into the message's
// xaction field.
// NOTE(review): the RMR headers may already define a macro with this name —
// confirm this does not cause a redefinition.
23 #define RMR_MAX_XID 32
// Constructor: records the send-attempt budget and resets the message
// buffers and the readiness flag.
//   port        - NOTE(review): the use of this parameter is not visible in
//                 the lines shown here; presumably it is stored for the
//                 rmr_init() call in xapp_rmr_init() — confirm.
//   rmrattempts - copied into _nattempts, which xapp_rmr_send() reads as its
//                 attempt counter.
// NOTE(review): no initialisation of _xapp_rmr_ctx is visible here although
// the destructor passes it to rmr_close() — confirm it is set to NULL.
25 XappRmr::XappRmr(std::string port, int rmrattempts){
28 _nattempts = rmrattempts;
// Both message buffers start out null; the send buffer is allocated on
// first use in xapp_rmr_send().
30 _xapp_received_buff = NULL;
31 _xapp_send_buff =NULL;
// Readiness flag starts false; it is what get_is_ready() returns.
32 _rmr_is_ready = false;
// Destructor: releases the RMR message buffers and closes the RMR context.
37 XappRmr::~XappRmr(void){
// The receive buffer is freed only when it is non-null.
39 if(_xapp_received_buff)
40 rmr_free_msg(_xapp_received_buff);
// NOTE(review): the guard for this call is not visible in the lines shown —
// confirm the send buffer is null-checked before being freed.
43 rmr_free_msg(_xapp_send_buff);
// NOTE(review): likewise confirm the context is checked for null before
// rmr_close(); xapp_rmr_init() may never have been called.
46 rmr_close(_xapp_rmr_ctx);
// Creates the RMR context on _proto_port and loops until rmr_ready() reports
// the context as ready.
//   rmr_listen - NOTE(review): its use is not visible in the lines shown;
//                judging by the trailing comment it sets the listener
//                requirement — confirm.
51 void XappRmr::xapp_rmr_init(bool rmr_listen){
54 // Initialize the RMR context
// const_cast is used because _proto_port.c_str() yields a const char*;
// presumably rmr_init() takes a non-const char* — confirm.
55 _xapp_rmr_ctx = rmr_init(const_cast<char*>(_proto_port.c_str()), RMR_MAX_RCV_BYTES, RMRFL_NONE);
57 if ( _xapp_rmr_ctx == NULL){
58 mdclog_write(MDCLOG_ERR,"Error Initializing RMR, file= %s, line=%d",__FILE__,__LINE__);
// NOTE(review): no early return is visible after the error above, so a NULL
// context appears to reach rmr_ready() below — confirm that is safe or bail
// out here.
// Poll until the context is ready, logging on every iteration.
60 while( ! rmr_ready(_xapp_rmr_ctx) ) {
61 mdclog_write(MDCLOG_INFO,">>> waiting for RMR, file= %s, line=%d",__FILE__,__LINE__);
65 mdclog_write(MDCLOG_INFO,"RMR Context is Ready, file= %s, line=%d",__FILE__,__LINE__);
67 //Set the listener requirement
// Copies routing fields from `hdr` into the pending send buffer.
//   hdr - supplies message_type, payload_length and meid.
// Returns bool; the return statement is not visible in the lines shown.
// _xapp_send_buff is dereferenced without a null check in the visible lines,
// so the caller must have allocated it (xapp_rmr_send() does so first).
73 bool XappRmr::rmr_header(xapp_rmr_header *hdr){
75 _xapp_send_buff->mtype = hdr->message_type;
76 _xapp_send_buff->len = hdr->payload_length;
// Subscription id is always set to -1 here.
77 _xapp_send_buff->sub_id = -1;
78 rmr_str2meid(_xapp_send_buff, hdr->meid);
// NOTE(review): the transaction id is populated from hdr->meid, the same
// string used for the meid above — confirm this is intended.
// xapp_rmr_send() later overwrites the xaction field with a generated id.
79 rmr_str2xact(_xapp_send_buff, hdr->meid);
81 mdclog_write(MDCLOG_INFO,"hdr->meid = %s",hdr->meid);
86 //RMR Send with payload and header.
// Sends `payload` over RMR using the routing fields in `hdr`, trying up to
// _nattempts times.
//   hdr     - routing fields; hdr->payload_length is also the number of
//             bytes copied from `payload`.
//   payload - source bytes for the message payload.
// Returns bool; the return statements are not visible in the lines shown.
87 bool XappRmr::xapp_rmr_send(xapp_rmr_header *hdr, void *payload){
// Log the id of the calling thread.
90 std::thread::id my_id = std::this_thread::get_id();
91 std::stringstream thread_id;
95 mdclog_write(MDCLOG_INFO, "Sending thread %s", thread_id.str().c_str());
// Local countdown of remaining send attempts.
98 int rmr_attempts = _nattempts;
// Allocate the send buffer on first use.
// NOTE(review): the rmr_alloc_msg() result is not checked in the visible
// lines before rmr_header() dereferences it — confirm.
100 if( _xapp_send_buff == NULL ) {
101 _xapp_send_buff = rmr_alloc_msg(_xapp_rmr_ctx, RMR_DEF_SIZE);
104 bool res = rmr_header(hdr);
106 mdclog_write(MDCLOG_ERR,"RMR HEADERS were incorrectly populated, file= %s, line=%d",__FILE__,__LINE__);
// Build a transaction id: a rand() value formatted as a zero-padded 10-digit
// decimal string.
110 mdclog_write(MDCLOG_INFO,"------ start of Xid updated, file= %s, line=%d",__FILE__,__LINE__);
111 int test_support_xact_count = rand();
// NOTE(review): the buffer is sized and zeroed with RMR_MAX_SRC, but the
// snprintf bound and the memcpy length below use RMR_MAX_XID; this is only
// safe if RMR_MAX_SRC >= RMR_MAX_XID — confirm.
// NOTE(review): the malloc result is not checked, and no free(xid) is
// visible in the lines shown — confirm it is released.
112 char *xid = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
113 memset(xid, '\0',RMR_MAX_SRC);
114 snprintf(xid, RMR_MAX_XID, "%010d", test_support_xact_count );
116 mdclog_write(MDCLOG_INFO,"before xapp_send_buff Xid=%s, file= %s, line=%d",xid,__FILE__,__LINE__);
// Overwrites the xaction field that rmr_header() filled in.
117 memcpy(_xapp_send_buff->xaction, xid, RMR_MAX_XID);
119 mdclog_write(MDCLOG_INFO,"Xid=%s, file= %s, line=%d",_xapp_send_buff->xaction,__FILE__,__LINE__);
// NOTE(review): no check that hdr->payload_length fits in the buffer
// allocated with RMR_DEF_SIZE is visible — confirm.
121 memcpy(_xapp_send_buff->payload, payload, hdr->payload_length);
// Same assignment rmr_header() already made to len.
122 _xapp_send_buff->len = hdr->payload_length;
// The condition guarding this error log is not visible in the lines shown.
125 mdclog_write(MDCLOG_ERR,"RMR Context is Not Ready in SENDER, file= %s, line=%d",__FILE__,__LINE__);
// Attempt loop: runs while attempts remain.
129 while(rmr_attempts > 0){
// The pointer returned by rmr_send_msg() replaces the send buffer; a null
// result is logged as an error.
131 _xapp_send_buff = rmr_send_msg(_xapp_rmr_ctx,_xapp_send_buff);
132 if(!_xapp_send_buff) {
133 mdclog_write(MDCLOG_ERR,"Error In Sending Message , file= %s, line=%d, attempt=%d",__FILE__,__LINE__,rmr_attempts);
136 else if (_xapp_send_buff->state == RMR_OK){
137 mdclog_write(MDCLOG_INFO,"Message Sent: RMR State = RMR_OK");
138 mdclog_write(MDCLOG_INFO,"_xapp_send_buff->xaction: %s",_xapp_send_buff->xaction);
// NOTE(review): the pointer is dropped without a visible rmr_free_msg();
// the next call then allocates a fresh buffer — confirm this does not leak.
140 _xapp_send_buff = NULL;
// Any other state is logged as needing a retry; RMR_ERR_RETRY gets its own
// branch, whose body is not visible in the lines shown.
145 mdclog_write(MDCLOG_INFO,"Need to retry RMR: state=%d, attempt=%d, file=%s, line=%d",_xapp_send_buff->state, rmr_attempts,__FILE__,__LINE__);
146 if(_xapp_send_buff->state == RMR_ERR_RETRY){
155 //----------------------------------------
156 // Some get/set methods
157 //---------------------------------------
// Getter returning bool.
// NOTE(review): the body is not visible in the lines shown; presumably it
// returns the listener flag — confirm.
158 bool XappRmr::get_listen(void){
// Setter taking the new `listen` value.
// NOTE(review): the body is not visible in the lines shown; presumably it
// stores `listen` in the listener flag — confirm.
163 void XappRmr::set_listen(bool listen){
// Returns the _rmr_is_ready flag. The declared return type is int, so the
// flag (set to false in the constructor) is returned as an integer value.
167 int XappRmr::get_is_ready(void){
168 return _rmr_is_ready;
// Getter returning bool.
// NOTE(review): the body is not visible in the lines shown, so which state
// it reports cannot be determined from here — confirm.
171 bool XappRmr::get_isRunning(void){
// Returns the RMR context pointer that xapp_rmr_init() obtains from
// rmr_init(), as an untyped void*.
176 void * XappRmr::get_rmr_context(void){
177 return _xapp_rmr_ctx;
// Configures mdclog logging.
//   AppName   - identity string set on the logger attributes.
//   log_level - severity level passed to mdclog_level_set().
// NOTE(review): the declaration of `attr` and any call that applies the
// attributes to the logger are not visible in the lines shown — confirm.
181 void init_logger(const char *AppName, mdclog_severity_t log_level)
184 mdclog_attr_init(&attr);
185 mdclog_attr_set_ident(attr, AppName);
187 mdclog_level_set(log_level);
// The attribute object is destroyed once configuration is done.
188 mdclog_attr_destroy(attr);