version 4.0.6
[ric-plt/e2.git] / RIC-E2-TERMINATION / sctpThread.cpp
1 // Copyright 2019 AT&T Intellectual Property
2 // Copyright 2019 Nokia
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //      http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15
16 //  This source code is part of the near-RT RIC (RAN Intelligent Controller)
17 //  platform project (RICP).
18
19 // TODO: High-level file comment.
20
21
22
23 #include "sctpThread.h"
24 #include "BuildRunName.h"
25
26 using namespace std;
27 //using namespace std::placeholders;
28 using namespace boost::filesystem;
29
30 //#ifdef __cplusplus
31 //extern "C"
32 //{
33 //#endif
34
35 // need to expose without the include of gcov
36 extern "C" void __gcov_flush(void);
37
38 static void catch_function(int signal) {
39     __gcov_flush();
40     exit(signal);
41 }
42
43
44 BOOST_LOG_INLINE_GLOBAL_LOGGER_DEFAULT(my_logger, src::logger_mt)
45
46 boost::shared_ptr<sinks::synchronous_sink<sinks::text_file_backend>> boostLogger;
47 double cpuClock = 0.0;
48 bool jsonTrace = true;
49
50 void init_log() {
51     mdclog_attr_t *attr;
52     mdclog_attr_init(&attr);
53     mdclog_attr_set_ident(attr, "E2Terminator");
54     mdclog_init(attr);
55     mdclog_attr_destroy(attr);
56 }
57 auto start_time = std::chrono::high_resolution_clock::now();
58 typedef std::chrono::duration<double, std::ratio<1,1>> seconds_t;
59
60 double age() {
61     return seconds_t(std::chrono::high_resolution_clock::now() - start_time).count();
62 }
63
64 double approx_CPU_MHz(unsigned sleeptime) {
65     using namespace std::chrono_literals;
66     uint32_t aux = 0;
67     uint64_t cycles_start = rdtscp(aux);
68     double time_start = age();
69     std::this_thread::sleep_for(sleeptime * 1ms);
70     uint64_t elapsed_cycles = rdtscp(aux) - cycles_start;
71     double elapsed_time = age() - time_start;
72     return elapsed_cycles / elapsed_time;
73 }
74
75 //std::atomic<int64_t> rmrCounter{0};
76 std::atomic<int64_t> num_of_messages{0};
77 std::atomic<int64_t> num_of_XAPP_messages{0};
78 static long transactionCounter = 0;
79
80 int buildListeningPort(sctp_params_t &sctpParams) {
81     sctpParams.listenFD = socket (AF_INET6, SOCK_STREAM, IPPROTO_SCTP);
82     struct sockaddr_in6 servaddr {};
83     servaddr.sin6_family = AF_INET6;
84     servaddr.sin6_addr   = in6addr_any;
85     servaddr.sin6_port = htons(sctpParams.sctpPort);
86     if (bind(sctpParams.listenFD, (SA *)&servaddr, sizeof(servaddr)) < 0 ) {
87         mdclog_write(MDCLOG_ERR, "Error binding. %s\n", strerror(errno));
88         return -1;
89     }
90     if (setSocketNoBlocking(sctpParams.listenFD) == -1) {
91         //mdclog_write(MDCLOG_ERR, "Error binding. %s", strerror(errno));
92         return -1;
93     }
94     if (mdclog_level_get() >= MDCLOG_DEBUG) {
95         struct sockaddr_in6 cliaddr {};
96         socklen_t len = sizeof(cliaddr);
97         getsockname(sctpParams.listenFD, (SA *)&cliaddr, &len);
98         char buff[1024] {};
99         inet_ntop(AF_INET6, &cliaddr.sin6_addr, buff, sizeof(buff));
100         mdclog_write(MDCLOG_DEBUG, "My address: %s, port %d\n", buff, htons(cliaddr.sin6_port));
101     }
102
103     if (listen(sctpParams.listenFD, SOMAXCONN) < 0) {
104         mdclog_write(MDCLOG_ERR, "Error listening. %s\n", strerror(errno));
105         return -1;
106     }
107     struct epoll_event event {};
108     event.events = EPOLLIN | EPOLLET;
109     event.data.fd = sctpParams.listenFD;
110
111     // add listening port to epoll
112     if (epoll_ctl(sctpParams.epoll_fd, EPOLL_CTL_ADD, sctpParams.listenFD, &event)) {
113         printf("Failed to add descriptor to epoll\n");
114         mdclog_write(MDCLOG_ERR, "Failed to add descriptor to epoll. %s\n", strerror(errno));
115         return -1;
116     }
117
118     return 0;
119 }
120
121 int buildConfiguration(sctp_params_t &sctpParams) {
122     path p = (sctpParams.configFilePath + "/" + sctpParams.configFileName).c_str();
123     if (exists(p)) {
124         const int size = 2048;
125         auto fileSize = file_size(p);
126         if (fileSize > size) {
127             mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size);
128             return -1;
129         }
130     } else {
131         mdclog_write(MDCLOG_ERR, "Configuration File %s not exists", p.string().c_str());
132         return -1;
133     }
134
135     ReadConfigFile conf;
136     if (conf.openConfigFile(p.string()) == -1) {
137         mdclog_write(MDCLOG_ERR, "Filed to open config file %s, %s",
138                      p.string().c_str(), strerror(errno));
139         return -1;
140     }
141     int rmrPort = conf.getIntValue("nano");
142     if (rmrPort == -1) {
143         mdclog_write(MDCLOG_ERR, "illigal RMR port ");
144         return -1;
145     }
146     sctpParams.rmrPort = (uint16_t)rmrPort;
147     snprintf(sctpParams.rmrAddress, sizeof(sctpParams.rmrAddress), "%d", (int) (sctpParams.rmrPort));
148
149     auto tmpStr = conf.getStringValue("loglevel");
150     if (tmpStr.length() == 0) {
151         mdclog_write(MDCLOG_ERR, "illigal loglevel. Set loglevel to MDCLOG_INFO");
152         tmpStr = "info";
153     }
154     transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
155
156     if ((tmpStr.compare("debug")) == 0) {
157         sctpParams.logLevel = MDCLOG_DEBUG;
158     } else if ((tmpStr.compare("info")) == 0) {
159         sctpParams.logLevel = MDCLOG_INFO;
160     } else if ((tmpStr.compare("warning")) == 0) {
161         sctpParams.logLevel = MDCLOG_WARN;
162     } else if ((tmpStr.compare("error")) == 0) {
163         sctpParams.logLevel = MDCLOG_ERR;
164     } else {
165         mdclog_write(MDCLOG_ERR, "illigal loglevel = %s. Set loglevel to MDCLOG_INFO", tmpStr.c_str());
166         sctpParams.logLevel = MDCLOG_INFO;
167     }
168     mdclog_level_set(sctpParams.logLevel);
169
170     tmpStr = conf.getStringValue("volume");
171     if (tmpStr.length() == 0) {
172         mdclog_write(MDCLOG_ERR, "illigal volume.");
173         return -1;
174     }
175
176     char tmpLogFilespec[VOLUME_URL_SIZE];
177     tmpLogFilespec[0] = 0;
178     sctpParams.volume[0] = 0;
179     snprintf(sctpParams.volume, VOLUME_URL_SIZE, "%s", tmpStr.c_str());
180     // copy the name to temp file as well
181     snprintf(tmpLogFilespec, VOLUME_URL_SIZE, "%s", tmpStr.c_str());
182
183
184     // define the file name in the tmp directory under the volume
185     strcat(tmpLogFilespec,"/tmp/E2Term_%Y-%m-%d_%H-%M-%S.%N.tmpStr");
186
187     sctpParams.myIP = conf.getStringValue("local-ip");
188     if (sctpParams.myIP.length() == 0) {
189         mdclog_write(MDCLOG_ERR, "illigal local-ip.");
190         return -1;
191     }
192
193     int sctpPort = conf.getIntValue("sctp-port");
194     if (sctpPort == -1) {
195         mdclog_write(MDCLOG_ERR, "illigal SCTP port ");
196         return -1;
197     }
198     sctpParams.sctpPort = (uint16_t)sctpPort;
199
200     sctpParams.fqdn = conf.getStringValue("external-fqdn");
201     if (sctpParams.fqdn.length() == 0) {
202         mdclog_write(MDCLOG_ERR, "illigal external-fqdn");
203         return -1;
204     }
205
206     std::string pod = conf.getStringValue("pod_name");
207     if (pod.length() == 0) {
208         mdclog_write(MDCLOG_ERR, "illigal pod_name in config file");
209         return -1;
210     }
211     auto *podName = getenv(pod.c_str());
212     if (podName == nullptr) {
213         mdclog_write(MDCLOG_ERR, "illigal pod_name or environment varible not exists : %s", pod.c_str());
214         return -1;
215
216     } else {
217         sctpParams.podName.assign(podName);
218         if (sctpParams.podName.length() == 0) {
219             mdclog_write(MDCLOG_ERR, "illigal pod_name");
220             return -1;
221         }
222     }
223
224     tmpStr = conf.getStringValue("trace");
225     transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
226     if ((tmpStr.compare("start")) == 0) {
227         mdclog_write(MDCLOG_INFO, "Trace set to: start");
228         sctpParams.trace = true;
229     } else if ((tmpStr.compare("stop")) == 0) {
230         mdclog_write(MDCLOG_INFO, "Trace set to: stop");
231         sctpParams.trace = false;
232     }
233     jsonTrace = sctpParams.trace;
234
235     sctpParams.ka_message_length = snprintf(sctpParams.ka_message, KA_MESSAGE_SIZE, "{\"address\": \"%s:%d\","
236                                                                                     "\"fqdn\": \"%s\","
237                                                                                     "\"pod_name\": \"%s\"}",
238                                             (const char *)sctpParams.myIP.c_str(),
239                                             sctpParams.rmrPort,
240                                             sctpParams.fqdn.c_str(),
241                                             sctpParams.podName.c_str());
242
243     if (mdclog_level_get() >= MDCLOG_INFO) {
244         mdclog_mdc_add("RMR Port", to_string(sctpParams.rmrPort).c_str());
245         mdclog_mdc_add("LogLevel", to_string(sctpParams.logLevel).c_str());
246         mdclog_mdc_add("volume", sctpParams.volume);
247         mdclog_mdc_add("tmpLogFilespec", tmpLogFilespec);
248         mdclog_mdc_add("my ip", sctpParams.myIP.c_str());
249         mdclog_mdc_add("pod name", sctpParams.podName.c_str());
250
251         mdclog_write(MDCLOG_INFO, "running parameters for instance : %s", sctpParams.ka_message);
252     }
253     mdclog_mdc_clean();
254
255     // Files written to the current working directory
256     boostLogger = logging::add_file_log(
257             keywords::file_name = tmpLogFilespec, // to temp directory
258             keywords::rotation_size = 10 * 1024 * 1024,
259             keywords::time_based_rotation = sinks::file::rotation_at_time_interval(posix_time::hours(1)),
260             keywords::format = "%Message%"
261             //keywords::format = "[%TimeStamp%]: %Message%" // use each tmpStr with time stamp
262     );
263
264     // Setup a destination folder for collecting rotated (closed) files --since the same volumn can use rename()
265     boostLogger->locked_backend()->set_file_collector(sinks::file::make_collector(
266             keywords::target = sctpParams.volume
267     ));
268
269     // Upon restart, scan the directory for files matching the file_name pattern
270     boostLogger->locked_backend()->scan_for_files();
271
272     // Enable auto-flushing after each tmpStr record written
273     if (mdclog_level_get() >= MDCLOG_DEBUG) {
274         boostLogger->locked_backend()->auto_flush(true);
275     }
276
277     return 0;
278 }
279
280
281
282 int main(const int argc, char **argv) {
283     sctp_params_t sctpParams;
284
285     {
286         std::random_device device{};
287         std::mt19937 generator(device());
288         std::uniform_int_distribution<long> distribution(1, (long) 1e12);
289         transactionCounter = distribution(generator);
290     }
291
292 //    uint64_t st = 0;
293 //    uint32_t aux1 = 0;
294 //   st = rdtscp(aux1);
295
296     unsigned num_cpus = std::thread::hardware_concurrency();
297     init_log();
298     mdclog_level_set(MDCLOG_INFO);
299
300     if (std::signal(SIGINT, catch_function) == SIG_ERR) {
301         mdclog_write(MDCLOG_ERR, "Error initializing SIGINT");
302         exit(1);
303     }
304     if (std::signal(SIGABRT, catch_function)== SIG_ERR) {
305         mdclog_write(MDCLOG_ERR, "Error initializing SIGABRT");
306         exit(1);
307     }
308     if (std::signal(SIGTERM, catch_function)== SIG_ERR) {
309         mdclog_write(MDCLOG_ERR, "Error initializing SIGTERM");
310         exit(1);
311     }
312
313     cpuClock = approx_CPU_MHz(100);
314
315     mdclog_write(MDCLOG_DEBUG, "CPU speed %11.11f", cpuClock);
316
317     auto result = parse(argc, argv, sctpParams);
318
319     if (buildConfiguration(sctpParams) != 0) {
320         exit(-1);
321     }
322
323     // start epoll
324     sctpParams.epoll_fd = epoll_create1(0);
325     if (sctpParams.epoll_fd == -1) {
326         mdclog_write(MDCLOG_ERR, "failed to open epoll descriptor");
327         exit(-1);
328     }
329
330     getRmrContext(sctpParams);
331     if (sctpParams.rmrCtx == nullptr) {
332         close(sctpParams.epoll_fd);
333         exit(-1);
334     }
335
336     if (buildInotify(sctpParams) == -1) {
337         close(sctpParams.rmrListenFd);
338         rmr_close(sctpParams.rmrCtx);
339         close(sctpParams.epoll_fd);
340         exit(-1);
341     }
342
343     if (buildListeningPort(sctpParams) != 0) {
344         close(sctpParams.rmrListenFd);
345         rmr_close(sctpParams.rmrCtx);
346         close(sctpParams.epoll_fd);
347         exit(-1);
348     }
349
350     sctpParams.sctpMap = new mapWrapper();
351
352     std::vector<std::thread> threads(num_cpus);
353 //    std::vector<std::thread> threads;
354
355     num_cpus = 1;
356     for (unsigned int i = 0; i < num_cpus; i++) {
357         threads[i] = std::thread(listener, &sctpParams);
358
359         cpu_set_t cpuset;
360         CPU_ZERO(&cpuset);
361         CPU_SET(i, &cpuset);
362         int rc = pthread_setaffinity_np(threads[i].native_handle(), sizeof(cpu_set_t), &cpuset);
363         if (rc != 0) {
364             mdclog_write(MDCLOG_ERR, "Error calling pthread_setaffinity_np: %d", rc);
365         }
366     }
367
368     auto statFlag = false;
369     auto statThread = std::thread(statColectorThread, (void *)&statFlag);
370
371     //loop over term_init until first message from xApp
372     handleTermInit(sctpParams);
373
374     for (auto &t : threads) {
375         t.join();
376     }
377
378     statFlag = true;
379     statThread.join();
380
381     return 0;
382 }
383
384 void handleTermInit(sctp_params_t &sctpParams) {
385     sendTermInit(sctpParams);
386     //send to e2 manager init of e2 term
387     //E2_TERM_INIT
388
389     int count = 0;
390     while (true) {
391         auto xappMessages = num_of_XAPP_messages.load(std::memory_order_acquire);
392         if (xappMessages > 0) {
393             if (mdclog_level_get() >=  MDCLOG_INFO) {
394                 mdclog_write(MDCLOG_INFO, "Got a message from some appliction, stop sending E2_TERM_INIT");
395             }
396             return;
397         }
398         usleep(100000);
399         count++;
400         if (count % 1000 == 0) {
401             mdclog_write(MDCLOG_ERR, "GOT No messages from any xApp");
402             sendTermInit(sctpParams);
403         }
404     }
405 }
406
407 void sendTermInit(sctp_params_t &sctpParams) {
408     rmr_mbuf_t *msg = rmr_alloc_msg(sctpParams.rmrCtx, sctpParams.ka_message_length);
409     auto count = 0;
410     while (true) {
411         msg->mtype = E2_TERM_INIT;
412         msg->state = 0;
413         rmr_bytes2payload(msg, (unsigned char *)sctpParams.ka_message, sctpParams.ka_message_length);
414         static unsigned char tx[32];
415         auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
416         rmr_bytes2xact(msg, tx, txLen);
417         msg = rmr_send_msg(sctpParams.rmrCtx, msg);
418         if (msg == nullptr) {
419             msg = rmr_alloc_msg(sctpParams.rmrCtx, sctpParams.myIP.length());
420         } else if (msg->state == 0) {
421             rmr_free_msg(msg);
422             if (mdclog_level_get() >=  MDCLOG_INFO) {
423                 mdclog_write(MDCLOG_INFO, "E2_TERM_INIT succsesfuly sent ");
424             }
425             return;
426         } else {
427             if (count % 100 == 0) {
428                 mdclog_write(MDCLOG_ERR, "Error sending E2_TERM_INIT cause : %s ", translateRmrErrorMessages(msg->state).c_str());
429             }
430             sleep(1);
431         }
432         count++;
433     }
434 }
435
436 /**
437  *
438  * @param argc
439  * @param argv
440  * @param sctpParams
441  * @return
442  */
443 cxxopts::ParseResult parse(int argc, char *argv[], sctp_params_t &sctpParams) {
444     cxxopts::Options options(argv[0], "e2 term help");
445     options.positional_help("[optional args]").show_positional_help();
446     options.allow_unrecognised_options().add_options()
447             ("p,path", "config file path", cxxopts::value<std::string>(sctpParams.configFilePath)->default_value("config"))
448             ("f,file", "config file name", cxxopts::value<std::string>(sctpParams.configFileName)->default_value("config.conf"))
449             ("h,help", "Print help");
450
451     auto result = options.parse(argc, argv);
452
453     if (result.count("help")) {
454         std::cout << options.help({""}) << std::endl;
455         exit(0);
456     }
457     return result;
458 }
459
460 /**
461  *
462  * @param sctpParams
463  * @return -1 failed 0 success
464  */
465 int buildInotify(sctp_params_t &sctpParams) {
466     sctpParams.inotifyFD = inotify_init1(IN_NONBLOCK);
467     if (sctpParams.inotifyFD == -1) {
468         mdclog_write(MDCLOG_ERR, "Failed to init inotify (inotify_init1) %s", strerror(errno));
469         close(sctpParams.rmrListenFd);
470         rmr_close(sctpParams.rmrCtx);
471         close(sctpParams.epoll_fd);
472         return -1;
473     }
474
475     sctpParams.inotifyWD = inotify_add_watch(sctpParams.inotifyFD,
476                                              (const char *)sctpParams.configFilePath.c_str(),
477                                              (unsigned)IN_OPEN | (unsigned)IN_CLOSE_WRITE | (unsigned)IN_CLOSE_NOWRITE); //IN_CLOSE = (IN_CLOSE_WRITE | IN_CLOSE_NOWRITE)
478     if (sctpParams.inotifyWD == -1) {
479         mdclog_write(MDCLOG_ERR, "Failed to add directory : %s to  inotify (inotify_add_watch) %s",
480                      sctpParams.configFilePath.c_str(),
481                      strerror(errno));
482         close(sctpParams.inotifyFD);
483         return -1;
484     }
485
486     struct epoll_event event{};
487     event.events = (EPOLLIN);
488     event.data.fd = sctpParams.inotifyFD;
489     // add listening RMR FD to epoll
490     if (epoll_ctl(sctpParams.epoll_fd, EPOLL_CTL_ADD, sctpParams.inotifyFD, &event)) {
491         mdclog_write(MDCLOG_ERR, "Failed to add inotify FD to epoll");
492         close(sctpParams.inotifyFD);
493         return -1;
494     }
495     return 0;
496 }
497
498 /**
499  *
500  * @param args
501  * @return
502  */
503 void listener(sctp_params_t *params) {
504     int num_of_SCTP_messages = 0;
505     auto totalTime = 0.0;
506     mdclog_mdc_clean();
507     mdclog_level_set(params->logLevel);
508
509     std::thread::id this_id = std::this_thread::get_id();
510     //save cout
511     streambuf *oldCout = cout.rdbuf();
512     ostringstream memCout;
513     // create new cout
514     cout.rdbuf(memCout.rdbuf());
515     cout << this_id;
516     //return to the normal cout
517     cout.rdbuf(oldCout);
518
519     char tid[32];
520     memcpy(tid, memCout.str().c_str(), memCout.str().length() < 32 ? memCout.str().length() : 31);
521     tid[memCout.str().length()] = 0;
522     mdclog_mdc_add("thread id", tid);
523
524     if (mdclog_level_get() >= MDCLOG_DEBUG) {
525         mdclog_write(MDCLOG_DEBUG, "started thread number %s", tid);
526     }
527
528
529     RmrMessagesBuffer_t rmrMessageBuffer{};
530     //create and init RMR
531     rmrMessageBuffer.rmrCtx = params->rmrCtx;
532
533     auto *events = (struct epoll_event *) calloc(MAXEVENTS, sizeof(struct epoll_event));
534     struct timespec end{0, 0};
535     struct timespec start{0, 0};
536
537     rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
538     rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
539
540     memcpy(rmrMessageBuffer.ka_message, params->ka_message, params->ka_message_length);
541     rmrMessageBuffer.ka_message_len = params->ka_message_length;
542     rmrMessageBuffer.ka_message[rmrMessageBuffer.ka_message_len] = 0;
543
544     if (mdclog_level_get() >= MDCLOG_DEBUG) {
545         mdclog_write(MDCLOG_DEBUG, "keep alive message is : %s", rmrMessageBuffer.ka_message);
546     }
547
548     ReportingMessages_t message {};
549
550 //    for (int i = 0; i < MAX_RMR_BUFF_ARRY; i++) {
551 //        rmrMessageBuffer.rcvBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
552 //        rmrMessageBuffer.sendBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
553 //    }
554
555     message.statCollector = StatCollector::GetInstance();
556
557     while (true) {
558         if (mdclog_level_get() >= MDCLOG_DEBUG) {
559             mdclog_write(MDCLOG_DEBUG, "Start EPOLL Wait");
560         }
561         auto numOfEvents = epoll_wait(params->epoll_fd, events, MAXEVENTS, -1);
562         if (numOfEvents < 0 && errno == EINTR) {
563             if (mdclog_level_get() >= MDCLOG_DEBUG) {
564                 mdclog_write(MDCLOG_DEBUG, "got EINTR : %s", strerror(errno));
565             }
566             continue;
567         }
568         if (numOfEvents < 0) {
569             mdclog_write(MDCLOG_ERR, "Epoll wait failed, errno = %s", strerror(errno));
570             return;
571         }
572         for (auto i = 0; i < numOfEvents; i++) {
573             if (mdclog_level_get() >= MDCLOG_DEBUG) {
574                 mdclog_write(MDCLOG_DEBUG, "handling epoll event %d out of %d", i + 1, numOfEvents);
575             }
576             clock_gettime(CLOCK_MONOTONIC, &message.message.time);
577             start.tv_sec = message.message.time.tv_sec;
578             start.tv_nsec = message.message.time.tv_nsec;
579
580
581             if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP)) {
582                 handlepoll_error(events[i], message, rmrMessageBuffer, params);
583             } else if (events[i].events & EPOLLOUT) {
584                 handleEinprogressMessages(events[i], message, rmrMessageBuffer, params);
585             } else if (params->listenFD == events[i].data.fd) {
586                 if (mdclog_level_get() >= MDCLOG_INFO) {
587                     mdclog_write(MDCLOG_INFO, "New connection request from sctp network\n");
588                 }
589                 // new connection is requested from RAN  start build connection
590                 while (true) {
591                     struct sockaddr in_addr {};
592                     socklen_t in_len;
593                     char hostBuff[NI_MAXHOST];
594                     char portBuff[NI_MAXSERV];
595
596                     in_len = sizeof(in_addr);
597                     auto *peerInfo = (ConnectedCU_t *)calloc(1, sizeof(ConnectedCU_t));
598                     peerInfo->sctpParams = params;
599                     peerInfo->fileDescriptor = accept(params->listenFD, &in_addr, &in_len);
600                     if (peerInfo->fileDescriptor == -1) {
601                         if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
602                             /* We have processed all incoming connections. */
603                             break;
604                         } else {
605                             mdclog_write(MDCLOG_ERR, "Accept error, errno = %s", strerror(errno));
606                             break;
607                         }
608                     }
609                     if (setSocketNoBlocking(peerInfo->fileDescriptor) == -1) {
610                         mdclog_write(MDCLOG_ERR, "setSocketNoBlocking failed to set new connection %s on port %s\n", hostBuff, portBuff);
611                         close(peerInfo->fileDescriptor);
612                         break;
613                     }
614                     auto  ans = getnameinfo(&in_addr, in_len,
615                                             peerInfo->hostName, NI_MAXHOST,
616                                             peerInfo->portNumber, NI_MAXSERV, (unsigned )((unsigned int)NI_NUMERICHOST | (unsigned int)NI_NUMERICSERV));
617                     if (ans < 0) {
618                         mdclog_write(MDCLOG_ERR, "Failed to get info on connection request. %s\n", strerror(errno));
619                         close(peerInfo->fileDescriptor);
620                         break;
621                     }
622                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
623                         mdclog_write(MDCLOG_DEBUG, "Accepted connection on descriptor %d (host=%s, port=%s)\n", peerInfo->fileDescriptor, peerInfo->hostName, peerInfo->portNumber);
624                     }
625                     peerInfo->isConnected = false;
626                     peerInfo->gotSetup = false;
627                     if (addToEpoll(params->epoll_fd,
628                                    peerInfo,
629                                    (EPOLLIN | EPOLLET),
630                                    params->sctpMap, nullptr,
631                                    0) != 0) {
632                         break;
633                     }
634                     break;
635                 }
636             } else if (params->rmrListenFd == events[i].data.fd) {
637                 // got message from XAPP
638                 num_of_XAPP_messages.fetch_add(1, std::memory_order_release);
639                 num_of_messages.fetch_add(1, std::memory_order_release);
640                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
641                     mdclog_write(MDCLOG_DEBUG, "new message from RMR");
642                 }
643                 if (receiveXappMessages(params->sctpMap,
644                                         rmrMessageBuffer,
645                                         message.message.time) != 0) {
646                     mdclog_write(MDCLOG_ERR, "Error handling Xapp message");
647                 }
648             } else if (params->inotifyFD == events[i].data.fd) {
649                 mdclog_write(MDCLOG_INFO, "Got event from inotify (configuration update)");
650                 handleConfigChange(params);
651             } else {
652                 /* We RMR_ERR_RETRY have data on the fd waiting to be read. Read and display it.
653                  * We must read whatever data is available completely, as we are running
654                  *  in edge-triggered mode and won't get a notification again for the same data. */
655                 num_of_messages.fetch_add(1, std::memory_order_release);
656                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
657                     mdclog_write(MDCLOG_DEBUG, "new message from SCTP, epoll flags are : %0x", events[i].events);
658                 }
659                 receiveDataFromSctp(&events[i],
660                                     params->sctpMap,
661                                     num_of_SCTP_messages,
662                                     rmrMessageBuffer,
663                                     message.message.time);
664             }
665
666             clock_gettime(CLOCK_MONOTONIC, &end);
667             if (mdclog_level_get() >= MDCLOG_INFO) {
668                 totalTime += ((end.tv_sec + 1.0e-9 * end.tv_nsec) -
669                               ((double) start.tv_sec + 1.0e-9 * start.tv_nsec));
670             }
671             if (mdclog_level_get() >= MDCLOG_DEBUG) {
672                 mdclog_write(MDCLOG_DEBUG, "message handling is %ld seconds %ld nanoseconds",
673                              end.tv_sec - start.tv_sec,
674                              end.tv_nsec - start.tv_nsec);
675             }
676         }
677     }
678 }
679
680 /**
681  *
682  * @param sctpParams
683  */
684 void handleConfigChange(sctp_params_t *sctpParams) {
685     char buf[4096] __attribute__ ((aligned(__alignof__(struct inotify_event))));
686     const struct inotify_event *event;
687     char *ptr;
688
689     path p = (sctpParams->configFilePath + "/" + sctpParams->configFileName).c_str();
690     auto endlessLoop = true;
691     while (endlessLoop) {
692         auto len = read(sctpParams->inotifyFD, buf, sizeof buf);
693         if (len == -1) {
694             if (errno != EAGAIN) {
695                 mdclog_write(MDCLOG_ERR, "read %s ", strerror(errno));
696                 endlessLoop = false;
697                 continue;
698             }
699             else {
700                 endlessLoop = false;
701                 continue;
702             }
703         }
704
705         for (ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) {
706             event = (const struct inotify_event *)ptr;
707             if (event->mask & (uint32_t)IN_ISDIR) {
708                 continue;
709             }
710
711             // the directory name
712             if (sctpParams->inotifyWD == event->wd) {
713                 // not the directory
714             }
715             if (event->len) {
716                 if (!(sctpParams->configFileName.compare(event->name))) {
717                     continue;
718                 }
719             }
720             // only the file we want
721             if (event->mask & (uint32_t)IN_CLOSE_WRITE) {
722                 if (exists(p)) {
723                     const int size = 2048;
724                     auto fileSize = file_size(p);
725                     if (fileSize > size) {
726                         mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size);
727                         return;
728                     }
729                 } else {
730                     mdclog_write(MDCLOG_ERR, "Configuration File %s not exists", p.string().c_str());
731                     return;
732                 }
733
734                 ReadConfigFile conf;
735                 if (conf.openConfigFile(p.string()) == -1) {
736                     mdclog_write(MDCLOG_ERR, "Filed to open config file %s, %s",
737                                  p.string().c_str(), strerror(errno));
738                     return;
739                 }
740
741                 auto tmpStr = conf.getStringValue("loglevel");
742                 if (tmpStr.length() == 0) {
743                     mdclog_write(MDCLOG_ERR, "illigal loglevel. Set loglevel to MDCLOG_INFO");
744                     tmpStr = "info";
745                 }
746                 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
747
748                 if ((tmpStr.compare("debug")) == 0) {
749                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_DEBUG");
750                     sctpParams->logLevel = MDCLOG_DEBUG;
751                 } else if ((tmpStr.compare("info")) == 0) {
752                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_INFO");
753                     sctpParams->logLevel = MDCLOG_INFO;
754                 } else if ((tmpStr.compare("warning")) == 0) {
755                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_WARN");
756                     sctpParams->logLevel = MDCLOG_WARN;
757                 } else if ((tmpStr.compare("error")) == 0) {
758                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_ERR");
759                     sctpParams->logLevel = MDCLOG_ERR;
760                 } else {
761                     mdclog_write(MDCLOG_ERR, "illigal loglevel = %s. Set loglevel to MDCLOG_INFO", tmpStr.c_str());
762                     sctpParams->logLevel = MDCLOG_INFO;
763                 }
764                 mdclog_level_set(sctpParams->logLevel);
765
766
767                 tmpStr = conf.getStringValue("trace");
768                 if (tmpStr.length() == 0) {
769                     mdclog_write(MDCLOG_ERR, "illigal trace. Set trace to stop");
770                     tmpStr = "stop";
771                 }
772
773                 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
774                 if ((tmpStr.compare("start")) == 0) {
775                     mdclog_write(MDCLOG_INFO, "Trace set to: start");
776                     sctpParams->trace = true;
777                 } else if ((tmpStr.compare("stop")) == 0) {
778                     mdclog_write(MDCLOG_INFO, "Trace set to: stop");
779                     sctpParams->trace = false;
780                 } else {
781                     mdclog_write(MDCLOG_ERR, "Trace was set to wrong value %s, set to stop", tmpStr.c_str());
782                     sctpParams->trace = false;
783                 }
784                 jsonTrace = sctpParams->trace;
785                 endlessLoop = false;
786             }
787         }
788     }
789 }
790
791 /**
792  *
793  * @param event
794  * @param message
795  * @param rmrMessageBuffer
796  * @param params
797  */
798 void handleEinprogressMessages(struct epoll_event &event,
799                                ReportingMessages_t &message,
800                                RmrMessagesBuffer_t &rmrMessageBuffer,
801                                sctp_params_t *params) {
802     auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
803     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
804
805     mdclog_write(MDCLOG_INFO, "file descriptor %d got EPOLLOUT", peerInfo->fileDescriptor);
806     auto retVal = 0;
807     socklen_t retValLen = 0;
808     auto rc = getsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_ERROR, &retVal, &retValLen);
809     if (rc != 0 || retVal != 0) {
810         if (rc != 0) {
811             rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
812                                                          "%s|Failed SCTP Connection, after EINPROGRESS the getsockopt%s",
813                                                          peerInfo->enodbName, strerror(errno));
814         } else if (retVal != 0) {
815             rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
816                                                          "%s|Failed SCTP Connection after EINPROGRESS, SO_ERROR",
817                                                          peerInfo->enodbName);
818         }
819
820         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
821         message.message.asnLength = rmrMessageBuffer.sendMessage->len;
822         mdclog_write(MDCLOG_ERR, "%s", rmrMessageBuffer.sendMessage->payload);
823         message.message.direction = 'N';
824         if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
825             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
826         }
827         memset(peerInfo->asnData, 0, peerInfo->asnLength);
828         peerInfo->asnLength = 0;
829         peerInfo->mtype = 0;
830         return;
831     }
832
833     peerInfo->isConnected = true;
834
835     if (modifyToEpoll(params->epoll_fd, peerInfo, (EPOLLIN | EPOLLET), params->sctpMap, peerInfo->enodbName,
836                       peerInfo->mtype) != 0) {
837         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_MOD");
838         return;
839     }
840
841     message.message.asndata = (unsigned char *)peerInfo->asnData;
842     message.message.asnLength = peerInfo->asnLength;
843     message.message.messageType = peerInfo->mtype;
844     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
845     num_of_messages.fetch_add(1, std::memory_order_release);
846     if (mdclog_level_get() >= MDCLOG_DEBUG) {
847         mdclog_write(MDCLOG_DEBUG, "send the delayed SETUP/ENDC SETUP to sctp for %s",
848                      message.message.enodbName);
849     }
850     if (sendSctpMsg(peerInfo, message, params->sctpMap) != 0) {
851         if (mdclog_level_get() >= MDCLOG_DEBUG) {
852             mdclog_write(MDCLOG_DEBUG, "Error write to SCTP  %s %d", __func__, __LINE__);
853         }
854         return;
855     }
856
857     memset(peerInfo->asnData, 0, peerInfo->asnLength);
858     peerInfo->asnLength = 0;
859     peerInfo->mtype = 0;
860 }
861
862
863 void handlepoll_error(struct epoll_event &event,
864                       ReportingMessages_t &message,
865                       RmrMessagesBuffer_t &rmrMessageBuffer,
866                       sctp_params_t *params) {
867     if (event.data.fd != params->rmrListenFd) {
868         auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
869         mdclog_write(MDCLOG_ERR, "epoll error, events %0x on fd %d, RAN NAME : %s",
870                      event.events, peerInfo->fileDescriptor, peerInfo->enodbName);
871
872         rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
873                                                      "%s|Failed SCTP Connection",
874                                                      peerInfo->enodbName);
875         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
876         message.message.asnLength = rmrMessageBuffer.sendMessage->len;
877
878         memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
879         message.message.direction = 'N';
880         if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
881             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
882         }
883
884         close(peerInfo->fileDescriptor);
885         cleanHashEntry((ConnectedCU_t *) event.data.ptr, params->sctpMap);
886     } else {
887         mdclog_write(MDCLOG_ERR, "epoll error, events %0x on RMR FD", event.events);
888     }
889 }
890 /**
891  *
892  * @param socket
893  * @return
894  */
895 int setSocketNoBlocking(int socket) {
896     auto flags = fcntl(socket, F_GETFL, 0);
897
898     if (flags == -1) {
899         mdclog_mdc_add("func", "fcntl");
900         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
901         mdclog_mdc_clean();
902         return -1;
903     }
904
905     flags = (unsigned) flags | (unsigned) O_NONBLOCK;
906     if (fcntl(socket, F_SETFL, flags) == -1) {
907         mdclog_mdc_add("func", "fcntl");
908         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
909         mdclog_mdc_clean();
910         return -1;
911     }
912
913     return 0;
914 }
915
916 /**
917  *
918  * @param val
919  * @param m
920  */
921 void cleanHashEntry(ConnectedCU_t *val, Sctp_Map_t *m) {
922     char *dummy;
923     auto port = (uint16_t) strtol(val->portNumber, &dummy, 10);
924     char searchBuff[2048]{};
925
926     snprintf(searchBuff, sizeof searchBuff, "host:%s:%d", val->hostName, port);
927     m->erase(searchBuff);
928
929     m->erase(val->enodbName);
930     free(val);
931 }
932
933 /**
934  *
935  * @param fd file discriptor
936  * @param data the asn data to send
937  * @param len  length of the data
938  * @param enodbName the enodbName as in the map for printing purpose
939  * @param m map host information
940  * @param mtype message number
941  * @return 0 success, anegative number on fail
942  */
943 int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_t *m) {
944     auto loglevel = mdclog_level_get();
945     int fd = peerInfo->fileDescriptor;
946     if (loglevel >= MDCLOG_DEBUG) {
947         mdclog_write(MDCLOG_DEBUG, "Send SCTP message for CU %s, %s",
948                      message.message.enodbName, __FUNCTION__);
949     }
950
951     while (true) {
952         if (send(fd,message.message.asndata, message.message.asnLength,MSG_NOSIGNAL) < 0) {
953             if (errno == EINTR) {
954                 continue;
955             }
956             mdclog_write(MDCLOG_ERR, "error writing to CU a message, %s ", strerror(errno));
957             if (!peerInfo->isConnected) {
958                 mdclog_write(MDCLOG_ERR, "connection to CU %s is still in progress.", message.message.enodbName);
959                 return -1;
960             }
961             cleanHashEntry(peerInfo, m);
962             close(fd);
963             char key[MAX_ENODB_NAME_SIZE * 2];
964             snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName,
965                      message.message.messageType);
966             if (loglevel >= MDCLOG_DEBUG) {
967                 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
968             }
969             auto tmp = m->find(key);
970             if (tmp) {
971                 free(tmp);
972             }
973             m->erase(key);
974             return -1;
975         }
976         message.statCollector->incSentMessage(string(message.message.enodbName));
977         message.message.direction = 'D';
978         // send report.buffer of size
979         buildJsonMessage(message);
980
981         if (loglevel >= MDCLOG_DEBUG) {
982             mdclog_write(MDCLOG_DEBUG,
983                          "SCTP message for CU %s sent from %s",
984                          message.message.enodbName,
985                          __FUNCTION__);
986         }
987         return 0;
988     }
989 }
990
991 /**
992  *
993  * @param message
994  * @param rmrMessageBuffer
995  */
996 void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) {
997     rmr_get_meid(rmrMessageBuffer.rcvMessage, (unsigned char *) (message.message.enodbName));
998
999     message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
1000     message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
1001
1002     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1003         mdclog_write(MDCLOG_DEBUG, "Message from Xapp RAN name = %s message length = %ld",
1004                      message.message.enodbName, (unsigned long) message.message.asnLength);
1005     }
1006 }
1007
1008
1009
1010 /**
1011  *
1012  * @param events
1013  * @param sctpMap
1014  * @param numOfMessages
1015  * @param rmrMessageBuffer
1016  * @param ts
1017  * @return
1018  */
1019 int receiveDataFromSctp(struct epoll_event *events,
1020                         Sctp_Map_t *sctpMap,
1021                         int &numOfMessages,
1022                         RmrMessagesBuffer_t &rmrMessageBuffer,
1023                         struct timespec &ts) {
1024     /* We have data on the fd waiting to be read. Read and display it.
1025  * We must read whatever data is available completely, as we are running
1026  *  in edge-triggered mode and won't get a notification again for the same data. */
1027     ReportingMessages_t message {};
1028     auto done = 0;
1029     auto loglevel = mdclog_level_get();
1030
1031     // get the identity of the interface
1032     message.peerInfo = (ConnectedCU_t *)events->data.ptr;
1033
1034     message.statCollector = StatCollector::GetInstance();
1035     struct timespec start{0, 0};
1036     struct timespec decodestart{0, 0};
1037     struct timespec end{0, 0};
1038
1039     E2AP_PDU_t *pdu = nullptr;
1040
1041
1042     while (true) {
1043         if (loglevel >= MDCLOG_DEBUG) {
1044             mdclog_write(MDCLOG_DEBUG, "Start Read from SCTP %d fd", message.peerInfo->fileDescriptor);
1045             clock_gettime(CLOCK_MONOTONIC, &start);
1046         }
1047         // read the buffer directly to rmr payload
1048         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1049         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1050                 read(message.peerInfo->fileDescriptor, rmrMessageBuffer.sendMessage->payload, RECEIVE_SCTP_BUFFER_SIZE);
1051
1052         if (loglevel >= MDCLOG_DEBUG) {
1053             mdclog_write(MDCLOG_DEBUG, "Finish Read from SCTP %d fd message length = %ld",
1054                          message.peerInfo->fileDescriptor, message.message.asnLength);
1055         }
1056
1057         memcpy(message.message.enodbName, message.peerInfo->enodbName, sizeof(message.peerInfo->enodbName));
1058         message.statCollector->incRecvMessage(string(message.message.enodbName));
1059         message.message.direction = 'U';
1060         message.message.time.tv_nsec = ts.tv_nsec;
1061         message.message.time.tv_sec = ts.tv_sec;
1062
1063         if (message.message.asnLength < 0) {
1064             if (errno == EINTR) {
1065                 continue;
1066             }
1067             /* If errno == EAGAIN, that means we have read all
1068                data. So goReportingMessages_t back to the main loop. */
1069             if (errno != EAGAIN) {
1070                 mdclog_write(MDCLOG_ERR, "Read error, %s ", strerror(errno));
1071                 done = 1;
1072             } else if (loglevel >= MDCLOG_DEBUG) {
1073                 mdclog_write(MDCLOG_DEBUG, "EAGAIN - descriptor = %d", message.peerInfo->fileDescriptor);
1074             }
1075             break;
1076         } else if (message.message.asnLength == 0) {
1077             /* End of file. The remote has closed the connection. */
1078             if (loglevel >= MDCLOG_INFO) {
1079                 mdclog_write(MDCLOG_INFO, "END of File Closed connection - descriptor = %d",
1080                              message.peerInfo->fileDescriptor);
1081             }
1082             done = 1;
1083             break;
1084         }
1085
1086         asn_dec_rval_t rval;
1087         if (loglevel >= MDCLOG_DEBUG) {
1088             char printBuffer[4096]{};
1089             char *tmp = printBuffer;
1090             for (size_t i = 0; i < (size_t)message.message.asnLength; ++i) {
1091                 snprintf(tmp, 3, "%02x", message.message.asndata[i]);
1092                 tmp += 2;
1093             }
1094             printBuffer[message.message.asnLength] = 0;
1095             clock_gettime(CLOCK_MONOTONIC, &end);
1096             mdclog_write(MDCLOG_DEBUG, "Before Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
1097                          message.peerInfo->enodbName, end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
1098             mdclog_write(MDCLOG_DEBUG, "PDU buffer length = %ld, data =  : %s", message.message.asnLength,
1099                          printBuffer);
1100             clock_gettime(CLOCK_MONOTONIC, &decodestart);
1101         }
1102
1103         rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void **) &pdu,
1104                           message.message.asndata, message.message.asnLength);
1105         if (rval.code != RC_OK) {
1106             mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) E2AP PDU from RAN : %s", rval.code,
1107                          message.peerInfo->enodbName);
1108             //todo may need reset to pdu
1109             break;
1110         }
1111
1112         if (loglevel >= MDCLOG_DEBUG) {
1113             clock_gettime(CLOCK_MONOTONIC, &end);
1114             mdclog_write(MDCLOG_DEBUG, "After Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
1115                          message.peerInfo->enodbName, end.tv_sec - decodestart.tv_sec, end.tv_nsec - decodestart.tv_nsec);
1116             char *printBuffer;
1117             size_t size;
1118             FILE *stream = open_memstream(&printBuffer, &size);
1119             asn_fprint(stream, &asn_DEF_E2AP_PDU, pdu);
1120             mdclog_write(MDCLOG_DEBUG, "Encoding E2AP PDU past : %s", printBuffer);
1121             clock_gettime(CLOCK_MONOTONIC, &decodestart);
1122         }
1123
1124         switch (pdu->present) {
1125             case E2AP_PDU_PR_initiatingMessage: {//initiating message
1126                 asnInitiatingRequest(pdu, message, rmrMessageBuffer);
1127                 break;
1128             }
1129             case E2AP_PDU_PR_successfulOutcome: { //successful outcome
1130                 asnSuccsesfulMsg(pdu, message,  rmrMessageBuffer);
1131                 break;
1132             }
1133             case E2AP_PDU_PR_unsuccessfulOutcome: { //Unsuccessful Outcome
1134                 asnUnSuccsesfulMsg(pdu, message, rmrMessageBuffer);
1135                 break;
1136             }
1137             default:
1138                 mdclog_write(MDCLOG_ERR, "Unknown index %d in E2AP PDU", pdu->present);
1139                 break;
1140         }
1141         if (loglevel >= MDCLOG_DEBUG) {
1142             clock_gettime(CLOCK_MONOTONIC, &end);
1143             mdclog_write(MDCLOG_DEBUG,
1144                          "After processing message and sent to rmr for : %s, Read time is : %ld seconds, %ld nanoseconds",
1145                          message.peerInfo->enodbName, end.tv_sec - decodestart.tv_sec, end.tv_nsec - decodestart.tv_nsec);
1146         }
1147         numOfMessages++;
1148         // remove the break for EAGAIN
1149         //break;
1150         if (pdu != nullptr) {
1151             //TODO need to test ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu); to get better performance
1152             ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu);
1153             //ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
1154             //pdu = nullptr;
1155         }
1156         //clock_gettime(CLOCK_MONOTONIC, &start);
1157     }
1158     // in case of break to avoid memory leak
1159 //    if (pdu != nullptr) {
1160 //        //ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
1161 //        ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu);
1162 //        //pdu = nullptr;
1163 //    }
1164
1165     if (done) {
1166         if (loglevel >= MDCLOG_INFO) {
1167             mdclog_write(MDCLOG_INFO, "Closed connection - descriptor = %d", message.peerInfo->fileDescriptor);
1168         }
1169         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1170                 snprintf((char *)rmrMessageBuffer.sendMessage->payload,
1171                          256,
1172                          "%s|CU disconnected unexpectedly",
1173                          message.peerInfo->enodbName);
1174         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1175
1176         if (sendRequestToXapp(message,
1177                               RIC_SCTP_CONNECTION_FAILURE,
1178                               rmrMessageBuffer) != 0) {
1179             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
1180         }
1181
1182         /* Closing descriptor make epoll remove it from the set of descriptors which are monitored. */
1183         close(message.peerInfo->fileDescriptor);
1184         cleanHashEntry((ConnectedCU_t *) events->data.ptr, sctpMap);
1185     }
1186     if (loglevel >= MDCLOG_DEBUG) {
1187         clock_gettime(CLOCK_MONOTONIC, &end);
1188         mdclog_write(MDCLOG_DEBUG, "from receive SCTP to send RMR time is %ld seconds and %ld nanoseconds",
1189                      end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
1190
1191     }
1192     return 0;
1193 }
1194
1195 static void buildAndsendSetupRequest(ReportingMessages_t &message,
1196                                      E2setupRequestIEs_t *ie,
1197                                      RmrMessagesBuffer_t &rmrMessageBuffer,
1198                                      E2AP_PDU_t *pdu) {
1199     auto logLevel = mdclog_level_get();
1200
1201
1202     if (buildRanName(message.peerInfo->enodbName, ie) < 0) {
1203         mdclog_write(MDCLOG_ERR, "Bad param in E2setupRequestIEs GlobalE2node_ID.\n");
1204     } else {
1205         memcpy(message.message.enodbName, message.peerInfo->enodbName, strlen(message.peerInfo->enodbName));
1206     }
1207     // now we can send the data to e2Mgr
1208     auto buffer_size = RECEIVE_SCTP_BUFFER_SIZE * 2;
1209
1210     auto *rmrMsg = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, buffer_size);
1211     // add addrees to message
1212
1213
1214     // unsigned char *buffer = &rmrMsg->payload[j];
1215     unsigned char buffer[RECEIVE_SCTP_BUFFER_SIZE * 2];
1216     // encode to xml
1217     auto er = asn_encode_to_buffer(nullptr, ATS_BASIC_XER, &asn_DEF_E2AP_PDU, pdu, buffer, buffer_size);
1218     if (er.encoded == -1) {
1219         mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
1220     } else if (er.encoded > (ssize_t) buffer_size) {
1221         mdclog_write(MDCLOG_ERR, "Buffer of size %d is to small for %s",
1222                      (int) buffer_size,
1223                      asn_DEF_E2AP_PDU.name);
1224     } else {
1225         rmrMsg->len = snprintf((char *)rmrMsg->payload, RECEIVE_SCTP_BUFFER_SIZE * 2, "%s:%d|%s",
1226                                message.peerInfo->sctpParams->myIP.c_str(),
1227                                message.peerInfo->sctpParams->rmrPort,
1228                                buffer);
1229         if (logLevel >= MDCLOG_DEBUG) {
1230             mdclog_write(MDCLOG_DEBUG, "Setup request of size %d :\n %s\n", rmrMsg->len, rmrMsg->payload);
1231         }
1232         // send to RMR
1233         message.message.messageType = rmrMsg->mtype = RIC_E2_SETUP_REQ;
1234         rmrMsg->state = 0;
1235         rmr_bytes2meid(rmrMsg, (unsigned char *) message.message.enodbName, strlen(message.message.enodbName));
1236
1237         static unsigned char tx[32];
1238         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1239         rmr_bytes2xact(rmrMsg, tx, strlen((const char *) tx));
1240
1241         rmrMsg = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMsg);
1242         if (rmrMsg == nullptr) {
1243             mdclog_write(MDCLOG_ERR, "RMR failed to send returned nullptr");
1244         } else if (rmrMsg->state != 0) {
1245             char meid[RMR_MAX_MEID]{};
1246             if (rmrMsg->state == RMR_ERR_RETRY) {
1247                 usleep(5);
1248                 rmrMsg->state = 0;
1249                 mdclog_write(MDCLOG_INFO, "RETRY sending Message %d to Xapp from %s",
1250                              rmrMsg->mtype, rmr_get_meid(rmrMsg, (unsigned char *) meid));
1251                 rmrMsg = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMsg);
1252                 if (rmrMsg == nullptr) {
1253                     mdclog_write(MDCLOG_ERR, "RMR failed send returned nullptr");
1254                 } else if (rmrMsg->state != 0) {
1255                     mdclog_write(MDCLOG_ERR,
1256                                  "RMR Retry failed %s sending request %d to Xapp from %s",
1257                                  translateRmrErrorMessages(rmrMsg->state).c_str(),
1258                                  rmrMsg->mtype,
1259                                  rmr_get_meid(rmrMsg, (unsigned char *) meid));
1260                 }
1261             } else {
1262                 mdclog_write(MDCLOG_ERR, "RMR failed: %s. sending request %d to Xapp from %s",
1263                              translateRmrErrorMessages(rmrMsg->state).c_str(),
1264                              rmrMsg->mtype,
1265                              rmr_get_meid(rmrMsg, (unsigned char *) meid));
1266             }
1267         }
1268         message.peerInfo->gotSetup = true;
1269         buildJsonMessage(message);
1270         if (rmrMsg != nullptr) {
1271             rmr_free_msg(rmrMsg);
1272         }
1273     }
1274
1275 }
1276 /**
1277  *
1278  * @param pdu
1279  * @param message
1280  * @param rmrMessageBuffer
1281  */
1282 void asnInitiatingRequest(E2AP_PDU_t *pdu,
1283                           ReportingMessages_t &message,
1284                           RmrMessagesBuffer_t &rmrMessageBuffer) {
1285     auto logLevel = mdclog_level_get();
1286     auto procedureCode = ((InitiatingMessage_t *) pdu->choice.initiatingMessage)->procedureCode;
1287     if (logLevel >= MDCLOG_DEBUG) {
1288         mdclog_write(MDCLOG_DEBUG, "Initiating message %ld\n", procedureCode);
1289     }
1290     switch (procedureCode) {
1291         case ProcedureCode_id_E2setup: {
1292             if (logLevel >= MDCLOG_DEBUG) {
1293                 mdclog_write(MDCLOG_DEBUG, "Got E2setup\n");
1294             }
1295
1296             memset(message.peerInfo->enodbName, 0 , MAX_ENODB_NAME_SIZE);
1297             for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.E2setupRequest.protocolIEs.list.count; i++) {
1298                 auto *ie = pdu->choice.initiatingMessage->value.choice.E2setupRequest.protocolIEs.list.array[i];
1299                 if (ie->id == ProtocolIE_ID_id_GlobalE2node_ID) {
1300                     if (ie->value.present == E2setupRequestIEs__value_PR_GlobalE2node_ID) {
1301                         buildAndsendSetupRequest(message, ie, rmrMessageBuffer, pdu);
1302                         break;
1303                     }
1304                 }
1305             }
1306             break;
1307         }
1308         case ProcedureCode_id_ErrorIndication: {
1309             if (logLevel >= MDCLOG_DEBUG) {
1310                 mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName);
1311             }
1312             if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) {
1313                 mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP");
1314             }
1315             break;
1316         }
1317         case ProcedureCode_id_Reset: {
1318             if (logLevel >= MDCLOG_DEBUG) {
1319                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
1320             }
1321             if (sendRequestToXapp(message, RIC_X2_RESET, rmrMessageBuffer) != 0) {
1322                 mdclog_write(MDCLOG_ERR, "RIC_X2_RESET message failed to send to xAPP");
1323             }
1324             break;
1325         }
1326         case ProcedureCode_id_RICcontrol: {
1327             if (logLevel >= MDCLOG_DEBUG) {
1328                 mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
1329             }
1330             break;
1331         }
1332         case ProcedureCode_id_RICindication: {
1333             if (logLevel >= MDCLOG_DEBUG) {
1334                 mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName);
1335             }
1336             for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
1337                 auto messageSent = false;
1338                 RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
1339                 if (logLevel >= MDCLOG_DEBUG) {
1340                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1341                 }
1342                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1343                     if (logLevel >= MDCLOG_DEBUG) {
1344                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1345                     }
1346                     if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
1347                         static unsigned char tx[32];
1348                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
1349                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1350                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1351                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1352                                        (unsigned char *)message.message.enodbName,
1353                                        strlen(message.message.enodbName));
1354                         rmrMessageBuffer.sendMessage->state = 0;
1355                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1356                         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1357                             mdclog_write(MDCLOG_DEBUG, "RIC sub id = %d, message type = %d",
1358                                          rmrMessageBuffer.sendMessage->sub_id,
1359                                          rmrMessageBuffer.sendMessage->mtype);
1360                         }
1361                         sendRmrMessage(rmrMessageBuffer, message);
1362                         messageSent = true;
1363                     } else {
1364                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1365                     }
1366                 }
1367                 if (messageSent) {
1368                     break;
1369                 }
1370             }
1371             break;
1372         }
1373         case ProcedureCode_id_RICserviceQuery: {
1374             if (logLevel >= MDCLOG_DEBUG) {
1375                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceQuery %s", message.message.enodbName);
1376             }
1377             break;
1378         }
1379         case ProcedureCode_id_RICserviceUpdate: {
1380             if (logLevel >= MDCLOG_DEBUG) {
1381                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceUpdate %s", message.message.enodbName);
1382             }
1383             if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer) != 0) {
1384                 mdclog_write(MDCLOG_ERR, "RIC_SERVICE_UPDATE message failed to send to xAPP");
1385             }
1386             break;
1387         }
1388         case ProcedureCode_id_RICsubscription: {
1389             if (logLevel >= MDCLOG_DEBUG) {
1390                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
1391             }
1392             break;
1393         }
1394         case ProcedureCode_id_RICsubscriptionDelete: {
1395             if (logLevel >= MDCLOG_DEBUG) {
1396                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
1397             }
1398             break;
1399         }
1400         default: {
1401             mdclog_write(MDCLOG_ERR, "Undefined or not supported message = %ld", procedureCode);
1402             message.message.messageType = 0; // no RMR message type yet
1403
1404             buildJsonMessage(message);
1405
1406             break;
1407         }
1408     }
1409 }
1410
1411 /**
1412  *
1413  * @param pdu
1414  * @param message
1415  * @param rmrMessageBuffer
1416  */
1417 void asnSuccsesfulMsg(E2AP_PDU_t *pdu, ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) {
1418     auto procedureCode = pdu->choice.successfulOutcome->procedureCode;
1419     auto logLevel = mdclog_level_get();
1420     if (logLevel >= MDCLOG_INFO) {
1421         mdclog_write(MDCLOG_INFO, "Successful Outcome %ld", procedureCode);
1422     }
1423     switch (procedureCode) {
1424         case ProcedureCode_id_E2setup: {
1425             if (logLevel >= MDCLOG_DEBUG) {
1426                 mdclog_write(MDCLOG_DEBUG, "Got E2setup\n");
1427             }
1428             break;
1429         }
1430         case ProcedureCode_id_ErrorIndication: {
1431             if (logLevel >= MDCLOG_DEBUG) {
1432                 mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName);
1433             }
1434             if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) {
1435                 mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP");
1436             }
1437             break;
1438         }
1439         case ProcedureCode_id_Reset: {
1440             if (logLevel >= MDCLOG_DEBUG) {
1441                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
1442             }
1443             if (sendRequestToXapp(message, RIC_X2_RESET, rmrMessageBuffer) != 0) {
1444                 mdclog_write(MDCLOG_ERR, "RIC_X2_RESET message failed to send to xAPP");
1445             }
1446             break;
1447         }
1448         case ProcedureCode_id_RICcontrol: {
1449             if (logLevel >= MDCLOG_DEBUG) {
1450                 mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
1451             }
1452             for (auto i = 0;
1453                  i < pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.count; i++) {
1454                 auto messageSent = false;
1455                 RICcontrolAcknowledge_IEs_t *ie = pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.array[i];
1456                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1457                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1458                 }
1459                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1460                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1461                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1462                     }
1463                     if (ie->value.present == RICcontrolAcknowledge_IEs__value_PR_RICrequestID) {
1464                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_ACK;
1465                         rmrMessageBuffer.sendMessage->state = 0;
1466                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1467                         static unsigned char tx[32];
1468                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1469                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1470                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1471                                        (unsigned char *)message.message.enodbName,
1472                                        strlen(message.message.enodbName));
1473
1474                         sendRmrMessage(rmrMessageBuffer, message);
1475                         messageSent = true;
1476                     } else {
1477                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1478                     }
1479                 }
1480                 if (messageSent) {
1481                     break;
1482                 }
1483             }
1484
1485             break;
1486         }
1487         case ProcedureCode_id_RICindication: {
1488             if (logLevel >= MDCLOG_DEBUG) {
1489                 mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName);
1490             }
1491             for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
1492                 auto messageSent = false;
1493                 RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
1494                 if (logLevel >= MDCLOG_DEBUG) {
1495                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1496                 }
1497                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1498                     if (logLevel >= MDCLOG_DEBUG) {
1499                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1500                     }
1501                     if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
1502                         static unsigned char tx[32];
1503                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
1504                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1505                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1506                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1507                                        (unsigned char *)message.message.enodbName,
1508                                        strlen(message.message.enodbName));
1509                         rmrMessageBuffer.sendMessage->state = 0;
1510                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1511                         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1512                             mdclog_write(MDCLOG_DEBUG, "RIC sub id = %d, message type = %d",
1513                                          rmrMessageBuffer.sendMessage->sub_id,
1514                                          rmrMessageBuffer.sendMessage->mtype);
1515                         }
1516                         sendRmrMessage(rmrMessageBuffer, message);
1517                         messageSent = true;
1518                     } else {
1519                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1520                     }
1521                 }
1522                 if (messageSent) {
1523                     break;
1524                 }
1525             }
1526             break;
1527         }
1528         case ProcedureCode_id_RICserviceQuery: {
1529             if (logLevel >= MDCLOG_DEBUG) {
1530                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceQuery %s", message.message.enodbName);
1531             }
1532             break;
1533         }
1534         case ProcedureCode_id_RICserviceUpdate: {
1535             if (logLevel >= MDCLOG_DEBUG) {
1536                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceUpdate %s", message.message.enodbName);
1537             }
1538             if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer) != 0) {
1539                 mdclog_write(MDCLOG_ERR, "RIC_SERVICE_UPDATE message failed to send to xAPP");
1540             }
1541             break;
1542         }
1543         case ProcedureCode_id_RICsubscription: {
1544             if (logLevel >= MDCLOG_DEBUG) {
1545                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
1546             }
1547             if (sendRequestToXapp(message, RIC_SUB_RESP, rmrMessageBuffer) != 0) {
1548                 mdclog_write(MDCLOG_ERR, "Subscription successful message failed to send to xAPP");
1549             }
1550             break;
1551         }
1552         case ProcedureCode_id_RICsubscriptionDelete: {
1553             if (logLevel >= MDCLOG_DEBUG) {
1554                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
1555             }
1556             if (sendRequestToXapp(message, RIC_SUB_DEL_RESP, rmrMessageBuffer) != 0) {
1557                 mdclog_write(MDCLOG_ERR, "Subscription delete successful message failed to send to xAPP");
1558             }
1559             break;
1560         }
1561         default: {
1562             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1563             message.message.messageType = 0; // no RMR message type yet
1564             buildJsonMessage(message);
1565
1566             break;
1567         }
1568     }
1569 }
1570
1571 /**
1572  *
1573  * @param pdu
1574  * @param message
1575  * @param rmrMessageBuffer
1576  */
1577 void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
1578                         ReportingMessages_t &message,
1579                         RmrMessagesBuffer_t &rmrMessageBuffer) {
1580     auto procedureCode = pdu->choice.unsuccessfulOutcome->procedureCode;
1581     auto logLevel = mdclog_level_get();
1582     if (logLevel >= MDCLOG_INFO) {
1583         mdclog_write(MDCLOG_INFO, "Unsuccessful Outcome %ld", procedureCode);
1584     }
1585     switch (procedureCode) {
1586         case ProcedureCode_id_E2setup: {
1587             if (logLevel >= MDCLOG_DEBUG) {
1588                 mdclog_write(MDCLOG_DEBUG, "Got E2setup\n");
1589             }
1590             break;
1591         }
1592         case ProcedureCode_id_ErrorIndication: {
1593             if (logLevel >= MDCLOG_DEBUG) {
1594                 mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName);
1595             }
1596             if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) {
1597                 mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP");
1598             }
1599             break;
1600         }
1601         case ProcedureCode_id_Reset: {
1602             if (logLevel >= MDCLOG_DEBUG) {
1603                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
1604             }
1605             if (sendRequestToXapp(message, RIC_X2_RESET, rmrMessageBuffer) != 0) {
1606                 mdclog_write(MDCLOG_ERR, "RIC_X2_RESET message failed to send to xAPP");
1607             }
1608             break;
1609         }
1610         case ProcedureCode_id_RICcontrol: {
1611             if (logLevel >= MDCLOG_DEBUG) {
1612                 mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
1613             }
1614             for (int i = 0;
1615                  i < pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.count; i++) {
1616                 auto messageSent = false;
1617                 RICcontrolFailure_IEs_t *ie = pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.array[i];
1618                 if (logLevel >= MDCLOG_DEBUG) {
1619                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1620                 }
1621                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1622                     if (logLevel >= MDCLOG_DEBUG) {
1623                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1624                     }
1625                     if (ie->value.present == RICcontrolFailure_IEs__value_PR_RICrequestID) {
1626                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_FAILURE;
1627                         rmrMessageBuffer.sendMessage->state = 0;
1628                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1629                         static unsigned char tx[32];
1630                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1631                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1632                         rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName,
1633                                        strlen(message.message.enodbName));
1634                         sendRmrMessage(rmrMessageBuffer, message);
1635                         messageSent = true;
1636                     } else {
1637                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1638                     }
1639                 }
1640                 if (messageSent) {
1641                     break;
1642                 }
1643             }
1644             break;
1645         }
1646         case ProcedureCode_id_RICindication: {
1647             if (logLevel >= MDCLOG_DEBUG) {
1648                 mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName);
1649             }
1650             for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
1651                 auto messageSent = false;
1652                 RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
1653                 if (logLevel >= MDCLOG_DEBUG) {
1654                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1655                 }
1656                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1657                     if (logLevel >= MDCLOG_DEBUG) {
1658                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1659                     }
1660                     if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
1661                         static unsigned char tx[32];
1662                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
1663                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1664                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1665                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1666                                        (unsigned char *)message.message.enodbName,
1667                                        strlen(message.message.enodbName));
1668                         rmrMessageBuffer.sendMessage->state = 0;
1669                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1670                         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1671                             mdclog_write(MDCLOG_DEBUG, "RIC sub id = %d, message type = %d",
1672                                          rmrMessageBuffer.sendMessage->sub_id,
1673                                          rmrMessageBuffer.sendMessage->mtype);
1674                         }
1675                         sendRmrMessage(rmrMessageBuffer, message);
1676                         messageSent = true;
1677                     } else {
1678                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1679                     }
1680                 }
1681                 if (messageSent) {
1682                     break;
1683                 }
1684             }
1685             break;
1686         }
1687         case ProcedureCode_id_RICserviceQuery: {
1688             if (logLevel >= MDCLOG_DEBUG) {
1689                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceQuery %s", message.message.enodbName);
1690             }
1691             break;
1692         }
1693         case ProcedureCode_id_RICserviceUpdate: {
1694             if (logLevel >= MDCLOG_DEBUG) {
1695                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceUpdate %s", message.message.enodbName);
1696             }
1697             if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer) != 0) {
1698                 mdclog_write(MDCLOG_ERR, "RIC_SERVICE_UPDATE message failed to send to xAPP");
1699             }
1700             break;
1701         }
1702         case ProcedureCode_id_RICsubscription: {
1703             if (logLevel >= MDCLOG_DEBUG) {
1704                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
1705             }
1706             if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer) != 0) {
1707                 mdclog_write(MDCLOG_ERR, "Subscription unsuccessful message failed to send to xAPP");
1708             }
1709             break;
1710         }
1711         case ProcedureCode_id_RICsubscriptionDelete: {
1712             if (logLevel >= MDCLOG_DEBUG) {
1713                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
1714             }
1715             if (sendRequestToXapp(message, RIC_SUB_DEL_FAILURE, rmrMessageBuffer) != 0) {
1716                 mdclog_write(MDCLOG_ERR, "Subscription Delete unsuccessful message failed to send to xAPP");
1717             }
1718             break;
1719         }
1720         default: {
1721             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1722             message.message.messageType = 0; // no RMR message type yet
1723
1724             buildJsonMessage(message);
1725
1726             break;
1727         }
1728     }
1729 }
1730
1731 /**
1732  *
1733  * @param message
1734  * @param requestId
1735  * @param rmrMmessageBuffer
1736  * @return
1737  */
1738 int sendRequestToXapp(ReportingMessages_t &message,
1739                       int requestId,
1740                       RmrMessagesBuffer_t &rmrMmessageBuffer) {
1741     rmr_bytes2meid(rmrMmessageBuffer.sendMessage,
1742                    (unsigned char *)message.message.enodbName,
1743                    strlen(message.message.enodbName));
1744     message.message.messageType = rmrMmessageBuffer.sendMessage->mtype = requestId;
1745     rmrMmessageBuffer.sendMessage->state = 0;
1746     static unsigned char tx[32];
1747     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1748     rmr_bytes2xact(rmrMmessageBuffer.sendMessage, tx, strlen((const char *) tx));
1749
1750     auto rc = sendRmrMessage(rmrMmessageBuffer, message);
1751     return rc;
1752 }
1753
1754
1755 void getRmrContext(sctp_params_t &pSctpParams) {
1756     pSctpParams.rmrCtx = nullptr;
1757     pSctpParams.rmrCtx = rmr_init(pSctpParams.rmrAddress, RMR_MAX_RCV_BYTES, RMRFL_NONE);
1758     if (pSctpParams.rmrCtx == nullptr) {
1759         mdclog_write(MDCLOG_ERR, "Failed to initialize RMR");
1760         return;
1761     }
1762
1763     rmr_set_stimeout(pSctpParams.rmrCtx, 0);    // disable retries for any send operation
1764     // we need to find that routing table exist and we can run
1765     if (mdclog_level_get() >= MDCLOG_INFO) {
1766         mdclog_write(MDCLOG_INFO, "We are after RMR INIT wait for RMR_Ready");
1767     }
1768     int rmrReady = 0;
1769     int count = 0;
1770     while (!rmrReady) {
1771         if ((rmrReady = rmr_ready(pSctpParams.rmrCtx)) == 0) {
1772             sleep(1);
1773         }
1774         count++;
1775         if (count % 60 == 0) {
1776             mdclog_write(MDCLOG_INFO, "waiting to RMR ready state for %d seconds", count);
1777         }
1778     }
1779     if (mdclog_level_get() >= MDCLOG_INFO) {
1780         mdclog_write(MDCLOG_INFO, "RMR running");
1781     }
1782     rmr_init_trace(pSctpParams.rmrCtx, 200);
1783     // get the RMR fd for the epoll
1784     pSctpParams.rmrListenFd = rmr_get_rcvfd(pSctpParams.rmrCtx);
1785     struct epoll_event event{};
1786     // add RMR fd to epoll
1787     event.events = (EPOLLIN);
1788     event.data.fd = pSctpParams.rmrListenFd;
1789     // add listening RMR FD to epoll
1790     if (epoll_ctl(pSctpParams.epoll_fd, EPOLL_CTL_ADD, pSctpParams.rmrListenFd, &event)) {
1791         mdclog_write(MDCLOG_ERR, "Failed to add RMR descriptor to epoll");
1792         close(pSctpParams.rmrListenFd);
1793         rmr_close(pSctpParams.rmrCtx);
1794         pSctpParams.rmrCtx = nullptr;
1795     }
1796 }
1797
1798 int BuildPERSetupResponseMessaeFromXML(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) {
1799     E2AP_PDU_t *pdu;
1800     auto rval = asn_decode(nullptr, ATS_BASIC_XER, &asn_DEF_E2AP_PDU, (void **) &pdu,
1801                            rmrMessageBuffer.rcvMessage->payload, rmrMessageBuffer.rcvMessage->len);
1802     if (rval.code != RC_OK) {
1803         mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) E2AP PDU from E2MGR : %s",
1804                      rval.code,
1805                      message.message.enodbName);
1806         return -1;
1807     }
1808
1809     auto er = asn_encode_to_buffer(nullptr, ATS_BASIC_XER, &asn_DEF_E2AP_PDU, pdu,
1810                                    rmrMessageBuffer.rcvMessage->payload, rmrMessageBuffer.rcvMessage->len);
1811     if (er.encoded == -1) {
1812         mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
1813         return -1;
1814     } else if (er.encoded > (ssize_t)rmrMessageBuffer.rcvMessage->len) {
1815         mdclog_write(MDCLOG_ERR, "Buffer of size %d is to small for %s",
1816                      (int)rmrMessageBuffer.rcvMessage->len,
1817                      asn_DEF_E2AP_PDU.name);
1818         return -1;
1819     }
1820     return 0;
1821 }
1822
1823 /**
1824  *
1825  * @param sctpMap
1826  * @param rmrMessageBuffer
1827  * @param ts
1828  * @return
1829  */
1830 int receiveXappMessages(Sctp_Map_t *sctpMap,
1831                         RmrMessagesBuffer_t &rmrMessageBuffer,
1832                         struct timespec &ts) {
1833     if (rmrMessageBuffer.rcvMessage == nullptr) {
1834         //we have error
1835         mdclog_write(MDCLOG_ERR, "RMR Allocation message, %s", strerror(errno));
1836         return -1;
1837     }
1838
1839     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1840         mdclog_write(MDCLOG_DEBUG, "Call to rmr_rcv_msg");
1841     }
1842     rmrMessageBuffer.rcvMessage = rmr_rcv_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.rcvMessage);
1843     if (rmrMessageBuffer.rcvMessage == nullptr) {
1844         mdclog_write(MDCLOG_ERR, "RMR Receving message with null pointer, Realloc rmr mesage buffer");
1845         rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
1846         return -2;
1847     }
1848     ReportingMessages_t message;
1849     message.message.direction = 'D';
1850     message.message.time.tv_nsec = ts.tv_nsec;
1851     message.message.time.tv_sec = ts.tv_sec;
1852
1853     // get message payload
1854     //auto msgData = msg->payload;
1855     if (rmrMessageBuffer.rcvMessage->state != 0) {
1856         mdclog_write(MDCLOG_ERR, "RMR Receving message with stat = %d", rmrMessageBuffer.rcvMessage->state);
1857         return -1;
1858     }
1859     rmr_get_meid(rmrMessageBuffer.rcvMessage, (unsigned char *)message.message.enodbName);
1860     switch (rmrMessageBuffer.rcvMessage->mtype) {
1861         case RIC_E2_SETUP_RESP : {
1862             if (BuildPERSetupResponseMessaeFromXML(message, rmrMessageBuffer) != 0) {
1863                 break;
1864             }
1865
1866             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1867                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_RESP");
1868                 return -6;
1869             }
1870             break;
1871         }
1872         case RIC_E2_SETUP_FAILURE : {
1873             if (BuildPERSetupResponseMessaeFromXML(message, rmrMessageBuffer) != 0) {
1874                 break;
1875             }
1876             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1877                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_FAILURE");
1878                 return -6;
1879             }
1880             break;
1881         }
1882         case RIC_ERROR_INDICATION: {
1883             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1884                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ERROR_INDICATION");
1885                 return -6;
1886             }
1887             break;
1888         }
1889         case RIC_SUB_REQ: {
1890             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1891                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_REQ");
1892                 return -6;
1893             }
1894             break;
1895         }
1896         case RIC_SUB_DEL_REQ: {
1897             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1898                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_DEL_REQ");
1899                 return -6;
1900             }
1901             break;
1902         }
1903         case RIC_CONTROL_REQ: {
1904             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1905                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_CONTROL_REQ");
1906                 return -6;
1907             }
1908             break;
1909         }
1910         case RIC_SERVICE_QUERY: {
1911             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1912                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_QUERY");
1913                 return -6;
1914             }
1915             break;
1916         }
1917         case RIC_SERVICE_UPDATE_ACK: {
1918             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1919                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_ACK");
1920                 return -6;
1921             }
1922             break;
1923         }
1924         case RIC_SERVICE_UPDATE_FAILURE: {
1925             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1926                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_FAILURE");
1927                 return -6;
1928             }
1929             break;
1930         }
1931         case RIC_X2_RESET: {
1932             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1933                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_X2_RESET");
1934                 return -6;
1935             }
1936             break;
1937         }
1938         case RIC_X2_RESET_RESP: {
1939             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1940                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_X2_RESET_RESP");
1941                 return -6;
1942             }
1943             break;
1944         }
1945         case RIC_SCTP_CLEAR_ALL: {
1946             mdclog_write(MDCLOG_INFO, "RIC_SCTP_CLEAR_ALL");
1947             // loop on all keys and close socket and then erase all map.
1948             vector<char *> v;
1949             sctpMap->getKeys(v);
1950             for (auto const &iter : v) { //}; iter != sctpMap.end(); iter++) {
1951                 if (!boost::starts_with((string) (iter), "host:") && !boost::starts_with((string) (iter), "msg:")) {
1952                     auto *peerInfo = (ConnectedCU_t *) sctpMap->find(iter);
1953                     if (peerInfo == nullptr) {
1954                         continue;
1955                     }
1956                     close(peerInfo->fileDescriptor);
1957                     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
1958                     message.message.direction = 'D';
1959                     message.message.time.tv_nsec = ts.tv_nsec;
1960                     message.message.time.tv_sec = ts.tv_sec;
1961
1962                     message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1963                             snprintf((char *)rmrMessageBuffer.sendMessage->payload,
1964                                      256,
1965                                      "%s|RIC_SCTP_CLEAR_ALL",
1966                                      peerInfo->enodbName);
1967                     message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1968                     mdclog_write(MDCLOG_INFO, "%s", message.message.asndata);
1969                     if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
1970                         mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
1971                     }
1972                     free(peerInfo);
1973                 }
1974             }
1975
1976             sleep(1);
1977             sctpMap->clear();
1978             break;
1979         }
1980         case E2_TERM_KEEP_ALIVE_REQ: {
1981             // send message back
1982             rmr_bytes2payload(rmrMessageBuffer.sendMessage,
1983                               (unsigned char *)rmrMessageBuffer.ka_message,
1984                               rmrMessageBuffer.ka_message_len);
1985             rmrMessageBuffer.sendMessage->mtype = E2_TERM_KEEP_ALIVE_RESP;
1986             rmrMessageBuffer.sendMessage->state = 0;
1987             static unsigned char tx[32];
1988             auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1989             rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, txLen);
1990             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
1991             if (rmrMessageBuffer.sendMessage == nullptr) {
1992                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
1993                 mdclog_write(MDCLOG_ERR, "Failed to send E2_TERM_KEEP_ALIVE_RESP RMR message returned NULL");
1994             } else if (rmrMessageBuffer.sendMessage->state != 0)  {
1995                 mdclog_write(MDCLOG_ERR, "Failed to send E2_TERM_KEEP_ALIVE_RESP, on RMR state = %d ( %s)",
1996                              rmrMessageBuffer.sendMessage->state, translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str());
1997             } else if (mdclog_level_get() >= MDCLOG_DEBUG) {
1998                 mdclog_write(MDCLOG_DEBUG, "Got Keep Alive Request send : %s", rmrMessageBuffer.ka_message);
1999             }
2000
2001             break;
2002         }
2003         default:
2004             mdclog_write(MDCLOG_WARN, "Message Type : %d is not seported", rmrMessageBuffer.rcvMessage->mtype);
2005             message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
2006             message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
2007             message.message.time.tv_nsec = ts.tv_nsec;
2008             message.message.time.tv_sec = ts.tv_sec;
2009             message.message.messageType = rmrMessageBuffer.rcvMessage->mtype;
2010
2011             buildJsonMessage(message);
2012
2013
2014             return -7;
2015     }
2016     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2017         mdclog_write(MDCLOG_DEBUG, "EXIT OK from %s", __FUNCTION__);
2018     }
2019     return 0;
2020 }
2021
2022 /**
2023  * Send message to the CU that is not expecting for successful or unsuccessful results
2024  * @param messageBuffer
2025  * @param message
2026  * @param failedMsgId
2027  * @param sctpMap
2028  * @return
2029  */
2030 int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer,
2031                            ReportingMessages_t &message,
2032                            int failedMsgId,
2033                            Sctp_Map_t *sctpMap) {
2034
2035     getRequestMetaData(message, messageBuffer);
2036     if (mdclog_level_get() >= MDCLOG_INFO) {
2037         mdclog_write(MDCLOG_INFO, "send message to %s address", message.message.enodbName);
2038     }
2039
2040     auto rc = sendMessagetoCu(sctpMap, messageBuffer, message, failedMsgId);
2041     return rc;
2042 }
2043
2044 /**
2045  *
2046  * @param sctpMap
2047  * @param messageBuffer
2048  * @param message
2049  * @param failedMesgId
2050  * @return
2051  */
2052 int sendMessagetoCu(Sctp_Map_t *sctpMap,
2053                     RmrMessagesBuffer_t &messageBuffer,
2054                     ReportingMessages_t &message,
2055                     int failedMesgId) {
2056     auto *peerInfo = (ConnectedCU_t *) sctpMap->find(message.message.enodbName);
2057     if (peerInfo == nullptr) {
2058         if (failedMesgId != 0) {
2059             sendFailedSendingMessagetoXapp(messageBuffer, message, failedMesgId);
2060         } else {
2061             mdclog_write(MDCLOG_ERR, "Failed to send message no CU entry %s", message.message.enodbName);
2062         }
2063         return -1;
2064     }
2065
2066     // get the FD
2067     message.message.messageType = messageBuffer.rcvMessage->mtype;
2068     auto rc = sendSctpMsg(peerInfo, message, sctpMap);
2069     return rc;
2070 }
2071
2072 /**
2073  *
2074  * @param rmrCtx the rmr context to send and receive
2075  * @param msg the msg we got fromxApp
2076  * @param metaData data from xApp in ordered struct
2077  * @param failedMesgId the return message type error
2078  */
2079 void
2080 sendFailedSendingMessagetoXapp(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, int failedMesgId) {
2081     rmr_mbuf_t *msg = rmrMessageBuffer.sendMessage;
2082     msg->len = snprintf((char *) msg->payload, 200, "the gNb/eNode name %s not found",
2083                         message.message.enodbName);
2084     if (mdclog_level_get() >= MDCLOG_INFO) {
2085         mdclog_write(MDCLOG_INFO, "%s", msg->payload);
2086     }
2087     msg->mtype = failedMesgId;
2088     msg->state = 0;
2089
2090     static unsigned char tx[32];
2091     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2092     rmr_bytes2xact(msg, tx, strlen((const char *) tx));
2093
2094     sendRmrMessage(rmrMessageBuffer, message);
2095 }
2096
2097
2098
2099 /**
2100  *
2101  * @param epoll_fd
2102  * @param peerInfo
2103  * @param events
2104  * @param sctpMap
2105  * @param enodbName
2106  * @param msgType
2107  * @return
2108  */
2109 int addToEpoll(int epoll_fd,
2110                ConnectedCU_t *peerInfo,
2111                uint32_t events,
2112                Sctp_Map_t *sctpMap,
2113                char *enodbName,
2114                int msgType) {
2115     // Add to Epol
2116     struct epoll_event event{};
2117     event.data.ptr = peerInfo;
2118     event.events = events;
2119     if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peerInfo->fileDescriptor, &event) < 0) {
2120         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2121             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here), %s, %s %d",
2122                          strerror(errno), __func__, __LINE__);
2123         }
2124         close(peerInfo->fileDescriptor);
2125         if (enodbName != nullptr) {
2126             cleanHashEntry(peerInfo, sctpMap);
2127             char key[MAX_ENODB_NAME_SIZE * 2];
2128             snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2129             if (mdclog_level_get() >= MDCLOG_DEBUG) {
2130                 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2131             }
2132             auto tmp = sctpMap->find(key);
2133             if (tmp) {
2134                 free(tmp);
2135                 sctpMap->erase(key);
2136             }
2137         } else {
2138             peerInfo->enodbName[0] = 0;
2139         }
2140         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here)");
2141         return -1;
2142     }
2143     return 0;
2144 }
2145
2146 /**
2147  *
2148  * @param epoll_fd
2149  * @param peerInfo
2150  * @param events
2151  * @param sctpMap
2152  * @param enodbName
2153  * @param msgType
2154  * @return
2155  */
2156 int modifyToEpoll(int epoll_fd,
2157                   ConnectedCU_t *peerInfo,
2158                   uint32_t events,
2159                   Sctp_Map_t *sctpMap,
2160                   char *enodbName,
2161                   int msgType) {
2162     // Add to Epol
2163     struct epoll_event event{};
2164     event.data.ptr = peerInfo;
2165     event.events = events;
2166     if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, peerInfo->fileDescriptor, &event) < 0) {
2167         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2168             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_MOD (may chack not to quit here), %s, %s %d",
2169                          strerror(errno), __func__, __LINE__);
2170         }
2171         close(peerInfo->fileDescriptor);
2172         cleanHashEntry(peerInfo, sctpMap);
2173         char key[MAX_ENODB_NAME_SIZE * 2];
2174         snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2175         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2176             mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2177         }
2178         auto tmp = sctpMap->find(key);
2179         if (tmp) {
2180             free(tmp);
2181         }
2182         sctpMap->erase(key);
2183         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here)");
2184         return -1;
2185     }
2186     return 0;
2187 }
2188
2189
2190 int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message) {
2191     buildJsonMessage(message);
2192
2193     rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2194
2195     if (rmrMessageBuffer.sendMessage == nullptr) {
2196         rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2197         mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2198         return -1;
2199     }
2200
2201     if (rmrMessageBuffer.sendMessage->state != 0) {
2202         char meid[RMR_MAX_MEID]{};
2203         if (rmrMessageBuffer.sendMessage->state == RMR_ERR_RETRY) {
2204             usleep(5);
2205             rmrMessageBuffer.sendMessage->state = 0;
2206             mdclog_write(MDCLOG_INFO, "RETRY sending Message type %d to Xapp from %s",
2207                          rmrMessageBuffer.sendMessage->mtype,
2208                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2209             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2210             if (rmrMessageBuffer.sendMessage == nullptr) {
2211                 mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2212                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2213                 return -1;
2214             } else if (rmrMessageBuffer.sendMessage->state != 0) {
2215                 mdclog_write(MDCLOG_ERR,
2216                              "Message state %s while sending request %d to Xapp from %s after retry of 10 microseconds",
2217                              translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2218                              rmrMessageBuffer.sendMessage->mtype,
2219                              rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2220                 auto rc = rmrMessageBuffer.sendMessage->state;
2221                 return rc;
2222             }
2223         } else {
2224             mdclog_write(MDCLOG_ERR, "Message state %s while sending request %d to Xapp from %s",
2225                          translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2226                          rmrMessageBuffer.sendMessage->mtype,
2227                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2228             return rmrMessageBuffer.sendMessage->state;
2229         }
2230     }
2231     return 0;
2232 }
2233
2234 void buildJsonMessage(ReportingMessages_t &message) {
2235     if (jsonTrace) {
2236         message.outLen = sizeof(message.base64Data);
2237         base64::encode((const unsigned char *) message.message.asndata,
2238                        (const int) message.message.asnLength,
2239                        message.base64Data,
2240                        message.outLen);
2241         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2242             mdclog_write(MDCLOG_DEBUG, "asn data length = %d, base64 message length = %d ",
2243                          (int) message.message.asnLength,
2244                          (int) message.outLen);
2245         }
2246
2247         snprintf(message.buffer, sizeof(message.buffer),
2248                  "{\"header\": {\"ts\": \"%ld.%09ld\","
2249                  "\"ranName\": \"%s\","
2250                  "\"messageType\": %d,"
2251                  "\"direction\": \"%c\"},"
2252                  "\"base64Length\": %d,"
2253                  "\"asnBase64\": \"%s\"}",
2254                  message.message.time.tv_sec,
2255                  message.message.time.tv_nsec,
2256                  message.message.enodbName,
2257                  message.message.messageType,
2258                  message.message.direction,
2259                  (int) message.outLen,
2260                  message.base64Data);
2261         static src::logger_mt &lg = my_logger::get();
2262
2263         BOOST_LOG(lg) << message.buffer;
2264     }
2265 }
2266
2267
2268 /**
2269  * take RMR error code to string
2270  * @param state
2271  * @return
2272  */
2273 string translateRmrErrorMessages(int state) {
2274     string str = {};
2275     switch (state) {
2276         case RMR_OK:
2277             str = "RMR_OK - state is good";
2278             break;
2279         case RMR_ERR_BADARG:
2280             str = "RMR_ERR_BADARG - argument passd to function was unusable";
2281             break;
2282         case RMR_ERR_NOENDPT:
2283             str = "RMR_ERR_NOENDPT - send//call could not find an endpoint based on msg type";
2284             break;
2285         case RMR_ERR_EMPTY:
2286             str = "RMR_ERR_EMPTY - msg received had no payload; attempt to send an empty message";
2287             break;
2288         case RMR_ERR_NOHDR:
2289             str = "RMR_ERR_NOHDR - message didn't contain a valid header";
2290             break;
2291         case RMR_ERR_SENDFAILED:
2292             str = "RMR_ERR_SENDFAILED - send failed; errno has nano reason";
2293             break;
2294         case RMR_ERR_CALLFAILED:
2295             str = "RMR_ERR_CALLFAILED - unable to send call() message";
2296             break;
2297         case RMR_ERR_NOWHOPEN:
2298             str = "RMR_ERR_NOWHOPEN - no wormholes are open";
2299             break;
2300         case RMR_ERR_WHID:
2301             str = "RMR_ERR_WHID - wormhole id was invalid";
2302             break;
2303         case RMR_ERR_OVERFLOW:
2304             str = "RMR_ERR_OVERFLOW - operation would have busted through a buffer/field size";
2305             break;
2306         case RMR_ERR_RETRY:
2307             str = "RMR_ERR_RETRY - request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)";
2308             break;
2309         case RMR_ERR_RCVFAILED:
2310             str = "RMR_ERR_RCVFAILED - receive failed (hard error)";
2311             break;
2312         case RMR_ERR_TIMEOUT:
2313             str = "RMR_ERR_TIMEOUT - message processing call timed out";
2314             break;
2315         case RMR_ERR_UNSET:
2316             str = "RMR_ERR_UNSET - the message hasn't been populated with a transport buffer";
2317             break;
2318         case RMR_ERR_TRUNC:
2319             str = "RMR_ERR_TRUNC - received message likely truncated";
2320             break;
2321         case RMR_ERR_INITFAILED:
2322             str = "RMR_ERR_INITFAILED - initialisation of something (probably message) failed";
2323             break;
2324         case RMR_ERR_NOTSUPP:
2325             str = "RMR_ERR_NOTSUPP - the request is not supported, or RMr was not initialised for the request";
2326             break;
2327         default:
2328             char buf[128]{};
2329             snprintf(buf, sizeof buf, "UNDOCUMENTED RMR_ERR : %d", state);
2330             str = buf;
2331             break;
2332     }
2333     return str;
2334 }
2335
2336