Upgrade RMR to v4.7.4
[ric-plt/e2.git] / RIC-E2-TERMINATION / sctpThread.cpp
1 // Copyright 2019 AT&T Intellectual Property
2 // Copyright 2019 Nokia
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //      http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15
16 //  This source code is part of the near-RT RIC (RAN Intelligent Controller)
17 //  platform project (RICP).
18
19 // TODO: High-level file comment.
20
21
22
23 #include <3rdparty/oranE2/RANfunctions-List.h>
24 #include "sctpThread.h"
25 #include "BuildRunName.h"
26
27 //#include "3rdparty/oranE2SM/E2SM-gNB-NRT-RANfunction-Definition.h"
28 //#include "BuildXml.h"
29 //#include "pugixml/src/pugixml.hpp"
30
31 using namespace std;
32 //using namespace std::placeholders;
33 using namespace boost::filesystem;
34 using namespace prometheus;
35
36
37 //#ifdef __cplusplus
38 //extern "C"
39 //{
40 //#endif
41
// gcov's flush routine is declared by hand so that no gcov header has to be included.
extern "C" void __gcov_flush(void);

/**
 * Signal handler installed by main() for SIGINT, SIGABRT and SIGTERM.
 * Flushes the gcov counters and then terminates the process, using the
 * signal number as the exit status.
 *
 * @param signum number of the signal being handled
 */
static void catch_function(int signum) {
    __gcov_flush();
    exit(signum);
}
49
50
// Process-wide Boost.Log logger instance named my_logger.
BOOST_LOG_INLINE_GLOBAL_LOGGER_DEFAULT(my_logger, src::logger_mt)

// File sink for the Boost log; assigned in buildConfiguration() from logging::add_file_log().
boost::shared_ptr<sinks::synchronous_sink<sinks::text_file_backend>> boostLogger;
// Result of approx_CPU_MHz(), stored by main() during start-up.
double cpuClock = 0.0;
// Mirrors sctpParams.trace; set by buildConfiguration() from the "trace" configuration key.
bool jsonTrace = false;
56
57 void init_log() {
58     mdclog_attr_t *attr;
59     mdclog_attr_init(&attr);
60     mdclog_attr_set_ident(attr, "E2Terminator");
61     mdclog_init(attr);
62     mdclog_attr_destroy(attr);
63 }
// Reference instant captured during static initialisation; age() is measured from here.
auto start_time = std::chrono::high_resolution_clock::now();
// Duration expressed as a floating-point number of seconds.
using seconds_t = std::chrono::duration<double, std::ratio<1, 1>>;

/**
 * @return seconds elapsed since start_time was captured, as a double
 */
