ISSUE ID:- (RICAPP-176).
[ric-app/bouncer.git] / Bouncer / src / xapp-utils / xapp_rmr.cc
1 /*
2 ==================================================================================
3
4         Copyright (c) 2019-2020 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18  */
19
20
21 #include "xapp_rmr.hpp"
22 #include <stdlib.h>
23 #define  RMR_MAX_XID 32
24
25 XappRmr::XappRmr(std::string port, int rmrattempts){
26
27         _proto_port = port;
28         _nattempts = rmrattempts;
29         _xapp_rmr_ctx = NULL;
30         _xapp_received_buff = NULL;
31         _xapp_send_buff =NULL;
32         _rmr_is_ready = false;
33         _listen = false;
34
35 };
36
37 XappRmr::~XappRmr(void){
38         // free memory
39         if(_xapp_received_buff)
40                 rmr_free_msg(_xapp_received_buff);
41
42         if(_xapp_send_buff)
43                 rmr_free_msg(_xapp_send_buff);
44
45         if (_xapp_rmr_ctx){
46                 rmr_close(_xapp_rmr_ctx);
47         }
48 };
49
50 //Get RMR Context.
51 void XappRmr::xapp_rmr_init(bool rmr_listen){
52
53
54         // Initialize the RMR context
55         _xapp_rmr_ctx = rmr_init(const_cast<char*>(_proto_port.c_str()), RMR_MAX_RCV_BYTES, RMRFL_NONE);
56
57         if ( _xapp_rmr_ctx == NULL){
58                 mdclog_write(MDCLOG_ERR,"Error Initializing RMR, file= %s, line=%d",__FILE__,__LINE__);
59         }
60         while( ! rmr_ready(_xapp_rmr_ctx) ) {
61                 mdclog_write(MDCLOG_INFO,">>> waiting for RMR, file= %s, line=%d",__FILE__,__LINE__);
62                 sleep(1);
63         }
64         _rmr_is_ready = true;
65         mdclog_write(MDCLOG_INFO,"RMR Context is Ready, file= %s, line=%d",__FILE__,__LINE__);
66
67         //Set the listener requirement
68         _listen = rmr_listen;
69         return;
70
71 }
72
73 bool XappRmr::rmr_header(xapp_rmr_header *hdr){
74
75         _xapp_send_buff->mtype  = hdr->message_type;
76         _xapp_send_buff->len = hdr->payload_length;
77         _xapp_send_buff->sub_id = -1;
78         rmr_str2meid(_xapp_send_buff, hdr->meid);
79         rmr_str2xact(_xapp_send_buff, hdr->meid);
80             
81         mdclog_write(MDCLOG_INFO,"hdr->meid = %s",hdr->meid);
82
83         return true;
84 }
85
86 //RMR Send with payload and header.
87 bool XappRmr::xapp_rmr_send(xapp_rmr_header *hdr, void *payload){
88
89         // Get the thread id
90         std::thread::id my_id = std::this_thread::get_id();
91         std::stringstream thread_id;
92         std::stringstream ss;
93
94         thread_id << my_id;
95         mdclog_write(MDCLOG_INFO, "Sending thread %s",  thread_id.str().c_str());
96
97
98         int rmr_attempts = _nattempts;
99
100         if( _xapp_send_buff == NULL ) {
101                 _xapp_send_buff = rmr_alloc_msg(_xapp_rmr_ctx, RMR_DEF_SIZE);
102         }
103
104         bool res = rmr_header(hdr);
105         if(!res){
106                 mdclog_write(MDCLOG_ERR,"RMR HEADERS were incorrectly populated, file= %s, line=%d",__FILE__,__LINE__);
107                 return false;
108         }
109
110         mdclog_write(MDCLOG_INFO,"------ start of Xid updated, file= %s, line=%d",__FILE__,__LINE__);
111         int test_support_xact_count = rand();
112         char *xid = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
113         memset(xid, '\0',RMR_MAX_SRC);
114         snprintf(xid, RMR_MAX_XID, "%010d", test_support_xact_count );
115  
116         mdclog_write(MDCLOG_INFO,"before xapp_send_buff Xid=%s, file= %s, line=%d",xid,__FILE__,__LINE__);
117         memcpy(_xapp_send_buff->xaction, xid, RMR_MAX_XID);
118
119         mdclog_write(MDCLOG_INFO,"Xid=%s, file= %s, line=%d",_xapp_send_buff->xaction,__FILE__,__LINE__);
120
121         memcpy(_xapp_send_buff->payload, payload, hdr->payload_length);
122         _xapp_send_buff->len = hdr->payload_length;
123
124         if(!_rmr_is_ready) {
125                 mdclog_write(MDCLOG_ERR,"RMR Context is Not Ready in SENDER, file= %s, line=%d",__FILE__,__LINE__);
126                 return false;
127         }
128
129         while(rmr_attempts > 0){
130
131                 _xapp_send_buff = rmr_send_msg(_xapp_rmr_ctx,_xapp_send_buff);
132                 if(!_xapp_send_buff) {
133                         mdclog_write(MDCLOG_ERR,"Error In Sending Message , file= %s, line=%d, attempt=%d",__FILE__,__LINE__,rmr_attempts);
134                         rmr_attempts--;
135                 }
136                 else if (_xapp_send_buff->state == RMR_OK){
137                         mdclog_write(MDCLOG_INFO,"Message Sent: RMR State = RMR_OK");
138                 mdclog_write(MDCLOG_INFO,"_xapp_send_buff->xaction: %s",_xapp_send_buff->xaction);
139                         rmr_attempts = 0;
140                         _xapp_send_buff = NULL;
141                         return true;
142                 }
143                 else
144                 {
145                         mdclog_write(MDCLOG_INFO,"Need to retry RMR: state=%d, attempt=%d, file=%s, line=%d",_xapp_send_buff->state, rmr_attempts,__FILE__,__LINE__);
146                         if(_xapp_send_buff->state == RMR_ERR_RETRY){
147                                 usleep(1);                      }
148                                 rmr_attempts--;
149                 }
150                 sleep(1);
151         }
152         return false;
153 }
154
155 //----------------------------------------
156 // Some get/set methods
157 //---------------------------------------
158 bool XappRmr::get_listen(void){
159   return _listen;
160 }
161
162
163 void XappRmr::set_listen(bool listen){
164   _listen = listen;
165 }
166
167 int XappRmr::get_is_ready(void){
168   return _rmr_is_ready;
169 }
170
171 bool XappRmr::get_isRunning(void){
172   return _listen;
173 }
174
175
176 void * XappRmr::get_rmr_context(void){
177   return _xapp_rmr_ctx;
178 }
179
180
181 void init_logger(const char  *AppName, mdclog_severity_t log_level)
182 {
183     mdclog_attr_t *attr;
184     mdclog_attr_init(&attr);
185     mdclog_attr_set_ident(attr, AppName);
186     mdclog_init(attr);
187     mdclog_level_set(log_level);
188     mdclog_attr_destroy(attr);
189 }
190