Merge "version 4.0.5 Fix bug in reading the PLMN-ID (wrong byte reading). Add extra...
[ric-plt/e2.git] / RIC-E2-TERMINATION / sctpThread.cpp
1 // Copyright 2019 AT&T Intellectual Property
2 // Copyright 2019 Nokia
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //      http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15
16 //  This source code is part of the near-RT RIC (RAN Intelligent Controller)
17 //  platform project (RICP).
18
19 // TODO: High-level file comment.
20
21
22
23 #include "sctpThread.h"
24 #include "BuildRunName.h"
25
26 using namespace std;
27 //using namespace std::placeholders;
28 using namespace boost::filesystem;
29
30 //#ifdef __cplusplus
31 //extern "C"
32 //{
33 //#endif
34
35 // need to expose without the include of gcov
36 extern "C" void __gcov_flush(void);
37
38 static void catch_function(int signal) {
39     __gcov_flush();
40     exit(signal);
41 }
42
43
44 BOOST_LOG_INLINE_GLOBAL_LOGGER_DEFAULT(my_logger, src::logger_mt)
45
46 boost::shared_ptr<sinks::synchronous_sink<sinks::text_file_backend>> boostLogger;
47 double cpuClock = 0.0;
48 bool jsonTrace = true;
49
50 void init_log() {
51     mdclog_attr_t *attr;
52     mdclog_attr_init(&attr);
53     mdclog_attr_set_ident(attr, "E2Terminator");
54     mdclog_init(attr);
55     mdclog_attr_destroy(attr);
56 }
57 auto start_time = std::chrono::high_resolution_clock::now();
58 typedef std::chrono::duration<double, std::ratio<1,1>> seconds_t;
59
60 double age() {
61     return seconds_t(std::chrono::high_resolution_clock::now() - start_time).count();
62 }
63
64 double approx_CPU_MHz(unsigned sleeptime) {
65     using namespace std::chrono_literals;
66     uint32_t aux = 0;
67     uint64_t cycles_start = rdtscp(aux);
68     double time_start = age();
69     std::this_thread::sleep_for(sleeptime * 1ms);
70     uint64_t elapsed_cycles = rdtscp(aux) - cycles_start;
71     double elapsed_time = age() - time_start;
72     return elapsed_cycles / elapsed_time;
73 }
74
75 //std::atomic<int64_t> rmrCounter{0};
76 std::atomic<int64_t> num_of_messages{0};
77 std::atomic<int64_t> num_of_XAPP_messages{0};
78 static long transactionCounter = 0;
79
80 int buildListeningPort(sctp_params_t &sctpParams) {
81     sctpParams.listenFD = socket (AF_INET6, SOCK_STREAM, IPPROTO_SCTP);
82     struct sockaddr_in6 servaddr {};
83     servaddr.sin6_family = AF_INET6;
84     servaddr.sin6_addr   = in6addr_any;
85     servaddr.sin6_port = htons(sctpParams.sctpPort);
86     if (bind(sctpParams.listenFD, (SA *)&servaddr, sizeof(servaddr)) < 0 ) {
87         mdclog_write(MDCLOG_ERR, "Error binding. %s\n", strerror(errno));
88         return -1;
89     }
90     if (setSocketNoBlocking(sctpParams.listenFD) == -1) {
91         //mdclog_write(MDCLOG_ERR, "Error binding. %s", strerror(errno));
92         return -1;
93     }
94     if (mdclog_level_get() >= MDCLOG_DEBUG) {
95         struct sockaddr_in6 cliaddr {};
96         socklen_t len = sizeof(cliaddr);
97         getsockname(sctpParams.listenFD, (SA *)&cliaddr, &len);
98         char buff[1024] {};
99         inet_ntop(AF_INET6, &cliaddr.sin6_addr, buff, sizeof(buff));
100         mdclog_write(MDCLOG_DEBUG, "My address: %s, port %d\n", buff, htons(cliaddr.sin6_port));
101     }
102
103     if (listen(sctpParams.listenFD, SOMAXCONN) < 0) {
104         mdclog_write(MDCLOG_ERR, "Error listening. %s\n", strerror(errno));
105         return -1;
106     }
107     struct epoll_event event {};
108     event.events = EPOLLIN | EPOLLET;
109     event.data.fd = sctpParams.listenFD;
110
111     // add listening port to epoll
112     if (epoll_ctl(sctpParams.epoll_fd, EPOLL_CTL_ADD, sctpParams.listenFD, &event)) {
113         printf("Failed to add descriptor to epoll\n");
114         mdclog_write(MDCLOG_ERR, "Failed to add descriptor to epoll. %s\n", strerror(errno));
115         return -1;
116     }
117
118     return 0;
119 }
120
121 int buildConfiguration(sctp_params_t &sctpParams) {
122     path p = (sctpParams.configFilePath + "/" + sctpParams.configFileName).c_str();
123     if (exists(p)) {
124         const int size = 2048;
125         auto fileSize = file_size(p);
126         if (fileSize > size) {
127             mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size);
128             return -1;
129         }
130     } else {
131         mdclog_write(MDCLOG_ERR, "Configuration File %s not exists", p.string().c_str());
132         return -1;
133     }
134
135     ReadConfigFile conf;
136     if (conf.openConfigFile(p.string()) == -1) {
137         mdclog_write(MDCLOG_ERR, "Filed to open config file %s, %s",
138                      p.string().c_str(), strerror(errno));
139         return -1;
140     }
141     int rmrPort = conf.getIntValue("nano");
142     if (rmrPort == -1) {
143         mdclog_write(MDCLOG_ERR, "illigal RMR port ");
144         return -1;
145     }
146     sctpParams.rmrPort = (uint16_t)rmrPort;
147     snprintf(sctpParams.rmrAddress, sizeof(sctpParams.rmrAddress), "%d", (int) (sctpParams.rmrPort));
148
149     auto tmpStr = conf.getStringValue("loglevel");
150     if (tmpStr.length() == 0) {
151         mdclog_write(MDCLOG_ERR, "illigal loglevel. Set loglevel to MDCLOG_INFO");
152         tmpStr = "info";
153     }
154     transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
155
156     if ((tmpStr.compare("debug")) == 0) {
157         sctpParams.logLevel = MDCLOG_DEBUG;
158     } else if ((tmpStr.compare("info")) == 0) {
159         sctpParams.logLevel = MDCLOG_INFO;
160     } else if ((tmpStr.compare("warning")) == 0) {
161         sctpParams.logLevel = MDCLOG_WARN;
162     } else if ((tmpStr.compare("error")) == 0) {
163         sctpParams.logLevel = MDCLOG_ERR;
164     } else {
165         mdclog_write(MDCLOG_ERR, "illigal loglevel = %s. Set loglevel to MDCLOG_INFO", tmpStr.c_str());
166         sctpParams.logLevel = MDCLOG_INFO;
167     }
168     mdclog_level_set(sctpParams.logLevel);
169
170     tmpStr = conf.getStringValue("volume");
171     if (tmpStr.length() == 0) {
172         mdclog_write(MDCLOG_ERR, "illigal volume.");
173         return -1;
174     }
175
176     char tmpLogFilespec[VOLUME_URL_SIZE];
177     tmpLogFilespec[0] = 0;
178     sctpParams.volume[0] = 0;
179     snprintf(sctpParams.volume, VOLUME_URL_SIZE, "%s", tmpStr.c_str());
180     // copy the name to temp file as well
181     snprintf(tmpLogFilespec, VOLUME_URL_SIZE, "%s", tmpStr.c_str());
182
183
184     // define the file name in the tmp directory under the volume
185     strcat(tmpLogFilespec,"/tmp/E2Term_%Y-%m-%d_%H-%M-%S.%N.tmpStr");
186
187     sctpParams.myIP = conf.getStringValue("local-ip");
188     if (sctpParams.myIP.length() == 0) {
189         mdclog_write(MDCLOG_ERR, "illigal local-ip.");
190         return -1;
191     }
192
193     int sctpPort = conf.getIntValue("sctp-port");
194     if (sctpPort == -1) {
195         mdclog_write(MDCLOG_ERR, "illigal SCTP port ");
196         return -1;
197     }
198     sctpParams.sctpPort = (uint16_t)sctpPort;
199
200     sctpParams.fqdn = conf.getStringValue("external-fqdn");
201     if (sctpParams.fqdn.length() == 0) {
202         mdclog_write(MDCLOG_ERR, "illigal external-fqdn");
203         return -1;
204     }
205
206     std::string pod = conf.getStringValue("pod_name");
207     if (pod.length() == 0) {
208         mdclog_write(MDCLOG_ERR, "illigal pod_name in config file");
209         return -1;
210     }
211     auto *podName = getenv(pod.c_str());
212     if (podName == nullptr) {
213         mdclog_write(MDCLOG_ERR, "illigal pod_name or environment varible not exists : %s", pod.c_str());
214         return -1;
215
216     } else {
217         sctpParams.podName.assign(podName);
218         if (sctpParams.podName.length() == 0) {
219             mdclog_write(MDCLOG_ERR, "illigal pod_name");
220             return -1;
221         }
222     }
223
224     tmpStr = conf.getStringValue("trace");
225     transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
226     if ((tmpStr.compare("start")) == 0) {
227         mdclog_write(MDCLOG_INFO, "Trace set to: start");
228         sctpParams.trace = true;
229     } else if ((tmpStr.compare("stop")) == 0) {
230         mdclog_write(MDCLOG_INFO, "Trace set to: stop");
231         sctpParams.trace = false;
232     }
233     jsonTrace = sctpParams.trace;
234
235     sctpParams.ka_message_length = snprintf(sctpParams.ka_message, KA_MESSAGE_SIZE, "{\"address\": \"%s:%d\","
236                                                                                     "\"fqdn\": \"%s\","
237                                                                                     "\"pod_name\": \"%s\"}",
238                                             (const char *)sctpParams.myIP.c_str(),
239                                             sctpParams.rmrPort,
240                                             sctpParams.fqdn.c_str(),
241                                             sctpParams.podName.c_str());
242
243     if (mdclog_level_get() >= MDCLOG_INFO) {
244         mdclog_mdc_add("RMR Port", to_string(sctpParams.rmrPort).c_str());
245         mdclog_mdc_add("LogLevel", to_string(sctpParams.logLevel).c_str());
246         mdclog_mdc_add("volume", sctpParams.volume);
247         mdclog_mdc_add("tmpLogFilespec", tmpLogFilespec);
248         mdclog_mdc_add("my ip", sctpParams.myIP.c_str());
249         mdclog_mdc_add("pod name", sctpParams.podName.c_str());
250
251         mdclog_write(MDCLOG_INFO, "running parameters for instance : %s", sctpParams.ka_message);
252     }
253     mdclog_mdc_clean();
254
255     // Files written to the current working directory
256     boostLogger = logging::add_file_log(
257             keywords::file_name = tmpLogFilespec, // to temp directory
258             keywords::rotation_size = 10 * 1024 * 1024,
259             keywords::time_based_rotation = sinks::file::rotation_at_time_interval(posix_time::hours(1)),
260             keywords::format = "%Message%"
261             //keywords::format = "[%TimeStamp%]: %Message%" // use each tmpStr with time stamp
262     );
263
264     // Setup a destination folder for collecting rotated (closed) files --since the same volumn can use rename()
265     boostLogger->locked_backend()->set_file_collector(sinks::file::make_collector(
266             keywords::target = sctpParams.volume
267     ));
268
269     // Upon restart, scan the directory for files matching the file_name pattern
270     boostLogger->locked_backend()->scan_for_files();
271
272     // Enable auto-flushing after each tmpStr record written
273     if (mdclog_level_get() >= MDCLOG_DEBUG) {
274         boostLogger->locked_backend()->auto_flush(true);
275     }
276
277     return 0;
278 }
279
280
281
282 int main(const int argc, char **argv) {
283     sctp_params_t sctpParams;
284
285     {
286         std::random_device device{};
287         std::mt19937 generator(device());
288         std::uniform_int_distribution<long> distribution(1, (long) 1e12);
289         transactionCounter = distribution(generator);
290     }
291
292 //    uint64_t st = 0;
293 //    uint32_t aux1 = 0;
294 //   st = rdtscp(aux1);
295
296     unsigned num_cpus = std::thread::hardware_concurrency();
297     init_log();
298     mdclog_level_set(MDCLOG_INFO);
299
300     if (std::signal(SIGINT, catch_function) == SIG_ERR) {
301         mdclog_write(MDCLOG_ERR, "Error initializing SIGINT");
302         exit(1);
303     }
304     if (std::signal(SIGABRT, catch_function)== SIG_ERR) {
305         mdclog_write(MDCLOG_ERR, "Error initializing SIGABRT");
306         exit(1);
307     }
308     if (std::signal(SIGTERM, catch_function)== SIG_ERR) {
309         mdclog_write(MDCLOG_ERR, "Error initializing SIGTERM");
310         exit(1);
311     }
312
313     cpuClock = approx_CPU_MHz(100);
314
315     mdclog_write(MDCLOG_DEBUG, "CPU speed %11.11f", cpuClock);
316
317     auto result = parse(argc, argv, sctpParams);
318
319     if (buildConfiguration(sctpParams) != 0) {
320         exit(-1);
321     }
322
323     // start epoll
324     sctpParams.epoll_fd = epoll_create1(0);
325     if (sctpParams.epoll_fd == -1) {
326         mdclog_write(MDCLOG_ERR, "failed to open epoll descriptor");
327         exit(-1);
328     }
329
330     getRmrContext(sctpParams);
331     if (sctpParams.rmrCtx == nullptr) {
332         close(sctpParams.epoll_fd);
333         exit(-1);
334     }
335
336     if (buildInotify(sctpParams) == -1) {
337         close(sctpParams.rmrListenFd);
338         rmr_close(sctpParams.rmrCtx);
339         close(sctpParams.epoll_fd);
340         exit(-1);
341     }
342
343     if (buildListeningPort(sctpParams) != 0) {
344         close(sctpParams.rmrListenFd);
345         rmr_close(sctpParams.rmrCtx);
346         close(sctpParams.epoll_fd);
347         exit(-1);
348     }
349
350     sctpParams.sctpMap = new mapWrapper();
351
352     std::vector<std::thread> threads(num_cpus);
353 //    std::vector<std::thread> threads;
354
355     num_cpus = 1;
356     for (unsigned int i = 0; i < num_cpus; i++) {
357         threads[i] = std::thread(listener, &sctpParams);
358
359         cpu_set_t cpuset;
360         CPU_ZERO(&cpuset);
361         CPU_SET(i, &cpuset);
362         int rc = pthread_setaffinity_np(threads[i].native_handle(), sizeof(cpu_set_t), &cpuset);
363         if (rc != 0) {
364             mdclog_write(MDCLOG_ERR, "Error calling pthread_setaffinity_np: %d", rc);
365         }
366     }
367
368     auto statFlag = false;
369     auto statThread = std::thread(statColectorThread, (void *)&statFlag);
370
371     //loop over term_init until first message from xApp
372     handleTermInit(sctpParams);
373
374     for (auto &t : threads) {
375         t.join();
376     }
377
378     statFlag = true;
379     statThread.join();
380
381     return 0;
382 }
383
384 void handleTermInit(sctp_params_t &sctpParams) {
385     sendTermInit(sctpParams);
386     //send to e2 manager init of e2 term
387     //E2_TERM_INIT
388
389     int count = 0;
390     while (true) {
391         auto xappMessages = num_of_XAPP_messages.load(std::memory_order_acquire);
392         if (xappMessages > 0) {
393             if (mdclog_level_get() >=  MDCLOG_INFO) {
394                 mdclog_write(MDCLOG_INFO, "Got a message from some appliction, stop sending E2_TERM_INIT");
395             }
396             return;
397         }
398         usleep(100000);
399         count++;
400         if (count % 1000 == 0) {
401             mdclog_write(MDCLOG_ERR, "GOT No messages from any xApp");
402             sendTermInit(sctpParams);
403         }
404     }
405 }
406
407 void sendTermInit(sctp_params_t &sctpParams) {
408     rmr_mbuf_t *msg = rmr_alloc_msg(sctpParams.rmrCtx, sctpParams.ka_message_length);
409     auto count = 0;
410     while (true) {
411         msg->mtype = E2_TERM_INIT;
412         msg->state = 0;
413         rmr_bytes2payload(msg, (unsigned char *)sctpParams.ka_message, sctpParams.ka_message_length);
414         static unsigned char tx[32];
415         auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
416         rmr_bytes2xact(msg, tx, txLen);
417         msg = rmr_send_msg(sctpParams.rmrCtx, msg);
418         if (msg == nullptr) {
419             msg = rmr_alloc_msg(sctpParams.rmrCtx, sctpParams.myIP.length());
420         } else if (msg->state == 0) {
421             rmr_free_msg(msg);
422             if (mdclog_level_get() >=  MDCLOG_INFO) {
423                 mdclog_write(MDCLOG_INFO, "E2_TERM_INIT succsesfuly sent ");
424             }
425             return;
426         } else {
427             if (count % 100 == 0) {
428                 mdclog_write(MDCLOG_ERR, "Error sending E2_TERM_INIT cause : %s ", translateRmrErrorMessages(msg->state).c_str());
429             }
430             sleep(1);
431         }
432         count++;
433     }
434 }
435
436 /**
437  *
438  * @param argc
439  * @param argv
440  * @param sctpParams
441  * @return
442  */
443 cxxopts::ParseResult parse(int argc, char *argv[], sctp_params_t &sctpParams) {
444     cxxopts::Options options(argv[0], "e2 term help");
445     options.positional_help("[optional args]").show_positional_help();
446     options.allow_unrecognised_options().add_options()
447             ("p,path", "config file path", cxxopts::value<std::string>(sctpParams.configFilePath)->default_value("config"))
448             ("f,file", "config file name", cxxopts::value<std::string>(sctpParams.configFileName)->default_value("config.conf"))
449             ("h,help", "Print help");
450
451     auto result = options.parse(argc, argv);
452
453     if (result.count("help")) {
454         std::cout << options.help({""}) << std::endl;
455         exit(0);
456     }
457     return result;
458 }
459
460 /**
461  *
462  * @param sctpParams
463  * @return -1 failed 0 success
464  */
465 int buildInotify(sctp_params_t &sctpParams) {
466     sctpParams.inotifyFD = inotify_init1(IN_NONBLOCK);
467     if (sctpParams.inotifyFD == -1) {
468         mdclog_write(MDCLOG_ERR, "Failed to init inotify (inotify_init1) %s", strerror(errno));
469         close(sctpParams.rmrListenFd);
470         rmr_close(sctpParams.rmrCtx);
471         close(sctpParams.epoll_fd);
472         return -1;
473     }
474
475     sctpParams.inotifyWD = inotify_add_watch(sctpParams.inotifyFD,
476                                              (const char *)sctpParams.configFilePath.c_str(),
477                                              (unsigned)IN_OPEN | (unsigned)IN_CLOSE_WRITE | (unsigned)IN_CLOSE_NOWRITE); //IN_CLOSE = (IN_CLOSE_WRITE | IN_CLOSE_NOWRITE)
478     if (sctpParams.inotifyWD == -1) {
479         mdclog_write(MDCLOG_ERR, "Failed to add directory : %s to  inotify (inotify_add_watch) %s",
480                      sctpParams.configFilePath.c_str(),
481                      strerror(errno));
482         close(sctpParams.inotifyFD);
483         return -1;
484     }
485
486     struct epoll_event event{};
487     event.events = (EPOLLIN);
488     event.data.fd = sctpParams.inotifyFD;
489     // add listening RMR FD to epoll
490     if (epoll_ctl(sctpParams.epoll_fd, EPOLL_CTL_ADD, sctpParams.inotifyFD, &event)) {
491         mdclog_write(MDCLOG_ERR, "Failed to add inotify FD to epoll");
492         close(sctpParams.inotifyFD);
493         return -1;
494     }
495     return 0;
496 }
497
498 /**
499  *
500  * @param args
501  * @return
502  */
503 void listener(sctp_params_t *params) {
504     int num_of_SCTP_messages = 0;
505     auto totalTime = 0.0;
506     mdclog_mdc_clean();
507     mdclog_level_set(params->logLevel);
508
509     std::thread::id this_id = std::this_thread::get_id();
510     //save cout
511     streambuf *oldCout = cout.rdbuf();
512     ostringstream memCout;
513     // create new cout
514     cout.rdbuf(memCout.rdbuf());
515     cout << this_id;
516     //return to the normal cout
517     cout.rdbuf(oldCout);
518
519     char tid[32];
520     memcpy(tid, memCout.str().c_str(), memCout.str().length() < 32 ? memCout.str().length() : 31);
521     tid[memCout.str().length()] = 0;
522     mdclog_mdc_add("thread id", tid);
523
524     if (mdclog_level_get() >= MDCLOG_DEBUG) {
525         mdclog_write(MDCLOG_DEBUG, "started thread number %s", tid);
526     }
527
528
529     RmrMessagesBuffer_t rmrMessageBuffer{};
530     //create and init RMR
531     rmrMessageBuffer.rmrCtx = params->rmrCtx;
532
533     auto *events = (struct epoll_event *) calloc(MAXEVENTS, sizeof(struct epoll_event));
534     struct timespec end{0, 0};
535     struct timespec start{0, 0};
536
537     rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
538     rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
539
540     memcpy(rmrMessageBuffer.ka_message, params->ka_message, params->ka_message_length);
541     rmrMessageBuffer.ka_message_len = params->ka_message_length;
542     rmrMessageBuffer.ka_message[rmrMessageBuffer.ka_message_len] = 0;
543
544     if (mdclog_level_get() >= MDCLOG_DEBUG) {
545         mdclog_write(MDCLOG_DEBUG, "keep alive message is : %s", rmrMessageBuffer.ka_message);
546     }
547
548     ReportingMessages_t message {};
549
550     for (int i = 0; i < MAX_RMR_BUFF_ARRY; i++) {
551         rmrMessageBuffer.rcvBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
552         rmrMessageBuffer.sendBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
553     }
554
555     message.statCollector = StatCollector::GetInstance();
556
557     while (true) {
558         if (mdclog_level_get() >= MDCLOG_DEBUG) {
559             mdclog_write(MDCLOG_DEBUG, "Start EPOLL Wait");
560         }
561         auto numOfEvents = epoll_wait(params->epoll_fd, events, MAXEVENTS, -1);
562         if (numOfEvents < 0 && errno == EINTR) {
563             if (mdclog_level_get() >= MDCLOG_DEBUG) {
564                 mdclog_write(MDCLOG_DEBUG, "got EINTR : %s", strerror(errno));
565             }
566             continue;
567         }
568         if (numOfEvents < 0) {
569             mdclog_write(MDCLOG_ERR, "Epoll wait failed, errno = %s", strerror(errno));
570             return;
571         }
572         for (auto i = 0; i < numOfEvents; i++) {
573             if (mdclog_level_get() >= MDCLOG_DEBUG) {
574                 mdclog_write(MDCLOG_DEBUG, "handling epoll event %d out of %d", i + 1, numOfEvents);
575             }
576             clock_gettime(CLOCK_MONOTONIC, &message.message.time);
577             start.tv_sec = message.message.time.tv_sec;
578             start.tv_nsec = message.message.time.tv_nsec;
579
580
581             if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP)) {
582                 handlepoll_error(events[i], message, rmrMessageBuffer, params);
583             } else if (events[i].events & EPOLLOUT) {
584                 handleEinprogressMessages(events[i], message, rmrMessageBuffer, params);
585             } else if (params->listenFD == events[i].data.fd) {
586                 if (mdclog_level_get() >= MDCLOG_INFO) {
587                     mdclog_write(MDCLOG_INFO, "New connection request from sctp network\n");
588                 }
589                 // new connection is requested from RAN  start build connection
590                 while (true) {
591                     struct sockaddr in_addr {};
592                     socklen_t in_len;
593                     char hostBuff[NI_MAXHOST];
594                     char portBuff[NI_MAXSERV];
595
596                     in_len = sizeof(in_addr);
597                     auto *peerInfo = (ConnectedCU_t *)calloc(1, sizeof(ConnectedCU_t));
598                     peerInfo->sctpParams = params;
599                     peerInfo->fileDescriptor = accept(params->listenFD, &in_addr, &in_len);
600                     if (peerInfo->fileDescriptor == -1) {
601                         if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
602                             /* We have processed all incoming connections. */
603                             break;
604                         } else {
605                             mdclog_write(MDCLOG_ERR, "Accept error, errno = %s", strerror(errno));
606                             break;
607                         }
608                     }
609                     if (setSocketNoBlocking(peerInfo->fileDescriptor) == -1) {
610                         mdclog_write(MDCLOG_ERR, "setSocketNoBlocking failed to set new connection %s on port %s\n", hostBuff, portBuff);
611                         close(peerInfo->fileDescriptor);
612                         break;
613                     }
614                     auto  ans = getnameinfo(&in_addr, in_len,
615                                             peerInfo->hostName, NI_MAXHOST,
616                                             peerInfo->portNumber, NI_MAXSERV, (unsigned )((unsigned int)NI_NUMERICHOST | (unsigned int)NI_NUMERICSERV));
617                     if (ans < 0) {
618                         mdclog_write(MDCLOG_ERR, "Failed to get info on connection request. %s\n", strerror(errno));
619                         close(peerInfo->fileDescriptor);
620                         break;
621                     }
622                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
623                         mdclog_write(MDCLOG_DEBUG, "Accepted connection on descriptor %d (host=%s, port=%s)\n", peerInfo->fileDescriptor, peerInfo->hostName, peerInfo->portNumber);
624                     }
625                     peerInfo->isConnected = false;
626                     peerInfo->gotSetup = false;
627                     if (addToEpoll(params->epoll_fd,
628                                    peerInfo,
629                                    (EPOLLIN | EPOLLET),
630                                    params->sctpMap, nullptr,
631                                    0) != 0) {
632                         break;
633                     }
634                     break;
635                 }
636             } else if (params->rmrListenFd == events[i].data.fd) {
637                 // got message from XAPP
638                 num_of_XAPP_messages.fetch_add(1, std::memory_order_release);
639                 num_of_messages.fetch_add(1, std::memory_order_release);
640                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
641                     mdclog_write(MDCLOG_DEBUG, "new message from RMR");
642                 }
643                 if (receiveXappMessages(params->sctpMap,
644                                         rmrMessageBuffer,
645                                         message.message.time) != 0) {
646                     mdclog_write(MDCLOG_ERR, "Error handling Xapp message");
647                 }
648             } else if (params->inotifyFD == events[i].data.fd) {
649                 mdclog_write(MDCLOG_INFO, "Got event from inotify (configuration update)");
650                 handleConfigChange(params);
651             } else {
652                 /* We RMR_ERR_RETRY have data on the fd waiting to be read. Read and display it.
653                  * We must read whatever data is available completely, as we are running
654                  *  in edge-triggered mode and won't get a notification again for the same data. */
655                 num_of_messages.fetch_add(1, std::memory_order_release);
656                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
657                     mdclog_write(MDCLOG_DEBUG, "new message from SCTP, epoll flags are : %0x", events[i].events);
658                 }
659                 receiveDataFromSctp(&events[i],
660                                     params->sctpMap,
661                                     num_of_SCTP_messages,
662                                     rmrMessageBuffer,
663                                     message.message.time);
664             }
665
666             clock_gettime(CLOCK_MONOTONIC, &end);
667             if (mdclog_level_get() >= MDCLOG_INFO) {
668                 totalTime += ((end.tv_sec + 1.0e-9 * end.tv_nsec) -
669                               ((double) start.tv_sec + 1.0e-9 * start.tv_nsec));
670             }
671             if (mdclog_level_get() >= MDCLOG_DEBUG) {
672                 mdclog_write(MDCLOG_DEBUG, "message handling is %ld seconds %ld nanoseconds",
673                              end.tv_sec - start.tv_sec,
674                              end.tv_nsec - start.tv_nsec);
675             }
676         }
677     }
678 }
679
680 /**
681  *
682  * @param sctpParams
683  */
684 void handleConfigChange(sctp_params_t *sctpParams) {
685     char buf[4096] __attribute__ ((aligned(__alignof__(struct inotify_event))));
686     const struct inotify_event *event;
687     char *ptr;
688
689     path p = (sctpParams->configFilePath + "/" + sctpParams->configFileName).c_str();
690     auto endlessLoop = true;
691     while (endlessLoop) {
692         auto len = read(sctpParams->inotifyFD, buf, sizeof buf);
693         if (len == -1) {
694             if (errno != EAGAIN) {
695                 mdclog_write(MDCLOG_ERR, "read %s ", strerror(errno));
696                 endlessLoop = false;
697                 continue;
698             }
699             else {
700                 endlessLoop = false;
701                 continue;
702             }
703         }
704
705         for (ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) {
706             event = (const struct inotify_event *)ptr;
707             if (event->mask & (uint32_t)IN_ISDIR) {
708                 continue;
709             }
710
711             // the directory name
712             if (sctpParams->inotifyWD == event->wd) {
713                 // not the directory
714             }
715             if (event->len) {
716                 if (!(sctpParams->configFileName.compare(event->name))) {
717                     continue;
718                 }
719             }
720             // only the file we want
721             if (event->mask & (uint32_t)IN_CLOSE_WRITE) {
722                 if (exists(p)) {
723                     const int size = 2048;
724                     auto fileSize = file_size(p);
725                     if (fileSize > size) {
726                         mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size);
727                         return;
728                     }
729                 } else {
730                     mdclog_write(MDCLOG_ERR, "Configuration File %s not exists", p.string().c_str());
731                     return;
732                 }
733
734                 ReadConfigFile conf;
735                 if (conf.openConfigFile(p.string()) == -1) {
736                     mdclog_write(MDCLOG_ERR, "Filed to open config file %s, %s",
737                                  p.string().c_str(), strerror(errno));
738                     return;
739                 }
740
741                 auto tmpStr = conf.getStringValue("loglevel");
742                 if (tmpStr.length() == 0) {
743                     mdclog_write(MDCLOG_ERR, "illigal loglevel. Set loglevel to MDCLOG_INFO");
744                     tmpStr = "info";
745                 }
746                 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
747
748                 if ((tmpStr.compare("debug")) == 0) {
749                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_DEBUG");
750                     sctpParams->logLevel = MDCLOG_DEBUG;
751                 } else if ((tmpStr.compare("info")) == 0) {
752                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_INFO");
753                     sctpParams->logLevel = MDCLOG_INFO;
754                 } else if ((tmpStr.compare("warning")) == 0) {
755                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_WARN");
756                     sctpParams->logLevel = MDCLOG_WARN;
757                 } else if ((tmpStr.compare("error")) == 0) {
758                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_ERR");
759                     sctpParams->logLevel = MDCLOG_ERR;
760                 } else {
761                     mdclog_write(MDCLOG_ERR, "illigal loglevel = %s. Set loglevel to MDCLOG_INFO", tmpStr.c_str());
762                     sctpParams->logLevel = MDCLOG_INFO;
763                 }
764                 mdclog_level_set(sctpParams->logLevel);
765
766
767                 tmpStr = conf.getStringValue("trace");
768                 if (tmpStr.length() == 0) {
769                     mdclog_write(MDCLOG_ERR, "illigal trace. Set trace to stop");
770                     tmpStr = "stop";
771                 }
772
773                 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
774                 if ((tmpStr.compare("start")) == 0) {
775                     mdclog_write(MDCLOG_INFO, "Trace set to: start");
776                     sctpParams->trace = true;
777                 } else if ((tmpStr.compare("stop")) == 0) {
778                     mdclog_write(MDCLOG_INFO, "Trace set to: stop");
779                     sctpParams->trace = false;
780                 } else {
781                     mdclog_write(MDCLOG_ERR, "Trace was set to wrong value %s, set to stop", tmpStr.c_str());
782                     sctpParams->trace = false;
783                 }
784                 jsonTrace = sctpParams->trace;
785                 endlessLoop = false;
786             }
787         }
788     }
789 }
790
791 /**
792  *
793  * @param event
794  * @param message
795  * @param rmrMessageBuffer
796  * @param params
797  */
798 void handleEinprogressMessages(struct epoll_event &event,
799                                ReportingMessages_t &message,
800                                RmrMessagesBuffer_t &rmrMessageBuffer,
801                                sctp_params_t *params) {
802     auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
803     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
804
805     mdclog_write(MDCLOG_INFO, "file descriptor %d got EPOLLOUT", peerInfo->fileDescriptor);
806     auto retVal = 0;
807     socklen_t retValLen = 0;
808     auto rc = getsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_ERROR, &retVal, &retValLen);
809     if (rc != 0 || retVal != 0) {
810         if (rc != 0) {
811             rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
812                                                          "%s|Failed SCTP Connection, after EINPROGRESS the getsockopt%s",
813                                                          peerInfo->enodbName, strerror(errno));
814         } else if (retVal != 0) {
815             rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
816                                                          "%s|Failed SCTP Connection after EINPROGRESS, SO_ERROR",
817                                                          peerInfo->enodbName);
818         }
819
820         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
821         message.message.asnLength = rmrMessageBuffer.sendMessage->len;
822         mdclog_write(MDCLOG_ERR, "%s", rmrMessageBuffer.sendMessage->payload);
823         message.message.direction = 'N';
824         if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
825             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
826         }
827         memset(peerInfo->asnData, 0, peerInfo->asnLength);
828         peerInfo->asnLength = 0;
829         peerInfo->mtype = 0;
830         return;
831     }
832
833     peerInfo->isConnected = true;
834
835     if (modifyToEpoll(params->epoll_fd, peerInfo, (EPOLLIN | EPOLLET), params->sctpMap, peerInfo->enodbName,
836                       peerInfo->mtype) != 0) {
837         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_MOD");
838         return;
839     }
840
841     message.message.asndata = (unsigned char *)peerInfo->asnData;
842     message.message.asnLength = peerInfo->asnLength;
843     message.message.messageType = peerInfo->mtype;
844     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
845     num_of_messages.fetch_add(1, std::memory_order_release);
846     if (mdclog_level_get() >= MDCLOG_DEBUG) {
847         mdclog_write(MDCLOG_DEBUG, "send the delayed SETUP/ENDC SETUP to sctp for %s",
848                      message.message.enodbName);
849     }
850     if (sendSctpMsg(peerInfo, message, params->sctpMap) != 0) {
851         if (mdclog_level_get() >= MDCLOG_DEBUG) {
852             mdclog_write(MDCLOG_DEBUG, "Error write to SCTP  %s %d", __func__, __LINE__);
853         }
854         return;
855     }
856
857     memset(peerInfo->asnData, 0, peerInfo->asnLength);
858     peerInfo->asnLength = 0;
859     peerInfo->mtype = 0;
860 }
861
862
863 void handlepoll_error(struct epoll_event &event,
864                       ReportingMessages_t &message,
865                       RmrMessagesBuffer_t &rmrMessageBuffer,
866                       sctp_params_t *params) {
867     if (event.data.fd != params->rmrListenFd) {
868         auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
869         mdclog_write(MDCLOG_ERR, "epoll error, events %0x on fd %d, RAN NAME : %s",
870                      event.events, peerInfo->fileDescriptor, peerInfo->enodbName);
871
872         rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
873                                                      "%s|Failed SCTP Connection",
874                                                      peerInfo->enodbName);
875         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
876         message.message.asnLength = rmrMessageBuffer.sendMessage->len;
877
878         memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
879         message.message.direction = 'N';
880         if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
881             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
882         }
883
884         close(peerInfo->fileDescriptor);
885         cleanHashEntry((ConnectedCU_t *) event.data.ptr, params->sctpMap);
886     } else {
887         mdclog_write(MDCLOG_ERR, "epoll error, events %0x on RMR FD", event.events);
888     }
889 }
890 /**
891  *
892  * @param socket
893  * @return
894  */
895 int setSocketNoBlocking(int socket) {
896     auto flags = fcntl(socket, F_GETFL, 0);
897
898     if (flags == -1) {
899         mdclog_mdc_add("func", "fcntl");
900         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
901         mdclog_mdc_clean();
902         return -1;
903     }
904
905     flags = (unsigned) flags | (unsigned) O_NONBLOCK;
906     if (fcntl(socket, F_SETFL, flags) == -1) {
907         mdclog_mdc_add("func", "fcntl");
908         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
909         mdclog_mdc_clean();
910         return -1;
911     }
912
913     return 0;
914 }
915
916 /**
917  *
918  * @param val
919  * @param m
920  */
921 void cleanHashEntry(ConnectedCU_t *val, Sctp_Map_t *m) {
922     char *dummy;
923     auto port = (uint16_t) strtol(val->portNumber, &dummy, 10);
924     char searchBuff[2048]{};
925
926     snprintf(searchBuff, sizeof searchBuff, "host:%s:%d", val->hostName, port);
927     m->erase(searchBuff);
928
929     m->erase(val->enodbName);
930     free(val);
931 }
932
933 /**
934  *
935  * @param fd file discriptor
936  * @param data the asn data to send
937  * @param len  length of the data
938  * @param enodbName the enodbName as in the map for printing purpose
939  * @param m map host information
940  * @param mtype message number
941  * @return 0 success, anegative number on fail
942  */
943 int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_t *m) {
944     auto loglevel = mdclog_level_get();
945     int fd = peerInfo->fileDescriptor;
946     if (loglevel >= MDCLOG_DEBUG) {
947         mdclog_write(MDCLOG_DEBUG, "Send SCTP message for CU %s, %s",
948                      message.message.enodbName, __FUNCTION__);
949     }
950
951     while (true) {
952         if (send(fd,message.message.asndata, message.message.asnLength,MSG_NOSIGNAL) < 0) {
953             if (errno == EINTR) {
954                 continue;
955             }
956             mdclog_write(MDCLOG_ERR, "error writing to CU a message, %s ", strerror(errno));
957             if (!peerInfo->isConnected) {
958                 mdclog_write(MDCLOG_ERR, "connection to CU %s is still in progress.", message.message.enodbName);
959                 return -1;
960             }
961             cleanHashEntry(peerInfo, m);
962             close(fd);
963             char key[MAX_ENODB_NAME_SIZE * 2];
964             snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName,
965                      message.message.messageType);
966             if (loglevel >= MDCLOG_DEBUG) {
967                 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
968             }
969             auto tmp = m->find(key);
970             if (tmp) {
971                 free(tmp);
972             }
973             m->erase(key);
974             return -1;
975         }
976         message.statCollector->incSentMessage(string(message.message.enodbName));
977         message.message.direction = 'D';
978         // send report.buffer of size
979         buildJsonMessage(message);
980
981         if (loglevel >= MDCLOG_DEBUG) {
982             mdclog_write(MDCLOG_DEBUG,
983                          "SCTP message for CU %s sent from %s",
984                          message.message.enodbName,
985                          __FUNCTION__);
986         }
987         return 0;
988     }
989 }
990
991 /**
992  *
993  * @param message
994  * @param rmrMessageBuffer
995  */
996 void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) {
997     rmr_get_meid(rmrMessageBuffer.rcvMessage, (unsigned char *) (message.message.enodbName));
998
999     message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
1000     message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
1001
1002     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1003         mdclog_write(MDCLOG_DEBUG, "Message from Xapp RAN name = %s message length = %ld",
1004                      message.message.enodbName, (unsigned long) message.message.asnLength);
1005     }
1006 }
1007
1008
1009
1010 /**
1011  *
1012  * @param events
1013  * @param sctpMap
1014  * @param numOfMessages
1015  * @param rmrMessageBuffer
1016  * @param ts
1017  * @return
1018  */
1019 int receiveDataFromSctp(struct epoll_event *events,
1020                         Sctp_Map_t *sctpMap,
1021                         int &numOfMessages,
1022                         RmrMessagesBuffer_t &rmrMessageBuffer,
1023                         struct timespec &ts) {
1024     /* We have data on the fd waiting to be read. Read and display it.
1025  * We must read whatever data is available completely, as we are running
1026  *  in edge-triggered mode and won't get a notification again for the same data. */
1027     ReportingMessages_t message {};
1028     auto done = 0;
1029     auto loglevel = mdclog_level_get();
1030
1031     // get the identity of the interface
1032     message.peerInfo = (ConnectedCU_t *)events->data.ptr;
1033
1034     message.statCollector = StatCollector::GetInstance();
1035     struct timespec start{0, 0};
1036     struct timespec decodestart{0, 0};
1037     struct timespec end{0, 0};
1038
1039     E2AP_PDU_t *pdu = nullptr;
1040
1041
1042     while (true) {
1043         if (loglevel >= MDCLOG_DEBUG) {
1044             mdclog_write(MDCLOG_DEBUG, "Start Read from SCTP %d fd", message.peerInfo->fileDescriptor);
1045             clock_gettime(CLOCK_MONOTONIC, &start);
1046         }
1047         // read the buffer directly to rmr payload
1048         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1049         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1050                 read(message.peerInfo->fileDescriptor, rmrMessageBuffer.sendMessage->payload, RECEIVE_SCTP_BUFFER_SIZE);
1051
1052         if (loglevel >= MDCLOG_DEBUG) {
1053             mdclog_write(MDCLOG_DEBUG, "Finish Read from SCTP %d fd message length = %ld",
1054                          message.peerInfo->fileDescriptor, message.message.asnLength);
1055         }
1056
1057         memcpy(message.message.enodbName, message.peerInfo->enodbName, sizeof(message.peerInfo->enodbName));
1058         message.statCollector->incRecvMessage(string(message.message.enodbName));
1059         message.message.direction = 'U';
1060         message.message.time.tv_nsec = ts.tv_nsec;
1061         message.message.time.tv_sec = ts.tv_sec;
1062
1063         if (message.message.asnLength < 0) {
1064             if (errno == EINTR) {
1065                 continue;
1066             }
1067             /* If errno == EAGAIN, that means we have read all
1068                data. So goReportingMessages_t back to the main loop. */
1069             if (errno != EAGAIN) {
1070                 mdclog_write(MDCLOG_ERR, "Read error, %s ", strerror(errno));
1071                 done = 1;
1072             } else if (loglevel >= MDCLOG_DEBUG) {
1073                 mdclog_write(MDCLOG_DEBUG, "EAGAIN - descriptor = %d", message.peerInfo->fileDescriptor);
1074             }
1075             break;
1076         } else if (message.message.asnLength == 0) {
1077             /* End of file. The remote has closed the connection. */
1078             if (loglevel >= MDCLOG_INFO) {
1079                 mdclog_write(MDCLOG_INFO, "END of File Closed connection - descriptor = %d",
1080                              message.peerInfo->fileDescriptor);
1081             }
1082             done = 1;
1083             break;
1084         }
1085
1086         asn_dec_rval_t rval;
1087         if (loglevel >= MDCLOG_DEBUG) {
1088             char printBuffer[4096]{};
1089             char *tmp = printBuffer;
1090             for (size_t i = 0; i < (size_t)message.message.asnLength; ++i) {
1091                 snprintf(tmp, 3, "%02x", message.message.asndata[i]);
1092                 tmp += 2;
1093             }
1094             printBuffer[message.message.asnLength] = 0;
1095             clock_gettime(CLOCK_MONOTONIC, &end);
1096             mdclog_write(MDCLOG_DEBUG, "Before Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
1097                          message.peerInfo->enodbName, end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
1098             mdclog_write(MDCLOG_DEBUG, "PDU buffer length = %ld, data =  : %s", message.message.asnLength,
1099                          printBuffer);
1100             clock_gettime(CLOCK_MONOTONIC, &decodestart);
1101         }
1102
1103         rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void **) &pdu,
1104                           message.message.asndata, message.message.asnLength);
1105         if (rval.code != RC_OK) {
1106             mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) E2AP PDU from RAN : %s", rval.code,
1107                          message.peerInfo->enodbName);
1108             break;
1109         }
1110
1111         if (loglevel >= MDCLOG_DEBUG) {
1112             clock_gettime(CLOCK_MONOTONIC, &end);
1113             mdclog_write(MDCLOG_DEBUG, "After Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
1114                          message.peerInfo->enodbName, end.tv_sec - decodestart.tv_sec, end.tv_nsec - decodestart.tv_nsec);
1115             char *printBuffer;
1116             size_t size;
1117             FILE *stream = open_memstream(&printBuffer, &size);
1118             asn_fprint(stream, &asn_DEF_E2AP_PDU, pdu);
1119             mdclog_write(MDCLOG_DEBUG, "Encoding E2AP PDU past : %s", printBuffer);
1120             clock_gettime(CLOCK_MONOTONIC, &decodestart);
1121         }
1122
1123         switch (pdu->present) {
1124             case E2AP_PDU_PR_initiatingMessage: {//initiating message
1125                 asnInitiatingRequest(pdu, message, rmrMessageBuffer);
1126                 break;
1127             }
1128             case E2AP_PDU_PR_successfulOutcome: { //successful outcome
1129                 asnSuccsesfulMsg(pdu, message,  rmrMessageBuffer);
1130                 break;
1131             }
1132             case E2AP_PDU_PR_unsuccessfulOutcome: { //Unsuccessful Outcome
1133                 asnUnSuccsesfulMsg(pdu, message, rmrMessageBuffer);
1134                 break;
1135             }
1136             default:
1137                 mdclog_write(MDCLOG_ERR, "Unknown index %d in E2AP PDU", pdu->present);
1138                 break;
1139         }
1140         if (loglevel >= MDCLOG_DEBUG) {
1141             clock_gettime(CLOCK_MONOTONIC, &end);
1142             mdclog_write(MDCLOG_DEBUG,
1143                          "After processing message and sent to rmr for : %s, Read time is : %ld seconds, %ld nanoseconds",
1144                          message.peerInfo->enodbName, end.tv_sec - decodestart.tv_sec, end.tv_nsec - decodestart.tv_nsec);
1145         }
1146         numOfMessages++;
1147         // remove the break for EAGAIN
1148         //break;
1149         if (pdu != nullptr) {
1150             //TODO need to test ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu); to get better performance
1151             //ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu);
1152             ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
1153             pdu = nullptr;
1154         }
1155         //clock_gettime(CLOCK_MONOTONIC, &start);
1156     }
1157     // in case of break to avoid memory leak
1158     if (pdu != nullptr) {
1159         ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
1160         pdu = nullptr;
1161     }
1162
1163     if (done) {
1164         if (loglevel >= MDCLOG_INFO) {
1165             mdclog_write(MDCLOG_INFO, "Closed connection - descriptor = %d", message.peerInfo->fileDescriptor);
1166         }
1167         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1168                 snprintf((char *)rmrMessageBuffer.sendMessage->payload,
1169                          256,
1170                          "%s|CU disconnected unexpectedly",
1171                          message.peerInfo->enodbName);
1172         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1173
1174         if (sendRequestToXapp(message,
1175                               RIC_SCTP_CONNECTION_FAILURE,
1176                               rmrMessageBuffer) != 0) {
1177             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
1178         }
1179
1180         /* Closing descriptor make epoll remove it from the set of descriptors which are monitored. */
1181         close(message.peerInfo->fileDescriptor);
1182         cleanHashEntry((ConnectedCU_t *) events->data.ptr, sctpMap);
1183     }
1184     if (loglevel >= MDCLOG_DEBUG) {
1185         clock_gettime(CLOCK_MONOTONIC, &end);
1186         mdclog_write(MDCLOG_DEBUG, "from receive SCTP to send RMR time is %ld seconds and %ld nanoseconds",
1187                      end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
1188
1189     }
1190     return 0;
1191 }
1192
1193 static void buildAndsendSetupRequest(ReportingMessages_t &message,
1194                                      E2setupRequestIEs_t *ie,
1195                                      RmrMessagesBuffer_t &rmrMessageBuffer,
1196                                      E2AP_PDU_t *pdu) {
1197     auto logLevel = mdclog_level_get();
1198
1199
1200     if (buildRanName(message.peerInfo->enodbName, ie) < 0) {
1201         mdclog_write(MDCLOG_ERR, "Bad param in E2setupRequestIEs GlobalE2node_ID.\n");
1202     } else {
1203         memcpy(message.message.enodbName, message.peerInfo->enodbName, strlen(message.peerInfo->enodbName));
1204     }
1205     // now we can send the data to e2Mgr
1206     auto buffer_size = RECEIVE_SCTP_BUFFER_SIZE * 2;
1207
1208     auto *rmrMsg = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, buffer_size);
1209     // add addrees to message
1210
1211
1212     // unsigned char *buffer = &rmrMsg->payload[j];
1213     unsigned char buffer[RECEIVE_SCTP_BUFFER_SIZE * 2];
1214     // encode to xml
1215     asn_enc_rval_t er;
1216     er = asn_encode_to_buffer(nullptr, ATS_BASIC_XER, &asn_DEF_E2AP_PDU, pdu, buffer, buffer_size);
1217     if (er.encoded == -1) {
1218         mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
1219     } else if (er.encoded > (ssize_t) buffer_size) {
1220         mdclog_write(MDCLOG_ERR, "Buffer of size %d is to small for %s",
1221                      (int) buffer_size,
1222                      asn_DEF_E2AP_PDU.name);
1223     } else {
1224         rmrMsg->len = snprintf((char *)rmrMsg->payload, RECEIVE_SCTP_BUFFER_SIZE * 2, "%s:%d|%s",
1225                                message.peerInfo->sctpParams->myIP.c_str(),
1226                                message.peerInfo->sctpParams->rmrPort,
1227                                buffer);
1228         if (logLevel >= MDCLOG_DEBUG) {
1229             mdclog_write(MDCLOG_DEBUG, "Setup request of size %d :\n %s\n", rmrMsg->len, rmrMsg->payload);
1230         }
1231         // send to RMR
1232         message.message.messageType = rmrMsg->mtype = RIC_E2_SETUP_REQ;
1233         rmrMsg->state = 0;
1234         rmr_bytes2meid(rmrMsg, (unsigned char *) message.message.enodbName, strlen(message.message.enodbName));
1235
1236         static unsigned char tx[32];
1237         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1238         rmr_bytes2xact(rmrMsg, tx, strlen((const char *) tx));
1239
1240         rmrMsg = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMsg);
1241         if (rmrMsg == nullptr) {
1242             mdclog_write(MDCLOG_ERR, "RMR failed to send returned nullptr");
1243         } else if (rmrMsg->state != 0) {
1244             char meid[RMR_MAX_MEID]{};
1245             if (rmrMsg->state == RMR_ERR_RETRY) {
1246                 usleep(5);
1247                 rmrMsg->state = 0;
1248                 mdclog_write(MDCLOG_INFO, "RETRY sending Message %d to Xapp from %s",
1249                              rmrMsg->mtype, rmr_get_meid(rmrMsg, (unsigned char *) meid));
1250                 rmrMsg = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMsg);
1251                 if (rmrMsg == nullptr) {
1252                     mdclog_write(MDCLOG_ERR, "RMR failed send returned nullptr");
1253                 } else if (rmrMsg->state != 0) {
1254                     mdclog_write(MDCLOG_ERR,
1255                                  "RMR Retry failed %s sending request %d to Xapp from %s",
1256                                  translateRmrErrorMessages(rmrMsg->state).c_str(),
1257                                  rmrMsg->mtype,
1258                                  rmr_get_meid(rmrMsg, (unsigned char *) meid));
1259                 }
1260             } else {
1261                 mdclog_write(MDCLOG_ERR, "RMR failed: %s. sending request %d to Xapp from %s",
1262                              translateRmrErrorMessages(rmrMsg->state).c_str(),
1263                              rmrMsg->mtype,
1264                              rmr_get_meid(rmrMsg, (unsigned char *) meid));
1265             }
1266         }
1267         message.peerInfo->gotSetup = true;
1268         buildJsonMessage(message);
1269         if (rmrMsg != nullptr) {
1270             rmr_free_msg(rmrMsg);
1271         }
1272     }
1273
1274 }
1275 /**
1276  *
1277  * @param pdu
1278  * @param message
1279  * @param rmrMessageBuffer
1280  */
1281 void asnInitiatingRequest(E2AP_PDU_t *pdu,
1282                           ReportingMessages_t &message,
1283                           RmrMessagesBuffer_t &rmrMessageBuffer) {
1284     auto logLevel = mdclog_level_get();
1285     auto procedureCode = ((InitiatingMessage_t *) pdu->choice.initiatingMessage)->procedureCode;
1286     if (logLevel >= MDCLOG_DEBUG) {
1287         mdclog_write(MDCLOG_DEBUG, "Initiating message %ld\n", procedureCode);
1288     }
1289     switch (procedureCode) {
1290         case ProcedureCode_id_E2setup: {
1291             if (logLevel >= MDCLOG_DEBUG) {
1292                 mdclog_write(MDCLOG_DEBUG, "Got E2setup\n");
1293             }
1294
1295             memset(message.peerInfo->enodbName, 0 , MAX_ENODB_NAME_SIZE);
1296             for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.E2setupRequest.protocolIEs.list.count; i++) {
1297                 auto *ie = pdu->choice.initiatingMessage->value.choice.E2setupRequest.protocolIEs.list.array[i];
1298                 if (ie->id == ProtocolIE_ID_id_GlobalE2node_ID) {
1299                     if (ie->value.present == E2setupRequestIEs__value_PR_GlobalE2node_ID) {
1300                         buildAndsendSetupRequest(message, ie, rmrMessageBuffer, pdu);
1301                         break;
1302                     }
1303                 }
1304             }
1305             break;
1306         }
1307         case ProcedureCode_id_ErrorIndication: {
1308             if (logLevel >= MDCLOG_DEBUG) {
1309                 mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName);
1310             }
1311             if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) {
1312                 mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP");
1313             }
1314             break;
1315         }
1316         case ProcedureCode_id_Reset: {
1317             if (logLevel >= MDCLOG_DEBUG) {
1318                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
1319             }
1320             if (sendRequestToXapp(message, RIC_X2_RESET, rmrMessageBuffer) != 0) {
1321                 mdclog_write(MDCLOG_ERR, "RIC_X2_RESET message failed to send to xAPP");
1322             }
1323             break;
1324         }
1325         case ProcedureCode_id_RICcontrol: {
1326             if (logLevel >= MDCLOG_DEBUG) {
1327                 mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
1328             }
1329             break;
1330         }
1331         case ProcedureCode_id_RICindication: {
1332             if (logLevel >= MDCLOG_DEBUG) {
1333                 mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName);
1334             }
1335             for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
1336                 auto messageSent = false;
1337                 RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
1338                 if (logLevel >= MDCLOG_DEBUG) {
1339                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1340                 }
1341                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1342                     if (logLevel >= MDCLOG_DEBUG) {
1343                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1344                     }
1345                     if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
1346                         static unsigned char tx[32];
1347                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
1348                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1349                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1350                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1351                                        (unsigned char *)message.message.enodbName,
1352                                        strlen(message.message.enodbName));
1353                         rmrMessageBuffer.sendMessage->state = 0;
1354                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1355                         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1356                             mdclog_write(MDCLOG_DEBUG, "RIC sub id = %d, message type = %d",
1357                                          rmrMessageBuffer.sendMessage->sub_id,
1358                                          rmrMessageBuffer.sendMessage->mtype);
1359                         }
1360                         sendRmrMessage(rmrMessageBuffer, message);
1361                         messageSent = true;
1362                     } else {
1363                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1364                     }
1365                 }
1366                 if (messageSent) {
1367                     break;
1368                 }
1369             }
1370             break;
1371         }
1372         case ProcedureCode_id_RICserviceQuery: {
1373             if (logLevel >= MDCLOG_DEBUG) {
1374                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceQuery %s", message.message.enodbName);
1375             }
1376             break;
1377         }
1378         case ProcedureCode_id_RICserviceUpdate: {
1379             if (logLevel >= MDCLOG_DEBUG) {
1380                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceUpdate %s", message.message.enodbName);
1381             }
1382             if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer) != 0) {
1383                 mdclog_write(MDCLOG_ERR, "RIC_SERVICE_UPDATE message failed to send to xAPP");
1384             }
1385             break;
1386         }
1387         case ProcedureCode_id_RICsubscription: {
1388             if (logLevel >= MDCLOG_DEBUG) {
1389                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
1390             }
1391             break;
1392         }
1393         case ProcedureCode_id_RICsubscriptionDelete: {
1394             if (logLevel >= MDCLOG_DEBUG) {
1395                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
1396             }
1397             break;
1398         }
1399         default: {
1400             mdclog_write(MDCLOG_ERR, "Undefined or not supported message = %ld", procedureCode);
1401             message.message.messageType = 0; // no RMR message type yet
1402
1403             buildJsonMessage(message);
1404
1405             break;
1406         }
1407     }
1408 }
1409
1410 /**
1411  *
1412  * @param pdu
1413  * @param message
1414  * @param rmrMessageBuffer
1415  */
1416 void asnSuccsesfulMsg(E2AP_PDU_t *pdu, ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) {
1417     auto procedureCode = pdu->choice.successfulOutcome->procedureCode;
1418     auto logLevel = mdclog_level_get();
1419     if (logLevel >= MDCLOG_INFO) {
1420         mdclog_write(MDCLOG_INFO, "Successful Outcome %ld", procedureCode);
1421     }
1422     switch (procedureCode) {
1423         case ProcedureCode_id_E2setup: {
1424             if (logLevel >= MDCLOG_DEBUG) {
1425                 mdclog_write(MDCLOG_DEBUG, "Got E2setup\n");
1426             }
1427             break;
1428         }
1429         case ProcedureCode_id_ErrorIndication: {
1430             if (logLevel >= MDCLOG_DEBUG) {
1431                 mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName);
1432             }
1433             if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) {
1434                 mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP");
1435             }
1436             break;
1437         }
1438         case ProcedureCode_id_Reset: {
1439             if (logLevel >= MDCLOG_DEBUG) {
1440                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
1441             }
1442             if (sendRequestToXapp(message, RIC_X2_RESET, rmrMessageBuffer) != 0) {
1443                 mdclog_write(MDCLOG_ERR, "RIC_X2_RESET message failed to send to xAPP");
1444             }
1445             break;
1446         }
1447         case ProcedureCode_id_RICcontrol: {
1448             if (logLevel >= MDCLOG_DEBUG) {
1449                 mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
1450             }
1451             for (auto i = 0;
1452                  i < pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.count; i++) {
1453                 auto messageSent = false;
1454                 RICcontrolAcknowledge_IEs_t *ie = pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.array[i];
1455                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1456                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1457                 }
1458                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1459                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1460                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1461                     }
1462                     if (ie->value.present == RICcontrolAcknowledge_IEs__value_PR_RICrequestID) {
1463                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_ACK;
1464                         rmrMessageBuffer.sendMessage->state = 0;
1465                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1466                         static unsigned char tx[32];
1467                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1468                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1469                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1470                                        (unsigned char *)message.message.enodbName,
1471                                        strlen(message.message.enodbName));
1472
1473                         sendRmrMessage(rmrMessageBuffer, message);
1474                         messageSent = true;
1475                     } else {
1476                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1477                     }
1478                 }
1479                 if (messageSent) {
1480                     break;
1481                 }
1482             }
1483
1484             break;
1485         }
1486         case ProcedureCode_id_RICindication: {
1487             if (logLevel >= MDCLOG_DEBUG) {
1488                 mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName);
1489             }
1490             for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
1491                 auto messageSent = false;
1492                 RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
1493                 if (logLevel >= MDCLOG_DEBUG) {
1494                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1495                 }
1496                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1497                     if (logLevel >= MDCLOG_DEBUG) {
1498                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1499                     }
1500                     if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
1501                         static unsigned char tx[32];
1502                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
1503                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1504                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1505                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1506                                        (unsigned char *)message.message.enodbName,
1507                                        strlen(message.message.enodbName));
1508                         rmrMessageBuffer.sendMessage->state = 0;
1509                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1510                         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1511                             mdclog_write(MDCLOG_DEBUG, "RIC sub id = %d, message type = %d",
1512                                          rmrMessageBuffer.sendMessage->sub_id,
1513                                          rmrMessageBuffer.sendMessage->mtype);
1514                         }
1515                         sendRmrMessage(rmrMessageBuffer, message);
1516                         messageSent = true;
1517                     } else {
1518                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1519                     }
1520                 }
1521                 if (messageSent) {
1522                     break;
1523                 }
1524             }
1525             break;
1526         }
1527         case ProcedureCode_id_RICserviceQuery: {
1528             if (logLevel >= MDCLOG_DEBUG) {
1529                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceQuery %s", message.message.enodbName);
1530             }
1531             break;
1532         }
1533         case ProcedureCode_id_RICserviceUpdate: {
1534             if (logLevel >= MDCLOG_DEBUG) {
1535                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceUpdate %s", message.message.enodbName);
1536             }
1537             if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer) != 0) {
1538                 mdclog_write(MDCLOG_ERR, "RIC_SERVICE_UPDATE message failed to send to xAPP");
1539             }
1540             break;
1541         }
1542         case ProcedureCode_id_RICsubscription: {
1543             if (logLevel >= MDCLOG_DEBUG) {
1544                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
1545             }
1546             if (sendRequestToXapp(message, RIC_SUB_RESP, rmrMessageBuffer) != 0) {
1547                 mdclog_write(MDCLOG_ERR, "Subscription successful message failed to send to xAPP");
1548             }
1549             break;
1550         }
1551         case ProcedureCode_id_RICsubscriptionDelete: {
1552             if (logLevel >= MDCLOG_DEBUG) {
1553                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
1554             }
1555             if (sendRequestToXapp(message, RIC_SUB_DEL_RESP, rmrMessageBuffer) != 0) {
1556                 mdclog_write(MDCLOG_ERR, "Subscription delete successful message failed to send to xAPP");
1557             }
1558             break;
1559         }
1560         default: {
1561             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1562             message.message.messageType = 0; // no RMR message type yet
1563             buildJsonMessage(message);
1564
1565             break;
1566         }
1567     }
1568 }
1569
1570 /**
1571  *
1572  * @param pdu
1573  * @param message
1574  * @param rmrMessageBuffer
1575  */
1576 void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
1577                         ReportingMessages_t &message,
1578                         RmrMessagesBuffer_t &rmrMessageBuffer) {
1579     auto procedureCode = pdu->choice.unsuccessfulOutcome->procedureCode;
1580     auto logLevel = mdclog_level_get();
1581     if (logLevel >= MDCLOG_INFO) {
1582         mdclog_write(MDCLOG_INFO, "Unsuccessful Outcome %ld", procedureCode);
1583     }
1584     switch (procedureCode) {
1585         case ProcedureCode_id_E2setup: {
1586             if (logLevel >= MDCLOG_DEBUG) {
1587                 mdclog_write(MDCLOG_DEBUG, "Got E2setup\n");
1588             }
1589             break;
1590         }
1591         case ProcedureCode_id_ErrorIndication: {
1592             if (logLevel >= MDCLOG_DEBUG) {
1593                 mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName);
1594             }
1595             if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) {
1596                 mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP");
1597             }
1598             break;
1599         }
1600         case ProcedureCode_id_Reset: {
1601             if (logLevel >= MDCLOG_DEBUG) {
1602                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
1603             }
1604             if (sendRequestToXapp(message, RIC_X2_RESET, rmrMessageBuffer) != 0) {
1605                 mdclog_write(MDCLOG_ERR, "RIC_X2_RESET message failed to send to xAPP");
1606             }
1607             break;
1608         }
1609         case ProcedureCode_id_RICcontrol: {
1610             if (logLevel >= MDCLOG_DEBUG) {
1611                 mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
1612             }
1613             for (int i = 0;
1614                  i < pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.count; i++) {
1615                 auto messageSent = false;
1616                 RICcontrolFailure_IEs_t *ie = pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.array[i];
1617                 if (logLevel >= MDCLOG_DEBUG) {
1618                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1619                 }
1620                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1621                     if (logLevel >= MDCLOG_DEBUG) {
1622                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1623                     }
1624                     if (ie->value.present == RICcontrolFailure_IEs__value_PR_RICrequestID) {
1625                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_FAILURE;
1626                         rmrMessageBuffer.sendMessage->state = 0;
1627                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1628                         static unsigned char tx[32];
1629                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1630                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1631                         rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName,
1632                                        strlen(message.message.enodbName));
1633                         sendRmrMessage(rmrMessageBuffer, message);
1634                         messageSent = true;
1635                     } else {
1636                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1637                     }
1638                 }
1639                 if (messageSent) {
1640                     break;
1641                 }
1642             }
1643             break;
1644         }
1645         case ProcedureCode_id_RICindication: {
1646             if (logLevel >= MDCLOG_DEBUG) {
1647                 mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName);
1648             }
1649             for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
1650                 auto messageSent = false;
1651                 RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
1652                 if (logLevel >= MDCLOG_DEBUG) {
1653                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1654                 }
1655                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1656                     if (logLevel >= MDCLOG_DEBUG) {
1657                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1658                     }
1659                     if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
1660                         static unsigned char tx[32];
1661                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
1662                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1663                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1664                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1665                                        (unsigned char *)message.message.enodbName,
1666                                        strlen(message.message.enodbName));
1667                         rmrMessageBuffer.sendMessage->state = 0;
1668                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1669                         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1670                             mdclog_write(MDCLOG_DEBUG, "RIC sub id = %d, message type = %d",
1671                                          rmrMessageBuffer.sendMessage->sub_id,
1672                                          rmrMessageBuffer.sendMessage->mtype);
1673                         }
1674                         sendRmrMessage(rmrMessageBuffer, message);
1675                         messageSent = true;
1676                     } else {
1677                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1678                     }
1679                 }
1680                 if (messageSent) {
1681                     break;
1682                 }
1683             }
1684             break;
1685         }
1686         case ProcedureCode_id_RICserviceQuery: {
1687             if (logLevel >= MDCLOG_DEBUG) {
1688                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceQuery %s", message.message.enodbName);
1689             }
1690             break;
1691         }
1692         case ProcedureCode_id_RICserviceUpdate: {
1693             if (logLevel >= MDCLOG_DEBUG) {
1694                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceUpdate %s", message.message.enodbName);
1695             }
1696             if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer) != 0) {
1697                 mdclog_write(MDCLOG_ERR, "RIC_SERVICE_UPDATE message failed to send to xAPP");
1698             }
1699             break;
1700         }
1701         case ProcedureCode_id_RICsubscription: {
1702             if (logLevel >= MDCLOG_DEBUG) {
1703                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
1704             }
1705             if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer) != 0) {
1706                 mdclog_write(MDCLOG_ERR, "Subscription unsuccessful message failed to send to xAPP");
1707             }
1708             break;
1709         }
1710         case ProcedureCode_id_RICsubscriptionDelete: {
1711             if (logLevel >= MDCLOG_DEBUG) {
1712                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
1713             }
1714             if (sendRequestToXapp(message, RIC_SUB_DEL_FAILURE, rmrMessageBuffer) != 0) {
1715                 mdclog_write(MDCLOG_ERR, "Subscription Delete unsuccessful message failed to send to xAPP");
1716             }
1717             break;
1718         }
1719         default: {
1720             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1721             message.message.messageType = 0; // no RMR message type yet
1722
1723             buildJsonMessage(message);
1724
1725             break;
1726         }
1727     }
1728 }
1729
1730 /**
1731  *
1732  * @param message
1733  * @param requestId
1734  * @param rmrMmessageBuffer
1735  * @return
1736  */
1737 int sendRequestToXapp(ReportingMessages_t &message,
1738                       int requestId,
1739                       RmrMessagesBuffer_t &rmrMmessageBuffer) {
1740     rmr_bytes2meid(rmrMmessageBuffer.sendMessage,
1741                    (unsigned char *)message.message.enodbName,
1742                    strlen(message.message.enodbName));
1743     message.message.messageType = rmrMmessageBuffer.sendMessage->mtype = requestId;
1744     rmrMmessageBuffer.sendMessage->state = 0;
1745     static unsigned char tx[32];
1746     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1747     rmr_bytes2xact(rmrMmessageBuffer.sendMessage, tx, strlen((const char *) tx));
1748
1749     auto rc = sendRmrMessage(rmrMmessageBuffer, message);
1750     return rc;
1751 }
1752
1753
1754 void getRmrContext(sctp_params_t &pSctpParams) {
1755     pSctpParams.rmrCtx = nullptr;
1756     pSctpParams.rmrCtx = rmr_init(pSctpParams.rmrAddress, RMR_MAX_RCV_BYTES, RMRFL_NONE);
1757     if (pSctpParams.rmrCtx == nullptr) {
1758         mdclog_write(MDCLOG_ERR, "Failed to initialize RMR");
1759         return;
1760     }
1761
1762     rmr_set_stimeout(pSctpParams.rmrCtx, 0);    // disable retries for any send operation
1763     // we need to find that routing table exist and we can run
1764     if (mdclog_level_get() >= MDCLOG_INFO) {
1765         mdclog_write(MDCLOG_INFO, "We are after RMR INIT wait for RMR_Ready");
1766     }
1767     int rmrReady = 0;
1768     int count = 0;
1769     while (!rmrReady) {
1770         if ((rmrReady = rmr_ready(pSctpParams.rmrCtx)) == 0) {
1771             sleep(1);
1772         }
1773         count++;
1774         if (count % 60 == 0) {
1775             mdclog_write(MDCLOG_INFO, "waiting to RMR ready state for %d seconds", count);
1776         }
1777     }
1778     if (mdclog_level_get() >= MDCLOG_INFO) {
1779         mdclog_write(MDCLOG_INFO, "RMR running");
1780     }
1781     rmr_init_trace(pSctpParams.rmrCtx, 200);
1782     // get the RMR fd for the epoll
1783     pSctpParams.rmrListenFd = rmr_get_rcvfd(pSctpParams.rmrCtx);
1784     struct epoll_event event{};
1785     // add RMR fd to epoll
1786     event.events = (EPOLLIN);
1787     event.data.fd = pSctpParams.rmrListenFd;
1788     // add listening RMR FD to epoll
1789     if (epoll_ctl(pSctpParams.epoll_fd, EPOLL_CTL_ADD, pSctpParams.rmrListenFd, &event)) {
1790         mdclog_write(MDCLOG_ERR, "Failed to add RMR descriptor to epoll");
1791         close(pSctpParams.rmrListenFd);
1792         rmr_close(pSctpParams.rmrCtx);
1793         pSctpParams.rmrCtx = nullptr;
1794     }
1795 }
1796
1797 /**
1798  *
1799  * @param sctpMap
1800  * @param rmrMessageBuffer
1801  * @param ts
1802  * @return
1803  */
1804 int receiveXappMessages(Sctp_Map_t *sctpMap,
1805                         RmrMessagesBuffer_t &rmrMessageBuffer,
1806                         struct timespec &ts) {
1807     if (rmrMessageBuffer.rcvMessage == nullptr) {
1808         //we have error
1809         mdclog_write(MDCLOG_ERR, "RMR Allocation message, %s", strerror(errno));
1810         return -1;
1811     }
1812
1813     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1814         mdclog_write(MDCLOG_DEBUG, "Call to rmr_rcv_msg");
1815     }
1816     rmrMessageBuffer.rcvMessage = rmr_rcv_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.rcvMessage);
1817     if (rmrMessageBuffer.rcvMessage == nullptr) {
1818         mdclog_write(MDCLOG_ERR, "RMR Receving message with null pointer, Realloc rmr mesage buffer");
1819         rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
1820         return -2;
1821     }
1822     ReportingMessages_t message;
1823     message.message.direction = 'D';
1824     message.message.time.tv_nsec = ts.tv_nsec;
1825     message.message.time.tv_sec = ts.tv_sec;
1826
1827     // get message payload
1828     //auto msgData = msg->payload;
1829     if (rmrMessageBuffer.rcvMessage->state != 0) {
1830         mdclog_write(MDCLOG_ERR, "RMR Receving message with stat = %d", rmrMessageBuffer.rcvMessage->state);
1831         return -1;
1832     }
1833     switch (rmrMessageBuffer.rcvMessage->mtype) {
1834         case RIC_E2_SETUP_RESP : {
1835             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1836                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_RESP");
1837                 return -6;
1838             }
1839             break;
1840         }
1841         case RIC_E2_SETUP_FAILURE : {
1842             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1843                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_FAILURE");
1844                 return -6;
1845             }
1846             break;
1847         }
1848         case RIC_ERROR_INDICATION: {
1849             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1850                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ERROR_INDICATION");
1851                 return -6;
1852             }
1853             break;
1854         }
1855         case RIC_SUB_REQ: {
1856             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1857                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_REQ");
1858                 return -6;
1859             }
1860             break;
1861         }
1862         case RIC_SUB_DEL_REQ: {
1863             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1864                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_DEL_REQ");
1865                 return -6;
1866             }
1867             break;
1868         }
1869         case RIC_CONTROL_REQ: {
1870             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1871                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_CONTROL_REQ");
1872                 return -6;
1873             }
1874             break;
1875         }
1876         case RIC_SERVICE_QUERY: {
1877             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1878                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_QUERY");
1879                 return -6;
1880             }
1881             break;
1882         }
1883         case RIC_SERVICE_UPDATE_ACK: {
1884             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1885                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_ACK");
1886                 return -6;
1887             }
1888             break;
1889         }
1890         case RIC_SERVICE_UPDATE_FAILURE: {
1891             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1892                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_FAILURE");
1893                 return -6;
1894             }
1895             break;
1896         }
1897         case RIC_X2_RESET: {
1898             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1899                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_X2_RESET");
1900                 return -6;
1901             }
1902             break;
1903         }
1904         case RIC_X2_RESET_RESP: {
1905             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1906                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_X2_RESET_RESP");
1907                 return -6;
1908             }
1909             break;
1910         }
1911         case RIC_SCTP_CLEAR_ALL: {
1912             mdclog_write(MDCLOG_INFO, "RIC_SCTP_CLEAR_ALL");
1913             // loop on all keys and close socket and then erase all map.
1914             vector<char *> v;
1915             sctpMap->getKeys(v);
1916             for (auto const &iter : v) { //}; iter != sctpMap.end(); iter++) {
1917                 if (!boost::starts_with((string) (iter), "host:") && !boost::starts_with((string) (iter), "msg:")) {
1918                     auto *peerInfo = (ConnectedCU_t *) sctpMap->find(iter);
1919                     if (peerInfo == nullptr) {
1920                         continue;
1921                     }
1922                     close(peerInfo->fileDescriptor);
1923                     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
1924                     message.message.direction = 'D';
1925                     message.message.time.tv_nsec = ts.tv_nsec;
1926                     message.message.time.tv_sec = ts.tv_sec;
1927
1928                     message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1929                             snprintf((char *)rmrMessageBuffer.sendMessage->payload,
1930                                      256,
1931                                      "%s|RIC_SCTP_CLEAR_ALL",
1932                                      peerInfo->enodbName);
1933                     message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1934                     mdclog_write(MDCLOG_INFO, "%s", message.message.asndata);
1935                     if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
1936                         mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
1937                     }
1938                     free(peerInfo);
1939                 }
1940             }
1941
1942             sleep(1);
1943             sctpMap->clear();
1944             break;
1945         }
1946         case E2_TERM_KEEP_ALIVE_REQ: {
1947             // send message back
1948             rmr_bytes2payload(rmrMessageBuffer.sendMessage,
1949                               (unsigned char *)rmrMessageBuffer.ka_message,
1950                               rmrMessageBuffer.ka_message_len);
1951             rmrMessageBuffer.sendMessage->mtype = E2_TERM_KEEP_ALIVE_RESP;
1952             rmrMessageBuffer.sendMessage->state = 0;
1953             static unsigned char tx[32];
1954             auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1955             rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, txLen);
1956             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
1957             if (rmrMessageBuffer.sendMessage == nullptr) {
1958                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
1959                 mdclog_write(MDCLOG_ERR, "Failed to send E2_TERM_KEEP_ALIVE_RESP RMR message returned NULL");
1960             } else if (rmrMessageBuffer.sendMessage->state != 0)  {
1961                 mdclog_write(MDCLOG_ERR, "Failed to send E2_TERM_KEEP_ALIVE_RESP, on RMR state = %d ( %s)",
1962                              rmrMessageBuffer.sendMessage->state, translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str());
1963             } else if (mdclog_level_get() >= MDCLOG_DEBUG) {
1964                 mdclog_write(MDCLOG_DEBUG, "Got Keep Alive Request send : %s", rmrMessageBuffer.ka_message);
1965             }
1966
1967             break;
1968         }
1969         default:
1970             mdclog_write(MDCLOG_WARN, "Message Type : %d is not seported", rmrMessageBuffer.rcvMessage->mtype);
1971             message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
1972             message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
1973             message.message.time.tv_nsec = ts.tv_nsec;
1974             message.message.time.tv_sec = ts.tv_sec;
1975             message.message.messageType = rmrMessageBuffer.rcvMessage->mtype;
1976
1977             buildJsonMessage(message);
1978
1979
1980             return -7;
1981     }
1982     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1983         mdclog_write(MDCLOG_DEBUG, "EXIT OK from %s", __FUNCTION__);
1984     }
1985     return 0;
1986 }
1987
1988 /**
1989  * Send message to the CU that is not expecting for successful or unsuccessful results
1990  * @param messageBuffer
1991  * @param message
1992  * @param failedMsgId
1993  * @param sctpMap
1994  * @return
1995  */
1996 int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer,
1997                            ReportingMessages_t &message,
1998                            int failedMsgId,
1999                            Sctp_Map_t *sctpMap) {
2000
2001     getRequestMetaData(message, messageBuffer);
2002     if (mdclog_level_get() >= MDCLOG_INFO) {
2003         mdclog_write(MDCLOG_INFO, "send message to %s address", message.message.enodbName);
2004     }
2005
2006     auto rc = sendMessagetoCu(sctpMap, messageBuffer, message, failedMsgId);
2007     return rc;
2008 }
2009
2010 /**
2011  *
2012  * @param sctpMap
2013  * @param messageBuffer
2014  * @param message
2015  * @param failedMesgId
2016  * @return
2017  */
2018 int sendMessagetoCu(Sctp_Map_t *sctpMap,
2019                     RmrMessagesBuffer_t &messageBuffer,
2020                     ReportingMessages_t &message,
2021                     int failedMesgId) {
2022     auto *peerInfo = (ConnectedCU_t *) sctpMap->find(message.message.enodbName);
2023     if (peerInfo == nullptr) {
2024         if (failedMesgId != 0) {
2025             sendFailedSendingMessagetoXapp(messageBuffer, message, failedMesgId);
2026         } else {
2027             mdclog_write(MDCLOG_ERR, "Failed to send message no CU entry %s", message.message.enodbName);
2028         }
2029         return -1;
2030     }
2031
2032     // get the FD
2033     message.message.messageType = messageBuffer.rcvMessage->mtype;
2034     auto rc = sendSctpMsg(peerInfo, message, sctpMap);
2035     return rc;
2036 }
2037
2038 /**
2039  *
2040  * @param rmrCtx the rmr context to send and receive
2041  * @param msg the msg we got fromxApp
2042  * @param metaData data from xApp in ordered struct
2043  * @param failedMesgId the return message type error
2044  */
2045 void
2046 sendFailedSendingMessagetoXapp(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, int failedMesgId) {
2047     rmr_mbuf_t *msg = rmrMessageBuffer.sendMessage;
2048     msg->len = snprintf((char *) msg->payload, 200, "the gNb/eNode name %s not found",
2049                         message.message.enodbName);
2050     if (mdclog_level_get() >= MDCLOG_INFO) {
2051         mdclog_write(MDCLOG_INFO, "%s", msg->payload);
2052     }
2053     msg->mtype = failedMesgId;
2054     msg->state = 0;
2055
2056     static unsigned char tx[32];
2057     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2058     rmr_bytes2xact(msg, tx, strlen((const char *) tx));
2059
2060     sendRmrMessage(rmrMessageBuffer, message);
2061 }
2062
2063
2064
2065 /**
2066  *
2067  * @param epoll_fd
2068  * @param peerInfo
2069  * @param events
2070  * @param sctpMap
2071  * @param enodbName
2072  * @param msgType
2073  * @return
2074  */
2075 int addToEpoll(int epoll_fd,
2076                ConnectedCU_t *peerInfo,
2077                uint32_t events,
2078                Sctp_Map_t *sctpMap,
2079                char *enodbName,
2080                int msgType) {
2081     // Add to Epol
2082     struct epoll_event event{};
2083     event.data.ptr = peerInfo;
2084     event.events = events;
2085     if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peerInfo->fileDescriptor, &event) < 0) {
2086         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2087             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here), %s, %s %d",
2088                          strerror(errno), __func__, __LINE__);
2089         }
2090         close(peerInfo->fileDescriptor);
2091         if (enodbName != nullptr) {
2092             cleanHashEntry(peerInfo, sctpMap);
2093             char key[MAX_ENODB_NAME_SIZE * 2];
2094             snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2095             if (mdclog_level_get() >= MDCLOG_DEBUG) {
2096                 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2097             }
2098             auto tmp = sctpMap->find(key);
2099             if (tmp) {
2100                 free(tmp);
2101                 sctpMap->erase(key);
2102             }
2103         } else {
2104             peerInfo->enodbName[0] = 0;
2105         }
2106         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here)");
2107         return -1;
2108     }
2109     return 0;
2110 }
2111
2112 /**
2113  *
2114  * @param epoll_fd
2115  * @param peerInfo
2116  * @param events
2117  * @param sctpMap
2118  * @param enodbName
2119  * @param msgType
2120  * @return
2121  */
2122 int modifyToEpoll(int epoll_fd,
2123                   ConnectedCU_t *peerInfo,
2124                   uint32_t events,
2125                   Sctp_Map_t *sctpMap,
2126                   char *enodbName,
2127                   int msgType) {
2128     // Add to Epol
2129     struct epoll_event event{};
2130     event.data.ptr = peerInfo;
2131     event.events = events;
2132     if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, peerInfo->fileDescriptor, &event) < 0) {
2133         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2134             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_MOD (may chack not to quit here), %s, %s %d",
2135                          strerror(errno), __func__, __LINE__);
2136         }
2137         close(peerInfo->fileDescriptor);
2138         cleanHashEntry(peerInfo, sctpMap);
2139         char key[MAX_ENODB_NAME_SIZE * 2];
2140         snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2141         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2142             mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2143         }
2144         auto tmp = sctpMap->find(key);
2145         if (tmp) {
2146             free(tmp);
2147         }
2148         sctpMap->erase(key);
2149         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here)");
2150         return -1;
2151     }
2152     return 0;
2153 }
2154
2155
2156 int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message) {
2157     buildJsonMessage(message);
2158
2159     rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2160
2161     if (rmrMessageBuffer.sendMessage == nullptr) {
2162         rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2163         mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2164         return -1;
2165     }
2166
2167     if (rmrMessageBuffer.sendMessage->state != 0) {
2168         char meid[RMR_MAX_MEID]{};
2169         if (rmrMessageBuffer.sendMessage->state == RMR_ERR_RETRY) {
2170             usleep(5);
2171             rmrMessageBuffer.sendMessage->state = 0;
2172             mdclog_write(MDCLOG_INFO, "RETRY sending Message type %d to Xapp from %s",
2173                          rmrMessageBuffer.sendMessage->mtype,
2174                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2175             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2176             if (rmrMessageBuffer.sendMessage == nullptr) {
2177                 mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2178                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2179                 return -1;
2180             } else if (rmrMessageBuffer.sendMessage->state != 0) {
2181                 mdclog_write(MDCLOG_ERR,
2182                              "Message state %s while sending request %d to Xapp from %s after retry of 10 microseconds",
2183                              translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2184                              rmrMessageBuffer.sendMessage->mtype,
2185                              rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2186                 auto rc = rmrMessageBuffer.sendMessage->state;
2187                 return rc;
2188             }
2189         } else {
2190             mdclog_write(MDCLOG_ERR, "Message state %s while sending request %d to Xapp from %s",
2191                          translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2192                          rmrMessageBuffer.sendMessage->mtype,
2193                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2194             return rmrMessageBuffer.sendMessage->state;
2195         }
2196     }
2197     return 0;
2198 }
2199
2200 void buildJsonMessage(ReportingMessages_t &message) {
2201     if (jsonTrace) {
2202         message.outLen = sizeof(message.base64Data);
2203         base64::encode((const unsigned char *) message.message.asndata,
2204                        (const int) message.message.asnLength,
2205                        message.base64Data,
2206                        message.outLen);
2207         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2208             mdclog_write(MDCLOG_DEBUG, "asn data length = %d, base64 message length = %d ",
2209                          (int) message.message.asnLength,
2210                          (int) message.outLen);
2211         }
2212
2213         snprintf(message.buffer, sizeof(message.buffer),
2214                  "{\"header\": {\"ts\": \"%ld.%09ld\","
2215                  "\"ranName\": \"%s\","
2216                  "\"messageType\": %d,"
2217                  "\"direction\": \"%c\"},"
2218                  "\"base64Length\": %d,"
2219                  "\"asnBase64\": \"%s\"}",
2220                  message.message.time.tv_sec,
2221                  message.message.time.tv_nsec,
2222                  message.message.enodbName,
2223                  message.message.messageType,
2224                  message.message.direction,
2225                  (int) message.outLen,
2226                  message.base64Data);
2227         static src::logger_mt &lg = my_logger::get();
2228
2229         BOOST_LOG(lg) << message.buffer;
2230     }
2231 }
2232
2233
2234 /**
2235  * take RMR error code to string
2236  * @param state
2237  * @return
2238  */
2239 string translateRmrErrorMessages(int state) {
2240     string str = {};
2241     switch (state) {
2242         case RMR_OK:
2243             str = "RMR_OK - state is good";
2244             break;
2245         case RMR_ERR_BADARG:
2246             str = "RMR_ERR_BADARG - argument passd to function was unusable";
2247             break;
2248         case RMR_ERR_NOENDPT:
2249             str = "RMR_ERR_NOENDPT - send//call could not find an endpoint based on msg type";
2250             break;
2251         case RMR_ERR_EMPTY:
2252             str = "RMR_ERR_EMPTY - msg received had no payload; attempt to send an empty message";
2253             break;
2254         case RMR_ERR_NOHDR:
2255             str = "RMR_ERR_NOHDR - message didn't contain a valid header";
2256             break;
2257         case RMR_ERR_SENDFAILED:
2258             str = "RMR_ERR_SENDFAILED - send failed; errno has nano reason";
2259             break;
2260         case RMR_ERR_CALLFAILED:
2261             str = "RMR_ERR_CALLFAILED - unable to send call() message";
2262             break;
2263         case RMR_ERR_NOWHOPEN:
2264             str = "RMR_ERR_NOWHOPEN - no wormholes are open";
2265             break;
2266         case RMR_ERR_WHID:
2267             str = "RMR_ERR_WHID - wormhole id was invalid";
2268             break;
2269         case RMR_ERR_OVERFLOW:
2270             str = "RMR_ERR_OVERFLOW - operation would have busted through a buffer/field size";
2271             break;
2272         case RMR_ERR_RETRY:
2273             str = "RMR_ERR_RETRY - request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)";
2274             break;
2275         case RMR_ERR_RCVFAILED:
2276             str = "RMR_ERR_RCVFAILED - receive failed (hard error)";
2277             break;
2278         case RMR_ERR_TIMEOUT:
2279             str = "RMR_ERR_TIMEOUT - message processing call timed out";
2280             break;
2281         case RMR_ERR_UNSET:
2282             str = "RMR_ERR_UNSET - the message hasn't been populated with a transport buffer";
2283             break;
2284         case RMR_ERR_TRUNC:
2285             str = "RMR_ERR_TRUNC - received message likely truncated";
2286             break;
2287         case RMR_ERR_INITFAILED:
2288             str = "RMR_ERR_INITFAILED - initialisation of something (probably message) failed";
2289             break;
2290         case RMR_ERR_NOTSUPP:
2291             str = "RMR_ERR_NOTSUPP - the request is not supported, or RMr was not initialised for the request";
2292             break;
2293         default:
2294             char buf[128]{};
2295             snprintf(buf, sizeof buf, "UNDOCUMENTED RMR_ERR : %d", state);
2296             str = buf;
2297             break;
2298     }
2299     return str;
2300 }
2301
2302