Copy latest code to master
[ric-plt/e2.git] / RIC-E2-TERMINATION / sctpThread.cpp
1 // Copyright 2019 AT&T Intellectual Property
2 // Copyright 2019 Nokia
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //      http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15
16 //  This source code is part of the near-RT RIC (RAN Intelligent Controller)
17 //  platform project (RICP).
18
19 // TODO: High-level file comment.
20
21
22 #include "sctpThread.h"
23
24
25 using namespace std::placeholders;
26 using namespace boost::filesystem;
27
28 #ifdef __TRACING__
29 using namespace opentracing;
30 #endif
31 //#ifdef __cplusplus
32 //extern "C"
33 //{
34 //#endif
35
36 // need to expose without the include of gcov
37 extern "C" void __gcov_flush(void);
38
39 static void catch_function(int signal) {
40     __gcov_flush();
41     exit(signal);
42 }
43
44
45 BOOST_LOG_INLINE_GLOBAL_LOGGER_DEFAULT(my_logger, src::logger_mt)
46
47 boost::shared_ptr<sinks::synchronous_sink<sinks::text_file_backend>> boostLogger;
48 double cpuClock = 0.0;
49 bool jsonTrace = true;
50
51 void init_log() {
52     mdclog_attr_t *attr;
53     mdclog_attr_init(&attr);
54     mdclog_attr_set_ident(attr, "E2Terminator");
55     mdclog_init(attr);
56     mdclog_attr_destroy(attr);
57 }
58 auto start_time = std::chrono::high_resolution_clock::now();
59 typedef std::chrono::duration<double, std::ratio<1,1>> seconds_t;
60
61 double age() {
62     return seconds_t(std::chrono::high_resolution_clock::now() - start_time).count();
63 }
64
65 double approx_CPU_MHz(unsigned sleeptime) {
66     using namespace std::chrono_literals;
67     uint32_t aux = 0;
68     uint64_t cycles_start = rdtscp(aux);
69     double time_start = age();
70     std::this_thread::sleep_for(sleeptime * 1ms);
71     uint64_t elapsed_cycles = rdtscp(aux) - cycles_start;
72     double elapsed_time = age() - time_start;
73     return elapsed_cycles / elapsed_time;
74 }
75
76 //std::atomic<int64_t> rmrCounter{0};
77 std::atomic<int64_t> num_of_messages{0};
78 std::atomic<int64_t> num_of_XAPP_messages{0};
79 static long transactionCounter = 0;
80
81
82 int main(const int argc, char **argv) {
83     sctp_params_t sctpParams;
84
85
86
87 #ifdef __TRACING__
88     opentracing::Tracer::InitGlobal(tracelibcpp::createTracer("E2 Terminator"));
89     auto span = opentracing::Tracer::Global()->StartSpan(__FUNCTION__);
90 #else
91     otSpan span = 0;
92 #endif
93
94     {
95         std::random_device device{};
96         std::mt19937 generator(device());
97         std::uniform_int_distribution<long> distribution(1, (long) 1e12);
98         transactionCounter = distribution(generator);
99     }
100
101     uint64_t st = 0,en = 0;
102     uint32_t aux1 = 0;
103     uint32_t aux2 = 0;
104     st = rdtscp(aux1);
105
106     unsigned num_cpus = std::thread::hardware_concurrency();
107     init_log();
108     mdclog_level_set(MDCLOG_INFO);
109
110     if (std::signal(SIGINT, catch_function) == SIG_ERR) {
111         mdclog_write(MDCLOG_ERR, "Errir initializing SIGINT");
112         exit(1);
113     }
114     if (std::signal(SIGABRT, catch_function)== SIG_ERR) {
115         mdclog_write(MDCLOG_ERR, "Errir initializing SIGABRT");
116         exit(1);
117     }
118     if (std::signal(SIGTERM, catch_function)== SIG_ERR) {
119         mdclog_write(MDCLOG_ERR, "Errir initializing SIGTERM");
120         exit(1);
121     }
122
123
124     cpuClock = approx_CPU_MHz(100);
125
126     mdclog_write(MDCLOG_DEBUG, "CPU speed %11.11f", cpuClock);
127     auto result = parse(argc, argv, sctpParams);
128
129     path p = (sctpParams.configFilePath + "/" + sctpParams.configFileName).c_str();
130     if (exists(p)) {
131         const int size = 2048;
132         auto fileSize = file_size(p);
133         if (fileSize > size) {
134             mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size);
135             exit(-1);
136         }
137     } else {
138         mdclog_write(MDCLOG_ERR, "Configuration File %s not exists", p.string().c_str());
139         exit(-1);
140     }
141
142
143     ReadConfigFile conf;
144     if (conf.openConfigFile(p.string()) == -1) {
145         mdclog_write(MDCLOG_ERR, "Filed to open config file %s, %s",
146                      p.string().c_str(), strerror(errno));
147         exit(-1);
148     }
149     int rmrPort = conf.getIntValue("nano");
150     if (rmrPort == -1) {
151         mdclog_write(MDCLOG_ERR, "illigal RMR port ");
152         exit(-1);
153     }
154     sctpParams.rmrPort = (uint16_t)rmrPort;
155     snprintf(sctpParams.rmrAddress, sizeof(sctpParams.rmrAddress), "%d", (int) (sctpParams.rmrPort));
156
157     auto tmpStr = conf.getStringValue("loglevel");
158     if (tmpStr.length() == 0) {
159         mdclog_write(MDCLOG_ERR, "illigal loglevel. Set loglevel to MDCLOG_INFO");
160         tmpStr = "info";
161     }
162     transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
163
164     if ((tmpStr.compare("debug")) == 0) {
165         sctpParams.logLevel = MDCLOG_DEBUG;
166     } else if ((tmpStr.compare("info")) == 0) {
167         sctpParams.logLevel = MDCLOG_INFO;
168     } else if ((tmpStr.compare("warning")) == 0) {
169         sctpParams.logLevel = MDCLOG_WARN;
170     } else if ((tmpStr.compare("error")) == 0) {
171         sctpParams.logLevel = MDCLOG_ERR;
172     } else {
173         mdclog_write(MDCLOG_ERR, "illigal loglevel = %s. Set loglevel to MDCLOG_INFO", tmpStr.c_str());
174         sctpParams.logLevel = MDCLOG_INFO;
175     }
176     mdclog_level_set(sctpParams.logLevel);
177
178     tmpStr = conf.getStringValue("volume");
179     if (tmpStr.length() == 0) {
180         mdclog_write(MDCLOG_ERR, "illigal volume.");
181         exit(-1);
182     }
183
184     char tmpLogFilespec[VOLUME_URL_SIZE];
185     tmpLogFilespec[0] = 0;
186     sctpParams.volume[0] = 0;
187     snprintf(sctpParams.volume, VOLUME_URL_SIZE, "%s", tmpStr.c_str());
188     // copy the name to temp file as well
189     snprintf(tmpLogFilespec, VOLUME_URL_SIZE, "%s", tmpStr.c_str());
190
191
192     // define the file name in the tmp directory under the volume
193     strcat(tmpLogFilespec,"/tmp/E2Term_%Y-%m-%d_%H-%M-%S.%N.tmpStr");
194
195     sctpParams.myIP = conf.getStringValue("local-ip");
196     if (sctpParams.myIP.length() == 0) {
197         mdclog_write(MDCLOG_ERR, "illigal local-ip.");
198         exit(-1);
199     }
200
201     sctpParams.myIP = conf.getStringValue("external-fqdn");
202     if (sctpParams.myIP.length() == 0) {
203         mdclog_write(MDCLOG_ERR, "illigal external-fqdn.");
204         exit(-1);
205     }
206
207     tmpStr = conf.getStringValue("trace");
208     transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
209     if ((tmpStr.compare("start")) == 0) {
210         mdclog_write(MDCLOG_INFO, "Trace set to: start");
211         sctpParams.trace = true;
212     } else if ((tmpStr.compare("stop")) == 0) {
213         mdclog_write(MDCLOG_INFO, "Trace set to: stop");
214         sctpParams.trace = false;
215     }
216     jsonTrace = sctpParams.trace;
217
218     en = rdtscp(aux2);
219
220     mdclog_write(MDCLOG_INFO, "start = %lx end = %lx diff = %lx\n", st, en, en - st);
221     mdclog_write(MDCLOG_INFO, "start high = %lx start lo = %lx end high = %lx end lo = %lx\n",
222             st >> 32, st & 0xFFFFFFFF, (int64_t)en >> 32, en & 0xFFFFFFFF);
223     mdclog_write(MDCLOG_INFO, "ellapsed time = %5.9f\n", (double)(en - st)/cpuClock);
224
225     if (mdclog_level_get() >= MDCLOG_INFO) {
226         mdclog_mdc_add("RMR Port", to_string(sctpParams.rmrPort).c_str());
227         mdclog_mdc_add("LogLevel", to_string(sctpParams.logLevel).c_str());
228         mdclog_mdc_add("volume", sctpParams.volume);
229         mdclog_mdc_add("tmpLogFilespec", tmpLogFilespec);
230         mdclog_mdc_add("my ip", sctpParams.myIP.c_str());
231
232         mdclog_write(MDCLOG_INFO, "running parameters");
233     }
234     mdclog_mdc_clean();
235     sctpParams.ka_message_length = snprintf(sctpParams.ka_message, 4096, "{\"address\": \"%s:%d\","
236                                                                          "\"fqdn\": \"%s\"}",
237                                             (const char *)sctpParams.myIP.c_str(),
238                                             sctpParams.rmrPort,
239                                             sctpParams.fqdn.c_str());
240
241
242     // Files written to the current working directory
243     boostLogger = logging::add_file_log(
244             keywords::file_name = tmpLogFilespec, // to temp directory
245             keywords::rotation_size = 10 * 1024 * 1024,
246             keywords::time_based_rotation = sinks::file::rotation_at_time_interval(posix_time::hours(1)),
247             keywords::format = "%Message%"
248             //keywords::format = "[%TimeStamp%]: %Message%" // use each tmpStr with time stamp
249     );
250
251     // Setup a destination folder for collecting rotated (closed) files --since the same volumn can use rename()
252     boostLogger->locked_backend()->set_file_collector(sinks::file::make_collector(
253             keywords::target = sctpParams.volume
254     ));
255
256     // Upon restart, scan the directory for files matching the file_name pattern
257     boostLogger->locked_backend()->scan_for_files();
258
259     // Enable auto-flushing after each tmpStr record written
260     if (mdclog_level_get() >= MDCLOG_DEBUG) {
261         boostLogger->locked_backend()->auto_flush(true);
262     }
263
264     // start epoll
265     sctpParams.epoll_fd = epoll_create1(0);
266     if (sctpParams.epoll_fd == -1) {
267         mdclog_write(MDCLOG_ERR, "failed to open epoll descriptor");
268         exit(-1);
269     }
270
271     getRmrContext(sctpParams, &span);
272     if (sctpParams.rmrCtx == nullptr) {
273         close(sctpParams.epoll_fd);
274         exit(-1);
275     }
276
277     if (buildInotify(sctpParams) == -1) {
278         close(sctpParams.rmrListenFd);
279         rmr_close(sctpParams.rmrCtx);
280         close(sctpParams.epoll_fd);
281         exit(-1);
282      }
283
284     sctpParams.sctpMap = new mapWrapper();
285
286     std::vector<std::thread> threads(num_cpus);
287 //    std::vector<std::thread> threads;
288
289     num_cpus = 1;
290     for (unsigned int i = 0; i < num_cpus; i++) {
291         threads[i] = std::thread(listener, &sctpParams);
292
293         cpu_set_t cpuset;
294         CPU_ZERO(&cpuset);
295         CPU_SET(i, &cpuset);
296         int rc = pthread_setaffinity_np(threads[i].native_handle(), sizeof(cpu_set_t), &cpuset);
297         if (rc != 0) {
298             mdclog_write(MDCLOG_ERR, "Error calling pthread_setaffinity_np: %d", rc);
299         }
300     }
301
302     //loop over term_init until first message from xApp
303     handleTermInit(sctpParams);
304
305     for (auto &t : threads) {
306         t.join();
307     }
308
309 #ifdef __TRACING__
310     opentracing::Tracer::Global()->Close();
311 #endif
312     return 0;
313 }
314
315 void handleTermInit(sctp_params_t &sctpParams) {
316     sendTermInit(sctpParams);
317     //send to e2 manager init of e2 term
318     //E2_TERM_INIT
319
320     int count = 0;
321     while (true) {
322         auto xappMessages = num_of_XAPP_messages.load(std::memory_order_acquire);
323         if (xappMessages > 0) {
324             if (mdclog_level_get() >=  MDCLOG_INFO) {
325                 mdclog_write(MDCLOG_INFO, "Got a message from some appliction, stop sending E@_TERM_INIT");
326             }
327             return;
328         }
329         usleep(100000);
330         count++;
331         if (count % 1000 == 0) {
332             mdclog_write(MDCLOG_ERR, "GOT No messages from any xApp");
333             sendTermInit(sctpParams);
334         }
335     }
336 }
337
338 void sendTermInit(sctp_params_t &sctpParams) {
339     rmr_mbuf_t *msg = rmr_alloc_msg(sctpParams.rmrCtx, sctpParams.ka_message_length);
340     auto count = 0;
341     while (true) {
342         msg->mtype = E2_TERM_INIT;
343         msg->state = 0;
344         rmr_bytes2payload(msg, (unsigned char *)sctpParams.ka_message, sctpParams.ka_message_length);
345         static unsigned char tx[32];
346         auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
347         rmr_bytes2xact(msg, tx, txLen);
348         msg = rmr_send_msg(sctpParams.rmrCtx, msg);
349         if (msg == nullptr) {
350             msg = rmr_alloc_msg(sctpParams.rmrCtx, sctpParams.myIP.length());
351         } else if (msg->state == 0) {
352             rmr_free_msg(msg);
353             if (mdclog_level_get() >=  MDCLOG_INFO) {
354                 mdclog_write(MDCLOG_INFO, "E2_TERM_INIT succsesfuly sent ");
355             }
356             return;
357         } else {
358             if (count % 100 == 0) {
359                 mdclog_write(MDCLOG_ERR, "Error sending E2_TERM_INIT cause : %d ", msg->state);
360             }
361             sleep(1);
362         }
363         count++;
364     }
365
366 }
367
368 /**
369  *
370  * @param argc
371  * @param argv
372  * @param sctpParams
373  * @return
374  */
375 cxxopts::ParseResult parse(int argc, char *argv[], sctp_params_t &sctpParams) {
376     cxxopts::Options options(argv[0], "e2 term help");
377     options.positional_help("[optional args]").show_positional_help();
378     options.allow_unrecognised_options().add_options()
379             ("p,path", "config file path", cxxopts::value<std::string>(sctpParams.configFilePath)->default_value("config"))
380             ("f,file", "config file name", cxxopts::value<std::string>(sctpParams.configFileName)->default_value("config.conf"))
381             ("h,help", "Print help");
382
383     auto result = options.parse(argc, argv);
384
385     if (result.count("help")) {
386         std::cout << options.help({""}) << std::endl;
387         exit(0);
388     }
389     return result;
390 }
391
392 /**
393  *
394  * @param sctpParams
395  * @return -1 failed 0 success
396  */
397 int buildInotify(sctp_params_t &sctpParams) {
398     sctpParams.inotifyFD = inotify_init1(IN_NONBLOCK);
399     if (sctpParams.inotifyFD == -1) {
400         mdclog_write(MDCLOG_ERR, "Failed to init inotify (inotify_init1) %s", strerror(errno));
401         close(sctpParams.rmrListenFd);
402         rmr_close(sctpParams.rmrCtx);
403         close(sctpParams.epoll_fd);
404         return -1;
405     }
406
407     sctpParams.inotifyWD = inotify_add_watch(sctpParams.inotifyFD,
408                                               (const char *)sctpParams.configFilePath.c_str(),
409                                               IN_OPEN | IN_CLOSE);
410     if (sctpParams.inotifyWD == -1) {
411         mdclog_write(MDCLOG_ERR, "Failed to add directory : %s to  inotify (inotify_add_watch) %s",
412                 sctpParams.configFilePath.c_str(),
413                 strerror(errno));
414         close(sctpParams.inotifyFD);
415         return -1;
416     }
417
418     struct epoll_event event{};
419     event.events = (EPOLLIN);
420     event.data.fd = sctpParams.inotifyFD;
421     // add listening RMR FD to epoll
422     if (epoll_ctl(sctpParams.epoll_fd, EPOLL_CTL_ADD, sctpParams.inotifyFD, &event)) {
423         mdclog_write(MDCLOG_ERR, "Failed to add inotify FD to epoll");
424         close(sctpParams.inotifyFD);
425         return -1;
426     }
427     return 0;
428 }
429
430 /**
431  *
432  * @param args
433  * @return
434  */
435 void listener(sctp_params_t *params) {
436 #ifdef __TRACING__
437     auto span = opentracing::Tracer::Global()->StartSpan(__FUNCTION__);
438 #else
439     otSpan span = 0;
440 #endif
441     int num_of_SCTP_messages = 0;
442     auto totalTime = 0.0;
443     mdclog_mdc_clean();
444     mdclog_level_set(params->logLevel);
445
446     std::thread::id this_id = std::this_thread::get_id();
447     //save cout
448     streambuf *oldCout = cout.rdbuf();
449     ostringstream memCout;
450     // create new cout
451     cout.rdbuf(memCout.rdbuf());
452     cout << this_id;
453     //return to the normal cout
454     cout.rdbuf(oldCout);
455
456     char tid[32];
457     memcpy(tid, memCout.str().c_str(), memCout.str().length() < 32 ? memCout.str().length() : 31);
458     tid[memCout.str().length()] = 0;
459     mdclog_mdc_add("thread id", tid);
460
461     if (mdclog_level_get() >= MDCLOG_DEBUG) {
462         mdclog_write(MDCLOG_DEBUG, "started thread number %s", tid);
463     }
464
465     RmrMessagesBuffer_t rmrMessageBuffer{};
466     //create and init RMR
467     rmrMessageBuffer.rmrCtx = params->rmrCtx;
468
469     auto *events = (struct epoll_event *) calloc(MAXEVENTS, sizeof(struct epoll_event));
470     struct timespec end{0, 0};
471     struct timespec start{0, 0};
472
473     rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
474     rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
475
476     memcpy(rmrMessageBuffer.ka_message, params->ka_message, params->ka_message_length);
477     rmrMessageBuffer.ka_message_len = params->ka_message_length;
478     rmrMessageBuffer.ka_message[rmrMessageBuffer.ka_message_len] = 0;
479
480     if (mdclog_level_get() >= MDCLOG_DEBUG) {
481         mdclog_write(MDCLOG_DEBUG, "keep alive message is : %s", rmrMessageBuffer.ka_message);
482     }
483
484     ReportingMessages_t message {};
485
486     for (int i = 0; i < MAX_RMR_BUFF_ARRY; i++) {
487         rmrMessageBuffer.rcvBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
488         rmrMessageBuffer.sendBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
489     }
490
491     while (true) {
492         if (mdclog_level_get() >= MDCLOG_DEBUG) {
493             mdclog_write(MDCLOG_DEBUG, "Start EPOLL Wait");
494         }
495         auto numOfEvents = epoll_wait(params->epoll_fd, events, MAXEVENTS, -1);
496         if (numOfEvents < 0 && errno == EINTR) {
497             if (mdclog_level_get() >= MDCLOG_DEBUG) {
498                 mdclog_write(MDCLOG_DEBUG, "got EINTR : %s", strerror(errno));
499             }
500             continue;
501         }
502         if (numOfEvents < 0) {
503             mdclog_write(MDCLOG_ERR, "Epoll wait failed, errno = %s", strerror(errno));
504             return;
505         }
506         for (auto i = 0; i < numOfEvents; i++) {
507             if (mdclog_level_get() >= MDCLOG_DEBUG) {
508                 mdclog_write(MDCLOG_DEBUG, "handling epoll event %d out of %d", i + 1, numOfEvents);
509             }
510             clock_gettime(CLOCK_MONOTONIC, &message.message.time);
511             start.tv_sec = message.message.time.tv_sec;
512             start.tv_nsec = message.message.time.tv_nsec;
513
514
515             if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP)) {
516                 handlepoll_error(events[i], message, rmrMessageBuffer, params, &span);
517             } else if (events[i].events & EPOLLOUT) {
518                 handleEinprogressMessages(events[i], message, rmrMessageBuffer, params, &span);
519             } else if (params->rmrListenFd == events[i].data.fd) {
520                 // got message from XAPP
521                 num_of_XAPP_messages.fetch_add(1, std::memory_order_release);
522                 num_of_messages.fetch_add(1, std::memory_order_release);
523                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
524                     mdclog_write(MDCLOG_DEBUG, "new message from RMR");
525                 }
526                 if (receiveXappMessages(params->epoll_fd,
527                                         params->sctpMap,
528                                         rmrMessageBuffer,
529                                         message.message.time,
530                                         &span) != 0) {
531                     mdclog_write(MDCLOG_ERR, "Error handling Xapp message");
532                 }
533             } else if (params->inotifyFD == events[i].data.fd) {
534                 mdclog_write(MDCLOG_INFO, "Got event from inotify (configuration update)");
535                 handleConfigChange(params);
536             } else {
537                 /* We RMR_ERR_RETRY have data on the fd waiting to be read. Read and display it.
538                  * We must read whatever data is available completely, as we are running
539                  *  in edge-triggered mode and won't get a notification again for the same data. */
540                 num_of_messages.fetch_add(1, std::memory_order_release);
541                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
542                     mdclog_write(MDCLOG_DEBUG, "new message from SCTP, epoll flags are : %0x", events[i].events);
543                 }
544                 receiveDataFromSctp(&events[i],
545                                     params->sctpMap,
546                                     num_of_SCTP_messages,
547                                     rmrMessageBuffer,
548                                     message.message.time,
549                                     &span);
550             }
551
552             clock_gettime(CLOCK_MONOTONIC, &end);
553             if (mdclog_level_get() >= MDCLOG_INFO) {
554                 totalTime += ((end.tv_sec + 1.0e-9 * end.tv_nsec) -
555                               ((double) start.tv_sec + 1.0e-9 * start.tv_nsec));
556             }
557             if (mdclog_level_get() >= MDCLOG_DEBUG) {
558                 mdclog_write(MDCLOG_DEBUG, "message handling is %ld seconds %ld nanoseconds",
559                              end.tv_sec - start.tv_sec,
560                              end.tv_nsec - start.tv_nsec);
561             }
562         }
563     }
564 #ifdef __TRACING__
565     span->Finish();
566 #else
567
568 #endif
569 }
570
571 /**
572  *
573  * @param sctpParams
574  */
575 void handleConfigChange(sctp_params_t *sctpParams) {
576     char buf[4096] __attribute__ ((aligned(__alignof__(struct inotify_event))));
577     const struct inotify_event *event;
578     char *ptr;
579
580     path p = (sctpParams->configFilePath + "/" + sctpParams->configFileName).c_str();
581     auto endlessLoop = true;
582     while (endlessLoop) {
583         auto len = read(sctpParams->inotifyFD, buf, sizeof buf);
584         if (len == -1) {
585             if (errno != EAGAIN) {
586                 mdclog_write(MDCLOG_ERR, "read %s ", strerror(errno));
587                 endlessLoop = false;
588                 continue;
589             }
590             else {
591                 endlessLoop = false;
592                 continue;
593             }
594         }
595
596         for (ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) {
597             event = (const struct inotify_event *)ptr;
598             if (event->mask & (uint32_t)IN_ISDIR) {
599                 continue;
600             }
601
602             // the directory name
603             if (sctpParams->inotifyWD == event->wd) {
604                 // not the directory
605             }
606             if (event->len) {
607                 if (!(sctpParams->configFileName.compare(event->name))) {
608                     continue;
609                 }
610             }
611             // only the file we want
612             if (event->mask & (uint32_t)IN_CLOSE_WRITE) {
613                 if (exists(p)) {
614                     const int size = 2048;
615                     auto fileSize = file_size(p);
616                     if (fileSize > size) {
617                         mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size);
618                         return;
619                     }
620                 } else {
621                     mdclog_write(MDCLOG_ERR, "Configuration File %s not exists", p.string().c_str());
622                     return;
623                 }
624
625                 ReadConfigFile conf;
626                 if (conf.openConfigFile(p.string()) == -1) {
627                     mdclog_write(MDCLOG_ERR, "Filed to open config file %s, %s",
628                                  p.string().c_str(), strerror(errno));
629                     return;
630                 }
631
632                 auto tmpStr = conf.getStringValue("loglevel");
633                 if (tmpStr.length() == 0) {
634                     mdclog_write(MDCLOG_ERR, "illigal loglevel. Set loglevel to MDCLOG_INFO");
635                     tmpStr = "info";
636                 }
637                 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
638
639                 if ((tmpStr.compare("debug")) == 0) {
640                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_DEBUG");
641                     sctpParams->logLevel = MDCLOG_DEBUG;
642                 } else if ((tmpStr.compare("info")) == 0) {
643                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_INFO");
644                     sctpParams->logLevel = MDCLOG_INFO;
645                 } else if ((tmpStr.compare("warning")) == 0) {
646                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_WARN");
647                     sctpParams->logLevel = MDCLOG_WARN;
648                 } else if ((tmpStr.compare("error")) == 0) {
649                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_ERR");
650                     sctpParams->logLevel = MDCLOG_ERR;
651                 } else {
652                     mdclog_write(MDCLOG_ERR, "illigal loglevel = %s. Set loglevel to MDCLOG_INFO", tmpStr.c_str());
653                     sctpParams->logLevel = MDCLOG_INFO;
654                 }
655                 mdclog_level_set(sctpParams->logLevel);
656
657
658                 tmpStr = conf.getStringValue("trace");
659                 if (tmpStr.length() == 0) {
660                     mdclog_write(MDCLOG_ERR, "illigal trace. Set trace to stop");
661                     tmpStr = "stop";
662                 }
663
664                 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
665                 if ((tmpStr.compare("start")) == 0) {
666                     mdclog_write(MDCLOG_INFO, "Trace set to: start");
667                     sctpParams->trace = true;
668                 } else if ((tmpStr.compare("stop")) == 0) {
669                     mdclog_write(MDCLOG_INFO, "Trace set to: stop");
670                     sctpParams->trace = false;
671                 } else {
672                     mdclog_write(MDCLOG_ERR, "Trace was set to wrong value %s, set to stop", tmpStr.c_str());
673                     sctpParams->trace = false;
674                 }
675                 jsonTrace = sctpParams->trace;
676                 endlessLoop = false;
677             }
678         }
679     }
680 }
681
682 /**
683  *
684  * @param event
685  * @param message
686  * @param rmrMessageBuffer
687  * @param params
688  * @param pSpan
689  */
690 void handleEinprogressMessages(struct epoll_event &event,
691                                ReportingMessages_t &message,
692                                RmrMessagesBuffer_t &rmrMessageBuffer,
693                                sctp_params_t *params,
694                                otSpan *pSpan) {
695 #ifdef __TRACING__
696     auto lspan = opentracing::Tracer::Global()->StartSpan(
697             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
698 #else
699     otSpan lspan = 0;
700 #endif
701     auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
702     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
703
704     mdclog_write(MDCLOG_INFO, "file descriptor %d got EPOLLOUT", peerInfo->fileDescriptor);
705     auto retVal = 0;
706     socklen_t retValLen = 0;
707     auto rc = getsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_ERROR, &retVal, &retValLen);
708     if (rc != 0 || retVal != 0) {
709         if (rc != 0) {
710             rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
711                                                          "%s|Failed SCTP Connection, after EINPROGRESS the getsockopt%s",
712                                                          peerInfo->enodbName, strerror(errno));
713         } else if (retVal != 0) {
714             rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
715                                                          "%s|Failed SCTP Connection after EINPROGRESS, SO_ERROR",
716                                                          peerInfo->enodbName);
717         }
718
719         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
720         message.message.asnLength = rmrMessageBuffer.sendMessage->len;
721         mdclog_write(MDCLOG_ERR, "%s", rmrMessageBuffer.sendMessage->payload);
722         message.message.direction = 'N';
723         if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer, &lspan) != 0) {
724             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
725         }
726         memset(peerInfo->asnData, 0, peerInfo->asnLength);
727         peerInfo->asnLength = 0;
728         peerInfo->mtype = 0;
729 #ifdef __TRACING__
730         lspan->Finish();
731 #endif
732         return;
733     }
734
735     peerInfo->isConnected = true;
736
737     if (modifyToEpoll(params->epoll_fd, peerInfo, (EPOLLIN | EPOLLET), params->sctpMap, peerInfo->enodbName,
738                       peerInfo->mtype, &lspan) != 0) {
739         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_MOD");
740 #ifdef __TRACING__
741         lspan->Finish();
742 #endif
743         return;
744     }
745
746     message.message.asndata = (unsigned char *)peerInfo->asnData;
747     message.message.asnLength = peerInfo->asnLength;
748     message.message.messageType = peerInfo->mtype;
749     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
750     num_of_messages.fetch_add(1, std::memory_order_release);
751     if (mdclog_level_get() >= MDCLOG_DEBUG) {
752         mdclog_write(MDCLOG_DEBUG, "send the delayed SETUP/ENDC SETUP to sctp for %s",
753                      message.message.enodbName);
754     }
755     if (sendSctpMsg(peerInfo, message, params->sctpMap, &lspan) != 0) {
756         if (mdclog_level_get() >= MDCLOG_DEBUG) {
757             mdclog_write(MDCLOG_DEBUG, "Error write to SCTP  %s %d", __func__, __LINE__);
758         }
759 #ifdef __TRACING__
760         lspan->Finish();
761 #endif
762         return;
763     }
764
765     memset(peerInfo->asnData, 0, peerInfo->asnLength);
766     peerInfo->asnLength = 0;
767     peerInfo->mtype = 0;
768 #ifdef __TRACING__
769     lspan->Finish();
770 #endif
771 }
772
773
774 void handlepoll_error(struct epoll_event &event,
775                       ReportingMessages_t &message,
776                       RmrMessagesBuffer_t &rmrMessageBuffer,
777                       sctp_params_t *params,
778                       otSpan *pSpan) {
779 #ifdef __TRACING__
780     auto lspan = opentracing::Tracer::Global()->StartSpan(
781             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
782 #else
783     otSpan lspan = 0;
784 #endif
785     if (event.data.fd != params->rmrListenFd) {
786         auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
787         mdclog_write(MDCLOG_ERR, "epoll error, events %0x on fd %d, RAN NAME : %s",
788                      event.events, peerInfo->fileDescriptor, peerInfo->enodbName);
789
790         rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
791                                                      "%s|Failed SCTP Connection",
792                                                      peerInfo->enodbName);
793         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
794         message.message.asnLength = rmrMessageBuffer.sendMessage->len;
795
796         memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
797         message.message.direction = 'N';
798         if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer, &lspan) != 0) {
799             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
800         }
801
802         close(peerInfo->fileDescriptor);
803         cleanHashEntry((ConnectedCU_t *) event.data.ptr, params->sctpMap, &lspan);
804     } else {
805         mdclog_write(MDCLOG_ERR, "epoll error, events %0x on RMR FD", event.events);
806     }
807 #ifdef __TRACING__
808     lspan->Finish();
809 #endif
810
811 }
812 /**
813  *
814  * @param socket
815  * @return
816  */
817 int setSocketNoBlocking(int socket) {
818     auto flags = fcntl(socket, F_GETFL, 0);
819
820     if (flags == -1) {
821         mdclog_mdc_add("func", "fcntl");
822         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
823         mdclog_mdc_clean();
824         return -1;
825     }
826
827     flags = (unsigned) flags | (unsigned) O_NONBLOCK;
828     if (fcntl(socket, F_SETFL, flags) == -1) {
829         mdclog_mdc_add("func", "fcntl");
830         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
831         mdclog_mdc_clean();
832         return -1;
833     }
834
835     return 0;
836 }
837
838 /**
839  *
840  * @param val
841  * @param m
842  * @param pSpan
843  */
844 void cleanHashEntry(ConnectedCU_t *val, Sctp_Map_t *m, otSpan *pSpan) {
845 #ifdef __TRACING__
846     auto lspan = opentracing::Tracer::Global()->StartSpan(
847             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
848 #else
849 //    otSpan lspan = 0;
850 #endif
851     char *dummy;
852     auto port = (uint16_t) strtol(val->portNumber, &dummy, 10);
853     char searchBuff[256]{};
854
855     snprintf(searchBuff, sizeof searchBuff, "host:%s:%d", val->hostName, port);
856     m->erase(searchBuff);
857
858     m->erase(val->enodbName);
859     free(val);
860 #ifdef __TRACING__
861     lspan->Finish();
862 #endif
863 }
864
865 /**
866  *
867  * @param fd file discriptor
868  * @param data the asn data to send
869  * @param len  length of the data
870  * @param enodbName the enodbName as in the map for printing purpose
871  * @param m map host information
872  * @param mtype message number
873  * @return 0 success, anegative number on fail
874  */
875 int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_t *m, otSpan *pSpan) {
876 #ifdef __TRACING__
877     auto lspan = opentracing::Tracer::Global()->StartSpan(
878             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
879 #else
880     otSpan lspan = 0;
881 #endif
882     auto loglevel = mdclog_level_get();
883     int fd = peerInfo->fileDescriptor;
884     if (loglevel >= MDCLOG_DEBUG) {
885         mdclog_write(MDCLOG_DEBUG, "Send SCTP message for CU %s, %s",
886                      message.message.enodbName, __FUNCTION__);
887     }
888
889     while (true) {
890         //TODO add send to VES client or KAFKA
891         //format ts|mtype|direction(D/U)|length of asn data|raw data
892 //        auto length = sizeof message.message.time
893 //                      + sizeof message.message.enodbName
894 //                      + sizeof message.message.messageType
895 //                      + sizeof message.message.direction
896 //                      + sizeof message.message.asnLength
897 //                      + message.message.asnLength;
898
899         if (send(fd,message.message.asndata, message.message.asnLength,MSG_NOSIGNAL) < 0) {
900             if (errno == EINTR) {
901                 continue;
902             }
903             mdclog_write(MDCLOG_ERR, "error writing to CU a message, %s ", strerror(errno));
904             // Prevent double free() of peerInfo in the event of connection failure.
905             // Returning failure will trigger, in x2/endc setup flow, RIC_SCTP_CONNECTION_FAILURE rmr message causing the E2M to retry.
906             if (!peerInfo->isConnected){
907                 mdclog_write(MDCLOG_ERR, "connection to CU %s is still in progress.", message.message.enodbName);
908 #ifdef __TRACING__
909             lspan->Finish();
910 #endif
911                 return -1;
912             }
913             cleanHashEntry(peerInfo, m, &lspan);
914             close(fd);
915             char key[MAX_ENODB_NAME_SIZE * 2];
916             snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName,
917                      message.message.messageType);
918             if (loglevel >= MDCLOG_DEBUG) {
919                 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
920             }
921             auto tmp = m->find(key);
922             if (tmp) {
923                 free(tmp);
924             }
925             m->erase(key);
926 #ifdef __TRACING__
927             lspan->Finish();
928 #endif
929             return -1;
930         }
931         message.message.direction = 'D';
932         // send report.buffer of size
933         buildJsonMessage(message);
934
935         if (loglevel >= MDCLOG_DEBUG) {
936             mdclog_write(MDCLOG_DEBUG,
937                          "SCTP message for CU %s sent from %s",
938                          message.message.enodbName,
939                          __FUNCTION__);
940         }
941 #ifdef __TRACING__
942         lspan->Finish();
943 #endif
944
945         return 0;
946     }
947 }
948
949 /**
950  *
951  * @param message
952  * @param rmrMessageBuffer
953  * @param pSpan
954  */
955 void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer, otSpan *pSpan) {
956 #ifdef __TRACING__
957     auto lspan = opentracing::Tracer::Global()->StartSpan(
958             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
959 #else
960 //    otSpan lspan = 0;
961 #endif
962     rmr_get_meid(rmrMessageBuffer.rcvMessage, (unsigned char *)(message.message.enodbName));
963
964     message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
965     message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
966
967     if (mdclog_level_get() >= MDCLOG_DEBUG) {
968         mdclog_write(MDCLOG_DEBUG, "Message from Xapp RAN name = %s message length = %ld",
969                      message.message.enodbName, (unsigned long) message.message.asnLength);
970     }
971 #ifdef __TRACING__
972     lspan->Finish();
973 #endif
974
975 }
976
977
978 /**
979  *
980  * @param metaData all the data strip to structure
981  * @param data the data recived from xAPP
982  * @return 0 success all other values are fault
983  */
984 int getSetupRequestMetaData(ReportingMessages_t &message, char *data, char *host, uint16_t &port, otSpan *pSpan) {
985 #ifdef __TRACING__
986     auto lspan = opentracing::Tracer::Global()->StartSpan(
987             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
988 #else
989 //    otSpan lspan = 0;
990 #endif
991     auto loglevel = mdclog_level_get();
992
993     char delimiter[4] {};
994     memset(delimiter, 0, (size_t)4);
995     delimiter[0] = '|';
996     char *tmp;
997
998     char *val = strtok_r(data, delimiter, &tmp);
999     if (val != nullptr) {
1000         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1001             mdclog_write(MDCLOG_DEBUG, "SCTP ADDRESS parameter from message = %s", val);
1002         }
1003         memcpy(host, val, tmp - val );
1004     } else {
1005         mdclog_write(MDCLOG_ERR, "wrong Host Name for setup request %s", data);
1006 #ifdef __TRACING__
1007         lspan->Finish();
1008 #endif
1009         return -1;
1010     }
1011
1012     val = strtok_r(nullptr, delimiter, &tmp);
1013     if (val != nullptr) {
1014         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1015             mdclog_write(MDCLOG_DEBUG, "PORT parameter from message = %s", val);
1016         }
1017         char *dummy;
1018         port = (uint16_t)strtol(val, &dummy, 10);
1019     } else {
1020         mdclog_write(MDCLOG_ERR, "wrong Port for setup request %s", data);
1021 #ifdef __TRACING__
1022         lspan->Finish();
1023 #endif
1024         return -2;
1025     }
1026
1027     val = strtok_r(nullptr, delimiter, &tmp);
1028     if (val != nullptr) {
1029         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1030             mdclog_write(MDCLOG_DEBUG, "RAN NAME parameter from message = %s", val);
1031         }
1032         memcpy(message.message.enodbName, val, tmp - val);
1033     } else {
1034         mdclog_write(MDCLOG_ERR, "wrong gNb/Enodeb name for setup request %s", data);
1035 #ifdef __TRACING__
1036         lspan->Finish();
1037 #endif
1038
1039         return -3;
1040     }
1041     val = strtok_r(nullptr, delimiter, &tmp);
1042     if (val != nullptr) {
1043         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1044             mdclog_write(MDCLOG_DEBUG, "ASN length parameter from message = %s", val);
1045         }
1046         char *dummy;
1047         message.message.asnLength = (uint16_t) strtol(val, &dummy, 10);
1048     } else {
1049         mdclog_write(MDCLOG_ERR, "wrong ASN length for setup request %s", data);
1050 #ifdef __TRACING__
1051         lspan->Finish();
1052 #endif
1053         return -4;
1054     }
1055
1056     message.message.asndata = (unsigned char *)tmp;  // tmp is local but point to the location in data
1057
1058     if (loglevel >= MDCLOG_INFO) {
1059         mdclog_write(MDCLOG_INFO, "Message from Xapp RAN name = %s host address = %s port = %d",
1060                      message.message.enodbName, host, port);
1061     }
1062 #ifdef __TRACING__
1063     lspan->Finish();
1064 #endif
1065
1066     return 0;
1067 }
1068
1069 /**
1070  *
1071  * @param events
1072  * @param sctpMap
1073  * @param numOfMessages
1074  * @param rmrMessageBuffer
1075  * @param ts
1076  * @param pSpan
1077  * @return
1078  */
1079 int receiveDataFromSctp(struct epoll_event *events,
1080                         Sctp_Map_t *sctpMap,
1081                         int &numOfMessages,
1082                         RmrMessagesBuffer_t &rmrMessageBuffer,
1083                         struct timespec &ts,
1084                         otSpan *pSpan) {
1085 #ifdef __TRACING__
1086     auto lspan = opentracing::Tracer::Global()->StartSpan(
1087             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1088 #else
1089     otSpan lspan = 0;
1090 #endif
1091     /* We have data on the fd waiting to be read. Read and display it.
1092  * We must read whatever data is available completely, as we are running
1093  *  in edge-triggered mode and won't get a notification again for the same data. */
1094     int done = 0;
1095     auto loglevel = mdclog_level_get();
1096     // get the identity of the interface
1097     auto *peerInfo = (ConnectedCU_t *)events->data.ptr;
1098     struct timespec start{0, 0};
1099     struct timespec decodestart{0, 0};
1100     struct timespec end{0, 0};
1101
1102     E2AP_PDU_t *pdu = nullptr;
1103
1104     ReportingMessages_t message {};
1105
1106     while (true) {
1107         if (loglevel >= MDCLOG_DEBUG) {
1108             mdclog_write(MDCLOG_DEBUG, "Start Read from SCTP %d fd", peerInfo->fileDescriptor);
1109             clock_gettime(CLOCK_MONOTONIC, &start);
1110         }
1111         // read the buffer directly to rmr payload
1112         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1113         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1114                 read(peerInfo->fileDescriptor, rmrMessageBuffer.sendMessage->payload, RECEIVE_SCTP_BUFFER_SIZE);
1115         if (loglevel >= MDCLOG_DEBUG) {
1116             mdclog_write(MDCLOG_DEBUG, "Finish Read from SCTP %d fd message length = %ld",
1117                     peerInfo->fileDescriptor, message.message.asnLength);
1118         }
1119         memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
1120         message.message.direction = 'U';
1121         message.message.time.tv_nsec = ts.tv_nsec;
1122         message.message.time.tv_sec = ts.tv_sec;
1123
1124         if (message.message.asnLength < 0) {
1125             if (errno == EINTR) {
1126                 continue;
1127             }
1128             /* If errno == EAGAIN, that means we have read all
1129                data. So go back to the main loop. */
1130             if (errno != EAGAIN) {
1131                 mdclog_write(MDCLOG_ERR, "Read error, %s ", strerror(errno));
1132                 done = 1;
1133             } else if (loglevel >= MDCLOG_DEBUG) {
1134                 mdclog_write(MDCLOG_DEBUG, "EAGAIN - descriptor = %d", peerInfo->fileDescriptor);
1135             }
1136             break;
1137         } else if (message.message.asnLength == 0) {
1138             /* End of file. The remote has closed the connection. */
1139             if (loglevel >= MDCLOG_INFO) {
1140                 mdclog_write(MDCLOG_INFO, "END of File Closed connection - descriptor = %d",
1141                              peerInfo->fileDescriptor);
1142             }
1143             done = 1;
1144             break;
1145         }
1146
1147         asn_dec_rval_t rval;
1148         if (loglevel >= MDCLOG_DEBUG) {
1149             char printBuffer[4096]{};
1150             char *tmp = printBuffer;
1151             for (size_t i = 0; i < (size_t)message.message.asnLength; ++i) {
1152                 snprintf(tmp, 2, "%02x", message.message.asndata[i]);
1153                 tmp += 2;
1154             }
1155             printBuffer[message.message.asnLength] = 0;
1156             clock_gettime(CLOCK_MONOTONIC, &end);
1157             mdclog_write(MDCLOG_DEBUG, "Before Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
1158                          peerInfo->enodbName, end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
1159             mdclog_write(MDCLOG_DEBUG, "PDU buffer length = %ld, data =  : %s", message.message.asnLength,
1160                          printBuffer);
1161             clock_gettime(CLOCK_MONOTONIC, &decodestart);
1162         }
1163
1164         rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void **) &pdu,
1165                           message.message.asndata, message.message.asnLength);
1166         if (rval.code != RC_OK) {
1167             mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) E2AP PDU from RAN : %s", rval.code,
1168                          peerInfo->enodbName);
1169             break;
1170         }
1171
1172         if (loglevel >= MDCLOG_DEBUG) {
1173             clock_gettime(CLOCK_MONOTONIC, &end);
1174             mdclog_write(MDCLOG_DEBUG, "After Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
1175                          peerInfo->enodbName, end.tv_sec - decodestart.tv_sec, end.tv_nsec - decodestart.tv_nsec);
1176             char *printBuffer;
1177             size_t size;
1178             FILE *stream = open_memstream(&printBuffer, &size);
1179             asn_fprint(stream, &asn_DEF_E2AP_PDU, pdu);
1180             mdclog_write(MDCLOG_DEBUG, "Encoding E2AP PDU past : %s", printBuffer);
1181             clock_gettime(CLOCK_MONOTONIC, &decodestart);
1182         }
1183
1184         switch (pdu->present) {
1185             case E2AP_PDU_PR_initiatingMessage: {//initiating message
1186                 asnInitiatingRequest(pdu, message, rmrMessageBuffer, &lspan);
1187                 break;
1188             }
1189             case E2AP_PDU_PR_successfulOutcome: { //successful outcome
1190                 asnSuccsesfulMsg(pdu, message, sctpMap, rmrMessageBuffer, &lspan);
1191                 break;
1192             }
1193             case E2AP_PDU_PR_unsuccessfulOutcome: { //Unsuccessful Outcome
1194                 asnUnSuccsesfulMsg(pdu, message, sctpMap, rmrMessageBuffer, &lspan);
1195                 break;
1196             }
1197             default:
1198                 mdclog_write(MDCLOG_ERR, "Unknown index %d in E2AP PDU", pdu->present);
1199                 break;
1200         }
1201         if (loglevel >= MDCLOG_DEBUG) {
1202             clock_gettime(CLOCK_MONOTONIC, &end);
1203             mdclog_write(MDCLOG_DEBUG,
1204                          "After processing message and sent to rmr for : %s, Read time is : %ld seconds, %ld nanoseconds",
1205                          peerInfo->enodbName, end.tv_sec - decodestart.tv_sec, end.tv_nsec - decodestart.tv_nsec);
1206
1207         }
1208         numOfMessages++;
1209         // remove the break for EAGAIN
1210         //break;
1211         if (pdu != nullptr) {
1212             //TODO need to test ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu); to get better performance
1213             //ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu);
1214             ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
1215             pdu = nullptr;
1216         }
1217         //clock_gettime(CLOCK_MONOTONIC, &start);
1218     }
1219     // in case of break to avoid memory leak
1220     if (pdu != nullptr) {
1221         ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
1222         pdu = nullptr;
1223     }
1224
1225     if (done) {
1226         if (loglevel >= MDCLOG_INFO) {
1227             mdclog_write(MDCLOG_INFO, "Closed connection - descriptor = %d", peerInfo->fileDescriptor);
1228         }
1229         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1230                 snprintf((char *)rmrMessageBuffer.sendMessage->payload,
1231                         256,
1232                         "%s|CU disconnected unexpectedly",
1233                         peerInfo->enodbName);
1234         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1235
1236         if (sendRequestToXapp(message,
1237                               RIC_SCTP_CONNECTION_FAILURE,
1238                               rmrMessageBuffer,
1239                               &lspan) != 0) {
1240             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
1241         }
1242
1243         /* Closing descriptor make epoll remove it from the set of descriptors which are monitored. */
1244         close(peerInfo->fileDescriptor);
1245         cleanHashEntry((ConnectedCU_t *) events->data.ptr, sctpMap, &lspan);
1246     }
1247     if (loglevel >= MDCLOG_DEBUG) {
1248         clock_gettime(CLOCK_MONOTONIC, &end);
1249         mdclog_write(MDCLOG_DEBUG, "from receive SCTP to send RMR time is %ld seconds and %ld nanoseconds",
1250                      end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
1251
1252     }
1253 #ifdef __TRACING__
1254     lspan->Finish();
1255 #endif
1256
1257     return 0;
1258 }
1259
1260 /**
1261  *
1262  * @param pdu
1263  * @param message
1264  * @param rmrMessageBuffer
1265  * @param pSpan
1266  */
1267 void asnInitiatingRequest(E2AP_PDU_t *pdu,
1268                           ReportingMessages_t &message,
1269                           RmrMessagesBuffer_t &rmrMessageBuffer,
1270                           otSpan *pSpan) {
1271 #ifdef __TRACING__
1272     auto lspan = opentracing::Tracer::Global()->StartSpan(
1273             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1274 #else
1275     otSpan lspan = 0;
1276 #endif
1277
1278     auto procedureCode = ((InitiatingMessage_t *) pdu->choice.initiatingMessage)->procedureCode;
1279     if (mdclog_level_get() >= MDCLOG_INFO) {
1280         mdclog_write(MDCLOG_INFO, "Initiating message %ld", procedureCode);
1281     }
1282     switch (procedureCode) {
1283         case ProcedureCode_id_x2Setup: {
1284             if (mdclog_level_get() >= MDCLOG_INFO) {
1285                 mdclog_write(MDCLOG_INFO, "Got Setup Initiating  message from CU - %s",
1286                              message.message.enodbName);
1287             }
1288             break;
1289         }
1290         case ProcedureCode_id_endcX2Setup: {
1291             if (mdclog_level_get() >= MDCLOG_INFO) {
1292                 mdclog_write(MDCLOG_INFO, "Got X2 EN-DC Setup Request from CU - %s",
1293                              message.message.enodbName);
1294             }
1295             break;
1296         }
1297         case ProcedureCode_id_ricSubscription: {
1298             if (mdclog_level_get() >= MDCLOG_INFO) {
1299                 mdclog_write(MDCLOG_INFO, "Got RIC Subscription Request message from CU - %s",
1300                              message.message.enodbName);
1301             }
1302             break;
1303         }
1304         case ProcedureCode_id_ricSubscriptionDelete: {
1305             if (mdclog_level_get() >= MDCLOG_INFO) {
1306                 mdclog_write(MDCLOG_INFO, "Got RIC Subscription Delete Request message from CU - %s",
1307                              message.message.enodbName);
1308             }
1309             break;
1310         }
1311         case ProcedureCode_id_endcConfigurationUpdate: {
1312             if (sendRequestToXapp(message, RIC_ENDC_CONF_UPDATE, rmrMessageBuffer, &lspan) != 0) {
1313                 mdclog_write(MDCLOG_ERR, "E2 EN-DC CONFIGURATION UPDATE message failed to send to xAPP");
1314             }
1315             break;
1316         }
1317         case ProcedureCode_id_eNBConfigurationUpdate: {
1318             if (sendRequestToXapp(message, RIC_ENB_CONF_UPDATE, rmrMessageBuffer, &lspan) != 0) {
1319                 mdclog_write(MDCLOG_ERR, "E2 EN-BC CONFIGURATION UPDATE message failed to send to xAPP");
1320             }
1321             break;
1322         }
1323         case ProcedureCode_id_x2Removal: {
1324             if (mdclog_level_get() >= MDCLOG_INFO) {
1325                 mdclog_write(MDCLOG_INFO, "Got E2 Removal Initiating  message from CU - %s",
1326                              message.message.enodbName);
1327             }
1328             break;
1329         }
1330         case ProcedureCode_id_loadIndication: {
1331             if (sendRequestToXapp(message, RIC_ENB_LOAD_INFORMATION, rmrMessageBuffer, &lspan) != 0) {
1332                 mdclog_write(MDCLOG_ERR, "Load indication message failed to send to xAPP");
1333             }
1334             break;
1335         }
1336         case ProcedureCode_id_resourceStatusReportingInitiation: {
1337             if (mdclog_level_get() >= MDCLOG_INFO) {
1338                 mdclog_write(MDCLOG_INFO, "Got Status reporting initiation message from CU - %s",
1339                              message.message.enodbName);
1340             }
1341             break;
1342         }
1343         case ProcedureCode_id_resourceStatusReporting: {
1344             if (sendRequestToXapp(message, RIC_RESOURCE_STATUS_UPDATE, rmrMessageBuffer, &lspan) != 0) {
1345                 mdclog_write(MDCLOG_ERR, "Resource Status Reporting message failed to send to xAPP");
1346             }
1347             break;
1348         }
1349         case ProcedureCode_id_reset: {
1350             if (sendRequestToXapp(message, RIC_X2_RESET, rmrMessageBuffer, &lspan) != 0) {
1351                 mdclog_write(MDCLOG_ERR, "RIC_X2_RESET message failed to send to xAPP");
1352             }
1353             break;
1354         }
1355         case ProcedureCode_id_ricIndication: {
1356             for (int i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
1357                 auto messageSent = false;
1358                 RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
1359                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1360                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1361                 }
1362                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1363                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1364                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1365                     }
1366                     if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
1367                         static unsigned char tx[32];
1368                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
1369                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1370                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1371                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1372                                 (unsigned char *)message.message.enodbName,
1373                                 strlen(message.message.enodbName));
1374                         rmrMessageBuffer.sendMessage->state = 0;
1375                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1376                         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1377                             mdclog_write(MDCLOG_DEBUG, "RIC sub id = %d, message type = %d",
1378                                     rmrMessageBuffer.sendMessage->sub_id,
1379                                     rmrMessageBuffer.sendMessage->mtype);
1380                         }
1381                         sendRmrMessage(rmrMessageBuffer, message, &lspan);
1382                         messageSent = true;
1383                     } else {
1384                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1385                     }
1386                 }
1387                 if (messageSent) {
1388                     break;
1389                 }
1390             }
1391             break;
1392         }
1393         case ProcedureCode_id_errorIndication: {
1394             if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer, &lspan) != 0) {
1395                 mdclog_write(MDCLOG_ERR, "Error Indication message failed to send to xAPP");
1396             }
1397             break;
1398         }
1399         case ProcedureCode_id_ricServiceUpdate : {
1400             if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer, &lspan) != 0) {
1401                 mdclog_write(MDCLOG_ERR, "Service Update message failed to send to xAPP");
1402             }
1403             break;
1404         }
1405         case ProcedureCode_id_gNBStatusIndication : {
1406             if (sendRequestToXapp(message, RIC_GNB_STATUS_INDICATION, rmrMessageBuffer, &lspan) != 0) {
1407                 mdclog_write(MDCLOG_ERR, "RIC_GNB_STATUS_INDICATION failed to send to xAPP");
1408             }
1409             break;
1410         }
1411         default: {
1412             mdclog_write(MDCLOG_ERR, "Undefined or not supported message = %ld", procedureCode);
1413             message.message.messageType = 0; // no RMR message type yet
1414
1415             buildJsonMessage(message);
1416
1417             break;
1418         }
1419     }
1420 #ifdef __TRACING__
1421     lspan->Finish();
1422 #endif
1423
1424 }
1425
1426 /**
1427  *
1428  * @param pdu
1429  * @param message
1430  * @param sctpMap
1431  * @param rmrMessageBuffer
1432  * @param pSpan
1433  */
1434 void asnSuccsesfulMsg(E2AP_PDU_t *pdu, ReportingMessages_t &message, Sctp_Map_t *sctpMap,
1435                       RmrMessagesBuffer_t &rmrMessageBuffer, otSpan *pSpan) {
1436 #ifdef __TRACING__
1437     auto lspan = opentracing::Tracer::Global()->StartSpan(
1438             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1439 #else
1440     otSpan lspan = 0;
1441 #endif
1442     auto procedureCode = pdu->choice.successfulOutcome->procedureCode;
1443     if (mdclog_level_get() >= MDCLOG_INFO) {
1444         mdclog_write(MDCLOG_INFO, "Successful Outcome %ld", procedureCode);
1445     }
1446     switch (procedureCode) {
1447         case ProcedureCode_id_x2Setup: {
1448             if (mdclog_level_get() >= MDCLOG_INFO) {
1449                 mdclog_write(MDCLOG_INFO, "Got Succesful Setup response from CU - %s",
1450                              message.message.enodbName);
1451             }
1452             if (sendResponseToXapp(message, RIC_X2_SETUP_RESP,
1453                                    RIC_X2_SETUP_REQ, rmrMessageBuffer, sctpMap, &lspan) != 0) {
1454                 mdclog_write(MDCLOG_ERR, "Failed to send Succesful Setup response for CU - %s",
1455                              message.message.enodbName);
1456             }
1457             break;
1458         }
1459         case ProcedureCode_id_endcX2Setup: { //X2_EN_DC_SETUP_REQUEST_FROM_CU
1460             if (mdclog_level_get() >= MDCLOG_INFO) {
1461                 mdclog_write(MDCLOG_INFO, "Got Succesful E2 EN-DC Setup response from CU - %s",
1462                              message.message.enodbName);
1463             }
1464             if (sendResponseToXapp(message, RIC_ENDC_X2_SETUP_RESP,
1465                                    RIC_ENDC_X2_SETUP_REQ, rmrMessageBuffer, sctpMap, &lspan) != 0) {
1466                 mdclog_write(MDCLOG_ERR, "Failed to send Succesful X2 EN DC Setup response for CU - %s",
1467                              message.message.enodbName);
1468             }
1469             break;
1470         }
1471         case ProcedureCode_id_endcConfigurationUpdate: {
1472             if (mdclog_level_get() >= MDCLOG_INFO) {
1473                 mdclog_write(MDCLOG_INFO, "Got Succesful E2 EN-DC CONFIGURATION UPDATE from CU - %s",
1474                              message.message.enodbName);
1475             }
1476             if (sendRequestToXapp(message, RIC_ENDC_CONF_UPDATE_ACK, rmrMessageBuffer, &lspan) != 0) {
1477                 mdclog_write(MDCLOG_ERR, "Failed to send Succesful E2 EN DC CONFIGURATION response for CU - %s",
1478                              message.message.enodbName);
1479             }
1480             break;
1481         }
1482         case ProcedureCode_id_eNBConfigurationUpdate: {
1483             if (mdclog_level_get() >= MDCLOG_INFO) {
1484                 mdclog_write(MDCLOG_INFO, "Got Succesful E2 ENB CONFIGURATION UPDATE from CU - %s",
1485                              message.message.enodbName);
1486             }
1487             if (sendRequestToXapp(message, RIC_ENB_CONF_UPDATE_ACK, rmrMessageBuffer, &lspan) != 0) {
1488                 mdclog_write(MDCLOG_ERR, "Failed to send Succesful E2 ENB CONFIGURATION response for CU - %s",
1489                              message.message.enodbName);
1490             }
1491             break;
1492         }
1493         case ProcedureCode_id_reset: {
1494             if (sendRequestToXapp(message, RIC_X2_RESET_RESP, rmrMessageBuffer, &lspan) != 0) {
1495                 mdclog_write(MDCLOG_ERR, "Failed to send Succesful E2_RESET response for CU - %s",
1496                              message.message.enodbName);
1497             }
1498             break;
1499
1500         }
1501         case ProcedureCode_id_resourceStatusReportingInitiation: {
1502             if (sendRequestToXapp(message, RIC_RES_STATUS_RESP, rmrMessageBuffer, &lspan) != 0) {
1503                 mdclog_write(MDCLOG_ERR,
1504                              "Failed to send Succesful 2_REQUEST_STATUS_REPORTING_INITIATION response for CU - %s",
1505                              message.message.enodbName);
1506             }
1507             break;
1508         }
1509         case ProcedureCode_id_ricSubscription: {
1510             if (mdclog_level_get() >= MDCLOG_INFO) {
1511                 mdclog_write(MDCLOG_INFO, "Got Succesful RIC Subscription response from CU - %s",
1512                              message.message.enodbName);
1513             }
1514             if (sendRequestToXapp(message, RIC_SUB_RESP, rmrMessageBuffer, &lspan) != 0) {
1515                 mdclog_write(MDCLOG_ERR, "Subscription successful message failed to send to xAPP");
1516             }
1517             break;
1518
1519         }
1520         case ProcedureCode_id_ricSubscriptionDelete: {
1521             if (mdclog_level_get() >= MDCLOG_INFO) {
1522                 mdclog_write(MDCLOG_INFO,
1523                              "Got Succesful RIC Subscription Delete response from CU - %s",
1524                              message.message.enodbName);
1525             }
1526             if (sendRequestToXapp(message, RIC_SUB_DEL_RESP, rmrMessageBuffer, &lspan) != 0) {
1527                 mdclog_write(MDCLOG_ERR, "Subscription delete successful message failed to send to xAPP");
1528             }
1529             break;
1530         }
1531         case ProcedureCode_id_ricControl: {
1532             if (mdclog_level_get() >= MDCLOG_INFO) {
1533                 mdclog_write(MDCLOG_INFO,
1534                              "Got Succesful RIC control response from CU - %s",
1535                              message.message.enodbName);
1536             }
1537             for (int i = 0;
1538                  i < pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.count; i++) {
1539                 auto messageSent = false;
1540                 RICcontrolAcknowledge_IEs_t *ie = pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.array[i];
1541                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1542                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1543                 }
1544                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1545                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1546                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1547                     }
1548                     if (ie->value.present == RICcontrolAcknowledge_IEs__value_PR_RICrequestID) {
1549                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_ACK;
1550                         rmrMessageBuffer.sendMessage->state = 0;
1551                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1552                         static unsigned char tx[32];
1553                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1554                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1555                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1556                                 (unsigned char *)message.message.enodbName,
1557                                 strlen(message.message.enodbName));
1558
1559                         sendRmrMessage(rmrMessageBuffer, message, &lspan);
1560                         messageSent = true;
1561                     } else {
1562                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1563                     }
1564                 }
1565                 if (messageSent) {
1566                     break;
1567                 }
1568             }
1569             break;
1570         }
1571         default: {
1572             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1573             message.message.messageType = 0; // no RMR message type yet
1574             buildJsonMessage(message);
1575
1576             break;
1577         }
1578     }
1579 #ifdef __TRACING__
1580     lspan->Finish();
1581 #endif
1582
1583 }
1584
1585 /**
1586  *
1587  * @param pdu
1588  * @param message
1589  * @param sctpMap
1590  * @param rmrMessageBuffer
1591  * @param pSpan
1592  */
1593 void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
1594                         ReportingMessages_t &message,
1595                         Sctp_Map_t *sctpMap,
1596                         RmrMessagesBuffer_t &rmrMessageBuffer,
1597                         otSpan *pSpan) {
1598 #ifdef __TRACING__
1599     auto lspan = opentracing::Tracer::Global()->StartSpan(
1600             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1601 #else
1602     otSpan lspan = 0;
1603 #endif
1604     auto procedureCode = pdu->choice.unsuccessfulOutcome->procedureCode;
1605     if (mdclog_level_get() >= MDCLOG_INFO) {
1606         mdclog_write(MDCLOG_INFO, "Unsuccessful Outcome %ld", procedureCode);
1607     }
1608     switch (procedureCode) {
1609         case ProcedureCode_id_x2Setup: {
1610             if (mdclog_level_get() >= MDCLOG_INFO) {
1611                 mdclog_write(MDCLOG_INFO,
1612                              "Got Unsuccessful Setup response from CU - %s",
1613                              message.message.enodbName);
1614             }
1615             if (sendResponseToXapp(message,
1616                                    RIC_X2_SETUP_FAILURE, RIC_X2_SETUP_REQ,
1617                                    rmrMessageBuffer,
1618                                    sctpMap,
1619                                    &lspan) != 0) {
1620                 mdclog_write(MDCLOG_ERR,
1621                              "Failed to send Unsuccessful Setup response for CU - %s",
1622                              message.message.enodbName);
1623                 break;
1624             }
1625             break;
1626         }
1627         case ProcedureCode_id_endcX2Setup: {
1628             if (mdclog_level_get() >= MDCLOG_INFO) {
1629                 mdclog_write(MDCLOG_INFO,
1630                              "Got Unsuccessful E2 EN-DC Setup response from CU - %s",
1631                              message.message.enodbName);
1632             }
1633             if (sendResponseToXapp(message, RIC_ENDC_X2_SETUP_FAILURE,
1634                                    RIC_ENDC_X2_SETUP_REQ,
1635                                    rmrMessageBuffer,
1636                                    sctpMap,
1637                                    &lspan) != 0) {
1638                 mdclog_write(MDCLOG_ERR, "Failed to send Unsuccessful E2 EN DC Setup response for CU - %s",
1639                              message.message.enodbName);
1640             }
1641             break;
1642         }
1643         case ProcedureCode_id_endcConfigurationUpdate: {
1644             if (sendRequestToXapp(message, RIC_ENDC_CONF_UPDATE_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1645                 mdclog_write(MDCLOG_ERR, "Failed to send Unsuccessful E2 EN DC CONFIGURATION response for CU - %s",
1646                              message.message.enodbName);
1647             }
1648             break;
1649         }
1650         case ProcedureCode_id_eNBConfigurationUpdate: {
1651             if (sendRequestToXapp(message, RIC_ENB_CONF_UPDATE_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1652                 mdclog_write(MDCLOG_ERR, "Failed to send Unsuccessful E2 ENB CONFIGURATION response for CU - %s",
1653                              message.message.enodbName);
1654             }
1655             break;
1656         }
1657         case ProcedureCode_id_resourceStatusReportingInitiation: {
1658             if (sendRequestToXapp(message, RIC_RES_STATUS_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1659                 mdclog_write(MDCLOG_ERR,
1660                              "Failed to send Succesful E2_REQUEST_STATUS_REPORTING_INITIATION response for CU - %s",
1661                              message.message.enodbName);
1662             }
1663             break;
1664         }
1665         case ProcedureCode_id_ricSubscription: {
1666             if (mdclog_level_get() >= MDCLOG_INFO) {
1667                 mdclog_write(MDCLOG_INFO, "Got Unsuccessful RIC Subscription Response from CU - %s",
1668                              message.message.enodbName);
1669             }
1670             if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1671                 mdclog_write(MDCLOG_ERR, "Subscription unsuccessful message failed to send to xAPP");
1672             }
1673             break;
1674         }
1675         case ProcedureCode_id_ricSubscriptionDelete: {
1676             if (mdclog_level_get() >= MDCLOG_INFO) {
1677                 mdclog_write(MDCLOG_INFO, "Got Unsuccessful RIC Subscription Delete Response from CU - %s",
1678                              message.message.enodbName);
1679             }
1680             if (sendRequestToXapp(message, RIC_SUB_DEL_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1681                 mdclog_write(MDCLOG_ERR, "Subscription Delete unsuccessful message failed to send to xAPP");
1682             }
1683             break;
1684         }
1685         case ProcedureCode_id_ricControl: {
1686             if (mdclog_level_get() >= MDCLOG_INFO) {
1687                 mdclog_write(MDCLOG_INFO, "Got UNSuccesful RIC control response from CU - %s",
1688                              message.message.enodbName);
1689             }
1690             for (int i = 0;
1691                  i < pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.count; i++) {
1692                 auto messageSent = false;
1693                 RICcontrolFailure_IEs_t *ie = pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.array[i];
1694                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1695                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1696                 }
1697                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1698                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1699                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1700                     }
1701                     if (ie->value.present == RICcontrolFailure_IEs__value_PR_RICrequestID) {
1702                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_FAILURE;
1703                         rmrMessageBuffer.sendMessage->state = 0;
1704                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1705                         static unsigned char tx[32];
1706                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1707                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1708                         rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *)message.message.enodbName, strlen(message.message.enodbName));
1709                         sendRmrMessage(rmrMessageBuffer, message, &lspan);
1710                         messageSent = true;
1711                     } else {
1712                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1713                     }
1714                 }
1715                 if (messageSent) {
1716                     break;
1717                 }
1718             }
1719             break;
1720         }
1721         default: {
1722             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1723             message.message.messageType = 0; // no RMR message type yet
1724
1725             buildJsonMessage(message);
1726
1727             break;
1728         }
1729     }
1730 #ifdef __TRACING__
1731     lspan->Finish();
1732 #endif
1733
1734 }
1735
1736 /**
1737  *
1738  * @param message
1739  * @param requestId
1740  * @param rmrMmessageBuffer
1741  * @param pSpan
1742  * @return
1743  */
1744 int sendRequestToXapp(ReportingMessages_t &message,
1745                       int requestId,
1746                       RmrMessagesBuffer_t &rmrMmessageBuffer,
1747                       otSpan *pSpan) {
1748 #ifdef __TRACING__
1749     auto lspan = opentracing::Tracer::Global()->StartSpan(
1750             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1751 #else
1752     otSpan lspan = 0;
1753 #endif
1754     rmr_bytes2meid(rmrMmessageBuffer.sendMessage,
1755                    (unsigned char *)message.message.enodbName,
1756                    strlen(message.message.enodbName));
1757     message.message.messageType = rmrMmessageBuffer.sendMessage->mtype = requestId;
1758     rmrMmessageBuffer.sendMessage->state = 0;
1759     static unsigned char tx[32];
1760     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1761     rmr_bytes2xact(rmrMmessageBuffer.sendMessage, tx, strlen((const char *) tx));
1762
1763     auto rc = sendRmrMessage(rmrMmessageBuffer, message, &lspan);
1764 #ifdef __TRACING__
1765     lspan->Finish();
1766 #endif
1767
1768     return rc;
1769 }
1770
1771
1772 void getRmrContext(sctp_params_t &pSctpParams, otSpan *pSpan) {
1773 #ifdef __TRACING__
1774     auto lspan = opentracing::Tracer::Global()->StartSpan(
1775             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1776 #else
1777 //    otSpan lspan = 0;
1778 #endif
1779     pSctpParams.rmrCtx = nullptr;
1780     pSctpParams.rmrCtx = rmr_init(pSctpParams.rmrAddress, RMR_MAX_RCV_BYTES, RMRFL_NONE);
1781     if (pSctpParams.rmrCtx == nullptr) {
1782         mdclog_write(MDCLOG_ERR, "Failed to initialize RMR");
1783 #ifdef __TRACING__
1784         lspan->Finish();
1785 #endif
1786         return;
1787     }
1788
1789     rmr_set_stimeout(pSctpParams.rmrCtx, 0);    // disable retries for any send operation
1790     // we need to find that routing table exist and we can run
1791     if (mdclog_level_get() >= MDCLOG_INFO) {
1792         mdclog_write(MDCLOG_INFO, "We are after RMR INIT wait for RMR_Ready");
1793     }
1794     int rmrReady = 0;
1795     int count = 0;
1796     while (!rmrReady) {
1797         if ((rmrReady = rmr_ready(pSctpParams.rmrCtx)) == 0) {
1798             sleep(1);
1799         }
1800         count++;
1801         if (count % 60 == 0) {
1802             mdclog_write(MDCLOG_INFO, "waiting to RMR ready state for %d seconds", count);
1803         }
1804     }
1805     if (mdclog_level_get() >= MDCLOG_INFO) {
1806         mdclog_write(MDCLOG_INFO, "RMR running");
1807     }
1808 #ifdef __TRACING__
1809     lspan->Finish();
1810 #endif
1811     rmr_init_trace(pSctpParams.rmrCtx, 200);
1812     // get the RMR fd for the epoll
1813     pSctpParams.rmrListenFd = rmr_get_rcvfd(pSctpParams.rmrCtx);
1814     struct epoll_event event{};
1815     // add RMR fd to epoll
1816     event.events = (EPOLLIN);
1817     event.data.fd = pSctpParams.rmrListenFd;
1818     // add listening RMR FD to epoll
1819     if (epoll_ctl(pSctpParams.epoll_fd, EPOLL_CTL_ADD, pSctpParams.rmrListenFd, &event)) {
1820         mdclog_write(MDCLOG_ERR, "Failed to add RMR descriptor to epoll");
1821         close(pSctpParams.rmrListenFd);
1822         rmr_close(pSctpParams.rmrCtx);
1823         pSctpParams.rmrCtx = nullptr;
1824     }
1825 }
1826
1827 /**
1828  *
1829  * @param epoll_fd
1830  * @param sctpMap
1831  * @param rmrMessageBuffer
1832  * @param ts
1833  * @param pSpan
1834  * @return
1835  */
1836 int receiveXappMessages(int epoll_fd,
1837                         Sctp_Map_t *sctpMap,
1838                         RmrMessagesBuffer_t &rmrMessageBuffer,
1839                         struct timespec &ts,
1840                         otSpan *pSpan) {
1841 #ifdef __TRACING__
1842     auto lspan = opentracing::Tracer::Global()->StartSpan(
1843             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1844 #else
1845     otSpan lspan = 0;
1846 #endif
1847     if (rmrMessageBuffer.rcvMessage == nullptr) {
1848         //we have error
1849         mdclog_write(MDCLOG_ERR, "RMR Allocation message, %s", strerror(errno));
1850 #ifdef __TRACING__
1851         lspan->Finish();
1852 #endif
1853
1854         return -1;
1855     }
1856
1857     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1858         mdclog_write(MDCLOG_DEBUG, "Call to rmr_rcv_msg");
1859     }
1860     rmrMessageBuffer.rcvMessage = rmr_rcv_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.rcvMessage);
1861     if (rmrMessageBuffer.rcvMessage == nullptr) {
1862         mdclog_write(MDCLOG_ERR, "RMR Receving message with null pointer, Realloc rmr mesage buffer");
1863         rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
1864 #ifdef __TRACING__
1865         lspan->Finish();
1866 #endif
1867
1868         return -2;
1869     }
1870     ReportingMessages_t message;
1871     message.message.direction = 'D';
1872     message.message.time.tv_nsec = ts.tv_nsec;
1873     message.message.time.tv_sec = ts.tv_sec;
1874
1875     // get message payload
1876     //auto msgData = msg->payload;
1877     if (rmrMessageBuffer.rcvMessage->state != 0) {
1878         mdclog_write(MDCLOG_ERR, "RMR Receving message with stat = %d", rmrMessageBuffer.rcvMessage->state);
1879 #ifdef __TRACING__
1880         lspan->Finish();
1881 #endif
1882
1883         return -1;
1884     }
1885     switch (rmrMessageBuffer.rcvMessage->mtype) {
1886         case RIC_X2_SETUP_REQ: {
1887             if (connectToCUandSetUp(rmrMessageBuffer, message, epoll_fd, sctpMap, &lspan) != 0) {
1888                 mdclog_write(MDCLOG_ERR, "ERROR in connectToCUandSetUp on RIC_X2_SETUP_REQ");
1889                 message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_SCTP_CONNECTION_FAILURE;
1890                 message.message.direction = 'N';
1891                 message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1892                         snprintf((char *)rmrMessageBuffer.sendMessage->payload,
1893                                 256,
1894                                 "ERROR in connectToCUandSetUp on RIC_X2_SETUP_REQ");
1895                 rmrMessageBuffer.sendMessage->state = 0;
1896                 message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1897
1898                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1899                     mdclog_write(MDCLOG_DEBUG, "start writing to rmr buffer");
1900                 }
1901                 rmr_bytes2xact(rmrMessageBuffer.sendMessage, rmrMessageBuffer.rcvMessage->xaction, RMR_MAX_XID);
1902                 rmr_str2meid(rmrMessageBuffer.sendMessage, (unsigned char *)message.message.enodbName);
1903
1904                 sendRmrMessage(rmrMessageBuffer, message, &lspan);
1905 #ifdef __TRACING__
1906                 lspan->Finish();
1907 #endif
1908                 return -3;
1909             }
1910             break;
1911         }
1912         case RIC_ENDC_X2_SETUP_REQ: {
1913             if (connectToCUandSetUp(rmrMessageBuffer, message, epoll_fd, sctpMap, &lspan) != 0) {
1914                 mdclog_write(MDCLOG_ERR, "ERROR in connectToCUandSetUp on RIC_ENDC_X2_SETUP_REQ");
1915                 message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_SCTP_CONNECTION_FAILURE;
1916                 message.message.direction = 'N';
1917                 message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1918                         snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
1919                                  "ERROR in connectToCUandSetUp on RIC_ENDC_X2_SETUP_REQ");
1920                 rmrMessageBuffer.sendMessage->state = 0;
1921                 message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1922
1923                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1924                     mdclog_write(MDCLOG_DEBUG, "start writing to rmr buffer");
1925                 }
1926
1927                 rmr_bytes2xact(rmrMessageBuffer.sendMessage, rmrMessageBuffer.rcvMessage->xaction, RMR_MAX_XID);
1928                 rmr_str2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName);
1929
1930                 sendRmrMessage(rmrMessageBuffer, message, &lspan);
1931 #ifdef __TRACING__
1932                 lspan->Finish();
1933 #endif
1934                 return -3;
1935             }
1936             break;
1937         }
1938         case RIC_ENDC_CONF_UPDATE: {
1939             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1940                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE");
1941 #ifdef __TRACING__
1942                 lspan->Finish();
1943 #endif
1944                 return -4;
1945             }
1946             break;
1947         }
1948         case RIC_ENDC_CONF_UPDATE_ACK: {
1949             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1950                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE_ACK");
1951 #ifdef __TRACING__
1952                 lspan->Finish();
1953 #endif
1954                 return -4;
1955             }
1956             break;
1957         }
1958         case RIC_ENDC_CONF_UPDATE_FAILURE: {
1959             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1960                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE_FAILURE");
1961 #ifdef __TRACING__
1962                 lspan->Finish();
1963 #endif
1964
1965                 return -4;
1966             }
1967             break;
1968         }
1969         case RIC_ENB_CONF_UPDATE: {
1970             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1971                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE");
1972 #ifdef __TRACING__
1973                 lspan->Finish();
1974 #endif
1975                 return -4;
1976             }
1977             break;
1978         }
1979         case RIC_ENB_CONF_UPDATE_ACK: {
1980             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1981                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENB_CONF_UPDATE_ACK");
1982 #ifdef __TRACING__
1983                 lspan->Finish();
1984 #endif
1985                 return -4;
1986             }
1987             break;
1988         }
1989         case RIC_ENB_CONF_UPDATE_FAILURE: {
1990             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1991                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENB_CONF_UPDATE_FAILURE");
1992 #ifdef __TRACING__
1993                 lspan->Finish();
1994 #endif
1995                 return -4;
1996             }
1997             break;
1998         }
1999         case RIC_RES_STATUS_REQ: {
2000             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2001                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_RES_STATUS_REQ");
2002 #ifdef __TRACING__
2003                 lspan->Finish();
2004 #endif
2005                 return -6;
2006             }
2007             break;
2008         }
2009         case RIC_SUB_REQ: {
2010             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2011                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_REQ");
2012 #ifdef __TRACING__
2013                 lspan->Finish();
2014 #endif
2015                 return -6;
2016             }
2017             break;
2018         }
2019         case RIC_SUB_DEL_REQ: {
2020             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2021                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_DEL_REQ");
2022 #ifdef __TRACING__
2023                 lspan->Finish();
2024 #endif
2025                 return -6;
2026             }
2027             break;
2028         }
2029         case RIC_CONTROL_REQ: {
2030             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2031                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_CONTROL_REQ");
2032 #ifdef __TRACING__
2033                 lspan->Finish();
2034 #endif
2035                 return -6;
2036             }
2037             break;
2038         }
2039         case RIC_SERVICE_QUERY: {
2040             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2041                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_QUERY");
2042 #ifdef __TRACING__
2043                 lspan->Finish();
2044 #endif
2045                 return -6;
2046             }
2047             break;
2048         }
2049         case RIC_SERVICE_UPDATE_ACK: {
2050             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2051                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_ACK");
2052 #ifdef __TRACING__
2053                 lspan->Finish();
2054 #endif
2055                 return -6;
2056             }
2057             break;
2058         }
2059         case RIC_SERVICE_UPDATE_FAILURE: {
2060             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2061                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_FAILURE");
2062 #ifdef __TRACING__
2063                 lspan->Finish();
2064 #endif
2065                 return -6;
2066             }
2067             break;
2068         }
2069         case RIC_X2_RESET: {
2070             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2071                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_X2_RESET");
2072 #ifdef __TRACING__
2073                 lspan->Finish();
2074 #endif
2075                 return -6;
2076             }
2077             break;
2078         }
2079         case RIC_X2_RESET_RESP: {
2080             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2081                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_X2_RESET_RESP");
2082 #ifdef __TRACING__
2083                 lspan->Finish();
2084 #endif
2085                 return -6;
2086             }
2087             break;
2088         }
2089         case RIC_SCTP_CLEAR_ALL: {
2090             mdclog_write(MDCLOG_INFO, "RIC_SCTP_CLEAR_ALL");
2091             // loop on all keys and close socket and then erase all map.
2092             vector<char *> v;
2093             sctpMap->getKeys(v);
2094             for (auto const &iter : v) { //}; iter != sctpMap.end(); iter++) {
2095                 if (!boost::starts_with((string) (iter), "host:") && !boost::starts_with((string) (iter), "msg:")) {
2096                     auto *peerInfo = (ConnectedCU_t *) sctpMap->find(iter);
2097                     if (peerInfo == nullptr) {
2098                         continue;
2099                     }
2100                     close(peerInfo->fileDescriptor);
2101                     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
2102                     message.message.direction = 'D';
2103                     message.message.time.tv_nsec = ts.tv_nsec;
2104                     message.message.time.tv_sec = ts.tv_sec;
2105
2106                     message.message.asnLength = rmrMessageBuffer.sendMessage->len =
2107                             snprintf((char *)rmrMessageBuffer.sendMessage->payload,
2108                                                                    256,
2109                                                                    "%s|RIC_SCTP_CLEAR_ALL",
2110                                                                    peerInfo->enodbName);
2111                     message.message.asndata = rmrMessageBuffer.sendMessage->payload;
2112                     mdclog_write(MDCLOG_INFO, "%s", message.message.asndata);
2113                     if (sendRequestToXapp(message,
2114                                           RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer, &lspan) != 0) {
2115                         mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
2116                     }
2117                     free(peerInfo);
2118                 }
2119             }
2120
2121             sleep(1);
2122             sctpMap->clear();
2123             break;
2124         }
2125         case E2_TERM_KEEP_ALIVE_REQ: {
2126             // send message back
2127             if (mdclog_level_get() >= MDCLOG_INFO) {
2128                 mdclog_write(MDCLOG_INFO, "Got Keep Alive Request send : %s", rmrMessageBuffer.ka_message);
2129             }
2130             rmr_bytes2payload(rmrMessageBuffer.sendMessage,
2131                     (unsigned char *)rmrMessageBuffer.ka_message,
2132                     rmrMessageBuffer.ka_message_len);
2133             rmrMessageBuffer.sendMessage->mtype = E2_TERM_KEEP_ALIVE_RESP;
2134             rmrMessageBuffer.sendMessage->state = 0;
2135             static unsigned char tx[32];
2136             auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2137             rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, txLen);
2138             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2139             if (rmrMessageBuffer.sendMessage == nullptr) {
2140                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2141                 mdclog_write(MDCLOG_ERR, "Failed to send E2_TERM_KEEP_ALIVE_RESP");
2142             } else if (rmrMessageBuffer.sendMessage->state != 0)  {
2143                 mdclog_write(MDCLOG_ERR, "Failed to send E2_TERM_KEEP_ALIVE_RESP");
2144             }
2145             break;
2146         }
2147         default:
2148             mdclog_write(MDCLOG_WARN, "Message Type : %d is not seported", rmrMessageBuffer.rcvMessage->mtype);
2149             message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
2150             message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
2151             message.message.time.tv_nsec = ts.tv_nsec;
2152             message.message.time.tv_sec = ts.tv_sec;
2153             message.message.messageType = rmrMessageBuffer.rcvMessage->mtype;
2154
2155             buildJsonMessage(message);
2156
2157
2158 #ifdef __TRACING__
2159             lspan->Finish();
2160 #endif
2161             return -7;
2162     }
2163     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2164         mdclog_write(MDCLOG_DEBUG, "EXIT OK from %s", __FUNCTION__);
2165     }
2166 #ifdef __TRACING__
2167     lspan->Finish();
2168 #endif
2169     return 0;
2170 }
2171
2172 /**
2173  * Send message to the CU that is not expecting for successful or unsuccessful results
2174  * @param messageBuffer
2175  * @param message
2176  * @param failedMsgId
2177  * @param sctpMap
2178  * @param pSpan
2179  * @return
2180  */
2181 int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer,
2182                            ReportingMessages_t &message,
2183                            int failedMsgId,
2184                            Sctp_Map_t *sctpMap,
2185                            otSpan *pSpan) {
2186 #ifdef __TRACING__
2187     auto lspan = opentracing::Tracer::Global()->StartSpan(
2188             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2189 #else
2190     otSpan lspan = 0;
2191 #endif
2192
2193     getRequestMetaData(message, messageBuffer, &lspan);
2194     if (mdclog_level_get() >= MDCLOG_INFO) {
2195         mdclog_write(MDCLOG_INFO, "send message to %s address", message.message.enodbName);
2196     }
2197
2198     auto rc = sendMessagetoCu(sctpMap, messageBuffer, message, failedMsgId, &lspan);
2199 #ifdef __TRACING__
2200     lspan->Finish();
2201 #endif
2202
2203     return rc;
2204 }
2205
2206 /**
2207  *
2208  * @param sctpMap
2209  * @param messageBuffer
2210  * @param message
2211  * @param failedMesgId
2212  * @param pSpan
2213  * @return
2214  */
2215 int sendMessagetoCu(Sctp_Map_t *sctpMap,
2216                     RmrMessagesBuffer_t &messageBuffer,
2217                     ReportingMessages_t &message,
2218                     int failedMesgId,
2219                     otSpan *pSpan) {
2220 #ifdef __TRACING__
2221     auto lspan = opentracing::Tracer::Global()->StartSpan(
2222             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2223 #else
2224     otSpan lspan = 0;
2225 #endif
2226     auto *peerInfo = (ConnectedCU_t *) sctpMap->find(message.message.enodbName);
2227     if (peerInfo == nullptr) {
2228         if (failedMesgId != 0) {
2229             sendFailedSendingMessagetoXapp(messageBuffer, message, failedMesgId, &lspan);
2230         } else {
2231             mdclog_write(MDCLOG_ERR, "Failed to send message no CU entry %s", message.message.enodbName);
2232         }
2233 #ifdef __TRACING__
2234         lspan->Finish();
2235 #endif
2236
2237         return -1;
2238     }
2239
2240     // get the FD
2241     message.message.messageType = messageBuffer.rcvMessage->mtype;
2242     auto rc = sendSctpMsg(peerInfo, message, sctpMap, &lspan);
2243 #ifdef __TRACING__
2244     lspan->Finish();
2245 #endif
2246
2247     return rc;
2248 }
2249
2250 /**
2251  *
2252  * @param rmrCtx the rmr context to send and receive
2253  * @param msg the msg we got fromxApp
2254  * @param metaData data from xApp in ordered struct
2255  * @param failedMesgId the return message type error
2256  */
2257 void
2258 sendFailedSendingMessagetoXapp(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, int failedMesgId,
2259                                otSpan *pSpan) {
2260 #ifdef __TRACING__
2261     auto lspan = opentracing::Tracer::Global()->StartSpan(
2262             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2263 #else
2264     otSpan lspan = 0;
2265 #endif
2266     rmr_mbuf_t *msg = rmrMessageBuffer.sendMessage;
2267     msg->len = snprintf((char *) msg->payload, 200, "the gNb/eNode name %s not found",
2268                         message.message.enodbName);
2269     if (mdclog_level_get() >= MDCLOG_INFO) {
2270         mdclog_write(MDCLOG_INFO, "%s", msg->payload);
2271     }
2272     msg->mtype = failedMesgId;
2273     msg->state = 0;
2274
2275     static unsigned char tx[32];
2276     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2277     rmr_bytes2xact(msg, tx, strlen((const char *) tx));
2278
2279     sendRmrMessage(rmrMessageBuffer, message, &lspan);
2280 #ifdef __TRACING__
2281     lspan->Finish();pLogSink
2282 #endif
2283
2284 }
2285
2286 /**
2287  * Send Response back to xApp, message is used only when there was a request from the xApp
2288  *
2289  * @param enodbName the name of the gNb/eNodeB
2290  * @param msgType  the value of the message to the xApp
2291  * @param requestType The request that was sent by the xAPP
2292  * @param rmrCtx the rmr identifier
2293  * @param sctpMap hash map holds data on the requestrs
2294  * @param buf  the buffer to send to xAPP
2295  * @param size size of the buffer to send
2296  * @return
2297  */
2298 int sendResponseToXapp(ReportingMessages_t &message,
2299                        int msgType,
2300                        int requestType,
2301                        RmrMessagesBuffer_t &rmrMessageBuffer,
2302                        Sctp_Map_t *sctpMap,
2303                        otSpan *pSpan) {
2304 #ifdef __TRACING__
2305     auto lspan = opentracing::Tracer::Global()->StartSpan(
2306             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2307 #else
2308     otSpan lspan = 0;
2309 #endif
2310     char key[MAX_ENODB_NAME_SIZE * 2];
2311     snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName, requestType);
2312
2313     auto xact = sctpMap->find(key);
2314     if (xact == nullptr) {
2315         mdclog_write(MDCLOG_ERR, "NO Request %s found for this response from CU: %s", key,
2316                      message.message.enodbName);
2317 #ifdef __TRACING__
2318         lspan->Finish();
2319 #endif
2320
2321         return -1;
2322     }
2323     sctpMap->erase(key);
2324
2325     message.message.messageType = rmrMessageBuffer.sendMessage->mtype = msgType; //SETUP_RESPONSE_MESSAGE_TYPE;
2326     rmr_bytes2payload(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.asndata,
2327                       message.message.asnLength);
2328     rmr_bytes2xact(rmrMessageBuffer.sendMessage, (const unsigned char *)xact, strlen((const char *)xact));
2329     rmr_str2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName);
2330     rmrMessageBuffer.sendMessage->state = 0;
2331
2332     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2333         mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2334     }
2335     free(xact);
2336
2337     auto rc = sendRmrMessage(rmrMessageBuffer, message, &lspan);
2338 #ifdef __TRACING__
2339     lspan->Finish();
2340 #endif
2341     return rc;
2342 }
2343
2344 /**
2345  * build the SCTP connection to eNodB or gNb
2346  * @param rmrMessageBuffer
2347  * @param message
2348  * @param epoll_fd
2349  * @param sctpMap
2350  * @param pSpan
2351  * @return
2352  */
2353 int connectToCUandSetUp(RmrMessagesBuffer_t &rmrMessageBuffer,
2354                         ReportingMessages_t &message,
2355                         int epoll_fd,
2356                         Sctp_Map_t *sctpMap,
2357                         otSpan *pSpan) {
2358 #ifdef __TRACING__
2359     auto lspan = opentracing::Tracer::Global()->StartSpan(
2360             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2361 #else
2362     otSpan lspan = 0;
2363 #endif
2364     struct sockaddr_in6 servaddr{};
2365     struct addrinfo hints{}, *result;
2366     auto msgData = rmrMessageBuffer.rcvMessage->payload;
2367     unsigned char meid[RMR_MAX_MEID]{};
2368     char host[256]{};
2369     uint16_t port = 0;
2370
2371     message.message.messageType = rmrMessageBuffer.rcvMessage->mtype;
2372     rmr_mbuf_t *msg = rmrMessageBuffer.rcvMessage;
2373     rmr_get_meid(msg, meid);
2374
2375     if (mdclog_level_get() >= MDCLOG_INFO) {
2376         mdclog_write(MDCLOG_INFO, "message %d Received for MEID :%s. SETUP/EN-DC Setup Request from xApp, Message = %s",
2377                      msg->mtype, meid, msgData);
2378     }
2379     if (getSetupRequestMetaData(message, (char *)msgData, host, port, &lspan) < 0) {
2380         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2381             mdclog_write(MDCLOG_DEBUG, "Error in setup parameters %s, %d", __func__, __LINE__);
2382         }
2383 #ifdef __TRACING__
2384         lspan->Finish();
2385 #endif
2386         return -1;
2387     }
2388
2389     //// message asndata points to the start of the asndata of the message and not to start of payload
2390     // search if the same host:port but not the same enodbname
2391     char searchBuff[256]{};
2392     snprintf(searchBuff, sizeof searchBuff, "host:%s:%d", host, port);
2393     auto e = (char *)sctpMap->find(searchBuff);
2394     if (e != nullptr) {
2395         // found one compare if not the same
2396         if (strcmp(message.message.enodbName, e) != 0) {
2397             mdclog_write(MDCLOG_ERR,
2398                          "Try to connect CU %s to Host %s but %s already connected",
2399                          message.message.enodbName, host, e);
2400 #ifdef __TRACING__
2401             lspan->Finish();
2402 #endif
2403             return -1;
2404         }
2405     }
2406
2407     // check if not alread connected. if connected send the request and return
2408     auto *peerInfo = (ConnectedCU_t *)sctpMap->find(message.message.enodbName);
2409     if (peerInfo != nullptr) {
2410 //        snprintf(strErr,
2411 //                128,
2412 //                "Device %s already connected please remove and then setup again",
2413 //                message.message.enodbName);
2414         if (mdclog_level_get() >= MDCLOG_INFO) {
2415             mdclog_write(MDCLOG_INFO,
2416                          "Device already connected to %s",
2417                          message.message.enodbName);
2418         }
2419         message.message.messageType = msg->mtype;
2420         auto rc = sendSctpMsg(peerInfo, message, sctpMap, &lspan);
2421         if (rc != 0) {
2422             mdclog_write(MDCLOG_ERR, "failed write to SCTP %s, %d", __func__, __LINE__);
2423 #ifdef __TRACING__
2424             lspan->Finish();
2425 #endif
2426             return -1;
2427         }
2428
2429         char key[MAX_ENODB_NAME_SIZE * 2];
2430         snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName, msg->mtype);
2431         int xaction_len = strlen((const char *) msg->xaction);
2432         auto *xaction = (unsigned char *) calloc(1, xaction_len);
2433         memcpy(xaction, msg->xaction, xaction_len);
2434         sctpMap->setkey(key, xaction);
2435         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2436             mdclog_write(MDCLOG_DEBUG, "set key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2437         }
2438 #ifdef __TRACING__
2439         lspan->Finish();
2440 #endif
2441         return 0;
2442     }
2443
2444     peerInfo = (ConnectedCU_t *) calloc(1, sizeof(ConnectedCU_t));
2445     memcpy(peerInfo->enodbName, message.message.enodbName, sizeof(message.message.enodbName));
2446
2447     // new connection
2448     if ((peerInfo->fileDescriptor = socket(AF_INET6, SOCK_STREAM, IPPROTO_SCTP)) < 0) {
2449         mdclog_write(MDCLOG_ERR, "Socket Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2450 #ifdef __TRACING__
2451         lspan->Finish();
2452 #endif
2453         return -1;
2454     }
2455
2456     auto optval = 1;
2457     if (setsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof optval) != 0) {
2458         mdclog_write(MDCLOG_ERR, "setsockopt SO_REUSEPORT Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2459 #ifdef __TRACING__
2460         lspan->Finish();
2461 #endif
2462         return -1;
2463     }
2464     optval = 1;
2465     if (setsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval) != 0) {
2466         mdclog_write(MDCLOG_ERR, "setsockopt SO_REUSEADDR Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2467 #ifdef __TRACING__
2468         lspan->Finish();
2469 #endif
2470         return -1;
2471     }
2472     servaddr.sin6_family = AF_INET6;
2473
2474     struct sockaddr_in6 localAddr {};
2475     localAddr.sin6_family = AF_INET6;
2476     localAddr.sin6_addr = in6addr_any;
2477     localAddr.sin6_port = htons(SRC_PORT);
2478
2479     if (bind(peerInfo->fileDescriptor, (struct sockaddr*)&localAddr , sizeof(struct sockaddr_in6)) < 0) {
2480         mdclog_write(MDCLOG_ERR, "bind Socket Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2481 #ifdef __TRACING__
2482         lspan->Finish();
2483 #endif
2484         return -1;
2485     }//Ends the binding.
2486
2487     memset(&hints, 0, sizeof hints);
2488     hints.ai_flags = AI_NUMERICHOST;
2489     if (getaddrinfo(host, nullptr, &hints, &result) < 0) {
2490         close(peerInfo->fileDescriptor);
2491         mdclog_write(MDCLOG_ERR, "getaddrinfo error for %s, Error = %s", host, strerror(errno));
2492 #ifdef __TRACING__
2493         lspan->Finish();
2494 #endif
2495         return -1;
2496     }
2497     memcpy(&servaddr, result->ai_addr, sizeof(struct sockaddr_in6));
2498     freeaddrinfo(result);
2499
2500     servaddr.sin6_port = htons(port);      /* daytime server */
2501     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2502         mdclog_write(MDCLOG_DEBUG, "Send Connect FD = %d host : %s port %d",
2503                      peerInfo->fileDescriptor,
2504                      host,
2505                      port);
2506     }
2507
2508     // Add to Epol
2509     if (addToEpoll(epoll_fd, peerInfo, (EPOLLOUT | EPOLLIN | EPOLLET), sctpMap, message.message.enodbName,
2510                    msg->mtype, &lspan) != 0) {
2511 #ifdef __TRACING__
2512         lspan->Finish();
2513 #endif
2514         return -1;
2515     }
2516
2517     char hostBuff[NI_MAXHOST];
2518     char portBuff[NI_MAXHOST];
2519
2520     if (getnameinfo((SA *) &servaddr, sizeof(servaddr),
2521                     hostBuff, sizeof(hostBuff),
2522                     portBuff, sizeof(portBuff),
2523                     (uint) (NI_NUMERICHOST) | (uint) (NI_NUMERICSERV)) != 0) {
2524         mdclog_write(MDCLOG_ERR, "getnameinfo() Error, %s  %s %d", strerror(errno), __func__, __LINE__);
2525 #ifdef __TRACING__
2526         lspan->Finish();
2527 #endif
2528         return -1;
2529     }
2530
2531     if (setSocketNoBlocking(peerInfo->fileDescriptor) != 0) {
2532         mdclog_write(MDCLOG_ERR, "setSocketNoBlocking failed to set new connection %s on sctpPort %s", hostBuff,
2533                      portBuff);
2534         close(peerInfo->fileDescriptor);
2535 #ifdef __TRACING__
2536         lspan->Finish();
2537 #endif
2538         return -1;
2539     }
2540
2541     memcpy(peerInfo->hostName, hostBuff, strlen(hostBuff));
2542     peerInfo->hostName[strlen(hostBuff)] = 0;
2543     memcpy(peerInfo->portNumber, portBuff, strlen(portBuff));
2544     peerInfo->portNumber[strlen(portBuff)] = 0;
2545
2546     // map by enoodb/gnb name
2547     sctpMap->setkey(message.message.enodbName, peerInfo);
2548     //map host and port to enodeb
2549     sctpMap->setkey(searchBuff, message.message.enodbName);
2550
2551     // save message for the return values
2552     char key[MAX_ENODB_NAME_SIZE * 2];
2553     snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName, msg->mtype);
2554     int xaction_len = strlen((const char *) msg->xaction);
2555     auto *xaction = (unsigned char *) calloc(1, xaction_len);
2556     memcpy(xaction, msg->xaction, xaction_len);
2557     sctpMap->setkey(key, xaction);
2558     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2559         mdclog_write(MDCLOG_DEBUG, "End building peerinfo: %s for CU %s", key, message.message.enodbName);
2560     }
2561
2562     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2563         mdclog_write(MDCLOG_DEBUG, "Send connect to FD %d, %s, %d",
2564                      peerInfo->fileDescriptor, __func__, __LINE__);
2565     }
2566     if (connect(peerInfo->fileDescriptor, (SA *) &servaddr, sizeof(servaddr)) < 0) {
2567         if (errno != EINPROGRESS) {
2568             mdclog_write(MDCLOG_ERR, "connect FD %d to host : %s port %d, %s",
2569                          peerInfo->fileDescriptor, host, port, strerror(errno));
2570             close(peerInfo->fileDescriptor);
2571 #ifdef __TRACING__
2572             lspan->Finish();
2573 #endif
2574             return -1;
2575         }
2576         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2577             mdclog_write(MDCLOG_DEBUG,
2578                          "Connect to FD %d returned with EINPROGRESS : %s",
2579                          peerInfo->fileDescriptor, strerror(errno));
2580         }
2581         // since message.message.asndata is pointing to the asndata in the rmr message payload we copy it like this
2582         memcpy(peerInfo->asnData, message.message.asndata, message.message.asnLength);
2583         peerInfo->asnLength = message.message.asnLength;
2584         peerInfo->mtype = msg->mtype;
2585 #ifdef __TRACING__
2586         lspan->Finish();
2587 #endif
2588         return 0;
2589     }
2590
2591     if (mdclog_level_get() >= MDCLOG_INFO) {
2592         mdclog_write(MDCLOG_INFO, "Connect to FD %d returned OK without EINPROGRESS", peerInfo->fileDescriptor);
2593     }
2594
2595     peerInfo->isConnected = true;
2596
2597     if (modifyToEpoll(epoll_fd, peerInfo, (EPOLLIN | EPOLLET), sctpMap, message.message.enodbName, msg->mtype,
2598                       &lspan) != 0) {
2599 #ifdef __TRACING__
2600         lspan->Finish();
2601 #endif
2602         return -1;
2603     }
2604
2605     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2606         mdclog_write(MDCLOG_DEBUG, "Connected to host : %s port %d", host, port);
2607     }
2608
2609     message.message.messageType = msg->mtype;
2610     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2611         mdclog_write(MDCLOG_DEBUG, "Send SCTP message to FD %d", peerInfo->fileDescriptor);
2612     }
2613     if (sendSctpMsg(peerInfo, message, sctpMap, &lspan) != 0) {
2614         mdclog_write(MDCLOG_ERR, "Error write to SCTP  %s %d", __func__, __LINE__);
2615 #ifdef __TRACING__
2616         lspan->Finish();
2617 #endif
2618         return -1;
2619     }
2620     memset(peerInfo->asnData, 0, message.message.asnLength);
2621     peerInfo->asnLength = 0;
2622     peerInfo->mtype = 0;
2623
2624     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2625         mdclog_write(MDCLOG_DEBUG, "Sent message to SCTP for %s", message.message.enodbName);
2626     }
2627 #ifdef __TRACING__
2628     lspan->Finish();
2629 #endif
2630     return 0;
2631 }
2632
2633 /**
2634  *
2635  * @param epoll_fd
2636  * @param peerInfo
2637  * @param events
2638  * @param sctpMap
2639  * @param enodbName
2640  * @param msgType
2641  * @param pSpan
2642  * @return
2643  */
2644 int addToEpoll(int epoll_fd,
2645                ConnectedCU_t *peerInfo,
2646                uint32_t events,
2647                Sctp_Map_t *sctpMap,
2648                char *enodbName,
2649                int msgType,
2650                otSpan *pSpan) {
2651 #ifdef __TRACING__
2652     auto lspan = opentracing::Tracer::Global()->StartSpan(
2653             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2654 #else
2655     otSpan lspan = 0;
2656 #endif
2657     // Add to Epol
2658     struct epoll_event event{};
2659     event.data.ptr = peerInfo;
2660     event.events = events;
2661     if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peerInfo->fileDescriptor, &event) < 0) {
2662         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2663             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here), %s, %s %d",
2664                          strerror(errno), __func__, __LINE__);
2665         }
2666         close(peerInfo->fileDescriptor);
2667         cleanHashEntry(peerInfo, sctpMap, &lspan);
2668         char key[MAX_ENODB_NAME_SIZE * 2];
2669         snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2670         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2671             mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2672         }
2673         auto tmp = sctpMap->find(key);
2674         if (tmp) {
2675             free(tmp);
2676         }
2677         sctpMap->erase(key);
2678         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here)");
2679 #ifdef __TRACING__
2680         lspan->Finish();
2681 #endif
2682         return -1;
2683     }
2684 #ifdef __TRACING__
2685     lspan->Finish();
2686 #endif
2687     return 0;
2688 }
2689
2690 /**
2691  *
2692  * @param epoll_fd
2693  * @param peerInfo
2694  * @param events
2695  * @param sctpMap
2696  * @param enodbName
2697  * @param msgType
2698  * @param pSpan
2699  * @return
2700  */
2701 int modifyToEpoll(int epoll_fd,
2702                   ConnectedCU_t *peerInfo,
2703                   uint32_t events,
2704                   Sctp_Map_t *sctpMap,
2705                   char *enodbName,
2706                   int msgType,
2707                   otSpan *pSpan) {
2708 #ifdef __TRACING__
2709     auto lspan = opentracing::Tracer::Global()->StartSpan(
2710             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2711 #else
2712     otSpan lspan = 0;
2713 #endif
2714     // Add to Epol
2715     struct epoll_event event{};
2716     event.data.ptr = peerInfo;
2717     event.events = events;
2718     if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, peerInfo->fileDescriptor, &event) < 0) {
2719         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2720             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_MOD (may chack not to quit here), %s, %s %d",
2721                          strerror(errno), __func__, __LINE__);
2722         }
2723         close(peerInfo->fileDescriptor);
2724         cleanHashEntry(peerInfo, sctpMap, &lspan);
2725         char key[MAX_ENODB_NAME_SIZE * 2];
2726         snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2727         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2728             mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2729         }
2730         auto tmp = sctpMap->find(key);
2731         if (tmp) {
2732             free(tmp);
2733         }
2734         sctpMap->erase(key);
2735         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here)");
2736 #ifdef __TRACING__
2737         lspan->Finish();
2738 #endif
2739         return -1;
2740     }
2741 #ifdef __TRACING__
2742     lspan->Finish();
2743 #endif
2744     return 0;
2745 }
2746
2747
2748 int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, otSpan *pSpan) {
2749 #ifdef __TRACING__
2750     auto lspan = opentracing::Tracer::Global()->StartSpan(
2751             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2752 #else
2753 //    otSpan lspan = 0;
2754 #endif
2755     //serialize the span
2756 #ifdef __TRACING__
2757     std::unordered_map<std::string, std::string> data;
2758     RICCarrierWriter carrier(data);
2759     opentracing::Tracer::Global()->Inject((lspan.get())->context(), carrier);
2760     nlohmann::json j = data;
2761     std::string str = j.dump();
2762     static auto maxTraceLength = 0;
2763
2764     maxTraceLength = str.length() > maxTraceLength ? str.length() : maxTraceLength;
2765     // serialized context can be put to RMR message using function:
2766     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2767         mdclog_write(MDCLOG_DEBUG, "max trace length is %d trace data length = %ld data = %s", maxTraceLength,
2768                      str.length(), str.c_str());
2769     }
2770     rmr_set_trace(rmrMessageBuffer.sendMessage, (const unsigned char *) str.c_str(), str.length());
2771 #endif
2772     buildJsonMessage(message);
2773
2774     rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2775
2776     if (rmrMessageBuffer.sendMessage == nullptr) {
2777         rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2778         mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2779 #ifdef __TRACING__
2780         lspan->Finish();
2781 #endif
2782         return -1;
2783     }
2784
2785     if (rmrMessageBuffer.sendMessage->state != 0) {
2786         char meid[RMR_MAX_MEID]{};
2787         if (rmrMessageBuffer.sendMessage->state == RMR_ERR_RETRY) {
2788             usleep(5);
2789             rmrMessageBuffer.sendMessage->state = 0;
2790             mdclog_write(MDCLOG_INFO, "RETRY sending Message type %d to Xapp from %s",
2791                          rmrMessageBuffer.sendMessage->mtype,
2792                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2793             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2794             if (rmrMessageBuffer.sendMessage == nullptr) {
2795                 mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2796                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2797 #ifdef __TRACING__
2798                 lspan->Finish();
2799 #endif
2800                 return -1;
2801             } else if (rmrMessageBuffer.sendMessage->state != 0) {
2802                 mdclog_write(MDCLOG_ERR,
2803                              "Message state %s while sending request %d to Xapp from %s after retry of 10 microseconds",
2804                              translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2805                              rmrMessageBuffer.sendMessage->mtype,
2806                              rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2807                 auto rc = rmrMessageBuffer.sendMessage->state;
2808 #ifdef __TRACING__
2809                 lspan->Finish();
2810 #endif
2811                 return rc;
2812             }
2813         } else {
2814             mdclog_write(MDCLOG_ERR, "Message state %s while sending request %d to Xapp from %s",
2815                          translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2816                          rmrMessageBuffer.sendMessage->mtype,
2817                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2818 #ifdef __TRACING__
2819             lspan->Finish();
2820 #endif
2821             return rmrMessageBuffer.sendMessage->state;
2822         }
2823     }
2824     return 0;
2825 }
2826
2827 void buildJsonMessage(ReportingMessages_t &message) {
2828     if (jsonTrace) {
2829         message.outLen = sizeof(message.base64Data);
2830         base64::encode((const unsigned char *) message.message.asndata,
2831                        (const int) message.message.asnLength,
2832                        message.base64Data,
2833                        message.outLen);
2834         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2835             mdclog_write(MDCLOG_DEBUG, "asn data length = %d, base64 message length = %d ",
2836                          (int) message.message.asnLength,
2837                          (int) message.outLen);
2838         }
2839
2840         snprintf(message.buffer, sizeof(message.buffer),
2841                                      "{\"header\": {\"ts\": \"%ld.%09ld\","
2842                                      "\"ranName\": \"%s\","
2843                                      "\"messageType\": %d,"
2844                                      "\"direction\": \"%c\"},"
2845                                      "\"base64Length\": %d,"
2846                                      "\"asnBase64\": \"%s\"}",
2847                                      message.message.time.tv_sec,
2848                                      message.message.time.tv_nsec,
2849                                      message.message.enodbName,
2850                                      message.message.messageType,
2851                                      message.message.direction,
2852                                      (int) message.outLen,
2853                                      message.base64Data);
2854         static src::logger_mt &lg = my_logger::get();
2855
2856         BOOST_LOG(lg) << message.buffer;
2857     }
2858 }
2859
2860
2861 /**
2862  * take RMR error code to string
2863  * @param state
2864  * @return
2865  */
2866 string translateRmrErrorMessages(int state) {
2867     string str = {};
2868     switch (state) {
2869         case RMR_OK:
2870             str = "RMR_OK - state is good";
2871             break;
2872         case RMR_ERR_BADARG:
2873             str = "RMR_ERR_BADARG - argument passd to function was unusable";
2874             break;
2875         case RMR_ERR_NOENDPT:
2876             str = "RMR_ERR_NOENDPT - send//call could not find an endpoint based on msg type";
2877             break;
2878         case RMR_ERR_EMPTY:
2879             str = "RMR_ERR_EMPTY - msg received had no payload; attempt to send an empty message";
2880             break;
2881         case RMR_ERR_NOHDR:
2882             str = "RMR_ERR_NOHDR - message didn't contain a valid header";
2883             break;
2884         case RMR_ERR_SENDFAILED:
2885             str = "RMR_ERR_SENDFAILED - send failed; errno has nano reason";
2886             break;
2887         case RMR_ERR_CALLFAILED:
2888             str = "RMR_ERR_CALLFAILED - unable to send call() message";
2889             break;
2890         case RMR_ERR_NOWHOPEN:
2891             str = "RMR_ERR_NOWHOPEN - no wormholes are open";
2892             break;
2893         case RMR_ERR_WHID:
2894             str = "RMR_ERR_WHID - wormhole id was invalid";
2895             break;
2896         case RMR_ERR_OVERFLOW:
2897             str = "RMR_ERR_OVERFLOW - operation would have busted through a buffer/field size";
2898             break;
2899         case RMR_ERR_RETRY:
2900             str = "RMR_ERR_RETRY - request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)";
2901             break;
2902         case RMR_ERR_RCVFAILED:
2903             str = "RMR_ERR_RCVFAILED - receive failed (hard error)";
2904             break;
2905         case RMR_ERR_TIMEOUT:
2906             str = "RMR_ERR_TIMEOUT - message processing call timed out";
2907             break;
2908         case RMR_ERR_UNSET:
2909             str = "RMR_ERR_UNSET - the message hasn't been populated with a transport buffer";
2910             break;
2911         case RMR_ERR_TRUNC:
2912             str = "RMR_ERR_TRUNC - received message likely truncated";
2913             break;
2914         case RMR_ERR_INITFAILED:
2915             str = "RMR_ERR_INITFAILED - initialisation of something (probably message) failed";
2916             break;
2917         case RMR_ERR_NOTSUPP:
2918             str = "RMR_ERR_NOTSUPP - the request is not supported, or RMr was not initialised for the request";
2919             break;
2920         default:
2921             char buf[128]{};
2922             snprintf(buf, sizeof buf, "UNDOCUMENTED RMR_ERR : %d", state);
2923             str = buf;
2924             break;
2925     }
2926     return str;
2927 }
2928
2929