Taking RMR v4.5.2 to use
[ric-plt/e2.git] / RIC-E2-TERMINATION / sctpThread.cpp
1 // Copyright 2019 AT&T Intellectual Property
2 // Copyright 2019 Nokia
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //      http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15
16 //  This source code is part of the near-RT RIC (RAN Intelligent Controller)
17 //  platform project (RICP).
18
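// E2 Termination entry point: loads the configuration file, sets up the SCTP
// listening socket, the RMR context and an inotify watch on the configuration
// directory, registers them with one epoll instance and runs the listener
// threads that dispatch the resulting events.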
20
21
22
23 #include <3rdparty/oranE2/RANfunctions-List.h>
24 #include "sctpThread.h"
25 #include "BuildRunName.h"
26
27 //#include "3rdparty/oranE2SM/E2SM-gNB-NRT-RANfunction-Definition.h"
28 //#include "BuildXml.h"
29 //#include "pugixml/src/pugixml.hpp"
30
31 using namespace std;
32 //using namespace std::placeholders;
33 using namespace boost::filesystem;
34 using namespace prometheus;
35
36
37 //#ifdef __cplusplus
38 //extern "C"
39 //{
40 //#endif
41
// gcov's flush hook is declared by hand so the gcov header does not have to be included.
extern "C" void __gcov_flush(void);

/**
 * Signal handler installed by main() for SIGINT, SIGABRT and SIGTERM.
 * Flushes the gcov coverage counters and then terminates the process,
 * using the delivered signal number as the exit status.
 *
 * @param signal number of the signal that was delivered
 */
static void catch_function(int signal) {
    __gcov_flush();
    exit(signal);
}
49
50
51 BOOST_LOG_INLINE_GLOBAL_LOGGER_DEFAULT(my_logger, src::logger_mt)
52
53 boost::shared_ptr<sinks::synchronous_sink<sinks::text_file_backend>> boostLogger;
54 double cpuClock = 0.0;
55 bool jsonTrace = false;
56
57 void init_log() {
58     mdclog_attr_t *attr;
59     mdclog_attr_init(&attr);
60     mdclog_attr_set_ident(attr, "E2Terminator");
61     mdclog_init(attr);
62     mdclog_attr_destroy(attr);
63 }
// Reference instant captured during static initialisation; age() is measured from it.
auto start_time = std::chrono::high_resolution_clock::now();
// Floating-point duration whose tick is one second.
using seconds_t = std::chrono::duration<double, std::ratio<1, 1>>;

/**
 * @return seconds (with fractional part) elapsed since start_time was captured
 */
double age() {
    const auto sinceStart = std::chrono::high_resolution_clock::now() - start_time;
    return std::chrono::duration_cast<seconds_t>(sinceStart).count();
}
70
71 double approx_CPU_MHz(unsigned sleepTime) {
72     using namespace std::chrono_literals;
73     uint32_t aux = 0;
74     uint64_t cycles_start = rdtscp(aux);
75     double time_start = age();
76     std::this_thread::sleep_for(sleepTime * 1ms);
77     uint64_t elapsed_cycles = rdtscp(aux) - cycles_start;
78     double elapsed_time = age() - time_start;
79     return elapsed_cycles / elapsed_time;
80 }
81
//std::atomic<int64_t> rmrCounter{0};
// Incremented by the listener threads for every RMR or SCTP data event they handle.
std::atomic<int64_t> num_of_messages{0};
// Polled by handleTermInit(); a value above zero stops the E2_TERM_INIT retries.
std::atomic<int64_t> num_of_XAPP_messages{0};
// Source of RMR transaction ids; main() seeds it with a random value.
static long transactionCounter = 0;
86
87 int buildListeningPort(sctp_params_t &sctpParams) {
88     sctpParams.listenFD = socket(AF_INET6, SOCK_STREAM, IPPROTO_SCTP);
89     if (sctpParams.listenFD <= 0) {
90         mdclog_write(MDCLOG_ERR, "Error Opening socket, %s", strerror(errno));
91         return -1;
92     }
93
94     struct sockaddr_in6 serverAddress {};
95     serverAddress.sin6_family = AF_INET6;
96     serverAddress.sin6_addr   = in6addr_any;
97     serverAddress.sin6_port = htons(sctpParams.sctpPort);
98     if (bind(sctpParams.listenFD, (SA *)&serverAddress, sizeof(serverAddress)) < 0 ) {
99         mdclog_write(MDCLOG_ERR, "Error binding port %d. %s", sctpParams.sctpPort, strerror(errno));
100         return -1;
101     }
102     if (setSocketNoBlocking(sctpParams.listenFD) == -1) {
103         //mdclog_write(MDCLOG_ERR, "Error binding. %s", strerror(errno));
104         return -1;
105     }
106     if (mdclog_level_get() >= MDCLOG_DEBUG) {
107         struct sockaddr_in6 clientAddress {};
108         socklen_t len = sizeof(clientAddress);
109         getsockname(sctpParams.listenFD, (SA *)&clientAddress, &len);
110         char buff[1024] {};
111         inet_ntop(AF_INET6, &clientAddress.sin6_addr, buff, sizeof(buff));
112         mdclog_write(MDCLOG_DEBUG, "My address: %s, port %d\n", buff, htons(clientAddress.sin6_port));
113     }
114
115     if (listen(sctpParams.listenFD, SOMAXCONN) < 0) {
116         mdclog_write(MDCLOG_ERR, "Error listening. %s\n", strerror(errno));
117         return -1;
118     }
119     struct epoll_event event {};
120     event.events = EPOLLIN | EPOLLET;
121     event.data.fd = sctpParams.listenFD;
122
123     // add listening port to epoll
124     if (epoll_ctl(sctpParams.epoll_fd, EPOLL_CTL_ADD, sctpParams.listenFD, &event)) {
125         printf("Failed to add descriptor to epoll\n");
126         mdclog_write(MDCLOG_ERR, "Failed to add descriptor to epoll. %s\n", strerror(errno));
127         return -1;
128     }
129
130     return 0;
131 }
132
133 int buildConfiguration(sctp_params_t &sctpParams) {
134     path p = (sctpParams.configFilePath + "/" + sctpParams.configFileName).c_str();
135     if (exists(p)) {
136         const int size = 2048;
137         auto fileSize = file_size(p);
138         if (fileSize > size) {
139             mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size);
140             return -1;
141         }
142     } else {
143         mdclog_write(MDCLOG_ERR, "Configuration File %s not exists", p.string().c_str());
144         return -1;
145     }
146
147     ReadConfigFile conf;
148     if (conf.openConfigFile(p.string()) == -1) {
149         mdclog_write(MDCLOG_ERR, "Filed to open config file %s, %s",
150                      p.string().c_str(), strerror(errno));
151         return -1;
152     }
153     int rmrPort = conf.getIntValue("nano");
154     if (rmrPort == -1) {
155         mdclog_write(MDCLOG_ERR, "illegal RMR port ");
156         return -1;
157     }
158     sctpParams.rmrPort = (uint16_t)rmrPort;
159     snprintf(sctpParams.rmrAddress, sizeof(sctpParams.rmrAddress), "%d", (int) (sctpParams.rmrPort));
160
161     auto tmpStr = conf.getStringValue("loglevel");
162     if (tmpStr.length() == 0) {
163         mdclog_write(MDCLOG_ERR, "illegal loglevel. Set loglevel to MDCLOG_INFO");
164         tmpStr = "info";
165     }
166     transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
167
168     if ((tmpStr.compare("debug")) == 0) {
169         sctpParams.logLevel = MDCLOG_DEBUG;
170     } else if ((tmpStr.compare("info")) == 0) {
171         sctpParams.logLevel = MDCLOG_INFO;
172     } else if ((tmpStr.compare("warning")) == 0) {
173         sctpParams.logLevel = MDCLOG_WARN;
174     } else if ((tmpStr.compare("error")) == 0) {
175         sctpParams.logLevel = MDCLOG_ERR;
176     } else {
177         mdclog_write(MDCLOG_ERR, "illegal loglevel = %s. Set loglevel to MDCLOG_INFO", tmpStr.c_str());
178         sctpParams.logLevel = MDCLOG_INFO;
179     }
180     mdclog_level_set(sctpParams.logLevel);
181
182     tmpStr = conf.getStringValue("volume");
183     if (tmpStr.length() == 0) {
184         mdclog_write(MDCLOG_ERR, "illegal volume.");
185         return -1;
186     }
187
188     char tmpLogFilespec[VOLUME_URL_SIZE];
189     tmpLogFilespec[0] = 0;
190     sctpParams.volume[0] = 0;
191     snprintf(sctpParams.volume, VOLUME_URL_SIZE, "%s", tmpStr.c_str());
192     // copy the name to temp file as well
193     snprintf(tmpLogFilespec, VOLUME_URL_SIZE, "%s", tmpStr.c_str());
194
195
196     // define the file name in the tmp directory under the volume
197     strcat(tmpLogFilespec,"/tmp/E2Term_%Y-%m-%d_%H-%M-%S.%N.tmpStr");
198
199     sctpParams.myIP = conf.getStringValue("local-ip");
200     if (sctpParams.myIP.length() == 0) {
201         mdclog_write(MDCLOG_ERR, "illegal local-ip.");
202         return -1;
203     }
204
205     int sctpPort = conf.getIntValue("sctp-port");
206     if (sctpPort == -1) {
207         mdclog_write(MDCLOG_ERR, "illegal SCTP port ");
208         return -1;
209     }
210     sctpParams.sctpPort = (uint16_t)sctpPort;
211
212     sctpParams.fqdn = conf.getStringValue("external-fqdn");
213     if (sctpParams.fqdn.length() == 0) {
214         mdclog_write(MDCLOG_ERR, "illegal external-fqdn");
215         return -1;
216     }
217
218     std::string pod = conf.getStringValue("pod_name");
219     if (pod.length() == 0) {
220         mdclog_write(MDCLOG_ERR, "illegal pod_name in config file");
221         return -1;
222     }
223     auto *podName = getenv(pod.c_str());
224     if (podName == nullptr) {
225         mdclog_write(MDCLOG_ERR, "illegal pod_name or environment variable not exists : %s", pod.c_str());
226         return -1;
227
228     } else {
229         sctpParams.podName.assign(podName);
230         if (sctpParams.podName.length() == 0) {
231             mdclog_write(MDCLOG_ERR, "illegal pod_name");
232             return -1;
233         }
234     }
235
236     tmpStr = conf.getStringValue("trace");
237     transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
238     if ((tmpStr.compare("start")) == 0) {
239         mdclog_write(MDCLOG_INFO, "Trace set to: start");
240         sctpParams.trace = true;
241     } else if ((tmpStr.compare("stop")) == 0) {
242         mdclog_write(MDCLOG_INFO, "Trace set to: stop");
243         sctpParams.trace = false;
244     } else {
245         mdclog_write(MDCLOG_ERR, "Trace was set to wrong value %s, set to stop", tmpStr.c_str());
246         sctpParams.trace = false;
247     }
248     jsonTrace = sctpParams.trace;
249
250     sctpParams.epollTimeOut = -1;
251
252     tmpStr = conf.getStringValue("prometheusPort");
253     if (tmpStr.length() != 0) {
254         sctpParams.prometheusPort = tmpStr;
255     }
256
257     sctpParams.ka_message_length = snprintf(sctpParams.ka_message, KA_MESSAGE_SIZE, "{\"address\": \"%s:%d\","
258                                                                                     "\"fqdn\": \"%s\","
259                                                                                     "\"pod_name\": \"%s\"}",
260                                             (const char *)sctpParams.myIP.c_str(),
261                                             sctpParams.rmrPort,
262                                             sctpParams.fqdn.c_str(),
263                                             sctpParams.podName.c_str());
264
265     if (mdclog_level_get() >= MDCLOG_INFO) {
266         mdclog_mdc_add("RMR Port", to_string(sctpParams.rmrPort).c_str());
267         mdclog_mdc_add("LogLevel", to_string(sctpParams.logLevel).c_str());
268         mdclog_mdc_add("volume", sctpParams.volume);
269         mdclog_mdc_add("tmpLogFilespec", tmpLogFilespec);
270         mdclog_mdc_add("my ip", sctpParams.myIP.c_str());
271         mdclog_mdc_add("pod name", sctpParams.podName.c_str());
272
273         mdclog_write(MDCLOG_INFO, "running parameters for instance : %s", sctpParams.ka_message);
274     }
275     mdclog_mdc_clean();
276
277     // Files written to the current working directory
278     boostLogger = logging::add_file_log(
279             keywords::file_name = tmpLogFilespec, // to temp directory
280             keywords::rotation_size = 10 * 1024 * 1024,
281             keywords::time_based_rotation = sinks::file::rotation_at_time_interval(posix_time::hours(1)),
282             keywords::format = "%Message%"
283             //keywords::format = "[%TimeStamp%]: %Message%" // use each tmpStr with time stamp
284     );
285
286     // Setup a destination folder for collecting rotated (closed) files --since the same volume can use rename()
287     boostLogger->locked_backend()->set_file_collector(sinks::file::make_collector(
288             keywords::target = sctpParams.volume
289     ));
290
291     // Upon restart, scan the directory for files matching the file_name pattern
292     boostLogger->locked_backend()->scan_for_files();
293
294     // Enable auto-flushing after each tmpStr record written
295     if (mdclog_level_get() >= MDCLOG_DEBUG) {
296         boostLogger->locked_backend()->auto_flush(true);
297     }
298
299     return 0;
300 }
301
302 void startPrometheus(sctp_params_t &sctpParams) {
303     sctpParams.prometheusFamily = &BuildCounter()
304             .Name("E2T")
305             .Help("E2T message counter")
306             .Labels({{"POD_NAME", sctpParams.podName}})
307             .Register(*sctpParams.prometheusRegistry);
308
309     string prometheusPath = sctpParams.prometheusPort + "," + "[::]:" + sctpParams.prometheusPort;
310     if (mdclog_level_get() >= MDCLOG_DEBUG) {
311         mdclog_write(MDCLOG_DEBUG, "Start Prometheus Pull mode on %s", prometheusPath.c_str());
312     }
313     sctpParams.prometheusExposer = new Exposer(prometheusPath, 1);
314     sctpParams.prometheusExposer->RegisterCollectable(sctpParams.prometheusRegistry);
315 }
316
317 int main(const int argc, char **argv) {
318     sctp_params_t sctpParams;
319
320     {
321         std::random_device device{};
322         std::mt19937 generator(device());
323         std::uniform_int_distribution<long> distribution(1, (long) 1e12);
324         transactionCounter = distribution(generator);
325     }
326
327 //    uint64_t st = 0;
328 //    uint32_t aux1 = 0;
329 //   st = rdtscp(aux1);
330
331     unsigned num_cpus = std::thread::hardware_concurrency();
332     init_log();
333     mdclog_level_set(MDCLOG_INFO);
334
335     if (std::signal(SIGINT, catch_function) == SIG_ERR) {
336         mdclog_write(MDCLOG_ERR, "Error initializing SIGINT");
337         exit(1);
338     }
339     if (std::signal(SIGABRT, catch_function)== SIG_ERR) {
340         mdclog_write(MDCLOG_ERR, "Error initializing SIGABRT");
341         exit(1);
342     }
343     if (std::signal(SIGTERM, catch_function)== SIG_ERR) {
344         mdclog_write(MDCLOG_ERR, "Error initializing SIGTERM");
345         exit(1);
346     }
347
348     cpuClock = approx_CPU_MHz(100);
349
350     mdclog_write(MDCLOG_DEBUG, "CPU speed %11.11f", cpuClock);
351
352     auto result = parse(argc, argv, sctpParams);
353
354     if (buildConfiguration(sctpParams) != 0) {
355         exit(-1);
356     }
357
358     //auto registry = std::make_shared<Registry>();
359     sctpParams.prometheusRegistry = std::make_shared<Registry>();
360
361     //sctpParams.prometheusFamily = new Family<Counter>("E2T", "E2T message counter", {{"E", sctpParams.podName}});
362
363     startPrometheus(sctpParams);
364
365     // start epoll
366     sctpParams.epoll_fd = epoll_create1(0);
367     if (sctpParams.epoll_fd == -1) {
368         mdclog_write(MDCLOG_ERR, "failed to open epoll descriptor");
369         exit(-1);
370     }
371
372     getRmrContext(sctpParams);
373     if (sctpParams.rmrCtx == nullptr) {
374         close(sctpParams.epoll_fd);
375         exit(-1);
376     }
377
378     if (buildInotify(sctpParams) == -1) {
379         close(sctpParams.rmrListenFd);
380         rmr_close(sctpParams.rmrCtx);
381         close(sctpParams.epoll_fd);
382         exit(-1);
383     }
384
385     if (buildListeningPort(sctpParams) != 0) {
386         close(sctpParams.rmrListenFd);
387         rmr_close(sctpParams.rmrCtx);
388         close(sctpParams.epoll_fd);
389         exit(-1);
390     }
391
392     sctpParams.sctpMap = new mapWrapper();
393
394     std::vector<std::thread> threads(num_cpus);
395 //    std::vector<std::thread> threads;
396
397     num_cpus = 3;
398     for (unsigned int i = 0; i < num_cpus; i++) {
399         threads[i] = std::thread(listener, &sctpParams);
400
401         cpu_set_t cpuset;
402         CPU_ZERO(&cpuset);
403         CPU_SET(i, &cpuset);
404         int rc = pthread_setaffinity_np(threads[i].native_handle(), sizeof(cpu_set_t), &cpuset);
405         if (rc != 0) {
406             mdclog_write(MDCLOG_ERR, "Error calling pthread_setaffinity_np: %d", rc);
407         }
408     }
409
410
411     //loop over term_init until first message from xApp
412     handleTermInit(sctpParams);
413
414     for (auto &t : threads) {
415         t.join();
416     }
417
418     return 0;
419 }
420
421 void handleTermInit(sctp_params_t &sctpParams) {
422     sendTermInit(sctpParams);
423     //send to e2 manager init of e2 term
424     //E2_TERM_INIT
425
426     int count = 0;
427     while (true) {
428         auto xappMessages = num_of_XAPP_messages.load(std::memory_order_acquire);
429         if (xappMessages > 0) {
430             if (mdclog_level_get() >=  MDCLOG_INFO) {
431                 mdclog_write(MDCLOG_INFO, "Got a message from some application, stop sending E2_TERM_INIT");
432             }
433             return;
434         }
435         usleep(100000);
436         count++;
437         if (count % 1000 == 0) {
438             mdclog_write(MDCLOG_ERR, "GOT No messages from any xApp");
439             sendTermInit(sctpParams);
440         }
441     }
442 }
443
444 void sendTermInit(sctp_params_t &sctpParams) {
445     rmr_mbuf_t *msg = rmr_alloc_msg(sctpParams.rmrCtx, sctpParams.ka_message_length);
446     auto count = 0;
447     while (true) {
448         msg->mtype = E2_TERM_INIT;
449         msg->state = 0;
450         rmr_bytes2payload(msg, (unsigned char *)sctpParams.ka_message, sctpParams.ka_message_length);
451         static unsigned char tx[32];
452         auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
453         rmr_bytes2xact(msg, tx, txLen);
454         msg = rmr_send_msg(sctpParams.rmrCtx, msg);
455         if (msg == nullptr) {
456             msg = rmr_alloc_msg(sctpParams.rmrCtx, sctpParams.ka_message_length);
457         } else if (msg->state == 0) {
458             rmr_free_msg(msg);
459             if (mdclog_level_get() >=  MDCLOG_INFO) {
460                 mdclog_write(MDCLOG_INFO, "E2_TERM_INIT successfully sent ");
461             }
462             return;
463         } else {
464             if (count % 100 == 0) {
465                 mdclog_write(MDCLOG_ERR, "Error sending E2_TERM_INIT cause : %s ", translateRmrErrorMessages(msg->state).c_str());
466             }
467             sleep(1);
468         }
469         count++;
470     }
471 }
472
473 /**
474  *
475  * @param argc
476  * @param argv
477  * @param sctpParams
478  * @return
479  */
480 cxxopts::ParseResult parse(int argc, char *argv[], sctp_params_t &sctpParams) {
481     cxxopts::Options options(argv[0], "e2 term help");
482     options.positional_help("[optional args]").show_positional_help();
483     options.allow_unrecognised_options().add_options()
484             ("p,path", "config file path", cxxopts::value<std::string>(sctpParams.configFilePath)->default_value("config"))
485             ("f,file", "config file name", cxxopts::value<std::string>(sctpParams.configFileName)->default_value("config.conf"))
486             ("h,help", "Print help");
487
488     auto result = options.parse(argc, (const char **&)argv);
489
490     if (result.count("help")) {
491         std::cout << options.help({""}) << std::endl;
492         exit(0);
493     }
494     return result;
495 }
496
497 /**
498  *
499  * @param sctpParams
500  * @return -1 failed 0 success
501  */
502 int buildInotify(sctp_params_t &sctpParams) {
503     sctpParams.inotifyFD = inotify_init1(IN_NONBLOCK);
504     if (sctpParams.inotifyFD == -1) {
505         mdclog_write(MDCLOG_ERR, "Failed to init inotify (inotify_init1) %s", strerror(errno));
506         return -1;
507     }
508
509     sctpParams.inotifyWD = inotify_add_watch(sctpParams.inotifyFD,
510                                              (const char *)sctpParams.configFilePath.c_str(),
511                                              (unsigned)IN_OPEN | (unsigned)IN_CLOSE_WRITE | (unsigned)IN_CLOSE_NOWRITE); //IN_CLOSE = (IN_CLOSE_WRITE | IN_CLOSE_NOWRITE)
512     if (sctpParams.inotifyWD == -1) {
513         mdclog_write(MDCLOG_ERR, "Failed to add directory : %s to  inotify (inotify_add_watch) %s",
514                      sctpParams.configFilePath.c_str(),
515                      strerror(errno));
516         close(sctpParams.inotifyFD);
517         return -1;
518     }
519
520     struct epoll_event event{};
521     event.events = (EPOLLIN);
522     event.data.fd = sctpParams.inotifyFD;
523     // add listening RMR FD to epoll
524     if (epoll_ctl(sctpParams.epoll_fd, EPOLL_CTL_ADD, sctpParams.inotifyFD, &event)) {
525         mdclog_write(MDCLOG_ERR, "Failed to add inotify FD to epoll");
526         close(sctpParams.inotifyFD);
527         return -1;
528     }
529     return 0;
530 }
531
532 /**
533  *
534  * @param args
535  * @return
536  */
537 void listener(sctp_params_t *params) {
538     int num_of_SCTP_messages = 0;
539     auto totalTime = 0.0;
540     mdclog_mdc_clean();
541     mdclog_level_set(params->logLevel);
542
543     std::thread::id this_id = std::this_thread::get_id();
544     //save cout
545     streambuf *oldCout = cout.rdbuf();
546     ostringstream memCout;
547     // create new cout
548     cout.rdbuf(memCout.rdbuf());
549     cout << this_id;
550     //return to the normal cout
551     cout.rdbuf(oldCout);
552
553     char tid[32];
554     memcpy(tid, memCout.str().c_str(), memCout.str().length() < 32 ? memCout.str().length() : 31);
555     tid[memCout.str().length()] = 0;
556     mdclog_mdc_add("thread id", tid);
557
558     if (mdclog_level_get() >= MDCLOG_DEBUG) {
559         mdclog_write(MDCLOG_DEBUG, "started thread number %s", tid);
560     }
561
562     RmrMessagesBuffer_t rmrMessageBuffer{};
563     //create and init RMR
564     rmrMessageBuffer.rmrCtx = params->rmrCtx;
565
566     auto *events = (struct epoll_event *) calloc(MAXEVENTS, sizeof(struct epoll_event));
567     struct timespec end{0, 0};
568     struct timespec start{0, 0};
569
570     rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
571     rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
572
573     memcpy(rmrMessageBuffer.ka_message, params->ka_message, params->ka_message_length);
574     rmrMessageBuffer.ka_message_len = params->ka_message_length;
575     rmrMessageBuffer.ka_message[rmrMessageBuffer.ka_message_len] = 0;
576
577     if (mdclog_level_get() >= MDCLOG_DEBUG) {
578         mdclog_write(MDCLOG_DEBUG, "keep alive message is : %s", rmrMessageBuffer.ka_message);
579     }
580
581     ReportingMessages_t message {};
582
583 //    for (int i = 0; i < MAX_RMR_BUFF_ARRAY; i++) {
584 //        rmrMessageBuffer.rcvBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
585 //        rmrMessageBuffer.sendBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
586 //    }
587
588     while (true) {
589         if (mdclog_level_get() >= MDCLOG_DEBUG) {
590             mdclog_write(MDCLOG_DEBUG, "Start EPOLL Wait. Timeout = %d", params->epollTimeOut);
591         }
592         auto numOfEvents = epoll_wait(params->epoll_fd, events, MAXEVENTS, params->epollTimeOut);
593         if (numOfEvents == 0) { // time out
594             if (mdclog_level_get() >= MDCLOG_DEBUG) {
595                 mdclog_write(MDCLOG_DEBUG, "got epoll timeout");
596             }
597             continue;
598         } else if (numOfEvents < 0) {
599             if (errno == EINTR) {
600                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
601                     mdclog_write(MDCLOG_DEBUG, "got EINTR : %s", strerror(errno));
602                 }
603                 continue;
604             }
605             mdclog_write(MDCLOG_ERR, "Epoll wait failed, errno = %s", strerror(errno));
606             return;
607         }
608         for (auto i = 0; i < numOfEvents; i++) {
609             if (mdclog_level_get() >= MDCLOG_DEBUG) {
610                 mdclog_write(MDCLOG_DEBUG, "handling epoll event %d out of %d", i + 1, numOfEvents);
611             }
612             clock_gettime(CLOCK_MONOTONIC, &message.message.time);
613             start.tv_sec = message.message.time.tv_sec;
614             start.tv_nsec = message.message.time.tv_nsec;
615
616
617             if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP)) {
618                 handlepoll_error(events[i], message, rmrMessageBuffer, params);
619             } else if (events[i].events & EPOLLOUT) {
620                 handleEinprogressMessages(events[i], message, rmrMessageBuffer, params);
621             } else if (params->listenFD == events[i].data.fd) {
622                 if (mdclog_level_get() >= MDCLOG_INFO) {
623                     mdclog_write(MDCLOG_INFO, "New connection request from sctp network\n");
624                 }
625                 // new connection is requested from RAN  start build connection
626                 while (true) {
627                     struct sockaddr in_addr {};
628                     socklen_t in_len;
629                     char hostBuff[NI_MAXHOST];
630                     char portBuff[NI_MAXSERV];
631
632                     in_len = sizeof(in_addr);
633                     auto *peerInfo = (ConnectedCU_t *)calloc(1, sizeof(ConnectedCU_t));
634                     if(peerInfo == nullptr){
635                         mdclog_write(MDCLOG_ERR, "calloc failed");
636                         break;
637                     }
638                     peerInfo->sctpParams = params;
639                     peerInfo->fileDescriptor = accept(params->listenFD, &in_addr, &in_len);
640                     if (peerInfo->fileDescriptor == -1) {
641                         if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
642                             /* We have processed all incoming connections. */
643                             break;
644                         } else {
645                             mdclog_write(MDCLOG_ERR, "Accept error, errno = %s", strerror(errno));
646                             break;
647                         }
648                     }
649                     if (setSocketNoBlocking(peerInfo->fileDescriptor) == -1) {
650                         mdclog_write(MDCLOG_ERR, "setSocketNoBlocking failed to set new connection %s on port %s\n", hostBuff, portBuff);
651                         close(peerInfo->fileDescriptor);
652                         break;
653                     }
654                     auto  ans = getnameinfo(&in_addr, in_len,
655                                             peerInfo->hostName, NI_MAXHOST,
656                                             peerInfo->portNumber, NI_MAXSERV, (unsigned )((unsigned int)NI_NUMERICHOST | (unsigned int)NI_NUMERICSERV));
657                     if (ans < 0) {
658                         mdclog_write(MDCLOG_ERR, "Failed to get info on connection request. %s\n", strerror(errno));
659                         close(peerInfo->fileDescriptor);
660                         break;
661                     }
662                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
663                         mdclog_write(MDCLOG_DEBUG, "Accepted connection on descriptor %d (host=%s, port=%s)\n", peerInfo->fileDescriptor, peerInfo->hostName, peerInfo->portNumber);
664                     }
665                     peerInfo->isConnected = false;
666                     peerInfo->gotSetup = false;
667                     if (addToEpoll(params->epoll_fd,
668                                    peerInfo,
669                                    (EPOLLIN | EPOLLET),
670                                    params->sctpMap, nullptr,
671                                    0) != 0) {
672                         break;
673                     }
674                     break;
675                 }
676             } else if (params->rmrListenFd == events[i].data.fd) {
677                 // got message from XAPP
678                 //num_of_XAPP_messages.fetch_add(1, std::memory_order_release);
679                 num_of_messages.fetch_add(1, std::memory_order_release);
680                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
681                     mdclog_write(MDCLOG_DEBUG, "new RMR message");
682                 }
683                 if (receiveXappMessages(params->sctpMap,
684                                         rmrMessageBuffer,
685                                         message.message.time) != 0) {
686                     mdclog_write(MDCLOG_ERR, "Error handling Xapp message");
687                 }
688             } else if (params->inotifyFD == events[i].data.fd) {
689                 mdclog_write(MDCLOG_INFO, "Got event from inotify (configuration update)");
690                 handleConfigChange(params);
691             } else {
692                 /* We RMR_ERR_RETRY have data on the fd waiting to be read. Read and display it.
693                  * We must read whatever data is available completely, as we are running
694                  *  in edge-triggered mode and won't get a notification again for the same data. */
695                 num_of_messages.fetch_add(1, std::memory_order_release);
696                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
697                     mdclog_write(MDCLOG_DEBUG, "new message from SCTP, epoll flags are : %0x", events[i].events);
698                 }
699                 receiveDataFromSctp(&events[i],
700                                     params->sctpMap,
701                                     num_of_SCTP_messages,
702                                     rmrMessageBuffer,
703                                     message.message.time);
704             }
705
706             clock_gettime(CLOCK_MONOTONIC, &end);
707             if (mdclog_level_get() >= MDCLOG_INFO) {
708                 totalTime += ((end.tv_sec + 1.0e-9 * end.tv_nsec) -
709                               ((double) start.tv_sec + 1.0e-9 * start.tv_nsec));
710             }
711             if (mdclog_level_get() >= MDCLOG_DEBUG) {
712                 mdclog_write(MDCLOG_DEBUG, "message handling is %ld seconds %ld nanoseconds",
713                              end.tv_sec - start.tv_sec,
714                              end.tv_nsec - start.tv_nsec);
715             }
716         }
717     }
718 }
719
720 /**
721  *
722  * @param sctpParams
723  */
724 void handleConfigChange(sctp_params_t *sctpParams) {
725     char buf[4096] __attribute__ ((aligned(__alignof__(struct inotify_event))));
726     const struct inotify_event *event;
727     char *ptr;
728
729     path p = (sctpParams->configFilePath + "/" + sctpParams->configFileName).c_str();
730     auto endlessLoop = true;
731     while (endlessLoop) {
732         auto len = read(sctpParams->inotifyFD, buf, sizeof buf);
733         if (len == -1) {
734             if (errno != EAGAIN) {
735                 mdclog_write(MDCLOG_ERR, "read %s ", strerror(errno));
736                 endlessLoop = false;
737                 continue;
738             }
739             else {
740                 endlessLoop = false;
741                 continue;
742             }
743         }
744
745         for (ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) {
746             event = (const struct inotify_event *)ptr;
747             if (event->mask & (uint32_t)IN_ISDIR) {
748                 continue;
749             }
750
751             // the directory name
752             if (sctpParams->inotifyWD == event->wd) {
753                 // not the directory
754             }
755             if (event->len) {
756                 auto  retVal = strcmp(sctpParams->configFileName.c_str(), event->name);
757                 if (retVal != 0) {
758                     continue;
759                 }
760             }
761             // only the file we want
762             if (event->mask & (uint32_t)IN_CLOSE_WRITE) {
763                 if (mdclog_level_get() >= MDCLOG_INFO) {
764                     mdclog_write(MDCLOG_INFO, "Configuration file changed");
765                 }
766                 if (exists(p)) {
767                     const int size = 2048;
768                     auto fileSize = file_size(p);
769                     if (fileSize > size) {
770                         mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size);
771                         return;
772                     }
773                 } else {
774                     mdclog_write(MDCLOG_ERR, "Configuration File %s not exists", p.string().c_str());
775                     return;
776                 }
777
778                 ReadConfigFile conf;
779                 if (conf.openConfigFile(p.string()) == -1) {
780                     mdclog_write(MDCLOG_ERR, "Filed to open config file %s, %s",
781                                  p.string().c_str(), strerror(errno));
782                     return;
783                 }
784
785                 auto tmpStr = conf.getStringValue("loglevel");
786                 if (tmpStr.length() == 0) {
787                     mdclog_write(MDCLOG_ERR, "illegal loglevel. Set loglevel to MDCLOG_INFO");
788                     tmpStr = "info";
789                 }
790                 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
791
792                 if ((tmpStr.compare("debug")) == 0) {
793                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_DEBUG");
794                     sctpParams->logLevel = MDCLOG_DEBUG;
795                 } else if ((tmpStr.compare("info")) == 0) {
796                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_INFO");
797                     sctpParams->logLevel = MDCLOG_INFO;
798                 } else if ((tmpStr.compare("warning")) == 0) {
799                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_WARN");
800                     sctpParams->logLevel = MDCLOG_WARN;
801                 } else if ((tmpStr.compare("error")) == 0) {
802                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_ERR");
803                     sctpParams->logLevel = MDCLOG_ERR;
804                 } else {
805                     mdclog_write(MDCLOG_ERR, "illegal loglevel = %s. Set loglevel to MDCLOG_INFO", tmpStr.c_str());
806                     sctpParams->logLevel = MDCLOG_INFO;
807                 }
808                 mdclog_level_set(sctpParams->logLevel);
809
810
811                 tmpStr = conf.getStringValue("trace");
812                 if (tmpStr.length() == 0) {
813                     mdclog_write(MDCLOG_ERR, "illegal trace. Set trace to stop");
814                     tmpStr = "stop";
815                 }
816
817                 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
818                 if ((tmpStr.compare("start")) == 0) {
819                     mdclog_write(MDCLOG_INFO, "Trace set to: start");
820                     sctpParams->trace = true;
821                 } else if ((tmpStr.compare("stop")) == 0) {
822                     mdclog_write(MDCLOG_INFO, "Trace set to: stop");
823                     sctpParams->trace = false;
824                 } else {
825                     mdclog_write(MDCLOG_ERR, "Trace was set to wrong value %s, set to stop", tmpStr.c_str());
826                     sctpParams->trace = false;
827                 }
828                 jsonTrace = sctpParams->trace;
829
830
831                 endlessLoop = false;
832             }
833         }
834     }
835 }
836
837 /**
838  *
839  * @param event
840  * @param message
841  * @param rmrMessageBuffer
842  * @param params
843  */
844 void handleEinprogressMessages(struct epoll_event &event,
845                                ReportingMessages_t &message,
846                                RmrMessagesBuffer_t &rmrMessageBuffer,
847                                sctp_params_t *params) {
848     auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
849     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
850
851     mdclog_write(MDCLOG_INFO, "file descriptor %d got EPOLLOUT", peerInfo->fileDescriptor);
852     auto retVal = 0;
853     socklen_t retValLen = 0;
854     auto rc = getsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_ERROR, &retVal, &retValLen);
855     if (rc != 0 || retVal != 0) {
856         if (rc != 0) {
857             rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
858                                                          "%s|Failed SCTP Connection, after EINPROGRESS the getsockopt%s",
859                                                          peerInfo->enodbName, strerror(errno));
860         } else if (retVal != 0) {
861             rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
862                                                          "%s|Failed SCTP Connection after EINPROGRESS, SO_ERROR",
863                                                          peerInfo->enodbName);
864         }
865
866         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
867         message.message.asnLength = rmrMessageBuffer.sendMessage->len;
868         mdclog_write(MDCLOG_ERR, "%s", rmrMessageBuffer.sendMessage->payload);
869         message.message.direction = 'N';
870         if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
871             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
872         }
873         memset(peerInfo->asnData, 0, peerInfo->asnLength);
874         peerInfo->asnLength = 0;
875         peerInfo->mtype = 0;
876         return;
877     }
878
879     peerInfo->isConnected = true;
880
881     if (modifyToEpoll(params->epoll_fd, peerInfo, (EPOLLIN | EPOLLET), params->sctpMap, peerInfo->enodbName,
882                       peerInfo->mtype) != 0) {
883         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_MOD");
884         return;
885     }
886
887     message.message.asndata = (unsigned char *)peerInfo->asnData;
888     message.message.asnLength = peerInfo->asnLength;
889     message.message.messageType = peerInfo->mtype;
890     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
891     num_of_messages.fetch_add(1, std::memory_order_release);
892     if (mdclog_level_get() >= MDCLOG_DEBUG) {
893         mdclog_write(MDCLOG_DEBUG, "send the delayed SETUP/ENDC SETUP to sctp for %s",
894                      message.message.enodbName);
895     }
896     if (sendSctpMsg(peerInfo, message, params->sctpMap) != 0) {
897         if (mdclog_level_get() >= MDCLOG_DEBUG) {
898             mdclog_write(MDCLOG_DEBUG, "Error write to SCTP  %s %d", __func__, __LINE__);
899         }
900         return;
901     }
902
903     memset(peerInfo->asnData, 0, peerInfo->asnLength);
904     peerInfo->asnLength = 0;
905     peerInfo->mtype = 0;
906 }
907
908
909 void handlepoll_error(struct epoll_event &event,
910                       ReportingMessages_t &message,
911                       RmrMessagesBuffer_t &rmrMessageBuffer,
912                       sctp_params_t *params) {
913     if (event.data.fd != params->rmrListenFd) {
914         auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
915         mdclog_write(MDCLOG_ERR, "epoll error, events %0x on fd %d, RAN NAME : %s",
916                      event.events, peerInfo->fileDescriptor, peerInfo->enodbName);
917
918         rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
919                                                      "%s|Failed SCTP Connection",
920                                                      peerInfo->enodbName);
921         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
922         message.message.asnLength = rmrMessageBuffer.sendMessage->len;
923
924         memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
925         message.message.direction = 'N';
926         if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
927             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
928         }
929
930         close(peerInfo->fileDescriptor);
931         params->sctpMap->erase(peerInfo->enodbName);
932         cleanHashEntry((ConnectedCU_t *) event.data.ptr, params->sctpMap);
933     } else {
934         mdclog_write(MDCLOG_ERR, "epoll error, events %0x on RMR FD", event.events);
935     }
936 }
937 /**
938  *
939  * @param socket
940  * @return
941  */
942 int setSocketNoBlocking(int socket) {
943     auto flags = fcntl(socket, F_GETFL, 0);
944
945     if (flags == -1) {
946         mdclog_mdc_add("func", "fcntl");
947         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
948         mdclog_mdc_clean();
949         return -1;
950     }
951
952     flags = (unsigned) flags | (unsigned) O_NONBLOCK;
953     if (fcntl(socket, F_SETFL, flags) == -1) {
954         mdclog_mdc_add("func", "fcntl");
955         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
956         mdclog_mdc_clean();
957         return -1;
958     }
959
960     return 0;
961 }
962
963 /**
964  *
965  * @param val
966  * @param m
967  */
968 void cleanHashEntry(ConnectedCU_t *val, Sctp_Map_t *m) {
969     char *dummy;
970     auto port = (uint16_t) strtol(val->portNumber, &dummy, 10);
971     char searchBuff[2048]{};
972
973     snprintf(searchBuff, sizeof searchBuff, "host:%s:%d", val->hostName, port);
974     m->erase(searchBuff);
975
976     m->erase(val->enodbName);
977     free(val);
978 }
979
980 /**
981  *
982  * @param fd file descriptor
983  * @param data the asn data to send
984  * @param len  length of the data
985  * @param enodbName the enodbName as in the map for printing purpose
986  * @param m map host information
987  * @param mtype message number
988  * @return 0 success, a negative number on fail
989  */
990 int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_t *m) {
991     auto loglevel = mdclog_level_get();
992     int fd = peerInfo->fileDescriptor;
993     if (loglevel >= MDCLOG_DEBUG) {
994         mdclog_write(MDCLOG_DEBUG, "Send SCTP message for CU %s, %s",
995                      message.message.enodbName, __FUNCTION__);
996     }
997
998     while (true) {
999         if (send(fd,message.message.asndata, message.message.asnLength,MSG_NOSIGNAL) < 0) {
1000             if (errno == EINTR) {
1001                 continue;
1002             }
1003             mdclog_write(MDCLOG_ERR, "error writing to CU a message, %s ", strerror(errno));
1004             if (!peerInfo->isConnected) {
1005                 mdclog_write(MDCLOG_ERR, "connection to CU %s is still in progress.", message.message.enodbName);
1006                 return -1;
1007             }
1008             cleanHashEntry(peerInfo, m);
1009             close(fd);
1010             char key[MAX_ENODB_NAME_SIZE * 2];
1011             snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName,
1012                      message.message.messageType);
1013             if (loglevel >= MDCLOG_DEBUG) {
1014                 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
1015             }
1016             auto tmp = m->find(key);
1017             if (tmp) {
1018                 free(tmp);
1019             }
1020             m->erase(key);
1021             return -1;
1022         }
1023         message.message.direction = 'D';
1024         // send report.buffer of size
1025         buildJsonMessage(message);
1026
1027         if (loglevel >= MDCLOG_DEBUG) {
1028             mdclog_write(MDCLOG_DEBUG,
1029                          "SCTP message for CU %s sent from %s",
1030                          message.message.enodbName,
1031                          __FUNCTION__);
1032         }
1033         return 0;
1034     }
1035 }
1036
1037 /**
1038  *
1039  * @param message
1040  * @param rmrMessageBuffer
1041  */
1042 void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) {
1043     message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
1044     message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
1045
1046     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1047         mdclog_write(MDCLOG_DEBUG, "Message from Xapp RAN name = %s message length = %ld",
1048                      message.message.enodbName, (unsigned long) message.message.asnLength);
1049     }
1050 }
1051
1052
1053
1054 /**
1055  *
1056  * @param events
1057  * @param sctpMap
1058  * @param numOfMessages
1059  * @param rmrMessageBuffer
1060  * @param ts
1061  * @return
1062  */
1063 int receiveDataFromSctp(struct epoll_event *events,
1064                         Sctp_Map_t *sctpMap,
1065                         int &numOfMessages,
1066                         RmrMessagesBuffer_t &rmrMessageBuffer,
1067                         struct timespec &ts) {
1068     /* We have data on the fd waiting to be read. Read and display it.
1069  * We must read whatever data is available completely, as we are running
1070  *  in edge-triggered mode and won't get a notification again for the same data. */
1071     ReportingMessages_t message {};
1072     auto done = 0;
1073     auto loglevel = mdclog_level_get();
1074
1075     // get the identity of the interface
1076     message.peerInfo = (ConnectedCU_t *)events->data.ptr;
1077
1078     struct timespec start{0, 0};
1079     struct timespec decodeStart{0, 0};
1080     struct timespec end{0, 0};
1081
1082     E2AP_PDU_t *pdu = nullptr;
1083
1084     while (true) {
1085         if (loglevel >= MDCLOG_DEBUG) {
1086             mdclog_write(MDCLOG_DEBUG, "Start Read from SCTP %d fd", message.peerInfo->fileDescriptor);
1087             clock_gettime(CLOCK_MONOTONIC, &start);
1088         }
1089         // read the buffer directly to rmr payload
1090         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1091         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1092                 read(message.peerInfo->fileDescriptor, rmrMessageBuffer.sendMessage->payload, RECEIVE_SCTP_BUFFER_SIZE);
1093
1094         if (loglevel >= MDCLOG_DEBUG) {
1095             mdclog_write(MDCLOG_DEBUG, "Finish Read from SCTP %d fd message length = %ld",
1096                          message.peerInfo->fileDescriptor, message.message.asnLength);
1097         }
1098
1099         memcpy(message.message.enodbName, message.peerInfo->enodbName, sizeof(message.peerInfo->enodbName));
1100         message.message.direction = 'U';
1101         message.message.time.tv_nsec = ts.tv_nsec;
1102         message.message.time.tv_sec = ts.tv_sec;
1103
1104         if (message.message.asnLength < 0) {
1105             if (errno == EINTR) {
1106                 continue;
1107             }
1108             /* If errno == EAGAIN, that means we have read all
1109                data. So goReportingMessages_t back to the main loop. */
1110             if (errno != EAGAIN) {
1111                 mdclog_write(MDCLOG_ERR, "Read error, %s ", strerror(errno));
1112                 done = 1;
1113             } else if (loglevel >= MDCLOG_DEBUG) {
1114                 mdclog_write(MDCLOG_DEBUG, "EAGAIN - descriptor = %d", message.peerInfo->fileDescriptor);
1115             }
1116             break;
1117         } else if (message.message.asnLength == 0) {
1118             /* End of file. The remote has closed the connection. */
1119             if (loglevel >= MDCLOG_INFO) {
1120                 mdclog_write(MDCLOG_INFO, "END of File Closed connection - descriptor = %d",
1121                              message.peerInfo->fileDescriptor);
1122             }
1123             done = 1;
1124             break;
1125         }
1126
1127         if (loglevel >= MDCLOG_DEBUG) {
1128             char printBuffer[RECEIVE_SCTP_BUFFER_SIZE]{};
1129             char *tmp = printBuffer;
1130             for (size_t i = 0; i < (size_t)message.message.asnLength; ++i) {
1131                 snprintf(tmp, 3, "%02x", message.message.asndata[i]);
1132                 tmp += 2;
1133             }
1134             printBuffer[message.message.asnLength] = 0;
1135             clock_gettime(CLOCK_MONOTONIC, &end);
1136             mdclog_write(MDCLOG_DEBUG, "Before Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
1137                          message.peerInfo->enodbName, end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
1138             mdclog_write(MDCLOG_DEBUG, "PDU buffer length = %ld, data =  : %s", message.message.asnLength,
1139                          printBuffer);
1140             clock_gettime(CLOCK_MONOTONIC, &decodeStart);
1141         }
1142
1143         auto rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void **) &pdu,
1144                           message.message.asndata, message.message.asnLength);
1145         if (rval.code != RC_OK) {
1146             mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) E2AP PDU from RAN : %s", rval.code,
1147                          message.peerInfo->enodbName);
1148             break;
1149         }
1150
1151         if (loglevel >= MDCLOG_DEBUG) {
1152             clock_gettime(CLOCK_MONOTONIC, &end);
1153             mdclog_write(MDCLOG_DEBUG, "After Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
1154                          message.peerInfo->enodbName, end.tv_sec - decodeStart.tv_sec, end.tv_nsec - decodeStart.tv_nsec);
1155             char *printBuffer;
1156             size_t size;
1157             FILE *stream = open_memstream(&printBuffer, &size);
1158             asn_fprint(stream, &asn_DEF_E2AP_PDU, pdu);
1159             mdclog_write(MDCLOG_DEBUG, "Encoding E2AP PDU past : %s", printBuffer);
1160             clock_gettime(CLOCK_MONOTONIC, &decodeStart);
1161
1162             fclose(stream);
1163             free(printBuffer);
1164         }
1165
1166         switch (pdu->present) {
1167             case E2AP_PDU_PR_initiatingMessage: {//initiating message
1168                 asnInitiatingRequest(pdu, sctpMap,message, rmrMessageBuffer);
1169                 break;
1170             }
1171             case E2AP_PDU_PR_successfulOutcome: { //successful outcome
1172                 asnSuccessfulMsg(pdu, sctpMap, message, rmrMessageBuffer);
1173                 break;
1174             }
1175             case E2AP_PDU_PR_unsuccessfulOutcome: { //Unsuccessful Outcome
1176                 asnUnSuccsesfulMsg(pdu, sctpMap, message, rmrMessageBuffer);
1177                 break;
1178             }
1179             default:
1180                 mdclog_write(MDCLOG_ERR, "Unknown index %d in E2AP PDU", pdu->present);
1181                 break;
1182         }
1183         if (loglevel >= MDCLOG_DEBUG) {
1184             clock_gettime(CLOCK_MONOTONIC, &end);
1185             mdclog_write(MDCLOG_DEBUG,
1186                          "After processing message and sent to rmr for : %s, Read time is : %ld seconds, %ld nanoseconds",
1187                          message.peerInfo->enodbName, end.tv_sec - decodeStart.tv_sec, end.tv_nsec - decodeStart.tv_nsec);
1188         }
1189         numOfMessages++;
1190         if (pdu != nullptr) {
1191             ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu);
1192             //ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
1193             //pdu = nullptr;
1194         }
1195     }
1196
1197     if (done) {
1198         if (loglevel >= MDCLOG_INFO) {
1199             mdclog_write(MDCLOG_INFO, "Closed connection - descriptor = %d", message.peerInfo->fileDescriptor);
1200         }
1201         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1202                 snprintf((char *)rmrMessageBuffer.sendMessage->payload,
1203                          256,
1204                          "%s|CU disconnected unexpectedly",
1205                          message.peerInfo->enodbName);
1206         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1207
1208         if (sendRequestToXapp(message,
1209                               RIC_SCTP_CONNECTION_FAILURE,
1210                               rmrMessageBuffer) != 0) {
1211             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
1212         }
1213
1214         /* Closing descriptor make epoll remove it from the set of descriptors which are monitored. */
1215         close(message.peerInfo->fileDescriptor);
1216         cleanHashEntry((ConnectedCU_t *) events->data.ptr, sctpMap);
1217     }
1218     if (loglevel >= MDCLOG_DEBUG) {
1219         clock_gettime(CLOCK_MONOTONIC, &end);
1220         mdclog_write(MDCLOG_DEBUG, "from receive SCTP to send RMR time is %ld seconds and %ld nanoseconds",
1221                      end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
1222
1223     }
1224     return 0;
1225 }
1226
1227 static void buildAndSendSetupRequest(ReportingMessages_t &message,
1228                                      RmrMessagesBuffer_t &rmrMessageBuffer,
1229                                      E2AP_PDU_t *pdu/*,
1230                                      string const &messageName,
1231                                      string const &ieName,
1232                                      vector<string> &functionsToAdd_v,
1233                                      vector<string> &functionsToModified_v*/) {
1234     auto logLevel = mdclog_level_get();
1235     // now we can send the data to e2Mgr
1236
1237     asn_enc_rval_t er;
1238     auto buffer_size = RECEIVE_SCTP_BUFFER_SIZE * 2;
1239     unsigned char *buffer = nullptr;
1240     buffer = (unsigned char *) calloc(buffer_size, sizeof(unsigned char));
1241     if(!buffer)
1242     {
1243         mdclog_write(MDCLOG_ERR, "Allocating buffer for %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
1244         return;
1245     }
1246     while (true) {
1247         er = asn_encode_to_buffer(nullptr, ATS_BASIC_XER, &asn_DEF_E2AP_PDU, pdu, buffer, buffer_size);
1248         if (er.encoded == -1) {
1249             mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
1250             return;
1251         } else if (er.encoded > (ssize_t) buffer_size) {
1252             buffer_size = er.encoded + 128;
1253             mdclog_write(MDCLOG_WARN, "Buffer of size %d is to small for %s. Reallocate buffer of size %d",
1254                          (int) buffer_size,
1255                          asn_DEF_E2AP_PDU.name, buffer_size);
1256             buffer_size = er.encoded + 128;
1257
1258             unsigned char *newBuffer = nullptr;
1259             newBuffer = (unsigned char *) realloc(buffer, buffer_size);
1260             if (!newBuffer)
1261             {
1262                 // out of memory
1263                 mdclog_write(MDCLOG_ERR, "Reallocating buffer for %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
1264                 free(buffer);
1265                 return;
1266             }
1267             buffer = newBuffer;
1268             continue;
1269         }
1270         buffer[er.encoded] = '\0';
1271         break;
1272     }
1273     // encode to xml
1274
1275     string res((char *)buffer);
1276     res.erase(std::remove(res.begin(), res.end(), '\n'), res.end());
1277     res.erase(std::remove(res.begin(), res.end(), '\t'), res.end());
1278     res.erase(std::remove(res.begin(), res.end(), ' '), res.end());
1279
1280 //    string res {};
1281 //    if (!functionsToAdd_v.empty() || !functionsToModified_v.empty()) {
1282 //        res = buildXmlData(messageName, ieName, functionsToAdd_v, functionsToModified_v, buffer, (size_t) er.encoded);
1283 //    }
1284     rmr_mbuf_t *rmrMsg;
1285 //    if (res.length() == 0) {
1286 //        rmrMsg = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, buffer_size + 256);
1287 //        rmrMsg->len = snprintf((char *) rmrMsg->payload, RECEIVE_SCTP_BUFFER_SIZE * 2, "%s:%d|%s",
1288 //                               message.peerInfo->sctpParams->myIP.c_str(),
1289 //                               message.peerInfo->sctpParams->rmrPort,
1290 //                               buffer);
1291 //    } else {
1292         rmrMsg = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, (int)res.length() + 256);
1293         rmrMsg->len = snprintf((char *) rmrMsg->payload, res.length() + 256, "%s:%d|%s",
1294                                message.peerInfo->sctpParams->myIP.c_str(),
1295                                message.peerInfo->sctpParams->rmrPort,
1296                                res.c_str());
1297 //    }
1298
1299     if (logLevel >= MDCLOG_DEBUG) {
1300         mdclog_write(MDCLOG_DEBUG, "Setup request of size %d :\n %s\n", rmrMsg->len, rmrMsg->payload);
1301     }
1302     // send to RMR
1303     rmrMsg->mtype = message.message.messageType;
1304     rmrMsg->state = 0;
1305     rmr_bytes2meid(rmrMsg, (unsigned char *) message.message.enodbName, strlen(message.message.enodbName));
1306
1307     static unsigned char tx[32];
1308     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1309     rmr_bytes2xact(rmrMsg, tx, strlen((const char *) tx));
1310
1311     rmrMsg = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMsg);
1312     if (rmrMsg == nullptr) {
1313         mdclog_write(MDCLOG_ERR, "RMR failed to send returned nullptr");
1314     } else if (rmrMsg->state != 0) {
1315         char meid[RMR_MAX_MEID]{};
1316         if (rmrMsg->state == RMR_ERR_RETRY) {
1317             usleep(5);
1318             rmrMsg->state = 0;
1319             mdclog_write(MDCLOG_INFO, "RETRY sending Message %d to Xapp from %s",
1320                          rmrMsg->mtype, rmr_get_meid(rmrMsg, (unsigned char *) meid));
1321             rmrMsg = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMsg);
1322             if (rmrMsg == nullptr) {
1323                 mdclog_write(MDCLOG_ERR, "RMR failed send returned nullptr");
1324             } else if (rmrMsg->state != 0) {
1325                 mdclog_write(MDCLOG_ERR,
1326                              "RMR Retry failed %s sending request %d to Xapp from %s",
1327                              translateRmrErrorMessages(rmrMsg->state).c_str(),
1328                              rmrMsg->mtype,
1329                              rmr_get_meid(rmrMsg, (unsigned char *) meid));
1330             }
1331         } else {
1332             mdclog_write(MDCLOG_ERR, "RMR failed: %s. sending request %d to Xapp from %s",
1333                          translateRmrErrorMessages(rmrMsg->state).c_str(),
1334                          rmrMsg->mtype,
1335                          rmr_get_meid(rmrMsg, (unsigned char *) meid));
1336         }
1337     }
1338     message.peerInfo->gotSetup = true;
1339     buildJsonMessage(message);
1340
1341     if (rmrMsg != nullptr) {
1342         rmr_free_msg(rmrMsg);
1343     }
1344     free(buffer);
1345
1346     return;
1347 }
1348
1349 #if 0
1350 int RAN_Function_list_To_Vector(RANfunctions_List_t& list, vector <string> &runFunXML_v) {
1351     auto index = 0;
1352     runFunXML_v.clear();
1353     for (auto j = 0; j < list.list.count; j++) {
1354         auto *raNfunctionItemIEs = (RANfunction_ItemIEs_t *)list.list.array[j];
1355         if (raNfunctionItemIEs->id == ProtocolIE_ID_id_RANfunction_Item &&
1356             (raNfunctionItemIEs->value.present == RANfunction_ItemIEs__value_PR_RANfunction_Item)) {
1357             // encode to xml
1358             E2SM_gNB_NRT_RANfunction_Definition_t *ranFunDef = nullptr;
1359             auto rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER,
1360                                    &asn_DEF_E2SM_gNB_NRT_RANfunction_Definition,
1361                                    (void **)&ranFunDef,
1362                                    raNfunctionItemIEs->value.choice.RANfunction_Item.ranFunctionDefinition.buf,
1363                                    raNfunctionItemIEs->value.choice.RANfunction_Item.ranFunctionDefinition.size);
1364             if (rval.code != RC_OK) {
1365                 mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) E2SM message from : %s",
1366                              rval.code,
1367                              asn_DEF_E2SM_gNB_NRT_RANfunction_Definition.name);
1368                 return -1;
1369             }
1370
1371             auto xml_buffer_size = RECEIVE_SCTP_BUFFER_SIZE * 2;
1372             unsigned char xml_buffer[RECEIVE_SCTP_BUFFER_SIZE * 2];
1373             memset(xml_buffer, 0, RECEIVE_SCTP_BUFFER_SIZE * 2);
1374             // encode to xml
1375             auto er = asn_encode_to_buffer(nullptr,
1376                                            ATS_BASIC_XER,
1377                                            &asn_DEF_E2SM_gNB_NRT_RANfunction_Definition,
1378                                            ranFunDef,
1379                                            xml_buffer,
1380                                            xml_buffer_size);
1381             if (er.encoded == -1) {
1382                 mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s",
1383                              asn_DEF_E2SM_gNB_NRT_RANfunction_Definition.name,
1384                              strerror(errno));
1385             } else if (er.encoded > (ssize_t)xml_buffer_size) {
1386                 mdclog_write(MDCLOG_ERR, "Buffer of size %d is to small for %s, at %s line %d",
1387                              (int) xml_buffer_size,
1388                              asn_DEF_E2SM_gNB_NRT_RANfunction_Definition.name, __func__, __LINE__);
1389             } else {
1390                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1391                     mdclog_write(MDCLOG_DEBUG, "Encoding E2SM %s PDU number %d : %s",
1392                                  asn_DEF_E2SM_gNB_NRT_RANfunction_Definition.name,
1393                                  index++,
1394                                  xml_buffer);
1395                 }
1396
1397                 string runFuncs = (char *)(xml_buffer);
1398                 runFunXML_v.emplace_back(runFuncs);
1399             }
1400         }
1401     }
1402     return 0;
1403 }
1404
1405 int collectServiceUpdate_RequestData(E2AP_PDU_t *pdu,
1406                                      Sctp_Map_t *sctpMap,
1407                                      ReportingMessages_t &message,
1408                                      vector <string> &RANfunctionsAdded_v,
1409                                      vector <string> &RANfunctionsModified_v) {
1410     memset(message.peerInfo->enodbName, 0 , MAX_ENODB_NAME_SIZE);
1411     for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICserviceUpdate.protocolIEs.list.count; i++) {
1412         auto *ie = pdu->choice.initiatingMessage->value.choice.RICserviceUpdate.protocolIEs.list.array[i];
1413         if (ie->id == ProtocolIE_ID_id_RANfunctionsAdded) {
1414             if (ie->value.present == RICserviceUpdate_IEs__value_PR_RANfunctionsID_List) {
1415                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1416                     mdclog_write(MDCLOG_DEBUG, "Run function list have %d entries",
1417                                  ie->value.choice.RANfunctions_List.list.count);
1418                 }
1419                 if (RAN_Function_list_To_Vector(ie->value.choice.RANfunctions_List, RANfunctionsAdded_v) != 0 ) {
1420                     return -1;
1421                 }
1422             }
1423         } else if (ie->id == ProtocolIE_ID_id_RANfunctionsModified) {
1424             if (ie->value.present == RICserviceUpdate_IEs__value_PR_RANfunctions_List) {
1425                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1426                     mdclog_write(MDCLOG_DEBUG, "Run function list have %d entries",
1427                                  ie->value.choice.RANfunctions_List.list.count);
1428                 }
1429                 if (RAN_Function_list_To_Vector(ie->value.choice.RANfunctions_List, RANfunctionsModified_v) != 0 ) {
1430                     return -1;
1431                 }
1432             }
1433         }
1434     }
1435     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1436         mdclog_write(MDCLOG_DEBUG, "Run function vector have %ld entries",
1437                      RANfunctionsAdded_v.size());
1438     }
1439     return 0;
1440 }
1441
1442 #endif
1443
1444
1445 void buildPrometheusList(ConnectedCU_t *peerInfo, Family<Counter> *prometheusFamily) {
1446     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"SetupRequest", "Messages"}});
1447     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"SetupRequest", "Bytes"}});
1448
1449     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_ErrorIndication)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ErrorIndication", "Messages"}});
1450     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_ErrorIndication)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ErrorIndication", "Bytes"}});
1451
1452     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_RICindication)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICindication", "Messages"}});
1453     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_RICindication)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICindication", "Bytes"}});
1454
1455     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ResetRequest", "Messages"}});
1456     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ResetRequest", "Bytes"}});
1457
1458     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICserviceUpdate", "Messages"}});
1459     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICserviceUpdate", "Bytes"}});
1460     // ---------------------------------------------
1461     peerInfo->counters[IN_SUCC][MSG_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ResetACK", "Messages"}});
1462     peerInfo->counters[IN_SUCC][BYTES_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ResetACK", "Bytes"}});
1463
1464     peerInfo->counters[IN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICcontrol)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICcontrolACK", "Messages"}});
1465     peerInfo->counters[IN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICcontrol)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICcontrolACK", "Bytes"}});
1466
1467     peerInfo->counters[IN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICsubscription)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionACK", "Messages"}});
1468     peerInfo->counters[IN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICsubscription)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionACK", "Bytes"}});
1469
1470     peerInfo->counters[IN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICsubscriptionDelete)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionDeleteACK", "Messages"}});
1471     peerInfo->counters[IN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICsubscriptionDelete)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionDeleteACK", "Bytes"}});
1472     //-------------------------------------------------------------
1473
1474     peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICcontrol)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICcontrolFailure", "Messages"}});
1475     peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICcontrol)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICcontrolFailure", "Bytes"}});
1476
1477     peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICsubscription)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionFailure", "Messages"}});
1478     peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICsubscription)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionFailure", "Bytes"}});
1479
1480     peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICsubscriptionDelete)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionDeleteFailure", "Messages"}});
1481     peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICsubscriptionDelete)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionDeleteFailure", "Bytes"}});
1482
1483     //====================================================================================
1484     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_ErrorIndication)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ErrorIndication", "Messages"}});
1485     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_ErrorIndication)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ErrorIndication", "Bytes"}});
1486
1487     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ResetRequest", "Messages"}});
1488     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ResetRequest", "Bytes"}});
1489
1490     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_RICcontrol)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICcontrol", "Messages"}});
1491     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_RICcontrol)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICcontrol", "Bytes"}});
1492
1493     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_RICserviceQuery)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceQuery", "Messages"}});
1494     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_RICserviceQuery)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceQuery", "Bytes"}});
1495
1496     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_RICsubscription)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICsubscription", "Messages"}});
1497     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_RICsubscription)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICsubscription", "Bytes"}});
1498
1499     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_RICsubscriptionDelete)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICsubscriptionDelete", "Messages"}});
1500     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_RICsubscriptionDelete)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICsubscriptionDelete", "Bytes"}});
1501     //---------------------------------------------------------------------------------------------------------
1502     peerInfo->counters[OUT_SUCC][MSG_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupResponse", "Messages"}});
1503     peerInfo->counters[OUT_SUCC][BYTES_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupResponse", "Bytes"}});
1504
1505     peerInfo->counters[OUT_SUCC][MSG_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ResetACK", "Messages"}});
1506     peerInfo->counters[OUT_SUCC][BYTES_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ResetACK", "Bytes"}});
1507
1508     peerInfo->counters[OUT_SUCC][MSG_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceUpdateResponse", "Messages"}});
1509     peerInfo->counters[OUT_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceUpdateResponse", "Bytes"}});
1510     //----------------------------------------------------------------------------------------------------------------
1511     peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupRequestFailure", "Messages"}});
1512     peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupRequestFailure", "Bytes"}});
1513
1514     peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceUpdateFailure", "Messages"}});
1515     peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceUpdateFailure", "Bytes"}});
1516 }
1517 /**
1518  *
1519  * @param pdu
1520  * @param sctpMap
1521  * @param message
1522  * @param RANfunctionsAdded_v
1523  * @return
1524  */
1525 int collectSetupRequestData(E2AP_PDU_t *pdu,
1526                                      Sctp_Map_t *sctpMap,
1527                                      ReportingMessages_t &message /*, vector <string> &RANfunctionsAdded_v*/) {
1528     memset(message.peerInfo->enodbName, 0 , MAX_ENODB_NAME_SIZE);
1529     for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.E2setupRequest.protocolIEs.list.count; i++) {
1530         auto *ie = pdu->choice.initiatingMessage->value.choice.E2setupRequest.protocolIEs.list.array[i];
1531         if (ie->id == ProtocolIE_ID_id_GlobalE2node_ID) {
1532             // get the ran name for meid
1533             if (ie->value.present == E2setupRequestIEs__value_PR_GlobalE2node_ID) {
1534                 if (buildRanName(message.peerInfo->enodbName, ie) < 0) {
1535                     mdclog_write(MDCLOG_ERR, "Bad param in E2setupRequestIEs GlobalE2node_ID.\n");
1536                     // no message will be sent
1537                     return -1;
1538                 }
1539
1540                 memcpy(message.message.enodbName, message.peerInfo->enodbName, strlen(message.peerInfo->enodbName));
1541                 sctpMap->setkey(message.message.enodbName, message.peerInfo);
1542             }
1543         } /*else if (ie->id == ProtocolIE_ID_id_RANfunctionsAdded) {
1544             if (ie->value.present == E2setupRequestIEs__value_PR_RANfunctions_List) {
1545                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1546                     mdclog_write(MDCLOG_DEBUG, "Run function list have %d entries",
1547                                  ie->value.choice.RANfunctions_List.list.count);
1548                 }
1549                 if (RAN_Function_list_To_Vector(ie->value.choice.RANfunctions_List, RANfunctionsAdded_v) != 0 ) {
1550                     return -1;
1551                 }
1552             }
1553         } */
1554     }
1555 //    if (mdclog_level_get() >= MDCLOG_DEBUG) {
1556 //        mdclog_write(MDCLOG_DEBUG, "Run function vector have %ld entries",
1557 //                     RANfunctionsAdded_v.size());
1558 //    }
1559     return 0;
1560 }
1561
1562 int XML_From_PER(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) {
1563     E2AP_PDU_t *pdu = nullptr;
1564
1565     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1566         mdclog_write(MDCLOG_DEBUG, "got PER message of size %d is:%s",
1567                      rmrMessageBuffer.sendMessage->len, rmrMessageBuffer.sendMessage->payload);
1568     }
1569     auto rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void **) &pdu,
1570                            rmrMessageBuffer.sendMessage->payload, rmrMessageBuffer.sendMessage->len);
1571     if (rval.code != RC_OK) {
1572         mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) setup response  from E2MGR : %s",
1573                      rval.code,
1574                      message.message.enodbName);
1575         return -1;
1576     }
1577
1578     int buff_size = RECEIVE_XAPP_BUFFER_SIZE;
1579     auto er = asn_encode_to_buffer(nullptr, ATS_BASIC_XER, &asn_DEF_E2AP_PDU, pdu,
1580                                    rmrMessageBuffer.sendMessage->payload, buff_size);
1581     if (er.encoded == -1) {
1582         mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
1583         return -1;
1584     } else if (er.encoded > (ssize_t)buff_size) {
1585         mdclog_write(MDCLOG_ERR, "Buffer of size %d is to small for %s, at %s line %d",
1586                      (int)rmrMessageBuffer.sendMessage->len,
1587                      asn_DEF_E2AP_PDU.name,
1588                      __func__,
1589                      __LINE__);
1590         return -1;
1591     }
1592     rmrMessageBuffer.sendMessage->len = er.encoded;
1593     return 0;
1594
1595 }
1596
1597 /**
1598  *
1599  * @param pdu
1600  * @param message
1601  * @param rmrMessageBuffer
1602  */
1603 void asnInitiatingRequest(E2AP_PDU_t *pdu,
1604                           Sctp_Map_t *sctpMap,
1605                           ReportingMessages_t &message,
1606                           RmrMessagesBuffer_t &rmrMessageBuffer) {
1607     auto logLevel = mdclog_level_get();
1608     auto procedureCode = ((InitiatingMessage_t *) pdu->choice.initiatingMessage)->procedureCode;
1609     if (logLevel >= MDCLOG_DEBUG) {
1610         mdclog_write(MDCLOG_DEBUG, "Initiating message %ld\n", procedureCode);
1611     }
1612     switch (procedureCode) {
1613         case ProcedureCode_id_E2setup: {
1614             if (logLevel >= MDCLOG_DEBUG) {
1615                 mdclog_write(MDCLOG_DEBUG, "Got E2setup");
1616             }
1617
1618 //            vector <string> RANfunctionsAdded_v;
1619 //            vector <string> RANfunctionsModified_v;
1620 //            RANfunctionsAdded_v.clear();
1621 //            RANfunctionsModified_v.clear();
1622             if (collectSetupRequestData(pdu, sctpMap, message) != 0) {
1623                 break;
1624             }
1625
1626             buildPrometheusList(message.peerInfo, message.peerInfo->sctpParams->prometheusFamily);
1627
1628             string messageName("E2setupRequest");
1629             string ieName("E2setupRequestIEs");
1630             message.message.messageType = RIC_E2_SETUP_REQ;
1631             message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment();
1632             message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment((double)message.message.asnLength);
1633             buildAndSendSetupRequest(message, rmrMessageBuffer, pdu);
1634             break;
1635         }
1636         case ProcedureCode_id_RICserviceUpdate: {
1637             if (logLevel >= MDCLOG_DEBUG) {
1638                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceUpdate %s", message.message.enodbName);
1639             }
1640 //            vector <string> RANfunctionsAdded_v;
1641 //            vector <string> RANfunctionsModified_v;
1642 //            RANfunctionsAdded_v.clear();
1643 //            RANfunctionsModified_v.clear();
1644 //            if (collectServiceUpdate_RequestData(pdu, sctpMap, message,
1645 //                                                 RANfunctionsAdded_v, RANfunctionsModified_v) != 0) {
1646 //                break;
1647 //            }
1648
1649             string messageName("RICserviceUpdate");
1650             string ieName("RICserviceUpdateIEs");
1651             message.message.messageType = RIC_SERVICE_UPDATE;
1652             message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment();
1653             message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment((double)message.message.asnLength);
1654
1655             buildAndSendSetupRequest(message, rmrMessageBuffer, pdu);
1656             break;
1657         }
1658         case ProcedureCode_id_ErrorIndication: {
1659             if (logLevel >= MDCLOG_DEBUG) {
1660                 mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName);
1661             }
1662             message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication]->Increment();
1663             message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication]->Increment((double)message.message.asnLength);
1664             if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) {
1665                 mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP");
1666             }
1667             break;
1668         }
1669         case ProcedureCode_id_Reset: {
1670             if (logLevel >= MDCLOG_DEBUG) {
1671                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
1672             }
1673
1674             message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
1675             message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment((double)message.message.asnLength);
1676             if (XML_From_PER(message, rmrMessageBuffer) < 0) {
1677                 break;
1678             }
1679
1680             if (sendRequestToXapp(message, RIC_E2_RESET_REQ, rmrMessageBuffer) != 0) {
1681                 mdclog_write(MDCLOG_ERR, "RIC_E2_RESET_REQ message failed to send to xAPP");
1682             }
1683             break;
1684         }
1685         case ProcedureCode_id_RICindication: {
1686             if (logLevel >= MDCLOG_DEBUG) {
1687                 mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName);
1688             }
1689             for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
1690                 auto messageSent = false;
1691                 RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
1692                 if (logLevel >= MDCLOG_DEBUG) {
1693                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1694                 }
1695                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1696                     if (logLevel >= MDCLOG_DEBUG) {
1697                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1698                     }
1699                     if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
1700                         static unsigned char tx[32];
1701                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
1702                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1703                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1704                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1705                                        (unsigned char *)message.message.enodbName,
1706                                        strlen(message.message.enodbName));
1707                         rmrMessageBuffer.sendMessage->state = 0;
1708                         rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricInstanceID;
1709
1710                         //ie->value.choice.RICrequestID.ricInstanceID;
1711                         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1712                             mdclog_write(MDCLOG_DEBUG, "sub id = %d, mtype = %d, ric instance id %ld, requestor id = %ld",
1713                                          rmrMessageBuffer.sendMessage->sub_id,
1714                                          rmrMessageBuffer.sendMessage->mtype,
1715                                          ie->value.choice.RICrequestID.ricInstanceID,
1716                                          ie->value.choice.RICrequestID.ricRequestorID);
1717                         }
1718                         message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICindication]->Increment();
1719                         message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICindication]->Increment((double)message.message.asnLength);
1720                         sendRmrMessage(rmrMessageBuffer, message);
1721                         messageSent = true;
1722                     } else {
1723                         mdclog_write(MDCLOG_ERR, "RIC request id missing illegal request");
1724                     }
1725                 }
1726                 if (messageSent) {
1727                     break;
1728                 }
1729             }
1730             break;
1731         }
1732         default: {
1733             mdclog_write(MDCLOG_ERR, "Undefined or not supported message = %ld", procedureCode);
1734             message.message.messageType = 0; // no RMR message type yet
1735
1736             buildJsonMessage(message);
1737
1738             break;
1739         }
1740     }
1741 }
1742
1743 /**
1744  *
1745  * @param pdu
1746  * @param message
1747  * @param rmrMessageBuffer
1748  */
1749 void asnSuccessfulMsg(E2AP_PDU_t *pdu,
1750                       Sctp_Map_t *sctpMap,
1751                       ReportingMessages_t &message,
1752                       RmrMessagesBuffer_t &rmrMessageBuffer) {
1753     auto procedureCode = pdu->choice.successfulOutcome->procedureCode;
1754     auto logLevel = mdclog_level_get();
1755     if (logLevel >= MDCLOG_INFO) {
1756         mdclog_write(MDCLOG_INFO, "Successful Outcome %ld", procedureCode);
1757     }
1758     switch (procedureCode) {
1759         case ProcedureCode_id_Reset: {
1760             if (logLevel >= MDCLOG_DEBUG) {
1761                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
1762             }
1763             message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
1764             message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment((double)message.message.asnLength);
1765             if (XML_From_PER(message, rmrMessageBuffer) < 0) {
1766                 break;
1767             }
1768             if (sendRequestToXapp(message, RIC_E2_RESET_RESP, rmrMessageBuffer) != 0) {
1769                 mdclog_write(MDCLOG_ERR, "RIC_E2_RESET_RESP message failed to send to xAPP");
1770             }
1771             break;
1772         }
1773         case ProcedureCode_id_RICcontrol: {
1774             if (logLevel >= MDCLOG_DEBUG) {
1775                 mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
1776             }
1777             for (auto i = 0;
1778                  i < pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.count; i++) {
1779                 auto messageSent = false;
1780                 RICcontrolAcknowledge_IEs_t *ie = pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.array[i];
1781                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1782                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1783                 }
1784                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1785                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1786                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1787                     }
1788                     if (ie->value.present == RICcontrolAcknowledge_IEs__value_PR_RICrequestID) {
1789                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_ACK;
1790                         rmrMessageBuffer.sendMessage->state = 0;
1791 //                        rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1792                         rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricInstanceID;
1793
1794                         static unsigned char tx[32];
1795                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1796                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1797                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1798                                        (unsigned char *)message.message.enodbName,
1799                                        strlen(message.message.enodbName));
1800
1801                         message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment();
1802                         message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment((double)message.message.asnLength);
1803                         sendRmrMessage(rmrMessageBuffer, message);
1804                         messageSent = true;
1805                     } else {
1806                         mdclog_write(MDCLOG_ERR, "RIC request id missing illegal request");
1807                     }
1808                 }
1809                 if (messageSent) {
1810                     break;
1811                 }
1812             }
1813
1814             break;
1815         }
1816         case ProcedureCode_id_RICsubscription: {
1817             if (logLevel >= MDCLOG_DEBUG) {
1818                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
1819             }
1820             message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment();
1821             message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment((double)message.message.asnLength);
1822             if (sendRequestToXapp(message, RIC_SUB_RESP, rmrMessageBuffer) != 0) {
1823                 mdclog_write(MDCLOG_ERR, "Subscription successful message failed to send to xAPP");
1824             }
1825             break;
1826         }
1827         case ProcedureCode_id_RICsubscriptionDelete: {
1828             if (logLevel >= MDCLOG_DEBUG) {
1829                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
1830             }
1831             message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment();
1832             message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment((double)message.message.asnLength);
1833             if (sendRequestToXapp(message, RIC_SUB_DEL_RESP, rmrMessageBuffer) != 0) {
1834                 mdclog_write(MDCLOG_ERR, "Subscription delete successful message failed to send to xAPP");
1835             }
1836             break;
1837         }
1838         default: {
1839             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1840             message.message.messageType = 0; // no RMR message type yet
1841             buildJsonMessage(message);
1842
1843             break;
1844         }
1845     }
1846 }
1847
1848 /**
1849  *
1850  * @param pdu
1851  * @param message
1852  * @param rmrMessageBuffer
1853  */
1854 void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
1855                         Sctp_Map_t *sctpMap,
1856                         ReportingMessages_t &message,
1857                         RmrMessagesBuffer_t &rmrMessageBuffer) {
1858     auto procedureCode = pdu->choice.unsuccessfulOutcome->procedureCode;
1859     auto logLevel = mdclog_level_get();
1860     if (logLevel >= MDCLOG_INFO) {
1861         mdclog_write(MDCLOG_INFO, "Unsuccessful Outcome %ld", procedureCode);
1862     }
1863     switch (procedureCode) {
1864         case ProcedureCode_id_RICcontrol: {
1865             if (logLevel >= MDCLOG_DEBUG) {
1866                 mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
1867             }
1868             for (int i = 0;
1869                  i < pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.count; i++) {
1870                 auto messageSent = false;
1871                 RICcontrolFailure_IEs_t *ie = pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.array[i];
1872                 if (logLevel >= MDCLOG_DEBUG) {
1873                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1874                 }
1875                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1876                     if (logLevel >= MDCLOG_DEBUG) {
1877                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1878                     }
1879                     if (ie->value.present == RICcontrolFailure_IEs__value_PR_RICrequestID) {
1880                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_FAILURE;
1881                         rmrMessageBuffer.sendMessage->state = 0;
1882 //                        rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricRequestorID;
1883                         rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricInstanceID;
1884                         static unsigned char tx[32];
1885                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1886                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1887                         rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName,
1888                                        strlen(message.message.enodbName));
1889                         message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment();
1890                         message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment((double)message.message.asnLength);
1891                         sendRmrMessage(rmrMessageBuffer, message);
1892                         messageSent = true;
1893                     } else {
1894                         mdclog_write(MDCLOG_ERR, "RIC request id missing illegal request");
1895                     }
1896                 }
1897                 if (messageSent) {
1898                     break;
1899                 }
1900             }
1901             break;
1902         }
1903         case ProcedureCode_id_RICsubscription: {
1904             if (logLevel >= MDCLOG_DEBUG) {
1905                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
1906             }
1907             message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment();
1908             message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment((double)message.message.asnLength);
1909             if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer) != 0) {
1910                 mdclog_write(MDCLOG_ERR, "Subscription unsuccessful message failed to send to xAPP");
1911             }
1912             break;
1913         }
1914         case ProcedureCode_id_RICsubscriptionDelete: {
1915             if (logLevel >= MDCLOG_DEBUG) {
1916                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
1917             }
1918             message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment();
1919             message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment((double)message.message.asnLength);
1920             if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer) != 0) {
1921                 mdclog_write(MDCLOG_ERR, "Subscription Delete unsuccessful message failed to send to xAPP");
1922             }
1923             break;
1924         }
1925         default: {
1926             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1927             message.message.messageType = 0; // no RMR message type yet
1928
1929             buildJsonMessage(message);
1930
1931             break;
1932         }
1933     }
1934 }
1935
1936 /**
1937  *
1938  * @param message
1939  * @param requestId
1940  * @param rmrMmessageBuffer
1941  * @return
1942  */
1943 int sendRequestToXapp(ReportingMessages_t &message,
1944                       int requestId,
1945                       RmrMessagesBuffer_t &rmrMmessageBuffer) {
1946     rmr_bytes2meid(rmrMmessageBuffer.sendMessage,
1947                    (unsigned char *)message.message.enodbName,
1948                    strlen(message.message.enodbName));
1949     message.message.messageType = rmrMmessageBuffer.sendMessage->mtype = requestId;
1950     rmrMmessageBuffer.sendMessage->state = 0;
1951     static unsigned char tx[32];
1952     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1953     rmr_bytes2xact(rmrMmessageBuffer.sendMessage, tx, strlen((const char *) tx));
1954
1955     auto rc = sendRmrMessage(rmrMmessageBuffer, message);
1956     return rc;
1957 }
1958
1959 /**
1960  *
1961  * @param pSctpParams
1962  */
1963 void getRmrContext(sctp_params_t &pSctpParams) {
1964     pSctpParams.rmrCtx = nullptr;
1965     pSctpParams.rmrCtx = rmr_init(pSctpParams.rmrAddress, RECEIVE_XAPP_BUFFER_SIZE, RMRFL_NONE);
1966     if (pSctpParams.rmrCtx == nullptr) {
1967         mdclog_write(MDCLOG_ERR, "Failed to initialize RMR");
1968         return;
1969     }
1970
1971     rmr_set_stimeout(pSctpParams.rmrCtx, 0);    // disable retries for any send operation
1972     // we need to find that routing table exist and we can run
1973     if (mdclog_level_get() >= MDCLOG_INFO) {
1974         mdclog_write(MDCLOG_INFO, "We are after RMR INIT wait for RMR_Ready");
1975     }
1976     int rmrReady = 0;
1977     int count = 0;
1978     while (!rmrReady) {
1979         if ((rmrReady = rmr_ready(pSctpParams.rmrCtx)) == 0) {
1980             sleep(1);
1981         }
1982         count++;
1983         if (count % 60 == 0) {
1984             mdclog_write(MDCLOG_INFO, "waiting to RMR ready state for %d seconds", count);
1985         }
1986     }
1987     if (mdclog_level_get() >= MDCLOG_INFO) {
1988         mdclog_write(MDCLOG_INFO, "RMR running");
1989     }
1990     rmr_init_trace(pSctpParams.rmrCtx, 200);
1991     // get the RMR fd for the epoll
1992     pSctpParams.rmrListenFd = rmr_get_rcvfd(pSctpParams.rmrCtx);
1993     struct epoll_event event{};
1994     // add RMR fd to epoll
1995     event.events = (EPOLLIN);
1996     event.data.fd = pSctpParams.rmrListenFd;
1997     // add listening RMR FD to epoll
1998     if (epoll_ctl(pSctpParams.epoll_fd, EPOLL_CTL_ADD, pSctpParams.rmrListenFd, &event)) {
1999         mdclog_write(MDCLOG_ERR, "Failed to add RMR descriptor to epoll");
2000         close(pSctpParams.rmrListenFd);
2001         rmr_close(pSctpParams.rmrCtx);
2002         pSctpParams.rmrCtx = nullptr;
2003     }
2004 }
2005
2006 /**
2007  *
2008  * @param message
2009  * @param rmrMessageBuffer
2010  * @return
2011  */
2012 int PER_FromXML(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) {
2013     E2AP_PDU_t *pdu = nullptr;
2014
2015     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2016         mdclog_write(MDCLOG_DEBUG, "got xml Format  data from xApp of size %d is:%s",
2017                 rmrMessageBuffer.rcvMessage->len, rmrMessageBuffer.rcvMessage->payload);
2018     }
2019     auto rval = asn_decode(nullptr, ATS_BASIC_XER, &asn_DEF_E2AP_PDU, (void **) &pdu,
2020                            rmrMessageBuffer.rcvMessage->payload, rmrMessageBuffer.rcvMessage->len);
2021     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2022         mdclog_write(MDCLOG_DEBUG, "%s After  decoding the XML to PDU", __func__ );
2023     }
2024     if (rval.code != RC_OK) {
2025         mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) setup response  from E2MGR : %s",
2026                      rval.code,
2027                      message.message.enodbName);
2028         return -1;
2029     }
2030
2031     int buff_size = RECEIVE_XAPP_BUFFER_SIZE;
2032     auto er = asn_encode_to_buffer(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, pdu,
2033                                    rmrMessageBuffer.rcvMessage->payload, buff_size);
2034     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2035         mdclog_write(MDCLOG_DEBUG, "%s After encoding PDU to PER", __func__ );
2036     }
2037     if (er.encoded == -1) {
2038         mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
2039         return -1;
2040     } else if (er.encoded > (ssize_t)buff_size) {
2041         mdclog_write(MDCLOG_ERR, "Buffer of size %d is to small for %s, at %s line %d",
2042                      (int)rmrMessageBuffer.rcvMessage->len,
2043                      asn_DEF_E2AP_PDU.name,
2044                      __func__,
2045                      __LINE__);
2046         return -1;
2047     }
2048     rmrMessageBuffer.rcvMessage->len = er.encoded;
2049     return 0;
2050 }
2051
2052 /**
2053  *
2054  * @param sctpMap
2055  * @param rmrMessageBuffer
2056  * @param ts
2057  * @return
2058  */
2059 int receiveXappMessages(Sctp_Map_t *sctpMap,
2060                         RmrMessagesBuffer_t &rmrMessageBuffer,
2061                         struct timespec &ts) {
2062     int loglevel = mdclog_level_get();
2063     if (rmrMessageBuffer.rcvMessage == nullptr) {
2064         //we have error
2065         mdclog_write(MDCLOG_ERR, "RMR Allocation message, %s", strerror(errno));
2066         return -1;
2067     }
2068
2069 //    if (loglevel >= MDCLOG_DEBUG) {
2070 //        mdclog_write(MDCLOG_DEBUG, "Call to rmr_rcv_msg");
2071 //    }
2072     rmrMessageBuffer.rcvMessage = rmr_rcv_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.rcvMessage);
2073     if (rmrMessageBuffer.rcvMessage == nullptr) {
2074         mdclog_write(MDCLOG_ERR, "RMR Receiving message with null pointer, Reallocated rmr message buffer");
2075         rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2076         return -2;
2077     }
2078     ReportingMessages_t message;
2079     message.message.direction = 'D';
2080     message.message.time.tv_nsec = ts.tv_nsec;
2081     message.message.time.tv_sec = ts.tv_sec;
2082
2083     // get message payload
2084     //auto msgData = msg->payload;
2085     if (rmrMessageBuffer.rcvMessage->state != 0) {
2086         mdclog_write(MDCLOG_ERR, "RMR Receiving message with stat = %d", rmrMessageBuffer.rcvMessage->state);
2087         return -1;
2088     }
2089     rmr_get_meid(rmrMessageBuffer.rcvMessage, (unsigned char *)message.message.enodbName);
2090     message.peerInfo = (ConnectedCU_t *) sctpMap->find(message.message.enodbName);
2091     if (message.peerInfo == nullptr) {
2092         auto type = rmrMessageBuffer.rcvMessage->mtype;
2093         switch (type) {
2094             case RIC_SCTP_CLEAR_ALL:
2095             case E2_TERM_KEEP_ALIVE_REQ:
2096             case RIC_HEALTH_CHECK_REQ:
2097                 break;
2098             default:
2099                 mdclog_write(MDCLOG_ERR, "Failed to send message no CU entry %s", message.message.enodbName);
2100                 return -1;
2101         }
2102     }
2103
2104     if (rmrMessageBuffer.rcvMessage->mtype != RIC_HEALTH_CHECK_REQ) {
2105         num_of_XAPP_messages.fetch_add(1, std::memory_order_release);
2106
2107     }
2108     switch (rmrMessageBuffer.rcvMessage->mtype) {
2109         case RIC_E2_SETUP_RESP : {
2110             if (loglevel >= MDCLOG_DEBUG) {
2111                 mdclog_write(MDCLOG_DEBUG, "RIC_E2_SETUP_RESP");
2112             }
2113             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2114                 break;
2115             }
2116             message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment();
2117             message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment(rmrMessageBuffer.rcvMessage->len);
2118             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2119                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_RESP");
2120                 return -6;
2121             }
2122             break;
2123         }
2124         case RIC_E2_SETUP_FAILURE : {
2125             if (loglevel >= MDCLOG_DEBUG) {
2126                 mdclog_write(MDCLOG_DEBUG, "RIC_E2_SETUP_FAILURE");
2127             }
2128             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2129                 break;
2130             }
2131             message.peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment();
2132             message.peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment(rmrMessageBuffer.rcvMessage->len);
2133             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2134                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_FAILURE");
2135                 return -6;
2136             }
2137             break;
2138         }
2139         case RIC_ERROR_INDICATION: {
2140             if (loglevel >= MDCLOG_DEBUG) {
2141                 mdclog_write(MDCLOG_DEBUG, "RIC_ERROR_INDICATION");
2142             }
2143             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication]->Increment();
2144             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication]->Increment(rmrMessageBuffer.rcvMessage->len);
2145             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2146                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ERROR_INDICATION");
2147                 return -6;
2148             }
2149             break;
2150         }
2151         case RIC_SUB_REQ: {
2152             if (loglevel >= MDCLOG_DEBUG) {
2153                 mdclog_write(MDCLOG_DEBUG, "RIC_SUB_REQ");
2154             }
2155             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment();
2156             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment(rmrMessageBuffer.rcvMessage->len);
2157             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2158                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_REQ");
2159                 return -6;
2160             }
2161             break;
2162         }
2163         case RIC_SUB_DEL_REQ: {
2164             if (loglevel >= MDCLOG_DEBUG) {
2165                 mdclog_write(MDCLOG_DEBUG, "RIC_SUB_DEL_REQ");
2166             }
2167             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment();
2168             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment(rmrMessageBuffer.rcvMessage->len);
2169             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2170                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_DEL_REQ");
2171                 return -6;
2172             }
2173             break;
2174         }
2175         case RIC_CONTROL_REQ: {
2176             if (loglevel >= MDCLOG_DEBUG) {
2177                 mdclog_write(MDCLOG_DEBUG, "RIC_CONTROL_REQ");
2178             }
2179             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment();
2180             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment(rmrMessageBuffer.rcvMessage->len);
2181             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2182                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_CONTROL_REQ");
2183                 return -6;
2184             }
2185             break;
2186         }
2187         case RIC_SERVICE_QUERY: {
2188             if (loglevel >= MDCLOG_DEBUG) {
2189                 mdclog_write(MDCLOG_DEBUG, "RIC_SERVICE_QUERY");
2190             }
2191             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2192                 break;
2193             }
2194             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceQuery]->Increment();
2195             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceQuery]->Increment(rmrMessageBuffer.rcvMessage->len);
2196             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2197                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_QUERY");
2198                 return -6;
2199             }
2200             break;
2201         }
2202         case RIC_SERVICE_UPDATE_ACK: {
2203             if (loglevel >= MDCLOG_DEBUG) {
2204                 mdclog_write(MDCLOG_DEBUG, "RIC_SERVICE_UPDATE_ACK");
2205             }
2206             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2207                 mdclog_write(MDCLOG_ERR, "error in PER_FromXML");
2208                 break;
2209             }
2210             message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment();
2211             message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment(rmrMessageBuffer.rcvMessage->len);
2212             if (loglevel >= MDCLOG_DEBUG) {
2213                 mdclog_write(MDCLOG_DEBUG, "Before sending to CU");
2214             }
2215             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2216                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_ACK");
2217                 return -6;
2218             }
2219             break;
2220         }
2221         case RIC_SERVICE_UPDATE_FAILURE: {
2222             if (loglevel >= MDCLOG_DEBUG) {
2223                 mdclog_write(MDCLOG_DEBUG, "RIC_SERVICE_UPDATE_FAILURE");
2224             }
2225             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2226                 break;
2227             }
2228             message.peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment();
2229             message.peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment(rmrMessageBuffer.rcvMessage->len);
2230             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2231                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_FAILURE");
2232                 return -6;
2233             }
2234             break;
2235         }
2236         case RIC_E2_RESET_REQ: {
2237             if (loglevel >= MDCLOG_DEBUG) {
2238                 mdclog_write(MDCLOG_DEBUG, "RIC_E2_RESET_REQ");
2239             }
2240             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2241                 break;
2242             }
2243             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
2244             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment(rmrMessageBuffer.rcvMessage->len);
2245             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2246                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_RESET");
2247                 return -6;
2248             }
2249             break;
2250         }
2251         case RIC_E2_RESET_RESP: {
2252             if (loglevel >= MDCLOG_DEBUG) {
2253                 mdclog_write(MDCLOG_DEBUG, "RIC_E2_RESET_RESP");
2254             }
2255             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2256                 break;
2257             }
2258             message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
2259             message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment(rmrMessageBuffer.rcvMessage->len);
2260             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2261                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_RESET_RESP");
2262                 return -6;
2263             }
2264             break;
2265         }
2266         case RIC_SCTP_CLEAR_ALL: {
2267             mdclog_write(MDCLOG_INFO, "RIC_SCTP_CLEAR_ALL");
2268             // loop on all keys and close socket and then erase all map.
2269             vector<char *> v;
2270             sctpMap->getKeys(v);
2271             for (auto const &iter : v) { //}; iter != sctpMap.end(); iter++) {
2272                 if (!boost::starts_with((string) (iter), "host:") && !boost::starts_with((string) (iter), "msg:")) {
2273                     auto *peerInfo = (ConnectedCU_t *) sctpMap->find(iter);
2274                     if (peerInfo == nullptr) {
2275                         continue;
2276                     }
2277                     close(peerInfo->fileDescriptor);
2278                     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
2279                     message.message.direction = 'D';
2280                     message.message.time.tv_nsec = ts.tv_nsec;
2281                     message.message.time.tv_sec = ts.tv_sec;
2282
2283                     message.message.asnLength = rmrMessageBuffer.sendMessage->len =
2284                             snprintf((char *)rmrMessageBuffer.sendMessage->payload,
2285                                      256,
2286                                      "%s|RIC_SCTP_CLEAR_ALL",
2287                                      peerInfo->enodbName);
2288                     message.message.asndata = rmrMessageBuffer.sendMessage->payload;
2289                     mdclog_write(MDCLOG_INFO, "%s", message.message.asndata);
2290                     if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
2291                         mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
2292                     }
2293                     free(peerInfo);
2294                 }
2295             }
2296
2297             sleep(1);
2298             sctpMap->clear();
2299             break;
2300         }
2301         case E2_TERM_KEEP_ALIVE_REQ: {
2302             // send message back
2303             rmr_bytes2payload(rmrMessageBuffer.sendMessage,
2304                               (unsigned char *)rmrMessageBuffer.ka_message,
2305                               rmrMessageBuffer.ka_message_len);
2306             rmrMessageBuffer.sendMessage->mtype = E2_TERM_KEEP_ALIVE_RESP;
2307             rmrMessageBuffer.sendMessage->state = 0;
2308             static unsigned char tx[32];
2309             auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2310             rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, txLen);
2311             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2312             if (rmrMessageBuffer.sendMessage == nullptr) {
2313                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2314                 mdclog_write(MDCLOG_ERR, "Failed to send E2_TERM_KEEP_ALIVE_RESP RMR message returned NULL");
2315             } else if (rmrMessageBuffer.sendMessage->state != 0)  {
2316                 mdclog_write(MDCLOG_ERR, "Failed to send E2_TERM_KEEP_ALIVE_RESP, on RMR state = %d ( %s)",
2317                              rmrMessageBuffer.sendMessage->state, translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str());
2318             } else if (loglevel >= MDCLOG_DEBUG) {
2319                 mdclog_write(MDCLOG_DEBUG, "Got Keep Alive Request send : %s", rmrMessageBuffer.ka_message);
2320             }
2321
2322             break;
2323         }
2324         case RIC_HEALTH_CHECK_REQ: {
2325             static int counter = 0;
2326             // send message back
2327             rmr_bytes2payload(rmrMessageBuffer.rcvMessage,
2328                               (unsigned char *)"OK",
2329                               2);
2330             rmrMessageBuffer.rcvMessage->mtype = RIC_HEALTH_CHECK_RESP;
2331             rmrMessageBuffer.rcvMessage->state = 0;
2332             static unsigned char tx[32];
2333             auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2334             rmr_bytes2xact(rmrMessageBuffer.rcvMessage, tx, txLen);
2335             rmrMessageBuffer.rcvMessage = rmr_rts_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.rcvMessage);
2336             //rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2337             if (rmrMessageBuffer.rcvMessage == nullptr) {
2338                 rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2339                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_HEALTH_CHECK_RESP RMR message returned NULL");
2340             } else if (rmrMessageBuffer.rcvMessage->state != 0)  {
2341                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_HEALTH_CHECK_RESP, on RMR state = %d ( %s)",
2342                              rmrMessageBuffer.rcvMessage->state, translateRmrErrorMessages(rmrMessageBuffer.rcvMessage->state).c_str());
2343             } else if (loglevel >= MDCLOG_DEBUG && ++counter % 100 == 0) {
2344                 mdclog_write(MDCLOG_DEBUG, "Got %d RIC_HEALTH_CHECK_REQ Request send : OK", counter);
2345             }
2346
2347             break;
2348         }
2349
2350         default:
2351             mdclog_write(MDCLOG_WARN, "Message Type : %d is not supported", rmrMessageBuffer.rcvMessage->mtype);
2352             message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
2353             message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
2354             message.message.time.tv_nsec = ts.tv_nsec;
2355             message.message.time.tv_sec = ts.tv_sec;
2356             message.message.messageType = rmrMessageBuffer.rcvMessage->mtype;
2357
2358             buildJsonMessage(message);
2359
2360
2361             return -7;
2362     }
2363     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2364         mdclog_write(MDCLOG_DEBUG, "EXIT OK from %s", __FUNCTION__);
2365     }
2366     return 0;
2367 }
2368
2369 /**
2370  * Send message to the CU that is not expecting for successful or unsuccessful results
2371  * @param messageBuffer
2372  * @param message
2373  * @param failedMsgId
2374  * @param sctpMap
2375  * @return
2376  */
2377 int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer,
2378                            ReportingMessages_t &message,
2379                            int failedMsgId,
2380                            Sctp_Map_t *sctpMap) {
2381     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2382         mdclog_write(MDCLOG_DEBUG, "send message: %d to %s address", message.message.messageType, message.message.enodbName);
2383     }
2384
2385     getRequestMetaData(message, messageBuffer);
2386     if (mdclog_level_get() >= MDCLOG_INFO) {
2387         mdclog_write(MDCLOG_INFO, "send message to %s address", message.message.enodbName);
2388     }
2389
2390     auto rc = sendMessagetoCu(sctpMap, messageBuffer, message, failedMsgId);
2391     return rc;
2392 }
2393
2394 /**
2395  *
2396  * @param sctpMap
2397  * @param messageBuffer
2398  * @param message
2399  * @param failedMesgId
2400  * @return
2401  */
2402 int sendMessagetoCu(Sctp_Map_t *sctpMap,
2403                     RmrMessagesBuffer_t &messageBuffer,
2404                     ReportingMessages_t &message,
2405                     int failedMesgId) {
2406     // get the FD
2407     message.message.messageType = messageBuffer.rcvMessage->mtype;
2408     auto rc = sendSctpMsg(message.peerInfo, message, sctpMap);
2409     return rc;
2410 }
2411
2412
2413 /**
2414  *
2415  * @param epoll_fd
2416  * @param peerInfo
2417  * @param events
2418  * @param sctpMap
2419  * @param enodbName
2420  * @param msgType
2421  * @return
2422  */
2423 int addToEpoll(int epoll_fd,
2424                ConnectedCU_t *peerInfo,
2425                uint32_t events,
2426                Sctp_Map_t *sctpMap,
2427                char *enodbName,
2428                int msgType) {
2429     // Add to Epol
2430     struct epoll_event event{};
2431     event.data.ptr = peerInfo;
2432     event.events = events;
2433     if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peerInfo->fileDescriptor, &event) < 0) {
2434         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2435             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_ADD (may check not to quit here), %s, %s %d",
2436                          strerror(errno), __func__, __LINE__);
2437         }
2438         close(peerInfo->fileDescriptor);
2439         if (enodbName != nullptr) {
2440             cleanHashEntry(peerInfo, sctpMap);
2441             char key[MAX_ENODB_NAME_SIZE * 2];
2442             snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2443             if (mdclog_level_get() >= MDCLOG_DEBUG) {
2444                 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2445             }
2446             auto tmp = sctpMap->find(key);
2447             if (tmp) {
2448                 free(tmp);
2449                 sctpMap->erase(key);
2450             }
2451         } else {
2452             peerInfo->enodbName[0] = 0;
2453         }
2454         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may check not to quit here)");
2455         return -1;
2456     }
2457     return 0;
2458 }
2459
2460 /**
2461  *
2462  * @param epoll_fd
2463  * @param peerInfo
2464  * @param events
2465  * @param sctpMap
2466  * @param enodbName
2467  * @param msgType
2468  * @return
2469  */
2470 int modifyToEpoll(int epoll_fd,
2471                   ConnectedCU_t *peerInfo,
2472                   uint32_t events,
2473                   Sctp_Map_t *sctpMap,
2474                   char *enodbName,
2475                   int msgType) {
2476     // Add to Epol
2477     struct epoll_event event{};
2478     event.data.ptr = peerInfo;
2479     event.events = events;
2480     if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, peerInfo->fileDescriptor, &event) < 0) {
2481         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2482             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_MOD (may check not to quit here), %s, %s %d",
2483                          strerror(errno), __func__, __LINE__);
2484         }
2485         close(peerInfo->fileDescriptor);
2486         cleanHashEntry(peerInfo, sctpMap);
2487         char key[MAX_ENODB_NAME_SIZE * 2];
2488         snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2489         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2490             mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2491         }
2492         auto tmp = sctpMap->find(key);
2493         if (tmp) {
2494             free(tmp);
2495         }
2496         sctpMap->erase(key);
2497         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may check not to quit here)");
2498         return -1;
2499     }
2500     return 0;
2501 }
2502
2503
2504 int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message) {
2505     buildJsonMessage(message);
2506
2507     rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2508
2509     if (rmrMessageBuffer.sendMessage == nullptr) {
2510         rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2511         mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2512         return -1;
2513     }
2514
2515     if (rmrMessageBuffer.sendMessage->state != 0) {
2516         char meid[RMR_MAX_MEID]{};
2517         if (rmrMessageBuffer.sendMessage->state == RMR_ERR_RETRY) {
2518             usleep(5);
2519             rmrMessageBuffer.sendMessage->state = 0;
2520             mdclog_write(MDCLOG_INFO, "RETRY sending Message type %d to Xapp from %s",
2521                          rmrMessageBuffer.sendMessage->mtype,
2522                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2523             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2524             if (rmrMessageBuffer.sendMessage == nullptr) {
2525                 mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2526                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2527                 return -1;
2528             } else if (rmrMessageBuffer.sendMessage->state != 0) {
2529                 mdclog_write(MDCLOG_ERR,
2530                              "Message state %s while sending request %d to Xapp from %s after retry of 10 microseconds",
2531                              translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2532                              rmrMessageBuffer.sendMessage->mtype,
2533                              rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2534                 auto rc = rmrMessageBuffer.sendMessage->state;
2535                 return rc;
2536             }
2537         } else {
2538             mdclog_write(MDCLOG_ERR, "Message state %s while sending request %d to Xapp from %s",
2539                          translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2540                          rmrMessageBuffer.sendMessage->mtype,
2541                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2542             return rmrMessageBuffer.sendMessage->state;
2543         }
2544     }
2545     return 0;
2546 }
2547
2548 void buildJsonMessage(ReportingMessages_t &message) {
2549     if (jsonTrace) {
2550         message.outLen = sizeof(message.base64Data);
2551         base64::encode((const unsigned char *) message.message.asndata,
2552                        (const int) message.message.asnLength,
2553                        message.base64Data,
2554                        message.outLen);
2555         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2556             mdclog_write(MDCLOG_DEBUG, "Tracing: ASN length = %d, base64 message length = %d ",
2557                          (int) message.message.asnLength,
2558                          (int) message.outLen);
2559         }
2560
2561         snprintf(message.buffer, sizeof(message.buffer),
2562                  "{\"header\": {\"ts\": \"%ld.%09ld\","
2563                  "\"ranName\": \"%s\","
2564                  "\"messageType\": %d,"
2565                  "\"direction\": \"%c\"},"
2566                  "\"base64Length\": %d,"
2567                  "\"asnBase64\": \"%s\"}",
2568                  message.message.time.tv_sec,
2569                  message.message.time.tv_nsec,
2570                  message.message.enodbName,
2571                  message.message.messageType,
2572                  message.message.direction,
2573                  (int) message.outLen,
2574                  message.base64Data);
2575         static src::logger_mt &lg = my_logger::get();
2576
2577         BOOST_LOG(lg) << message.buffer;
2578     }
2579 }
2580
2581
2582 /**
2583  * take RMR error code to string
2584  * @param state
2585  * @return
2586  */
2587 string translateRmrErrorMessages(int state) {
2588     string str = {};
2589     switch (state) {
2590         case RMR_OK:
2591             str = "RMR_OK - state is good";
2592             break;
2593         case RMR_ERR_BADARG:
2594             str = "RMR_ERR_BADARG - argument passed to function was unusable";
2595             break;
2596         case RMR_ERR_NOENDPT:
2597             str = "RMR_ERR_NOENDPT - send//call could not find an endpoint based on msg type";
2598             break;
2599         case RMR_ERR_EMPTY:
2600             str = "RMR_ERR_EMPTY - msg received had no payload; attempt to send an empty message";
2601             break;
2602         case RMR_ERR_NOHDR:
2603             str = "RMR_ERR_NOHDR - message didn't contain a valid header";
2604             break;
2605         case RMR_ERR_SENDFAILED:
2606             str = "RMR_ERR_SENDFAILED - send failed; errno has nano reason";
2607             break;
2608         case RMR_ERR_CALLFAILED:
2609             str = "RMR_ERR_CALLFAILED - unable to send call() message";
2610             break;
2611         case RMR_ERR_NOWHOPEN:
2612             str = "RMR_ERR_NOWHOPEN - no wormholes are open";
2613             break;
2614         case RMR_ERR_WHID:
2615             str = "RMR_ERR_WHID - wormhole id was invalid";
2616             break;
2617         case RMR_ERR_OVERFLOW:
2618             str = "RMR_ERR_OVERFLOW - operation would have busted through a buffer/field size";
2619             break;
2620         case RMR_ERR_RETRY:
2621             str = "RMR_ERR_RETRY - request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)";
2622             break;
2623         case RMR_ERR_RCVFAILED:
2624             str = "RMR_ERR_RCVFAILED - receive failed (hard error)";
2625             break;
2626         case RMR_ERR_TIMEOUT:
2627             str = "RMR_ERR_TIMEOUT - message processing call timed out";
2628             break;
2629         case RMR_ERR_UNSET:
2630             str = "RMR_ERR_UNSET - the message hasn't been populated with a transport buffer";
2631             break;
2632         case RMR_ERR_TRUNC:
2633             str = "RMR_ERR_TRUNC - received message likely truncated";
2634             break;
2635         case RMR_ERR_INITFAILED:
2636             str = "RMR_ERR_INITFAILED - initialisation of something (probably message) failed";
2637             break;
2638         case RMR_ERR_NOTSUPP:
2639             str = "RMR_ERR_NOTSUPP - the request is not supported, or RMr was not initialised for the request";
2640             break;
2641         default:
2642             char buf[128]{};
2643             snprintf(buf, sizeof buf, "UNDOCUMENTED RMR_ERR : %d", state);
2644             str = buf;
2645             break;
2646     }
2647     return str;
2648 }
2649
2650