double age() {
    const auto now = std::chrono::high_resolution_clock::now();
    const seconds_t elapsed = now - start_time;
    return elapsed.count();
}
70
71 double approx_CPU_MHz(unsigned sleepTime) {
72     using namespace std::chrono_literals;
73     uint32_t aux = 0;
74     uint64_t cycles_start = rdtscp(aux);
75     double time_start = age();
76     std::this_thread::sleep_for(sleepTime * 1ms);
77     uint64_t elapsed_cycles = rdtscp(aux) - cycles_start;
78     double elapsed_time = age() - time_start;
79     return elapsed_cycles / elapsed_time;
80 }
81
82 //std::atomic<int64_t> rmrCounter{0};
// Incremented by listener() for every epoll event on the RMR fd or on an SCTP connection fd.
std::atomic<int64_t> num_of_messages{0};
// Polled by handleTermInit(); a value above zero stops the E2_TERM_INIT resend loop.
std::atomic<int64_t> num_of_XAPP_messages{0};
// Seeded with a random value in main(); sendTermInit() uses and increments it as the RMR transaction id.
static long transactionCounter = 0;
86
87 int buildListeningPort(sctp_params_t &sctpParams) {
88     sctpParams.listenFD = socket(AF_INET6, SOCK_STREAM, IPPROTO_SCTP);
89     if (sctpParams.listenFD <= 0) {
90         mdclog_write(MDCLOG_ERR, "Error Opening socket, %s", strerror(errno));
91         return -1;
92     }
93
94     struct sockaddr_in6 serverAddress {};
95     serverAddress.sin6_family = AF_INET6;
96     serverAddress.sin6_addr   = in6addr_any;
97     serverAddress.sin6_port = htons(sctpParams.sctpPort);
98     if (bind(sctpParams.listenFD, (SA *)&serverAddress, sizeof(serverAddress)) < 0 ) {
99         mdclog_write(MDCLOG_ERR, "Error binding port %d. %s", sctpParams.sctpPort, strerror(errno));
100         return -1;
101     }
102     if (setSocketNoBlocking(sctpParams.listenFD) == -1) {
103         //mdclog_write(MDCLOG_ERR, "Error binding. %s", strerror(errno));
104         return -1;
105     }
106     if (mdclog_level_get() >= MDCLOG_DEBUG) {
107         struct sockaddr_in6 clientAddress {};
108         socklen_t len = sizeof(clientAddress);
109         getsockname(sctpParams.listenFD, (SA *)&clientAddress, &len);
110         char buff[1024] {};
111         inet_ntop(AF_INET6, &clientAddress.sin6_addr, buff, sizeof(buff));
112         mdclog_write(MDCLOG_DEBUG, "My address: %s, port %d\n", buff, htons(clientAddress.sin6_port));
113     }
114
115     if (listen(sctpParams.listenFD, SOMAXCONN) < 0) {
116         mdclog_write(MDCLOG_ERR, "Error listening. %s\n", strerror(errno));
117         return -1;
118     }
119     struct epoll_event event {};
120     event.events = EPOLLIN | EPOLLET;
121     event.data.fd = sctpParams.listenFD;
122
123     // add listening port to epoll
124     if (epoll_ctl(sctpParams.epoll_fd, EPOLL_CTL_ADD, sctpParams.listenFD, &event)) {
125         printf("Failed to add descriptor to epoll\n");
126         mdclog_write(MDCLOG_ERR, "Failed to add descriptor to epoll. %s\n", strerror(errno));
127         return -1;
128     }
129
130     return 0;
131 }
132
133 int buildConfiguration(sctp_params_t &sctpParams) {
134     path p = (sctpParams.configFilePath + "/" + sctpParams.configFileName).c_str();
135     if (exists(p)) {
136         const int size = 2048;
137         auto fileSize = file_size(p);
138         if (fileSize > size) {
139             mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size);
140             return -1;
141         }
142     } else {
143         mdclog_write(MDCLOG_ERR, "Configuration File %s not exists", p.string().c_str());
144         return -1;
145     }
146
147     ReadConfigFile conf;
148     if (conf.openConfigFile(p.string()) == -1) {
149         mdclog_write(MDCLOG_ERR, "Filed to open config file %s, %s",
150                      p.string().c_str(), strerror(errno));
151         return -1;
152     }
153     int rmrPort = conf.getIntValue("nano");
154     if (rmrPort == -1) {
155         mdclog_write(MDCLOG_ERR, "illegal RMR port ");
156         return -1;
157     }
158     sctpParams.rmrPort = (uint16_t)rmrPort;
159     snprintf(sctpParams.rmrAddress, sizeof(sctpParams.rmrAddress), "%d", (int) (sctpParams.rmrPort));
160
161     auto tmpStr = conf.getStringValue("loglevel");
162     if (tmpStr.length() == 0) {
163         mdclog_write(MDCLOG_ERR, "illegal loglevel. Set loglevel to MDCLOG_INFO");
164         tmpStr = "info";
165     }
166     transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
167
168     if ((tmpStr.compare("debug")) == 0) {
169         sctpParams.logLevel = MDCLOG_DEBUG;
170     } else if ((tmpStr.compare("info")) == 0) {
171         sctpParams.logLevel = MDCLOG_INFO;
172     } else if ((tmpStr.compare("warning")) == 0) {
173         sctpParams.logLevel = MDCLOG_WARN;
174     } else if ((tmpStr.compare("error")) == 0) {
175         sctpParams.logLevel = MDCLOG_ERR;
176     } else {
177         mdclog_write(MDCLOG_ERR, "illegal loglevel = %s. Set loglevel to MDCLOG_INFO", tmpStr.c_str());
178         sctpParams.logLevel = MDCLOG_INFO;
179     }
180     mdclog_level_set(sctpParams.logLevel);
181
182     tmpStr = conf.getStringValue("volume");
183     if (tmpStr.length() == 0) {
184         mdclog_write(MDCLOG_ERR, "illegal volume.");
185         return -1;
186     }
187
188     char tmpLogFilespec[VOLUME_URL_SIZE];
189     tmpLogFilespec[0] = 0;
190     sctpParams.volume[0] = 0;
191     snprintf(sctpParams.volume, VOLUME_URL_SIZE, "%s", tmpStr.c_str());
192     // copy the name to temp file as well
193     snprintf(tmpLogFilespec, VOLUME_URL_SIZE, "%s", tmpStr.c_str());
194
195
196     // define the file name in the tmp directory under the volume
197     strcat(tmpLogFilespec,"/tmp/E2Term_%Y-%m-%d_%H-%M-%S.%N.tmpStr");
198
199     sctpParams.myIP = conf.getStringValue("local-ip");
200     if (sctpParams.myIP.length() == 0) {
201         mdclog_write(MDCLOG_ERR, "illegal local-ip.");
202         return -1;
203     }
204
205     int sctpPort = conf.getIntValue("sctp-port");
206     if (sctpPort == -1) {
207         mdclog_write(MDCLOG_ERR, "illegal SCTP port ");
208         return -1;
209     }
210     sctpParams.sctpPort = (uint16_t)sctpPort;
211
212     sctpParams.fqdn = conf.getStringValue("external-fqdn");
213     if (sctpParams.fqdn.length() == 0) {
214         mdclog_write(MDCLOG_ERR, "illegal external-fqdn");
215         return -1;
216     }
217
218     std::string pod = conf.getStringValue("pod_name");
219     if (pod.length() == 0) {
220         mdclog_write(MDCLOG_ERR, "illegal pod_name in config file");
221         return -1;
222     }
223     auto *podName = getenv(pod.c_str());
224     if (podName == nullptr) {
225         mdclog_write(MDCLOG_ERR, "illegal pod_name or environment variable not exists : %s", pod.c_str());
226         return -1;
227
228     } else {
229         sctpParams.podName.assign(podName);
230         if (sctpParams.podName.length() == 0) {
231             mdclog_write(MDCLOG_ERR, "illegal pod_name");
232             return -1;
233         }
234     }
235
236     tmpStr = conf.getStringValue("trace");
237     transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
238     if ((tmpStr.compare("start")) == 0) {
239         mdclog_write(MDCLOG_INFO, "Trace set to: start");
240         sctpParams.trace = true;
241     } else if ((tmpStr.compare("stop")) == 0) {
242         mdclog_write(MDCLOG_INFO, "Trace set to: stop");
243         sctpParams.trace = false;
244     } else {
245         mdclog_write(MDCLOG_ERR, "Trace was set to wrong value %s, set to stop", tmpStr.c_str());
246         sctpParams.trace = false;
247     }
248     jsonTrace = sctpParams.trace;
249
250     sctpParams.epollTimeOut = -1;
251
252     tmpStr = conf.getStringValue("prometheusPort");
253     if (tmpStr.length() != 0) {
254         sctpParams.prometheusPort = tmpStr;
255     }
256
257     sctpParams.ka_message_length = snprintf(sctpParams.ka_message, KA_MESSAGE_SIZE, "{\"address\": \"%s:%d\","
258                                                                                     "\"fqdn\": \"%s\","
259                                                                                     "\"pod_name\": \"%s\"}",
260                                             (const char *)sctpParams.myIP.c_str(),
261                                             sctpParams.rmrPort,
262                                             sctpParams.fqdn.c_str(),
263                                             sctpParams.podName.c_str());
264
265     if (mdclog_level_get() >= MDCLOG_INFO) {
266         mdclog_mdc_add("RMR Port", to_string(sctpParams.rmrPort).c_str());
267         mdclog_mdc_add("LogLevel", to_string(sctpParams.logLevel).c_str());
268         mdclog_mdc_add("volume", sctpParams.volume);
269         mdclog_mdc_add("tmpLogFilespec", tmpLogFilespec);
270         mdclog_mdc_add("my ip", sctpParams.myIP.c_str());
271         mdclog_mdc_add("pod name", sctpParams.podName.c_str());
272
273         mdclog_write(MDCLOG_INFO, "running parameters for instance : %s", sctpParams.ka_message);
274     }
275     mdclog_mdc_clean();
276
277     // Files written to the current working directory
278     boostLogger = logging::add_file_log(
279             keywords::file_name = tmpLogFilespec, // to temp directory
280             keywords::rotation_size = 10 * 1024 * 1024,
281             keywords::time_based_rotation = sinks::file::rotation_at_time_interval(posix_time::hours(1)),
282             keywords::format = "%Message%"
283             //keywords::format = "[%TimeStamp%]: %Message%" // use each tmpStr with time stamp
284     );
285
286     // Setup a destination folder for collecting rotated (closed) files --since the same volume can use rename()
287     boostLogger->locked_backend()->set_file_collector(sinks::file::make_collector(
288             keywords::target = sctpParams.volume
289     ));
290
291     // Upon restart, scan the directory for files matching the file_name pattern
292     boostLogger->locked_backend()->scan_for_files();
293
294     // Enable auto-flushing after each tmpStr record written
295     if (mdclog_level_get() >= MDCLOG_DEBUG) {
296         boostLogger->locked_backend()->auto_flush(true);
297     }
298
299     return 0;
300 }
301
302 void startPrometheus(sctp_params_t &sctpParams) {
303     sctpParams.prometheusFamily = &BuildCounter()
304             .Name("E2T")
305             .Help("E2T message counter")
306             .Labels({{"POD_NAME", sctpParams.podName}})
307             .Register(*sctpParams.prometheusRegistry);
308
309     string prometheusPath = sctpParams.prometheusPort + "," + "[::]:" + sctpParams.prometheusPort;
310     if (mdclog_level_get() >= MDCLOG_DEBUG) {
311         mdclog_write(MDCLOG_DEBUG, "Start Prometheus Pull mode on %s", prometheusPath.c_str());
312     }
313     sctpParams.prometheusExposer = new Exposer(prometheusPath, 1);
314     sctpParams.prometheusExposer->RegisterCollectable(sctpParams.prometheusRegistry);
315 }
316 #ifndef UNIT_TEST
317 int main(const int argc, char **argv) {
318     sctp_params_t sctpParams;
319
320     {
321         std::random_device device{};
322         std::mt19937 generator(device());
323         std::uniform_int_distribution<long> distribution(1, (long) 1e12);
324         transactionCounter = distribution(generator);
325     }
326
327 //    uint64_t st = 0;
328 //    uint32_t aux1 = 0;
329 //   st = rdtscp(aux1);
330
331     unsigned num_cpus = std::thread::hardware_concurrency();
332     init_log();
333     mdclog_level_set(MDCLOG_INFO);
334
335     if (std::signal(SIGINT, catch_function) == SIG_ERR) {
336         mdclog_write(MDCLOG_ERR, "Error initializing SIGINT");
337         exit(1);
338     }
339     if (std::signal(SIGABRT, catch_function)== SIG_ERR) {
340         mdclog_write(MDCLOG_ERR, "Error initializing SIGABRT");
341         exit(1);
342     }
343     if (std::signal(SIGTERM, catch_function)== SIG_ERR) {
344         mdclog_write(MDCLOG_ERR, "Error initializing SIGTERM");
345         exit(1);
346     }
347
348     cpuClock = approx_CPU_MHz(100);
349
350     mdclog_write(MDCLOG_DEBUG, "CPU speed %11.11f", cpuClock);
351
352     auto result = parse(argc, argv, sctpParams);
353
354     if (buildConfiguration(sctpParams) != 0) {
355         exit(-1);
356     }
357
358     //auto registry = std::make_shared<Registry>();
359     sctpParams.prometheusRegistry = std::make_shared<Registry>();
360
361     //sctpParams.prometheusFamily = new Family<Counter>("E2T", "E2T message counter", {{"E", sctpParams.podName}});
362
363     startPrometheus(sctpParams);
364
365     // start epoll
366     sctpParams.epoll_fd = epoll_create1(0);
367     if (sctpParams.epoll_fd == -1) {
368         mdclog_write(MDCLOG_ERR, "failed to open epoll descriptor");
369         exit(-1);
370     }
371
372     getRmrContext(sctpParams);
373     if (sctpParams.rmrCtx == nullptr) {
374         close(sctpParams.epoll_fd);
375         exit(-1);
376     }
377
378     if (buildInotify(sctpParams) == -1) {
379         close(sctpParams.rmrListenFd);
380         rmr_close(sctpParams.rmrCtx);
381         close(sctpParams.epoll_fd);
382         exit(-1);
383     }
384
385     if (buildListeningPort(sctpParams) != 0) {
386         close(sctpParams.rmrListenFd);
387         rmr_close(sctpParams.rmrCtx);
388         close(sctpParams.epoll_fd);
389         exit(-1);
390     }
391
392     sctpParams.sctpMap = new mapWrapper();
393
394     std::vector<std::thread> threads(num_cpus);
395 //    std::vector<std::thread> threads;
396
397     num_cpus = 3;
398     for (unsigned int i = 0; i < num_cpus; i++) {
399         threads[i] = std::thread(listener, &sctpParams);
400
401         cpu_set_t cpuset;
402         CPU_ZERO(&cpuset);
403         CPU_SET(i, &cpuset);
404         int rc = pthread_setaffinity_np(threads[i].native_handle(), sizeof(cpu_set_t), &cpuset);
405         if (rc != 0) {
406             mdclog_write(MDCLOG_ERR, "Error calling pthread_setaffinity_np: %d", rc);
407         }
408     }
409
410
411     //loop over term_init until first message from xApp
412     handleTermInit(sctpParams);
413
414     for (auto &t : threads) {
415         t.join();
416     }
417
418     return 0;
419 }
420 #endif
421 void handleTermInit(sctp_params_t &sctpParams) {
422     sendTermInit(sctpParams);
423     //send to e2 manager init of e2 term
424     //E2_TERM_INIT
425
426     int count = 0;
427     while (true) {
428         auto xappMessages = num_of_XAPP_messages.load(std::memory_order_acquire);
429         if (xappMessages > 0) {
430             if (mdclog_level_get() >=  MDCLOG_INFO) {
431                 mdclog_write(MDCLOG_INFO, "Got a message from some application, stop sending E2_TERM_INIT");
432             }
433             return;
434         }
435         usleep(100000);
436         count++;
437         if (count % 1000 == 0) {
438             mdclog_write(MDCLOG_ERR, "GOT No messages from any xApp");
439             sendTermInit(sctpParams);
440         }
441     }
442 }
443
444 void sendTermInit(sctp_params_t &sctpParams) {
445     rmr_mbuf_t *msg = rmr_alloc_msg(sctpParams.rmrCtx, sctpParams.ka_message_length);
446     auto count = 0;
447     while (true) {
448         msg->mtype = E2_TERM_INIT;
449         msg->state = 0;
450         rmr_bytes2payload(msg, (unsigned char *)sctpParams.ka_message, sctpParams.ka_message_length);
451         static unsigned char tx[32];
452         auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
453         rmr_bytes2xact(msg, tx, txLen);
454         msg = rmr_send_msg(sctpParams.rmrCtx, msg);
455         if (msg == nullptr) {
456             msg = rmr_alloc_msg(sctpParams.rmrCtx, sctpParams.ka_message_length);
457         } else if (msg->state == 0) {
458             rmr_free_msg(msg);
459             if (mdclog_level_get() >=  MDCLOG_INFO) {
460                 mdclog_write(MDCLOG_INFO, "E2_TERM_INIT successfully sent ");
461             }
462             return;
463         } else {
464             if (count % 100 == 0) {
465                 mdclog_write(MDCLOG_ERR, "Error sending E2_TERM_INIT cause : %s ", translateRmrErrorMessages(msg->state).c_str());
466             }
467             sleep(1);
468         }
469         count++;
470     }
471 }
472
473 /**
474  *
475  * @param argc
476  * @param argv
477  * @param sctpParams
478  * @return
479  */
480 cxxopts::ParseResult parse(int argc, char *argv[], sctp_params_t &sctpParams) {
481     cxxopts::Options options(argv[0], "e2 term help");
482     options.positional_help("[optional args]").show_positional_help();
483     options.allow_unrecognised_options().add_options()
484             ("p,path", "config file path", cxxopts::value<std::string>(sctpParams.configFilePath)->default_value("config"))
485             ("f,file", "config file name", cxxopts::value<std::string>(sctpParams.configFileName)->default_value("config.conf"))
486             ("h,help", "Print help");
487
488     auto result = options.parse(argc, (const char **&)argv);
489
490     if (result.count("help")) {
491         std::cout << options.help({""}) << std::endl;
492         exit(0);
493     }
494     return result;
495 }
496
497 /**
498  *
499  * @param sctpParams
500  * @return -1 failed 0 success
501  */
502 int buildInotify(sctp_params_t &sctpParams) {
503     sctpParams.inotifyFD = inotify_init1(IN_NONBLOCK);
504     if (sctpParams.inotifyFD == -1) {
505         mdclog_write(MDCLOG_ERR, "Failed to init inotify (inotify_init1) %s", strerror(errno));
506         return -1;
507     }
508
509     sctpParams.inotifyWD = inotify_add_watch(sctpParams.inotifyFD,
510                                              (const char *)sctpParams.configFilePath.c_str(),
511                                              (unsigned)IN_OPEN | (unsigned)IN_CLOSE_WRITE | (unsigned)IN_CLOSE_NOWRITE); //IN_CLOSE = (IN_CLOSE_WRITE | IN_CLOSE_NOWRITE)
512     if (sctpParams.inotifyWD == -1) {
513         mdclog_write(MDCLOG_ERR, "Failed to add directory : %s to  inotify (inotify_add_watch) %s",
514                      sctpParams.configFilePath.c_str(),
515                      strerror(errno));
516         close(sctpParams.inotifyFD);
517         return -1;
518     }
519
520     struct epoll_event event{};
521     event.events = (EPOLLIN);
522     event.data.fd = sctpParams.inotifyFD;
523     // add listening RMR FD to epoll
524     if (epoll_ctl(sctpParams.epoll_fd, EPOLL_CTL_ADD, sctpParams.inotifyFD, &event)) {
525         mdclog_write(MDCLOG_ERR, "Failed to add inotify FD to epoll");
526         close(sctpParams.inotifyFD);
527         return -1;
528     }
529     return 0;
530 }
531
532 /**
533  *
534  * @param args
535  * @return
536  */
537 void listener(sctp_params_t *params) {
538     int num_of_SCTP_messages = 0;
539     auto totalTime = 0.0;
540     mdclog_mdc_clean();
541     mdclog_level_set(params->logLevel);
542
543     std::thread::id this_id = std::this_thread::get_id();
544     //save cout
545     streambuf *oldCout = cout.rdbuf();
546     ostringstream memCout;
547     // create new cout
548     cout.rdbuf(memCout.rdbuf());
549     cout << this_id;
550     //return to the normal cout
551     cout.rdbuf(oldCout);
552
553     char tid[32];
554     memcpy(tid, memCout.str().c_str(), memCout.str().length() < 32 ? memCout.str().length() : 31);
555     tid[memCout.str().length()] = 0;
556     mdclog_mdc_add("thread id", tid);
557
558     if (mdclog_level_get() >= MDCLOG_DEBUG) {
559         mdclog_write(MDCLOG_DEBUG, "started thread number %s", tid);
560     }
561
562     RmrMessagesBuffer_t rmrMessageBuffer{};
563     //create and init RMR
564     rmrMessageBuffer.rmrCtx = params->rmrCtx;
565
566     auto *events = (struct epoll_event *) calloc(MAXEVENTS, sizeof(struct epoll_event));
567     struct timespec end{0, 0};
568     struct timespec start{0, 0};
569
570     rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
571     rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
572
573     memcpy(rmrMessageBuffer.ka_message, params->ka_message, params->ka_message_length);
574     rmrMessageBuffer.ka_message_len = params->ka_message_length;
575     rmrMessageBuffer.ka_message[rmrMessageBuffer.ka_message_len] = 0;
576
577     if (mdclog_level_get() >= MDCLOG_DEBUG) {
578         mdclog_write(MDCLOG_DEBUG, "keep alive message is : %s", rmrMessageBuffer.ka_message);
579     }
580
581     ReportingMessages_t message {};
582
583 //    for (int i = 0; i < MAX_RMR_BUFF_ARRAY; i++) {
584 //        rmrMessageBuffer.rcvBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
585 //        rmrMessageBuffer.sendBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
586 //    }
587
588     while (true) {
589         if (mdclog_level_get() >= MDCLOG_DEBUG) {
590             mdclog_write(MDCLOG_DEBUG, "Start EPOLL Wait. Timeout = %d", params->epollTimeOut);
591         }
592         auto numOfEvents = epoll_wait(params->epoll_fd, events, MAXEVENTS, params->epollTimeOut);
593         if (numOfEvents == 0) { // time out
594             if (mdclog_level_get() >= MDCLOG_DEBUG) {
595                 mdclog_write(MDCLOG_DEBUG, "got epoll timeout");
596             }
597             continue;
598         } else if (numOfEvents < 0) {
599             if (errno == EINTR) {
600                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
601                     mdclog_write(MDCLOG_DEBUG, "got EINTR : %s", strerror(errno));
602                 }
603                 continue;
604             }
605             mdclog_write(MDCLOG_ERR, "Epoll wait failed, errno = %s", strerror(errno));
606             return;
607         }
608         for (auto i = 0; i < numOfEvents; i++) {
609             if (mdclog_level_get() >= MDCLOG_DEBUG) {
610                 mdclog_write(MDCLOG_DEBUG, "handling epoll event %d out of %d", i + 1, numOfEvents);
611             }
612             clock_gettime(CLOCK_MONOTONIC, &message.message.time);
613             start.tv_sec = message.message.time.tv_sec;
614             start.tv_nsec = message.message.time.tv_nsec;
615
616
617             if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP)) {
618                 handlepoll_error(events[i], message, rmrMessageBuffer, params);
619             } else if (events[i].events & EPOLLOUT) {
620                 handleEinprogressMessages(events[i], message, rmrMessageBuffer, params);
621             } else if (params->listenFD == events[i].data.fd) {
622                 if (mdclog_level_get() >= MDCLOG_INFO) {
623                     mdclog_write(MDCLOG_INFO, "New connection request from sctp network\n");
624                 }
625                 // new connection is requested from RAN  start build connection
626                 while (true) {
627                     struct sockaddr in_addr {};
628                     socklen_t in_len;
629                     char hostBuff[NI_MAXHOST];
630                     char portBuff[NI_MAXSERV];
631
632                     in_len = sizeof(in_addr);
633                     auto *peerInfo = (ConnectedCU_t *)calloc(1, sizeof(ConnectedCU_t));
634                     if(peerInfo == nullptr){
635                         mdclog_write(MDCLOG_ERR, "calloc failed");
636                         break;
637                     }
638                     peerInfo->sctpParams = params;
639                     peerInfo->fileDescriptor = accept(params->listenFD, &in_addr, &in_len);
640                     if (peerInfo->fileDescriptor == -1) {
641                         if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
642                             /* We have processed all incoming connections. */
643                             break;
644                         } else {
645                             mdclog_write(MDCLOG_ERR, "Accept error, errno = %s", strerror(errno));
646                             break;
647                         }
648                     }
649                     if (setSocketNoBlocking(peerInfo->fileDescriptor) == -1) {
650                         mdclog_write(MDCLOG_ERR, "setSocketNoBlocking failed to set new connection %s on port %s\n", hostBuff, portBuff);
651                         close(peerInfo->fileDescriptor);
652                         break;
653                     }
654                     auto  ans = getnameinfo(&in_addr, in_len,
655                                             peerInfo->hostName, NI_MAXHOST,
656                                             peerInfo->portNumber, NI_MAXSERV, (unsigned )((unsigned int)NI_NUMERICHOST | (unsigned int)NI_NUMERICSERV));
657                     if (ans < 0) {
658                         mdclog_write(MDCLOG_ERR, "Failed to get info on connection request. %s\n", strerror(errno));
659                         close(peerInfo->fileDescriptor);
660                         break;
661                     }
662                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
663                         mdclog_write(MDCLOG_DEBUG, "Accepted connection on descriptor %d (host=%s, port=%s)\n", peerInfo->fileDescriptor, peerInfo->hostName, peerInfo->portNumber);
664                     }
665                     peerInfo->isConnected = false;
666                     peerInfo->gotSetup = false;
667                     if (addToEpoll(params->epoll_fd,
668                                    peerInfo,
669                                    (EPOLLIN | EPOLLET),
670                                    params->sctpMap, nullptr,
671                                    0) != 0) {
672                         break;
673                     }
674                     break;
675                 }
676             } else if (params->rmrListenFd == events[i].data.fd) {
677                 // got message from XAPP
678                 //num_of_XAPP_messages.fetch_add(1, std::memory_order_release);
679                 num_of_messages.fetch_add(1, std::memory_order_release);
680                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
681                     mdclog_write(MDCLOG_DEBUG, "new RMR message");
682                 }
683                 if (receiveXappMessages(params->sctpMap,
684                                         rmrMessageBuffer,
685                                         message.message.time) != 0) {
686                     mdclog_write(MDCLOG_ERR, "Error handling Xapp message");
687                 }
688             } else if (params->inotifyFD == events[i].data.fd) {
689                 mdclog_write(MDCLOG_INFO, "Got event from inotify (configuration update)");
690                 handleConfigChange(params);
691             } else {
692                 /* We RMR_ERR_RETRY have data on the fd waiting to be read. Read and display it.
693                  * We must read whatever data is available completely, as we are running
694                  *  in edge-triggered mode and won't get a notification again for the same data. */
695                 num_of_messages.fetch_add(1, std::memory_order_release);
696                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
697                     mdclog_write(MDCLOG_DEBUG, "new message from SCTP, epoll flags are : %0x", events[i].events);
698                 }
699                 receiveDataFromSctp(&events[i],
700                                     params->sctpMap,
701                                     num_of_SCTP_messages,
702                                     rmrMessageBuffer,
703                                     message.message.time);
704             }
705
706             clock_gettime(CLOCK_MONOTONIC, &end);
707             if (mdclog_level_get() >= MDCLOG_INFO) {
708                 totalTime += ((end.tv_sec + 1.0e-9 * end.tv_nsec) -
709                               ((double) start.tv_sec + 1.0e-9 * start.tv_nsec));
710             }
711             if (mdclog_level_get() >= MDCLOG_DEBUG) {
712                 mdclog_write(MDCLOG_DEBUG, "message handling is %ld seconds %ld nanoseconds",
713                              end.tv_sec - start.tv_sec,
714                              end.tv_nsec - start.tv_nsec);
715             }
716         }
717     }
718 }
719
720 /**
721  *
722  * @param sctpParams
723  */
724 void handleConfigChange(sctp_params_t *sctpParams) {
725     char buf[4096] __attribute__ ((aligned(__alignof__(struct inotify_event))));
726     const struct inotify_event *event;
727     char *ptr;
728
729     path p = (sctpParams->configFilePath + "/" + sctpParams->configFileName).c_str();
730     auto endlessLoop = true;
731     while (endlessLoop) {
732         auto len = read(sctpParams->inotifyFD, buf, sizeof buf);
733         if (len == -1) {
734             if (errno != EAGAIN) {
735                 mdclog_write(MDCLOG_ERR, "read %s ", strerror(errno));
736                 endlessLoop = false;
737                 continue;
738             }
739             else {
740                 endlessLoop = false;
741                 continue;
742             }
743         }
744
745         for (ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) {
746             event = (const struct inotify_event *)ptr;
747             if (event->mask & (uint32_t)IN_ISDIR) {
748                 continue;
749             }
750
751             // the directory name
752             if (sctpParams->inotifyWD == event->wd) {
753                 // not the directory
754             }
755             if (event->len) {
756                 auto  retVal = strcmp(sctpParams->configFileName.c_str(), event->name);
757                 if (retVal != 0) {
758                     continue;
759                 }
760             }
761             // only the file we want
762             if (event->mask & (uint32_t)IN_CLOSE_WRITE) {
763                 if (mdclog_level_get() >= MDCLOG_INFO) {
764                     mdclog_write(MDCLOG_INFO, "Configuration file changed");
765                 }
766                 if (exists(p)) {
767                     const int size = 2048;
768                     auto fileSize = file_size(p);
769                     if (fileSize > size) {
770                         mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size);
771                         return;
772                     }
773                 } else {
774                     mdclog_write(MDCLOG_ERR, "Configuration File %s not exists", p.string().c_str());
775                     return;
776                 }
777
778                 ReadConfigFile conf;
779                 if (conf.openConfigFile(p.string()) == -1) {
780                     mdclog_write(MDCLOG_ERR, "Filed to open config file %s, %s",
781                                  p.string().c_str(), strerror(errno));
782                     return;
783                 }
784
785                 auto tmpStr = conf.getStringValue("loglevel");
786                 if (tmpStr.length() == 0) {
787                     mdclog_write(MDCLOG_ERR, "illegal loglevel. Set loglevel to MDCLOG_INFO");
788                     tmpStr = "info";
789                 }
790                 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
791
792                 if ((tmpStr.compare("debug")) == 0) {
793                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_DEBUG");
794                     sctpParams->logLevel = MDCLOG_DEBUG;
795                 } else if ((tmpStr.compare("info")) == 0) {
796                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_INFO");
797                     sctpParams->logLevel = MDCLOG_INFO;
798                 } else if ((tmpStr.compare("warning")) == 0) {
799                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_WARN");
800                     sctpParams->logLevel = MDCLOG_WARN;
801                 } else if ((tmpStr.compare("error")) == 0) {
802                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_ERR");
803                     sctpParams->logLevel = MDCLOG_ERR;
804                 } else {
805                     mdclog_write(MDCLOG_ERR, "illegal loglevel = %s. Set loglevel to MDCLOG_INFO", tmpStr.c_str());
806                     sctpParams->logLevel = MDCLOG_INFO;
807                 }
808                 mdclog_level_set(sctpParams->logLevel);
809
810
811                 tmpStr = conf.getStringValue("trace");
812                 if (tmpStr.length() == 0) {
813                     mdclog_write(MDCLOG_ERR, "illegal trace. Set trace to stop");
814                     tmpStr = "stop";
815                 }
816
817                 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
818                 if ((tmpStr.compare("start")) == 0) {
819                     mdclog_write(MDCLOG_INFO, "Trace set to: start");
820                     sctpParams->trace = true;
821                 } else if ((tmpStr.compare("stop")) == 0) {
822                     mdclog_write(MDCLOG_INFO, "Trace set to: stop");
823                     sctpParams->trace = false;
824                 } else {
825                     mdclog_write(MDCLOG_ERR, "Trace was set to wrong value %s, set to stop", tmpStr.c_str());
826                     sctpParams->trace = false;
827                 }
828                 jsonTrace = sctpParams->trace;
829
830
831                 endlessLoop = false;
832             }
833         }
834     }
835 }
836
837 /**
838  *
839  * @param event
840  * @param message
841  * @param rmrMessageBuffer
842  * @param params
843  */
844 void handleEinprogressMessages(struct epoll_event &event,
845                                ReportingMessages_t &message,
846                                RmrMessagesBuffer_t &rmrMessageBuffer,
847                                sctp_params_t *params) {
848     auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
849     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
850
851     mdclog_write(MDCLOG_INFO, "file descriptor %d got EPOLLOUT", peerInfo->fileDescriptor);
852     auto retVal = 0;
853     socklen_t retValLen = 0;
854     auto rc = getsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_ERROR, &retVal, &retValLen);
855     if (rc != 0 || retVal != 0) {
856 #ifndef UNIT_TEST
857         if (rc != 0) {
858             rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
859                                                          "%s|Failed SCTP Connection, after EINPROGRESS the getsockopt%s",
860                                                          peerInfo->enodbName, strerror(errno));
861         } else if (retVal != 0) {
862             rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
863                                                          "%s|Failed SCTP Connection after EINPROGRESS, SO_ERROR",
864                                                          peerInfo->enodbName);
865         }
866
867         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
868         message.message.asnLength = rmrMessageBuffer.sendMessage->len;
869         mdclog_write(MDCLOG_ERR, "%s", rmrMessageBuffer.sendMessage->payload);
870         message.message.direction = 'N';
871         if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
872             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
873         }
874 #endif
875         memset(peerInfo->asnData, 0, peerInfo->asnLength);
876         peerInfo->asnLength = 0;
877         peerInfo->mtype = 0;
878         return;
879     }
880
881     peerInfo->isConnected = true;
882
883     if (modifyToEpoll(params->epoll_fd, peerInfo, (EPOLLIN | EPOLLET), params->sctpMap, peerInfo->enodbName,
884                       peerInfo->mtype) != 0) {
885         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_MOD");
886         return;
887     }
888
889     message.message.asndata = (unsigned char *)peerInfo->asnData;
890     message.message.asnLength = peerInfo->asnLength;
891     message.message.messageType = peerInfo->mtype;
892     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
893     num_of_messages.fetch_add(1, std::memory_order_release);
894     if (mdclog_level_get() >= MDCLOG_DEBUG) {
895         mdclog_write(MDCLOG_DEBUG, "send the delayed SETUP/ENDC SETUP to sctp for %s",
896                      message.message.enodbName);
897     }
898     if (sendSctpMsg(peerInfo, message, params->sctpMap) != 0) {
899         if (mdclog_level_get() >= MDCLOG_DEBUG) {
900             mdclog_write(MDCLOG_DEBUG, "Error write to SCTP  %s %d", __func__, __LINE__);
901         }
902         return;
903     }
904
905     memset(peerInfo->asnData, 0, peerInfo->asnLength);
906     peerInfo->asnLength = 0;
907     peerInfo->mtype = 0;
908 }
909
910
911 void handlepoll_error(struct epoll_event &event,
912                       ReportingMessages_t &message,
913                       RmrMessagesBuffer_t &rmrMessageBuffer,
914                       sctp_params_t *params) {
915     if (event.data.fd != params->rmrListenFd) {
916         auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
917         mdclog_write(MDCLOG_ERR, "epoll error, events %0x on fd %d, RAN NAME : %s",
918                      event.events, peerInfo->fileDescriptor, peerInfo->enodbName);
919 #ifndef UNIT_TEST
920
921         rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
922                                                      "%s|Failed SCTP Connection",
923                                                      peerInfo->enodbName);
924         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
925         message.message.asnLength = rmrMessageBuffer.sendMessage->len;
926
927         memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
928         message.message.direction = 'N';
929         if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
930             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
931         }
932 #endif
933         close(peerInfo->fileDescriptor);
934         params->sctpMap->erase(peerInfo->enodbName);
935         cleanHashEntry((ConnectedCU_t *) event.data.ptr, params->sctpMap);
936     } else {
937         mdclog_write(MDCLOG_ERR, "epoll error, events %0x on RMR FD", event.events);
938     }
939 }
940 /**
941  *
942  * @param socket
943  * @return
944  */
945 int setSocketNoBlocking(int socket) {
946     auto flags = fcntl(socket, F_GETFL, 0);
947
948     if (flags == -1) {
949         mdclog_mdc_add("func", "fcntl");
950         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
951         mdclog_mdc_clean();
952         return -1;
953     }
954
955     flags = (unsigned) flags | (unsigned) O_NONBLOCK;
956     if (fcntl(socket, F_SETFL, flags) == -1) {
957         mdclog_mdc_add("func", "fcntl");
958         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
959         mdclog_mdc_clean();
960         return -1;
961     }
962
963     return 0;
964 }
965
966 /**
967  *
968  * @param val
969  * @param m
970  */
971 void cleanHashEntry(ConnectedCU_t *val, Sctp_Map_t *m) {
972     char *dummy;
973     auto port = (uint16_t) strtol(val->portNumber, &dummy, 10);
974     char searchBuff[2048]{};
975
976     snprintf(searchBuff, sizeof searchBuff, "host:%s:%d", val->hostName, port);
977     m->erase(searchBuff);
978
979     m->erase(val->enodbName);
980 #ifndef UNIT_TEST
981     free(val);
982 #endif
983 }
984
985 /**
986  *
987  * @param fd file descriptor
988  * @param data the asn data to send
989  * @param len  length of the data
990  * @param enodbName the enodbName as in the map for printing purpose
991  * @param m map host information
992  * @param mtype message number
993  * @return 0 success, a negative number on fail
994  */
995 int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_t *m) {
996     auto loglevel = mdclog_level_get();
997     int fd = peerInfo->fileDescriptor;
998     if (loglevel >= MDCLOG_DEBUG) {
999         mdclog_write(MDCLOG_DEBUG, "Send SCTP message for CU %s, %s",
1000                      message.message.enodbName, __FUNCTION__);
1001     }
1002
1003     while (true) {
1004         if (send(fd,message.message.asndata, message.message.asnLength,MSG_NOSIGNAL) < 0) {
1005             if (errno == EINTR) {
1006                 continue;
1007             }
1008             mdclog_write(MDCLOG_ERR, "error writing to CU a message, %s ", strerror(errno));
1009             if (!peerInfo->isConnected) {
1010                 mdclog_write(MDCLOG_ERR, "connection to CU %s is still in progress.", message.message.enodbName);
1011                 return -1;
1012             }
1013             cleanHashEntry(peerInfo, m);
1014             close(fd);
1015             char key[MAX_ENODB_NAME_SIZE * 2];
1016             snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName,
1017                      message.message.messageType);
1018             if (loglevel >= MDCLOG_DEBUG) {
1019                 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
1020             }
1021             auto tmp = m->find(key);
1022             if (tmp) {
1023                 free(tmp);
1024             }
1025             m->erase(key);
1026             return -1;
1027         }
1028         message.message.direction = 'D';
1029         // send report.buffer of size
1030         buildJsonMessage(message);
1031
1032         if (loglevel >= MDCLOG_DEBUG) {
1033             mdclog_write(MDCLOG_DEBUG,
1034                          "SCTP message for CU %s sent from %s",
1035                          message.message.enodbName,
1036                          __FUNCTION__);
1037         }
1038         return 0;
1039     }
1040 }
1041
1042 /**
1043  *
1044  * @param message
1045  * @param rmrMessageBuffer
1046  */
1047 void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) {
1048     message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
1049     message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
1050
1051     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1052         mdclog_write(MDCLOG_DEBUG, "Message from Xapp RAN name = %s message length = %ld",
1053                      message.message.enodbName, (unsigned long) message.message.asnLength);
1054     }
1055 }
1056
1057
1058
1059 /**
1060  *
1061  * @param events
1062  * @param sctpMap
1063  * @param numOfMessages
1064  * @param rmrMessageBuffer
1065  * @param ts
1066  * @return
1067  */
1068 int receiveDataFromSctp(struct epoll_event *events,
1069                         Sctp_Map_t *sctpMap,
1070                         int &numOfMessages,
1071                         RmrMessagesBuffer_t &rmrMessageBuffer,
1072                         struct timespec &ts) {
1073     /* We have data on the fd waiting to be read. Read and display it.
1074  * We must read whatever data is available completely, as we are running
1075  *  in edge-triggered mode and won't get a notification again for the same data. */
1076     ReportingMessages_t message {};
1077     auto done = 0;
1078     auto loglevel = mdclog_level_get();
1079
1080     // get the identity of the interface
1081     message.peerInfo = (ConnectedCU_t *)events->data.ptr;
1082
1083     struct timespec start{0, 0};
1084     struct timespec decodeStart{0, 0};
1085     struct timespec end{0, 0};
1086
1087     E2AP_PDU_t *pdu = nullptr;
1088
1089     while (true) {
1090         if (loglevel >= MDCLOG_DEBUG) {
1091             mdclog_write(MDCLOG_DEBUG, "Start Read from SCTP %d fd", message.peerInfo->fileDescriptor);
1092             clock_gettime(CLOCK_MONOTONIC, &start);
1093         }
1094         // read the buffer directly to rmr payload
1095         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1096         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1097                 read(message.peerInfo->fileDescriptor, rmrMessageBuffer.sendMessage->payload, RECEIVE_SCTP_BUFFER_SIZE);
1098
1099         if (loglevel >= MDCLOG_DEBUG) {
1100             mdclog_write(MDCLOG_DEBUG, "Finish Read from SCTP %d fd message length = %ld",
1101                          message.peerInfo->fileDescriptor, message.message.asnLength);
1102         }
1103
1104         memcpy(message.message.enodbName, message.peerInfo->enodbName, sizeof(message.peerInfo->enodbName));
1105         message.message.direction = 'U';
1106         message.message.time.tv_nsec = ts.tv_nsec;
1107         message.message.time.tv_sec = ts.tv_sec;
1108
1109         if (message.message.asnLength < 0) {
1110             if (errno == EINTR) {
1111                 continue;
1112             }
1113             /* If errno == EAGAIN, that means we have read all
1114                data. So goReportingMessages_t back to the main loop. */
1115             if (errno != EAGAIN) {
1116                 mdclog_write(MDCLOG_ERR, "Read error, %s ", strerror(errno));
1117                 done = 1;
1118             } else if (loglevel >= MDCLOG_DEBUG) {
1119                 mdclog_write(MDCLOG_DEBUG, "EAGAIN - descriptor = %d", message.peerInfo->fileDescriptor);
1120             }
1121             break;
1122         } else if (message.message.asnLength == 0) {
1123             /* End of file. The remote has closed the connection. */
1124             if (loglevel >= MDCLOG_INFO) {
1125                 mdclog_write(MDCLOG_INFO, "END of File Closed connection - descriptor = %d",
1126                              message.peerInfo->fileDescriptor);
1127             }
1128             done = 1;
1129             break;
1130         }
1131
1132         if (loglevel >= MDCLOG_DEBUG) {
1133             char printBuffer[RECEIVE_SCTP_BUFFER_SIZE]{};
1134             char *tmp = printBuffer;
1135             for (size_t i = 0; i < (size_t)message.message.asnLength; ++i) {
1136                 snprintf(tmp, 3, "%02x", message.message.asndata[i]);
1137                 tmp += 2;
1138             }
1139             printBuffer[message.message.asnLength] = 0;
1140             clock_gettime(CLOCK_MONOTONIC, &end);
1141             mdclog_write(MDCLOG_DEBUG, "Before Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
1142                          message.peerInfo->enodbName, end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
1143             mdclog_write(MDCLOG_DEBUG, "PDU buffer length = %ld, data =  : %s", message.message.asnLength,
1144                          printBuffer);
1145             clock_gettime(CLOCK_MONOTONIC, &decodeStart);
1146         }
1147
1148         auto rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void **) &pdu,
1149                           message.message.asndata, message.message.asnLength);
1150         if (rval.code != RC_OK) {
1151             mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) E2AP PDU from RAN : %s", rval.code,
1152                          message.peerInfo->enodbName);
1153             break;
1154         }
1155
1156         if (loglevel >= MDCLOG_DEBUG) {
1157             clock_gettime(CLOCK_MONOTONIC, &end);
1158             mdclog_write(MDCLOG_DEBUG, "After Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
1159                          message.peerInfo->enodbName, end.tv_sec - decodeStart.tv_sec, end.tv_nsec - decodeStart.tv_nsec);
1160             char *printBuffer;
1161             size_t size;
1162             FILE *stream = open_memstream(&printBuffer, &size);
1163             asn_fprint(stream, &asn_DEF_E2AP_PDU, pdu);
1164             mdclog_write(MDCLOG_DEBUG, "Encoding E2AP PDU past : %s", printBuffer);
1165             clock_gettime(CLOCK_MONOTONIC, &decodeStart);
1166
1167             fclose(stream);
1168             free(printBuffer);
1169         }
1170
1171         switch (pdu->present) {
1172             case E2AP_PDU_PR_initiatingMessage: {//initiating message
1173                 asnInitiatingRequest(pdu, sctpMap,message, rmrMessageBuffer);
1174                 break;
1175             }
1176             case E2AP_PDU_PR_successfulOutcome: { //successful outcome
1177                 asnSuccessfulMsg(pdu, sctpMap, message, rmrMessageBuffer);
1178                 break;
1179             }
1180             case E2AP_PDU_PR_unsuccessfulOutcome: { //Unsuccessful Outcome
1181                 asnUnSuccsesfulMsg(pdu, sctpMap, message, rmrMessageBuffer);
1182                 break;
1183             }
1184             default:
1185                 mdclog_write(MDCLOG_ERR, "Unknown index %d in E2AP PDU", pdu->present);
1186                 break;
1187         }
1188         if (loglevel >= MDCLOG_DEBUG) {
1189             clock_gettime(CLOCK_MONOTONIC, &end);
1190             mdclog_write(MDCLOG_DEBUG,
1191                          "After processing message and sent to rmr for : %s, Read time is : %ld seconds, %ld nanoseconds",
1192                          message.peerInfo->enodbName, end.tv_sec - decodeStart.tv_sec, end.tv_nsec - decodeStart.tv_nsec);
1193         }
1194         numOfMessages++;
1195         if (pdu != nullptr) {
1196             ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu);
1197             //ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
1198             //pdu = nullptr;
1199         }
1200     }
1201
1202     if (done) {
1203         if (loglevel >= MDCLOG_INFO) {
1204             mdclog_write(MDCLOG_INFO, "Closed connection - descriptor = %d", message.peerInfo->fileDescriptor);
1205         }
1206         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1207                 snprintf((char *)rmrMessageBuffer.sendMessage->payload,
1208                          256,
1209                          "%s|CU disconnected unexpectedly",
1210                          message.peerInfo->enodbName);
1211         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1212
1213         if (sendRequestToXapp(message,
1214                               RIC_SCTP_CONNECTION_FAILURE,
1215                               rmrMessageBuffer) != 0) {
1216             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
1217         }
1218
1219         /* Closing descriptor make epoll remove it from the set of descriptors which are monitored. */
1220         close(message.peerInfo->fileDescriptor);
1221         cleanHashEntry((ConnectedCU_t *) events->data.ptr, sctpMap);
1222     }
1223     if (loglevel >= MDCLOG_DEBUG) {
1224         clock_gettime(CLOCK_MONOTONIC, &end);
1225         mdclog_write(MDCLOG_DEBUG, "from receive SCTP to send RMR time is %ld seconds and %ld nanoseconds",
1226                      end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
1227
1228     }
1229     return 0;
1230 }
1231
1232 static void buildAndSendSetupRequest(ReportingMessages_t &message,
1233                                      RmrMessagesBuffer_t &rmrMessageBuffer,
1234                                      E2AP_PDU_t *pdu/*,
1235                                      string const &messageName,
1236                                      string const &ieName,
1237                                      vector<string> &functionsToAdd_v,
1238                                      vector<string> &functionsToModified_v*/) {
1239     auto logLevel = mdclog_level_get();
1240     // now we can send the data to e2Mgr
1241
1242     asn_enc_rval_t er;
1243     auto buffer_size = RECEIVE_SCTP_BUFFER_SIZE * 2;
1244     unsigned char *buffer = nullptr;
1245     buffer = (unsigned char *) calloc(buffer_size, sizeof(unsigned char));
1246     if(!buffer)
1247     {
1248         mdclog_write(MDCLOG_ERR, "Allocating buffer for %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
1249         return;
1250     }
1251     while (true) {
1252         er = asn_encode_to_buffer(nullptr, ATS_BASIC_XER, &asn_DEF_E2AP_PDU, pdu, buffer, buffer_size);
1253         if (er.encoded == -1) {
1254             mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
1255             return;
1256         } else if (er.encoded > (ssize_t) buffer_size) {
1257             buffer_size = er.encoded + 128;
1258             mdclog_write(MDCLOG_WARN, "Buffer of size %d is to small for %s. Reallocate buffer of size %d",
1259                          (int) buffer_size,
1260                          asn_DEF_E2AP_PDU.name, buffer_size);
1261             buffer_size = er.encoded + 128;
1262
1263             unsigned char *newBuffer = nullptr;
1264             newBuffer = (unsigned char *) realloc(buffer, buffer_size);
1265             if (!newBuffer)
1266             {
1267                 // out of memory
1268                 mdclog_write(MDCLOG_ERR, "Reallocating buffer for %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
1269                 free(buffer);
1270                 return;
1271             }
1272             buffer = newBuffer;
1273             continue;
1274         }
1275         buffer[er.encoded] = '\0';
1276         break;
1277     }
1278     // encode to xml
1279
1280     string res((char *)buffer);
1281     res.erase(std::remove(res.begin(), res.end(), '\n'), res.end());
1282     res.erase(std::remove(res.begin(), res.end(), '\t'), res.end());
1283     res.erase(std::remove(res.begin(), res.end(), ' '), res.end());
1284
1285 //    string res {};
1286 //    if (!functionsToAdd_v.empty() || !functionsToModified_v.empty()) {
1287 //        res = buildXmlData(messageName, ieName, functionsToAdd_v, functionsToModified_v, buffer, (size_t) er.encoded);
1288 //    }
1289     rmr_mbuf_t *rmrMsg;
1290 //    if (res.length() == 0) {
1291 //        rmrMsg = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, buffer_size + 256);
1292 //        rmrMsg->len = snprintf((char *) rmrMsg->payload, RECEIVE_SCTP_BUFFER_SIZE * 2, "%s:%d|%s",
1293 //                               message.peerInfo->sctpParams->myIP.c_str(),
1294 //                               message.peerInfo->sctpParams->rmrPort,
1295 //                               buffer);
1296 //    } else {
1297         rmrMsg = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, (int)res.length() + 256);
1298         rmrMsg->len = snprintf((char *) rmrMsg->payload, res.length() + 256, "%s:%d|%s",
1299                                message.peerInfo->sctpParams->myIP.c_str(),
1300                                message.peerInfo->sctpParams->rmrPort,
1301                                res.c_str());
1302 //    }
1303
1304     if (logLevel >= MDCLOG_DEBUG) {
1305         mdclog_write(MDCLOG_DEBUG, "Setup request of size %d :\n %s\n", rmrMsg->len, rmrMsg->payload);
1306     }
1307     // send to RMR
1308     rmrMsg->mtype = message.message.messageType;
1309     rmrMsg->state = 0;
1310     rmr_bytes2meid(rmrMsg, (unsigned char *) message.message.enodbName, strlen(message.message.enodbName));
1311
1312     static unsigned char tx[32];
1313     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1314     rmr_bytes2xact(rmrMsg, tx, strlen((const char *) tx));
1315
1316     rmrMsg = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMsg);
1317     if (rmrMsg == nullptr) {
1318         mdclog_write(MDCLOG_ERR, "RMR failed to send returned nullptr");
1319     } else if (rmrMsg->state != 0) {
1320         char meid[RMR_MAX_MEID]{};
1321         if (rmrMsg->state == RMR_ERR_RETRY) {
1322             usleep(5);
1323             rmrMsg->state = 0;
1324             mdclog_write(MDCLOG_INFO, "RETRY sending Message %d to Xapp from %s",
1325                          rmrMsg->mtype, rmr_get_meid(rmrMsg, (unsigned char *) meid));
1326             rmrMsg = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMsg);
1327             if (rmrMsg == nullptr) {
1328                 mdclog_write(MDCLOG_ERR, "RMR failed send returned nullptr");
1329             } else if (rmrMsg->state != 0) {
1330                 mdclog_write(MDCLOG_ERR,
1331                              "RMR Retry failed %s sending request %d to Xapp from %s",
1332                              translateRmrErrorMessages(rmrMsg->state).c_str(),
1333                              rmrMsg->mtype,
1334                              rmr_get_meid(rmrMsg, (unsigned char *) meid));
1335             }
1336         } else {
1337             mdclog_write(MDCLOG_ERR, "RMR failed: %s. sending request %d to Xapp from %s",
1338                          translateRmrErrorMessages(rmrMsg->state).c_str(),
1339                          rmrMsg->mtype,
1340                          rmr_get_meid(rmrMsg, (unsigned char *) meid));
1341         }
1342     }
1343     message.peerInfo->gotSetup = true;
1344     buildJsonMessage(message);
1345
1346     if (rmrMsg != nullptr) {
1347         rmr_free_msg(rmrMsg);
1348     }
1349     free(buffer);
1350
1351     return;
1352 }
1353
1354 #if 0
1355 int RAN_Function_list_To_Vector(RANfunctions_List_t& list, vector <string> &runFunXML_v) {
1356     auto index = 0;
1357     runFunXML_v.clear();
1358     for (auto j = 0; j < list.list.count; j++) {
1359         auto *raNfunctionItemIEs = (RANfunction_ItemIEs_t *)list.list.array[j];
1360         if (raNfunctionItemIEs->id == ProtocolIE_ID_id_RANfunction_Item &&
1361             (raNfunctionItemIEs->value.present == RANfunction_ItemIEs__value_PR_RANfunction_Item)) {
1362             // encode to xml
1363             E2SM_gNB_NRT_RANfunction_Definition_t *ranFunDef = nullptr;
1364             auto rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER,
1365                                    &asn_DEF_E2SM_gNB_NRT_RANfunction_Definition,
1366                                    (void **)&ranFunDef,
1367                                    raNfunctionItemIEs->value.choice.RANfunction_Item.ranFunctionDefinition.buf,
1368                                    raNfunctionItemIEs->value.choice.RANfunction_Item.ranFunctionDefinition.size);
1369             if (rval.code != RC_OK) {
1370                 mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) E2SM message from : %s",
1371                              rval.code,
1372                              asn_DEF_E2SM_gNB_NRT_RANfunction_Definition.name);
1373                 return -1;
1374             }
1375
1376             auto xml_buffer_size = RECEIVE_SCTP_BUFFER_SIZE * 2;
1377             unsigned char xml_buffer[RECEIVE_SCTP_BUFFER_SIZE * 2];
1378             memset(xml_buffer, 0, RECEIVE_SCTP_BUFFER_SIZE * 2);
1379             // encode to xml
1380             auto er = asn_encode_to_buffer(nullptr,
1381                                            ATS_BASIC_XER,
1382                                            &asn_DEF_E2SM_gNB_NRT_RANfunction_Definition,
1383                                            ranFunDef,
1384                                            xml_buffer,
1385                                            xml_buffer_size);
1386             if (er.encoded == -1) {
1387                 mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s",
1388                              asn_DEF_E2SM_gNB_NRT_RANfunction_Definition.name,
1389                              strerror(errno));
1390             } else if (er.encoded > (ssize_t)xml_buffer_size) {
1391                 mdclog_write(MDCLOG_ERR, "Buffer of size %d is to small for %s, at %s line %d",
1392                              (int) xml_buffer_size,
1393                              asn_DEF_E2SM_gNB_NRT_RANfunction_Definition.name, __func__, __LINE__);
1394             } else {
1395                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1396                     mdclog_write(MDCLOG_DEBUG, "Encoding E2SM %s PDU number %d : %s",
1397                                  asn_DEF_E2SM_gNB_NRT_RANfunction_Definition.name,
1398                                  index++,
1399                                  xml_buffer);
1400                 }
1401
1402                 string runFuncs = (char *)(xml_buffer);
1403                 runFunXML_v.emplace_back(runFuncs);
1404             }
1405         }
1406     }
1407     return 0;
1408 }
1409
1410 int collectServiceUpdate_RequestData(E2AP_PDU_t *pdu,
1411                                      Sctp_Map_t *sctpMap,
1412                                      ReportingMessages_t &message,
1413                                      vector <string> &RANfunctionsAdded_v,
1414                                      vector <string> &RANfunctionsModified_v) {
1415     memset(message.peerInfo->enodbName, 0 , MAX_ENODB_NAME_SIZE);
1416     for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICserviceUpdate.protocolIEs.list.count; i++) {
1417         auto *ie = pdu->choice.initiatingMessage->value.choice.RICserviceUpdate.protocolIEs.list.array[i];
1418         if (ie->id == ProtocolIE_ID_id_RANfunctionsAdded) {
1419             if (ie->value.present == RICserviceUpdate_IEs__value_PR_RANfunctionsID_List) {
1420                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1421                     mdclog_write(MDCLOG_DEBUG, "Run function list have %d entries",
1422                                  ie->value.choice.RANfunctions_List.list.count);
1423                 }
1424                 if (RAN_Function_list_To_Vector(ie->value.choice.RANfunctions_List, RANfunctionsAdded_v) != 0 ) {
1425                     return -1;
1426                 }
1427             }
1428         } else if (ie->id == ProtocolIE_ID_id_RANfunctionsModified) {
1429             if (ie->value.present == RICserviceUpdate_IEs__value_PR_RANfunctions_List) {
1430                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1431                     mdclog_write(MDCLOG_DEBUG, "Run function list have %d entries",
1432                                  ie->value.choice.RANfunctions_List.list.count);
1433                 }
1434                 if (RAN_Function_list_To_Vector(ie->value.choice.RANfunctions_List, RANfunctionsModified_v) != 0 ) {
1435                     return -1;
1436                 }
1437             }
1438         }
1439     }
1440     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1441         mdclog_write(MDCLOG_DEBUG, "Run function vector have %ld entries",
1442                      RANfunctionsAdded_v.size());
1443     }
1444     return 0;
1445 }
1446
1447 #endif
1448
1449
1450 void buildPrometheusList(ConnectedCU_t *peerInfo, Family<Counter> *prometheusFamily) {
1451     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"SetupRequest", "Messages"}});
1452     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"SetupRequest", "Bytes"}});
1453
1454     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_ErrorIndication)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ErrorIndication", "Messages"}});
1455     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_ErrorIndication)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ErrorIndication", "Bytes"}});
1456
1457     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_RICindication)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICindication", "Messages"}});
1458     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_RICindication)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICindication", "Bytes"}});
1459
1460     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ResetRequest", "Messages"}});
1461     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ResetRequest", "Bytes"}});
1462
1463     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICserviceUpdate", "Messages"}});
1464     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICserviceUpdate", "Bytes"}});
1465     // ---------------------------------------------
1466     peerInfo->counters[IN_SUCC][MSG_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ResetACK", "Messages"}});
1467     peerInfo->counters[IN_SUCC][BYTES_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ResetACK", "Bytes"}});
1468
1469     peerInfo->counters[IN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICcontrol)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICcontrolACK", "Messages"}});
1470     peerInfo->counters[IN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICcontrol)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICcontrolACK", "Bytes"}});
1471
1472     peerInfo->counters[IN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICsubscription)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionACK", "Messages"}});
1473     peerInfo->counters[IN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICsubscription)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionACK", "Bytes"}});
1474
1475     peerInfo->counters[IN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICsubscriptionDelete)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionDeleteACK", "Messages"}});
1476     peerInfo->counters[IN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICsubscriptionDelete)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionDeleteACK", "Bytes"}});
1477     //-------------------------------------------------------------
1478
1479     peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICcontrol)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICcontrolFailure", "Messages"}});
1480     peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICcontrol)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICcontrolFailure", "Bytes"}});
1481
1482     peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICsubscription)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionFailure", "Messages"}});
1483     peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICsubscription)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionFailure", "Bytes"}});
1484
1485     peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICsubscriptionDelete)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionDeleteFailure", "Messages"}});
1486     peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICsubscriptionDelete)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionDeleteFailure", "Bytes"}});
1487
1488     //====================================================================================
1489     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_ErrorIndication)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ErrorIndication", "Messages"}});
1490     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_ErrorIndication)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ErrorIndication", "Bytes"}});
1491
1492     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ResetRequest", "Messages"}});
1493     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ResetRequest", "Bytes"}});
1494
1495     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_RICcontrol)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICcontrol", "Messages"}});
1496     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_RICcontrol)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICcontrol", "Bytes"}});
1497
1498     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_RICserviceQuery)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceQuery", "Messages"}});
1499     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_RICserviceQuery)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceQuery", "Bytes"}});
1500
1501     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_RICsubscription)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICsubscription", "Messages"}});
1502     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_RICsubscription)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICsubscription", "Bytes"}});
1503
1504     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_RICsubscriptionDelete)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICsubscriptionDelete", "Messages"}});
1505     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_RICsubscriptionDelete)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICsubscriptionDelete", "Bytes"}});
1506     //---------------------------------------------------------------------------------------------------------
1507     peerInfo->counters[OUT_SUCC][MSG_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupResponse", "Messages"}});
1508     peerInfo->counters[OUT_SUCC][BYTES_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupResponse", "Bytes"}});
1509
1510     peerInfo->counters[OUT_SUCC][MSG_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ResetACK", "Messages"}});
1511     peerInfo->counters[OUT_SUCC][BYTES_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ResetACK", "Bytes"}});
1512
1513     peerInfo->counters[OUT_SUCC][MSG_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceUpdateResponse", "Messages"}});
1514     peerInfo->counters[OUT_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceUpdateResponse", "Bytes"}});
1515     //----------------------------------------------------------------------------------------------------------------
1516     peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupRequestFailure", "Messages"}});
1517     peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupRequestFailure", "Bytes"}});
1518
1519     peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceUpdateFailure", "Messages"}});
1520     peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceUpdateFailure", "Bytes"}});
1521 }
1522 /**
1523  *
1524  * @param pdu
1525  * @param sctpMap
1526  * @param message
1527  * @param RANfunctionsAdded_v
1528  * @return
1529  */
1530 int collectSetupRequestData(E2AP_PDU_t *pdu,
1531                                      Sctp_Map_t *sctpMap,
1532                                      ReportingMessages_t &message /*, vector <string> &RANfunctionsAdded_v*/) {
1533     memset(message.peerInfo->enodbName, 0 , MAX_ENODB_NAME_SIZE);
1534     for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.E2setupRequest.protocolIEs.list.count; i++) {
1535         auto *ie = pdu->choice.initiatingMessage->value.choice.E2setupRequest.protocolIEs.list.array[i];
1536         if (ie->id == ProtocolIE_ID_id_GlobalE2node_ID) {
1537             // get the ran name for meid
1538             if (ie->value.present == E2setupRequestIEs__value_PR_GlobalE2node_ID) {
1539                 if (buildRanName(message.peerInfo->enodbName, ie) < 0) {
1540                     mdclog_write(MDCLOG_ERR, "Bad param in E2setupRequestIEs GlobalE2node_ID.\n");
1541                     // no message will be sent
1542                     return -1;
1543                 }
1544
1545                 memcpy(message.message.enodbName, message.peerInfo->enodbName, strlen(message.peerInfo->enodbName));
1546                 sctpMap->setkey(message.message.enodbName, message.peerInfo);
1547             }
1548         } /*else if (ie->id == ProtocolIE_ID_id_RANfunctionsAdded) {
1549             if (ie->value.present == E2setupRequestIEs__value_PR_RANfunctions_List) {
1550                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1551                     mdclog_write(MDCLOG_DEBUG, "Run function list have %d entries",
1552                                  ie->value.choice.RANfunctions_List.list.count);
1553                 }
1554                 if (RAN_Function_list_To_Vector(ie->value.choice.RANfunctions_List, RANfunctionsAdded_v) != 0 ) {
1555                     return -1;
1556                 }
1557             }
1558         } */
1559     }
1560 //    if (mdclog_level_get() >= MDCLOG_DEBUG) {
1561 //        mdclog_write(MDCLOG_DEBUG, "Run function vector have %ld entries",
1562 //                     RANfunctionsAdded_v.size());
1563 //    }
1564     return 0;
1565 }
1566
1567 int XML_From_PER(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) {
1568     E2AP_PDU_t *pdu = nullptr;
1569
1570     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1571         mdclog_write(MDCLOG_DEBUG, "got PER message of size %d is:%s",
1572                      rmrMessageBuffer.sendMessage->len, rmrMessageBuffer.sendMessage->payload);
1573     }
1574     auto rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void **) &pdu,
1575                            rmrMessageBuffer.sendMessage->payload, rmrMessageBuffer.sendMessage->len);
1576     if (rval.code != RC_OK) {
1577         mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) setup response  from E2MGR : %s",
1578                      rval.code,
1579                      message.message.enodbName);
1580         return -1;
1581     }
1582
1583     int buff_size = RECEIVE_XAPP_BUFFER_SIZE;
1584     auto er = asn_encode_to_buffer(nullptr, ATS_BASIC_XER, &asn_DEF_E2AP_PDU, pdu,
1585                                    rmrMessageBuffer.sendMessage->payload, buff_size);
1586     if (er.encoded == -1) {
1587         mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
1588         return -1;
1589     } else if (er.encoded > (ssize_t)buff_size) {
1590         mdclog_write(MDCLOG_ERR, "Buffer of size %d is to small for %s, at %s line %d",
1591                      (int)rmrMessageBuffer.sendMessage->len,
1592                      asn_DEF_E2AP_PDU.name,
1593                      __func__,
1594                      __LINE__);
1595         return -1;
1596     }
1597     rmrMessageBuffer.sendMessage->len = er.encoded;
1598     return 0;
1599
1600 }
1601
1602 /**
1603  *
1604  * @param pdu
1605  * @param message
1606  * @param rmrMessageBuffer
1607  */
1608 void asnInitiatingRequest(E2AP_PDU_t *pdu,
1609                           Sctp_Map_t *sctpMap,
1610                           ReportingMessages_t &message,
1611                           RmrMessagesBuffer_t &rmrMessageBuffer) {
1612     auto logLevel = mdclog_level_get();
1613     auto procedureCode = ((InitiatingMessage_t *) pdu->choice.initiatingMessage)->procedureCode;
1614     if (logLevel >= MDCLOG_DEBUG) {
1615         mdclog_write(MDCLOG_DEBUG, "Initiating message %ld\n", procedureCode);
1616     }
1617     switch (procedureCode) {
1618         case ProcedureCode_id_E2setup: {
1619             if (logLevel >= MDCLOG_DEBUG) {
1620                 mdclog_write(MDCLOG_DEBUG, "Got E2setup");
1621             }
1622
1623 //            vector <string> RANfunctionsAdded_v;
1624 //            vector <string> RANfunctionsModified_v;
1625 //            RANfunctionsAdded_v.clear();
1626 //            RANfunctionsModified_v.clear();
1627             if (collectSetupRequestData(pdu, sctpMap, message) != 0) {
1628                 break;
1629             }
1630
1631             buildPrometheusList(message.peerInfo, message.peerInfo->sctpParams->prometheusFamily);
1632
1633             string messageName("E2setupRequest");
1634             string ieName("E2setupRequestIEs");
1635             message.message.messageType = RIC_E2_SETUP_REQ;
1636             message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment();
1637             message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment((double)message.message.asnLength);
1638             buildAndSendSetupRequest(message, rmrMessageBuffer, pdu);
1639             break;
1640         }
1641         case ProcedureCode_id_RICserviceUpdate: {
1642             if (logLevel >= MDCLOG_DEBUG) {
1643                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceUpdate %s", message.message.enodbName);
1644             }
1645 //            vector <string> RANfunctionsAdded_v;
1646 //            vector <string> RANfunctionsModified_v;
1647 //            RANfunctionsAdded_v.clear();
1648 //            RANfunctionsModified_v.clear();
1649 //            if (collectServiceUpdate_RequestData(pdu, sctpMap, message,
1650 //                                                 RANfunctionsAdded_v, RANfunctionsModified_v) != 0) {
1651 //                break;
1652 //            }
1653
1654             string messageName("RICserviceUpdate");
1655             string ieName("RICserviceUpdateIEs");
1656             message.message.messageType = RIC_SERVICE_UPDATE;
1657             message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment();
1658             message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment((double)message.message.asnLength);
1659
1660             buildAndSendSetupRequest(message, rmrMessageBuffer, pdu);
1661             break;
1662         }
1663         case ProcedureCode_id_ErrorIndication: {
1664             if (logLevel >= MDCLOG_DEBUG) {
1665                 mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName);
1666             }
1667             message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication]->Increment();
1668             message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication]->Increment((double)message.message.asnLength);
1669             if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) {
1670                 mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP");
1671             }
1672             break;
1673         }
1674         case ProcedureCode_id_Reset: {
1675             if (logLevel >= MDCLOG_DEBUG) {
1676                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
1677             }
1678
1679             message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
1680             message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment((double)message.message.asnLength);
1681             if (XML_From_PER(message, rmrMessageBuffer) < 0) {
1682                 break;
1683             }
1684
1685             if (sendRequestToXapp(message, RIC_E2_RESET_REQ, rmrMessageBuffer) != 0) {
1686                 mdclog_write(MDCLOG_ERR, "RIC_E2_RESET_REQ message failed to send to xAPP");
1687             }
1688             break;
1689         }
1690         case ProcedureCode_id_RICindication: {
1691             if (logLevel >= MDCLOG_DEBUG) {
1692                 mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName);
1693             }
1694             for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
1695                 auto messageSent = false;
1696                 RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
1697                 if (logLevel >= MDCLOG_DEBUG) {
1698                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1699                 }
1700                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1701                     if (logLevel >= MDCLOG_DEBUG) {
1702                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1703                     }
1704                     if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
1705                         static unsigned char tx[32];
1706                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
1707                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1708                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1709                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1710                                        (unsigned char *)message.message.enodbName,
1711                                        strlen(message.message.enodbName));
1712                         rmrMessageBuffer.sendMessage->state = 0;
1713                         rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricInstanceID;
1714
1715                         //ie->value.choice.RICrequestID.ricInstanceID;
1716                         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1717                             mdclog_write(MDCLOG_DEBUG, "sub id = %d, mtype = %d, ric instance id %ld, requestor id = %ld",
1718                                          rmrMessageBuffer.sendMessage->sub_id,
1719                                          rmrMessageBuffer.sendMessage->mtype,
1720                                          ie->value.choice.RICrequestID.ricInstanceID,
1721                                          ie->value.choice.RICrequestID.ricRequestorID);
1722                         }
1723                         message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICindication]->Increment();
1724                         message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICindication]->Increment((double)message.message.asnLength);
1725                         sendRmrMessage(rmrMessageBuffer, message);
1726                         messageSent = true;
1727                     } else {
1728                         mdclog_write(MDCLOG_ERR, "RIC request id missing illegal request");
1729                     }
1730                 }
1731                 if (messageSent) {
1732                     break;
1733                 }
1734             }
1735             break;
1736         }
1737         default: {
1738             mdclog_write(MDCLOG_ERR, "Undefined or not supported message = %ld", procedureCode);
1739             message.message.messageType = 0; // no RMR message type yet
1740
1741             buildJsonMessage(message);
1742
1743             break;
1744         }
1745     }
1746 }
1747
1748 /**
1749  *
1750  * @param pdu
1751  * @param message
1752  * @param rmrMessageBuffer
1753  */
1754 void asnSuccessfulMsg(E2AP_PDU_t *pdu,
1755                       Sctp_Map_t *sctpMap,
1756                       ReportingMessages_t &message,
1757                       RmrMessagesBuffer_t &rmrMessageBuffer) {
1758     auto procedureCode = pdu->choice.successfulOutcome->procedureCode;
1759     auto logLevel = mdclog_level_get();
1760     if (logLevel >= MDCLOG_INFO) {
1761         mdclog_write(MDCLOG_INFO, "Successful Outcome %ld", procedureCode);
1762     }
1763     switch (procedureCode) {
1764         case ProcedureCode_id_Reset: {
1765             if (logLevel >= MDCLOG_DEBUG) {
1766                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
1767             }
1768             message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
1769             message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment((double)message.message.asnLength);
1770             if (XML_From_PER(message, rmrMessageBuffer) < 0) {
1771                 break;
1772             }
1773             if (sendRequestToXapp(message, RIC_E2_RESET_RESP, rmrMessageBuffer) != 0) {
1774                 mdclog_write(MDCLOG_ERR, "RIC_E2_RESET_RESP message failed to send to xAPP");
1775             }
1776             break;
1777         }
1778         case ProcedureCode_id_RICcontrol: {
1779             if (logLevel >= MDCLOG_DEBUG) {
1780                 mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
1781             }
1782             for (auto i = 0;
1783                  i < pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.count; i++) {
1784                 auto messageSent = false;
1785                 RICcontrolAcknowledge_IEs_t *ie = pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.array[i];
1786                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1787                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1788                 }
1789                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1790                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1791                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1792                     }
1793                     if (ie->value.present == RICcontrolAcknowledge_IEs__value_PR_RICrequestID) {
1794                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_ACK;
1795                         rmrMessageBuffer.sendMessage->state = 0;
1796 //                        rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1797                         rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricInstanceID;
1798
1799                         static unsigned char tx[32];
1800                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1801                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1802                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1803                                        (unsigned char *)message.message.enodbName,
1804                                        strlen(message.message.enodbName));
1805
1806                         message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment();
1807                         message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment((double)message.message.asnLength);
1808                         sendRmrMessage(rmrMessageBuffer, message);
1809                         messageSent = true;
1810                     } else {
1811                         mdclog_write(MDCLOG_ERR, "RIC request id missing illegal request");
1812                     }
1813                 }
1814                 if (messageSent) {
1815                     break;
1816                 }
1817             }
1818
1819             break;
1820         }
1821         case ProcedureCode_id_RICsubscription: {
1822             if (logLevel >= MDCLOG_DEBUG) {
1823                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
1824             }
1825             message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment();
1826             message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment((double)message.message.asnLength);
1827             if (sendRequestToXapp(message, RIC_SUB_RESP, rmrMessageBuffer) != 0) {
1828                 mdclog_write(MDCLOG_ERR, "Subscription successful message failed to send to xAPP");
1829             }
1830             break;
1831         }
1832         case ProcedureCode_id_RICsubscriptionDelete: {
1833             if (logLevel >= MDCLOG_DEBUG) {
1834                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
1835             }
1836             message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment();
1837             message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment((double)message.message.asnLength);
1838             if (sendRequestToXapp(message, RIC_SUB_DEL_RESP, rmrMessageBuffer) != 0) {
1839                 mdclog_write(MDCLOG_ERR, "Subscription delete successful message failed to send to xAPP");
1840             }
1841             break;
1842         }
1843         default: {
1844             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1845             message.message.messageType = 0; // no RMR message type yet
1846             buildJsonMessage(message);
1847
1848             break;
1849         }
1850     }
1851 }
1852
1853 /**
1854  *
1855  * @param pdu
1856  * @param message
1857  * @param rmrMessageBuffer
1858  */
1859 void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
1860                         Sctp_Map_t *sctpMap,
1861                         ReportingMessages_t &message,
1862                         RmrMessagesBuffer_t &rmrMessageBuffer) {
1863     auto procedureCode = pdu->choice.unsuccessfulOutcome->procedureCode;
1864     auto logLevel = mdclog_level_get();
1865     if (logLevel >= MDCLOG_INFO) {
1866         mdclog_write(MDCLOG_INFO, "Unsuccessful Outcome %ld", procedureCode);
1867     }
1868     switch (procedureCode) {
1869         case ProcedureCode_id_RICcontrol: {
1870             if (logLevel >= MDCLOG_DEBUG) {
1871                 mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
1872             }
1873             for (int i = 0;
1874                  i < pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.count; i++) {
1875                 auto messageSent = false;
1876                 RICcontrolFailure_IEs_t *ie = pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.array[i];
1877                 if (logLevel >= MDCLOG_DEBUG) {
1878                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1879                 }
1880                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1881                     if (logLevel >= MDCLOG_DEBUG) {
1882                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1883                     }
1884                     if (ie->value.present == RICcontrolFailure_IEs__value_PR_RICrequestID) {
1885                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_FAILURE;
1886                         rmrMessageBuffer.sendMessage->state = 0;
1887 //                        rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricRequestorID;
1888                         rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricInstanceID;
1889                         static unsigned char tx[32];
1890                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1891                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1892                         rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName,
1893                                        strlen(message.message.enodbName));
1894                         message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment();
1895                         message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment((double)message.message.asnLength);
1896                         sendRmrMessage(rmrMessageBuffer, message);
1897                         messageSent = true;
1898                     } else {
1899                         mdclog_write(MDCLOG_ERR, "RIC request id missing illegal request");
1900                     }
1901                 }
1902                 if (messageSent) {
1903                     break;
1904                 }
1905             }
1906             break;
1907         }
1908         case ProcedureCode_id_RICsubscription: {
1909             if (logLevel >= MDCLOG_DEBUG) {
1910                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
1911             }
1912             message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment();
1913             message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment((double)message.message.asnLength);
1914             if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer) != 0) {
1915                 mdclog_write(MDCLOG_ERR, "Subscription unsuccessful message failed to send to xAPP");
1916             }
1917             break;
1918         }
1919         case ProcedureCode_id_RICsubscriptionDelete: {
1920             if (logLevel >= MDCLOG_DEBUG) {
1921                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
1922             }
1923             message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment();
1924             message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment((double)message.message.asnLength);
1925             if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer) != 0) {
1926                 mdclog_write(MDCLOG_ERR, "Subscription Delete unsuccessful message failed to send to xAPP");
1927             }
1928             break;
1929         }
1930         default: {
1931             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1932             message.message.messageType = 0; // no RMR message type yet
1933
1934             buildJsonMessage(message);
1935
1936             break;
1937         }
1938     }
1939 }
1940
1941 /**
1942  *
1943  * @param message
1944  * @param requestId
1945  * @param rmrMmessageBuffer
1946  * @return
1947  */
1948 int sendRequestToXapp(ReportingMessages_t &message,
1949                       int requestId,
1950                       RmrMessagesBuffer_t &rmrMmessageBuffer) {
1951     rmr_bytes2meid(rmrMmessageBuffer.sendMessage,
1952                    (unsigned char *)message.message.enodbName,
1953                    strlen(message.message.enodbName));
1954     message.message.messageType = rmrMmessageBuffer.sendMessage->mtype = requestId;
1955     rmrMmessageBuffer.sendMessage->state = 0;
1956     static unsigned char tx[32];
1957     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1958     rmr_bytes2xact(rmrMmessageBuffer.sendMessage, tx, strlen((const char *) tx));
1959
1960     auto rc = sendRmrMessage(rmrMmessageBuffer, message);
1961     return rc;
1962 }
1963
1964 /**
1965  *
1966  * @param pSctpParams
1967  */
1968 void getRmrContext(sctp_params_t &pSctpParams) {
1969     pSctpParams.rmrCtx = nullptr;
1970     pSctpParams.rmrCtx = rmr_init(pSctpParams.rmrAddress, RECEIVE_XAPP_BUFFER_SIZE, RMRFL_NONE);
1971     if (pSctpParams.rmrCtx == nullptr) {
1972         mdclog_write(MDCLOG_ERR, "Failed to initialize RMR");
1973         return;
1974     }
1975
1976     rmr_set_stimeout(pSctpParams.rmrCtx, 0);    // disable retries for any send operation
1977     // we need to find that routing table exist and we can run
1978     if (mdclog_level_get() >= MDCLOG_INFO) {
1979         mdclog_write(MDCLOG_INFO, "We are after RMR INIT wait for RMR_Ready");
1980     }
1981     int rmrReady = 0;
1982     int count = 0;
1983     while (!rmrReady) {
1984         if ((rmrReady = rmr_ready(pSctpParams.rmrCtx)) == 0) {
1985             sleep(1);
1986         }
1987         count++;
1988         if (count % 60 == 0) {
1989             mdclog_write(MDCLOG_INFO, "waiting to RMR ready state for %d seconds", count);
1990         }
1991     }
1992     if (mdclog_level_get() >= MDCLOG_INFO) {
1993         mdclog_write(MDCLOG_INFO, "RMR running");
1994     }
1995     rmr_init_trace(pSctpParams.rmrCtx, 200);
1996     // get the RMR fd for the epoll
1997     pSctpParams.rmrListenFd = rmr_get_rcvfd(pSctpParams.rmrCtx);
1998     struct epoll_event event{};
1999     // add RMR fd to epoll
2000     event.events = (EPOLLIN);
2001     event.data.fd = pSctpParams.rmrListenFd;
2002     // add listening RMR FD to epoll
2003     if (epoll_ctl(pSctpParams.epoll_fd, EPOLL_CTL_ADD, pSctpParams.rmrListenFd, &event)) {
2004         mdclog_write(MDCLOG_ERR, "Failed to add RMR descriptor to epoll");
2005         close(pSctpParams.rmrListenFd);
2006         rmr_close(pSctpParams.rmrCtx);
2007         pSctpParams.rmrCtx = nullptr;
2008     }
2009 }
2010
2011 /**
2012  *
2013  * @param message
2014  * @param rmrMessageBuffer
2015  * @return
2016  */
2017 int PER_FromXML(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) {
2018     E2AP_PDU_t *pdu = nullptr;
2019
2020     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2021         mdclog_write(MDCLOG_DEBUG, "got xml Format  data from xApp of size %d is:%s",
2022                 rmrMessageBuffer.rcvMessage->len, rmrMessageBuffer.rcvMessage->payload);
2023     }
2024     auto rval = asn_decode(nullptr, ATS_BASIC_XER, &asn_DEF_E2AP_PDU, (void **) &pdu,
2025                            rmrMessageBuffer.rcvMessage->payload, rmrMessageBuffer.rcvMessage->len);
2026     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2027         mdclog_write(MDCLOG_DEBUG, "%s After  decoding the XML to PDU", __func__ );
2028     }
2029     if (rval.code != RC_OK) {
2030         mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) setup response  from E2MGR : %s",
2031                      rval.code,
2032                      message.message.enodbName);
2033         return -1;
2034     }
2035
2036     int buff_size = RECEIVE_XAPP_BUFFER_SIZE;
2037     auto er = asn_encode_to_buffer(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, pdu,
2038                                    rmrMessageBuffer.rcvMessage->payload, buff_size);
2039     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2040         mdclog_write(MDCLOG_DEBUG, "%s After encoding PDU to PER", __func__ );
2041     }
2042     if (er.encoded == -1) {
2043         mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
2044         return -1;
2045     } else if (er.encoded > (ssize_t)buff_size) {
2046         mdclog_write(MDCLOG_ERR, "Buffer of size %d is to small for %s, at %s line %d",
2047                      (int)rmrMessageBuffer.rcvMessage->len,
2048                      asn_DEF_E2AP_PDU.name,
2049                      __func__,
2050                      __LINE__);
2051         return -1;
2052     }
2053     rmrMessageBuffer.rcvMessage->len = er.encoded;
2054     return 0;
2055 }
2056
2057 /**
2058  *
2059  * @param sctpMap
2060  * @param rmrMessageBuffer
2061  * @param ts
2062  * @return
2063  */
2064 int receiveXappMessages(Sctp_Map_t *sctpMap,
2065                         RmrMessagesBuffer_t &rmrMessageBuffer,
2066                         struct timespec &ts) {
2067     int loglevel = mdclog_level_get();
2068     if (rmrMessageBuffer.rcvMessage == nullptr) {
2069         //we have error
2070         mdclog_write(MDCLOG_ERR, "RMR Allocation message, %s", strerror(errno));
2071         return -1;
2072     }
2073
2074 //    if (loglevel >= MDCLOG_DEBUG) {
2075 //        mdclog_write(MDCLOG_DEBUG, "Call to rmr_rcv_msg");
2076 //    }
2077     rmrMessageBuffer.rcvMessage = rmr_rcv_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.rcvMessage);
2078     if (rmrMessageBuffer.rcvMessage == nullptr) {
2079         mdclog_write(MDCLOG_ERR, "RMR Receiving message with null pointer, Reallocated rmr message buffer");
2080         rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2081         return -2;
2082     }
2083     ReportingMessages_t message;
2084     message.message.direction = 'D';
2085     message.message.time.tv_nsec = ts.tv_nsec;
2086     message.message.time.tv_sec = ts.tv_sec;
2087
2088     // get message payload
2089     //auto msgData = msg->payload;
2090     if (rmrMessageBuffer.rcvMessage->state != 0) {
2091         mdclog_write(MDCLOG_ERR, "RMR Receiving message with stat = %d", rmrMessageBuffer.rcvMessage->state);
2092         return -1;
2093     }
2094     rmr_get_meid(rmrMessageBuffer.rcvMessage, (unsigned char *)message.message.enodbName);
2095     message.peerInfo = (ConnectedCU_t *) sctpMap->find(message.message.enodbName);
2096     if (message.peerInfo == nullptr) {
2097         auto type = rmrMessageBuffer.rcvMessage->mtype;
2098         switch (type) {
2099             case RIC_SCTP_CLEAR_ALL:
2100             case E2_TERM_KEEP_ALIVE_REQ:
2101             case RIC_HEALTH_CHECK_REQ:
2102                 break;
2103             default:
2104                 mdclog_write(MDCLOG_ERR, "Failed to send message no CU entry %s", message.message.enodbName);
2105                 return -1;
2106         }
2107     }
2108
2109     if (rmrMessageBuffer.rcvMessage->mtype != RIC_HEALTH_CHECK_REQ) {
2110         num_of_XAPP_messages.fetch_add(1, std::memory_order_release);
2111
2112     }
2113     switch (rmrMessageBuffer.rcvMessage->mtype) {
2114         case RIC_E2_SETUP_RESP : {
2115             if (loglevel >= MDCLOG_DEBUG) {
2116                 mdclog_write(MDCLOG_DEBUG, "RIC_E2_SETUP_RESP");
2117             }
2118             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2119                 break;
2120             }
2121             message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment();
2122             message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment(rmrMessageBuffer.rcvMessage->len);
2123             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2124                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_RESP");
2125                 return -6;
2126             }
2127             break;
2128         }
2129         case RIC_E2_SETUP_FAILURE : {
2130             if (loglevel >= MDCLOG_DEBUG) {
2131                 mdclog_write(MDCLOG_DEBUG, "RIC_E2_SETUP_FAILURE");
2132             }
2133             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2134                 break;
2135             }
2136             message.peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment();
2137             message.peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment(rmrMessageBuffer.rcvMessage->len);
2138             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2139                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_FAILURE");
2140                 return -6;
2141             }
2142             break;
2143         }
2144         case RIC_ERROR_INDICATION: {
2145             if (loglevel >= MDCLOG_DEBUG) {
2146                 mdclog_write(MDCLOG_DEBUG, "RIC_ERROR_INDICATION");
2147             }
2148             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication]->Increment();
2149             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication]->Increment(rmrMessageBuffer.rcvMessage->len);
2150             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2151                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ERROR_INDICATION");
2152                 return -6;
2153             }
2154             break;
2155         }
2156         case RIC_SUB_REQ: {
2157             if (loglevel >= MDCLOG_DEBUG) {
2158                 mdclog_write(MDCLOG_DEBUG, "RIC_SUB_REQ");
2159             }
2160             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment();
2161             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment(rmrMessageBuffer.rcvMessage->len);
2162             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2163                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_REQ");
2164                 return -6;
2165             }
2166             break;
2167         }
2168         case RIC_SUB_DEL_REQ: {
2169             if (loglevel >= MDCLOG_DEBUG) {
2170                 mdclog_write(MDCLOG_DEBUG, "RIC_SUB_DEL_REQ");
2171             }
2172             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment();
2173             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment(rmrMessageBuffer.rcvMessage->len);
2174             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2175                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_DEL_REQ");
2176                 return -6;
2177             }
2178             break;
2179         }
2180         case RIC_CONTROL_REQ: {
2181             if (loglevel >= MDCLOG_DEBUG) {
2182                 mdclog_write(MDCLOG_DEBUG, "RIC_CONTROL_REQ");
2183             }
2184             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment();
2185             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment(rmrMessageBuffer.rcvMessage->len);
2186             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2187                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_CONTROL_REQ");
2188                 return -6;
2189             }
2190             break;
2191         }
2192         case RIC_SERVICE_QUERY: {
2193             if (loglevel >= MDCLOG_DEBUG) {
2194                 mdclog_write(MDCLOG_DEBUG, "RIC_SERVICE_QUERY");
2195             }
2196             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2197                 break;
2198             }
2199             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceQuery]->Increment();
2200             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceQuery]->Increment(rmrMessageBuffer.rcvMessage->len);
2201             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2202                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_QUERY");
2203                 return -6;
2204             }
2205             break;
2206         }
2207         case RIC_SERVICE_UPDATE_ACK: {
2208             if (loglevel >= MDCLOG_DEBUG) {
2209                 mdclog_write(MDCLOG_DEBUG, "RIC_SERVICE_UPDATE_ACK");
2210             }
2211             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2212                 mdclog_write(MDCLOG_ERR, "error in PER_FromXML");
2213                 break;
2214             }
2215             message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment();
2216             message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment(rmrMessageBuffer.rcvMessage->len);
2217             if (loglevel >= MDCLOG_DEBUG) {
2218                 mdclog_write(MDCLOG_DEBUG, "Before sending to CU");
2219             }
2220             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2221                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_ACK");
2222                 return -6;
2223             }
2224             break;
2225         }
2226         case RIC_SERVICE_UPDATE_FAILURE: {
2227             if (loglevel >= MDCLOG_DEBUG) {
2228                 mdclog_write(MDCLOG_DEBUG, "RIC_SERVICE_UPDATE_FAILURE");
2229             }
2230             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2231                 break;
2232             }
2233             message.peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment();
2234             message.peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment(rmrMessageBuffer.rcvMessage->len);
2235             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2236                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_FAILURE");
2237                 return -6;
2238             }
2239             break;
2240         }
2241         case RIC_E2_RESET_REQ: {
2242             if (loglevel >= MDCLOG_DEBUG) {
2243                 mdclog_write(MDCLOG_DEBUG, "RIC_E2_RESET_REQ");
2244             }
2245             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2246                 break;
2247             }
2248             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
2249             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment(rmrMessageBuffer.rcvMessage->len);
2250             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2251                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_RESET");
2252                 return -6;
2253             }
2254             break;
2255         }
2256         case RIC_E2_RESET_RESP: {
2257             if (loglevel >= MDCLOG_DEBUG) {
2258                 mdclog_write(MDCLOG_DEBUG, "RIC_E2_RESET_RESP");
2259             }
2260             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2261                 break;
2262             }
2263             message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
2264             message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment(rmrMessageBuffer.rcvMessage->len);
2265             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2266                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_RESET_RESP");
2267                 return -6;
2268             }
2269             break;
2270         }
2271         case RIC_SCTP_CLEAR_ALL: {
2272             mdclog_write(MDCLOG_INFO, "RIC_SCTP_CLEAR_ALL");
2273             // loop on all keys and close socket and then erase all map.
2274             vector<char *> v;
2275             sctpMap->getKeys(v);
2276             for (auto const &iter : v) { //}; iter != sctpMap.end(); iter++) {
2277                 if (!boost::starts_with((string) (iter), "host:") && !boost::starts_with((string) (iter), "msg:")) {
2278                     auto *peerInfo = (ConnectedCU_t *) sctpMap->find(iter);
2279                     if (peerInfo == nullptr) {
2280                         continue;
2281                     }
2282                     close(peerInfo->fileDescriptor);
2283                     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
2284                     message.message.direction = 'D';
2285                     message.message.time.tv_nsec = ts.tv_nsec;
2286                     message.message.time.tv_sec = ts.tv_sec;
2287
2288                     message.message.asnLength = rmrMessageBuffer.sendMessage->len =
2289                             snprintf((char *)rmrMessageBuffer.sendMessage->payload,
2290                                      256,
2291                                      "%s|RIC_SCTP_CLEAR_ALL",
2292                                      peerInfo->enodbName);
2293                     message.message.asndata = rmrMessageBuffer.sendMessage->payload;
2294                     mdclog_write(MDCLOG_INFO, "%s", message.message.asndata);
2295                     if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
2296                         mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
2297                     }
2298                     free(peerInfo);
2299                 }
2300             }
2301
2302             sleep(1);
2303             sctpMap->clear();
2304             break;
2305         }
2306         case E2_TERM_KEEP_ALIVE_REQ: {
2307             // send message back
2308             rmr_bytes2payload(rmrMessageBuffer.sendMessage,
2309                               (unsigned char *)rmrMessageBuffer.ka_message,
2310                               rmrMessageBuffer.ka_message_len);
2311             rmrMessageBuffer.sendMessage->mtype = E2_TERM_KEEP_ALIVE_RESP;
2312             rmrMessageBuffer.sendMessage->state = 0;
2313             static unsigned char tx[32];
2314             auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2315             rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, txLen);
2316             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2317             if (rmrMessageBuffer.sendMessage == nullptr) {
2318                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2319                 mdclog_write(MDCLOG_ERR, "Failed to send E2_TERM_KEEP_ALIVE_RESP RMR message returned NULL");
2320             } else if (rmrMessageBuffer.sendMessage->state != 0)  {
2321                 mdclog_write(MDCLOG_ERR, "Failed to send E2_TERM_KEEP_ALIVE_RESP, on RMR state = %d ( %s)",
2322                              rmrMessageBuffer.sendMessage->state, translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str());
2323             } else if (loglevel >= MDCLOG_DEBUG) {
2324                 mdclog_write(MDCLOG_DEBUG, "Got Keep Alive Request send : %s", rmrMessageBuffer.ka_message);
2325             }
2326
2327             break;
2328         }
2329         case RIC_HEALTH_CHECK_REQ: {
2330             static int counter = 0;
2331             // send message back
2332             rmr_bytes2payload(rmrMessageBuffer.rcvMessage,
2333                               (unsigned char *)"OK",
2334                               2);
2335             rmrMessageBuffer.rcvMessage->mtype = RIC_HEALTH_CHECK_RESP;
2336             rmrMessageBuffer.rcvMessage->state = 0;
2337             static unsigned char tx[32];
2338             auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2339             rmr_bytes2xact(rmrMessageBuffer.rcvMessage, tx, txLen);
2340             rmrMessageBuffer.rcvMessage = rmr_rts_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.rcvMessage);
2341             //rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2342             if (rmrMessageBuffer.rcvMessage == nullptr) {
2343                 rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2344                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_HEALTH_CHECK_RESP RMR message returned NULL");
2345             } else if (rmrMessageBuffer.rcvMessage->state != 0)  {
2346                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_HEALTH_CHECK_RESP, on RMR state = %d ( %s)",
2347                              rmrMessageBuffer.rcvMessage->state, translateRmrErrorMessages(rmrMessageBuffer.rcvMessage->state).c_str());
2348             } else if (loglevel >= MDCLOG_DEBUG && ++counter % 100 == 0) {
2349                 mdclog_write(MDCLOG_DEBUG, "Got %d RIC_HEALTH_CHECK_REQ Request send : OK", counter);
2350             }
2351
2352             break;
2353         }
2354
2355         default:
2356             mdclog_write(MDCLOG_WARN, "Message Type : %d is not supported", rmrMessageBuffer.rcvMessage->mtype);
2357             message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
2358             message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
2359             message.message.time.tv_nsec = ts.tv_nsec;
2360             message.message.time.tv_sec = ts.tv_sec;
2361             message.message.messageType = rmrMessageBuffer.rcvMessage->mtype;
2362
2363             buildJsonMessage(message);
2364
2365
2366             return -7;
2367     }
2368     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2369         mdclog_write(MDCLOG_DEBUG, "EXIT OK from %s", __FUNCTION__);
2370     }
2371     return 0;
2372 }
2373
2374 /**
2375  * Send message to the CU that is not expecting for successful or unsuccessful results
2376  * @param messageBuffer
2377  * @param message
2378  * @param failedMsgId
2379  * @param sctpMap
2380  * @return
2381  */
2382 int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer,
2383                            ReportingMessages_t &message,
2384                            int failedMsgId,
2385                            Sctp_Map_t *sctpMap) {
2386     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2387         mdclog_write(MDCLOG_DEBUG, "send message: %d to %s address", message.message.messageType, message.message.enodbName);
2388     }
2389
2390     getRequestMetaData(message, messageBuffer);
2391     if (mdclog_level_get() >= MDCLOG_INFO) {
2392         mdclog_write(MDCLOG_INFO, "send message to %s address", message.message.enodbName);
2393     }
2394
2395     auto rc = sendMessagetoCu(sctpMap, messageBuffer, message, failedMsgId);
2396     return rc;
2397 }
2398
2399 /**
2400  *
2401  * @param sctpMap
2402  * @param messageBuffer
2403  * @param message
2404  * @param failedMesgId
2405  * @return
2406  */
2407 int sendMessagetoCu(Sctp_Map_t *sctpMap,
2408                     RmrMessagesBuffer_t &messageBuffer,
2409                     ReportingMessages_t &message,
2410                     int failedMesgId) {
2411     // get the FD
2412     message.message.messageType = messageBuffer.rcvMessage->mtype;
2413     auto rc = sendSctpMsg(message.peerInfo, message, sctpMap);
2414     return rc;
2415 }
2416
2417
2418 /**
2419  *
2420  * @param epoll_fd
2421  * @param peerInfo
2422  * @param events
2423  * @param sctpMap
2424  * @param enodbName
2425  * @param msgType
2426  * @return
2427  */
2428 int addToEpoll(int epoll_fd,
2429                ConnectedCU_t *peerInfo,
2430                uint32_t events,
2431                Sctp_Map_t *sctpMap,
2432                char *enodbName,
2433                int msgType) {
2434     // Add to Epol
2435     struct epoll_event event{};
2436     event.data.ptr = peerInfo;
2437     event.events = events;
2438     if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peerInfo->fileDescriptor, &event) < 0) {
2439         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2440             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_ADD (may check not to quit here), %s, %s %d",
2441                          strerror(errno), __func__, __LINE__);
2442         }
2443         close(peerInfo->fileDescriptor);
2444         if (enodbName != nullptr) {
2445             cleanHashEntry(peerInfo, sctpMap);
2446             char key[MAX_ENODB_NAME_SIZE * 2];
2447             snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2448             if (mdclog_level_get() >= MDCLOG_DEBUG) {
2449                 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2450             }
2451             auto tmp = sctpMap->find(key);
2452             if (tmp) {
2453                 free(tmp);
2454                 sctpMap->erase(key);
2455             }
2456         } else {
2457             peerInfo->enodbName[0] = 0;
2458         }
2459         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may check not to quit here)");
2460         return -1;
2461     }
2462     return 0;
2463 }
2464
2465 /**
2466  *
2467  * @param epoll_fd
2468  * @param peerInfo
2469  * @param events
2470  * @param sctpMap
2471  * @param enodbName
2472  * @param msgType
2473  * @return
2474  */
2475 int modifyToEpoll(int epoll_fd,
2476                   ConnectedCU_t *peerInfo,
2477                   uint32_t events,
2478                   Sctp_Map_t *sctpMap,
2479                   char *enodbName,
2480                   int msgType) {
2481     // Add to Epol
2482     struct epoll_event event{};
2483     event.data.ptr = peerInfo;
2484     event.events = events;
2485     if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, peerInfo->fileDescriptor, &event) < 0) {
2486         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2487             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_MOD (may check not to quit here), %s, %s %d",
2488                          strerror(errno), __func__, __LINE__);
2489         }
2490         close(peerInfo->fileDescriptor);
2491         cleanHashEntry(peerInfo, sctpMap);
2492         char key[MAX_ENODB_NAME_SIZE * 2];
2493         snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2494         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2495             mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2496         }
2497         auto tmp = sctpMap->find(key);
2498         if (tmp) {
2499             free(tmp);
2500         }
2501         sctpMap->erase(key);
2502         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may check not to quit here)");
2503         return -1;
2504     }
2505     return 0;
2506 }
2507
2508
2509 int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message) {
2510     buildJsonMessage(message);
2511
2512     rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2513
2514     if (rmrMessageBuffer.sendMessage == nullptr) {
2515         rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2516         mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2517         return -1;
2518     }
2519
2520     if (rmrMessageBuffer.sendMessage->state != 0) {
2521         char meid[RMR_MAX_MEID]{};
2522         if (rmrMessageBuffer.sendMessage->state == RMR_ERR_RETRY) {
2523             usleep(5);
2524             rmrMessageBuffer.sendMessage->state = 0;
2525             mdclog_write(MDCLOG_INFO, "RETRY sending Message type %d to Xapp from %s",
2526                          rmrMessageBuffer.sendMessage->mtype,
2527                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2528             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2529             if (rmrMessageBuffer.sendMessage == nullptr) {
2530                 mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2531                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2532                 return -1;
2533             } else if (rmrMessageBuffer.sendMessage->state != 0) {
2534                 mdclog_write(MDCLOG_ERR,
2535                              "Message state %s while sending request %d to Xapp from %s after retry of 10 microseconds",
2536                              translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2537                              rmrMessageBuffer.sendMessage->mtype,
2538                              rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2539                 auto rc = rmrMessageBuffer.sendMessage->state;
2540                 return rc;
2541             }
2542         } else {
2543             mdclog_write(MDCLOG_ERR, "Message state %s while sending request %d to Xapp from %s",
2544                          translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2545                          rmrMessageBuffer.sendMessage->mtype,
2546                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2547             return rmrMessageBuffer.sendMessage->state;
2548         }
2549     }
2550     return 0;
2551 }
2552
2553 void buildJsonMessage(ReportingMessages_t &message) {
2554     if (jsonTrace) {
2555         message.outLen = sizeof(message.base64Data);
2556         base64::encode((const unsigned char *) message.message.asndata,
2557                        (const int) message.message.asnLength,
2558                        message.base64Data,
2559                        message.outLen);
2560         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2561             mdclog_write(MDCLOG_DEBUG, "Tracing: ASN length = %d, base64 message length = %d ",
2562                          (int) message.message.asnLength,
2563                          (int) message.outLen);
2564         }
2565
2566         snprintf(message.buffer, sizeof(message.buffer),
2567                  "{\"header\": {\"ts\": \"%ld.%09ld\","
2568                  "\"ranName\": \"%s\","
2569                  "\"messageType\": %d,"
2570                  "\"direction\": \"%c\"},"
2571                  "\"base64Length\": %d,"
2572                  "\"asnBase64\": \"%s\"}",
2573                  message.message.time.tv_sec,
2574                  message.message.time.tv_nsec,
2575                  message.message.enodbName,
2576                  message.message.messageType,
2577                  message.message.direction,
2578                  (int) message.outLen,
2579                  message.base64Data);
2580         static src::logger_mt &lg = my_logger::get();
2581
2582         BOOST_LOG(lg) << message.buffer;
2583     }
2584 }
2585
2586
2587 /**
2588  * take RMR error code to string
2589  * @param state
2590  * @return
2591  */
2592 string translateRmrErrorMessages(int state) {
2593     string str = {};
2594     switch (state) {
2595         case RMR_OK:
2596             str = "RMR_OK - state is good";
2597             break;
2598         case RMR_ERR_BADARG:
2599             str = "RMR_ERR_BADARG - argument passed to function was unusable";
2600             break;
2601         case RMR_ERR_NOENDPT:
2602             str = "RMR_ERR_NOENDPT - send//call could not find an endpoint based on msg type";
2603             break;
2604         case RMR_ERR_EMPTY:
2605             str = "RMR_ERR_EMPTY - msg received had no payload; attempt to send an empty message";
2606             break;
2607         case RMR_ERR_NOHDR:
2608             str = "RMR_ERR_NOHDR - message didn't contain a valid header";
2609             break;
2610         case RMR_ERR_SENDFAILED:
2611             str = "RMR_ERR_SENDFAILED - send failed; errno has nano reason";
2612             break;
2613         case RMR_ERR_CALLFAILED:
2614             str = "RMR_ERR_CALLFAILED - unable to send call() message";
2615             break;
2616         case RMR_ERR_NOWHOPEN:
2617             str = "RMR_ERR_NOWHOPEN - no wormholes are open";
2618             break;
2619         case RMR_ERR_WHID:
2620             str = "RMR_ERR_WHID - wormhole id was invalid";
2621             break;
2622         case RMR_ERR_OVERFLOW:
2623             str = "RMR_ERR_OVERFLOW - operation would have busted through a buffer/field size";
2624             break;
2625         case RMR_ERR_RETRY:
2626             str = "RMR_ERR_RETRY - request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)";
2627             break;
2628         case RMR_ERR_RCVFAILED:
2629             str = "RMR_ERR_RCVFAILED - receive failed (hard error)";
2630             break;
2631         case RMR_ERR_TIMEOUT:
2632             str = "RMR_ERR_TIMEOUT - message processing call timed out";
2633             break;
2634         case RMR_ERR_UNSET:
2635             str = "RMR_ERR_UNSET - the message hasn't been populated with a transport buffer";
2636             break;
2637         case RMR_ERR_TRUNC:
2638             str = "RMR_ERR_TRUNC - received message likely truncated";
2639             break;
2640         case RMR_ERR_INITFAILED:
2641             str = "RMR_ERR_INITFAILED - initialisation of something (probably message) failed";
2642             break;
2643         case RMR_ERR_NOTSUPP:
2644             str = "RMR_ERR_NOTSUPP - the request is not supported, or RMr was not initialised for the request";
2645             break;
2646         default:
2647             char buf[128]{};
2648             snprintf(buf, sizeof buf, "UNDOCUMENTED RMR_ERR : %d", state);
2649             str = buf;
2650             break;
2651     }
2652     return str;
2653 }