1 // Copyright 2019 AT&T Intellectual Property
2 // Copyright 2019 Nokia
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //      http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15
16 //  This source code is part of the near-RT RIC (RAN Intelligent Controller)
17 //  platform project (RICP).
18
19 // E2 Termination main module: configuration loading, the SCTP listening socket, and the epoll loop that bridges RAN nodes (SCTP/E2AP) and xApps / E2 Manager (RMR).
20
21
22
23 #include <3rdparty/oranE2/RANfunctions-List.h>
24 #include "sctpThread.h"
25 #include "BuildRunName.h"
26
27 //#include "3rdparty/oranE2SM/E2SM-gNB-NRT-RANfunction-Definition.h"
28 //#include "BuildXml.h"
29 //#include "pugixml/src/pugixml.hpp"
30
31 using namespace std;
32 //using namespace std::placeholders;
33 using namespace boost::filesystem;
34 using namespace prometheus;
35
36
37 //#ifdef __cplusplus
38 //extern "C"
39 //{
40 //#endif
41
42 // declared here so coverage data can be flushed without including the gcov header
43 extern "C" void __gcov_flush(void);
44
45 static void catch_function(int signal) {
46     __gcov_flush();
47     exit(signal);
48 }
49
50
51 BOOST_LOG_INLINE_GLOBAL_LOGGER_DEFAULT(my_logger, src::logger_mt)
52
53 boost::shared_ptr<sinks::synchronous_sink<sinks::text_file_backend>> boostLogger;
54 double cpuClock = 0.0;
55 bool jsonTrace = true;
56
57 void init_log() {
58     mdclog_attr_t *attr;
59     mdclog_attr_init(&attr);
60     mdclog_attr_set_ident(attr, "E2Terminator");
61     mdclog_init(attr);
62     mdclog_attr_destroy(attr);
63 }
64 auto start_time = std::chrono::high_resolution_clock::now();
65 typedef std::chrono::duration<double, std::ratio<1,1>> seconds_t;
66
67 double age() {
68     return seconds_t(std::chrono::high_resolution_clock::now() - start_time).count();
69 }
70
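/**
 * Estimates the CPU clock by counting TSC cycles across a sleep of 'sleeptime' milliseconds.
 * Note that the value returned is in cycles per second, despite the MHz in the name.
 */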
71 double approx_CPU_MHz(unsigned sleeptime) {
72     using namespace std::chrono_literals;
73     uint32_t aux = 0;
74     uint64_t cycles_start = rdtscp(aux);
75     double time_start = age();
76     std::this_thread::sleep_for(sleeptime * 1ms);
77     uint64_t elapsed_cycles = rdtscp(aux) - cycles_start;
78     double elapsed_time = age() - time_start;
79     return elapsed_cycles / elapsed_time;
80 }
81
82 //std::atomic<int64_t> rmrCounter{0};
83 std::atomic<int64_t> num_of_messages{0};
84 std::atomic<int64_t> num_of_XAPP_messages{0};
85 static long transactionCounter = 0;
86
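/**
 * Creates the SCTP listening socket (IPv6 any-address), binds it to the configured port,
 * sets it non-blocking, starts listening, and registers it with the epoll instance.
 * @param sctpParams holds the SCTP port, the listen FD and the epoll FD
 * @return 0 on success, -1 on failure
 */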
87 int buildListeningPort(sctp_params_t &sctpParams) {
88     sctpParams.listenFD = socket(AF_INET6, SOCK_STREAM, IPPROTO_SCTP);
89     if (sctpParams.listenFD <= 0) {
90         mdclog_write(MDCLOG_ERR, "Error Opening socket, %s", strerror(errno));
91         return -1;
92     }
93
94     struct sockaddr_in6 servaddr {};
95     servaddr.sin6_family = AF_INET6;
96     servaddr.sin6_addr   = in6addr_any;
97     servaddr.sin6_port = htons(sctpParams.sctpPort);
98     if (bind(sctpParams.listenFD, (SA *)&servaddr, sizeof(servaddr)) < 0 ) {
99         mdclog_write(MDCLOG_ERR, "Error binding port %d. %s", sctpParams.sctpPort, strerror(errno));
100         return -1;
101     }
102     if (setSocketNoBlocking(sctpParams.listenFD) == -1) {
103         //mdclog_write(MDCLOG_ERR, "Error binding. %s", strerror(errno));
104         return -1;
105     }
106     if (mdclog_level_get() >= MDCLOG_DEBUG) {
107         struct sockaddr_in6 cliaddr {};
108         socklen_t len = sizeof(cliaddr);
109         getsockname(sctpParams.listenFD, (SA *)&cliaddr, &len);
110         char buff[1024] {};
111         inet_ntop(AF_INET6, &cliaddr.sin6_addr, buff, sizeof(buff));
112         mdclog_write(MDCLOG_DEBUG, "My address: %s, port %d\n", buff, ntohs(cliaddr.sin6_port));
113     }
114
115     if (listen(sctpParams.listenFD, SOMAXCONN) < 0) {
116         mdclog_write(MDCLOG_ERR, "Error listening. %s\n", strerror(errno));
117         return -1;
118     }
119     struct epoll_event event {};
120     event.events = EPOLLIN | EPOLLET;
121     event.data.fd = sctpParams.listenFD;
122
123     // add listening port to epoll
124     if (epoll_ctl(sctpParams.epoll_fd, EPOLL_CTL_ADD, sctpParams.listenFD, &event)) {
125         printf("Failed to add descriptor to epoll\n");
126         mdclog_write(MDCLOG_ERR, "Failed to add descriptor to epoll. %s\n", strerror(errno));
127         return -1;
128     }
129
130     return 0;
131 }
132
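/**
 * Reads the E2 Termination configuration file and fills 'sctpParams' (RMR port, log level,
 * volume, local IP, SCTP port, external FQDN, pod name, trace and Prometheus settings),
 * then sets up the boost rotating file log under the configured volume.
 *
 * The keys below are the ones read by this function; the values are illustrative only:
 *
 *   nano                  = 38000
 *   loglevel              = info
 *   volume                = /data/outgoing
 *   local-ip              = 10.0.0.1
 *   sctp-port             = 36422
 *   external-fqdn         = e2t.example.com
 *   pod_name              = E2TERM_POD_NAME
 *   trace                 = stop
 *   prometheusMode        = pull
 *   prometheusPort        = 8088
 *   prometheusPushAddr    = pushgateway:9091
 *   prometheusPushTimeOut = 10
 *
 * @param sctpParams the instance parameters to fill
 * @return 0 on success, -1 on any missing or invalid mandatory value
 */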
133 int buildConfiguration(sctp_params_t &sctpParams) {
134     path p = (sctpParams.configFilePath + "/" + sctpParams.configFileName).c_str();
135     if (exists(p)) {
136         const int size = 2048;
137         auto fileSize = file_size(p);
138         if (fileSize > size) {
139             mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size);
140             return -1;
141         }
142     } else {
143         mdclog_write(MDCLOG_ERR, "Configuration file %s does not exist", p.string().c_str());
144         return -1;
145     }
146
147     ReadConfigFile conf;
148     if (conf.openConfigFile(p.string()) == -1) {
149         mdclog_write(MDCLOG_ERR, "Failed to open config file %s, %s",
150                      p.string().c_str(), strerror(errno));
151         return -1;
152     }
153     int rmrPort = conf.getIntValue("nano");
154     if (rmrPort == -1) {
155         mdclog_write(MDCLOG_ERR, "Illegal RMR port");
156         return -1;
157     }
158     sctpParams.rmrPort = (uint16_t)rmrPort;
159     snprintf(sctpParams.rmrAddress, sizeof(sctpParams.rmrAddress), "%d", (int) (sctpParams.rmrPort));
160
161     auto tmpStr = conf.getStringValue("loglevel");
162     if (tmpStr.length() == 0) {
163         mdclog_write(MDCLOG_ERR, "Illegal loglevel. Setting loglevel to MDCLOG_INFO");
164         tmpStr = "info";
165     }
166     transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
167
168     if ((tmpStr.compare("debug")) == 0) {
169         sctpParams.logLevel = MDCLOG_DEBUG;
170     } else if ((tmpStr.compare("info")) == 0) {
171         sctpParams.logLevel = MDCLOG_INFO;
172     } else if ((tmpStr.compare("warning")) == 0) {
173         sctpParams.logLevel = MDCLOG_WARN;
174     } else if ((tmpStr.compare("error")) == 0) {
175         sctpParams.logLevel = MDCLOG_ERR;
176     } else {
177         mdclog_write(MDCLOG_ERR, "Illegal loglevel = %s. Setting loglevel to MDCLOG_INFO", tmpStr.c_str());
178         sctpParams.logLevel = MDCLOG_INFO;
179     }
180     mdclog_level_set(sctpParams.logLevel);
181
182     tmpStr = conf.getStringValue("volume");
183     if (tmpStr.length() == 0) {
184         mdclog_write(MDCLOG_ERR, "Illegal volume.");
185         return -1;
186     }
187
188     char tmpLogFilespec[VOLUME_URL_SIZE];
189     tmpLogFilespec[0] = 0;
190     sctpParams.volume[0] = 0;
191     snprintf(sctpParams.volume, VOLUME_URL_SIZE, "%s", tmpStr.c_str());
192     // copy the name to temp file as well
193     snprintf(tmpLogFilespec, VOLUME_URL_SIZE, "%s", tmpStr.c_str());
194
195
196     // define the file name in the tmp directory under the volume
197     strcat(tmpLogFilespec,"/tmp/E2Term_%Y-%m-%d_%H-%M-%S.%N.log");
198
199     sctpParams.myIP = conf.getStringValue("local-ip");
200     if (sctpParams.myIP.length() == 0) {
201         mdclog_write(MDCLOG_ERR, "Illegal local-ip.");
202         return -1;
203     }
204
205     int sctpPort = conf.getIntValue("sctp-port");
206     if (sctpPort == -1) {
207         mdclog_write(MDCLOG_ERR, "Illegal SCTP port");
208         return -1;
209     }
210     sctpParams.sctpPort = (uint16_t)sctpPort;
211
212     sctpParams.fqdn = conf.getStringValue("external-fqdn");
213     if (sctpParams.fqdn.length() == 0) {
214         mdclog_write(MDCLOG_ERR, "Illegal external-fqdn");
215         return -1;
216     }
217
218     std::string pod = conf.getStringValue("pod_name");
219     if (pod.length() == 0) {
220         mdclog_write(MDCLOG_ERR, "Illegal pod_name in config file");
221         return -1;
222     }
223     auto *podName = getenv(pod.c_str());
224     if (podName == nullptr) {
225         mdclog_write(MDCLOG_ERR, "Illegal pod_name or environment variable does not exist : %s", pod.c_str());
226         return -1;
227
228     } else {
229         sctpParams.podName.assign(podName);
230         if (sctpParams.podName.length() == 0) {
231             mdclog_write(MDCLOG_ERR, "Illegal pod_name");
232             return -1;
233         }
234     }
235
236     tmpStr = conf.getStringValue("trace");
237     transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
238     if ((tmpStr.compare("start")) == 0) {
239         mdclog_write(MDCLOG_INFO, "Trace set to: start");
240         sctpParams.trace = true;
241     } else if ((tmpStr.compare("stop")) == 0) {
242         mdclog_write(MDCLOG_INFO, "Trace set to: stop");
243         sctpParams.trace = false;
244     }
245     jsonTrace = sctpParams.trace;
246
247     sctpParams.epollTimeOut = -1;
248     tmpStr = conf.getStringValue("prometheusMode");
249     transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
250     if (tmpStr.length() != 0) {
251         sctpParams.prometheusMode = tmpStr;
252         if (tmpStr.compare("push") == 0) {
253             auto timeout = conf.getIntValue("prometheusPushTimeOut");
254             if (timeout >= 5 && timeout <= 300) {
255                 sctpParams.epollTimeOut = timeout * 1000;
256             } else {
257                 sctpParams.epollTimeOut = 10 * 1000;
258             }
259         }
260     }
261
262     tmpStr = conf.getStringValue("prometheusPushAddr");
263     if (tmpStr.length() != 0) {
264         sctpParams.prometheusPushAddress = tmpStr;
265     }
266
267     tmpStr = conf.getStringValue("prometheusPort");
268     if (tmpStr.length() != 0) {
269         sctpParams.prometheusPort = tmpStr;
270     }
271
272     sctpParams.ka_message_length = snprintf(sctpParams.ka_message, KA_MESSAGE_SIZE, "{\"address\": \"%s:%d\","
273                                                                                     "\"fqdn\": \"%s\","
274                                                                                     "\"pod_name\": \"%s\"}",
275                                             (const char *)sctpParams.myIP.c_str(),
276                                             sctpParams.rmrPort,
277                                             sctpParams.fqdn.c_str(),
278                                             sctpParams.podName.c_str());
279
280     if (mdclog_level_get() >= MDCLOG_INFO) {
281         mdclog_mdc_add("RMR Port", to_string(sctpParams.rmrPort).c_str());
282         mdclog_mdc_add("LogLevel", to_string(sctpParams.logLevel).c_str());
283         mdclog_mdc_add("volume", sctpParams.volume);
284         mdclog_mdc_add("tmpLogFilespec", tmpLogFilespec);
285         mdclog_mdc_add("my ip", sctpParams.myIP.c_str());
286         mdclog_mdc_add("pod name", sctpParams.podName.c_str());
287
288         mdclog_write(MDCLOG_INFO, "running parameters for instance : %s", sctpParams.ka_message);
289     }
290     mdclog_mdc_clean();
291
292     // Log files are first written to the tmp directory under the configured volume
293     boostLogger = logging::add_file_log(
294             keywords::file_name = tmpLogFilespec, // to temp directory
295             keywords::rotation_size = 10 * 1024 * 1024,
296             keywords::time_based_rotation = sinks::file::rotation_at_time_interval(posix_time::hours(1)),
297             keywords::format = "%Message%"
298             //keywords::format = "[%TimeStamp%]: %Message%" // prefix each log record with a time stamp
299     );
300
301     // Set up a destination folder for collecting rotated (closed) files -- since it is on the same volume, rename() can be used
302     boostLogger->locked_backend()->set_file_collector(sinks::file::make_collector(
303             keywords::target = sctpParams.volume
304     ));
305
306     // Upon restart, scan the directory for files matching the file_name pattern
307     boostLogger->locked_backend()->scan_for_files();
308
309     // Enable auto-flushing after each log record written
310     if (mdclog_level_get() >= MDCLOG_DEBUG) {
311         boostLogger->locked_backend()->auto_flush(true);
312     }
313
314     return 0;
315 }
316
317 static std::string GetHostName() {
318     char hostname[1024];
319
320     if (::gethostname(hostname, sizeof(hostname))) {
321         return {};
322     }
323     return hostname;
324 }
325
326
327
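/**
 * main: seeds the transaction counter, installs signal handlers, loads the configuration,
 * creates the Prometheus registry, the epoll instance, the RMR context, the inotify watch
 * and the SCTP listener, then starts the listener thread(s) and keeps sending E2_TERM_INIT
 * until the first xApp message arrives.
 */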
328 int main(const int argc, char **argv) {
329     sctp_params_t sctpParams;
330
331     {
332         std::random_device device{};
333         std::mt19937 generator(device());
334         std::uniform_int_distribution<long> distribution(1, (long) 1e12);
335         transactionCounter = distribution(generator);
336     }
337
338 //    uint64_t st = 0;
339 //    uint32_t aux1 = 0;
340 //   st = rdtscp(aux1);
341
342     unsigned num_cpus = std::thread::hardware_concurrency();
343     init_log();
344     mdclog_level_set(MDCLOG_INFO);
345
346     if (std::signal(SIGINT, catch_function) == SIG_ERR) {
347         mdclog_write(MDCLOG_ERR, "Error initializing SIGINT");
348         exit(1);
349     }
350     if (std::signal(SIGABRT, catch_function)== SIG_ERR) {
351         mdclog_write(MDCLOG_ERR, "Error initializing SIGABRT");
352         exit(1);
353     }
354     if (std::signal(SIGTERM, catch_function)== SIG_ERR) {
355         mdclog_write(MDCLOG_ERR, "Error initializing SIGTERM");
356         exit(1);
357     }
358
359     cpuClock = approx_CPU_MHz(100);
360
361     mdclog_write(MDCLOG_DEBUG, "CPU speed %11.11f", cpuClock);
362
363     auto result = parse(argc, argv, sctpParams);
364
365     if (buildConfiguration(sctpParams) != 0) {
366         exit(-1);
367     }
368
369     //auto registry = std::make_shared<Registry>();
370     sctpParams.prometheusRegistry = std::make_shared<Registry>();
371
372     //sctpParams.promtheusFamily = new Family<Counter>("E2T", "E2T message counter", {{"E", sctpParams.podName}});
373
374     sctpParams.prometheusFamily = &BuildCounter()
375             .Name("E2T")
376             .Help("E2T message counter")
377             .Labels({{"E", sctpParams.podName}})
378             .Register(*sctpParams.prometheusRegistry);
379
380
381     // start epoll
382     sctpParams.epoll_fd = epoll_create1(0);
383     if (sctpParams.epoll_fd == -1) {
384         mdclog_write(MDCLOG_ERR, "failed to open epoll descriptor");
385         exit(-1);
386     }
387
388     getRmrContext(sctpParams);
389     if (sctpParams.rmrCtx == nullptr) {
390         close(sctpParams.epoll_fd);
391         exit(-1);
392     }
393
394     if (buildInotify(sctpParams) == -1) {
395         close(sctpParams.rmrListenFd);
396         rmr_close(sctpParams.rmrCtx);
397         close(sctpParams.epoll_fd);
398         exit(-1);
399     }
400
401     if (buildListeningPort(sctpParams) != 0) {
402         close(sctpParams.rmrListenFd);
403         rmr_close(sctpParams.rmrCtx);
404         close(sctpParams.epoll_fd);
405         exit(-1);
406     }
407
408     sctpParams.sctpMap = new mapWrapper();
409
410     std::vector<std::thread> threads(num_cpus);
411 //    std::vector<std::thread> threads;
412
413     if (sctpParams.prometheusMode.compare("pull") == 0) {
414         sctpParams.prometheusExposer = new Exposer(sctpParams.myIP + ":" + sctpParams.prometheusPort, 1);
415         sctpParams.prometheusExposer->RegisterCollectable(sctpParams.prometheusRegistry);
416     } else if (sctpParams.prometheusMode.compare("push") == 0) {
417         const auto labels = Gateway::GetInstanceLabel(GetHostName());
418         string address {};
419         string port {};
420         char ch = ':';
421         auto found = sctpParams.prometheusPushAddress.find_last_of(ch);
422         // split the push address into host and port at the last ':'
423         // (if ':' is not present the gateway cannot be built and an error is logged)
424         if (found != string::npos) {
425             address = sctpParams.prometheusPushAddress.substr(0,found);
426             port = sctpParams.prometheusPushAddress.substr(found + 1);
427             sctpParams.prometheusGateway = new Gateway(address, port, "E2T", labels);
428             sctpParams.prometheusGateway->RegisterCollectable(sctpParams.prometheusRegistry);
429         } else {
430             mdclog_write(MDCLOG_ERR, "failed to build Prometheus gateway, no stats will be sent");
431         }
432     }
433
434     num_cpus = 1;
435     for (unsigned int i = 0; i < num_cpus; i++) {
436         threads[i] = std::thread(listener, &sctpParams);
437
438         cpu_set_t cpuset;
439         CPU_ZERO(&cpuset);
440         CPU_SET(i, &cpuset);
441         int rc = pthread_setaffinity_np(threads[i].native_handle(), sizeof(cpu_set_t), &cpuset);
442         if (rc != 0) {
443             mdclog_write(MDCLOG_ERR, "Error calling pthread_setaffinity_np: %d", rc);
444         }
445     }
446
447
448     // keep sending E2_TERM_INIT until the first message from an xApp arrives
449     handleTermInit(sctpParams);
450
451     for (auto &t : threads) {
452         t.join();
453     }
454
455     return 0;
456 }
457
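/**
 * Sends E2_TERM_INIT to the E2 Manager and blocks until at least one RMR message from an
 * xApp has been received by the listener, re-sending E2_TERM_INIT periodically until then.
 * @param sctpParams
 */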
458 void handleTermInit(sctp_params_t &sctpParams) {
459     sendTermInit(sctpParams);
460     //send to e2 manager init of e2 term
461     //E2_TERM_INIT
462
463     int count = 0;
464     while (true) {
465         auto xappMessages = num_of_XAPP_messages.load(std::memory_order_acquire);
466         if (xappMessages > 0) {
467             if (mdclog_level_get() >=  MDCLOG_INFO) {
468                 mdclog_write(MDCLOG_INFO, "Got a message from some application, stop sending E2_TERM_INIT");
469             }
470             return;
471         }
472         usleep(100000);
473         count++;
474         if (count % 1000 == 0) {
475             mdclog_write(MDCLOG_ERR, "Got no messages from any xApp");
476             sendTermInit(sctpParams);
477         }
478     }
479 }
480
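/**
 * Allocates an RMR message carrying the keep-alive (address/fqdn/pod_name) payload and
 * retries sending E2_TERM_INIT until RMR reports a successful send.
 * @param sctpParams
 */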
481 void sendTermInit(sctp_params_t &sctpParams) {
482     rmr_mbuf_t *msg = rmr_alloc_msg(sctpParams.rmrCtx, sctpParams.ka_message_length);
483     auto count = 0;
484     while (true) {
485         msg->mtype = E2_TERM_INIT;
486         msg->state = 0;
487         rmr_bytes2payload(msg, (unsigned char *)sctpParams.ka_message, sctpParams.ka_message_length);
488         static unsigned char tx[32];
489         auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
490         rmr_bytes2xact(msg, tx, txLen);
491         msg = rmr_send_msg(sctpParams.rmrCtx, msg);
492         if (msg == nullptr) {
493             msg = rmr_alloc_msg(sctpParams.rmrCtx, sctpParams.ka_message_length);
494         } else if (msg->state == 0) {
495             rmr_free_msg(msg);
496             if (mdclog_level_get() >=  MDCLOG_INFO) {
497                 mdclog_write(MDCLOG_INFO, "E2_TERM_INIT successfully sent");
498             }
499             return;
500         } else {
501             if (count % 100 == 0) {
502                 mdclog_write(MDCLOG_ERR, "Error sending E2_TERM_INIT cause : %s ", translateRmrErrorMessages(msg->state).c_str());
503             }
504             sleep(1);
505         }
506         count++;
507     }
508 }
509
510 /**
511  * Parses the command line options (configuration file path and file name).
512  * @param argc
513  * @param argv
514  * @param sctpParams receives the configuration file path and name
515  * @return the cxxopts parse result
516  */
517 cxxopts::ParseResult parse(int argc, char *argv[], sctp_params_t &sctpParams) {
518     cxxopts::Options options(argv[0], "e2 term help");
519     options.positional_help("[optional args]").show_positional_help();
520     options.allow_unrecognised_options().add_options()
521             ("p,path", "config file path", cxxopts::value<std::string>(sctpParams.configFilePath)->default_value("config"))
522             ("f,file", "config file name", cxxopts::value<std::string>(sctpParams.configFileName)->default_value("config.conf"))
523             ("h,help", "Print help");
524
525     auto result = options.parse(argc, argv);
526
527     if (result.count("help")) {
528         std::cout << options.help({""}) << std::endl;
529         exit(0);
530     }
531     return result;
532 }
533
534 /**
535  * Creates an inotify descriptor watching the configuration directory and registers it with epoll.
536  * @param sctpParams
537  * @return -1 failed 0 success
538  */
539 int buildInotify(sctp_params_t &sctpParams) {
540     sctpParams.inotifyFD = inotify_init1(IN_NONBLOCK);
541     if (sctpParams.inotifyFD == -1) {
542         mdclog_write(MDCLOG_ERR, "Failed to init inotify (inotify_init1) %s", strerror(errno));
543         close(sctpParams.rmrListenFd);
544         rmr_close(sctpParams.rmrCtx);
545         close(sctpParams.epoll_fd);
546         return -1;
547     }
548
549     sctpParams.inotifyWD = inotify_add_watch(sctpParams.inotifyFD,
550                                              (const char *)sctpParams.configFilePath.c_str(),
551                                              (unsigned)IN_OPEN | (unsigned)IN_CLOSE_WRITE | (unsigned)IN_CLOSE_NOWRITE); //IN_CLOSE = (IN_CLOSE_WRITE | IN_CLOSE_NOWRITE)
552     if (sctpParams.inotifyWD == -1) {
553         mdclog_write(MDCLOG_ERR, "Failed to add directory : %s to  inotify (inotify_add_watch) %s",
554                      sctpParams.configFilePath.c_str(),
555                      strerror(errno));
556         close(sctpParams.inotifyFD);
557         return -1;
558     }
559
560     struct epoll_event event{};
561     event.events = (EPOLLIN);
562     event.data.fd = sctpParams.inotifyFD;
563     // add listening RMR FD to epoll
564     if (epoll_ctl(sctpParams.epoll_fd, EPOLL_CTL_ADD, sctpParams.inotifyFD, &event)) {
565         mdclog_write(MDCLOG_ERR, "Failed to add inotify FD to epoll");
566         close(sctpParams.inotifyFD);
567         return -1;
568     }
569     return 0;
570 }
571
572 /**
573  * Worker loop: waits on epoll and dispatches new SCTP connections, SCTP data,
574  * RMR messages from xApps, and inotify configuration events.
575  * @param params the shared sctp_params_t of this E2 Termination instance
576  */
577 void listener(sctp_params_t *params) {
578     int num_of_SCTP_messages = 0;
579     auto totalTime = 0.0;
580     mdclog_mdc_clean();
581     mdclog_level_set(params->logLevel);
582
583     std::thread::id this_id = std::this_thread::get_id();
584     //save cout
585     streambuf *oldCout = cout.rdbuf();
586     ostringstream memCout;
587     // create new cout
588     cout.rdbuf(memCout.rdbuf());
589     cout << this_id;
590     //return to the normal cout
591     cout.rdbuf(oldCout);
592
593     char tid[32];
594     memcpy(tid, memCout.str().c_str(), memCout.str().length() < 32 ? memCout.str().length() : 31);
595     tid[memCout.str().length() < 32 ? memCout.str().length() : 31] = 0;
596     mdclog_mdc_add("thread id", tid);
597
598     if (mdclog_level_get() >= MDCLOG_DEBUG) {
599         mdclog_write(MDCLOG_DEBUG, "started thread number %s", tid);
600     }
601
602     RmrMessagesBuffer_t rmrMessageBuffer{};
603     //create and init RMR
604     rmrMessageBuffer.rmrCtx = params->rmrCtx;
605
606     auto *events = (struct epoll_event *) calloc(MAXEVENTS, sizeof(struct epoll_event));
607     struct timespec end{0, 0};
608     struct timespec start{0, 0};
609
610     rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
611     rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
612
613     memcpy(rmrMessageBuffer.ka_message, params->ka_message, params->ka_message_length);
614     rmrMessageBuffer.ka_message_len = params->ka_message_length;
615     rmrMessageBuffer.ka_message[rmrMessageBuffer.ka_message_len] = 0;
616
617     if (mdclog_level_get() >= MDCLOG_DEBUG) {
618         mdclog_write(MDCLOG_DEBUG, "keep alive message is : %s", rmrMessageBuffer.ka_message);
619     }
620
621     ReportingMessages_t message {};
622
623 //    for (int i = 0; i < MAX_RMR_BUFF_ARRY; i++) {
624 //        rmrMessageBuffer.rcvBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
625 //        rmrMessageBuffer.sendBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
626 //    }
627
628     bool gatewayflag = false;
629     while (true) {
630         future<int> gateWay;
631
632         if (mdclog_level_get() >= MDCLOG_DEBUG) {
633             mdclog_write(MDCLOG_DEBUG, "Start EPOLL Wait. Timeout = %d", params->epollTimeOut);
634         }
635         auto numOfEvents = epoll_wait(params->epoll_fd, events, MAXEVENTS, params->epollTimeOut);
636         if (numOfEvents == 0) {
637             if (params->prometheusGateway != nullptr) {
638                 gateWay = params->prometheusGateway->AsyncPush();
639                 gatewayflag = true;
640             }
641             continue;
642         } else if (numOfEvents < 0) {
643             if (errno == EINTR) {
644                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
645                     mdclog_write(MDCLOG_DEBUG, "got EINTR : %s", strerror(errno));
646                 }
647                 continue;
648             }
649             mdclog_write(MDCLOG_ERR, "Epoll wait failed, errno = %s", strerror(errno));
650             return;
651         }
652         if (gatewayflag) {
653             gatewayflag = false;
654             auto rc = gateWay.get();
655             if (rc != 200) {
656                 mdclog_write(MDCLOG_ERR, "Async send to Prometheus failed with return code %d", rc);
657             } else if (mdclog_level_get() >= MDCLOG_DEBUG) {
658                 mdclog_write(MDCLOG_DEBUG, "Stats sent to Prometheus");
659             }
660         }
661         for (auto i = 0; i < numOfEvents; i++) {
662             if (mdclog_level_get() >= MDCLOG_DEBUG) {
663                 mdclog_write(MDCLOG_DEBUG, "handling epoll event %d out of %d", i + 1, numOfEvents);
664             }
665             clock_gettime(CLOCK_MONOTONIC, &message.message.time);
666             start.tv_sec = message.message.time.tv_sec;
667             start.tv_nsec = message.message.time.tv_nsec;
668
669
670             if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP)) {
671                 handlepoll_error(events[i], message, rmrMessageBuffer, params);
672             } else if (events[i].events & EPOLLOUT) {
673                 handleEinprogressMessages(events[i], message, rmrMessageBuffer, params);
674             } else if (params->listenFD == events[i].data.fd) {
675                 if (mdclog_level_get() >= MDCLOG_INFO) {
676                     mdclog_write(MDCLOG_INFO, "New connection request from sctp network\n");
677                 }
678                 // a new connection has been requested from the RAN, start building the connection
679                 while (true) {
680                     struct sockaddr in_addr {};
681                     socklen_t in_len;
682                     char hostBuff[NI_MAXHOST];
683                     char portBuff[NI_MAXSERV];
684
685                     in_len = sizeof(in_addr);
686                     auto *peerInfo = (ConnectedCU_t *)calloc(1, sizeof(ConnectedCU_t));
687                     peerInfo->sctpParams = params;
688                     peerInfo->fileDescriptor = accept(params->listenFD, &in_addr, &in_len);
689                     if (peerInfo->fileDescriptor == -1) {
690                         if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
691                             /* We have processed all incoming connections. */
692                             break;
693                         } else {
694                             mdclog_write(MDCLOG_ERR, "Accept error, errno = %s", strerror(errno));
695                             break;
696                         }
697                     }
698                     if (setSocketNoBlocking(peerInfo->fileDescriptor) == -1) {
699                         mdclog_write(MDCLOG_ERR, "setSocketNoBlocking failed to set new connection %s on port %s\n", hostBuff, portBuff);
700                         close(peerInfo->fileDescriptor);
701                         break;
702                     }
703                     auto  ans = getnameinfo(&in_addr, in_len,
704                                             peerInfo->hostName, NI_MAXHOST,
705                                             peerInfo->portNumber, NI_MAXSERV, (unsigned )((unsigned int)NI_NUMERICHOST | (unsigned int)NI_NUMERICSERV));
706                     if (ans < 0) {
707                         mdclog_write(MDCLOG_ERR, "Failed to get info on connection request. %s\n", strerror(errno));
708                         close(peerInfo->fileDescriptor);
709                         break;
710                     }
711                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
712                         mdclog_write(MDCLOG_DEBUG, "Accepted connection on descriptor %d (host=%s, port=%s)\n", peerInfo->fileDescriptor, peerInfo->hostName, peerInfo->portNumber);
713                     }
714                     peerInfo->isConnected = false;
715                     peerInfo->gotSetup = false;
716                     if (addToEpoll(params->epoll_fd,
717                                    peerInfo,
718                                    (EPOLLIN | EPOLLET),
719                                    params->sctpMap, nullptr,
720                                    0) != 0) {
721                         break;
722                     }
723                     break;
724                 }
725             } else if (params->rmrListenFd == events[i].data.fd) {
726                 // got message from XAPP
727                 num_of_XAPP_messages.fetch_add(1, std::memory_order_release);
728                 num_of_messages.fetch_add(1, std::memory_order_release);
729                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
730                     mdclog_write(MDCLOG_DEBUG, "new message from RMR");
731                 }
732                 if (receiveXappMessages(params->sctpMap,
733                                         rmrMessageBuffer,
734                                         message.message.time) != 0) {
735                     mdclog_write(MDCLOG_ERR, "Error handling Xapp message");
736                 }
737             } else if (params->inotifyFD == events[i].data.fd) {
738                 mdclog_write(MDCLOG_INFO, "Got event from inotify (configuration update)");
739                 handleConfigChange(params);
740             } else {
741                 /* We have data on the fd waiting to be read. Read and display it.
742                  * We must read whatever data is available completely, as we are running
743                  *  in edge-triggered mode and won't get a notification again for the same data. */
744                 num_of_messages.fetch_add(1, std::memory_order_release);
745                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
746                     mdclog_write(MDCLOG_DEBUG, "new message from SCTP, epoll flags are : %0x", events[i].events);
747                 }
748                 receiveDataFromSctp(&events[i],
749                                     params->sctpMap,
750                                     num_of_SCTP_messages,
751                                     rmrMessageBuffer,
752                                     message.message.time);
753             }
754
755             clock_gettime(CLOCK_MONOTONIC, &end);
756             if (mdclog_level_get() >= MDCLOG_INFO) {
757                 totalTime += ((end.tv_sec + 1.0e-9 * end.tv_nsec) -
758                               ((double) start.tv_sec + 1.0e-9 * start.tv_nsec));
759             }
760             if (mdclog_level_get() >= MDCLOG_DEBUG) {
761                 mdclog_write(MDCLOG_DEBUG, "message handling is %ld seconds %ld nanoseconds",
762                              end.tv_sec - start.tv_sec,
763                              end.tv_nsec - start.tv_nsec);
764             }
765         }
766     }
767 }
768
769 /**
770  * Handles inotify events on the configuration directory: re-reads loglevel, trace and the Prometheus push timeout when the configuration file is rewritten.
771  * @param sctpParams
772  */
773 void handleConfigChange(sctp_params_t *sctpParams) {
774     char buf[4096] __attribute__ ((aligned(__alignof__(struct inotify_event))));
775     const struct inotify_event *event;
776     char *ptr;
777
778     path p = (sctpParams->configFilePath + "/" + sctpParams->configFileName).c_str();
779     auto endlessLoop = true;
780     while (endlessLoop) {
781         auto len = read(sctpParams->inotifyFD, buf, sizeof buf);
782         if (len == -1) {
783             if (errno != EAGAIN) {
784                 mdclog_write(MDCLOG_ERR, "read %s ", strerror(errno));
785                 endlessLoop = false;
786                 continue;
787             }
788             else {
789                 endlessLoop = false;
790                 continue;
791             }
792         }
793
794         for (ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) {
795             event = (const struct inotify_event *)ptr;
796             if (event->mask & (uint32_t)IN_ISDIR) {
797                 continue;
798             }
799
800             // event->wd identifies the watch descriptor of the configuration directory
801             if (sctpParams->inotifyWD == event->wd) {
802                 // event belongs to our watched directory; the file name is checked below
803             }
804             if (event->len) {
805                 auto  retVal = strcmp(sctpParams->configFileName.c_str(), event->name);
806                 if (retVal != 0) {
807                     continue;
808                 }
809             }
810             // only the file we want
811             if (event->mask & (uint32_t)IN_CLOSE_WRITE) {
812                 if (mdclog_level_get() >= MDCLOG_INFO) {
813                     mdclog_write(MDCLOG_INFO, "Configuration file changed");
814                 }
815                 if (exists(p)) {
816                     const int size = 2048;
817                     auto fileSize = file_size(p);
818                     if (fileSize > size) {
819                         mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size);
820                         return;
821                     }
822                 } else {
823                     mdclog_write(MDCLOG_ERR, "Configuration file %s does not exist", p.string().c_str());
824                     return;
825                 }
826
827                 ReadConfigFile conf;
828                 if (conf.openConfigFile(p.string()) == -1) {
829                     mdclog_write(MDCLOG_ERR, "Failed to open config file %s, %s",
830                                  p.string().c_str(), strerror(errno));
831                     return;
832                 }
833
834                 auto tmpStr = conf.getStringValue("loglevel");
835                 if (tmpStr.length() == 0) {
836                     mdclog_write(MDCLOG_ERR, "Illegal loglevel. Setting loglevel to MDCLOG_INFO");
837                     tmpStr = "info";
838                 }
839                 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
840
841                 if ((tmpStr.compare("debug")) == 0) {
842                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_DEBUG");
843                     sctpParams->logLevel = MDCLOG_DEBUG;
844                 } else if ((tmpStr.compare("info")) == 0) {
845                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_INFO");
846                     sctpParams->logLevel = MDCLOG_INFO;
847                 } else if ((tmpStr.compare("warning")) == 0) {
848                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_WARN");
849                     sctpParams->logLevel = MDCLOG_WARN;
850                 } else if ((tmpStr.compare("error")) == 0) {
851                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_ERR");
852                     sctpParams->logLevel = MDCLOG_ERR;
853                 } else {
854                     mdclog_write(MDCLOG_ERR, "Illegal loglevel = %s. Setting loglevel to MDCLOG_INFO", tmpStr.c_str());
855                     sctpParams->logLevel = MDCLOG_INFO;
856                 }
857                 mdclog_level_set(sctpParams->logLevel);
858
859
860                 tmpStr = conf.getStringValue("trace");
861                 if (tmpStr.length() == 0) {
862                     mdclog_write(MDCLOG_ERR, "Illegal trace. Setting trace to stop");
863                     tmpStr = "stop";
864                 }
865
866                 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
867                 if ((tmpStr.compare("start")) == 0) {
868                     mdclog_write(MDCLOG_INFO, "Trace set to: start");
869                     sctpParams->trace = true;
870                 } else if ((tmpStr.compare("stop")) == 0) {
871                     mdclog_write(MDCLOG_INFO, "Trace set to: stop");
872                     sctpParams->trace = false;
873                 } else {
874                     mdclog_write(MDCLOG_ERR, "Trace was set to an unknown value %s, setting trace to stop", tmpStr.c_str());
875                     sctpParams->trace = false;
876                 }
877                 jsonTrace = sctpParams->trace;
878
879                 if (sctpParams->prometheusMode.compare("push") == 0) {
880                     auto timeout = conf.getIntValue("prometheusPushTimeOut");
881                     if (timeout >= 5 && timeout <= 300) {
882                         sctpParams->epollTimeOut = timeout * 1000;
883                     } else {
884                         mdclog_write(MDCLOG_ERR, "prometheusPushTimeOut set to wrong value %d, valid values are [5..300]",
885                                      timeout);
886                     }
887                 }
888
889                 endlessLoop = false;
890             }
891         }
892     }
893 }
894
895 /**
896  * Handles EPOLLOUT on a socket whose connect() returned EINPROGRESS: checks SO_ERROR, reports a failed connection to the xApp, or marks the peer connected and sends the delayed SETUP request.
897  * @param event
898  * @param message
899  * @param rmrMessageBuffer
900  * @param params
901  */
902 void handleEinprogressMessages(struct epoll_event &event,
903                                ReportingMessages_t &message,
904                                RmrMessagesBuffer_t &rmrMessageBuffer,
905                                sctp_params_t *params) {
906     auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
907     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
908
909     mdclog_write(MDCLOG_INFO, "file descriptor %d got EPOLLOUT", peerInfo->fileDescriptor);
910     auto retVal = 0;
911     socklen_t retValLen = sizeof(retVal);
912     auto rc = getsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_ERROR, &retVal, &retValLen);
913     if (rc != 0 || retVal != 0) {
914         if (rc != 0) {
915             rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
916                                                          "%s|Failed SCTP Connection, after EINPROGRESS the getsockopt%s",
917                                                          peerInfo->enodbName, strerror(errno));
918         } else if (retVal != 0) {
919             rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
920                                                          "%s|Failed SCTP Connection after EINPROGRESS, SO_ERROR",
921                                                          peerInfo->enodbName);
922         }
923
924         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
925         message.message.asnLength = rmrMessageBuffer.sendMessage->len;
926         mdclog_write(MDCLOG_ERR, "%s", rmrMessageBuffer.sendMessage->payload);
927         message.message.direction = 'N';
928         if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
929             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
930         }
931         memset(peerInfo->asnData, 0, peerInfo->asnLength);
932         peerInfo->asnLength = 0;
933         peerInfo->mtype = 0;
934         return;
935     }
936
937     peerInfo->isConnected = true;
938
939     if (modifyToEpoll(params->epoll_fd, peerInfo, (EPOLLIN | EPOLLET), params->sctpMap, peerInfo->enodbName,
940                       peerInfo->mtype) != 0) {
941         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_MOD");
942         return;
943     }
944
945     message.message.asndata = (unsigned char *)peerInfo->asnData;
946     message.message.asnLength = peerInfo->asnLength;
947     message.message.messageType = peerInfo->mtype;
948     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
949     num_of_messages.fetch_add(1, std::memory_order_release);
950     if (mdclog_level_get() >= MDCLOG_DEBUG) {
951         mdclog_write(MDCLOG_DEBUG, "send the delayed SETUP/ENDC SETUP to sctp for %s",
952                      message.message.enodbName);
953     }
954     if (sendSctpMsg(peerInfo, message, params->sctpMap) != 0) {
955         if (mdclog_level_get() >= MDCLOG_DEBUG) {
956             mdclog_write(MDCLOG_DEBUG, "Error write to SCTP  %s %d", __func__, __LINE__);
957         }
958         return;
959     }
960
961     memset(peerInfo->asnData, 0, peerInfo->asnLength);
962     peerInfo->asnLength = 0;
963     peerInfo->mtype = 0;
964 }
965
966
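/**
 * Handles EPOLLERR/EPOLLHUP: for a RAN connection it notifies the xApp with
 * RIC_SCTP_CONNECTION_FAILURE, closes the socket and removes the peer from the map.
 * @param event
 * @param message
 * @param rmrMessageBuffer
 * @param params
 */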
967 void handlepoll_error(struct epoll_event &event,
968                       ReportingMessages_t &message,
969                       RmrMessagesBuffer_t &rmrMessageBuffer,
970                       sctp_params_t *params) {
971     if (event.data.fd != params->rmrListenFd) {
972         auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
973         mdclog_write(MDCLOG_ERR, "epoll error, events %0x on fd %d, RAN NAME : %s",
974                      event.events, peerInfo->fileDescriptor, peerInfo->enodbName);
975
976         rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
977                                                      "%s|Failed SCTP Connection",
978                                                      peerInfo->enodbName);
979         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
980         message.message.asnLength = rmrMessageBuffer.sendMessage->len;
981
982         memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
983         message.message.direction = 'N';
984         if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
985             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
986         }
987
988         close(peerInfo->fileDescriptor);
989         params->sctpMap->erase(peerInfo->enodbName);
990         cleanHashEntry((ConnectedCU_t *) event.data.ptr, params->sctpMap);
991     } else {
992         mdclog_write(MDCLOG_ERR, "epoll error, events %0x on RMR FD", event.events);
993     }
994 }
995 /**
996  * Sets the O_NONBLOCK flag on the given socket.
997  * @param socket
998  * @return 0 on success, -1 on failure
999  */
1000 int setSocketNoBlocking(int socket) {
1001     auto flags = fcntl(socket, F_GETFL, 0);
1002
1003     if (flags == -1) {
1004         mdclog_mdc_add("func", "fcntl");
1005         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
1006         mdclog_mdc_clean();
1007         return -1;
1008     }
1009
1010     flags = (unsigned) flags | (unsigned) O_NONBLOCK;
1011     if (fcntl(socket, F_SETFL, flags) == -1) {
1012         mdclog_mdc_add("func", "fcntl");
1013         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
1014         mdclog_mdc_clean();
1015         return -1;
1016     }
1017
1018     return 0;
1019 }
1020
1021 /**
1022  * Removes the "host:<host>:<port>" and RAN-name entries of a peer from the map and frees the entry.
1023  * @param val the connected CU entry to remove
1024  * @param m the SCTP map holding the entry
1025  */
1026 void cleanHashEntry(ConnectedCU_t *val, Sctp_Map_t *m) {
1027     char *dummy;
1028     auto port = (uint16_t) strtol(val->portNumber, &dummy, 10);
1029     char searchBuff[2048]{};
1030
1031     snprintf(searchBuff, sizeof searchBuff, "host:%s:%d", val->hostName, port);
1032     m->erase(searchBuff);
1033
1034     m->erase(val->enodbName);
1035     free(val);
1036 }
1037
1038 /**
1039  * Sends the ASN.1 data held in 'message' to the RAN over the peer's SCTP socket.
1040  * On a write error the connection is closed and its entries are removed from the map.
1041  * @param peerInfo the connected CU, holding the file descriptor and host information
1042  * @param message the report entry holding the ASN.1 data to send
1043  * @param m map of host information
1044  * @return 0 on success, a negative number on failure
1045  */
1048 int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_t *m) {
1049     auto loglevel = mdclog_level_get();
1050     int fd = peerInfo->fileDescriptor;
1051     if (loglevel >= MDCLOG_DEBUG) {
1052         mdclog_write(MDCLOG_DEBUG, "Send SCTP message for CU %s, %s",
1053                      message.message.enodbName, __FUNCTION__);
1054     }
1055
1056     while (true) {
1057         if (send(fd,message.message.asndata, message.message.asnLength,MSG_NOSIGNAL) < 0) {
1058             if (errno == EINTR) {
1059                 continue;
1060             }
1061             mdclog_write(MDCLOG_ERR, "error writing a message to the CU, %s ", strerror(errno));
1062             if (!peerInfo->isConnected) {
1063                 mdclog_write(MDCLOG_ERR, "connection to CU %s is still in progress.", message.message.enodbName);
1064                 return -1;
1065             }
1066             cleanHashEntry(peerInfo, m);
1067             close(fd);
1068             char key[MAX_ENODB_NAME_SIZE * 2];
1069             snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName,
1070                      message.message.messageType);
1071             if (loglevel >= MDCLOG_DEBUG) {
1072                 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
1073             }
1074             auto tmp = m->find(key);
1075             if (tmp) {
1076                 free(tmp);
1077             }
1078             m->erase(key);
1079             return -1;
1080         }
1081         message.message.direction = 'D';
1082         // send report.buffer of size
1083         buildJsonMessage(message);
1084
1085         if (loglevel >= MDCLOG_DEBUG) {
1086             mdclog_write(MDCLOG_DEBUG,
1087                          "SCTP message for CU %s sent from %s",
1088                          message.message.enodbName,
1089                          __FUNCTION__);
1090         }
1091         return 0;
1092     }
1093 }
1094
1095 /**
1096  * Copies the payload pointer and length of the received RMR message into the report entry.
1097  * @param message
1098  * @param rmrMessageBuffer
1099  */
1100 void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) {
1101     message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
1102     message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
1103
1104     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1105         mdclog_write(MDCLOG_DEBUG, "Message from Xapp RAN name = %s message length = %ld",
1106                      message.message.enodbName, (unsigned long) message.message.asnLength);
1107     }
1108 }
1109
1110
1111
1112 /**
1113  * Reads all pending data from an SCTP peer (edge-triggered), decodes each E2AP PDU and dispatches it by type; reports and cleans up when the peer closes the connection.
1114  * @param events the epoll event holding the peer information
1115  * @param sctpMap
1116  * @param numOfMessages incremented for every decoded message
1117  * @param rmrMessageBuffer
1118  * @param ts receive timestamp copied into the report
1119  * @return 0
1120  */
1121 int receiveDataFromSctp(struct epoll_event *events,
1122                         Sctp_Map_t *sctpMap,
1123                         int &numOfMessages,
1124                         RmrMessagesBuffer_t &rmrMessageBuffer,
1125                         struct timespec &ts) {
1126     /* We have data on the fd waiting to be read. Read and display it.
1127      * We must read whatever data is available completely, as we are running
1128      * in edge-triggered mode and won't get a notification again for the same data. */
1129     ReportingMessages_t message {};
1130     auto done = 0;
1131     auto loglevel = mdclog_level_get();
1132
1133     // get the identity of the interface
1134     message.peerInfo = (ConnectedCU_t *)events->data.ptr;
1135
1136     struct timespec start{0, 0};
1137     struct timespec decodestart{0, 0};
1138     struct timespec end{0, 0};
1139
1140     E2AP_PDU_t *pdu = nullptr;
1141
1142     while (true) {
1143         if (loglevel >= MDCLOG_DEBUG) {
1144             mdclog_write(MDCLOG_DEBUG, "Start Read from SCTP %d fd", message.peerInfo->fileDescriptor);
1145             clock_gettime(CLOCK_MONOTONIC, &start);
1146         }
1147         // read the buffer directly to rmr payload
1148         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1149         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1150                 read(message.peerInfo->fileDescriptor, rmrMessageBuffer.sendMessage->payload, RECEIVE_SCTP_BUFFER_SIZE);
1151
1152         if (loglevel >= MDCLOG_DEBUG) {
1153             mdclog_write(MDCLOG_DEBUG, "Finish Read from SCTP %d fd message length = %ld",
1154                          message.peerInfo->fileDescriptor, message.message.asnLength);
1155         }
1156
1157         memcpy(message.message.enodbName, message.peerInfo->enodbName, sizeof(message.peerInfo->enodbName));
1158         message.message.direction = 'U';
1159         message.message.time.tv_nsec = ts.tv_nsec;
1160         message.message.time.tv_sec = ts.tv_sec;
1161
1162         if (message.message.asnLength < 0) {
1163             if (errno == EINTR) {
1164                 continue;
1165             }
1166             /* If errno == EAGAIN, that means we have read all
1167                data. So go back to the main loop. */
1168             if (errno != EAGAIN) {
1169                 mdclog_write(MDCLOG_ERR, "Read error, %s ", strerror(errno));
1170                 done = 1;
1171             } else if (loglevel >= MDCLOG_DEBUG) {
1172                 mdclog_write(MDCLOG_DEBUG, "EAGAIN - descriptor = %d", message.peerInfo->fileDescriptor);
1173             }
1174             break;
1175         } else if (message.message.asnLength == 0) {
1176             /* End of file. The remote has closed the connection. */
1177             if (loglevel >= MDCLOG_INFO) {
1178                 mdclog_write(MDCLOG_INFO, "END of File Closed connection - descriptor = %d",
1179                              message.peerInfo->fileDescriptor);
1180             }
1181             done = 1;
1182             break;
1183         }
1184
1185         if (loglevel >= MDCLOG_DEBUG) {
1186             char printBuffer[4096]{};
1187             char *tmp = printBuffer;
1188             for (size_t i = 0; i < (size_t)message.message.asnLength; ++i) {
1189                 snprintf(tmp, 3, "%02x", message.message.asndata[i]);
1190                 tmp += 2;
1191             }
1192             printBuffer[message.message.asnLength] = 0;
1193             clock_gettime(CLOCK_MONOTONIC, &end);
1194             mdclog_write(MDCLOG_DEBUG, "Before decoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
1195                          message.peerInfo->enodbName, end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
1196             mdclog_write(MDCLOG_DEBUG, "PDU buffer length = %ld, data =  : %s", message.message.asnLength,
1197                          printBuffer);
1198             clock_gettime(CLOCK_MONOTONIC, &decodestart);
1199         }
1200
1201         auto rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void **) &pdu,
1202                           message.message.asndata, message.message.asnLength);
1203         if (rval.code != RC_OK) {
1204             mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) E2AP PDU from RAN : %s", rval.code,
1205                          message.peerInfo->enodbName);
1206             break;
1207         }
1208
1209         if (loglevel >= MDCLOG_DEBUG) {
1210             clock_gettime(CLOCK_MONOTONIC, &end);
1211             mdclog_write(MDCLOG_DEBUG, "After decoding E2AP PDU for : %s, decode time is : %ld seconds, %ld nanoseconds",
1212                          message.peerInfo->enodbName, end.tv_sec - decodestart.tv_sec, end.tv_nsec - decodestart.tv_nsec);
1213             char *printBuffer;
1214             size_t size;
1215             FILE *stream = open_memstream(&printBuffer, &size);
1216             asn_fprint(stream, &asn_DEF_E2AP_PDU, pdu); fclose(stream); // close the memstream so printBuffer is flushed and valid
1217             mdclog_write(MDCLOG_DEBUG, "Decoded E2AP PDU : %s", printBuffer); free(printBuffer);
1218             clock_gettime(CLOCK_MONOTONIC, &decodestart);
1219         }
1220
1221         switch (pdu->present) {
1222             case E2AP_PDU_PR_initiatingMessage: {//initiating message
1223                 asnInitiatingRequest(pdu, sctpMap,message, rmrMessageBuffer);
1224                 break;
1225             }
1226             case E2AP_PDU_PR_successfulOutcome: { //successful outcome
1227                 asnSuccsesfulMsg(pdu, sctpMap, message,  rmrMessageBuffer);
1228                 break;
1229             }
1230             case E2AP_PDU_PR_unsuccessfulOutcome: { //Unsuccessful Outcome
1231                 asnUnSuccsesfulMsg(pdu, sctpMap, message, rmrMessageBuffer);
1232                 break;
1233             }
1234             default:
1235                 mdclog_write(MDCLOG_ERR, "Unknown index %d in E2AP PDU", pdu->present);
1236                 break;
1237         }
1238         if (loglevel >= MDCLOG_DEBUG) {
1239             clock_gettime(CLOCK_MONOTONIC, &end);
1240             mdclog_write(MDCLOG_DEBUG,
1241                          "After processing message and sent to rmr for : %s, Read time is : %ld seconds, %ld nanoseconds",
1242                          message.peerInfo->enodbName, end.tv_sec - decodestart.tv_sec, end.tv_nsec - decodestart.tv_nsec);
1243         }
1244         numOfMessages++;
1245         if (pdu != nullptr) {
1246             ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu);
1247             //ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
1248             //pdu = nullptr;
1249         }
1250     }
1251
1252     if (done) {
1253         if (loglevel >= MDCLOG_INFO) {
1254             mdclog_write(MDCLOG_INFO, "Closed connection - descriptor = %d", message.peerInfo->fileDescriptor);
1255         }
1256         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1257                 snprintf((char *)rmrMessageBuffer.sendMessage->payload,
1258                          256,
1259                          "%s|CU disconnected unexpectedly",
1260                          message.peerInfo->enodbName);
1261         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1262
1263         if (sendRequestToXapp(message,
1264                               RIC_SCTP_CONNECTION_FAILURE,
1265                               rmrMessageBuffer) != 0) {
1266             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
1267         }
1268
1269         /* Closing descriptor make epoll remove it from the set of descriptors which are monitored. */
1270         close(message.peerInfo->fileDescriptor);
1271         cleanHashEntry((ConnectedCU_t *) events->data.ptr, sctpMap);
1272     }
1273     if (loglevel >= MDCLOG_DEBUG) {
1274         clock_gettime(CLOCK_MONOTONIC, &end);
1275         mdclog_write(MDCLOG_DEBUG, "from receive SCTP to send RMR time is %ld seconds and %ld nanoseconds",
1276                      end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
1277
1278     }
1279     return 0;
1280 }
1281
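/**
 * XER-encodes the Setup request PDU, strips whitespace from the XML, prefixes it with this
 * E2 Termination's "ip:port", and forwards it to the xApp/E2 Manager over RMR (with a single
 * retry on RMR_ERR_RETRY). Also marks the peer as having completed setup and writes the
 * trace/JSON record.
 * @param message the report entry describing the received Setup request
 * @param rmrMessageBuffer holds the RMR context used for sending
 * @param pdu the decoded E2AP PDU to encode and forward
 */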
1282 static void buildAndsendSetupRequest(ReportingMessages_t &message,
1283                                      RmrMessagesBuffer_t &rmrMessageBuffer,
1284                                      E2AP_PDU_t *pdu/*,
1285                                      string const &messageName,
1286                                      string const &ieName,
1287                                      vector<string> &functionsToAdd_v,
1288                                      vector<string> &functionsToModified_v*/) {
1289     auto logLevel = mdclog_level_get();
1290     // now we can send the data to e2Mgr
1291
1292     asn_enc_rval_t er;
1293     auto buffer_size = RECEIVE_SCTP_BUFFER_SIZE * 2;
1294     unsigned char buffer[RECEIVE_SCTP_BUFFER_SIZE * 2];
1295     while (true) {
1296         er = asn_encode_to_buffer(nullptr, ATS_BASIC_XER, &asn_DEF_E2AP_PDU, pdu, buffer, buffer_size);
1297         if (er.encoded == -1) {
1298             mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
1299             return;
1300         } else if (er.encoded > (ssize_t) buffer_size) {
1301             mdclog_write(MDCLOG_WARN, "Buffer of size %d is too small for %s. Reallocating buffer of size %ld",
1302                          (int) buffer_size,
1303                          asn_DEF_E2AP_PDU.name, (long) (er.encoded + 128));
1304             buffer_size = er.encoded + 128;
1305             // NOTE: 'buffer' itself is a fixed-size stack array; only the reported size grows here
1306 //            free(buffer);
1307             continue;
1308         }
1309         buffer[er.encoded] = '\0';
1310         break;
1311     }
1312     // encode to xml
1313
1314     string res((char *)buffer);
1315     res.erase(std::remove(res.begin(), res.end(), '\n'), res.end());
1316     res.erase(std::remove(res.begin(), res.end(), '\t'), res.end());
1317     res.erase(std::remove(res.begin(), res.end(), ' '), res.end());
1318
1319 //    string res {};
1320 //    if (!functionsToAdd_v.empty() || !functionsToModified_v.empty()) {
1321 //        res = buildXmlData(messageName, ieName, functionsToAdd_v, functionsToModified_v, buffer, (size_t) er.encoded);
1322 //    }
1323     rmr_mbuf_t *rmrMsg;
1324 //    if (res.length() == 0) {
1325 //        rmrMsg = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, buffer_size + 256);
1326 //        rmrMsg->len = snprintf((char *) rmrMsg->payload, RECEIVE_SCTP_BUFFER_SIZE * 2, "%s:%d|%s",
1327 //                               message.peerInfo->sctpParams->myIP.c_str(),
1328 //                               message.peerInfo->sctpParams->rmrPort,
1329 //                               buffer);
1330 //    } else {
1331         rmrMsg = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, (int)res.length() + 256);
1332         rmrMsg->len = snprintf((char *) rmrMsg->payload, res.length() + 256, "%s:%d|%s",
1333                                message.peerInfo->sctpParams->myIP.c_str(),
1334                                message.peerInfo->sctpParams->rmrPort,
1335                                res.c_str());
1336 //    }
1337
1338     if (logLevel >= MDCLOG_DEBUG) {
1339         mdclog_write(MDCLOG_DEBUG, "Setup request of size %d :\n %s\n", rmrMsg->len, rmrMsg->payload);
1340     }
1341     // send to RMR
1342     rmrMsg->mtype = message.message.messageType;
1343     rmrMsg->state = 0;
1344     rmr_bytes2meid(rmrMsg, (unsigned char *) message.message.enodbName, strlen(message.message.enodbName));
1345
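    // stamp a monotonically increasing transaction id on the outgoing RMR message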
1346     static unsigned char tx[32];
1347     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1348     rmr_bytes2xact(rmrMsg, tx, strlen((const char *) tx));
1349
1350     rmrMsg = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMsg);
1351     if (rmrMsg == nullptr) {
        mdclog_write(MDCLOG_ERR, "RMR send failed, returned nullptr");
1353     } else if (rmrMsg->state != 0) {
1354         char meid[RMR_MAX_MEID]{};
1355         if (rmrMsg->state == RMR_ERR_RETRY) {
1356             usleep(5);
1357             rmrMsg->state = 0;
1358             mdclog_write(MDCLOG_INFO, "RETRY sending Message %d to Xapp from %s",
1359                          rmrMsg->mtype, rmr_get_meid(rmrMsg, (unsigned char *) meid));
1360             rmrMsg = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMsg);
1361             if (rmrMsg == nullptr) {
                mdclog_write(MDCLOG_ERR, "RMR retry send failed, returned nullptr");
1363             } else if (rmrMsg->state != 0) {
1364                 mdclog_write(MDCLOG_ERR,
1365                              "RMR Retry failed %s sending request %d to Xapp from %s",
1366                              translateRmrErrorMessages(rmrMsg->state).c_str(),
1367                              rmrMsg->mtype,
1368                              rmr_get_meid(rmrMsg, (unsigned char *) meid));
1369             }
1370         } else {
1371             mdclog_write(MDCLOG_ERR, "RMR failed: %s. sending request %d to Xapp from %s",
1372                          translateRmrErrorMessages(rmrMsg->state).c_str(),
1373                          rmrMsg->mtype,
1374                          rmr_get_meid(rmrMsg, (unsigned char *) meid));
1375         }
1376     }
1377     message.peerInfo->gotSetup = true;
1378     buildJsonMessage(message);
1379     if (rmrMsg != nullptr) {
1380         rmr_free_msg(rmrMsg);
1381     }
1382 }
1383
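/*
 * The helpers below are compiled out (#if 0): they extracted the RAN function
 * definitions carried in E2setup/RICserviceUpdate messages and re-encoded them
 * as XML. Kept here for reference.
 */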
1384 #if 0
1385 int RAN_Function_list_To_Vector(RANfunctions_List_t& list, vector <string> &runFunXML_v) {
1386     auto index = 0;
1387     runFunXML_v.clear();
1388     for (auto j = 0; j < list.list.count; j++) {
1389         auto *raNfunctionItemIEs = (RANfunction_ItemIEs_t *)list.list.array[j];
1390         if (raNfunctionItemIEs->id == ProtocolIE_ID_id_RANfunction_Item &&
1391             (raNfunctionItemIEs->value.present == RANfunction_ItemIEs__value_PR_RANfunction_Item)) {
1392             // encode to xml
1393             E2SM_gNB_NRT_RANfunction_Definition_t *ranFunDef = nullptr;
1394             auto rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER,
1395                                    &asn_DEF_E2SM_gNB_NRT_RANfunction_Definition,
1396                                    (void **)&ranFunDef,
1397                                    raNfunctionItemIEs->value.choice.RANfunction_Item.ranFunctionDefinition.buf,
1398                                    raNfunctionItemIEs->value.choice.RANfunction_Item.ranFunctionDefinition.size);
1399             if (rval.code != RC_OK) {
1400                 mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) E2SM message from : %s",
1401                              rval.code,
1402                              asn_DEF_E2SM_gNB_NRT_RANfunction_Definition.name);
1403                 return -1;
1404             }
1405
1406             auto xml_buffer_size = RECEIVE_SCTP_BUFFER_SIZE * 2;
1407             unsigned char xml_buffer[RECEIVE_SCTP_BUFFER_SIZE * 2];
1408             memset(xml_buffer, 0, RECEIVE_SCTP_BUFFER_SIZE * 2);
1409             // encode to xml
1410             auto er = asn_encode_to_buffer(nullptr,
1411                                            ATS_BASIC_XER,
1412                                            &asn_DEF_E2SM_gNB_NRT_RANfunction_Definition,
1413                                            ranFunDef,
1414                                            xml_buffer,
1415                                            xml_buffer_size);
1416             if (er.encoded == -1) {
1417                 mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s",
1418                              asn_DEF_E2SM_gNB_NRT_RANfunction_Definition.name,
1419                              strerror(errno));
1420             } else if (er.encoded > (ssize_t)xml_buffer_size) {
                mdclog_write(MDCLOG_ERR, "Buffer of size %d is too small for %s, at %s line %d",
1422                              (int) xml_buffer_size,
1423                              asn_DEF_E2SM_gNB_NRT_RANfunction_Definition.name, __func__, __LINE__);
1424             } else {
1425                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1426                     mdclog_write(MDCLOG_DEBUG, "Encoding E2SM %s PDU number %d : %s",
1427                                  asn_DEF_E2SM_gNB_NRT_RANfunction_Definition.name,
1428                                  index++,
1429                                  xml_buffer);
1430                 }
1431
1432                 string runFuncs = (char *)(xml_buffer);
1433                 runFunXML_v.emplace_back(runFuncs);
1434             }
1435         }
1436     }
1437     return 0;
1438 }
1439
1440 int collectServiceUpdate_RequestData(E2AP_PDU_t *pdu,
1441                                      Sctp_Map_t *sctpMap,
1442                                      ReportingMessages_t &message,
1443                                      vector <string> &RANfunctionsAdded_v,
1444                                      vector <string> &RANfunctionsModified_v) {
1445     memset(message.peerInfo->enodbName, 0 , MAX_ENODB_NAME_SIZE);
1446     for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICserviceUpdate.protocolIEs.list.count; i++) {
1447         auto *ie = pdu->choice.initiatingMessage->value.choice.RICserviceUpdate.protocolIEs.list.array[i];
1448         if (ie->id == ProtocolIE_ID_id_RANfunctionsAdded) {
1449             if (ie->value.present == RICserviceUpdate_IEs__value_PR_RANfunctionsID_List) {
1450                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
                    mdclog_write(MDCLOG_DEBUG, "RAN function list has %d entries",
1452                                  ie->value.choice.RANfunctions_List.list.count);
1453                 }
1454                 if (RAN_Function_list_To_Vector(ie->value.choice.RANfunctions_List, RANfunctionsAdded_v) != 0 ) {
1455                     return -1;
1456                 }
1457             }
1458         } else if (ie->id == ProtocolIE_ID_id_RANfunctionsModified) {
1459             if (ie->value.present == RICserviceUpdate_IEs__value_PR_RANfunctions_List) {
1460                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
                    mdclog_write(MDCLOG_DEBUG, "RAN function list has %d entries",
1462                                  ie->value.choice.RANfunctions_List.list.count);
1463                 }
1464                 if (RAN_Function_list_To_Vector(ie->value.choice.RANfunctions_List, RANfunctionsModified_v) != 0 ) {
1465                     return -1;
1466                 }
1467             }
1468         }
1469     }
1470     if (mdclog_level_get() >= MDCLOG_DEBUG) {
        mdclog_write(MDCLOG_DEBUG, "RAN functions vector has %ld entries",
1472                      RANfunctionsAdded_v.size());
1473     }
1474     return 0;
1475 }
1476
1477 #endif
1478
1479
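/*
 * Registers the per-procedure Prometheus message and byte counters for a newly
 * connected E2 node, covering incoming and outgoing initiating messages as well
 * as successful and unsuccessful outcomes. The counter pointers are stored in
 * peerInfo->counters and incremented by the message handlers below.
 */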
1480 void buildPrometheuslist(ConnectedCU_t *peerInfo, Family<Counter> *prometheusFamily) {
1481     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_E2setup - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"SetupRequest", "Messages"}});
1482     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_E2setup - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"SetupRequest", "Bytes"}});
1483
1484     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_ErrorIndication - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ErrorIndication", "Messages"}});
1485     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_ErrorIndication - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ErrorIndication", "Bytes"}});
1486
1487     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_RICindication - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICindication", "Messages"}});
1488     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_RICindication - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICindication", "Bytes"}});
1489
1490     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_Reset - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ResetRequest", "Messages"}});
1491     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_Reset - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ResetRequest", "Bytes"}});
1492
1493     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_RICserviceUpdate - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICserviceUpdate", "Messages"}});
1494     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_RICserviceUpdate - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICserviceUpdate", "Bytes"}});
1495     // ---------------------------------------------
1496     peerInfo->counters[IN_SUCC][MSG_COUNTER][(ProcedureCode_id_Reset - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ResetACK", "Messages"}});
1497     peerInfo->counters[IN_SUCC][BYTES_COUNTER][(ProcedureCode_id_Reset - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ResetACK", "Bytes"}});
1498
1499     peerInfo->counters[IN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICcontrol - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICcontrolACK", "Messages"}});
1500     peerInfo->counters[IN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICcontrol - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICcontrolACK", "Bytes"}});
1501
1502     peerInfo->counters[IN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICsubscription - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionACK", "Messages"}});
1503     peerInfo->counters[IN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICsubscription - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionACK", "Bytes"}});
1504
1505     peerInfo->counters[IN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICsubscriptionDelete - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionDeleteACK", "Messages"}});
1506     peerInfo->counters[IN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICsubscriptionDelete - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionDeleteACK", "Bytes"}});
1507     //-------------------------------------------------------------
1508
1509     peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICcontrol - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICcontrolFailure", "Messages"}});
1510     peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICcontrol - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICcontrolFailure", "Bytes"}});
1511
1512     peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICsubscription - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionFailure", "Messages"}});
1513     peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICsubscription - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionFailure", "Bytes"}});
1514
1515     peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICsubscriptionDelete - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionDeleteFailure", "Messages"}});
1516     peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICsubscriptionDelete - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionDeleteFailure", "Bytes"}});
1517
1518     //====================================================================================
1519     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_ErrorIndication - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ErrorIndication", "Messages"}});
1520     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_ErrorIndication - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ErrorIndication", "Bytes"}});
1521
1522     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_Reset - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ResetRequest", "Messages"}});
1523     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_Reset - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ResetRequest", "Bytes"}});
1524
1525     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_RICcontrol - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICcontrol", "Messages"}});
1526     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_RICcontrol - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICcontrol", "Bytes"}});
1527
1528     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_RICserviceQuery - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceQuery", "Messages"}});
1529     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_RICserviceQuery - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceQuery", "Bytes"}});
1530
1531     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_RICsubscription - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICsubscription", "Messages"}});
1532     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_RICsubscription - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICsubscription", "Bytes"}});
1533
1534     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_RICsubscriptionDelete - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICsubscriptionDelete", "Messages"}});
1535     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_RICsubscriptionDelete - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICsubscriptionDelete", "Bytes"}});
1536     //---------------------------------------------------------------------------------------------------------
1537     peerInfo->counters[OUT_SUCC][MSG_COUNTER][(ProcedureCode_id_E2setup - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupResponse", "Messages"}});
1538     peerInfo->counters[OUT_SUCC][BYTES_COUNTER][(ProcedureCode_id_E2setup - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupResponse", "Bytes"}});
1539
1540     peerInfo->counters[OUT_SUCC][MSG_COUNTER][(ProcedureCode_id_Reset - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ResetACK", "Messages"}});
1541     peerInfo->counters[OUT_SUCC][BYTES_COUNTER][(ProcedureCode_id_Reset - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ResetACK", "Bytes"}});
1542
1543     peerInfo->counters[OUT_SUCC][MSG_COUNTER][(ProcedureCode_id_RICserviceUpdate - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceUpdateResponse", "Messages"}});
1544     peerInfo->counters[OUT_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICserviceUpdate - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceUpdateResponse", "Bytes"}});
1545     //----------------------------------------------------------------------------------------------------------------
1546     peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_E2setup - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupRequestFailure", "Messages"}});
1547     peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_E2setup - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupRequestFailure", "Bytes"}});
1548
1549     peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICserviceUpdate - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceUpdateFailure", "Messages"}});
1550     peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICserviceUpdate - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceUpdateFailure", "Bytes"}});
1551 }
/**
 * Extracts the RAN name from the GlobalE2node_ID IE of an E2setupRequest and
 * registers the connected E2 node in the SCTP map under that name.
 *
 * @param pdu decoded E2AP PDU carrying the E2setupRequest
 * @param sctpMap map of connected E2 nodes, keyed by RAN name
 * @param message reporting structure updated with the RAN name
 * @return 0 on success, -1 if the RAN name could not be built
 */
1560 int collectSetupRequestData(E2AP_PDU_t *pdu,
1561                                      Sctp_Map_t *sctpMap,
1562                                      ReportingMessages_t &message /*, vector <string> &RANfunctionsAdded_v*/) {
1563     memset(message.peerInfo->enodbName, 0 , MAX_ENODB_NAME_SIZE);
1564     for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.E2setupRequest.protocolIEs.list.count; i++) {
1565         auto *ie = pdu->choice.initiatingMessage->value.choice.E2setupRequest.protocolIEs.list.array[i];
1566         if (ie->id == ProtocolIE_ID_id_GlobalE2node_ID) {
1567             // get the ran name for meid
1568             if (ie->value.present == E2setupRequestIEs__value_PR_GlobalE2node_ID) {
1569                 if (buildRanName(message.peerInfo->enodbName, ie) < 0) {
1570                     mdclog_write(MDCLOG_ERR, "Bad param in E2setupRequestIEs GlobalE2node_ID.\n");
                    // no message will be sent
1572                     return -1;
1573                 }
1574
1575                 memcpy(message.message.enodbName, message.peerInfo->enodbName, strlen(message.peerInfo->enodbName));
1576                 sctpMap->setkey(message.message.enodbName, message.peerInfo);
1577             }
1578         } /*else if (ie->id == ProtocolIE_ID_id_RANfunctionsAdded) {
1579             if (ie->value.present == E2setupRequestIEs__value_PR_RANfunctions_List) {
1580                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1581                     mdclog_write(MDCLOG_DEBUG, "Run function list have %d entries",
1582                                  ie->value.choice.RANfunctions_List.list.count);
1583                 }
1584                 if (RAN_Function_list_To_Vector(ie->value.choice.RANfunctions_List, RANfunctionsAdded_v) != 0 ) {
1585                     return -1;
1586                 }
1587             }
1588         } */
1589     }
1590 //    if (mdclog_level_get() >= MDCLOG_DEBUG) {
1591 //        mdclog_write(MDCLOG_DEBUG, "Run function vector have %ld entries",
1592 //                     RANfunctionsAdded_v.size());
1593 //    }
1594     return 0;
1595 }
1596
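/*
 * Decodes the aligned-PER E2AP PDU held in rmrMessageBuffer.sendMessage and
 * re-encodes it in place as XER (XML), so the payload forwarded to the xApp is
 * XML rather than PER.
 */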
1597 int XML_From_PER(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) {
1598     E2AP_PDU_t *pdu = nullptr;
1599
1600     if (mdclog_level_get() >= MDCLOG_DEBUG) {
        mdclog_write(MDCLOG_DEBUG, "got PER message of size %d: %s",
1602                      rmrMessageBuffer.sendMessage->len, rmrMessageBuffer.sendMessage->payload);
1603     }
1604     auto rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void **) &pdu,
1605                            rmrMessageBuffer.sendMessage->payload, rmrMessageBuffer.sendMessage->len);
1606     if (rval.code != RC_OK) {
        mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) E2AP PDU from E2 node : %s",
1608                      rval.code,
1609                      message.message.enodbName);
1610         return -1;
1611     }
1612
1613     int buff_size = RECEIVE_XAPP_BUFFER_SIZE;
1614     auto er = asn_encode_to_buffer(nullptr, ATS_BASIC_XER, &asn_DEF_E2AP_PDU, pdu,
1615                                    rmrMessageBuffer.sendMessage->payload, buff_size);
1616     if (er.encoded == -1) {
1617         mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
1618         return -1;
1619     } else if (er.encoded > (ssize_t)buff_size) {
        mdclog_write(MDCLOG_ERR, "Buffer of size %d is too small for %s, at %s line %d",
                     buff_size,
1622                      asn_DEF_E2AP_PDU.name,
1623                      __func__,
1624                      __LINE__);
1625         return -1;
1626     }
1627     rmrMessageBuffer.sendMessage->len = er.encoded;
1628     return 0;
1629
1630 }
1631
/**
 * Handles an E2AP initiating message received from the E2 node and forwards it
 * over RMR (E2setup/RICserviceUpdate via buildAndsendSetupRequest, the other
 * supported procedures via sendRequestToXapp/sendRmrMessage).
 *
 * @param pdu decoded E2AP PDU
 * @param sctpMap map of connected E2 nodes
 * @param message reporting structure for the current message
 * @param rmrMessageBuffer RMR send/receive buffers
 */
1638 void asnInitiatingRequest(E2AP_PDU_t *pdu,
1639                           Sctp_Map_t *sctpMap,
1640                           ReportingMessages_t &message,
1641                           RmrMessagesBuffer_t &rmrMessageBuffer) {
1642     auto logLevel = mdclog_level_get();
1643     auto procedureCode = ((InitiatingMessage_t *) pdu->choice.initiatingMessage)->procedureCode;
1644     if (logLevel >= MDCLOG_DEBUG) {
1645         mdclog_write(MDCLOG_DEBUG, "Initiating message %ld\n", procedureCode);
1646     }
1647     switch (procedureCode) {
1648         case ProcedureCode_id_E2setup: {
1649             if (logLevel >= MDCLOG_DEBUG) {
1650                 mdclog_write(MDCLOG_DEBUG, "Got E2setup");
1651             }
1652
1653 //            vector <string> RANfunctionsAdded_v;
1654 //            vector <string> RANfunctionsModified_v;
1655 //            RANfunctionsAdded_v.clear();
1656 //            RANfunctionsModified_v.clear();
1657             if (collectSetupRequestData(pdu, sctpMap, message) != 0) {
1658                 break;
1659             }
1660
1661             buildPrometheuslist(message.peerInfo, message.peerInfo->sctpParams->prometheusFamily);
1662
1663             string messageName("E2setupRequest");
1664             string ieName("E2setupRequestIEs");
1665             message.message.messageType = RIC_E2_SETUP_REQ;
1666             message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_E2setup - 1]->Increment();
1667             message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_E2setup - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len);
1668             buildAndsendSetupRequest(message, rmrMessageBuffer, pdu);
1669             break;
1670         }
1671         case ProcedureCode_id_RICserviceUpdate: {
1672             if (logLevel >= MDCLOG_DEBUG) {
1673                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceUpdate %s", message.message.enodbName);
1674             }
1675 //            vector <string> RANfunctionsAdded_v;
1676 //            vector <string> RANfunctionsModified_v;
1677 //            RANfunctionsAdded_v.clear();
1678 //            RANfunctionsModified_v.clear();
1679 //            if (collectServiceUpdate_RequestData(pdu, sctpMap, message,
1680 //                                                 RANfunctionsAdded_v, RANfunctionsModified_v) != 0) {
1681 //                break;
1682 //            }
1683
1684             string messageName("RICserviceUpdate");
1685             string ieName("RICserviceUpdateIEs");
1686             message.message.messageType = RIC_SERVICE_UPDATE;
1687             message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate - 1]->Increment();
1688             message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len);
1689
1690             buildAndsendSetupRequest(message, rmrMessageBuffer, pdu);
1691             break;
1692         }
1693         case ProcedureCode_id_ErrorIndication: {
1694             if (logLevel >= MDCLOG_DEBUG) {
1695                 mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName);
1696             }
1697             message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication - 1]->Increment();
1698             message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len);
1699             if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) {
1700                 mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP");
1701             }
1702             break;
1703         }
1704         case ProcedureCode_id_Reset: {
1705             if (logLevel >= MDCLOG_DEBUG) {
1706                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
1707             }
1708
1709             message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_Reset - 1]->Increment();
1710             message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_Reset - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len);
1711             if (XML_From_PER(message, rmrMessageBuffer) < 0) {
1712                 break;
1713             }
1714
1715             if (sendRequestToXapp(message, RIC_E2_RESET_REQ, rmrMessageBuffer) != 0) {
1716                 mdclog_write(MDCLOG_ERR, "RIC_E2_RESET_REQ message failed to send to xAPP");
1717             }
1718             break;
1719         }
1720         case ProcedureCode_id_RICindication: {
1721             if (logLevel >= MDCLOG_DEBUG) {
1722                 mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName);
1723             }
1724             for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
1725                 auto messageSent = false;
1726                 RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
1727                 if (logLevel >= MDCLOG_DEBUG) {
1728                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1729                 }
1730                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1731                     if (logLevel >= MDCLOG_DEBUG) {
1732                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1733                     }
1734                     if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
1735                         static unsigned char tx[32];
1736                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
1737                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1738                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1739                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1740                                        (unsigned char *)message.message.enodbName,
1741                                        strlen(message.message.enodbName));
1742                         rmrMessageBuffer.sendMessage->state = 0;
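                        // carry the RIC instance id in the RMR sub_id field (used for subscription-based routing)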
1743                         rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricInstanceID;
1744
1746                         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1747                             mdclog_write(MDCLOG_DEBUG, "sub id = %d, mtype = %d, ric instance id %ld, requestor id = %ld",
1748                                          rmrMessageBuffer.sendMessage->sub_id,
1749                                          rmrMessageBuffer.sendMessage->mtype,
1750                                          ie->value.choice.RICrequestID.ricInstanceID,
1751                                          ie->value.choice.RICrequestID.ricRequestorID);
1752                         }
1753                         message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICindication - 1]->Increment();
1754                         message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICindication - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len);
1755                         sendRmrMessage(rmrMessageBuffer, message);
1756                         messageSent = true;
1757                     } else {
                        mdclog_write(MDCLOG_ERR, "RIC request id missing, illegal request");
1759                     }
1760                 }
1761                 if (messageSent) {
1762                     break;
1763                 }
1764             }
1765             break;
1766         }
1767         default: {
1768             mdclog_write(MDCLOG_ERR, "Undefined or not supported message = %ld", procedureCode);
1769             message.message.messageType = 0; // no RMR message type yet
1770
1771             buildJsonMessage(message);
1772
1773             break;
1774         }
1775     }
1776 }
1777
/**
 * Handles an E2AP successful-outcome message received from the E2 node and
 * forwards it over RMR toward the xApps.
 *
 * @param pdu decoded E2AP PDU
 * @param sctpMap map of connected E2 nodes
 * @param message reporting structure for the current message
 * @param rmrMessageBuffer RMR send/receive buffers
 */
1784 void asnSuccsesfulMsg(E2AP_PDU_t *pdu,
1785                       Sctp_Map_t *sctpMap,
1786                       ReportingMessages_t &message,
1787                       RmrMessagesBuffer_t &rmrMessageBuffer) {
1788     auto procedureCode = pdu->choice.successfulOutcome->procedureCode;
1789     auto logLevel = mdclog_level_get();
1790     if (logLevel >= MDCLOG_INFO) {
1791         mdclog_write(MDCLOG_INFO, "Successful Outcome %ld", procedureCode);
1792     }
1793     switch (procedureCode) {
1794         case ProcedureCode_id_Reset: {
1795             if (logLevel >= MDCLOG_DEBUG) {
1796                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
1797             }
1798             message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_Reset - 1]->Increment();
1799             message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len);
1800             if (XML_From_PER(message, rmrMessageBuffer) < 0) {
1801                 break;
1802             }
1803             if (sendRequestToXapp(message, RIC_E2_RESET_RESP, rmrMessageBuffer) != 0) {
1804                 mdclog_write(MDCLOG_ERR, "RIC_E2_RESET_RESP message failed to send to xAPP");
1805             }
1806             break;
1807         }
1808         case ProcedureCode_id_RICcontrol: {
1809             if (logLevel >= MDCLOG_DEBUG) {
1810                 mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
1811             }
1812             for (auto i = 0;
1813                  i < pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.count; i++) {
1814                 auto messageSent = false;
1815                 RICcontrolAcknowledge_IEs_t *ie = pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.array[i];
1816                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1817                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1818                 }
1819                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1820                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1821                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1822                     }
1823                     if (ie->value.present == RICcontrolAcknowledge_IEs__value_PR_RICrequestID) {
1824                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_ACK;
1825                         rmrMessageBuffer.sendMessage->state = 0;
1826 //                        rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1827                         rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricInstanceID;
1828
1829                         static unsigned char tx[32];
1830                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1831                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1832                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1833                                        (unsigned char *)message.message.enodbName,
1834                                        strlen(message.message.enodbName));
1835
1836                         message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol - 1]->Increment();
1837                         message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len);
1838                         sendRmrMessage(rmrMessageBuffer, message);
1839                         messageSent = true;
1840                     } else {
                        mdclog_write(MDCLOG_ERR, "RIC request id missing, illegal request");
1842                     }
1843                 }
1844                 if (messageSent) {
1845                     break;
1846                 }
1847             }
1848
1849             break;
1850         }
1851         case ProcedureCode_id_RICsubscription: {
1852             if (logLevel >= MDCLOG_DEBUG) {
1853                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
1854             }
1855             message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription - 1]->Increment();
1856             message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len);
1857             if (sendRequestToXapp(message, RIC_SUB_RESP, rmrMessageBuffer) != 0) {
1858                 mdclog_write(MDCLOG_ERR, "Subscription successful message failed to send to xAPP");
1859             }
1860             break;
1861         }
1862         case ProcedureCode_id_RICsubscriptionDelete: {
1863             if (logLevel >= MDCLOG_DEBUG) {
1864                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
1865             }
1866             message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete - 1]->Increment();
1867             message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len);
1868             if (sendRequestToXapp(message, RIC_SUB_DEL_RESP, rmrMessageBuffer) != 0) {
1869                 mdclog_write(MDCLOG_ERR, "Subscription delete successful message failed to send to xAPP");
1870             }
1871             break;
1872         }
1873         default: {
1874             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1875             message.message.messageType = 0; // no RMR message type yet
1876             buildJsonMessage(message);
1877
1878             break;
1879         }
1880     }
1881 }
1882
/**
 * Handles an E2AP unsuccessful-outcome message received from the E2 node and
 * forwards it over RMR toward the xApps.
 *
 * @param pdu decoded E2AP PDU
 * @param sctpMap map of connected E2 nodes
 * @param message reporting structure for the current message
 * @param rmrMessageBuffer RMR send/receive buffers
 */
1889 void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
1890                         Sctp_Map_t *sctpMap,
1891                         ReportingMessages_t &message,
1892                         RmrMessagesBuffer_t &rmrMessageBuffer) {
1893     auto procedureCode = pdu->choice.unsuccessfulOutcome->procedureCode;
1894     auto logLevel = mdclog_level_get();
1895     if (logLevel >= MDCLOG_INFO) {
1896         mdclog_write(MDCLOG_INFO, "Unsuccessful Outcome %ld", procedureCode);
1897     }
1898     switch (procedureCode) {
1899         case ProcedureCode_id_RICcontrol: {
1900             if (logLevel >= MDCLOG_DEBUG) {
1901                 mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
1902             }
1903             for (int i = 0;
1904                  i < pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.count; i++) {
1905                 auto messageSent = false;
1906                 RICcontrolFailure_IEs_t *ie = pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.array[i];
1907                 if (logLevel >= MDCLOG_DEBUG) {
1908                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1909                 }
1910                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1911                     if (logLevel >= MDCLOG_DEBUG) {
1912                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1913                     }
1914                     if (ie->value.present == RICcontrolFailure_IEs__value_PR_RICrequestID) {
1915                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_FAILURE;
1916                         rmrMessageBuffer.sendMessage->state = 0;
1917 //                        rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricRequestorID;
1918                         rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricInstanceID;
1919                         static unsigned char tx[32];
1920                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1921                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1922                         rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName,
1923                                        strlen(message.message.enodbName));
1924                         message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol - 1]->Increment();
1925                         message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len);
1926                         sendRmrMessage(rmrMessageBuffer, message);
1927                         messageSent = true;
1928                     } else {
                        mdclog_write(MDCLOG_ERR, "RIC request id missing, illegal request");
1930                     }
1931                 }
1932                 if (messageSent) {
1933                     break;
1934                 }
1935             }
1936             break;
1937         }
1938         case ProcedureCode_id_RICsubscription: {
1939             if (logLevel >= MDCLOG_DEBUG) {
1940                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
1941             }
1942             message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription - 1]->Increment();
1943             message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len);
1944             if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer) != 0) {
1945                 mdclog_write(MDCLOG_ERR, "Subscription unsuccessful message failed to send to xAPP");
1946             }
1947             break;
1948         }
1949         case ProcedureCode_id_RICsubscriptionDelete: {
1950             if (logLevel >= MDCLOG_DEBUG) {
1951                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
1952             }
1953             message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete - 1]->Increment();
1954             message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len);
1955             if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer) != 0) {
1956                 mdclog_write(MDCLOG_ERR, "Subscription Delete unsuccessful message failed to send to xAPP");
1957             }
1958             break;
1959         }
1960         default: {
1961             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1962             message.message.messageType = 0; // no RMR message type yet
1963
1964             buildJsonMessage(message);
1965
1966             break;
1967         }
1968     }
1969 }
1970
/**
 * Stamps the RMR meid, message type and a fresh transaction id on the send
 * buffer and forwards the message to the xApp side via sendRmrMessage().
 *
 * @param message reporting structure for the current message
 * @param requestId RMR message type to use for the outgoing message
 * @param rmrMmessageBuffer RMR send/receive buffers
 * @return result of sendRmrMessage()
 */
1978 int sendRequestToXapp(ReportingMessages_t &message,
1979                       int requestId,
1980                       RmrMessagesBuffer_t &rmrMmessageBuffer) {
1981     rmr_bytes2meid(rmrMmessageBuffer.sendMessage,
1982                    (unsigned char *)message.message.enodbName,
1983                    strlen(message.message.enodbName));
1984     message.message.messageType = rmrMmessageBuffer.sendMessage->mtype = requestId;
1985     rmrMmessageBuffer.sendMessage->state = 0;
1986     static unsigned char tx[32];
1987     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1988     rmr_bytes2xact(rmrMmessageBuffer.sendMessage, tx, strlen((const char *) tx));
1989
1990     auto rc = sendRmrMessage(rmrMmessageBuffer, message);
1991     return rc;
1992 }
1993
/**
 * Initializes the RMR context, blocks until the routing table is ready and
 * registers the RMR receive descriptor with epoll; on failure rmrCtx is left nullptr.
 *
 * @param pSctpParams global SCTP/RMR parameters (rmrCtx and rmrListenFd are set here)
 */
1998 void getRmrContext(sctp_params_t &pSctpParams) {
1999     pSctpParams.rmrCtx = nullptr;
2000     pSctpParams.rmrCtx = rmr_init(pSctpParams.rmrAddress, RECEIVE_XAPP_BUFFER_SIZE, RMRFL_NONE);
2001     if (pSctpParams.rmrCtx == nullptr) {
2002         mdclog_write(MDCLOG_ERR, "Failed to initialize RMR");
2003         return;
2004     }
2005
2006     rmr_set_stimeout(pSctpParams.rmrCtx, 0);    // disable retries for any send operation
    // wait until the routing table exists so RMR can actually route messages
2008     if (mdclog_level_get() >= MDCLOG_INFO) {
        mdclog_write(MDCLOG_INFO, "RMR initialized, waiting for RMR ready state");
2010     }
2011     int rmrReady = 0;
2012     int count = 0;
2013     while (!rmrReady) {
2014         if ((rmrReady = rmr_ready(pSctpParams.rmrCtx)) == 0) {
2015             sleep(1);
2016         }
2017         count++;
2018         if (count % 60 == 0) {
            mdclog_write(MDCLOG_INFO, "waiting for RMR ready state for %d seconds", count);
2020         }
2021     }
2022     if (mdclog_level_get() >= MDCLOG_INFO) {
2023         mdclog_write(MDCLOG_INFO, "RMR running");
2024     }
2025     rmr_init_trace(pSctpParams.rmrCtx, 200);
2026     // get the RMR fd for the epoll
2027     pSctpParams.rmrListenFd = rmr_get_rcvfd(pSctpParams.rmrCtx);
2028     struct epoll_event event{};
2029     // add RMR fd to epoll
2030     event.events = (EPOLLIN);
2031     event.data.fd = pSctpParams.rmrListenFd;
2032     // add listening RMR FD to epoll
2033     if (epoll_ctl(pSctpParams.epoll_fd, EPOLL_CTL_ADD, pSctpParams.rmrListenFd, &event)) {
2034         mdclog_write(MDCLOG_ERR, "Failed to add RMR descriptor to epoll");
2035         close(pSctpParams.rmrListenFd);
2036         rmr_close(pSctpParams.rmrCtx);
2037         pSctpParams.rmrCtx = nullptr;
2038     }
2039 }
2040
/**
 * Re-encodes the XER (XML) E2AP PDU held in rcvMessage into aligned PER, in
 * place, so it can be sent to the E2 node over SCTP.
 *
 * @param message reporting structure (used for logging the RAN name)
 * @param rmrMessageBuffer RMR buffers; rcvMessage payload and length are rewritten
 * @return 0 on success, -1 on decode/encode failure
 */
2047 int PER_FromXML(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) {
2048     E2AP_PDU_t *pdu = nullptr;
2049
2050     if (mdclog_level_get() >= MDCLOG_DEBUG) {
        mdclog_write(MDCLOG_DEBUG, "got XML format data from xApp of size %d: %s",
2052                 rmrMessageBuffer.rcvMessage->len, rmrMessageBuffer.rcvMessage->payload);
2053     }
2054     auto rval = asn_decode(nullptr, ATS_BASIC_XER, &asn_DEF_E2AP_PDU, (void **) &pdu,
2055                            rmrMessageBuffer.rcvMessage->payload, rmrMessageBuffer.rcvMessage->len);
2056     if (rval.code != RC_OK) {
        mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) XML message from E2MGR/xApp : %s",
2058                      rval.code,
2059                      message.message.enodbName);
2060         return -1;
2061     }
2062
2063     int buff_size = RECEIVE_XAPP_BUFFER_SIZE;
2064     auto er = asn_encode_to_buffer(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, pdu,
2065                                    rmrMessageBuffer.rcvMessage->payload, buff_size);
2066     if (er.encoded == -1) {
2067         mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
2068         return -1;
2069     } else if (er.encoded > (ssize_t)buff_size) {
        mdclog_write(MDCLOG_ERR, "Buffer of size %d is too small for %s, at %s line %d",
                     buff_size,
2072                      asn_DEF_E2AP_PDU.name,
2073                      __func__,
2074                      __LINE__);
2075         return -1;
2076     }
2077     rmrMessageBuffer.rcvMessage->len = er.encoded;
2078     return 0;
2079 }
2080
/**
 * Receives one RMR message from the xApp/E2 Manager side, updates the
 * Prometheus counters and forwards it to the matching E2 node over SCTP.
 *
 * @param sctpMap map of connected E2 nodes
 * @param rmrMessageBuffer RMR send/receive buffers
 * @param ts receive timestamp used for reporting
 * @return 0 on success, negative value on error
 */
2088 int receiveXappMessages(Sctp_Map_t *sctpMap,
2089                         RmrMessagesBuffer_t &rmrMessageBuffer,
2090                         struct timespec &ts) {
2091     if (rmrMessageBuffer.rcvMessage == nullptr) {
2092         //we have error
        mdclog_write(MDCLOG_ERR, "RMR message allocation failed, %s", strerror(errno));
2094         return -1;
2095     }
2096
2097     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2098         mdclog_write(MDCLOG_DEBUG, "Call to rmr_rcv_msg");
2099     }
2100     rmrMessageBuffer.rcvMessage = rmr_rcv_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.rcvMessage);
2101     if (rmrMessageBuffer.rcvMessage == nullptr) {
        mdclog_write(MDCLOG_ERR, "RMR receive returned a null pointer, reallocating RMR message buffer");
2103         rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2104         return -2;
2105     }
2106     ReportingMessages_t message;
2107     message.message.direction = 'D';
2108     message.message.time.tv_nsec = ts.tv_nsec;
2109     message.message.time.tv_sec = ts.tv_sec;
2110
2111     // get message payload
2112     //auto msgData = msg->payload;
2113     if (rmrMessageBuffer.rcvMessage->state != 0) {
        mdclog_write(MDCLOG_ERR, "RMR received message with state = %d", rmrMessageBuffer.rcvMessage->state);
2115         return -1;
2116     }
2117     rmr_get_meid(rmrMessageBuffer.rcvMessage, (unsigned char *)message.message.enodbName);
2118     message.peerInfo = (ConnectedCU_t *) sctpMap->find(message.message.enodbName);
2119     if (message.peerInfo == nullptr) {
2120         auto type = rmrMessageBuffer.rcvMessage->mtype;
2121         switch (type) {
2122             case RIC_SCTP_CLEAR_ALL:
2123             case E2_TERM_KEEP_ALIVE_REQ:
2124             case RIC_HEALTH_CHECK_REQ:
2125                 break;
2126             default:
                mdclog_write(MDCLOG_ERR, "No CU entry found for %s, dropping message", message.message.enodbName);
2128                 return -1;
2129         }
2130     }
2131
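    // dispatch on the RMR message type received from the xApp / E2 Manager side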
2132     switch (rmrMessageBuffer.rcvMessage->mtype) {
2133         case RIC_E2_SETUP_RESP : {
2134             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2135                 break;
2136             }
2137             message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup - 1]->Increment();
2138             message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup - 1]->Increment(rmrMessageBuffer.rcvMessage->len);
2139             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2140                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_RESP");
2141                 return -6;
2142             }
2143             break;
2144         }
2145         case RIC_E2_SETUP_FAILURE : {
2146             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2147                 break;
2148             }
2149             message.peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup - 1]->Increment();
2150             message.peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup - 1]->Increment(rmrMessageBuffer.rcvMessage->len);
2151             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2152                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_FAILURE");
2153                 return -6;
2154             }
2155             break;
2156         }
2157         case RIC_ERROR_INDICATION: {
2158             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication - 1]->Increment();
2159             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication - 1]->Increment(rmrMessageBuffer.rcvMessage->len);
2160             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2161                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ERROR_INDICATION");
2162                 return -6;
2163             }
2164             break;
2165         }
2166         case RIC_SUB_REQ: {
2167             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscription - 1]->Increment();
2168             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscription - 1]->Increment(rmrMessageBuffer.rcvMessage->len);
2169             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2170                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_REQ");
2171                 return -6;
2172             }
2173             break;
2174         }
2175         case RIC_SUB_DEL_REQ: {
2176             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete - 1]->Increment();
2177             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete - 1]->Increment(rmrMessageBuffer.rcvMessage->len);
2178             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2179                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_DEL_REQ");
2180                 return -6;
2181             }
2182             break;
2183         }
2184         case RIC_CONTROL_REQ: {
2185             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICcontrol - 1]->Increment();
2186             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICcontrol - 1]->Increment(rmrMessageBuffer.rcvMessage->len);
2187             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2188                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_CONTROL_REQ");
2189                 return -6;
2190             }
2191             break;
2192         }
2193         case RIC_SERVICE_QUERY: {
2194             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2195                 break;
2196             }
2197             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceQuery - 1]->Increment();
2198             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceQuery - 1]->Increment(rmrMessageBuffer.rcvMessage->len);
2199             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2200                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_QUERY");
2201                 return -6;
2202             }
2203             break;
2204         }
2205         case RIC_SERVICE_UPDATE_ACK: {
2206             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2207                 break;
2208             }
2209             message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate - 1]->Increment();
            message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate - 1]->Increment(rmrMessageBuffer.rcvMessage->len);
2211             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2212                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_ACK");
2213                 return -6;
2214             }
2215             break;
2216         }
2217         case RIC_SERVICE_UPDATE_FAILURE: {
2218             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2219                 break;
2220             }
2221             message.peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate - 1]->Increment();
            message.peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate - 1]->Increment(rmrMessageBuffer.rcvMessage->len);
2223             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2224                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_FAILURE");
2225                 return -6;
2226             }
2227             break;
2228         }
2229         case RIC_E2_RESET_REQ: {
2230             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2231                 break;
2232             }
2233             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_Reset - 1]->Increment();
2234             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_Reset - 1]->Increment(rmrMessageBuffer.rcvMessage->len);
2235             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
                mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_RESET_REQ");
2237                 return -6;
2238             }
2239             break;
2240         }
2241         case RIC_E2_RESET_RESP: {
2242             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2243                 break;
2244             }
2245             message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_Reset - 1]->Increment();
2246             message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset - 1]->Increment(rmrMessageBuffer.rcvMessage->len);
2247             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2248                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_RESET_RESP");
2249                 return -6;
2250             }
2251             break;
2252         }
2253         case RIC_SCTP_CLEAR_ALL: {
2254             mdclog_write(MDCLOG_INFO, "RIC_SCTP_CLEAR_ALL");
            // loop over all keys, close each peer socket, then clear the whole map
2256             vector<char *> v;
2257             sctpMap->getKeys(v);
            for (auto const &iter : v) {
2259                 if (!boost::starts_with((string) (iter), "host:") && !boost::starts_with((string) (iter), "msg:")) {
2260                     auto *peerInfo = (ConnectedCU_t *) sctpMap->find(iter);
2261                     if (peerInfo == nullptr) {
2262                         continue;
2263                     }
2264                     close(peerInfo->fileDescriptor);
2265                     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
2266                     message.message.direction = 'D';
2267                     message.message.time.tv_nsec = ts.tv_nsec;
2268                     message.message.time.tv_sec = ts.tv_sec;
2269
2270                     message.message.asnLength = rmrMessageBuffer.sendMessage->len =
2271                             snprintf((char *)rmrMessageBuffer.sendMessage->payload,
2272                                      256,
2273                                      "%s|RIC_SCTP_CLEAR_ALL",
2274                                      peerInfo->enodbName);
2275                     message.message.asndata = rmrMessageBuffer.sendMessage->payload;
2276                     mdclog_write(MDCLOG_INFO, "%s", message.message.asndata);
2277                     if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
2278                         mdclog_write(MDCLOG_ERR, "RIC_SCTP_CONNECTION_FAILURE message failed to send to xApp");
2279                     }
2280                     free(peerInfo);
2281                 }
2282             }
2283
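                 // short pause so the connection-failure notifications above can be sent before the map is cleared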
2284             sleep(1);
2285             sctpMap->clear();
2286             break;
2287         }
2288         case E2_TERM_KEEP_ALIVE_REQ: {
2289             // echo the stored keep-alive payload back to the sender with a fresh transaction id
2290             rmr_bytes2payload(rmrMessageBuffer.sendMessage,
2291                               (unsigned char *)rmrMessageBuffer.ka_message,
2292                               rmrMessageBuffer.ka_message_len);
2293             rmrMessageBuffer.sendMessage->mtype = E2_TERM_KEEP_ALIVE_RESP;
2294             rmrMessageBuffer.sendMessage->state = 0;
2295             static unsigned char tx[32];
2296             auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2297             rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, txLen);
2298             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2299             if (rmrMessageBuffer.sendMessage == nullptr) {
2300                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2301                 mdclog_write(MDCLOG_ERR, "Failed to send E2_TERM_KEEP_ALIVE_RESP, RMR returned NULL");
2302             } else if (rmrMessageBuffer.sendMessage->state != 0)  {
2303                 mdclog_write(MDCLOG_ERR, "Failed to send E2_TERM_KEEP_ALIVE_RESP, RMR state = %d (%s)",
2304                              rmrMessageBuffer.sendMessage->state, translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str());
2305             } else if (mdclog_level_get() >= MDCLOG_DEBUG) {
2306                 mdclog_write(MDCLOG_DEBUG, "Got Keep Alive Request, sent response: %s", rmrMessageBuffer.ka_message);
2307             }
2308
2309             break;
2310         }
2311         case RIC_HEALTH_CHECK_REQ: {
2312             // reply to the health check with a plain "OK" payload
2313             rmr_bytes2payload(rmrMessageBuffer.sendMessage,
2314                               (unsigned char *)"OK",
2315                               2);
2316             rmrMessageBuffer.sendMessage->mtype = RIC_HEALTH_CHECK_RESP;
2317             rmrMessageBuffer.sendMessage->state = 0;
2318             static unsigned char tx[32];
2319             auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2320             rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, txLen);
2321             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2322             if (rmrMessageBuffer.sendMessage == nullptr) {
2323                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2324                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_HEALTH_CHECK_RESP, RMR returned NULL");
2325             } else if (rmrMessageBuffer.sendMessage->state != 0)  {
2326                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_HEALTH_CHECK_RESP, RMR state = %d (%s)",
2327                              rmrMessageBuffer.sendMessage->state, translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str());
2328             } else if (mdclog_level_get() >= MDCLOG_DEBUG) {
2329                 mdclog_write(MDCLOG_DEBUG, "Got RIC_HEALTH_CHECK_REQ, sent response: OK");
2330             }
2331
2332             break;
2333         }
2334
2335         default:
2336             mdclog_write(MDCLOG_WARN, "Message Type: %d is not supported", rmrMessageBuffer.rcvMessage->mtype);
2337             message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
2338             message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
2339             message.message.time.tv_nsec = ts.tv_nsec;
2340             message.message.time.tv_sec = ts.tv_sec;
2341             message.message.messageType = rmrMessageBuffer.rcvMessage->mtype;
2342
2343             buildJsonMessage(message);
2344
2345
2346             return -7;
2347     }
2348     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2349         mdclog_write(MDCLOG_DEBUG, "EXIT OK from %s", __FUNCTION__);
2350     }
2351     return 0;
2352 }
2353
2354 /**
2355  * Send a message to the CU; the caller does not wait for a successful or unsuccessful response
2356  * @param messageBuffer
2357  * @param message
2358  * @param failedMsgId
2359  * @param sctpMap
2360  * @return 0 on success, non-zero on failure
2361  */
2362 int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer,
2363                            ReportingMessages_t &message,
2364                            int failedMsgId,
2365                            Sctp_Map_t *sctpMap) {
2366
2367     getRequestMetaData(message, messageBuffer);
2368     if (mdclog_level_get() >= MDCLOG_INFO) {
2369         mdclog_write(MDCLOG_INFO, "sending message to %s", message.message.enodbName);
2370     }
2371
2372     auto rc = sendMessagetoCu(sctpMap, messageBuffer, message, failedMsgId);
2373     return rc;
2374 }
2375
2376 /**
2377  * Forward an RMR message from the xApp to the CU over its SCTP association
2378  * @param sctpMap
2379  * @param messageBuffer
2380  * @param message
2381  * @param failedMesgId
2382  * @return 0 on success, non-zero on failure
2383  */
2384 int sendMessagetoCu(Sctp_Map_t *sctpMap,
2385                     RmrMessagesBuffer_t &messageBuffer,
2386                     ReportingMessages_t &message,
2387                     int failedMesgId) {
2388     // the target CU connection (and its file descriptor) is already resolved in message.peerInfo
2389     message.message.messageType = messageBuffer.rcvMessage->mtype;
2390     auto rc = sendSctpMsg(message.peerInfo, message, sctpMap);
2391     return rc;
2392 }
2393
2394 /**
2395  * Notify the xApp that the gNB/eNodeB named in its request could not be found
2396  * @param rmrMessageBuffer the RMR buffers used to send and receive
2397  * @param message the reporting metadata built from the xApp request
2398  * @param failedMesgId the message type used for the error reply
2400  */
2401 void
2402 sendFailedSendingMessagetoXapp(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, int failedMesgId) {
2403     rmr_mbuf_t *msg = rmrMessageBuffer.sendMessage;
2404     msg->len = snprintf((char *) msg->payload, 200, "the gNb/eNodeB name %s was not found",
2405                         message.message.enodbName);
2406     if (mdclog_level_get() >= MDCLOG_INFO) {
2407         mdclog_write(MDCLOG_INFO, "%s", msg->payload);
2408     }
2409     msg->mtype = failedMesgId;
2410     msg->state = 0;
2411
2412     static unsigned char tx[32];
2413     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2414     rmr_bytes2xact(msg, tx, strlen((const char *) tx));
2415
2416     sendRmrMessage(rmrMessageBuffer, message);
2417 }
2418
2419
2420
2421 /**
2422  * Register a peer socket with the epoll instance; on failure the socket is closed and its map entries are removed
2423  * @param epoll_fd
2424  * @param peerInfo
2425  * @param events
2426  * @param sctpMap
2427  * @param enodbName
2428  * @param msgType
2429  * @return 0 on success, -1 on failure
2430  */
2431 int addToEpoll(int epoll_fd,
2432                ConnectedCU_t *peerInfo,
2433                uint32_t events,
2434                Sctp_Map_t *sctpMap,
2435                char *enodbName,
2436                int msgType) {
2437     // add the peer socket to the epoll instance
2438     struct epoll_event event{};
2439     event.data.ptr = peerInfo;
2440     event.events = events;
2441     if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peerInfo->fileDescriptor, &event) < 0) {
2442         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2443             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_ADD (may check whether not to quit here), %s, %s %d",
2444                          strerror(errno), __func__, __LINE__);
2445         }
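             // registration failed: close the socket and drop any pending "msg:<enodbName>|<msgType>" entry from the map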
2446         close(peerInfo->fileDescriptor);
2447         if (enodbName != nullptr) {
2448             cleanHashEntry(peerInfo, sctpMap);
2449             char key[MAX_ENODB_NAME_SIZE * 2];
2450             snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2451             if (mdclog_level_get() >= MDCLOG_DEBUG) {
2452                 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2453             }
2454             auto tmp = sctpMap->find(key);
2455             if (tmp) {
2456                 free(tmp);
2457                 sctpMap->erase(key);
2458             }
2459         } else {
2460             peerInfo->enodbName[0] = 0;
2461         }
2462         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may check whether not to quit here)");
2463         return -1;
2464     }
2465     return 0;
2466 }
2467
2468 /**
2469  * Modify an existing epoll registration for a peer socket; on failure the socket is closed and its map entries are removed
2470  * @param epoll_fd
2471  * @param peerInfo
2472  * @param events
2473  * @param sctpMap
2474  * @param enodbName
2475  * @param msgType
2476  * @return 0 on success, -1 on failure
2477  */
2478 int modifyToEpoll(int epoll_fd,
2479                   ConnectedCU_t *peerInfo,
2480                   uint32_t events,
2481                   Sctp_Map_t *sctpMap,
2482                   char *enodbName,
2483                   int msgType) {
2484     // modify the existing epoll registration for the peer socket
2485     struct epoll_event event{};
2486     event.data.ptr = peerInfo;
2487     event.events = events;
2488     if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, peerInfo->fileDescriptor, &event) < 0) {
2489         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2490             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_MOD (may check whether not to quit here), %s, %s %d",
2491                          strerror(errno), __func__, __LINE__);
2492         }
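             // modification failed: unlike addToEpoll, always clean the peer's hash entry and its pending "msg:<enodbName>|<msgType>" key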
2493         close(peerInfo->fileDescriptor);
2494         cleanHashEntry(peerInfo, sctpMap);
2495         char key[MAX_ENODB_NAME_SIZE * 2];
2496         snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2497         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2498             mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2499         }
2500         auto tmp = sctpMap->find(key);
2501         if (tmp) {
2502             free(tmp);
2503         }
2504         sctpMap->erase(key);
2505         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_MOD (may check whether not to quit here)");
2506         return -1;
2507     }
2508     return 0;
2509 }
2510
2511
2512 int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message) {
2513     buildJsonMessage(message);
2514
2515     rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2516
2517     if (rmrMessageBuffer.sendMessage == nullptr) {
2518         rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2519         mdclog_write(MDCLOG_ERR, "RMR send failed, rmr_send_msg returned a NULL pointer");
2520         return -1;
2521     }
2522
2523     if (rmrMessageBuffer.sendMessage->state != 0) {
2524         char meid[RMR_MAX_MEID]{};
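             // RMR_ERR_RETRY: back off briefly and retry the send exactly once; any other non-zero state is logged and returned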
2525         if (rmrMessageBuffer.sendMessage->state == RMR_ERR_RETRY) {
2526             usleep(5);
2527             rmrMessageBuffer.sendMessage->state = 0;
2528             mdclog_write(MDCLOG_INFO, "RETRY sending Message type %d to Xapp from %s",
2529                          rmrMessageBuffer.sendMessage->mtype,
2530                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2531             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2532             if (rmrMessageBuffer.sendMessage == nullptr) {
2533                 mdclog_write(MDCLOG_ERR, "RMR send failed, rmr_send_msg returned a NULL pointer");
2534                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2535                 return -1;
2536             } else if (rmrMessageBuffer.sendMessage->state != 0) {
2537                 mdclog_write(MDCLOG_ERR,
2538                              "Message state %s while sending request %d to Xapp from %s after a 5 microsecond retry back-off",
2539                              translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2540                              rmrMessageBuffer.sendMessage->mtype,
2541                              rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2542                 auto rc = rmrMessageBuffer.sendMessage->state;
2543                 return rc;
2544             }
2545         } else {
2546             mdclog_write(MDCLOG_ERR, "Message state %s while sending request %d to Xapp from %s",
2547                          translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2548                          rmrMessageBuffer.sendMessage->mtype,
2549                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2550             return rmrMessageBuffer.sendMessage->state;
2551         }
2552     }
2553     return 0;
2554 }
2555
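// When jsonTrace is enabled, each reported message is written to the boost log as a single-line JSON
// record whose ASN.1 payload is base64-encoded. A record produced by the snprintf below looks roughly
// like the following (all field values are illustrative only):
//   {"header": {"ts": "1609459200.000000001","ranName": "gnb_001_001_0000019b","messageType": 12010,
//    "direction": "D"},"base64Length": 24,"asnBase64": "AAECAwQFBgcICQoLDA0ODw=="}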
2556 void buildJsonMessage(ReportingMessages_t &message) {
2557     if (jsonTrace) {
2558         message.outLen = sizeof(message.base64Data);
2559         base64::encode((const unsigned char *) message.message.asndata,
2560                        (const int) message.message.asnLength,
2561                        message.base64Data,
2562                        message.outLen);
2563         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2564             mdclog_write(MDCLOG_DEBUG, "Tracing: ASN length = %d, base64 message length = %d ",
2565                          (int) message.message.asnLength,
2566                          (int) message.outLen);
2567         }
2568
2569         snprintf(message.buffer, sizeof(message.buffer),
2570                  "{\"header\": {\"ts\": \"%ld.%09ld\","
2571                  "\"ranName\": \"%s\","
2572                  "\"messageType\": %d,"
2573                  "\"direction\": \"%c\"},"
2574                  "\"base64Length\": %d,"
2575                  "\"asnBase64\": \"%s\"}",
2576                  message.message.time.tv_sec,
2577                  message.message.time.tv_nsec,
2578                  message.message.enodbName,
2579                  message.message.messageType,
2580                  message.message.direction,
2581                  (int) message.outLen,
2582                  message.base64Data);
2583         static src::logger_mt &lg = my_logger::get();
2584
2585         BOOST_LOG(lg) << message.buffer;
2586     }
2587 }
2588
2589
2590 /**
2591  * take RMR error code to string
2592  * @param state
2593  * @return
2594  */
2595 string translateRmrErrorMessages(int state) {
2596     string str = {};
2597     switch (state) {
2598         case RMR_OK:
2599             str = "RMR_OK - state is good";
2600             break;
2601         case RMR_ERR_BADARG:
2602             str = "RMR_ERR_BADARG - argument passed to function was unusable";
2603             break;
2604         case RMR_ERR_NOENDPT:
2605             str = "RMR_ERR_NOENDPT - send/call could not find an endpoint based on msg type";
2606             break;
2607         case RMR_ERR_EMPTY:
2608             str = "RMR_ERR_EMPTY - msg received had no payload; attempt to send an empty message";
2609             break;
2610         case RMR_ERR_NOHDR:
2611             str = "RMR_ERR_NOHDR - message didn't contain a valid header";
2612             break;
2613         case RMR_ERR_SENDFAILED:
2614             str = "RMR_ERR_SENDFAILED - send failed; errno has nano reason";
2615             break;
2616         case RMR_ERR_CALLFAILED:
2617             str = "RMR_ERR_CALLFAILED - unable to send call() message";
2618             break;
2619         case RMR_ERR_NOWHOPEN:
2620             str = "RMR_ERR_NOWHOPEN - no wormholes are open";
2621             break;
2622         case RMR_ERR_WHID:
2623             str = "RMR_ERR_WHID - wormhole id was invalid";
2624             break;
2625         case RMR_ERR_OVERFLOW:
2626             str = "RMR_ERR_OVERFLOW - operation would have busted through a buffer/field size";
2627             break;
2628         case RMR_ERR_RETRY:
2629             str = "RMR_ERR_RETRY - request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)";
2630             break;
2631         case RMR_ERR_RCVFAILED:
2632             str = "RMR_ERR_RCVFAILED - receive failed (hard error)";
2633             break;
2634         case RMR_ERR_TIMEOUT:
2635             str = "RMR_ERR_TIMEOUT - message processing call timed out";
2636             break;
2637         case RMR_ERR_UNSET:
2638             str = "RMR_ERR_UNSET - the message hasn't been populated with a transport buffer";
2639             break;
2640         case RMR_ERR_TRUNC:
2641             str = "RMR_ERR_TRUNC - received message likely truncated";
2642             break;
2643         case RMR_ERR_INITFAILED:
2644             str = "RMR_ERR_INITFAILED - initialisation of something (probably message) failed";
2645             break;
2646         case RMR_ERR_NOTSUPP:
2647             str = "RMR_ERR_NOTSUPP - the request is not supported, or RMr was not initialised for the request";
2648             break;
2649         default:
2650             char buf[128]{};
2651             snprintf(buf, sizeof buf, "UNDOCUMENTED RMR_ERR : %d", state);
2652             str = buf;
2653             break;
2654     }
2655     return str;
2656 }
2657
2658