Dynamic loglevel change for mdclogging
[ric-plt/e2.git] / RIC-E2-TERMINATION / sctpThread.cpp
1 // Copyright 2019 AT&T Intellectual Property
2 // Copyright 2019 Nokia
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //      http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15
16 //  This source code is part of the near-RT RIC (RAN Intelligent Controller)
17 //  platform project (RICP).
18
19 // TODO: High-level file comment.
20
21
22
23 #include <3rdparty/oranE2/RANfunctions-List.h>
24 #include "sctpThread.h"
25 #include "BuildRunName.h"
26 #include <unistd.h>
27 //#include "3rdparty/oranE2SM/E2SM-gNB-NRT-RANfunction-Definition.h"
28 //#include "BuildXml.h"
29 //#include "pugixml/src/pugixml.hpp"
30
31 using namespace std;
32 //using namespace std::placeholders;
33 using namespace boost::filesystem;
34 using namespace prometheus;
35
36
37 //#ifdef __cplusplus
38 //extern "C"
39 //{
40 //#endif
41
// The gcov flush hook is declared by hand so that no gcov header has to be included.
extern "C" void __gcov_flush(void);

/**
 * Signal handler installed by main() for SIGINT, SIGABRT and SIGTERM.
 * Calls the gcov flush hook and then terminates the process.
 *
 * @param signal number of the delivered signal; it is reused as the exit status
 */
static void catch_function(int signal) {
    const int exitStatus = signal;
    __gcov_flush();
    exit(exitStatus);
}
49
50
51 BOOST_LOG_INLINE_GLOBAL_LOGGER_DEFAULT(my_logger, src::logger_mt)
52
53 boost::shared_ptr<sinks::synchronous_sink<sinks::text_file_backend>> boostLogger;
54 double cpuClock = 0.0;
55 bool jsonTrace = false;
56
57 void init_log() {
58     int log_change_monitor = 1;
59     mdclog_attr_t *attr;
60     mdclog_attr_init(&attr);
61     mdclog_attr_set_ident(attr, "E2Terminator");
62     mdclog_init(attr);
63     if(mdclog_format_initialize(log_change_monitor)!=0)
64         mdclog_write(MDCLOG_ERR, "Failed to intialize MDC log format !!!");
65     mdclog_attr_destroy(attr);
66 }
// Reference instant taken when this variable is initialised; age() is measured from it.
auto start_time = std::chrono::high_resolution_clock::now();
// Floating point duration whose unit is one second.
using seconds_t = std::chrono::duration<double, std::ratio<1, 1>>;

/**
 * @return the time elapsed since start_time was initialised, in seconds
 */
