version 4.0.6
[ric-plt/e2.git] / RIC-E2-TERMINATION / sctpThread.cpp
1 // Copyright 2019 AT&T Intellectual Property
2 // Copyright 2019 Nokia
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //      http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15
16 //  This source code is part of the near-RT RIC (RAN Intelligent Controller)
17 //  platform project (RICP).
18
19 // TODO: High-level file comment.
20
21
22
23 #include "sctpThread.h"
24 #include "BuildRunName.h"
25
26 using namespace std;
27 //using namespace std::placeholders;
28 using namespace boost::filesystem;
29
30 //#ifdef __cplusplus
31 //extern "C"
32 //{
33 //#endif
34
35 // need to expose without the include of gcov
36 extern "C" void __gcov_flush(void);
37
38 static void catch_function(int signal) {
39     __gcov_flush();
40     exit(signal);
41 }
42
43
44 BOOST_LOG_INLINE_GLOBAL_LOGGER_DEFAULT(my_logger, src::logger_mt)
45
46 boost::shared_ptr<sinks::synchronous_sink<sinks::text_file_backend>> boostLogger;
47 double cpuClock = 0.0;
48 bool jsonTrace = true;
49
50 void init_log() {
51     mdclog_attr_t *attr;
52     mdclog_attr_init(&attr);
53     mdclog_attr_set_ident(attr, "E2Terminator");
54     mdclog_init(attr);
55     mdclog_attr_destroy(attr);
56 }
57 auto start_time = std::chrono::high_resolution_clock::now();
58 typedef std::chrono::duration<double, std::ratio<1,1>> seconds_t;
59
60 double age() {
61     return seconds_t(std::chrono::high_resolution_clock::now() - start_time).count();
62 }
63
64 double approx_CPU_MHz(unsigned sleeptime) {
65     using namespace std::chrono_literals;
66     uint32_t aux = 0;
67     uint64_t cycles_start = rdtscp(aux);
68     double time_start = age();
69     std::this_thread::sleep_for(sleeptime * 1ms);
70     uint64_t elapsed_cycles = rdtscp(aux) - cycles_start;
71     double elapsed_time = age() - time_start;
72     return elapsed_cycles / elapsed_time;
73 }
74
75 //std::atomic<int64_t> rmrCounter{0};
76 std::atomic<int64_t> num_of_messages{0};
77 std::atomic<int64_t> num_of_XAPP_messages{0};
78 static long transactionCounter = 0;
79
80 int buildListeningPort(sctp_params_t &sctpParams) {
81     sctpParams.listenFD = socket (AF_INET6, SOCK_STREAM, IPPROTO_SCTP);
82     struct sockaddr_in6 servaddr {};
83     servaddr.sin6_family = AF_INET6;
84     servaddr.sin6_addr   = in6addr_any;
85     servaddr.sin6_port = htons(sctpParams.sctpPort);
86     if (bind(sctpParams.listenFD, (SA *)&servaddr, sizeof(servaddr)) < 0 ) {
87         mdclog_write(MDCLOG_ERR, "Error binding. %s\n", strerror(errno));
88         return -1;
89     }
90     if (setSocketNoBlocking(sctpParams.listenFD) == -1) {
91         //mdclog_write(MDCLOG_ERR, "Error binding. %s", strerror(errno));
92         return -1;
93     }
94     if (mdclog_level_get() >= MDCLOG_DEBUG) {
95         struct sockaddr_in6 cliaddr {};
96         socklen_t len = sizeof(cliaddr);
97         getsockname(sctpParams.listenFD, (SA *)&cliaddr, &len);
98         char buff[1024] {};
99         inet_ntop(AF_INET6, &cliaddr.sin6_addr, buff, sizeof(buff));
100         mdclog_write(MDCLOG_DEBUG, "My address: %s, port %d\n", buff, htons(cliaddr.sin6_port));
101     }
102
103     if (listen(sctpParams.listenFD, SOMAXCONN) < 0) {
104         mdclog_write(MDCLOG_ERR, "Error listening. %s\n", strerror(errno));
105         return -1;
106     }
107     struct epoll_event event {};
108     event.events = EPOLLIN | EPOLLET;
109     event.data.fd = sctpParams.listenFD;
110
111     // add listening port to epoll
112     if (epoll_ctl(sctpParams.epoll_fd, EPOLL_CTL_ADD, sctpParams.listenFD, &event)) {
113         printf("Failed to add descriptor to epoll\n");
114         mdclog_write(MDCLOG_ERR, "Failed to add descriptor to epoll. %s\n", strerror(errno));
115         return -1;
116     }
117
118     return 0;
119 }
120
121 int buildConfiguration(sctp_params_t &sctpParams) {
122     path p = (sctpParams.configFilePath + "/" + sctpParams.configFileName).c_str();
123     if (exists(p)) {
124         const int size = 2048;
125         auto fileSize = file_size(p);
126         if (fileSize > size) {
127             mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size);
128             return -1;
129         }
130     } else {
131         mdclog_write(MDCLOG_ERR, "Configuration File %s not exists", p.string().c_str());
132         return -1;
133     }
134
135     ReadConfigFile conf;
136     if (conf.openConfigFile(p.string()) == -1) {
137         mdclog_write(MDCLOG_ERR, "Filed to open config file %s, %s",
138                      p.string().c_str(), strerror(errno));
139         return -1;
140     }
141     int rmrPort = conf.getIntValue("nano");
142     if (rmrPort == -1) {
143         mdclog_write(MDCLOG_ERR, "illigal RMR port ");
144         return -1;
145     }
146     sctpParams.rmrPort = (uint16_t)rmrPort;
147     snprintf(sctpParams.rmrAddress, sizeof(sctpParams.rmrAddress), "%d", (int) (sctpParams.rmrPort));
148
149     auto tmpStr = conf.getStringValue("loglevel");
150     if (tmpStr.length() == 0) {
151         mdclog_write(MDCLOG_ERR, "illigal loglevel. Set loglevel to MDCLOG_INFO");
152         tmpStr = "info";
153     }
154     transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
155
156     if ((tmpStr.compare("debug")) == 0) {
157         sctpParams.logLevel = MDCLOG_DEBUG;
158     } else if ((tmpStr.compare("info")) == 0) {
159         sctpParams.logLevel = MDCLOG_INFO;
160     } else if ((tmpStr.compare("warning")) == 0) {
161         sctpParams.logLevel = MDCLOG_WARN;
162     } else if ((tmpStr.compare("error")) == 0) {
163         sctpParams.logLevel = MDCLOG_ERR;
164     } else {
165         mdclog_write(MDCLOG_ERR, "illigal loglevel = %s. Set loglevel to MDCLOG_INFO", tmpStr.c_str());
166         sctpParams.logLevel = MDCLOG_INFO;
167     }
168     mdclog_level_set(sctpParams.logLevel);
169
170     tmpStr = conf.getStringValue("volume");
171     if (tmpStr.length() == 0) {
172         mdclog_write(MDCLOG_ERR, "illigal volume.");
173         return -1;
174     }
175
176     char tmpLogFilespec[VOLUME_URL_SIZE];
177     tmpLogFilespec[0] = 0;
178     sctpParams.volume[0] = 0;
179     snprintf(sctpParams.volume, VOLUME_URL_SIZE, "%s", tmpStr.c_str());
180     // copy the name to temp file as well
181     snprintf(tmpLogFilespec, VOLUME_URL_SIZE, "%s", tmpStr.c_str());
182
183
184     // define the file name in the tmp directory under the volume
185     strcat(tmpLogFilespec,"/tmp/E2Term_%Y-%m-%d_%H-%M-%S.%N.tmpStr");
186
187     sctpParams.myIP = conf.getStringValue("local-ip");
188     if (sctpParams.myIP.length() == 0) {
189         mdclog_write(MDCLOG_ERR, "illigal local-ip.");
190         return -1;
191     }
192
193     int sctpPort = conf.getIntValue("sctp-port");
194     if (sctpPort == -1) {
195         mdclog_write(MDCLOG_ERR, "illigal SCTP port ");
196         return -1;
197     }
198     sctpParams.sctpPort = (uint16_t)sctpPort;
199
200     sctpParams.fqdn = conf.getStringValue("external-fqdn");
201     if (sctpParams.fqdn.length() == 0) {
202         mdclog_write(MDCLOG_ERR, "illigal external-fqdn");
203         return -1;
204     }
205
206     std::string pod = conf.getStringValue("pod_name");
207     if (pod.length() == 0) {
208         mdclog_write(MDCLOG_ERR, "illigal pod_name in config file");
209         return -1;
210     }
211     auto *podName = getenv(pod.c_str());
212     if (podName == nullptr) {
213         mdclog_write(MDCLOG_ERR, "illigal pod_name or environment varible not exists : %s", pod.c_str());
214         return -1;
215
216     } else {
217         sctpParams.podName.assign(podName);
218         if (sctpParams.podName.length() == 0) {
219             mdclog_write(MDCLOG_ERR, "illigal pod_name");
220             return -1;
221         }
222     }
223
224     tmpStr = conf.getStringValue("trace");
225     transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
226     if ((tmpStr.compare("start")) == 0) {
227         mdclog_write(MDCLOG_INFO, "Trace set to: start");
228         sctpParams.trace = true;
229     } else if ((tmpStr.compare("stop")) == 0) {
230         mdclog_write(MDCLOG_INFO, "Trace set to: stop");
231         sctpParams.trace = false;
232     }
233     jsonTrace = sctpParams.trace;
234
235     sctpParams.ka_message_length = snprintf(sctpParams.ka_message, KA_MESSAGE_SIZE, "{\"address\": \"%s:%d\","
236                                                                                     "\"fqdn\": \"%s\","
237                                                                                     "\"pod_name\": \"%s\"}",
238                                             (const char *)sctpParams.myIP.c_str(),
239                                             sctpParams.rmrPort,
240                                             sctpParams.fqdn.c_str(),
241                                             sctpParams.podName.c_str());
242
243     if (mdclog_level_get() >= MDCLOG_INFO) {
244         mdclog_mdc_add("RMR Port", to_string(sctpParams.rmrPort).c_str());
245         mdclog_mdc_add("LogLevel", to_string(sctpParams.logLevel).c_str());
246         mdclog_mdc_add("volume", sctpParams.volume);
247         mdclog_mdc_add("tmpLogFilespec", tmpLogFilespec);
248         mdclog_mdc_add("my ip", sctpParams.myIP.c_str());
249         mdclog_mdc_add("pod name", sctpParams.podName.c_str());
250
251         mdclog_write(MDCLOG_INFO, "running parameters for instance : %s", sctpParams.ka_message);
252     }
253     mdclog_mdc_clean();
254
255     // Files written to the current working directory
256     boostLogger = logging::add_file_log(
257             keywords::file_name = tmpLogFilespec, // to temp directory
258             keywords::rotation_size = 10 * 1024 * 1024,
259             keywords::time_based_rotation = sinks::file::rotation_at_time_interval(posix_time::hours(1)),
260             keywords::format = "%Message%"
261             //keywords::format = "[%TimeStamp%]: %Message%" // use each tmpStr with time stamp
262     );
263
264     // Setup a destination folder for collecting rotated (closed) files --since the same volumn can use rename()
265     boostLogger->locked_backend()->set_file_collector(sinks::file::make_collector(
266             keywords::target = sctpParams.volume
267     ));
268
269     // Upon restart, scan the directory for files matching the file_name pattern
270     boostLogger->locked_backend()->scan_for_files();
271
272     // Enable auto-flushing after each tmpStr record written
273     if (mdclog_level_get() >= MDCLOG_DEBUG) {
274         boostLogger->locked_backend()->auto_flush(true);
275     }
276
277     return 0;
278 }
279
280
281
282 int main(const int argc, char **argv) {
283     sctp_params_t sctpParams;
284
285     {
286         std::random_device device{};
287         std::mt19937 generator(device());
288         std::uniform_int_distribution<long> distribution(1, (long) 1e12);
289         transactionCounter = distribution(generator);
290     }
291
292 //    uint64_t st = 0;
293 //    uint32_t aux1 = 0;
294 //   st = rdtscp(aux1);
295
296     unsigned num_cpus = std::thread::hardware_concurrency();
297     init_log();
298     mdclog_level_set(MDCLOG_INFO);
299
300     if (std::signal(SIGINT, catch_function) == SIG_ERR) {
301         mdclog_write(MDCLOG_ERR, "Error initializing SIGINT");
302         exit(1);
303     }
304     if (std::signal(SIGABRT, catch_function)== SIG_ERR) {
305         mdclog_write(MDCLOG_ERR, "Error initializing SIGABRT");
306         exit(1);
307     }
308     if (std::signal(SIGTERM, catch_function)== SIG_ERR) {
309         mdclog_write(MDCLOG_ERR, "Error initializing SIGTERM");
310         exit(1);
311     }
312
313     cpuClock = approx_CPU_MHz(100);
314
315     mdclog_write(MDCLOG_DEBUG, "CPU speed %11.11f", cpuClock);
316
317     auto result = parse(argc, argv, sctpParams);
318
319     if (buildConfiguration(sctpParams) != 0) {
320         exit(-1);
321     }
322
323     // start epoll
324     sctpParams.epoll_fd = epoll_create1(0);
325     if (sctpParams.epoll_fd == -1) {
326         mdclog_write(MDCLOG_ERR, "failed to open epoll descriptor");
327         exit(-1);
328     }
329
330     getRmrContext(sctpParams);
331     if (sctpParams.rmrCtx == nullptr) {
332         close(sctpParams.epoll_fd);
333         exit(-1);
334     }
335
336     if (buildInotify(sctpParams) == -1) {
337         close(sctpParams.rmrListenFd);
338         rmr_close(sctpParams.rmrCtx);
339         close(sctpParams.epoll_fd);
340         exit(-1);
341     }
342
343     if (buildListeningPort(sctpParams) != 0) {
344         close(sctpParams.rmrListenFd);
345         rmr_close(sctpParams.rmrCtx);
346         close(sctpParams.epoll_fd);
347         exit(-1);
348     }
349
350     sctpParams.sctpMap = new mapWrapper();
351
352     std::vector<std::thread> threads(num_cpus);
353 //    std::vector<std::thread> threads;
354
355     num_cpus = 1;
356     for (unsigned int i = 0; i < num_cpus; i++) {
357         threads[i] = std::thread(listener, &sctpParams);
358
359         cpu_set_t cpuset;
360         CPU_ZERO(&cpuset);
361         CPU_SET(i, &cpuset);
362         int rc = pthread_setaffinity_np(threads[i].native_handle(), sizeof(cpu_set_t), &cpuset);
363         if (rc != 0) {
364             mdclog_write(MDCLOG_ERR, "Error calling pthread_setaffinity_np: %d", rc);
365         }
366     }
367
368     auto statFlag = false;
369     auto statThread = std::thread(statColectorThread, (void *)&statFlag);
370
371     //loop over term_init until first message from xApp
372     handleTermInit(sctpParams);
373
374     for (auto &t : threads) {
375         t.join();
376     }
377
378     statFlag = true;
379     statThread.join();
380
381     return 0;
382 }
383
384 void handleTermInit(sctp_params_t &sctpParams) {
385     sendTermInit(sctpParams);
386     //send to e2 manager init of e2 term
387     //E2_TERM_INIT
388
389     int count = 0;
390     while (true) {
391         auto xappMessages = num_of_XAPP_messages.load(std::memory_order_acquire);
392         if (xappMessages > 0) {
393             if (mdclog_level_get() >=  MDCLOG_INFO) {
394                 mdclog_write(MDCLOG_INFO, "Got a message from some appliction, stop sending E2_TERM_INIT");
395             }
396             return;
397         }
398         usleep(100000);
399         count++;
400         if (count % 1000 == 0) {
401             mdclog_write(MDCLOG_ERR, "GOT No messages from any xApp");
402             sendTermInit(sctpParams);
403         }
404     }
405 }
406
407 void sendTermInit(sctp_params_t &sctpParams) {
408     rmr_mbuf_t *msg = rmr_alloc_msg(sctpParams.rmrCtx, sctpParams.ka_message_length);
409     auto count = 0;
410     while (true) {
411         msg->mtype = E2_TERM_INIT;
412         msg->state = 0;
413         rmr_bytes2payload(msg, (unsigned char *)sctpParams.ka_message, sctpParams.ka_message_length);
414         static unsigned char tx[32];
415         auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
416         rmr_bytes2xact(msg, tx, txLen);
417         msg = rmr_send_msg(sctpParams.rmrCtx, msg);
418         if (msg == nullptr) {
419             msg = rmr_alloc_msg(sctpParams.rmrCtx, sctpParams.myIP.length());
420         } else if (msg->state == 0) {
421             rmr_free_msg(msg);
422             if (mdclog_level_get() >=  MDCLOG_INFO) {
423                 mdclog_write(MDCLOG_INFO, "E2_TERM_INIT succsesfuly sent ");
424             }
425             return;
426         } else {
427             if (count % 100 == 0) {
428                 mdclog_write(MDCLOG_ERR, "Error sending E2_TERM_INIT cause : %s ", translateRmrErrorMessages(msg->state).c_str());
429             }
430             sleep(1);
431         }
432         count++;
433     }
434 }
435
436 /**
437  *
438  * @param argc
439  * @param argv
440  * @param sctpParams
441  * @return
442  */
443 cxxopts::ParseResult parse(int argc, char *argv[], sctp_params_t &sctpParams) {
444     cxxopts::Options options(argv[0], "e2 term help");
445     options.positional_help("[optional args]").show_positional_help();
446     options.allow_unrecognised_options().add_options()
447             ("p,path", "config file path", cxxopts::value<std::string>(sctpParams.configFilePath)->default_value("config"))
448             ("f,file", "config file name", cxxopts::value<std::string>(sctpParams.configFileName)->default_value("config.conf"))
449             ("h,help", "Print help");
450
451     auto result = options.parse(argc, argv);
452
453     if (result.count("help")) {
454         std::cout << options.help({""}) << std::endl;
455         exit(0);
456     }
457     return result;
458 }
459
460 /**
461  *
462  * @param sctpParams
463  * @return -1 failed 0 success
464  */
465 int buildInotify(sctp_params_t &sctpParams) {
466     sctpParams.inotifyFD = inotify_init1(IN_NONBLOCK);
467     if (sctpParams.inotifyFD == -1) {
468         mdclog_write(MDCLOG_ERR, "Failed to init inotify (inotify_init1) %s", strerror(errno));
469         close(sctpParams.rmrListenFd);
470         rmr_close(sctpParams.rmrCtx);
471         close(sctpParams.epoll_fd);
472         return -1;
473     }
474
475     sctpParams.inotifyWD = inotify_add_watch(sctpParams.inotifyFD,
476                                              (const char *)sctpParams.configFilePath.c_str(),
477                                              (unsigned)IN_OPEN | (unsigned)IN_CLOSE_WRITE | (unsigned)IN_CLOSE_NOWRITE); //IN_CLOSE = (IN_CLOSE_WRITE | IN_CLOSE_NOWRITE)
478     if (sctpParams.inotifyWD == -1) {
479         mdclog_write(MDCLOG_ERR, "Failed to add directory : %s to  inotify (inotify_add_watch) %s",
480                      sctpParams.configFilePath.c_str(),
481                      strerror(errno));
482         close(sctpParams.inotifyFD);
483         return -1;
484     }
485
486     struct epoll_event event{};
487     event.events = (EPOLLIN);
488     event.data.fd = sctpParams.inotifyFD;
489     // add listening RMR FD to epoll
490     if (epoll_ctl(sctpParams.epoll_fd, EPOLL_CTL_ADD, sctpParams.inotifyFD, &event)) {
491         mdclog_write(MDCLOG_ERR, "Failed to add inotify FD to epoll");
492         close(sctpParams.inotifyFD);
493         return -1;
494     }
495     return 0;
496 }
497
498 /**
499  *
500  * @param args
501  * @return
502  */
503 void listener(sctp_params_t *params) {
504     int num_of_SCTP_messages = 0;
505     auto totalTime = 0.0;
506     mdclog_mdc_clean();
507     mdclog_level_set(params->logLevel);
508
509     std::thread::id this_id = std::this_thread::get_id();
510     //save cout
511     streambuf *oldCout = cout.rdbuf();
512     ostringstream memCout;
513     // create new cout
514     cout.rdbuf(memCout.rdbuf());
515     cout << this_id;
516     //return to the normal cout
517     cout.rdbuf(oldCout);
518
519     char tid[32];
520     memcpy(tid, memCout.str().c_str(), memCout.str().length() < 32 ? memCout.str().length() : 31);
521     tid[memCout.str().length()] = 0;
522     mdclog_mdc_add("thread id", tid);
523
524     if (mdclog_level_get() >= MDCLOG_DEBUG) {
525         mdclog_write(MDCLOG_DEBUG, "started thread number %s", tid);
526     }
527
528
529     RmrMessagesBuffer_t rmrMessageBuffer{};
530     //create and init RMR
531     rmrMessageBuffer.rmrCtx = params->rmrCtx;
532
533     auto *events = (struct epoll_event *) calloc(MAXEVENTS, sizeof(struct epoll_event));
534     struct timespec end{0, 0};
535     struct timespec start{0, 0};
536
537     rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
538     rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
539
540     memcpy(rmrMessageBuffer.ka_message, params->ka_message, params->ka_message_length);
541     rmrMessageBuffer.ka_message_len = params->ka_message_length;
542     rmrMessageBuffer.ka_message[rmrMessageBuffer.ka_message_len] = 0;
543
544     if (mdclog_level_get() >= MDCLOG_DEBUG) {
545         mdclog_write(MDCLOG_DEBUG, "keep alive message is : %s", rmrMessageBuffer.ka_message);
546     }
547
548     ReportingMessages_t message {};
549
550 //    for (int i = 0; i < MAX_RMR_BUFF_ARRY; i++) {
551 //        rmrMessageBuffer.rcvBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
552 //        rmrMessageBuffer.sendBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
553 //    }
554
555     message.statCollector = StatCollector::GetInstance();
556
557     while (true) {
558         if (mdclog_level_get() >= MDCLOG_DEBUG) {
559             mdclog_write(MDCLOG_DEBUG, "Start EPOLL Wait");
560         }
561         auto numOfEvents = epoll_wait(params->epoll_fd, events, MAXEVENTS, -1);
562         if (numOfEvents < 0 && errno == EINTR) {
563             if (mdclog_level_get() >= MDCLOG_DEBUG) {
564                 mdclog_write(MDCLOG_DEBUG, "got EINTR : %s", strerror(errno));
565             }
566             continue;
567         }
568         if (numOfEvents < 0) {
569             mdclog_write(MDCLOG_ERR, "Epoll wait failed, errno = %s", strerror(errno));
570             return;
571         }
572         for (auto i = 0; i < numOfEvents; i++) {
573             if (mdclog_level_get() >= MDCLOG_DEBUG) {
574                 mdclog_write(MDCLOG_DEBUG, "handling epoll event %d out of %d", i + 1, numOfEvents);
575             }
576             clock_gettime(CLOCK_MONOTONIC, &message.message.time);
577             start.tv_sec = message.message.time.tv_sec;
578             start.tv_nsec = message.message.time.tv_nsec;
579
580
581             if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP)) {
582                 handlepoll_error(events[i], message, rmrMessageBuffer, params);
583             } else if (events[i].events & EPOLLOUT) {
584                 handleEinprogressMessages(events[i], message, rmrMessageBuffer, params);
585             } else if (params->listenFD == events[i].data.fd) {
586                 if (mdclog_level_get() >= MDCLOG_INFO) {
587                     mdclog_write(MDCLOG_INFO, "New connection request from sctp network\n");
588                 }
589                 // new connection is requested from RAN  start build connection
590                 while (true) {
591                     struct sockaddr in_addr {};
592                     socklen_t in_len;
593                     char hostBuff[NI_MAXHOST];
594                     char portBuff[NI_MAXSERV];
595
596                     in_len = sizeof(in_addr);
597                     auto *peerInfo = (ConnectedCU_t *)calloc(1, sizeof(ConnectedCU_t));
598                     peerInfo->sctpParams = params;
599                     peerInfo->fileDescriptor = accept(params->listenFD, &in_addr, &in_len);
600                     if (peerInfo->fileDescriptor == -1) {
601                         if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
602                             /* We have processed all incoming connections. */
603                             break;
604                         } else {
605                             mdclog_write(MDCLOG_ERR, "Accept error, errno = %s", strerror(errno));
606                             break;
607                         }
608                     }
609                     if (setSocketNoBlocking(peerInfo->fileDescriptor) == -1) {
610                         mdclog_write(MDCLOG_ERR, "setSocketNoBlocking failed to set new connection %s on port %s\n", hostBuff, portBuff);
611                         close(peerInfo->fileDescriptor);
612                         break;
613                     }
614                     auto  ans = getnameinfo(&in_addr, in_len,
615                                             peerInfo->hostName, NI_MAXHOST,
616                                             peerInfo->portNumber, NI_MAXSERV, (unsigned )((unsigned int)NI_NUMERICHOST | (unsigned int)NI_NUMERICSERV));
617                     if (ans < 0) {
618                         mdclog_write(MDCLOG_ERR, "Failed to get info on connection request. %s\n", strerror(errno));
619                         close(peerInfo->fileDescriptor);
620                         break;
621                     }
622                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
623                         mdclog_write(MDCLOG_DEBUG, "Accepted connection on descriptor %d (host=%s, port=%s)\n", peerInfo->fileDescriptor, peerInfo->hostName, peerInfo->portNumber);
624                     }
625                     peerInfo->isConnected = false;
626                     peerInfo->gotSetup = false;
627                     if (addToEpoll(params->epoll_fd,
628                                    peerInfo,
629                                    (EPOLLIN | EPOLLET),
630                                    params->sctpMap, nullptr,
631                                    0) != 0) {
632                         break;
633                     }
634                     break;
635                 }
636             } else if (params->rmrListenFd == events[i].data.fd) {
637                 // got message from XAPP
638                 num_of_XAPP_messages.fetch_add(1, std::memory_order_release);
639                 num_of_messages.fetch_add(1, std::memory_order_release);
640                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
641                     mdclog_write(MDCLOG_DEBUG, "new message from RMR");
642                 }
643                 if (receiveXappMessages(params->sctpMap,
644                                         rmrMessageBuffer,
645                                         message.message.time) != 0) {
646                     mdclog_write(MDCLOG_ERR, "Error handling Xapp message");
647                 }
648             } else if (params->inotifyFD == events[i].data.fd) {
649                 mdclog_write(MDCLOG_INFO, "Got event from inotify (configuration update)");
650                 handleConfigChange(params);
651             } else {
652                 /* We RMR_ERR_RETRY have data on the fd waiting to be read. Read and display it.
653                  * We must read whatever data is available completely, as we are running
654                  *  in edge-triggered mode and won't get a notification again for the same data. */
655                 num_of_messages.fetch_add(1, std::memory_order_release);
656                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
657                     mdclog_write(MDCLOG_DEBUG, "new message from SCTP, epoll flags are : %0x", events[i].events);
658                 }
659                 receiveDataFromSctp(&events[i],
660                                     params->sctpMap,
661                                     num_of_SCTP_messages,
662                                     rmrMessageBuffer,
663                                     message.message.time);
664             }
665
666             clock_gettime(CLOCK_MONOTONIC, &end);
667             if (mdclog_level_get() >= MDCLOG_INFO) {
668                 totalTime += ((end.tv_sec + 1.0e-9 * end.tv_nsec) -
669                               ((double) start.tv_sec + 1.0e-9 * start.tv_nsec));
670             }
671             if (mdclog_level_get() >= MDCLOG_DEBUG) {
672                 mdclog_write(MDCLOG_DEBUG, "message handling is %ld seconds %ld nanoseconds",
673                              end.tv_sec - start.tv_sec,
674                              end.tv_nsec - start.tv_nsec);
675             }
676         }
677     }
678 }
679
680 /**
681  *
682  * @param sctpParams
683  */
684 void handleConfigChange(sctp_params_t *sctpParams) {
685     char buf[4096] __attribute__ ((aligned(__alignof__(struct inotify_event))));
686     const struct inotify_event *event;
687     char *ptr;
688
689     path p = (sctpParams->configFilePath + "/" + sctpParams->configFileName).c_str();
690     auto endlessLoop = true;
691     while (endlessLoop) {
692         auto len = read(sctpParams->inotifyFD, buf, sizeof buf);
693         if (len == -1) {
694             if (errno != EAGAIN) {
695                 mdclog_write(MDCLOG_ERR, "read %s ", strerror(errno));
696                 endlessLoop = false;
697                 continue;
698             }
699             else {
700                 endlessLoop = false;
701                 continue;
702             }
703         }
704
705         for (ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) {
706             event = (const struct inotify_event *)ptr;
707             if (event->mask & (uint32_t)IN_ISDIR) {
708                 continue;
709             }
710
711             // the directory name
712             if (sctpParams->inotifyWD == event->wd) {
713                 // not the directory
714             }
715             if (event->len) {
716                 if (!(sctpParams->configFileName.compare(event->name))) {
717                     continue;
718                 }
719             }
720             // only the file we want
721             if (event->mask & (uint32_t)IN_CLOSE_WRITE) {
722                 if (exists(p)) {
723                     const int size = 2048;
724                     auto fileSize = file_size(p);
725                     if (fileSize > size) {
726                         mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size);
727                         return;
728                     }
729                 } else {
730                     mdclog_write(MDCLOG_ERR, "Configuration File %s not exists", p.string().c_str());
731                     return;
732                 }
733
734                 ReadConfigFile conf;
735                 if (conf.openConfigFile(p.string()) == -1) {
736                     mdclog_write(MDCLOG_ERR, "Filed to open config file %s, %s",
737                                  p.string().c_str(), strerror(errno));
738                     return;
739                 }
740
741                 auto tmpStr = conf.getStringValue("loglevel");
742                 if (tmpStr.length() == 0) {
743                     mdclog_write(MDCLOG_ERR, "illigal loglevel. Set loglevel to MDCLOG_INFO");
744                     tmpStr = "info";
745                 }
746                 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
747
748                 if ((tmpStr.compare("debug")) == 0) {
749                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_DEBUG");
750                     sctpParams->logLevel = MDCLOG_DEBUG;
751                 } else if ((tmpStr.compare("info")) == 0) {
752                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_INFO");
753                     sctpParams->logLevel = MDCLOG_INFO;
754                 } else if ((tmpStr.compare("warning")) == 0) {
755                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_WARN");
756                     sctpParams->logLevel = MDCLOG_WARN;
757                 } else if ((tmpStr.compare("error")) == 0) {
758                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_ERR");
759                     sctpParams->logLevel = MDCLOG_ERR;
760                 } else {
761                     mdclog_write(MDCLOG_ERR, "illigal loglevel = %s. Set loglevel to MDCLOG_INFO", tmpStr.c_str());
762                     sctpParams->logLevel = MDCLOG_INFO;
763                 }
764                 mdclog_level_set(sctpParams->logLevel);
765
766
767                 tmpStr = conf.getStringValue("trace");
768                 if (tmpStr.length() == 0) {
769                     mdclog_write(MDCLOG_ERR, "illigal trace. Set trace to stop");
770                     tmpStr = "stop";
771                 }
772
773                 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
774                 if ((tmpStr.compare("start")) == 0) {
775                     mdclog_write(MDCLOG_INFO, "Trace set to: start");
776                     sctpParams->trace = true;
777                 } else if ((tmpStr.compare("stop")) == 0) {
778                     mdclog_write(MDCLOG_INFO, "Trace set to: stop");
779                     sctpParams->trace = false;
780                 } else {
781                     mdclog_write(MDCLOG_ERR, "Trace was set to wrong value %s, set to stop", tmpStr.c_str());
782                     sctpParams->trace = false;
783                 }
784                 jsonTrace = sctpParams->trace;
785                 endlessLoop = false;
786             }
787         }
788     }
789 }
790
791 /**
792  *
793  * @param event
794  * @param message
795  * @param rmrMessageBuffer
796  * @param params
797  */
798 void handleEinprogressMessages(struct epoll_event &event,
799                                ReportingMessages_t &message,
800                                RmrMessagesBuffer_t &rmrMessageBuffer,
801                                sctp_params_t *params) {
802     auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
803     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
804
805     mdclog_write(MDCLOG_INFO, "file descriptor %d got EPOLLOUT", peerInfo->fileDescriptor);
806     auto retVal = 0;
807     socklen_t retValLen = 0;
808     auto rc = getsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_ERROR, &retVal, &retValLen);
809     if (rc != 0 || retVal != 0) {
810         if (rc != 0) {
811             rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
812                                                          "%s|Failed SCTP Connection, after EINPROGRESS the getsockopt%s",
813                                                          peerInfo->enodbName, strerror(errno));
814         } else if (retVal != 0) {
815             rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
816                                                          "%s|Failed SCTP Connection after EINPROGRESS, SO_ERROR",
817                                                          peerInfo->enodbName);
818         }
819
820         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
821         message.message.asnLength = rmrMessageBuffer.sendMessage->len;
822         mdclog_write(MDCLOG_ERR, "%s", rmrMessageBuffer.sendMessage->payload);
823         message.message.direction = 'N';
824         if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
825             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
826         }
827         memset(peerInfo->asnData, 0, peerInfo->asnLength);
828         peerInfo->asnLength = 0;
829         peerInfo->mtype = 0;
830         return;
831     }
832
833     peerInfo->isConnected = true;
834
835     if (modifyToEpoll(params->epoll_fd, peerInfo, (EPOLLIN | EPOLLET), params->sctpMap, peerInfo->enodbName,
836                       peerInfo->mtype) != 0) {
837         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_MOD");
838         return;
839     }
840
841     message.message.asndata = (unsigned char *)peerInfo->asnData;
842     message.message.asnLength = peerInfo->asnLength;
843     message.message.messageType = peerInfo->mtype;
844     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
845     num_of_messages.fetch_add(1, std::memory_order_release);
846     if (mdclog_level_get() >= MDCLOG_DEBUG) {
847         mdclog_write(MDCLOG_DEBUG, "send the delayed SETUP/ENDC SETUP to sctp for %s",
848                      message.message.enodbName);
849     }
850     if (sendSctpMsg(peerInfo, message, params->sctpMap) != 0) {
851         if (mdclog_level_get() >= MDCLOG_DEBUG) {
852             mdclog_write(MDCLOG_DEBUG, "Error write to SCTP  %s %d", __func__, __LINE__);
853         }
854         return;
855     }
856
857     memset(peerInfo->asnData, 0, peerInfo->asnLength);
858     peerInfo->asnLength = 0;
859     peerInfo->mtype = 0;
860 }
861
862
863 void handlepoll_error(struct epoll_event &event,
864                       ReportingMessages_t &message,
865                       RmrMessagesBuffer_t &rmrMessageBuffer,
866                       sctp_params_t *params) {
867     if (event.data.fd != params->rmrListenFd) {
868         auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
869         mdclog_write(MDCLOG_ERR, "epoll error, events %0x on fd %d, RAN NAME : %s",
870                      event.events, peerInfo->fileDescriptor, peerInfo->enodbName);
871
872         rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
873                                                      "%s|Failed SCTP Connection",
874                                                      peerInfo->enodbName);
875         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
876         message.message.asnLength = rmrMessageBuffer.sendMessage->len;
877
878         memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
879         message.message.direction = 'N';
880         if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
881             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
882         }
883
884         close(peerInfo->fileDescriptor);
885         cleanHashEntry((ConnectedCU_t *) event.data.ptr, params->sctpMap);
886     } else {
887         mdclog_write(MDCLOG_ERR, "epoll error, events %0x on RMR FD", event.events);
888     }
889 }
890 /**
891  *
892  * @param socket
893  * @return
894  */
895 int setSocketNoBlocking(int socket) {
896     auto flags = fcntl(socket, F_GETFL, 0);
897
898     if (flags == -1) {
899         mdclog_mdc_add("func", "fcntl");
900         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
901         mdclog_mdc_clean();
902         return -1;
903     }
904
905     flags = (unsigned) flags | (unsigned) O_NONBLOCK;
906     if (fcntl(socket, F_SETFL, flags) == -1) {
907         mdclog_mdc_add("func", "fcntl");
908         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
909         mdclog_mdc_clean();
910         return -1;
911     }
912
913     return 0;
914 }
915
916 /**
917  *
918  * @param val
919  * @param m
920  */
921 void cleanHashEntry(ConnectedCU_t *val, Sctp_Map_t *m) {
922     char *dummy;
923     auto port = (uint16_t) strtol(val->portNumber, &dummy, 10);
924     char searchBuff[2048]{};
925
926     snprintf(searchBuff, sizeof searchBuff, "host:%s:%d", val->hostName, port);
927     m->erase(searchBuff);
928
929     m->erase(val->enodbName);
930     free(val);
931 }
932
933 /**
934  *
935  * @param fd file discriptor
936  * @param data the asn data to send
937  * @param len  length of the data
938  * @param enodbName the enodbName as in the map for printing purpose
939  * @param m map host information
940  * @param mtype message number
941  * @return 0 success, anegative number on fail
942  */
943 int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_t *m) {
944     auto loglevel = mdclog_level_get();
945     int fd = peerInfo->fileDescriptor;
946     if (loglevel >= MDCLOG_DEBUG) {
947         mdclog_write(MDCLOG_DEBUG, "Send SCTP message for CU %s, %s",
948                      message.message.enodbName, __FUNCTION__);
949     }
950
951     while (true) {
952         if (send(fd,message.message.asndata, message.message.asnLength,MSG_NOSIGNAL) < 0) {
953             if (errno == EINTR) {
954                 continue;
955             }
956             mdclog_write(MDCLOG_ERR, "error writing to CU a message, %s ", strerror(errno));
957             if (!peerInfo->isConnected) {
958                 mdclog_write(MDCLOG_ERR, "connection to CU %s is still in progress.", message.message.enodbName);
959                 return -1;
960             }
961             cleanHashEntry(peerInfo, m);
962             close(fd);
963             char key[MAX_ENODB_NAME_SIZE * 2];
964             snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName,
965                      message.message.messageType);
966             if (loglevel >= MDCLOG_DEBUG) {
967                 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
968             }
969             auto tmp = m->find(key);
970             if (tmp) {
971                 free(tmp);
972             }
973             m->erase(key);
974             return -1;
975         }
976         message.statCollector->incSentMessage(string(message.message.enodbName));
977         message.message.direction = 'D';
978         // send report.buffer of size
979         buildJsonMessage(message);
980
981         if (loglevel >= MDCLOG_DEBUG) {
982             mdclog_write(MDCLOG_DEBUG,
983                          "SCTP message for CU %s sent from %s",
984                          message.message.enodbName,
985                          __FUNCTION__);
986         }
987         return 0;
988     }
989 }
990
991 /**
992  *
993  * @param message
994  * @param rmrMessageBuffer
995  */
996 void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) {
997     rmr_get_meid(rmrMessageBuffer.rcvMessage, (unsigned char *) (message.message.enodbName));
998
999     message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
1000     message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
1001
1002     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1003         mdclog_write(MDCLOG_DEBUG, "Message from Xapp RAN name = %s message length = %ld",
1004                      message.message.enodbName, (unsigned long) message.message.asnLength);
1005     }
1006 }
1007
1008
1009
1010 /**
1011  *
1012  * @param events
1013  * @param sctpMap
1014  * @param numOfMessages
1015  * @param rmrMessageBuffer
1016  * @param ts
1017  * @return
1018  */
1019 int receiveDataFromSctp(struct epoll_event *events,
1020                         Sctp_Map_t *sctpMap,
1021                         int &numOfMessages,
1022                         RmrMessagesBuffer_t &rmrMessageBuffer,
1023                         struct timespec &ts) {
1024     /* We have data on the fd waiting to be read. Read and display it.
1025  * We must read whatever data is available completely, as we are running
1026  *  in edge-triggered mode and won't get a notification again for the same data. */
1027     ReportingMessages_t message {};
1028     auto done = 0;
1029     auto loglevel = mdclog_level_get();
1030
1031     // get the identity of the interface
1032     message.peerInfo = (ConnectedCU_t *)events->data.ptr;
1033
1034     message.statCollector = StatCollector::GetInstance();
1035     struct timespec start{0, 0};
1036     struct timespec decodestart{0, 0};
1037     struct timespec end{0, 0};
1038
1039     E2AP_PDU_t *pdu = nullptr;
1040
1041
1042     while (true) {
1043         if (loglevel >= MDCLOG_DEBUG) {
1044             mdclog_write(MDCLOG_DEBUG, "Start Read from SCTP %d fd", message.peerInfo->fileDescriptor);
1045             clock_gettime(CLOCK_MONOTONIC, &start);
1046         }
1047         // read the buffer directly to rmr payload
1048         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1049         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1050                 read(message.peerInfo->fileDescriptor, rmrMessageBuffer.sendMessage->payload, RECEIVE_SCTP_BUFFER_SIZE);
1051
1052         if (loglevel >= MDCLOG_DEBUG) {
1053             mdclog_write(MDCLOG_DEBUG, "Finish Read from SCTP %d fd message length = %ld",
1054                          message.peerInfo->fileDescriptor, message.message.asnLength);
1055         }
1056
1057         memcpy(message.message.enodbName, message.peerInfo->enodbName, sizeof(message.peerInfo->enodbName));
1058         message.statCollector->incRecvMessage(string(message.message.enodbName));
1059         message.message.direction = 'U';
1060         message.message.time.tv_nsec = ts.tv_nsec;
1061         message.message.time.tv_sec = ts.tv_sec;
1062
1063         if (message.message.asnLength < 0) {
1064             if (errno == EINTR) {
1065                 continue;
1066             }
1067             /* If errno == EAGAIN, that means we have read all
1068                data. So goReportingMessages_t back to the main loop. */
1069             if (errno != EAGAIN) {
1070                 mdclog_write(MDCLOG_ERR, "Read error, %s ", strerror(errno));
1071                 done = 1;
1072             } else if (loglevel >= MDCLOG_DEBUG) {
1073                 mdclog_write(MDCLOG_DEBUG, "EAGAIN - descriptor = %d", message.peerInfo->fileDescriptor);
1074             }
1075             break;
1076         } else if (message.message.asnLength == 0) {
1077             /* End of file. The remote has closed the connection. */
1078             if (loglevel >= MDCLOG_INFO) {
1079                 mdclog_write(MDCLOG_INFO, "END of File Closed connection - descriptor = %d",
1080                              message.peerInfo->fileDescriptor);
1081             }
1082             done = 1;
1083             break;
1084         }
1085
1086         asn_dec_rval_t rval;
1087         if (loglevel >= MDCLOG_DEBUG) {
1088             char printBuffer[4096]{};
1089             char *tmp = printBuffer;
1090             for (size_t i = 0; i < (size_t)message.message.asnLength; ++i) {
1091                 snprintf(tmp, 3, "%02x", message.message.asndata[i]);
1092                 tmp += 2;
1093             }
1094             printBuffer[message.message.asnLength] = 0;
1095             clock_gettime(CLOCK_MONOTONIC, &end);
1096             mdclog_write(MDCLOG_DEBUG, "Before Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
1097                          message.peerInfo->enodbName, end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
1098             mdclog_write(MDCLOG_DEBUG, "PDU buffer length = %ld, data =  : %s", message.message.asnLength,
1099                          printBuffer);
1100             clock_gettime(CLOCK_MONOTONIC, &decodestart);
1101         }
1102
1103         rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void **) &pdu,
1104                           message.message.asndata, message.message.asnLength);
1105         if (rval.code != RC_OK) {
1106             mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) E2AP PDU from RAN : %s", rval.code,
1107                          message.peerInfo->enodbName);
1108             break;
1109         }
1110
1111         if (loglevel >= MDCLOG_DEBUG) {
1112             clock_gettime(CLOCK_MONOTONIC, &end);
1113             mdclog_write(MDCLOG_DEBUG, "After Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
1114                          message.peerInfo->enodbName, end.tv_sec - decodestart.tv_sec, end.tv_nsec - decodestart.tv_nsec);
1115             char *printBuffer;
1116             size_t size;
1117             FILE *stream = open_memstream(&printBuffer, &size);
1118             asn_fprint(stream, &asn_DEF_E2AP_PDU, pdu);
1119             mdclog_write(MDCLOG_DEBUG, "Encoding E2AP PDU past : %s", printBuffer);
1120             clock_gettime(CLOCK_MONOTONIC, &decodestart);
1121         }
1122
1123         switch (pdu->present) {
1124             case E2AP_PDU_PR_initiatingMessage: {//initiating message
1125                 asnInitiatingRequest(pdu, message, rmrMessageBuffer);
1126                 break;
1127             }
1128             case E2AP_PDU_PR_successfulOutcome: { //successful outcome
1129                 asnSuccsesfulMsg(pdu, message,  rmrMessageBuffer);
1130                 break;
1131             }
1132             case E2AP_PDU_PR_unsuccessfulOutcome: { //Unsuccessful Outcome
1133                 asnUnSuccsesfulMsg(pdu, message, rmrMessageBuffer);
1134                 break;
1135             }
1136             default:
1137                 mdclog_write(MDCLOG_ERR, "Unknown index %d in E2AP PDU", pdu->present);
1138                 break;
1139         }
1140         if (loglevel >= MDCLOG_DEBUG) {
1141             clock_gettime(CLOCK_MONOTONIC, &end);
1142             mdclog_write(MDCLOG_DEBUG,
1143                          "After processing message and sent to rmr for : %s, Read time is : %ld seconds, %ld nanoseconds",
1144                          message.peerInfo->enodbName, end.tv_sec - decodestart.tv_sec, end.tv_nsec - decodestart.tv_nsec);
1145         }
1146         numOfMessages++;
1147         // remove the break for EAGAIN
1148         //break;
1149         if (pdu != nullptr) {
1150             //TODO need to test ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu); to get better performance
1151             ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu);
1152             //ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
1153             pdu = nullptr;
1154         }
1155         //clock_gettime(CLOCK_MONOTONIC, &start);
1156     }
1157     // in case of break to avoid memory leak
1158     if (pdu != nullptr) {
1159         //ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
1160         ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu);
1161         pdu = nullptr;
1162     }
1163
1164     if (done) {
1165         if (loglevel >= MDCLOG_INFO) {
1166             mdclog_write(MDCLOG_INFO, "Closed connection - descriptor = %d", message.peerInfo->fileDescriptor);
1167         }
1168         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1169                 snprintf((char *)rmrMessageBuffer.sendMessage->payload,
1170                          256,
1171                          "%s|CU disconnected unexpectedly",
1172                          message.peerInfo->enodbName);
1173         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1174
1175         if (sendRequestToXapp(message,
1176                               RIC_SCTP_CONNECTION_FAILURE,
1177                               rmrMessageBuffer) != 0) {
1178             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
1179         }
1180
1181         /* Closing descriptor make epoll remove it from the set of descriptors which are monitored. */
1182         close(message.peerInfo->fileDescriptor);
1183         cleanHashEntry((ConnectedCU_t *) events->data.ptr, sctpMap);
1184     }
1185     if (loglevel >= MDCLOG_DEBUG) {
1186         clock_gettime(CLOCK_MONOTONIC, &end);
1187         mdclog_write(MDCLOG_DEBUG, "from receive SCTP to send RMR time is %ld seconds and %ld nanoseconds",
1188                      end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
1189
1190     }
1191     return 0;
1192 }
1193
1194 static void buildAndsendSetupRequest(ReportingMessages_t &message,
1195                                      E2setupRequestIEs_t *ie,
1196                                      RmrMessagesBuffer_t &rmrMessageBuffer,
1197                                      E2AP_PDU_t *pdu) {
1198     auto logLevel = mdclog_level_get();
1199
1200
1201     if (buildRanName(message.peerInfo->enodbName, ie) < 0) {
1202         mdclog_write(MDCLOG_ERR, "Bad param in E2setupRequestIEs GlobalE2node_ID.\n");
1203     } else {
1204         memcpy(message.message.enodbName, message.peerInfo->enodbName, strlen(message.peerInfo->enodbName));
1205     }
1206     // now we can send the data to e2Mgr
1207     auto buffer_size = RECEIVE_SCTP_BUFFER_SIZE * 2;
1208
1209     auto *rmrMsg = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, buffer_size);
1210     // add addrees to message
1211
1212
1213     // unsigned char *buffer = &rmrMsg->payload[j];
1214     unsigned char buffer[RECEIVE_SCTP_BUFFER_SIZE * 2];
1215     // encode to xml
1216     auto er = asn_encode_to_buffer(nullptr, ATS_BASIC_XER, &asn_DEF_E2AP_PDU, pdu, buffer, buffer_size);
1217     if (er.encoded == -1) {
1218         mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
1219     } else if (er.encoded > (ssize_t) buffer_size) {
1220         mdclog_write(MDCLOG_ERR, "Buffer of size %d is to small for %s",
1221                      (int) buffer_size,
1222                      asn_DEF_E2AP_PDU.name);
1223     } else {
1224         rmrMsg->len = snprintf((char *)rmrMsg->payload, RECEIVE_SCTP_BUFFER_SIZE * 2, "%s:%d|%s",
1225                                message.peerInfo->sctpParams->myIP.c_str(),
1226                                message.peerInfo->sctpParams->rmrPort,
1227                                buffer);
1228         if (logLevel >= MDCLOG_DEBUG) {
1229             mdclog_write(MDCLOG_DEBUG, "Setup request of size %d :\n %s\n", rmrMsg->len, rmrMsg->payload);
1230         }
1231         // send to RMR
1232         message.message.messageType = rmrMsg->mtype = RIC_E2_SETUP_REQ;
1233         rmrMsg->state = 0;
1234         rmr_bytes2meid(rmrMsg, (unsigned char *) message.message.enodbName, strlen(message.message.enodbName));
1235
1236         static unsigned char tx[32];
1237         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1238         rmr_bytes2xact(rmrMsg, tx, strlen((const char *) tx));
1239
1240         rmrMsg = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMsg);
1241         if (rmrMsg == nullptr) {
1242             mdclog_write(MDCLOG_ERR, "RMR failed to send returned nullptr");
1243         } else if (rmrMsg->state != 0) {
1244             char meid[RMR_MAX_MEID]{};
1245             if (rmrMsg->state == RMR_ERR_RETRY) {
1246                 usleep(5);
1247                 rmrMsg->state = 0;
1248                 mdclog_write(MDCLOG_INFO, "RETRY sending Message %d to Xapp from %s",
1249                              rmrMsg->mtype, rmr_get_meid(rmrMsg, (unsigned char *) meid));
1250                 rmrMsg = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMsg);
1251                 if (rmrMsg == nullptr) {
1252                     mdclog_write(MDCLOG_ERR, "RMR failed send returned nullptr");
1253                 } else if (rmrMsg->state != 0) {
1254                     mdclog_write(MDCLOG_ERR,
1255                                  "RMR Retry failed %s sending request %d to Xapp from %s",
1256                                  translateRmrErrorMessages(rmrMsg->state).c_str(),
1257                                  rmrMsg->mtype,
1258                                  rmr_get_meid(rmrMsg, (unsigned char *) meid));
1259                 }
1260             } else {
1261                 mdclog_write(MDCLOG_ERR, "RMR failed: %s. sending request %d to Xapp from %s",
1262                              translateRmrErrorMessages(rmrMsg->state).c_str(),
1263                              rmrMsg->mtype,
1264                              rmr_get_meid(rmrMsg, (unsigned char *) meid));
1265             }
1266         }
1267         message.peerInfo->gotSetup = true;
1268         buildJsonMessage(message);
1269         if (rmrMsg != nullptr) {
1270             rmr_free_msg(rmrMsg);
1271         }
1272     }
1273
1274 }
1275 /**
1276  *
1277  * @param pdu
1278  * @param message
1279  * @param rmrMessageBuffer
1280  */
1281 void asnInitiatingRequest(E2AP_PDU_t *pdu,
1282                           ReportingMessages_t &message,
1283                           RmrMessagesBuffer_t &rmrMessageBuffer) {
1284     auto logLevel = mdclog_level_get();
1285     auto procedureCode = ((InitiatingMessage_t *) pdu->choice.initiatingMessage)->procedureCode;
1286     if (logLevel >= MDCLOG_DEBUG) {
1287         mdclog_write(MDCLOG_DEBUG, "Initiating message %ld\n", procedureCode);
1288     }
1289     switch (procedureCode) {
1290         case ProcedureCode_id_E2setup: {
1291             if (logLevel >= MDCLOG_DEBUG) {
1292                 mdclog_write(MDCLOG_DEBUG, "Got E2setup\n");
1293             }
1294
1295             memset(message.peerInfo->enodbName, 0 , MAX_ENODB_NAME_SIZE);
1296             for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.E2setupRequest.protocolIEs.list.count; i++) {
1297                 auto *ie = pdu->choice.initiatingMessage->value.choice.E2setupRequest.protocolIEs.list.array[i];
1298                 if (ie->id == ProtocolIE_ID_id_GlobalE2node_ID) {
1299                     if (ie->value.present == E2setupRequestIEs__value_PR_GlobalE2node_ID) {
1300                         buildAndsendSetupRequest(message, ie, rmrMessageBuffer, pdu);
1301                         break;
1302                     }
1303                 }
1304             }
1305             break;
1306         }
1307         case ProcedureCode_id_ErrorIndication: {
1308             if (logLevel >= MDCLOG_DEBUG) {
1309                 mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName);
1310             }
1311             if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) {
1312                 mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP");
1313             }
1314             break;
1315         }
1316         case ProcedureCode_id_Reset: {
1317             if (logLevel >= MDCLOG_DEBUG) {
1318                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
1319             }
1320             if (sendRequestToXapp(message, RIC_X2_RESET, rmrMessageBuffer) != 0) {
1321                 mdclog_write(MDCLOG_ERR, "RIC_X2_RESET message failed to send to xAPP");
1322             }
1323             break;
1324         }
1325         case ProcedureCode_id_RICcontrol: {
1326             if (logLevel >= MDCLOG_DEBUG) {
1327                 mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
1328             }
1329             break;
1330         }
1331         case ProcedureCode_id_RICindication: {
1332             if (logLevel >= MDCLOG_DEBUG) {
1333                 mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName);
1334             }
1335             for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
1336                 auto messageSent = false;
1337                 RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
1338                 if (logLevel >= MDCLOG_DEBUG) {
1339                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1340                 }
1341                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1342                     if (logLevel >= MDCLOG_DEBUG) {
1343                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1344                     }
1345                     if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
1346                         static unsigned char tx[32];
1347                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
1348                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1349                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1350                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1351                                        (unsigned char *)message.message.enodbName,
1352                                        strlen(message.message.enodbName));
1353                         rmrMessageBuffer.sendMessage->state = 0;
1354                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1355                         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1356                             mdclog_write(MDCLOG_DEBUG, "RIC sub id = %d, message type = %d",
1357                                          rmrMessageBuffer.sendMessage->sub_id,
1358                                          rmrMessageBuffer.sendMessage->mtype);
1359                         }
1360                         sendRmrMessage(rmrMessageBuffer, message);
1361                         messageSent = true;
1362                     } else {
1363                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1364                     }
1365                 }
1366                 if (messageSent) {
1367                     break;
1368                 }
1369             }
1370             break;
1371         }
1372         case ProcedureCode_id_RICserviceQuery: {
1373             if (logLevel >= MDCLOG_DEBUG) {
1374                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceQuery %s", message.message.enodbName);
1375             }
1376             break;
1377         }
1378         case ProcedureCode_id_RICserviceUpdate: {
1379             if (logLevel >= MDCLOG_DEBUG) {
1380                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceUpdate %s", message.message.enodbName);
1381             }
1382             if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer) != 0) {
1383                 mdclog_write(MDCLOG_ERR, "RIC_SERVICE_UPDATE message failed to send to xAPP");
1384             }
1385             break;
1386         }
1387         case ProcedureCode_id_RICsubscription: {
1388             if (logLevel >= MDCLOG_DEBUG) {
1389                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
1390             }
1391             break;
1392         }
1393         case ProcedureCode_id_RICsubscriptionDelete: {
1394             if (logLevel >= MDCLOG_DEBUG) {
1395                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
1396             }
1397             break;
1398         }
1399         default: {
1400             mdclog_write(MDCLOG_ERR, "Undefined or not supported message = %ld", procedureCode);
1401             message.message.messageType = 0; // no RMR message type yet
1402
1403             buildJsonMessage(message);
1404
1405             break;
1406         }
1407     }
1408 }
1409
1410 /**
1411  *
1412  * @param pdu
1413  * @param message
1414  * @param rmrMessageBuffer
1415  */
1416 void asnSuccsesfulMsg(E2AP_PDU_t *pdu, ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) {
1417     auto procedureCode = pdu->choice.successfulOutcome->procedureCode;
1418     auto logLevel = mdclog_level_get();
1419     if (logLevel >= MDCLOG_INFO) {
1420         mdclog_write(MDCLOG_INFO, "Successful Outcome %ld", procedureCode);
1421     }
1422     switch (procedureCode) {
1423         case ProcedureCode_id_E2setup: {
1424             if (logLevel >= MDCLOG_DEBUG) {
1425                 mdclog_write(MDCLOG_DEBUG, "Got E2setup\n");
1426             }
1427             break;
1428         }
1429         case ProcedureCode_id_ErrorIndication: {
1430             if (logLevel >= MDCLOG_DEBUG) {
1431                 mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName);
1432             }
1433             if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) {
1434                 mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP");
1435             }
1436             break;
1437         }
1438         case ProcedureCode_id_Reset: {
1439             if (logLevel >= MDCLOG_DEBUG) {
1440                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
1441             }
1442             if (sendRequestToXapp(message, RIC_X2_RESET, rmrMessageBuffer) != 0) {
1443                 mdclog_write(MDCLOG_ERR, "RIC_X2_RESET message failed to send to xAPP");
1444             }
1445             break;
1446         }
1447         case ProcedureCode_id_RICcontrol: {
1448             if (logLevel >= MDCLOG_DEBUG) {
1449                 mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
1450             }
1451             for (auto i = 0;
1452                  i < pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.count; i++) {
1453                 auto messageSent = false;
1454                 RICcontrolAcknowledge_IEs_t *ie = pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.array[i];
1455                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1456                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1457                 }
1458                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1459                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1460                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1461                     }
1462                     if (ie->value.present == RICcontrolAcknowledge_IEs__value_PR_RICrequestID) {
1463                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_ACK;
1464                         rmrMessageBuffer.sendMessage->state = 0;
1465                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1466                         static unsigned char tx[32];
1467                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1468                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1469                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1470                                        (unsigned char *)message.message.enodbName,
1471                                        strlen(message.message.enodbName));
1472
1473                         sendRmrMessage(rmrMessageBuffer, message);
1474                         messageSent = true;
1475                     } else {
1476                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1477                     }
1478                 }
1479                 if (messageSent) {
1480                     break;
1481                 }
1482             }
1483
1484             break;
1485         }
1486         case ProcedureCode_id_RICindication: {
1487             if (logLevel >= MDCLOG_DEBUG) {
1488                 mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName);
1489             }
1490             for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
1491                 auto messageSent = false;
1492                 RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
1493                 if (logLevel >= MDCLOG_DEBUG) {
1494                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1495                 }
1496                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1497                     if (logLevel >= MDCLOG_DEBUG) {
1498                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1499                     }
1500                     if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
1501                         static unsigned char tx[32];
1502                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
1503                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1504                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1505                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1506                                        (unsigned char *)message.message.enodbName,
1507                                        strlen(message.message.enodbName));
1508                         rmrMessageBuffer.sendMessage->state = 0;
1509                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1510                         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1511                             mdclog_write(MDCLOG_DEBUG, "RIC sub id = %d, message type = %d",
1512                                          rmrMessageBuffer.sendMessage->sub_id,
1513                                          rmrMessageBuffer.sendMessage->mtype);
1514                         }
1515                         sendRmrMessage(rmrMessageBuffer, message);
1516                         messageSent = true;
1517                     } else {
1518                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1519                     }
1520                 }
1521                 if (messageSent) {
1522                     break;
1523                 }
1524             }
1525             break;
1526         }
1527         case ProcedureCode_id_RICserviceQuery: {
1528             if (logLevel >= MDCLOG_DEBUG) {
1529                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceQuery %s", message.message.enodbName);
1530             }
1531             break;
1532         }
1533         case ProcedureCode_id_RICserviceUpdate: {
1534             if (logLevel >= MDCLOG_DEBUG) {
1535                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceUpdate %s", message.message.enodbName);
1536             }
1537             if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer) != 0) {
1538                 mdclog_write(MDCLOG_ERR, "RIC_SERVICE_UPDATE message failed to send to xAPP");
1539             }
1540             break;
1541         }
1542         case ProcedureCode_id_RICsubscription: {
1543             if (logLevel >= MDCLOG_DEBUG) {
1544                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
1545             }
1546             if (sendRequestToXapp(message, RIC_SUB_RESP, rmrMessageBuffer) != 0) {
1547                 mdclog_write(MDCLOG_ERR, "Subscription successful message failed to send to xAPP");
1548             }
1549             break;
1550         }
1551         case ProcedureCode_id_RICsubscriptionDelete: {
1552             if (logLevel >= MDCLOG_DEBUG) {
1553                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
1554             }
1555             if (sendRequestToXapp(message, RIC_SUB_DEL_RESP, rmrMessageBuffer) != 0) {
1556                 mdclog_write(MDCLOG_ERR, "Subscription delete successful message failed to send to xAPP");
1557             }
1558             break;
1559         }
1560         default: {
1561             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1562             message.message.messageType = 0; // no RMR message type yet
1563             buildJsonMessage(message);
1564
1565             break;
1566         }
1567     }
1568 }
1569
1570 /**
1571  *
1572  * @param pdu
1573  * @param message
1574  * @param rmrMessageBuffer
1575  */
1576 void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
1577                         ReportingMessages_t &message,
1578                         RmrMessagesBuffer_t &rmrMessageBuffer) {
1579     auto procedureCode = pdu->choice.unsuccessfulOutcome->procedureCode;
1580     auto logLevel = mdclog_level_get();
1581     if (logLevel >= MDCLOG_INFO) {
1582         mdclog_write(MDCLOG_INFO, "Unsuccessful Outcome %ld", procedureCode);
1583     }
1584     switch (procedureCode) {
1585         case ProcedureCode_id_E2setup: {
1586             if (logLevel >= MDCLOG_DEBUG) {
1587                 mdclog_write(MDCLOG_DEBUG, "Got E2setup\n");
1588             }
1589             break;
1590         }
1591         case ProcedureCode_id_ErrorIndication: {
1592             if (logLevel >= MDCLOG_DEBUG) {
1593                 mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName);
1594             }
1595             if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) {
1596                 mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP");
1597             }
1598             break;
1599         }
1600         case ProcedureCode_id_Reset: {
1601             if (logLevel >= MDCLOG_DEBUG) {
1602                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
1603             }
1604             if (sendRequestToXapp(message, RIC_X2_RESET, rmrMessageBuffer) != 0) {
1605                 mdclog_write(MDCLOG_ERR, "RIC_X2_RESET message failed to send to xAPP");
1606             }
1607             break;
1608         }
1609         case ProcedureCode_id_RICcontrol: {
1610             if (logLevel >= MDCLOG_DEBUG) {
1611                 mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
1612             }
1613             for (int i = 0;
1614                  i < pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.count; i++) {
1615                 auto messageSent = false;
1616                 RICcontrolFailure_IEs_t *ie = pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.array[i];
1617                 if (logLevel >= MDCLOG_DEBUG) {
1618                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1619                 }
1620                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1621                     if (logLevel >= MDCLOG_DEBUG) {
1622                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1623                     }
1624                     if (ie->value.present == RICcontrolFailure_IEs__value_PR_RICrequestID) {
1625                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_FAILURE;
1626                         rmrMessageBuffer.sendMessage->state = 0;
1627                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1628                         static unsigned char tx[32];
1629                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1630                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1631                         rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName,
1632                                        strlen(message.message.enodbName));
1633                         sendRmrMessage(rmrMessageBuffer, message);
1634                         messageSent = true;
1635                     } else {
1636                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1637                     }
1638                 }
1639                 if (messageSent) {
1640                     break;
1641                 }
1642             }
1643             break;
1644         }
1645         case ProcedureCode_id_RICindication: {
1646             if (logLevel >= MDCLOG_DEBUG) {
1647                 mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName);
1648             }
1649             for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
1650                 auto messageSent = false;
1651                 RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
1652                 if (logLevel >= MDCLOG_DEBUG) {
1653                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1654                 }
1655                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1656                     if (logLevel >= MDCLOG_DEBUG) {
1657                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1658                     }
1659                     if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
1660                         static unsigned char tx[32];
1661                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
1662                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1663                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1664                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1665                                        (unsigned char *)message.message.enodbName,
1666                                        strlen(message.message.enodbName));
1667                         rmrMessageBuffer.sendMessage->state = 0;
1668                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1669                         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1670                             mdclog_write(MDCLOG_DEBUG, "RIC sub id = %d, message type = %d",
1671                                          rmrMessageBuffer.sendMessage->sub_id,
1672                                          rmrMessageBuffer.sendMessage->mtype);
1673                         }
1674                         sendRmrMessage(rmrMessageBuffer, message);
1675                         messageSent = true;
1676                     } else {
1677                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1678                     }
1679                 }
1680                 if (messageSent) {
1681                     break;
1682                 }
1683             }
1684             break;
1685         }
1686         case ProcedureCode_id_RICserviceQuery: {
1687             if (logLevel >= MDCLOG_DEBUG) {
1688                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceQuery %s", message.message.enodbName);
1689             }
1690             break;
1691         }
1692         case ProcedureCode_id_RICserviceUpdate: {
1693             if (logLevel >= MDCLOG_DEBUG) {
1694                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceUpdate %s", message.message.enodbName);
1695             }
1696             if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer) != 0) {
1697                 mdclog_write(MDCLOG_ERR, "RIC_SERVICE_UPDATE message failed to send to xAPP");
1698             }
1699             break;
1700         }
1701         case ProcedureCode_id_RICsubscription: {
1702             if (logLevel >= MDCLOG_DEBUG) {
1703                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
1704             }
1705             if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer) != 0) {
1706                 mdclog_write(MDCLOG_ERR, "Subscription unsuccessful message failed to send to xAPP");
1707             }
1708             break;
1709         }
1710         case ProcedureCode_id_RICsubscriptionDelete: {
1711             if (logLevel >= MDCLOG_DEBUG) {
1712                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
1713             }
1714             if (sendRequestToXapp(message, RIC_SUB_DEL_FAILURE, rmrMessageBuffer) != 0) {
1715                 mdclog_write(MDCLOG_ERR, "Subscription Delete unsuccessful message failed to send to xAPP");
1716             }
1717             break;
1718         }
1719         default: {
1720             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1721             message.message.messageType = 0; // no RMR message type yet
1722
1723             buildJsonMessage(message);
1724
1725             break;
1726         }
1727     }
1728 }
1729
1730 /**
1731  *
1732  * @param message
1733  * @param requestId
1734  * @param rmrMmessageBuffer
1735  * @return
1736  */
1737 int sendRequestToXapp(ReportingMessages_t &message,
1738                       int requestId,
1739                       RmrMessagesBuffer_t &rmrMmessageBuffer) {
1740     rmr_bytes2meid(rmrMmessageBuffer.sendMessage,
1741                    (unsigned char *)message.message.enodbName,
1742                    strlen(message.message.enodbName));
1743     message.message.messageType = rmrMmessageBuffer.sendMessage->mtype = requestId;
1744     rmrMmessageBuffer.sendMessage->state = 0;
1745     static unsigned char tx[32];
1746     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1747     rmr_bytes2xact(rmrMmessageBuffer.sendMessage, tx, strlen((const char *) tx));
1748
1749     auto rc = sendRmrMessage(rmrMmessageBuffer, message);
1750     return rc;
1751 }
1752
1753
1754 void getRmrContext(sctp_params_t &pSctpParams) {
1755     pSctpParams.rmrCtx = nullptr;
1756     pSctpParams.rmrCtx = rmr_init(pSctpParams.rmrAddress, RMR_MAX_RCV_BYTES, RMRFL_NONE);
1757     if (pSctpParams.rmrCtx == nullptr) {
1758         mdclog_write(MDCLOG_ERR, "Failed to initialize RMR");
1759         return;
1760     }
1761
1762     rmr_set_stimeout(pSctpParams.rmrCtx, 0);    // disable retries for any send operation
1763     // we need to find that routing table exist and we can run
1764     if (mdclog_level_get() >= MDCLOG_INFO) {
1765         mdclog_write(MDCLOG_INFO, "We are after RMR INIT wait for RMR_Ready");
1766     }
1767     int rmrReady = 0;
1768     int count = 0;
1769     while (!rmrReady) {
1770         if ((rmrReady = rmr_ready(pSctpParams.rmrCtx)) == 0) {
1771             sleep(1);
1772         }
1773         count++;
1774         if (count % 60 == 0) {
1775             mdclog_write(MDCLOG_INFO, "waiting to RMR ready state for %d seconds", count);
1776         }
1777     }
1778     if (mdclog_level_get() >= MDCLOG_INFO) {
1779         mdclog_write(MDCLOG_INFO, "RMR running");
1780     }
1781     rmr_init_trace(pSctpParams.rmrCtx, 200);
1782     // get the RMR fd for the epoll
1783     pSctpParams.rmrListenFd = rmr_get_rcvfd(pSctpParams.rmrCtx);
1784     struct epoll_event event{};
1785     // add RMR fd to epoll
1786     event.events = (EPOLLIN);
1787     event.data.fd = pSctpParams.rmrListenFd;
1788     // add listening RMR FD to epoll
1789     if (epoll_ctl(pSctpParams.epoll_fd, EPOLL_CTL_ADD, pSctpParams.rmrListenFd, &event)) {
1790         mdclog_write(MDCLOG_ERR, "Failed to add RMR descriptor to epoll");
1791         close(pSctpParams.rmrListenFd);
1792         rmr_close(pSctpParams.rmrCtx);
1793         pSctpParams.rmrCtx = nullptr;
1794     }
1795 }
1796
1797 int BuildPERSetupResponseMessaeFromXML(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) {
1798     E2AP_PDU_t *pdu;
1799     auto rval = asn_decode(nullptr, ATS_BASIC_XER, &asn_DEF_E2AP_PDU, (void **) &pdu,
1800                            rmrMessageBuffer.rcvMessage->payload, rmrMessageBuffer.rcvMessage->len);
1801     if (rval.code != RC_OK) {
1802         mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) E2AP PDU from E2MGR : %s",
1803                      rval.code,
1804                      message.message.enodbName);
1805         return -1;
1806     }
1807
1808     auto er = asn_encode_to_buffer(nullptr, ATS_BASIC_XER, &asn_DEF_E2AP_PDU, pdu,
1809                                    rmrMessageBuffer.rcvMessage->payload, rmrMessageBuffer.rcvMessage->len);
1810     if (er.encoded == -1) {
1811         mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
1812         return -1;
1813     } else if (er.encoded > (ssize_t)rmrMessageBuffer.rcvMessage->len) {
1814         mdclog_write(MDCLOG_ERR, "Buffer of size %d is to small for %s",
1815                      (int)rmrMessageBuffer.rcvMessage->len,
1816                      asn_DEF_E2AP_PDU.name);
1817         return -1;
1818     }
1819     return 0;
1820 }
1821
1822 /**
1823  *
1824  * @param sctpMap
1825  * @param rmrMessageBuffer
1826  * @param ts
1827  * @return
1828  */
1829 int receiveXappMessages(Sctp_Map_t *sctpMap,
1830                         RmrMessagesBuffer_t &rmrMessageBuffer,
1831                         struct timespec &ts) {
1832     if (rmrMessageBuffer.rcvMessage == nullptr) {
1833         //we have error
1834         mdclog_write(MDCLOG_ERR, "RMR Allocation message, %s", strerror(errno));
1835         return -1;
1836     }
1837
1838     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1839         mdclog_write(MDCLOG_DEBUG, "Call to rmr_rcv_msg");
1840     }
1841     rmrMessageBuffer.rcvMessage = rmr_rcv_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.rcvMessage);
1842     if (rmrMessageBuffer.rcvMessage == nullptr) {
1843         mdclog_write(MDCLOG_ERR, "RMR Receving message with null pointer, Realloc rmr mesage buffer");
1844         rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
1845         return -2;
1846     }
1847     ReportingMessages_t message;
1848     message.message.direction = 'D';
1849     message.message.time.tv_nsec = ts.tv_nsec;
1850     message.message.time.tv_sec = ts.tv_sec;
1851
1852     // get message payload
1853     //auto msgData = msg->payload;
1854     if (rmrMessageBuffer.rcvMessage->state != 0) {
1855         mdclog_write(MDCLOG_ERR, "RMR Receving message with stat = %d", rmrMessageBuffer.rcvMessage->state);
1856         return -1;
1857     }
1858     rmr_get_meid(rmrMessageBuffer.rcvMessage, (unsigned char *)message.message.enodbName);
1859     switch (rmrMessageBuffer.rcvMessage->mtype) {
1860         case RIC_E2_SETUP_RESP : {
1861             if (BuildPERSetupResponseMessaeFromXML(message, rmrMessageBuffer) != 0) {
1862                 break;
1863             }
1864
1865             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1866                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_RESP");
1867                 return -6;
1868             }
1869             break;
1870         }
1871         case RIC_E2_SETUP_FAILURE : {
1872             if (BuildPERSetupResponseMessaeFromXML(message, rmrMessageBuffer) != 0) {
1873                 break;
1874             }
1875             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1876                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_FAILURE");
1877                 return -6;
1878             }
1879             break;
1880         }
1881         case RIC_ERROR_INDICATION: {
1882             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1883                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ERROR_INDICATION");
1884                 return -6;
1885             }
1886             break;
1887         }
1888         case RIC_SUB_REQ: {
1889             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1890                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_REQ");
1891                 return -6;
1892             }
1893             break;
1894         }
1895         case RIC_SUB_DEL_REQ: {
1896             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1897                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_DEL_REQ");
1898                 return -6;
1899             }
1900             break;
1901         }
1902         case RIC_CONTROL_REQ: {
1903             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1904                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_CONTROL_REQ");
1905                 return -6;
1906             }
1907             break;
1908         }
1909         case RIC_SERVICE_QUERY: {
1910             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1911                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_QUERY");
1912                 return -6;
1913             }
1914             break;
1915         }
1916         case RIC_SERVICE_UPDATE_ACK: {
1917             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1918                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_ACK");
1919                 return -6;
1920             }
1921             break;
1922         }
1923         case RIC_SERVICE_UPDATE_FAILURE: {
1924             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1925                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_FAILURE");
1926                 return -6;
1927             }
1928             break;
1929         }
1930         case RIC_X2_RESET: {
1931             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1932                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_X2_RESET");
1933                 return -6;
1934             }
1935             break;
1936         }
1937         case RIC_X2_RESET_RESP: {
1938             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
1939                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_X2_RESET_RESP");
1940                 return -6;
1941             }
1942             break;
1943         }
1944         case RIC_SCTP_CLEAR_ALL: {
1945             mdclog_write(MDCLOG_INFO, "RIC_SCTP_CLEAR_ALL");
1946             // loop on all keys and close socket and then erase all map.
1947             vector<char *> v;
1948             sctpMap->getKeys(v);
1949             for (auto const &iter : v) { //}; iter != sctpMap.end(); iter++) {
1950                 if (!boost::starts_with((string) (iter), "host:") && !boost::starts_with((string) (iter), "msg:")) {
1951                     auto *peerInfo = (ConnectedCU_t *) sctpMap->find(iter);
1952                     if (peerInfo == nullptr) {
1953                         continue;
1954                     }
1955                     close(peerInfo->fileDescriptor);
1956                     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
1957                     message.message.direction = 'D';
1958                     message.message.time.tv_nsec = ts.tv_nsec;
1959                     message.message.time.tv_sec = ts.tv_sec;
1960
1961                     message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1962                             snprintf((char *)rmrMessageBuffer.sendMessage->payload,
1963                                      256,
1964                                      "%s|RIC_SCTP_CLEAR_ALL",
1965                                      peerInfo->enodbName);
1966                     message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1967                     mdclog_write(MDCLOG_INFO, "%s", message.message.asndata);
1968                     if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
1969                         mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
1970                     }
1971                     free(peerInfo);
1972                 }
1973             }
1974
1975             sleep(1);
1976             sctpMap->clear();
1977             break;
1978         }
1979         case E2_TERM_KEEP_ALIVE_REQ: {
1980             // send message back
1981             rmr_bytes2payload(rmrMessageBuffer.sendMessage,
1982                               (unsigned char *)rmrMessageBuffer.ka_message,
1983                               rmrMessageBuffer.ka_message_len);
1984             rmrMessageBuffer.sendMessage->mtype = E2_TERM_KEEP_ALIVE_RESP;
1985             rmrMessageBuffer.sendMessage->state = 0;
1986             static unsigned char tx[32];
1987             auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1988             rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, txLen);
1989             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
1990             if (rmrMessageBuffer.sendMessage == nullptr) {
1991                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
1992                 mdclog_write(MDCLOG_ERR, "Failed to send E2_TERM_KEEP_ALIVE_RESP RMR message returned NULL");
1993             } else if (rmrMessageBuffer.sendMessage->state != 0)  {
1994                 mdclog_write(MDCLOG_ERR, "Failed to send E2_TERM_KEEP_ALIVE_RESP, on RMR state = %d ( %s)",
1995                              rmrMessageBuffer.sendMessage->state, translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str());
1996             } else if (mdclog_level_get() >= MDCLOG_DEBUG) {
1997                 mdclog_write(MDCLOG_DEBUG, "Got Keep Alive Request send : %s", rmrMessageBuffer.ka_message);
1998             }
1999
2000             break;
2001         }
2002         default:
2003             mdclog_write(MDCLOG_WARN, "Message Type : %d is not seported", rmrMessageBuffer.rcvMessage->mtype);
2004             message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
2005             message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
2006             message.message.time.tv_nsec = ts.tv_nsec;
2007             message.message.time.tv_sec = ts.tv_sec;
2008             message.message.messageType = rmrMessageBuffer.rcvMessage->mtype;
2009
2010             buildJsonMessage(message);
2011
2012
2013             return -7;
2014     }
2015     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2016         mdclog_write(MDCLOG_DEBUG, "EXIT OK from %s", __FUNCTION__);
2017     }
2018     return 0;
2019 }
2020
2021 /**
2022  * Send message to the CU that is not expecting for successful or unsuccessful results
2023  * @param messageBuffer
2024  * @param message
2025  * @param failedMsgId
2026  * @param sctpMap
2027  * @return
2028  */
2029 int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer,
2030                            ReportingMessages_t &message,
2031                            int failedMsgId,
2032                            Sctp_Map_t *sctpMap) {
2033
2034     getRequestMetaData(message, messageBuffer);
2035     if (mdclog_level_get() >= MDCLOG_INFO) {
2036         mdclog_write(MDCLOG_INFO, "send message to %s address", message.message.enodbName);
2037     }
2038
2039     auto rc = sendMessagetoCu(sctpMap, messageBuffer, message, failedMsgId);
2040     return rc;
2041 }
2042
2043 /**
2044  *
2045  * @param sctpMap
2046  * @param messageBuffer
2047  * @param message
2048  * @param failedMesgId
2049  * @return
2050  */
2051 int sendMessagetoCu(Sctp_Map_t *sctpMap,
2052                     RmrMessagesBuffer_t &messageBuffer,
2053                     ReportingMessages_t &message,
2054                     int failedMesgId) {
2055     auto *peerInfo = (ConnectedCU_t *) sctpMap->find(message.message.enodbName);
2056     if (peerInfo == nullptr) {
2057         if (failedMesgId != 0) {
2058             sendFailedSendingMessagetoXapp(messageBuffer, message, failedMesgId);
2059         } else {
2060             mdclog_write(MDCLOG_ERR, "Failed to send message no CU entry %s", message.message.enodbName);
2061         }
2062         return -1;
2063     }
2064
2065     // get the FD
2066     message.message.messageType = messageBuffer.rcvMessage->mtype;
2067     auto rc = sendSctpMsg(peerInfo, message, sctpMap);
2068     return rc;
2069 }
2070
2071 /**
2072  *
2073  * @param rmrCtx the rmr context to send and receive
2074  * @param msg the msg we got fromxApp
2075  * @param metaData data from xApp in ordered struct
2076  * @param failedMesgId the return message type error
2077  */
2078 void
2079 sendFailedSendingMessagetoXapp(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, int failedMesgId) {
2080     rmr_mbuf_t *msg = rmrMessageBuffer.sendMessage;
2081     msg->len = snprintf((char *) msg->payload, 200, "the gNb/eNode name %s not found",
2082                         message.message.enodbName);
2083     if (mdclog_level_get() >= MDCLOG_INFO) {
2084         mdclog_write(MDCLOG_INFO, "%s", msg->payload);
2085     }
2086     msg->mtype = failedMesgId;
2087     msg->state = 0;
2088
2089     static unsigned char tx[32];
2090     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2091     rmr_bytes2xact(msg, tx, strlen((const char *) tx));
2092
2093     sendRmrMessage(rmrMessageBuffer, message);
2094 }
2095
2096
2097
2098 /**
2099  *
2100  * @param epoll_fd
2101  * @param peerInfo
2102  * @param events
2103  * @param sctpMap
2104  * @param enodbName
2105  * @param msgType
2106  * @return
2107  */
2108 int addToEpoll(int epoll_fd,
2109                ConnectedCU_t *peerInfo,
2110                uint32_t events,
2111                Sctp_Map_t *sctpMap,
2112                char *enodbName,
2113                int msgType) {
2114     // Add to Epol
2115     struct epoll_event event{};
2116     event.data.ptr = peerInfo;
2117     event.events = events;
2118     if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peerInfo->fileDescriptor, &event) < 0) {
2119         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2120             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here), %s, %s %d",
2121                          strerror(errno), __func__, __LINE__);
2122         }
2123         close(peerInfo->fileDescriptor);
2124         if (enodbName != nullptr) {
2125             cleanHashEntry(peerInfo, sctpMap);
2126             char key[MAX_ENODB_NAME_SIZE * 2];
2127             snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2128             if (mdclog_level_get() >= MDCLOG_DEBUG) {
2129                 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2130             }
2131             auto tmp = sctpMap->find(key);
2132             if (tmp) {
2133                 free(tmp);
2134                 sctpMap->erase(key);
2135             }
2136         } else {
2137             peerInfo->enodbName[0] = 0;
2138         }
2139         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here)");
2140         return -1;
2141     }
2142     return 0;
2143 }
2144
2145 /**
2146  *
2147  * @param epoll_fd
2148  * @param peerInfo
2149  * @param events
2150  * @param sctpMap
2151  * @param enodbName
2152  * @param msgType
2153  * @return
2154  */
2155 int modifyToEpoll(int epoll_fd,
2156                   ConnectedCU_t *peerInfo,
2157                   uint32_t events,
2158                   Sctp_Map_t *sctpMap,
2159                   char *enodbName,
2160                   int msgType) {
2161     // Add to Epol
2162     struct epoll_event event{};
2163     event.data.ptr = peerInfo;
2164     event.events = events;
2165     if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, peerInfo->fileDescriptor, &event) < 0) {
2166         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2167             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_MOD (may chack not to quit here), %s, %s %d",
2168                          strerror(errno), __func__, __LINE__);
2169         }
2170         close(peerInfo->fileDescriptor);
2171         cleanHashEntry(peerInfo, sctpMap);
2172         char key[MAX_ENODB_NAME_SIZE * 2];
2173         snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2174         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2175             mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2176         }
2177         auto tmp = sctpMap->find(key);
2178         if (tmp) {
2179             free(tmp);
2180         }
2181         sctpMap->erase(key);
2182         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here)");
2183         return -1;
2184     }
2185     return 0;
2186 }
2187
2188
2189 int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message) {
2190     buildJsonMessage(message);
2191
2192     rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2193
2194     if (rmrMessageBuffer.sendMessage == nullptr) {
2195         rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2196         mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2197         return -1;
2198     }
2199
2200     if (rmrMessageBuffer.sendMessage->state != 0) {
2201         char meid[RMR_MAX_MEID]{};
2202         if (rmrMessageBuffer.sendMessage->state == RMR_ERR_RETRY) {
2203             usleep(5);
2204             rmrMessageBuffer.sendMessage->state = 0;
2205             mdclog_write(MDCLOG_INFO, "RETRY sending Message type %d to Xapp from %s",
2206                          rmrMessageBuffer.sendMessage->mtype,
2207                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2208             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2209             if (rmrMessageBuffer.sendMessage == nullptr) {
2210                 mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2211                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2212                 return -1;
2213             } else if (rmrMessageBuffer.sendMessage->state != 0) {
2214                 mdclog_write(MDCLOG_ERR,
2215                              "Message state %s while sending request %d to Xapp from %s after retry of 10 microseconds",
2216                              translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2217                              rmrMessageBuffer.sendMessage->mtype,
2218                              rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2219                 auto rc = rmrMessageBuffer.sendMessage->state;
2220                 return rc;
2221             }
2222         } else {
2223             mdclog_write(MDCLOG_ERR, "Message state %s while sending request %d to Xapp from %s",
2224                          translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2225                          rmrMessageBuffer.sendMessage->mtype,
2226                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2227             return rmrMessageBuffer.sendMessage->state;
2228         }
2229     }
2230     return 0;
2231 }
2232
2233 void buildJsonMessage(ReportingMessages_t &message) {
2234     if (jsonTrace) {
2235         message.outLen = sizeof(message.base64Data);
2236         base64::encode((const unsigned char *) message.message.asndata,
2237                        (const int) message.message.asnLength,
2238                        message.base64Data,
2239                        message.outLen);
2240         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2241             mdclog_write(MDCLOG_DEBUG, "asn data length = %d, base64 message length = %d ",
2242                          (int) message.message.asnLength,
2243                          (int) message.outLen);
2244         }
2245
2246         snprintf(message.buffer, sizeof(message.buffer),
2247                  "{\"header\": {\"ts\": \"%ld.%09ld\","
2248                  "\"ranName\": \"%s\","
2249                  "\"messageType\": %d,"
2250                  "\"direction\": \"%c\"},"
2251                  "\"base64Length\": %d,"
2252                  "\"asnBase64\": \"%s\"}",
2253                  message.message.time.tv_sec,
2254                  message.message.time.tv_nsec,
2255                  message.message.enodbName,
2256                  message.message.messageType,
2257                  message.message.direction,
2258                  (int) message.outLen,
2259                  message.base64Data);
2260         static src::logger_mt &lg = my_logger::get();
2261
2262         BOOST_LOG(lg) << message.buffer;
2263     }
2264 }
2265
2266
2267 /**
2268  * take RMR error code to string
2269  * @param state
2270  * @return
2271  */
2272 string translateRmrErrorMessages(int state) {
2273     string str = {};
2274     switch (state) {
2275         case RMR_OK:
2276             str = "RMR_OK - state is good";
2277             break;
2278         case RMR_ERR_BADARG:
2279             str = "RMR_ERR_BADARG - argument passd to function was unusable";
2280             break;
2281         case RMR_ERR_NOENDPT:
2282             str = "RMR_ERR_NOENDPT - send//call could not find an endpoint based on msg type";
2283             break;
2284         case RMR_ERR_EMPTY:
2285             str = "RMR_ERR_EMPTY - msg received had no payload; attempt to send an empty message";
2286             break;
2287         case RMR_ERR_NOHDR:
2288             str = "RMR_ERR_NOHDR - message didn't contain a valid header";
2289             break;
2290         case RMR_ERR_SENDFAILED:
2291             str = "RMR_ERR_SENDFAILED - send failed; errno has nano reason";
2292             break;
2293         case RMR_ERR_CALLFAILED:
2294             str = "RMR_ERR_CALLFAILED - unable to send call() message";
2295             break;
2296         case RMR_ERR_NOWHOPEN:
2297             str = "RMR_ERR_NOWHOPEN - no wormholes are open";
2298             break;
2299         case RMR_ERR_WHID:
2300             str = "RMR_ERR_WHID - wormhole id was invalid";
2301             break;
2302         case RMR_ERR_OVERFLOW:
2303             str = "RMR_ERR_OVERFLOW - operation would have busted through a buffer/field size";
2304             break;
2305         case RMR_ERR_RETRY:
2306             str = "RMR_ERR_RETRY - request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)";
2307             break;
2308         case RMR_ERR_RCVFAILED:
2309             str = "RMR_ERR_RCVFAILED - receive failed (hard error)";
2310             break;
2311         case RMR_ERR_TIMEOUT:
2312             str = "RMR_ERR_TIMEOUT - message processing call timed out";
2313             break;
2314         case RMR_ERR_UNSET:
2315             str = "RMR_ERR_UNSET - the message hasn't been populated with a transport buffer";
2316             break;
2317         case RMR_ERR_TRUNC:
2318             str = "RMR_ERR_TRUNC - received message likely truncated";
2319             break;
2320         case RMR_ERR_INITFAILED:
2321             str = "RMR_ERR_INITFAILED - initialisation of something (probably message) failed";
2322             break;
2323         case RMR_ERR_NOTSUPP:
2324             str = "RMR_ERR_NOTSUPP - the request is not supported, or RMr was not initialised for the request";
2325             break;
2326         default:
2327             char buf[128]{};
2328             snprintf(buf, sizeof buf, "UNDOCUMENTED RMR_ERR : %d", state);
2329             str = buf;
2330             break;
2331     }
2332     return str;
2333 }
2334
2335