fix typo
[ric-plt/e2.git] / RIC-E2-TERMINATION / sctpThread.cpp
1 // Copyright 2019 AT&T Intellectual Property
2 // Copyright 2019 Nokia
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //      http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15
16 //  This source code is part of the near-RT RIC (RAN Intelligent Controller)
17 //  platform project (RICP).
18
19 // TODO: High-level file comment.
20
21
22 #include "sctpThread.h"
23
24
25 using namespace std::placeholders;
26 using namespace boost::filesystem;
27
28 #ifdef __TRACING__
29 using namespace opentracing;
30 #endif
31 //#ifdef __cplusplus
32 //extern "C"
33 //{
34 //#endif
35
36 // need to expose without the include of gcov
37 extern "C" void __gcov_flush(void);
38
39 static void catch_function(int signal) {
40     __gcov_flush();
41     exit(signal);
42 }
43
44
45 BOOST_LOG_INLINE_GLOBAL_LOGGER_DEFAULT(my_logger, src::logger_mt)
46
47 boost::shared_ptr<sinks::synchronous_sink<sinks::text_file_backend>> boostLogger;
48 double cpuClock = 0.0;
49 bool jsonTrace = true;
50
51 void init_log() {
52     mdclog_attr_t *attr;
53     mdclog_attr_init(&attr);
54     mdclog_attr_set_ident(attr, "E2Terminator");
55     mdclog_init(attr);
56     mdclog_attr_destroy(attr);
57 }
58 auto start_time = std::chrono::high_resolution_clock::now();
59 typedef std::chrono::duration<double, std::ratio<1,1>> seconds_t;
60
61 double age() {
62     return seconds_t(std::chrono::high_resolution_clock::now() - start_time).count();
63 }
64
65 double approx_CPU_MHz(unsigned sleeptime) {
66     using namespace std::chrono_literals;
67     uint32_t aux = 0;
68     uint64_t cycles_start = rdtscp(aux);
69     double time_start = age();
70     std::this_thread::sleep_for(sleeptime * 1ms);
71     uint64_t elapsed_cycles = rdtscp(aux) - cycles_start;
72     double elapsed_time = age() - time_start;
73     return elapsed_cycles / elapsed_time;
74 }
75
76 //std::atomic<int64_t> rmrCounter{0};
77 std::atomic<int64_t> num_of_messages{0};
78 std::atomic<int64_t> num_of_XAPP_messages{0};
79 static long transactionCounter = 0;
80
81
82 int main(const int argc, char **argv) {
83     sctp_params_t sctpParams;
84
85
86
87 #ifdef __TRACING__
88     opentracing::Tracer::InitGlobal(tracelibcpp::createTracer("E2 Terminator"));
89     auto span = opentracing::Tracer::Global()->StartSpan(__FUNCTION__);
90 #else
91     otSpan span = 0;
92 #endif
93
94     {
95         std::random_device device{};
96         std::mt19937 generator(device());
97         std::uniform_int_distribution<long> distribution(1, (long) 1e12);
98         transactionCounter = distribution(generator);
99     }
100
101     uint64_t st = 0,en = 0;
102     uint32_t aux1 = 0;
103     uint32_t aux2 = 0;
104     st = rdtscp(aux1);
105
106     unsigned num_cpus = std::thread::hardware_concurrency();
107     init_log();
108     mdclog_level_set(MDCLOG_INFO);
109
110     if (std::signal(SIGINT, catch_function) == SIG_ERR) {
111         mdclog_write(MDCLOG_ERR, "Errir initializing SIGINT");
112         exit(1);
113     }
114     if (std::signal(SIGABRT, catch_function)== SIG_ERR) {
115         mdclog_write(MDCLOG_ERR, "Errir initializing SIGABRT");
116         exit(1);
117     }
118     if (std::signal(SIGTERM, catch_function)== SIG_ERR) {
119         mdclog_write(MDCLOG_ERR, "Errir initializing SIGTERM");
120         exit(1);
121     }
122
123
124     cpuClock = approx_CPU_MHz(100);
125
126     mdclog_write(MDCLOG_DEBUG, "CPU speed %11.11f", cpuClock);
127     auto result = parse(argc, argv, sctpParams);
128
129     path p = (sctpParams.configFilePath + "/" + sctpParams.configFileName).c_str();
130     if (exists(p)) {
131         const int size = 2048;
132         auto fileSize = file_size(p);
133         if (fileSize > size) {
134             mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size);
135             exit(-1);
136         }
137     } else {
138         mdclog_write(MDCLOG_ERR, "Configuration File %s not exists", p.string().c_str());
139         exit(-1);
140     }
141
142
143     ReadConfigFile conf;
144     if (conf.openConfigFile(p.string()) == -1) {
145         mdclog_write(MDCLOG_ERR, "Filed to open config file %s, %s",
146                      p.string().c_str(), strerror(errno));
147         exit(-1);
148     }
149     int rmrPort = conf.getIntValue("nano");
150     if (rmrPort == -1) {
151         mdclog_write(MDCLOG_ERR, "illigal RMR port ");
152         exit(-1);
153     }
154     sctpParams.rmrPort = (uint16_t)rmrPort;
155     snprintf(sctpParams.rmrAddress, sizeof(sctpParams.rmrAddress), "%d", (int) (sctpParams.rmrPort));
156
157     auto tmpStr = conf.getStringValue("loglevel");
158     if (tmpStr.length() == 0) {
159         mdclog_write(MDCLOG_ERR, "illigal loglevel. Set loglevel to MDCLOG_INFO");
160         tmpStr = "info";
161     }
162     transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
163
164     if ((tmpStr.compare("debug")) == 0) {
165         sctpParams.logLevel = MDCLOG_DEBUG;
166     } else if ((tmpStr.compare("info")) == 0) {
167         sctpParams.logLevel = MDCLOG_INFO;
168     } else if ((tmpStr.compare("warning")) == 0) {
169         sctpParams.logLevel = MDCLOG_WARN;
170     } else if ((tmpStr.compare("error")) == 0) {
171         sctpParams.logLevel = MDCLOG_ERR;
172     } else {
173         mdclog_write(MDCLOG_ERR, "illigal loglevel = %s. Set loglevel to MDCLOG_INFO", tmpStr.c_str());
174         sctpParams.logLevel = MDCLOG_INFO;
175     }
176     mdclog_level_set(sctpParams.logLevel);
177
178     tmpStr = conf.getStringValue("volume");
179     if (tmpStr.length() == 0) {
180         mdclog_write(MDCLOG_ERR, "illigal volume.");
181         exit(-1);
182     }
183
184     char tmpLogFilespec[VOLUME_URL_SIZE];
185     tmpLogFilespec[0] = 0;
186     sctpParams.volume[0] = 0;
187     snprintf(sctpParams.volume, VOLUME_URL_SIZE, "%s", tmpStr.c_str());
188     // copy the name to temp file as well
189     snprintf(tmpLogFilespec, VOLUME_URL_SIZE, "%s", tmpStr.c_str());
190
191
192     // define the file name in the tmp directory under the volume
193     strcat(tmpLogFilespec,"/tmp/E2Term_%Y-%m-%d_%H-%M-%S.%N.tmpStr");
194
195 //    std::string localIP = conf.getStringValue("local-ip");
196 //    if (localIP.length() == 0) {
197 //        mdclog_write(MDCLOG_ERR, "illigal local-ip. environment variable");
198 //        exit(-1);
199 //    }
200
201     //sctpParams.myIP.assign(getenv(localIP.c_str()));
202     sctpParams.myIP = conf.getStringValue("local-ip");
203     if (sctpParams.myIP.length() == 0) {
204         mdclog_write(MDCLOG_ERR, "illigal local-ip.");
205         exit(-1);
206     }
207
208     sctpParams.myIP = conf.getStringValue("external-fqdn");
209     if (sctpParams.myIP.length() == 0) {
210         mdclog_write(MDCLOG_ERR, "illigal external-fqdn.");
211         exit(-1);
212     }
213
214     std::string pod = conf.getStringValue("pod_name");
215     if (pod.length() == 0) {
216         mdclog_write(MDCLOG_ERR, "illigal pod_name in config file");
217         exit(-1);
218     }
219     auto *podName = getenv(pod.c_str());
220     if (podName == nullptr) {
221         mdclog_write(MDCLOG_ERR, "illigal pod_name or environment varible not exists : %s", pod.c_str());
222         exit(-1);
223
224     } else {
225         sctpParams.podName.assign(podName);
226         if (sctpParams.podName.length() == 0) {
227             mdclog_write(MDCLOG_ERR, "illigal pod_name");
228             exit(-1);
229         }
230     }
231
232     tmpStr = conf.getStringValue("trace");
233     transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
234     if ((tmpStr.compare("start")) == 0) {
235         mdclog_write(MDCLOG_INFO, "Trace set to: start");
236         sctpParams.trace = true;
237     } else if ((tmpStr.compare("stop")) == 0) {
238         mdclog_write(MDCLOG_INFO, "Trace set to: stop");
239         sctpParams.trace = false;
240     }
241     jsonTrace = sctpParams.trace;
242
243     en = rdtscp(aux2);
244
245     mdclog_write(MDCLOG_INFO, "start = %lx end = %lx diff = %lx\n", st, en, en - st);
246     mdclog_write(MDCLOG_INFO, "start high = %lx start lo = %lx end high = %lx end lo = %lx\n",
247             st >> 32, st & 0xFFFFFFFF, (int64_t)en >> 32, en & 0xFFFFFFFF);
248     mdclog_write(MDCLOG_INFO, "ellapsed time = %5.9f\n", (double)(en - st)/cpuClock);
249
250     sctpParams.ka_message_length = snprintf(sctpParams.ka_message, 4096, "{\"address\": \"%s:%d\","
251                                                                          "\"fqdn\": \"%s\","
252                                                                          "\"pod_name\": \"%s\"}",
253                                             (const char *)sctpParams.myIP.c_str(),
254                                             sctpParams.rmrPort,
255                                             sctpParams.fqdn.c_str(),
256                                             sctpParams.podName.c_str());
257
258     if (mdclog_level_get() >= MDCLOG_INFO) {
259         mdclog_mdc_add("RMR Port", to_string(sctpParams.rmrPort).c_str());
260         mdclog_mdc_add("LogLevel", to_string(sctpParams.logLevel).c_str());
261         mdclog_mdc_add("volume", sctpParams.volume);
262         mdclog_mdc_add("tmpLogFilespec", tmpLogFilespec);
263         mdclog_mdc_add("my ip", sctpParams.myIP.c_str());
264         mdclog_mdc_add("pod name", sctpParams.podName.c_str());
265
266         mdclog_write(MDCLOG_INFO, "running parameters for instance : %s", sctpParams.ka_message);
267     }
268     mdclog_mdc_clean();
269
270     // Files written to the current working directory
271     boostLogger = logging::add_file_log(
272             keywords::file_name = tmpLogFilespec, // to temp directory
273             keywords::rotation_size = 10 * 1024 * 1024,
274             keywords::time_based_rotation = sinks::file::rotation_at_time_interval(posix_time::hours(1)),
275             keywords::format = "%Message%"
276             //keywords::format = "[%TimeStamp%]: %Message%" // use each tmpStr with time stamp
277     );
278
279     // Setup a destination folder for collecting rotated (closed) files --since the same volumn can use rename()
280     boostLogger->locked_backend()->set_file_collector(sinks::file::make_collector(
281             keywords::target = sctpParams.volume
282     ));
283
284     // Upon restart, scan the directory for files matching the file_name pattern
285     boostLogger->locked_backend()->scan_for_files();
286
287     // Enable auto-flushing after each tmpStr record written
288     if (mdclog_level_get() >= MDCLOG_DEBUG) {
289         boostLogger->locked_backend()->auto_flush(true);
290     }
291
292     // start epoll
293     sctpParams.epoll_fd = epoll_create1(0);
294     if (sctpParams.epoll_fd == -1) {
295         mdclog_write(MDCLOG_ERR, "failed to open epoll descriptor");
296         exit(-1);
297     }
298
299     getRmrContext(sctpParams, &span);
300     if (sctpParams.rmrCtx == nullptr) {
301         close(sctpParams.epoll_fd);
302         exit(-1);
303     }
304
305     if (buildInotify(sctpParams) == -1) {
306         close(sctpParams.rmrListenFd);
307         rmr_close(sctpParams.rmrCtx);
308         close(sctpParams.epoll_fd);
309         exit(-1);
310      }
311
312     sctpParams.sctpMap = new mapWrapper();
313
314     std::vector<std::thread> threads(num_cpus);
315 //    std::vector<std::thread> threads;
316
317     num_cpus = 1;
318     for (unsigned int i = 0; i < num_cpus; i++) {
319         threads[i] = std::thread(listener, &sctpParams);
320
321         cpu_set_t cpuset;
322         CPU_ZERO(&cpuset);
323         CPU_SET(i, &cpuset);
324         int rc = pthread_setaffinity_np(threads[i].native_handle(), sizeof(cpu_set_t), &cpuset);
325         if (rc != 0) {
326             mdclog_write(MDCLOG_ERR, "Error calling pthread_setaffinity_np: %d", rc);
327         }
328     }
329
330     //loop over term_init until first message from xApp
331     handleTermInit(sctpParams);
332
333     for (auto &t : threads) {
334         t.join();
335     }
336
337 #ifdef __TRACING__
338     opentracing::Tracer::Global()->Close();
339 #endif
340     return 0;
341 }
342
343 void handleTermInit(sctp_params_t &sctpParams) {
344     sendTermInit(sctpParams);
345     //send to e2 manager init of e2 term
346     //E2_TERM_INIT
347
348     int count = 0;
349     while (true) {
350         auto xappMessages = num_of_XAPP_messages.load(std::memory_order_acquire);
351         if (xappMessages > 0) {
352             if (mdclog_level_get() >=  MDCLOG_INFO) {
353                 mdclog_write(MDCLOG_INFO, "Got a message from some appliction, stop sending E@_TERM_INIT");
354             }
355             return;
356         }
357         usleep(100000);
358         count++;
359         if (count % 1000 == 0) {
360             mdclog_write(MDCLOG_ERR, "GOT No messages from any xApp");
361             sendTermInit(sctpParams);
362         }
363     }
364 }
365
366 void sendTermInit(sctp_params_t &sctpParams) {
367     rmr_mbuf_t *msg = rmr_alloc_msg(sctpParams.rmrCtx, sctpParams.ka_message_length);
368     auto count = 0;
369     while (true) {
370         msg->mtype = E2_TERM_INIT;
371         msg->state = 0;
372         rmr_bytes2payload(msg, (unsigned char *)sctpParams.ka_message, sctpParams.ka_message_length);
373         static unsigned char tx[32];
374         auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
375         rmr_bytes2xact(msg, tx, txLen);
376         msg = rmr_send_msg(sctpParams.rmrCtx, msg);
377         if (msg == nullptr) {
378             msg = rmr_alloc_msg(sctpParams.rmrCtx, sctpParams.myIP.length());
379         } else if (msg->state == 0) {
380             rmr_free_msg(msg);
381             if (mdclog_level_get() >=  MDCLOG_INFO) {
382                 mdclog_write(MDCLOG_INFO, "E2_TERM_INIT succsesfuly sent ");
383             }
384             return;
385         } else {
386             if (count % 100 == 0) {
387                 mdclog_write(MDCLOG_ERR, "Error sending E2_TERM_INIT cause : %d ", msg->state);
388             }
389             sleep(1);
390         }
391         count++;
392     }
393
394 }
395
396 /**
397  *
398  * @param argc
399  * @param argv
400  * @param sctpParams
401  * @return
402  */
403 cxxopts::ParseResult parse(int argc, char *argv[], sctp_params_t &sctpParams) {
404     cxxopts::Options options(argv[0], "e2 term help");
405     options.positional_help("[optional args]").show_positional_help();
406     options.allow_unrecognised_options().add_options()
407             ("p,path", "config file path", cxxopts::value<std::string>(sctpParams.configFilePath)->default_value("config"))
408             ("f,file", "config file name", cxxopts::value<std::string>(sctpParams.configFileName)->default_value("config.conf"))
409             ("h,help", "Print help");
410
411     auto result = options.parse(argc, argv);
412
413     if (result.count("help")) {
414         std::cout << options.help({""}) << std::endl;
415         exit(0);
416     }
417     return result;
418 }
419
420 /**
421  *
422  * @param sctpParams
423  * @return -1 failed 0 success
424  */
425 int buildInotify(sctp_params_t &sctpParams) {
426     sctpParams.inotifyFD = inotify_init1(IN_NONBLOCK);
427     if (sctpParams.inotifyFD == -1) {
428         mdclog_write(MDCLOG_ERR, "Failed to init inotify (inotify_init1) %s", strerror(errno));
429         close(sctpParams.rmrListenFd);
430         rmr_close(sctpParams.rmrCtx);
431         close(sctpParams.epoll_fd);
432         return -1;
433     }
434
435     sctpParams.inotifyWD = inotify_add_watch(sctpParams.inotifyFD,
436                                               (const char *)sctpParams.configFilePath.c_str(),
437                                               IN_OPEN | IN_CLOSE);
438     if (sctpParams.inotifyWD == -1) {
439         mdclog_write(MDCLOG_ERR, "Failed to add directory : %s to  inotify (inotify_add_watch) %s",
440                 sctpParams.configFilePath.c_str(),
441                 strerror(errno));
442         close(sctpParams.inotifyFD);
443         return -1;
444     }
445
446     struct epoll_event event{};
447     event.events = (EPOLLIN);
448     event.data.fd = sctpParams.inotifyFD;
449     // add listening RMR FD to epoll
450     if (epoll_ctl(sctpParams.epoll_fd, EPOLL_CTL_ADD, sctpParams.inotifyFD, &event)) {
451         mdclog_write(MDCLOG_ERR, "Failed to add inotify FD to epoll");
452         close(sctpParams.inotifyFD);
453         return -1;
454     }
455     return 0;
456 }
457
458 /**
459  *
460  * @param args
461  * @return
462  */
463 void listener(sctp_params_t *params) {
464 #ifdef __TRACING__
465     auto span = opentracing::Tracer::Global()->StartSpan(__FUNCTION__);
466 #else
467     otSpan span = 0;
468 #endif
469     int num_of_SCTP_messages = 0;
470     auto totalTime = 0.0;
471     mdclog_mdc_clean();
472     mdclog_level_set(params->logLevel);
473
474     std::thread::id this_id = std::this_thread::get_id();
475     //save cout
476     streambuf *oldCout = cout.rdbuf();
477     ostringstream memCout;
478     // create new cout
479     cout.rdbuf(memCout.rdbuf());
480     cout << this_id;
481     //return to the normal cout
482     cout.rdbuf(oldCout);
483
484     char tid[32];
485     memcpy(tid, memCout.str().c_str(), memCout.str().length() < 32 ? memCout.str().length() : 31);
486     tid[memCout.str().length()] = 0;
487     mdclog_mdc_add("thread id", tid);
488
489     if (mdclog_level_get() >= MDCLOG_DEBUG) {
490         mdclog_write(MDCLOG_DEBUG, "started thread number %s", tid);
491     }
492
493
494     RmrMessagesBuffer_t rmrMessageBuffer{};
495     //create and init RMR
496     rmrMessageBuffer.rmrCtx = params->rmrCtx;
497
498     auto *events = (struct epoll_event *) calloc(MAXEVENTS, sizeof(struct epoll_event));
499     struct timespec end{0, 0};
500     struct timespec start{0, 0};
501
502     rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
503     rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
504
505     memcpy(rmrMessageBuffer.ka_message, params->ka_message, params->ka_message_length);
506     rmrMessageBuffer.ka_message_len = params->ka_message_length;
507     rmrMessageBuffer.ka_message[rmrMessageBuffer.ka_message_len] = 0;
508
509     if (mdclog_level_get() >= MDCLOG_DEBUG) {
510         mdclog_write(MDCLOG_DEBUG, "keep alive message is : %s", rmrMessageBuffer.ka_message);
511     }
512
513     ReportingMessages_t message {};
514
515     for (int i = 0; i < MAX_RMR_BUFF_ARRY; i++) {
516         rmrMessageBuffer.rcvBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
517         rmrMessageBuffer.sendBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
518     }
519
520     while (true) {
521         if (mdclog_level_get() >= MDCLOG_DEBUG) {
522             mdclog_write(MDCLOG_DEBUG, "Start EPOLL Wait");
523         }
524         auto numOfEvents = epoll_wait(params->epoll_fd, events, MAXEVENTS, -1);
525         if (numOfEvents < 0 && errno == EINTR) {
526             if (mdclog_level_get() >= MDCLOG_DEBUG) {
527                 mdclog_write(MDCLOG_DEBUG, "got EINTR : %s", strerror(errno));
528             }
529             continue;
530         }
531         if (numOfEvents < 0) {
532             mdclog_write(MDCLOG_ERR, "Epoll wait failed, errno = %s", strerror(errno));
533             return;
534         }
535         for (auto i = 0; i < numOfEvents; i++) {
536             if (mdclog_level_get() >= MDCLOG_DEBUG) {
537                 mdclog_write(MDCLOG_DEBUG, "handling epoll event %d out of %d", i + 1, numOfEvents);
538             }
539             clock_gettime(CLOCK_MONOTONIC, &message.message.time);
540             start.tv_sec = message.message.time.tv_sec;
541             start.tv_nsec = message.message.time.tv_nsec;
542
543
544             if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP)) {
545                 handlepoll_error(events[i], message, rmrMessageBuffer, params, &span);
546             } else if (events[i].events & EPOLLOUT) {
547                 handleEinprogressMessages(events[i], message, rmrMessageBuffer, params, &span);
548             } else if (params->rmrListenFd == events[i].data.fd) {
549                 // got message from XAPP
550                 num_of_XAPP_messages.fetch_add(1, std::memory_order_release);
551                 num_of_messages.fetch_add(1, std::memory_order_release);
552                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
553                     mdclog_write(MDCLOG_DEBUG, "new message from RMR");
554                 }
555                 if (receiveXappMessages(params->epoll_fd,
556                                         params->sctpMap,
557                                         rmrMessageBuffer,
558                                         message.message.time,
559                                         &span) != 0) {
560                     mdclog_write(MDCLOG_ERR, "Error handling Xapp message");
561                 }
562             } else if (params->inotifyFD == events[i].data.fd) {
563                 mdclog_write(MDCLOG_INFO, "Got event from inotify (configuration update)");
564                 handleConfigChange(params);
565             } else {
566                 /* We RMR_ERR_RETRY have data on the fd waiting to be read. Read and display it.
567                  * We must read whatever data is available completely, as we are running
568                  *  in edge-triggered mode and won't get a notification again for the same data. */
569                 num_of_messages.fetch_add(1, std::memory_order_release);
570                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
571                     mdclog_write(MDCLOG_DEBUG, "new message from SCTP, epoll flags are : %0x", events[i].events);
572                 }
573                 receiveDataFromSctp(&events[i],
574                                     params->sctpMap,
575                                     num_of_SCTP_messages,
576                                     rmrMessageBuffer,
577                                     message.message.time,
578                                     &span);
579             }
580
581             clock_gettime(CLOCK_MONOTONIC, &end);
582             if (mdclog_level_get() >= MDCLOG_INFO) {
583                 totalTime += ((end.tv_sec + 1.0e-9 * end.tv_nsec) -
584                               ((double) start.tv_sec + 1.0e-9 * start.tv_nsec));
585             }
586             if (mdclog_level_get() >= MDCLOG_DEBUG) {
587                 mdclog_write(MDCLOG_DEBUG, "message handling is %ld seconds %ld nanoseconds",
588                              end.tv_sec - start.tv_sec,
589                              end.tv_nsec - start.tv_nsec);
590             }
591         }
592     }
593 #ifdef __TRACING__
594     span->Finish();
595 #else
596
597 #endif
598 }
599
600 /**
601  *
602  * @param sctpParams
603  */
604 void handleConfigChange(sctp_params_t *sctpParams) {
605     char buf[4096] __attribute__ ((aligned(__alignof__(struct inotify_event))));
606     const struct inotify_event *event;
607     char *ptr;
608
609     path p = (sctpParams->configFilePath + "/" + sctpParams->configFileName).c_str();
610     auto endlessLoop = true;
611     while (endlessLoop) {
612         auto len = read(sctpParams->inotifyFD, buf, sizeof buf);
613         if (len == -1) {
614             if (errno != EAGAIN) {
615                 mdclog_write(MDCLOG_ERR, "read %s ", strerror(errno));
616                 endlessLoop = false;
617                 continue;
618             }
619             else {
620                 endlessLoop = false;
621                 continue;
622             }
623         }
624
625         for (ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) {
626             event = (const struct inotify_event *)ptr;
627             if (event->mask & (uint32_t)IN_ISDIR) {
628                 continue;
629             }
630
631             // the directory name
632             if (sctpParams->inotifyWD == event->wd) {
633                 // not the directory
634             }
635             if (event->len) {
636                 if (!(sctpParams->configFileName.compare(event->name))) {
637                     continue;
638                 }
639             }
640             // only the file we want
641             if (event->mask & (uint32_t)IN_CLOSE_WRITE) {
642                 if (exists(p)) {
643                     const int size = 2048;
644                     auto fileSize = file_size(p);
645                     if (fileSize > size) {
646                         mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size);
647                         return;
648                     }
649                 } else {
650                     mdclog_write(MDCLOG_ERR, "Configuration File %s not exists", p.string().c_str());
651                     return;
652                 }
653
654                 ReadConfigFile conf;
655                 if (conf.openConfigFile(p.string()) == -1) {
656                     mdclog_write(MDCLOG_ERR, "Filed to open config file %s, %s",
657                                  p.string().c_str(), strerror(errno));
658                     return;
659                 }
660
661                 auto tmpStr = conf.getStringValue("loglevel");
662                 if (tmpStr.length() == 0) {
663                     mdclog_write(MDCLOG_ERR, "illigal loglevel. Set loglevel to MDCLOG_INFO");
664                     tmpStr = "info";
665                 }
666                 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
667
668                 if ((tmpStr.compare("debug")) == 0) {
669                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_DEBUG");
670                     sctpParams->logLevel = MDCLOG_DEBUG;
671                 } else if ((tmpStr.compare("info")) == 0) {
672                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_INFO");
673                     sctpParams->logLevel = MDCLOG_INFO;
674                 } else if ((tmpStr.compare("warning")) == 0) {
675                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_WARN");
676                     sctpParams->logLevel = MDCLOG_WARN;
677                 } else if ((tmpStr.compare("error")) == 0) {
678                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_ERR");
679                     sctpParams->logLevel = MDCLOG_ERR;
680                 } else {
681                     mdclog_write(MDCLOG_ERR, "illigal loglevel = %s. Set loglevel to MDCLOG_INFO", tmpStr.c_str());
682                     sctpParams->logLevel = MDCLOG_INFO;
683                 }
684                 mdclog_level_set(sctpParams->logLevel);
685
686
687                 tmpStr = conf.getStringValue("trace");
688                 if (tmpStr.length() == 0) {
689                     mdclog_write(MDCLOG_ERR, "illigal trace. Set trace to stop");
690                     tmpStr = "stop";
691                 }
692
693                 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
694                 if ((tmpStr.compare("start")) == 0) {
695                     mdclog_write(MDCLOG_INFO, "Trace set to: start");
696                     sctpParams->trace = true;
697                 } else if ((tmpStr.compare("stop")) == 0) {
698                     mdclog_write(MDCLOG_INFO, "Trace set to: stop");
699                     sctpParams->trace = false;
700                 } else {
701                     mdclog_write(MDCLOG_ERR, "Trace was set to wrong value %s, set to stop", tmpStr.c_str());
702                     sctpParams->trace = false;
703                 }
704                 jsonTrace = sctpParams->trace;
705                 endlessLoop = false;
706             }
707         }
708     }
709 }
710
711 /**
712  *
713  * @param event
714  * @param message
715  * @param rmrMessageBuffer
716  * @param params
717  * @param pSpan
718  */
719 void handleEinprogressMessages(struct epoll_event &event,
720                                ReportingMessages_t &message,
721                                RmrMessagesBuffer_t &rmrMessageBuffer,
722                                sctp_params_t *params,
723                                otSpan *pSpan) {
724 #ifdef __TRACING__
725     auto lspan = opentracing::Tracer::Global()->StartSpan(
726             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
727 #else
728     otSpan lspan = 0;
729 #endif
730     auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
731     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
732
733     mdclog_write(MDCLOG_INFO, "file descriptor %d got EPOLLOUT", peerInfo->fileDescriptor);
734     auto retVal = 0;
735     socklen_t retValLen = 0;
736     auto rc = getsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_ERROR, &retVal, &retValLen);
737     if (rc != 0 || retVal != 0) {
738         if (rc != 0) {
739             rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
740                                                          "%s|Failed SCTP Connection, after EINPROGRESS the getsockopt%s",
741                                                          peerInfo->enodbName, strerror(errno));
742         } else if (retVal != 0) {
743             rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
744                                                          "%s|Failed SCTP Connection after EINPROGRESS, SO_ERROR",
745                                                          peerInfo->enodbName);
746         }
747
748         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
749         message.message.asnLength = rmrMessageBuffer.sendMessage->len;
750         mdclog_write(MDCLOG_ERR, "%s", rmrMessageBuffer.sendMessage->payload);
751         message.message.direction = 'N';
752         if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer, &lspan) != 0) {
753             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
754         }
755         memset(peerInfo->asnData, 0, peerInfo->asnLength);
756         peerInfo->asnLength = 0;
757         peerInfo->mtype = 0;
758 #ifdef __TRACING__
759         lspan->Finish();
760 #endif
761         return;
762     }
763
764     peerInfo->isConnected = true;
765
766     if (modifyToEpoll(params->epoll_fd, peerInfo, (EPOLLIN | EPOLLET), params->sctpMap, peerInfo->enodbName,
767                       peerInfo->mtype, &lspan) != 0) {
768         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_MOD");
769 #ifdef __TRACING__
770         lspan->Finish();
771 #endif
772         return;
773     }
774
775     message.message.asndata = (unsigned char *)peerInfo->asnData;
776     message.message.asnLength = peerInfo->asnLength;
777     message.message.messageType = peerInfo->mtype;
778     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
779     num_of_messages.fetch_add(1, std::memory_order_release);
780     if (mdclog_level_get() >= MDCLOG_DEBUG) {
781         mdclog_write(MDCLOG_DEBUG, "send the delayed SETUP/ENDC SETUP to sctp for %s",
782                      message.message.enodbName);
783     }
784     if (sendSctpMsg(peerInfo, message, params->sctpMap, &lspan) != 0) {
785         if (mdclog_level_get() >= MDCLOG_DEBUG) {
786             mdclog_write(MDCLOG_DEBUG, "Error write to SCTP  %s %d", __func__, __LINE__);
787         }
788 #ifdef __TRACING__
789         lspan->Finish();
790 #endif
791         return;
792     }
793
794     memset(peerInfo->asnData, 0, peerInfo->asnLength);
795     peerInfo->asnLength = 0;
796     peerInfo->mtype = 0;
797 #ifdef __TRACING__
798     lspan->Finish();
799 #endif
800 }
801
802
803 void handlepoll_error(struct epoll_event &event,
804                       ReportingMessages_t &message,
805                       RmrMessagesBuffer_t &rmrMessageBuffer,
806                       sctp_params_t *params,
807                       otSpan *pSpan) {
808 #ifdef __TRACING__
809     auto lspan = opentracing::Tracer::Global()->StartSpan(
810             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
811 #else
812     otSpan lspan = 0;
813 #endif
814     if (event.data.fd != params->rmrListenFd) {
815         auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
816         mdclog_write(MDCLOG_ERR, "epoll error, events %0x on fd %d, RAN NAME : %s",
817                      event.events, peerInfo->fileDescriptor, peerInfo->enodbName);
818
819         rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
820                                                      "%s|Failed SCTP Connection",
821                                                      peerInfo->enodbName);
822         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
823         message.message.asnLength = rmrMessageBuffer.sendMessage->len;
824
825         memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
826         message.message.direction = 'N';
827         if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer, &lspan) != 0) {
828             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
829         }
830
831         close(peerInfo->fileDescriptor);
832         cleanHashEntry((ConnectedCU_t *) event.data.ptr, params->sctpMap, &lspan);
833     } else {
834         mdclog_write(MDCLOG_ERR, "epoll error, events %0x on RMR FD", event.events);
835     }
836 #ifdef __TRACING__
837     lspan->Finish();
838 #endif
839
840 }
841 /**
842  *
843  * @param socket
844  * @return
845  */
846 int setSocketNoBlocking(int socket) {
847     auto flags = fcntl(socket, F_GETFL, 0);
848
849     if (flags == -1) {
850         mdclog_mdc_add("func", "fcntl");
851         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
852         mdclog_mdc_clean();
853         return -1;
854     }
855
856     flags = (unsigned) flags | (unsigned) O_NONBLOCK;
857     if (fcntl(socket, F_SETFL, flags) == -1) {
858         mdclog_mdc_add("func", "fcntl");
859         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
860         mdclog_mdc_clean();
861         return -1;
862     }
863
864     return 0;
865 }
866
867 /**
868  *
869  * @param val
870  * @param m
871  * @param pSpan
872  */
873 void cleanHashEntry(ConnectedCU_t *val, Sctp_Map_t *m, otSpan *pSpan) {
874 #ifdef __TRACING__
875     auto lspan = opentracing::Tracer::Global()->StartSpan(
876             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
877 #else
878 //    otSpan lspan = 0;
879 #endif
880     char *dummy;
881     auto port = (uint16_t) strtol(val->portNumber, &dummy, 10);
882     char searchBuff[256]{};
883
884     snprintf(searchBuff, sizeof searchBuff, "host:%s:%d", val->hostName, port);
885     m->erase(searchBuff);
886
887     m->erase(val->enodbName);
888     free(val);
889 #ifdef __TRACING__
890     lspan->Finish();
891 #endif
892 }
893
894 /**
895  *
896  * @param fd file discriptor
897  * @param data the asn data to send
898  * @param len  length of the data
899  * @param enodbName the enodbName as in the map for printing purpose
900  * @param m map host information
901  * @param mtype message number
902  * @return 0 success, anegative number on fail
903  */
904 int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_t *m, otSpan *pSpan) {
905 #ifdef __TRACING__
906     auto lspan = opentracing::Tracer::Global()->StartSpan(
907             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
908 #else
909     otSpan lspan = 0;
910 #endif
911     auto loglevel = mdclog_level_get();
912     int fd = peerInfo->fileDescriptor;
913     if (loglevel >= MDCLOG_DEBUG) {
914         mdclog_write(MDCLOG_DEBUG, "Send SCTP message for CU %s, %s",
915                      message.message.enodbName, __FUNCTION__);
916     }
917
918     while (true) {
919         //TODO add send to VES client or KAFKA
920         //format ts|mtype|direction(D/U)|length of asn data|raw data
921 //        auto length = sizeof message.message.time
922 //                      + sizeof message.message.enodbName
923 //                      + sizeof message.message.messageType
924 //                      + sizeof message.message.direction
925 //                      + sizeof message.message.asnLength
926 //                      + message.message.asnLength;
927
928         if (send(fd,message.message.asndata, message.message.asnLength,MSG_NOSIGNAL) < 0) {
929             if (errno == EINTR) {
930                 continue;
931             }
932             mdclog_write(MDCLOG_ERR, "error writing to CU a message, %s ", strerror(errno));
933             // Prevent double free() of peerInfo in the event of connection failure.
934             // Returning failure will trigger, in x2/endc setup flow, RIC_SCTP_CONNECTION_FAILURE rmr message causing the E2M to retry.
935             if (!peerInfo->isConnected){
936                 mdclog_write(MDCLOG_ERR, "connection to CU %s is still in progress.", message.message.enodbName);
937 #ifdef __TRACING__
938             lspan->Finish();
939 #endif
940                 return -1;
941             }
942             cleanHashEntry(peerInfo, m, &lspan);
943             close(fd);
944             char key[MAX_ENODB_NAME_SIZE * 2];
945             snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName,
946                      message.message.messageType);
947             if (loglevel >= MDCLOG_DEBUG) {
948                 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
949             }
950             auto tmp = m->find(key);
951             if (tmp) {
952                 free(tmp);
953             }
954             m->erase(key);
955 #ifdef __TRACING__
956             lspan->Finish();
957 #endif
958             return -1;
959         }
960         message.message.direction = 'D';
961         // send report.buffer of size
962         buildJsonMessage(message);
963
964         if (loglevel >= MDCLOG_DEBUG) {
965             mdclog_write(MDCLOG_DEBUG,
966                          "SCTP message for CU %s sent from %s",
967                          message.message.enodbName,
968                          __FUNCTION__);
969         }
970 #ifdef __TRACING__
971         lspan->Finish();
972 #endif
973
974         return 0;
975     }
976 }
977
978 /**
979  *
980  * @param message
981  * @param rmrMessageBuffer
982  * @param pSpan
983  */
984 void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer, otSpan *pSpan) {
985 #ifdef __TRACING__
986     auto lspan = opentracing::Tracer::Global()->StartSpan(
987             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
988 #else
989 //    otSpan lspan = 0;
990 #endif
991     rmr_get_meid(rmrMessageBuffer.rcvMessage, (unsigned char *)(message.message.enodbName));
992
993     message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
994     message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
995
996     if (mdclog_level_get() >= MDCLOG_DEBUG) {
997         mdclog_write(MDCLOG_DEBUG, "Message from Xapp RAN name = %s message length = %ld",
998                      message.message.enodbName, (unsigned long) message.message.asnLength);
999     }
1000 #ifdef __TRACING__
1001     lspan->Finish();
1002 #endif
1003
1004 }
1005
1006
1007 /**
1008  *
1009  * @param metaData all the data strip to structure
1010  * @param data the data recived from xAPP
1011  * @return 0 success all other values are fault
1012  */
1013 int getSetupRequestMetaData(ReportingMessages_t &message, char *data, char *host, uint16_t &port, otSpan *pSpan) {
1014 #ifdef __TRACING__
1015     auto lspan = opentracing::Tracer::Global()->StartSpan(
1016             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1017 #else
1018 //    otSpan lspan = 0;
1019 #endif
1020     auto loglevel = mdclog_level_get();
1021
1022     char delimiter[4] {};
1023     memset(delimiter, 0, (size_t)4);
1024     delimiter[0] = '|';
1025     char *tmp;
1026
1027     char *val = strtok_r(data, delimiter, &tmp);
1028     if (val != nullptr) {
1029         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1030             mdclog_write(MDCLOG_DEBUG, "SCTP ADDRESS parameter from message = %s", val);
1031         }
1032         memcpy(host, val, tmp - val );
1033     } else {
1034         mdclog_write(MDCLOG_ERR, "wrong Host Name for setup request %s", data);
1035 #ifdef __TRACING__
1036         lspan->Finish();
1037 #endif
1038         return -1;
1039     }
1040
1041     val = strtok_r(nullptr, delimiter, &tmp);
1042     if (val != nullptr) {
1043         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1044             mdclog_write(MDCLOG_DEBUG, "PORT parameter from message = %s", val);
1045         }
1046         char *dummy;
1047         port = (uint16_t)strtol(val, &dummy, 10);
1048     } else {
1049         mdclog_write(MDCLOG_ERR, "wrong Port for setup request %s", data);
1050 #ifdef __TRACING__
1051         lspan->Finish();
1052 #endif
1053         return -2;
1054     }
1055
1056     val = strtok_r(nullptr, delimiter, &tmp);
1057     if (val != nullptr) {
1058         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1059             mdclog_write(MDCLOG_DEBUG, "RAN NAME parameter from message = %s", val);
1060         }
1061         memcpy(message.message.enodbName, val, tmp - val);
1062     } else {
1063         mdclog_write(MDCLOG_ERR, "wrong gNb/Enodeb name for setup request %s", data);
1064 #ifdef __TRACING__
1065         lspan->Finish();
1066 #endif
1067
1068         return -3;
1069     }
1070     val = strtok_r(nullptr, delimiter, &tmp);
1071     if (val != nullptr) {
1072         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1073             mdclog_write(MDCLOG_DEBUG, "ASN length parameter from message = %s", val);
1074         }
1075         char *dummy;
1076         message.message.asnLength = (uint16_t) strtol(val, &dummy, 10);
1077     } else {
1078         mdclog_write(MDCLOG_ERR, "wrong ASN length for setup request %s", data);
1079 #ifdef __TRACING__
1080         lspan->Finish();
1081 #endif
1082         return -4;
1083     }
1084
1085     message.message.asndata = (unsigned char *)tmp;  // tmp is local but point to the location in data
1086
1087     if (loglevel >= MDCLOG_INFO) {
1088         mdclog_write(MDCLOG_INFO, "Message from Xapp RAN name = %s host address = %s port = %d",
1089                      message.message.enodbName, host, port);
1090     }
1091 #ifdef __TRACING__
1092     lspan->Finish();
1093 #endif
1094
1095     return 0;
1096 }
1097
1098 /**
1099  *
1100  * @param events
1101  * @param sctpMap
1102  * @param numOfMessages
1103  * @param rmrMessageBuffer
1104  * @param ts
1105  * @param pSpan
1106  * @return
1107  */
1108 int receiveDataFromSctp(struct epoll_event *events,
1109                         Sctp_Map_t *sctpMap,
1110                         int &numOfMessages,
1111                         RmrMessagesBuffer_t &rmrMessageBuffer,
1112                         struct timespec &ts,
1113                         otSpan *pSpan) {
1114 #ifdef __TRACING__
1115     auto lspan = opentracing::Tracer::Global()->StartSpan(
1116             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1117 #else
1118     otSpan lspan = 0;
1119 #endif
1120     /* We have data on the fd waiting to be read. Read and display it.
1121  * We must read whatever data is available completely, as we are running
1122  *  in edge-triggered mode and won't get a notification again for the same data. */
1123     int done = 0;
1124     auto loglevel = mdclog_level_get();
1125     // get the identity of the interface
1126     auto *peerInfo = (ConnectedCU_t *)events->data.ptr;
1127     struct timespec start{0, 0};
1128     struct timespec decodestart{0, 0};
1129     struct timespec end{0, 0};
1130
1131     E2AP_PDU_t *pdu = nullptr;
1132
1133     ReportingMessages_t message {};
1134
1135     while (true) {
1136         if (loglevel >= MDCLOG_DEBUG) {
1137             mdclog_write(MDCLOG_DEBUG, "Start Read from SCTP %d fd", peerInfo->fileDescriptor);
1138             clock_gettime(CLOCK_MONOTONIC, &start);
1139         }
1140         // read the buffer directly to rmr payload
1141         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1142         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1143                 read(peerInfo->fileDescriptor, rmrMessageBuffer.sendMessage->payload, RECEIVE_SCTP_BUFFER_SIZE);
1144         if (loglevel >= MDCLOG_DEBUG) {
1145             mdclog_write(MDCLOG_DEBUG, "Finish Read from SCTP %d fd message length = %ld",
1146                     peerInfo->fileDescriptor, message.message.asnLength);
1147         }
1148         memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
1149         message.message.direction = 'U';
1150         message.message.time.tv_nsec = ts.tv_nsec;
1151         message.message.time.tv_sec = ts.tv_sec;
1152
1153         if (message.message.asnLength < 0) {
1154             if (errno == EINTR) {
1155                 continue;
1156             }
1157             /* If errno == EAGAIN, that means we have read all
1158                data. So go back to the main loop. */
1159             if (errno != EAGAIN) {
1160                 mdclog_write(MDCLOG_ERR, "Read error, %s ", strerror(errno));
1161                 done = 1;
1162             } else if (loglevel >= MDCLOG_DEBUG) {
1163                 mdclog_write(MDCLOG_DEBUG, "EAGAIN - descriptor = %d", peerInfo->fileDescriptor);
1164             }
1165             break;
1166         } else if (message.message.asnLength == 0) {
1167             /* End of file. The remote has closed the connection. */
1168             if (loglevel >= MDCLOG_INFO) {
1169                 mdclog_write(MDCLOG_INFO, "END of File Closed connection - descriptor = %d",
1170                              peerInfo->fileDescriptor);
1171             }
1172             done = 1;
1173             break;
1174         }
1175
1176         asn_dec_rval_t rval;
1177         if (loglevel >= MDCLOG_DEBUG) {
1178             char printBuffer[4096]{};
1179             char *tmp = printBuffer;
1180             for (size_t i = 0; i < (size_t)message.message.asnLength; ++i) {
1181                 snprintf(tmp, 2, "%02x", message.message.asndata[i]);
1182                 tmp += 2;
1183             }
1184             printBuffer[message.message.asnLength] = 0;
1185             clock_gettime(CLOCK_MONOTONIC, &end);
1186             mdclog_write(MDCLOG_DEBUG, "Before Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
1187                          peerInfo->enodbName, end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
1188             mdclog_write(MDCLOG_DEBUG, "PDU buffer length = %ld, data =  : %s", message.message.asnLength,
1189                          printBuffer);
1190             clock_gettime(CLOCK_MONOTONIC, &decodestart);
1191         }
1192
1193         rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void **) &pdu,
1194                           message.message.asndata, message.message.asnLength);
1195         if (rval.code != RC_OK) {
1196             mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) E2AP PDU from RAN : %s", rval.code,
1197                          peerInfo->enodbName);
1198             break;
1199         }
1200
1201         if (loglevel >= MDCLOG_DEBUG) {
1202             clock_gettime(CLOCK_MONOTONIC, &end);
1203             mdclog_write(MDCLOG_DEBUG, "After Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
1204                          peerInfo->enodbName, end.tv_sec - decodestart.tv_sec, end.tv_nsec - decodestart.tv_nsec);
1205             char *printBuffer;
1206             size_t size;
1207             FILE *stream = open_memstream(&printBuffer, &size);
1208             asn_fprint(stream, &asn_DEF_E2AP_PDU, pdu);
1209             mdclog_write(MDCLOG_DEBUG, "Encoding E2AP PDU past : %s", printBuffer);
1210             clock_gettime(CLOCK_MONOTONIC, &decodestart);
1211         }
1212
1213         switch (pdu->present) {
1214             case E2AP_PDU_PR_initiatingMessage: {//initiating message
1215                 asnInitiatingRequest(pdu, message, rmrMessageBuffer, &lspan);
1216                 break;
1217             }
1218             case E2AP_PDU_PR_successfulOutcome: { //successful outcome
1219                 asnSuccsesfulMsg(pdu, message, sctpMap, rmrMessageBuffer, &lspan);
1220                 break;
1221             }
1222             case E2AP_PDU_PR_unsuccessfulOutcome: { //Unsuccessful Outcome
1223                 asnUnSuccsesfulMsg(pdu, message, sctpMap, rmrMessageBuffer, &lspan);
1224                 break;
1225             }
1226             default:
1227                 mdclog_write(MDCLOG_ERR, "Unknown index %d in E2AP PDU", pdu->present);
1228                 break;
1229         }
1230         if (loglevel >= MDCLOG_DEBUG) {
1231             clock_gettime(CLOCK_MONOTONIC, &end);
1232             mdclog_write(MDCLOG_DEBUG,
1233                          "After processing message and sent to rmr for : %s, Read time is : %ld seconds, %ld nanoseconds",
1234                          peerInfo->enodbName, end.tv_sec - decodestart.tv_sec, end.tv_nsec - decodestart.tv_nsec);
1235
1236         }
1237         numOfMessages++;
1238         // remove the break for EAGAIN
1239         //break;
1240         if (pdu != nullptr) {
1241             //TODO need to test ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu); to get better performance
1242             //ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu);
1243             ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
1244             pdu = nullptr;
1245         }
1246         //clock_gettime(CLOCK_MONOTONIC, &start);
1247     }
1248     // in case of break to avoid memory leak
1249     if (pdu != nullptr) {
1250         ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
1251         pdu = nullptr;
1252     }
1253
1254     if (done) {
1255         if (loglevel >= MDCLOG_INFO) {
1256             mdclog_write(MDCLOG_INFO, "Closed connection - descriptor = %d", peerInfo->fileDescriptor);
1257         }
1258         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1259                 snprintf((char *)rmrMessageBuffer.sendMessage->payload,
1260                         256,
1261                         "%s|CU disconnected unexpectedly",
1262                         peerInfo->enodbName);
1263         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1264
1265         if (sendRequestToXapp(message,
1266                               RIC_SCTP_CONNECTION_FAILURE,
1267                               rmrMessageBuffer,
1268                               &lspan) != 0) {
1269             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
1270         }
1271
1272         /* Closing descriptor make epoll remove it from the set of descriptors which are monitored. */
1273         close(peerInfo->fileDescriptor);
1274         cleanHashEntry((ConnectedCU_t *) events->data.ptr, sctpMap, &lspan);
1275     }
1276     if (loglevel >= MDCLOG_DEBUG) {
1277         clock_gettime(CLOCK_MONOTONIC, &end);
1278         mdclog_write(MDCLOG_DEBUG, "from receive SCTP to send RMR time is %ld seconds and %ld nanoseconds",
1279                      end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
1280
1281     }
1282 #ifdef __TRACING__
1283     lspan->Finish();
1284 #endif
1285
1286     return 0;
1287 }
1288
1289 /**
1290  *
1291  * @param pdu
1292  * @param message
1293  * @param rmrMessageBuffer
1294  * @param pSpan
1295  */
1296 void asnInitiatingRequest(E2AP_PDU_t *pdu,
1297                           ReportingMessages_t &message,
1298                           RmrMessagesBuffer_t &rmrMessageBuffer,
1299                           otSpan *pSpan) {
1300 #ifdef __TRACING__
1301     auto lspan = opentracing::Tracer::Global()->StartSpan(
1302             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1303 #else
1304     otSpan lspan = 0;
1305 #endif
1306
1307     auto procedureCode = ((InitiatingMessage_t *) pdu->choice.initiatingMessage)->procedureCode;
1308     if (mdclog_level_get() >= MDCLOG_INFO) {
1309         mdclog_write(MDCLOG_INFO, "Initiating message %ld", procedureCode);
1310     }
1311     switch (procedureCode) {
1312         case ProcedureCode_id_x2Setup: {
1313             if (mdclog_level_get() >= MDCLOG_INFO) {
1314                 mdclog_write(MDCLOG_INFO, "Got Setup Initiating  message from CU - %s",
1315                              message.message.enodbName);
1316             }
1317             break;
1318         }
1319         case ProcedureCode_id_endcX2Setup: {
1320             if (mdclog_level_get() >= MDCLOG_INFO) {
1321                 mdclog_write(MDCLOG_INFO, "Got X2 EN-DC Setup Request from CU - %s",
1322                              message.message.enodbName);
1323             }
1324             break;
1325         }
1326         case ProcedureCode_id_ricSubscription: {
1327             if (mdclog_level_get() >= MDCLOG_INFO) {
1328                 mdclog_write(MDCLOG_INFO, "Got RIC Subscription Request message from CU - %s",
1329                              message.message.enodbName);
1330             }
1331             break;
1332         }
1333         case ProcedureCode_id_ricSubscriptionDelete: {
1334             if (mdclog_level_get() >= MDCLOG_INFO) {
1335                 mdclog_write(MDCLOG_INFO, "Got RIC Subscription Delete Request message from CU - %s",
1336                              message.message.enodbName);
1337             }
1338             break;
1339         }
1340         case ProcedureCode_id_endcConfigurationUpdate: {
1341             if (sendRequestToXapp(message, RIC_ENDC_CONF_UPDATE, rmrMessageBuffer, &lspan) != 0) {
1342                 mdclog_write(MDCLOG_ERR, "E2 EN-DC CONFIGURATION UPDATE message failed to send to xAPP");
1343             }
1344             break;
1345         }
1346         case ProcedureCode_id_eNBConfigurationUpdate: {
1347             if (sendRequestToXapp(message, RIC_ENB_CONF_UPDATE, rmrMessageBuffer, &lspan) != 0) {
1348                 mdclog_write(MDCLOG_ERR, "E2 EN-BC CONFIGURATION UPDATE message failed to send to xAPP");
1349             }
1350             break;
1351         }
1352         case ProcedureCode_id_x2Removal: {
1353             if (mdclog_level_get() >= MDCLOG_INFO) {
1354                 mdclog_write(MDCLOG_INFO, "Got E2 Removal Initiating  message from CU - %s",
1355                              message.message.enodbName);
1356             }
1357             break;
1358         }
1359         case ProcedureCode_id_loadIndication: {
1360             if (sendRequestToXapp(message, RIC_ENB_LOAD_INFORMATION, rmrMessageBuffer, &lspan) != 0) {
1361                 mdclog_write(MDCLOG_ERR, "Load indication message failed to send to xAPP");
1362             }
1363             break;
1364         }
1365         case ProcedureCode_id_resourceStatusReportingInitiation: {
1366             if (mdclog_level_get() >= MDCLOG_INFO) {
1367                 mdclog_write(MDCLOG_INFO, "Got Status reporting initiation message from CU - %s",
1368                              message.message.enodbName);
1369             }
1370             break;
1371         }
1372         case ProcedureCode_id_resourceStatusReporting: {
1373             if (sendRequestToXapp(message, RIC_RESOURCE_STATUS_UPDATE, rmrMessageBuffer, &lspan) != 0) {
1374                 mdclog_write(MDCLOG_ERR, "Resource Status Reporting message failed to send to xAPP");
1375             }
1376             break;
1377         }
1378         case ProcedureCode_id_reset: {
1379             if (sendRequestToXapp(message, RIC_X2_RESET, rmrMessageBuffer, &lspan) != 0) {
1380                 mdclog_write(MDCLOG_ERR, "RIC_X2_RESET message failed to send to xAPP");
1381             }
1382             break;
1383         }
1384         case ProcedureCode_id_ricIndication: {
1385             for (int i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
1386                 auto messageSent = false;
1387                 RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
1388                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1389                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1390                 }
1391                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1392                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1393                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1394                     }
1395                     if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
1396                         static unsigned char tx[32];
1397                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
1398                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1399                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1400                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1401                                 (unsigned char *)message.message.enodbName,
1402                                 strlen(message.message.enodbName));
1403                         rmrMessageBuffer.sendMessage->state = 0;
1404                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1405                         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1406                             mdclog_write(MDCLOG_DEBUG, "RIC sub id = %d, message type = %d",
1407                                     rmrMessageBuffer.sendMessage->sub_id,
1408                                     rmrMessageBuffer.sendMessage->mtype);
1409                         }
1410                         sendRmrMessage(rmrMessageBuffer, message, &lspan);
1411                         messageSent = true;
1412                     } else {
1413                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1414                     }
1415                 }
1416                 if (messageSent) {
1417                     break;
1418                 }
1419             }
1420             break;
1421         }
1422         case ProcedureCode_id_errorIndication: {
1423             if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer, &lspan) != 0) {
1424                 mdclog_write(MDCLOG_ERR, "Error Indication message failed to send to xAPP");
1425             }
1426             break;
1427         }
1428         case ProcedureCode_id_ricServiceUpdate : {
1429             if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer, &lspan) != 0) {
1430                 mdclog_write(MDCLOG_ERR, "Service Update message failed to send to xAPP");
1431             }
1432             break;
1433         }
1434         case ProcedureCode_id_gNBStatusIndication : {
1435             if (sendRequestToXapp(message, RIC_GNB_STATUS_INDICATION, rmrMessageBuffer, &lspan) != 0) {
1436                 mdclog_write(MDCLOG_ERR, "RIC_GNB_STATUS_INDICATION failed to send to xAPP");
1437             }
1438             break;
1439         }
1440         default: {
1441             mdclog_write(MDCLOG_ERR, "Undefined or not supported message = %ld", procedureCode);
1442             message.message.messageType = 0; // no RMR message type yet
1443
1444             buildJsonMessage(message);
1445
1446             break;
1447         }
1448     }
1449 #ifdef __TRACING__
1450     lspan->Finish();
1451 #endif
1452
1453 }
1454
1455 /**
1456  *
1457  * @param pdu
1458  * @param message
1459  * @param sctpMap
1460  * @param rmrMessageBuffer
1461  * @param pSpan
1462  */
1463 void asnSuccsesfulMsg(E2AP_PDU_t *pdu, ReportingMessages_t &message, Sctp_Map_t *sctpMap,
1464                       RmrMessagesBuffer_t &rmrMessageBuffer, otSpan *pSpan) {
1465 #ifdef __TRACING__
1466     auto lspan = opentracing::Tracer::Global()->StartSpan(
1467             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1468 #else
1469     otSpan lspan = 0;
1470 #endif
1471     auto procedureCode = pdu->choice.successfulOutcome->procedureCode;
1472     if (mdclog_level_get() >= MDCLOG_INFO) {
1473         mdclog_write(MDCLOG_INFO, "Successful Outcome %ld", procedureCode);
1474     }
1475     switch (procedureCode) {
1476         case ProcedureCode_id_x2Setup: {
1477             if (mdclog_level_get() >= MDCLOG_INFO) {
1478                 mdclog_write(MDCLOG_INFO, "Got Succesful Setup response from CU - %s",
1479                              message.message.enodbName);
1480             }
1481             if (sendResponseToXapp(message, RIC_X2_SETUP_RESP,
1482                                    RIC_X2_SETUP_REQ, rmrMessageBuffer, sctpMap, &lspan) != 0) {
1483                 mdclog_write(MDCLOG_ERR, "Failed to send Succesful Setup response for CU - %s",
1484                              message.message.enodbName);
1485             }
1486             break;
1487         }
1488         case ProcedureCode_id_endcX2Setup: { //X2_EN_DC_SETUP_REQUEST_FROM_CU
1489             if (mdclog_level_get() >= MDCLOG_INFO) {
1490                 mdclog_write(MDCLOG_INFO, "Got Succesful E2 EN-DC Setup response from CU - %s",
1491                              message.message.enodbName);
1492             }
1493             if (sendResponseToXapp(message, RIC_ENDC_X2_SETUP_RESP,
1494                                    RIC_ENDC_X2_SETUP_REQ, rmrMessageBuffer, sctpMap, &lspan) != 0) {
1495                 mdclog_write(MDCLOG_ERR, "Failed to send Succesful X2 EN DC Setup response for CU - %s",
1496                              message.message.enodbName);
1497             }
1498             break;
1499         }
1500         case ProcedureCode_id_endcConfigurationUpdate: {
1501             if (mdclog_level_get() >= MDCLOG_INFO) {
1502                 mdclog_write(MDCLOG_INFO, "Got Succesful E2 EN-DC CONFIGURATION UPDATE from CU - %s",
1503                              message.message.enodbName);
1504             }
1505             if (sendRequestToXapp(message, RIC_ENDC_CONF_UPDATE_ACK, rmrMessageBuffer, &lspan) != 0) {
1506                 mdclog_write(MDCLOG_ERR, "Failed to send Succesful E2 EN DC CONFIGURATION response for CU - %s",
1507                              message.message.enodbName);
1508             }
1509             break;
1510         }
1511         case ProcedureCode_id_eNBConfigurationUpdate: {
1512             if (mdclog_level_get() >= MDCLOG_INFO) {
1513                 mdclog_write(MDCLOG_INFO, "Got Succesful E2 ENB CONFIGURATION UPDATE from CU - %s",
1514                              message.message.enodbName);
1515             }
1516             if (sendRequestToXapp(message, RIC_ENB_CONF_UPDATE_ACK, rmrMessageBuffer, &lspan) != 0) {
1517                 mdclog_write(MDCLOG_ERR, "Failed to send Succesful E2 ENB CONFIGURATION response for CU - %s",
1518                              message.message.enodbName);
1519             }
1520             break;
1521         }
1522         case ProcedureCode_id_reset: {
1523             if (sendRequestToXapp(message, RIC_X2_RESET_RESP, rmrMessageBuffer, &lspan) != 0) {
1524                 mdclog_write(MDCLOG_ERR, "Failed to send Succesful E2_RESET response for CU - %s",
1525                              message.message.enodbName);
1526             }
1527             break;
1528
1529         }
1530         case ProcedureCode_id_resourceStatusReportingInitiation: {
1531             if (sendRequestToXapp(message, RIC_RES_STATUS_RESP, rmrMessageBuffer, &lspan) != 0) {
1532                 mdclog_write(MDCLOG_ERR,
1533                              "Failed to send Succesful 2_REQUEST_STATUS_REPORTING_INITIATION response for CU - %s",
1534                              message.message.enodbName);
1535             }
1536             break;
1537         }
1538         case ProcedureCode_id_ricSubscription: {
1539             if (mdclog_level_get() >= MDCLOG_INFO) {
1540                 mdclog_write(MDCLOG_INFO, "Got Succesful RIC Subscription response from CU - %s",
1541                              message.message.enodbName);
1542             }
1543             if (sendRequestToXapp(message, RIC_SUB_RESP, rmrMessageBuffer, &lspan) != 0) {
1544                 mdclog_write(MDCLOG_ERR, "Subscription successful message failed to send to xAPP");
1545             }
1546             break;
1547
1548         }
1549         case ProcedureCode_id_ricSubscriptionDelete: {
1550             if (mdclog_level_get() >= MDCLOG_INFO) {
1551                 mdclog_write(MDCLOG_INFO,
1552                              "Got Succesful RIC Subscription Delete response from CU - %s",
1553                              message.message.enodbName);
1554             }
1555             if (sendRequestToXapp(message, RIC_SUB_DEL_RESP, rmrMessageBuffer, &lspan) != 0) {
1556                 mdclog_write(MDCLOG_ERR, "Subscription delete successful message failed to send to xAPP");
1557             }
1558             break;
1559         }
1560         case ProcedureCode_id_ricControl: {
1561             if (mdclog_level_get() >= MDCLOG_INFO) {
1562                 mdclog_write(MDCLOG_INFO,
1563                              "Got Succesful RIC control response from CU - %s",
1564                              message.message.enodbName);
1565             }
1566             for (int i = 0;
1567                  i < pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.count; i++) {
1568                 auto messageSent = false;
1569                 RICcontrolAcknowledge_IEs_t *ie = pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.array[i];
1570                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1571                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1572                 }
1573                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1574                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1575                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1576                     }
1577                     if (ie->value.present == RICcontrolAcknowledge_IEs__value_PR_RICrequestID) {
1578                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_ACK;
1579                         rmrMessageBuffer.sendMessage->state = 0;
1580                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1581                         static unsigned char tx[32];
1582                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1583                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1584                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1585                                 (unsigned char *)message.message.enodbName,
1586                                 strlen(message.message.enodbName));
1587
1588                         sendRmrMessage(rmrMessageBuffer, message, &lspan);
1589                         messageSent = true;
1590                     } else {
1591                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1592                     }
1593                 }
1594                 if (messageSent) {
1595                     break;
1596                 }
1597             }
1598             break;
1599         }
1600         default: {
1601             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1602             message.message.messageType = 0; // no RMR message type yet
1603             buildJsonMessage(message);
1604
1605             break;
1606         }
1607     }
1608 #ifdef __TRACING__
1609     lspan->Finish();
1610 #endif
1611
1612 }
1613
1614 /**
1615  *
1616  * @param pdu
1617  * @param message
1618  * @param sctpMap
1619  * @param rmrMessageBuffer
1620  * @param pSpan
1621  */
1622 void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
1623                         ReportingMessages_t &message,
1624                         Sctp_Map_t *sctpMap,
1625                         RmrMessagesBuffer_t &rmrMessageBuffer,
1626                         otSpan *pSpan) {
1627 #ifdef __TRACING__
1628     auto lspan = opentracing::Tracer::Global()->StartSpan(
1629             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1630 #else
1631     otSpan lspan = 0;
1632 #endif
1633     auto procedureCode = pdu->choice.unsuccessfulOutcome->procedureCode;
1634     if (mdclog_level_get() >= MDCLOG_INFO) {
1635         mdclog_write(MDCLOG_INFO, "Unsuccessful Outcome %ld", procedureCode);
1636     }
1637     switch (procedureCode) {
1638         case ProcedureCode_id_x2Setup: {
1639             if (mdclog_level_get() >= MDCLOG_INFO) {
1640                 mdclog_write(MDCLOG_INFO,
1641                              "Got Unsuccessful Setup response from CU - %s",
1642                              message.message.enodbName);
1643             }
1644             if (sendResponseToXapp(message,
1645                                    RIC_X2_SETUP_FAILURE, RIC_X2_SETUP_REQ,
1646                                    rmrMessageBuffer,
1647                                    sctpMap,
1648                                    &lspan) != 0) {
1649                 mdclog_write(MDCLOG_ERR,
1650                              "Failed to send Unsuccessful Setup response for CU - %s",
1651                              message.message.enodbName);
1652                 break;
1653             }
1654             break;
1655         }
1656         case ProcedureCode_id_endcX2Setup: {
1657             if (mdclog_level_get() >= MDCLOG_INFO) {
1658                 mdclog_write(MDCLOG_INFO,
1659                              "Got Unsuccessful E2 EN-DC Setup response from CU - %s",
1660                              message.message.enodbName);
1661             }
1662             if (sendResponseToXapp(message, RIC_ENDC_X2_SETUP_FAILURE,
1663                                    RIC_ENDC_X2_SETUP_REQ,
1664                                    rmrMessageBuffer,
1665                                    sctpMap,
1666                                    &lspan) != 0) {
1667                 mdclog_write(MDCLOG_ERR, "Failed to send Unsuccessful E2 EN DC Setup response for CU - %s",
1668                              message.message.enodbName);
1669             }
1670             break;
1671         }
1672         case ProcedureCode_id_endcConfigurationUpdate: {
1673             if (sendRequestToXapp(message, RIC_ENDC_CONF_UPDATE_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1674                 mdclog_write(MDCLOG_ERR, "Failed to send Unsuccessful E2 EN DC CONFIGURATION response for CU - %s",
1675                              message.message.enodbName);
1676             }
1677             break;
1678         }
1679         case ProcedureCode_id_eNBConfigurationUpdate: {
1680             if (sendRequestToXapp(message, RIC_ENB_CONF_UPDATE_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1681                 mdclog_write(MDCLOG_ERR, "Failed to send Unsuccessful E2 ENB CONFIGURATION response for CU - %s",
1682                              message.message.enodbName);
1683             }
1684             break;
1685         }
1686         case ProcedureCode_id_resourceStatusReportingInitiation: {
1687             if (sendRequestToXapp(message, RIC_RES_STATUS_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1688                 mdclog_write(MDCLOG_ERR,
1689                              "Failed to send Succesful E2_REQUEST_STATUS_REPORTING_INITIATION response for CU - %s",
1690                              message.message.enodbName);
1691             }
1692             break;
1693         }
1694         case ProcedureCode_id_ricSubscription: {
1695             if (mdclog_level_get() >= MDCLOG_INFO) {
1696                 mdclog_write(MDCLOG_INFO, "Got Unsuccessful RIC Subscription Response from CU - %s",
1697                              message.message.enodbName);
1698             }
1699             if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1700                 mdclog_write(MDCLOG_ERR, "Subscription unsuccessful message failed to send to xAPP");
1701             }
1702             break;
1703         }
1704         case ProcedureCode_id_ricSubscriptionDelete: {
1705             if (mdclog_level_get() >= MDCLOG_INFO) {
1706                 mdclog_write(MDCLOG_INFO, "Got Unsuccessful RIC Subscription Delete Response from CU - %s",
1707                              message.message.enodbName);
1708             }
1709             if (sendRequestToXapp(message, RIC_SUB_DEL_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1710                 mdclog_write(MDCLOG_ERR, "Subscription Delete unsuccessful message failed to send to xAPP");
1711             }
1712             break;
1713         }
1714         case ProcedureCode_id_ricControl: {
1715             if (mdclog_level_get() >= MDCLOG_INFO) {
1716                 mdclog_write(MDCLOG_INFO, "Got UNSuccesful RIC control response from CU - %s",
1717                              message.message.enodbName);
1718             }
1719             for (int i = 0;
1720                  i < pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.count; i++) {
1721                 auto messageSent = false;
1722                 RICcontrolFailure_IEs_t *ie = pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.array[i];
1723                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1724                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1725                 }
1726                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1727                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1728                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1729                     }
1730                     if (ie->value.present == RICcontrolFailure_IEs__value_PR_RICrequestID) {
1731                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_FAILURE;
1732                         rmrMessageBuffer.sendMessage->state = 0;
1733                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1734                         static unsigned char tx[32];
1735                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1736                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1737                         rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *)message.message.enodbName, strlen(message.message.enodbName));
1738                         sendRmrMessage(rmrMessageBuffer, message, &lspan);
1739                         messageSent = true;
1740                     } else {
1741                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1742                     }
1743                 }
1744                 if (messageSent) {
1745                     break;
1746                 }
1747             }
1748             break;
1749         }
1750         default: {
1751             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1752             message.message.messageType = 0; // no RMR message type yet
1753
1754             buildJsonMessage(message);
1755
1756             break;
1757         }
1758     }
1759 #ifdef __TRACING__
1760     lspan->Finish();
1761 #endif
1762
1763 }
1764
1765 /**
1766  *
1767  * @param message
1768  * @param requestId
1769  * @param rmrMmessageBuffer
1770  * @param pSpan
1771  * @return
1772  */
1773 int sendRequestToXapp(ReportingMessages_t &message,
1774                       int requestId,
1775                       RmrMessagesBuffer_t &rmrMmessageBuffer,
1776                       otSpan *pSpan) {
1777 #ifdef __TRACING__
1778     auto lspan = opentracing::Tracer::Global()->StartSpan(
1779             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1780 #else
1781     otSpan lspan = 0;
1782 #endif
1783     rmr_bytes2meid(rmrMmessageBuffer.sendMessage,
1784                    (unsigned char *)message.message.enodbName,
1785                    strlen(message.message.enodbName));
1786     message.message.messageType = rmrMmessageBuffer.sendMessage->mtype = requestId;
1787     rmrMmessageBuffer.sendMessage->state = 0;
1788     static unsigned char tx[32];
1789     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1790     rmr_bytes2xact(rmrMmessageBuffer.sendMessage, tx, strlen((const char *) tx));
1791
1792     auto rc = sendRmrMessage(rmrMmessageBuffer, message, &lspan);
1793 #ifdef __TRACING__
1794     lspan->Finish();
1795 #endif
1796
1797     return rc;
1798 }
1799
1800
1801 void getRmrContext(sctp_params_t &pSctpParams, otSpan *pSpan) {
1802 #ifdef __TRACING__
1803     auto lspan = opentracing::Tracer::Global()->StartSpan(
1804             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1805 #else
1806 //    otSpan lspan = 0;
1807 #endif
1808     pSctpParams.rmrCtx = nullptr;
1809     pSctpParams.rmrCtx = rmr_init(pSctpParams.rmrAddress, RMR_MAX_RCV_BYTES, RMRFL_NONE);
1810     if (pSctpParams.rmrCtx == nullptr) {
1811         mdclog_write(MDCLOG_ERR, "Failed to initialize RMR");
1812 #ifdef __TRACING__
1813         lspan->Finish();
1814 #endif
1815         return;
1816     }
1817
1818     rmr_set_stimeout(pSctpParams.rmrCtx, 0);    // disable retries for any send operation
1819     // we need to find that routing table exist and we can run
1820     if (mdclog_level_get() >= MDCLOG_INFO) {
1821         mdclog_write(MDCLOG_INFO, "We are after RMR INIT wait for RMR_Ready");
1822     }
1823     int rmrReady = 0;
1824     int count = 0;
1825     while (!rmrReady) {
1826         if ((rmrReady = rmr_ready(pSctpParams.rmrCtx)) == 0) {
1827             sleep(1);
1828         }
1829         count++;
1830         if (count % 60 == 0) {
1831             mdclog_write(MDCLOG_INFO, "waiting to RMR ready state for %d seconds", count);
1832         }
1833     }
1834     if (mdclog_level_get() >= MDCLOG_INFO) {
1835         mdclog_write(MDCLOG_INFO, "RMR running");
1836     }
1837 #ifdef __TRACING__
1838     lspan->Finish();
1839 #endif
1840     rmr_init_trace(pSctpParams.rmrCtx, 200);
1841     // get the RMR fd for the epoll
1842     pSctpParams.rmrListenFd = rmr_get_rcvfd(pSctpParams.rmrCtx);
1843     struct epoll_event event{};
1844     // add RMR fd to epoll
1845     event.events = (EPOLLIN);
1846     event.data.fd = pSctpParams.rmrListenFd;
1847     // add listening RMR FD to epoll
1848     if (epoll_ctl(pSctpParams.epoll_fd, EPOLL_CTL_ADD, pSctpParams.rmrListenFd, &event)) {
1849         mdclog_write(MDCLOG_ERR, "Failed to add RMR descriptor to epoll");
1850         close(pSctpParams.rmrListenFd);
1851         rmr_close(pSctpParams.rmrCtx);
1852         pSctpParams.rmrCtx = nullptr;
1853     }
1854 }
1855
1856 /**
1857  *
1858  * @param epoll_fd
1859  * @param sctpMap
1860  * @param rmrMessageBuffer
1861  * @param ts
1862  * @param pSpan
1863  * @return
1864  */
1865 int receiveXappMessages(int epoll_fd,
1866                         Sctp_Map_t *sctpMap,
1867                         RmrMessagesBuffer_t &rmrMessageBuffer,
1868                         struct timespec &ts,
1869                         otSpan *pSpan) {
1870 #ifdef __TRACING__
1871     auto lspan = opentracing::Tracer::Global()->StartSpan(
1872             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1873 #else
1874     otSpan lspan = 0;
1875 #endif
1876     if (rmrMessageBuffer.rcvMessage == nullptr) {
1877         //we have error
1878         mdclog_write(MDCLOG_ERR, "RMR Allocation message, %s", strerror(errno));
1879 #ifdef __TRACING__
1880         lspan->Finish();
1881 #endif
1882
1883         return -1;
1884     }
1885
1886     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1887         mdclog_write(MDCLOG_DEBUG, "Call to rmr_rcv_msg");
1888     }
1889     rmrMessageBuffer.rcvMessage = rmr_rcv_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.rcvMessage);
1890     if (rmrMessageBuffer.rcvMessage == nullptr) {
1891         mdclog_write(MDCLOG_ERR, "RMR Receving message with null pointer, Realloc rmr mesage buffer");
1892         rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
1893 #ifdef __TRACING__
1894         lspan->Finish();
1895 #endif
1896
1897         return -2;
1898     }
1899     ReportingMessages_t message;
1900     message.message.direction = 'D';
1901     message.message.time.tv_nsec = ts.tv_nsec;
1902     message.message.time.tv_sec = ts.tv_sec;
1903
1904     // get message payload
1905     //auto msgData = msg->payload;
1906     if (rmrMessageBuffer.rcvMessage->state != 0) {
1907         mdclog_write(MDCLOG_ERR, "RMR Receving message with stat = %d", rmrMessageBuffer.rcvMessage->state);
1908 #ifdef __TRACING__
1909         lspan->Finish();
1910 #endif
1911
1912         return -1;
1913     }
1914     switch (rmrMessageBuffer.rcvMessage->mtype) {
1915         case RIC_X2_SETUP_REQ: {
1916             if (connectToCUandSetUp(rmrMessageBuffer, message, epoll_fd, sctpMap, &lspan) != 0) {
1917                 mdclog_write(MDCLOG_ERR, "ERROR in connectToCUandSetUp on RIC_X2_SETUP_REQ");
1918                 message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_SCTP_CONNECTION_FAILURE;
1919                 message.message.direction = 'N';
1920                 message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1921                         snprintf((char *)rmrMessageBuffer.sendMessage->payload,
1922                                 256,
1923                                 "ERROR in connectToCUandSetUp on RIC_X2_SETUP_REQ");
1924                 rmrMessageBuffer.sendMessage->state = 0;
1925                 message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1926
1927                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1928                     mdclog_write(MDCLOG_DEBUG, "start writing to rmr buffer");
1929                 }
1930                 rmr_bytes2xact(rmrMessageBuffer.sendMessage, rmrMessageBuffer.rcvMessage->xaction, RMR_MAX_XID);
1931                 rmr_str2meid(rmrMessageBuffer.sendMessage, (unsigned char *)message.message.enodbName);
1932
1933                 sendRmrMessage(rmrMessageBuffer, message, &lspan);
1934 #ifdef __TRACING__
1935                 lspan->Finish();
1936 #endif
1937                 return -3;
1938             }
1939             break;
1940         }
1941         case RIC_ENDC_X2_SETUP_REQ: {
1942             if (connectToCUandSetUp(rmrMessageBuffer, message, epoll_fd, sctpMap, &lspan) != 0) {
1943                 mdclog_write(MDCLOG_ERR, "ERROR in connectToCUandSetUp on RIC_ENDC_X2_SETUP_REQ");
1944                 message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_SCTP_CONNECTION_FAILURE;
1945                 message.message.direction = 'N';
1946                 message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1947                         snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
1948                                  "ERROR in connectToCUandSetUp on RIC_ENDC_X2_SETUP_REQ");
1949                 rmrMessageBuffer.sendMessage->state = 0;
1950                 message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1951
1952                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1953                     mdclog_write(MDCLOG_DEBUG, "start writing to rmr buffer");
1954                 }
1955
1956                 rmr_bytes2xact(rmrMessageBuffer.sendMessage, rmrMessageBuffer.rcvMessage->xaction, RMR_MAX_XID);
1957                 rmr_str2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName);
1958
1959                 sendRmrMessage(rmrMessageBuffer, message, &lspan);
1960 #ifdef __TRACING__
1961                 lspan->Finish();
1962 #endif
1963                 return -3;
1964             }
1965             break;
1966         }
1967         case RIC_ENDC_CONF_UPDATE: {
1968             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1969                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE");
1970 #ifdef __TRACING__
1971                 lspan->Finish();
1972 #endif
1973                 return -4;
1974             }
1975             break;
1976         }
1977         case RIC_ENDC_CONF_UPDATE_ACK: {
1978             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1979                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE_ACK");
1980 #ifdef __TRACING__
1981                 lspan->Finish();
1982 #endif
1983                 return -4;
1984             }
1985             break;
1986         }
1987         case RIC_ENDC_CONF_UPDATE_FAILURE: {
1988             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1989                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE_FAILURE");
1990 #ifdef __TRACING__
1991                 lspan->Finish();
1992 #endif
1993
1994                 return -4;
1995             }
1996             break;
1997         }
1998         case RIC_ENB_CONF_UPDATE: {
1999             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2000                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE");
2001 #ifdef __TRACING__
2002                 lspan->Finish();
2003 #endif
2004                 return -4;
2005             }
2006             break;
2007         }
2008         case RIC_ENB_CONF_UPDATE_ACK: {
2009             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2010                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENB_CONF_UPDATE_ACK");
2011 #ifdef __TRACING__
2012                 lspan->Finish();
2013 #endif
2014                 return -4;
2015             }
2016             break;
2017         }
2018         case RIC_ENB_CONF_UPDATE_FAILURE: {
2019             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2020                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENB_CONF_UPDATE_FAILURE");
2021 #ifdef __TRACING__
2022                 lspan->Finish();
2023 #endif
2024                 return -4;
2025             }
2026             break;
2027         }
2028         case RIC_RES_STATUS_REQ: {
2029             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2030                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_RES_STATUS_REQ");
2031 #ifdef __TRACING__
2032                 lspan->Finish();
2033 #endif
2034                 return -6;
2035             }
2036             break;
2037         }
2038         case RIC_SUB_REQ: {
2039             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2040                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_REQ");
2041 #ifdef __TRACING__
2042                 lspan->Finish();
2043 #endif
2044                 return -6;
2045             }
2046             break;
2047         }
2048         case RIC_SUB_DEL_REQ: {
2049             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2050                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_DEL_REQ");
2051 #ifdef __TRACING__
2052                 lspan->Finish();
2053 #endif
2054                 return -6;
2055             }
2056             break;
2057         }
2058         case RIC_CONTROL_REQ: {
2059             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2060                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_CONTROL_REQ");
2061 #ifdef __TRACING__
2062                 lspan->Finish();
2063 #endif
2064                 return -6;
2065             }
2066             break;
2067         }
2068         case RIC_SERVICE_QUERY: {
2069             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2070                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_QUERY");
2071 #ifdef __TRACING__
2072                 lspan->Finish();
2073 #endif
2074                 return -6;
2075             }
2076             break;
2077         }
2078         case RIC_SERVICE_UPDATE_ACK: {
2079             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2080                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_ACK");
2081 #ifdef __TRACING__
2082                 lspan->Finish();
2083 #endif
2084                 return -6;
2085             }
2086             break;
2087         }
2088         case RIC_SERVICE_UPDATE_FAILURE: {
2089             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2090                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_FAILURE");
2091 #ifdef __TRACING__
2092                 lspan->Finish();
2093 #endif
2094                 return -6;
2095             }
2096             break;
2097         }
2098         case RIC_X2_RESET: {
2099             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2100                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_X2_RESET");
2101 #ifdef __TRACING__
2102                 lspan->Finish();
2103 #endif
2104                 return -6;
2105             }
2106             break;
2107         }
2108         case RIC_X2_RESET_RESP: {
2109             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2110                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_X2_RESET_RESP");
2111 #ifdef __TRACING__
2112                 lspan->Finish();
2113 #endif
2114                 return -6;
2115             }
2116             break;
2117         }
2118         case RIC_SCTP_CLEAR_ALL: {
2119             mdclog_write(MDCLOG_INFO, "RIC_SCTP_CLEAR_ALL");
2120             // loop on all keys and close socket and then erase all map.
2121             vector<char *> v;
2122             sctpMap->getKeys(v);
2123             for (auto const &iter : v) { //}; iter != sctpMap.end(); iter++) {
2124                 if (!boost::starts_with((string) (iter), "host:") && !boost::starts_with((string) (iter), "msg:")) {
2125                     auto *peerInfo = (ConnectedCU_t *) sctpMap->find(iter);
2126                     if (peerInfo == nullptr) {
2127                         continue;
2128                     }
2129                     close(peerInfo->fileDescriptor);
2130                     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
2131                     message.message.direction = 'D';
2132                     message.message.time.tv_nsec = ts.tv_nsec;
2133                     message.message.time.tv_sec = ts.tv_sec;
2134
2135                     message.message.asnLength = rmrMessageBuffer.sendMessage->len =
2136                             snprintf((char *)rmrMessageBuffer.sendMessage->payload,
2137                                                                    256,
2138                                                                    "%s|RIC_SCTP_CLEAR_ALL",
2139                                                                    peerInfo->enodbName);
2140                     message.message.asndata = rmrMessageBuffer.sendMessage->payload;
2141                     mdclog_write(MDCLOG_INFO, "%s", message.message.asndata);
2142                     if (sendRequestToXapp(message,
2143                                           RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer, &lspan) != 0) {
2144                         mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
2145                     }
2146                     free(peerInfo);
2147                 }
2148             }
2149
2150             sleep(1);
2151             sctpMap->clear();
2152             break;
2153         }
2154         case E2_TERM_KEEP_ALIVE_REQ: {
2155             // send message back
2156             rmr_bytes2payload(rmrMessageBuffer.sendMessage,
2157                     (unsigned char *)rmrMessageBuffer.ka_message,
2158                     rmrMessageBuffer.ka_message_len);
2159             rmrMessageBuffer.sendMessage->mtype = E2_TERM_KEEP_ALIVE_RESP;
2160             rmrMessageBuffer.sendMessage->state = 0;
2161             static unsigned char tx[32];
2162             auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2163             rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, txLen);
2164             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2165             if (rmrMessageBuffer.sendMessage == nullptr) {
2166                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2167                 mdclog_write(MDCLOG_ERR, "Failed to send E2_TERM_KEEP_ALIVE_RESP");
2168             } else if (rmrMessageBuffer.sendMessage->state != 0)  {
2169                 mdclog_write(MDCLOG_ERR, "Failed to send E2_TERM_KEEP_ALIVE_RESP, on RMR state = %d", rmrMessageBuffer.sendMessage->state);
2170             } else if (mdclog_level_get() >= MDCLOG_INFO) {
2171                 mdclog_write(MDCLOG_INFO, "Got Keep Alive Request send : %s", rmrMessageBuffer.ka_message);
2172             }
2173
2174             break;
2175         }
2176         default:
2177             mdclog_write(MDCLOG_WARN, "Message Type : %d is not seported", rmrMessageBuffer.rcvMessage->mtype);
2178             message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
2179             message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
2180             message.message.time.tv_nsec = ts.tv_nsec;
2181             message.message.time.tv_sec = ts.tv_sec;
2182             message.message.messageType = rmrMessageBuffer.rcvMessage->mtype;
2183
2184             buildJsonMessage(message);
2185
2186
2187 #ifdef __TRACING__
2188             lspan->Finish();
2189 #endif
2190             return -7;
2191     }
2192     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2193         mdclog_write(MDCLOG_DEBUG, "EXIT OK from %s", __FUNCTION__);
2194     }
2195 #ifdef __TRACING__
2196     lspan->Finish();
2197 #endif
2198     return 0;
2199 }
2200
2201 /**
2202  * Send message to the CU that is not expecting for successful or unsuccessful results
2203  * @param messageBuffer
2204  * @param message
2205  * @param failedMsgId
2206  * @param sctpMap
2207  * @param pSpan
2208  * @return
2209  */
2210 int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer,
2211                            ReportingMessages_t &message,
2212                            int failedMsgId,
2213                            Sctp_Map_t *sctpMap,
2214                            otSpan *pSpan) {
2215 #ifdef __TRACING__
2216     auto lspan = opentracing::Tracer::Global()->StartSpan(
2217             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2218 #else
2219     otSpan lspan = 0;
2220 #endif
2221
2222     getRequestMetaData(message, messageBuffer, &lspan);
2223     if (mdclog_level_get() >= MDCLOG_INFO) {
2224         mdclog_write(MDCLOG_INFO, "send message to %s address", message.message.enodbName);
2225     }
2226
2227     auto rc = sendMessagetoCu(sctpMap, messageBuffer, message, failedMsgId, &lspan);
2228 #ifdef __TRACING__
2229     lspan->Finish();
2230 #endif
2231
2232     return rc;
2233 }
2234
2235 /**
2236  *
2237  * @param sctpMap
2238  * @param messageBuffer
2239  * @param message
2240  * @param failedMesgId
2241  * @param pSpan
2242  * @return
2243  */
2244 int sendMessagetoCu(Sctp_Map_t *sctpMap,
2245                     RmrMessagesBuffer_t &messageBuffer,
2246                     ReportingMessages_t &message,
2247                     int failedMesgId,
2248                     otSpan *pSpan) {
2249 #ifdef __TRACING__
2250     auto lspan = opentracing::Tracer::Global()->StartSpan(
2251             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2252 #else
2253     otSpan lspan = 0;
2254 #endif
2255     auto *peerInfo = (ConnectedCU_t *) sctpMap->find(message.message.enodbName);
2256     if (peerInfo == nullptr) {
2257         if (failedMesgId != 0) {
2258             sendFailedSendingMessagetoXapp(messageBuffer, message, failedMesgId, &lspan);
2259         } else {
2260             mdclog_write(MDCLOG_ERR, "Failed to send message no CU entry %s", message.message.enodbName);
2261         }
2262 #ifdef __TRACING__
2263         lspan->Finish();
2264 #endif
2265
2266         return -1;
2267     }
2268
2269     // get the FD
2270     message.message.messageType = messageBuffer.rcvMessage->mtype;
2271     auto rc = sendSctpMsg(peerInfo, message, sctpMap, &lspan);
2272 #ifdef __TRACING__
2273     lspan->Finish();
2274 #endif
2275
2276     return rc;
2277 }
2278
2279 /**
2280  *
2281  * @param rmrCtx the rmr context to send and receive
2282  * @param msg the msg we got fromxApp
2283  * @param metaData data from xApp in ordered struct
2284  * @param failedMesgId the return message type error
2285  */
2286 void
2287 sendFailedSendingMessagetoXapp(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, int failedMesgId,
2288                                otSpan *pSpan) {
2289 #ifdef __TRACING__
2290     auto lspan = opentracing::Tracer::Global()->StartSpan(
2291             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2292 #else
2293     otSpan lspan = 0;
2294 #endif
2295     rmr_mbuf_t *msg = rmrMessageBuffer.sendMessage;
2296     msg->len = snprintf((char *) msg->payload, 200, "the gNb/eNode name %s not found",
2297                         message.message.enodbName);
2298     if (mdclog_level_get() >= MDCLOG_INFO) {
2299         mdclog_write(MDCLOG_INFO, "%s", msg->payload);
2300     }
2301     msg->mtype = failedMesgId;
2302     msg->state = 0;
2303
2304     static unsigned char tx[32];
2305     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2306     rmr_bytes2xact(msg, tx, strlen((const char *) tx));
2307
2308     sendRmrMessage(rmrMessageBuffer, message, &lspan);
2309 #ifdef __TRACING__
2310     lspan->Finish();pLogSink
2311 #endif
2312
2313 }
2314
2315 /**
2316  * Send Response back to xApp, message is used only when there was a request from the xApp
2317  *
2318  * @param enodbName the name of the gNb/eNodeB
2319  * @param msgType  the value of the message to the xApp
2320  * @param requestType The request that was sent by the xAPP
2321  * @param rmrCtx the rmr identifier
2322  * @param sctpMap hash map holds data on the requestrs
2323  * @param buf  the buffer to send to xAPP
2324  * @param size size of the buffer to send
2325  * @return
2326  */
2327 int sendResponseToXapp(ReportingMessages_t &message,
2328                        int msgType,
2329                        int requestType,
2330                        RmrMessagesBuffer_t &rmrMessageBuffer,
2331                        Sctp_Map_t *sctpMap,
2332                        otSpan *pSpan) {
2333 #ifdef __TRACING__
2334     auto lspan = opentracing::Tracer::Global()->StartSpan(
2335             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2336 #else
2337     otSpan lspan = 0;
2338 #endif
2339     char key[MAX_ENODB_NAME_SIZE * 2];
2340     snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName, requestType);
2341
2342     auto xact = sctpMap->find(key);
2343     if (xact == nullptr) {
2344         mdclog_write(MDCLOG_ERR, "NO Request %s found for this response from CU: %s", key,
2345                      message.message.enodbName);
2346 #ifdef __TRACING__
2347         lspan->Finish();
2348 #endif
2349
2350         return -1;
2351     }
2352     sctpMap->erase(key);
2353
2354     message.message.messageType = rmrMessageBuffer.sendMessage->mtype = msgType; //SETUP_RESPONSE_MESSAGE_TYPE;
2355     rmr_bytes2payload(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.asndata,
2356                       message.message.asnLength);
2357     rmr_bytes2xact(rmrMessageBuffer.sendMessage, (const unsigned char *)xact, strlen((const char *)xact));
2358     rmr_str2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName);
2359     rmrMessageBuffer.sendMessage->state = 0;
2360
2361     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2362         mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2363     }
2364     free(xact);
2365
2366     auto rc = sendRmrMessage(rmrMessageBuffer, message, &lspan);
2367 #ifdef __TRACING__
2368     lspan->Finish();
2369 #endif
2370     return rc;
2371 }
2372
2373 /**
2374  * build the SCTP connection to eNodB or gNb
2375  * @param rmrMessageBuffer
2376  * @param message
2377  * @param epoll_fd
2378  * @param sctpMap
2379  * @param pSpan
2380  * @return
2381  */
2382 int connectToCUandSetUp(RmrMessagesBuffer_t &rmrMessageBuffer,
2383                         ReportingMessages_t &message,
2384                         int epoll_fd,
2385                         Sctp_Map_t *sctpMap,
2386                         otSpan *pSpan) {
2387 #ifdef __TRACING__
2388     auto lspan = opentracing::Tracer::Global()->StartSpan(
2389             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2390 #else
2391     otSpan lspan = 0;
2392 #endif
2393     struct sockaddr_in6 servaddr{};
2394     struct addrinfo hints{}, *result;
2395     auto msgData = rmrMessageBuffer.rcvMessage->payload;
2396     unsigned char meid[RMR_MAX_MEID]{};
2397     char host[256]{};
2398     uint16_t port = 0;
2399
2400     message.message.messageType = rmrMessageBuffer.rcvMessage->mtype;
2401     rmr_mbuf_t *msg = rmrMessageBuffer.rcvMessage;
2402     rmr_get_meid(msg, meid);
2403
2404     if (mdclog_level_get() >= MDCLOG_INFO) {
2405         mdclog_write(MDCLOG_INFO, "message %d Received for MEID :%s. SETUP/EN-DC Setup Request from xApp, Message = %s",
2406                      msg->mtype, meid, msgData);
2407     }
2408     if (getSetupRequestMetaData(message, (char *)msgData, host, port, &lspan) < 0) {
2409         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2410             mdclog_write(MDCLOG_DEBUG, "Error in setup parameters %s, %d", __func__, __LINE__);
2411         }
2412 #ifdef __TRACING__
2413         lspan->Finish();
2414 #endif
2415         return -1;
2416     }
2417
2418     //// message asndata points to the start of the asndata of the message and not to start of payload
2419     // search if the same host:port but not the same enodbname
2420     char searchBuff[256]{};
2421     snprintf(searchBuff, sizeof searchBuff, "host:%s:%d", host, port);
2422     auto e = (char *)sctpMap->find(searchBuff);
2423     if (e != nullptr) {
2424         // found one compare if not the same
2425         if (strcmp(message.message.enodbName, e) != 0) {
2426             mdclog_write(MDCLOG_ERR,
2427                          "Try to connect CU %s to Host %s but %s already connected",
2428                          message.message.enodbName, host, e);
2429 #ifdef __TRACING__
2430             lspan->Finish();
2431 #endif
2432             return -1;
2433         }
2434     }
2435
2436     // check if not alread connected. if connected send the request and return
2437     auto *peerInfo = (ConnectedCU_t *)sctpMap->find(message.message.enodbName);
2438     if (peerInfo != nullptr) {
2439 //        snprintf(strErr,
2440 //                128,
2441 //                "Device %s already connected please remove and then setup again",
2442 //                message.message.enodbName);
2443         if (mdclog_level_get() >= MDCLOG_INFO) {
2444             mdclog_write(MDCLOG_INFO,
2445                          "Device already connected to %s",
2446                          message.message.enodbName);
2447         }
2448         message.message.messageType = msg->mtype;
2449         auto rc = sendSctpMsg(peerInfo, message, sctpMap, &lspan);
2450         if (rc != 0) {
2451             mdclog_write(MDCLOG_ERR, "failed write to SCTP %s, %d", __func__, __LINE__);
2452 #ifdef __TRACING__
2453             lspan->Finish();
2454 #endif
2455             return -1;
2456         }
2457
2458         char key[MAX_ENODB_NAME_SIZE * 2];
2459         snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName, msg->mtype);
2460         int xaction_len = strlen((const char *) msg->xaction);
2461         auto *xaction = (unsigned char *) calloc(1, xaction_len);
2462         memcpy(xaction, msg->xaction, xaction_len);
2463         sctpMap->setkey(key, xaction);
2464         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2465             mdclog_write(MDCLOG_DEBUG, "set key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2466         }
2467 #ifdef __TRACING__
2468         lspan->Finish();
2469 #endif
2470         return 0;
2471     }
2472
2473     peerInfo = (ConnectedCU_t *) calloc(1, sizeof(ConnectedCU_t));
2474     memcpy(peerInfo->enodbName, message.message.enodbName, sizeof(message.message.enodbName));
2475
2476     // new connection
2477     if ((peerInfo->fileDescriptor = socket(AF_INET6, SOCK_STREAM, IPPROTO_SCTP)) < 0) {
2478         mdclog_write(MDCLOG_ERR, "Socket Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2479 #ifdef __TRACING__
2480         lspan->Finish();
2481 #endif
2482         return -1;
2483     }
2484
2485     auto optval = 1;
2486     if (setsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof optval) != 0) {
2487         mdclog_write(MDCLOG_ERR, "setsockopt SO_REUSEPORT Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2488 #ifdef __TRACING__
2489         lspan->Finish();
2490 #endif
2491         return -1;
2492     }
2493     optval = 1;
2494     if (setsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval) != 0) {
2495         mdclog_write(MDCLOG_ERR, "setsockopt SO_REUSEADDR Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2496 #ifdef __TRACING__
2497         lspan->Finish();
2498 #endif
2499         return -1;
2500     }
2501     servaddr.sin6_family = AF_INET6;
2502
2503     struct sockaddr_in6 localAddr {};
2504     localAddr.sin6_family = AF_INET6;
2505     localAddr.sin6_addr = in6addr_any;
2506     localAddr.sin6_port = htons(SRC_PORT);
2507
2508     if (bind(peerInfo->fileDescriptor, (struct sockaddr*)&localAddr , sizeof(struct sockaddr_in6)) < 0) {
2509         mdclog_write(MDCLOG_ERR, "bind Socket Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2510 #ifdef __TRACING__
2511         lspan->Finish();
2512 #endif
2513         return -1;
2514     }//Ends the binding.
2515
2516     memset(&hints, 0, sizeof hints);
2517     hints.ai_flags = AI_NUMERICHOST;
2518     if (getaddrinfo(host, nullptr, &hints, &result) < 0) {
2519         close(peerInfo->fileDescriptor);
2520         mdclog_write(MDCLOG_ERR, "getaddrinfo error for %s, Error = %s", host, strerror(errno));
2521 #ifdef __TRACING__
2522         lspan->Finish();
2523 #endif
2524         return -1;
2525     }
2526     memcpy(&servaddr, result->ai_addr, sizeof(struct sockaddr_in6));
2527     freeaddrinfo(result);
2528
2529     servaddr.sin6_port = htons(port);      /* daytime server */
2530     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2531         mdclog_write(MDCLOG_DEBUG, "Send Connect FD = %d host : %s port %d",
2532                      peerInfo->fileDescriptor,
2533                      host,
2534                      port);
2535     }
2536
2537     // Add to Epol
2538     if (addToEpoll(epoll_fd, peerInfo, (EPOLLOUT | EPOLLIN | EPOLLET), sctpMap, message.message.enodbName,
2539                    msg->mtype, &lspan) != 0) {
2540 #ifdef __TRACING__
2541         lspan->Finish();
2542 #endif
2543         return -1;
2544     }
2545
2546     char hostBuff[NI_MAXHOST];
2547     char portBuff[NI_MAXHOST];
2548
2549     if (getnameinfo((SA *) &servaddr, sizeof(servaddr),
2550                     hostBuff, sizeof(hostBuff),
2551                     portBuff, sizeof(portBuff),
2552                     (uint) (NI_NUMERICHOST) | (uint) (NI_NUMERICSERV)) != 0) {
2553         mdclog_write(MDCLOG_ERR, "getnameinfo() Error, %s  %s %d", strerror(errno), __func__, __LINE__);
2554 #ifdef __TRACING__
2555         lspan->Finish();
2556 #endif
2557         return -1;
2558     }
2559
2560     if (setSocketNoBlocking(peerInfo->fileDescriptor) != 0) {
2561         mdclog_write(MDCLOG_ERR, "setSocketNoBlocking failed to set new connection %s on sctpPort %s", hostBuff,
2562                      portBuff);
2563         close(peerInfo->fileDescriptor);
2564 #ifdef __TRACING__
2565         lspan->Finish();
2566 #endif
2567         return -1;
2568     }
2569
2570     memcpy(peerInfo->hostName, hostBuff, strlen(hostBuff));
2571     peerInfo->hostName[strlen(hostBuff)] = 0;
2572     memcpy(peerInfo->portNumber, portBuff, strlen(portBuff));
2573     peerInfo->portNumber[strlen(portBuff)] = 0;
2574
2575     // map by enoodb/gnb name
2576     sctpMap->setkey(message.message.enodbName, peerInfo);
2577     //map host and port to enodeb
2578     sctpMap->setkey(searchBuff, message.message.enodbName);
2579
2580     // save message for the return values
2581     char key[MAX_ENODB_NAME_SIZE * 2];
2582     snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName, msg->mtype);
2583     int xaction_len = strlen((const char *) msg->xaction);
2584     auto *xaction = (unsigned char *) calloc(1, xaction_len);
2585     memcpy(xaction, msg->xaction, xaction_len);
2586     sctpMap->setkey(key, xaction);
2587     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2588         mdclog_write(MDCLOG_DEBUG, "End building peerinfo: %s for CU %s", key, message.message.enodbName);
2589     }
2590
2591     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2592         mdclog_write(MDCLOG_DEBUG, "Send connect to FD %d, %s, %d",
2593                      peerInfo->fileDescriptor, __func__, __LINE__);
2594     }
2595     if (connect(peerInfo->fileDescriptor, (SA *) &servaddr, sizeof(servaddr)) < 0) {
2596         if (errno != EINPROGRESS) {
2597             mdclog_write(MDCLOG_ERR, "connect FD %d to host : %s port %d, %s",
2598                          peerInfo->fileDescriptor, host, port, strerror(errno));
2599             close(peerInfo->fileDescriptor);
2600 #ifdef __TRACING__
2601             lspan->Finish();
2602 #endif
2603             return -1;
2604         }
2605         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2606             mdclog_write(MDCLOG_DEBUG,
2607                          "Connect to FD %d returned with EINPROGRESS : %s",
2608                          peerInfo->fileDescriptor, strerror(errno));
2609         }
2610         // since message.message.asndata is pointing to the asndata in the rmr message payload we copy it like this
2611         memcpy(peerInfo->asnData, message.message.asndata, message.message.asnLength);
2612         peerInfo->asnLength = message.message.asnLength;
2613         peerInfo->mtype = msg->mtype;
2614 #ifdef __TRACING__
2615         lspan->Finish();
2616 #endif
2617         return 0;
2618     }
2619
2620     if (mdclog_level_get() >= MDCLOG_INFO) {
2621         mdclog_write(MDCLOG_INFO, "Connect to FD %d returned OK without EINPROGRESS", peerInfo->fileDescriptor);
2622     }
2623
2624     peerInfo->isConnected = true;
2625
2626     if (modifyToEpoll(epoll_fd, peerInfo, (EPOLLIN | EPOLLET), sctpMap, message.message.enodbName, msg->mtype,
2627                       &lspan) != 0) {
2628 #ifdef __TRACING__
2629         lspan->Finish();
2630 #endif
2631         return -1;
2632     }
2633
2634     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2635         mdclog_write(MDCLOG_DEBUG, "Connected to host : %s port %d", host, port);
2636     }
2637
2638     message.message.messageType = msg->mtype;
2639     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2640         mdclog_write(MDCLOG_DEBUG, "Send SCTP message to FD %d", peerInfo->fileDescriptor);
2641     }
2642     if (sendSctpMsg(peerInfo, message, sctpMap, &lspan) != 0) {
2643         mdclog_write(MDCLOG_ERR, "Error write to SCTP  %s %d", __func__, __LINE__);
2644 #ifdef __TRACING__
2645         lspan->Finish();
2646 #endif
2647         return -1;
2648     }
2649     memset(peerInfo->asnData, 0, message.message.asnLength);
2650     peerInfo->asnLength = 0;
2651     peerInfo->mtype = 0;
2652
2653     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2654         mdclog_write(MDCLOG_DEBUG, "Sent message to SCTP for %s", message.message.enodbName);
2655     }
2656 #ifdef __TRACING__
2657     lspan->Finish();
2658 #endif
2659     return 0;
2660 }
2661
2662 /**
2663  *
2664  * @param epoll_fd
2665  * @param peerInfo
2666  * @param events
2667  * @param sctpMap
2668  * @param enodbName
2669  * @param msgType
2670  * @param pSpan
2671  * @return
2672  */
2673 int addToEpoll(int epoll_fd,
2674                ConnectedCU_t *peerInfo,
2675                uint32_t events,
2676                Sctp_Map_t *sctpMap,
2677                char *enodbName,
2678                int msgType,
2679                otSpan *pSpan) {
2680 #ifdef __TRACING__
2681     auto lspan = opentracing::Tracer::Global()->StartSpan(
2682             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2683 #else
2684     otSpan lspan = 0;
2685 #endif
2686     // Add to Epol
2687     struct epoll_event event{};
2688     event.data.ptr = peerInfo;
2689     event.events = events;
2690     if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peerInfo->fileDescriptor, &event) < 0) {
2691         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2692             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here), %s, %s %d",
2693                          strerror(errno), __func__, __LINE__);
2694         }
2695         close(peerInfo->fileDescriptor);
2696         cleanHashEntry(peerInfo, sctpMap, &lspan);
2697         char key[MAX_ENODB_NAME_SIZE * 2];
2698         snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2699         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2700             mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2701         }
2702         auto tmp = sctpMap->find(key);
2703         if (tmp) {
2704             free(tmp);
2705         }
2706         sctpMap->erase(key);
2707         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here)");
2708 #ifdef __TRACING__
2709         lspan->Finish();
2710 #endif
2711         return -1;
2712     }
2713 #ifdef __TRACING__
2714     lspan->Finish();
2715 #endif
2716     return 0;
2717 }
2718
2719 /**
2720  *
2721  * @param epoll_fd
2722  * @param peerInfo
2723  * @param events
2724  * @param sctpMap
2725  * @param enodbName
2726  * @param msgType
2727  * @param pSpan
2728  * @return
2729  */
2730 int modifyToEpoll(int epoll_fd,
2731                   ConnectedCU_t *peerInfo,
2732                   uint32_t events,
2733                   Sctp_Map_t *sctpMap,
2734                   char *enodbName,
2735                   int msgType,
2736                   otSpan *pSpan) {
2737 #ifdef __TRACING__
2738     auto lspan = opentracing::Tracer::Global()->StartSpan(
2739             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2740 #else
2741     otSpan lspan = 0;
2742 #endif
2743     // Add to Epol
2744     struct epoll_event event{};
2745     event.data.ptr = peerInfo;
2746     event.events = events;
2747     if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, peerInfo->fileDescriptor, &event) < 0) {
2748         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2749             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_MOD (may chack not to quit here), %s, %s %d",
2750                          strerror(errno), __func__, __LINE__);
2751         }
2752         close(peerInfo->fileDescriptor);
2753         cleanHashEntry(peerInfo, sctpMap, &lspan);
2754         char key[MAX_ENODB_NAME_SIZE * 2];
2755         snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2756         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2757             mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2758         }
2759         auto tmp = sctpMap->find(key);
2760         if (tmp) {
2761             free(tmp);
2762         }
2763         sctpMap->erase(key);
2764         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here)");
2765 #ifdef __TRACING__
2766         lspan->Finish();
2767 #endif
2768         return -1;
2769     }
2770 #ifdef __TRACING__
2771     lspan->Finish();
2772 #endif
2773     return 0;
2774 }
2775
2776
2777 int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, otSpan *pSpan) {
2778 #ifdef __TRACING__
2779     auto lspan = opentracing::Tracer::Global()->StartSpan(
2780             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2781 #else
2782 //    otSpan lspan = 0;
2783 #endif
2784     //serialize the span
2785 #ifdef __TRACING__
2786     std::unordered_map<std::string, std::string> data;
2787     RICCarrierWriter carrier(data);
2788     opentracing::Tracer::Global()->Inject((lspan.get())->context(), carrier);
2789     nlohmann::json j = data;
2790     std::string str = j.dump();
2791     static auto maxTraceLength = 0;
2792
2793     maxTraceLength = str.length() > maxTraceLength ? str.length() : maxTraceLength;
2794     // serialized context can be put to RMR message using function:
2795     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2796         mdclog_write(MDCLOG_DEBUG, "max trace length is %d trace data length = %ld data = %s", maxTraceLength,
2797                      str.length(), str.c_str());
2798     }
2799     rmr_set_trace(rmrMessageBuffer.sendMessage, (const unsigned char *) str.c_str(), str.length());
2800 #endif
2801     buildJsonMessage(message);
2802
2803     rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2804
2805     if (rmrMessageBuffer.sendMessage == nullptr) {
2806         rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2807         mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2808 #ifdef __TRACING__
2809         lspan->Finish();
2810 #endif
2811         return -1;
2812     }
2813
2814     if (rmrMessageBuffer.sendMessage->state != 0) {
2815         char meid[RMR_MAX_MEID]{};
2816         if (rmrMessageBuffer.sendMessage->state == RMR_ERR_RETRY) {
2817             usleep(5);
2818             rmrMessageBuffer.sendMessage->state = 0;
2819             mdclog_write(MDCLOG_INFO, "RETRY sending Message type %d to Xapp from %s",
2820                          rmrMessageBuffer.sendMessage->mtype,
2821                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2822             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2823             if (rmrMessageBuffer.sendMessage == nullptr) {
2824                 mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2825                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2826 #ifdef __TRACING__
2827                 lspan->Finish();
2828 #endif
2829                 return -1;
2830             } else if (rmrMessageBuffer.sendMessage->state != 0) {
2831                 mdclog_write(MDCLOG_ERR,
2832                              "Message state %s while sending request %d to Xapp from %s after retry of 10 microseconds",
2833                              translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2834                              rmrMessageBuffer.sendMessage->mtype,
2835                              rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2836                 auto rc = rmrMessageBuffer.sendMessage->state;
2837 #ifdef __TRACING__
2838                 lspan->Finish();
2839 #endif
2840                 return rc;
2841             }
2842         } else {
2843             mdclog_write(MDCLOG_ERR, "Message state %s while sending request %d to Xapp from %s",
2844                          translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2845                          rmrMessageBuffer.sendMessage->mtype,
2846                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2847 #ifdef __TRACING__
2848             lspan->Finish();
2849 #endif
2850             return rmrMessageBuffer.sendMessage->state;
2851         }
2852     }
2853     return 0;
2854 }
2855
2856 void buildJsonMessage(ReportingMessages_t &message) {
2857     if (jsonTrace) {
2858         message.outLen = sizeof(message.base64Data);
2859         base64::encode((const unsigned char *) message.message.asndata,
2860                        (const int) message.message.asnLength,
2861                        message.base64Data,
2862                        message.outLen);
2863         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2864             mdclog_write(MDCLOG_DEBUG, "asn data length = %d, base64 message length = %d ",
2865                          (int) message.message.asnLength,
2866                          (int) message.outLen);
2867         }
2868
2869         snprintf(message.buffer, sizeof(message.buffer),
2870                                      "{\"header\": {\"ts\": \"%ld.%09ld\","
2871                                      "\"ranName\": \"%s\","
2872                                      "\"messageType\": %d,"
2873                                      "\"direction\": \"%c\"},"
2874                                      "\"base64Length\": %d,"
2875                                      "\"asnBase64\": \"%s\"}",
2876                                      message.message.time.tv_sec,
2877                                      message.message.time.tv_nsec,
2878                                      message.message.enodbName,
2879                                      message.message.messageType,
2880                                      message.message.direction,
2881                                      (int) message.outLen,
2882                                      message.base64Data);
2883         static src::logger_mt &lg = my_logger::get();
2884
2885         BOOST_LOG(lg) << message.buffer;
2886     }
2887 }
2888
2889
2890 /**
2891  * take RMR error code to string
2892  * @param state
2893  * @return
2894  */
2895 string translateRmrErrorMessages(int state) {
2896     string str = {};
2897     switch (state) {
2898         case RMR_OK:
2899             str = "RMR_OK - state is good";
2900             break;
2901         case RMR_ERR_BADARG:
2902             str = "RMR_ERR_BADARG - argument passd to function was unusable";
2903             break;
2904         case RMR_ERR_NOENDPT:
2905             str = "RMR_ERR_NOENDPT - send//call could not find an endpoint based on msg type";
2906             break;
2907         case RMR_ERR_EMPTY:
2908             str = "RMR_ERR_EMPTY - msg received had no payload; attempt to send an empty message";
2909             break;
2910         case RMR_ERR_NOHDR:
2911             str = "RMR_ERR_NOHDR - message didn't contain a valid header";
2912             break;
2913         case RMR_ERR_SENDFAILED:
2914             str = "RMR_ERR_SENDFAILED - send failed; errno has nano reason";
2915             break;
2916         case RMR_ERR_CALLFAILED:
2917             str = "RMR_ERR_CALLFAILED - unable to send call() message";
2918             break;
2919         case RMR_ERR_NOWHOPEN:
2920             str = "RMR_ERR_NOWHOPEN - no wormholes are open";
2921             break;
2922         case RMR_ERR_WHID:
2923             str = "RMR_ERR_WHID - wormhole id was invalid";
2924             break;
2925         case RMR_ERR_OVERFLOW:
2926             str = "RMR_ERR_OVERFLOW - operation would have busted through a buffer/field size";
2927             break;
2928         case RMR_ERR_RETRY:
2929             str = "RMR_ERR_RETRY - request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)";
2930             break;
2931         case RMR_ERR_RCVFAILED:
2932             str = "RMR_ERR_RCVFAILED - receive failed (hard error)";
2933             break;
2934         case RMR_ERR_TIMEOUT:
2935             str = "RMR_ERR_TIMEOUT - message processing call timed out";
2936             break;
2937         case RMR_ERR_UNSET:
2938             str = "RMR_ERR_UNSET - the message hasn't been populated with a transport buffer";
2939             break;
2940         case RMR_ERR_TRUNC:
2941             str = "RMR_ERR_TRUNC - received message likely truncated";
2942             break;
2943         case RMR_ERR_INITFAILED:
2944             str = "RMR_ERR_INITFAILED - initialisation of something (probably message) failed";
2945             break;
2946         case RMR_ERR_NOTSUPP:
2947             str = "RMR_ERR_NOTSUPP - the request is not supported, or RMr was not initialised for the request";
2948             break;
2949         default:
2950             char buf[128]{};
2951             snprintf(buf, sizeof buf, "UNDOCUMENTED RMR_ERR : %d", state);
2952             str = buf;
2953             break;
2954     }
2955     return str;
2956 }
2957
2958