double age() {
    const auto now = std::chrono::high_resolution_clock::now();
    const seconds_t elapsed(now - start_time);
    return elapsed.count();
}
73
74 double approx_CPU_MHz(unsigned sleepTime) {
75     using namespace std::chrono_literals;
76     uint32_t aux = 0;
77     uint64_t cycles_start = rdtscp(aux);
78     double time_start = age();
79     std::this_thread::sleep_for(sleepTime * 1ms);
80     uint64_t elapsed_cycles = rdtscp(aux) - cycles_start;
81     double elapsed_time = age() - time_start;
82     return elapsed_cycles / elapsed_time;
83 }
84
//std::atomic<int64_t> rmrCounter{0};
// Incremented by listener() for every RMR event and every SCTP data event.
std::atomic<int64_t> num_of_messages{0};
// Polled by handleTermInit(); a value above zero stops the E2_TERM_INIT retries.
std::atomic<int64_t> num_of_XAPP_messages{0};
// Source of the transaction id put on E2_TERM_INIT messages; main() seeds it with a random value.
static long transactionCounter{0};
89
90 int buildListeningPort(sctp_params_t &sctpParams) {
91     sctpParams.listenFD = socket(AF_INET6, SOCK_STREAM, IPPROTO_SCTP);
92     if (sctpParams.listenFD <= 0) {
93         mdclog_write(MDCLOG_ERR, "Error Opening socket, %s", strerror(errno));
94         return -1;
95     }
96
97     struct sockaddr_in6 serverAddress {};
98     serverAddress.sin6_family = AF_INET6;
99     serverAddress.sin6_addr   = in6addr_any;
100     serverAddress.sin6_port = htons(sctpParams.sctpPort);
101     if (bind(sctpParams.listenFD, (SA *)&serverAddress, sizeof(serverAddress)) < 0 ) {
102         mdclog_write(MDCLOG_ERR, "Error binding port %d. %s", sctpParams.sctpPort, strerror(errno));
103         return -1;
104     }
105     if (setSocketNoBlocking(sctpParams.listenFD) == -1) {
106         //mdclog_write(MDCLOG_ERR, "Error binding. %s", strerror(errno));
107         return -1;
108     }
109     if (mdclog_level_get() >= MDCLOG_DEBUG) {
110         struct sockaddr_in6 clientAddress {};
111         socklen_t len = sizeof(clientAddress);
112         getsockname(sctpParams.listenFD, (SA *)&clientAddress, &len);
113         char buff[1024] {};
114         inet_ntop(AF_INET6, &clientAddress.sin6_addr, buff, sizeof(buff));
115         mdclog_write(MDCLOG_DEBUG, "My address: %s, port %d\n", buff, htons(clientAddress.sin6_port));
116     }
117
118     if (listen(sctpParams.listenFD, SOMAXCONN) < 0) {
119         mdclog_write(MDCLOG_ERR, "Error listening. %s\n", strerror(errno));
120         return -1;
121     }
122     struct epoll_event event {};
123     event.events = EPOLLIN | EPOLLET;
124     event.data.fd = sctpParams.listenFD;
125
126     // add listening port to epoll
127     if (epoll_ctl(sctpParams.epoll_fd, EPOLL_CTL_ADD, sctpParams.listenFD, &event)) {
128         printf("Failed to add descriptor to epoll\n");
129         mdclog_write(MDCLOG_ERR, "Failed to add descriptor to epoll. %s\n", strerror(errno));
130         return -1;
131     }
132
133     return 0;
134 }
135
136 int buildConfiguration(sctp_params_t &sctpParams) {
137     path p = (sctpParams.configFilePath + "/" + sctpParams.configFileName).c_str();
138     if (exists(p)) {
139         const int size = 2048;
140         auto fileSize = file_size(p);
141         if (fileSize > size) {
142             mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size);
143             return -1;
144         }
145     } else {
146         mdclog_write(MDCLOG_ERR, "Configuration File %s not exists", p.string().c_str());
147         return -1;
148     }
149
150     ReadConfigFile conf;
151     if (conf.openConfigFile(p.string()) == -1) {
152         mdclog_write(MDCLOG_ERR, "Filed to open config file %s, %s",
153                      p.string().c_str(), strerror(errno));
154         return -1;
155     }
156     int rmrPort = conf.getIntValue("nano");
157     if (rmrPort == -1) {
158         mdclog_write(MDCLOG_ERR, "illegal RMR port ");
159         return -1;
160     }
161     sctpParams.rmrPort = (uint16_t)rmrPort;
162     snprintf(sctpParams.rmrAddress, sizeof(sctpParams.rmrAddress), "%d", (int) (sctpParams.rmrPort));
163     auto tmpStr = conf.getStringValue("volume");
164     if (tmpStr.length() == 0) {
165         mdclog_write(MDCLOG_ERR, "illegal volume.");
166         return -1;
167     }
168
169     char tmpLogFilespec[VOLUME_URL_SIZE];
170     tmpLogFilespec[0] = 0;
171     sctpParams.volume[0] = 0;
172     snprintf(sctpParams.volume, VOLUME_URL_SIZE, "%s", tmpStr.c_str());
173     // copy the name to temp file as well
174     snprintf(tmpLogFilespec, VOLUME_URL_SIZE, "%s", tmpStr.c_str());
175
176
177     // define the file name in the tmp directory under the volume
178     strcat(tmpLogFilespec,"/tmp/E2Term_%Y-%m-%d_%H-%M-%S.%N.tmpStr");
179
180     sctpParams.myIP = conf.getStringValue("local-ip");
181     if (sctpParams.myIP.length() == 0) {
182         mdclog_write(MDCLOG_ERR, "illegal local-ip.");
183         return -1;
184     }
185
186     int sctpPort = conf.getIntValue("sctp-port");
187     if (sctpPort == -1) {
188         mdclog_write(MDCLOG_ERR, "illegal SCTP port ");
189         return -1;
190     }
191     sctpParams.sctpPort = (uint16_t)sctpPort;
192
193     sctpParams.fqdn = conf.getStringValue("external-fqdn");
194     if (sctpParams.fqdn.length() == 0) {
195         mdclog_write(MDCLOG_ERR, "illegal external-fqdn");
196         return -1;
197     }
198
199     std::string pod = conf.getStringValue("pod_name");
200     if (pod.length() == 0) {
201         mdclog_write(MDCLOG_ERR, "illegal pod_name in config file");
202         return -1;
203     }
204     auto *podName = getenv(pod.c_str());
205     if (podName == nullptr) {
206         mdclog_write(MDCLOG_ERR, "illegal pod_name or environment variable not exists : %s", pod.c_str());
207         return -1;
208
209     } else {
210         sctpParams.podName.assign(podName);
211         if (sctpParams.podName.length() == 0) {
212             mdclog_write(MDCLOG_ERR, "illegal pod_name");
213             return -1;
214         }
215     }
216
217     tmpStr = conf.getStringValue("trace");
218     transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
219     if ((tmpStr.compare("start")) == 0) {
220         mdclog_write(MDCLOG_INFO, "Trace set to: start");
221         sctpParams.trace = true;
222     } else if ((tmpStr.compare("stop")) == 0) {
223         mdclog_write(MDCLOG_INFO, "Trace set to: stop");
224         sctpParams.trace = false;
225     } else {
226         mdclog_write(MDCLOG_ERR, "Trace was set to wrong value %s, set to stop", tmpStr.c_str());
227         sctpParams.trace = false;
228     }
229     jsonTrace = sctpParams.trace;
230
231     sctpParams.epollTimeOut = -1;
232
233     tmpStr = conf.getStringValue("prometheusPort");
234     if (tmpStr.length() != 0) {
235         sctpParams.prometheusPort = tmpStr;
236     }
237
238     sctpParams.ka_message_length = snprintf(sctpParams.ka_message, KA_MESSAGE_SIZE, "{\"address\": \"%s:%d\","
239                                                                                     "\"fqdn\": \"%s\","
240                                                                                     "\"pod_name\": \"%s\"}",
241                                             (const char *)sctpParams.myIP.c_str(),
242                                             sctpParams.rmrPort,
243                                             sctpParams.fqdn.c_str(),
244                                             sctpParams.podName.c_str());
245
246     if (mdclog_level_get() >= MDCLOG_INFO) {
247         mdclog_write(MDCLOG_DEBUG,"RMR Port: %s", to_string(sctpParams.rmrPort).c_str());
248         mdclog_write(MDCLOG_DEBUG,"LogLevel: %s", to_string(sctpParams.logLevel).c_str());
249         mdclog_write(MDCLOG_DEBUG,"volume: %s", sctpParams.volume);
250         mdclog_write(MDCLOG_DEBUG,"tmpLogFilespec: %s", tmpLogFilespec);
251         mdclog_write(MDCLOG_DEBUG,"my ip: %s", sctpParams.myIP.c_str());
252         mdclog_write(MDCLOG_DEBUG,"pod name: %s", sctpParams.podName.c_str());
253
254         mdclog_write(MDCLOG_INFO, "running parameters for instance : %s", sctpParams.ka_message);
255     }
256
257     // Files written to the current working directory
258     boostLogger = logging::add_file_log(
259             keywords::file_name = tmpLogFilespec, // to temp directory
260             keywords::rotation_size = 10 * 1024 * 1024,
261             keywords::time_based_rotation = sinks::file::rotation_at_time_interval(posix_time::hours(1)),
262             keywords::format = "%Message%"
263             //keywords::format = "[%TimeStamp%]: %Message%" // use each tmpStr with time stamp
264     );
265
266     // Setup a destination folder for collecting rotated (closed) files --since the same volume can use rename()
267     boostLogger->locked_backend()->set_file_collector(sinks::file::make_collector(
268             keywords::target = sctpParams.volume
269     ));
270
271     // Upon restart, scan the directory for files matching the file_name pattern
272     boostLogger->locked_backend()->scan_for_files();
273
274     // Enable auto-flushing after each tmpStr record written
275     if (mdclog_level_get() >= MDCLOG_DEBUG) {
276         boostLogger->locked_backend()->auto_flush(true);
277     }
278
279     return 0;
280 }
281
282 void startPrometheus(sctp_params_t &sctpParams) {
283     sctpParams.prometheusFamily = &BuildCounter()
284             .Name("E2T")
285             .Help("E2T message counter")
286             .Labels({{"POD_NAME", sctpParams.podName}})
287             .Register(*sctpParams.prometheusRegistry);
288
289     string prometheusPath = sctpParams.prometheusPort + "," + "[::]:" + sctpParams.prometheusPort;
290     if (mdclog_level_get() >= MDCLOG_DEBUG) {
291         mdclog_write(MDCLOG_DEBUG, "Start Prometheus Pull mode on %s", prometheusPath.c_str());
292     }
293     sctpParams.prometheusExposer = new Exposer(prometheusPath, 1);
294     sctpParams.prometheusExposer->RegisterCollectable(sctpParams.prometheusRegistry);
295 }
296 #ifndef UNIT_TEST
297
298 int main(const int argc, char **argv) {
299     sctp_params_t sctpParams;
300
301     {
302         std::random_device device{};
303         std::mt19937 generator(device());
304         std::uniform_int_distribution<long> distribution(1, (long) 1e12);
305         transactionCounter = distribution(generator);
306     }
307
308 //    uint64_t st = 0;
309 //    uint32_t aux1 = 0;
310 //   st = rdtscp(aux1);
311
312     unsigned num_cpus = std::thread::hardware_concurrency();
313     init_log();
314     if (std::signal(SIGINT, catch_function) == SIG_ERR) {
315         mdclog_write(MDCLOG_ERR, "Error initializing SIGINT");
316         exit(1);
317     }
318     if (std::signal(SIGABRT, catch_function)== SIG_ERR) {
319         mdclog_write(MDCLOG_ERR, "Error initializing SIGABRT");
320         exit(1);
321     }
322     if (std::signal(SIGTERM, catch_function)== SIG_ERR) {
323         mdclog_write(MDCLOG_ERR, "Error initializing SIGTERM");
324         exit(1);
325     }
326
327     cpuClock = approx_CPU_MHz(100);
328
329     mdclog_write(MDCLOG_DEBUG, "CPU speed %11.11f", cpuClock);
330
331     auto result = parse(argc, argv, sctpParams);
332
333     if (buildConfiguration(sctpParams) != 0) {
334         exit(-1);
335     }
336
337     //auto registry = std::make_shared<Registry>();
338     sctpParams.prometheusRegistry = std::make_shared<Registry>();
339
340     //sctpParams.prometheusFamily = new Family<Counter>("E2T", "E2T message counter", {{"E", sctpParams.podName}});
341
342     startPrometheus(sctpParams);
343
344     // start epoll
345     sctpParams.epoll_fd = epoll_create1(0);
346     if (sctpParams.epoll_fd == -1) {
347         mdclog_write(MDCLOG_ERR, "failed to open epoll descriptor");
348         exit(-1);
349     }
350
351     getRmrContext(sctpParams);
352     if (sctpParams.rmrCtx == nullptr) {
353         close(sctpParams.epoll_fd);
354         exit(-1);
355     }
356
357     if (buildInotify(sctpParams) == -1) {
358         close(sctpParams.rmrListenFd);
359         rmr_close(sctpParams.rmrCtx);
360         close(sctpParams.epoll_fd);
361         exit(-1);
362     }
363
364     if (buildListeningPort(sctpParams) != 0) {
365         close(sctpParams.rmrListenFd);
366         rmr_close(sctpParams.rmrCtx);
367         close(sctpParams.epoll_fd);
368         exit(-1);
369     }
370
371     sctpParams.sctpMap = new mapWrapper();
372
373     std::vector<std::thread> threads(num_cpus);
374 //    std::vector<std::thread> threads;
375
376     num_cpus = 3;
377     for (unsigned int i = 0; i < num_cpus; i++) {
378         threads[i] = std::thread(listener, &sctpParams);
379
380         cpu_set_t cpuset;
381         CPU_ZERO(&cpuset);
382         CPU_SET(i, &cpuset);
383         int rc = pthread_setaffinity_np(threads[i].native_handle(), sizeof(cpu_set_t), &cpuset);
384         if (rc != 0) {
385             mdclog_write(MDCLOG_ERR, "Error calling pthread_setaffinity_np: %d", rc);
386         }
387     }
388
389
390     //loop over term_init until first message from xApp
391     handleTermInit(sctpParams);
392
393     for (auto &t : threads) {
394         t.join();
395     }
396
397     return 0;
398 }
399 #endif
400 void handleTermInit(sctp_params_t &sctpParams) {
401     sendTermInit(sctpParams);
402     //send to e2 manager init of e2 term
403     //E2_TERM_INIT
404
405     int count = 0;
406     while (true) {
407         auto xappMessages = num_of_XAPP_messages.load(std::memory_order_acquire);
408         if (xappMessages > 0) {
409             if (mdclog_level_get() >=  MDCLOG_INFO) {
410                 mdclog_write(MDCLOG_INFO, "Got a message from some application, stop sending E2_TERM_INIT");
411             }
412             return;
413         }
414         usleep(100000);
415         count++;
416         if (count % 1000 == 0) {
417             mdclog_write(MDCLOG_ERR, "GOT No messages from any xApp");
418             sendTermInit(sctpParams);
419         }
420     }
421 }
422
423 void sendTermInit(sctp_params_t &sctpParams) {
424     rmr_mbuf_t *msg = rmr_alloc_msg(sctpParams.rmrCtx, sctpParams.ka_message_length);
425     auto count = 0;
426     while (true) {
427         msg->mtype = E2_TERM_INIT;
428         msg->state = 0;
429         rmr_bytes2payload(msg, (unsigned char *)sctpParams.ka_message, sctpParams.ka_message_length);
430         static unsigned char tx[32];
431         auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
432         rmr_bytes2xact(msg, tx, txLen);
433         msg = rmr_send_msg(sctpParams.rmrCtx, msg);
434         if (msg == nullptr) {
435             msg = rmr_alloc_msg(sctpParams.rmrCtx, sctpParams.ka_message_length);
436         } else if (msg->state == 0) {
437             rmr_free_msg(msg);
438             if (mdclog_level_get() >=  MDCLOG_INFO) {
439                 mdclog_write(MDCLOG_INFO, "E2_TERM_INIT successfully sent ");
440             }
441             return;
442         } else {
443             if (count % 100 == 0) {
444                 mdclog_write(MDCLOG_ERR, "Error sending E2_TERM_INIT cause : %s ", translateRmrErrorMessages(msg->state).c_str());
445             }
446             sleep(1);
447         }
448         count++;
449     }
450 }
451
452 /**
453  *
454  * @param argc
455  * @param argv
456  * @param sctpParams
457  * @return
458  */
459 cxxopts::ParseResult parse(int argc, char *argv[], sctp_params_t &sctpParams) {
460     cxxopts::Options options(argv[0], "e2 term help");
461     options.positional_help("[optional args]").show_positional_help();
462     options.allow_unrecognised_options().add_options()
463             ("p,path", "config file path", cxxopts::value<std::string>(sctpParams.configFilePath)->default_value("config"))
464             ("f,file", "config file name", cxxopts::value<std::string>(sctpParams.configFileName)->default_value("config.conf"))
465             ("h,help", "Print help");
466
467     auto result = options.parse(argc, (const char **&)argv);
468
469     if (result.count("help")) {
470         std::cout << options.help({""}) << std::endl;
471         exit(0);
472     }
473     return result;
474 }
475
476 /**
477  *
478  * @param sctpParams
479  * @return -1 failed 0 success
480  */
481 int buildInotify(sctp_params_t &sctpParams) {
482     sctpParams.inotifyFD = inotify_init1(IN_NONBLOCK);
483     if (sctpParams.inotifyFD == -1) {
484         mdclog_write(MDCLOG_ERR, "Failed to init inotify (inotify_init1) %s", strerror(errno));
485         return -1;
486     }
487
488     sctpParams.inotifyWD = inotify_add_watch(sctpParams.inotifyFD,
489                                              (const char *)sctpParams.configFilePath.c_str(),
490                                              (unsigned)IN_OPEN | (unsigned)IN_CLOSE_WRITE | (unsigned)IN_CLOSE_NOWRITE); //IN_CLOSE = (IN_CLOSE_WRITE | IN_CLOSE_NOWRITE)
491     if (sctpParams.inotifyWD == -1) {
492         mdclog_write(MDCLOG_ERR, "Failed to add directory : %s to  inotify (inotify_add_watch) %s",
493                      sctpParams.configFilePath.c_str(),
494                      strerror(errno));
495         close(sctpParams.inotifyFD);
496         return -1;
497     }
498
499     struct epoll_event event{};
500     event.events = (EPOLLIN);
501     event.data.fd = sctpParams.inotifyFD;
502     // add listening RMR FD to epoll
503     if (epoll_ctl(sctpParams.epoll_fd, EPOLL_CTL_ADD, sctpParams.inotifyFD, &event)) {
504         mdclog_write(MDCLOG_ERR, "Failed to add inotify FD to epoll");
505         close(sctpParams.inotifyFD);
506         return -1;
507     }
508     return 0;
509 }
510
511 /**
512  *
513  * @param args
514  * @return
515  */
516 void listener(sctp_params_t *params) {
517     int num_of_SCTP_messages = 0;
518     auto totalTime = 0.0;
519     std::thread::id this_id = std::this_thread::get_id();
520     //save cout
521     auto pod_name = std::getenv("POD_NAME");
522     auto container_name = std::getenv("CONTAINER_NAME");
523     auto service_name = std::getenv("SERVICE_NAME");
524     auto host_name = std::getenv("HOST_NAME");
525     auto system_name = std::getenv("SYSTEM_NAME");
526     auto pid = std::to_string(getpid()).c_str();
527     streambuf *oldCout = cout.rdbuf();
528     ostringstream memCout;
529     // create new cout
530     cout.rdbuf(memCout.rdbuf());
531     cout << this_id;
532     //return to the normal cout
533     cout.rdbuf(oldCout);
534
535     char tid[32];
536     memcpy(tid, memCout.str().c_str(), memCout.str().length() < 32 ? memCout.str().length() : 31);
537     tid[memCout.str().length()] = 0;
538     mdclog_mdc_add("SYSTEM_NAME", system_name);
539     mdclog_mdc_add("HOST_NAME", host_name);
540     mdclog_mdc_add("SERVICE_NAME", service_name);
541     mdclog_mdc_add("CONTAINER_NAME", container_name);
542     mdclog_mdc_add("POD_NAME", pod_name);
543     mdclog_mdc_add("PID", pid);
544
545     if (mdclog_level_get() >= MDCLOG_DEBUG) {
546         mdclog_write(MDCLOG_DEBUG, "started thread number %s", tid);
547     }
548
549     RmrMessagesBuffer_t rmrMessageBuffer{};
550     //create and init RMR
551     rmrMessageBuffer.rmrCtx = params->rmrCtx;
552
553     auto *events = (struct epoll_event *) calloc(MAXEVENTS, sizeof(struct epoll_event));
554     struct timespec end{0, 0};
555     struct timespec start{0, 0};
556
557     rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
558     rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
559
560     memcpy(rmrMessageBuffer.ka_message, params->ka_message, params->ka_message_length);
561     rmrMessageBuffer.ka_message_len = params->ka_message_length;
562     rmrMessageBuffer.ka_message[rmrMessageBuffer.ka_message_len] = 0;
563
564     if (mdclog_level_get() >= MDCLOG_DEBUG) {
565         mdclog_write(MDCLOG_DEBUG, "keep alive message is : %s", rmrMessageBuffer.ka_message);
566     }
567
568     ReportingMessages_t message {};
569
570 //    for (int i = 0; i < MAX_RMR_BUFF_ARRAY; i++) {
571 //        rmrMessageBuffer.rcvBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
572 //        rmrMessageBuffer.sendBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
573 //    }
574
575     while (true) {
576         if (mdclog_level_get() >= MDCLOG_DEBUG) {
577             mdclog_write(MDCLOG_DEBUG, "Start EPOLL Wait. Timeout = %d", params->epollTimeOut);
578         }
579         auto numOfEvents = epoll_wait(params->epoll_fd, events, MAXEVENTS, params->epollTimeOut);
580         if (numOfEvents == 0) { // time out
581             if (mdclog_level_get() >= MDCLOG_DEBUG) {
582                 mdclog_write(MDCLOG_DEBUG, "got epoll timeout");
583             }
584             continue;
585         } else if (numOfEvents < 0) {
586             if (errno == EINTR) {
587                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
588                     mdclog_write(MDCLOG_DEBUG, "got EINTR : %s", strerror(errno));
589                 }
590                 continue;
591             }
592             mdclog_write(MDCLOG_ERR, "Epoll wait failed, errno = %s", strerror(errno));
593             return;
594         }
595         for (auto i = 0; i < numOfEvents; i++) {
596             if (mdclog_level_get() >= MDCLOG_DEBUG) {
597                 mdclog_write(MDCLOG_DEBUG, "handling epoll event %d out of %d", i + 1, numOfEvents);
598             }
599             clock_gettime(CLOCK_MONOTONIC, &message.message.time);
600             start.tv_sec = message.message.time.tv_sec;
601             start.tv_nsec = message.message.time.tv_nsec;
602
603
604             if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP)) {
605                 handlepoll_error(events[i], message, rmrMessageBuffer, params);
606             } else if (events[i].events & EPOLLOUT) {
607                 handleEinprogressMessages(events[i], message, rmrMessageBuffer, params);
608             } else if (params->listenFD == events[i].data.fd) {
609                 if (mdclog_level_get() >= MDCLOG_INFO) {
610                     mdclog_write(MDCLOG_INFO, "New connection request from sctp network\n");
611                 }
612                 // new connection is requested from RAN  start build connection
613                 while (true) {
614                     struct sockaddr in_addr {};
615                     socklen_t in_len;
616                     char hostBuff[NI_MAXHOST];
617                     char portBuff[NI_MAXSERV];
618
619                     in_len = sizeof(in_addr);
620                     auto *peerInfo = (ConnectedCU_t *)calloc(1, sizeof(ConnectedCU_t));
621                     if(peerInfo == nullptr){
622                         mdclog_write(MDCLOG_ERR, "calloc failed");
623                         break;
624                     }
625                     peerInfo->sctpParams = params;
626                     peerInfo->fileDescriptor = accept(params->listenFD, &in_addr, &in_len);
627                     if (peerInfo->fileDescriptor == -1) {
628                         if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
629                             /* We have processed all incoming connections. */
630                             break;
631                         } else {
632                             mdclog_write(MDCLOG_ERR, "Accept error, errno = %s", strerror(errno));
633                             break;
634                         }
635                     }
636                     if (setSocketNoBlocking(peerInfo->fileDescriptor) == -1) {
637                         mdclog_write(MDCLOG_ERR, "setSocketNoBlocking failed to set new connection %s on port %s\n", hostBuff, portBuff);
638                         close(peerInfo->fileDescriptor);
639                         break;
640                     }
641                     auto  ans = getnameinfo(&in_addr, in_len,
642                                             peerInfo->hostName, NI_MAXHOST,
643                                             peerInfo->portNumber, NI_MAXSERV, (unsigned )((unsigned int)NI_NUMERICHOST | (unsigned int)NI_NUMERICSERV));
644                     if (ans < 0) {
645                         mdclog_write(MDCLOG_ERR, "Failed to get info on connection request. %s\n", strerror(errno));
646                         close(peerInfo->fileDescriptor);
647                         break;
648                     }
649                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
650                         mdclog_write(MDCLOG_DEBUG, "Accepted connection on descriptor %d (host=%s, port=%s)\n", peerInfo->fileDescriptor, peerInfo->hostName, peerInfo->portNumber);
651                     }
652                     peerInfo->isConnected = false;
653                     peerInfo->gotSetup = false;
654                     if (addToEpoll(params->epoll_fd,
655                                    peerInfo,
656                                    (EPOLLIN | EPOLLET),
657                                    params->sctpMap, nullptr,
658                                    0) != 0) {
659                         break;
660                     }
661                     break;
662                 }
663             } else if (params->rmrListenFd == events[i].data.fd) {
664                 // got message from XAPP
665                 //num_of_XAPP_messages.fetch_add(1, std::memory_order_release);
666                 num_of_messages.fetch_add(1, std::memory_order_release);
667                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
668                     mdclog_write(MDCLOG_DEBUG, "new RMR message");
669                 }
670                 if (receiveXappMessages(params->sctpMap,
671                                         rmrMessageBuffer,
672                                         message.message.time) != 0) {
673                     mdclog_write(MDCLOG_ERR, "Error handling Xapp message");
674                 }
675             } else if (params->inotifyFD == events[i].data.fd) {
676                 mdclog_write(MDCLOG_INFO, "Got event from inotify (configuration update)");
677                 handleConfigChange(params);
678             } else {
679                 /* We RMR_ERR_RETRY have data on the fd waiting to be read. Read and display it.
680                  * We must read whatever data is available completely, as we are running
681                  *  in edge-triggered mode and won't get a notification again for the same data. */
682                 num_of_messages.fetch_add(1, std::memory_order_release);
683                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
684                     mdclog_write(MDCLOG_DEBUG, "new message from SCTP, epoll flags are : %0x", events[i].events);
685                 }
686                 receiveDataFromSctp(&events[i],
687                                     params->sctpMap,
688                                     num_of_SCTP_messages,
689                                     rmrMessageBuffer,
690                                     message.message.time);
691             }
692
693             clock_gettime(CLOCK_MONOTONIC, &end);
694             if (mdclog_level_get() >= MDCLOG_INFO) {
695                 totalTime += ((end.tv_sec + 1.0e-9 * end.tv_nsec) -
696                               ((double) start.tv_sec + 1.0e-9 * start.tv_nsec));
697             }
698             if (mdclog_level_get() >= MDCLOG_DEBUG) {
699                 mdclog_write(MDCLOG_DEBUG, "message handling is %ld seconds %ld nanoseconds",
700                              end.tv_sec - start.tv_sec,
701                              end.tv_nsec - start.tv_nsec);
702             }
703         }
704     }
705 }
706
707 /**
708  *
709  * @param sctpParams
710  */
711 void handleConfigChange(sctp_params_t *sctpParams) {
712     char buf[4096] __attribute__ ((aligned(__alignof__(struct inotify_event))));
713     const struct inotify_event *event;
714     char *ptr;
715
716     path p = (sctpParams->configFilePath + "/" + sctpParams->configFileName).c_str();
717     auto endlessLoop = true;
718     while (endlessLoop) {
719         auto len = read(sctpParams->inotifyFD, buf, sizeof buf);
720         if (len == -1) {
721             if (errno != EAGAIN) {
722                 mdclog_write(MDCLOG_ERR, "read %s ", strerror(errno));
723                 endlessLoop = false;
724                 continue;
725             }
726             else {
727                 endlessLoop = false;
728                 continue;
729             }
730         }
731
732         for (ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) {
733             event = (const struct inotify_event *)ptr;
734             if (event->mask & (uint32_t)IN_ISDIR) {
735                 continue;
736             }
737
738             // the directory name
739             if (sctpParams->inotifyWD == event->wd) {
740                 // not the directory
741             }
742             if (event->len) {
743                 auto  retVal = strcmp(sctpParams->configFileName.c_str(), event->name);
744                 if (retVal != 0) {
745                     continue;
746                 }
747             }
748             // only the file we want
749             if (event->mask & (uint32_t)IN_CLOSE_WRITE) {
750                 if (mdclog_level_get() >= MDCLOG_INFO) {
751                     mdclog_write(MDCLOG_INFO, "Configuration file changed");
752                 }
753                 if (exists(p)) {
754                     const int size = 2048;
755                     auto fileSize = file_size(p);
756                     if (fileSize > size) {
757                         mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size);
758                         return;
759                     }
760                 } else {
761                     mdclog_write(MDCLOG_ERR, "Configuration File %s not exists", p.string().c_str());
762                     return;
763                 }
764
765                 ReadConfigFile conf;
766                 if (conf.openConfigFile(p.string()) == -1) {
767                     mdclog_write(MDCLOG_ERR, "Filed to open config file %s, %s",
768                                  p.string().c_str(), strerror(errno));
769                     return;
770                 }
771                 auto tmpStr = conf.getStringValue("loglevel");
772                 if (tmpStr.length() == 0) {
773                     mdclog_write(MDCLOG_ERR, "illegal loglevel. Set loglevel to MDCLOG_INFO");
774                     tmpStr = "info";
775                 }
776                 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
777
778                 if ((tmpStr.compare("debug")) == 0) {
779                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_DEBUG");
780                     sctpParams->logLevel = MDCLOG_DEBUG;
781                 } else if ((tmpStr.compare("info")) == 0) {
782                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_INFO");
783                     sctpParams->logLevel = MDCLOG_INFO;
784                 } else if ((tmpStr.compare("warning")) == 0) {
785                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_WARN");
786                     sctpParams->logLevel = MDCLOG_WARN;
787                 } else if ((tmpStr.compare("error")) == 0) {
788                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_ERR");
789                     sctpParams->logLevel = MDCLOG_ERR;
790                 } else {
791                     mdclog_write(MDCLOG_ERR, "illegal loglevel = %s. Set loglevel to MDCLOG_INFO", tmpStr.c_str());
792                     sctpParams->logLevel = MDCLOG_INFO;
793                 }
794                 mdclog_level_set(sctpParams->logLevel);
795                 tmpStr = conf.getStringValue("trace");
796                 if (tmpStr.length() == 0) {
797                     mdclog_write(MDCLOG_ERR, "illegal trace. Set trace to stop");
798                     tmpStr = "stop";
799                 }
800
801                 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
802                 if ((tmpStr.compare("start")) == 0) {
803                     mdclog_write(MDCLOG_INFO, "Trace set to: start");
804                     sctpParams->trace = true;
805                 } else if ((tmpStr.compare("stop")) == 0) {
806                     mdclog_write(MDCLOG_INFO, "Trace set to: stop");
807                     sctpParams->trace = false;
808                 } else {
809                     mdclog_write(MDCLOG_ERR, "Trace was set to wrong value %s, set to stop", tmpStr.c_str());
810                     sctpParams->trace = false;
811                 }
812                 jsonTrace = sctpParams->trace;
813
814
815                 endlessLoop = false;
816             }
817         }
818     }
819 }
820
821 /**
822  *
823  * @param event
824  * @param message
825  * @param rmrMessageBuffer
826  * @param params
827  */
828 void handleEinprogressMessages(struct epoll_event &event,
829                                ReportingMessages_t &message,
830                                RmrMessagesBuffer_t &rmrMessageBuffer,
831                                sctp_params_t *params) {
832     auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
833     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
834
835     mdclog_write(MDCLOG_INFO, "file descriptor %d got EPOLLOUT", peerInfo->fileDescriptor);
836     auto retVal = 0;
837     socklen_t retValLen = 0;
838     auto rc = getsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_ERROR, &retVal, &retValLen);
839     if (rc != 0 || retVal != 0) {
840 #ifndef UNIT_TEST
841         if (rc != 0) {
842             rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
843                                                          "%s|Failed SCTP Connection, after EINPROGRESS the getsockopt%s",
844                                                          peerInfo->enodbName, strerror(errno));
845         } else if (retVal != 0) {
846             rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
847                                                          "%s|Failed SCTP Connection after EINPROGRESS, SO_ERROR",
848                                                          peerInfo->enodbName);
849         }
850
851         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
852         message.message.asnLength = rmrMessageBuffer.sendMessage->len;
853         mdclog_write(MDCLOG_ERR, "%s", rmrMessageBuffer.sendMessage->payload);
854         message.message.direction = 'N';
855         if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
856             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
857         }
858 #endif
859         memset(peerInfo->asnData, 0, peerInfo->asnLength);
860         peerInfo->asnLength = 0;
861         peerInfo->mtype = 0;
862         return;
863     }
864
865     peerInfo->isConnected = true;
866
867     if (modifyToEpoll(params->epoll_fd, peerInfo, (EPOLLIN | EPOLLET), params->sctpMap, peerInfo->enodbName,
868                       peerInfo->mtype) != 0) {
869         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_MOD");
870         return;
871     }
872
873     message.message.asndata = (unsigned char *)peerInfo->asnData;
874     message.message.asnLength = peerInfo->asnLength;
875     message.message.messageType = peerInfo->mtype;
876     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
877     num_of_messages.fetch_add(1, std::memory_order_release);
878     if (mdclog_level_get() >= MDCLOG_DEBUG) {
879         mdclog_write(MDCLOG_DEBUG, "send the delayed SETUP/ENDC SETUP to sctp for %s",
880                      message.message.enodbName);
881     }
882     if (sendSctpMsg(peerInfo, message, params->sctpMap) != 0) {
883         if (mdclog_level_get() >= MDCLOG_DEBUG) {
884             mdclog_write(MDCLOG_DEBUG, "Error write to SCTP  %s %d", __func__, __LINE__);
885         }
886         return;
887     }
888
889     memset(peerInfo->asnData, 0, peerInfo->asnLength);
890     peerInfo->asnLength = 0;
891     peerInfo->mtype = 0;
892 }
893
894
895 void handlepoll_error(struct epoll_event &event,
896                       ReportingMessages_t &message,
897                       RmrMessagesBuffer_t &rmrMessageBuffer,
898                       sctp_params_t *params) {
899     if (event.data.fd != params->rmrListenFd) {
900         auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
901         mdclog_write(MDCLOG_ERR, "epoll error, events %0x on fd %d, RAN NAME : %s",
902                      event.events, peerInfo->fileDescriptor, peerInfo->enodbName);
903 #ifndef UNIT_TEST
904
905         rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
906                                                      "%s|Failed SCTP Connection",
907                                                      peerInfo->enodbName);
908         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
909         message.message.asnLength = rmrMessageBuffer.sendMessage->len;
910
911         memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
912         message.message.direction = 'N';
913         if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
914             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
915         }
916 #endif
917         close(peerInfo->fileDescriptor);
918         params->sctpMap->erase(peerInfo->enodbName);
919         cleanHashEntry((ConnectedCU_t *) event.data.ptr, params->sctpMap);
920     } else {
921         mdclog_write(MDCLOG_ERR, "epoll error, events %0x on RMR FD", event.events);
922     }
923 }
924 /**
925  *
926  * @param socket
927  * @return
928  */
929 int setSocketNoBlocking(int socket) {
930     auto flags = fcntl(socket, F_GETFL, 0);
931
932     if (flags == -1) {
933         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
934         return -1;
935     }
936
937     flags = (unsigned) flags | (unsigned) O_NONBLOCK;
938     if (fcntl(socket, F_SETFL, flags) == -1) {
939         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
940         return -1;
941     }
942
943     return 0;
944 }
945
946 /**
947  *
948  * @param val
949  * @param m
950  */
951 void cleanHashEntry(ConnectedCU_t *val, Sctp_Map_t *m) {
952     char *dummy;
953     auto port = (uint16_t) strtol(val->portNumber, &dummy, 10);
954     char searchBuff[2048]{};
955
956     snprintf(searchBuff, sizeof searchBuff, "host:%s:%d", val->hostName, port);
957     m->erase(searchBuff);
958
959     m->erase(val->enodbName);
960 #ifndef UNIT_TEST
961     free(val);
962 #endif
963 }
964
965 /**
966  *
967  * @param fd file descriptor
968  * @param data the asn data to send
969  * @param len  length of the data
970  * @param enodbName the enodbName as in the map for printing purpose
971  * @param m map host information
972  * @param mtype message number
973  * @return 0 success, a negative number on fail
974  */
975 int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_t *m) {
976     auto loglevel = mdclog_level_get();
977     int fd = peerInfo->fileDescriptor;
978     if (loglevel >= MDCLOG_DEBUG) {
979         mdclog_write(MDCLOG_DEBUG, "Send SCTP message for CU %s, %s",
980                      message.message.enodbName, __FUNCTION__);
981     }
982
983     while (true) {
984         if (send(fd,message.message.asndata, message.message.asnLength,MSG_NOSIGNAL) < 0) {
985             if (errno == EINTR) {
986                 continue;
987             }
988             mdclog_write(MDCLOG_ERR, "error writing to CU a message, %s ", strerror(errno));
989             if (!peerInfo->isConnected) {
990                 mdclog_write(MDCLOG_ERR, "connection to CU %s is still in progress.", message.message.enodbName);
991                 return -1;
992             }
993             cleanHashEntry(peerInfo, m);
994             close(fd);
995             char key[MAX_ENODB_NAME_SIZE * 2];
996             snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName,
997                      message.message.messageType);
998             if (loglevel >= MDCLOG_DEBUG) {
999                 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
1000             }
1001             auto tmp = m->find(key);
1002             if (tmp) {
1003                 free(tmp);
1004             }
1005             m->erase(key);
1006             return -1;
1007         }
1008         message.message.direction = 'D';
1009         // send report.buffer of size
1010         buildJsonMessage(message);
1011
1012         if (loglevel >= MDCLOG_DEBUG) {
1013             mdclog_write(MDCLOG_DEBUG,
1014                          "SCTP message for CU %s sent from %s",
1015                          message.message.enodbName,
1016                          __FUNCTION__);
1017         }
1018         return 0;
1019     }
1020 }
1021
1022 /**
1023  *
1024  * @param message
1025  * @param rmrMessageBuffer
1026  */
1027 void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) {
1028     message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
1029     message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
1030
1031     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1032         mdclog_write(MDCLOG_DEBUG, "Message from Xapp RAN name = %s message length = %ld",
1033                      message.message.enodbName, (unsigned long) message.message.asnLength);
1034     }
1035 }
1036
1037
1038
1039 /**
1040  *
1041  * @param events
1042  * @param sctpMap
1043  * @param numOfMessages
1044  * @param rmrMessageBuffer
1045  * @param ts
1046  * @return
1047  */
1048 int receiveDataFromSctp(struct epoll_event *events,
1049                         Sctp_Map_t *sctpMap,
1050                         int &numOfMessages,
1051                         RmrMessagesBuffer_t &rmrMessageBuffer,
1052                         struct timespec &ts) {
1053     /* We have data on the fd waiting to be read. Read and display it.
1054  * We must read whatever data is available completely, as we are running
1055  *  in edge-triggered mode and won't get a notification again for the same data. */
1056     ReportingMessages_t message {};
1057     auto done = 0;
1058     auto loglevel = mdclog_level_get();
1059
1060     // get the identity of the interface
1061     message.peerInfo = (ConnectedCU_t *)events->data.ptr;
1062
1063     struct timespec start{0, 0};
1064     struct timespec decodeStart{0, 0};
1065     struct timespec end{0, 0};
1066
1067     E2AP_PDU_t *pdu = nullptr;
1068
1069     while (true) {
1070         if (loglevel >= MDCLOG_DEBUG) {
1071             mdclog_write(MDCLOG_DEBUG, "Start Read from SCTP %d fd", message.peerInfo->fileDescriptor);
1072             clock_gettime(CLOCK_MONOTONIC, &start);
1073         }
1074         // read the buffer directly to rmr payload
1075         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1076         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1077                 read(message.peerInfo->fileDescriptor, rmrMessageBuffer.sendMessage->payload, RECEIVE_SCTP_BUFFER_SIZE);
1078
1079         if (loglevel >= MDCLOG_DEBUG) {
1080             mdclog_write(MDCLOG_DEBUG, "Finish Read from SCTP %d fd message length = %ld",
1081                          message.peerInfo->fileDescriptor, message.message.asnLength);
1082         }
1083
1084         memcpy(message.message.enodbName, message.peerInfo->enodbName, sizeof(message.peerInfo->enodbName));
1085         message.message.direction = 'U';
1086         message.message.time.tv_nsec = ts.tv_nsec;
1087         message.message.time.tv_sec = ts.tv_sec;
1088
1089         if (message.message.asnLength < 0) {
1090             if (errno == EINTR) {
1091                 continue;
1092             }
1093             /* If errno == EAGAIN, that means we have read all
1094                data. So goReportingMessages_t back to the main loop. */
1095             if (errno != EAGAIN) {
1096                 mdclog_write(MDCLOG_ERR, "Read error, %s ", strerror(errno));
1097                 done = 1;
1098             } else if (loglevel >= MDCLOG_DEBUG) {
1099                 mdclog_write(MDCLOG_DEBUG, "EAGAIN - descriptor = %d", message.peerInfo->fileDescriptor);
1100             }
1101             break;
1102         } else if (message.message.asnLength == 0) {
1103             /* End of file. The remote has closed the connection. */
1104             if (loglevel >= MDCLOG_INFO) {
1105                 mdclog_write(MDCLOG_INFO, "END of File Closed connection - descriptor = %d",
1106                              message.peerInfo->fileDescriptor);
1107             }
1108             done = 1;
1109             break;
1110         }
1111
1112         if (loglevel >= MDCLOG_DEBUG) {
1113             char printBuffer[RECEIVE_SCTP_BUFFER_SIZE]{};
1114             char *tmp = printBuffer;
1115             for (size_t i = 0; i < (size_t)message.message.asnLength; ++i) {
1116                 snprintf(tmp, 3, "%02x", message.message.asndata[i]);
1117                 tmp += 2;
1118             }
1119             printBuffer[message.message.asnLength] = 0;
1120             clock_gettime(CLOCK_MONOTONIC, &end);
1121             mdclog_write(MDCLOG_DEBUG, "Before Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
1122                          message.peerInfo->enodbName, end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
1123             mdclog_write(MDCLOG_DEBUG, "PDU buffer length = %ld, data =  : %s", message.message.asnLength,
1124                          printBuffer);
1125             clock_gettime(CLOCK_MONOTONIC, &decodeStart);
1126         }
1127
1128         auto rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void **) &pdu,
1129                           message.message.asndata, message.message.asnLength);
1130         if (rval.code != RC_OK) {
1131             mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) E2AP PDU from RAN : %s", rval.code,
1132                          message.peerInfo->enodbName);
1133             break;
1134         }
1135
1136         if (loglevel >= MDCLOG_DEBUG) {
1137             clock_gettime(CLOCK_MONOTONIC, &end);
1138             mdclog_write(MDCLOG_DEBUG, "After Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
1139                          message.peerInfo->enodbName, end.tv_sec - decodeStart.tv_sec, end.tv_nsec - decodeStart.tv_nsec);
1140             char *printBuffer;
1141             size_t size;
1142             FILE *stream = open_memstream(&printBuffer, &size);
1143             asn_fprint(stream, &asn_DEF_E2AP_PDU, pdu);
1144             mdclog_write(MDCLOG_DEBUG, "Encoding E2AP PDU past : %s", printBuffer);
1145             clock_gettime(CLOCK_MONOTONIC, &decodeStart);
1146
1147             fclose(stream);
1148             free(printBuffer);
1149         }
1150
1151         switch (pdu->present) {
1152             case E2AP_PDU_PR_initiatingMessage: {//initiating message
1153                 asnInitiatingRequest(pdu, sctpMap,message, rmrMessageBuffer);
1154                 break;
1155             }
1156             case E2AP_PDU_PR_successfulOutcome: { //successful outcome
1157                 asnSuccessfulMsg(pdu, sctpMap, message, rmrMessageBuffer);
1158                 break;
1159             }
1160             case E2AP_PDU_PR_unsuccessfulOutcome: { //Unsuccessful Outcome
1161                 asnUnSuccsesfulMsg(pdu, sctpMap, message, rmrMessageBuffer);
1162                 break;
1163             }
1164             default:
1165                 mdclog_write(MDCLOG_ERR, "Unknown index %d in E2AP PDU", pdu->present);
1166                 break;
1167         }
1168         if (loglevel >= MDCLOG_DEBUG) {
1169             clock_gettime(CLOCK_MONOTONIC, &end);
1170             mdclog_write(MDCLOG_DEBUG,
1171                          "After processing message and sent to rmr for : %s, Read time is : %ld seconds, %ld nanoseconds",
1172                          message.peerInfo->enodbName, end.tv_sec - decodeStart.tv_sec, end.tv_nsec - decodeStart.tv_nsec);
1173         }
1174         numOfMessages++;
1175         if (pdu != nullptr) {
1176             ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu);
1177             //ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
1178             //pdu = nullptr;
1179         }
1180     }
1181
1182     if (done) {
1183         if (loglevel >= MDCLOG_INFO) {
1184             mdclog_write(MDCLOG_INFO, "Closed connection - descriptor = %d", message.peerInfo->fileDescriptor);
1185         }
1186         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1187                 snprintf((char *)rmrMessageBuffer.sendMessage->payload,
1188                          256,
1189                          "%s|CU disconnected unexpectedly",
1190                          message.peerInfo->enodbName);
1191         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1192
1193         if (sendRequestToXapp(message,
1194                               RIC_SCTP_CONNECTION_FAILURE,
1195                               rmrMessageBuffer) != 0) {
1196             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
1197         }
1198
1199         /* Closing descriptor make epoll remove it from the set of descriptors which are monitored. */
1200         close(message.peerInfo->fileDescriptor);
1201         cleanHashEntry((ConnectedCU_t *) events->data.ptr, sctpMap);
1202     }
1203     if (loglevel >= MDCLOG_DEBUG) {
1204         clock_gettime(CLOCK_MONOTONIC, &end);
1205         mdclog_write(MDCLOG_DEBUG, "from receive SCTP to send RMR time is %ld seconds and %ld nanoseconds",
1206                      end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
1207
1208     }
1209     return 0;
1210 }
1211
1212 static void buildAndSendSetupRequest(ReportingMessages_t &message,
1213                                      RmrMessagesBuffer_t &rmrMessageBuffer,
1214                                      E2AP_PDU_t *pdu/*,
1215                                      string const &messageName,
1216                                      string const &ieName,
1217                                      vector<string> &functionsToAdd_v,
1218                                      vector<string> &functionsToModified_v*/) {
1219     auto logLevel = mdclog_level_get();
1220     // now we can send the data to e2Mgr
1221
1222     asn_enc_rval_t er;
1223     auto buffer_size = RECEIVE_SCTP_BUFFER_SIZE * 2;
1224     unsigned char *buffer = nullptr;
1225     buffer = (unsigned char *) calloc(buffer_size, sizeof(unsigned char));
1226     if(!buffer)
1227     {
1228         mdclog_write(MDCLOG_ERR, "Allocating buffer for %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
1229         return;
1230     }
1231     while (true) {
1232         er = asn_encode_to_buffer(nullptr, ATS_BASIC_XER, &asn_DEF_E2AP_PDU, pdu, buffer, buffer_size);
1233         if (er.encoded == -1) {
1234             mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
1235             return;
1236         } else if (er.encoded > (ssize_t) buffer_size) {
1237             buffer_size = er.encoded + 128;
1238             mdclog_write(MDCLOG_WARN, "Buffer of size %d is to small for %s. Reallocate buffer of size %d",
1239                          (int) buffer_size,
1240                          asn_DEF_E2AP_PDU.name, buffer_size);
1241             buffer_size = er.encoded + 128;
1242
1243             unsigned char *newBuffer = nullptr;
1244             newBuffer = (unsigned char *) realloc(buffer, buffer_size);
1245             if (!newBuffer)
1246             {
1247                 // out of memory
1248                 mdclog_write(MDCLOG_ERR, "Reallocating buffer for %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
1249                 free(buffer);
1250                 return;
1251             }
1252             buffer = newBuffer;
1253             continue;
1254         }
1255         buffer[er.encoded] = '\0';
1256         break;
1257     }
1258     // encode to xml
1259
1260     string res((char *)buffer);
1261     res.erase(std::remove(res.begin(), res.end(), '\n'), res.end());
1262     res.erase(std::remove(res.begin(), res.end(), '\t'), res.end());
1263     res.erase(std::remove(res.begin(), res.end(), ' '), res.end());
1264
1265 //    string res {};
1266 //    if (!functionsToAdd_v.empty() || !functionsToModified_v.empty()) {
1267 //        res = buildXmlData(messageName, ieName, functionsToAdd_v, functionsToModified_v, buffer, (size_t) er.encoded);
1268 //    }
1269     rmr_mbuf_t *rmrMsg;
1270 //    if (res.length() == 0) {
1271 //        rmrMsg = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, buffer_size + 256);
1272 //        rmrMsg->len = snprintf((char *) rmrMsg->payload, RECEIVE_SCTP_BUFFER_SIZE * 2, "%s:%d|%s",
1273 //                               message.peerInfo->sctpParams->myIP.c_str(),
1274 //                               message.peerInfo->sctpParams->rmrPort,
1275 //                               buffer);
1276 //    } else {
1277         rmrMsg = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, (int)res.length() + 256);
1278         rmrMsg->len = snprintf((char *) rmrMsg->payload, res.length() + 256, "%s:%d|%s",
1279                                message.peerInfo->sctpParams->myIP.c_str(),
1280                                message.peerInfo->sctpParams->rmrPort,
1281                                res.c_str());
1282 //    }
1283
1284     if (logLevel >= MDCLOG_DEBUG) {
1285         mdclog_write(MDCLOG_DEBUG, "Setup request of size %d :\n %s\n", rmrMsg->len, rmrMsg->payload);
1286     }
1287     // send to RMR
1288     rmrMsg->mtype = message.message.messageType;
1289     rmrMsg->state = 0;
1290     rmr_bytes2meid(rmrMsg, (unsigned char *) message.message.enodbName, strlen(message.message.enodbName));
1291
1292     static unsigned char tx[32];
1293     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1294     rmr_bytes2xact(rmrMsg, tx, strlen((const char *) tx));
1295
1296     rmrMsg = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMsg);
1297     if (rmrMsg == nullptr) {
1298         mdclog_write(MDCLOG_ERR, "RMR failed to send returned nullptr");
1299     } else if (rmrMsg->state != 0) {
1300         char meid[RMR_MAX_MEID]{};
1301         if (rmrMsg->state == RMR_ERR_RETRY) {
1302             usleep(5);
1303             rmrMsg->state = 0;
1304             mdclog_write(MDCLOG_INFO, "RETRY sending Message %d to Xapp from %s",
1305                          rmrMsg->mtype, rmr_get_meid(rmrMsg, (unsigned char *) meid));
1306             rmrMsg = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMsg);
1307             if (rmrMsg == nullptr) {
1308                 mdclog_write(MDCLOG_ERR, "RMR failed send returned nullptr");
1309             } else if (rmrMsg->state != 0) {
1310                 mdclog_write(MDCLOG_ERR,
1311                              "RMR Retry failed %s sending request %d to Xapp from %s",
1312                              translateRmrErrorMessages(rmrMsg->state).c_str(),
1313                              rmrMsg->mtype,
1314                              rmr_get_meid(rmrMsg, (unsigned char *) meid));
1315             }
1316         } else {
1317             mdclog_write(MDCLOG_ERR, "RMR failed: %s. sending request %d to Xapp from %s",
1318                          translateRmrErrorMessages(rmrMsg->state).c_str(),
1319                          rmrMsg->mtype,
1320                          rmr_get_meid(rmrMsg, (unsigned char *) meid));
1321         }
1322     }
1323     message.peerInfo->gotSetup = true;
1324     buildJsonMessage(message);
1325
1326     if (rmrMsg != nullptr) {
1327         rmr_free_msg(rmrMsg);
1328     }
1329     free(buffer);
1330
1331     return;
1332 }
1333
1334 #if 0
1335 int RAN_Function_list_To_Vector(RANfunctions_List_t& list, vector <string> &runFunXML_v) {
1336     auto index = 0;
1337     runFunXML_v.clear();
1338     for (auto j = 0; j < list.list.count; j++) {
1339         auto *raNfunctionItemIEs = (RANfunction_ItemIEs_t *)list.list.array[j];
1340         if (raNfunctionItemIEs->id == ProtocolIE_ID_id_RANfunction_Item &&
1341             (raNfunctionItemIEs->value.present == RANfunction_ItemIEs__value_PR_RANfunction_Item)) {
1342             // encode to xml
1343             E2SM_gNB_NRT_RANfunction_Definition_t *ranFunDef = nullptr;
1344             auto rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER,
1345                                    &asn_DEF_E2SM_gNB_NRT_RANfunction_Definition,
1346                                    (void **)&ranFunDef,
1347                                    raNfunctionItemIEs->value.choice.RANfunction_Item.ranFunctionDefinition.buf,
1348                                    raNfunctionItemIEs->value.choice.RANfunction_Item.ranFunctionDefinition.size);
1349             if (rval.code != RC_OK) {
1350                 mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) E2SM message from : %s",
1351                              rval.code,
1352                              asn_DEF_E2SM_gNB_NRT_RANfunction_Definition.name);
1353                 return -1;
1354             }
1355
1356             auto xml_buffer_size = RECEIVE_SCTP_BUFFER_SIZE * 2;
1357             unsigned char xml_buffer[RECEIVE_SCTP_BUFFER_SIZE * 2];
1358             memset(xml_buffer, 0, RECEIVE_SCTP_BUFFER_SIZE * 2);
1359             // encode to xml
1360             auto er = asn_encode_to_buffer(nullptr,
1361                                            ATS_BASIC_XER,
1362                                            &asn_DEF_E2SM_gNB_NRT_RANfunction_Definition,
1363                                            ranFunDef,
1364                                            xml_buffer,
1365                                            xml_buffer_size);
1366             if (er.encoded == -1) {
1367                 mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s",
1368                              asn_DEF_E2SM_gNB_NRT_RANfunction_Definition.name,
1369                              strerror(errno));
1370             } else if (er.encoded > (ssize_t)xml_buffer_size) {
1371                 mdclog_write(MDCLOG_ERR, "Buffer of size %d is to small for %s, at %s line %d",
1372                              (int) xml_buffer_size,
1373                              asn_DEF_E2SM_gNB_NRT_RANfunction_Definition.name, __func__, __LINE__);
1374             } else {
1375                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1376                     mdclog_write(MDCLOG_DEBUG, "Encoding E2SM %s PDU number %d : %s",
1377                                  asn_DEF_E2SM_gNB_NRT_RANfunction_Definition.name,
1378                                  index++,
1379                                  xml_buffer);
1380                 }
1381
1382                 string runFuncs = (char *)(xml_buffer);
1383                 runFunXML_v.emplace_back(runFuncs);
1384             }
1385         }
1386     }
1387     return 0;
1388 }
1389
1390 int collectServiceUpdate_RequestData(E2AP_PDU_t *pdu,
1391                                      Sctp_Map_t *sctpMap,
1392                                      ReportingMessages_t &message,
1393                                      vector <string> &RANfunctionsAdded_v,
1394                                      vector <string> &RANfunctionsModified_v) {
1395     memset(message.peerInfo->enodbName, 0 , MAX_ENODB_NAME_SIZE);
1396     for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICserviceUpdate.protocolIEs.list.count; i++) {
1397         auto *ie = pdu->choice.initiatingMessage->value.choice.RICserviceUpdate.protocolIEs.list.array[i];
1398         if (ie->id == ProtocolIE_ID_id_RANfunctionsAdded) {
1399             if (ie->value.present == RICserviceUpdate_IEs__value_PR_RANfunctionsID_List) {
1400                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1401                     mdclog_write(MDCLOG_DEBUG, "Run function list have %d entries",
1402                                  ie->value.choice.RANfunctions_List.list.count);
1403                 }
1404                 if (RAN_Function_list_To_Vector(ie->value.choice.RANfunctions_List, RANfunctionsAdded_v) != 0 ) {
1405                     return -1;
1406                 }
1407             }
1408         } else if (ie->id == ProtocolIE_ID_id_RANfunctionsModified) {
1409             if (ie->value.present == RICserviceUpdate_IEs__value_PR_RANfunctions_List) {
1410                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1411                     mdclog_write(MDCLOG_DEBUG, "Run function list have %d entries",
1412                                  ie->value.choice.RANfunctions_List.list.count);
1413                 }
1414                 if (RAN_Function_list_To_Vector(ie->value.choice.RANfunctions_List, RANfunctionsModified_v) != 0 ) {
1415                     return -1;
1416                 }
1417             }
1418         }
1419     }
1420     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1421         mdclog_write(MDCLOG_DEBUG, "Run function vector have %ld entries",
1422                      RANfunctionsAdded_v.size());
1423     }
1424     return 0;
1425 }
1426
1427 #endif
1428
1429
1430 void buildPrometheusList(ConnectedCU_t *peerInfo, Family<Counter> *prometheusFamily) {
1431     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"SetupRequest", "Messages"}});
1432     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"SetupRequest", "Bytes"}});
1433
1434     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_ErrorIndication)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ErrorIndication", "Messages"}});
1435     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_ErrorIndication)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ErrorIndication", "Bytes"}});
1436
1437     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_RICindication)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICindication", "Messages"}});
1438     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_RICindication)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICindication", "Bytes"}});
1439
1440     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ResetRequest", "Messages"}});
1441     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ResetRequest", "Bytes"}});
1442
1443     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICserviceUpdate", "Messages"}});
1444     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICserviceUpdate", "Bytes"}});
1445     // ---------------------------------------------
1446     peerInfo->counters[IN_SUCC][MSG_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ResetACK", "Messages"}});
1447     peerInfo->counters[IN_SUCC][BYTES_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ResetACK", "Bytes"}});
1448
1449     peerInfo->counters[IN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICcontrol)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICcontrolACK", "Messages"}});
1450     peerInfo->counters[IN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICcontrol)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICcontrolACK", "Bytes"}});
1451
1452     peerInfo->counters[IN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICsubscription)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionACK", "Messages"}});
1453     peerInfo->counters[IN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICsubscription)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionACK", "Bytes"}});
1454
1455     peerInfo->counters[IN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICsubscriptionDelete)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionDeleteACK", "Messages"}});
1456     peerInfo->counters[IN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICsubscriptionDelete)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionDeleteACK", "Bytes"}});
1457     //-------------------------------------------------------------
1458
1459     peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICcontrol)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICcontrolFailure", "Messages"}});
1460     peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICcontrol)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICcontrolFailure", "Bytes"}});
1461
1462     peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICsubscription)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionFailure", "Messages"}});
1463     peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICsubscription)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionFailure", "Bytes"}});
1464
1465     peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICsubscriptionDelete)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionDeleteFailure", "Messages"}});
1466     peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICsubscriptionDelete)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionDeleteFailure", "Bytes"}});
1467
1468     //====================================================================================
1469     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_ErrorIndication)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ErrorIndication", "Messages"}});
1470     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_ErrorIndication)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ErrorIndication", "Bytes"}});
1471
1472     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ResetRequest", "Messages"}});
1473     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ResetRequest", "Bytes"}});
1474
1475     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_RICcontrol)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICcontrol", "Messages"}});
1476     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_RICcontrol)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICcontrol", "Bytes"}});
1477
1478     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_RICserviceQuery)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceQuery", "Messages"}});
1479     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_RICserviceQuery)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceQuery", "Bytes"}});
1480
1481     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_RICsubscription)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICsubscription", "Messages"}});
1482     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_RICsubscription)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICsubscription", "Bytes"}});
1483
1484     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_RICsubscriptionDelete)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICsubscriptionDelete", "Messages"}});
1485     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_RICsubscriptionDelete)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICsubscriptionDelete", "Bytes"}});
1486     //---------------------------------------------------------------------------------------------------------
1487     peerInfo->counters[OUT_SUCC][MSG_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupResponse", "Messages"}});
1488     peerInfo->counters[OUT_SUCC][BYTES_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupResponse", "Bytes"}});
1489
1490     peerInfo->counters[OUT_SUCC][MSG_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ResetACK", "Messages"}});
1491     peerInfo->counters[OUT_SUCC][BYTES_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ResetACK", "Bytes"}});
1492
1493     peerInfo->counters[OUT_SUCC][MSG_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceUpdateResponse", "Messages"}});
1494     peerInfo->counters[OUT_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceUpdateResponse", "Bytes"}});
1495     //----------------------------------------------------------------------------------------------------------------
1496     peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupRequestFailure", "Messages"}});
1497     peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupRequestFailure", "Bytes"}});
1498
1499     peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceUpdateFailure", "Messages"}});
1500     peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceUpdateFailure", "Bytes"}});
1501 }
1502 /**
1503  *
1504  * @param pdu
1505  * @param sctpMap
1506  * @param message
1507  * @param RANfunctionsAdded_v
1508  * @return
1509  */
1510 int collectSetupRequestData(E2AP_PDU_t *pdu,
1511                                      Sctp_Map_t *sctpMap,
1512                                      ReportingMessages_t &message /*, vector <string> &RANfunctionsAdded_v*/) {
1513     memset(message.peerInfo->enodbName, 0 , MAX_ENODB_NAME_SIZE);
1514     for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.E2setupRequest.protocolIEs.list.count; i++) {
1515         auto *ie = pdu->choice.initiatingMessage->value.choice.E2setupRequest.protocolIEs.list.array[i];
1516         if (ie->id == ProtocolIE_ID_id_GlobalE2node_ID) {
1517             // get the ran name for meid
1518             if (ie->value.present == E2setupRequestIEs__value_PR_GlobalE2node_ID) {
1519                 if (buildRanName(message.peerInfo->enodbName, ie) < 0) {
1520                     mdclog_write(MDCLOG_ERR, "Bad param in E2setupRequestIEs GlobalE2node_ID.\n");
1521                     // no message will be sent
1522                     return -1;
1523                 }
1524
1525                 memcpy(message.message.enodbName, message.peerInfo->enodbName, strlen(message.peerInfo->enodbName));
1526                 sctpMap->setkey(message.message.enodbName, message.peerInfo);
1527             }
1528         } /*else if (ie->id == ProtocolIE_ID_id_RANfunctionsAdded) {
1529             if (ie->value.present == E2setupRequestIEs__value_PR_RANfunctions_List) {
1530                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1531                     mdclog_write(MDCLOG_DEBUG, "Run function list have %d entries",
1532                                  ie->value.choice.RANfunctions_List.list.count);
1533                 }
1534                 if (RAN_Function_list_To_Vector(ie->value.choice.RANfunctions_List, RANfunctionsAdded_v) != 0 ) {
1535                     return -1;
1536                 }
1537             }
1538         } */
1539     }
1540 //    if (mdclog_level_get() >= MDCLOG_DEBUG) {
1541 //        mdclog_write(MDCLOG_DEBUG, "Run function vector have %ld entries",
1542 //                     RANfunctionsAdded_v.size());
1543 //    }
1544     return 0;
1545 }
1546
1547 int XML_From_PER(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) {
1548     E2AP_PDU_t *pdu = nullptr;
1549
1550     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1551         mdclog_write(MDCLOG_DEBUG, "got PER message of size %d is:%s",
1552                      rmrMessageBuffer.sendMessage->len, rmrMessageBuffer.sendMessage->payload);
1553     }
1554     auto rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void **) &pdu,
1555                            rmrMessageBuffer.sendMessage->payload, rmrMessageBuffer.sendMessage->len);
1556     if (rval.code != RC_OK) {
1557         mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) setup response  from E2MGR : %s",
1558                      rval.code,
1559                      message.message.enodbName);
1560         return -1;
1561     }
1562
1563     int buff_size = RECEIVE_XAPP_BUFFER_SIZE;
1564     auto er = asn_encode_to_buffer(nullptr, ATS_BASIC_XER, &asn_DEF_E2AP_PDU, pdu,
1565                                    rmrMessageBuffer.sendMessage->payload, buff_size);
1566     if (er.encoded == -1) {
1567         mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
1568         return -1;
1569     } else if (er.encoded > (ssize_t)buff_size) {
1570         mdclog_write(MDCLOG_ERR, "Buffer of size %d is to small for %s, at %s line %d",
1571                      (int)rmrMessageBuffer.sendMessage->len,
1572                      asn_DEF_E2AP_PDU.name,
1573                      __func__,
1574                      __LINE__);
1575         return -1;
1576     }
1577     rmrMessageBuffer.sendMessage->len = er.encoded;
1578     return 0;
1579
1580 }
1581
1582 /**
1583  *
1584  * @param pdu
1585  * @param message
1586  * @param rmrMessageBuffer
1587  */
1588 void asnInitiatingRequest(E2AP_PDU_t *pdu,
1589                           Sctp_Map_t *sctpMap,
1590                           ReportingMessages_t &message,
1591                           RmrMessagesBuffer_t &rmrMessageBuffer) {
1592     auto logLevel = mdclog_level_get();
1593     auto procedureCode = ((InitiatingMessage_t *) pdu->choice.initiatingMessage)->procedureCode;
1594     if (logLevel >= MDCLOG_DEBUG) {
1595         mdclog_write(MDCLOG_DEBUG, "Initiating message %ld\n", procedureCode);
1596     }
1597     switch (procedureCode) {
1598         case ProcedureCode_id_E2setup: {
1599             if (logLevel >= MDCLOG_DEBUG) {
1600                 mdclog_write(MDCLOG_DEBUG, "Got E2setup");
1601             }
1602
1603 //            vector <string> RANfunctionsAdded_v;
1604 //            vector <string> RANfunctionsModified_v;
1605 //            RANfunctionsAdded_v.clear();
1606 //            RANfunctionsModified_v.clear();
1607             if (collectSetupRequestData(pdu, sctpMap, message) != 0) {
1608                 break;
1609             }
1610
1611             buildPrometheusList(message.peerInfo, message.peerInfo->sctpParams->prometheusFamily);
1612
1613             string messageName("E2setupRequest");
1614             string ieName("E2setupRequestIEs");
1615             message.message.messageType = RIC_E2_SETUP_REQ;
1616             message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment();
1617             message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment((double)message.message.asnLength);
1618             buildAndSendSetupRequest(message, rmrMessageBuffer, pdu);
1619             break;
1620         }
1621         case ProcedureCode_id_RICserviceUpdate: {
1622             if (logLevel >= MDCLOG_DEBUG) {
1623                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceUpdate %s", message.message.enodbName);
1624             }
1625 //            vector <string> RANfunctionsAdded_v;
1626 //            vector <string> RANfunctionsModified_v;
1627 //            RANfunctionsAdded_v.clear();
1628 //            RANfunctionsModified_v.clear();
1629 //            if (collectServiceUpdate_RequestData(pdu, sctpMap, message,
1630 //                                                 RANfunctionsAdded_v, RANfunctionsModified_v) != 0) {
1631 //                break;
1632 //            }
1633
1634             string messageName("RICserviceUpdate");
1635             string ieName("RICserviceUpdateIEs");
1636             message.message.messageType = RIC_SERVICE_UPDATE;
1637             message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment();
1638             message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment((double)message.message.asnLength);
1639
1640             buildAndSendSetupRequest(message, rmrMessageBuffer, pdu);
1641             break;
1642         }
1643         case ProcedureCode_id_ErrorIndication: {
1644             if (logLevel >= MDCLOG_DEBUG) {
1645                 mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName);
1646             }
1647             message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication]->Increment();
1648             message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication]->Increment((double)message.message.asnLength);
1649             if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) {
1650                 mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP");
1651             }
1652             break;
1653         }
1654         case ProcedureCode_id_Reset: {
1655             if (logLevel >= MDCLOG_DEBUG) {
1656                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
1657             }
1658
1659             message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
1660             message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment((double)message.message.asnLength);
1661             if (XML_From_PER(message, rmrMessageBuffer) < 0) {
1662                 break;
1663             }
1664
1665             if (sendRequestToXapp(message, RIC_E2_RESET_REQ, rmrMessageBuffer) != 0) {
1666                 mdclog_write(MDCLOG_ERR, "RIC_E2_RESET_REQ message failed to send to xAPP");
1667             }
1668             break;
1669         }
1670         case ProcedureCode_id_RICindication: {
1671             if (logLevel >= MDCLOG_DEBUG) {
1672                 mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName);
1673             }
1674             for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
1675                 auto messageSent = false;
1676                 RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
1677                 if (logLevel >= MDCLOG_DEBUG) {
1678                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1679                 }
1680                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1681                     if (logLevel >= MDCLOG_DEBUG) {
1682                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1683                     }
1684                     if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
1685                         static unsigned char tx[32];
1686                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
1687                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1688                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1689                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1690                                        (unsigned char *)message.message.enodbName,
1691                                        strlen(message.message.enodbName));
1692                         rmrMessageBuffer.sendMessage->state = 0;
1693                         rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricInstanceID;
1694
1695                         //ie->value.choice.RICrequestID.ricInstanceID;
1696                         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1697                             mdclog_write(MDCLOG_DEBUG, "sub id = %d, mtype = %d, ric instance id %ld, requestor id = %ld",
1698                                          rmrMessageBuffer.sendMessage->sub_id,
1699                                          rmrMessageBuffer.sendMessage->mtype,
1700                                          ie->value.choice.RICrequestID.ricInstanceID,
1701                                          ie->value.choice.RICrequestID.ricRequestorID);
1702                         }
1703                         message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICindication]->Increment();
1704                         message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICindication]->Increment((double)message.message.asnLength);
1705                         sendRmrMessage(rmrMessageBuffer, message);
1706                         messageSent = true;
1707                     } else {
1708                         mdclog_write(MDCLOG_ERR, "RIC request id missing illegal request");
1709                     }
1710                 }
1711                 if (messageSent) {
1712                     break;
1713                 }
1714             }
1715             break;
1716         }
1717         default: {
1718             mdclog_write(MDCLOG_ERR, "Undefined or not supported message = %ld", procedureCode);
1719             message.message.messageType = 0; // no RMR message type yet
1720
1721             buildJsonMessage(message);
1722
1723             break;
1724         }
1725     }
1726 }
1727
1728 /**
1729  *
1730  * @param pdu
1731  * @param message
1732  * @param rmrMessageBuffer
1733  */
1734 void asnSuccessfulMsg(E2AP_PDU_t *pdu,
1735                       Sctp_Map_t *sctpMap,
1736                       ReportingMessages_t &message,
1737                       RmrMessagesBuffer_t &rmrMessageBuffer) {
1738     auto procedureCode = pdu->choice.successfulOutcome->procedureCode;
1739     auto logLevel = mdclog_level_get();
1740     if (logLevel >= MDCLOG_INFO) {
1741         mdclog_write(MDCLOG_INFO, "Successful Outcome %ld", procedureCode);
1742     }
1743     switch (procedureCode) {
1744         case ProcedureCode_id_Reset: {
1745             if (logLevel >= MDCLOG_DEBUG) {
1746                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
1747             }
1748             message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
1749             message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment((double)message.message.asnLength);
1750             if (XML_From_PER(message, rmrMessageBuffer) < 0) {
1751                 break;
1752             }
1753             if (sendRequestToXapp(message, RIC_E2_RESET_RESP, rmrMessageBuffer) != 0) {
1754                 mdclog_write(MDCLOG_ERR, "RIC_E2_RESET_RESP message failed to send to xAPP");
1755             }
1756             break;
1757         }
1758         case ProcedureCode_id_RICcontrol: {
1759             if (logLevel >= MDCLOG_DEBUG) {
1760                 mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
1761             }
1762             for (auto i = 0;
1763                  i < pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.count; i++) {
1764                 auto messageSent = false;
1765                 RICcontrolAcknowledge_IEs_t *ie = pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.array[i];
1766                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1767                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1768                 }
1769                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1770                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1771                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1772                     }
1773                     if (ie->value.present == RICcontrolAcknowledge_IEs__value_PR_RICrequestID) {
1774                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_ACK;
1775                         rmrMessageBuffer.sendMessage->state = 0;
1776 //                        rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1777                         rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricInstanceID;
1778
1779                         static unsigned char tx[32];
1780                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1781                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1782                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1783                                        (unsigned char *)message.message.enodbName,
1784                                        strlen(message.message.enodbName));
1785
1786                         message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment();
1787                         message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment((double)message.message.asnLength);
1788                         sendRmrMessage(rmrMessageBuffer, message);
1789                         messageSent = true;
1790                     } else {
1791                         mdclog_write(MDCLOG_ERR, "RIC request id missing illegal request");
1792                     }
1793                 }
1794                 if (messageSent) {
1795                     break;
1796                 }
1797             }
1798
1799             break;
1800         }
1801         case ProcedureCode_id_RICsubscription: {
1802             if (logLevel >= MDCLOG_DEBUG) {
1803                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
1804             }
1805             message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment();
1806             message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment((double)message.message.asnLength);
1807             if (sendRequestToXapp(message, RIC_SUB_RESP, rmrMessageBuffer) != 0) {
1808                 mdclog_write(MDCLOG_ERR, "Subscription successful message failed to send to xAPP");
1809             }
1810             break;
1811         }
1812         case ProcedureCode_id_RICsubscriptionDelete: {
1813             if (logLevel >= MDCLOG_DEBUG) {
1814                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
1815             }
1816             message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment();
1817             message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment((double)message.message.asnLength);
1818             if (sendRequestToXapp(message, RIC_SUB_DEL_RESP, rmrMessageBuffer) != 0) {
1819                 mdclog_write(MDCLOG_ERR, "Subscription delete successful message failed to send to xAPP");
1820             }
1821             break;
1822         }
1823         default: {
1824             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1825             message.message.messageType = 0; // no RMR message type yet
1826             buildJsonMessage(message);
1827
1828             break;
1829         }
1830     }
1831 }
1832
1833 /**
1834  *
1835  * @param pdu
1836  * @param message
1837  * @param rmrMessageBuffer
1838  */
1839 void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
1840                         Sctp_Map_t *sctpMap,
1841                         ReportingMessages_t &message,
1842                         RmrMessagesBuffer_t &rmrMessageBuffer) {
1843     auto procedureCode = pdu->choice.unsuccessfulOutcome->procedureCode;
1844     auto logLevel = mdclog_level_get();
1845     if (logLevel >= MDCLOG_INFO) {
1846         mdclog_write(MDCLOG_INFO, "Unsuccessful Outcome %ld", procedureCode);
1847     }
1848     switch (procedureCode) {
1849         case ProcedureCode_id_RICcontrol: {
1850             if (logLevel >= MDCLOG_DEBUG) {
1851                 mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
1852             }
1853             for (int i = 0;
1854                  i < pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.count; i++) {
1855                 auto messageSent = false;
1856                 RICcontrolFailure_IEs_t *ie = pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.array[i];
1857                 if (logLevel >= MDCLOG_DEBUG) {
1858                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1859                 }
1860                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1861                     if (logLevel >= MDCLOG_DEBUG) {
1862                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1863                     }
1864                     if (ie->value.present == RICcontrolFailure_IEs__value_PR_RICrequestID) {
1865                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_FAILURE;
1866                         rmrMessageBuffer.sendMessage->state = 0;
1867 //                        rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricRequestorID;
1868                         rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricInstanceID;
1869                         static unsigned char tx[32];
1870                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1871                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1872                         rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName,
1873                                        strlen(message.message.enodbName));
1874                         message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment();
1875                         message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment((double)message.message.asnLength);
1876                         sendRmrMessage(rmrMessageBuffer, message);
1877                         messageSent = true;
1878                     } else {
1879                         mdclog_write(MDCLOG_ERR, "RIC request id missing illegal request");
1880                     }
1881                 }
1882                 if (messageSent) {
1883                     break;
1884                 }
1885             }
1886             break;
1887         }
1888         case ProcedureCode_id_RICsubscription: {
1889             if (logLevel >= MDCLOG_DEBUG) {
1890                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
1891             }
1892             message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment();
1893             message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment((double)message.message.asnLength);
1894             if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer) != 0) {
1895                 mdclog_write(MDCLOG_ERR, "Subscription unsuccessful message failed to send to xAPP");
1896             }
1897             break;
1898         }
1899         case ProcedureCode_id_RICsubscriptionDelete: {
1900             if (logLevel >= MDCLOG_DEBUG) {
1901                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
1902             }
1903             message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment();
1904             message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment((double)message.message.asnLength);
1905             if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer) != 0) {
1906                 mdclog_write(MDCLOG_ERR, "Subscription Delete unsuccessful message failed to send to xAPP");
1907             }
1908             break;
1909         }
1910         default: {
1911             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1912             message.message.messageType = 0; // no RMR message type yet
1913
1914             buildJsonMessage(message);
1915
1916             break;
1917         }
1918     }
1919 }
1920
1921 /**
1922  *
1923  * @param message
1924  * @param requestId
1925  * @param rmrMmessageBuffer
1926  * @return
1927  */
1928 int sendRequestToXapp(ReportingMessages_t &message,
1929                       int requestId,
1930                       RmrMessagesBuffer_t &rmrMmessageBuffer) {
1931     rmr_bytes2meid(rmrMmessageBuffer.sendMessage,
1932                    (unsigned char *)message.message.enodbName,
1933                    strlen(message.message.enodbName));
1934     message.message.messageType = rmrMmessageBuffer.sendMessage->mtype = requestId;
1935     rmrMmessageBuffer.sendMessage->state = 0;
1936     static unsigned char tx[32];
1937     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1938     rmr_bytes2xact(rmrMmessageBuffer.sendMessage, tx, strlen((const char *) tx));
1939
1940     auto rc = sendRmrMessage(rmrMmessageBuffer, message);
1941     return rc;
1942 }
1943
1944 /**
1945  *
1946  * @param pSctpParams
1947  */
1948 void getRmrContext(sctp_params_t &pSctpParams) {
1949     pSctpParams.rmrCtx = nullptr;
1950     pSctpParams.rmrCtx = rmr_init(pSctpParams.rmrAddress, RECEIVE_XAPP_BUFFER_SIZE, RMRFL_NONE);
1951     if (pSctpParams.rmrCtx == nullptr) {
1952         mdclog_write(MDCLOG_ERR, "Failed to initialize RMR");
1953         return;
1954     }
1955
1956     rmr_set_stimeout(pSctpParams.rmrCtx, 0);    // disable retries for any send operation
1957     // we need to find that routing table exist and we can run
1958     if (mdclog_level_get() >= MDCLOG_INFO) {
1959         mdclog_write(MDCLOG_INFO, "We are after RMR INIT wait for RMR_Ready");
1960     }
1961     int rmrReady = 0;
1962     int count = 0;
1963     while (!rmrReady) {
1964         if ((rmrReady = rmr_ready(pSctpParams.rmrCtx)) == 0) {
1965             sleep(1);
1966         }
1967         count++;
1968         if (count % 60 == 0) {
1969             mdclog_write(MDCLOG_INFO, "waiting to RMR ready state for %d seconds", count);
1970         }
1971     }
1972     if (mdclog_level_get() >= MDCLOG_INFO) {
1973         mdclog_write(MDCLOG_INFO, "RMR running");
1974     }
1975     rmr_init_trace(pSctpParams.rmrCtx, 200);
1976     // get the RMR fd for the epoll
1977     pSctpParams.rmrListenFd = rmr_get_rcvfd(pSctpParams.rmrCtx);
1978     struct epoll_event event{};
1979     // add RMR fd to epoll
1980     event.events = (EPOLLIN);
1981     event.data.fd = pSctpParams.rmrListenFd;
1982     // add listening RMR FD to epoll
1983     if (epoll_ctl(pSctpParams.epoll_fd, EPOLL_CTL_ADD, pSctpParams.rmrListenFd, &event)) {
1984         mdclog_write(MDCLOG_ERR, "Failed to add RMR descriptor to epoll");
1985         close(pSctpParams.rmrListenFd);
1986         rmr_close(pSctpParams.rmrCtx);
1987         pSctpParams.rmrCtx = nullptr;
1988     }
1989 }
1990
1991 /**
1992  *
1993  * @param message
1994  * @param rmrMessageBuffer
1995  * @return
1996  */
1997 int PER_FromXML(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) {
1998     E2AP_PDU_t *pdu = nullptr;
1999
2000     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2001         mdclog_write(MDCLOG_DEBUG, "got xml Format  data from xApp of size %d is:%s",
2002                 rmrMessageBuffer.rcvMessage->len, rmrMessageBuffer.rcvMessage->payload);
2003     }
2004     auto rval = asn_decode(nullptr, ATS_BASIC_XER, &asn_DEF_E2AP_PDU, (void **) &pdu,
2005                            rmrMessageBuffer.rcvMessage->payload, rmrMessageBuffer.rcvMessage->len);
2006     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2007         mdclog_write(MDCLOG_DEBUG, "%s After  decoding the XML to PDU", __func__ );
2008     }
2009     if (rval.code != RC_OK) {
2010         mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) setup response  from E2MGR : %s",
2011                      rval.code,
2012                      message.message.enodbName);
2013         return -1;
2014     }
2015
2016     int buff_size = RECEIVE_XAPP_BUFFER_SIZE;
2017     auto er = asn_encode_to_buffer(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, pdu,
2018                                    rmrMessageBuffer.rcvMessage->payload, buff_size);
2019     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2020         mdclog_write(MDCLOG_DEBUG, "%s After encoding PDU to PER", __func__ );
2021     }
2022     if (er.encoded == -1) {
2023         mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
2024         return -1;
2025     } else if (er.encoded > (ssize_t)buff_size) {
2026         mdclog_write(MDCLOG_ERR, "Buffer of size %d is to small for %s, at %s line %d",
2027                      (int)rmrMessageBuffer.rcvMessage->len,
2028                      asn_DEF_E2AP_PDU.name,
2029                      __func__,
2030                      __LINE__);
2031         return -1;
2032     }
2033     rmrMessageBuffer.rcvMessage->len = er.encoded;
2034     return 0;
2035 }
2036
2037 /**
2038  *
2039  * @param sctpMap
2040  * @param rmrMessageBuffer
2041  * @param ts
2042  * @return
2043  */
2044 int receiveXappMessages(Sctp_Map_t *sctpMap,
2045                         RmrMessagesBuffer_t &rmrMessageBuffer,
2046                         struct timespec &ts) {
2047     int loglevel = mdclog_level_get();
2048     if (rmrMessageBuffer.rcvMessage == nullptr) {
2049         //we have error
2050         mdclog_write(MDCLOG_ERR, "RMR Allocation message, %s", strerror(errno));
2051         return -1;
2052     }
2053
2054 //    if (loglevel >= MDCLOG_DEBUG) {
2055 //        mdclog_write(MDCLOG_DEBUG, "Call to rmr_rcv_msg");
2056 //    }
2057     rmrMessageBuffer.rcvMessage = rmr_rcv_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.rcvMessage);
2058     if (rmrMessageBuffer.rcvMessage == nullptr) {
2059         mdclog_write(MDCLOG_ERR, "RMR Receiving message with null pointer, Reallocated rmr message buffer");
2060         rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2061         return -2;
2062     }
2063     ReportingMessages_t message;
2064     message.message.direction = 'D';
2065     message.message.time.tv_nsec = ts.tv_nsec;
2066     message.message.time.tv_sec = ts.tv_sec;
2067
2068     // get message payload
2069     //auto msgData = msg->payload;
2070     if (rmrMessageBuffer.rcvMessage->state != 0) {
2071         mdclog_write(MDCLOG_ERR, "RMR Receiving message with stat = %d", rmrMessageBuffer.rcvMessage->state);
2072         return -1;
2073     }
2074     rmr_get_meid(rmrMessageBuffer.rcvMessage, (unsigned char *)message.message.enodbName);
2075     message.peerInfo = (ConnectedCU_t *) sctpMap->find(message.message.enodbName);
2076     if (message.peerInfo == nullptr) {
2077         auto type = rmrMessageBuffer.rcvMessage->mtype;
2078         switch (type) {
2079             case RIC_SCTP_CLEAR_ALL:
2080             case E2_TERM_KEEP_ALIVE_REQ:
2081             case RIC_HEALTH_CHECK_REQ:
2082                 break;
2083             default:
2084                 mdclog_write(MDCLOG_ERR, "Failed to send message no CU entry %s", message.message.enodbName);
2085                 return -1;
2086         }
2087     }
2088
2089     if (rmrMessageBuffer.rcvMessage->mtype != RIC_HEALTH_CHECK_REQ) {
2090         num_of_XAPP_messages.fetch_add(1, std::memory_order_release);
2091
2092     }
2093     switch (rmrMessageBuffer.rcvMessage->mtype) {
2094         case RIC_E2_SETUP_RESP : {
2095             if (loglevel >= MDCLOG_DEBUG) {
2096                 mdclog_write(MDCLOG_DEBUG, "RIC_E2_SETUP_RESP");
2097             }
2098             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2099                 break;
2100             }
2101             message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment();
2102             message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment(rmrMessageBuffer.rcvMessage->len);
2103             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2104                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_RESP");
2105                 return -6;
2106             }
2107             break;
2108         }
2109         case RIC_E2_SETUP_FAILURE : {
2110             if (loglevel >= MDCLOG_DEBUG) {
2111                 mdclog_write(MDCLOG_DEBUG, "RIC_E2_SETUP_FAILURE");
2112             }
2113             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2114                 break;
2115             }
2116             message.peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment();
2117             message.peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment(rmrMessageBuffer.rcvMessage->len);
2118             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2119                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_FAILURE");
2120                 return -6;
2121             }
2122             break;
2123         }
2124         case RIC_ERROR_INDICATION: {
2125             if (loglevel >= MDCLOG_DEBUG) {
2126                 mdclog_write(MDCLOG_DEBUG, "RIC_ERROR_INDICATION");
2127             }
2128             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication]->Increment();
2129             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication]->Increment(rmrMessageBuffer.rcvMessage->len);
2130             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2131                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ERROR_INDICATION");
2132                 return -6;
2133             }
2134             break;
2135         }
2136         case RIC_SUB_REQ: {
2137             if (loglevel >= MDCLOG_DEBUG) {
2138                 mdclog_write(MDCLOG_DEBUG, "RIC_SUB_REQ");
2139             }
2140             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment();
2141             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment(rmrMessageBuffer.rcvMessage->len);
2142             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2143                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_REQ");
2144                 return -6;
2145             }
2146             break;
2147         }
2148         case RIC_SUB_DEL_REQ: {
2149             if (loglevel >= MDCLOG_DEBUG) {
2150                 mdclog_write(MDCLOG_DEBUG, "RIC_SUB_DEL_REQ");
2151             }
2152             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment();
2153             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment(rmrMessageBuffer.rcvMessage->len);
2154             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2155                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_DEL_REQ");
2156                 return -6;
2157             }
2158             break;
2159         }
2160         case RIC_CONTROL_REQ: {
2161             if (loglevel >= MDCLOG_DEBUG) {
2162                 mdclog_write(MDCLOG_DEBUG, "RIC_CONTROL_REQ");
2163             }
2164             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment();
2165             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment(rmrMessageBuffer.rcvMessage->len);
2166             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2167                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_CONTROL_REQ");
2168                 return -6;
2169             }
2170             break;
2171         }
2172         case RIC_SERVICE_QUERY: {
2173             if (loglevel >= MDCLOG_DEBUG) {
2174                 mdclog_write(MDCLOG_DEBUG, "RIC_SERVICE_QUERY");
2175             }
2176             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2177                 break;
2178             }
2179             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceQuery]->Increment();
2180             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceQuery]->Increment(rmrMessageBuffer.rcvMessage->len);
2181             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2182                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_QUERY");
2183                 return -6;
2184             }
2185             break;
2186         }
2187         case RIC_SERVICE_UPDATE_ACK: {
2188             if (loglevel >= MDCLOG_DEBUG) {
2189                 mdclog_write(MDCLOG_DEBUG, "RIC_SERVICE_UPDATE_ACK");
2190             }
2191             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2192                 mdclog_write(MDCLOG_ERR, "error in PER_FromXML");
2193                 break;
2194             }
2195             message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment();
2196             message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment(rmrMessageBuffer.rcvMessage->len);
2197             if (loglevel >= MDCLOG_DEBUG) {
2198                 mdclog_write(MDCLOG_DEBUG, "Before sending to CU");
2199             }
2200             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2201                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_ACK");
2202                 return -6;
2203             }
2204             break;
2205         }
2206         case RIC_SERVICE_UPDATE_FAILURE: {
2207             if (loglevel >= MDCLOG_DEBUG) {
2208                 mdclog_write(MDCLOG_DEBUG, "RIC_SERVICE_UPDATE_FAILURE");
2209             }
2210             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2211                 break;
2212             }
2213             message.peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment();
2214             message.peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment(rmrMessageBuffer.rcvMessage->len);
2215             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2216                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_FAILURE");
2217                 return -6;
2218             }
2219             break;
2220         }
2221         case RIC_E2_RESET_REQ: {
2222             if (loglevel >= MDCLOG_DEBUG) {
2223                 mdclog_write(MDCLOG_DEBUG, "RIC_E2_RESET_REQ");
2224             }
2225             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2226                 break;
2227             }
2228             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
2229             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment(rmrMessageBuffer.rcvMessage->len);
2230             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2231                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_RESET");
2232                 return -6;
2233             }
2234             break;
2235         }
2236         case RIC_E2_RESET_RESP: {
2237             if (loglevel >= MDCLOG_DEBUG) {
2238                 mdclog_write(MDCLOG_DEBUG, "RIC_E2_RESET_RESP");
2239             }
2240             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2241                 break;
2242             }
2243             message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
2244             message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment(rmrMessageBuffer.rcvMessage->len);
2245             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2246                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_RESET_RESP");
2247                 return -6;
2248             }
2249             break;
2250         }
2251         case RIC_SCTP_CLEAR_ALL: {
2252             mdclog_write(MDCLOG_INFO, "RIC_SCTP_CLEAR_ALL");
2253             // loop on all keys and close socket and then erase all map.
2254             vector<char *> v;
2255             sctpMap->getKeys(v);
2256             for (auto const &iter : v) { //}; iter != sctpMap.end(); iter++) {
2257                 if (!boost::starts_with((string) (iter), "host:") && !boost::starts_with((string) (iter), "msg:")) {
2258                     auto *peerInfo = (ConnectedCU_t *) sctpMap->find(iter);
2259                     if (peerInfo == nullptr) {
2260                         continue;
2261                     }
2262                     close(peerInfo->fileDescriptor);
2263                     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
2264                     message.message.direction = 'D';
2265                     message.message.time.tv_nsec = ts.tv_nsec;
2266                     message.message.time.tv_sec = ts.tv_sec;
2267
2268                     message.message.asnLength = rmrMessageBuffer.sendMessage->len =
2269                             snprintf((char *)rmrMessageBuffer.sendMessage->payload,
2270                                      256,
2271                                      "%s|RIC_SCTP_CLEAR_ALL",
2272                                      peerInfo->enodbName);
2273                     message.message.asndata = rmrMessageBuffer.sendMessage->payload;
2274                     mdclog_write(MDCLOG_INFO, "%s", message.message.asndata);
2275                     if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
2276                         mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
2277                     }
2278                     free(peerInfo);
2279                 }
2280             }
2281
2282             sleep(1);
2283             sctpMap->clear();
2284             break;
2285         }
2286         case E2_TERM_KEEP_ALIVE_REQ: {
2287             // send message back
2288             rmr_bytes2payload(rmrMessageBuffer.sendMessage,
2289                               (unsigned char *)rmrMessageBuffer.ka_message,
2290                               rmrMessageBuffer.ka_message_len);
2291             rmrMessageBuffer.sendMessage->mtype = E2_TERM_KEEP_ALIVE_RESP;
2292             rmrMessageBuffer.sendMessage->state = 0;
2293             static unsigned char tx[32];
2294             auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2295             rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, txLen);
2296             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2297             if (rmrMessageBuffer.sendMessage == nullptr) {
2298                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2299                 mdclog_write(MDCLOG_ERR, "Failed to send E2_TERM_KEEP_ALIVE_RESP RMR message returned NULL");
2300             } else if (rmrMessageBuffer.sendMessage->state != 0)  {
2301                 mdclog_write(MDCLOG_ERR, "Failed to send E2_TERM_KEEP_ALIVE_RESP, on RMR state = %d ( %s)",
2302                              rmrMessageBuffer.sendMessage->state, translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str());
2303             } else if (loglevel >= MDCLOG_DEBUG) {
2304                 mdclog_write(MDCLOG_DEBUG, "Got Keep Alive Request send : %s", rmrMessageBuffer.ka_message);
2305             }
2306
2307             break;
2308         }
2309         case RIC_HEALTH_CHECK_REQ: {
2310             static int counter = 0;
2311             // send message back
2312             rmr_bytes2payload(rmrMessageBuffer.rcvMessage,
2313                               (unsigned char *)"OK",
2314                               2);
2315             rmrMessageBuffer.rcvMessage->mtype = RIC_HEALTH_CHECK_RESP;
2316             rmrMessageBuffer.rcvMessage->state = 0;
2317             static unsigned char tx[32];
2318             auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2319             rmr_bytes2xact(rmrMessageBuffer.rcvMessage, tx, txLen);
2320             rmrMessageBuffer.rcvMessage = rmr_rts_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.rcvMessage);
2321             //rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2322             if (rmrMessageBuffer.rcvMessage == nullptr) {
2323                 rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2324                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_HEALTH_CHECK_RESP RMR message returned NULL");
2325             } else if (rmrMessageBuffer.rcvMessage->state != 0)  {
2326                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_HEALTH_CHECK_RESP, on RMR state = %d ( %s)",
2327                              rmrMessageBuffer.rcvMessage->state, translateRmrErrorMessages(rmrMessageBuffer.rcvMessage->state).c_str());
2328             } else if (loglevel >= MDCLOG_DEBUG && ++counter % 100 == 0) {
2329                 mdclog_write(MDCLOG_DEBUG, "Got %d RIC_HEALTH_CHECK_REQ Request send : OK", counter);
2330             }
2331
2332             break;
2333         }
2334
2335         default:
2336             mdclog_write(MDCLOG_WARN, "Message Type : %d is not supported", rmrMessageBuffer.rcvMessage->mtype);
2337             message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
2338             message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
2339             message.message.time.tv_nsec = ts.tv_nsec;
2340             message.message.time.tv_sec = ts.tv_sec;
2341             message.message.messageType = rmrMessageBuffer.rcvMessage->mtype;
2342
2343             buildJsonMessage(message);
2344
2345
2346             return -7;
2347     }
2348     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2349         mdclog_write(MDCLOG_DEBUG, "EXIT OK from %s", __FUNCTION__);
2350     }
2351     return 0;
2352 }
2353
2354 /**
2355  * Send message to the CU that is not expecting for successful or unsuccessful results
2356  * @param messageBuffer
2357  * @param message
2358  * @param failedMsgId
2359  * @param sctpMap
2360  * @return
2361  */
2362 int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer,
2363                            ReportingMessages_t &message,
2364                            int failedMsgId,
2365                            Sctp_Map_t *sctpMap) {
2366     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2367         mdclog_write(MDCLOG_DEBUG, "send message: %d to %s address", message.message.messageType, message.message.enodbName);
2368     }
2369
2370     getRequestMetaData(message, messageBuffer);
2371     if (mdclog_level_get() >= MDCLOG_INFO) {
2372         mdclog_write(MDCLOG_INFO, "send message to %s address", message.message.enodbName);
2373     }
2374
2375     auto rc = sendMessagetoCu(sctpMap, messageBuffer, message, failedMsgId);
2376     return rc;
2377 }
2378
2379 /**
2380  *
2381  * @param sctpMap
2382  * @param messageBuffer
2383  * @param message
2384  * @param failedMesgId
2385  * @return
2386  */
2387 int sendMessagetoCu(Sctp_Map_t *sctpMap,
2388                     RmrMessagesBuffer_t &messageBuffer,
2389                     ReportingMessages_t &message,
2390                     int failedMesgId) {
2391     // get the FD
2392     message.message.messageType = messageBuffer.rcvMessage->mtype;
2393     auto rc = sendSctpMsg(message.peerInfo, message, sctpMap);
2394     return rc;
2395 }
2396
2397
2398 /**
2399  *
2400  * @param epoll_fd
2401  * @param peerInfo
2402  * @param events
2403  * @param sctpMap
2404  * @param enodbName
2405  * @param msgType
2406  * @return
2407  */
2408 int addToEpoll(int epoll_fd,
2409                ConnectedCU_t *peerInfo,
2410                uint32_t events,
2411                Sctp_Map_t *sctpMap,
2412                char *enodbName,
2413                int msgType) {
2414     // Add to Epol
2415     struct epoll_event event{};
2416     event.data.ptr = peerInfo;
2417     event.events = events;
2418     if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peerInfo->fileDescriptor, &event) < 0) {
2419         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2420             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_ADD (may check not to quit here), %s, %s %d",
2421                          strerror(errno), __func__, __LINE__);
2422         }
2423         close(peerInfo->fileDescriptor);
2424         if (enodbName != nullptr) {
2425             cleanHashEntry(peerInfo, sctpMap);
2426             char key[MAX_ENODB_NAME_SIZE * 2];
2427             snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2428             if (mdclog_level_get() >= MDCLOG_DEBUG) {
2429                 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2430             }
2431             auto tmp = sctpMap->find(key);
2432             if (tmp) {
2433                 free(tmp);
2434                 sctpMap->erase(key);
2435             }
2436         } else {
2437             peerInfo->enodbName[0] = 0;
2438         }
2439         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may check not to quit here)");
2440         return -1;
2441     }
2442     return 0;
2443 }
2444
2445 /**
2446  *
2447  * @param epoll_fd
2448  * @param peerInfo
2449  * @param events
2450  * @param sctpMap
2451  * @param enodbName
2452  * @param msgType
2453  * @return
2454  */
2455 int modifyToEpoll(int epoll_fd,
2456                   ConnectedCU_t *peerInfo,
2457                   uint32_t events,
2458                   Sctp_Map_t *sctpMap,
2459                   char *enodbName,
2460                   int msgType) {
2461     // Add to Epol
2462     struct epoll_event event{};
2463     event.data.ptr = peerInfo;
2464     event.events = events;
2465     if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, peerInfo->fileDescriptor, &event) < 0) {
2466         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2467             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_MOD (may check not to quit here), %s, %s %d",
2468                          strerror(errno), __func__, __LINE__);
2469         }
2470         close(peerInfo->fileDescriptor);
2471         cleanHashEntry(peerInfo, sctpMap);
2472         char key[MAX_ENODB_NAME_SIZE * 2];
2473         snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2474         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2475             mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2476         }
2477         auto tmp = sctpMap->find(key);
2478         if (tmp) {
2479             free(tmp);
2480         }
2481         sctpMap->erase(key);
2482         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may check not to quit here)");
2483         return -1;
2484     }
2485     return 0;
2486 }
2487
2488
2489 int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message) {
2490     buildJsonMessage(message);
2491
2492     rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2493
2494     if (rmrMessageBuffer.sendMessage == nullptr) {
2495         rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2496         mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2497         return -1;
2498     }
2499
2500     if (rmrMessageBuffer.sendMessage->state != 0) {
2501         char meid[RMR_MAX_MEID]{};
2502         if (rmrMessageBuffer.sendMessage->state == RMR_ERR_RETRY) {
2503             usleep(5);
2504             rmrMessageBuffer.sendMessage->state = 0;
2505             mdclog_write(MDCLOG_INFO, "RETRY sending Message type %d to Xapp from %s",
2506                          rmrMessageBuffer.sendMessage->mtype,
2507                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2508             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2509             if (rmrMessageBuffer.sendMessage == nullptr) {
2510                 mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2511                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2512                 return -1;
2513             } else if (rmrMessageBuffer.sendMessage->state != 0) {
2514                 mdclog_write(MDCLOG_ERR,
2515                              "Message state %s while sending request %d to Xapp from %s after retry of 10 microseconds",
2516                              translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2517                              rmrMessageBuffer.sendMessage->mtype,
2518                              rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2519                 auto rc = rmrMessageBuffer.sendMessage->state;
2520                 return rc;
2521             }
2522         } else {
2523             mdclog_write(MDCLOG_ERR, "Message state %s while sending request %d to Xapp from %s",
2524                          translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2525                          rmrMessageBuffer.sendMessage->mtype,
2526                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2527             return rmrMessageBuffer.sendMessage->state;
2528         }
2529     }
2530     return 0;
2531 }
2532
2533 void buildJsonMessage(ReportingMessages_t &message) {
2534     if (jsonTrace) {
2535         message.outLen = sizeof(message.base64Data);
2536         base64::encode((const unsigned char *) message.message.asndata,
2537                        (const int) message.message.asnLength,
2538                        message.base64Data,
2539                        message.outLen);
2540         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2541             mdclog_write(MDCLOG_DEBUG, "Tracing: ASN length = %d, base64 message length = %d ",
2542                          (int) message.message.asnLength,
2543                          (int) message.outLen);
2544         }
2545
2546         snprintf(message.buffer, sizeof(message.buffer),
2547                  "{\"header\": {\"ts\": \"%ld.%09ld\","
2548                  "\"ranName\": \"%s\","
2549                  "\"messageType\": %d,"
2550                  "\"direction\": \"%c\"},"
2551                  "\"base64Length\": %d,"
2552                  "\"asnBase64\": \"%s\"}",
2553                  message.message.time.tv_sec,
2554                  message.message.time.tv_nsec,
2555                  message.message.enodbName,
2556                  message.message.messageType,
2557                  message.message.direction,
2558                  (int) message.outLen,
2559                  message.base64Data);
2560         static src::logger_mt &lg = my_logger::get();
2561
2562         BOOST_LOG(lg) << message.buffer;
2563     }
2564 }
2565
2566
2567 /**
2568  * take RMR error code to string
2569  * @param state
2570  * @return
2571  */
2572 string translateRmrErrorMessages(int state) {
2573     string str = {};
2574     switch (state) {
2575         case RMR_OK:
2576             str = "RMR_OK - state is good";
2577             break;
2578         case RMR_ERR_BADARG:
2579             str = "RMR_ERR_BADARG - argument passed to function was unusable";
2580             break;
2581         case RMR_ERR_NOENDPT:
2582             str = "RMR_ERR_NOENDPT - send//call could not find an endpoint based on msg type";
2583             break;
2584         case RMR_ERR_EMPTY:
2585             str = "RMR_ERR_EMPTY - msg received had no payload; attempt to send an empty message";
2586             break;
2587         case RMR_ERR_NOHDR:
2588             str = "RMR_ERR_NOHDR - message didn't contain a valid header";
2589             break;
2590         case RMR_ERR_SENDFAILED:
2591             str = "RMR_ERR_SENDFAILED - send failed; errno has nano reason";
2592             break;
2593         case RMR_ERR_CALLFAILED:
2594             str = "RMR_ERR_CALLFAILED - unable to send call() message";
2595             break;
2596         case RMR_ERR_NOWHOPEN:
2597             str = "RMR_ERR_NOWHOPEN - no wormholes are open";
2598             break;
2599         case RMR_ERR_WHID:
2600             str = "RMR_ERR_WHID - wormhole id was invalid";
2601             break;
2602         case RMR_ERR_OVERFLOW:
2603             str = "RMR_ERR_OVERFLOW - operation would have busted through a buffer/field size";
2604             break;
2605         case RMR_ERR_RETRY:
2606             str = "RMR_ERR_RETRY - request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)";
2607             break;
2608         case RMR_ERR_RCVFAILED:
2609             str = "RMR_ERR_RCVFAILED - receive failed (hard error)";
2610             break;
2611         case RMR_ERR_TIMEOUT:
2612             str = "RMR_ERR_TIMEOUT - message processing call timed out";
2613             break;
2614         case RMR_ERR_UNSET:
2615             str = "RMR_ERR_UNSET - the message hasn't been populated with a transport buffer";
2616             break;
2617         case RMR_ERR_TRUNC:
2618             str = "RMR_ERR_TRUNC - received message likely truncated";
2619             break;
2620         case RMR_ERR_INITFAILED:
2621             str = "RMR_ERR_INITFAILED - initialisation of something (probably message) failed";
2622             break;
2623         case RMR_ERR_NOTSUPP:
2624             str = "RMR_ERR_NOTSUPP - the request is not supported, or RMr was not initialised for the request";
2625             break;
2626         default:
2627             char buf[128]{};
2628             snprintf(buf, sizeof buf, "UNDOCUMENTED RMR_ERR : %d", state);
2629             str = buf;
2630             break;
2631     }
2632     return str;
2633 }