1 // Copyright 2019 AT&T Intellectual Property
2 // Copyright 2019 Nokia
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
16 // This source code is part of the near-RT RIC (RAN Intelligent Controller)
17 // platform project (RICP).
19 // TODO: High-level file comment.
22 #include "sctpThread.h"
25 using namespace std::placeholders;
26 using namespace boost::filesystem;
29 using namespace opentracing;
36 // need to expose without the include of gcov
37 extern "C" void __gcov_flush(void);
39 static void catch_function(int signal) {
45 BOOST_LOG_INLINE_GLOBAL_LOGGER_DEFAULT(my_logger, src::logger_mt)
47 boost::shared_ptr<sinks::synchronous_sink<sinks::text_file_backend>> boostLogger;
48 double cpuClock = 0.0;
49 bool jsonTrace = true;
53 mdclog_attr_init(&attr);
54 mdclog_attr_set_ident(attr, "E2Terminator");
56 mdclog_attr_destroy(attr);
58 auto start_time = std::chrono::high_resolution_clock::now();
59 typedef std::chrono::duration<double, std::ratio<1,1>> seconds_t;
62 return seconds_t(std::chrono::high_resolution_clock::now() - start_time).count();
65 double approx_CPU_MHz(unsigned sleeptime) {
66 using namespace std::chrono_literals;
68 uint64_t cycles_start = rdtscp(aux);
69 double time_start = age();
70 std::this_thread::sleep_for(sleeptime * 1ms);
71 uint64_t elapsed_cycles = rdtscp(aux) - cycles_start;
72 double elapsed_time = age() - time_start;
73 return elapsed_cycles / elapsed_time;
76 //std::atomic<int64_t> rmrCounter{0};
77 std::atomic<int64_t> num_of_messages{0};
78 std::atomic<int64_t> num_of_XAPP_messages{0};
79 static long transactionCounter = 0;
82 int main(const int argc, char **argv) {
83 sctp_params_t sctpParams;
88 opentracing::Tracer::InitGlobal(tracelibcpp::createTracer("E2 Terminator"));
89 auto span = opentracing::Tracer::Global()->StartSpan(__FUNCTION__);
95 std::random_device device{};
96 std::mt19937 generator(device());
97 std::uniform_int_distribution<long> distribution(1, (long) 1e12);
98 transactionCounter = distribution(generator);
101 uint64_t st = 0,en = 0;
106 unsigned num_cpus = std::thread::hardware_concurrency();
108 mdclog_level_set(MDCLOG_INFO);
110 if (std::signal(SIGINT, catch_function) == SIG_ERR) {
111 mdclog_write(MDCLOG_ERR, "Errir initializing SIGINT");
114 if (std::signal(SIGABRT, catch_function)== SIG_ERR) {
115 mdclog_write(MDCLOG_ERR, "Errir initializing SIGABRT");
118 if (std::signal(SIGTERM, catch_function)== SIG_ERR) {
119 mdclog_write(MDCLOG_ERR, "Errir initializing SIGTERM");
124 cpuClock = approx_CPU_MHz(100);
126 mdclog_write(MDCLOG_DEBUG, "CPU speed %11.11f", cpuClock);
127 auto result = parse(argc, argv, sctpParams);
129 path p = (sctpParams.configFilePath + "/" + sctpParams.configFileName).c_str();
131 const int size = 2048;
132 auto fileSize = file_size(p);
133 if (fileSize > size) {
134 mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size);
138 mdclog_write(MDCLOG_ERR, "Configuration File %s not exists", p.string().c_str());
144 if (conf.openConfigFile(p.string()) == -1) {
145 mdclog_write(MDCLOG_ERR, "Filed to open config file %s, %s",
146 p.string().c_str(), strerror(errno));
149 int rmrPort = conf.getIntValue("nano");
151 mdclog_write(MDCLOG_ERR, "illigal RMR port ");
154 sctpParams.rmrPort = (uint16_t)rmrPort;
155 snprintf(sctpParams.rmrAddress, sizeof(sctpParams.rmrAddress), "%d", (int) (sctpParams.rmrPort));
157 auto tmpStr = conf.getStringValue("loglevel");
158 if (tmpStr.length() == 0) {
159 mdclog_write(MDCLOG_ERR, "illigal loglevel. Set loglevel to MDCLOG_INFO");
162 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
164 if ((tmpStr.compare("debug")) == 0) {
165 sctpParams.logLevel = MDCLOG_DEBUG;
166 } else if ((tmpStr.compare("info")) == 0) {
167 sctpParams.logLevel = MDCLOG_INFO;
168 } else if ((tmpStr.compare("warning")) == 0) {
169 sctpParams.logLevel = MDCLOG_WARN;
170 } else if ((tmpStr.compare("error")) == 0) {
171 sctpParams.logLevel = MDCLOG_ERR;
173 mdclog_write(MDCLOG_ERR, "illigal loglevel = %s. Set loglevel to MDCLOG_INFO", tmpStr.c_str());
174 sctpParams.logLevel = MDCLOG_INFO;
176 mdclog_level_set(sctpParams.logLevel);
178 tmpStr = conf.getStringValue("volume");
179 if (tmpStr.length() == 0) {
180 mdclog_write(MDCLOG_ERR, "illigal volume.");
184 char tmpLogFilespec[VOLUME_URL_SIZE];
185 tmpLogFilespec[0] = 0;
186 sctpParams.volume[0] = 0;
187 snprintf(sctpParams.volume, VOLUME_URL_SIZE, "%s", tmpStr.c_str());
188 // copy the name to temp file as well
189 snprintf(tmpLogFilespec, VOLUME_URL_SIZE, "%s", tmpStr.c_str());
192 // define the file name in the tmp directory under the volume
193 strcat(tmpLogFilespec,"/tmp/E2Term_%Y-%m-%d_%H-%M-%S.%N.tmpStr");
195 // std::string localIP = conf.getStringValue("local-ip");
196 // if (localIP.length() == 0) {
197 // mdclog_write(MDCLOG_ERR, "illigal local-ip. environment variable");
201 //sctpParams.myIP.assign(getenv(localIP.c_str()));
202 sctpParams.myIP = conf.getStringValue("local-ip");
203 if (sctpParams.myIP.length() == 0) {
204 mdclog_write(MDCLOG_ERR, "illigal local-ip.");
208 sctpParams.myIP = conf.getStringValue("external-fqdn");
209 if (sctpParams.myIP.length() == 0) {
210 mdclog_write(MDCLOG_ERR, "illigal external-fqdn.");
214 std::string pod = conf.getStringValue("pod_name");
215 if (pod.length() == 0) {
216 mdclog_write(MDCLOG_ERR, "illigal pod_name in config file");
219 auto *podName = getenv(pod.c_str());
220 if (podName == nullptr) {
221 mdclog_write(MDCLOG_ERR, "illigal pod_name or environment varible not exists : %s", pod.c_str());
225 sctpParams.podName.assign(podName);
226 if (sctpParams.podName.length() == 0) {
227 mdclog_write(MDCLOG_ERR, "illigal pod_name");
232 tmpStr = conf.getStringValue("trace");
233 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
234 if ((tmpStr.compare("start")) == 0) {
235 mdclog_write(MDCLOG_INFO, "Trace set to: start");
236 sctpParams.trace = true;
237 } else if ((tmpStr.compare("stop")) == 0) {
238 mdclog_write(MDCLOG_INFO, "Trace set to: stop");
239 sctpParams.trace = false;
241 jsonTrace = sctpParams.trace;
245 mdclog_write(MDCLOG_INFO, "start = %lx end = %lx diff = %lx\n", st, en, en - st);
246 mdclog_write(MDCLOG_INFO, "start high = %lx start lo = %lx end high = %lx end lo = %lx\n",
247 st >> 32, st & 0xFFFFFFFF, (int64_t)en >> 32, en & 0xFFFFFFFF);
248 mdclog_write(MDCLOG_INFO, "ellapsed time = %5.9f\n", (double)(en - st)/cpuClock);
250 sctpParams.ka_message_length = snprintf(sctpParams.ka_message, 4096, "{\"address\": \"%s:%d\","
252 "\"pod_name\": \"%s\"}",
253 (const char *)sctpParams.myIP.c_str(),
255 sctpParams.fqdn.c_str(),
256 sctpParams.podName.c_str());
258 if (mdclog_level_get() >= MDCLOG_INFO) {
259 mdclog_mdc_add("RMR Port", to_string(sctpParams.rmrPort).c_str());
260 mdclog_mdc_add("LogLevel", to_string(sctpParams.logLevel).c_str());
261 mdclog_mdc_add("volume", sctpParams.volume);
262 mdclog_mdc_add("tmpLogFilespec", tmpLogFilespec);
263 mdclog_mdc_add("my ip", sctpParams.myIP.c_str());
264 mdclog_mdc_add("pod name", sctpParams.podName.c_str());
266 mdclog_write(MDCLOG_INFO, "running parameters for instance : %s", sctpParams.ka_message);
270 // Files written to the current working directory
271 boostLogger = logging::add_file_log(
272 keywords::file_name = tmpLogFilespec, // to temp directory
273 keywords::rotation_size = 10 * 1024 * 1024,
274 keywords::time_based_rotation = sinks::file::rotation_at_time_interval(posix_time::hours(1)),
275 keywords::format = "%Message%"
276 //keywords::format = "[%TimeStamp%]: %Message%" // use each tmpStr with time stamp
279 // Setup a destination folder for collecting rotated (closed) files --since the same volumn can use rename()
280 boostLogger->locked_backend()->set_file_collector(sinks::file::make_collector(
281 keywords::target = sctpParams.volume
284 // Upon restart, scan the directory for files matching the file_name pattern
285 boostLogger->locked_backend()->scan_for_files();
287 // Enable auto-flushing after each tmpStr record written
288 if (mdclog_level_get() >= MDCLOG_DEBUG) {
289 boostLogger->locked_backend()->auto_flush(true);
293 sctpParams.epoll_fd = epoll_create1(0);
294 if (sctpParams.epoll_fd == -1) {
295 mdclog_write(MDCLOG_ERR, "failed to open epoll descriptor");
299 getRmrContext(sctpParams, &span);
300 if (sctpParams.rmrCtx == nullptr) {
301 close(sctpParams.epoll_fd);
305 if (buildInotify(sctpParams) == -1) {
306 close(sctpParams.rmrListenFd);
307 rmr_close(sctpParams.rmrCtx);
308 close(sctpParams.epoll_fd);
312 sctpParams.sctpMap = new mapWrapper();
314 std::vector<std::thread> threads(num_cpus);
315 // std::vector<std::thread> threads;
318 for (unsigned int i = 0; i < num_cpus; i++) {
319 threads[i] = std::thread(listener, &sctpParams);
324 int rc = pthread_setaffinity_np(threads[i].native_handle(), sizeof(cpu_set_t), &cpuset);
326 mdclog_write(MDCLOG_ERR, "Error calling pthread_setaffinity_np: %d", rc);
330 //loop over term_init until first message from xApp
331 handleTermInit(sctpParams);
333 for (auto &t : threads) {
338 opentracing::Tracer::Global()->Close();
343 void handleTermInit(sctp_params_t &sctpParams) {
344 sendTermInit(sctpParams);
345 //send to e2 manager init of e2 term
350 auto xappMessages = num_of_XAPP_messages.load(std::memory_order_acquire);
351 if (xappMessages > 0) {
352 if (mdclog_level_get() >= MDCLOG_INFO) {
353 mdclog_write(MDCLOG_INFO, "Got a message from some appliction, stop sending E@_TERM_INIT");
359 if (count % 1000 == 0) {
360 mdclog_write(MDCLOG_ERR, "GOT No messages from any xApp");
361 sendTermInit(sctpParams);
366 void sendTermInit(sctp_params_t &sctpParams) {
367 rmr_mbuf_t *msg = rmr_alloc_msg(sctpParams.rmrCtx, sctpParams.ka_message_length);
370 msg->mtype = E2_TERM_INIT;
372 rmr_bytes2payload(msg, (unsigned char *)sctpParams.ka_message, sctpParams.ka_message_length);
373 static unsigned char tx[32];
374 auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
375 rmr_bytes2xact(msg, tx, txLen);
376 msg = rmr_send_msg(sctpParams.rmrCtx, msg);
377 if (msg == nullptr) {
378 msg = rmr_alloc_msg(sctpParams.rmrCtx, sctpParams.myIP.length());
379 } else if (msg->state == 0) {
381 if (mdclog_level_get() >= MDCLOG_INFO) {
382 mdclog_write(MDCLOG_INFO, "E2_TERM_INIT succsesfuly sent ");
386 if (count % 100 == 0) {
387 mdclog_write(MDCLOG_ERR, "Error sending E2_TERM_INIT cause : %d ", msg->state);
403 cxxopts::ParseResult parse(int argc, char *argv[], sctp_params_t &sctpParams) {
404 cxxopts::Options options(argv[0], "e2 term help");
405 options.positional_help("[optional args]").show_positional_help();
406 options.allow_unrecognised_options().add_options()
407 ("p,path", "config file path", cxxopts::value<std::string>(sctpParams.configFilePath)->default_value("config"))
408 ("f,file", "config file name", cxxopts::value<std::string>(sctpParams.configFileName)->default_value("config.conf"))
409 ("h,help", "Print help");
411 auto result = options.parse(argc, argv);
413 if (result.count("help")) {
414 std::cout << options.help({""}) << std::endl;
423 * @return -1 failed 0 success
425 int buildInotify(sctp_params_t &sctpParams) {
426 sctpParams.inotifyFD = inotify_init1(IN_NONBLOCK);
427 if (sctpParams.inotifyFD == -1) {
428 mdclog_write(MDCLOG_ERR, "Failed to init inotify (inotify_init1) %s", strerror(errno));
429 close(sctpParams.rmrListenFd);
430 rmr_close(sctpParams.rmrCtx);
431 close(sctpParams.epoll_fd);
435 sctpParams.inotifyWD = inotify_add_watch(sctpParams.inotifyFD,
436 (const char *)sctpParams.configFilePath.c_str(),
438 if (sctpParams.inotifyWD == -1) {
439 mdclog_write(MDCLOG_ERR, "Failed to add directory : %s to inotify (inotify_add_watch) %s",
440 sctpParams.configFilePath.c_str(),
442 close(sctpParams.inotifyFD);
446 struct epoll_event event{};
447 event.events = (EPOLLIN);
448 event.data.fd = sctpParams.inotifyFD;
449 // add listening RMR FD to epoll
450 if (epoll_ctl(sctpParams.epoll_fd, EPOLL_CTL_ADD, sctpParams.inotifyFD, &event)) {
451 mdclog_write(MDCLOG_ERR, "Failed to add inotify FD to epoll");
452 close(sctpParams.inotifyFD);
463 void listener(sctp_params_t *params) {
465 auto span = opentracing::Tracer::Global()->StartSpan(__FUNCTION__);
469 int num_of_SCTP_messages = 0;
470 auto totalTime = 0.0;
472 mdclog_level_set(params->logLevel);
474 std::thread::id this_id = std::this_thread::get_id();
476 streambuf *oldCout = cout.rdbuf();
477 ostringstream memCout;
479 cout.rdbuf(memCout.rdbuf());
481 //return to the normal cout
485 memcpy(tid, memCout.str().c_str(), memCout.str().length() < 32 ? memCout.str().length() : 31);
486 tid[memCout.str().length()] = 0;
487 mdclog_mdc_add("thread id", tid);
489 if (mdclog_level_get() >= MDCLOG_DEBUG) {
490 mdclog_write(MDCLOG_DEBUG, "started thread number %s", tid);
494 RmrMessagesBuffer_t rmrMessageBuffer{};
495 //create and init RMR
496 rmrMessageBuffer.rmrCtx = params->rmrCtx;
498 auto *events = (struct epoll_event *) calloc(MAXEVENTS, sizeof(struct epoll_event));
499 struct timespec end{0, 0};
500 struct timespec start{0, 0};
502 rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
503 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
505 memcpy(rmrMessageBuffer.ka_message, params->ka_message, params->ka_message_length);
506 rmrMessageBuffer.ka_message_len = params->ka_message_length;
507 rmrMessageBuffer.ka_message[rmrMessageBuffer.ka_message_len] = 0;
509 if (mdclog_level_get() >= MDCLOG_DEBUG) {
510 mdclog_write(MDCLOG_DEBUG, "keep alive message is : %s", rmrMessageBuffer.ka_message);
513 ReportingMessages_t message {};
515 for (int i = 0; i < MAX_RMR_BUFF_ARRY; i++) {
516 rmrMessageBuffer.rcvBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
517 rmrMessageBuffer.sendBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
521 if (mdclog_level_get() >= MDCLOG_DEBUG) {
522 mdclog_write(MDCLOG_DEBUG, "Start EPOLL Wait");
524 auto numOfEvents = epoll_wait(params->epoll_fd, events, MAXEVENTS, -1);
525 if (numOfEvents < 0 && errno == EINTR) {
526 if (mdclog_level_get() >= MDCLOG_DEBUG) {
527 mdclog_write(MDCLOG_DEBUG, "got EINTR : %s", strerror(errno));
531 if (numOfEvents < 0) {
532 mdclog_write(MDCLOG_ERR, "Epoll wait failed, errno = %s", strerror(errno));
535 for (auto i = 0; i < numOfEvents; i++) {
536 if (mdclog_level_get() >= MDCLOG_DEBUG) {
537 mdclog_write(MDCLOG_DEBUG, "handling epoll event %d out of %d", i + 1, numOfEvents);
539 clock_gettime(CLOCK_MONOTONIC, &message.message.time);
540 start.tv_sec = message.message.time.tv_sec;
541 start.tv_nsec = message.message.time.tv_nsec;
544 if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP)) {
545 handlepoll_error(events[i], message, rmrMessageBuffer, params, &span);
546 } else if (events[i].events & EPOLLOUT) {
547 handleEinprogressMessages(events[i], message, rmrMessageBuffer, params, &span);
548 } else if (params->rmrListenFd == events[i].data.fd) {
549 // got message from XAPP
550 num_of_XAPP_messages.fetch_add(1, std::memory_order_release);
551 num_of_messages.fetch_add(1, std::memory_order_release);
552 if (mdclog_level_get() >= MDCLOG_DEBUG) {
553 mdclog_write(MDCLOG_DEBUG, "new message from RMR");
555 if (receiveXappMessages(params->epoll_fd,
558 message.message.time,
560 mdclog_write(MDCLOG_ERR, "Error handling Xapp message");
562 } else if (params->inotifyFD == events[i].data.fd) {
563 mdclog_write(MDCLOG_INFO, "Got event from inotify (configuration update)");
564 handleConfigChange(params);
566 /* We RMR_ERR_RETRY have data on the fd waiting to be read. Read and display it.
567 * We must read whatever data is available completely, as we are running
568 * in edge-triggered mode and won't get a notification again for the same data. */
569 num_of_messages.fetch_add(1, std::memory_order_release);
570 if (mdclog_level_get() >= MDCLOG_DEBUG) {
571 mdclog_write(MDCLOG_DEBUG, "new message from SCTP, epoll flags are : %0x", events[i].events);
573 receiveDataFromSctp(&events[i],
575 num_of_SCTP_messages,
577 message.message.time,
581 clock_gettime(CLOCK_MONOTONIC, &end);
582 if (mdclog_level_get() >= MDCLOG_INFO) {
583 totalTime += ((end.tv_sec + 1.0e-9 * end.tv_nsec) -
584 ((double) start.tv_sec + 1.0e-9 * start.tv_nsec));
586 if (mdclog_level_get() >= MDCLOG_DEBUG) {
587 mdclog_write(MDCLOG_DEBUG, "message handling is %ld seconds %ld nanoseconds",
588 end.tv_sec - start.tv_sec,
589 end.tv_nsec - start.tv_nsec);
604 void handleConfigChange(sctp_params_t *sctpParams) {
605 char buf[4096] __attribute__ ((aligned(__alignof__(struct inotify_event))));
606 const struct inotify_event *event;
609 path p = (sctpParams->configFilePath + "/" + sctpParams->configFileName).c_str();
610 auto endlessLoop = true;
611 while (endlessLoop) {
612 auto len = read(sctpParams->inotifyFD, buf, sizeof buf);
614 if (errno != EAGAIN) {
615 mdclog_write(MDCLOG_ERR, "read %s ", strerror(errno));
625 for (ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) {
626 event = (const struct inotify_event *)ptr;
627 if (event->mask & (uint32_t)IN_ISDIR) {
631 // the directory name
632 if (sctpParams->inotifyWD == event->wd) {
636 if (!(sctpParams->configFileName.compare(event->name))) {
640 // only the file we want
641 if (event->mask & (uint32_t)IN_CLOSE_WRITE) {
643 const int size = 2048;
644 auto fileSize = file_size(p);
645 if (fileSize > size) {
646 mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size);
650 mdclog_write(MDCLOG_ERR, "Configuration File %s not exists", p.string().c_str());
655 if (conf.openConfigFile(p.string()) == -1) {
656 mdclog_write(MDCLOG_ERR, "Filed to open config file %s, %s",
657 p.string().c_str(), strerror(errno));
661 auto tmpStr = conf.getStringValue("loglevel");
662 if (tmpStr.length() == 0) {
663 mdclog_write(MDCLOG_ERR, "illigal loglevel. Set loglevel to MDCLOG_INFO");
666 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
668 if ((tmpStr.compare("debug")) == 0) {
669 mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_DEBUG");
670 sctpParams->logLevel = MDCLOG_DEBUG;
671 } else if ((tmpStr.compare("info")) == 0) {
672 mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_INFO");
673 sctpParams->logLevel = MDCLOG_INFO;
674 } else if ((tmpStr.compare("warning")) == 0) {
675 mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_WARN");
676 sctpParams->logLevel = MDCLOG_WARN;
677 } else if ((tmpStr.compare("error")) == 0) {
678 mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_ERR");
679 sctpParams->logLevel = MDCLOG_ERR;
681 mdclog_write(MDCLOG_ERR, "illigal loglevel = %s. Set loglevel to MDCLOG_INFO", tmpStr.c_str());
682 sctpParams->logLevel = MDCLOG_INFO;
684 mdclog_level_set(sctpParams->logLevel);
687 tmpStr = conf.getStringValue("trace");
688 if (tmpStr.length() == 0) {
689 mdclog_write(MDCLOG_ERR, "illigal trace. Set trace to stop");
693 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
694 if ((tmpStr.compare("start")) == 0) {
695 mdclog_write(MDCLOG_INFO, "Trace set to: start");
696 sctpParams->trace = true;
697 } else if ((tmpStr.compare("stop")) == 0) {
698 mdclog_write(MDCLOG_INFO, "Trace set to: stop");
699 sctpParams->trace = false;
701 mdclog_write(MDCLOG_ERR, "Trace was set to wrong value %s, set to stop", tmpStr.c_str());
702 sctpParams->trace = false;
704 jsonTrace = sctpParams->trace;
715 * @param rmrMessageBuffer
719 void handleEinprogressMessages(struct epoll_event &event,
720 ReportingMessages_t &message,
721 RmrMessagesBuffer_t &rmrMessageBuffer,
722 sctp_params_t *params,
725 auto lspan = opentracing::Tracer::Global()->StartSpan(
726 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
730 auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
731 memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
733 mdclog_write(MDCLOG_INFO, "file descriptor %d got EPOLLOUT", peerInfo->fileDescriptor);
735 socklen_t retValLen = 0;
736 auto rc = getsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_ERROR, &retVal, &retValLen);
737 if (rc != 0 || retVal != 0) {
739 rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
740 "%s|Failed SCTP Connection, after EINPROGRESS the getsockopt%s",
741 peerInfo->enodbName, strerror(errno));
742 } else if (retVal != 0) {
743 rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
744 "%s|Failed SCTP Connection after EINPROGRESS, SO_ERROR",
745 peerInfo->enodbName);
748 message.message.asndata = rmrMessageBuffer.sendMessage->payload;
749 message.message.asnLength = rmrMessageBuffer.sendMessage->len;
750 mdclog_write(MDCLOG_ERR, "%s", rmrMessageBuffer.sendMessage->payload);
751 message.message.direction = 'N';
752 if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer, &lspan) != 0) {
753 mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
755 memset(peerInfo->asnData, 0, peerInfo->asnLength);
756 peerInfo->asnLength = 0;
764 peerInfo->isConnected = true;
766 if (modifyToEpoll(params->epoll_fd, peerInfo, (EPOLLIN | EPOLLET), params->sctpMap, peerInfo->enodbName,
767 peerInfo->mtype, &lspan) != 0) {
768 mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_MOD");
775 message.message.asndata = (unsigned char *)peerInfo->asnData;
776 message.message.asnLength = peerInfo->asnLength;
777 message.message.messageType = peerInfo->mtype;
778 memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
779 num_of_messages.fetch_add(1, std::memory_order_release);
780 if (mdclog_level_get() >= MDCLOG_DEBUG) {
781 mdclog_write(MDCLOG_DEBUG, "send the delayed SETUP/ENDC SETUP to sctp for %s",
782 message.message.enodbName);
784 if (sendSctpMsg(peerInfo, message, params->sctpMap, &lspan) != 0) {
785 if (mdclog_level_get() >= MDCLOG_DEBUG) {
786 mdclog_write(MDCLOG_DEBUG, "Error write to SCTP %s %d", __func__, __LINE__);
794 memset(peerInfo->asnData, 0, peerInfo->asnLength);
795 peerInfo->asnLength = 0;
803 void handlepoll_error(struct epoll_event &event,
804 ReportingMessages_t &message,
805 RmrMessagesBuffer_t &rmrMessageBuffer,
806 sctp_params_t *params,
809 auto lspan = opentracing::Tracer::Global()->StartSpan(
810 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
814 if (event.data.fd != params->rmrListenFd) {
815 auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
816 mdclog_write(MDCLOG_ERR, "epoll error, events %0x on fd %d, RAN NAME : %s",
817 event.events, peerInfo->fileDescriptor, peerInfo->enodbName);
819 rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
820 "%s|Failed SCTP Connection",
821 peerInfo->enodbName);
822 message.message.asndata = rmrMessageBuffer.sendMessage->payload;
823 message.message.asnLength = rmrMessageBuffer.sendMessage->len;
825 memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
826 message.message.direction = 'N';
827 if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer, &lspan) != 0) {
828 mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
831 close(peerInfo->fileDescriptor);
832 cleanHashEntry((ConnectedCU_t *) event.data.ptr, params->sctpMap, &lspan);
834 mdclog_write(MDCLOG_ERR, "epoll error, events %0x on RMR FD", event.events);
846 int setSocketNoBlocking(int socket) {
847 auto flags = fcntl(socket, F_GETFL, 0);
850 mdclog_mdc_add("func", "fcntl");
851 mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
856 flags = (unsigned) flags | (unsigned) O_NONBLOCK;
857 if (fcntl(socket, F_SETFL, flags) == -1) {
858 mdclog_mdc_add("func", "fcntl");
859 mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
873 void cleanHashEntry(ConnectedCU_t *val, Sctp_Map_t *m, otSpan *pSpan) {
875 auto lspan = opentracing::Tracer::Global()->StartSpan(
876 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
881 auto port = (uint16_t) strtol(val->portNumber, &dummy, 10);
882 char searchBuff[256]{};
884 snprintf(searchBuff, sizeof searchBuff, "host:%s:%d", val->hostName, port);
885 m->erase(searchBuff);
887 m->erase(val->enodbName);
896 * @param fd file discriptor
897 * @param data the asn data to send
898 * @param len length of the data
899 * @param enodbName the enodbName as in the map for printing purpose
900 * @param m map host information
901 * @param mtype message number
902 * @return 0 success, anegative number on fail
904 int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_t *m, otSpan *pSpan) {
906 auto lspan = opentracing::Tracer::Global()->StartSpan(
907 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
911 auto loglevel = mdclog_level_get();
912 int fd = peerInfo->fileDescriptor;
913 if (loglevel >= MDCLOG_DEBUG) {
914 mdclog_write(MDCLOG_DEBUG, "Send SCTP message for CU %s, %s",
915 message.message.enodbName, __FUNCTION__);
919 //TODO add send to VES client or KAFKA
920 //format ts|mtype|direction(D/U)|length of asn data|raw data
921 // auto length = sizeof message.message.time
922 // + sizeof message.message.enodbName
923 // + sizeof message.message.messageType
924 // + sizeof message.message.direction
925 // + sizeof message.message.asnLength
926 // + message.message.asnLength;
928 if (send(fd,message.message.asndata, message.message.asnLength,MSG_NOSIGNAL) < 0) {
929 if (errno == EINTR) {
932 mdclog_write(MDCLOG_ERR, "error writing to CU a message, %s ", strerror(errno));
933 // Prevent double free() of peerInfo in the event of connection failure.
934 // Returning failure will trigger, in x2/endc setup flow, RIC_SCTP_CONNECTION_FAILURE rmr message causing the E2M to retry.
935 if (!peerInfo->isConnected){
936 mdclog_write(MDCLOG_ERR, "connection to CU %s is still in progress.", message.message.enodbName);
942 cleanHashEntry(peerInfo, m, &lspan);
944 char key[MAX_ENODB_NAME_SIZE * 2];
945 snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName,
946 message.message.messageType);
947 if (loglevel >= MDCLOG_DEBUG) {
948 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
950 auto tmp = m->find(key);
960 message.message.direction = 'D';
961 // send report.buffer of size
962 buildJsonMessage(message);
964 if (loglevel >= MDCLOG_DEBUG) {
965 mdclog_write(MDCLOG_DEBUG,
966 "SCTP message for CU %s sent from %s",
967 message.message.enodbName,
981 * @param rmrMessageBuffer
984 void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer, otSpan *pSpan) {
986 auto lspan = opentracing::Tracer::Global()->StartSpan(
987 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
991 rmr_get_meid(rmrMessageBuffer.rcvMessage, (unsigned char *)(message.message.enodbName));
993 message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
994 message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
996 if (mdclog_level_get() >= MDCLOG_DEBUG) {
997 mdclog_write(MDCLOG_DEBUG, "Message from Xapp RAN name = %s message length = %ld",
998 message.message.enodbName, (unsigned long) message.message.asnLength);
1009 * @param metaData all the data strip to structure
1010 * @param data the data recived from xAPP
1011 * @return 0 success all other values are fault
1013 int getSetupRequestMetaData(ReportingMessages_t &message, char *data, char *host, uint16_t &port, otSpan *pSpan) {
1015 auto lspan = opentracing::Tracer::Global()->StartSpan(
1016 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1018 // otSpan lspan = 0;
1020 auto loglevel = mdclog_level_get();
1022 char delimiter[4] {};
1023 memset(delimiter, 0, (size_t)4);
1027 char *val = strtok_r(data, delimiter, &tmp);
1028 if (val != nullptr) {
1029 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1030 mdclog_write(MDCLOG_DEBUG, "SCTP ADDRESS parameter from message = %s", val);
1032 memcpy(host, val, tmp - val );
1034 mdclog_write(MDCLOG_ERR, "wrong Host Name for setup request %s", data);
1041 val = strtok_r(nullptr, delimiter, &tmp);
1042 if (val != nullptr) {
1043 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1044 mdclog_write(MDCLOG_DEBUG, "PORT parameter from message = %s", val);
1047 port = (uint16_t)strtol(val, &dummy, 10);
1049 mdclog_write(MDCLOG_ERR, "wrong Port for setup request %s", data);
1056 val = strtok_r(nullptr, delimiter, &tmp);
1057 if (val != nullptr) {
1058 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1059 mdclog_write(MDCLOG_DEBUG, "RAN NAME parameter from message = %s", val);
1061 memcpy(message.message.enodbName, val, tmp - val);
1063 mdclog_write(MDCLOG_ERR, "wrong gNb/Enodeb name for setup request %s", data);
1070 val = strtok_r(nullptr, delimiter, &tmp);
1071 if (val != nullptr) {
1072 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1073 mdclog_write(MDCLOG_DEBUG, "ASN length parameter from message = %s", val);
1076 message.message.asnLength = (uint16_t) strtol(val, &dummy, 10);
1078 mdclog_write(MDCLOG_ERR, "wrong ASN length for setup request %s", data);
1085 message.message.asndata = (unsigned char *)tmp; // tmp is local but point to the location in data
1087 if (loglevel >= MDCLOG_INFO) {
1088 mdclog_write(MDCLOG_INFO, "Message from Xapp RAN name = %s host address = %s port = %d",
1089 message.message.enodbName, host, port);
1102 * @param numOfMessages
1103 * @param rmrMessageBuffer
1108 int receiveDataFromSctp(struct epoll_event *events,
1109 Sctp_Map_t *sctpMap,
1111 RmrMessagesBuffer_t &rmrMessageBuffer,
1112 struct timespec &ts,
1115 auto lspan = opentracing::Tracer::Global()->StartSpan(
1116 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1120 /* We have data on the fd waiting to be read. Read and display it.
1121 * We must read whatever data is available completely, as we are running
1122 * in edge-triggered mode and won't get a notification again for the same data. */
1124 auto loglevel = mdclog_level_get();
1125 // get the identity of the interface
1126 auto *peerInfo = (ConnectedCU_t *)events->data.ptr;
1127 struct timespec start{0, 0};
1128 struct timespec decodestart{0, 0};
1129 struct timespec end{0, 0};
1131 E2AP_PDU_t *pdu = nullptr;
1133 ReportingMessages_t message {};
1136 if (loglevel >= MDCLOG_DEBUG) {
1137 mdclog_write(MDCLOG_DEBUG, "Start Read from SCTP %d fd", peerInfo->fileDescriptor);
1138 clock_gettime(CLOCK_MONOTONIC, &start);
1140 // read the buffer directly to rmr payload
1141 message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1142 message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1143 read(peerInfo->fileDescriptor, rmrMessageBuffer.sendMessage->payload, RECEIVE_SCTP_BUFFER_SIZE);
1144 if (loglevel >= MDCLOG_DEBUG) {
1145 mdclog_write(MDCLOG_DEBUG, "Finish Read from SCTP %d fd message length = %ld",
1146 peerInfo->fileDescriptor, message.message.asnLength);
1148 memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
1149 message.message.direction = 'U';
1150 message.message.time.tv_nsec = ts.tv_nsec;
1151 message.message.time.tv_sec = ts.tv_sec;
1153 if (message.message.asnLength < 0) {
1154 if (errno == EINTR) {
1157 /* If errno == EAGAIN, that means we have read all
1158 data. So go back to the main loop. */
1159 if (errno != EAGAIN) {
1160 mdclog_write(MDCLOG_ERR, "Read error, %s ", strerror(errno));
1162 } else if (loglevel >= MDCLOG_DEBUG) {
1163 mdclog_write(MDCLOG_DEBUG, "EAGAIN - descriptor = %d", peerInfo->fileDescriptor);
1166 } else if (message.message.asnLength == 0) {
1167 /* End of file. The remote has closed the connection. */
1168 if (loglevel >= MDCLOG_INFO) {
1169 mdclog_write(MDCLOG_INFO, "END of File Closed connection - descriptor = %d",
1170 peerInfo->fileDescriptor);
1176 asn_dec_rval_t rval;
1177 if (loglevel >= MDCLOG_DEBUG) {
1178 char printBuffer[4096]{};
1179 char *tmp = printBuffer;
1180 for (size_t i = 0; i < (size_t)message.message.asnLength; ++i) {
1181 snprintf(tmp, 2, "%02x", message.message.asndata[i]);
1184 printBuffer[message.message.asnLength] = 0;
1185 clock_gettime(CLOCK_MONOTONIC, &end);
1186 mdclog_write(MDCLOG_DEBUG, "Before Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
1187 peerInfo->enodbName, end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
1188 mdclog_write(MDCLOG_DEBUG, "PDU buffer length = %ld, data = : %s", message.message.asnLength,
1190 clock_gettime(CLOCK_MONOTONIC, &decodestart);
1193 rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void **) &pdu,
1194 message.message.asndata, message.message.asnLength);
1195 if (rval.code != RC_OK) {
1196 mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) E2AP PDU from RAN : %s", rval.code,
1197 peerInfo->enodbName);
1201 if (loglevel >= MDCLOG_DEBUG) {
1202 clock_gettime(CLOCK_MONOTONIC, &end);
1203 mdclog_write(MDCLOG_DEBUG, "After Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
1204 peerInfo->enodbName, end.tv_sec - decodestart.tv_sec, end.tv_nsec - decodestart.tv_nsec);
1207 FILE *stream = open_memstream(&printBuffer, &size);
1208 asn_fprint(stream, &asn_DEF_E2AP_PDU, pdu);
1209 mdclog_write(MDCLOG_DEBUG, "Encoding E2AP PDU past : %s", printBuffer);
1210 clock_gettime(CLOCK_MONOTONIC, &decodestart);
1213 switch (pdu->present) {
1214 case E2AP_PDU_PR_initiatingMessage: {//initiating message
1215 asnInitiatingRequest(pdu, message, rmrMessageBuffer, &lspan);
1218 case E2AP_PDU_PR_successfulOutcome: { //successful outcome
1219 asnSuccsesfulMsg(pdu, message, sctpMap, rmrMessageBuffer, &lspan);
1222 case E2AP_PDU_PR_unsuccessfulOutcome: { //Unsuccessful Outcome
1223 asnUnSuccsesfulMsg(pdu, message, sctpMap, rmrMessageBuffer, &lspan);
1227 mdclog_write(MDCLOG_ERR, "Unknown index %d in E2AP PDU", pdu->present);
1230 if (loglevel >= MDCLOG_DEBUG) {
1231 clock_gettime(CLOCK_MONOTONIC, &end);
1232 mdclog_write(MDCLOG_DEBUG,
1233 "After processing message and sent to rmr for : %s, Read time is : %ld seconds, %ld nanoseconds",
1234 peerInfo->enodbName, end.tv_sec - decodestart.tv_sec, end.tv_nsec - decodestart.tv_nsec);
1238 // remove the break for EAGAIN
1240 if (pdu != nullptr) {
1241 //TODO need to test ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu); to get better performance
1242 //ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu);
1243 ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
1246 //clock_gettime(CLOCK_MONOTONIC, &start);
1248 // in case of break to avoid memory leak
1249 if (pdu != nullptr) {
1250 ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
1255 if (loglevel >= MDCLOG_INFO) {
1256 mdclog_write(MDCLOG_INFO, "Closed connection - descriptor = %d", peerInfo->fileDescriptor);
1258 message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1259 snprintf((char *)rmrMessageBuffer.sendMessage->payload,
1261 "%s|CU disconnected unexpectedly",
1262 peerInfo->enodbName);
1263 message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1265 if (sendRequestToXapp(message,
1266 RIC_SCTP_CONNECTION_FAILURE,
1269 mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
1272 /* Closing descriptor make epoll remove it from the set of descriptors which are monitored. */
1273 close(peerInfo->fileDescriptor);
1274 cleanHashEntry((ConnectedCU_t *) events->data.ptr, sctpMap, &lspan);
1276 if (loglevel >= MDCLOG_DEBUG) {
1277 clock_gettime(CLOCK_MONOTONIC, &end);
1278 mdclog_write(MDCLOG_DEBUG, "from receive SCTP to send RMR time is %ld seconds and %ld nanoseconds",
1279 end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
1293 * @param rmrMessageBuffer
1296 void asnInitiatingRequest(E2AP_PDU_t *pdu,
1297 ReportingMessages_t &message,
1298 RmrMessagesBuffer_t &rmrMessageBuffer,
1301 auto lspan = opentracing::Tracer::Global()->StartSpan(
1302 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1307 auto procedureCode = ((InitiatingMessage_t *) pdu->choice.initiatingMessage)->procedureCode;
1308 if (mdclog_level_get() >= MDCLOG_INFO) {
1309 mdclog_write(MDCLOG_INFO, "Initiating message %ld", procedureCode);
1311 switch (procedureCode) {
1312 case ProcedureCode_id_x2Setup: {
1313 if (mdclog_level_get() >= MDCLOG_INFO) {
1314 mdclog_write(MDCLOG_INFO, "Got Setup Initiating message from CU - %s",
1315 message.message.enodbName);
1319 case ProcedureCode_id_endcX2Setup: {
1320 if (mdclog_level_get() >= MDCLOG_INFO) {
1321 mdclog_write(MDCLOG_INFO, "Got X2 EN-DC Setup Request from CU - %s",
1322 message.message.enodbName);
1326 case ProcedureCode_id_ricSubscription: {
1327 if (mdclog_level_get() >= MDCLOG_INFO) {
1328 mdclog_write(MDCLOG_INFO, "Got RIC Subscription Request message from CU - %s",
1329 message.message.enodbName);
1333 case ProcedureCode_id_ricSubscriptionDelete: {
1334 if (mdclog_level_get() >= MDCLOG_INFO) {
1335 mdclog_write(MDCLOG_INFO, "Got RIC Subscription Delete Request message from CU - %s",
1336 message.message.enodbName);
1340 case ProcedureCode_id_endcConfigurationUpdate: {
1341 if (sendRequestToXapp(message, RIC_ENDC_CONF_UPDATE, rmrMessageBuffer, &lspan) != 0) {
1342 mdclog_write(MDCLOG_ERR, "E2 EN-DC CONFIGURATION UPDATE message failed to send to xAPP");
1346 case ProcedureCode_id_eNBConfigurationUpdate: {
1347 if (sendRequestToXapp(message, RIC_ENB_CONF_UPDATE, rmrMessageBuffer, &lspan) != 0) {
1348 mdclog_write(MDCLOG_ERR, "E2 EN-BC CONFIGURATION UPDATE message failed to send to xAPP");
1352 case ProcedureCode_id_x2Removal: {
1353 if (mdclog_level_get() >= MDCLOG_INFO) {
1354 mdclog_write(MDCLOG_INFO, "Got E2 Removal Initiating message from CU - %s",
1355 message.message.enodbName);
1359 case ProcedureCode_id_loadIndication: {
1360 if (sendRequestToXapp(message, RIC_ENB_LOAD_INFORMATION, rmrMessageBuffer, &lspan) != 0) {
1361 mdclog_write(MDCLOG_ERR, "Load indication message failed to send to xAPP");
1365 case ProcedureCode_id_resourceStatusReportingInitiation: {
1366 if (mdclog_level_get() >= MDCLOG_INFO) {
1367 mdclog_write(MDCLOG_INFO, "Got Status reporting initiation message from CU - %s",
1368 message.message.enodbName);
1372 case ProcedureCode_id_resourceStatusReporting: {
1373 if (sendRequestToXapp(message, RIC_RESOURCE_STATUS_UPDATE, rmrMessageBuffer, &lspan) != 0) {
1374 mdclog_write(MDCLOG_ERR, "Resource Status Reporting message failed to send to xAPP");
1378 case ProcedureCode_id_reset: {
1379 if (sendRequestToXapp(message, RIC_X2_RESET, rmrMessageBuffer, &lspan) != 0) {
1380 mdclog_write(MDCLOG_ERR, "RIC_X2_RESET message failed to send to xAPP");
1384 case ProcedureCode_id_ricIndication: {
1385 for (int i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
1386 auto messageSent = false;
1387 RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
1388 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1389 mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1391 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1392 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1393 mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1395 if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
1396 static unsigned char tx[32];
1397 message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
1398 snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1399 rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1400 rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1401 (unsigned char *)message.message.enodbName,
1402 strlen(message.message.enodbName));
1403 rmrMessageBuffer.sendMessage->state = 0;
1404 rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1405 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1406 mdclog_write(MDCLOG_DEBUG, "RIC sub id = %d, message type = %d",
1407 rmrMessageBuffer.sendMessage->sub_id,
1408 rmrMessageBuffer.sendMessage->mtype);
1410 sendRmrMessage(rmrMessageBuffer, message, &lspan);
1413 mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1422 case ProcedureCode_id_errorIndication: {
1423 if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer, &lspan) != 0) {
1424 mdclog_write(MDCLOG_ERR, "Error Indication message failed to send to xAPP");
1428 case ProcedureCode_id_ricServiceUpdate : {
1429 if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer, &lspan) != 0) {
1430 mdclog_write(MDCLOG_ERR, "Service Update message failed to send to xAPP");
1434 case ProcedureCode_id_gNBStatusIndication : {
1435 if (sendRequestToXapp(message, RIC_GNB_STATUS_INDICATION, rmrMessageBuffer, &lspan) != 0) {
1436 mdclog_write(MDCLOG_ERR, "RIC_GNB_STATUS_INDICATION failed to send to xAPP");
1441 mdclog_write(MDCLOG_ERR, "Undefined or not supported message = %ld", procedureCode);
1442 message.message.messageType = 0; // no RMR message type yet
1444 buildJsonMessage(message);
1460 * @param rmrMessageBuffer
1463 void asnSuccsesfulMsg(E2AP_PDU_t *pdu, ReportingMessages_t &message, Sctp_Map_t *sctpMap,
1464 RmrMessagesBuffer_t &rmrMessageBuffer, otSpan *pSpan) {
1466 auto lspan = opentracing::Tracer::Global()->StartSpan(
1467 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1471 auto procedureCode = pdu->choice.successfulOutcome->procedureCode;
1472 if (mdclog_level_get() >= MDCLOG_INFO) {
1473 mdclog_write(MDCLOG_INFO, "Successful Outcome %ld", procedureCode);
1475 switch (procedureCode) {
1476 case ProcedureCode_id_x2Setup: {
1477 if (mdclog_level_get() >= MDCLOG_INFO) {
1478 mdclog_write(MDCLOG_INFO, "Got Succesful Setup response from CU - %s",
1479 message.message.enodbName);
1481 if (sendResponseToXapp(message, RIC_X2_SETUP_RESP,
1482 RIC_X2_SETUP_REQ, rmrMessageBuffer, sctpMap, &lspan) != 0) {
1483 mdclog_write(MDCLOG_ERR, "Failed to send Succesful Setup response for CU - %s",
1484 message.message.enodbName);
1488 case ProcedureCode_id_endcX2Setup: { //X2_EN_DC_SETUP_REQUEST_FROM_CU
1489 if (mdclog_level_get() >= MDCLOG_INFO) {
1490 mdclog_write(MDCLOG_INFO, "Got Succesful E2 EN-DC Setup response from CU - %s",
1491 message.message.enodbName);
1493 if (sendResponseToXapp(message, RIC_ENDC_X2_SETUP_RESP,
1494 RIC_ENDC_X2_SETUP_REQ, rmrMessageBuffer, sctpMap, &lspan) != 0) {
1495 mdclog_write(MDCLOG_ERR, "Failed to send Succesful X2 EN DC Setup response for CU - %s",
1496 message.message.enodbName);
1500 case ProcedureCode_id_endcConfigurationUpdate: {
1501 if (mdclog_level_get() >= MDCLOG_INFO) {
1502 mdclog_write(MDCLOG_INFO, "Got Succesful E2 EN-DC CONFIGURATION UPDATE from CU - %s",
1503 message.message.enodbName);
1505 if (sendRequestToXapp(message, RIC_ENDC_CONF_UPDATE_ACK, rmrMessageBuffer, &lspan) != 0) {
1506 mdclog_write(MDCLOG_ERR, "Failed to send Succesful E2 EN DC CONFIGURATION response for CU - %s",
1507 message.message.enodbName);
1511 case ProcedureCode_id_eNBConfigurationUpdate: {
1512 if (mdclog_level_get() >= MDCLOG_INFO) {
1513 mdclog_write(MDCLOG_INFO, "Got Succesful E2 ENB CONFIGURATION UPDATE from CU - %s",
1514 message.message.enodbName);
1516 if (sendRequestToXapp(message, RIC_ENB_CONF_UPDATE_ACK, rmrMessageBuffer, &lspan) != 0) {
1517 mdclog_write(MDCLOG_ERR, "Failed to send Succesful E2 ENB CONFIGURATION response for CU - %s",
1518 message.message.enodbName);
1522 case ProcedureCode_id_reset: {
1523 if (sendRequestToXapp(message, RIC_X2_RESET_RESP, rmrMessageBuffer, &lspan) != 0) {
1524 mdclog_write(MDCLOG_ERR, "Failed to send Succesful E2_RESET response for CU - %s",
1525 message.message.enodbName);
1530 case ProcedureCode_id_resourceStatusReportingInitiation: {
1531 if (sendRequestToXapp(message, RIC_RES_STATUS_RESP, rmrMessageBuffer, &lspan) != 0) {
1532 mdclog_write(MDCLOG_ERR,
1533 "Failed to send Succesful 2_REQUEST_STATUS_REPORTING_INITIATION response for CU - %s",
1534 message.message.enodbName);
1538 case ProcedureCode_id_ricSubscription: {
1539 if (mdclog_level_get() >= MDCLOG_INFO) {
1540 mdclog_write(MDCLOG_INFO, "Got Succesful RIC Subscription response from CU - %s",
1541 message.message.enodbName);
1543 if (sendRequestToXapp(message, RIC_SUB_RESP, rmrMessageBuffer, &lspan) != 0) {
1544 mdclog_write(MDCLOG_ERR, "Subscription successful message failed to send to xAPP");
1549 case ProcedureCode_id_ricSubscriptionDelete: {
1550 if (mdclog_level_get() >= MDCLOG_INFO) {
1551 mdclog_write(MDCLOG_INFO,
1552 "Got Succesful RIC Subscription Delete response from CU - %s",
1553 message.message.enodbName);
1555 if (sendRequestToXapp(message, RIC_SUB_DEL_RESP, rmrMessageBuffer, &lspan) != 0) {
1556 mdclog_write(MDCLOG_ERR, "Subscription delete successful message failed to send to xAPP");
1560 case ProcedureCode_id_ricControl: {
1561 if (mdclog_level_get() >= MDCLOG_INFO) {
1562 mdclog_write(MDCLOG_INFO,
1563 "Got Succesful RIC control response from CU - %s",
1564 message.message.enodbName);
1567 i < pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.count; i++) {
1568 auto messageSent = false;
1569 RICcontrolAcknowledge_IEs_t *ie = pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.array[i];
1570 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1571 mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1573 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1574 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1575 mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1577 if (ie->value.present == RICcontrolAcknowledge_IEs__value_PR_RICrequestID) {
1578 message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_ACK;
1579 rmrMessageBuffer.sendMessage->state = 0;
1580 rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1581 static unsigned char tx[32];
1582 snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1583 rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1584 rmr_bytes2meid(rmrMessageBuffer.sendMessage,
1585 (unsigned char *)message.message.enodbName,
1586 strlen(message.message.enodbName));
1588 sendRmrMessage(rmrMessageBuffer, message, &lspan);
1591 mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1601 mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1602 message.message.messageType = 0; // no RMR message type yet
1603 buildJsonMessage(message);
1619 * @param rmrMessageBuffer
1622 void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
1623 ReportingMessages_t &message,
1624 Sctp_Map_t *sctpMap,
1625 RmrMessagesBuffer_t &rmrMessageBuffer,
1628 auto lspan = opentracing::Tracer::Global()->StartSpan(
1629 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1633 auto procedureCode = pdu->choice.unsuccessfulOutcome->procedureCode;
1634 if (mdclog_level_get() >= MDCLOG_INFO) {
1635 mdclog_write(MDCLOG_INFO, "Unsuccessful Outcome %ld", procedureCode);
1637 switch (procedureCode) {
1638 case ProcedureCode_id_x2Setup: {
1639 if (mdclog_level_get() >= MDCLOG_INFO) {
1640 mdclog_write(MDCLOG_INFO,
1641 "Got Unsuccessful Setup response from CU - %s",
1642 message.message.enodbName);
1644 if (sendResponseToXapp(message,
1645 RIC_X2_SETUP_FAILURE, RIC_X2_SETUP_REQ,
1649 mdclog_write(MDCLOG_ERR,
1650 "Failed to send Unsuccessful Setup response for CU - %s",
1651 message.message.enodbName);
1656 case ProcedureCode_id_endcX2Setup: {
1657 if (mdclog_level_get() >= MDCLOG_INFO) {
1658 mdclog_write(MDCLOG_INFO,
1659 "Got Unsuccessful E2 EN-DC Setup response from CU - %s",
1660 message.message.enodbName);
1662 if (sendResponseToXapp(message, RIC_ENDC_X2_SETUP_FAILURE,
1663 RIC_ENDC_X2_SETUP_REQ,
1667 mdclog_write(MDCLOG_ERR, "Failed to send Unsuccessful E2 EN DC Setup response for CU - %s",
1668 message.message.enodbName);
1672 case ProcedureCode_id_endcConfigurationUpdate: {
1673 if (sendRequestToXapp(message, RIC_ENDC_CONF_UPDATE_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1674 mdclog_write(MDCLOG_ERR, "Failed to send Unsuccessful E2 EN DC CONFIGURATION response for CU - %s",
1675 message.message.enodbName);
1679 case ProcedureCode_id_eNBConfigurationUpdate: {
1680 if (sendRequestToXapp(message, RIC_ENB_CONF_UPDATE_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1681 mdclog_write(MDCLOG_ERR, "Failed to send Unsuccessful E2 ENB CONFIGURATION response for CU - %s",
1682 message.message.enodbName);
1686 case ProcedureCode_id_resourceStatusReportingInitiation: {
1687 if (sendRequestToXapp(message, RIC_RES_STATUS_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1688 mdclog_write(MDCLOG_ERR,
1689 "Failed to send Succesful E2_REQUEST_STATUS_REPORTING_INITIATION response for CU - %s",
1690 message.message.enodbName);
1694 case ProcedureCode_id_ricSubscription: {
1695 if (mdclog_level_get() >= MDCLOG_INFO) {
1696 mdclog_write(MDCLOG_INFO, "Got Unsuccessful RIC Subscription Response from CU - %s",
1697 message.message.enodbName);
1699 if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1700 mdclog_write(MDCLOG_ERR, "Subscription unsuccessful message failed to send to xAPP");
1704 case ProcedureCode_id_ricSubscriptionDelete: {
1705 if (mdclog_level_get() >= MDCLOG_INFO) {
1706 mdclog_write(MDCLOG_INFO, "Got Unsuccessful RIC Subscription Delete Response from CU - %s",
1707 message.message.enodbName);
1709 if (sendRequestToXapp(message, RIC_SUB_DEL_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1710 mdclog_write(MDCLOG_ERR, "Subscription Delete unsuccessful message failed to send to xAPP");
1714 case ProcedureCode_id_ricControl: {
1715 if (mdclog_level_get() >= MDCLOG_INFO) {
1716 mdclog_write(MDCLOG_INFO, "Got UNSuccesful RIC control response from CU - %s",
1717 message.message.enodbName);
1720 i < pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.count; i++) {
1721 auto messageSent = false;
1722 RICcontrolFailure_IEs_t *ie = pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.array[i];
1723 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1724 mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1726 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1727 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1728 mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1730 if (ie->value.present == RICcontrolFailure_IEs__value_PR_RICrequestID) {
1731 message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_FAILURE;
1732 rmrMessageBuffer.sendMessage->state = 0;
1733 rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1734 static unsigned char tx[32];
1735 snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1736 rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1737 rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *)message.message.enodbName, strlen(message.message.enodbName));
1738 sendRmrMessage(rmrMessageBuffer, message, &lspan);
1741 mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1751 mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1752 message.message.messageType = 0; // no RMR message type yet
1754 buildJsonMessage(message);
1769 * @param rmrMmessageBuffer
1773 int sendRequestToXapp(ReportingMessages_t &message,
1775 RmrMessagesBuffer_t &rmrMmessageBuffer,
1778 auto lspan = opentracing::Tracer::Global()->StartSpan(
1779 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1783 rmr_bytes2meid(rmrMmessageBuffer.sendMessage,
1784 (unsigned char *)message.message.enodbName,
1785 strlen(message.message.enodbName));
1786 message.message.messageType = rmrMmessageBuffer.sendMessage->mtype = requestId;
1787 rmrMmessageBuffer.sendMessage->state = 0;
1788 static unsigned char tx[32];
1789 snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1790 rmr_bytes2xact(rmrMmessageBuffer.sendMessage, tx, strlen((const char *) tx));
1792 auto rc = sendRmrMessage(rmrMmessageBuffer, message, &lspan);
1801 void getRmrContext(sctp_params_t &pSctpParams, otSpan *pSpan) {
1803 auto lspan = opentracing::Tracer::Global()->StartSpan(
1804 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1806 // otSpan lspan = 0;
1808 pSctpParams.rmrCtx = nullptr;
1809 pSctpParams.rmrCtx = rmr_init(pSctpParams.rmrAddress, RMR_MAX_RCV_BYTES, RMRFL_NONE);
1810 if (pSctpParams.rmrCtx == nullptr) {
1811 mdclog_write(MDCLOG_ERR, "Failed to initialize RMR");
1818 rmr_set_stimeout(pSctpParams.rmrCtx, 0); // disable retries for any send operation
1819 // we need to find that routing table exist and we can run
1820 if (mdclog_level_get() >= MDCLOG_INFO) {
1821 mdclog_write(MDCLOG_INFO, "We are after RMR INIT wait for RMR_Ready");
1826 if ((rmrReady = rmr_ready(pSctpParams.rmrCtx)) == 0) {
1830 if (count % 60 == 0) {
1831 mdclog_write(MDCLOG_INFO, "waiting to RMR ready state for %d seconds", count);
1834 if (mdclog_level_get() >= MDCLOG_INFO) {
1835 mdclog_write(MDCLOG_INFO, "RMR running");
1840 rmr_init_trace(pSctpParams.rmrCtx, 200);
1841 // get the RMR fd for the epoll
1842 pSctpParams.rmrListenFd = rmr_get_rcvfd(pSctpParams.rmrCtx);
1843 struct epoll_event event{};
1844 // add RMR fd to epoll
1845 event.events = (EPOLLIN);
1846 event.data.fd = pSctpParams.rmrListenFd;
1847 // add listening RMR FD to epoll
1848 if (epoll_ctl(pSctpParams.epoll_fd, EPOLL_CTL_ADD, pSctpParams.rmrListenFd, &event)) {
1849 mdclog_write(MDCLOG_ERR, "Failed to add RMR descriptor to epoll");
1850 close(pSctpParams.rmrListenFd);
1851 rmr_close(pSctpParams.rmrCtx);
1852 pSctpParams.rmrCtx = nullptr;
1860 * @param rmrMessageBuffer
1865 int receiveXappMessages(int epoll_fd,
1866 Sctp_Map_t *sctpMap,
1867 RmrMessagesBuffer_t &rmrMessageBuffer,
1868 struct timespec &ts,
1871 auto lspan = opentracing::Tracer::Global()->StartSpan(
1872 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1876 if (rmrMessageBuffer.rcvMessage == nullptr) {
1878 mdclog_write(MDCLOG_ERR, "RMR Allocation message, %s", strerror(errno));
1886 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1887 mdclog_write(MDCLOG_DEBUG, "Call to rmr_rcv_msg");
1889 rmrMessageBuffer.rcvMessage = rmr_rcv_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.rcvMessage);
1890 if (rmrMessageBuffer.rcvMessage == nullptr) {
1891 mdclog_write(MDCLOG_ERR, "RMR Receving message with null pointer, Realloc rmr mesage buffer");
1892 rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
1899 ReportingMessages_t message;
1900 message.message.direction = 'D';
1901 message.message.time.tv_nsec = ts.tv_nsec;
1902 message.message.time.tv_sec = ts.tv_sec;
1904 // get message payload
1905 //auto msgData = msg->payload;
1906 if (rmrMessageBuffer.rcvMessage->state != 0) {
1907 mdclog_write(MDCLOG_ERR, "RMR Receving message with stat = %d", rmrMessageBuffer.rcvMessage->state);
1914 switch (rmrMessageBuffer.rcvMessage->mtype) {
1915 case RIC_X2_SETUP_REQ: {
1916 if (connectToCUandSetUp(rmrMessageBuffer, message, epoll_fd, sctpMap, &lspan) != 0) {
1917 mdclog_write(MDCLOG_ERR, "ERROR in connectToCUandSetUp on RIC_X2_SETUP_REQ");
1918 message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_SCTP_CONNECTION_FAILURE;
1919 message.message.direction = 'N';
1920 message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1921 snprintf((char *)rmrMessageBuffer.sendMessage->payload,
1923 "ERROR in connectToCUandSetUp on RIC_X2_SETUP_REQ");
1924 rmrMessageBuffer.sendMessage->state = 0;
1925 message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1927 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1928 mdclog_write(MDCLOG_DEBUG, "start writing to rmr buffer");
1930 rmr_bytes2xact(rmrMessageBuffer.sendMessage, rmrMessageBuffer.rcvMessage->xaction, RMR_MAX_XID);
1931 rmr_str2meid(rmrMessageBuffer.sendMessage, (unsigned char *)message.message.enodbName);
1933 sendRmrMessage(rmrMessageBuffer, message, &lspan);
1941 case RIC_ENDC_X2_SETUP_REQ: {
1942 if (connectToCUandSetUp(rmrMessageBuffer, message, epoll_fd, sctpMap, &lspan) != 0) {
1943 mdclog_write(MDCLOG_ERR, "ERROR in connectToCUandSetUp on RIC_ENDC_X2_SETUP_REQ");
1944 message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_SCTP_CONNECTION_FAILURE;
1945 message.message.direction = 'N';
1946 message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1947 snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
1948 "ERROR in connectToCUandSetUp on RIC_ENDC_X2_SETUP_REQ");
1949 rmrMessageBuffer.sendMessage->state = 0;
1950 message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1952 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1953 mdclog_write(MDCLOG_DEBUG, "start writing to rmr buffer");
1956 rmr_bytes2xact(rmrMessageBuffer.sendMessage, rmrMessageBuffer.rcvMessage->xaction, RMR_MAX_XID);
1957 rmr_str2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName);
1959 sendRmrMessage(rmrMessageBuffer, message, &lspan);
1967 case RIC_ENDC_CONF_UPDATE: {
1968 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1969 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE");
1977 case RIC_ENDC_CONF_UPDATE_ACK: {
1978 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1979 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE_ACK");
1987 case RIC_ENDC_CONF_UPDATE_FAILURE: {
1988 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1989 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE_FAILURE");
1998 case RIC_ENB_CONF_UPDATE: {
1999 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2000 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE");
2008 case RIC_ENB_CONF_UPDATE_ACK: {
2009 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2010 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENB_CONF_UPDATE_ACK");
2018 case RIC_ENB_CONF_UPDATE_FAILURE: {
2019 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2020 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENB_CONF_UPDATE_FAILURE");
2028 case RIC_RES_STATUS_REQ: {
2029 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2030 mdclog_write(MDCLOG_ERR, "Failed to send RIC_RES_STATUS_REQ");
2039 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2040 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_REQ");
2048 case RIC_SUB_DEL_REQ: {
2049 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2050 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_DEL_REQ");
2058 case RIC_CONTROL_REQ: {
2059 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2060 mdclog_write(MDCLOG_ERR, "Failed to send RIC_CONTROL_REQ");
2068 case RIC_SERVICE_QUERY: {
2069 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2070 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_QUERY");
2078 case RIC_SERVICE_UPDATE_ACK: {
2079 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2080 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_ACK");
2088 case RIC_SERVICE_UPDATE_FAILURE: {
2089 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2090 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_FAILURE");
2098 case RIC_X2_RESET: {
2099 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2100 mdclog_write(MDCLOG_ERR, "Failed to send RIC_X2_RESET");
2108 case RIC_X2_RESET_RESP: {
2109 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
2110 mdclog_write(MDCLOG_ERR, "Failed to send RIC_X2_RESET_RESP");
2118 case RIC_SCTP_CLEAR_ALL: {
2119 mdclog_write(MDCLOG_INFO, "RIC_SCTP_CLEAR_ALL");
2120 // loop on all keys and close socket and then erase all map.
2122 sctpMap->getKeys(v);
2123 for (auto const &iter : v) { //}; iter != sctpMap.end(); iter++) {
2124 if (!boost::starts_with((string) (iter), "host:") && !boost::starts_with((string) (iter), "msg:")) {
2125 auto *peerInfo = (ConnectedCU_t *) sctpMap->find(iter);
2126 if (peerInfo == nullptr) {
2129 close(peerInfo->fileDescriptor);
2130 memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
2131 message.message.direction = 'D';
2132 message.message.time.tv_nsec = ts.tv_nsec;
2133 message.message.time.tv_sec = ts.tv_sec;
2135 message.message.asnLength = rmrMessageBuffer.sendMessage->len =
2136 snprintf((char *)rmrMessageBuffer.sendMessage->payload,
2138 "%s|RIC_SCTP_CLEAR_ALL",
2139 peerInfo->enodbName);
2140 message.message.asndata = rmrMessageBuffer.sendMessage->payload;
2141 mdclog_write(MDCLOG_INFO, "%s", message.message.asndata);
2142 if (sendRequestToXapp(message,
2143 RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer, &lspan) != 0) {
2144 mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
2154 case E2_TERM_KEEP_ALIVE_REQ: {
2155 // send message back
2156 rmr_bytes2payload(rmrMessageBuffer.sendMessage,
2157 (unsigned char *)rmrMessageBuffer.ka_message,
2158 rmrMessageBuffer.ka_message_len);
2159 rmrMessageBuffer.sendMessage->mtype = E2_TERM_KEEP_ALIVE_RESP;
2160 rmrMessageBuffer.sendMessage->state = 0;
2161 static unsigned char tx[32];
2162 auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2163 rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, txLen);
2164 rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2165 if (rmrMessageBuffer.sendMessage == nullptr) {
2166 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2167 mdclog_write(MDCLOG_ERR, "Failed to send E2_TERM_KEEP_ALIVE_RESP RMR message returned NULL");
2168 } else if (rmrMessageBuffer.sendMessage->state != 0) {
2169 mdclog_write(MDCLOG_ERR, "Failed to send E2_TERM_KEEP_ALIVE_RESP, on RMR state = %d ( %s)",
2170 rmrMessageBuffer.sendMessage->state, translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str());
2171 } else if (mdclog_level_get() >= MDCLOG_INFO) {
2172 mdclog_write(MDCLOG_INFO, "Got Keep Alive Request send : %s", rmrMessageBuffer.ka_message);
2178 mdclog_write(MDCLOG_WARN, "Message Type : %d is not seported", rmrMessageBuffer.rcvMessage->mtype);
2179 message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
2180 message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
2181 message.message.time.tv_nsec = ts.tv_nsec;
2182 message.message.time.tv_sec = ts.tv_sec;
2183 message.message.messageType = rmrMessageBuffer.rcvMessage->mtype;
2185 buildJsonMessage(message);
2193 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2194 mdclog_write(MDCLOG_DEBUG, "EXIT OK from %s", __FUNCTION__);
2203 * Send message to the CU that is not expecting for successful or unsuccessful results
2204 * @param messageBuffer
2206 * @param failedMsgId
2211 int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer,
2212 ReportingMessages_t &message,
2214 Sctp_Map_t *sctpMap,
2217 auto lspan = opentracing::Tracer::Global()->StartSpan(
2218 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2223 getRequestMetaData(message, messageBuffer, &lspan);
2224 if (mdclog_level_get() >= MDCLOG_INFO) {
2225 mdclog_write(MDCLOG_INFO, "send message to %s address", message.message.enodbName);
2228 auto rc = sendMessagetoCu(sctpMap, messageBuffer, message, failedMsgId, &lspan);
2239 * @param messageBuffer
2241 * @param failedMesgId
2245 int sendMessagetoCu(Sctp_Map_t *sctpMap,
2246 RmrMessagesBuffer_t &messageBuffer,
2247 ReportingMessages_t &message,
2251 auto lspan = opentracing::Tracer::Global()->StartSpan(
2252 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2256 auto *peerInfo = (ConnectedCU_t *) sctpMap->find(message.message.enodbName);
2257 if (peerInfo == nullptr) {
2258 if (failedMesgId != 0) {
2259 sendFailedSendingMessagetoXapp(messageBuffer, message, failedMesgId, &lspan);
2261 mdclog_write(MDCLOG_ERR, "Failed to send message no CU entry %s", message.message.enodbName);
2271 message.message.messageType = messageBuffer.rcvMessage->mtype;
2272 auto rc = sendSctpMsg(peerInfo, message, sctpMap, &lspan);
2282 * @param rmrCtx the rmr context to send and receive
2283 * @param msg the msg we got fromxApp
2284 * @param metaData data from xApp in ordered struct
2285 * @param failedMesgId the return message type error
2288 sendFailedSendingMessagetoXapp(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, int failedMesgId,
2291 auto lspan = opentracing::Tracer::Global()->StartSpan(
2292 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2296 rmr_mbuf_t *msg = rmrMessageBuffer.sendMessage;
2297 msg->len = snprintf((char *) msg->payload, 200, "the gNb/eNode name %s not found",
2298 message.message.enodbName);
2299 if (mdclog_level_get() >= MDCLOG_INFO) {
2300 mdclog_write(MDCLOG_INFO, "%s", msg->payload);
2302 msg->mtype = failedMesgId;
2305 static unsigned char tx[32];
2306 snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2307 rmr_bytes2xact(msg, tx, strlen((const char *) tx));
2309 sendRmrMessage(rmrMessageBuffer, message, &lspan);
2311 lspan->Finish();pLogSink
2317 * Send Response back to xApp, message is used only when there was a request from the xApp
2319 * @param enodbName the name of the gNb/eNodeB
2320 * @param msgType the value of the message to the xApp
2321 * @param requestType The request that was sent by the xAPP
2322 * @param rmrCtx the rmr identifier
2323 * @param sctpMap hash map holds data on the requestrs
2324 * @param buf the buffer to send to xAPP
2325 * @param size size of the buffer to send
2328 int sendResponseToXapp(ReportingMessages_t &message,
2331 RmrMessagesBuffer_t &rmrMessageBuffer,
2332 Sctp_Map_t *sctpMap,
2335 auto lspan = opentracing::Tracer::Global()->StartSpan(
2336 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2340 char key[MAX_ENODB_NAME_SIZE * 2];
2341 snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName, requestType);
2343 auto xact = sctpMap->find(key);
2344 if (xact == nullptr) {
2345 mdclog_write(MDCLOG_ERR, "NO Request %s found for this response from CU: %s", key,
2346 message.message.enodbName);
2353 sctpMap->erase(key);
2355 message.message.messageType = rmrMessageBuffer.sendMessage->mtype = msgType; //SETUP_RESPONSE_MESSAGE_TYPE;
2356 rmr_bytes2payload(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.asndata,
2357 message.message.asnLength);
2358 rmr_bytes2xact(rmrMessageBuffer.sendMessage, (const unsigned char *)xact, strlen((const char *)xact));
2359 rmr_str2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName);
2360 rmrMessageBuffer.sendMessage->state = 0;
2362 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2363 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2367 auto rc = sendRmrMessage(rmrMessageBuffer, message, &lspan);
2375 * build the SCTP connection to eNodB or gNb
2376 * @param rmrMessageBuffer
2383 int connectToCUandSetUp(RmrMessagesBuffer_t &rmrMessageBuffer,
2384 ReportingMessages_t &message,
2386 Sctp_Map_t *sctpMap,
2389 auto lspan = opentracing::Tracer::Global()->StartSpan(
2390 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2394 struct sockaddr_in6 servaddr{};
2395 struct addrinfo hints{}, *result;
2396 auto msgData = rmrMessageBuffer.rcvMessage->payload;
2397 unsigned char meid[RMR_MAX_MEID]{};
2401 message.message.messageType = rmrMessageBuffer.rcvMessage->mtype;
2402 rmr_mbuf_t *msg = rmrMessageBuffer.rcvMessage;
2403 rmr_get_meid(msg, meid);
2405 if (mdclog_level_get() >= MDCLOG_INFO) {
2406 mdclog_write(MDCLOG_INFO, "message %d Received for MEID :%s. SETUP/EN-DC Setup Request from xApp, Message = %s",
2407 msg->mtype, meid, msgData);
2409 if (getSetupRequestMetaData(message, (char *)msgData, host, port, &lspan) < 0) {
2410 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2411 mdclog_write(MDCLOG_DEBUG, "Error in setup parameters %s, %d", __func__, __LINE__);
2419 //// message asndata points to the start of the asndata of the message and not to start of payload
2420 // search if the same host:port but not the same enodbname
2421 char searchBuff[256]{};
2422 snprintf(searchBuff, sizeof searchBuff, "host:%s:%d", host, port);
2423 auto e = (char *)sctpMap->find(searchBuff);
2425 // found one compare if not the same
2426 if (strcmp(message.message.enodbName, e) != 0) {
2427 mdclog_write(MDCLOG_ERR,
2428 "Try to connect CU %s to Host %s but %s already connected",
2429 message.message.enodbName, host, e);
2437 // check if not alread connected. if connected send the request and return
2438 auto *peerInfo = (ConnectedCU_t *)sctpMap->find(message.message.enodbName);
2439 if (peerInfo != nullptr) {
2442 // "Device %s already connected please remove and then setup again",
2443 // message.message.enodbName);
2444 if (mdclog_level_get() >= MDCLOG_INFO) {
2445 mdclog_write(MDCLOG_INFO,
2446 "Device already connected to %s",
2447 message.message.enodbName);
2449 message.message.messageType = msg->mtype;
2450 auto rc = sendSctpMsg(peerInfo, message, sctpMap, &lspan);
2452 mdclog_write(MDCLOG_ERR, "failed write to SCTP %s, %d", __func__, __LINE__);
2459 char key[MAX_ENODB_NAME_SIZE * 2];
2460 snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName, msg->mtype);
2461 int xaction_len = strlen((const char *) msg->xaction);
2462 auto *xaction = (unsigned char *) calloc(1, xaction_len);
2463 memcpy(xaction, msg->xaction, xaction_len);
2464 sctpMap->setkey(key, xaction);
2465 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2466 mdclog_write(MDCLOG_DEBUG, "set key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2474 peerInfo = (ConnectedCU_t *) calloc(1, sizeof(ConnectedCU_t));
2475 memcpy(peerInfo->enodbName, message.message.enodbName, sizeof(message.message.enodbName));
2478 if ((peerInfo->fileDescriptor = socket(AF_INET6, SOCK_STREAM, IPPROTO_SCTP)) < 0) {
2479 mdclog_write(MDCLOG_ERR, "Socket Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2487 if (setsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof optval) != 0) {
2488 mdclog_write(MDCLOG_ERR, "setsockopt SO_REUSEPORT Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2495 if (setsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval) != 0) {
2496 mdclog_write(MDCLOG_ERR, "setsockopt SO_REUSEADDR Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2502 servaddr.sin6_family = AF_INET6;
2504 struct sockaddr_in6 localAddr {};
2505 localAddr.sin6_family = AF_INET6;
2506 localAddr.sin6_addr = in6addr_any;
2507 localAddr.sin6_port = htons(SRC_PORT);
2509 if (bind(peerInfo->fileDescriptor, (struct sockaddr*)&localAddr , sizeof(struct sockaddr_in6)) < 0) {
2510 mdclog_write(MDCLOG_ERR, "bind Socket Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2515 }//Ends the binding.
2517 memset(&hints, 0, sizeof hints);
2518 hints.ai_flags = AI_NUMERICHOST;
2519 if (getaddrinfo(host, nullptr, &hints, &result) < 0) {
2520 close(peerInfo->fileDescriptor);
2521 mdclog_write(MDCLOG_ERR, "getaddrinfo error for %s, Error = %s", host, strerror(errno));
2527 memcpy(&servaddr, result->ai_addr, sizeof(struct sockaddr_in6));
2528 freeaddrinfo(result);
2530 servaddr.sin6_port = htons(port); /* daytime server */
2531 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2532 mdclog_write(MDCLOG_DEBUG, "Send Connect FD = %d host : %s port %d",
2533 peerInfo->fileDescriptor,
2539 if (addToEpoll(epoll_fd, peerInfo, (EPOLLOUT | EPOLLIN | EPOLLET), sctpMap, message.message.enodbName,
2540 msg->mtype, &lspan) != 0) {
2547 char hostBuff[NI_MAXHOST];
2548 char portBuff[NI_MAXHOST];
2550 if (getnameinfo((SA *) &servaddr, sizeof(servaddr),
2551 hostBuff, sizeof(hostBuff),
2552 portBuff, sizeof(portBuff),
2553 (uint) (NI_NUMERICHOST) | (uint) (NI_NUMERICSERV)) != 0) {
2554 mdclog_write(MDCLOG_ERR, "getnameinfo() Error, %s %s %d", strerror(errno), __func__, __LINE__);
2561 if (setSocketNoBlocking(peerInfo->fileDescriptor) != 0) {
2562 mdclog_write(MDCLOG_ERR, "setSocketNoBlocking failed to set new connection %s on sctpPort %s", hostBuff,
2564 close(peerInfo->fileDescriptor);
2571 memcpy(peerInfo->hostName, hostBuff, strlen(hostBuff));
2572 peerInfo->hostName[strlen(hostBuff)] = 0;
2573 memcpy(peerInfo->portNumber, portBuff, strlen(portBuff));
2574 peerInfo->portNumber[strlen(portBuff)] = 0;
2576 // map by enoodb/gnb name
2577 sctpMap->setkey(message.message.enodbName, peerInfo);
2578 //map host and port to enodeb
2579 sctpMap->setkey(searchBuff, message.message.enodbName);
2581 // save message for the return values
2582 char key[MAX_ENODB_NAME_SIZE * 2];
2583 snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName, msg->mtype);
2584 int xaction_len = strlen((const char *) msg->xaction);
2585 auto *xaction = (unsigned char *) calloc(1, xaction_len);
2586 memcpy(xaction, msg->xaction, xaction_len);
2587 sctpMap->setkey(key, xaction);
2588 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2589 mdclog_write(MDCLOG_DEBUG, "End building peerinfo: %s for CU %s", key, message.message.enodbName);
2592 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2593 mdclog_write(MDCLOG_DEBUG, "Send connect to FD %d, %s, %d",
2594 peerInfo->fileDescriptor, __func__, __LINE__);
2596 if (connect(peerInfo->fileDescriptor, (SA *) &servaddr, sizeof(servaddr)) < 0) {
2597 if (errno != EINPROGRESS) {
2598 mdclog_write(MDCLOG_ERR, "connect FD %d to host : %s port %d, %s",
2599 peerInfo->fileDescriptor, host, port, strerror(errno));
2600 close(peerInfo->fileDescriptor);
2606 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2607 mdclog_write(MDCLOG_DEBUG,
2608 "Connect to FD %d returned with EINPROGRESS : %s",
2609 peerInfo->fileDescriptor, strerror(errno));
2611 // since message.message.asndata is pointing to the asndata in the rmr message payload we copy it like this
2612 memcpy(peerInfo->asnData, message.message.asndata, message.message.asnLength);
2613 peerInfo->asnLength = message.message.asnLength;
2614 peerInfo->mtype = msg->mtype;
2621 if (mdclog_level_get() >= MDCLOG_INFO) {
2622 mdclog_write(MDCLOG_INFO, "Connect to FD %d returned OK without EINPROGRESS", peerInfo->fileDescriptor);
2625 peerInfo->isConnected = true;
2627 if (modifyToEpoll(epoll_fd, peerInfo, (EPOLLIN | EPOLLET), sctpMap, message.message.enodbName, msg->mtype,
2635 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2636 mdclog_write(MDCLOG_DEBUG, "Connected to host : %s port %d", host, port);
2639 message.message.messageType = msg->mtype;
2640 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2641 mdclog_write(MDCLOG_DEBUG, "Send SCTP message to FD %d", peerInfo->fileDescriptor);
2643 if (sendSctpMsg(peerInfo, message, sctpMap, &lspan) != 0) {
2644 mdclog_write(MDCLOG_ERR, "Error write to SCTP %s %d", __func__, __LINE__);
2650 memset(peerInfo->asnData, 0, message.message.asnLength);
2651 peerInfo->asnLength = 0;
2652 peerInfo->mtype = 0;
2654 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2655 mdclog_write(MDCLOG_DEBUG, "Sent message to SCTP for %s", message.message.enodbName);
2674 int addToEpoll(int epoll_fd,
2675 ConnectedCU_t *peerInfo,
2677 Sctp_Map_t *sctpMap,
2682 auto lspan = opentracing::Tracer::Global()->StartSpan(
2683 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2688 struct epoll_event event{};
2689 event.data.ptr = peerInfo;
2690 event.events = events;
2691 if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peerInfo->fileDescriptor, &event) < 0) {
2692 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2693 mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here), %s, %s %d",
2694 strerror(errno), __func__, __LINE__);
2696 close(peerInfo->fileDescriptor);
2697 cleanHashEntry(peerInfo, sctpMap, &lspan);
2698 char key[MAX_ENODB_NAME_SIZE * 2];
2699 snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2700 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2701 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2703 auto tmp = sctpMap->find(key);
2707 sctpMap->erase(key);
2708 mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here)");
2731 int modifyToEpoll(int epoll_fd,
2732 ConnectedCU_t *peerInfo,
2734 Sctp_Map_t *sctpMap,
2739 auto lspan = opentracing::Tracer::Global()->StartSpan(
2740 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2745 struct epoll_event event{};
2746 event.data.ptr = peerInfo;
2747 event.events = events;
2748 if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, peerInfo->fileDescriptor, &event) < 0) {
2749 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2750 mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_MOD (may chack not to quit here), %s, %s %d",
2751 strerror(errno), __func__, __LINE__);
2753 close(peerInfo->fileDescriptor);
2754 cleanHashEntry(peerInfo, sctpMap, &lspan);
2755 char key[MAX_ENODB_NAME_SIZE * 2];
2756 snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2757 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2758 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2760 auto tmp = sctpMap->find(key);
2764 sctpMap->erase(key);
2765 mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here)");
2778 int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, otSpan *pSpan) {
2780 auto lspan = opentracing::Tracer::Global()->StartSpan(
2781 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2783 // otSpan lspan = 0;
2785 //serialize the span
2787 std::unordered_map<std::string, std::string> data;
2788 RICCarrierWriter carrier(data);
2789 opentracing::Tracer::Global()->Inject((lspan.get())->context(), carrier);
2790 nlohmann::json j = data;
2791 std::string str = j.dump();
2792 static auto maxTraceLength = 0;
2794 maxTraceLength = str.length() > maxTraceLength ? str.length() : maxTraceLength;
2795 // serialized context can be put to RMR message using function:
2796 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2797 mdclog_write(MDCLOG_DEBUG, "max trace length is %d trace data length = %ld data = %s", maxTraceLength,
2798 str.length(), str.c_str());
2800 rmr_set_trace(rmrMessageBuffer.sendMessage, (const unsigned char *) str.c_str(), str.length());
2802 buildJsonMessage(message);
2804 rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2806 if (rmrMessageBuffer.sendMessage == nullptr) {
2807 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2808 mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2815 if (rmrMessageBuffer.sendMessage->state != 0) {
2816 char meid[RMR_MAX_MEID]{};
2817 if (rmrMessageBuffer.sendMessage->state == RMR_ERR_RETRY) {
2819 rmrMessageBuffer.sendMessage->state = 0;
2820 mdclog_write(MDCLOG_INFO, "RETRY sending Message type %d to Xapp from %s",
2821 rmrMessageBuffer.sendMessage->mtype,
2822 rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2823 rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2824 if (rmrMessageBuffer.sendMessage == nullptr) {
2825 mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2826 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2831 } else if (rmrMessageBuffer.sendMessage->state != 0) {
2832 mdclog_write(MDCLOG_ERR,
2833 "Message state %s while sending request %d to Xapp from %s after retry of 10 microseconds",
2834 translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2835 rmrMessageBuffer.sendMessage->mtype,
2836 rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2837 auto rc = rmrMessageBuffer.sendMessage->state;
2844 mdclog_write(MDCLOG_ERR, "Message state %s while sending request %d to Xapp from %s",
2845 translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2846 rmrMessageBuffer.sendMessage->mtype,
2847 rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2851 return rmrMessageBuffer.sendMessage->state;
2857 void buildJsonMessage(ReportingMessages_t &message) {
2859 message.outLen = sizeof(message.base64Data);
2860 base64::encode((const unsigned char *) message.message.asndata,
2861 (const int) message.message.asnLength,
2864 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2865 mdclog_write(MDCLOG_DEBUG, "asn data length = %d, base64 message length = %d ",
2866 (int) message.message.asnLength,
2867 (int) message.outLen);
2870 snprintf(message.buffer, sizeof(message.buffer),
2871 "{\"header\": {\"ts\": \"%ld.%09ld\","
2872 "\"ranName\": \"%s\","
2873 "\"messageType\": %d,"
2874 "\"direction\": \"%c\"},"
2875 "\"base64Length\": %d,"
2876 "\"asnBase64\": \"%s\"}",
2877 message.message.time.tv_sec,
2878 message.message.time.tv_nsec,
2879 message.message.enodbName,
2880 message.message.messageType,
2881 message.message.direction,
2882 (int) message.outLen,
2883 message.base64Data);
2884 static src::logger_mt &lg = my_logger::get();
2886 BOOST_LOG(lg) << message.buffer;
2892 * take RMR error code to string
2896 string translateRmrErrorMessages(int state) {
2900 str = "RMR_OK - state is good";
2902 case RMR_ERR_BADARG:
2903 str = "RMR_ERR_BADARG - argument passd to function was unusable";
2905 case RMR_ERR_NOENDPT:
2906 str = "RMR_ERR_NOENDPT - send//call could not find an endpoint based on msg type";
2909 str = "RMR_ERR_EMPTY - msg received had no payload; attempt to send an empty message";
2912 str = "RMR_ERR_NOHDR - message didn't contain a valid header";
2914 case RMR_ERR_SENDFAILED:
2915 str = "RMR_ERR_SENDFAILED - send failed; errno has nano reason";
2917 case RMR_ERR_CALLFAILED:
2918 str = "RMR_ERR_CALLFAILED - unable to send call() message";
2920 case RMR_ERR_NOWHOPEN:
2921 str = "RMR_ERR_NOWHOPEN - no wormholes are open";
2924 str = "RMR_ERR_WHID - wormhole id was invalid";
2926 case RMR_ERR_OVERFLOW:
2927 str = "RMR_ERR_OVERFLOW - operation would have busted through a buffer/field size";
2930 str = "RMR_ERR_RETRY - request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)";
2932 case RMR_ERR_RCVFAILED:
2933 str = "RMR_ERR_RCVFAILED - receive failed (hard error)";
2935 case RMR_ERR_TIMEOUT:
2936 str = "RMR_ERR_TIMEOUT - message processing call timed out";
2939 str = "RMR_ERR_UNSET - the message hasn't been populated with a transport buffer";
2942 str = "RMR_ERR_TRUNC - received message likely truncated";
2944 case RMR_ERR_INITFAILED:
2945 str = "RMR_ERR_INITFAILED - initialisation of something (probably message) failed";
2947 case RMR_ERR_NOTSUPP:
2948 str = "RMR_ERR_NOTSUPP - the request is not supported, or RMr was not initialised for the request";
2952 snprintf(buf, sizeof buf, "UNDOCUMENTED RMR_ERR : %d", state);