version 4.0.2
[ric-plt/e2.git] / RIC-E2-TERMINATION / sctpThread.cpp
1 // Copyright 2019 AT&T Intellectual Property
2 // Copyright 2019 Nokia
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //      http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15
16 //  This source code is part of the near-RT RIC (RAN Intelligent Controller)
17 //  platform project (RICP).
18
19 // TODO: High-level file comment.
20
21
22
23 #include "sctpThread.h"
24 #include "BuildRunName.h"
25
26 using namespace std;
27 //using namespace std::placeholders;
28 using namespace boost::filesystem;
29
30 //#ifdef __cplusplus
31 //extern "C"
32 //{
33 //#endif
34
35 // need to expose without the include of gcov
36 extern "C" void __gcov_flush(void);
37
38 static void catch_function(int signal) {
39     __gcov_flush();
40     exit(signal);
41 }
42
43
44 BOOST_LOG_INLINE_GLOBAL_LOGGER_DEFAULT(my_logger, src::logger_mt)
45
46 boost::shared_ptr<sinks::synchronous_sink<sinks::text_file_backend>> boostLogger;
47 double cpuClock = 0.0;
48 bool jsonTrace = true;
49
50 void init_log() {
51     mdclog_attr_t *attr;
52     mdclog_attr_init(&attr);
53     mdclog_attr_set_ident(attr, "E2Terminator");
54     mdclog_init(attr);
55     mdclog_attr_destroy(attr);
56 }
57 auto start_time = std::chrono::high_resolution_clock::now();
58 typedef std::chrono::duration<double, std::ratio<1,1>> seconds_t;
59
60 double age() {
61     return seconds_t(std::chrono::high_resolution_clock::now() - start_time).count();
62 }
63
64 double approx_CPU_MHz(unsigned sleeptime) {
65     using namespace std::chrono_literals;
66     uint32_t aux = 0;
67     uint64_t cycles_start = rdtscp(aux);
68     double time_start = age();
69     std::this_thread::sleep_for(sleeptime * 1ms);
70     uint64_t elapsed_cycles = rdtscp(aux) - cycles_start;
71     double elapsed_time = age() - time_start;
72     return elapsed_cycles / elapsed_time;
73 }
74
75 //std::atomic<int64_t> rmrCounter{0};
76 std::atomic<int64_t> num_of_messages{0};
77 std::atomic<int64_t> num_of_XAPP_messages{0};
78 static long transactionCounter = 0;
79
80 int buildListeningPort(sctp_params_t &sctpParams) {
81     sctpParams.listenFD = socket (AF_INET6, SOCK_STREAM, IPPROTO_SCTP);
82     struct sockaddr_in6 servaddr {};
83     servaddr.sin6_family = AF_INET6;
84     servaddr.sin6_addr   = in6addr_any;
85     servaddr.sin6_port = htons(sctpParams.sctpPort);
86     if (bind(sctpParams.listenFD, (SA *)&servaddr, sizeof(servaddr)) < 0 ) {
87         mdclog_write(MDCLOG_ERR, "Error binding. %s\n", strerror(errno));
88         return -1;
89     }
90     if (setSocketNoBlocking(sctpParams.listenFD) == -1) {
91         //mdclog_write(MDCLOG_ERR, "Error binding. %s", strerror(errno));
92         return -1;
93     }
94     if (mdclog_level_get() >= MDCLOG_DEBUG) {
95         struct sockaddr_in6 cliaddr {};
96         socklen_t len = sizeof(cliaddr);
97         getsockname(sctpParams.listenFD, (SA *)&cliaddr, &len);
98         char buff[1024] {};
99         inet_ntop(AF_INET6, &cliaddr.sin6_addr, buff, sizeof(buff));
100         mdclog_write(MDCLOG_DEBUG, "My address: %s, port %d\n", buff, htons(cliaddr.sin6_port));
101     }
102
103     if (listen(sctpParams.listenFD, SOMAXCONN) < 0) {
104         mdclog_write(MDCLOG_ERR, "Error listening. %s\n", strerror(errno));
105         return -1;
106     }
107     struct epoll_event event {};
108     event.events = EPOLLIN | EPOLLET;
109     event.data.fd = sctpParams.listenFD;
110
111     // add listening port to epoll
112     if (epoll_ctl(sctpParams.epoll_fd, EPOLL_CTL_ADD, sctpParams.listenFD, &event)) {
113         printf("Failed to add descriptor to epoll\n");
114         mdclog_write(MDCLOG_ERR, "Failed to add descriptor to epoll. %s\n", strerror(errno));
115         return -1;
116     }
117
118     return 0;
119 }
120
121 int buildConfiguration(sctp_params_t &sctpParams) {
122     path p = (sctpParams.configFilePath + "/" + sctpParams.configFileName).c_str();
123     if (exists(p)) {
124         const int size = 2048;
125         auto fileSize = file_size(p);
126         if (fileSize > size) {
127             mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size);
128             return -1;
129         }
130     } else {
131         mdclog_write(MDCLOG_ERR, "Configuration File %s not exists", p.string().c_str());
132         return -1;
133     }
134
135     ReadConfigFile conf;
136     if (conf.openConfigFile(p.string()) == -1) {
137         mdclog_write(MDCLOG_ERR, "Filed to open config file %s, %s",
138                      p.string().c_str(), strerror(errno));
139         return -1;
140     }
141     int rmrPort = conf.getIntValue("nano");
142     if (rmrPort == -1) {
143         mdclog_write(MDCLOG_ERR, "illigal RMR port ");
144         return -1;
145     }
146     sctpParams.rmrPort = (uint16_t)rmrPort;
147     snprintf(sctpParams.rmrAddress, sizeof(sctpParams.rmrAddress), "%d", (int) (sctpParams.rmrPort));
148
149     auto tmpStr = conf.getStringValue("loglevel");
150     if (tmpStr.length() == 0) {
151         mdclog_write(MDCLOG_ERR, "illigal loglevel. Set loglevel to MDCLOG_INFO");
152         tmpStr = "info";
153     }
154     transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
155
156     if ((tmpStr.compare("debug")) == 0) {
157         sctpParams.logLevel = MDCLOG_DEBUG;
158     } else if ((tmpStr.compare("info")) == 0) {
159         sctpParams.logLevel = MDCLOG_INFO;
160     } else if ((tmpStr.compare("warning")) == 0) {
161         sctpParams.logLevel = MDCLOG_WARN;
162     } else if ((tmpStr.compare("error")) == 0) {
163         sctpParams.logLevel = MDCLOG_ERR;
164     } else {
165         mdclog_write(MDCLOG_ERR, "illigal loglevel = %s. Set loglevel to MDCLOG_INFO", tmpStr.c_str());
166         sctpParams.logLevel = MDCLOG_INFO;
167     }
168     mdclog_level_set(sctpParams.logLevel);
169
170     tmpStr = conf.getStringValue("volume");
171     if (tmpStr.length() == 0) {
172         mdclog_write(MDCLOG_ERR, "illigal volume.");
173         return -1;
174     }
175
176     char tmpLogFilespec[VOLUME_URL_SIZE];
177     tmpLogFilespec[0] = 0;
178     sctpParams.volume[0] = 0;
179     snprintf(sctpParams.volume, VOLUME_URL_SIZE, "%s", tmpStr.c_str());
180     // copy the name to temp file as well
181     snprintf(tmpLogFilespec, VOLUME_URL_SIZE, "%s", tmpStr.c_str());
182
183
184     // define the file name in the tmp directory under the volume
185     strcat(tmpLogFilespec,"/tmp/E2Term_%Y-%m-%d_%H-%M-%S.%N.tmpStr");
186
187     sctpParams.myIP = conf.getStringValue("local-ip");
188     if (sctpParams.myIP.length() == 0) {
189         mdclog_write(MDCLOG_ERR, "illigal local-ip.");
190         return -1;
191     }
192
193     int sctpPort = conf.getIntValue("sctp-port");
194     if (sctpPort == -1) {
195         mdclog_write(MDCLOG_ERR, "illigal SCTP port ");
196         return -1;
197     }
198     sctpParams.sctpPort = (uint16_t)sctpPort;
199
200     sctpParams.fqdn = conf.getStringValue("external-fqdn");
201     if (sctpParams.fqdn.length() == 0) {
202         mdclog_write(MDCLOG_ERR, "illigal external-fqdn");
203         return -1;
204     }
205
206     std::string pod = conf.getStringValue("pod_name");
207     if (pod.length() == 0) {
208         mdclog_write(MDCLOG_ERR, "illigal pod_name in config file");
209         return -1;
210     }
211     auto *podName = getenv(pod.c_str());
212     if (podName == nullptr) {
213         mdclog_write(MDCLOG_ERR, "illigal pod_name or environment varible not exists : %s", pod.c_str());
214         return -1;
215
216     } else {
217         sctpParams.podName.assign(podName);
218         if (sctpParams.podName.length() == 0) {
219             mdclog_write(MDCLOG_ERR, "illigal pod_name");
220             return -1;
221         }
222     }
223
224     tmpStr = conf.getStringValue("trace");
225     transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
226     if ((tmpStr.compare("start")) == 0) {
227         mdclog_write(MDCLOG_INFO, "Trace set to: start");
228         sctpParams.trace = true;
229     } else if ((tmpStr.compare("stop")) == 0) {
230         mdclog_write(MDCLOG_INFO, "Trace set to: stop");
231         sctpParams.trace = false;
232     }
233     jsonTrace = sctpParams.trace;
234
235     sctpParams.ka_message_length = snprintf(sctpParams.ka_message, KA_MESSAGE_SIZE, "{\"address\": \"%s:%d\","
236                                                                          "\"fqdn\": \"%s\","
237                                                                          "\"pod_name\": \"%s\"}",
238                                             (const char *)sctpParams.myIP.c_str(),
239                                             sctpParams.rmrPort,
240                                             sctpParams.fqdn.c_str(),
241                                             sctpParams.podName.c_str());
242
243     if (mdclog_level_get() >= MDCLOG_INFO) {
244         mdclog_mdc_add("RMR Port", to_string(sctpParams.rmrPort).c_str());
245         mdclog_mdc_add("LogLevel", to_string(sctpParams.logLevel).c_str());
246         mdclog_mdc_add("volume", sctpParams.volume);
247         mdclog_mdc_add("tmpLogFilespec", tmpLogFilespec);
248         mdclog_mdc_add("my ip", sctpParams.myIP.c_str());
249         mdclog_mdc_add("pod name", sctpParams.podName.c_str());
250
251         mdclog_write(MDCLOG_INFO, "running parameters for instance : %s", sctpParams.ka_message);
252     }
253     mdclog_mdc_clean();
254
255     // Files written to the current working directory
256     boostLogger = logging::add_file_log(
257             keywords::file_name = tmpLogFilespec, // to temp directory
258             keywords::rotation_size = 10 * 1024 * 1024,
259             keywords::time_based_rotation = sinks::file::rotation_at_time_interval(posix_time::hours(1)),
260             keywords::format = "%Message%"
261             //keywords::format = "[%TimeStamp%]: %Message%" // use each tmpStr with time stamp
262     );
263
264     // Setup a destination folder for collecting rotated (closed) files --since the same volumn can use rename()
265     boostLogger->locked_backend()->set_file_collector(sinks::file::make_collector(
266             keywords::target = sctpParams.volume
267     ));
268
269     // Upon restart, scan the directory for files matching the file_name pattern
270     boostLogger->locked_backend()->scan_for_files();
271
272     // Enable auto-flushing after each tmpStr record written
273     if (mdclog_level_get() >= MDCLOG_DEBUG) {
274         boostLogger->locked_backend()->auto_flush(true);
275     }
276
277     return 0;
278 }
279
280 int main(const int argc, char **argv) {
281     sctp_params_t sctpParams;
282
283     {
284         std::random_device device{};
285         std::mt19937 generator(device());
286         std::uniform_int_distribution<long> distribution(1, (long) 1e12);
287         transactionCounter = distribution(generator);
288     }
289
290 //    uint64_t st = 0;
291 //    uint32_t aux1 = 0;
292 //   st = rdtscp(aux1);
293
294     unsigned num_cpus = std::thread::hardware_concurrency();
295     init_log();
296     mdclog_level_set(MDCLOG_INFO);
297
298     if (std::signal(SIGINT, catch_function) == SIG_ERR) {
299         mdclog_write(MDCLOG_ERR, "Error initializing SIGINT");
300         exit(1);
301     }
302     if (std::signal(SIGABRT, catch_function)== SIG_ERR) {
303         mdclog_write(MDCLOG_ERR, "Error initializing SIGABRT");
304         exit(1);
305     }
306     if (std::signal(SIGTERM, catch_function)== SIG_ERR) {
307         mdclog_write(MDCLOG_ERR, "Error initializing SIGTERM");
308         exit(1);
309     }
310
311     cpuClock = approx_CPU_MHz(100);
312
313     mdclog_write(MDCLOG_DEBUG, "CPU speed %11.11f", cpuClock);
314
315     auto result = parse(argc, argv, sctpParams);
316
317     if (buildConfiguration(sctpParams) != 0) {
318         exit(-1);
319     }
320
321     // start epoll
322     sctpParams.epoll_fd = epoll_create1(0);
323     if (sctpParams.epoll_fd == -1) {
324         mdclog_write(MDCLOG_ERR, "failed to open epoll descriptor");
325         exit(-1);
326     }
327
328     getRmrContext(sctpParams);
329     if (sctpParams.rmrCtx == nullptr) {
330         close(sctpParams.epoll_fd);
331         exit(-1);
332     }
333
334     if (buildInotify(sctpParams) == -1) {
335         close(sctpParams.rmrListenFd);
336         rmr_close(sctpParams.rmrCtx);
337         close(sctpParams.epoll_fd);
338         exit(-1);
339     }
340
341     if (buildListeningPort(sctpParams) != 0) {
342         close(sctpParams.rmrListenFd);
343         rmr_close(sctpParams.rmrCtx);
344         close(sctpParams.epoll_fd);
345         exit(-1);
346     }
347
348     sctpParams.sctpMap = new mapWrapper();
349
350     std::vector<std::thread> threads(num_cpus);
351 //    std::vector<std::thread> threads;
352
353     num_cpus = 1;
354     for (unsigned int i = 0; i < num_cpus; i++) {
355         threads[i] = std::thread(listener, &sctpParams);
356
357         cpu_set_t cpuset;
358         CPU_ZERO(&cpuset);
359         CPU_SET(i, &cpuset);
360         int rc = pthread_setaffinity_np(threads[i].native_handle(), sizeof(cpu_set_t), &cpuset);
361         if (rc != 0) {
362             mdclog_write(MDCLOG_ERR, "Error calling pthread_setaffinity_np: %d", rc);
363         }
364     }
365
366     //loop over term_init until first message from xApp
367     handleTermInit(sctpParams);
368
369     for (auto &t : threads) {
370         t.join();
371     }
372
373     return 0;
374 }
375
376 void handleTermInit(sctp_params_t &sctpParams) {
377     sendTermInit(sctpParams);
378     //send to e2 manager init of e2 term
379     //E2_TERM_INIT
380
381     int count = 0;
382     while (true) {
383         auto xappMessages = num_of_XAPP_messages.load(std::memory_order_acquire);
384         if (xappMessages > 0) {
385             if (mdclog_level_get() >=  MDCLOG_INFO) {
386                 mdclog_write(MDCLOG_INFO, "Got a message from some appliction, stop sending E2_TERM_INIT");
387             }
388             return;
389         }
390         usleep(100000);
391         count++;
392         if (count % 1000 == 0) {
393             mdclog_write(MDCLOG_ERR, "GOT No messages from any xApp");
394             sendTermInit(sctpParams);
395         }
396     }
397 }
398
399 void sendTermInit(sctp_params_t &sctpParams) {
400     rmr_mbuf_t *msg = rmr_alloc_msg(sctpParams.rmrCtx, sctpParams.ka_message_length);
401     auto count = 0;
402     while (true) {
403         msg->mtype = E2_TERM_INIT;
404         msg->state = 0;
405         rmr_bytes2payload(msg, (unsigned char *)sctpParams.ka_message, sctpParams.ka_message_length);
406         static unsigned char tx[32];
407         auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
408         rmr_bytes2xact(msg, tx, txLen);
409         msg = rmr_send_msg(sctpParams.rmrCtx, msg);
410         if (msg == nullptr) {
411             msg = rmr_alloc_msg(sctpParams.rmrCtx, sctpParams.myIP.length());
412         } else if (msg->state == 0) {
413             rmr_free_msg(msg);
414             if (mdclog_level_get() >=  MDCLOG_INFO) {
415                 mdclog_write(MDCLOG_INFO, "E2_TERM_INIT succsesfuly sent ");
416             }
417             return;
418         } else {
419             if (count % 100 == 0) {
420                 mdclog_write(MDCLOG_ERR, "Error sending E2_TERM_INIT cause : %s ", translateRmrErrorMessages(msg->state).c_str());
421             }
422             sleep(1);
423         }
424         count++;
425     }
426 }
427
428 /**
429  *
430  * @param argc
431  * @param argv
432  * @param sctpParams
433  * @return
434  */
435 cxxopts::ParseResult parse(int argc, char *argv[], sctp_params_t &sctpParams) {
436     cxxopts::Options options(argv[0], "e2 term help");
437     options.positional_help("[optional args]").show_positional_help();
438     options.allow_unrecognised_options().add_options()
439             ("p,path", "config file path", cxxopts::value<std::string>(sctpParams.configFilePath)->default_value("config"))
440             ("f,file", "config file name", cxxopts::value<std::string>(sctpParams.configFileName)->default_value("config.conf"))
441             ("h,help", "Print help");
442
443     auto result = options.parse(argc, argv);
444
445     if (result.count("help")) {
446         std::cout << options.help({""}) << std::endl;
447         exit(0);
448     }
449     return result;
450 }
451
452 /**
453  *
454  * @param sctpParams
455  * @return -1 failed 0 success
456  */
457 int buildInotify(sctp_params_t &sctpParams) {
458     sctpParams.inotifyFD = inotify_init1(IN_NONBLOCK);
459     if (sctpParams.inotifyFD == -1) {
460         mdclog_write(MDCLOG_ERR, "Failed to init inotify (inotify_init1) %s", strerror(errno));
461         close(sctpParams.rmrListenFd);
462         rmr_close(sctpParams.rmrCtx);
463         close(sctpParams.epoll_fd);
464         return -1;
465     }
466
467     sctpParams.inotifyWD = inotify_add_watch(sctpParams.inotifyFD,
468                                               (const char *)sctpParams.configFilePath.c_str(),
469                                              (unsigned)IN_OPEN | (unsigned)IN_CLOSE_WRITE | (unsigned)IN_CLOSE_NOWRITE); //IN_CLOSE = (IN_CLOSE_WRITE | IN_CLOSE_NOWRITE)
470     if (sctpParams.inotifyWD == -1) {
471         mdclog_write(MDCLOG_ERR, "Failed to add directory : %s to  inotify (inotify_add_watch) %s",
472                 sctpParams.configFilePath.c_str(),
473                 strerror(errno));
474         close(sctpParams.inotifyFD);
475         return -1;
476     }
477
478     struct epoll_event event{};
479     event.events = (EPOLLIN);
480     event.data.fd = sctpParams.inotifyFD;
481     // add listening RMR FD to epoll
482     if (epoll_ctl(sctpParams.epoll_fd, EPOLL_CTL_ADD, sctpParams.inotifyFD, &event)) {
483         mdclog_write(MDCLOG_ERR, "Failed to add inotify FD to epoll");
484         close(sctpParams.inotifyFD);
485         return -1;
486     }
487     return 0;
488 }
489
490 /**
491  *
492  * @param args
493  * @return
494  */
495 void listener(sctp_params_t *params) {
496     int num_of_SCTP_messages = 0;
497     auto totalTime = 0.0;
498     mdclog_mdc_clean();
499     mdclog_level_set(params->logLevel);
500
501     std::thread::id this_id = std::this_thread::get_id();
502     //save cout
503     streambuf *oldCout = cout.rdbuf();
504     ostringstream memCout;
505     // create new cout
506     cout.rdbuf(memCout.rdbuf());
507     cout << this_id;
508     //return to the normal cout
509     cout.rdbuf(oldCout);
510
511     char tid[32];
512     memcpy(tid, memCout.str().c_str(), memCout.str().length() < 32 ? memCout.str().length() : 31);
513     tid[memCout.str().length()] = 0;
514     mdclog_mdc_add("thread id", tid);
515
516     if (mdclog_level_get() >= MDCLOG_DEBUG) {
517         mdclog_write(MDCLOG_DEBUG, "started thread number %s", tid);
518     }
519
520
521     RmrMessagesBuffer_t rmrMessageBuffer{};
522     //create and init RMR
523     rmrMessageBuffer.rmrCtx = params->rmrCtx;
524
525     auto *events = (struct epoll_event *) calloc(MAXEVENTS, sizeof(struct epoll_event));
526     struct timespec end{0, 0};
527     struct timespec start{0, 0};
528
529     rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
530     rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
531
532     memcpy(rmrMessageBuffer.ka_message, params->ka_message, params->ka_message_length);
533     rmrMessageBuffer.ka_message_len = params->ka_message_length;
534     rmrMessageBuffer.ka_message[rmrMessageBuffer.ka_message_len] = 0;
535
536     if (mdclog_level_get() >= MDCLOG_DEBUG) {
537         mdclog_write(MDCLOG_DEBUG, "keep alive message is : %s", rmrMessageBuffer.ka_message);
538     }
539
540     ReportingMessages_t message {};
541
542     for (int i = 0; i < MAX_RMR_BUFF_ARRY; i++) {
543         rmrMessageBuffer.rcvBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
544         rmrMessageBuffer.sendBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
545     }
546
547     while (true) {
548         if (mdclog_level_get() >= MDCLOG_DEBUG) {
549             mdclog_write(MDCLOG_DEBUG, "Start EPOLL Wait");
550         }
551         auto numOfEvents = epoll_wait(params->epoll_fd, events, MAXEVENTS, -1);
552         if (numOfEvents < 0 && errno == EINTR) {
553             if (mdclog_level_get() >= MDCLOG_DEBUG) {
554                 mdclog_write(MDCLOG_DEBUG, "got EINTR : %s", strerror(errno));
555             }
556             continue;
557         }
558         if (numOfEvents < 0) {
559             mdclog_write(MDCLOG_ERR, "Epoll wait failed, errno = %s", strerror(errno));
560             return;
561         }
562         for (auto i = 0; i < numOfEvents; i++) {
563             if (mdclog_level_get() >= MDCLOG_DEBUG) {
564                 mdclog_write(MDCLOG_DEBUG, "handling epoll event %d out of %d", i + 1, numOfEvents);
565             }
566             clock_gettime(CLOCK_MONOTONIC, &message.message.time);
567             start.tv_sec = message.message.time.tv_sec;
568             start.tv_nsec = message.message.time.tv_nsec;
569
570
571             if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP)) {
572                 handlepoll_error(events[i], message, rmrMessageBuffer, params);
573             } else if (events[i].events & EPOLLOUT) {
574                 handleEinprogressMessages(events[i], message, rmrMessageBuffer, params);
575             } else if (params->listenFD == events[i].data.fd) {
576                 if (mdclog_level_get() >= MDCLOG_INFO) {
577                     mdclog_write(MDCLOG_INFO, "New connection request from sctp network\n");
578                 }
579                 // new connection is requested from RAN  start build connection
580                 while (true) {
581                     struct sockaddr in_addr {};
582                     socklen_t in_len;
583                     char hostBuff[NI_MAXHOST];
584                     char portBuff[NI_MAXSERV];
585
586                     in_len = sizeof(in_addr);
587                     auto *peerInfo = (ConnectedCU_t *)calloc(1, sizeof(ConnectedCU_t));
588                     peerInfo->sctpParams = params;
589                     peerInfo->fileDescriptor = accept(params->listenFD, &in_addr, &in_len);
590                     if (peerInfo->fileDescriptor == -1) {
591                         if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
592                             /* We have processed all incoming connections. */
593                             break;
594                         } else {
595                             mdclog_write(MDCLOG_ERR, "Accept error, errno = %s", strerror(errno));
596                             break;
597                         }
598                     }
599                     if (setSocketNoBlocking(peerInfo->fileDescriptor) == -1) {
600                         mdclog_write(MDCLOG_ERR, "setSocketNoBlocking failed to set new connection %s on port %s\n", hostBuff, portBuff);
601                         close(peerInfo->fileDescriptor);
602                         break;
603                     }
604                     auto  ans = getnameinfo(&in_addr, in_len,
605                             peerInfo->hostName, NI_MAXHOST,
606                             peerInfo->portNumber, NI_MAXSERV, (unsigned )((unsigned int)NI_NUMERICHOST | (unsigned int)NI_NUMERICSERV));
607                     if (ans < 0) {
608                         mdclog_write(MDCLOG_ERR, "Failed to get info on connection request. %s\n", strerror(errno));
609                         close(peerInfo->fileDescriptor);
610                         break;
611                     }
612                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
613                         mdclog_write(MDCLOG_DEBUG, "Accepted connection on descriptor %d (host=%s, port=%s)\n", peerInfo->fileDescriptor, peerInfo->hostName, peerInfo->portNumber);
614                     }
615                     peerInfo->isConnected = false;
616                     peerInfo->gotSetup = false;
617                     if (addToEpoll(params->epoll_fd,
618                                    peerInfo,
619                                    (EPOLLIN | EPOLLET),
620                                    params->sctpMap, nullptr,
621                                    0) != 0) {
622                         break;
623                     }
624                     break;
625                 }
626             } else if (params->rmrListenFd == events[i].data.fd) {
627                 // got message from XAPP
628                 num_of_XAPP_messages.fetch_add(1, std::memory_order_release);
629                 num_of_messages.fetch_add(1, std::memory_order_release);
630                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
631                     mdclog_write(MDCLOG_DEBUG, "new message from RMR");
632                 }
633                 if (receiveXappMessages(params->sctpMap,
634                                         rmrMessageBuffer,
635                                         message.message.time) != 0) {
636                     mdclog_write(MDCLOG_ERR, "Error handling Xapp message");
637                 }
638             } else if (params->inotifyFD == events[i].data.fd) {
639                 mdclog_write(MDCLOG_INFO, "Got event from inotify (configuration update)");
640                 handleConfigChange(params);
641             } else {
642                 /* We RMR_ERR_RETRY have data on the fd waiting to be read. Read and display it.
643                  * We must read whatever data is available completely, as we are running
644                  *  in edge-triggered mode and won't get a notification again for the same data. */
645                 num_of_messages.fetch_add(1, std::memory_order_release);
646                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
647                     mdclog_write(MDCLOG_DEBUG, "new message from SCTP, epoll flags are : %0x", events[i].events);
648                 }
649                 receiveDataFromSctp(&events[i],
650                                     params->sctpMap,
651                                     num_of_SCTP_messages,
652                                     rmrMessageBuffer,
653                                     message.message.time);
654             }
655
656             clock_gettime(CLOCK_MONOTONIC, &end);
657             if (mdclog_level_get() >= MDCLOG_INFO) {
658                 totalTime += ((end.tv_sec + 1.0e-9 * end.tv_nsec) -
659                               ((double) start.tv_sec + 1.0e-9 * start.tv_nsec));
660             }
661             if (mdclog_level_get() >= MDCLOG_DEBUG) {
662                 mdclog_write(MDCLOG_DEBUG, "message handling is %ld seconds %ld nanoseconds",
663                              end.tv_sec - start.tv_sec,
664                              end.tv_nsec - start.tv_nsec);
665             }
666         }
667     }
668 }
669
670 /**
671  *
672  * @param sctpParams
673  */
674 void handleConfigChange(sctp_params_t *sctpParams) {
675     char buf[4096] __attribute__ ((aligned(__alignof__(struct inotify_event))));
676     const struct inotify_event *event;
677     char *ptr;
678
679     path p = (sctpParams->configFilePath + "/" + sctpParams->configFileName).c_str();
680     auto endlessLoop = true;
681     while (endlessLoop) {
682         auto len = read(sctpParams->inotifyFD, buf, sizeof buf);
683         if (len == -1) {
684             if (errno != EAGAIN) {
685                 mdclog_write(MDCLOG_ERR, "read %s ", strerror(errno));
686                 endlessLoop = false;
687                 continue;
688             }
689             else {
690                 endlessLoop = false;
691                 continue;
692             }
693         }
694
695         for (ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) {
696             event = (const struct inotify_event *)ptr;
697             if (event->mask & (uint32_t)IN_ISDIR) {
698                 continue;
699             }
700
701             // the directory name
702             if (sctpParams->inotifyWD == event->wd) {
703                 // not the directory
704             }
705             if (event->len) {
706                 if (!(sctpParams->configFileName.compare(event->name))) {
707                     continue;
708                 }
709             }
710             // only the file we want
711             if (event->mask & (uint32_t)IN_CLOSE_WRITE) {
712                 if (exists(p)) {
713                     const int size = 2048;
714                     auto fileSize = file_size(p);
715                     if (fileSize > size) {
716                         mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size);
717                         return;
718                     }
719                 } else {
720                     mdclog_write(MDCLOG_ERR, "Configuration File %s not exists", p.string().c_str());
721                     return;
722                 }
723
724                 ReadConfigFile conf;
725                 if (conf.openConfigFile(p.string()) == -1) {
726                     mdclog_write(MDCLOG_ERR, "Filed to open config file %s, %s",
727                                  p.string().c_str(), strerror(errno));
728                     return;
729                 }
730
731                 auto tmpStr = conf.getStringValue("loglevel");
732                 if (tmpStr.length() == 0) {
733                     mdclog_write(MDCLOG_ERR, "illigal loglevel. Set loglevel to MDCLOG_INFO");
734                     tmpStr = "info";
735                 }
736                 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
737
738                 if ((tmpStr.compare("debug")) == 0) {
739                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_DEBUG");
740                     sctpParams->logLevel = MDCLOG_DEBUG;
741                 } else if ((tmpStr.compare("info")) == 0) {
742                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_INFO");
743                     sctpParams->logLevel = MDCLOG_INFO;
744                 } else if ((tmpStr.compare("warning")) == 0) {
745                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_WARN");
746                     sctpParams->logLevel = MDCLOG_WARN;
747                 } else if ((tmpStr.compare("error")) == 0) {
748                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_ERR");
749                     sctpParams->logLevel = MDCLOG_ERR;
750                 } else {
751                     mdclog_write(MDCLOG_ERR, "illigal loglevel = %s. Set loglevel to MDCLOG_INFO", tmpStr.c_str());
752                     sctpParams->logLevel = MDCLOG_INFO;
753                 }
754                 mdclog_level_set(sctpParams->logLevel);
755
756
757                 tmpStr = conf.getStringValue("trace");
758                 if (tmpStr.length() == 0) {
759                     mdclog_write(MDCLOG_ERR, "illigal trace. Set trace to stop");
760                     tmpStr = "stop";
761                 }
762
763                 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
764                 if ((tmpStr.compare("start")) == 0) {
765                     mdclog_write(MDCLOG_INFO, "Trace set to: start");
766                     sctpParams->trace = true;
767                 } else if ((tmpStr.compare("stop")) == 0) {
768                     mdclog_write(MDCLOG_INFO, "Trace set to: stop");
769                     sctpParams->trace = false;
770                 } else {
771                     mdclog_write(MDCLOG_ERR, "Trace was set to wrong value %s, set to stop", tmpStr.c_str());
772                     sctpParams->trace = false;
773                 }
774                 jsonTrace = sctpParams->trace;
775                 endlessLoop = false;
776             }
777         }
778     }
779 }
780
781 /**
782  *
783  * @param event
784  * @param message
785  * @param rmrMessageBuffer
786  * @param params
787  */
788 void handleEinprogressMessages(struct epoll_event &event,
789                                ReportingMessages_t &message,
790                                RmrMessagesBuffer_t &rmrMessageBuffer,
791                                sctp_params_t *params) {
792     auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
793     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
794
795     mdclog_write(MDCLOG_INFO, "file descriptor %d got EPOLLOUT", peerInfo->fileDescriptor);
796     auto retVal = 0;
797     socklen_t retValLen = 0;
798     auto rc = getsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_ERROR, &retVal, &retValLen);
799     if (rc != 0 || retVal != 0) {
800         if (rc != 0) {
801             rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
802                                                          "%s|Failed SCTP Connection, after EINPROGRESS the getsockopt%s",
803                                                          peerInfo->enodbName, strerror(errno));
804         } else if (retVal != 0) {
805             rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
806                                                          "%s|Failed SCTP Connection after EINPROGRESS, SO_ERROR",
807                                                          peerInfo->enodbName);
808         }
809
810         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
811         message.message.asnLength = rmrMessageBuffer.sendMessage->len;
812         mdclog_write(MDCLOG_ERR, "%s", rmrMessageBuffer.sendMessage->payload);
813         message.message.direction = 'N';
814         if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
815             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
816         }
817         memset(peerInfo->asnData, 0, peerInfo->asnLength);
818         peerInfo->asnLength = 0;
819         peerInfo->mtype = 0;
820         return;
821     }
822
823     peerInfo->isConnected = true;
824
825     if (modifyToEpoll(params->epoll_fd, peerInfo, (EPOLLIN | EPOLLET), params->sctpMap, peerInfo->enodbName,
826                       peerInfo->mtype) != 0) {
827         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_MOD");
828         return;
829     }
830
831     message.message.asndata = (unsigned char *)peerInfo->asnData;
832     message.message.asnLength = peerInfo->asnLength;
833     message.message.messageType = peerInfo->mtype;
834     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
835     num_of_messages.fetch_add(1, std::memory_order_release);
836     if (mdclog_level_get() >= MDCLOG_DEBUG) {
837         mdclog_write(MDCLOG_DEBUG, "send the delayed SETUP/ENDC SETUP to sctp for %s",
838                      message.message.enodbName);
839     }
840     if (sendSctpMsg(peerInfo, message, params->sctpMap) != 0) {
841         if (mdclog_level_get() >= MDCLOG_DEBUG) {
842             mdclog_write(MDCLOG_DEBUG, "Error write to SCTP  %s %d", __func__, __LINE__);
843         }
844         return;
845     }
846
847     memset(peerInfo->asnData, 0, peerInfo->asnLength);
848     peerInfo->asnLength = 0;
849     peerInfo->mtype = 0;
850 }
851
852
853 void handlepoll_error(struct epoll_event &event,
854                       ReportingMessages_t &message,
855                       RmrMessagesBuffer_t &rmrMessageBuffer,
856                       sctp_params_t *params) {
857     if (event.data.fd != params->rmrListenFd) {
858         auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
859         mdclog_write(MDCLOG_ERR, "epoll error, events %0x on fd %d, RAN NAME : %s",
860                      event.events, peerInfo->fileDescriptor, peerInfo->enodbName);
861
862         rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
863                                                      "%s|Failed SCTP Connection",
864                                                      peerInfo->enodbName);
865         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
866         message.message.asnLength = rmrMessageBuffer.sendMessage->len;
867
868         memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
869         message.message.direction = 'N';
870         if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
871             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
872         }
873
874         close(peerInfo->fileDescriptor);
875         cleanHashEntry((ConnectedCU_t *) event.data.ptr, params->sctpMap);
876     } else {
877         mdclog_write(MDCLOG_ERR, "epoll error, events %0x on RMR FD", event.events);
878     }
879 }
880 /**
881  *
882  * @param socket
883  * @return
884  */
885 int setSocketNoBlocking(int socket) {
886     auto flags = fcntl(socket, F_GETFL, 0);
887
888     if (flags == -1) {
889         mdclog_mdc_add("func", "fcntl");
890         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
891         mdclog_mdc_clean();
892         return -1;
893     }
894
895     flags = (unsigned) flags | (unsigned) O_NONBLOCK;
896     if (fcntl(socket, F_SETFL, flags) == -1) {
897         mdclog_mdc_add("func", "fcntl");
898         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
899         mdclog_mdc_clean();
900         return -1;
901     }
902
903     return 0;
904 }
905
906 /**
907  *
908  * @param val
909  * @param m
910  */
911 void cleanHashEntry(ConnectedCU_t *val, Sctp_Map_t *m) {
912     char *dummy;
913     auto port = (uint16_t) strtol(val->portNumber, &dummy, 10);
914     char searchBuff[2048]{};
915
916     snprintf(searchBuff, sizeof searchBuff, "host:%s:%d", val->hostName, port);
917     m->erase(searchBuff);
918
919     m->erase(val->enodbName);
920     free(val);
921 }
922
923 /**
924  *
925  * @param fd file discriptor
926  * @param data the asn data to send
927  * @param len  length of the data
928  * @param enodbName the enodbName as in the map for printing purpose
929  * @param m map host information
930  * @param mtype message number
931  * @return 0 success, anegative number on fail
932  */
933 int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_t *m) {
934     auto loglevel = mdclog_level_get();
935     int fd = peerInfo->fileDescriptor;
936     if (loglevel >= MDCLOG_DEBUG) {
937         mdclog_write(MDCLOG_DEBUG, "Send SCTP message for CU %s, %s",
938                      message.message.enodbName, __FUNCTION__);
939     }
940
941     while (true) {
942         if (send(fd,message.message.asndata, message.message.asnLength,MSG_NOSIGNAL) < 0) {
943             if (errno == EINTR) {
944                 continue;
945             }
946             mdclog_write(MDCLOG_ERR, "error writing to CU a message, %s ", strerror(errno));
947             if (!peerInfo->isConnected) {
948                 mdclog_write(MDCLOG_ERR, "connection to CU %s is still in progress.", message.message.enodbName);
949                 return -1;
950             }
951             cleanHashEntry(peerInfo, m);
952             close(fd);
953             char key[MAX_ENODB_NAME_SIZE * 2];
954             snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName,
955                      message.message.messageType);
956             if (loglevel >= MDCLOG_DEBUG) {
957                 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
958             }
959             auto tmp = m->find(key);
960             if (tmp) {
961                 free(tmp);
962             }
963             m->erase(key);
964             return -1;
965         }
966         peerInfo->sentMesgs++;
967         message.message.direction = 'D';
968         // send report.buffer of size
969         buildJsonMessage(message);
970
971         if (loglevel >= MDCLOG_DEBUG) {
972             mdclog_write(MDCLOG_DEBUG,
973                          "SCTP message for CU %s sent from %s",
974                          message.message.enodbName,
975                          __FUNCTION__);
976         }
977         return 0;
978     }
979 }
980
981 /**
982  *
983  * @param message
984  * @param rmrMessageBuffer
985  */
986 void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) {
987     rmr_get_meid(rmrMessageBuffer.rcvMessage, (unsigned char *) (message.message.enodbName));
988
989     message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
990     message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
991
992     if (mdclog_level_get() >= MDCLOG_DEBUG) {
993         mdclog_write(MDCLOG_DEBUG, "Message from Xapp RAN name = %s message length = %ld",
994                      message.message.enodbName, (unsigned long) message.message.asnLength);
995     }
996 }
997
998
999
1000 /**
1001  *
1002  * @param events
1003  * @param sctpMap
1004  * @param numOfMessages
1005  * @param rmrMessageBuffer
1006  * @param ts
1007  * @return
1008  */
1009 int receiveDataFromSctp(struct epoll_event *events,
1010                         Sctp_Map_t *sctpMap,
1011                         int &numOfMessages,
1012                         RmrMessagesBuffer_t &rmrMessageBuffer,
1013                         struct timespec &ts) {
1014     /* We have data on the fd waiting to be read. Read and display it.
1015  * We must read whatever data is available completely, as we are running
1016  *  in edge-triggered mode and won't get a notification again for the same data. */
1017     ReportingMessages_t message {};
1018     auto done = 0;
1019     auto loglevel = mdclog_level_get();
1020
1021     // get the identity of the interface
1022     message.peerInfo = (ConnectedCU_t *)events->data.ptr;
1023
1024
1025     struct timespec start{0, 0};
1026     struct timespec decodestart{0, 0};
1027     struct timespec end{0, 0};
1028
1029     E2AP_PDU_t *pdu = nullptr;
1030
1031
1032     while (true) {
1033         if (loglevel >= MDCLOG_DEBUG) {
1034             mdclog_write(MDCLOG_DEBUG, "Start Read from SCTP %d fd", message.peerInfo->fileDescriptor);
1035             clock_gettime(CLOCK_MONOTONIC, &start);
1036         }
1037         // read the buffer directly to rmr payload
1038         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1039         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1040                 read(message.peerInfo->fileDescriptor, rmrMessageBuffer.sendMessage->payload, RECEIVE_SCTP_BUFFER_SIZE);
1041
1042         if (loglevel >= MDCLOG_DEBUG) {
1043             mdclog_write(MDCLOG_DEBUG, "Finish Read from SCTP %d fd message length = %ld",
1044                     message.peerInfo->fileDescriptor, message.message.asnLength);
1045         }
1046         message.peerInfo->rcvMsgs++;
1047         memcpy(message.message.enodbName, message.peerInfo->enodbName, sizeof(message.peerInfo->enodbName));
1048         message.message.direction = 'U';
1049         message.message.time.tv_nsec = ts.tv_nsec;
1050         message.message.time.tv_sec = ts.tv_sec;
1051
1052         if (message.message.asnLength < 0) {
1053             if (errno == EINTR) {
1054                 continue;
1055             }
1056             /* If errno == EAGAIN, that means we have read all
1057                data. So goReportingMessages_t back to the main loop. */
1058             if (errno != EAGAIN) {
1059                 mdclog_write(MDCLOG_ERR, "Read error, %s ", strerror(errno));
1060                 done = 1;
1061             } else if (loglevel >= MDCLOG_DEBUG) {
1062                 mdclog_write(MDCLOG_DEBUG, "EAGAIN - descriptor = %d", message.peerInfo->fileDescriptor);
1063             }
1064             break;
1065         } else if (message.message.asnLength == 0) {
1066             /* End of file. The remote has closed the connection. */
1067             if (loglevel >= MDCLOG_INFO) {
1068                 mdclog_write(MDCLOG_INFO, "END of File Closed connection - descriptor = %d",
1069                              message.peerInfo->fileDescriptor);
1070             }
1071             done = 1;
1072             break;
1073         }
1074
1075         asn_dec_rval_t rval;
1076         if (loglevel >= MDCLOG_DEBUG) {
1077             char printBuffer[4096]{};
1078             char *tmp = printBuffer;
1079             for (size_t i = 0; i < (size_t)message.message.asnLength; ++i) {
1080                 snprintf(tmp, 3, "%02x", message.message.asndata[i]);
1081                 tmp += 2;
1082             }
1083             printBuffer[message.message.asnLength] = 0;
1084             clock_gettime(CLOCK_MONOTONIC, &end);
1085             mdclog_write(MDCLOG_DEBUG, "Before Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
1086                          message.peerInfo->enodbName, end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
1087             mdclog_write(MDCLOG_DEBUG, "PDU buffer length = %ld, data =  : %s", message.message.asnLength,
1088                          printBuffer);
1089             clock_gettime(CLOCK_MONOTONIC, &decodestart);
1090         }
1091
1092         rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void **) &pdu,
1093                           message.message.asndata, message.message.asnLength);
1094         if (rval.code != RC_OK) {
1095             mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) E2AP PDU from RAN : %s", rval.code,
1096                          message.peerInfo->enodbName);
1097             break;
1098         }
1099
1100         if (loglevel >= MDCLOG_DEBUG) {
1101             clock_gettime(CLOCK_MONOTONIC, &end);
1102             mdclog_write(MDCLOG_DEBUG, "After Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
1103                          message.peerInfo->enodbName, end.tv_sec - decodestart.tv_sec, end.tv_nsec - decodestart.tv_nsec);
1104             char *printBuffer;
1105             size_t size;
1106             FILE *stream = open_memstream(&printBuffer, &size);
1107             asn_fprint(stream, &asn_DEF_E2AP_PDU, pdu);
1108             mdclog_write(MDCLOG_DEBUG, "Encoding E2AP PDU past : %s", printBuffer);
1109             clock_gettime(CLOCK_MONOTONIC, &decodestart);
1110         }
1111
1112         switch (pdu->present) {
1113             case E2AP_PDU_PR_initiatingMessage: {//initiating message
1114                 asnInitiatingRequest(pdu, message, rmrMessageBuffer);
1115                 break;
1116             }
1117             case E2AP_PDU_PR_successfulOutcome: { //successful outcome
1118                 asnSuccsesfulMsg(pdu, message,  rmrMessageBuffer);
1119                 break;
1120             }
1121             case E2AP_PDU_PR_unsuccessfulOutcome: { //Unsuccessful Outcome
1122                 asnUnSuccsesfulMsg(pdu, message, rmrMessageBuffer);
1123                 break;
1124             }
1125             default:
1126                 mdclog_write(MDCLOG_ERR, "Unknown index %d in E2AP PDU", pdu->present);
1127                 break;
1128         }
1129         if (loglevel >= MDCLOG_DEBUG) {
1130             clock_gettime(CLOCK_MONOTONIC, &end);
1131             mdclog_write(MDCLOG_DEBUG,
1132                          "After processing message and sent to rmr for : %s, Read time is : %ld seconds, %ld nanoseconds",
1133                          message.peerInfo->enodbName, end.tv_sec - decodestart.tv_sec, end.tv_nsec - decodestart.tv_nsec);
1134         }
1135         numOfMessages++;
1136         // remove the break for EAGAIN
1137         //break;
1138         if (pdu != nullptr) {
1139             //TODO need to test ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu); to get better performance
1140             //ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu);
1141             ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
1142             pdu = nullptr;
1143         }
1144         //clock_gettime(CLOCK_MONOTONIC, &start);
1145     }
1146     // in case of break to avoid memory leak
1147     if (pdu != nullptr) {
1148         ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
1149         pdu = nullptr;
1150     }
1151
1152     if (done) {
1153         if (loglevel >= MDCLOG_INFO) {
1154             mdclog_write(MDCLOG_INFO, "Closed connection - descriptor = %d", message.peerInfo->fileDescriptor);
1155         }
1156         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1157                 snprintf((char *)rmrMessageBuffer.sendMessage->payload,
1158                         256,
1159                         "%s|CU disconnected unexpectedly",
1160                         message.peerInfo->enodbName);
1161         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1162
1163         if (sendRequestToXapp(message,
1164                               RIC_SCTP_CONNECTION_FAILURE,
1165                               rmrMessageBuffer) != 0) {
1166             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
1167         }
1168
1169         /* Closing descriptor make epoll remove it from the set of descriptors which are monitored. */
1170         close(message.peerInfo->fileDescriptor);
1171         cleanHashEntry((ConnectedCU_t *) events->data.ptr, sctpMap);
1172     }
1173     if (loglevel >= MDCLOG_DEBUG) {
1174         clock_gettime(CLOCK_MONOTONIC, &end);
1175         mdclog_write(MDCLOG_DEBUG, "from receive SCTP to send RMR time is %ld seconds and %ld nanoseconds",
1176                      end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
1177
1178     }
1179     return 0;
1180 }
1181
1182 static void buildAndsendSetupRequest(ReportingMessages_t &message,
1183                                      E2setupRequestIEs_t *ie,
1184                                      RmrMessagesBuffer_t &rmrMessageBuffer,
1185                                      E2AP_PDU_t *pdu) {
1186     auto logLevel = mdclog_level_get();
1187
1188
1189     if (buildRanName(message.peerInfo->enodbName, ie) < 0) {
1190         mdclog_write(MDCLOG_ERR, "Bad param in E2setupRequestIEs GlobalE2node_ID.\n");
1191     } else {
1192         memcpy(message.message.enodbName, message.peerInfo->enodbName, strlen(message.peerInfo->enodbName));
1193     }
1194     // now we can send the data to e2Mgr
1195     auto buffer_size = RECEIVE_SCTP_BUFFER_SIZE;
1196
1197     auto *rmrMsg = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, buffer_size);
1198     // add addrees to message
1199     auto j = snprintf((char *)rmrMsg->payload, 256, "%s:%d|", message.peerInfo->sctpParams->myIP.c_str(), message.peerInfo->sctpParams->rmrPort);
1200
1201
1202     unsigned char *buffer = &rmrMsg->payload[j];
1203     // encode to xml
1204     asn_enc_rval_t er;
1205     er = asn_encode_to_buffer(nullptr, ATS_BASIC_XER, &asn_DEF_E2AP_PDU, pdu, buffer, buffer_size - j);
1206     if (er.encoded == -1) {
1207         mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
1208     } else if (er.encoded > (ssize_t) buffer_size) {
1209         mdclog_write(MDCLOG_ERR, "Buffer of size %d is to small for %s",
1210                      (int) buffer_size,
1211                      asn_DEF_E2AP_PDU.name);
1212     } else {
1213         if (logLevel >= MDCLOG_DEBUG) {
1214             mdclog_write(MDCLOG_DEBUG, "Buffer of size %d, data = %s", (int) er.encoded, buffer);
1215         }
1216         // send to RMR
1217         message.message.messageType = rmrMsg->mtype = RIC_E2_SETUP_REQ;
1218         rmrMsg->state = 0;
1219         rmr_bytes2meid(rmrMsg, (unsigned char *) message.message.enodbName, strlen(message.message.enodbName));
1220
1221         static unsigned char tx[32];
1222         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1223         rmr_bytes2xact(rmrMsg, tx, strlen((const char *) tx));
1224
1225         rmrMsg = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMsg);
1226         if (rmrMsg == nullptr) {
1227             mdclog_write(MDCLOG_ERR, "RMR failed to send returned nullptr");
1228         } else if (rmrMsg->state != 0) {
1229             char meid[RMR_MAX_MEID]{};
1230             if (rmrMsg->state == RMR_ERR_RETRY) {
1231                 usleep(5);
1232                 rmrMsg->state = 0;
1233                 mdclog_write(MDCLOG_INFO, "RETRY sending Message %d to Xapp from %s",
1234                              rmrMsg->mtype, rmr_get_meid(rmrMsg, (unsigned char *) meid));
1235                 rmrMsg = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMsg);
1236                 if (rmrMsg == nullptr) {
1237                     mdclog_write(MDCLOG_ERR, "RMR failed send returned nullptr");
1238                 } else if (rmrMsg->state != 0) {
1239                     mdclog_write(MDCLOG_ERR,
1240                                  "RMR Retry failed %s sending request %d to Xapp from %s",
1241                                  translateRmrErrorMessages(rmrMsg->state).c_str(),
1242                                  rmrMsg->mtype,
1243                                  rmr_get_meid(rmrMsg, (unsigned char *) meid));
1244                 }
1245             } else {
1246                 mdclog_write(MDCLOG_ERR, "RMR failed: %s. sending request %d to Xapp from %s",
1247                              translateRmrErrorMessages(rmrMsg->state).c_str(),
1248                              rmrMsg->mtype,
1249                              rmr_get_meid(rmrMsg, (unsigned char *) meid));
1250             }
1251         }
1252         message.peerInfo->gotSetup = true;
1253         buildJsonMessage(message);
1254     }
1255
1256 }
1257 /**
1258  *
1259  * @param pdu
1260  * @param message
1261  * @param rmrMessageBuffer
1262  */
1263 void asnInitiatingRequest(E2AP_PDU_t *pdu,
1264                           ReportingMessages_t &message,
1265                           RmrMessagesBuffer_t &rmrMessageBuffer) {
1266     auto logLevel = mdclog_level_get();
1267     auto procedureCode = ((InitiatingMessage_t *) pdu->choice.initiatingMessage)->procedureCode;
1268     if (logLevel >= MDCLOG_DEBUG) {
1269         mdclog_write(MDCLOG_DEBUG, "Initiating message %ld\n", procedureCode);
1270     }
1271     switch (procedureCode) {
1272         case ProcedureCode_id_E2setup: {
1273             if (logLevel >= MDCLOG_DEBUG) {
1274                 mdclog_write(MDCLOG_DEBUG, "Got E2setup\n");
1275             }
1276
1277             memset(message.peerInfo->enodbName, 0 , MAX_ENODB_NAME_SIZE);
1278             for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.E2setupRequest.protocolIEs.list.count; i++) {
1279                 auto *ie = pdu->choice.initiatingMessage->value.choice.E2setupRequest.protocolIEs.list.array[i];
1280                 if (ie->id == ProtocolIE_ID_id_GlobalE2node_ID) {
1281                     if (ie->value.present == E2setupRequestIEs__value_PR_GlobalE2node_ID) {
1282                         buildAndsendSetupRequest(message, ie, rmrMessageBuffer, pdu);
1283                         break;
1284                     }
1285                 }
1286             }
1287             break;
1288         }
1289         case ProcedureCode_id_ErrorIndication: {
1290             if (logLevel >= MDCLOG_DEBUG) {
1291                 mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName);
1292             }
1293             if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) {
1294                 mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP");
1295             }
1296             break;
1297         }
1298         case ProcedureCode_id_Reset: {
1299             if (logLevel >= MDCLOG_DEBUG) {
1300                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
1301             }
1302             if (sendRequestToXapp(message, RIC_X2_RESET, rmrMessageBuffer) != 0) {
1303                 mdclog_write(MDCLOG_ERR, "RIC_X2_RESET message failed to send to xAPP");
1304             }
1305             break;
1306         }
1307         case ProcedureCode_id_RICcontrol: {
1308             if (logLevel >= MDCLOG_DEBUG) {
1309                 mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
1310             }
1311             break;
1312         }
1313         case ProcedureCode_id_RICindication: {
1314             if (logLevel >= MDCLOG_DEBUG) {
1315                 mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName);
1316             }
1317             for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
1318                 auto messageSent = false;
1319                 RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
1320                 if (logLevel >= MDCLOG_DEBUG) {
1321                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1322                 }
1323                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1324                     if (logLevel >= MDCLOG_DEBUG) {
1325                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1326                     }
1327                     if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
1328                         static unsigned char tx[32];
1329                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
1330                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1331                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1332                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1333                                        (unsigned char *)message.message.enodbName,
1334                                        strlen(message.message.enodbName));
1335                         rmrMessageBuffer.sendMessage->state = 0;
1336                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1337                         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1338                             mdclog_write(MDCLOG_DEBUG, "RIC sub id = %d, message type = %d",
1339                                          rmrMessageBuffer.sendMessage->sub_id,
1340                                          rmrMessageBuffer.sendMessage->mtype);
1341                         }
1342                         sendRmrMessage(rmrMessageBuffer, message);
1343                         messageSent = true;
1344                     } else {
1345                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1346                     }
1347                 }
1348                 if (messageSent) {
1349                     break;
1350                 }
1351             }
1352             break;
1353         }
1354         case ProcedureCode_id_RICserviceQuery: {
1355             if (logLevel >= MDCLOG_DEBUG) {
1356                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceQuery %s", message.message.enodbName);
1357             }
1358             break;
1359         }
1360         case ProcedureCode_id_RICserviceUpdate: {
1361             if (logLevel >= MDCLOG_DEBUG) {
1362                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceUpdate %s", message.message.enodbName);
1363             }
1364             if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer) != 0) {
1365                 mdclog_write(MDCLOG_ERR, "RIC_SERVICE_UPDATE message failed to send to xAPP");
1366             }
1367             break;
1368         }
1369         case ProcedureCode_id_RICsubscription: {
1370             if (logLevel >= MDCLOG_DEBUG) {
1371                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
1372             }
1373             break;
1374         }
1375         case ProcedureCode_id_RICsubscriptionDelete: {
1376             if (logLevel >= MDCLOG_DEBUG) {
1377                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
1378             }
1379             break;
1380         }
1381         default: {
1382             mdclog_write(MDCLOG_ERR, "Undefined or not supported message = %ld", procedureCode);
1383             message.message.messageType = 0; // no RMR message type yet
1384
1385             buildJsonMessage(message);
1386
1387             break;
1388         }
1389     }
1390 }
1391
1392 /**
1393  *
1394  * @param pdu
1395  * @param message
1396  * @param rmrMessageBuffer
1397  */
1398 void asnSuccsesfulMsg(E2AP_PDU_t *pdu, ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) {
1399     auto procedureCode = pdu->choice.successfulOutcome->procedureCode;
1400     auto logLevel = mdclog_level_get();
1401     if (logLevel >= MDCLOG_INFO) {
1402         mdclog_write(MDCLOG_INFO, "Successful Outcome %ld", procedureCode);
1403     }
1404     switch (procedureCode) {
1405         case ProcedureCode_id_E2setup: {
1406             if (logLevel >= MDCLOG_DEBUG) {
1407                 mdclog_write(MDCLOG_DEBUG, "Got E2setup\n");
1408             }
1409             break;
1410         }
1411         case ProcedureCode_id_ErrorIndication: {
1412             if (logLevel >= MDCLOG_DEBUG) {
1413                 mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName);
1414             }
1415             if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) {
1416                 mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP");
1417             }
1418             break;
1419         }
1420         case ProcedureCode_id_Reset: {
1421             if (logLevel >= MDCLOG_DEBUG) {
1422                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
1423             }
1424             if (sendRequestToXapp(message, RIC_X2_RESET, rmrMessageBuffer) != 0) {
1425                 mdclog_write(MDCLOG_ERR, "RIC_X2_RESET message failed to send to xAPP");
1426             }
1427             break;
1428         }
1429         case ProcedureCode_id_RICcontrol: {
1430             if (logLevel >= MDCLOG_DEBUG) {
1431                 mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
1432             }
1433             for (auto i = 0;
1434                  i < pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.count; i++) {
1435                 auto messageSent = false;
1436                 RICcontrolAcknowledge_IEs_t *ie = pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.array[i];
1437                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1438                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1439                 }
1440                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1441                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1442                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1443                     }
1444                     if (ie->value.present == RICcontrolAcknowledge_IEs__value_PR_RICrequestID) {
1445                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_ACK;
1446                         rmrMessageBuffer.sendMessage->state = 0;
1447                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1448                         static unsigned char tx[32];
1449                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1450                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1451                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1452                                        (unsigned char *)message.message.enodbName,
1453                                        strlen(message.message.enodbName));
1454
1455                         sendRmrMessage(rmrMessageBuffer, message);
1456                         messageSent = true;
1457                     } else {
1458                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1459                     }
1460                 }
1461                 if (messageSent) {
1462                     break;
1463                 }
1464             }
1465
1466             break;
1467         }
1468         case ProcedureCode_id_RICindication: {
1469             if (logLevel >= MDCLOG_DEBUG) {
1470                 mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName);
1471             }
1472             for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
1473                 auto messageSent = false;
1474                 RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
1475                 if (logLevel >= MDCLOG_DEBUG) {
1476                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1477                 }
1478                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1479                     if (logLevel >= MDCLOG_DEBUG) {
1480                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1481                     }
1482                     if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
1483                         static unsigned char tx[32];
1484                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
1485                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1486                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1487                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1488                                        (unsigned char *)message.message.enodbName,
1489                                        strlen(message.message.enodbName));
1490                         rmrMessageBuffer.sendMessage->state = 0;
1491                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1492                         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1493                             mdclog_write(MDCLOG_DEBUG, "RIC sub id = %d, message type = %d",
1494                                          rmrMessageBuffer.sendMessage->sub_id,
1495                                          rmrMessageBuffer.sendMessage->mtype);
1496                         }
1497                         sendRmrMessage(rmrMessageBuffer, message);
1498                         messageSent = true;
1499                     } else {
1500                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1501                     }
1502                 }
1503                 if (messageSent) {
1504                     break;
1505                 }
1506             }
1507             break;
1508         }
1509         case ProcedureCode_id_RICserviceQuery: {
1510             if (logLevel >= MDCLOG_DEBUG) {
1511                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceQuery %s", message.message.enodbName);
1512             }
1513             break;
1514         }
1515         case ProcedureCode_id_RICserviceUpdate: {
1516             if (logLevel >= MDCLOG_DEBUG) {
1517                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceUpdate %s", message.message.enodbName);
1518             }
1519             if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer) != 0) {
1520                 mdclog_write(MDCLOG_ERR, "RIC_SERVICE_UPDATE message failed to send to xAPP");
1521             }
1522             break;
1523         }
1524         case ProcedureCode_id_RICsubscription: {
1525             if (logLevel >= MDCLOG_DEBUG) {
1526                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
1527             }
1528             if (sendRequestToXapp(message, RIC_SUB_RESP, rmrMessageBuffer) != 0) {
1529                 mdclog_write(MDCLOG_ERR, "Subscription successful message failed to send to xAPP");
1530             }
1531             break;
1532         }
1533         case ProcedureCode_id_RICsubscriptionDelete: {
1534             if (logLevel >= MDCLOG_DEBUG) {
1535                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
1536             }
1537             if (sendRequestToXapp(message, RIC_SUB_DEL_RESP, rmrMessageBuffer) != 0) {
1538                 mdclog_write(MDCLOG_ERR, "Subscription delete successful message failed to send to xAPP");
1539             }
1540             break;
1541         }
1542         default: {
1543             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1544             message.message.messageType = 0; // no RMR message type yet
1545             buildJsonMessage(message);
1546
1547             break;
1548         }
1549     }
1550 }
1551
1552 /**
1553  *
1554  * @param pdu
1555  * @param message
1556  * @param rmrMessageBuffer
1557  */
1558 void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
1559                         ReportingMessages_t &message,
1560                         RmrMessagesBuffer_t &rmrMessageBuffer) {
1561     auto procedureCode = pdu->choice.unsuccessfulOutcome->procedureCode;
1562     auto logLevel = mdclog_level_get();
1563     if (logLevel >= MDCLOG_INFO) {
1564         mdclog_write(MDCLOG_INFO, "Unsuccessful Outcome %ld", procedureCode);
1565     }
1566     switch (procedureCode) {
1567         case ProcedureCode_id_E2setup: {
1568             if (logLevel >= MDCLOG_DEBUG) {
1569                 mdclog_write(MDCLOG_DEBUG, "Got E2setup\n");
1570             }
1571             break;
1572         }
1573         case ProcedureCode_id_ErrorIndication: {
1574             if (logLevel >= MDCLOG_DEBUG) {
1575                 mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName);
1576             }
1577             if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) {
1578                 mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP");
1579             }
1580             break;
1581         }
1582         case ProcedureCode_id_Reset: {
1583             if (logLevel >= MDCLOG_DEBUG) {
1584                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
1585             }
1586             if (sendRequestToXapp(message, RIC_X2_RESET, rmrMessageBuffer) != 0) {
1587                 mdclog_write(MDCLOG_ERR, "RIC_X2_RESET message failed to send to xAPP");
1588             }
1589             break;
1590         }
1591         case ProcedureCode_id_RICcontrol: {
1592             if (logLevel >= MDCLOG_DEBUG) {
1593                 mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
1594             }
1595             for (int i = 0;
1596                  i < pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.count; i++) {
1597                 auto messageSent = false;
1598                 RICcontrolFailure_IEs_t *ie = pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.array[i];
1599                 if (logLevel >= MDCLOG_DEBUG) {
1600                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1601                 }
1602                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1603                     if (logLevel >= MDCLOG_DEBUG) {
1604                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1605                     }
1606                     if (ie->value.present == RICcontrolFailure_IEs__value_PR_RICrequestID) {
1607                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_FAILURE;
1608                         rmrMessageBuffer.sendMessage->state = 0;
1609                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1610                         static unsigned char tx[32];
1611                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1612                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1613                         rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName,
1614                                        strlen(message.message.enodbName));
1615                         sendRmrMessage(rmrMessageBuffer, message);
1616                         messageSent = true;
1617                     } else {
1618                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1619                     }
1620                 }
1621                 if (messageSent) {
1622                     break;
1623                 }
1624             }
1625             break;
1626         }
1627         case ProcedureCode_id_RICindication: {
1628             if (logLevel >= MDCLOG_DEBUG) {
1629                 mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName);
1630             }
1631             for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
1632                 auto messageSent = false;
1633                 RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
1634                 if (logLevel >= MDCLOG_DEBUG) {
1635                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1636                 }
1637                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1638                     if (logLevel >= MDCLOG_DEBUG) {
1639                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1640                     }
1641                     if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
1642                         static unsigned char tx[32];
1643                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
1644                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1645                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1646                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1647                                        (unsigned char *)message.message.enodbName,
1648                                        strlen(message.message.enodbName));
1649                         rmrMessageBuffer.sendMessage->state = 0;
1650                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1651                         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1652                             mdclog_write(MDCLOG_DEBUG, "RIC sub id = %d, message type = %d",
1653                                          rmrMessageBuffer.sendMessage->sub_id,
1654                                          rmrMessageBuffer.sendMessage->mtype);
1655                         }
1656                         sendRmrMessage(rmrMessageBuffer, message);
1657                         messageSent = true;
1658                     } else {
1659                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1660                     }
1661                 }
1662                 if (messageSent) {
1663                     break;
1664                 }
1665             }
1666             break;
1667         }
1668         case ProcedureCode_id_RICserviceQuery: {
1669             if (logLevel >= MDCLOG_DEBUG) {
1670                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceQuery %s", message.message.enodbName);
1671             }
1672             break;
1673         }
1674         case ProcedureCode_id_RICserviceUpdate: {
1675             if (logLevel >= MDCLOG_DEBUG) {
1676                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceUpdate %s", message.message.enodbName);
1677             }
1678             if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer) != 0) {
1679                 mdclog_write(MDCLOG_ERR, "RIC_SERVICE_UPDATE message failed to send to xAPP");
1680             }
1681             break;
1682         }
1683         case ProcedureCode_id_RICsubscription: {
1684             if (logLevel >= MDCLOG_DEBUG) {
1685                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
1686             }
1687             if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer) != 0) {
1688                 mdclog_write(MDCLOG_ERR, "Subscription unsuccessful message failed to send to xAPP");
1689             }
1690             break;
1691         }
1692         case ProcedureCode_id_RICsubscriptionDelete: {
1693             if (logLevel >= MDCLOG_DEBUG) {
1694                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
1695             }
1696             if (sendRequestToXapp(message, RIC_SUB_DEL_FAILURE, rmrMessageBuffer) != 0) {
1697                 mdclog_write(MDCLOG_ERR, "Subscription Delete unsuccessful message failed to send to xAPP");
1698             }
1699             break;
1700         }
1701         default: {
1702             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1703             message.message.messageType = 0; // no RMR message type yet
1704
1705             buildJsonMessage(message);
1706
1707             break;
1708         }
1709     }
1710 }
1711
1712 /**
1713  *
1714  * @param message
1715  * @param requestId
1716  * @param rmrMmessageBuffer
1717  * @return
1718  */
1719 int sendRequestToXapp(ReportingMessages_t &message,
1720                       int requestId,
1721                       RmrMessagesBuffer_t &rmrMmessageBuffer) {
1722     rmr_bytes2meid(rmrMmessageBuffer.sendMessage,
1723                    (unsigned char *)message.message.enodbName,
1724                    strlen(message.message.enodbName));
1725     message.message.messageType = rmrMmessageBuffer.sendMessage->mtype = requestId;
1726     rmrMmessageBuffer.sendMessage->state = 0;
1727     static unsigned char tx[32];
1728     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1729     rmr_bytes2xact(rmrMmessageBuffer.sendMessage, tx, strlen((const char *) tx));
1730
1731     auto rc = sendRmrMessage(rmrMmessageBuffer, message);
1732     return rc;
1733 }
1734
1735
1736 void getRmrContext(sctp_params_t &pSctpParams) {
1737     pSctpParams.rmrCtx = nullptr;
1738     pSctpParams.rmrCtx = rmr_init(pSctpParams.rmrAddress, RMR_MAX_RCV_BYTES, RMRFL_NONE);
1739     if (pSctpParams.rmrCtx == nullptr) {
1740         mdclog_write(MDCLOG_ERR, "Failed to initialize RMR");
1741         return;
1742     }
1743
1744     rmr_set_stimeout(pSctpParams.rmrCtx, 0);    // disable retries for any send operation
1745     // we need to find that routing table exist and we can run
1746     if (mdclog_level_get() >= MDCLOG_INFO) {
1747         mdclog_write(MDCLOG_INFO, "We are after RMR INIT wait for RMR_Ready");
1748     }
1749     int rmrReady = 0;
1750     int count = 0;
1751     while (!rmrReady) {
1752         if ((rmrReady = rmr_ready(pSctpParams.rmrCtx)) == 0) {
1753             sleep(1);
1754         }
1755         count++;
1756         if (count % 60 == 0) {
1757             mdclog_write(MDCLOG_INFO, "waiting to RMR ready state for %d seconds", count);
1758         }
1759     }
1760     if (mdclog_level_get() >= MDCLOG_INFO) {
1761         mdclog_write(MDCLOG_INFO, "RMR running");
1762     }
1763     rmr_init_trace(pSctpParams.rmrCtx, 200);
1764     // get the RMR fd for the epoll
1765     pSctpParams.rmrListenFd = rmr_get_rcvfd(pSctpParams.rmrCtx);
1766     struct epoll_event event{};
1767     // add RMR fd to epoll
1768     event.events = (EPOLLIN);
1769     event.data.fd = pSctpParams.rmrListenFd;
1770     // add listening RMR FD to epoll
1771     if (epoll_ctl(pSctpParams.epoll_fd, EPOLL_CTL_ADD, pSctpParams.rmrListenFd, &event)) {
1772         mdclog_write(MDCLOG_ERR, "Failed to add RMR descriptor to epoll");
1773         close(pSctpParams.rmrListenFd);
1774         rmr_close(pSctpParams.rmrCtx);
1775         pSctpParams.rmrCtx = nullptr;
1776     }
1777 }
1778
1779 /**
1780  *
1781  * @param sctpMap
1782  * @param rmrMessageBuffer
1783  * @param ts
1784  * @return
1785  */
1786 int receiveXappMessages(Sctp_Map_t *sctpMap,
1787                         RmrMessagesBuffer_t &rmrMessageBuffer,
1788                         struct timespec &ts) {
1789     if (rmrMessageBuffer.rcvMessage == nullptr) {
1790         //we have error
1791         mdclog_write(MDCLOG_ERR, "RMR Allocation message, %s", strerror(errno));
1792         return -1;
1793     }
1794
1795     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1796         mdclog_write(MDCLOG_DEBUG, "Call to rmr_rcv_msg");
1797     }
1798     rmrMessageBuffer.rcvMessage = rmr_rcv_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.rcvMessage);
1799     if (rmrMessageBuffer.rcvMessage == nullptr) {
1800         mdclog_write(MDCLOG_ERR, "RMR Receving message with null pointer, Realloc rmr mesage buffer");
1801         rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
1802         return -2;
1803     }
1804     ReportingMessages_t message;
1805     message.message.direction = 'D';
1806     message.message.time.tv_nsec = ts.tv_nsec;
1807     message.message.time.tv_sec = ts.tv_sec;
1808
1809     // get message payload
1810     //auto msgData = msg->payload;
1811     if (rmrMessageBuffer.rcvMessage->state != 0) {
1812         mdclog_write(MDCLOG_ERR, "RMR Receving message with stat = %d", rmrMessageBuffer.rcvMessage->state);
1813         return -1;
1814     }
1815     switch (rmrMessageBuffer.rcvMessage->mtype) {
1816         case RIC_E2_SETUP_RESP : {
1817             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1818                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_RESP");
1819                 return -6;
1820             }
1821             break;
1822         }
1823         case RIC_E2_SETUP_FAILURE : {
1824             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1825                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_FAILURE");
1826                 return -6;
1827             }
1828             break;
1829         }
1830         case RIC_ERROR_INDICATION: {
1831             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1832                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ERROR_INDICATION");
1833                 return -6;
1834             }
1835             break;
1836         }
1837         case RIC_SUB_REQ: {
1838             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1839                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_REQ");
1840                 return -6;
1841             }
1842             break;
1843         }
1844         case RIC_SUB_DEL_REQ: {
1845             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1846                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_DEL_REQ");
1847                 return -6;
1848             }
1849             break;
1850         }
1851         case RIC_CONTROL_REQ: {
1852             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1853                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_CONTROL_REQ");
1854                 return -6;
1855             }
1856             break;
1857         }
1858         case RIC_SERVICE_QUERY: {
1859             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1860                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_QUERY");
1861                 return -6;
1862             }
1863             break;
1864         }
1865         case RIC_SERVICE_UPDATE_ACK: {
1866             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1867                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_ACK");
1868                 return -6;
1869             }
1870             break;
1871         }
1872         case RIC_SERVICE_UPDATE_FAILURE: {
1873             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1874                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_FAILURE");
1875                 return -6;
1876             }
1877             break;
1878         }
1879         case RIC_X2_RESET: {
1880             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1881                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_X2_RESET");
1882                 return -6;
1883             }
1884             break;
1885         }
1886         case RIC_X2_RESET_RESP: {
1887             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1888                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_X2_RESET_RESP");
1889                 return -6;
1890             }
1891             break;
1892         }
1893         case RIC_SCTP_CLEAR_ALL: {
1894             mdclog_write(MDCLOG_INFO, "RIC_SCTP_CLEAR_ALL");
1895             // loop on all keys and close socket and then erase all map.
1896             vector<char *> v;
1897             sctpMap->getKeys(v);
1898             for (auto const &iter : v) { //}; iter != sctpMap.end(); iter++) {
1899                 if (!boost::starts_with((string) (iter), "host:") && !boost::starts_with((string) (iter), "msg:")) {
1900                     auto *peerInfo = (ConnectedCU_t *) sctpMap->find(iter);
1901                     if (peerInfo == nullptr) {
1902                         continue;
1903                     }
1904                     close(peerInfo->fileDescriptor);
1905                     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
1906                     message.message.direction = 'D';
1907                     message.message.time.tv_nsec = ts.tv_nsec;
1908                     message.message.time.tv_sec = ts.tv_sec;
1909
1910                     message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1911                             snprintf((char *)rmrMessageBuffer.sendMessage->payload,
1912                                                                    256,
1913                                                                    "%s|RIC_SCTP_CLEAR_ALL",
1914                                                                    peerInfo->enodbName);
1915                     message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1916                     mdclog_write(MDCLOG_INFO, "%s", message.message.asndata);
1917                     if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
1918                         mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
1919                     }
1920                     free(peerInfo);
1921                 }
1922             }
1923
1924             sleep(1);
1925             sctpMap->clear();
1926             break;
1927         }
1928         case E2_TERM_KEEP_ALIVE_REQ: {
1929             // send message back
1930             rmr_bytes2payload(rmrMessageBuffer.sendMessage,
1931                     (unsigned char *)rmrMessageBuffer.ka_message,
1932                     rmrMessageBuffer.ka_message_len);
1933             rmrMessageBuffer.sendMessage->mtype = E2_TERM_KEEP_ALIVE_RESP;
1934             rmrMessageBuffer.sendMessage->state = 0;
1935             static unsigned char tx[32];
1936             auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1937             rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, txLen);
1938             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
1939             if (rmrMessageBuffer.sendMessage == nullptr) {
1940                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
1941                 mdclog_write(MDCLOG_ERR, "Failed to send E2_TERM_KEEP_ALIVE_RESP RMR message returned NULL");
1942             } else if (rmrMessageBuffer.sendMessage->state != 0)  {
1943                 mdclog_write(MDCLOG_ERR, "Failed to send E2_TERM_KEEP_ALIVE_RESP, on RMR state = %d ( %s)",
1944                         rmrMessageBuffer.sendMessage->state, translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str());
1945             } else if (mdclog_level_get() >= MDCLOG_INFO) {
1946                 mdclog_write(MDCLOG_INFO, "Got Keep Alive Request send : %s", rmrMessageBuffer.ka_message);
1947             }
1948
1949             break;
1950         }
1951         default:
1952             mdclog_write(MDCLOG_WARN, "Message Type : %d is not seported", rmrMessageBuffer.rcvMessage->mtype);
1953             message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
1954             message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
1955             message.message.time.tv_nsec = ts.tv_nsec;
1956             message.message.time.tv_sec = ts.tv_sec;
1957             message.message.messageType = rmrMessageBuffer.rcvMessage->mtype;
1958
1959             buildJsonMessage(message);
1960
1961
1962             return -7;
1963     }
1964     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1965         mdclog_write(MDCLOG_DEBUG, "EXIT OK from %s", __FUNCTION__);
1966     }
1967     return 0;
1968 }
1969
1970 /**
1971  * Send message to the CU that is not expecting for successful or unsuccessful results
1972  * @param messageBuffer
1973  * @param message
1974  * @param failedMsgId
1975  * @param sctpMap
1976  * @return
1977  */
1978 int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer,
1979                            ReportingMessages_t &message,
1980                            int failedMsgId,
1981                            Sctp_Map_t *sctpMap) {
1982
1983     getRequestMetaData(message, messageBuffer);
1984     if (mdclog_level_get() >= MDCLOG_INFO) {
1985         mdclog_write(MDCLOG_INFO, "send message to %s address", message.message.enodbName);
1986     }
1987
1988     auto rc = sendMessagetoCu(sctpMap, messageBuffer, message, failedMsgId);
1989     return rc;
1990 }
1991
1992 /**
1993  *
1994  * @param sctpMap
1995  * @param messageBuffer
1996  * @param message
1997  * @param failedMesgId
1998  * @return
1999  */
2000 int sendMessagetoCu(Sctp_Map_t *sctpMap,
2001                     RmrMessagesBuffer_t &messageBuffer,
2002                     ReportingMessages_t &message,
2003                     int failedMesgId) {
2004     auto *peerInfo = (ConnectedCU_t *) sctpMap->find(message.message.enodbName);
2005     if (peerInfo == nullptr) {
2006         if (failedMesgId != 0) {
2007             sendFailedSendingMessagetoXapp(messageBuffer, message, failedMesgId);
2008         } else {
2009             mdclog_write(MDCLOG_ERR, "Failed to send message no CU entry %s", message.message.enodbName);
2010         }
2011         return -1;
2012     }
2013
2014     // get the FD
2015     message.message.messageType = messageBuffer.rcvMessage->mtype;
2016     auto rc = sendSctpMsg(peerInfo, message, sctpMap);
2017     return rc;
2018 }
2019
2020 /**
2021  *
2022  * @param rmrCtx the rmr context to send and receive
2023  * @param msg the msg we got fromxApp
2024  * @param metaData data from xApp in ordered struct
2025  * @param failedMesgId the return message type error
2026  */
2027 void
2028 sendFailedSendingMessagetoXapp(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, int failedMesgId) {
2029     rmr_mbuf_t *msg = rmrMessageBuffer.sendMessage;
2030     msg->len = snprintf((char *) msg->payload, 200, "the gNb/eNode name %s not found",
2031                         message.message.enodbName);
2032     if (mdclog_level_get() >= MDCLOG_INFO) {
2033         mdclog_write(MDCLOG_INFO, "%s", msg->payload);
2034     }
2035     msg->mtype = failedMesgId;
2036     msg->state = 0;
2037
2038     static unsigned char tx[32];
2039     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2040     rmr_bytes2xact(msg, tx, strlen((const char *) tx));
2041
2042     sendRmrMessage(rmrMessageBuffer, message);
2043 }
2044
2045
2046
2047 /**
2048  *
2049  * @param epoll_fd
2050  * @param peerInfo
2051  * @param events
2052  * @param sctpMap
2053  * @param enodbName
2054  * @param msgType
2055  * @return
2056  */
2057 int addToEpoll(int epoll_fd,
2058                ConnectedCU_t *peerInfo,
2059                uint32_t events,
2060                Sctp_Map_t *sctpMap,
2061                char *enodbName,
2062                int msgType) {
2063     // Add to Epol
2064     struct epoll_event event{};
2065     event.data.ptr = peerInfo;
2066     event.events = events;
2067     if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peerInfo->fileDescriptor, &event) < 0) {
2068         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2069             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here), %s, %s %d",
2070                          strerror(errno), __func__, __LINE__);
2071         }
2072         close(peerInfo->fileDescriptor);
2073         if (enodbName != nullptr) {
2074             cleanHashEntry(peerInfo, sctpMap);
2075             char key[MAX_ENODB_NAME_SIZE * 2];
2076             snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2077             if (mdclog_level_get() >= MDCLOG_DEBUG) {
2078                 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2079             }
2080             auto tmp = sctpMap->find(key);
2081             if (tmp) {
2082                 free(tmp);
2083                 sctpMap->erase(key);
2084             }
2085         } else {
2086             peerInfo->enodbName[0] = 0;
2087         }
2088         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here)");
2089         return -1;
2090     }
2091     return 0;
2092 }
2093
2094 /**
2095  *
2096  * @param epoll_fd
2097  * @param peerInfo
2098  * @param events
2099  * @param sctpMap
2100  * @param enodbName
2101  * @param msgType
2102  * @return
2103  */
2104 int modifyToEpoll(int epoll_fd,
2105                   ConnectedCU_t *peerInfo,
2106                   uint32_t events,
2107                   Sctp_Map_t *sctpMap,
2108                   char *enodbName,
2109                   int msgType) {
2110     // Add to Epol
2111     struct epoll_event event{};
2112     event.data.ptr = peerInfo;
2113     event.events = events;
2114     if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, peerInfo->fileDescriptor, &event) < 0) {
2115         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2116             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_MOD (may chack not to quit here), %s, %s %d",
2117                          strerror(errno), __func__, __LINE__);
2118         }
2119         close(peerInfo->fileDescriptor);
2120         cleanHashEntry(peerInfo, sctpMap);
2121         char key[MAX_ENODB_NAME_SIZE * 2];
2122         snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2123         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2124             mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2125         }
2126         auto tmp = sctpMap->find(key);
2127         if (tmp) {
2128             free(tmp);
2129         }
2130         sctpMap->erase(key);
2131         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here)");
2132         return -1;
2133     }
2134     return 0;
2135 }
2136
2137
2138 int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message) {
2139     buildJsonMessage(message);
2140
2141     rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2142
2143     if (rmrMessageBuffer.sendMessage == nullptr) {
2144         rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2145         mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2146         return -1;
2147     }
2148
2149     if (rmrMessageBuffer.sendMessage->state != 0) {
2150         char meid[RMR_MAX_MEID]{};
2151         if (rmrMessageBuffer.sendMessage->state == RMR_ERR_RETRY) {
2152             usleep(5);
2153             rmrMessageBuffer.sendMessage->state = 0;
2154             mdclog_write(MDCLOG_INFO, "RETRY sending Message type %d to Xapp from %s",
2155                          rmrMessageBuffer.sendMessage->mtype,
2156                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2157             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2158             if (rmrMessageBuffer.sendMessage == nullptr) {
2159                 mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2160                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2161                 return -1;
2162             } else if (rmrMessageBuffer.sendMessage->state != 0) {
2163                 mdclog_write(MDCLOG_ERR,
2164                              "Message state %s while sending request %d to Xapp from %s after retry of 10 microseconds",
2165                              translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2166                              rmrMessageBuffer.sendMessage->mtype,
2167                              rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2168                 auto rc = rmrMessageBuffer.sendMessage->state;
2169                 return rc;
2170             }
2171         } else {
2172             mdclog_write(MDCLOG_ERR, "Message state %s while sending request %d to Xapp from %s",
2173                          translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2174                          rmrMessageBuffer.sendMessage->mtype,
2175                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2176             return rmrMessageBuffer.sendMessage->state;
2177         }
2178     }
2179     return 0;
2180 }
2181
2182 void buildJsonMessage(ReportingMessages_t &message) {
2183     if (jsonTrace) {
2184         message.outLen = sizeof(message.base64Data);
2185         base64::encode((const unsigned char *) message.message.asndata,
2186                        (const int) message.message.asnLength,
2187                        message.base64Data,
2188                        message.outLen);
2189         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2190             mdclog_write(MDCLOG_DEBUG, "asn data length = %d, base64 message length = %d ",
2191                          (int) message.message.asnLength,
2192                          (int) message.outLen);
2193         }
2194
2195         snprintf(message.buffer, sizeof(message.buffer),
2196                                      "{\"header\": {\"ts\": \"%ld.%09ld\","
2197                                      "\"ranName\": \"%s\","
2198                                      "\"messageType\": %d,"
2199                                      "\"direction\": \"%c\"},"
2200                                      "\"base64Length\": %d,"
2201                                      "\"asnBase64\": \"%s\"}",
2202                                      message.message.time.tv_sec,
2203                                      message.message.time.tv_nsec,
2204                                      message.message.enodbName,
2205                                      message.message.messageType,
2206                                      message.message.direction,
2207                                      (int) message.outLen,
2208                                      message.base64Data);
2209         static src::logger_mt &lg = my_logger::get();
2210
2211         BOOST_LOG(lg) << message.buffer;
2212     }
2213 }
2214
2215
2216 /**
2217  * take RMR error code to string
2218  * @param state
2219  * @return
2220  */
2221 string translateRmrErrorMessages(int state) {
2222     string str = {};
2223     switch (state) {
2224         case RMR_OK:
2225             str = "RMR_OK - state is good";
2226             break;
2227         case RMR_ERR_BADARG:
2228             str = "RMR_ERR_BADARG - argument passd to function was unusable";
2229             break;
2230         case RMR_ERR_NOENDPT:
2231             str = "RMR_ERR_NOENDPT - send//call could not find an endpoint based on msg type";
2232             break;
2233         case RMR_ERR_EMPTY:
2234             str = "RMR_ERR_EMPTY - msg received had no payload; attempt to send an empty message";
2235             break;
2236         case RMR_ERR_NOHDR:
2237             str = "RMR_ERR_NOHDR - message didn't contain a valid header";
2238             break;
2239         case RMR_ERR_SENDFAILED:
2240             str = "RMR_ERR_SENDFAILED - send failed; errno has nano reason";
2241             break;
2242         case RMR_ERR_CALLFAILED:
2243             str = "RMR_ERR_CALLFAILED - unable to send call() message";
2244             break;
2245         case RMR_ERR_NOWHOPEN:
2246             str = "RMR_ERR_NOWHOPEN - no wormholes are open";
2247             break;
2248         case RMR_ERR_WHID:
2249             str = "RMR_ERR_WHID - wormhole id was invalid";
2250             break;
2251         case RMR_ERR_OVERFLOW:
2252             str = "RMR_ERR_OVERFLOW - operation would have busted through a buffer/field size";
2253             break;
2254         case RMR_ERR_RETRY:
2255             str = "RMR_ERR_RETRY - request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)";
2256             break;
2257         case RMR_ERR_RCVFAILED:
2258             str = "RMR_ERR_RCVFAILED - receive failed (hard error)";
2259             break;
2260         case RMR_ERR_TIMEOUT:
2261             str = "RMR_ERR_TIMEOUT - message processing call timed out";
2262             break;
2263         case RMR_ERR_UNSET:
2264             str = "RMR_ERR_UNSET - the message hasn't been populated with a transport buffer";
2265             break;
2266         case RMR_ERR_TRUNC:
2267             str = "RMR_ERR_TRUNC - received message likely truncated";
2268             break;
2269         case RMR_ERR_INITFAILED:
2270             str = "RMR_ERR_INITFAILED - initialisation of something (probably message) failed";
2271             break;
2272         case RMR_ERR_NOTSUPP:
2273             str = "RMR_ERR_NOTSUPP - the request is not supported, or RMr was not initialised for the request";
2274             break;
2275         default:
2276             char buf[128]{};
2277             snprintf(buf, sizeof buf, "UNDOCUMENTED RMR_ERR : %d", state);
2278             str = buf;
2279             break;
2280     }
2281     return str;
2282 }
2283
2284