Sync from Azure to LF
[ric-plt/e2.git] / RIC-E2-TERMINATION / sctpThread.cpp
1 // Copyright 2019 AT&T Intellectual Property
2 // Copyright 2019 Nokia
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //      http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15
16 // TODO: High-level file comment.
17
18
19 #include "sctpThread.h"
20
21
22 using namespace std::placeholders;
23 #ifdef __TRACING__
24 using namespace opentracing;
25 #endif
26 //#ifdef __cplusplus
27 //extern "C"
28 //{
29 //#endif
30
31 BOOST_LOG_INLINE_GLOBAL_LOGGER_DEFAULT(my_logger, src::logger_mt)
32
33 boost::shared_ptr<sinks::synchronous_sink<sinks::text_file_backend>> boostLogger;
34
35 void init_log() {
36     mdclog_attr_t *attr;
37     mdclog_attr_init(&attr);
38     mdclog_attr_set_ident(attr, "E2Terminator");
39     mdclog_init(attr);
40     mdclog_attr_destroy(attr);
41 }
42
43
44 //std::atomic<int64_t> rmrCounter{0};
45 std::atomic<int64_t> num_of_messages{0};
46 static long transactionCounter = 0;
47
48
49 int main(const int argc, char **argv) {
50     sctp_params_t pSctpParams;
51 #ifdef __TRACING__
52     opentracing::Tracer::InitGlobal(tracelibcpp::createTracer("E2 Terminator"));
53     auto span = opentracing::Tracer::Global()->StartSpan(__FUNCTION__);
54 #else
55     otSpan span = 0;
56 #endif
57
58     unsigned num_cpus = std::thread::hardware_concurrency();
59 #ifdef ERROR_LEVEL
60     mdclog_severity_t loglevel = MDCLOG_ERR;
61 #else
62     mdclog_severity_t loglevel = MDCLOG_INFO;
63 #endif
64     init_log();
65     mdclog_level_set(loglevel);
66
67     if (argc < 7) {
68         mdclog_mdc_add("app", argv[0]);
69         mdclog_write(MDCLOG_ERR, "Usage nano <rmr port> logLevel <debug/warning/info/error> volume <PATH to log file location>");
70         return -1;
71     }
72
73     {
74         std::random_device device{};
75         std::mt19937 generator(device());
76         std::uniform_int_distribution<long> distribution(1, (long) 1e12);
77
78         transactionCounter = distribution(generator);
79     }
80
81     char tmpLogFilespec[VOLUME_URL_SIZE];
82     tmpLogFilespec[0] = 0;
83     pSctpParams.volume[0] = 0;
84     //read paramters from CLI
85     for (auto i = 1; i < argc; i += 2) {
86         char *dummy;
87         if (strcasecmp("nano", argv[i]) == 0) {
88             pSctpParams.rmrPort = (uint16_t) (uint16_t) strtol(argv[i + 1], &dummy, 10);
89         } else if (strcasecmp("loglevel", argv[i]) == 0) {
90             if (strcasecmp("debug", argv[i + 1]) == 0) {
91                 loglevel = MDCLOG_DEBUG;
92             } else if (strcasecmp("info", argv[i + 1]) == 0) {
93                 loglevel = MDCLOG_INFO;
94             } else if (strcasecmp("warning", argv[i + 1]) == 0) {
95                 loglevel = MDCLOG_WARN;
96             } else if (strcasecmp("error", argv[i + 1]) == 0) {
97                 loglevel = MDCLOG_ERR;
98             }
99         } else if (strcasecmp("volume", argv[i]) == 0) {
100             snprintf(pSctpParams.volume, VOLUME_URL_SIZE, "%s", argv[i + 1]);
101             snprintf(tmpLogFilespec, VOLUME_URL_SIZE, "%s", argv[i + 1]);
102         }
103     }
104
105
106     pSctpParams.logLevel = loglevel;
107     snprintf(pSctpParams.rmrAddress, sizeof(pSctpParams.rmrAddress) - 1, "%d", (int) (pSctpParams.rmrPort));
108
109     strcat(tmpLogFilespec,"/tmp/E2Term_%Y-%m-%d_%H-%M-%S.%N.log");
110
111     if (mdclog_level_get() >= MDCLOG_INFO) {
112         mdclog_mdc_add("RMR Port", to_string(pSctpParams.rmrPort).c_str());
113         mdclog_mdc_add("LogLevel", to_string(pSctpParams.logLevel).c_str());
114         mdclog_mdc_add("volume", pSctpParams.volume);
115         mdclog_mdc_add("tmpLogFilespec", tmpLogFilespec);
116
117         mdclog_write(MDCLOG_INFO, "running parameters");
118     }
119     mdclog_mdc_clean();
120
121     // Files written to the current working directory
122     boostLogger = logging::add_file_log(
123             keywords::file_name = tmpLogFilespec,
124             keywords::rotation_size = 10 * 1024 * 1024,
125             keywords::time_based_rotation = sinks::file::rotation_at_time_interval(posix_time::hours(1)),
126             keywords::format = "%Message%"
127             //keywords::format = "[%TimeStamp%]: %Message%" // use each log with time stamp
128     );
129
130     // Setup a destination folder for collecting rotated (closed) files --since the same volumn can use rename()
131     boostLogger->locked_backend()->set_file_collector(sinks::file::make_collector(
132          keywords::target = pSctpParams.volume 
133          //keywords::max_size = 16 * 1024 * 1024, 
134          //keywords::min_free_space = 100 * 1024 * 1024  
135     ));
136
137     // Upon restart, scan the directory for files matching the file_name pattern
138     boostLogger->locked_backend()->scan_for_files();
139
140     // Enable auto-flushing after each log record written
141     if (mdclog_level_get() >= MDCLOG_DEBUG) {
142         boostLogger->locked_backend()->auto_flush(true);
143     }
144
145     // start epoll
146     pSctpParams.epoll_fd = epoll_create1(0);
147     if (pSctpParams.epoll_fd == -1) {
148         mdclog_write(MDCLOG_ERR, "failed to open epoll descriptor");
149         exit(-1);
150     }
151
152     pSctpParams.rmrCtx = getRmrContext(pSctpParams.rmrAddress, &span);
153     if (pSctpParams.rmrCtx == nullptr) {
154         mdclog_write(MDCLOG_ERR, "Failed to initialize RMR");
155         close(pSctpParams.epoll_fd);
156         exit(-1);
157     }
158     rmr_init_trace(pSctpParams.rmrCtx, 200);
159     // get the RMR fd for the epoll
160     pSctpParams.rmrListenFd = rmr_get_rcvfd(pSctpParams.rmrCtx);
161     struct epoll_event event{};
162     // add RMR fd to epoll
163     event.events = (EPOLLIN);
164     event.data.fd = pSctpParams.rmrListenFd;
165     // add listening RMR FD to epoll
166     if (epoll_ctl(pSctpParams.epoll_fd, EPOLL_CTL_ADD, pSctpParams.rmrListenFd, &event)) {
167         mdclog_write(MDCLOG_ERR, "Failed to add RMR descriptor to epoll");
168         close(pSctpParams.rmrListenFd);
169         rmr_close(pSctpParams.rmrCtx);
170         close(pSctpParams.epoll_fd);
171         exit(-1);
172     }
173
174     pSctpParams.sctpMap = new mapWrapper();
175
176     std::vector<std::thread> threads(num_cpus);
177 //    std::vector<std::thread> threads;
178
179     num_cpus = 1;
180     for (unsigned int i = 0; i < num_cpus; i++) {
181         threads[i] = std::thread(listener, &pSctpParams);
182
183         cpu_set_t cpuset;
184         CPU_ZERO(&cpuset);
185         CPU_SET(i, &cpuset);
186         int rc = pthread_setaffinity_np(threads[i].native_handle(), sizeof(cpu_set_t), &cpuset);
187         if (rc != 0) {
188             mdclog_write(MDCLOG_ERR, "Error calling pthread_setaffinity_np: %d", rc);
189         }
190
191 //        threads.emplace_back(std::thread(listener, &pSctpParams));
192     }
193
194     //send to e2 manager init of e2 term
195     //E2_TERM_INIT
196     auto term_init = false;
197
198     char buff[128]{};
199     auto len = snprintf(buff, 128, "E2 terminator started");
200     rmr_mbuf_t *msg = rmr_alloc_msg(pSctpParams.rmrCtx, 200);
201     auto count = 0;
202     while (!term_init) {
203         msg->mtype = E2_TERM_INIT;
204         msg->state = 0;
205         rmr_bytes2payload(msg, (unsigned char *) buff, len);
206         static unsigned char tx[32];
207         auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
208         rmr_bytes2xact(msg, tx, txLen);
209         msg = rmr_send_msg(pSctpParams.rmrCtx, msg);
210         if (msg == nullptr) {
211             msg = rmr_alloc_msg(pSctpParams.rmrCtx, 200);
212         } else if (msg->state == 0) {
213             term_init = true;
214             rmr_free_msg(msg);
215             //break;
216         } else {
217             if (count % 100 == 0) {
218                 mdclog_write(MDCLOG_ERR, "Error sending E2_TERM_INIT cause : %d ", msg->state);
219             }
220             sleep(1);
221         }
222         count++;
223     }
224
225     for (auto &t : threads) {
226         t.join();
227     }
228
229 #ifdef __TRACING__
230     opentracing::Tracer::Global()->Close();
231 #endif
232     return 0;
233 }
234
235 /**
236  *
237  * @param args
238  * @return
239  */
240 void listener(sctp_params_t *params) {
241 #ifdef __TRACING__
242     auto span = opentracing::Tracer::Global()->StartSpan(__FUNCTION__);
243 #else
244     otSpan span = 0;
245 #endif
246     int num_of_SCTP_messages = 0;
247     int num_of_XAPP_messages = 0;
248     auto totalTime = 0.0;
249     mdclog_mdc_clean();
250     mdclog_level_set(params->logLevel);
251
252     std::thread::id this_id = std::this_thread::get_id();
253     //save cout
254     streambuf *oldCout = cout.rdbuf();
255     ostringstream memCout;
256     // create new cout
257     cout.rdbuf(memCout.rdbuf());
258     cout << this_id;
259     //return to the normal cout
260     cout.rdbuf(oldCout);
261
262     char tid[32];
263     memcpy(tid, memCout.str().c_str(), memCout.str().length() < 32 ? memCout.str().length() : 31);
264     tid[memCout.str().length()] = 0;
265     mdclog_mdc_add("thread id", tid);
266
267     if (mdclog_level_get() >= MDCLOG_DEBUG) {
268         mdclog_write(MDCLOG_DEBUG, "started thread number %s", tid);
269     }
270
271     RmrMessagesBuffer_t rmrMessageBuffer{};
272     //create and init RMR
273     rmrMessageBuffer.rmrCtx = params->rmrCtx;
274
275     auto *events = (struct epoll_event *) calloc(MAXEVENTS, sizeof(struct epoll_event));
276     struct timespec end{0, 0};
277     struct timespec start{0, 0};
278
279     rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
280     rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
281
282     ReportingMessages_t message {};
283
284     for (int i = 0; i < MAX_RMR_BUFF_ARRY; i++) {
285         rmrMessageBuffer.rcvBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
286         rmrMessageBuffer.sendBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
287     }
288
289     while (true) {
290         if (mdclog_level_get() >= MDCLOG_DEBUG) {
291             mdclog_write(MDCLOG_DEBUG, "Start EPOLL Wait");
292         }
293         auto numOfEvents = epoll_wait(params->epoll_fd, events, MAXEVENTS, -1);
294         if (numOfEvents < 0 && errno == EINTR) {
295             if (mdclog_level_get() >= MDCLOG_DEBUG) {
296                 mdclog_write(MDCLOG_DEBUG, "got EINTR : %s", strerror(errno));
297             }
298             continue;
299         }
300         if (numOfEvents < 0) {
301             mdclog_write(MDCLOG_ERR, "Epoll wait failed, errno = %s", strerror(errno));
302             return;
303         }
304         for (auto i = 0; i < numOfEvents; i++) {
305             if (mdclog_level_get() >= MDCLOG_DEBUG) {
306                 mdclog_write(MDCLOG_DEBUG, "handling epoll event %d out of %d", i + 1, numOfEvents);
307             }
308             clock_gettime(CLOCK_MONOTONIC, &message.message.time);
309             start.tv_sec = message.message.time.tv_sec;
310             start.tv_nsec = message.message.time.tv_nsec;
311             if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP)) {
312                 if (events[i].data.fd != params->rmrListenFd) {
313                     auto *peerInfo = (ConnectedCU_t *)events[i].data.ptr;
314                     mdclog_write(MDCLOG_ERR, "epoll error, events %0x on fd %d, RAN NAME : %s",
315                                  events[i].events, peerInfo->fileDescriptor, peerInfo->enodbName);
316
317                     rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
318                                                          "%s|Failed SCTP Connection",
319                                                          peerInfo->enodbName);
320                     message.message.asndata = rmrMessageBuffer.sendMessage->payload;
321                     message.message.asnLength = rmrMessageBuffer.sendMessage->len;
322
323                     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
324                     message.message.direction = 'N';
325                     if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer, &span) != 0) {
326                         mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
327                     }
328
329                     close(peerInfo->fileDescriptor);
330                     cleanHashEntry((ConnectedCU_t *) events[i].data.ptr, params->sctpMap, &span);
331                 } else {
332                     mdclog_write(MDCLOG_ERR, "epoll error, events %0x on RMR FD", events[i].events);
333                 }
334             } else if (events[i].events & EPOLLOUT) {
335                 // this need to send waiting message from connection EINPROGRESS
336                 auto *peerInfo = (ConnectedCU_t *) events[i].data.ptr;
337
338                 memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
339
340                 mdclog_write(MDCLOG_INFO, "file descriptor %d got EPOLLOUT", peerInfo->fileDescriptor);
341                 auto retVal = 0;
342                 socklen_t retValLen = 0;
343                 auto rc = getsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_ERROR, &retVal, &retValLen);
344                 if (rc != 0 || retVal != 0) {
345                     if (rc != 0) {
346                         rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
347                                                                        "%s|Failed SCTP Connection, after EINPROGRESS the getsockopt%s",
348                                                                        peerInfo->enodbName, strerror(errno));
349                     } else if (retVal != 0) {
350                         rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
351                                                                        "%s|Failed SCTP Connection after EINPROGRESS, SO_ERROR",
352                                                                        peerInfo->enodbName);
353                     }
354
355                     message.message.asndata = rmrMessageBuffer.sendMessage->payload;
356                     message.message.asnLength = rmrMessageBuffer.sendMessage->len;
357                     mdclog_write(MDCLOG_ERR, "%s", rmrMessageBuffer.sendMessage->payload);
358                     message.message.direction = 'N';
359                     if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer, &span) != 0) {
360                         mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
361                     }
362                     memset(peerInfo->asnData, 0, peerInfo->asnLength);
363                     peerInfo->asnLength = 0;
364                     peerInfo->mtype = 0;
365                     continue;
366                 }
367
368                 peerInfo->isConnected = true;
369
370                 if (modifyToEpoll(params->epoll_fd, peerInfo, (EPOLLIN | EPOLLET), params->sctpMap, peerInfo->enodbName,
371                                   peerInfo->mtype, &span) != 0) {
372                     mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_MOD");
373                     continue;
374                 }
375
376                 message.message.asndata = (unsigned char *)peerInfo->asnData;
377                 message.message.asnLength = peerInfo->asnLength;
378                 message.message.messageType = peerInfo->mtype;
379                 memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
380                 num_of_messages.fetch_add(1, std::memory_order_release);
381                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
382                     mdclog_write(MDCLOG_DEBUG, "send the delayed SETUP/ENDC SETUP to sctp for %s",
383                                  message.message.enodbName);
384                 }
385                 if (sendSctpMsg(peerInfo, message, params->sctpMap, &span) != 0) {
386                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
387                         mdclog_write(MDCLOG_DEBUG, "Error write to SCTP  %s %d", __func__, __LINE__);
388                     }
389                     continue;
390                 }
391
392                 memset(peerInfo->asnData, 0, peerInfo->asnLength);
393                 peerInfo->asnLength = 0;
394                 peerInfo->mtype = 0;
395
396             } else if (params->rmrListenFd == events[i].data.fd) {
397                 // got message from XAPP
398                 num_of_XAPP_messages++;
399                 num_of_messages.fetch_add(1, std::memory_order_release);
400                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
401                     mdclog_write(MDCLOG_DEBUG, "new message from RMR");
402                 }
403                 if (receiveXappMessages(params->epoll_fd,
404                                         params->sctpMap,
405                                         rmrMessageBuffer,
406                                         message.message.time,
407                                         &span) != 0) {
408                     mdclog_write(MDCLOG_ERR, "Error handling Xapp message");
409                 }
410             } else {
411                 /* We RMR_ERR_RETRY have data on the fd waiting to be read. Read and display it.
412                  * We must read whatever data is available completely, as we are running
413                  *  in edge-triggered mode and won't get a notification again for the same data. */
414                 num_of_messages.fetch_add(1, std::memory_order_release);
415                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
416                     mdclog_write(MDCLOG_DEBUG, "new message from SCTP, epoll flags are : %0x", events[i].events);
417                 }
418                 receiveDataFromSctp(&events[i],
419                                     params->sctpMap,
420                                     num_of_SCTP_messages,
421                                     rmrMessageBuffer,
422                                     message.message.time,
423                                     &span);
424             }
425
426             clock_gettime(CLOCK_MONOTONIC, &end);
427             if (mdclog_level_get() >= MDCLOG_INFO) {
428                 totalTime += ((end.tv_sec + 1.0e-9 * end.tv_nsec) -
429                               ((double) start.tv_sec + 1.0e-9 * start.tv_nsec));
430             }
431             if (mdclog_level_get() >= MDCLOG_DEBUG) {
432                 mdclog_write(MDCLOG_DEBUG, "message handling is %ld seconds %ld nanoseconds",
433                              end.tv_sec - start.tv_sec,
434                              end.tv_nsec - start.tv_nsec);
435             }
436         }
437     }
438 #ifdef __TRACING__
439     span->Finish();
440 #else
441
442 #endif
443 }
444
445 /**
446  *
447  * @param socket
448  * @return
449  */
450 int setSocketNoBlocking(int socket) {
451     auto flags = fcntl(socket, F_GETFL, 0);
452
453     if (flags == -1) {
454         mdclog_mdc_add("func", "fcntl");
455         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
456         mdclog_mdc_clean();
457         return -1;
458     }
459
460     flags = (unsigned) flags | (unsigned) O_NONBLOCK;
461     if (fcntl(socket, F_SETFL, flags) == -1) {
462         mdclog_mdc_add("func", "fcntl");
463         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
464         mdclog_mdc_clean();
465         return -1;
466     }
467
468     return 0;
469 }
470
471 /**
472  *
473  * @param val
474  * @param m
475  * @param pSpan
476  */
477 void cleanHashEntry(ConnectedCU_t *val, Sctp_Map_t *m, otSpan *pSpan) {
478 #ifdef __TRACING__
479     auto lspan = opentracing::Tracer::Global()->StartSpan(
480             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
481 #else
482 //    otSpan lspan = 0;
483 #endif
484     char *dummy;
485     auto port = (uint16_t) strtol(val->portNumber, &dummy, 10);
486     char searchBuff[256]{};
487
488     snprintf(searchBuff, sizeof searchBuff, "host:%s:%d", val->hostName, port);
489     m->erase(searchBuff);
490
491     m->erase(val->enodbName);
492     free(val);
493 #ifdef __TRACING__
494     lspan->Finish();
495 #endif
496 }
497
498 /**
499  *
500  * @param fd file discriptor
501  * @param data the asn data to send
502  * @param len  length of the data
503  * @param enodbName the enodbName as in the map for printing purpose
504  * @param m map host information
505  * @param mtype message number
506  * @return 0 success, anegative number on fail
507  */
508 int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_t *m, otSpan *pSpan) {
509 #ifdef __TRACING__
510     auto lspan = opentracing::Tracer::Global()->StartSpan(
511             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
512 #else
513     otSpan lspan = 0;
514 #endif
515     auto loglevel = mdclog_level_get();
516     int fd = peerInfo->fileDescriptor;
517     if (loglevel >= MDCLOG_DEBUG) {
518         mdclog_write(MDCLOG_DEBUG, "Send SCTP message for CU %s, %s",
519                      message.message.enodbName, __FUNCTION__);
520     }
521
522     while (true) {
523         //TODO add send to VES client or KAFKA
524         //format ts|mtype|direction(D/U)|length of asn data|raw data
525 //        auto length = sizeof message.message.time
526 //                      + sizeof message.message.enodbName
527 //                      + sizeof message.message.messageType
528 //                      + sizeof message.message.direction
529 //                      + sizeof message.message.asnLength
530 //                      + message.message.asnLength;
531
532         if (send(fd,message.message.asndata, message.message.asnLength,MSG_NOSIGNAL) < 0) {
533             if (errno == EINTR) {
534                 continue;
535             }
536             mdclog_write(MDCLOG_ERR, "error writing to CU a message, %s ", strerror(errno));
537             // Prevent double free() of peerInfo in the event of connection failure.
538             // Returning failure will trigger, in x2/endc setup flow, RIC_SCTP_CONNECTION_FAILURE rmr message causing the E2M to retry.
539             if (!peerInfo->isConnected){
540                 mdclog_write(MDCLOG_ERR, "connection to CU %s is still in progress.", message.message.enodbName);
541 #ifdef __TRACING__
542             lspan->Finish();
543 #endif
544                 return -1;
545             }
546             cleanHashEntry(peerInfo, m, &lspan);
547             close(fd);
548             char key[MAX_ENODB_NAME_SIZE * 2];
549             snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName,
550                      message.message.messageType);
551             if (loglevel >= MDCLOG_DEBUG) {
552                 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
553             }
554             auto tmp = m->find(key);
555             if (tmp) {
556                 free(tmp);
557             }
558             m->erase(key);
559 #ifdef __TRACING__
560             lspan->Finish();
561 #endif
562             return -1;
563         }
564         message.message.direction = 'D';
565         // send report.buffer of size
566         buildJsonMessage(message);
567
568         if (loglevel >= MDCLOG_DEBUG) {
569             mdclog_write(MDCLOG_DEBUG,
570                          "SCTP message for CU %s sent from %s",
571                          message.message.enodbName,
572                          __FUNCTION__);
573         }
574 #ifdef __TRACING__
575         lspan->Finish();
576 #endif
577
578         return 0;
579     }
580 }
581
582 /**
583  *
584  * @param message
585  * @param rmrMessageBuffer
586  * @param pSpan
587  */
588 void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer, otSpan *pSpan) {
589 #ifdef __TRACING__
590     auto lspan = opentracing::Tracer::Global()->StartSpan(
591             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
592 #else
593 //    otSpan lspan = 0;
594 #endif
595     rmr_get_meid(rmrMessageBuffer.rcvMessage, (unsigned char *)(message.message.enodbName));
596
597     message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
598     message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
599
600     if (mdclog_level_get() >= MDCLOG_DEBUG) {
601         mdclog_write(MDCLOG_DEBUG, "Message from Xapp RAN name = %s message length = %ld",
602                      message.message.enodbName, (unsigned long) message.message.asnLength);
603     }
604 #ifdef __TRACING__
605     lspan->Finish();
606 #endif
607
608 }
609
610
611 /**
612  *
613  * @param metaData all the data strip to structure
614  * @param data the data recived from xAPP
615  * @return 0 success all other values are fault
616  */
617 int getSetupRequestMetaData(ReportingMessages_t &message, char *data, char *host, uint16_t &port, otSpan *pSpan) {
618 #ifdef __TRACING__
619     auto lspan = opentracing::Tracer::Global()->StartSpan(
620             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
621 #else
622 //    otSpan lspan = 0;
623 #endif
624     auto loglevel = mdclog_level_get();
625
626     char delimiter[4] {};
627     memset(delimiter, 0, (size_t)4);
628     delimiter[0] = '|';
629     char *tmp;
630
631     char *val = strtok_r(data, delimiter, &tmp);
632     if (val != nullptr) {
633         if (mdclog_level_get() >= MDCLOG_DEBUG) {
634             mdclog_write(MDCLOG_DEBUG, "SCTP ADDRESS parameter from message = %s", val);
635         }
636         memcpy(host, val, tmp - val );
637     } else {
638         mdclog_write(MDCLOG_ERR, "wrong Host Name for setup request %s", data);
639 #ifdef __TRACING__
640         lspan->Finish();
641 #endif
642         return -1;
643     }
644
645     val = strtok_r(nullptr, delimiter, &tmp);
646     if (val != nullptr) {
647         if (mdclog_level_get() >= MDCLOG_DEBUG) {
648             mdclog_write(MDCLOG_DEBUG, "PORT parameter from message = %s", val);
649         }
650         char *dummy;
651         port = (uint16_t)strtol(val, &dummy, 10);
652     } else {
653         mdclog_write(MDCLOG_ERR, "wrong Port for setup request %s", data);
654 #ifdef __TRACING__
655         lspan->Finish();
656 #endif
657         return -2;
658     }
659
660     val = strtok_r(nullptr, delimiter, &tmp);
661     if (val != nullptr) {
662         if (mdclog_level_get() >= MDCLOG_DEBUG) {
663             mdclog_write(MDCLOG_DEBUG, "RAN NAME parameter from message = %s", val);
664         }
665         memcpy(message.message.enodbName, val, tmp - val);
666     } else {
667         mdclog_write(MDCLOG_ERR, "wrong gNb/Enodeb name for setup request %s", data);
668 #ifdef __TRACING__
669         lspan->Finish();
670 #endif
671
672         return -3;
673     }
674     val = strtok_r(nullptr, delimiter, &tmp);
675     if (val != nullptr) {
676         if (mdclog_level_get() >= MDCLOG_DEBUG) {
677             mdclog_write(MDCLOG_DEBUG, "ASN length parameter from message = %s", val);
678         }
679         char *dummy;
680         message.message.asnLength = (uint16_t) strtol(val, &dummy, 10);
681     } else {
682         mdclog_write(MDCLOG_ERR, "wrong ASN length for setup request %s", data);
683 #ifdef __TRACING__
684         lspan->Finish();
685 #endif
686         return -4;
687     }
688
689     message.message.asndata = (unsigned char *)tmp;  // tmp is local but point to the location in data
690
691     if (loglevel >= MDCLOG_INFO) {
692         mdclog_write(MDCLOG_INFO, "Message from Xapp RAN name = %s host address = %s port = %d",
693                      message.message.enodbName, host, port);
694     }
695 #ifdef __TRACING__
696     lspan->Finish();
697 #endif
698
699     return 0;
700 }
701
702 /**
703  *
704  * @param events
705  * @param sctpMap
706  * @param numOfMessages
707  * @param rmrMessageBuffer
708  * @param ts
709  * @param pSpan
710  * @return
711  */
712 int receiveDataFromSctp(struct epoll_event *events,
713                         Sctp_Map_t *sctpMap,
714                         int &numOfMessages,
715                         RmrMessagesBuffer_t &rmrMessageBuffer,
716                         struct timespec &ts,
717                         otSpan *pSpan) {
718 #ifdef __TRACING__
719     auto lspan = opentracing::Tracer::Global()->StartSpan(
720             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
721 #else
722     otSpan lspan = 0;
723 #endif
724     /* We have data on the fd waiting to be read. Read and display it.
725  * We must read whatever data is available completely, as we are running
726  *  in edge-triggered mode and won't get a notification again for the same data. */
727     int done = 0;
728     auto loglevel = mdclog_level_get();
729     // get the identity of the interface
730     auto *peerInfo = (ConnectedCU_t *)events->data.ptr;
731     struct timespec start{0, 0};
732     struct timespec decodestart{0, 0};
733     struct timespec end{0, 0};
734
735     E2AP_PDU_t *pdu = nullptr;
736
737     ReportingMessages_t message {};
738
739     while (true) {
740         if (loglevel >= MDCLOG_DEBUG) {
741             mdclog_write(MDCLOG_DEBUG, "Start Read from SCTP %d fd", peerInfo->fileDescriptor);
742             clock_gettime(CLOCK_MONOTONIC, &start);
743         }
744         // read the buffer directly to rmr payload
745         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
746         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
747                 read(peerInfo->fileDescriptor, rmrMessageBuffer.sendMessage->payload, RECEIVE_SCTP_BUFFER_SIZE);
748         if (loglevel >= MDCLOG_DEBUG) {
749             mdclog_write(MDCLOG_DEBUG, "Finish Read from SCTP %d fd message length = %ld",
750                     peerInfo->fileDescriptor, message.message.asnLength);
751         }
752         memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
753         message.message.direction = 'U';
754         message.message.time.tv_nsec = ts.tv_nsec;
755         message.message.time.tv_sec = ts.tv_sec;
756
757         if (message.message.asnLength < 0) {
758             if (errno == EINTR) {
759                 continue;
760             }
761             /* If errno == EAGAIN, that means we have read all
762                data. So go back to the main loop. */
763             if (errno != EAGAIN) {
764                 mdclog_write(MDCLOG_ERR, "Read error, %s ", strerror(errno));
765                 done = 1;
766             } else if (loglevel >= MDCLOG_DEBUG) {
767                 mdclog_write(MDCLOG_DEBUG, "EAGAIN - descriptor = %d", peerInfo->fileDescriptor);
768             }
769             break;
770         } else if (message.message.asnLength == 0) {
771             /* End of file. The remote has closed the connection. */
772             if (loglevel >= MDCLOG_INFO) {
773                 mdclog_write(MDCLOG_INFO, "END of File Closed connection - descriptor = %d",
774                              peerInfo->fileDescriptor);
775             }
776             done = 1;
777             break;
778         }
779
780         asn_dec_rval_t rval;
781         if (loglevel >= MDCLOG_DEBUG) {
782             char printBuffer[4096]{};
783             char *tmp = printBuffer;
784             for (size_t i = 0; i < (size_t)message.message.asnLength; ++i) {
785                 snprintf(tmp, 2, "%02x", message.message.asndata[i]);
786                 tmp += 2;
787             }
788             printBuffer[message.message.asnLength] = 0;
789             clock_gettime(CLOCK_MONOTONIC, &end);
790             mdclog_write(MDCLOG_DEBUG, "Before Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
791                          peerInfo->enodbName, end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
792             mdclog_write(MDCLOG_DEBUG, "PDU buffer length = %ld, data =  : %s", message.message.asnLength,
793                          printBuffer);
794             clock_gettime(CLOCK_MONOTONIC, &decodestart);
795         }
796
797         rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void **) &pdu,
798                           message.message.asndata, message.message.asnLength);
799         if (rval.code != RC_OK) {
800             mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) E2AP PDU from RAN : %s", rval.code,
801                          peerInfo->enodbName);
802             break;
803         }
804
805         if (loglevel >= MDCLOG_DEBUG) {
806             clock_gettime(CLOCK_MONOTONIC, &end);
807             mdclog_write(MDCLOG_DEBUG, "After Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
808                          peerInfo->enodbName, end.tv_sec - decodestart.tv_sec, end.tv_nsec - decodestart.tv_nsec);
809             char *printBuffer;
810             size_t size;
811             FILE *stream = open_memstream(&printBuffer, &size);
812             asn_fprint(stream, &asn_DEF_E2AP_PDU, pdu);
813             mdclog_write(MDCLOG_DEBUG, "Encoding E2AP PDU past : %s", printBuffer);
814             clock_gettime(CLOCK_MONOTONIC, &decodestart);
815         }
816
817         switch (pdu->present) {
818             case E2AP_PDU_PR_initiatingMessage: {//initiating message
819                 asnInitiatingRequest(pdu, message, rmrMessageBuffer, &lspan);
820                 break;
821             }
822             case E2AP_PDU_PR_successfulOutcome: { //successful outcome
823                 asnSuccsesfulMsg(pdu, message, sctpMap, rmrMessageBuffer, &lspan);
824                 break;
825             }
826             case E2AP_PDU_PR_unsuccessfulOutcome: { //Unsuccessful Outcome
827                 asnUnSuccsesfulMsg(pdu, message, sctpMap, rmrMessageBuffer, &lspan);
828                 break;
829             }
830             default:
831                 mdclog_write(MDCLOG_ERR, "Unknown index %d in E2AP PDU", pdu->present);
832                 break;
833         }
834         if (loglevel >= MDCLOG_DEBUG) {
835             clock_gettime(CLOCK_MONOTONIC, &end);
836             mdclog_write(MDCLOG_DEBUG,
837                          "After processing message and sent to rmr for : %s, Read time is : %ld seconds, %ld nanoseconds",
838                          peerInfo->enodbName, end.tv_sec - decodestart.tv_sec, end.tv_nsec - decodestart.tv_nsec);
839
840         }
841         numOfMessages++;
842         // remove the break for EAGAIN
843         //break;
844         if (pdu != nullptr) {
845             //TODO need to test ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu); to get better performance
846             //ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu);
847             ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
848             pdu = nullptr;
849         }
850         //clock_gettime(CLOCK_MONOTONIC, &start);
851     }
852     // in case of break to avoid memory leak
853     if (pdu != nullptr) {
854         ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
855         pdu = nullptr;
856     }
857
858     if (done) {
859         if (loglevel >= MDCLOG_INFO) {
860             mdclog_write(MDCLOG_INFO, "Closed connection - descriptor = %d", peerInfo->fileDescriptor);
861         }
862         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
863                 snprintf((char *)rmrMessageBuffer.sendMessage->payload,
864                         256,
865                         "%s|CU disconnected unexpectedly",
866                         peerInfo->enodbName);
867         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
868
869         if (sendRequestToXapp(message,
870                               RIC_SCTP_CONNECTION_FAILURE,
871                               rmrMessageBuffer,
872                               &lspan) != 0) {
873             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
874         }
875
876         /* Closing descriptor make epoll remove it from the set of descriptors which are monitored. */
877         close(peerInfo->fileDescriptor);
878         cleanHashEntry((ConnectedCU_t *) events->data.ptr, sctpMap, &lspan);
879     }
880     if (loglevel >= MDCLOG_DEBUG) {
881         clock_gettime(CLOCK_MONOTONIC, &end);
882         mdclog_write(MDCLOG_DEBUG, "from receive SCTP to send RMR time is %ld seconds and %ld nanoseconds",
883                      end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
884
885     }
886 #ifdef __TRACING__
887     lspan->Finish();
888 #endif
889
890     return 0;
891 }
892
893 /**
894  *
895  * @param pdu
896  * @param message
897  * @param rmrMessageBuffer
898  * @param pSpan
899  */
900 void asnInitiatingRequest(E2AP_PDU_t *pdu,
901                           ReportingMessages_t &message,
902                           RmrMessagesBuffer_t &rmrMessageBuffer,
903                           otSpan *pSpan) {
904 #ifdef __TRACING__
905     auto lspan = opentracing::Tracer::Global()->StartSpan(
906             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
907 #else
908     otSpan lspan = 0;
909 #endif
910
911     auto procedureCode = ((InitiatingMessage_t *) pdu->choice.initiatingMessage)->procedureCode;
912     if (mdclog_level_get() >= MDCLOG_INFO) {
913         mdclog_write(MDCLOG_INFO, "Initiating message %ld", procedureCode);
914     }
915     switch (procedureCode) {
916         case ProcedureCode_id_x2Setup: {
917             if (mdclog_level_get() >= MDCLOG_INFO) {
918                 mdclog_write(MDCLOG_INFO, "Got Setup Initiating  message from CU - %s",
919                              message.message.enodbName);
920             }
921             break;
922         }
923         case ProcedureCode_id_endcX2Setup: {
924             if (mdclog_level_get() >= MDCLOG_INFO) {
925                 mdclog_write(MDCLOG_INFO, "Got X2 EN-DC Setup Request from CU - %s",
926                              message.message.enodbName);
927             }
928             break;
929         }
930         case ProcedureCode_id_ricSubscription: {
931             if (mdclog_level_get() >= MDCLOG_INFO) {
932                 mdclog_write(MDCLOG_INFO, "Got RIC Subscription Request message from CU - %s",
933                              message.message.enodbName);
934             }
935             break;
936         }
937         case ProcedureCode_id_ricSubscriptionDelete: {
938             if (mdclog_level_get() >= MDCLOG_INFO) {
939                 mdclog_write(MDCLOG_INFO, "Got RIC Subscription Delete Request message from CU - %s",
940                              message.message.enodbName);
941             }
942             break;
943         }
944         case ProcedureCode_id_endcConfigurationUpdate: {
945             if (sendRequestToXapp(message, RIC_ENDC_CONF_UPDATE, rmrMessageBuffer, &lspan) != 0) {
946                 mdclog_write(MDCLOG_ERR, "E2 EN-DC CONFIGURATION UPDATE message failed to send to xAPP");
947             }
948             break;
949         }
950         case ProcedureCode_id_eNBConfigurationUpdate: {
951             if (sendRequestToXapp(message, RIC_ENB_CONF_UPDATE, rmrMessageBuffer, &lspan) != 0) {
952                 mdclog_write(MDCLOG_ERR, "E2 EN-BC CONFIGURATION UPDATE message failed to send to xAPP");
953             }
954             break;
955         }
956         case ProcedureCode_id_x2Removal: {
957             if (mdclog_level_get() >= MDCLOG_INFO) {
958                 mdclog_write(MDCLOG_INFO, "Got E2 Removal Initiating  message from CU - %s",
959                              message.message.enodbName);
960             }
961             break;
962         }
963         case ProcedureCode_id_loadIndication: {
964             if (sendRequestToXapp(message, RIC_ENB_LOAD_INFORMATION, rmrMessageBuffer, &lspan) != 0) {
965                 mdclog_write(MDCLOG_ERR, "Load indication message failed to send to xAPP");
966             }
967             break;
968         }
969         case ProcedureCode_id_resourceStatusReportingInitiation: {
970             if (mdclog_level_get() >= MDCLOG_INFO) {
971                 mdclog_write(MDCLOG_INFO, "Got Status reporting initiation message from CU - %s",
972                              message.message.enodbName);
973             }
974             break;
975         }
976         case ProcedureCode_id_resourceStatusReporting: {
977             if (sendRequestToXapp(message, RIC_RESOURCE_STATUS_UPDATE, rmrMessageBuffer, &lspan) != 0) {
978                 mdclog_write(MDCLOG_ERR, "Resource Status Reporting message failed to send to xAPP");
979             }
980             break;
981         }
982         case ProcedureCode_id_reset: {
983             if (sendRequestToXapp(message, RIC_X2_RESET, rmrMessageBuffer, &lspan) != 0) {
984                 mdclog_write(MDCLOG_ERR, "RIC_X2_RESET message failed to send to xAPP");
985             }
986             break;
987         }
988         case ProcedureCode_id_ricIndication: {
989             for (int i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
990                 auto messageSent = false;
991                 RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
992                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
993                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
994                 }
995                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
996                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
997                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
998                     }
999                     if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
1000                         static unsigned char tx[32];
1001                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
1002                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1003                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1004                         rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *)message.message.enodbName, strlen(message.message.enodbName));
1005                         rmrMessageBuffer.sendMessage->state = 0;
1006                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1007                         sendRmrMessage(rmrMessageBuffer, message, &lspan);
1008                         messageSent = true;
1009                     } else {
1010                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1011                     }
1012                 }
1013                 if (messageSent) {
1014                     break;
1015                 }
1016             }
1017             break;
1018         }
1019         case ProcedureCode_id_errorIndication: {
1020             if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer, &lspan) != 0) {
1021                 mdclog_write(MDCLOG_ERR, "Error Indication message failed to send to xAPP");
1022             }
1023             break;
1024         }
1025         case ProcedureCode_id_ricServiceUpdate : {
1026             if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer, &lspan) != 0) {
1027                 mdclog_write(MDCLOG_ERR, "Service Update message failed to send to xAPP");
1028             }
1029             break;
1030         }
1031         case ProcedureCode_id_gNBStatusIndication : {
1032             if (sendRequestToXapp(message, RIC_GNB_STATUS_INDICATION, rmrMessageBuffer, &lspan) != 0) {
1033                 mdclog_write(MDCLOG_ERR, "RIC_GNB_STATUS_INDICATION failed to send to xAPP");
1034             }
1035             break;
1036         }
1037         default: {
1038             mdclog_write(MDCLOG_ERR, "Undefined or not supported message = %ld", procedureCode);
1039             message.message.messageType = 0; // no RMR message type yet
1040
1041             buildJsonMessage(message);
1042
1043             break;
1044         }
1045     }
1046 #ifdef __TRACING__
1047     lspan->Finish();
1048 #endif
1049
1050 }
1051
1052 /**
1053  *
1054  * @param pdu
1055  * @param message
1056  * @param sctpMap
1057  * @param rmrMessageBuffer
1058  * @param pSpan
1059  */
1060 void asnSuccsesfulMsg(E2AP_PDU_t *pdu, ReportingMessages_t &message, Sctp_Map_t *sctpMap,
1061                       RmrMessagesBuffer_t &rmrMessageBuffer, otSpan *pSpan) {
1062 #ifdef __TRACING__
1063     auto lspan = opentracing::Tracer::Global()->StartSpan(
1064             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1065 #else
1066     otSpan lspan = 0;
1067 #endif
1068     auto procedureCode = pdu->choice.successfulOutcome->procedureCode;
1069     if (mdclog_level_get() >= MDCLOG_INFO) {
1070         mdclog_write(MDCLOG_INFO, "Successful Outcome %ld", procedureCode);
1071     }
1072     switch (procedureCode) {
1073         case ProcedureCode_id_x2Setup: {
1074             if (mdclog_level_get() >= MDCLOG_INFO) {
1075                 mdclog_write(MDCLOG_INFO, "Got Succesful Setup response from CU - %s",
1076                              message.message.enodbName);
1077             }
1078             if (sendResponseToXapp(message, RIC_X2_SETUP_RESP,
1079                                    RIC_X2_SETUP_REQ, rmrMessageBuffer, sctpMap, &lspan) != 0) {
1080                 mdclog_write(MDCLOG_ERR, "Failed to send Succesful Setup response for CU - %s",
1081                              message.message.enodbName);
1082             }
1083             break;
1084         }
1085         case ProcedureCode_id_endcX2Setup: { //X2_EN_DC_SETUP_REQUEST_FROM_CU
1086             if (mdclog_level_get() >= MDCLOG_INFO) {
1087                 mdclog_write(MDCLOG_INFO, "Got Succesful E2 EN-DC Setup response from CU - %s",
1088                              message.message.enodbName);
1089             }
1090             if (sendResponseToXapp(message, RIC_ENDC_X2_SETUP_RESP,
1091                                    RIC_ENDC_X2_SETUP_REQ, rmrMessageBuffer, sctpMap, &lspan) != 0) {
1092                 mdclog_write(MDCLOG_ERR, "Failed to send Succesful X2 EN DC Setup response for CU - %s",
1093                              message.message.enodbName);
1094             }
1095             break;
1096         }
1097         case ProcedureCode_id_endcConfigurationUpdate: {
1098             if (mdclog_level_get() >= MDCLOG_INFO) {
1099                 mdclog_write(MDCLOG_INFO, "Got Succesful E2 EN-DC CONFIGURATION UPDATE from CU - %s",
1100                              message.message.enodbName);
1101             }
1102             if (sendRequestToXapp(message, RIC_ENDC_CONF_UPDATE_ACK, rmrMessageBuffer, &lspan) != 0) {
1103                 mdclog_write(MDCLOG_ERR, "Failed to send Succesful E2 EN DC CONFIGURATION response for CU - %s",
1104                              message.message.enodbName);
1105             }
1106             break;
1107         }
1108         case ProcedureCode_id_eNBConfigurationUpdate: {
1109             if (mdclog_level_get() >= MDCLOG_INFO) {
1110                 mdclog_write(MDCLOG_INFO, "Got Succesful E2 ENB CONFIGURATION UPDATE from CU - %s",
1111                              message.message.enodbName);
1112             }
1113             if (sendRequestToXapp(message, RIC_ENB_CONF_UPDATE_ACK, rmrMessageBuffer, &lspan) != 0) {
1114                 mdclog_write(MDCLOG_ERR, "Failed to send Succesful E2 ENB CONFIGURATION response for CU - %s",
1115                              message.message.enodbName);
1116             }
1117             break;
1118         }
1119         case ProcedureCode_id_reset: {
1120             if (sendRequestToXapp(message, RIC_X2_RESET_RESP, rmrMessageBuffer, &lspan) != 0) {
1121                 mdclog_write(MDCLOG_ERR, "Failed to send Succesful E2_RESET response for CU - %s",
1122                              message.message.enodbName);
1123             }
1124             break;
1125
1126         }
1127         case ProcedureCode_id_resourceStatusReportingInitiation: {
1128             if (sendRequestToXapp(message, RIC_RES_STATUS_RESP, rmrMessageBuffer, &lspan) != 0) {
1129                 mdclog_write(MDCLOG_ERR,
1130                              "Failed to send Succesful 2_REQUEST_STATUS_REPORTING_INITIATION response for CU - %s",
1131                              message.message.enodbName);
1132             }
1133             break;
1134         }
1135         case ProcedureCode_id_ricSubscription: {
1136             if (mdclog_level_get() >= MDCLOG_INFO) {
1137                 mdclog_write(MDCLOG_INFO, "Got Succesful RIC Subscription response from CU - %s",
1138                              message.message.enodbName);
1139             }
1140             if (sendRequestToXapp(message, RIC_SUB_RESP, rmrMessageBuffer, &lspan) != 0) {
1141                 mdclog_write(MDCLOG_ERR, "Subscription successful message failed to send to xAPP");
1142             }
1143             break;
1144
1145         }
1146         case ProcedureCode_id_ricSubscriptionDelete: {
1147             if (mdclog_level_get() >= MDCLOG_INFO) {
1148                 mdclog_write(MDCLOG_INFO,
1149                              "Got Succesful RIC Subscription Delete response from CU - %s",
1150                              message.message.enodbName);
1151             }
1152             if (sendRequestToXapp(message, RIC_SUB_DEL_RESP, rmrMessageBuffer, &lspan) != 0) {
1153                 mdclog_write(MDCLOG_ERR, "Subscription delete successful message failed to send to xAPP");
1154             }
1155             break;
1156         }
1157         case ProcedureCode_id_ricControl: {
1158             if (mdclog_level_get() >= MDCLOG_INFO) {
1159                 mdclog_write(MDCLOG_INFO,
1160                              "Got Succesful RIC control response from CU - %s",
1161                              message.message.enodbName);
1162             }
1163             for (int i = 0;
1164                  i < pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.count; i++) {
1165                 auto messageSent = false;
1166                 RICcontrolAcknowledge_IEs_t *ie = pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.array[i];
1167                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1168                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1169                 }
1170                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1171                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1172                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1173                     }
1174                     if (ie->value.present == RICcontrolAcknowledge_IEs__value_PR_RICrequestID) {
1175                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_ACK;
1176                         rmrMessageBuffer.sendMessage->state = 0;
1177                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1178                         static unsigned char tx[32];
1179                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1180                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1181                         rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *)message.message.enodbName, strlen(message.message.enodbName));
1182
1183                         sendRmrMessage(rmrMessageBuffer, message, &lspan);
1184                         messageSent = true;
1185                     } else {
1186                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1187                     }
1188                 }
1189                 if (messageSent) {
1190                     break;
1191                 }
1192             }
1193             break;
1194         }
1195         default: {
1196             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1197             message.message.messageType = 0; // no RMR message type yet
1198             buildJsonMessage(message);
1199
1200             break;
1201         }
1202     }
1203 #ifdef __TRACING__
1204     lspan->Finish();
1205 #endif
1206
1207 }
1208
1209 /**
1210  *
1211  * @param pdu
1212  * @param message
1213  * @param sctpMap
1214  * @param rmrMessageBuffer
1215  * @param pSpan
1216  */
1217 void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
1218                         ReportingMessages_t &message,
1219                         Sctp_Map_t *sctpMap,
1220                         RmrMessagesBuffer_t &rmrMessageBuffer,
1221                         otSpan *pSpan) {
1222 #ifdef __TRACING__
1223     auto lspan = opentracing::Tracer::Global()->StartSpan(
1224             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1225 #else
1226     otSpan lspan = 0;
1227 #endif
1228     auto procedureCode = pdu->choice.unsuccessfulOutcome->procedureCode;
1229     if (mdclog_level_get() >= MDCLOG_INFO) {
1230         mdclog_write(MDCLOG_INFO, "Unsuccessful Outcome %ld", procedureCode);
1231     }
1232     switch (procedureCode) {
1233         case ProcedureCode_id_x2Setup: {
1234             if (mdclog_level_get() >= MDCLOG_INFO) {
1235                 mdclog_write(MDCLOG_INFO,
1236                              "Got Unsuccessful Setup response from CU - %s",
1237                              message.message.enodbName);
1238             }
1239             if (sendResponseToXapp(message,
1240                                    RIC_X2_SETUP_FAILURE, RIC_X2_SETUP_REQ,
1241                                    rmrMessageBuffer,
1242                                    sctpMap,
1243                                    &lspan) != 0) {
1244                 mdclog_write(MDCLOG_ERR,
1245                              "Failed to send Unsuccessful Setup response for CU - %s",
1246                              message.message.enodbName);
1247                 break;
1248             }
1249             break;
1250         }
1251         case ProcedureCode_id_endcX2Setup: {
1252             if (mdclog_level_get() >= MDCLOG_INFO) {
1253                 mdclog_write(MDCLOG_INFO,
1254                              "Got Unsuccessful E2 EN-DC Setup response from CU - %s",
1255                              message.message.enodbName);
1256             }
1257             if (sendResponseToXapp(message, RIC_ENDC_X2_SETUP_FAILURE,
1258                                    RIC_ENDC_X2_SETUP_REQ,
1259                                    rmrMessageBuffer,
1260                                    sctpMap,
1261                                    &lspan) != 0) {
1262                 mdclog_write(MDCLOG_ERR, "Failed to send Unsuccessful E2 EN DC Setup response for CU - %s",
1263                              message.message.enodbName);
1264             }
1265             break;
1266         }
1267         case ProcedureCode_id_endcConfigurationUpdate: {
1268             if (sendRequestToXapp(message, RIC_ENDC_CONF_UPDATE_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1269                 mdclog_write(MDCLOG_ERR, "Failed to send Unsuccessful E2 EN DC CONFIGURATION response for CU - %s",
1270                              message.message.enodbName);
1271             }
1272             break;
1273         }
1274         case ProcedureCode_id_eNBConfigurationUpdate: {
1275             if (sendRequestToXapp(message, RIC_ENB_CONF_UPDATE_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1276                 mdclog_write(MDCLOG_ERR, "Failed to send Unsuccessful E2 ENB CONFIGURATION response for CU - %s",
1277                              message.message.enodbName);
1278             }
1279             break;
1280         }
1281         case ProcedureCode_id_resourceStatusReportingInitiation: {
1282             if (sendRequestToXapp(message, RIC_RES_STATUS_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1283                 mdclog_write(MDCLOG_ERR,
1284                              "Failed to send Succesful E2_REQUEST_STATUS_REPORTING_INITIATION response for CU - %s",
1285                              message.message.enodbName);
1286             }
1287             break;
1288         }
1289         case ProcedureCode_id_ricSubscription: {
1290             if (mdclog_level_get() >= MDCLOG_INFO) {
1291                 mdclog_write(MDCLOG_INFO, "Got Unsuccessful RIC Subscription Response from CU - %s",
1292                              message.message.enodbName);
1293             }
1294             if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1295                 mdclog_write(MDCLOG_ERR, "Subscription unsuccessful message failed to send to xAPP");
1296             }
1297             break;
1298         }
1299         case ProcedureCode_id_ricSubscriptionDelete: {
1300             if (mdclog_level_get() >= MDCLOG_INFO) {
1301                 mdclog_write(MDCLOG_INFO, "Got Unsuccessful RIC Subscription Delete Response from CU - %s",
1302                              message.message.enodbName);
1303             }
1304             if (sendRequestToXapp(message, RIC_SUB_DEL_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1305                 mdclog_write(MDCLOG_ERR, "Subscription Delete unsuccessful message failed to send to xAPP");
1306             }
1307             break;
1308         }
1309         case ProcedureCode_id_ricControl: {
1310             if (mdclog_level_get() >= MDCLOG_INFO) {
1311                 mdclog_write(MDCLOG_INFO, "Got UNSuccesful RIC control response from CU - %s",
1312                              message.message.enodbName);
1313             }
1314             for (int i = 0;
1315                  i < pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.count; i++) {
1316                 auto messageSent = false;
1317                 RICcontrolFailure_IEs_t *ie = pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.array[i];
1318                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1319                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1320                 }
1321                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1322                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1323                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1324                     }
1325                     if (ie->value.present == RICcontrolFailure_IEs__value_PR_RICrequestID) {
1326                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_FAILURE;
1327                         rmrMessageBuffer.sendMessage->state = 0;
1328                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1329                         static unsigned char tx[32];
1330                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1331                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1332                         rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *)message.message.enodbName, strlen(message.message.enodbName));
1333                         sendRmrMessage(rmrMessageBuffer, message, &lspan);
1334                         messageSent = true;
1335                     } else {
1336                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1337                     }
1338                 }
1339                 if (messageSent) {
1340                     break;
1341                 }
1342             }
1343             break;
1344         }
1345         default: {
1346             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1347             message.message.messageType = 0; // no RMR message type yet
1348
1349             buildJsonMessage(message);
1350
1351             break;
1352         }
1353     }
1354 #ifdef __TRACING__
1355     lspan->Finish();
1356 #endif
1357
1358 }
1359
1360 /**
1361  *
1362  * @param message
1363  * @param requestId
1364  * @param rmrMmessageBuffer
1365  * @param pSpan
1366  * @return
1367  */
1368 int sendRequestToXapp(ReportingMessages_t &message,
1369                       int requestId,
1370                       RmrMessagesBuffer_t &rmrMmessageBuffer,
1371                       otSpan *pSpan) {
1372 #ifdef __TRACING__
1373     auto lspan = opentracing::Tracer::Global()->StartSpan(
1374             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1375 #else
1376     otSpan lspan = 0;
1377 #endif
1378     rmr_bytes2meid(rmrMmessageBuffer.sendMessage,
1379                    (unsigned char *)message.message.enodbName,
1380                    strlen(message.message.enodbName));
1381     message.message.messageType = rmrMmessageBuffer.sendMessage->mtype = requestId;
1382     rmrMmessageBuffer.sendMessage->state = 0;
1383     static unsigned char tx[32];
1384     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1385     rmr_bytes2xact(rmrMmessageBuffer.sendMessage, tx, strlen((const char *) tx));
1386
1387     auto rc = sendRmrMessage(rmrMmessageBuffer, message, &lspan);
1388 #ifdef __TRACING__
1389     lspan->Finish();
1390 #endif
1391
1392     return rc;
1393 }
1394
1395
1396 void *getRmrContext(char *rmrAddress, otSpan *pSpan) {
1397 #ifdef __TRACING__
1398     auto lspan = opentracing::Tracer::Global()->StartSpan(
1399             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1400 #else
1401 //    otSpan lspan = 0;
1402 #endif
1403     void *rmrCtx = rmr_init(rmrAddress, RMR_MAX_RCV_BYTES, RMRFL_NONE);
1404
1405
1406     if (rmrCtx == nullptr) {
1407         mdclog_write(MDCLOG_ERR, "RMR failed to initialise : %s", strerror(errno));
1408 #ifdef __TRACING__
1409         lspan->Finish();
1410 #endif
1411
1412         return (nullptr);
1413     }
1414
1415     rmr_set_stimeout(rmrCtx, 0);    // disable retries for any send operation
1416     // we need to find that routing table exist and we can run
1417     if (mdclog_level_get() >= MDCLOG_INFO) {
1418         mdclog_write(MDCLOG_INFO, "We are after RMR INIT wait for RMR_Ready");
1419     }
1420     int rmrReady = 0;
1421     int count = 0;
1422     while (!rmrReady) {
1423         if ((rmrReady = rmr_ready(rmrCtx)) == 0) {
1424             sleep(1);
1425         }
1426         count++;
1427         if (count % 60 == 0) {
1428             mdclog_write(MDCLOG_INFO, "waiting to RMR ready state for %d seconds", count);
1429         }
1430     }
1431     if (mdclog_level_get() >= MDCLOG_INFO) {
1432         mdclog_write(MDCLOG_INFO, "RMR running");
1433     }
1434 #ifdef __TRACING__
1435     lspan->Finish();
1436 #endif
1437
1438     return rmrCtx;
1439 }
1440
1441 /**
1442  *
1443  * @param epoll_fd
1444  * @param sctpMap
1445  * @param rmrMessageBuffer
1446  * @param ts
1447  * @param pSpan
1448  * @return
1449  */
1450 int receiveXappMessages(int epoll_fd,
1451                         Sctp_Map_t *sctpMap,
1452                         RmrMessagesBuffer_t &rmrMessageBuffer,
1453                         struct timespec &ts,
1454                         otSpan *pSpan) {
1455 #ifdef __TRACING__
1456     auto lspan = opentracing::Tracer::Global()->StartSpan(
1457             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1458 #else
1459     otSpan lspan = 0;
1460 #endif
1461     if (rmrMessageBuffer.rcvMessage == nullptr) {
1462         //we have error
1463         mdclog_write(MDCLOG_ERR, "RMR Allocation message, %s", strerror(errno));
1464 #ifdef __TRACING__
1465         lspan->Finish();
1466 #endif
1467
1468         return -1;
1469     }
1470
1471     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1472         mdclog_write(MDCLOG_DEBUG, "Call to rmr_rcv_msg");
1473     }
1474     rmrMessageBuffer.rcvMessage = rmr_rcv_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.rcvMessage);
1475     if (rmrMessageBuffer.rcvMessage == nullptr) {
1476         mdclog_write(MDCLOG_ERR, "RMR Receving message with null pointer, Realloc rmr mesage buffer");
1477         rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
1478 #ifdef __TRACING__
1479         lspan->Finish();
1480 #endif
1481
1482         return -2;
1483     }
1484     ReportingMessages_t message;
1485     message.message.direction = 'D';
1486     message.message.time.tv_nsec = ts.tv_nsec;
1487     message.message.time.tv_sec = ts.tv_sec;
1488
1489     // get message payload
1490     //auto msgData = msg->payload;
1491     if (rmrMessageBuffer.rcvMessage->state != 0) {
1492         mdclog_write(MDCLOG_ERR, "RMR Receving message with stat = %d", rmrMessageBuffer.rcvMessage->state);
1493 #ifdef __TRACING__
1494         lspan->Finish();
1495 #endif
1496
1497         return -1;
1498     }
1499     switch (rmrMessageBuffer.rcvMessage->mtype) {
1500         case RIC_X2_SETUP_REQ: {
1501             if (connectToCUandSetUp(rmrMessageBuffer, message, epoll_fd, sctpMap, &lspan) != 0) {
1502                 mdclog_write(MDCLOG_ERR, "ERROR in connectToCUandSetUp on RIC_X2_SETUP_REQ");
1503                 message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_SCTP_CONNECTION_FAILURE;
1504                 message.message.direction = 'N';
1505                 message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1506                         snprintf((char *)rmrMessageBuffer.sendMessage->payload,
1507                                 256,
1508                                 "ERROR in connectToCUandSetUp on RIC_X2_SETUP_REQ");
1509                 rmrMessageBuffer.sendMessage->state = 0;
1510                 message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1511
1512                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1513                     mdclog_write(MDCLOG_DEBUG, "start writing to rmr buffer");
1514                 }
1515                 rmr_bytes2xact(rmrMessageBuffer.sendMessage, rmrMessageBuffer.rcvMessage->xaction, RMR_MAX_XID);
1516                 rmr_str2meid(rmrMessageBuffer.sendMessage, (unsigned char *)message.message.enodbName);
1517
1518                 sendRmrMessage(rmrMessageBuffer, message, &lspan);
1519 #ifdef __TRACING__
1520                 lspan->Finish();
1521 #endif
1522                 return -3;
1523             }
1524             break;
1525         }
1526         case RIC_ENDC_X2_SETUP_REQ: {
1527             if (connectToCUandSetUp(rmrMessageBuffer, message, epoll_fd, sctpMap, &lspan) != 0) {
1528                 mdclog_write(MDCLOG_ERR, "ERROR in connectToCUandSetUp on RIC_ENDC_X2_SETUP_REQ");
1529                 message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_SCTP_CONNECTION_FAILURE;
1530                 message.message.direction = 'N';
1531                 message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1532                         snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
1533                                  "ERROR in connectToCUandSetUp on RIC_ENDC_X2_SETUP_REQ");
1534                 rmrMessageBuffer.sendMessage->state = 0;
1535                 message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1536
1537                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1538                     mdclog_write(MDCLOG_DEBUG, "start writing to rmr buffer");
1539                 }
1540
1541                 rmr_bytes2xact(rmrMessageBuffer.sendMessage, rmrMessageBuffer.rcvMessage->xaction, RMR_MAX_XID);
1542                 rmr_str2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName);
1543
1544                 sendRmrMessage(rmrMessageBuffer, message, &lspan);
1545 #ifdef __TRACING__
1546                 lspan->Finish();
1547 #endif
1548                 return -3;
1549             }
1550             break;
1551         }
1552         case RIC_ENDC_CONF_UPDATE: {
1553             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1554                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE");
1555 #ifdef __TRACING__
1556                 lspan->Finish();
1557 #endif
1558                 return -4;
1559             }
1560             break;
1561         }
1562         case RIC_ENDC_CONF_UPDATE_ACK: {
1563             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1564                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE_ACK");
1565 #ifdef __TRACING__
1566                 lspan->Finish();
1567 #endif
1568                 return -4;
1569             }
1570             break;
1571         }
1572         case RIC_ENDC_CONF_UPDATE_FAILURE: {
1573             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1574                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE_FAILURE");
1575 #ifdef __TRACING__
1576                 lspan->Finish();
1577 #endif
1578
1579                 return -4;
1580             }
1581             break;
1582         }
1583         case RIC_ENB_CONF_UPDATE: {
1584             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1585                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE");
1586 #ifdef __TRACING__
1587                 lspan->Finish();
1588 #endif
1589                 return -4;
1590             }
1591             break;
1592         }
1593         case RIC_ENB_CONF_UPDATE_ACK: {
1594             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1595                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENB_CONF_UPDATE_ACK");
1596 #ifdef __TRACING__
1597                 lspan->Finish();
1598 #endif
1599                 return -4;
1600             }
1601             break;
1602         }
1603         case RIC_ENB_CONF_UPDATE_FAILURE: {
1604             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1605                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENB_CONF_UPDATE_FAILURE");
1606 #ifdef __TRACING__
1607                 lspan->Finish();
1608 #endif
1609                 return -4;
1610             }
1611             break;
1612         }
1613         case RIC_RES_STATUS_REQ: {
1614             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1615                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_RES_STATUS_REQ");
1616 #ifdef __TRACING__
1617                 lspan->Finish();
1618 #endif
1619                 return -6;
1620             }
1621             break;
1622         }
1623         case RIC_SUB_REQ: {
1624             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1625                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_REQ");
1626 #ifdef __TRACING__
1627                 lspan->Finish();
1628 #endif
1629                 return -6;
1630             }
1631             break;
1632         }
1633         case RIC_SUB_DEL_REQ: {
1634             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1635                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_DEL_REQ");
1636 #ifdef __TRACING__
1637                 lspan->Finish();
1638 #endif
1639                 return -6;
1640             }
1641             break;
1642         }
1643         case RIC_CONTROL_REQ: {
1644             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1645                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_CONTROL_REQ");
1646 #ifdef __TRACING__
1647                 lspan->Finish();
1648 #endif
1649                 return -6;
1650             }
1651             break;
1652         }
1653         case RIC_SERVICE_QUERY: {
1654             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1655                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_QUERY");
1656 #ifdef __TRACING__
1657                 lspan->Finish();
1658 #endif
1659                 return -6;
1660             }
1661             break;
1662         }
1663         case RIC_SERVICE_UPDATE_ACK: {
1664             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1665                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_ACK");
1666 #ifdef __TRACING__
1667                 lspan->Finish();
1668 #endif
1669                 return -6;
1670             }
1671             break;
1672         }
1673         case RIC_SERVICE_UPDATE_FAILURE: {
1674             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1675                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_FAILURE");
1676 #ifdef __TRACING__
1677                 lspan->Finish();
1678 #endif
1679                 return -6;
1680             }
1681             break;
1682         }
1683         case RIC_X2_RESET: {
1684             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1685                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_X2_RESET");
1686 #ifdef __TRACING__
1687                 lspan->Finish();
1688 #endif
1689                 return -6;
1690             }
1691             break;
1692         }
1693         case RIC_X2_RESET_RESP: {
1694             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1695                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_X2_RESET_RESP");
1696 #ifdef __TRACING__
1697                 lspan->Finish();
1698 #endif
1699                 return -6;
1700             }
1701             break;
1702         }
1703         case RIC_SCTP_CLEAR_ALL: {
1704             mdclog_write(MDCLOG_INFO, "RIC_SCTP_CLEAR_ALL");
1705             // loop on all keys and close socket and then erase all map.
1706             vector<char *> v;
1707             sctpMap->getKeys(v);
1708             for (auto const &iter : v) { //}; iter != sctpMap.end(); iter++) {
1709                 if (!boost::starts_with((string) (iter), "host:") && !boost::starts_with((string) (iter), "msg:")) {
1710                     auto *peerInfo = (ConnectedCU_t *) sctpMap->find(iter);
1711                     if (peerInfo == nullptr) {
1712                         continue;
1713                     }
1714                     close(peerInfo->fileDescriptor);
1715                     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
1716                     message.message.direction = 'D';
1717                     message.message.time.tv_nsec = ts.tv_nsec;
1718                     message.message.time.tv_sec = ts.tv_sec;
1719
1720                     message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1721                             snprintf((char *)rmrMessageBuffer.sendMessage->payload,
1722                                                                    256,
1723                                                                    "%s|RIC_SCTP_CLEAR_ALL",
1724                                                                    peerInfo->enodbName);
1725                     message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1726                     mdclog_write(MDCLOG_INFO, "%s", message.message.asndata);
1727                     if (sendRequestToXapp(message,
1728                                           RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1729                         mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
1730                     }
1731                     free(peerInfo);
1732                 }
1733             }
1734
1735             sleep(1);
1736             sctpMap->clear();
1737             break;
1738         }
1739         default:
1740             mdclog_write(MDCLOG_WARN, "Message Type : %d is not seported", rmrMessageBuffer.rcvMessage->mtype);
1741             message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
1742             message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
1743             message.message.time.tv_nsec = ts.tv_nsec;
1744             message.message.time.tv_sec = ts.tv_sec;
1745             message.message.messageType = rmrMessageBuffer.rcvMessage->mtype;
1746
1747             buildJsonMessage(message);
1748
1749
1750 #ifdef __TRACING__
1751             lspan->Finish();
1752 #endif
1753             return -7;
1754     }
1755     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1756         mdclog_write(MDCLOG_DEBUG, "EXIT OK from %s", __FUNCTION__);
1757     }
1758 #ifdef __TRACING__
1759     lspan->Finish();
1760 #endif
1761     return 0;
1762 }
1763
1764 /**
1765  * Send message to the CU that is not expecting for successful or unsuccessful results
1766  * @param messageBuffer
1767  * @param message
1768  * @param failedMsgId
1769  * @param sctpMap
1770  * @param pSpan
1771  * @return
1772  */
1773 int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer,
1774                            ReportingMessages_t &message,
1775                            int failedMsgId,
1776                            Sctp_Map_t *sctpMap,
1777                            otSpan *pSpan) {
1778 #ifdef __TRACING__
1779     auto lspan = opentracing::Tracer::Global()->StartSpan(
1780             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1781 #else
1782     otSpan lspan = 0;
1783 #endif
1784
1785     getRequestMetaData(message, messageBuffer, &lspan);
1786     if (mdclog_level_get() >= MDCLOG_INFO) {
1787         mdclog_write(MDCLOG_INFO, "send message to %s address", message.message.enodbName);
1788     }
1789
1790     auto rc = sendMessagetoCu(sctpMap, messageBuffer, message, failedMsgId, &lspan);
1791 #ifdef __TRACING__
1792     lspan->Finish();
1793 #endif
1794
1795     return rc;
1796 }
1797
1798 /**
1799  *
1800  * @param sctpMap
1801  * @param messageBuffer
1802  * @param message
1803  * @param failedMesgId
1804  * @param pSpan
1805  * @return
1806  */
1807 int sendMessagetoCu(Sctp_Map_t *sctpMap,
1808                     RmrMessagesBuffer_t &messageBuffer,
1809                     ReportingMessages_t &message,
1810                     int failedMesgId,
1811                     otSpan *pSpan) {
1812 #ifdef __TRACING__
1813     auto lspan = opentracing::Tracer::Global()->StartSpan(
1814             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1815 #else
1816     otSpan lspan = 0;
1817 #endif
1818     auto *peerInfo = (ConnectedCU_t *) sctpMap->find(message.message.enodbName);
1819     if (peerInfo == nullptr) {
1820         if (failedMesgId != 0) {
1821             sendFailedSendingMessagetoXapp(messageBuffer, message, failedMesgId, &lspan);
1822         } else {
1823             mdclog_write(MDCLOG_ERR, "Failed to send message no CU entry %s", message.message.enodbName);
1824         }
1825 #ifdef __TRACING__
1826         lspan->Finish();
1827 #endif
1828
1829         return -1;
1830     }
1831
1832     // get the FD
1833     message.message.messageType = messageBuffer.rcvMessage->mtype;
1834     auto rc = sendSctpMsg(peerInfo, message, sctpMap, &lspan);
1835 #ifdef __TRACING__
1836     lspan->Finish();
1837 #endif
1838
1839     return rc;
1840 }
1841
1842 /**
1843  *
1844  * @param rmrCtx the rmr context to send and receive
1845  * @param msg the msg we got fromxApp
1846  * @param metaData data from xApp in ordered struct
1847  * @param failedMesgId the return message type error
1848  */
1849 void
1850 sendFailedSendingMessagetoXapp(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, int failedMesgId,
1851                                otSpan *pSpan) {
1852 #ifdef __TRACING__
1853     auto lspan = opentracing::Tracer::Global()->StartSpan(
1854             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1855 #else
1856     otSpan lspan = 0;
1857 #endif
1858     rmr_mbuf_t *msg = rmrMessageBuffer.sendMessage;
1859     msg->len = snprintf((char *) msg->payload, 200, "the gNb/eNode name %s not found",
1860                         message.message.enodbName);
1861     if (mdclog_level_get() >= MDCLOG_INFO) {
1862         mdclog_write(MDCLOG_INFO, "%s", msg->payload);
1863     }
1864     msg->mtype = failedMesgId;
1865     msg->state = 0;
1866
1867     static unsigned char tx[32];
1868     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1869     rmr_bytes2xact(msg, tx, strlen((const char *) tx));
1870
1871     sendRmrMessage(rmrMessageBuffer, message, &lspan);
1872 #ifdef __TRACING__
1873     lspan->Finish();pLogSink
1874 #endif
1875
1876 }
1877
1878 /**
1879  * Send Response back to xApp, message is used only when there was a request from the xApp
1880  *
1881  * @param enodbName the name of the gNb/eNodeB
1882  * @param msgType  the value of the message to the xApp
1883  * @param requestType The request that was sent by the xAPP
1884  * @param rmrCtx the rmr identifier
1885  * @param sctpMap hash map holds data on the requestrs
1886  * @param buf  the buffer to send to xAPP
1887  * @param size size of the buffer to send
1888  * @return
1889  */
1890 int sendResponseToXapp(ReportingMessages_t &message,
1891                        int msgType,
1892                        int requestType,
1893                        RmrMessagesBuffer_t &rmrMessageBuffer,
1894                        Sctp_Map_t *sctpMap,
1895                        otSpan *pSpan) {
1896 #ifdef __TRACING__
1897     auto lspan = opentracing::Tracer::Global()->StartSpan(
1898             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1899 #else
1900     otSpan lspan = 0;
1901 #endif
1902     char key[MAX_ENODB_NAME_SIZE * 2];
1903     snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName, requestType);
1904
1905     auto xact = sctpMap->find(key);
1906     if (xact == nullptr) {
1907         mdclog_write(MDCLOG_ERR, "NO Request %s found for this response from CU: %s", key,
1908                      message.message.enodbName);
1909 #ifdef __TRACING__
1910         lspan->Finish();
1911 #endif
1912
1913         return -1;
1914     }
1915     sctpMap->erase(key);
1916
1917     message.message.messageType = rmrMessageBuffer.sendMessage->mtype = msgType; //SETUP_RESPONSE_MESSAGE_TYPE;
1918     rmr_bytes2payload(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.asndata,
1919                       message.message.asnLength);
1920     rmr_bytes2xact(rmrMessageBuffer.sendMessage, (const unsigned char *)xact, strlen((const char *)xact));
1921     rmr_str2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName);
1922     rmrMessageBuffer.sendMessage->state = 0;
1923
1924     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1925         mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
1926     }
1927     free(xact);
1928
1929     auto rc = sendRmrMessage(rmrMessageBuffer, message, &lspan);
1930 #ifdef __TRACING__
1931     lspan->Finish();
1932 #endif
1933     return rc;
1934 }
1935
1936 /**
1937  * build the SCTP connection to eNodB or gNb
1938  * @param rmrMessageBuffer
1939  * @param message
1940  * @param epoll_fd
1941  * @param sctpMap
1942  * @param pSpan
1943  * @return
1944  */
1945 int connectToCUandSetUp(RmrMessagesBuffer_t &rmrMessageBuffer,
1946                         ReportingMessages_t &message,
1947                         int epoll_fd,
1948                         Sctp_Map_t *sctpMap,
1949                         otSpan *pSpan) {
1950 #ifdef __TRACING__
1951     auto lspan = opentracing::Tracer::Global()->StartSpan(
1952             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1953 #else
1954     otSpan lspan = 0;
1955 #endif
1956     struct sockaddr_in6 servaddr{};
1957     struct addrinfo hints{}, *result;
1958     auto msgData = rmrMessageBuffer.rcvMessage->payload;
1959     unsigned char meid[RMR_MAX_MEID]{};
1960     char host[256]{};
1961     uint16_t port = 0;
1962
1963     message.message.messageType = rmrMessageBuffer.rcvMessage->mtype;
1964     rmr_mbuf_t *msg = rmrMessageBuffer.rcvMessage;
1965     rmr_get_meid(msg, meid);
1966
1967     if (mdclog_level_get() >= MDCLOG_INFO) {
1968         mdclog_write(MDCLOG_INFO, "message %d Received for MEID :%s. SETUP/EN-DC Setup Request from xApp, Message = %s",
1969                      msg->mtype, meid, msgData);
1970     }
1971     if (getSetupRequestMetaData(message, (char *)msgData, host, port, &lspan) < 0) {
1972         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1973             mdclog_write(MDCLOG_DEBUG, "Error in setup parameters %s, %d", __func__, __LINE__);
1974         }
1975 #ifdef __TRACING__
1976         lspan->Finish();
1977 #endif
1978         return -1;
1979     }
1980
1981     //// message asndata points to the start of the asndata of the message and not to start of payload
1982     // search if the same host:port but not the same enodbname
1983     char searchBuff[256]{};
1984     snprintf(searchBuff, sizeof searchBuff, "host:%s:%d", host, port);
1985     auto e = (char *)sctpMap->find(searchBuff);
1986     if (e != nullptr) {
1987         // found one compare if not the same
1988         if (strcmp(message.message.enodbName, e) != 0) {
1989             mdclog_write(MDCLOG_ERR,
1990                          "Try to connect CU %s to Host %s but %s already connected",
1991                          message.message.enodbName, host, e);
1992 #ifdef __TRACING__
1993             lspan->Finish();
1994 #endif
1995             return -1;
1996         }
1997     }
1998
1999     // check if not alread connected. if connected send the request and return
2000     auto *peerInfo = (ConnectedCU_t *)sctpMap->find(message.message.enodbName);
2001     if (peerInfo != nullptr) {
2002 //        snprintf(strErr,
2003 //                128,
2004 //                "Device %s already connected please remove and then setup again",
2005 //                message.message.enodbName);
2006         if (mdclog_level_get() >= MDCLOG_INFO) {
2007             mdclog_write(MDCLOG_INFO,
2008                          "Device already connected to %s",
2009                          message.message.enodbName);
2010         }
2011         message.message.messageType = msg->mtype;
2012         auto rc = sendSctpMsg(peerInfo, message, sctpMap, &lspan);
2013         if (rc != 0) {
2014             mdclog_write(MDCLOG_ERR, "failed write to SCTP %s, %d", __func__, __LINE__);
2015 #ifdef __TRACING__
2016             lspan->Finish();
2017 #endif
2018             return -1;
2019         }
2020
2021         char key[MAX_ENODB_NAME_SIZE * 2];
2022         snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName, msg->mtype);
2023         int xaction_len = strlen((const char *) msg->xaction);
2024         auto *xaction = (unsigned char *) calloc(1, xaction_len);
2025         memcpy(xaction, msg->xaction, xaction_len);
2026         sctpMap->setkey(key, xaction);
2027         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2028             mdclog_write(MDCLOG_DEBUG, "set key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2029         }
2030 #ifdef __TRACING__
2031         lspan->Finish();
2032 #endif
2033         return 0;
2034     }
2035
2036     peerInfo = (ConnectedCU_t *) calloc(1, sizeof(ConnectedCU_t));
2037     memcpy(peerInfo->enodbName, message.message.enodbName, sizeof(message.message.enodbName));
2038
2039     // new connection
2040     if ((peerInfo->fileDescriptor = socket(AF_INET6, SOCK_STREAM, IPPROTO_SCTP)) < 0) {
2041         mdclog_write(MDCLOG_ERR, "Socket Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2042 #ifdef __TRACING__
2043         lspan->Finish();
2044 #endif
2045         return -1;
2046     }
2047
2048     auto optval = 1;
2049     if (setsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof optval) != 0) {
2050         mdclog_write(MDCLOG_ERR, "setsockopt SO_REUSEPORT Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2051 #ifdef __TRACING__
2052         lspan->Finish();
2053 #endif
2054         return -1;
2055     }
2056     optval = 1;
2057     if (setsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval) != 0) {
2058         mdclog_write(MDCLOG_ERR, "setsockopt SO_REUSEADDR Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2059 #ifdef __TRACING__
2060         lspan->Finish();
2061 #endif
2062         return -1;
2063     }
2064     servaddr.sin6_family = AF_INET6;
2065
2066     struct sockaddr_in6 localAddr {};
2067     localAddr.sin6_family = AF_INET6;
2068     localAddr.sin6_addr = in6addr_any;
2069     localAddr.sin6_port = htons(SRC_PORT);
2070
2071     if (bind(peerInfo->fileDescriptor, (struct sockaddr*)&localAddr , sizeof(struct sockaddr_in6)) < 0) {
2072         mdclog_write(MDCLOG_ERR, "bind Socket Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2073 #ifdef __TRACING__
2074         lspan->Finish();
2075 #endif
2076         return -1;
2077     }//Ends the binding.
2078
2079     memset(&hints, 0, sizeof hints);
2080     hints.ai_flags = AI_NUMERICHOST;
2081     if (getaddrinfo(host, nullptr, &hints, &result) < 0) {
2082         close(peerInfo->fileDescriptor);
2083         mdclog_write(MDCLOG_ERR, "getaddrinfo error for %s, Error = %s", host, strerror(errno));
2084 #ifdef __TRACING__
2085         lspan->Finish();
2086 #endif
2087         return -1;
2088     }
2089     memcpy(&servaddr, result->ai_addr, sizeof(struct sockaddr_in6));
2090     freeaddrinfo(result);
2091
2092     servaddr.sin6_port = htons(port);      /* daytime server */
2093     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2094         mdclog_write(MDCLOG_DEBUG, "Send Connect FD = %d host : %s port %d",
2095                      peerInfo->fileDescriptor,
2096                      host,
2097                      port);
2098     }
2099
2100     // Add to Epol
2101     if (addToEpoll(epoll_fd, peerInfo, (EPOLLOUT | EPOLLIN | EPOLLET), sctpMap, message.message.enodbName,
2102                    msg->mtype, &lspan) != 0) {
2103 #ifdef __TRACING__
2104         lspan->Finish();
2105 #endif
2106         return -1;
2107     }
2108
2109     char hostBuff[NI_MAXHOST];
2110     char portBuff[NI_MAXHOST];
2111
2112     if (getnameinfo((SA *) &servaddr, sizeof(servaddr),
2113                     hostBuff, sizeof(hostBuff),
2114                     portBuff, sizeof(portBuff),
2115                     (uint) (NI_NUMERICHOST) | (uint) (NI_NUMERICSERV)) != 0) {
2116         mdclog_write(MDCLOG_ERR, "getnameinfo() Error, %s  %s %d", strerror(errno), __func__, __LINE__);
2117 #ifdef __TRACING__
2118         lspan->Finish();
2119 #endif
2120         return -1;
2121     }
2122
2123     if (setSocketNoBlocking(peerInfo->fileDescriptor) != 0) {
2124         mdclog_write(MDCLOG_ERR, "setSocketNoBlocking failed to set new connection %s on sctpPort %s", hostBuff,
2125                      portBuff);
2126         close(peerInfo->fileDescriptor);
2127 #ifdef __TRACING__
2128         lspan->Finish();
2129 #endif
2130         return -1;
2131     }
2132
2133     memcpy(peerInfo->hostName, hostBuff, strlen(hostBuff));
2134     peerInfo->hostName[strlen(hostBuff)] = 0;
2135     memcpy(peerInfo->portNumber, portBuff, strlen(portBuff));
2136     peerInfo->portNumber[strlen(portBuff)] = 0;
2137
2138     // map by enoodb/gnb name
2139     sctpMap->setkey(message.message.enodbName, peerInfo);
2140     //map host and port to enodeb
2141     sctpMap->setkey(searchBuff, message.message.enodbName);
2142
2143     // save message for the return values
2144     char key[MAX_ENODB_NAME_SIZE * 2];
2145     snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName, msg->mtype);
2146     int xaction_len = strlen((const char *) msg->xaction);
2147     auto *xaction = (unsigned char *) calloc(1, xaction_len);
2148     memcpy(xaction, msg->xaction, xaction_len);
2149     sctpMap->setkey(key, xaction);
2150     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2151         mdclog_write(MDCLOG_DEBUG, "End building peerinfo: %s for CU %s", key, message.message.enodbName);
2152     }
2153
2154     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2155         mdclog_write(MDCLOG_DEBUG, "Send connect to FD %d, %s, %d",
2156                      peerInfo->fileDescriptor, __func__, __LINE__);
2157     }
2158     if (connect(peerInfo->fileDescriptor, (SA *) &servaddr, sizeof(servaddr)) < 0) {
2159         if (errno != EINPROGRESS) {
2160             mdclog_write(MDCLOG_ERR, "connect FD %d to host : %s port %d, %s",
2161                          peerInfo->fileDescriptor, host, port, strerror(errno));
2162             close(peerInfo->fileDescriptor);
2163 #ifdef __TRACING__
2164             lspan->Finish();
2165 #endif
2166             return -1;
2167         }
2168         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2169             mdclog_write(MDCLOG_DEBUG,
2170                          "Connect to FD %d returned with EINPROGRESS : %s",
2171                          peerInfo->fileDescriptor, strerror(errno));
2172         }
2173         // since message.message.asndata is pointing to the asndata in the rmr message payload we copy it like this
2174         memcpy(peerInfo->asnData, message.message.asndata, message.message.asnLength);
2175         peerInfo->asnLength = message.message.asnLength;
2176         peerInfo->mtype = msg->mtype;
2177 #ifdef __TRACING__
2178         lspan->Finish();
2179 #endif
2180         return 0;
2181     }
2182
2183     if (mdclog_level_get() >= MDCLOG_INFO) {
2184         mdclog_write(MDCLOG_INFO, "Connect to FD %d returned OK without EINPROGRESS", peerInfo->fileDescriptor);
2185     }
2186
2187     peerInfo->isConnected = true;
2188
2189     if (modifyToEpoll(epoll_fd, peerInfo, (EPOLLIN | EPOLLET), sctpMap, message.message.enodbName, msg->mtype,
2190                       &lspan) != 0) {
2191 #ifdef __TRACING__
2192         lspan->Finish();
2193 #endif
2194         return -1;
2195     }
2196
2197     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2198         mdclog_write(MDCLOG_DEBUG, "Connected to host : %s port %d", host, port);
2199     }
2200
2201     message.message.messageType = msg->mtype;
2202     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2203         mdclog_write(MDCLOG_DEBUG, "Send SCTP message to FD %d", peerInfo->fileDescriptor);
2204     }
2205     if (sendSctpMsg(peerInfo, message, sctpMap, &lspan) != 0) {
2206         mdclog_write(MDCLOG_ERR, "Error write to SCTP  %s %d", __func__, __LINE__);
2207 #ifdef __TRACING__
2208         lspan->Finish();
2209 #endif
2210         return -1;
2211     }
2212     memset(peerInfo->asnData, 0, message.message.asnLength);
2213     peerInfo->asnLength = 0;
2214     peerInfo->mtype = 0;
2215
2216     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2217         mdclog_write(MDCLOG_DEBUG, "Sent message to SCTP for %s", message.message.enodbName);
2218     }
2219 #ifdef __TRACING__
2220     lspan->Finish();
2221 #endif
2222     return 0;
2223 }
2224
2225 /**
2226  *
2227  * @param epoll_fd
2228  * @param peerInfo
2229  * @param events
2230  * @param sctpMap
2231  * @param enodbName
2232  * @param msgType
2233  * @param pSpan
2234  * @return
2235  */
2236 int addToEpoll(int epoll_fd,
2237                ConnectedCU_t *peerInfo,
2238                uint32_t events,
2239                Sctp_Map_t *sctpMap,
2240                char *enodbName,
2241                int msgType,
2242                otSpan *pSpan) {
2243 #ifdef __TRACING__
2244     auto lspan = opentracing::Tracer::Global()->StartSpan(
2245             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2246 #else
2247     otSpan lspan = 0;
2248 #endif
2249     // Add to Epol
2250     struct epoll_event event{};
2251     event.data.ptr = peerInfo;
2252     event.events = events;
2253     if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peerInfo->fileDescriptor, &event) < 0) {
2254         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2255             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here), %s, %s %d",
2256                          strerror(errno), __func__, __LINE__);
2257         }
2258         close(peerInfo->fileDescriptor);
2259         cleanHashEntry(peerInfo, sctpMap, &lspan);
2260         char key[MAX_ENODB_NAME_SIZE * 2];
2261         snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2262         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2263             mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2264         }
2265         auto tmp = sctpMap->find(key);
2266         if (tmp) {
2267             free(tmp);
2268         }
2269         sctpMap->erase(key);
2270         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here)");
2271 #ifdef __TRACING__
2272         lspan->Finish();
2273 #endif
2274         return -1;
2275     }
2276 #ifdef __TRACING__
2277     lspan->Finish();
2278 #endif
2279     return 0;
2280 }
2281
2282 /**
2283  *
2284  * @param epoll_fd
2285  * @param peerInfo
2286  * @param events
2287  * @param sctpMap
2288  * @param enodbName
2289  * @param msgType
2290  * @param pSpan
2291  * @return
2292  */
2293 int modifyToEpoll(int epoll_fd,
2294                   ConnectedCU_t *peerInfo,
2295                   uint32_t events,
2296                   Sctp_Map_t *sctpMap,
2297                   char *enodbName,
2298                   int msgType,
2299                   otSpan *pSpan) {
2300 #ifdef __TRACING__
2301     auto lspan = opentracing::Tracer::Global()->StartSpan(
2302             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2303 #else
2304     otSpan lspan = 0;
2305 #endif
2306     // Add to Epol
2307     struct epoll_event event{};
2308     event.data.ptr = peerInfo;
2309     event.events = events;
2310     if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, peerInfo->fileDescriptor, &event) < 0) {
2311         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2312             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_MOD (may chack not to quit here), %s, %s %d",
2313                          strerror(errno), __func__, __LINE__);
2314         }
2315         close(peerInfo->fileDescriptor);
2316         cleanHashEntry(peerInfo, sctpMap, &lspan);
2317         char key[MAX_ENODB_NAME_SIZE * 2];
2318         snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2319         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2320             mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2321         }
2322         auto tmp = sctpMap->find(key);
2323         if (tmp) {
2324             free(tmp);
2325         }
2326         sctpMap->erase(key);
2327         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here)");
2328 #ifdef __TRACING__
2329         lspan->Finish();
2330 #endif
2331         return -1;
2332     }
2333 #ifdef __TRACING__
2334     lspan->Finish();
2335 #endif
2336     return 0;
2337 }
2338
2339
2340 int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, otSpan *pSpan) {
2341 #ifdef __TRACING__
2342     auto lspan = opentracing::Tracer::Global()->StartSpan(
2343             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2344 #else
2345 //    otSpan lspan = 0;
2346 #endif
2347     //serialize the span
2348 #ifdef __TRACING__
2349     std::unordered_map<std::string, std::string> data;
2350     RICCarrierWriter carrier(data);
2351     opentracing::Tracer::Global()->Inject((lspan.get())->context(), carrier);
2352     nlohmann::json j = data;
2353     std::string str = j.dump();
2354     static auto maxTraceLength = 0;
2355
2356     maxTraceLength = str.length() > maxTraceLength ? str.length() : maxTraceLength;
2357     // serialized context can be put to RMR message using function:
2358     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2359         mdclog_write(MDCLOG_DEBUG, "max trace length is %d trace data length = %ld data = %s", maxTraceLength,
2360                      str.length(), str.c_str());
2361     }
2362     rmr_set_trace(rmrMessageBuffer.sendMessage, (const unsigned char *) str.c_str(), str.length());
2363 #endif
2364     buildJsonMessage(message);
2365
2366     rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2367
2368     if (rmrMessageBuffer.sendMessage == nullptr) {
2369         rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2370         mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2371 #ifdef __TRACING__
2372         lspan->Finish();
2373 #endif
2374         return -1;
2375     }
2376
2377     if (rmrMessageBuffer.sendMessage->state != 0) {
2378         char meid[RMR_MAX_MEID]{};
2379         if (rmrMessageBuffer.sendMessage->state == RMR_ERR_RETRY) {
2380             usleep(5);
2381             rmrMessageBuffer.sendMessage->state = 0;
2382             mdclog_write(MDCLOG_INFO, "RETRY sending Message type %d to Xapp from %s",
2383                          rmrMessageBuffer.sendMessage->mtype,
2384                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2385             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2386             if (rmrMessageBuffer.sendMessage == nullptr) {
2387                 mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2388                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2389 #ifdef __TRACING__
2390                 lspan->Finish();
2391 #endif
2392                 return -1;
2393             } else if (rmrMessageBuffer.sendMessage->state != 0) {
2394                 mdclog_write(MDCLOG_ERR,
2395                              "Message state %s while sending request %d to Xapp from %s after retry of 10 microseconds",
2396                              translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2397                              rmrMessageBuffer.sendMessage->mtype,
2398                              rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2399                 auto rc = rmrMessageBuffer.sendMessage->state;
2400 #ifdef __TRACING__
2401                 lspan->Finish();
2402 #endif
2403                 return rc;
2404             }
2405         } else {
2406             mdclog_write(MDCLOG_ERR, "Message state %s while sending request %d to Xapp from %s",
2407                          translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2408                          rmrMessageBuffer.sendMessage->mtype,
2409                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2410 #ifdef __TRACING__
2411             lspan->Finish();
2412 #endif
2413             return rmrMessageBuffer.sendMessage->state;
2414         }
2415     }
2416     return 0;
2417 }
2418
2419 void buildJsonMessage(ReportingMessages_t &message) {
2420         message.outLen = sizeof(message.base64Data);
2421     base64::encode((const unsigned char *)message.message.asndata,
2422                    (const int)message.message.asnLength,
2423                   message.base64Data,
2424                   message.outLen);
2425     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2426         mdclog_write(MDCLOG_DEBUG, "asn data length = %d, base64 message length = %d ",
2427                      (int)message.message.asnLength,
2428                      (int)message.outLen);
2429     }
2430
2431 //    char buff[256];
2432 //    // build day time to seconds from epoc
2433 //    strftime(buff, sizeof message.message.time, "%D %T", gmtime(&message.message.time.tv_sec));
2434 //    // add nanosecond
2435 //    snprintf(buff, sizeof buff, "%s.%09ld UTC\n", buff, message.message.time.tv_nsec);
2436
2437     message.bufferLen = snprintf(message.buffer, sizeof(message.buffer),
2438             "{\"header\": {\"ts\": \"%ld.%09ld\","
2439             "\"ranName\": \"%s\","
2440             "\"messageType\": %d,"
2441             "\"direction\": \"%c\"},"
2442             "\"base64Length\": %d,"
2443             "\"asnBase64\": \"%s\"}",
2444             message.message.time.tv_sec,
2445             message.message.time.tv_nsec,
2446             message.message.enodbName,
2447             message.message.messageType,
2448             message.message.direction,
2449             (int)message.outLen,
2450             message.base64Data);
2451     static src::logger_mt& lg = my_logger::get();
2452
2453     BOOST_LOG(lg) << message.buffer;
2454
2455 }
2456
2457
2458 /**
2459  * take RMR error code to string
2460  * @param state
2461  * @return
2462  */
2463 string translateRmrErrorMessages(int state) {
2464     string str = {};
2465     switch (state) {
2466         case RMR_OK:
2467             str = "RMR_OK - state is good";
2468             break;
2469         case RMR_ERR_BADARG:
2470             str = "RMR_ERR_BADARG - argument passd to function was unusable";
2471             break;
2472         case RMR_ERR_NOENDPT:
2473             str = "RMR_ERR_NOENDPT - send//call could not find an endpoint based on msg type";
2474             break;
2475         case RMR_ERR_EMPTY:
2476             str = "RMR_ERR_EMPTY - msg received had no payload; attempt to send an empty message";
2477             break;
2478         case RMR_ERR_NOHDR:
2479             str = "RMR_ERR_NOHDR - message didn't contain a valid header";
2480             break;
2481         case RMR_ERR_SENDFAILED:
2482             str = "RMR_ERR_SENDFAILED - send failed; errno has nano reason";
2483             break;
2484         case RMR_ERR_CALLFAILED:
2485             str = "RMR_ERR_CALLFAILED - unable to send call() message";
2486             break;
2487         case RMR_ERR_NOWHOPEN:
2488             str = "RMR_ERR_NOWHOPEN - no wormholes are open";
2489             break;
2490         case RMR_ERR_WHID:
2491             str = "RMR_ERR_WHID - wormhole id was invalid";
2492             break;
2493         case RMR_ERR_OVERFLOW:
2494             str = "RMR_ERR_OVERFLOW - operation would have busted through a buffer/field size";
2495             break;
2496         case RMR_ERR_RETRY:
2497             str = "RMR_ERR_RETRY - request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)";
2498             break;
2499         case RMR_ERR_RCVFAILED:
2500             str = "RMR_ERR_RCVFAILED - receive failed (hard error)";
2501             break;
2502         case RMR_ERR_TIMEOUT:
2503             str = "RMR_ERR_TIMEOUT - message processing call timed out";
2504             break;
2505         case RMR_ERR_UNSET:
2506             str = "RMR_ERR_UNSET - the message hasn't been populated with a transport buffer";
2507             break;
2508         case RMR_ERR_TRUNC:
2509             str = "RMR_ERR_TRUNC - received message likely truncated";
2510             break;
2511         case RMR_ERR_INITFAILED:
2512             str = "RMR_ERR_INITFAILED - initialisation of something (probably message) failed";
2513             break;
2514         case RMR_ERR_NOTSUPP:
2515             str = "RMR_ERR_NOTSUPP - the request is not supported, or RMr was not initialised for the request";
2516             break;
2517         default:
2518             char buf[128]{};
2519             snprintf(buf, sizeof buf, "UNDOCUMENTED RMR_ERR : %d", state);
2520             str = buf;
2521             break;
2522     }
2523     return str;
2524 }
2525
2526