version 4.0.0 first support for new E2 ASN
[ric-plt/e2.git] / RIC-E2-TERMINATION / sctpThread.cpp
1 // Copyright 2019 AT&T Intellectual Property
2 // Copyright 2019 Nokia
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //      http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15
16 //  This source code is part of the near-RT RIC (RAN Intelligent Controller)
17 //  platform project (RICP).
18
19 // TODO: High-level file comment.
20
21
22
23 #include "sctpThread.h"
24 #include "BuildRunName.h"
25
26 using namespace std;
27 //using namespace std::placeholders;
28 using namespace boost::filesystem;
29
30 #ifdef __TRACING__
31 using namespace opentracing;
32 #endif
33 //#ifdef __cplusplus
34 //extern "C"
35 //{
36 //#endif
37
38 // need to expose without the include of gcov
39 extern "C" void __gcov_flush(void);
40
41 static void catch_function(int signal) {
42     __gcov_flush();
43     exit(signal);
44 }
45
46
47 BOOST_LOG_INLINE_GLOBAL_LOGGER_DEFAULT(my_logger, src::logger_mt)
48
49 boost::shared_ptr<sinks::synchronous_sink<sinks::text_file_backend>> boostLogger;
50 double cpuClock = 0.0;
51 bool jsonTrace = true;
52
53 void init_log() {
54     mdclog_attr_t *attr;
55     mdclog_attr_init(&attr);
56     mdclog_attr_set_ident(attr, "E2Terminator");
57     mdclog_init(attr);
58     mdclog_attr_destroy(attr);
59 }
60 auto start_time = std::chrono::high_resolution_clock::now();
61 typedef std::chrono::duration<double, std::ratio<1,1>> seconds_t;
62
63 double age() {
64     return seconds_t(std::chrono::high_resolution_clock::now() - start_time).count();
65 }
66
67 double approx_CPU_MHz(unsigned sleeptime) {
68     using namespace std::chrono_literals;
69     uint32_t aux = 0;
70     uint64_t cycles_start = rdtscp(aux);
71     double time_start = age();
72     std::this_thread::sleep_for(sleeptime * 1ms);
73     uint64_t elapsed_cycles = rdtscp(aux) - cycles_start;
74     double elapsed_time = age() - time_start;
75     return elapsed_cycles / elapsed_time;
76 }
77
78 //std::atomic<int64_t> rmrCounter{0};
79 std::atomic<int64_t> num_of_messages{0};
80 std::atomic<int64_t> num_of_XAPP_messages{0};
81 static long transactionCounter = 0;
82
83 int buildListeningPort(sctp_params_t &sctpParams) {
84     sctpParams.listenFD = socket (AF_INET6, SOCK_STREAM, IPPROTO_SCTP);
85     struct sockaddr_in6 servaddr {};
86     servaddr.sin6_family = AF_INET6;
87     servaddr.sin6_addr   = in6addr_any;
88     servaddr.sin6_port = htons(SRC_PORT);
89     if (bind(sctpParams.listenFD, (SA *)&servaddr, sizeof(servaddr)) < 0 ) {
90         mdclog_write(MDCLOG_ERR, "Error binding. %s\n", strerror(errno));
91         return -1;
92     }
93     if (setSocketNoBlocking(sctpParams.listenFD) == -1) {
94         //mdclog_write(MDCLOG_ERR, "Error binding. %s", strerror(errno));
95         return -1;
96     }
97     if (mdclog_level_get() >= MDCLOG_DEBUG) {
98         struct sockaddr_in6 cliaddr {};
99         socklen_t len = sizeof(cliaddr);
100         getsockname(sctpParams.listenFD, (SA *)&cliaddr, &len);
101         char buff[1024] {};
102         inet_ntop(AF_INET6, &cliaddr.sin6_addr, buff, sizeof(buff));
103         mdclog_write(MDCLOG_DEBUG, "My address: %s, port %d\n", buff, htons(cliaddr.sin6_port));
104     }
105
106     if (listen(sctpParams.listenFD, SOMAXCONN) < 0) {
107         mdclog_write(MDCLOG_ERR, "Error listening. %s\n", strerror(errno));
108         return -1;
109     }
110     struct epoll_event event {};
111     event.events = EPOLLIN | EPOLLET;
112     event.data.fd = sctpParams.listenFD;
113
114     // add listening port to epoll
115     if (epoll_ctl(sctpParams.epoll_fd, EPOLL_CTL_ADD, sctpParams.listenFD, &event)) {
116         printf("Failed to add descriptor to epoll\n");
117         mdclog_write(MDCLOG_ERR, "Failed to add descriptor to epoll. %s\n", strerror(errno));
118         return -1;
119     }
120
121     return 0;
122 }
123
124 int buildConfiguration(sctp_params_t &sctpParams) {
125     path p = (sctpParams.configFilePath + "/" + sctpParams.configFileName).c_str();
126     if (exists(p)) {
127         const int size = 2048;
128         auto fileSize = file_size(p);
129         if (fileSize > size) {
130             mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size);
131             return -1;
132         }
133     } else {
134         mdclog_write(MDCLOG_ERR, "Configuration File %s not exists", p.string().c_str());
135         return -1;
136     }
137
138     ReadConfigFile conf;
139     if (conf.openConfigFile(p.string()) == -1) {
140         mdclog_write(MDCLOG_ERR, "Filed to open config file %s, %s",
141                      p.string().c_str(), strerror(errno));
142         return -1;
143     }
144     int rmrPort = conf.getIntValue("nano");
145     if (rmrPort == -1) {
146         mdclog_write(MDCLOG_ERR, "illigal RMR port ");
147         return -1;
148     }
149     sctpParams.rmrPort = (uint16_t)rmrPort;
150     snprintf(sctpParams.rmrAddress, sizeof(sctpParams.rmrAddress), "%d", (int) (sctpParams.rmrPort));
151
152     auto tmpStr = conf.getStringValue("loglevel");
153     if (tmpStr.length() == 0) {
154         mdclog_write(MDCLOG_ERR, "illigal loglevel. Set loglevel to MDCLOG_INFO");
155         tmpStr = "info";
156     }
157     transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
158
159     if ((tmpStr.compare("debug")) == 0) {
160         sctpParams.logLevel = MDCLOG_DEBUG;
161     } else if ((tmpStr.compare("info")) == 0) {
162         sctpParams.logLevel = MDCLOG_INFO;
163     } else if ((tmpStr.compare("warning")) == 0) {
164         sctpParams.logLevel = MDCLOG_WARN;
165     } else if ((tmpStr.compare("error")) == 0) {
166         sctpParams.logLevel = MDCLOG_ERR;
167     } else {
168         mdclog_write(MDCLOG_ERR, "illigal loglevel = %s. Set loglevel to MDCLOG_INFO", tmpStr.c_str());
169         sctpParams.logLevel = MDCLOG_INFO;
170     }
171     mdclog_level_set(sctpParams.logLevel);
172
173     tmpStr = conf.getStringValue("volume");
174     if (tmpStr.length() == 0) {
175         mdclog_write(MDCLOG_ERR, "illigal volume.");
176         return -1;
177     }
178
179     char tmpLogFilespec[VOLUME_URL_SIZE];
180     tmpLogFilespec[0] = 0;
181     sctpParams.volume[0] = 0;
182     snprintf(sctpParams.volume, VOLUME_URL_SIZE, "%s", tmpStr.c_str());
183     // copy the name to temp file as well
184     snprintf(tmpLogFilespec, VOLUME_URL_SIZE, "%s", tmpStr.c_str());
185
186
187     // define the file name in the tmp directory under the volume
188     strcat(tmpLogFilespec,"/tmp/E2Term_%Y-%m-%d_%H-%M-%S.%N.tmpStr");
189
190 //    std::string localIP = conf.getStringValue("local-ip");
191 //    if (localIP.length() == 0) {
192 //        mdclog_write(MDCLOG_ERR, "illigal local-ip. environment variable");
193 //        exit(-1);
194 //    }
195
196     //sctpParams.myIP.assign(getenv(localIP.c_str()));
197     sctpParams.myIP = conf.getStringValue("local-ip");
198     if (sctpParams.myIP.length() == 0) {
199         mdclog_write(MDCLOG_ERR, "illigal local-ip.");
200         return -1;
201     }
202
203     sctpParams.fqdn = conf.getStringValue("external-fqdn");
204     if (sctpParams.fqdn.length() == 0) {
205         mdclog_write(MDCLOG_ERR, "illigal external-fqdn");
206         return -1;
207     }
208
209     std::string pod = conf.getStringValue("pod_name");
210     if (pod.length() == 0) {
211         mdclog_write(MDCLOG_ERR, "illigal pod_name in config file");
212         return -1;
213     }
214     auto *podName = getenv(pod.c_str());
215     if (podName == nullptr) {
216         mdclog_write(MDCLOG_ERR, "illigal pod_name or environment varible not exists : %s", pod.c_str());
217         return -1;
218
219     } else {
220         sctpParams.podName.assign(podName);
221         if (sctpParams.podName.length() == 0) {
222             mdclog_write(MDCLOG_ERR, "illigal pod_name");
223             return -1;
224         }
225     }
226
227     tmpStr = conf.getStringValue("trace");
228     transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
229     if ((tmpStr.compare("start")) == 0) {
230         mdclog_write(MDCLOG_INFO, "Trace set to: start");
231         sctpParams.trace = true;
232     } else if ((tmpStr.compare("stop")) == 0) {
233         mdclog_write(MDCLOG_INFO, "Trace set to: stop");
234         sctpParams.trace = false;
235     }
236     jsonTrace = sctpParams.trace;
237
238     sctpParams.ka_message_length = snprintf(sctpParams.ka_message, 4096, "{\"address\": \"%s:%d\","
239                                                                          "\"fqdn\": \"%s\","
240                                                                          "\"pod_name\": \"%s\"}",
241                                             (const char *)sctpParams.myIP.c_str(),
242                                             sctpParams.rmrPort,
243                                             sctpParams.fqdn.c_str(),
244                                             sctpParams.podName.c_str());
245
246     if (mdclog_level_get() >= MDCLOG_INFO) {
247         mdclog_mdc_add("RMR Port", to_string(sctpParams.rmrPort).c_str());
248         mdclog_mdc_add("LogLevel", to_string(sctpParams.logLevel).c_str());
249         mdclog_mdc_add("volume", sctpParams.volume);
250         mdclog_mdc_add("tmpLogFilespec", tmpLogFilespec);
251         mdclog_mdc_add("my ip", sctpParams.myIP.c_str());
252         mdclog_mdc_add("pod name", sctpParams.podName.c_str());
253
254         mdclog_write(MDCLOG_INFO, "running parameters for instance : %s", sctpParams.ka_message);
255     }
256     mdclog_mdc_clean();
257
258     // Files written to the current working directory
259     boostLogger = logging::add_file_log(
260             keywords::file_name = tmpLogFilespec, // to temp directory
261             keywords::rotation_size = 10 * 1024 * 1024,
262             keywords::time_based_rotation = sinks::file::rotation_at_time_interval(posix_time::hours(1)),
263             keywords::format = "%Message%"
264             //keywords::format = "[%TimeStamp%]: %Message%" // use each tmpStr with time stamp
265     );
266
267     // Setup a destination folder for collecting rotated (closed) files --since the same volumn can use rename()
268     boostLogger->locked_backend()->set_file_collector(sinks::file::make_collector(
269             keywords::target = sctpParams.volume
270     ));
271
272     // Upon restart, scan the directory for files matching the file_name pattern
273     boostLogger->locked_backend()->scan_for_files();
274
275     // Enable auto-flushing after each tmpStr record written
276     if (mdclog_level_get() >= MDCLOG_DEBUG) {
277         boostLogger->locked_backend()->auto_flush(true);
278     }
279
280     return 0;
281 }
282
283 int main(const int argc, char **argv) {
284     sctp_params_t sctpParams;
285
286 #ifdef __TRACING__
287     opentracing::Tracer::InitGlobal(tracelibcpp::createTracer("E2 Terminator"));
288     auto span = opentracing::Tracer::Global()->StartSpan(__FUNCTION__);
289 #else
290     otSpan span = 0;
291 #endif
292
293     {
294         std::random_device device{};
295         std::mt19937 generator(device());
296         std::uniform_int_distribution<long> distribution(1, (long) 1e12);
297         transactionCounter = distribution(generator);
298     }
299
300 //    uint64_t st = 0;
301 //    uint32_t aux1 = 0;
302 //   st = rdtscp(aux1);
303
304     unsigned num_cpus = std::thread::hardware_concurrency();
305     init_log();
306     mdclog_level_set(MDCLOG_INFO);
307
308     if (std::signal(SIGINT, catch_function) == SIG_ERR) {
309         mdclog_write(MDCLOG_ERR, "Error initializing SIGINT");
310         exit(1);
311     }
312     if (std::signal(SIGABRT, catch_function)== SIG_ERR) {
313         mdclog_write(MDCLOG_ERR, "Error initializing SIGABRT");
314         exit(1);
315     }
316     if (std::signal(SIGTERM, catch_function)== SIG_ERR) {
317         mdclog_write(MDCLOG_ERR, "Error initializing SIGTERM");
318         exit(1);
319     }
320
321     cpuClock = approx_CPU_MHz(100);
322
323     mdclog_write(MDCLOG_DEBUG, "CPU speed %11.11f", cpuClock);
324
325     auto result = parse(argc, argv, sctpParams);
326
327     if (buildConfiguration(sctpParams) != 0) {
328         exit(-1);
329     }
330
331     // start epoll
332     sctpParams.epoll_fd = epoll_create1(0);
333     if (sctpParams.epoll_fd == -1) {
334         mdclog_write(MDCLOG_ERR, "failed to open epoll descriptor");
335         exit(-1);
336     }
337
338     getRmrContext(sctpParams, &span);
339     if (sctpParams.rmrCtx == nullptr) {
340         close(sctpParams.epoll_fd);
341         exit(-1);
342     }
343
344     if (buildInotify(sctpParams) == -1) {
345         close(sctpParams.rmrListenFd);
346         rmr_close(sctpParams.rmrCtx);
347         close(sctpParams.epoll_fd);
348         exit(-1);
349     }
350
351     if (buildListeningPort(sctpParams) != 0) {
352         close(sctpParams.rmrListenFd);
353         rmr_close(sctpParams.rmrCtx);
354         close(sctpParams.epoll_fd);
355         exit(-1);
356     }
357
358     sctpParams.sctpMap = new mapWrapper();
359
360     std::vector<std::thread> threads(num_cpus);
361 //    std::vector<std::thread> threads;
362
363     num_cpus = 1;
364     for (unsigned int i = 0; i < num_cpus; i++) {
365         threads[i] = std::thread(listener, &sctpParams);
366
367         cpu_set_t cpuset;
368         CPU_ZERO(&cpuset);
369         CPU_SET(i, &cpuset);
370         int rc = pthread_setaffinity_np(threads[i].native_handle(), sizeof(cpu_set_t), &cpuset);
371         if (rc != 0) {
372             mdclog_write(MDCLOG_ERR, "Error calling pthread_setaffinity_np: %d", rc);
373         }
374     }
375
376     //loop over term_init until first message from xApp
377     handleTermInit(sctpParams);
378
379     for (auto &t : threads) {
380         t.join();
381     }
382
383 #ifdef __TRACING__
384     opentracing::Tracer::Global()->Close();
385 #endif
386     return 0;
387 }
388
389 void handleTermInit(sctp_params_t &sctpParams) {
390     sendTermInit(sctpParams);
391     //send to e2 manager init of e2 term
392     //E2_TERM_INIT
393
394     int count = 0;
395     while (true) {
396         auto xappMessages = num_of_XAPP_messages.load(std::memory_order_acquire);
397         if (xappMessages > 0) {
398             if (mdclog_level_get() >=  MDCLOG_INFO) {
399                 mdclog_write(MDCLOG_INFO, "Got a message from some appliction, stop sending E2_TERM_INIT");
400             }
401             return;
402         }
403         usleep(100000);
404         count++;
405         if (count % 1000 == 0) {
406             mdclog_write(MDCLOG_ERR, "GOT No messages from any xApp");
407             sendTermInit(sctpParams);
408         }
409     }
410 }
411
412 void sendTermInit(sctp_params_t &sctpParams) {
413     rmr_mbuf_t *msg = rmr_alloc_msg(sctpParams.rmrCtx, sctpParams.ka_message_length);
414     auto count = 0;
415     while (true) {
416         msg->mtype = E2_TERM_INIT;
417         msg->state = 0;
418         rmr_bytes2payload(msg, (unsigned char *)sctpParams.ka_message, sctpParams.ka_message_length);
419         static unsigned char tx[32];
420         auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
421         rmr_bytes2xact(msg, tx, txLen);
422         msg = rmr_send_msg(sctpParams.rmrCtx, msg);
423         if (msg == nullptr) {
424             msg = rmr_alloc_msg(sctpParams.rmrCtx, sctpParams.myIP.length());
425         } else if (msg->state == 0) {
426             rmr_free_msg(msg);
427             if (mdclog_level_get() >=  MDCLOG_INFO) {
428                 mdclog_write(MDCLOG_INFO, "E2_TERM_INIT succsesfuly sent ");
429             }
430             return;
431         } else {
432             if (count % 100 == 0) {
433                 mdclog_write(MDCLOG_ERR, "Error sending E2_TERM_INIT cause : %d ", msg->state);
434             }
435             sleep(1);
436         }
437         count++;
438     }
439 }
440
441 /**
442  *
443  * @param argc
444  * @param argv
445  * @param sctpParams
446  * @return
447  */
448 cxxopts::ParseResult parse(int argc, char *argv[], sctp_params_t &sctpParams) {
449     cxxopts::Options options(argv[0], "e2 term help");
450     options.positional_help("[optional args]").show_positional_help();
451     options.allow_unrecognised_options().add_options()
452             ("p,path", "config file path", cxxopts::value<std::string>(sctpParams.configFilePath)->default_value("config"))
453             ("f,file", "config file name", cxxopts::value<std::string>(sctpParams.configFileName)->default_value("config.conf"))
454             ("h,help", "Print help");
455
456     auto result = options.parse(argc, argv);
457
458     if (result.count("help")) {
459         std::cout << options.help({""}) << std::endl;
460         exit(0);
461     }
462     return result;
463 }
464
465 /**
466  *
467  * @param sctpParams
468  * @return -1 failed 0 success
469  */
470 int buildInotify(sctp_params_t &sctpParams) {
471     sctpParams.inotifyFD = inotify_init1(IN_NONBLOCK);
472     if (sctpParams.inotifyFD == -1) {
473         mdclog_write(MDCLOG_ERR, "Failed to init inotify (inotify_init1) %s", strerror(errno));
474         close(sctpParams.rmrListenFd);
475         rmr_close(sctpParams.rmrCtx);
476         close(sctpParams.epoll_fd);
477         return -1;
478     }
479
480     sctpParams.inotifyWD = inotify_add_watch(sctpParams.inotifyFD,
481                                               (const char *)sctpParams.configFilePath.c_str(),
482                                              (unsigned)IN_OPEN | (unsigned)IN_CLOSE_WRITE | (unsigned)IN_CLOSE_NOWRITE); //IN_CLOSE = (IN_CLOSE_WRITE | IN_CLOSE_NOWRITE)
483     if (sctpParams.inotifyWD == -1) {
484         mdclog_write(MDCLOG_ERR, "Failed to add directory : %s to  inotify (inotify_add_watch) %s",
485                 sctpParams.configFilePath.c_str(),
486                 strerror(errno));
487         close(sctpParams.inotifyFD);
488         return -1;
489     }
490
491     struct epoll_event event{};
492     event.events = (EPOLLIN);
493     event.data.fd = sctpParams.inotifyFD;
494     // add listening RMR FD to epoll
495     if (epoll_ctl(sctpParams.epoll_fd, EPOLL_CTL_ADD, sctpParams.inotifyFD, &event)) {
496         mdclog_write(MDCLOG_ERR, "Failed to add inotify FD to epoll");
497         close(sctpParams.inotifyFD);
498         return -1;
499     }
500     return 0;
501 }
502
503 /**
504  *
505  * @param args
506  * @return
507  */
508 void listener(sctp_params_t *params) {
509 #ifdef __TRACING__
510     auto span = opentracing::Tracer::Global()->StartSpan(__FUNCTION__);
511 #else
512     otSpan span = 0;
513 #endif
514     int num_of_SCTP_messages = 0;
515     auto totalTime = 0.0;
516     mdclog_mdc_clean();
517     mdclog_level_set(params->logLevel);
518
519     std::thread::id this_id = std::this_thread::get_id();
520     //save cout
521     streambuf *oldCout = cout.rdbuf();
522     ostringstream memCout;
523     // create new cout
524     cout.rdbuf(memCout.rdbuf());
525     cout << this_id;
526     //return to the normal cout
527     cout.rdbuf(oldCout);
528
529     char tid[32];
530     memcpy(tid, memCout.str().c_str(), memCout.str().length() < 32 ? memCout.str().length() : 31);
531     tid[memCout.str().length()] = 0;
532     mdclog_mdc_add("thread id", tid);
533
534     if (mdclog_level_get() >= MDCLOG_DEBUG) {
535         mdclog_write(MDCLOG_DEBUG, "started thread number %s", tid);
536     }
537
538
539     RmrMessagesBuffer_t rmrMessageBuffer{};
540     //create and init RMR
541     rmrMessageBuffer.rmrCtx = params->rmrCtx;
542
543     auto *events = (struct epoll_event *) calloc(MAXEVENTS, sizeof(struct epoll_event));
544     struct timespec end{0, 0};
545     struct timespec start{0, 0};
546
547     rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
548     rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
549
550     memcpy(rmrMessageBuffer.ka_message, params->ka_message, params->ka_message_length);
551     rmrMessageBuffer.ka_message_len = params->ka_message_length;
552     rmrMessageBuffer.ka_message[rmrMessageBuffer.ka_message_len] = 0;
553
554     if (mdclog_level_get() >= MDCLOG_DEBUG) {
555         mdclog_write(MDCLOG_DEBUG, "keep alive message is : %s", rmrMessageBuffer.ka_message);
556     }
557
558     ReportingMessages_t message {};
559
560     for (int i = 0; i < MAX_RMR_BUFF_ARRY; i++) {
561         rmrMessageBuffer.rcvBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
562         rmrMessageBuffer.sendBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
563     }
564
565     while (true) {
566         if (mdclog_level_get() >= MDCLOG_DEBUG) {
567             mdclog_write(MDCLOG_DEBUG, "Start EPOLL Wait");
568         }
569         auto numOfEvents = epoll_wait(params->epoll_fd, events, MAXEVENTS, -1);
570         if (numOfEvents < 0 && errno == EINTR) {
571             if (mdclog_level_get() >= MDCLOG_DEBUG) {
572                 mdclog_write(MDCLOG_DEBUG, "got EINTR : %s", strerror(errno));
573             }
574             continue;
575         }
576         if (numOfEvents < 0) {
577             mdclog_write(MDCLOG_ERR, "Epoll wait failed, errno = %s", strerror(errno));
578             return;
579         }
580         for (auto i = 0; i < numOfEvents; i++) {
581             if (mdclog_level_get() >= MDCLOG_DEBUG) {
582                 mdclog_write(MDCLOG_DEBUG, "handling epoll event %d out of %d", i + 1, numOfEvents);
583             }
584             clock_gettime(CLOCK_MONOTONIC, &message.message.time);
585             start.tv_sec = message.message.time.tv_sec;
586             start.tv_nsec = message.message.time.tv_nsec;
587
588
589             if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP)) {
590                 handlepoll_error(events[i], message, rmrMessageBuffer, params, &span);
591             } else if (events[i].events & EPOLLOUT) {
592                 handleEinprogressMessages(events[i], message, rmrMessageBuffer, params, &span);
593             } else if (params->listenFD == events[i].data.fd) {
594                 // new connection is requested from RAN  start build connection
595                 while (true) {
596                     struct sockaddr in_addr {};
597                     socklen_t in_len;
598                     char hostBuff[NI_MAXHOST];
599                     char portBuff[NI_MAXSERV];
600
601                     in_len = sizeof(in_addr);
602                     auto *peerInfo = (ConnectedCU_t *)calloc(1, sizeof(ConnectedCU_t));
603                     peerInfo->sctpParams = params;
604                     peerInfo->fileDescriptor = accept(params->listenFD, &in_addr, &in_len);
605                     if (peerInfo->fileDescriptor == -1) {
606                         if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
607                             /* We have processed all incoming connections. */
608                             break;
609                         } else {
610                             mdclog_write(MDCLOG_ERR, "Accept error, errno = %s", strerror(errno));
611                             break;
612                         }
613                     }
614                     if (setSocketNoBlocking(peerInfo->fileDescriptor) == -1) {
615                         mdclog_write(MDCLOG_ERR, "setSocketNoBlocking failed to set new connection %s on port %s\n", hostBuff, portBuff);
616                         close(peerInfo->fileDescriptor);
617                         break;
618                     }
619                     auto  ans = getnameinfo(&in_addr, in_len,
620                             peerInfo->hostName, NI_MAXHOST,
621                             peerInfo->portNumber, NI_MAXSERV, NI_NUMERICHOST | NI_NUMERICSERV);
622                     if (ans < 0) {
623                         mdclog_write(MDCLOG_ERR, "Failed to get info on connection request. %s\n", strerror(errno));
624                         close(peerInfo->fileDescriptor);
625                         break;
626                     }
627                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
628                         mdclog_write(MDCLOG_DEBUG, "Accepted connection on descriptor %d (host=%s, port=%s)\n", peerInfo->fileDescriptor, peerInfo->hostName, peerInfo->portNumber);
629                     }
630                     peerInfo->isConnected = false;
631                     peerInfo->gotSetup = false;
632                     if (addToEpoll(params->epoll_fd,
633                                    peerInfo,
634                                    (EPOLLIN | EPOLLET),
635                                    params->sctpMap, nullptr,
636                                    0, &span) != 0) {
637 #ifdef __TRACING__
638                         lspan->Finish();
639 #endif
640                         break;
641                     }
642                 }
643             } else if (params->rmrListenFd == events[i].data.fd) {
644                 // got message from XAPP
645                 num_of_XAPP_messages.fetch_add(1, std::memory_order_release);
646                 num_of_messages.fetch_add(1, std::memory_order_release);
647                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
648                     mdclog_write(MDCLOG_DEBUG, "new message from RMR");
649                 }
650                 if (receiveXappMessages(params->epoll_fd,
651                                         params->sctpMap,
652                                         rmrMessageBuffer,
653                                         message.message.time,
654                                         &span) != 0) {
655                     mdclog_write(MDCLOG_ERR, "Error handling Xapp message");
656                 }
657             } else if (params->inotifyFD == events[i].data.fd) {
658                 mdclog_write(MDCLOG_INFO, "Got event from inotify (configuration update)");
659                 handleConfigChange(params);
660             } else {
661                 /* We RMR_ERR_RETRY have data on the fd waiting to be read. Read and display it.
662                  * We must read whatever data is available completely, as we are running
663                  *  in edge-triggered mode and won't get a notification again for the same data. */
664                 num_of_messages.fetch_add(1, std::memory_order_release);
665                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
666                     mdclog_write(MDCLOG_DEBUG, "new message from SCTP, epoll flags are : %0x", events[i].events);
667                 }
668                 receiveDataFromSctp(&events[i],
669                                     params->sctpMap,
670                                     num_of_SCTP_messages,
671                                     rmrMessageBuffer,
672                                     message.message.time,
673                                     &span);
674             }
675
676             clock_gettime(CLOCK_MONOTONIC, &end);
677             if (mdclog_level_get() >= MDCLOG_INFO) {
678                 totalTime += ((end.tv_sec + 1.0e-9 * end.tv_nsec) -
679                               ((double) start.tv_sec + 1.0e-9 * start.tv_nsec));
680             }
681             if (mdclog_level_get() >= MDCLOG_DEBUG) {
682                 mdclog_write(MDCLOG_DEBUG, "message handling is %ld seconds %ld nanoseconds",
683                              end.tv_sec - start.tv_sec,
684                              end.tv_nsec - start.tv_nsec);
685             }
686         }
687     }
688 #ifdef __TRACING__
689     span->Finish();
690 #else
691
692 #endif
693 }
694
695 /**
696  *
697  * @param sctpParams
698  */
699 void handleConfigChange(sctp_params_t *sctpParams) {
700     char buf[4096] __attribute__ ((aligned(__alignof__(struct inotify_event))));
701     const struct inotify_event *event;
702     char *ptr;
703
704     path p = (sctpParams->configFilePath + "/" + sctpParams->configFileName).c_str();
705     auto endlessLoop = true;
706     while (endlessLoop) {
707         auto len = read(sctpParams->inotifyFD, buf, sizeof buf);
708         if (len == -1) {
709             if (errno != EAGAIN) {
710                 mdclog_write(MDCLOG_ERR, "read %s ", strerror(errno));
711                 endlessLoop = false;
712                 continue;
713             }
714             else {
715                 endlessLoop = false;
716                 continue;
717             }
718         }
719
720         for (ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) {
721             event = (const struct inotify_event *)ptr;
722             if (event->mask & (uint32_t)IN_ISDIR) {
723                 continue;
724             }
725
726             // the directory name
727             if (sctpParams->inotifyWD == event->wd) {
728                 // not the directory
729             }
730             if (event->len) {
731                 if (!(sctpParams->configFileName.compare(event->name))) {
732                     continue;
733                 }
734             }
735             // only the file we want
736             if (event->mask & (uint32_t)IN_CLOSE_WRITE) {
737                 if (exists(p)) {
738                     const int size = 2048;
739                     auto fileSize = file_size(p);
740                     if (fileSize > size) {
741                         mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size);
742                         return;
743                     }
744                 } else {
745                     mdclog_write(MDCLOG_ERR, "Configuration File %s not exists", p.string().c_str());
746                     return;
747                 }
748
749                 ReadConfigFile conf;
750                 if (conf.openConfigFile(p.string()) == -1) {
751                     mdclog_write(MDCLOG_ERR, "Filed to open config file %s, %s",
752                                  p.string().c_str(), strerror(errno));
753                     return;
754                 }
755
756                 auto tmpStr = conf.getStringValue("loglevel");
757                 if (tmpStr.length() == 0) {
758                     mdclog_write(MDCLOG_ERR, "illigal loglevel. Set loglevel to MDCLOG_INFO");
759                     tmpStr = "info";
760                 }
761                 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
762
763                 if ((tmpStr.compare("debug")) == 0) {
764                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_DEBUG");
765                     sctpParams->logLevel = MDCLOG_DEBUG;
766                 } else if ((tmpStr.compare("info")) == 0) {
767                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_INFO");
768                     sctpParams->logLevel = MDCLOG_INFO;
769                 } else if ((tmpStr.compare("warning")) == 0) {
770                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_WARN");
771                     sctpParams->logLevel = MDCLOG_WARN;
772                 } else if ((tmpStr.compare("error")) == 0) {
773                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_ERR");
774                     sctpParams->logLevel = MDCLOG_ERR;
775                 } else {
776                     mdclog_write(MDCLOG_ERR, "illigal loglevel = %s. Set loglevel to MDCLOG_INFO", tmpStr.c_str());
777                     sctpParams->logLevel = MDCLOG_INFO;
778                 }
779                 mdclog_level_set(sctpParams->logLevel);
780
781
782                 tmpStr = conf.getStringValue("trace");
783                 if (tmpStr.length() == 0) {
784                     mdclog_write(MDCLOG_ERR, "illigal trace. Set trace to stop");
785                     tmpStr = "stop";
786                 }
787
788                 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
789                 if ((tmpStr.compare("start")) == 0) {
790                     mdclog_write(MDCLOG_INFO, "Trace set to: start");
791                     sctpParams->trace = true;
792                 } else if ((tmpStr.compare("stop")) == 0) {
793                     mdclog_write(MDCLOG_INFO, "Trace set to: stop");
794                     sctpParams->trace = false;
795                 } else {
796                     mdclog_write(MDCLOG_ERR, "Trace was set to wrong value %s, set to stop", tmpStr.c_str());
797                     sctpParams->trace = false;
798                 }
799                 jsonTrace = sctpParams->trace;
800                 endlessLoop = false;
801             }
802         }
803     }
804 }
805
806 /**
807  *
808  * @param event
809  * @param message
810  * @param rmrMessageBuffer
811  * @param params
812  * @param pSpan
813  */
814 void handleEinprogressMessages(struct epoll_event &event,
815                                ReportingMessages_t &message,
816                                RmrMessagesBuffer_t &rmrMessageBuffer,
817                                sctp_params_t *params,
818                                otSpan *pSpan) {
819 #ifdef __TRACING__
820     auto lspan = opentracing::Tracer::Global()->StartSpan(
821             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
822 #else
823     otSpan lspan = 0;
824 #endif
825     auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
826     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
827
828     mdclog_write(MDCLOG_INFO, "file descriptor %d got EPOLLOUT", peerInfo->fileDescriptor);
829     auto retVal = 0;
830     socklen_t retValLen = 0;
831     auto rc = getsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_ERROR, &retVal, &retValLen);
832     if (rc != 0 || retVal != 0) {
833         if (rc != 0) {
834             rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
835                                                          "%s|Failed SCTP Connection, after EINPROGRESS the getsockopt%s",
836                                                          peerInfo->enodbName, strerror(errno));
837         } else if (retVal != 0) {
838             rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
839                                                          "%s|Failed SCTP Connection after EINPROGRESS, SO_ERROR",
840                                                          peerInfo->enodbName);
841         }
842
843         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
844         message.message.asnLength = rmrMessageBuffer.sendMessage->len;
845         mdclog_write(MDCLOG_ERR, "%s", rmrMessageBuffer.sendMessage->payload);
846         message.message.direction = 'N';
847         if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer, &lspan) != 0) {
848             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
849         }
850         memset(peerInfo->asnData, 0, peerInfo->asnLength);
851         peerInfo->asnLength = 0;
852         peerInfo->mtype = 0;
853 #ifdef __TRACING__
854         lspan->Finish();
855 #endif
856         return;
857     }
858
859     peerInfo->isConnected = true;
860
861     if (modifyToEpoll(params->epoll_fd, peerInfo, (EPOLLIN | EPOLLET), params->sctpMap, peerInfo->enodbName,
862                       peerInfo->mtype, &lspan) != 0) {
863         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_MOD");
864 #ifdef __TRACING__
865         lspan->Finish();
866 #endif
867         return;
868     }
869
870     message.message.asndata = (unsigned char *)peerInfo->asnData;
871     message.message.asnLength = peerInfo->asnLength;
872     message.message.messageType = peerInfo->mtype;
873     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
874     num_of_messages.fetch_add(1, std::memory_order_release);
875     if (mdclog_level_get() >= MDCLOG_DEBUG) {
876         mdclog_write(MDCLOG_DEBUG, "send the delayed SETUP/ENDC SETUP to sctp for %s",
877                      message.message.enodbName);
878     }
879     if (sendSctpMsg(peerInfo, message, params->sctpMap, &lspan) != 0) {
880         if (mdclog_level_get() >= MDCLOG_DEBUG) {
881             mdclog_write(MDCLOG_DEBUG, "Error write to SCTP  %s %d", __func__, __LINE__);
882         }
883 #ifdef __TRACING__
884         lspan->Finish();
885 #endif
886         return;
887     }
888
889     memset(peerInfo->asnData, 0, peerInfo->asnLength);
890     peerInfo->asnLength = 0;
891     peerInfo->mtype = 0;
892 #ifdef __TRACING__
893     lspan->Finish();
894 #endif
895 }
896
897
898 void handlepoll_error(struct epoll_event &event,
899                       ReportingMessages_t &message,
900                       RmrMessagesBuffer_t &rmrMessageBuffer,
901                       sctp_params_t *params,
902                       otSpan *pSpan) {
903 #ifdef __TRACING__
904     auto lspan = opentracing::Tracer::Global()->StartSpan(
905             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
906 #else
907     otSpan lspan = 0;
908 #endif
909     if (event.data.fd != params->rmrListenFd) {
910         auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
911         mdclog_write(MDCLOG_ERR, "epoll error, events %0x on fd %d, RAN NAME : %s",
912                      event.events, peerInfo->fileDescriptor, peerInfo->enodbName);
913
914         rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
915                                                      "%s|Failed SCTP Connection",
916                                                      peerInfo->enodbName);
917         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
918         message.message.asnLength = rmrMessageBuffer.sendMessage->len;
919
920         memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
921         message.message.direction = 'N';
922         if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer, &lspan) != 0) {
923             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
924         }
925
926         close(peerInfo->fileDescriptor);
927         cleanHashEntry((ConnectedCU_t *) event.data.ptr, params->sctpMap, &lspan);
928     } else {
929         mdclog_write(MDCLOG_ERR, "epoll error, events %0x on RMR FD", event.events);
930     }
931 #ifdef __TRACING__
932     lspan->Finish();
933 #endif
934
935 }
936 /**
937  *
938  * @param socket
939  * @return
940  */
941 int setSocketNoBlocking(int socket) {
942     auto flags = fcntl(socket, F_GETFL, 0);
943
944     if (flags == -1) {
945         mdclog_mdc_add("func", "fcntl");
946         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
947         mdclog_mdc_clean();
948         return -1;
949     }
950
951     flags = (unsigned) flags | (unsigned) O_NONBLOCK;
952     if (fcntl(socket, F_SETFL, flags) == -1) {
953         mdclog_mdc_add("func", "fcntl");
954         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
955         mdclog_mdc_clean();
956         return -1;
957     }
958
959     return 0;
960 }
961
962 /**
963  *
964  * @param val
965  * @param m
966  * @param pSpan
967  */
968 void cleanHashEntry(ConnectedCU_t *val, Sctp_Map_t *m, otSpan *pSpan) {
969 #ifdef __TRACING__
970     auto lspan = opentracing::Tracer::Global()->StartSpan(
971             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
972 #else
973 //    otSpan lspan = 0;
974 #endif
975     char *dummy;
976     auto port = (uint16_t) strtol(val->portNumber, &dummy, 10);
977     char searchBuff[2048]{};
978
979     snprintf(searchBuff, sizeof searchBuff, "host:%s:%d", val->hostName, port);
980     m->erase(searchBuff);
981
982     m->erase(val->enodbName);
983     free(val);
984 #ifdef __TRACING__
985     lspan->Finish();
986 #endif
987 }
988
989 /**
990  *
991  * @param fd file discriptor
992  * @param data the asn data to send
993  * @param len  length of the data
994  * @param enodbName the enodbName as in the map for printing purpose
995  * @param m map host information
996  * @param mtype message number
997  * @return 0 success, anegative number on fail
998  */
999 int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_t *m, otSpan *pSpan) {
1000 #ifdef __TRACING__
1001     auto lspan = opentracing::Tracer::Global()->StartSpan(
1002             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1003 #else
1004     otSpan lspan = 0;
1005 #endif
1006     auto loglevel = mdclog_level_get();
1007     int fd = peerInfo->fileDescriptor;
1008     if (loglevel >= MDCLOG_DEBUG) {
1009         mdclog_write(MDCLOG_DEBUG, "Send SCTP message for CU %s, %s",
1010                      message.message.enodbName, __FUNCTION__);
1011     }
1012
1013     while (true) {
1014         if (send(fd,message.message.asndata, message.message.asnLength,MSG_NOSIGNAL) < 0) {
1015             if (errno == EINTR) {
1016                 continue;
1017             }
1018             mdclog_write(MDCLOG_ERR, "error writing to CU a message, %s ", strerror(errno));
1019             if (!peerInfo->isConnected) {
1020                 mdclog_write(MDCLOG_ERR, "connection to CU %s is still in progress.", message.message.enodbName);
1021 #ifdef __TRACING__
1022                 lspan->Finish();
1023 #endif
1024                 return -1;
1025             }
1026             cleanHashEntry(peerInfo, m, &lspan);
1027             close(fd);
1028             char key[MAX_ENODB_NAME_SIZE * 2];
1029             snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName,
1030                      message.message.messageType);
1031             if (loglevel >= MDCLOG_DEBUG) {
1032                 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
1033             }
1034             auto tmp = m->find(key);
1035             if (tmp) {
1036                 free(tmp);
1037             }
1038             m->erase(key);
1039 #ifdef __TRACING__
1040             lspan->Finish();
1041 #endif
1042             return -1;
1043         }
1044         message.message.direction = 'D';
1045         // send report.buffer of size
1046         buildJsonMessage(message);
1047
1048         if (loglevel >= MDCLOG_DEBUG) {
1049             mdclog_write(MDCLOG_DEBUG,
1050                          "SCTP message for CU %s sent from %s",
1051                          message.message.enodbName,
1052                          __FUNCTION__);
1053         }
1054 #ifdef __TRACING__
1055         lspan->Finish();
1056 #endif
1057         return 0;
1058     }
1059 }
1060
1061 /**
1062  *
1063  * @param message
1064  * @param rmrMessageBuffer
1065  * @param pSpan
1066  */
1067 void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer, otSpan *pSpan) {
1068 #ifdef __TRACING__
1069     auto lspan = opentracing::Tracer::Global()->StartSpan(
1070             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1071 #else
1072 //    otSpan lspan = 0;
1073 #endif
1074     rmr_get_meid(rmrMessageBuffer.rcvMessage, (unsigned char *)(message.message.enodbName));
1075
1076     message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
1077     message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
1078
1079     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1080         mdclog_write(MDCLOG_DEBUG, "Message from Xapp RAN name = %s message length = %ld",
1081                      message.message.enodbName, (unsigned long) message.message.asnLength);
1082     }
1083 #ifdef __TRACING__
1084     lspan->Finish();
1085 #endif
1086
1087 }
1088
1089
1090 /**
1091  *
1092  * @param metaData all the data strip to structure
1093  * @param data the data recived from xAPP
1094  * @return 0 success all other values are fault
1095  */
1096 int getSetupRequestMetaData(ReportingMessages_t &message, char *data, char *host, uint16_t &port, otSpan *pSpan) {
1097 #ifdef __TRACING__
1098     auto lspan = opentracing::Tracer::Global()->StartSpan(
1099             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1100 #else
1101 //    otSpan lspan = 0;
1102 #endif
1103     auto loglevel = mdclog_level_get();
1104
1105     char delimiter[4] {};
1106     memset(delimiter, 0, (size_t)4);
1107     delimiter[0] = '|';
1108     char *tmp;
1109
1110     char *val = strtok_r(data, delimiter, &tmp);
1111     if (val != nullptr) {
1112         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1113             mdclog_write(MDCLOG_DEBUG, "SCTP ADDRESS parameter from message = %s", val);
1114         }
1115         memcpy(host, val, tmp - val );
1116     } else {
1117         mdclog_write(MDCLOG_ERR, "wrong Host Name for setup request %s", data);
1118 #ifdef __TRACING__
1119         lspan->Finish();
1120 #endif
1121         return -1;
1122     }
1123
1124     val = strtok_r(nullptr, delimiter, &tmp);
1125     if (val != nullptr) {
1126         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1127             mdclog_write(MDCLOG_DEBUG, "PORT parameter from message = %s", val);
1128         }
1129         char *dummy;
1130         port = (uint16_t)strtol(val, &dummy, 10);
1131     } else {
1132         mdclog_write(MDCLOG_ERR, "wrong Port for setup request %s", data);
1133 #ifdef __TRACING__
1134         lspan->Finish();
1135 #endif
1136         return -2;
1137     }
1138
1139     val = strtok_r(nullptr, delimiter, &tmp);
1140     if (val != nullptr) {
1141         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1142             mdclog_write(MDCLOG_DEBUG, "RAN NAME parameter from message = %s", val);
1143         }
1144         memcpy(message.message.enodbName, val, tmp - val);
1145     } else {
1146         mdclog_write(MDCLOG_ERR, "wrong gNb/Enodeb name for setup request %s", data);
1147 #ifdef __TRACING__
1148         lspan->Finish();
1149 #endif
1150
1151         return -3;
1152     }
1153     val = strtok_r(nullptr, delimiter, &tmp);
1154     if (val != nullptr) {
1155         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1156             mdclog_write(MDCLOG_DEBUG, "ASN length parameter from message = %s", val);
1157         }
1158         char *dummy;
1159         message.message.asnLength = (uint16_t) strtol(val, &dummy, 10);
1160     } else {
1161         mdclog_write(MDCLOG_ERR, "wrong ASN length for setup request %s", data);
1162 #ifdef __TRACING__
1163         lspan->Finish();
1164 #endif
1165         return -4;
1166     }
1167
1168     message.message.asndata = (unsigned char *)tmp;  // tmp is local but point to the location in data
1169
1170     if (loglevel >= MDCLOG_INFO) {
1171         mdclog_write(MDCLOG_INFO, "Message from Xapp RAN name = %s host address = %s port = %d",
1172                      message.message.enodbName, host, port);
1173     }
1174 #ifdef __TRACING__
1175     lspan->Finish();
1176 #endif
1177
1178     return 0;
1179 }
1180
1181 /**
1182  *
1183  * @param events
1184  * @param sctpMap
1185  * @param numOfMessages
1186  * @param rmrMessageBuffer
1187  * @param ts
1188  * @param pSpan
1189  * @return
1190  */
1191 int receiveDataFromSctp(struct epoll_event *events,
1192                         Sctp_Map_t *sctpMap,
1193                         int &numOfMessages,
1194                         RmrMessagesBuffer_t &rmrMessageBuffer,
1195                         struct timespec &ts,
1196                         otSpan *pSpan) {
1197 #ifdef __TRACING__
1198     auto lspan = opentracing::Tracer::Global()->StartSpan(
1199             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1200 #else
1201     otSpan lspan = 0;
1202 #endif
1203     /* We have data on the fd waiting to be read. Read and display it.
1204  * We must read whatever data is available completely, as we are running
1205  *  in edge-triggered mode and won't get a notification again for the same data. */
1206     ReportingMessages_t message {};
1207     auto done = 0;
1208     auto loglevel = mdclog_level_get();
1209
1210     // get the identity of the interface
1211     message.peerInfo = (ConnectedCU_t *)events->data.ptr;
1212
1213
1214     struct timespec start{0, 0};
1215     struct timespec decodestart{0, 0};
1216     struct timespec end{0, 0};
1217
1218     E2AP_PDU_t *pdu = nullptr;
1219
1220
1221     while (true) {
1222         if (loglevel >= MDCLOG_DEBUG) {
1223             mdclog_write(MDCLOG_DEBUG, "Start Read from SCTP %d fd", message.peerInfo->fileDescriptor);
1224             clock_gettime(CLOCK_MONOTONIC, &start);
1225         }
1226         // read the buffer directly to rmr payload
1227         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1228         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1229                 read(message.peerInfo->fileDescriptor, rmrMessageBuffer.sendMessage->payload, RECEIVE_SCTP_BUFFER_SIZE);
1230
1231         if (loglevel >= MDCLOG_DEBUG) {
1232             mdclog_write(MDCLOG_DEBUG, "Finish Read from SCTP %d fd message length = %ld",
1233                     message.peerInfo->fileDescriptor, message.message.asnLength);
1234         }
1235         memcpy(message.message.enodbName, message.peerInfo->enodbName, sizeof(message.peerInfo->enodbName));
1236         message.message.direction = 'U';
1237         message.message.time.tv_nsec = ts.tv_nsec;
1238         message.message.time.tv_sec = ts.tv_sec;
1239
1240         if (message.message.asnLength < 0) {
1241             if (errno == EINTR) {
1242                 continue;
1243             }
1244             /* If errno == EAGAIN, that means we have read all
1245                data. So goReportingMessages_t back to the main loop. */
1246             if (errno != EAGAIN) {
1247                 mdclog_write(MDCLOG_ERR, "Read error, %s ", strerror(errno));
1248                 done = 1;
1249             } else if (loglevel >= MDCLOG_DEBUG) {
1250                 mdclog_write(MDCLOG_DEBUG, "EAGAIN - descriptor = %d", message.peerInfo->fileDescriptor);
1251             }
1252             break;
1253         } else if (message.message.asnLength == 0) {
1254             /* End of file. The remote has closed the connection. */
1255             if (loglevel >= MDCLOG_INFO) {
1256                 mdclog_write(MDCLOG_INFO, "END of File Closed connection - descriptor = %d",
1257                              message.peerInfo->fileDescriptor);
1258             }
1259             done = 1;
1260             break;
1261         }
1262
1263         asn_dec_rval_t rval;
1264         if (loglevel >= MDCLOG_DEBUG) {
1265             char printBuffer[4096]{};
1266             char *tmp = printBuffer;
1267             for (size_t i = 0; i < (size_t)message.message.asnLength; ++i) {
1268                 snprintf(tmp, 3, "%02x", message.message.asndata[i]);
1269                 tmp += 2;
1270             }
1271             printBuffer[message.message.asnLength] = 0;
1272             clock_gettime(CLOCK_MONOTONIC, &end);
1273             mdclog_write(MDCLOG_DEBUG, "Before Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
1274                          message.peerInfo->enodbName, end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
1275             mdclog_write(MDCLOG_DEBUG, "PDU buffer length = %ld, data =  : %s", message.message.asnLength,
1276                          printBuffer);
1277             clock_gettime(CLOCK_MONOTONIC, &decodestart);
1278         }
1279
1280         rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void **) &pdu,
1281                           message.message.asndata, message.message.asnLength);
1282         if (rval.code != RC_OK) {
1283             mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) E2AP PDU from RAN : %s", rval.code,
1284                          message.peerInfo->enodbName);
1285             break;
1286         }
1287
1288         if (loglevel >= MDCLOG_DEBUG) {
1289             clock_gettime(CLOCK_MONOTONIC, &end);
1290             mdclog_write(MDCLOG_DEBUG, "After Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
1291                          message.peerInfo->enodbName, end.tv_sec - decodestart.tv_sec, end.tv_nsec - decodestart.tv_nsec);
1292             char *printBuffer;
1293             size_t size;
1294             FILE *stream = open_memstream(&printBuffer, &size);
1295             asn_fprint(stream, &asn_DEF_E2AP_PDU, pdu);
1296             mdclog_write(MDCLOG_DEBUG, "Encoding E2AP PDU past : %s", printBuffer);
1297             clock_gettime(CLOCK_MONOTONIC, &decodestart);
1298         }
1299
1300         switch (pdu->present) {
1301             case E2AP_PDU_PR_initiatingMessage: {//initiating message
1302                 asnInitiatingRequest(pdu, message, rmrMessageBuffer, &lspan);
1303                 break;
1304             }
1305             case E2AP_PDU_PR_successfulOutcome: { //successful outcome
1306                 asnSuccsesfulMsg(pdu, message, sctpMap, rmrMessageBuffer, &lspan);
1307                 break;
1308             }
1309             case E2AP_PDU_PR_unsuccessfulOutcome: { //Unsuccessful Outcome
1310                 asnUnSuccsesfulMsg(pdu, message, sctpMap, rmrMessageBuffer, &lspan);
1311                 break;
1312             }
1313             default:
1314                 mdclog_write(MDCLOG_ERR, "Unknown index %d in E2AP PDU", pdu->present);
1315                 break;
1316         }
1317         if (loglevel >= MDCLOG_DEBUG) {
1318             clock_gettime(CLOCK_MONOTONIC, &end);
1319             mdclog_write(MDCLOG_DEBUG,
1320                          "After processing message and sent to rmr for : %s, Read time is : %ld seconds, %ld nanoseconds",
1321                          message.peerInfo->enodbName, end.tv_sec - decodestart.tv_sec, end.tv_nsec - decodestart.tv_nsec);
1322         }
1323         numOfMessages++;
1324         // remove the break for EAGAIN
1325         //break;
1326         if (pdu != nullptr) {
1327             //TODO need to test ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu); to get better performance
1328             //ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu);
1329             ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
1330             pdu = nullptr;
1331         }
1332         //clock_gettime(CLOCK_MONOTONIC, &start);
1333     }
1334     // in case of break to avoid memory leak
1335     if (pdu != nullptr) {
1336         ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
1337         pdu = nullptr;
1338     }
1339
1340     if (done) {
1341         if (loglevel >= MDCLOG_INFO) {
1342             mdclog_write(MDCLOG_INFO, "Closed connection - descriptor = %d", message.peerInfo->fileDescriptor);
1343         }
1344         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1345                 snprintf((char *)rmrMessageBuffer.sendMessage->payload,
1346                         256,
1347                         "%s|CU disconnected unexpectedly",
1348                         message.peerInfo->enodbName);
1349         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1350
1351         if (sendRequestToXapp(message,
1352                               RIC_SCTP_CONNECTION_FAILURE,
1353                               rmrMessageBuffer,
1354                               &lspan) != 0) {
1355             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
1356         }
1357
1358         /* Closing descriptor make epoll remove it from the set of descriptors which are monitored. */
1359         close(message.peerInfo->fileDescriptor);
1360         cleanHashEntry((ConnectedCU_t *) events->data.ptr, sctpMap, &lspan);
1361     }
1362     if (loglevel >= MDCLOG_DEBUG) {
1363         clock_gettime(CLOCK_MONOTONIC, &end);
1364         mdclog_write(MDCLOG_DEBUG, "from receive SCTP to send RMR time is %ld seconds and %ld nanoseconds",
1365                      end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
1366
1367     }
1368 #ifdef __TRACING__
1369     lspan->Finish();
1370 #endif
1371
1372     return 0;
1373 }
1374
1375 static void buildAndsendSetupRequest(ReportingMessages_t &message,
1376                                      E2setupRequestIEs_t *ie,
1377                                      RmrMessagesBuffer_t &rmrMessageBuffer,
1378                                      E2AP_PDU_t *pdu) {
1379     auto logLevel = mdclog_level_get();
1380
1381
1382     if (buildRanName(message.peerInfo->enodbName, ie) < 0) {
1383         mdclog_write(MDCLOG_ERR, "Bad param in E2setupRequestIEs GlobalE2node_ID.\n");
1384     } else {
1385         memcpy(message.message.enodbName, message.peerInfo->enodbName, strlen(message.peerInfo->enodbName));
1386     }
1387     // now we can send the data to e2Mgr
1388     auto buffer_size = RECEIVE_SCTP_BUFFER_SIZE;
1389
1390     auto *rmrMsg = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, buffer_size);
1391     // add addrees to message
1392     auto j = snprintf((char *)rmrMsg->payload, 256, "%s:%d|", message.peerInfo->sctpParams->myIP.c_str(), message.peerInfo->sctpParams->rmrPort);
1393
1394
1395     unsigned char *buffer = &rmrMsg->payload[j];
1396     // encode to xml
1397     asn_enc_rval_t er;
1398     er = asn_encode_to_buffer(nullptr, ATS_BASIC_XER, &asn_DEF_E2AP_PDU, pdu, buffer, buffer_size - j);
1399     if (er.encoded == -1) {
1400         mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
1401     } else if (er.encoded > (ssize_t) buffer_size) {
1402         mdclog_write(MDCLOG_ERR, "Buffer of size %d is to small for %s",
1403                      (int) buffer_size,
1404                      asn_DEF_E2AP_PDU.name);
1405     } else {
1406         if (logLevel >= MDCLOG_DEBUG) {
1407             mdclog_write(MDCLOG_DEBUG, "Buffer of size %d, data = %s", (int) er.encoded, buffer);
1408         }
1409         // TODO send to RMR
1410         message.message.messageType = rmrMsg->mtype = RIC_X2_SETUP_REQ;
1411         rmrMsg->state = 0;
1412         rmr_bytes2meid(rmrMsg, (unsigned char *) message.message.enodbName, strlen(message.message.enodbName));
1413
1414         static unsigned char tx[32];
1415         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1416         rmr_bytes2xact(rmrMsg, tx, strlen((const char *) tx));
1417
1418         rmrMsg = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMsg);
1419         if (rmrMsg == nullptr) {
1420             mdclog_write(MDCLOG_ERR, "RMR failed to send returned nullptr");
1421         } else if (rmrMsg->state != 0) {
1422             char meid[RMR_MAX_MEID]{};
1423             if (rmrMsg->state == RMR_ERR_RETRY) {
1424                 usleep(5);
1425                 rmrMsg->state = 0;
1426                 mdclog_write(MDCLOG_INFO, "RETRY sending Message %d to Xapp from %s",
1427                              rmrMsg->mtype, rmr_get_meid(rmrMsg, (unsigned char *) meid));
1428                 rmrMsg = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMsg);
1429                 if (rmrMsg == nullptr) {
1430                     mdclog_write(MDCLOG_ERR, "RMR failed send returned nullptr");
1431                 } else if (rmrMsg->state != 0) {
1432                     mdclog_write(MDCLOG_ERR,
1433                                  "RMR Retry failed %s sending request %d to Xapp from %s",
1434                                  translateRmrErrorMessages(rmrMsg->state).c_str(),
1435                                  rmrMsg->mtype,
1436                                  rmr_get_meid(rmrMsg, (unsigned char *) meid));
1437                 }
1438             } else {
1439                 mdclog_write(MDCLOG_ERR, "RMR failed: %s. sending request %d to Xapp from %s",
1440                              translateRmrErrorMessages(rmrMsg->state).c_str(),
1441                              rmrMsg->mtype,
1442                              rmr_get_meid(rmrMsg, (unsigned char *) meid));
1443             }
1444         }
1445         message.peerInfo->gotSetup = true;
1446         buildJsonMessage(message);
1447     }
1448
1449 }
1450 /**
1451  *
1452  * @param pdu
1453  * @param message
1454  * @param rmrMessageBuffer
1455  * @param pSpan
1456  */
1457 void asnInitiatingRequest(E2AP_PDU_t *pdu,
1458                           ReportingMessages_t &message,
1459                           RmrMessagesBuffer_t &rmrMessageBuffer,
1460                           otSpan *pSpan) {
1461 #ifdef __TRACING__
1462     auto lspan = opentracing::Tracer::Global()->StartSpan(
1463             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1464 #else
1465     otSpan lspan = 0;
1466 #endif
1467
1468     auto logLevel = mdclog_level_get();
1469     auto procedureCode = ((InitiatingMessage_t *) pdu->choice.initiatingMessage)->procedureCode;
1470     if (logLevel >= MDCLOG_DEBUG) {
1471         mdclog_write(MDCLOG_DEBUG, "Initiating message %ld\n", procedureCode);
1472     }
1473     switch (procedureCode) {
1474         case ProcedureCode_id_E2setup: {
1475             if (logLevel >= MDCLOG_DEBUG) {
1476                 mdclog_write(MDCLOG_DEBUG, "Got E2setup\n");
1477             }
1478
1479             memset(message.peerInfo->enodbName, 0 , MAX_ENODB_NAME_SIZE);
1480             for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.E2setupRequest.protocolIEs.list.count; i++) {
1481                 auto *ie = pdu->choice.initiatingMessage->value.choice.E2setupRequest.protocolIEs.list.array[i];
1482                 if (ie->id == ProtocolIE_ID_id_GlobalE2node_ID) {
1483                     if (ie->value.present == E2setupRequestIEs__value_PR_GlobalE2node_ID) {
1484                         buildAndsendSetupRequest(message, ie, rmrMessageBuffer, pdu);
1485                         break;
1486                     }
1487                 }
1488             }
1489             break;
1490         }
1491         case ProcedureCode_id_ErrorIndication: {
1492             if (logLevel >= MDCLOG_DEBUG) {
1493                 mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName);
1494             }
1495             if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer, &lspan) != 0) {
1496                 mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP");
1497             }
1498             break;
1499         }
1500         case ProcedureCode_id_Reset: {
1501             if (logLevel >= MDCLOG_DEBUG) {
1502                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
1503             }
1504             if (sendRequestToXapp(message, RIC_X2_RESET, rmrMessageBuffer, &lspan) != 0) {
1505                 mdclog_write(MDCLOG_ERR, "RIC_X2_RESET message failed to send to xAPP");
1506             }
1507             break;
1508         }
1509         case ProcedureCode_id_RICcontrol: {
1510             if (logLevel >= MDCLOG_DEBUG) {
1511                 mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
1512             }
1513             break;
1514         }
1515         case ProcedureCode_id_RICindication: {
1516             if (logLevel >= MDCLOG_DEBUG) {
1517                 mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName);
1518             }
1519             for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
1520                 auto messageSent = false;
1521                 RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
1522                 if (logLevel >= MDCLOG_DEBUG) {
1523                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1524                 }
1525                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1526                     if (logLevel >= MDCLOG_DEBUG) {
1527                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1528                     }
1529                     if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
1530                         static unsigned char tx[32];
1531                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
1532                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1533                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1534                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1535                                        (unsigned char *)message.message.enodbName,
1536                                        strlen(message.message.enodbName));
1537                         rmrMessageBuffer.sendMessage->state = 0;
1538                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1539                         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1540                             mdclog_write(MDCLOG_DEBUG, "RIC sub id = %d, message type = %d",
1541                                          rmrMessageBuffer.sendMessage->sub_id,
1542                                          rmrMessageBuffer.sendMessage->mtype);
1543                         }
1544                         sendRmrMessage(rmrMessageBuffer, message, &lspan);
1545                         messageSent = true;
1546                     } else {
1547                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1548                     }
1549                 }
1550                 if (messageSent) {
1551                     break;
1552                 }
1553             }
1554             break;
1555         }
1556         case ProcedureCode_id_RICserviceQuery: {
1557             if (logLevel >= MDCLOG_DEBUG) {
1558                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceQuery %s", message.message.enodbName);
1559             }
1560             break;
1561         }
1562         case ProcedureCode_id_RICserviceUpdate: {
1563             if (logLevel >= MDCLOG_DEBUG) {
1564                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceUpdate %s", message.message.enodbName);
1565             }
1566             if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer, &lspan) != 0) {
1567                 mdclog_write(MDCLOG_ERR, "RIC_SERVICE_UPDATE message failed to send to xAPP");
1568             }
1569             break;
1570         }
1571         case ProcedureCode_id_RICsubscription: {
1572             if (logLevel >= MDCLOG_DEBUG) {
1573                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
1574             }
1575             break;
1576         }
1577         case ProcedureCode_id_RICsubscriptionDelete: {
1578             if (logLevel >= MDCLOG_DEBUG) {
1579                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
1580             }
1581             break;
1582         }
1583         default: {
1584             mdclog_write(MDCLOG_ERR, "Undefined or not supported message = %ld", procedureCode);
1585             message.message.messageType = 0; // no RMR message type yet
1586
1587             buildJsonMessage(message);
1588
1589             break;
1590         }
1591     }
1592 #ifdef __TRACING__
1593     lspan->Finish();
1594 #endif
1595
1596 }
1597
1598 /**
1599  *
1600  * @param pdu
1601  * @param message
1602  * @param sctpMap
1603  * @param rmrMessageBuffer
1604  * @param pSpan
1605  */
1606 void asnSuccsesfulMsg(E2AP_PDU_t *pdu, ReportingMessages_t &message, Sctp_Map_t *sctpMap,
1607                       RmrMessagesBuffer_t &rmrMessageBuffer, otSpan *pSpan) {
1608 #ifdef __TRACING__
1609     auto lspan = opentracing::Tracer::Global()->StartSpan(
1610             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1611 #else
1612     otSpan lspan = 0;
1613 #endif
1614     auto procedureCode = pdu->choice.successfulOutcome->procedureCode;
1615     auto logLevel = mdclog_level_get();
1616     if (logLevel >= MDCLOG_INFO) {
1617         mdclog_write(MDCLOG_INFO, "Successful Outcome %ld", procedureCode);
1618     }
1619     switch (procedureCode) {
1620         case ProcedureCode_id_E2setup: {
1621             if (logLevel >= MDCLOG_DEBUG) {
1622                 mdclog_write(MDCLOG_DEBUG, "Got E2setup\n");
1623             }
1624             break;
1625         }
1626         case ProcedureCode_id_ErrorIndication: {
1627             if (logLevel >= MDCLOG_DEBUG) {
1628                 mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName);
1629             }
1630             if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer, &lspan) != 0) {
1631                 mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP");
1632             }
1633             break;
1634         }
1635         case ProcedureCode_id_Reset: {
1636             if (logLevel >= MDCLOG_DEBUG) {
1637                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
1638             }
1639             if (sendRequestToXapp(message, RIC_X2_RESET, rmrMessageBuffer, &lspan) != 0) {
1640                 mdclog_write(MDCLOG_ERR, "RIC_X2_RESET message failed to send to xAPP");
1641             }
1642             break;
1643         }
1644         case ProcedureCode_id_RICcontrol: {
1645             if (logLevel >= MDCLOG_DEBUG) {
1646                 mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
1647             }
1648             for (auto i = 0;
1649                  i < pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.count; i++) {
1650                 auto messageSent = false;
1651                 RICcontrolAcknowledge_IEs_t *ie = pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.array[i];
1652                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1653                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1654                 }
1655                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1656                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1657                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1658                     }
1659                     if (ie->value.present == RICcontrolAcknowledge_IEs__value_PR_RICrequestID) {
1660                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_ACK;
1661                         rmrMessageBuffer.sendMessage->state = 0;
1662                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1663                         static unsigned char tx[32];
1664                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1665                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1666                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1667                                        (unsigned char *)message.message.enodbName,
1668                                        strlen(message.message.enodbName));
1669
1670                         sendRmrMessage(rmrMessageBuffer, message, &lspan);
1671                         messageSent = true;
1672                     } else {
1673                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1674                     }
1675                 }
1676                 if (messageSent) {
1677                     break;
1678                 }
1679             }
1680
1681             break;
1682         }
1683         case ProcedureCode_id_RICindication: {
1684             if (logLevel >= MDCLOG_DEBUG) {
1685                 mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName);
1686             }
1687             for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
1688                 auto messageSent = false;
1689                 RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
1690                 if (logLevel >= MDCLOG_DEBUG) {
1691                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1692                 }
1693                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1694                     if (logLevel >= MDCLOG_DEBUG) {
1695                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1696                     }
1697                     if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
1698                         static unsigned char tx[32];
1699                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
1700                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1701                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1702                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1703                                        (unsigned char *)message.message.enodbName,
1704                                        strlen(message.message.enodbName));
1705                         rmrMessageBuffer.sendMessage->state = 0;
1706                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1707                         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1708                             mdclog_write(MDCLOG_DEBUG, "RIC sub id = %d, message type = %d",
1709                                          rmrMessageBuffer.sendMessage->sub_id,
1710                                          rmrMessageBuffer.sendMessage->mtype);
1711                         }
1712                         sendRmrMessage(rmrMessageBuffer, message, &lspan);
1713                         messageSent = true;
1714                     } else {
1715                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1716                     }
1717                 }
1718                 if (messageSent) {
1719                     break;
1720                 }
1721             }
1722             break;
1723         }
1724         case ProcedureCode_id_RICserviceQuery: {
1725             if (logLevel >= MDCLOG_DEBUG) {
1726                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceQuery %s", message.message.enodbName);
1727             }
1728             break;
1729         }
1730         case ProcedureCode_id_RICserviceUpdate: {
1731             if (logLevel >= MDCLOG_DEBUG) {
1732                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceUpdate %s", message.message.enodbName);
1733             }
1734             if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer, &lspan) != 0) {
1735                 mdclog_write(MDCLOG_ERR, "RIC_SERVICE_UPDATE message failed to send to xAPP");
1736             }
1737             break;
1738         }
1739         case ProcedureCode_id_RICsubscription: {
1740             if (logLevel >= MDCLOG_DEBUG) {
1741                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
1742             }
1743             if (sendRequestToXapp(message, RIC_SUB_RESP, rmrMessageBuffer, &lspan) != 0) {
1744                 mdclog_write(MDCLOG_ERR, "Subscription successful message failed to send to xAPP");
1745             }
1746             break;
1747         }
1748         case ProcedureCode_id_RICsubscriptionDelete: {
1749             if (logLevel >= MDCLOG_DEBUG) {
1750                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
1751             }
1752             if (sendRequestToXapp(message, RIC_SUB_DEL_RESP, rmrMessageBuffer, &lspan) != 0) {
1753                 mdclog_write(MDCLOG_ERR, "Subscription delete successful message failed to send to xAPP");
1754             }
1755             break;
1756         }
1757         default: {
1758             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1759             message.message.messageType = 0; // no RMR message type yet
1760             buildJsonMessage(message);
1761
1762             break;
1763         }
1764     }
1765 #ifdef __TRACING__
1766     lspan->Finish();
1767 #endif
1768
1769 }
1770
1771 /**
1772  *
1773  * @param pdu
1774  * @param message
1775  * @param sctpMap
1776  * @param rmrMessageBuffer
1777  * @param pSpan
1778  */
1779 void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
1780                         ReportingMessages_t &message,
1781                         Sctp_Map_t *sctpMap,
1782                         RmrMessagesBuffer_t &rmrMessageBuffer,
1783                         otSpan *pSpan) {
1784 #ifdef __TRACING__
1785     auto lspan = opentracing::Tracer::Global()->StartSpan(
1786             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1787 #else
1788     otSpan lspan = 0;
1789 #endif
1790     auto procedureCode = pdu->choice.unsuccessfulOutcome->procedureCode;
1791     auto logLevel = mdclog_level_get();
1792     if (logLevel >= MDCLOG_INFO) {
1793         mdclog_write(MDCLOG_INFO, "Unsuccessful Outcome %ld", procedureCode);
1794     }
1795     switch (procedureCode) {
1796         case ProcedureCode_id_E2setup: {
1797             if (logLevel >= MDCLOG_DEBUG) {
1798                 mdclog_write(MDCLOG_DEBUG, "Got E2setup\n");
1799             }
1800             break;
1801         }
1802         case ProcedureCode_id_ErrorIndication: {
1803             if (logLevel >= MDCLOG_DEBUG) {
1804                 mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName);
1805             }
1806             if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer, &lspan) != 0) {
1807                 mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP");
1808             }
1809             break;
1810         }
1811         case ProcedureCode_id_Reset: {
1812             if (logLevel >= MDCLOG_DEBUG) {
1813                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
1814             }
1815             if (sendRequestToXapp(message, RIC_X2_RESET, rmrMessageBuffer, &lspan) != 0) {
1816                 mdclog_write(MDCLOG_ERR, "RIC_X2_RESET message failed to send to xAPP");
1817             }
1818             break;
1819         }
1820         case ProcedureCode_id_RICcontrol: {
1821             if (logLevel >= MDCLOG_DEBUG) {
1822                 mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
1823             }
1824             for (int i = 0;
1825                  i < pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.count; i++) {
1826                 auto messageSent = false;
1827                 RICcontrolFailure_IEs_t *ie = pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.array[i];
1828                 if (logLevel >= MDCLOG_DEBUG) {
1829                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1830                 }
1831                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1832                     if (logLevel >= MDCLOG_DEBUG) {
1833                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1834                     }
1835                     if (ie->value.present == RICcontrolFailure_IEs__value_PR_RICrequestID) {
1836                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_FAILURE;
1837                         rmrMessageBuffer.sendMessage->state = 0;
1838                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1839                         static unsigned char tx[32];
1840                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1841                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1842                         rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName,
1843                                        strlen(message.message.enodbName));
1844                         sendRmrMessage(rmrMessageBuffer, message, &lspan);
1845                         messageSent = true;
1846                     } else {
1847                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1848                     }
1849                 }
1850                 if (messageSent) {
1851                     break;
1852                 }
1853             }
1854             break;
1855         }
1856         case ProcedureCode_id_RICindication: {
1857             if (logLevel >= MDCLOG_DEBUG) {
1858                 mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName);
1859             }
1860             for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
1861                 auto messageSent = false;
1862                 RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
1863                 if (logLevel >= MDCLOG_DEBUG) {
1864                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1865                 }
1866                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1867                     if (logLevel >= MDCLOG_DEBUG) {
1868                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1869                     }
1870                     if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
1871                         static unsigned char tx[32];
1872                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
1873                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1874                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1875                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1876                                        (unsigned char *)message.message.enodbName,
1877                                        strlen(message.message.enodbName));
1878                         rmrMessageBuffer.sendMessage->state = 0;
1879                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1880                         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1881                             mdclog_write(MDCLOG_DEBUG, "RIC sub id = %d, message type = %d",
1882                                          rmrMessageBuffer.sendMessage->sub_id,
1883                                          rmrMessageBuffer.sendMessage->mtype);
1884                         }
1885                         sendRmrMessage(rmrMessageBuffer, message, &lspan);
1886                         messageSent = true;
1887                     } else {
1888                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1889                     }
1890                 }
1891                 if (messageSent) {
1892                     break;
1893                 }
1894             }
1895             break;
1896         }
1897         case ProcedureCode_id_RICserviceQuery: {
1898             if (logLevel >= MDCLOG_DEBUG) {
1899                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceQuery %s", message.message.enodbName);
1900             }
1901             break;
1902         }
1903         case ProcedureCode_id_RICserviceUpdate: {
1904             if (logLevel >= MDCLOG_DEBUG) {
1905                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceUpdate %s", message.message.enodbName);
1906             }
1907             if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer, &lspan) != 0) {
1908                 mdclog_write(MDCLOG_ERR, "RIC_SERVICE_UPDATE message failed to send to xAPP");
1909             }
1910             break;
1911         }
1912         case ProcedureCode_id_RICsubscription: {
1913             if (logLevel >= MDCLOG_DEBUG) {
1914                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
1915             }
1916             if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1917                 mdclog_write(MDCLOG_ERR, "Subscription unsuccessful message failed to send to xAPP");
1918             }
1919             break;
1920         }
1921         case ProcedureCode_id_RICsubscriptionDelete: {
1922             if (logLevel >= MDCLOG_DEBUG) {
1923                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
1924             }
1925             if (sendRequestToXapp(message, RIC_SUB_DEL_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1926                 mdclog_write(MDCLOG_ERR, "Subscription Delete unsuccessful message failed to send to xAPP");
1927             }
1928             break;
1929         }
1930         default: {
1931             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1932             message.message.messageType = 0; // no RMR message type yet
1933
1934             buildJsonMessage(message);
1935
1936             break;
1937         }
1938     }
1939 #ifdef __TRACING__
1940     lspan->Finish();
1941 #endif
1942
1943 }
1944
1945 /**
1946  *
1947  * @param message
1948  * @param requestId
1949  * @param rmrMmessageBuffer
1950  * @param pSpan
1951  * @return
1952  */
1953 int sendRequestToXapp(ReportingMessages_t &message,
1954                       int requestId,
1955                       RmrMessagesBuffer_t &rmrMmessageBuffer,
1956                       otSpan *pSpan) {
1957 #ifdef __TRACING__
1958     auto lspan = opentracing::Tracer::Global()->StartSpan(
1959             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1960 #else
1961     otSpan lspan = 0;
1962 #endif
1963     rmr_bytes2meid(rmrMmessageBuffer.sendMessage,
1964                    (unsigned char *)message.message.enodbName,
1965                    strlen(message.message.enodbName));
1966     message.message.messageType = rmrMmessageBuffer.sendMessage->mtype = requestId;
1967     rmrMmessageBuffer.sendMessage->state = 0;
1968     static unsigned char tx[32];
1969     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1970     rmr_bytes2xact(rmrMmessageBuffer.sendMessage, tx, strlen((const char *) tx));
1971
1972     auto rc = sendRmrMessage(rmrMmessageBuffer, message, &lspan);
1973 #ifdef __TRACING__
1974     lspan->Finish();
1975 #endif
1976
1977     return rc;
1978 }
1979
1980
1981 void getRmrContext(sctp_params_t &pSctpParams, otSpan *pSpan) {
1982 #ifdef __TRACING__
1983     auto lspan = opentracing::Tracer::Global()->StartSpan(
1984             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1985 #else
1986 //    otSpan lspan = 0;
1987 #endif
1988     pSctpParams.rmrCtx = nullptr;
1989     pSctpParams.rmrCtx = rmr_init(pSctpParams.rmrAddress, RMR_MAX_RCV_BYTES, RMRFL_NONE);
1990     if (pSctpParams.rmrCtx == nullptr) {
1991         mdclog_write(MDCLOG_ERR, "Failed to initialize RMR");
1992 #ifdef __TRACING__
1993         lspan->Finish();
1994 #endif
1995         return;
1996     }
1997
1998     rmr_set_stimeout(pSctpParams.rmrCtx, 0);    // disable retries for any send operation
1999     // we need to find that routing table exist and we can run
2000     if (mdclog_level_get() >= MDCLOG_INFO) {
2001         mdclog_write(MDCLOG_INFO, "We are after RMR INIT wait for RMR_Ready");
2002     }
2003     int rmrReady = 0;
2004     int count = 0;
2005     while (!rmrReady) {
2006         if ((rmrReady = rmr_ready(pSctpParams.rmrCtx)) == 0) {
2007             sleep(1);
2008         }
2009         count++;
2010         if (count % 60 == 0) {
2011             mdclog_write(MDCLOG_INFO, "waiting to RMR ready state for %d seconds", count);
2012         }
2013     }
2014     if (mdclog_level_get() >= MDCLOG_INFO) {
2015         mdclog_write(MDCLOG_INFO, "RMR running");
2016     }
2017 #ifdef __TRACING__
2018     lspan->Finish();
2019 #endif
2020     rmr_init_trace(pSctpParams.rmrCtx, 200);
2021     // get the RMR fd for the epoll
2022     pSctpParams.rmrListenFd = rmr_get_rcvfd(pSctpParams.rmrCtx);
2023     struct epoll_event event{};
2024     // add RMR fd to epoll
2025     event.events = (EPOLLIN);
2026     event.data.fd = pSctpParams.rmrListenFd;
2027     // add listening RMR FD to epoll
2028     if (epoll_ctl(pSctpParams.epoll_fd, EPOLL_CTL_ADD, pSctpParams.rmrListenFd, &event)) {
2029         mdclog_write(MDCLOG_ERR, "Failed to add RMR descriptor to epoll");
2030         close(pSctpParams.rmrListenFd);
2031         rmr_close(pSctpParams.rmrCtx);
2032         pSctpParams.rmrCtx = nullptr;
2033     }
2034 }
2035
2036 /**
2037  *
2038  * @param epoll_fd
2039  * @param sctpMap
2040  * @param rmrMessageBuffer
2041  * @param ts
2042  * @param pSpan
2043  * @return
2044  */
2045 int receiveXappMessages(int epoll_fd,
2046                         Sctp_Map_t *sctpMap,
2047                         RmrMessagesBuffer_t &rmrMessageBuffer,
2048                         struct timespec &ts,
2049                         otSpan *pSpan) {
2050 #ifdef __TRACING__
2051     auto lspan = opentracing::Tracer::Global()->StartSpan(
2052             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2053 #else
2054     otSpan lspan = 0;
2055 #endif
2056     if (rmrMessageBuffer.rcvMessage == nullptr) {
2057         //we have error
2058         mdclog_write(MDCLOG_ERR, "RMR Allocation message, %s", strerror(errno));
2059 #ifdef __TRACING__
2060         lspan->Finish();
2061 #endif
2062
2063         return -1;
2064     }
2065
2066     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2067         mdclog_write(MDCLOG_DEBUG, "Call to rmr_rcv_msg");
2068     }
2069     rmrMessageBuffer.rcvMessage = rmr_rcv_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.rcvMessage);
2070     if (rmrMessageBuffer.rcvMessage == nullptr) {
2071         mdclog_write(MDCLOG_ERR, "RMR Receving message with null pointer, Realloc rmr mesage buffer");
2072         rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2073 #ifdef __TRACING__
2074         lspan->Finish();
2075 #endif
2076
2077         return -2;
2078     }
2079     ReportingMessages_t message;
2080     message.message.direction = 'D';
2081     message.message.time.tv_nsec = ts.tv_nsec;
2082     message.message.time.tv_sec = ts.tv_sec;
2083
2084     // get message payload
2085     //auto msgData = msg->payload;
2086     if (rmrMessageBuffer.rcvMessage->state != 0) {
2087         mdclog_write(MDCLOG_ERR, "RMR Receving message with stat = %d", rmrMessageBuffer.rcvMessage->state);
2088 #ifdef __TRACING__
2089         lspan->Finish();
2090 #endif
2091
2092         return -1;
2093     }
2094     switch (rmrMessageBuffer.rcvMessage->mtype) {
2095         case RIC_X2_SETUP_REQ: {
2096             if (connectToCUandSetUp(rmrMessageBuffer, message, epoll_fd, sctpMap, &lspan) != 0) {
2097                 mdclog_write(MDCLOG_ERR, "ERROR in connectToCUandSetUp on RIC_X2_SETUP_REQ");
2098                 message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_SCTP_CONNECTION_FAILURE;
2099                 message.message.direction = 'N';
2100                 message.message.asnLength = rmrMessageBuffer.sendMessage->len =
2101                         snprintf((char *)rmrMessageBuffer.sendMessage->payload,
2102                                 256,
2103                                 "ERROR in connectToCUandSetUp on RIC_X2_SETUP_REQ");
2104                 rmrMessageBuffer.sendMessage->state = 0;
2105                 message.message.asndata = rmrMessageBuffer.sendMessage->payload;
2106
2107                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2108                     mdclog_write(MDCLOG_DEBUG, "start writing to rmr buffer");
2109                 }
2110                 rmr_bytes2xact(rmrMessageBuffer.sendMessage, rmrMessageBuffer.rcvMessage->xaction, RMR_MAX_XID);
2111                 rmr_str2meid(rmrMessageBuffer.sendMessage, (unsigned char *)message.message.enodbName);
2112
2113                 sendRmrMessage(rmrMessageBuffer, message, &lspan);
2114 #ifdef __TRACING__
2115                 lspan->Finish();
2116 #endif
2117                 return -3;
2118             }
2119             break;
2120         }
2121         case RIC_ENDC_X2_SETUP_REQ: {
2122             if (connectToCUandSetUp(rmrMessageBuffer, message, epoll_fd, sctpMap, &lspan) != 0) {
2123                 mdclog_write(MDCLOG_ERR, "ERROR in connectToCUandSetUp on RIC_ENDC_X2_SETUP_REQ");
2124                 message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_SCTP_CONNECTION_FAILURE;
2125                 message.message.direction = 'N';
2126                 message.message.asnLength = rmrMessageBuffer.sendMessage->len =
2127                         snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
2128                                  "ERROR in connectToCUandSetUp on RIC_ENDC_X2_SETUP_REQ");
2129                 rmrMessageBuffer.sendMessage->state = 0;
2130                 message.message.asndata = rmrMessageBuffer.sendMessage->payload;
2131
2132                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2133                     mdclog_write(MDCLOG_DEBUG, "start writing to rmr buffer");
2134                 }
2135
2136                 rmr_bytes2xact(rmrMessageBuffer.sendMessage, rmrMessageBuffer.rcvMessage->xaction, RMR_MAX_XID);
2137                 rmr_str2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName);
2138
2139                 sendRmrMessage(rmrMessageBuffer, message, &lspan);
2140 #ifdef __TRACING__
2141                 lspan->Finish();
2142 #endif
2143                 return -3;
2144             }
2145             break;
2146         }
2147         case RIC_ENDC_CONF_UPDATE: {
2148             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2149                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE");
2150 #ifdef __TRACING__
2151                 lspan->Finish();
2152 #endif
2153                 return -4;
2154             }
2155             break;
2156         }
2157         case RIC_ENDC_CONF_UPDATE_ACK: {
2158             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2159                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE_ACK");
2160 #ifdef __TRACING__
2161                 lspan->Finish();
2162 #endif
2163                 return -4;
2164             }
2165             break;
2166         }
2167         case RIC_ENDC_CONF_UPDATE_FAILURE: {
2168             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2169                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE_FAILURE");
2170 #ifdef __TRACING__
2171                 lspan->Finish();
2172 #endif
2173
2174                 return -4;
2175             }
2176             break;
2177         }
2178         case RIC_ENB_CONF_UPDATE: {
2179             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2180                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE");
2181 #ifdef __TRACING__
2182                 lspan->Finish();
2183 #endif
2184                 return -4;
2185             }
2186             break;
2187         }
2188         case RIC_ENB_CONF_UPDATE_ACK: {
2189             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2190                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENB_CONF_UPDATE_ACK");
2191 #ifdef __TRACING__
2192                 lspan->Finish();
2193 #endif
2194                 return -4;
2195             }
2196             break;
2197         }
2198         case RIC_ENB_CONF_UPDATE_FAILURE: {
2199             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2200                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENB_CONF_UPDATE_FAILURE");
2201 #ifdef __TRACING__
2202                 lspan->Finish();
2203 #endif
2204                 return -4;
2205             }
2206             break;
2207         }
2208         case RIC_RES_STATUS_REQ: {
2209             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2210                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_RES_STATUS_REQ");
2211 #ifdef __TRACING__
2212                 lspan->Finish();
2213 #endif
2214                 return -6;
2215             }
2216             break;
2217         }
2218         case RIC_SUB_REQ: {
2219             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2220                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_REQ");
2221 #ifdef __TRACING__
2222                 lspan->Finish();
2223 #endif
2224                 return -6;
2225             }
2226             break;
2227         }
2228         case RIC_SUB_DEL_REQ: {
2229             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2230                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_DEL_REQ");
2231 #ifdef __TRACING__
2232                 lspan->Finish();
2233 #endif
2234                 return -6;
2235             }
2236             break;
2237         }
2238         case RIC_CONTROL_REQ: {
2239             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2240                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_CONTROL_REQ");
2241 #ifdef __TRACING__
2242                 lspan->Finish();
2243 #endif
2244                 return -6;
2245             }
2246             break;
2247         }
2248         case RIC_SERVICE_QUERY: {
2249             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2250                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_QUERY");
2251 #ifdef __TRACING__
2252                 lspan->Finish();
2253 #endif
2254                 return -6;
2255             }
2256             break;
2257         }
2258         case RIC_SERVICE_UPDATE_ACK: {
2259             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2260                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_ACK");
2261 #ifdef __TRACING__
2262                 lspan->Finish();
2263 #endif
2264                 return -6;
2265             }
2266             break;
2267         }
2268         case RIC_SERVICE_UPDATE_FAILURE: {
2269             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2270                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_FAILURE");
2271 #ifdef __TRACING__
2272                 lspan->Finish();
2273 #endif
2274                 return -6;
2275             }
2276             break;
2277         }
2278         case RIC_X2_RESET: {
2279             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2280                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_X2_RESET");
2281 #ifdef __TRACING__
2282                 lspan->Finish();
2283 #endif
2284                 return -6;
2285             }
2286             break;
2287         }
2288         case RIC_X2_RESET_RESP: {
2289             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2290                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_X2_RESET_RESP");
2291 #ifdef __TRACING__
2292                 lspan->Finish();
2293 #endif
2294                 return -6;
2295             }
2296             break;
2297         }
2298         case RIC_SCTP_CLEAR_ALL: {
2299             mdclog_write(MDCLOG_INFO, "RIC_SCTP_CLEAR_ALL");
2300             // loop on all keys and close socket and then erase all map.
2301             vector<char *> v;
2302             sctpMap->getKeys(v);
2303             for (auto const &iter : v) { //}; iter != sctpMap.end(); iter++) {
2304                 if (!boost::starts_with((string) (iter), "host:") && !boost::starts_with((string) (iter), "msg:")) {
2305                     auto *peerInfo = (ConnectedCU_t *) sctpMap->find(iter);
2306                     if (peerInfo == nullptr) {
2307                         continue;
2308                     }
2309                     close(peerInfo->fileDescriptor);
2310                     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
2311                     message.message.direction = 'D';
2312                     message.message.time.tv_nsec = ts.tv_nsec;
2313                     message.message.time.tv_sec = ts.tv_sec;
2314
2315                     message.message.asnLength = rmrMessageBuffer.sendMessage->len =
2316                             snprintf((char *)rmrMessageBuffer.sendMessage->payload,
2317                                                                    256,
2318                                                                    "%s|RIC_SCTP_CLEAR_ALL",
2319                                                                    peerInfo->enodbName);
2320                     message.message.asndata = rmrMessageBuffer.sendMessage->payload;
2321                     mdclog_write(MDCLOG_INFO, "%s", message.message.asndata);
2322                     if (sendRequestToXapp(message,
2323                                           RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer, &lspan) != 0) {
2324                         mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
2325                     }
2326                     free(peerInfo);
2327                 }
2328             }
2329
2330             sleep(1);
2331             sctpMap->clear();
2332             break;
2333         }
2334         case E2_TERM_KEEP_ALIVE_REQ: {
2335             // send message back
2336             rmr_bytes2payload(rmrMessageBuffer.sendMessage,
2337                     (unsigned char *)rmrMessageBuffer.ka_message,
2338                     rmrMessageBuffer.ka_message_len);
2339             rmrMessageBuffer.sendMessage->mtype = E2_TERM_KEEP_ALIVE_RESP;
2340             rmrMessageBuffer.sendMessage->state = 0;
2341             static unsigned char tx[32];
2342             auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2343             rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, txLen);
2344             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2345             if (rmrMessageBuffer.sendMessage == nullptr) {
2346                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2347                 mdclog_write(MDCLOG_ERR, "Failed to send E2_TERM_KEEP_ALIVE_RESP RMR message returned NULL");
2348             } else if (rmrMessageBuffer.sendMessage->state != 0)  {
2349                 mdclog_write(MDCLOG_ERR, "Failed to send E2_TERM_KEEP_ALIVE_RESP, on RMR state = %d ( %s)",
2350                         rmrMessageBuffer.sendMessage->state, translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str());
2351             } else if (mdclog_level_get() >= MDCLOG_INFO) {
2352                 mdclog_write(MDCLOG_INFO, "Got Keep Alive Request send : %s", rmrMessageBuffer.ka_message);
2353             }
2354
2355             break;
2356         }
2357         default:
2358             mdclog_write(MDCLOG_WARN, "Message Type : %d is not seported", rmrMessageBuffer.rcvMessage->mtype);
2359             message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
2360             message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
2361             message.message.time.tv_nsec = ts.tv_nsec;
2362             message.message.time.tv_sec = ts.tv_sec;
2363             message.message.messageType = rmrMessageBuffer.rcvMessage->mtype;
2364
2365             buildJsonMessage(message);
2366
2367
2368 #ifdef __TRACING__
2369             lspan->Finish();
2370 #endif
2371             return -7;
2372     }
2373     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2374         mdclog_write(MDCLOG_DEBUG, "EXIT OK from %s", __FUNCTION__);
2375     }
2376 #ifdef __TRACING__
2377     lspan->Finish();
2378 #endif
2379     return 0;
2380 }
2381
2382 /**
2383  * Send message to the CU that is not expecting for successful or unsuccessful results
2384  * @param messageBuffer
2385  * @param message
2386  * @param failedMsgId
2387  * @param sctpMap
2388  * @param pSpan
2389  * @return
2390  */
2391 int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer,
2392                            ReportingMessages_t &message,
2393                            int failedMsgId,
2394                            Sctp_Map_t *sctpMap,
2395                            otSpan *pSpan) {
2396 #ifdef __TRACING__
2397     auto lspan = opentracing::Tracer::Global()->StartSpan(
2398             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2399 #else
2400     otSpan lspan = 0;
2401 #endif
2402
2403     getRequestMetaData(message, messageBuffer, &lspan);
2404     if (mdclog_level_get() >= MDCLOG_INFO) {
2405         mdclog_write(MDCLOG_INFO, "send message to %s address", message.message.enodbName);
2406     }
2407
2408     auto rc = sendMessagetoCu(sctpMap, messageBuffer, message, failedMsgId, &lspan);
2409 #ifdef __TRACING__
2410     lspan->Finish();
2411 #endif
2412
2413     return rc;
2414 }
2415
2416 /**
2417  *
2418  * @param sctpMap
2419  * @param messageBuffer
2420  * @param message
2421  * @param failedMesgId
2422  * @param pSpan
2423  * @return
2424  */
2425 int sendMessagetoCu(Sctp_Map_t *sctpMap,
2426                     RmrMessagesBuffer_t &messageBuffer,
2427                     ReportingMessages_t &message,
2428                     int failedMesgId,
2429                     otSpan *pSpan) {
2430 #ifdef __TRACING__
2431     auto lspan = opentracing::Tracer::Global()->StartSpan(
2432             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2433 #else
2434     otSpan lspan = 0;
2435 #endif
2436     auto *peerInfo = (ConnectedCU_t *) sctpMap->find(message.message.enodbName);
2437     if (peerInfo == nullptr) {
2438         if (failedMesgId != 0) {
2439             sendFailedSendingMessagetoXapp(messageBuffer, message, failedMesgId, &lspan);
2440         } else {
2441             mdclog_write(MDCLOG_ERR, "Failed to send message no CU entry %s", message.message.enodbName);
2442         }
2443 #ifdef __TRACING__
2444         lspan->Finish();
2445 #endif
2446
2447         return -1;
2448     }
2449
2450     // get the FD
2451     message.message.messageType = messageBuffer.rcvMessage->mtype;
2452     auto rc = sendSctpMsg(peerInfo, message, sctpMap, &lspan);
2453 #ifdef __TRACING__
2454     lspan->Finish();
2455 #endif
2456
2457     return rc;
2458 }
2459
2460 /**
2461  *
2462  * @param rmrCtx the rmr context to send and receive
2463  * @param msg the msg we got fromxApp
2464  * @param metaData data from xApp in ordered struct
2465  * @param failedMesgId the return message type error
2466  */
2467 void
2468 sendFailedSendingMessagetoXapp(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, int failedMesgId,
2469                                otSpan *pSpan) {
2470 #ifdef __TRACING__
2471     auto lspan = opentracing::Tracer::Global()->StartSpan(
2472             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2473 #else
2474     otSpan lspan = 0;
2475 #endif
2476     rmr_mbuf_t *msg = rmrMessageBuffer.sendMessage;
2477     msg->len = snprintf((char *) msg->payload, 200, "the gNb/eNode name %s not found",
2478                         message.message.enodbName);
2479     if (mdclog_level_get() >= MDCLOG_INFO) {
2480         mdclog_write(MDCLOG_INFO, "%s", msg->payload);
2481     }
2482     msg->mtype = failedMesgId;
2483     msg->state = 0;
2484
2485     static unsigned char tx[32];
2486     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2487     rmr_bytes2xact(msg, tx, strlen((const char *) tx));
2488
2489     sendRmrMessage(rmrMessageBuffer, message, &lspan);
2490 #ifdef __TRACING__
2491     lspan->Finish();pLogSink
2492 #endif
2493
2494 }
2495
2496 /**
2497  * Send Response back to xApp, message is used only when there was a request from the xApp
2498  *
2499  * @param enodbName the name of the gNb/eNodeB
2500  * @param msgType  the value of the message to the xApp
2501  * @param requestType The request that was sent by the xAPP
2502  * @param rmrCtx the rmr identifier
2503  * @param sctpMap hash map holds data on the requestrs
2504  * @param buf  the buffer to send to xAPP
2505  * @param size size of the buffer to send
2506  * @return
2507  */
2508 /*
2509 int sendResponseToXapp(ReportingMessages_t &message,
2510                        int msgType,
2511                        int requestType,
2512                        RmrMessagesBuffer_t &rmrMessageBuffer,
2513                        Sctp_Map_t *sctpMap,
2514                        otSpan *pSpan) {
2515 #ifdef __TRACING__
2516     auto lspan = opentracing::Tracer::Global()->StartSpan(
2517             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2518 #else
2519     otSpan lspan = 0;
2520 #endif
2521     char key[MAX_ENODB_NAME_SIZE * 2];
2522     snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName, requestType);
2523
2524     auto xact = sctpMap->find(key);
2525     if (xact == nullptr) {
2526         mdclog_write(MDCLOG_ERR, "NO Request %s found for this response from CU: %s", key,
2527                      message.message.enodbName);
2528 #ifdef __TRACING__
2529         lspan->Finish();
2530 #endif
2531
2532         return -1;
2533     }
2534     sctpMap->erase(key);
2535
2536     message.message.messageType = rmrMessageBuffer.sendMessage->mtype = msgType; //SETUP_RESPONSE_MESSAGE_TYPE;
2537     rmr_bytes2payload(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.asndata,
2538                       message.message.asnLength);
2539     rmr_bytes2xact(rmrMessageBuffer.sendMessage, (const unsigned char *)xact, strlen((const char *)xact));
2540     rmr_str2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName);
2541     rmrMessageBuffer.sendMessage->state = 0;
2542
2543     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2544         mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2545     }
2546     free(xact);
2547
2548     auto rc = sendRmrMessage(rmrMessageBuffer, message, &lspan);
2549 #ifdef __TRACING__
2550     lspan->Finish();
2551 #endif
2552     return rc;
2553 }
2554 */
2555
2556 /**
2557  * build the SCTP connection to eNodB or gNb
2558  * @param rmrMessageBuffer
2559  * @param message
2560  * @param epoll_fd
2561  * @param sctpMap
2562  * @param pSpan
2563  * @return
2564  */
2565 int connectToCUandSetUp(RmrMessagesBuffer_t &rmrMessageBuffer,
2566                         ReportingMessages_t &message,
2567                         int epoll_fd,
2568                         Sctp_Map_t *sctpMap,
2569                         otSpan *pSpan) {
2570 #ifdef __TRACING__
2571     auto lspan = opentracing::Tracer::Global()->StartSpan(
2572             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2573 #else
2574     otSpan lspan = 0;
2575 #endif
2576     struct sockaddr_in6 servaddr{};
2577     struct addrinfo hints{}, *result;
2578     auto msgData = rmrMessageBuffer.rcvMessage->payload;
2579     unsigned char meid[RMR_MAX_MEID]{};
2580     char host[256]{};
2581     uint16_t port = 0;
2582
2583     message.message.messageType = rmrMessageBuffer.rcvMessage->mtype;
2584     rmr_mbuf_t *msg = rmrMessageBuffer.rcvMessage;
2585     rmr_get_meid(msg, meid);
2586
2587     if (mdclog_level_get() >= MDCLOG_INFO) {
2588         mdclog_write(MDCLOG_INFO, "message %d Received for MEID :%s. SETUP/EN-DC Setup Request from xApp, Message = %s",
2589                      msg->mtype, meid, msgData);
2590     }
2591     if (getSetupRequestMetaData(message, (char *)msgData, host, port, &lspan) < 0) {
2592         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2593             mdclog_write(MDCLOG_DEBUG, "Error in setup parameters %s, %d", __func__, __LINE__);
2594         }
2595 #ifdef __TRACING__
2596         lspan->Finish();
2597 #endif
2598         return -1;
2599     }
2600
2601     //// message asndata points to the start of the asndata of the message and not to start of payload
2602     // search if the same host:port but not the same enodbname
2603     char searchBuff[256]{};
2604     snprintf(searchBuff, sizeof searchBuff, "host:%s:%d", host, port);
2605     auto e = (char *)sctpMap->find(searchBuff);
2606     if (e != nullptr) {
2607         // found one compare if not the same
2608         if (strcmp(message.message.enodbName, e) != 0) {
2609             mdclog_write(MDCLOG_ERR,
2610                          "Try to connect CU %s to Host %s but %s already connected",
2611                          message.message.enodbName, host, e);
2612 #ifdef __TRACING__
2613             lspan->Finish();
2614 #endif
2615             return -1;
2616         }
2617     }
2618
2619     // check if not connected. if connected send the request and return
2620     auto *peerInfo = (ConnectedCU_t *)sctpMap->find(message.message.enodbName);
2621     if (peerInfo != nullptr) {
2622         if (mdclog_level_get() >= MDCLOG_INFO) {
2623             mdclog_write(MDCLOG_INFO, "Device already connected to %s",
2624                          message.message.enodbName);
2625         }
2626         message.message.messageType = msg->mtype;
2627         auto rc = sendSctpMsg(peerInfo, message, sctpMap, &lspan);
2628         if (rc != 0) {
2629             mdclog_write(MDCLOG_ERR, "failed write to SCTP %s, %d", __func__, __LINE__);
2630 #ifdef __TRACING__
2631             lspan->Finish();
2632 #endif
2633             return -1;
2634         }
2635
2636         char key[MAX_ENODB_NAME_SIZE * 2];
2637         snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName, msg->mtype);
2638         int xaction_len = strlen((const char *) msg->xaction);
2639         auto *xaction = (unsigned char *) calloc(1, xaction_len);
2640         memcpy(xaction, msg->xaction, xaction_len);
2641         sctpMap->setkey(key, xaction);
2642         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2643             mdclog_write(MDCLOG_DEBUG, "set key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2644         }
2645 #ifdef __TRACING__
2646         lspan->Finish();
2647 #endif
2648         return 0;
2649     }
2650
2651     peerInfo = (ConnectedCU_t *) calloc(1, sizeof(ConnectedCU_t));
2652     memcpy(peerInfo->enodbName, message.message.enodbName, sizeof(message.message.enodbName));
2653
2654     // new connection
2655     if ((peerInfo->fileDescriptor = socket(AF_INET6, SOCK_STREAM, IPPROTO_SCTP)) < 0) {
2656         mdclog_write(MDCLOG_ERR, "Socket Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2657 #ifdef __TRACING__
2658         lspan->Finish();
2659 #endif
2660         return -1;
2661     }
2662
2663     auto optval = 1;
2664     if (setsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof optval) != 0) {
2665         mdclog_write(MDCLOG_ERR, "setsockopt SO_REUSEPORT Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2666 #ifdef __TRACING__
2667         lspan->Finish();
2668 #endif
2669         return -1;
2670     }
2671     optval = 1;
2672     if (setsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval) != 0) {
2673         mdclog_write(MDCLOG_ERR, "setsockopt SO_REUSEADDR Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2674 #ifdef __TRACING__
2675         lspan->Finish();
2676 #endif
2677         return -1;
2678     }
2679     servaddr.sin6_family = AF_INET6;
2680
2681     struct sockaddr_in6 localAddr {};
2682     localAddr.sin6_family = AF_INET6;
2683     localAddr.sin6_addr = in6addr_any;
2684     localAddr.sin6_port = htons(SRC_PORT);
2685
2686     if (bind(peerInfo->fileDescriptor, (struct sockaddr*)&localAddr , sizeof(struct sockaddr_in6)) < 0) {
2687         mdclog_write(MDCLOG_ERR, "bind Socket Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2688 #ifdef __TRACING__
2689         lspan->Finish();
2690 #endif
2691         return -1;
2692     }//Ends the binding.
2693
2694     memset(&hints, 0, sizeof hints);
2695     hints.ai_flags = AI_NUMERICHOST;
2696     if (getaddrinfo(host, nullptr, &hints, &result) < 0) {
2697         close(peerInfo->fileDescriptor);
2698         mdclog_write(MDCLOG_ERR, "getaddrinfo error for %s, Error = %s", host, strerror(errno));
2699 #ifdef __TRACING__
2700         lspan->Finish();
2701 #endif
2702         return -1;
2703     }
2704     memcpy(&servaddr, result->ai_addr, sizeof(struct sockaddr_in6));
2705     freeaddrinfo(result);
2706
2707     servaddr.sin6_port = htons(port);      /* daytime server */
2708     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2709         mdclog_write(MDCLOG_DEBUG, "Send Connect FD = %d host : %s port %d",
2710                      peerInfo->fileDescriptor,
2711                      host,
2712                      port);
2713     }
2714
2715     // Add to Epol
2716     if (addToEpoll(epoll_fd, peerInfo, (EPOLLOUT | EPOLLIN | EPOLLET), sctpMap, message.message.enodbName,
2717                    msg->mtype, &lspan) != 0) {
2718 #ifdef __TRACING__
2719         lspan->Finish();
2720 #endif
2721         return -1;
2722     }
2723
2724     char hostBuff[NI_MAXHOST];
2725     char portBuff[NI_MAXHOST];
2726
2727     if (getnameinfo((SA *) &servaddr, sizeof(servaddr),
2728                     hostBuff, sizeof(hostBuff),
2729                     portBuff, sizeof(portBuff),
2730                     (uint) (NI_NUMERICHOST) | (uint) (NI_NUMERICSERV)) != 0) {
2731         mdclog_write(MDCLOG_ERR, "getnameinfo() Error, %s  %s %d", strerror(errno), __func__, __LINE__);
2732 #ifdef __TRACING__
2733         lspan->Finish();
2734 #endif
2735         return -1;
2736     }
2737
2738     if (setSocketNoBlocking(peerInfo->fileDescriptor) != 0) {
2739         mdclog_write(MDCLOG_ERR, "setSocketNoBlocking failed to set new connection %s on sctpPort %s", hostBuff,
2740                      portBuff);
2741         close(peerInfo->fileDescriptor);
2742 #ifdef __TRACING__
2743         lspan->Finish();
2744 #endif
2745         return -1;
2746     }
2747
2748     memcpy(peerInfo->hostName, hostBuff, strlen(hostBuff));
2749     peerInfo->hostName[strlen(hostBuff)] = 0;
2750     memcpy(peerInfo->portNumber, portBuff, strlen(portBuff));
2751     peerInfo->portNumber[strlen(portBuff)] = 0;
2752
2753     // map by enoodb/gnb name
2754     sctpMap->setkey(message.message.enodbName, peerInfo);
2755     //map host and port to enodeb
2756     sctpMap->setkey(searchBuff, message.message.enodbName);
2757
2758     // save message for the return values
2759     char key[MAX_ENODB_NAME_SIZE * 2];
2760     snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName, msg->mtype);
2761     int xaction_len = strlen((const char *) msg->xaction);
2762     auto *xaction = (unsigned char *) calloc(1, xaction_len);
2763     memcpy(xaction, msg->xaction, xaction_len);
2764     sctpMap->setkey(key, xaction);
2765     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2766         mdclog_write(MDCLOG_DEBUG, "End building peerinfo: %s for CU %s", key, message.message.enodbName);
2767     }
2768
2769     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2770         mdclog_write(MDCLOG_DEBUG, "Send connect to FD %d, %s, %d",
2771                      peerInfo->fileDescriptor, __func__, __LINE__);
2772     }
2773     if (connect(peerInfo->fileDescriptor, (SA *) &servaddr, sizeof(servaddr)) < 0) {
2774         if (errno != EINPROGRESS) {
2775             mdclog_write(MDCLOG_ERR, "connect FD %d to host : %s port %d, %s",
2776                          peerInfo->fileDescriptor, host, port, strerror(errno));
2777             close(peerInfo->fileDescriptor);
2778 #ifdef __TRACING__
2779             lspan->Finish();
2780 #endif
2781             return -1;
2782         }
2783         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2784             mdclog_write(MDCLOG_DEBUG,
2785                          "Connect to FD %d returned with EINPROGRESS : %s",
2786                          peerInfo->fileDescriptor, strerror(errno));
2787         }
2788         // since message.message.asndata is pointing to the asndata in the rmr message payload we copy it like this
2789         memcpy(peerInfo->asnData, message.message.asndata, message.message.asnLength);
2790         peerInfo->asnLength = message.message.asnLength;
2791         peerInfo->mtype = msg->mtype;
2792 #ifdef __TRACING__
2793         lspan->Finish();
2794 #endif
2795         return 0;
2796     }
2797
2798     if (mdclog_level_get() >= MDCLOG_INFO) {
2799         mdclog_write(MDCLOG_INFO, "Connect to FD %d returned OK without EINPROGRESS", peerInfo->fileDescriptor);
2800     }
2801
2802     peerInfo->isConnected = true;
2803
2804     if (modifyToEpoll(epoll_fd, peerInfo, (EPOLLIN | EPOLLET), sctpMap, message.message.enodbName, msg->mtype,
2805                       &lspan) != 0) {
2806 #ifdef __TRACING__
2807         lspan->Finish();
2808 #endif
2809         return -1;
2810     }
2811
2812     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2813         mdclog_write(MDCLOG_DEBUG, "Connected to host : %s port %d", host, port);
2814     }
2815
2816     message.message.messageType = msg->mtype;
2817     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2818         mdclog_write(MDCLOG_DEBUG, "Send SCTP message to FD %d", peerInfo->fileDescriptor);
2819     }
2820     if (sendSctpMsg(peerInfo, message, sctpMap, &lspan) != 0) {
2821         mdclog_write(MDCLOG_ERR, "Error write to SCTP  %s %d", __func__, __LINE__);
2822 #ifdef __TRACING__
2823         lspan->Finish();
2824 #endif
2825         return -1;
2826     }
2827     memset(peerInfo->asnData, 0, message.message.asnLength);
2828     peerInfo->asnLength = 0;
2829     peerInfo->mtype = 0;
2830
2831     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2832         mdclog_write(MDCLOG_DEBUG, "Sent message to SCTP for %s", message.message.enodbName);
2833     }
2834 #ifdef __TRACING__
2835     lspan->Finish();
2836 #endif
2837     return 0;
2838 }
2839
2840 /**
2841  *
2842  * @param epoll_fd
2843  * @param peerInfo
2844  * @param events
2845  * @param sctpMap
2846  * @param enodbName
2847  * @param msgType
2848  * @param pSpan
2849  * @return
2850  */
2851 int addToEpoll(int epoll_fd,
2852                ConnectedCU_t *peerInfo,
2853                uint32_t events,
2854                Sctp_Map_t *sctpMap,
2855                char *enodbName,
2856                int msgType,
2857                otSpan *pSpan) {
2858 #ifdef __TRACING__
2859     auto lspan = opentracing::Tracer::Global()->StartSpan(
2860             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2861 #else
2862     otSpan lspan = 0;
2863 #endif
2864     // Add to Epol
2865     struct epoll_event event{};
2866     event.data.ptr = peerInfo;
2867     event.events = events;
2868     if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peerInfo->fileDescriptor, &event) < 0) {
2869         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2870             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here), %s, %s %d",
2871                          strerror(errno), __func__, __LINE__);
2872         }
2873         close(peerInfo->fileDescriptor);
2874         if (enodbName != nullptr) {
2875             cleanHashEntry(peerInfo, sctpMap, &lspan);
2876             char key[MAX_ENODB_NAME_SIZE * 2];
2877             snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2878             if (mdclog_level_get() >= MDCLOG_DEBUG) {
2879                 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2880             }
2881             auto tmp = sctpMap->find(key);
2882             if (tmp) {
2883                 free(tmp);
2884                 sctpMap->erase(key);
2885             }
2886         } else {
2887             peerInfo->enodbName[0] = 0;
2888         }
2889         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here)");
2890 #ifdef __TRACING__
2891         lspan->Finish();
2892 #endif
2893         return -1;
2894     }
2895 #ifdef __TRACING__
2896     lspan->Finish();
2897 #endif
2898     return 0;
2899 }
2900
2901 /**
2902  *
2903  * @param epoll_fd
2904  * @param peerInfo
2905  * @param events
2906  * @param sctpMap
2907  * @param enodbName
2908  * @param msgType
2909  * @param pSpan
2910  * @return
2911  */
2912 int modifyToEpoll(int epoll_fd,
2913                   ConnectedCU_t *peerInfo,
2914                   uint32_t events,
2915                   Sctp_Map_t *sctpMap,
2916                   char *enodbName,
2917                   int msgType,
2918                   otSpan *pSpan) {
2919 #ifdef __TRACING__
2920     auto lspan = opentracing::Tracer::Global()->StartSpan(
2921             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2922 #else
2923     otSpan lspan = 0;
2924 #endif
2925     // Add to Epol
2926     struct epoll_event event{};
2927     event.data.ptr = peerInfo;
2928     event.events = events;
2929     if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, peerInfo->fileDescriptor, &event) < 0) {
2930         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2931             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_MOD (may chack not to quit here), %s, %s %d",
2932                          strerror(errno), __func__, __LINE__);
2933         }
2934         close(peerInfo->fileDescriptor);
2935         cleanHashEntry(peerInfo, sctpMap, &lspan);
2936         char key[MAX_ENODB_NAME_SIZE * 2];
2937         snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2938         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2939             mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2940         }
2941         auto tmp = sctpMap->find(key);
2942         if (tmp) {
2943             free(tmp);
2944         }
2945         sctpMap->erase(key);
2946         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here)");
2947 #ifdef __TRACING__
2948         lspan->Finish();
2949 #endif
2950         return -1;
2951     }
2952 #ifdef __TRACING__
2953     lspan->Finish();
2954 #endif
2955     return 0;
2956 }
2957
2958
2959 int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, otSpan *pSpan) {
2960 #ifdef __TRACING__
2961     auto lspan = opentracing::Tracer::Global()->StartSpan(
2962             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2963 #else
2964 //    otSpan lspan = 0;
2965 #endif
2966     //serialize the span
2967 #ifdef __TRACING__
2968     std::unordered_map<std::string, std::string> data;
2969     RICCarrierWriter carrier(data);
2970     opentracing::Tracer::Global()->Inject((lspan.get())->context(), carrier);
2971     nlohmann::json j = data;
2972     std::string str = j.dump();
2973     static auto maxTraceLength = 0;
2974
2975     maxTraceLength = str.length() > maxTraceLength ? str.length() : maxTraceLength;
2976     // serialized context can be put to RMR message using function:
2977     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2978         mdclog_write(MDCLOG_DEBUG, "max trace length is %d trace data length = %ld data = %s", maxTraceLength,
2979                      str.length(), str.c_str());
2980     }
2981     rmr_set_trace(rmrMessageBuffer.sendMessage, (const unsigned char *) str.c_str(), str.length());
2982 #endif
2983     buildJsonMessage(message);
2984
2985     rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2986
2987     if (rmrMessageBuffer.sendMessage == nullptr) {
2988         rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2989         mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2990 #ifdef __TRACING__
2991         lspan->Finish();
2992 #endif
2993         return -1;
2994     }
2995
2996     if (rmrMessageBuffer.sendMessage->state != 0) {
2997         char meid[RMR_MAX_MEID]{};
2998         if (rmrMessageBuffer.sendMessage->state == RMR_ERR_RETRY) {
2999             usleep(5);
3000             rmrMessageBuffer.sendMessage->state = 0;
3001             mdclog_write(MDCLOG_INFO, "RETRY sending Message type %d to Xapp from %s",
3002                          rmrMessageBuffer.sendMessage->mtype,
3003                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
3004             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
3005             if (rmrMessageBuffer.sendMessage == nullptr) {
3006                 mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
3007                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
3008 #ifdef __TRACING__
3009                 lspan->Finish();
3010 #endif
3011                 return -1;
3012             } else if (rmrMessageBuffer.sendMessage->state != 0) {
3013                 mdclog_write(MDCLOG_ERR,
3014                              "Message state %s while sending request %d to Xapp from %s after retry of 10 microseconds",
3015                              translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
3016                              rmrMessageBuffer.sendMessage->mtype,
3017                              rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
3018                 auto rc = rmrMessageBuffer.sendMessage->state;
3019 #ifdef __TRACING__
3020                 lspan->Finish();
3021 #endif
3022                 return rc;
3023             }
3024         } else {
3025             mdclog_write(MDCLOG_ERR, "Message state %s while sending request %d to Xapp from %s",
3026                          translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
3027                          rmrMessageBuffer.sendMessage->mtype,
3028                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
3029 #ifdef __TRACING__
3030             lspan->Finish();
3031 #endif
3032             return rmrMessageBuffer.sendMessage->state;
3033         }
3034     }
3035     return 0;
3036 }
3037
3038 void buildJsonMessage(ReportingMessages_t &message) {
3039     if (jsonTrace) {
3040         message.outLen = sizeof(message.base64Data);
3041         base64::encode((const unsigned char *) message.message.asndata,
3042                        (const int) message.message.asnLength,
3043                        message.base64Data,
3044                        message.outLen);
3045         if (mdclog_level_get() >= MDCLOG_DEBUG) {
3046             mdclog_write(MDCLOG_DEBUG, "asn data length = %d, base64 message length = %d ",
3047                          (int) message.message.asnLength,
3048                          (int) message.outLen);
3049         }
3050
3051         snprintf(message.buffer, sizeof(message.buffer),
3052                                      "{\"header\": {\"ts\": \"%ld.%09ld\","
3053                                      "\"ranName\": \"%s\","
3054                                      "\"messageType\": %d,"
3055                                      "\"direction\": \"%c\"},"
3056                                      "\"base64Length\": %d,"
3057                                      "\"asnBase64\": \"%s\"}",
3058                                      message.message.time.tv_sec,
3059                                      message.message.time.tv_nsec,
3060                                      message.message.enodbName,
3061                                      message.message.messageType,
3062                                      message.message.direction,
3063                                      (int) message.outLen,
3064                                      message.base64Data);
3065         static src::logger_mt &lg = my_logger::get();
3066
3067         BOOST_LOG(lg) << message.buffer;
3068     }
3069 }
3070
3071
3072 /**
3073  * take RMR error code to string
3074  * @param state
3075  * @return
3076  */
3077 string translateRmrErrorMessages(int state) {
3078     string str = {};
3079     switch (state) {
3080         case RMR_OK:
3081             str = "RMR_OK - state is good";
3082             break;
3083         case RMR_ERR_BADARG:
3084             str = "RMR_ERR_BADARG - argument passd to function was unusable";
3085             break;
3086         case RMR_ERR_NOENDPT:
3087             str = "RMR_ERR_NOENDPT - send//call could not find an endpoint based on msg type";
3088             break;
3089         case RMR_ERR_EMPTY:
3090             str = "RMR_ERR_EMPTY - msg received had no payload; attempt to send an empty message";
3091             break;
3092         case RMR_ERR_NOHDR:
3093             str = "RMR_ERR_NOHDR - message didn't contain a valid header";
3094             break;
3095         case RMR_ERR_SENDFAILED:
3096             str = "RMR_ERR_SENDFAILED - send failed; errno has nano reason";
3097             break;
3098         case RMR_ERR_CALLFAILED:
3099             str = "RMR_ERR_CALLFAILED - unable to send call() message";
3100             break;
3101         case RMR_ERR_NOWHOPEN:
3102             str = "RMR_ERR_NOWHOPEN - no wormholes are open";
3103             break;
3104         case RMR_ERR_WHID:
3105             str = "RMR_ERR_WHID - wormhole id was invalid";
3106             break;
3107         case RMR_ERR_OVERFLOW:
3108             str = "RMR_ERR_OVERFLOW - operation would have busted through a buffer/field size";
3109             break;
3110         case RMR_ERR_RETRY:
3111             str = "RMR_ERR_RETRY - request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)";
3112             break;
3113         case RMR_ERR_RCVFAILED:
3114             str = "RMR_ERR_RCVFAILED - receive failed (hard error)";
3115             break;
3116         case RMR_ERR_TIMEOUT:
3117             str = "RMR_ERR_TIMEOUT - message processing call timed out";
3118             break;
3119         case RMR_ERR_UNSET:
3120             str = "RMR_ERR_UNSET - the message hasn't been populated with a transport buffer";
3121             break;
3122         case RMR_ERR_TRUNC:
3123             str = "RMR_ERR_TRUNC - received message likely truncated";
3124             break;
3125         case RMR_ERR_INITFAILED:
3126             str = "RMR_ERR_INITFAILED - initialisation of something (probably message) failed";
3127             break;
3128         case RMR_ERR_NOTSUPP:
3129             str = "RMR_ERR_NOTSUPP - the request is not supported, or RMr was not initialised for the request";
3130             break;
3131         default:
3132             char buf[128]{};
3133             snprintf(buf, sizeof buf, "UNDOCUMENTED RMR_ERR : %d", state);
3134             str = buf;
3135             break;
3136     }
3137     return str;
3138 }
3139
3140