1 // Copyright 2019 AT&T Intellectual Property
2 // Copyright 2019 Nokia
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
16 // TODO: High-level file comment.
19 #include "sctpThread.h"
22 using namespace std::placeholders;
24 using namespace opentracing;
31 BOOST_LOG_INLINE_GLOBAL_LOGGER_DEFAULT(my_logger, src::logger_mt)
33 boost::shared_ptr<sinks::synchronous_sink<sinks::text_file_backend>> boostLogger;
37 mdclog_attr_init(&attr);
38 mdclog_attr_set_ident(attr, "E2Terminator");
40 mdclog_attr_destroy(attr);
44 //std::atomic<int64_t> rmrCounter{0};
45 std::atomic<int64_t> num_of_messages{0};
46 static long transactionCounter = 0;
49 int main(const int argc, char **argv) {
50 sctp_params_t pSctpParams;
52 opentracing::Tracer::InitGlobal(tracelibcpp::createTracer("E2 Terminator"));
53 auto span = opentracing::Tracer::Global()->StartSpan(__FUNCTION__);
58 unsigned num_cpus = std::thread::hardware_concurrency();
60 mdclog_severity_t loglevel = MDCLOG_ERR;
62 mdclog_severity_t loglevel = MDCLOG_INFO;
65 mdclog_level_set(loglevel);
68 mdclog_mdc_add("app", argv[0]);
69 mdclog_write(MDCLOG_ERR, "Usage nano <rmr port> logLevel <debug/warning/info/error> volume <PATH to log file location>");
74 std::random_device device{};
75 std::mt19937 generator(device());
76 std::uniform_int_distribution<long> distribution(1, (long) 1e12);
78 transactionCounter = distribution(generator);
81 char tmpLogFilespec[VOLUME_URL_SIZE];
82 tmpLogFilespec[0] = 0;
83 pSctpParams.volume[0] = 0;
84 //read paramters from CLI
85 for (auto i = 1; i < argc; i += 2) {
87 if (strcasecmp("nano", argv[i]) == 0) {
88 pSctpParams.rmrPort = (uint16_t) (uint16_t) strtol(argv[i + 1], &dummy, 10);
89 } else if (strcasecmp("loglevel", argv[i]) == 0) {
90 if (strcasecmp("debug", argv[i + 1]) == 0) {
91 loglevel = MDCLOG_DEBUG;
92 } else if (strcasecmp("info", argv[i + 1]) == 0) {
93 loglevel = MDCLOG_INFO;
94 } else if (strcasecmp("warning", argv[i + 1]) == 0) {
95 loglevel = MDCLOG_WARN;
96 } else if (strcasecmp("error", argv[i + 1]) == 0) {
97 loglevel = MDCLOG_ERR;
99 } else if (strcasecmp("volume", argv[i]) == 0) {
100 snprintf(pSctpParams.volume, VOLUME_URL_SIZE, "%s", argv[i + 1]);
101 snprintf(tmpLogFilespec, VOLUME_URL_SIZE, "%s", argv[i + 1]);
106 pSctpParams.logLevel = loglevel;
107 snprintf(pSctpParams.rmrAddress, sizeof(pSctpParams.rmrAddress) - 1, "%d", (int) (pSctpParams.rmrPort));
109 strcat(tmpLogFilespec,"/tmp/E2Term_%Y-%m-%d_%H-%M-%S.%N.log");
111 if (mdclog_level_get() >= MDCLOG_INFO) {
112 mdclog_mdc_add("RMR Port", to_string(pSctpParams.rmrPort).c_str());
113 mdclog_mdc_add("LogLevel", to_string(pSctpParams.logLevel).c_str());
114 mdclog_mdc_add("volume", pSctpParams.volume);
115 mdclog_mdc_add("tmpLogFilespec", tmpLogFilespec);
117 mdclog_write(MDCLOG_INFO, "running parameters");
121 // Files written to the current working directory
122 boostLogger = logging::add_file_log(
123 keywords::file_name = tmpLogFilespec,
124 keywords::rotation_size = 10 * 1024 * 1024,
125 keywords::time_based_rotation = sinks::file::rotation_at_time_interval(posix_time::hours(1)),
126 keywords::format = "%Message%"
127 //keywords::format = "[%TimeStamp%]: %Message%" // use each log with time stamp
130 // Setup a destination folder for collecting rotated (closed) files --since the same volumn can use rename()
131 boostLogger->locked_backend()->set_file_collector(sinks::file::make_collector(
132 keywords::target = pSctpParams.volume
133 //keywords::max_size = 16 * 1024 * 1024,
134 //keywords::min_free_space = 100 * 1024 * 1024
137 // Upon restart, scan the directory for files matching the file_name pattern
138 boostLogger->locked_backend()->scan_for_files();
140 // Enable auto-flushing after each log record written
141 if (mdclog_level_get() >= MDCLOG_DEBUG) {
142 boostLogger->locked_backend()->auto_flush(true);
146 pSctpParams.epoll_fd = epoll_create1(0);
147 if (pSctpParams.epoll_fd == -1) {
148 mdclog_write(MDCLOG_ERR, "failed to open epoll descriptor");
152 pSctpParams.rmrCtx = getRmrContext(pSctpParams.rmrAddress, &span);
153 if (pSctpParams.rmrCtx == nullptr) {
154 mdclog_write(MDCLOG_ERR, "Failed to initialize RMR");
155 close(pSctpParams.epoll_fd);
158 rmr_init_trace(pSctpParams.rmrCtx, 200);
159 // get the RMR fd for the epoll
160 pSctpParams.rmrListenFd = rmr_get_rcvfd(pSctpParams.rmrCtx);
161 struct epoll_event event{};
162 // add RMR fd to epoll
163 event.events = (EPOLLIN);
164 event.data.fd = pSctpParams.rmrListenFd;
165 // add listening RMR FD to epoll
166 if (epoll_ctl(pSctpParams.epoll_fd, EPOLL_CTL_ADD, pSctpParams.rmrListenFd, &event)) {
167 mdclog_write(MDCLOG_ERR, "Failed to add RMR descriptor to epoll");
168 close(pSctpParams.rmrListenFd);
169 rmr_close(pSctpParams.rmrCtx);
170 close(pSctpParams.epoll_fd);
174 pSctpParams.sctpMap = new mapWrapper();
176 std::vector<std::thread> threads(num_cpus);
177 // std::vector<std::thread> threads;
180 for (unsigned int i = 0; i < num_cpus; i++) {
181 threads[i] = std::thread(listener, &pSctpParams);
186 int rc = pthread_setaffinity_np(threads[i].native_handle(), sizeof(cpu_set_t), &cpuset);
188 mdclog_write(MDCLOG_ERR, "Error calling pthread_setaffinity_np: %d", rc);
191 // threads.emplace_back(std::thread(listener, &pSctpParams));
194 //send to e2 manager init of e2 term
196 auto term_init = false;
199 auto len = snprintf(buff, 128, "E2 terminator started");
200 rmr_mbuf_t *msg = rmr_alloc_msg(pSctpParams.rmrCtx, 200);
203 msg->mtype = E2_TERM_INIT;
205 rmr_bytes2payload(msg, (unsigned char *) buff, len);
206 static unsigned char tx[32];
207 auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
208 rmr_bytes2xact(msg, tx, txLen);
209 msg = rmr_send_msg(pSctpParams.rmrCtx, msg);
210 if (msg == nullptr) {
211 msg = rmr_alloc_msg(pSctpParams.rmrCtx, 200);
212 } else if (msg->state == 0) {
217 if (count % 100 == 0) {
218 mdclog_write(MDCLOG_ERR, "Error sending E2_TERM_INIT cause : %d ", msg->state);
225 for (auto &t : threads) {
230 opentracing::Tracer::Global()->Close();
240 void listener(sctp_params_t *params) {
242 auto span = opentracing::Tracer::Global()->StartSpan(__FUNCTION__);
246 int num_of_SCTP_messages = 0;
247 int num_of_XAPP_messages = 0;
248 auto totalTime = 0.0;
250 mdclog_level_set(params->logLevel);
252 std::thread::id this_id = std::this_thread::get_id();
254 streambuf *oldCout = cout.rdbuf();
255 ostringstream memCout;
257 cout.rdbuf(memCout.rdbuf());
259 //return to the normal cout
263 memcpy(tid, memCout.str().c_str(), memCout.str().length() < 32 ? memCout.str().length() : 31);
264 tid[memCout.str().length()] = 0;
265 mdclog_mdc_add("thread id", tid);
267 if (mdclog_level_get() >= MDCLOG_DEBUG) {
268 mdclog_write(MDCLOG_DEBUG, "started thread number %s", tid);
271 RmrMessagesBuffer_t rmrMessageBuffer{};
272 //create and init RMR
273 rmrMessageBuffer.rmrCtx = params->rmrCtx;
275 auto *events = (struct epoll_event *) calloc(MAXEVENTS, sizeof(struct epoll_event));
276 struct timespec end{0, 0};
277 struct timespec start{0, 0};
279 rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
280 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
282 ReportingMessages_t message {};
284 for (int i = 0; i < MAX_RMR_BUFF_ARRY; i++) {
285 rmrMessageBuffer.rcvBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
286 rmrMessageBuffer.sendBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
290 if (mdclog_level_get() >= MDCLOG_DEBUG) {
291 mdclog_write(MDCLOG_DEBUG, "Start EPOLL Wait");
293 auto numOfEvents = epoll_wait(params->epoll_fd, events, MAXEVENTS, -1);
294 if (numOfEvents < 0 && errno == EINTR) {
295 if (mdclog_level_get() >= MDCLOG_DEBUG) {
296 mdclog_write(MDCLOG_DEBUG, "got EINTR : %s", strerror(errno));
300 if (numOfEvents < 0) {
301 mdclog_write(MDCLOG_ERR, "Epoll wait failed, errno = %s", strerror(errno));
304 for (auto i = 0; i < numOfEvents; i++) {
305 if (mdclog_level_get() >= MDCLOG_DEBUG) {
306 mdclog_write(MDCLOG_DEBUG, "handling epoll event %d out of %d", i + 1, numOfEvents);
308 clock_gettime(CLOCK_MONOTONIC, &message.message.time);
309 start.tv_sec = message.message.time.tv_sec;
310 start.tv_nsec = message.message.time.tv_nsec;
311 if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP)) {
312 if (events[i].data.fd != params->rmrListenFd) {
313 auto *peerInfo = (ConnectedCU_t *)events[i].data.ptr;
314 mdclog_write(MDCLOG_ERR, "epoll error, events %0x on fd %d, RAN NAME : %s",
315 events[i].events, peerInfo->fileDescriptor, peerInfo->enodbName);
317 rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
318 "%s|Failed SCTP Connection",
319 peerInfo->enodbName);
320 message.message.asndata = rmrMessageBuffer.sendMessage->payload;
321 message.message.asnLength = rmrMessageBuffer.sendMessage->len;
323 memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
324 message.message.direction = 'N';
325 if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer, &span) != 0) {
326 mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
329 close(peerInfo->fileDescriptor);
330 cleanHashEntry((ConnectedCU_t *) events[i].data.ptr, params->sctpMap, &span);
332 mdclog_write(MDCLOG_ERR, "epoll error, events %0x on RMR FD", events[i].events);
334 } else if (events[i].events & EPOLLOUT) {
335 // this need to send waiting message from connection EINPROGRESS
336 auto *peerInfo = (ConnectedCU_t *) events[i].data.ptr;
338 memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
340 mdclog_write(MDCLOG_INFO, "file descriptor %d got EPOLLOUT", peerInfo->fileDescriptor);
342 socklen_t retValLen = 0;
343 auto rc = getsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_ERROR, &retVal, &retValLen);
344 if (rc != 0 || retVal != 0) {
346 rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
347 "%s|Failed SCTP Connection, after EINPROGRESS the getsockopt%s",
348 peerInfo->enodbName, strerror(errno));
349 } else if (retVal != 0) {
350 rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
351 "%s|Failed SCTP Connection after EINPROGRESS, SO_ERROR",
352 peerInfo->enodbName);
355 message.message.asndata = rmrMessageBuffer.sendMessage->payload;
356 message.message.asnLength = rmrMessageBuffer.sendMessage->len;
357 mdclog_write(MDCLOG_ERR, "%s", rmrMessageBuffer.sendMessage->payload);
358 message.message.direction = 'N';
359 if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer, &span) != 0) {
360 mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
362 memset(peerInfo->asnData, 0, peerInfo->asnLength);
363 peerInfo->asnLength = 0;
368 peerInfo->isConnected = true;
370 if (modifyToEpoll(params->epoll_fd, peerInfo, (EPOLLIN | EPOLLET), params->sctpMap, peerInfo->enodbName,
371 peerInfo->mtype, &span) != 0) {
372 mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_MOD");
376 message.message.asndata = (unsigned char *)peerInfo->asnData;
377 message.message.asnLength = peerInfo->asnLength;
378 message.message.messageType = peerInfo->mtype;
379 memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
380 num_of_messages.fetch_add(1, std::memory_order_release);
381 if (mdclog_level_get() >= MDCLOG_DEBUG) {
382 mdclog_write(MDCLOG_DEBUG, "send the delayed SETUP/ENDC SETUP to sctp for %s",
383 message.message.enodbName);
385 if (sendSctpMsg(peerInfo, message, params->sctpMap, &span) != 0) {
386 if (mdclog_level_get() >= MDCLOG_DEBUG) {
387 mdclog_write(MDCLOG_DEBUG, "Error write to SCTP %s %d", __func__, __LINE__);
392 memset(peerInfo->asnData, 0, peerInfo->asnLength);
393 peerInfo->asnLength = 0;
396 } else if (params->rmrListenFd == events[i].data.fd) {
397 // got message from XAPP
398 num_of_XAPP_messages++;
399 num_of_messages.fetch_add(1, std::memory_order_release);
400 if (mdclog_level_get() >= MDCLOG_DEBUG) {
401 mdclog_write(MDCLOG_DEBUG, "new message from RMR");
403 if (receiveXappMessages(params->epoll_fd,
406 message.message.time,
408 mdclog_write(MDCLOG_ERR, "Error handling Xapp message");
411 /* We RMR_ERR_RETRY have data on the fd waiting to be read. Read and display it.
412 * We must read whatever data is available completely, as we are running
413 * in edge-triggered mode and won't get a notification again for the same data. */
414 num_of_messages.fetch_add(1, std::memory_order_release);
415 if (mdclog_level_get() >= MDCLOG_DEBUG) {
416 mdclog_write(MDCLOG_DEBUG, "new message from SCTP, epoll flags are : %0x", events[i].events);
418 receiveDataFromSctp(&events[i],
420 num_of_SCTP_messages,
422 message.message.time,
426 clock_gettime(CLOCK_MONOTONIC, &end);
427 if (mdclog_level_get() >= MDCLOG_INFO) {
428 totalTime += ((end.tv_sec + 1.0e-9 * end.tv_nsec) -
429 ((double) start.tv_sec + 1.0e-9 * start.tv_nsec));
431 if (mdclog_level_get() >= MDCLOG_DEBUG) {
432 mdclog_write(MDCLOG_DEBUG, "message handling is %ld seconds %ld nanoseconds",
433 end.tv_sec - start.tv_sec,
434 end.tv_nsec - start.tv_nsec);
450 int setSocketNoBlocking(int socket) {
451 auto flags = fcntl(socket, F_GETFL, 0);
454 mdclog_mdc_add("func", "fcntl");
455 mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
460 flags = (unsigned) flags | (unsigned) O_NONBLOCK;
461 if (fcntl(socket, F_SETFL, flags) == -1) {
462 mdclog_mdc_add("func", "fcntl");
463 mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
477 void cleanHashEntry(ConnectedCU_t *val, Sctp_Map_t *m, otSpan *pSpan) {
479 auto lspan = opentracing::Tracer::Global()->StartSpan(
480 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
485 auto port = (uint16_t) strtol(val->portNumber, &dummy, 10);
486 char searchBuff[256]{};
488 snprintf(searchBuff, sizeof searchBuff, "host:%s:%d", val->hostName, port);
489 m->erase(searchBuff);
491 m->erase(val->enodbName);
500 * @param fd file discriptor
501 * @param data the asn data to send
502 * @param len length of the data
503 * @param enodbName the enodbName as in the map for printing purpose
504 * @param m map host information
505 * @param mtype message number
506 * @return 0 success, anegative number on fail
508 int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_t *m, otSpan *pSpan) {
510 auto lspan = opentracing::Tracer::Global()->StartSpan(
511 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
515 auto loglevel = mdclog_level_get();
516 int fd = peerInfo->fileDescriptor;
517 if (loglevel >= MDCLOG_DEBUG) {
518 mdclog_write(MDCLOG_DEBUG, "Send SCTP message for CU %s, %s",
519 message.message.enodbName, __FUNCTION__);
523 //TODO add send to VES client or KAFKA
524 //format ts|mtype|direction(D/U)|length of asn data|raw data
525 // auto length = sizeof message.message.time
526 // + sizeof message.message.enodbName
527 // + sizeof message.message.messageType
528 // + sizeof message.message.direction
529 // + sizeof message.message.asnLength
530 // + message.message.asnLength;
532 if (send(fd,message.message.asndata, message.message.asnLength,MSG_NOSIGNAL) < 0) {
533 if (errno == EINTR) {
536 mdclog_write(MDCLOG_ERR, "error writing to CU a message, %s ", strerror(errno));
537 // Prevent double free() of peerInfo in the event of connection failure.
538 // Returning failure will trigger, in x2/endc setup flow, RIC_SCTP_CONNECTION_FAILURE rmr message causing the E2M to retry.
539 if (!peerInfo->isConnected){
540 mdclog_write(MDCLOG_ERR, "connection to CU %s is still in progress.", message.message.enodbName);
546 cleanHashEntry(peerInfo, m, &lspan);
548 char key[MAX_ENODB_NAME_SIZE * 2];
549 snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName,
550 message.message.messageType);
551 if (loglevel >= MDCLOG_DEBUG) {
552 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
554 auto tmp = m->find(key);
564 message.message.direction = 'D';
565 // send report.buffer of size
566 buildJsonMessage(message);
568 if (loglevel >= MDCLOG_DEBUG) {
569 mdclog_write(MDCLOG_DEBUG,
570 "SCTP message for CU %s sent from %s",
571 message.message.enodbName,
585 * @param rmrMessageBuffer
588 void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer, otSpan *pSpan) {
590 auto lspan = opentracing::Tracer::Global()->StartSpan(
591 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
595 rmr_get_meid(rmrMessageBuffer.rcvMessage, (unsigned char *)(message.message.enodbName));
597 message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
598 message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
600 if (mdclog_level_get() >= MDCLOG_DEBUG) {
601 mdclog_write(MDCLOG_DEBUG, "Message from Xapp RAN name = %s message length = %ld",
602 message.message.enodbName, (unsigned long) message.message.asnLength);
613 * @param metaData all the data strip to structure
614 * @param data the data recived from xAPP
615 * @return 0 success all other values are fault
617 int getSetupRequestMetaData(ReportingMessages_t &message, char *data, char *host, uint16_t &port, otSpan *pSpan) {
619 auto lspan = opentracing::Tracer::Global()->StartSpan(
620 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
624 auto loglevel = mdclog_level_get();
626 char delimiter[4] {};
627 memset(delimiter, 0, (size_t)4);
631 char *val = strtok_r(data, delimiter, &tmp);
632 if (val != nullptr) {
633 if (mdclog_level_get() >= MDCLOG_DEBUG) {
634 mdclog_write(MDCLOG_DEBUG, "SCTP ADDRESS parameter from message = %s", val);
636 memcpy(host, val, tmp - val );
638 mdclog_write(MDCLOG_ERR, "wrong Host Name for setup request %s", data);
645 val = strtok_r(nullptr, delimiter, &tmp);
646 if (val != nullptr) {
647 if (mdclog_level_get() >= MDCLOG_DEBUG) {
648 mdclog_write(MDCLOG_DEBUG, "PORT parameter from message = %s", val);
651 port = (uint16_t)strtol(val, &dummy, 10);
653 mdclog_write(MDCLOG_ERR, "wrong Port for setup request %s", data);
660 val = strtok_r(nullptr, delimiter, &tmp);
661 if (val != nullptr) {
662 if (mdclog_level_get() >= MDCLOG_DEBUG) {
663 mdclog_write(MDCLOG_DEBUG, "RAN NAME parameter from message = %s", val);
665 memcpy(message.message.enodbName, val, tmp - val);
667 mdclog_write(MDCLOG_ERR, "wrong gNb/Enodeb name for setup request %s", data);
674 val = strtok_r(nullptr, delimiter, &tmp);
675 if (val != nullptr) {
676 if (mdclog_level_get() >= MDCLOG_DEBUG) {
677 mdclog_write(MDCLOG_DEBUG, "ASN length parameter from message = %s", val);
680 message.message.asnLength = (uint16_t) strtol(val, &dummy, 10);
682 mdclog_write(MDCLOG_ERR, "wrong ASN length for setup request %s", data);
689 message.message.asndata = (unsigned char *)tmp; // tmp is local but point to the location in data
691 if (loglevel >= MDCLOG_INFO) {
692 mdclog_write(MDCLOG_INFO, "Message from Xapp RAN name = %s host address = %s port = %d",
693 message.message.enodbName, host, port);
706 * @param numOfMessages
707 * @param rmrMessageBuffer
712 int receiveDataFromSctp(struct epoll_event *events,
715 RmrMessagesBuffer_t &rmrMessageBuffer,
719 auto lspan = opentracing::Tracer::Global()->StartSpan(
720 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
724 /* We have data on the fd waiting to be read. Read and display it.
725 * We must read whatever data is available completely, as we are running
726 * in edge-triggered mode and won't get a notification again for the same data. */
728 auto loglevel = mdclog_level_get();
729 // get the identity of the interface
730 auto *peerInfo = (ConnectedCU_t *)events->data.ptr;
731 struct timespec start{0, 0};
732 struct timespec decodestart{0, 0};
733 struct timespec end{0, 0};
735 E2AP_PDU_t *pdu = nullptr;
737 ReportingMessages_t message {};
740 if (loglevel >= MDCLOG_DEBUG) {
741 mdclog_write(MDCLOG_DEBUG, "Start Read from SCTP %d fd", peerInfo->fileDescriptor);
742 clock_gettime(CLOCK_MONOTONIC, &start);
744 // read the buffer directly to rmr payload
745 message.message.asndata = rmrMessageBuffer.sendMessage->payload;
746 message.message.asnLength = rmrMessageBuffer.sendMessage->len =
747 read(peerInfo->fileDescriptor, rmrMessageBuffer.sendMessage->payload, RECEIVE_SCTP_BUFFER_SIZE);
748 if (loglevel >= MDCLOG_DEBUG) {
749 mdclog_write(MDCLOG_DEBUG, "Finish Read from SCTP %d fd message length = %ld",
750 peerInfo->fileDescriptor, message.message.asnLength);
752 memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
753 message.message.direction = 'U';
754 message.message.time.tv_nsec = ts.tv_nsec;
755 message.message.time.tv_sec = ts.tv_sec;
757 if (message.message.asnLength < 0) {
758 if (errno == EINTR) {
761 /* If errno == EAGAIN, that means we have read all
762 data. So go back to the main loop. */
763 if (errno != EAGAIN) {
764 mdclog_write(MDCLOG_ERR, "Read error, %s ", strerror(errno));
766 } else if (loglevel >= MDCLOG_DEBUG) {
767 mdclog_write(MDCLOG_DEBUG, "EAGAIN - descriptor = %d", peerInfo->fileDescriptor);
770 } else if (message.message.asnLength == 0) {
771 /* End of file. The remote has closed the connection. */
772 if (loglevel >= MDCLOG_INFO) {
773 mdclog_write(MDCLOG_INFO, "END of File Closed connection - descriptor = %d",
774 peerInfo->fileDescriptor);
781 if (loglevel >= MDCLOG_DEBUG) {
782 char printBuffer[4096]{};
783 char *tmp = printBuffer;
784 for (size_t i = 0; i < (size_t)message.message.asnLength; ++i) {
785 snprintf(tmp, 2, "%02x", message.message.asndata[i]);
788 printBuffer[message.message.asnLength] = 0;
789 clock_gettime(CLOCK_MONOTONIC, &end);
790 mdclog_write(MDCLOG_DEBUG, "Before Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
791 peerInfo->enodbName, end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
792 mdclog_write(MDCLOG_DEBUG, "PDU buffer length = %ld, data = : %s", message.message.asnLength,
794 clock_gettime(CLOCK_MONOTONIC, &decodestart);
797 rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void **) &pdu,
798 message.message.asndata, message.message.asnLength);
799 if (rval.code != RC_OK) {
800 mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) E2AP PDU from RAN : %s", rval.code,
801 peerInfo->enodbName);
805 if (loglevel >= MDCLOG_DEBUG) {
806 clock_gettime(CLOCK_MONOTONIC, &end);
807 mdclog_write(MDCLOG_DEBUG, "After Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
808 peerInfo->enodbName, end.tv_sec - decodestart.tv_sec, end.tv_nsec - decodestart.tv_nsec);
811 FILE *stream = open_memstream(&printBuffer, &size);
812 asn_fprint(stream, &asn_DEF_E2AP_PDU, pdu);
813 mdclog_write(MDCLOG_DEBUG, "Encoding E2AP PDU past : %s", printBuffer);
814 clock_gettime(CLOCK_MONOTONIC, &decodestart);
817 switch (pdu->present) {
818 case E2AP_PDU_PR_initiatingMessage: {//initiating message
819 asnInitiatingRequest(pdu, message, rmrMessageBuffer, &lspan);
822 case E2AP_PDU_PR_successfulOutcome: { //successful outcome
823 asnSuccsesfulMsg(pdu, message, sctpMap, rmrMessageBuffer, &lspan);
826 case E2AP_PDU_PR_unsuccessfulOutcome: { //Unsuccessful Outcome
827 asnUnSuccsesfulMsg(pdu, message, sctpMap, rmrMessageBuffer, &lspan);
831 mdclog_write(MDCLOG_ERR, "Unknown index %d in E2AP PDU", pdu->present);
834 if (loglevel >= MDCLOG_DEBUG) {
835 clock_gettime(CLOCK_MONOTONIC, &end);
836 mdclog_write(MDCLOG_DEBUG,
837 "After processing message and sent to rmr for : %s, Read time is : %ld seconds, %ld nanoseconds",
838 peerInfo->enodbName, end.tv_sec - decodestart.tv_sec, end.tv_nsec - decodestart.tv_nsec);
842 // remove the break for EAGAIN
844 if (pdu != nullptr) {
845 //TODO need to test ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu); to get better performance
846 //ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu);
847 ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
850 //clock_gettime(CLOCK_MONOTONIC, &start);
852 // in case of break to avoid memory leak
853 if (pdu != nullptr) {
854 ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
859 if (loglevel >= MDCLOG_INFO) {
860 mdclog_write(MDCLOG_INFO, "Closed connection - descriptor = %d", peerInfo->fileDescriptor);
862 message.message.asnLength = rmrMessageBuffer.sendMessage->len =
863 snprintf((char *)rmrMessageBuffer.sendMessage->payload,
865 "%s|CU disconnected unexpectedly",
866 peerInfo->enodbName);
867 message.message.asndata = rmrMessageBuffer.sendMessage->payload;
869 if (sendRequestToXapp(message,
870 RIC_SCTP_CONNECTION_FAILURE,
873 mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
876 /* Closing descriptor make epoll remove it from the set of descriptors which are monitored. */
877 close(peerInfo->fileDescriptor);
878 cleanHashEntry((ConnectedCU_t *) events->data.ptr, sctpMap, &lspan);
880 if (loglevel >= MDCLOG_DEBUG) {
881 clock_gettime(CLOCK_MONOTONIC, &end);
882 mdclog_write(MDCLOG_DEBUG, "from receive SCTP to send RMR time is %ld seconds and %ld nanoseconds",
883 end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
897 * @param rmrMessageBuffer
900 void asnInitiatingRequest(E2AP_PDU_t *pdu,
901 ReportingMessages_t &message,
902 RmrMessagesBuffer_t &rmrMessageBuffer,
905 auto lspan = opentracing::Tracer::Global()->StartSpan(
906 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
911 auto procedureCode = ((InitiatingMessage_t *) pdu->choice.initiatingMessage)->procedureCode;
912 if (mdclog_level_get() >= MDCLOG_INFO) {
913 mdclog_write(MDCLOG_INFO, "Initiating message %ld", procedureCode);
915 switch (procedureCode) {
916 case ProcedureCode_id_x2Setup: {
917 if (mdclog_level_get() >= MDCLOG_INFO) {
918 mdclog_write(MDCLOG_INFO, "Got Setup Initiating message from CU - %s",
919 message.message.enodbName);
923 case ProcedureCode_id_endcX2Setup: {
924 if (mdclog_level_get() >= MDCLOG_INFO) {
925 mdclog_write(MDCLOG_INFO, "Got X2 EN-DC Setup Request from CU - %s",
926 message.message.enodbName);
930 case ProcedureCode_id_ricSubscription: {
931 if (mdclog_level_get() >= MDCLOG_INFO) {
932 mdclog_write(MDCLOG_INFO, "Got RIC Subscription Request message from CU - %s",
933 message.message.enodbName);
937 case ProcedureCode_id_ricSubscriptionDelete: {
938 if (mdclog_level_get() >= MDCLOG_INFO) {
939 mdclog_write(MDCLOG_INFO, "Got RIC Subscription Delete Request message from CU - %s",
940 message.message.enodbName);
944 case ProcedureCode_id_endcConfigurationUpdate: {
945 if (sendRequestToXapp(message, RIC_ENDC_CONF_UPDATE, rmrMessageBuffer, &lspan) != 0) {
946 mdclog_write(MDCLOG_ERR, "E2 EN-DC CONFIGURATION UPDATE message failed to send to xAPP");
950 case ProcedureCode_id_eNBConfigurationUpdate: {
951 if (sendRequestToXapp(message, RIC_ENB_CONF_UPDATE, rmrMessageBuffer, &lspan) != 0) {
952 mdclog_write(MDCLOG_ERR, "E2 EN-BC CONFIGURATION UPDATE message failed to send to xAPP");
956 case ProcedureCode_id_x2Removal: {
957 if (mdclog_level_get() >= MDCLOG_INFO) {
958 mdclog_write(MDCLOG_INFO, "Got E2 Removal Initiating message from CU - %s",
959 message.message.enodbName);
963 case ProcedureCode_id_loadIndication: {
964 if (sendRequestToXapp(message, RIC_ENB_LOAD_INFORMATION, rmrMessageBuffer, &lspan) != 0) {
965 mdclog_write(MDCLOG_ERR, "Load indication message failed to send to xAPP");
969 case ProcedureCode_id_resourceStatusReportingInitiation: {
970 if (mdclog_level_get() >= MDCLOG_INFO) {
971 mdclog_write(MDCLOG_INFO, "Got Status reporting initiation message from CU - %s",
972 message.message.enodbName);
976 case ProcedureCode_id_resourceStatusReporting: {
977 if (sendRequestToXapp(message, RIC_RESOURCE_STATUS_UPDATE, rmrMessageBuffer, &lspan) != 0) {
978 mdclog_write(MDCLOG_ERR, "Resource Status Reporting message failed to send to xAPP");
982 case ProcedureCode_id_reset: {
983 if (sendRequestToXapp(message, RIC_X2_RESET, rmrMessageBuffer, &lspan) != 0) {
984 mdclog_write(MDCLOG_ERR, "RIC_X2_RESET message failed to send to xAPP");
988 case ProcedureCode_id_ricIndication: {
989 for (int i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
990 auto messageSent = false;
991 RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
992 if (mdclog_level_get() >= MDCLOG_DEBUG) {
993 mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
995 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
996 if (mdclog_level_get() >= MDCLOG_DEBUG) {
997 mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
999 if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
1000 static unsigned char tx[32];
1001 message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
1002 snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1003 rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1004 rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *)message.message.enodbName, strlen(message.message.enodbName));
1005 rmrMessageBuffer.sendMessage->state = 0;
1006 rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1007 sendRmrMessage(rmrMessageBuffer, message, &lspan);
1010 mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1019 case ProcedureCode_id_errorIndication: {
1020 if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer, &lspan) != 0) {
1021 mdclog_write(MDCLOG_ERR, "Error Indication message failed to send to xAPP");
1025 case ProcedureCode_id_ricServiceUpdate : {
1026 if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer, &lspan) != 0) {
1027 mdclog_write(MDCLOG_ERR, "Service Update message failed to send to xAPP");
1031 case ProcedureCode_id_gNBStatusIndication : {
1032 if (sendRequestToXapp(message, RIC_GNB_STATUS_INDICATION, rmrMessageBuffer, &lspan) != 0) {
1033 mdclog_write(MDCLOG_ERR, "RIC_GNB_STATUS_INDICATION failed to send to xAPP");
1038 mdclog_write(MDCLOG_ERR, "Undefined or not supported message = %ld", procedureCode);
1039 message.message.messageType = 0; // no RMR message type yet
1041 buildJsonMessage(message);
1057 * @param rmrMessageBuffer
1060 void asnSuccsesfulMsg(E2AP_PDU_t *pdu, ReportingMessages_t &message, Sctp_Map_t *sctpMap,
1061 RmrMessagesBuffer_t &rmrMessageBuffer, otSpan *pSpan) {
1063 auto lspan = opentracing::Tracer::Global()->StartSpan(
1064 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1068 auto procedureCode = pdu->choice.successfulOutcome->procedureCode;
1069 if (mdclog_level_get() >= MDCLOG_INFO) {
1070 mdclog_write(MDCLOG_INFO, "Successful Outcome %ld", procedureCode);
1072 switch (procedureCode) {
1073 case ProcedureCode_id_x2Setup: {
1074 if (mdclog_level_get() >= MDCLOG_INFO) {
1075 mdclog_write(MDCLOG_INFO, "Got Succesful Setup response from CU - %s",
1076 message.message.enodbName);
1078 if (sendResponseToXapp(message, RIC_X2_SETUP_RESP,
1079 RIC_X2_SETUP_REQ, rmrMessageBuffer, sctpMap, &lspan) != 0) {
1080 mdclog_write(MDCLOG_ERR, "Failed to send Succesful Setup response for CU - %s",
1081 message.message.enodbName);
1085 case ProcedureCode_id_endcX2Setup: { //X2_EN_DC_SETUP_REQUEST_FROM_CU
1086 if (mdclog_level_get() >= MDCLOG_INFO) {
1087 mdclog_write(MDCLOG_INFO, "Got Succesful E2 EN-DC Setup response from CU - %s",
1088 message.message.enodbName);
1090 if (sendResponseToXapp(message, RIC_ENDC_X2_SETUP_RESP,
1091 RIC_ENDC_X2_SETUP_REQ, rmrMessageBuffer, sctpMap, &lspan) != 0) {
1092 mdclog_write(MDCLOG_ERR, "Failed to send Succesful X2 EN DC Setup response for CU - %s",
1093 message.message.enodbName);
1097 case ProcedureCode_id_endcConfigurationUpdate: {
1098 if (mdclog_level_get() >= MDCLOG_INFO) {
1099 mdclog_write(MDCLOG_INFO, "Got Succesful E2 EN-DC CONFIGURATION UPDATE from CU - %s",
1100 message.message.enodbName);
1102 if (sendRequestToXapp(message, RIC_ENDC_CONF_UPDATE_ACK, rmrMessageBuffer, &lspan) != 0) {
1103 mdclog_write(MDCLOG_ERR, "Failed to send Succesful E2 EN DC CONFIGURATION response for CU - %s",
1104 message.message.enodbName);
1108 case ProcedureCode_id_eNBConfigurationUpdate: {
1109 if (mdclog_level_get() >= MDCLOG_INFO) {
1110 mdclog_write(MDCLOG_INFO, "Got Succesful E2 ENB CONFIGURATION UPDATE from CU - %s",
1111 message.message.enodbName);
1113 if (sendRequestToXapp(message, RIC_ENB_CONF_UPDATE_ACK, rmrMessageBuffer, &lspan) != 0) {
1114 mdclog_write(MDCLOG_ERR, "Failed to send Succesful E2 ENB CONFIGURATION response for CU - %s",
1115 message.message.enodbName);
1119 case ProcedureCode_id_reset: {
1120 if (sendRequestToXapp(message, RIC_X2_RESET_RESP, rmrMessageBuffer, &lspan) != 0) {
1121 mdclog_write(MDCLOG_ERR, "Failed to send Succesful E2_RESET response for CU - %s",
1122 message.message.enodbName);
1127 case ProcedureCode_id_resourceStatusReportingInitiation: {
1128 if (sendRequestToXapp(message, RIC_RES_STATUS_RESP, rmrMessageBuffer, &lspan) != 0) {
1129 mdclog_write(MDCLOG_ERR,
1130 "Failed to send Succesful 2_REQUEST_STATUS_REPORTING_INITIATION response for CU - %s",
1131 message.message.enodbName);
1135 case ProcedureCode_id_ricSubscription: {
1136 if (mdclog_level_get() >= MDCLOG_INFO) {
1137 mdclog_write(MDCLOG_INFO, "Got Succesful RIC Subscription response from CU - %s",
1138 message.message.enodbName);
1140 if (sendRequestToXapp(message, RIC_SUB_RESP, rmrMessageBuffer, &lspan) != 0) {
1141 mdclog_write(MDCLOG_ERR, "Subscription successful message failed to send to xAPP");
1146 case ProcedureCode_id_ricSubscriptionDelete: {
1147 if (mdclog_level_get() >= MDCLOG_INFO) {
1148 mdclog_write(MDCLOG_INFO,
1149 "Got Succesful RIC Subscription Delete response from CU - %s",
1150 message.message.enodbName);
1152 if (sendRequestToXapp(message, RIC_SUB_DEL_RESP, rmrMessageBuffer, &lspan) != 0) {
1153 mdclog_write(MDCLOG_ERR, "Subscription delete successful message failed to send to xAPP");
1157 case ProcedureCode_id_ricControl: {
1158 if (mdclog_level_get() >= MDCLOG_INFO) {
1159 mdclog_write(MDCLOG_INFO,
1160 "Got Succesful RIC control response from CU - %s",
1161 message.message.enodbName);
1164 i < pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.count; i++) {
1165 auto messageSent = false;
1166 RICcontrolAcknowledge_IEs_t *ie = pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.array[i];
1167 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1168 mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1170 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1171 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1172 mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1174 if (ie->value.present == RICcontrolAcknowledge_IEs__value_PR_RICrequestID) {
1175 message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_ACK;
1176 rmrMessageBuffer.sendMessage->state = 0;
1177 rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1178 static unsigned char tx[32];
1179 snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1180 rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1181 rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *)message.message.enodbName, strlen(message.message.enodbName));
1183 sendRmrMessage(rmrMessageBuffer, message, &lspan);
1186 mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1196 mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1197 message.message.messageType = 0; // no RMR message type yet
1198 buildJsonMessage(message);
1214 * @param rmrMessageBuffer
1217 void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
1218 ReportingMessages_t &message,
1219 Sctp_Map_t *sctpMap,
1220 RmrMessagesBuffer_t &rmrMessageBuffer,
1223 auto lspan = opentracing::Tracer::Global()->StartSpan(
1224 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1228 auto procedureCode = pdu->choice.unsuccessfulOutcome->procedureCode;
1229 if (mdclog_level_get() >= MDCLOG_INFO) {
1230 mdclog_write(MDCLOG_INFO, "Unsuccessful Outcome %ld", procedureCode);
1232 switch (procedureCode) {
1233 case ProcedureCode_id_x2Setup: {
1234 if (mdclog_level_get() >= MDCLOG_INFO) {
1235 mdclog_write(MDCLOG_INFO,
1236 "Got Unsuccessful Setup response from CU - %s",
1237 message.message.enodbName);
1239 if (sendResponseToXapp(message,
1240 RIC_X2_SETUP_FAILURE, RIC_X2_SETUP_REQ,
1244 mdclog_write(MDCLOG_ERR,
1245 "Failed to send Unsuccessful Setup response for CU - %s",
1246 message.message.enodbName);
1251 case ProcedureCode_id_endcX2Setup: {
1252 if (mdclog_level_get() >= MDCLOG_INFO) {
1253 mdclog_write(MDCLOG_INFO,
1254 "Got Unsuccessful E2 EN-DC Setup response from CU - %s",
1255 message.message.enodbName);
1257 if (sendResponseToXapp(message, RIC_ENDC_X2_SETUP_FAILURE,
1258 RIC_ENDC_X2_SETUP_REQ,
1262 mdclog_write(MDCLOG_ERR, "Failed to send Unsuccessful E2 EN DC Setup response for CU - %s",
1263 message.message.enodbName);
1267 case ProcedureCode_id_endcConfigurationUpdate: {
1268 if (sendRequestToXapp(message, RIC_ENDC_CONF_UPDATE_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1269 mdclog_write(MDCLOG_ERR, "Failed to send Unsuccessful E2 EN DC CONFIGURATION response for CU - %s",
1270 message.message.enodbName);
1274 case ProcedureCode_id_eNBConfigurationUpdate: {
1275 if (sendRequestToXapp(message, RIC_ENB_CONF_UPDATE_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1276 mdclog_write(MDCLOG_ERR, "Failed to send Unsuccessful E2 ENB CONFIGURATION response for CU - %s",
1277 message.message.enodbName);
1281 case ProcedureCode_id_resourceStatusReportingInitiation: {
1282 if (sendRequestToXapp(message, RIC_RES_STATUS_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1283 mdclog_write(MDCLOG_ERR,
1284 "Failed to send Succesful E2_REQUEST_STATUS_REPORTING_INITIATION response for CU - %s",
1285 message.message.enodbName);
1289 case ProcedureCode_id_ricSubscription: {
1290 if (mdclog_level_get() >= MDCLOG_INFO) {
1291 mdclog_write(MDCLOG_INFO, "Got Unsuccessful RIC Subscription Response from CU - %s",
1292 message.message.enodbName);
1294 if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1295 mdclog_write(MDCLOG_ERR, "Subscription unsuccessful message failed to send to xAPP");
1299 case ProcedureCode_id_ricSubscriptionDelete: {
1300 if (mdclog_level_get() >= MDCLOG_INFO) {
1301 mdclog_write(MDCLOG_INFO, "Got Unsuccessful RIC Subscription Delete Response from CU - %s",
1302 message.message.enodbName);
1304 if (sendRequestToXapp(message, RIC_SUB_DEL_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1305 mdclog_write(MDCLOG_ERR, "Subscription Delete unsuccessful message failed to send to xAPP");
1309 case ProcedureCode_id_ricControl: {
1310 if (mdclog_level_get() >= MDCLOG_INFO) {
1311 mdclog_write(MDCLOG_INFO, "Got UNSuccesful RIC control response from CU - %s",
1312 message.message.enodbName);
1315 i < pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.count; i++) {
1316 auto messageSent = false;
1317 RICcontrolFailure_IEs_t *ie = pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.array[i];
1318 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1319 mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1321 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1322 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1323 mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1325 if (ie->value.present == RICcontrolFailure_IEs__value_PR_RICrequestID) {
1326 message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_FAILURE;
1327 rmrMessageBuffer.sendMessage->state = 0;
1328 rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1329 static unsigned char tx[32];
1330 snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1331 rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1332 rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *)message.message.enodbName, strlen(message.message.enodbName));
1333 sendRmrMessage(rmrMessageBuffer, message, &lspan);
1336 mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1346 mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1347 message.message.messageType = 0; // no RMR message type yet
1349 buildJsonMessage(message);
1364 * @param rmrMmessageBuffer
1368 int sendRequestToXapp(ReportingMessages_t &message,
1370 RmrMessagesBuffer_t &rmrMmessageBuffer,
1373 auto lspan = opentracing::Tracer::Global()->StartSpan(
1374 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1378 rmr_bytes2meid(rmrMmessageBuffer.sendMessage,
1379 (unsigned char *)message.message.enodbName,
1380 strlen(message.message.enodbName));
1381 message.message.messageType = rmrMmessageBuffer.sendMessage->mtype = requestId;
1382 rmrMmessageBuffer.sendMessage->state = 0;
1383 static unsigned char tx[32];
1384 snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1385 rmr_bytes2xact(rmrMmessageBuffer.sendMessage, tx, strlen((const char *) tx));
1387 auto rc = sendRmrMessage(rmrMmessageBuffer, message, &lspan);
1396 void *getRmrContext(char *rmrAddress, otSpan *pSpan) {
1398 auto lspan = opentracing::Tracer::Global()->StartSpan(
1399 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1401 // otSpan lspan = 0;
1403 void *rmrCtx = rmr_init(rmrAddress, RMR_MAX_RCV_BYTES, RMRFL_NONE);
1406 if (rmrCtx == nullptr) {
1407 mdclog_write(MDCLOG_ERR, "RMR failed to initialise : %s", strerror(errno));
1415 rmr_set_stimeout(rmrCtx, 0); // disable retries for any send operation
1416 // we need to find that routing table exist and we can run
1417 if (mdclog_level_get() >= MDCLOG_INFO) {
1418 mdclog_write(MDCLOG_INFO, "We are after RMR INIT wait for RMR_Ready");
1423 if ((rmrReady = rmr_ready(rmrCtx)) == 0) {
1427 if (count % 60 == 0) {
1428 mdclog_write(MDCLOG_INFO, "waiting to RMR ready state for %d seconds", count);
1431 if (mdclog_level_get() >= MDCLOG_INFO) {
1432 mdclog_write(MDCLOG_INFO, "RMR running");
1445 * @param rmrMessageBuffer
1450 int receiveXappMessages(int epoll_fd,
1451 Sctp_Map_t *sctpMap,
1452 RmrMessagesBuffer_t &rmrMessageBuffer,
1453 struct timespec &ts,
1456 auto lspan = opentracing::Tracer::Global()->StartSpan(
1457 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1461 if (rmrMessageBuffer.rcvMessage == nullptr) {
1463 mdclog_write(MDCLOG_ERR, "RMR Allocation message, %s", strerror(errno));
1471 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1472 mdclog_write(MDCLOG_DEBUG, "Call to rmr_rcv_msg");
1474 rmrMessageBuffer.rcvMessage = rmr_rcv_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.rcvMessage);
1475 if (rmrMessageBuffer.rcvMessage == nullptr) {
1476 mdclog_write(MDCLOG_ERR, "RMR Receving message with null pointer, Realloc rmr mesage buffer");
1477 rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
1484 ReportingMessages_t message;
1485 message.message.direction = 'D';
1486 message.message.time.tv_nsec = ts.tv_nsec;
1487 message.message.time.tv_sec = ts.tv_sec;
1489 // get message payload
1490 //auto msgData = msg->payload;
1491 if (rmrMessageBuffer.rcvMessage->state != 0) {
1492 mdclog_write(MDCLOG_ERR, "RMR Receving message with stat = %d", rmrMessageBuffer.rcvMessage->state);
1499 switch (rmrMessageBuffer.rcvMessage->mtype) {
1500 case RIC_X2_SETUP_REQ: {
1501 if (connectToCUandSetUp(rmrMessageBuffer, message, epoll_fd, sctpMap, &lspan) != 0) {
1502 mdclog_write(MDCLOG_ERR, "ERROR in connectToCUandSetUp on RIC_X2_SETUP_REQ");
1503 message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_SCTP_CONNECTION_FAILURE;
1504 message.message.direction = 'N';
1505 message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1506 snprintf((char *)rmrMessageBuffer.sendMessage->payload,
1508 "ERROR in connectToCUandSetUp on RIC_X2_SETUP_REQ");
1509 rmrMessageBuffer.sendMessage->state = 0;
1510 message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1512 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1513 mdclog_write(MDCLOG_DEBUG, "start writing to rmr buffer");
1515 rmr_bytes2xact(rmrMessageBuffer.sendMessage, rmrMessageBuffer.rcvMessage->xaction, RMR_MAX_XID);
1516 rmr_str2meid(rmrMessageBuffer.sendMessage, (unsigned char *)message.message.enodbName);
1518 sendRmrMessage(rmrMessageBuffer, message, &lspan);
1526 case RIC_ENDC_X2_SETUP_REQ: {
1527 if (connectToCUandSetUp(rmrMessageBuffer, message, epoll_fd, sctpMap, &lspan) != 0) {
1528 mdclog_write(MDCLOG_ERR, "ERROR in connectToCUandSetUp on RIC_ENDC_X2_SETUP_REQ");
1529 message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_SCTP_CONNECTION_FAILURE;
1530 message.message.direction = 'N';
1531 message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1532 snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
1533 "ERROR in connectToCUandSetUp on RIC_ENDC_X2_SETUP_REQ");
1534 rmrMessageBuffer.sendMessage->state = 0;
1535 message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1537 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1538 mdclog_write(MDCLOG_DEBUG, "start writing to rmr buffer");
1541 rmr_bytes2xact(rmrMessageBuffer.sendMessage, rmrMessageBuffer.rcvMessage->xaction, RMR_MAX_XID);
1542 rmr_str2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName);
1544 sendRmrMessage(rmrMessageBuffer, message, &lspan);
1552 case RIC_ENDC_CONF_UPDATE: {
1553 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1554 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE");
1562 case RIC_ENDC_CONF_UPDATE_ACK: {
1563 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1564 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE_ACK");
1572 case RIC_ENDC_CONF_UPDATE_FAILURE: {
1573 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1574 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE_FAILURE");
1583 case RIC_ENB_CONF_UPDATE: {
1584 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1585 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE");
1593 case RIC_ENB_CONF_UPDATE_ACK: {
1594 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1595 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENB_CONF_UPDATE_ACK");
1603 case RIC_ENB_CONF_UPDATE_FAILURE: {
1604 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1605 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENB_CONF_UPDATE_FAILURE");
1613 case RIC_RES_STATUS_REQ: {
1614 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1615 mdclog_write(MDCLOG_ERR, "Failed to send RIC_RES_STATUS_REQ");
1624 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1625 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_REQ");
1633 case RIC_SUB_DEL_REQ: {
1634 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1635 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_DEL_REQ");
1643 case RIC_CONTROL_REQ: {
1644 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1645 mdclog_write(MDCLOG_ERR, "Failed to send RIC_CONTROL_REQ");
1653 case RIC_SERVICE_QUERY: {
1654 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1655 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_QUERY");
1663 case RIC_SERVICE_UPDATE_ACK: {
1664 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1665 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_ACK");
1673 case RIC_SERVICE_UPDATE_FAILURE: {
1674 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1675 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_FAILURE");
1683 case RIC_X2_RESET: {
1684 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1685 mdclog_write(MDCLOG_ERR, "Failed to send RIC_X2_RESET");
1693 case RIC_X2_RESET_RESP: {
1694 if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1695 mdclog_write(MDCLOG_ERR, "Failed to send RIC_X2_RESET_RESP");
1703 case RIC_SCTP_CLEAR_ALL: {
1704 mdclog_write(MDCLOG_INFO, "RIC_SCTP_CLEAR_ALL");
1705 // loop on all keys and close socket and then erase all map.
1707 sctpMap->getKeys(v);
1708 for (auto const &iter : v) { //}; iter != sctpMap.end(); iter++) {
1709 if (!boost::starts_with((string) (iter), "host:") && !boost::starts_with((string) (iter), "msg:")) {
1710 auto *peerInfo = (ConnectedCU_t *) sctpMap->find(iter);
1711 if (peerInfo == nullptr) {
1714 close(peerInfo->fileDescriptor);
1715 memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
1716 message.message.direction = 'D';
1717 message.message.time.tv_nsec = ts.tv_nsec;
1718 message.message.time.tv_sec = ts.tv_sec;
1720 message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1721 snprintf((char *)rmrMessageBuffer.sendMessage->payload,
1723 "%s|RIC_SCTP_CLEAR_ALL",
1724 peerInfo->enodbName);
1725 message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1726 mdclog_write(MDCLOG_INFO, "%s", message.message.asndata);
1727 if (sendRequestToXapp(message,
1728 RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1729 mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
1740 mdclog_write(MDCLOG_WARN, "Message Type : %d is not seported", rmrMessageBuffer.rcvMessage->mtype);
1741 message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
1742 message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
1743 message.message.time.tv_nsec = ts.tv_nsec;
1744 message.message.time.tv_sec = ts.tv_sec;
1745 message.message.messageType = rmrMessageBuffer.rcvMessage->mtype;
1747 buildJsonMessage(message);
1755 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1756 mdclog_write(MDCLOG_DEBUG, "EXIT OK from %s", __FUNCTION__);
1765 * Send message to the CU that is not expecting for successful or unsuccessful results
1766 * @param messageBuffer
1768 * @param failedMsgId
1773 int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer,
1774 ReportingMessages_t &message,
1776 Sctp_Map_t *sctpMap,
1779 auto lspan = opentracing::Tracer::Global()->StartSpan(
1780 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1785 getRequestMetaData(message, messageBuffer, &lspan);
1786 if (mdclog_level_get() >= MDCLOG_INFO) {
1787 mdclog_write(MDCLOG_INFO, "send message to %s address", message.message.enodbName);
1790 auto rc = sendMessagetoCu(sctpMap, messageBuffer, message, failedMsgId, &lspan);
1801 * @param messageBuffer
1803 * @param failedMesgId
1807 int sendMessagetoCu(Sctp_Map_t *sctpMap,
1808 RmrMessagesBuffer_t &messageBuffer,
1809 ReportingMessages_t &message,
1813 auto lspan = opentracing::Tracer::Global()->StartSpan(
1814 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1818 auto *peerInfo = (ConnectedCU_t *) sctpMap->find(message.message.enodbName);
1819 if (peerInfo == nullptr) {
1820 if (failedMesgId != 0) {
1821 sendFailedSendingMessagetoXapp(messageBuffer, message, failedMesgId, &lspan);
1823 mdclog_write(MDCLOG_ERR, "Failed to send message no CU entry %s", message.message.enodbName);
1833 message.message.messageType = messageBuffer.rcvMessage->mtype;
1834 auto rc = sendSctpMsg(peerInfo, message, sctpMap, &lspan);
1844 * @param rmrCtx the rmr context to send and receive
1845 * @param msg the msg we got fromxApp
1846 * @param metaData data from xApp in ordered struct
1847 * @param failedMesgId the return message type error
1850 sendFailedSendingMessagetoXapp(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, int failedMesgId,
1853 auto lspan = opentracing::Tracer::Global()->StartSpan(
1854 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1858 rmr_mbuf_t *msg = rmrMessageBuffer.sendMessage;
1859 msg->len = snprintf((char *) msg->payload, 200, "the gNb/eNode name %s not found",
1860 message.message.enodbName);
1861 if (mdclog_level_get() >= MDCLOG_INFO) {
1862 mdclog_write(MDCLOG_INFO, "%s", msg->payload);
1864 msg->mtype = failedMesgId;
1867 static unsigned char tx[32];
1868 snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1869 rmr_bytes2xact(msg, tx, strlen((const char *) tx));
1871 sendRmrMessage(rmrMessageBuffer, message, &lspan);
1873 lspan->Finish();pLogSink
1879 * Send Response back to xApp, message is used only when there was a request from the xApp
1881 * @param enodbName the name of the gNb/eNodeB
1882 * @param msgType the value of the message to the xApp
1883 * @param requestType The request that was sent by the xAPP
1884 * @param rmrCtx the rmr identifier
1885 * @param sctpMap hash map holds data on the requestrs
1886 * @param buf the buffer to send to xAPP
1887 * @param size size of the buffer to send
1890 int sendResponseToXapp(ReportingMessages_t &message,
1893 RmrMessagesBuffer_t &rmrMessageBuffer,
1894 Sctp_Map_t *sctpMap,
1897 auto lspan = opentracing::Tracer::Global()->StartSpan(
1898 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1902 char key[MAX_ENODB_NAME_SIZE * 2];
1903 snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName, requestType);
1905 auto xact = sctpMap->find(key);
1906 if (xact == nullptr) {
1907 mdclog_write(MDCLOG_ERR, "NO Request %s found for this response from CU: %s", key,
1908 message.message.enodbName);
1915 sctpMap->erase(key);
1917 message.message.messageType = rmrMessageBuffer.sendMessage->mtype = msgType; //SETUP_RESPONSE_MESSAGE_TYPE;
1918 rmr_bytes2payload(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.asndata,
1919 message.message.asnLength);
1920 rmr_bytes2xact(rmrMessageBuffer.sendMessage, (const unsigned char *)xact, strlen((const char *)xact));
1921 rmr_str2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName);
1922 rmrMessageBuffer.sendMessage->state = 0;
1924 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1925 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
1929 auto rc = sendRmrMessage(rmrMessageBuffer, message, &lspan);
1937 * build the SCTP connection to eNodB or gNb
1938 * @param rmrMessageBuffer
1945 int connectToCUandSetUp(RmrMessagesBuffer_t &rmrMessageBuffer,
1946 ReportingMessages_t &message,
1948 Sctp_Map_t *sctpMap,
1951 auto lspan = opentracing::Tracer::Global()->StartSpan(
1952 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1956 struct sockaddr_in6 servaddr{};
1957 struct addrinfo hints{}, *result;
1958 auto msgData = rmrMessageBuffer.rcvMessage->payload;
1959 unsigned char meid[RMR_MAX_MEID]{};
1963 message.message.messageType = rmrMessageBuffer.rcvMessage->mtype;
1964 rmr_mbuf_t *msg = rmrMessageBuffer.rcvMessage;
1965 rmr_get_meid(msg, meid);
1967 if (mdclog_level_get() >= MDCLOG_INFO) {
1968 mdclog_write(MDCLOG_INFO, "message %d Received for MEID :%s. SETUP/EN-DC Setup Request from xApp, Message = %s",
1969 msg->mtype, meid, msgData);
1971 if (getSetupRequestMetaData(message, (char *)msgData, host, port, &lspan) < 0) {
1972 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1973 mdclog_write(MDCLOG_DEBUG, "Error in setup parameters %s, %d", __func__, __LINE__);
1981 //// message asndata points to the start of the asndata of the message and not to start of payload
1982 // search if the same host:port but not the same enodbname
1983 char searchBuff[256]{};
1984 snprintf(searchBuff, sizeof searchBuff, "host:%s:%d", host, port);
1985 auto e = (char *)sctpMap->find(searchBuff);
1987 // found one compare if not the same
1988 if (strcmp(message.message.enodbName, e) != 0) {
1989 mdclog_write(MDCLOG_ERR,
1990 "Try to connect CU %s to Host %s but %s already connected",
1991 message.message.enodbName, host, e);
1999 // check if not alread connected. if connected send the request and return
2000 auto *peerInfo = (ConnectedCU_t *)sctpMap->find(message.message.enodbName);
2001 if (peerInfo != nullptr) {
2004 // "Device %s already connected please remove and then setup again",
2005 // message.message.enodbName);
2006 if (mdclog_level_get() >= MDCLOG_INFO) {
2007 mdclog_write(MDCLOG_INFO,
2008 "Device already connected to %s",
2009 message.message.enodbName);
2011 message.message.messageType = msg->mtype;
2012 auto rc = sendSctpMsg(peerInfo, message, sctpMap, &lspan);
2014 mdclog_write(MDCLOG_ERR, "failed write to SCTP %s, %d", __func__, __LINE__);
2021 char key[MAX_ENODB_NAME_SIZE * 2];
2022 snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName, msg->mtype);
2023 int xaction_len = strlen((const char *) msg->xaction);
2024 auto *xaction = (unsigned char *) calloc(1, xaction_len);
2025 memcpy(xaction, msg->xaction, xaction_len);
2026 sctpMap->setkey(key, xaction);
2027 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2028 mdclog_write(MDCLOG_DEBUG, "set key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2036 peerInfo = (ConnectedCU_t *) calloc(1, sizeof(ConnectedCU_t));
2037 memcpy(peerInfo->enodbName, message.message.enodbName, sizeof(message.message.enodbName));
2040 if ((peerInfo->fileDescriptor = socket(AF_INET6, SOCK_STREAM, IPPROTO_SCTP)) < 0) {
2041 mdclog_write(MDCLOG_ERR, "Socket Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2049 if (setsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof optval) != 0) {
2050 mdclog_write(MDCLOG_ERR, "setsockopt SO_REUSEPORT Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2057 if (setsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval) != 0) {
2058 mdclog_write(MDCLOG_ERR, "setsockopt SO_REUSEADDR Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2064 servaddr.sin6_family = AF_INET6;
2066 struct sockaddr_in6 localAddr {};
2067 localAddr.sin6_family = AF_INET6;
2068 localAddr.sin6_addr = in6addr_any;
2069 localAddr.sin6_port = htons(SRC_PORT);
2071 if (bind(peerInfo->fileDescriptor, (struct sockaddr*)&localAddr , sizeof(struct sockaddr_in6)) < 0) {
2072 mdclog_write(MDCLOG_ERR, "bind Socket Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2077 }//Ends the binding.
2079 memset(&hints, 0, sizeof hints);
2080 hints.ai_flags = AI_NUMERICHOST;
2081 if (getaddrinfo(host, nullptr, &hints, &result) < 0) {
2082 close(peerInfo->fileDescriptor);
2083 mdclog_write(MDCLOG_ERR, "getaddrinfo error for %s, Error = %s", host, strerror(errno));
2089 memcpy(&servaddr, result->ai_addr, sizeof(struct sockaddr_in6));
2090 freeaddrinfo(result);
2092 servaddr.sin6_port = htons(port); /* daytime server */
2093 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2094 mdclog_write(MDCLOG_DEBUG, "Send Connect FD = %d host : %s port %d",
2095 peerInfo->fileDescriptor,
2101 if (addToEpoll(epoll_fd, peerInfo, (EPOLLOUT | EPOLLIN | EPOLLET), sctpMap, message.message.enodbName,
2102 msg->mtype, &lspan) != 0) {
2109 char hostBuff[NI_MAXHOST];
2110 char portBuff[NI_MAXHOST];
2112 if (getnameinfo((SA *) &servaddr, sizeof(servaddr),
2113 hostBuff, sizeof(hostBuff),
2114 portBuff, sizeof(portBuff),
2115 (uint) (NI_NUMERICHOST) | (uint) (NI_NUMERICSERV)) != 0) {
2116 mdclog_write(MDCLOG_ERR, "getnameinfo() Error, %s %s %d", strerror(errno), __func__, __LINE__);
2123 if (setSocketNoBlocking(peerInfo->fileDescriptor) != 0) {
2124 mdclog_write(MDCLOG_ERR, "setSocketNoBlocking failed to set new connection %s on sctpPort %s", hostBuff,
2126 close(peerInfo->fileDescriptor);
2133 memcpy(peerInfo->hostName, hostBuff, strlen(hostBuff));
2134 peerInfo->hostName[strlen(hostBuff)] = 0;
2135 memcpy(peerInfo->portNumber, portBuff, strlen(portBuff));
2136 peerInfo->portNumber[strlen(portBuff)] = 0;
2138 // map by enoodb/gnb name
2139 sctpMap->setkey(message.message.enodbName, peerInfo);
2140 //map host and port to enodeb
2141 sctpMap->setkey(searchBuff, message.message.enodbName);
2143 // save message for the return values
2144 char key[MAX_ENODB_NAME_SIZE * 2];
2145 snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName, msg->mtype);
2146 int xaction_len = strlen((const char *) msg->xaction);
2147 auto *xaction = (unsigned char *) calloc(1, xaction_len);
2148 memcpy(xaction, msg->xaction, xaction_len);
2149 sctpMap->setkey(key, xaction);
2150 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2151 mdclog_write(MDCLOG_DEBUG, "End building peerinfo: %s for CU %s", key, message.message.enodbName);
2154 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2155 mdclog_write(MDCLOG_DEBUG, "Send connect to FD %d, %s, %d",
2156 peerInfo->fileDescriptor, __func__, __LINE__);
2158 if (connect(peerInfo->fileDescriptor, (SA *) &servaddr, sizeof(servaddr)) < 0) {
2159 if (errno != EINPROGRESS) {
2160 mdclog_write(MDCLOG_ERR, "connect FD %d to host : %s port %d, %s",
2161 peerInfo->fileDescriptor, host, port, strerror(errno));
2162 close(peerInfo->fileDescriptor);
2168 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2169 mdclog_write(MDCLOG_DEBUG,
2170 "Connect to FD %d returned with EINPROGRESS : %s",
2171 peerInfo->fileDescriptor, strerror(errno));
2173 // since message.message.asndata is pointing to the asndata in the rmr message payload we copy it like this
2174 memcpy(peerInfo->asnData, message.message.asndata, message.message.asnLength);
2175 peerInfo->asnLength = message.message.asnLength;
2176 peerInfo->mtype = msg->mtype;
2183 if (mdclog_level_get() >= MDCLOG_INFO) {
2184 mdclog_write(MDCLOG_INFO, "Connect to FD %d returned OK without EINPROGRESS", peerInfo->fileDescriptor);
2187 peerInfo->isConnected = true;
2189 if (modifyToEpoll(epoll_fd, peerInfo, (EPOLLIN | EPOLLET), sctpMap, message.message.enodbName, msg->mtype,
2197 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2198 mdclog_write(MDCLOG_DEBUG, "Connected to host : %s port %d", host, port);
2201 message.message.messageType = msg->mtype;
2202 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2203 mdclog_write(MDCLOG_DEBUG, "Send SCTP message to FD %d", peerInfo->fileDescriptor);
2205 if (sendSctpMsg(peerInfo, message, sctpMap, &lspan) != 0) {
2206 mdclog_write(MDCLOG_ERR, "Error write to SCTP %s %d", __func__, __LINE__);
2212 memset(peerInfo->asnData, 0, message.message.asnLength);
2213 peerInfo->asnLength = 0;
2214 peerInfo->mtype = 0;
2216 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2217 mdclog_write(MDCLOG_DEBUG, "Sent message to SCTP for %s", message.message.enodbName);
2236 int addToEpoll(int epoll_fd,
2237 ConnectedCU_t *peerInfo,
2239 Sctp_Map_t *sctpMap,
2244 auto lspan = opentracing::Tracer::Global()->StartSpan(
2245 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2250 struct epoll_event event{};
2251 event.data.ptr = peerInfo;
2252 event.events = events;
2253 if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peerInfo->fileDescriptor, &event) < 0) {
2254 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2255 mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here), %s, %s %d",
2256 strerror(errno), __func__, __LINE__);
2258 close(peerInfo->fileDescriptor);
2259 cleanHashEntry(peerInfo, sctpMap, &lspan);
2260 char key[MAX_ENODB_NAME_SIZE * 2];
2261 snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2262 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2263 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2265 auto tmp = sctpMap->find(key);
2269 sctpMap->erase(key);
2270 mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here)");
2293 int modifyToEpoll(int epoll_fd,
2294 ConnectedCU_t *peerInfo,
2296 Sctp_Map_t *sctpMap,
2301 auto lspan = opentracing::Tracer::Global()->StartSpan(
2302 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2307 struct epoll_event event{};
2308 event.data.ptr = peerInfo;
2309 event.events = events;
2310 if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, peerInfo->fileDescriptor, &event) < 0) {
2311 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2312 mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_MOD (may chack not to quit here), %s, %s %d",
2313 strerror(errno), __func__, __LINE__);
2315 close(peerInfo->fileDescriptor);
2316 cleanHashEntry(peerInfo, sctpMap, &lspan);
2317 char key[MAX_ENODB_NAME_SIZE * 2];
2318 snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2319 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2320 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2322 auto tmp = sctpMap->find(key);
2326 sctpMap->erase(key);
2327 mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here)");
2340 int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, otSpan *pSpan) {
2342 auto lspan = opentracing::Tracer::Global()->StartSpan(
2343 __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2345 // otSpan lspan = 0;
2347 //serialize the span
2349 std::unordered_map<std::string, std::string> data;
2350 RICCarrierWriter carrier(data);
2351 opentracing::Tracer::Global()->Inject((lspan.get())->context(), carrier);
2352 nlohmann::json j = data;
2353 std::string str = j.dump();
2354 static auto maxTraceLength = 0;
2356 maxTraceLength = str.length() > maxTraceLength ? str.length() : maxTraceLength;
2357 // serialized context can be put to RMR message using function:
2358 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2359 mdclog_write(MDCLOG_DEBUG, "max trace length is %d trace data length = %ld data = %s", maxTraceLength,
2360 str.length(), str.c_str());
2362 rmr_set_trace(rmrMessageBuffer.sendMessage, (const unsigned char *) str.c_str(), str.length());
2364 buildJsonMessage(message);
2366 rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2368 if (rmrMessageBuffer.sendMessage == nullptr) {
2369 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2370 mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2377 if (rmrMessageBuffer.sendMessage->state != 0) {
2378 char meid[RMR_MAX_MEID]{};
2379 if (rmrMessageBuffer.sendMessage->state == RMR_ERR_RETRY) {
2381 rmrMessageBuffer.sendMessage->state = 0;
2382 mdclog_write(MDCLOG_INFO, "RETRY sending Message type %d to Xapp from %s",
2383 rmrMessageBuffer.sendMessage->mtype,
2384 rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2385 rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2386 if (rmrMessageBuffer.sendMessage == nullptr) {
2387 mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2388 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2393 } else if (rmrMessageBuffer.sendMessage->state != 0) {
2394 mdclog_write(MDCLOG_ERR,
2395 "Message state %s while sending request %d to Xapp from %s after retry of 10 microseconds",
2396 translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2397 rmrMessageBuffer.sendMessage->mtype,
2398 rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2399 auto rc = rmrMessageBuffer.sendMessage->state;
2406 mdclog_write(MDCLOG_ERR, "Message state %s while sending request %d to Xapp from %s",
2407 translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2408 rmrMessageBuffer.sendMessage->mtype,
2409 rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2413 return rmrMessageBuffer.sendMessage->state;
2419 void buildJsonMessage(ReportingMessages_t &message) {
2420 message.outLen = sizeof(message.base64Data);
2421 base64::encode((const unsigned char *)message.message.asndata,
2422 (const int)message.message.asnLength,
2425 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2426 mdclog_write(MDCLOG_DEBUG, "asn data length = %d, base64 message length = %d ",
2427 (int)message.message.asnLength,
2428 (int)message.outLen);
2432 // // build day time to seconds from epoc
2433 // strftime(buff, sizeof message.message.time, "%D %T", gmtime(&message.message.time.tv_sec));
2434 // // add nanosecond
2435 // snprintf(buff, sizeof buff, "%s.%09ld UTC\n", buff, message.message.time.tv_nsec);
2437 message.bufferLen = snprintf(message.buffer, sizeof(message.buffer),
2438 "{\"header\": {\"ts\": \"%ld.%09ld\","
2439 "\"ranName\": \"%s\","
2440 "\"messageType\": %d,"
2441 "\"direction\": \"%c\"},"
2442 "\"base64Length\": %d,"
2443 "\"asnBase64\": \"%s\"}",
2444 message.message.time.tv_sec,
2445 message.message.time.tv_nsec,
2446 message.message.enodbName,
2447 message.message.messageType,
2448 message.message.direction,
2449 (int)message.outLen,
2450 message.base64Data);
2451 static src::logger_mt& lg = my_logger::get();
2453 BOOST_LOG(lg) << message.buffer;
2459 * take RMR error code to string
2463 string translateRmrErrorMessages(int state) {
2467 str = "RMR_OK - state is good";
2469 case RMR_ERR_BADARG:
2470 str = "RMR_ERR_BADARG - argument passd to function was unusable";
2472 case RMR_ERR_NOENDPT:
2473 str = "RMR_ERR_NOENDPT - send//call could not find an endpoint based on msg type";
2476 str = "RMR_ERR_EMPTY - msg received had no payload; attempt to send an empty message";
2479 str = "RMR_ERR_NOHDR - message didn't contain a valid header";
2481 case RMR_ERR_SENDFAILED:
2482 str = "RMR_ERR_SENDFAILED - send failed; errno has nano reason";
2484 case RMR_ERR_CALLFAILED:
2485 str = "RMR_ERR_CALLFAILED - unable to send call() message";
2487 case RMR_ERR_NOWHOPEN:
2488 str = "RMR_ERR_NOWHOPEN - no wormholes are open";
2491 str = "RMR_ERR_WHID - wormhole id was invalid";
2493 case RMR_ERR_OVERFLOW:
2494 str = "RMR_ERR_OVERFLOW - operation would have busted through a buffer/field size";
2497 str = "RMR_ERR_RETRY - request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)";
2499 case RMR_ERR_RCVFAILED:
2500 str = "RMR_ERR_RCVFAILED - receive failed (hard error)";
2502 case RMR_ERR_TIMEOUT:
2503 str = "RMR_ERR_TIMEOUT - message processing call timed out";
2506 str = "RMR_ERR_UNSET - the message hasn't been populated with a transport buffer";
2509 str = "RMR_ERR_TRUNC - received message likely truncated";
2511 case RMR_ERR_INITFAILED:
2512 str = "RMR_ERR_INITFAILED - initialisation of something (probably message) failed";
2514 case RMR_ERR_NOTSUPP:
2515 str = "RMR_ERR_NOTSUPP - the request is not supported, or RMr was not initialised for the request";
2519 snprintf(buf, sizeof buf, "UNDOCUMENTED RMR_ERR : %d", state);