f4cd98f08bb3005aa40d990dda4b7ef5612f5ab8
[ric-plt/e2.git] / RIC-E2-TERMINATION / sctpThread.cpp
1 // Copyright 2019 AT&T Intellectual Property
2 // Copyright 2019 Nokia
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //      http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15
16 //  This source code is part of the near-RT RIC (RAN Intelligent Controller)
17 //  platform project (RICP).
18
19 // TODO: High-level file comment.
20
21
22 #include "sctpThread.h"
23
24
25 using namespace std::placeholders;
26 #ifdef __TRACING__
27 using namespace opentracing;
28 #endif
29 //#ifdef __cplusplus
30 //extern "C"
31 //{
32 //#endif
33
34 BOOST_LOG_INLINE_GLOBAL_LOGGER_DEFAULT(my_logger, src::logger_mt)
35
36 boost::shared_ptr<sinks::synchronous_sink<sinks::text_file_backend>> boostLogger;
37
38 void init_log() {
39     mdclog_attr_t *attr;
40     mdclog_attr_init(&attr);
41     mdclog_attr_set_ident(attr, "E2Terminator");
42     mdclog_init(attr);
43     mdclog_attr_destroy(attr);
44 }
45
46
47 //std::atomic<int64_t> rmrCounter{0};
48 std::atomic<int64_t> num_of_messages{0};
49 static long transactionCounter = 0;
50
51
52 int main(const int argc, char **argv) {
53     sctp_params_t pSctpParams;
54 #ifdef __TRACING__
55     opentracing::Tracer::InitGlobal(tracelibcpp::createTracer("E2 Terminator"));
56     auto span = opentracing::Tracer::Global()->StartSpan(__FUNCTION__);
57 #else
58     otSpan span = 0;
59 #endif
60
61     unsigned num_cpus = std::thread::hardware_concurrency();
62 #ifdef ERROR_LEVEL
63     mdclog_severity_t loglevel = MDCLOG_ERR;
64 #else
65     mdclog_severity_t loglevel = MDCLOG_INFO;
66 #endif
67     init_log();
68     mdclog_level_set(loglevel);
69
70     if (argc < 7) {
71         mdclog_mdc_add("app", argv[0]);
72         mdclog_write(MDCLOG_ERR, "Usage nano <rmr port> logLevel <debug/warning/info/error> volume <PATH to log file location>");
73         return -1;
74     }
75
76     {
77         std::random_device device{};
78         std::mt19937 generator(device());
79         std::uniform_int_distribution<long> distribution(1, (long) 1e12);
80
81         transactionCounter = distribution(generator);
82     }
83
84     char tmpLogFilespec[VOLUME_URL_SIZE];
85     tmpLogFilespec[0] = 0;
86     pSctpParams.volume[0] = 0;
87     //read paramters from CLI
88     for (auto i = 1; i < argc; i += 2) {
89         char *dummy;
90         if (strcasecmp("nano", argv[i]) == 0) {
91             pSctpParams.rmrPort = (uint16_t) (uint16_t) strtol(argv[i + 1], &dummy, 10);
92         } else if (strcasecmp("loglevel", argv[i]) == 0) {
93             if (strcasecmp("debug", argv[i + 1]) == 0) {
94                 loglevel = MDCLOG_DEBUG;
95             } else if (strcasecmp("info", argv[i + 1]) == 0) {
96                 loglevel = MDCLOG_INFO;
97             } else if (strcasecmp("warning", argv[i + 1]) == 0) {
98                 loglevel = MDCLOG_WARN;
99             } else if (strcasecmp("error", argv[i + 1]) == 0) {
100                 loglevel = MDCLOG_ERR;
101             }
102         } else if (strcasecmp("volume", argv[i]) == 0) {
103             snprintf(pSctpParams.volume, VOLUME_URL_SIZE, "%s", argv[i + 1]);
104             snprintf(tmpLogFilespec, VOLUME_URL_SIZE, "%s", argv[i + 1]);
105         }
106     }
107
108
109     pSctpParams.logLevel = loglevel;
110     snprintf(pSctpParams.rmrAddress, sizeof(pSctpParams.rmrAddress) - 1, "%d", (int) (pSctpParams.rmrPort));
111
112     strcat(tmpLogFilespec,"/tmp/E2Term_%Y-%m-%d_%H-%M-%S.%N.log");
113
114     if (mdclog_level_get() >= MDCLOG_INFO) {
115         mdclog_mdc_add("RMR Port", to_string(pSctpParams.rmrPort).c_str());
116         mdclog_mdc_add("LogLevel", to_string(pSctpParams.logLevel).c_str());
117         mdclog_mdc_add("volume", pSctpParams.volume);
118         mdclog_mdc_add("tmpLogFilespec", tmpLogFilespec);
119
120         mdclog_write(MDCLOG_INFO, "running parameters");
121     }
122     mdclog_mdc_clean();
123
124     // Files written to the current working directory
125     boostLogger = logging::add_file_log(
126             keywords::file_name = tmpLogFilespec,
127             keywords::rotation_size = 10 * 1024 * 1024,
128             keywords::time_based_rotation = sinks::file::rotation_at_time_interval(posix_time::hours(1)),
129             keywords::format = "%Message%"
130             //keywords::format = "[%TimeStamp%]: %Message%" // use each log with time stamp
131     );
132
133     // Setup a destination folder for collecting rotated (closed) files --since the same volumn can use rename()
134     boostLogger->locked_backend()->set_file_collector(sinks::file::make_collector(
135          keywords::target = pSctpParams.volume 
136          //keywords::max_size = 16 * 1024 * 1024, 
137          //keywords::min_free_space = 100 * 1024 * 1024  
138     ));
139
140     // Upon restart, scan the directory for files matching the file_name pattern
141     boostLogger->locked_backend()->scan_for_files();
142
143     // Enable auto-flushing after each log record written
144     if (mdclog_level_get() >= MDCLOG_DEBUG) {
145         boostLogger->locked_backend()->auto_flush(true);
146     }
147
148     // start epoll
149     pSctpParams.epoll_fd = epoll_create1(0);
150     if (pSctpParams.epoll_fd == -1) {
151         mdclog_write(MDCLOG_ERR, "failed to open epoll descriptor");
152         exit(-1);
153     }
154
155     pSctpParams.rmrCtx = getRmrContext(pSctpParams.rmrAddress, &span);
156     if (pSctpParams.rmrCtx == nullptr) {
157         mdclog_write(MDCLOG_ERR, "Failed to initialize RMR");
158         close(pSctpParams.epoll_fd);
159         exit(-1);
160     }
161     rmr_init_trace(pSctpParams.rmrCtx, 200);
162     // get the RMR fd for the epoll
163     pSctpParams.rmrListenFd = rmr_get_rcvfd(pSctpParams.rmrCtx);
164     struct epoll_event event{};
165     // add RMR fd to epoll
166     event.events = (EPOLLIN);
167     event.data.fd = pSctpParams.rmrListenFd;
168     // add listening RMR FD to epoll
169     if (epoll_ctl(pSctpParams.epoll_fd, EPOLL_CTL_ADD, pSctpParams.rmrListenFd, &event)) {
170         mdclog_write(MDCLOG_ERR, "Failed to add RMR descriptor to epoll");
171         close(pSctpParams.rmrListenFd);
172         rmr_close(pSctpParams.rmrCtx);
173         close(pSctpParams.epoll_fd);
174         exit(-1);
175     }
176
177     pSctpParams.sctpMap = new mapWrapper();
178
179     std::vector<std::thread> threads(num_cpus);
180 //    std::vector<std::thread> threads;
181
182     num_cpus = 1;
183     for (unsigned int i = 0; i < num_cpus; i++) {
184         threads[i] = std::thread(listener, &pSctpParams);
185
186         cpu_set_t cpuset;
187         CPU_ZERO(&cpuset);
188         CPU_SET(i, &cpuset);
189         int rc = pthread_setaffinity_np(threads[i].native_handle(), sizeof(cpu_set_t), &cpuset);
190         if (rc != 0) {
191             mdclog_write(MDCLOG_ERR, "Error calling pthread_setaffinity_np: %d", rc);
192         }
193
194 //        threads.emplace_back(std::thread(listener, &pSctpParams));
195     }
196
197     //send to e2 manager init of e2 term
198     //E2_TERM_INIT
199     auto term_init = false;
200
201     char buff[128]{};
202     auto len = snprintf(buff, 128, "E2 terminator started");
203     rmr_mbuf_t *msg = rmr_alloc_msg(pSctpParams.rmrCtx, 200);
204     auto count = 0;
205     while (!term_init) {
206         msg->mtype = E2_TERM_INIT;
207         msg->state = 0;
208         rmr_bytes2payload(msg, (unsigned char *) buff, len);
209         static unsigned char tx[32];
210         auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
211         rmr_bytes2xact(msg, tx, txLen);
212         msg = rmr_send_msg(pSctpParams.rmrCtx, msg);
213         if (msg == nullptr) {
214             msg = rmr_alloc_msg(pSctpParams.rmrCtx, 200);
215         } else if (msg->state == 0) {
216             term_init = true;
217             rmr_free_msg(msg);
218             //break;
219         } else {
220             if (count % 100 == 0) {
221                 mdclog_write(MDCLOG_ERR, "Error sending E2_TERM_INIT cause : %d ", msg->state);
222             }
223             sleep(1);
224         }
225         count++;
226     }
227
228     for (auto &t : threads) {
229         t.join();
230     }
231
232 #ifdef __TRACING__
233     opentracing::Tracer::Global()->Close();
234 #endif
235     return 0;
236 }
237
238 /**
239  *
240  * @param args
241  * @return
242  */
243 void listener(sctp_params_t *params) {
244 #ifdef __TRACING__
245     auto span = opentracing::Tracer::Global()->StartSpan(__FUNCTION__);
246 #else
247     otSpan span = 0;
248 #endif
249     int num_of_SCTP_messages = 0;
250     int num_of_XAPP_messages = 0;
251     auto totalTime = 0.0;
252     mdclog_mdc_clean();
253     mdclog_level_set(params->logLevel);
254
255     std::thread::id this_id = std::this_thread::get_id();
256     //save cout
257     streambuf *oldCout = cout.rdbuf();
258     ostringstream memCout;
259     // create new cout
260     cout.rdbuf(memCout.rdbuf());
261     cout << this_id;
262     //return to the normal cout
263     cout.rdbuf(oldCout);
264
265     char tid[32];
266     memcpy(tid, memCout.str().c_str(), memCout.str().length() < 32 ? memCout.str().length() : 31);
267     tid[memCout.str().length()] = 0;
268     mdclog_mdc_add("thread id", tid);
269
270     if (mdclog_level_get() >= MDCLOG_DEBUG) {
271         mdclog_write(MDCLOG_DEBUG, "started thread number %s", tid);
272     }
273
274     RmrMessagesBuffer_t rmrMessageBuffer{};
275     //create and init RMR
276     rmrMessageBuffer.rmrCtx = params->rmrCtx;
277
278     auto *events = (struct epoll_event *) calloc(MAXEVENTS, sizeof(struct epoll_event));
279     struct timespec end{0, 0};
280     struct timespec start{0, 0};
281
282     rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
283     rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
284
285     ReportingMessages_t message {};
286
287     for (int i = 0; i < MAX_RMR_BUFF_ARRY; i++) {
288         rmrMessageBuffer.rcvBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
289         rmrMessageBuffer.sendBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
290     }
291
292     while (true) {
293         if (mdclog_level_get() >= MDCLOG_DEBUG) {
294             mdclog_write(MDCLOG_DEBUG, "Start EPOLL Wait");
295         }
296         auto numOfEvents = epoll_wait(params->epoll_fd, events, MAXEVENTS, -1);
297         if (numOfEvents < 0 && errno == EINTR) {
298             if (mdclog_level_get() >= MDCLOG_DEBUG) {
299                 mdclog_write(MDCLOG_DEBUG, "got EINTR : %s", strerror(errno));
300             }
301             continue;
302         }
303         if (numOfEvents < 0) {
304             mdclog_write(MDCLOG_ERR, "Epoll wait failed, errno = %s", strerror(errno));
305             return;
306         }
307         for (auto i = 0; i < numOfEvents; i++) {
308             if (mdclog_level_get() >= MDCLOG_DEBUG) {
309                 mdclog_write(MDCLOG_DEBUG, "handling epoll event %d out of %d", i + 1, numOfEvents);
310             }
311             clock_gettime(CLOCK_MONOTONIC, &message.message.time);
312             start.tv_sec = message.message.time.tv_sec;
313             start.tv_nsec = message.message.time.tv_nsec;
314             if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP)) {
315                 if (events[i].data.fd != params->rmrListenFd) {
316                     auto *peerInfo = (ConnectedCU_t *)events[i].data.ptr;
317                     mdclog_write(MDCLOG_ERR, "epoll error, events %0x on fd %d, RAN NAME : %s",
318                                  events[i].events, peerInfo->fileDescriptor, peerInfo->enodbName);
319
320                     rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
321                                                          "%s|Failed SCTP Connection",
322                                                          peerInfo->enodbName);
323                     message.message.asndata = rmrMessageBuffer.sendMessage->payload;
324                     message.message.asnLength = rmrMessageBuffer.sendMessage->len;
325
326                     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
327                     message.message.direction = 'N';
328                     if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer, &span) != 0) {
329                         mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
330                     }
331
332                     close(peerInfo->fileDescriptor);
333                     cleanHashEntry((ConnectedCU_t *) events[i].data.ptr, params->sctpMap, &span);
334                 } else {
335                     mdclog_write(MDCLOG_ERR, "epoll error, events %0x on RMR FD", events[i].events);
336                 }
337             } else if (events[i].events & EPOLLOUT) {
338                 // this need to send waiting message from connection EINPROGRESS
339                 auto *peerInfo = (ConnectedCU_t *) events[i].data.ptr;
340
341                 memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
342
343                 mdclog_write(MDCLOG_INFO, "file descriptor %d got EPOLLOUT", peerInfo->fileDescriptor);
344                 auto retVal = 0;
345                 socklen_t retValLen = 0;
346                 auto rc = getsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_ERROR, &retVal, &retValLen);
347                 if (rc != 0 || retVal != 0) {
348                     if (rc != 0) {
349                         rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
350                                                                        "%s|Failed SCTP Connection, after EINPROGRESS the getsockopt%s",
351                                                                        peerInfo->enodbName, strerror(errno));
352                     } else if (retVal != 0) {
353                         rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
354                                                                        "%s|Failed SCTP Connection after EINPROGRESS, SO_ERROR",
355                                                                        peerInfo->enodbName);
356                     }
357
358                     message.message.asndata = rmrMessageBuffer.sendMessage->payload;
359                     message.message.asnLength = rmrMessageBuffer.sendMessage->len;
360                     mdclog_write(MDCLOG_ERR, "%s", rmrMessageBuffer.sendMessage->payload);
361                     message.message.direction = 'N';
362                     if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer, &span) != 0) {
363                         mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
364                     }
365                     memset(peerInfo->asnData, 0, peerInfo->asnLength);
366                     peerInfo->asnLength = 0;
367                     peerInfo->mtype = 0;
368                     continue;
369                 }
370
371                 peerInfo->isConnected = true;
372
373                 if (modifyToEpoll(params->epoll_fd, peerInfo, (EPOLLIN | EPOLLET), params->sctpMap, peerInfo->enodbName,
374                                   peerInfo->mtype, &span) != 0) {
375                     mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_MOD");
376                     continue;
377                 }
378
379                 message.message.asndata = (unsigned char *)peerInfo->asnData;
380                 message.message.asnLength = peerInfo->asnLength;
381                 message.message.messageType = peerInfo->mtype;
382                 memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
383                 num_of_messages.fetch_add(1, std::memory_order_release);
384                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
385                     mdclog_write(MDCLOG_DEBUG, "send the delayed SETUP/ENDC SETUP to sctp for %s",
386                                  message.message.enodbName);
387                 }
388                 if (sendSctpMsg(peerInfo, message, params->sctpMap, &span) != 0) {
389                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
390                         mdclog_write(MDCLOG_DEBUG, "Error write to SCTP  %s %d", __func__, __LINE__);
391                     }
392                     continue;
393                 }
394
395                 memset(peerInfo->asnData, 0, peerInfo->asnLength);
396                 peerInfo->asnLength = 0;
397                 peerInfo->mtype = 0;
398
399             } else if (params->rmrListenFd == events[i].data.fd) {
400                 // got message from XAPP
401                 num_of_XAPP_messages++;
402                 num_of_messages.fetch_add(1, std::memory_order_release);
403                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
404                     mdclog_write(MDCLOG_DEBUG, "new message from RMR");
405                 }
406                 if (receiveXappMessages(params->epoll_fd,
407                                         params->sctpMap,
408                                         rmrMessageBuffer,
409                                         message.message.time,
410                                         &span) != 0) {
411                     mdclog_write(MDCLOG_ERR, "Error handling Xapp message");
412                 }
413             } else {
414                 /* We RMR_ERR_RETRY have data on the fd waiting to be read. Read and display it.
415                  * We must read whatever data is available completely, as we are running
416                  *  in edge-triggered mode and won't get a notification again for the same data. */
417                 num_of_messages.fetch_add(1, std::memory_order_release);
418                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
419                     mdclog_write(MDCLOG_DEBUG, "new message from SCTP, epoll flags are : %0x", events[i].events);
420                 }
421                 receiveDataFromSctp(&events[i],
422                                     params->sctpMap,
423                                     num_of_SCTP_messages,
424                                     rmrMessageBuffer,
425                                     message.message.time,
426                                     &span);
427             }
428
429             clock_gettime(CLOCK_MONOTONIC, &end);
430             if (mdclog_level_get() >= MDCLOG_INFO) {
431                 totalTime += ((end.tv_sec + 1.0e-9 * end.tv_nsec) -
432                               ((double) start.tv_sec + 1.0e-9 * start.tv_nsec));
433             }
434             if (mdclog_level_get() >= MDCLOG_DEBUG) {
435                 mdclog_write(MDCLOG_DEBUG, "message handling is %ld seconds %ld nanoseconds",
436                              end.tv_sec - start.tv_sec,
437                              end.tv_nsec - start.tv_nsec);
438             }
439         }
440     }
441 #ifdef __TRACING__
442     span->Finish();
443 #else
444
445 #endif
446 }
447
448 /**
449  *
450  * @param socket
451  * @return
452  */
453 int setSocketNoBlocking(int socket) {
454     auto flags = fcntl(socket, F_GETFL, 0);
455
456     if (flags == -1) {
457         mdclog_mdc_add("func", "fcntl");
458         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
459         mdclog_mdc_clean();
460         return -1;
461     }
462
463     flags = (unsigned) flags | (unsigned) O_NONBLOCK;
464     if (fcntl(socket, F_SETFL, flags) == -1) {
465         mdclog_mdc_add("func", "fcntl");
466         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
467         mdclog_mdc_clean();
468         return -1;
469     }
470
471     return 0;
472 }
473
474 /**
475  *
476  * @param val
477  * @param m
478  * @param pSpan
479  */
480 void cleanHashEntry(ConnectedCU_t *val, Sctp_Map_t *m, otSpan *pSpan) {
481 #ifdef __TRACING__
482     auto lspan = opentracing::Tracer::Global()->StartSpan(
483             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
484 #else
485 //    otSpan lspan = 0;
486 #endif
487     char *dummy;
488     auto port = (uint16_t) strtol(val->portNumber, &dummy, 10);
489     char searchBuff[256]{};
490
491     snprintf(searchBuff, sizeof searchBuff, "host:%s:%d", val->hostName, port);
492     m->erase(searchBuff);
493
494     m->erase(val->enodbName);
495     free(val);
496 #ifdef __TRACING__
497     lspan->Finish();
498 #endif
499 }
500
501 /**
502  *
503  * @param fd file discriptor
504  * @param data the asn data to send
505  * @param len  length of the data
506  * @param enodbName the enodbName as in the map for printing purpose
507  * @param m map host information
508  * @param mtype message number
509  * @return 0 success, anegative number on fail
510  */
511 int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_t *m, otSpan *pSpan) {
512 #ifdef __TRACING__
513     auto lspan = opentracing::Tracer::Global()->StartSpan(
514             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
515 #else
516     otSpan lspan = 0;
517 #endif
518     auto loglevel = mdclog_level_get();
519     int fd = peerInfo->fileDescriptor;
520     if (loglevel >= MDCLOG_DEBUG) {
521         mdclog_write(MDCLOG_DEBUG, "Send SCTP message for CU %s, %s",
522                      message.message.enodbName, __FUNCTION__);
523     }
524
525     while (true) {
526         //TODO add send to VES client or KAFKA
527         //format ts|mtype|direction(D/U)|length of asn data|raw data
528 //        auto length = sizeof message.message.time
529 //                      + sizeof message.message.enodbName
530 //                      + sizeof message.message.messageType
531 //                      + sizeof message.message.direction
532 //                      + sizeof message.message.asnLength
533 //                      + message.message.asnLength;
534
535         if (send(fd,message.message.asndata, message.message.asnLength,MSG_NOSIGNAL) < 0) {
536             if (errno == EINTR) {
537                 continue;
538             }
539             mdclog_write(MDCLOG_ERR, "error writing to CU a message, %s ", strerror(errno));
540             // Prevent double free() of peerInfo in the event of connection failure.
541             // Returning failure will trigger, in x2/endc setup flow, RIC_SCTP_CONNECTION_FAILURE rmr message causing the E2M to retry.
542             if (!peerInfo->isConnected){
543                 mdclog_write(MDCLOG_ERR, "connection to CU %s is still in progress.", message.message.enodbName);
544 #ifdef __TRACING__
545             lspan->Finish();
546 #endif
547                 return -1;
548             }
549             cleanHashEntry(peerInfo, m, &lspan);
550             close(fd);
551             char key[MAX_ENODB_NAME_SIZE * 2];
552             snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName,
553                      message.message.messageType);
554             if (loglevel >= MDCLOG_DEBUG) {
555                 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
556             }
557             auto tmp = m->find(key);
558             if (tmp) {
559                 free(tmp);
560             }
561             m->erase(key);
562 #ifdef __TRACING__
563             lspan->Finish();
564 #endif
565             return -1;
566         }
567         message.message.direction = 'D';
568         // send report.buffer of size
569         buildJsonMessage(message);
570
571         if (loglevel >= MDCLOG_DEBUG) {
572             mdclog_write(MDCLOG_DEBUG,
573                          "SCTP message for CU %s sent from %s",
574                          message.message.enodbName,
575                          __FUNCTION__);
576         }
577 #ifdef __TRACING__
578         lspan->Finish();
579 #endif
580
581         return 0;
582     }
583 }
584
585 /**
586  *
587  * @param message
588  * @param rmrMessageBuffer
589  * @param pSpan
590  */
591 void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer, otSpan *pSpan) {
592 #ifdef __TRACING__
593     auto lspan = opentracing::Tracer::Global()->StartSpan(
594             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
595 #else
596 //    otSpan lspan = 0;
597 #endif
598     rmr_get_meid(rmrMessageBuffer.rcvMessage, (unsigned char *)(message.message.enodbName));
599
600     message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
601     message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
602
603     if (mdclog_level_get() >= MDCLOG_DEBUG) {
604         mdclog_write(MDCLOG_DEBUG, "Message from Xapp RAN name = %s message length = %ld",
605                      message.message.enodbName, (unsigned long) message.message.asnLength);
606     }
607 #ifdef __TRACING__
608     lspan->Finish();
609 #endif
610
611 }
612
613
614 /**
615  *
616  * @param metaData all the data strip to structure
617  * @param data the data recived from xAPP
618  * @return 0 success all other values are fault
619  */
620 int getSetupRequestMetaData(ReportingMessages_t &message, char *data, char *host, uint16_t &port, otSpan *pSpan) {
621 #ifdef __TRACING__
622     auto lspan = opentracing::Tracer::Global()->StartSpan(
623             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
624 #else
625 //    otSpan lspan = 0;
626 #endif
627     auto loglevel = mdclog_level_get();
628
629     char delimiter[4] {};
630     memset(delimiter, 0, (size_t)4);
631     delimiter[0] = '|';
632     char *tmp;
633
634     char *val = strtok_r(data, delimiter, &tmp);
635     if (val != nullptr) {
636         if (mdclog_level_get() >= MDCLOG_DEBUG) {
637             mdclog_write(MDCLOG_DEBUG, "SCTP ADDRESS parameter from message = %s", val);
638         }
639         memcpy(host, val, tmp - val );
640     } else {
641         mdclog_write(MDCLOG_ERR, "wrong Host Name for setup request %s", data);
642 #ifdef __TRACING__
643         lspan->Finish();
644 #endif
645         return -1;
646     }
647
648     val = strtok_r(nullptr, delimiter, &tmp);
649     if (val != nullptr) {
650         if (mdclog_level_get() >= MDCLOG_DEBUG) {
651             mdclog_write(MDCLOG_DEBUG, "PORT parameter from message = %s", val);
652         }
653         char *dummy;
654         port = (uint16_t)strtol(val, &dummy, 10);
655     } else {
656         mdclog_write(MDCLOG_ERR, "wrong Port for setup request %s", data);
657 #ifdef __TRACING__
658         lspan->Finish();
659 #endif
660         return -2;
661     }
662
663     val = strtok_r(nullptr, delimiter, &tmp);
664     if (val != nullptr) {
665         if (mdclog_level_get() >= MDCLOG_DEBUG) {
666             mdclog_write(MDCLOG_DEBUG, "RAN NAME parameter from message = %s", val);
667         }
668         memcpy(message.message.enodbName, val, tmp - val);
669     } else {
670         mdclog_write(MDCLOG_ERR, "wrong gNb/Enodeb name for setup request %s", data);
671 #ifdef __TRACING__
672         lspan->Finish();
673 #endif
674
675         return -3;
676     }
677     val = strtok_r(nullptr, delimiter, &tmp);
678     if (val != nullptr) {
679         if (mdclog_level_get() >= MDCLOG_DEBUG) {
680             mdclog_write(MDCLOG_DEBUG, "ASN length parameter from message = %s", val);
681         }
682         char *dummy;
683         message.message.asnLength = (uint16_t) strtol(val, &dummy, 10);
684     } else {
685         mdclog_write(MDCLOG_ERR, "wrong ASN length for setup request %s", data);
686 #ifdef __TRACING__
687         lspan->Finish();
688 #endif
689         return -4;
690     }
691
692     message.message.asndata = (unsigned char *)tmp;  // tmp is local but point to the location in data
693
694     if (loglevel >= MDCLOG_INFO) {
695         mdclog_write(MDCLOG_INFO, "Message from Xapp RAN name = %s host address = %s port = %d",
696                      message.message.enodbName, host, port);
697     }
698 #ifdef __TRACING__
699     lspan->Finish();
700 #endif
701
702     return 0;
703 }
704
705 /**
706  *
707  * @param events
708  * @param sctpMap
709  * @param numOfMessages
710  * @param rmrMessageBuffer
711  * @param ts
712  * @param pSpan
713  * @return
714  */
715 int receiveDataFromSctp(struct epoll_event *events,
716                         Sctp_Map_t *sctpMap,
717                         int &numOfMessages,
718                         RmrMessagesBuffer_t &rmrMessageBuffer,
719                         struct timespec &ts,
720                         otSpan *pSpan) {
721 #ifdef __TRACING__
722     auto lspan = opentracing::Tracer::Global()->StartSpan(
723             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
724 #else
725     otSpan lspan = 0;
726 #endif
727     /* We have data on the fd waiting to be read. Read and display it.
728  * We must read whatever data is available completely, as we are running
729  *  in edge-triggered mode and won't get a notification again for the same data. */
730     int done = 0;
731     auto loglevel = mdclog_level_get();
732     // get the identity of the interface
733     auto *peerInfo = (ConnectedCU_t *)events->data.ptr;
734     struct timespec start{0, 0};
735     struct timespec decodestart{0, 0};
736     struct timespec end{0, 0};
737
738     E2AP_PDU_t *pdu = nullptr;
739
740     ReportingMessages_t message {};
741
742     while (true) {
743         if (loglevel >= MDCLOG_DEBUG) {
744             mdclog_write(MDCLOG_DEBUG, "Start Read from SCTP %d fd", peerInfo->fileDescriptor);
745             clock_gettime(CLOCK_MONOTONIC, &start);
746         }
747         // read the buffer directly to rmr payload
748         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
749         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
750                 read(peerInfo->fileDescriptor, rmrMessageBuffer.sendMessage->payload, RECEIVE_SCTP_BUFFER_SIZE);
751         if (loglevel >= MDCLOG_DEBUG) {
752             mdclog_write(MDCLOG_DEBUG, "Finish Read from SCTP %d fd message length = %ld",
753                     peerInfo->fileDescriptor, message.message.asnLength);
754         }
755         memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
756         message.message.direction = 'U';
757         message.message.time.tv_nsec = ts.tv_nsec;
758         message.message.time.tv_sec = ts.tv_sec;
759
760         if (message.message.asnLength < 0) {
761             if (errno == EINTR) {
762                 continue;
763             }
764             /* If errno == EAGAIN, that means we have read all
765                data. So go back to the main loop. */
766             if (errno != EAGAIN) {
767                 mdclog_write(MDCLOG_ERR, "Read error, %s ", strerror(errno));
768                 done = 1;
769             } else if (loglevel >= MDCLOG_DEBUG) {
770                 mdclog_write(MDCLOG_DEBUG, "EAGAIN - descriptor = %d", peerInfo->fileDescriptor);
771             }
772             break;
773         } else if (message.message.asnLength == 0) {
774             /* End of file. The remote has closed the connection. */
775             if (loglevel >= MDCLOG_INFO) {
776                 mdclog_write(MDCLOG_INFO, "END of File Closed connection - descriptor = %d",
777                              peerInfo->fileDescriptor);
778             }
779             done = 1;
780             break;
781         }
782
783         asn_dec_rval_t rval;
784         if (loglevel >= MDCLOG_DEBUG) {
785             char printBuffer[4096]{};
786             char *tmp = printBuffer;
787             for (size_t i = 0; i < (size_t)message.message.asnLength; ++i) {
788                 snprintf(tmp, 2, "%02x", message.message.asndata[i]);
789                 tmp += 2;
790             }
791             printBuffer[message.message.asnLength] = 0;
792             clock_gettime(CLOCK_MONOTONIC, &end);
793             mdclog_write(MDCLOG_DEBUG, "Before Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
794                          peerInfo->enodbName, end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
795             mdclog_write(MDCLOG_DEBUG, "PDU buffer length = %ld, data =  : %s", message.message.asnLength,
796                          printBuffer);
797             clock_gettime(CLOCK_MONOTONIC, &decodestart);
798         }
799
800         rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void **) &pdu,
801                           message.message.asndata, message.message.asnLength);
802         if (rval.code != RC_OK) {
803             mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) E2AP PDU from RAN : %s", rval.code,
804                          peerInfo->enodbName);
805             break;
806         }
807
808         if (loglevel >= MDCLOG_DEBUG) {
809             clock_gettime(CLOCK_MONOTONIC, &end);
810             mdclog_write(MDCLOG_DEBUG, "After Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
811                          peerInfo->enodbName, end.tv_sec - decodestart.tv_sec, end.tv_nsec - decodestart.tv_nsec);
812             char *printBuffer;
813             size_t size;
814             FILE *stream = open_memstream(&printBuffer, &size);
815             asn_fprint(stream, &asn_DEF_E2AP_PDU, pdu);
816             mdclog_write(MDCLOG_DEBUG, "Encoding E2AP PDU past : %s", printBuffer);
817             clock_gettime(CLOCK_MONOTONIC, &decodestart);
818         }
819
820         switch (pdu->present) {
821             case E2AP_PDU_PR_initiatingMessage: {//initiating message
822                 asnInitiatingRequest(pdu, message, rmrMessageBuffer, &lspan);
823                 break;
824             }
825             case E2AP_PDU_PR_successfulOutcome: { //successful outcome
826                 asnSuccsesfulMsg(pdu, message, sctpMap, rmrMessageBuffer, &lspan);
827                 break;
828             }
829             case E2AP_PDU_PR_unsuccessfulOutcome: { //Unsuccessful Outcome
830                 asnUnSuccsesfulMsg(pdu, message, sctpMap, rmrMessageBuffer, &lspan);
831                 break;
832             }
833             default:
834                 mdclog_write(MDCLOG_ERR, "Unknown index %d in E2AP PDU", pdu->present);
835                 break;
836         }
837         if (loglevel >= MDCLOG_DEBUG) {
838             clock_gettime(CLOCK_MONOTONIC, &end);
839             mdclog_write(MDCLOG_DEBUG,
840                          "After processing message and sent to rmr for : %s, Read time is : %ld seconds, %ld nanoseconds",
841                          peerInfo->enodbName, end.tv_sec - decodestart.tv_sec, end.tv_nsec - decodestart.tv_nsec);
842
843         }
844         numOfMessages++;
845         // remove the break for EAGAIN
846         //break;
847         if (pdu != nullptr) {
848             //TODO need to test ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu); to get better performance
849             //ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu);
850             ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
851             pdu = nullptr;
852         }
853         //clock_gettime(CLOCK_MONOTONIC, &start);
854     }
855     // in case of break to avoid memory leak
856     if (pdu != nullptr) {
857         ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
858         pdu = nullptr;
859     }
860
861     if (done) {
862         if (loglevel >= MDCLOG_INFO) {
863             mdclog_write(MDCLOG_INFO, "Closed connection - descriptor = %d", peerInfo->fileDescriptor);
864         }
865         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
866                 snprintf((char *)rmrMessageBuffer.sendMessage->payload,
867                         256,
868                         "%s|CU disconnected unexpectedly",
869                         peerInfo->enodbName);
870         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
871
872         if (sendRequestToXapp(message,
873                               RIC_SCTP_CONNECTION_FAILURE,
874                               rmrMessageBuffer,
875                               &lspan) != 0) {
876             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
877         }
878
879         /* Closing descriptor make epoll remove it from the set of descriptors which are monitored. */
880         close(peerInfo->fileDescriptor);
881         cleanHashEntry((ConnectedCU_t *) events->data.ptr, sctpMap, &lspan);
882     }
883     if (loglevel >= MDCLOG_DEBUG) {
884         clock_gettime(CLOCK_MONOTONIC, &end);
885         mdclog_write(MDCLOG_DEBUG, "from receive SCTP to send RMR time is %ld seconds and %ld nanoseconds",
886                      end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
887
888     }
889 #ifdef __TRACING__
890     lspan->Finish();
891 #endif
892
893     return 0;
894 }
895
896 /**
897  *
898  * @param pdu
899  * @param message
900  * @param rmrMessageBuffer
901  * @param pSpan
902  */
903 void asnInitiatingRequest(E2AP_PDU_t *pdu,
904                           ReportingMessages_t &message,
905                           RmrMessagesBuffer_t &rmrMessageBuffer,
906                           otSpan *pSpan) {
907 #ifdef __TRACING__
908     auto lspan = opentracing::Tracer::Global()->StartSpan(
909             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
910 #else
911     otSpan lspan = 0;
912 #endif
913
914     auto procedureCode = ((InitiatingMessage_t *) pdu->choice.initiatingMessage)->procedureCode;
915     if (mdclog_level_get() >= MDCLOG_INFO) {
916         mdclog_write(MDCLOG_INFO, "Initiating message %ld", procedureCode);
917     }
918     switch (procedureCode) {
919         case ProcedureCode_id_x2Setup: {
920             if (mdclog_level_get() >= MDCLOG_INFO) {
921                 mdclog_write(MDCLOG_INFO, "Got Setup Initiating  message from CU - %s",
922                              message.message.enodbName);
923             }
924             break;
925         }
926         case ProcedureCode_id_endcX2Setup: {
927             if (mdclog_level_get() >= MDCLOG_INFO) {
928                 mdclog_write(MDCLOG_INFO, "Got X2 EN-DC Setup Request from CU - %s",
929                              message.message.enodbName);
930             }
931             break;
932         }
933         case ProcedureCode_id_ricSubscription: {
934             if (mdclog_level_get() >= MDCLOG_INFO) {
935                 mdclog_write(MDCLOG_INFO, "Got RIC Subscription Request message from CU - %s",
936                              message.message.enodbName);
937             }
938             break;
939         }
940         case ProcedureCode_id_ricSubscriptionDelete: {
941             if (mdclog_level_get() >= MDCLOG_INFO) {
942                 mdclog_write(MDCLOG_INFO, "Got RIC Subscription Delete Request message from CU - %s",
943                              message.message.enodbName);
944             }
945             break;
946         }
947         case ProcedureCode_id_endcConfigurationUpdate: {
948             if (sendRequestToXapp(message, RIC_ENDC_CONF_UPDATE, rmrMessageBuffer, &lspan) != 0) {
949                 mdclog_write(MDCLOG_ERR, "E2 EN-DC CONFIGURATION UPDATE message failed to send to xAPP");
950             }
951             break;
952         }
953         case ProcedureCode_id_eNBConfigurationUpdate: {
954             if (sendRequestToXapp(message, RIC_ENB_CONF_UPDATE, rmrMessageBuffer, &lspan) != 0) {
955                 mdclog_write(MDCLOG_ERR, "E2 EN-BC CONFIGURATION UPDATE message failed to send to xAPP");
956             }
957             break;
958         }
959         case ProcedureCode_id_x2Removal: {
960             if (mdclog_level_get() >= MDCLOG_INFO) {
961                 mdclog_write(MDCLOG_INFO, "Got E2 Removal Initiating  message from CU - %s",
962                              message.message.enodbName);
963             }
964             break;
965         }
966         case ProcedureCode_id_loadIndication: {
967             if (sendRequestToXapp(message, RIC_ENB_LOAD_INFORMATION, rmrMessageBuffer, &lspan) != 0) {
968                 mdclog_write(MDCLOG_ERR, "Load indication message failed to send to xAPP");
969             }
970             break;
971         }
972         case ProcedureCode_id_resourceStatusReportingInitiation: {
973             if (mdclog_level_get() >= MDCLOG_INFO) {
974                 mdclog_write(MDCLOG_INFO, "Got Status reporting initiation message from CU - %s",
975                              message.message.enodbName);
976             }
977             break;
978         }
979         case ProcedureCode_id_resourceStatusReporting: {
980             if (sendRequestToXapp(message, RIC_RESOURCE_STATUS_UPDATE, rmrMessageBuffer, &lspan) != 0) {
981                 mdclog_write(MDCLOG_ERR, "Resource Status Reporting message failed to send to xAPP");
982             }
983             break;
984         }
985         case ProcedureCode_id_reset: {
986             if (sendRequestToXapp(message, RIC_X2_RESET, rmrMessageBuffer, &lspan) != 0) {
987                 mdclog_write(MDCLOG_ERR, "RIC_X2_RESET message failed to send to xAPP");
988             }
989             break;
990         }
991         case ProcedureCode_id_ricIndication: {
992             for (int i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
993                 auto messageSent = false;
994                 RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
995                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
996                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
997                 }
998                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
999                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1000                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1001                     }
1002                     if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
1003                         static unsigned char tx[32];
1004                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
1005                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1006                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1007                         rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *)message.message.enodbName, strlen(message.message.enodbName));
1008                         rmrMessageBuffer.sendMessage->state = 0;
1009                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1010                         sendRmrMessage(rmrMessageBuffer, message, &lspan);
1011                         messageSent = true;
1012                     } else {
1013                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1014                     }
1015                 }
1016                 if (messageSent) {
1017                     break;
1018                 }
1019             }
1020             break;
1021         }
1022         case ProcedureCode_id_errorIndication: {
1023             if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer, &lspan) != 0) {
1024                 mdclog_write(MDCLOG_ERR, "Error Indication message failed to send to xAPP");
1025             }
1026             break;
1027         }
1028         case ProcedureCode_id_ricServiceUpdate : {
1029             if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer, &lspan) != 0) {
1030                 mdclog_write(MDCLOG_ERR, "Service Update message failed to send to xAPP");
1031             }
1032             break;
1033         }
1034         case ProcedureCode_id_gNBStatusIndication : {
1035             if (sendRequestToXapp(message, RIC_GNB_STATUS_INDICATION, rmrMessageBuffer, &lspan) != 0) {
1036                 mdclog_write(MDCLOG_ERR, "RIC_GNB_STATUS_INDICATION failed to send to xAPP");
1037             }
1038             break;
1039         }
1040         default: {
1041             mdclog_write(MDCLOG_ERR, "Undefined or not supported message = %ld", procedureCode);
1042             message.message.messageType = 0; // no RMR message type yet
1043
1044             buildJsonMessage(message);
1045
1046             break;
1047         }
1048     }
1049 #ifdef __TRACING__
1050     lspan->Finish();
1051 #endif
1052
1053 }
1054
1055 /**
1056  *
1057  * @param pdu
1058  * @param message
1059  * @param sctpMap
1060  * @param rmrMessageBuffer
1061  * @param pSpan
1062  */
1063 void asnSuccsesfulMsg(E2AP_PDU_t *pdu, ReportingMessages_t &message, Sctp_Map_t *sctpMap,
1064                       RmrMessagesBuffer_t &rmrMessageBuffer, otSpan *pSpan) {
1065 #ifdef __TRACING__
1066     auto lspan = opentracing::Tracer::Global()->StartSpan(
1067             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1068 #else
1069     otSpan lspan = 0;
1070 #endif
1071     auto procedureCode = pdu->choice.successfulOutcome->procedureCode;
1072     if (mdclog_level_get() >= MDCLOG_INFO) {
1073         mdclog_write(MDCLOG_INFO, "Successful Outcome %ld", procedureCode);
1074     }
1075     switch (procedureCode) {
1076         case ProcedureCode_id_x2Setup: {
1077             if (mdclog_level_get() >= MDCLOG_INFO) {
1078                 mdclog_write(MDCLOG_INFO, "Got Succesful Setup response from CU - %s",
1079                              message.message.enodbName);
1080             }
1081             if (sendResponseToXapp(message, RIC_X2_SETUP_RESP,
1082                                    RIC_X2_SETUP_REQ, rmrMessageBuffer, sctpMap, &lspan) != 0) {
1083                 mdclog_write(MDCLOG_ERR, "Failed to send Succesful Setup response for CU - %s",
1084                              message.message.enodbName);
1085             }
1086             break;
1087         }
1088         case ProcedureCode_id_endcX2Setup: { //X2_EN_DC_SETUP_REQUEST_FROM_CU
1089             if (mdclog_level_get() >= MDCLOG_INFO) {
1090                 mdclog_write(MDCLOG_INFO, "Got Succesful E2 EN-DC Setup response from CU - %s",
1091                              message.message.enodbName);
1092             }
1093             if (sendResponseToXapp(message, RIC_ENDC_X2_SETUP_RESP,
1094                                    RIC_ENDC_X2_SETUP_REQ, rmrMessageBuffer, sctpMap, &lspan) != 0) {
1095                 mdclog_write(MDCLOG_ERR, "Failed to send Succesful X2 EN DC Setup response for CU - %s",
1096                              message.message.enodbName);
1097             }
1098             break;
1099         }
1100         case ProcedureCode_id_endcConfigurationUpdate: {
1101             if (mdclog_level_get() >= MDCLOG_INFO) {
1102                 mdclog_write(MDCLOG_INFO, "Got Succesful E2 EN-DC CONFIGURATION UPDATE from CU - %s",
1103                              message.message.enodbName);
1104             }
1105             if (sendRequestToXapp(message, RIC_ENDC_CONF_UPDATE_ACK, rmrMessageBuffer, &lspan) != 0) {
1106                 mdclog_write(MDCLOG_ERR, "Failed to send Succesful E2 EN DC CONFIGURATION response for CU - %s",
1107                              message.message.enodbName);
1108             }
1109             break;
1110         }
1111         case ProcedureCode_id_eNBConfigurationUpdate: {
1112             if (mdclog_level_get() >= MDCLOG_INFO) {
1113                 mdclog_write(MDCLOG_INFO, "Got Succesful E2 ENB CONFIGURATION UPDATE from CU - %s",
1114                              message.message.enodbName);
1115             }
1116             if (sendRequestToXapp(message, RIC_ENB_CONF_UPDATE_ACK, rmrMessageBuffer, &lspan) != 0) {
1117                 mdclog_write(MDCLOG_ERR, "Failed to send Succesful E2 ENB CONFIGURATION response for CU - %s",
1118                              message.message.enodbName);
1119             }
1120             break;
1121         }
1122         case ProcedureCode_id_reset: {
1123             if (sendRequestToXapp(message, RIC_X2_RESET_RESP, rmrMessageBuffer, &lspan) != 0) {
1124                 mdclog_write(MDCLOG_ERR, "Failed to send Succesful E2_RESET response for CU - %s",
1125                              message.message.enodbName);
1126             }
1127             break;
1128
1129         }
1130         case ProcedureCode_id_resourceStatusReportingInitiation: {
1131             if (sendRequestToXapp(message, RIC_RES_STATUS_RESP, rmrMessageBuffer, &lspan) != 0) {
1132                 mdclog_write(MDCLOG_ERR,
1133                              "Failed to send Succesful 2_REQUEST_STATUS_REPORTING_INITIATION response for CU - %s",
1134                              message.message.enodbName);
1135             }
1136             break;
1137         }
1138         case ProcedureCode_id_ricSubscription: {
1139             if (mdclog_level_get() >= MDCLOG_INFO) {
1140                 mdclog_write(MDCLOG_INFO, "Got Succesful RIC Subscription response from CU - %s",
1141                              message.message.enodbName);
1142             }
1143             if (sendRequestToXapp(message, RIC_SUB_RESP, rmrMessageBuffer, &lspan) != 0) {
1144                 mdclog_write(MDCLOG_ERR, "Subscription successful message failed to send to xAPP");
1145             }
1146             break;
1147
1148         }
1149         case ProcedureCode_id_ricSubscriptionDelete: {
1150             if (mdclog_level_get() >= MDCLOG_INFO) {
1151                 mdclog_write(MDCLOG_INFO,
1152                              "Got Succesful RIC Subscription Delete response from CU - %s",
1153                              message.message.enodbName);
1154             }
1155             if (sendRequestToXapp(message, RIC_SUB_DEL_RESP, rmrMessageBuffer, &lspan) != 0) {
1156                 mdclog_write(MDCLOG_ERR, "Subscription delete successful message failed to send to xAPP");
1157             }
1158             break;
1159         }
1160         case ProcedureCode_id_ricControl: {
1161             if (mdclog_level_get() >= MDCLOG_INFO) {
1162                 mdclog_write(MDCLOG_INFO,
1163                              "Got Succesful RIC control response from CU - %s",
1164                              message.message.enodbName);
1165             }
1166             for (int i = 0;
1167                  i < pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.count; i++) {
1168                 auto messageSent = false;
1169                 RICcontrolAcknowledge_IEs_t *ie = pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.array[i];
1170                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1171                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1172                 }
1173                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1174                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1175                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1176                     }
1177                     if (ie->value.present == RICcontrolAcknowledge_IEs__value_PR_RICrequestID) {
1178                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_ACK;
1179                         rmrMessageBuffer.sendMessage->state = 0;
1180                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1181                         static unsigned char tx[32];
1182                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1183                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1184                         rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *)message.message.enodbName, strlen(message.message.enodbName));
1185
1186                         sendRmrMessage(rmrMessageBuffer, message, &lspan);
1187                         messageSent = true;
1188                     } else {
1189                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1190                     }
1191                 }
1192                 if (messageSent) {
1193                     break;
1194                 }
1195             }
1196             break;
1197         }
1198         default: {
1199             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1200             message.message.messageType = 0; // no RMR message type yet
1201             buildJsonMessage(message);
1202
1203             break;
1204         }
1205     }
1206 #ifdef __TRACING__
1207     lspan->Finish();
1208 #endif
1209
1210 }
1211
1212 /**
1213  *
1214  * @param pdu
1215  * @param message
1216  * @param sctpMap
1217  * @param rmrMessageBuffer
1218  * @param pSpan
1219  */
1220 void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
1221                         ReportingMessages_t &message,
1222                         Sctp_Map_t *sctpMap,
1223                         RmrMessagesBuffer_t &rmrMessageBuffer,
1224                         otSpan *pSpan) {
1225 #ifdef __TRACING__
1226     auto lspan = opentracing::Tracer::Global()->StartSpan(
1227             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1228 #else
1229     otSpan lspan = 0;
1230 #endif
1231     auto procedureCode = pdu->choice.unsuccessfulOutcome->procedureCode;
1232     if (mdclog_level_get() >= MDCLOG_INFO) {
1233         mdclog_write(MDCLOG_INFO, "Unsuccessful Outcome %ld", procedureCode);
1234     }
1235     switch (procedureCode) {
1236         case ProcedureCode_id_x2Setup: {
1237             if (mdclog_level_get() >= MDCLOG_INFO) {
1238                 mdclog_write(MDCLOG_INFO,
1239                              "Got Unsuccessful Setup response from CU - %s",
1240                              message.message.enodbName);
1241             }
1242             if (sendResponseToXapp(message,
1243                                    RIC_X2_SETUP_FAILURE, RIC_X2_SETUP_REQ,
1244                                    rmrMessageBuffer,
1245                                    sctpMap,
1246                                    &lspan) != 0) {
1247                 mdclog_write(MDCLOG_ERR,
1248                              "Failed to send Unsuccessful Setup response for CU - %s",
1249                              message.message.enodbName);
1250                 break;
1251             }
1252             break;
1253         }
1254         case ProcedureCode_id_endcX2Setup: {
1255             if (mdclog_level_get() >= MDCLOG_INFO) {
1256                 mdclog_write(MDCLOG_INFO,
1257                              "Got Unsuccessful E2 EN-DC Setup response from CU - %s",
1258                              message.message.enodbName);
1259             }
1260             if (sendResponseToXapp(message, RIC_ENDC_X2_SETUP_FAILURE,
1261                                    RIC_ENDC_X2_SETUP_REQ,
1262                                    rmrMessageBuffer,
1263                                    sctpMap,
1264                                    &lspan) != 0) {
1265                 mdclog_write(MDCLOG_ERR, "Failed to send Unsuccessful E2 EN DC Setup response for CU - %s",
1266                              message.message.enodbName);
1267             }
1268             break;
1269         }
1270         case ProcedureCode_id_endcConfigurationUpdate: {
1271             if (sendRequestToXapp(message, RIC_ENDC_CONF_UPDATE_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1272                 mdclog_write(MDCLOG_ERR, "Failed to send Unsuccessful E2 EN DC CONFIGURATION response for CU - %s",
1273                              message.message.enodbName);
1274             }
1275             break;
1276         }
1277         case ProcedureCode_id_eNBConfigurationUpdate: {
1278             if (sendRequestToXapp(message, RIC_ENB_CONF_UPDATE_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1279                 mdclog_write(MDCLOG_ERR, "Failed to send Unsuccessful E2 ENB CONFIGURATION response for CU - %s",
1280                              message.message.enodbName);
1281             }
1282             break;
1283         }
1284         case ProcedureCode_id_resourceStatusReportingInitiation: {
1285             if (sendRequestToXapp(message, RIC_RES_STATUS_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1286                 mdclog_write(MDCLOG_ERR,
1287                              "Failed to send Succesful E2_REQUEST_STATUS_REPORTING_INITIATION response for CU - %s",
1288                              message.message.enodbName);
1289             }
1290             break;
1291         }
1292         case ProcedureCode_id_ricSubscription: {
1293             if (mdclog_level_get() >= MDCLOG_INFO) {
1294                 mdclog_write(MDCLOG_INFO, "Got Unsuccessful RIC Subscription Response from CU - %s",
1295                              message.message.enodbName);
1296             }
1297             if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1298                 mdclog_write(MDCLOG_ERR, "Subscription unsuccessful message failed to send to xAPP");
1299             }
1300             break;
1301         }
1302         case ProcedureCode_id_ricSubscriptionDelete: {
1303             if (mdclog_level_get() >= MDCLOG_INFO) {
1304                 mdclog_write(MDCLOG_INFO, "Got Unsuccessful RIC Subscription Delete Response from CU - %s",
1305                              message.message.enodbName);
1306             }
1307             if (sendRequestToXapp(message, RIC_SUB_DEL_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1308                 mdclog_write(MDCLOG_ERR, "Subscription Delete unsuccessful message failed to send to xAPP");
1309             }
1310             break;
1311         }
1312         case ProcedureCode_id_ricControl: {
1313             if (mdclog_level_get() >= MDCLOG_INFO) {
1314                 mdclog_write(MDCLOG_INFO, "Got UNSuccesful RIC control response from CU - %s",
1315                              message.message.enodbName);
1316             }
1317             for (int i = 0;
1318                  i < pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.count; i++) {
1319                 auto messageSent = false;
1320                 RICcontrolFailure_IEs_t *ie = pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.array[i];
1321                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1322                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
1323                 }
1324                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
1325                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1326                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
1327                     }
1328                     if (ie->value.present == RICcontrolFailure_IEs__value_PR_RICrequestID) {
1329                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_FAILURE;
1330                         rmrMessageBuffer.sendMessage->state = 0;
1331                         rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
1332                         static unsigned char tx[32];
1333                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1334                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
1335                         rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *)message.message.enodbName, strlen(message.message.enodbName));
1336                         sendRmrMessage(rmrMessageBuffer, message, &lspan);
1337                         messageSent = true;
1338                     } else {
1339                         mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
1340                     }
1341                 }
1342                 if (messageSent) {
1343                     break;
1344                 }
1345             }
1346             break;
1347         }
1348         default: {
1349             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
1350             message.message.messageType = 0; // no RMR message type yet
1351
1352             buildJsonMessage(message);
1353
1354             break;
1355         }
1356     }
1357 #ifdef __TRACING__
1358     lspan->Finish();
1359 #endif
1360
1361 }
1362
1363 /**
1364  *
1365  * @param message
1366  * @param requestId
1367  * @param rmrMmessageBuffer
1368  * @param pSpan
1369  * @return
1370  */
1371 int sendRequestToXapp(ReportingMessages_t &message,
1372                       int requestId,
1373                       RmrMessagesBuffer_t &rmrMmessageBuffer,
1374                       otSpan *pSpan) {
1375 #ifdef __TRACING__
1376     auto lspan = opentracing::Tracer::Global()->StartSpan(
1377             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1378 #else
1379     otSpan lspan = 0;
1380 #endif
1381     rmr_bytes2meid(rmrMmessageBuffer.sendMessage,
1382                    (unsigned char *)message.message.enodbName,
1383                    strlen(message.message.enodbName));
1384     message.message.messageType = rmrMmessageBuffer.sendMessage->mtype = requestId;
1385     rmrMmessageBuffer.sendMessage->state = 0;
1386     static unsigned char tx[32];
1387     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1388     rmr_bytes2xact(rmrMmessageBuffer.sendMessage, tx, strlen((const char *) tx));
1389
1390     auto rc = sendRmrMessage(rmrMmessageBuffer, message, &lspan);
1391 #ifdef __TRACING__
1392     lspan->Finish();
1393 #endif
1394
1395     return rc;
1396 }
1397
1398
1399 void *getRmrContext(char *rmrAddress, otSpan *pSpan) {
1400 #ifdef __TRACING__
1401     auto lspan = opentracing::Tracer::Global()->StartSpan(
1402             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1403 #else
1404 //    otSpan lspan = 0;
1405 #endif
1406     void *rmrCtx = rmr_init(rmrAddress, RMR_MAX_RCV_BYTES, RMRFL_NONE);
1407
1408
1409     if (rmrCtx == nullptr) {
1410         mdclog_write(MDCLOG_ERR, "RMR failed to initialise : %s", strerror(errno));
1411 #ifdef __TRACING__
1412         lspan->Finish();
1413 #endif
1414
1415         return (nullptr);
1416     }
1417
1418     rmr_set_stimeout(rmrCtx, 0);    // disable retries for any send operation
1419     // we need to find that routing table exist and we can run
1420     if (mdclog_level_get() >= MDCLOG_INFO) {
1421         mdclog_write(MDCLOG_INFO, "We are after RMR INIT wait for RMR_Ready");
1422     }
1423     int rmrReady = 0;
1424     int count = 0;
1425     while (!rmrReady) {
1426         if ((rmrReady = rmr_ready(rmrCtx)) == 0) {
1427             sleep(1);
1428         }
1429         count++;
1430         if (count % 60 == 0) {
1431             mdclog_write(MDCLOG_INFO, "waiting to RMR ready state for %d seconds", count);
1432         }
1433     }
1434     if (mdclog_level_get() >= MDCLOG_INFO) {
1435         mdclog_write(MDCLOG_INFO, "RMR running");
1436     }
1437 #ifdef __TRACING__
1438     lspan->Finish();
1439 #endif
1440
1441     return rmrCtx;
1442 }
1443
1444 /**
1445  *
1446  * @param epoll_fd
1447  * @param sctpMap
1448  * @param rmrMessageBuffer
1449  * @param ts
1450  * @param pSpan
1451  * @return
1452  */
1453 int receiveXappMessages(int epoll_fd,
1454                         Sctp_Map_t *sctpMap,
1455                         RmrMessagesBuffer_t &rmrMessageBuffer,
1456                         struct timespec &ts,
1457                         otSpan *pSpan) {
1458 #ifdef __TRACING__
1459     auto lspan = opentracing::Tracer::Global()->StartSpan(
1460             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1461 #else
1462     otSpan lspan = 0;
1463 #endif
1464     if (rmrMessageBuffer.rcvMessage == nullptr) {
1465         //we have error
1466         mdclog_write(MDCLOG_ERR, "RMR Allocation message, %s", strerror(errno));
1467 #ifdef __TRACING__
1468         lspan->Finish();
1469 #endif
1470
1471         return -1;
1472     }
1473
1474     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1475         mdclog_write(MDCLOG_DEBUG, "Call to rmr_rcv_msg");
1476     }
1477     rmrMessageBuffer.rcvMessage = rmr_rcv_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.rcvMessage);
1478     if (rmrMessageBuffer.rcvMessage == nullptr) {
1479         mdclog_write(MDCLOG_ERR, "RMR Receving message with null pointer, Realloc rmr mesage buffer");
1480         rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
1481 #ifdef __TRACING__
1482         lspan->Finish();
1483 #endif
1484
1485         return -2;
1486     }
1487     ReportingMessages_t message;
1488     message.message.direction = 'D';
1489     message.message.time.tv_nsec = ts.tv_nsec;
1490     message.message.time.tv_sec = ts.tv_sec;
1491
1492     // get message payload
1493     //auto msgData = msg->payload;
1494     if (rmrMessageBuffer.rcvMessage->state != 0) {
1495         mdclog_write(MDCLOG_ERR, "RMR Receving message with stat = %d", rmrMessageBuffer.rcvMessage->state);
1496 #ifdef __TRACING__
1497         lspan->Finish();
1498 #endif
1499
1500         return -1;
1501     }
1502     switch (rmrMessageBuffer.rcvMessage->mtype) {
1503         case RIC_X2_SETUP_REQ: {
1504             if (connectToCUandSetUp(rmrMessageBuffer, message, epoll_fd, sctpMap, &lspan) != 0) {
1505                 mdclog_write(MDCLOG_ERR, "ERROR in connectToCUandSetUp on RIC_X2_SETUP_REQ");
1506                 message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_SCTP_CONNECTION_FAILURE;
1507                 message.message.direction = 'N';
1508                 message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1509                         snprintf((char *)rmrMessageBuffer.sendMessage->payload,
1510                                 256,
1511                                 "ERROR in connectToCUandSetUp on RIC_X2_SETUP_REQ");
1512                 rmrMessageBuffer.sendMessage->state = 0;
1513                 message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1514
1515                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1516                     mdclog_write(MDCLOG_DEBUG, "start writing to rmr buffer");
1517                 }
1518                 rmr_bytes2xact(rmrMessageBuffer.sendMessage, rmrMessageBuffer.rcvMessage->xaction, RMR_MAX_XID);
1519                 rmr_str2meid(rmrMessageBuffer.sendMessage, (unsigned char *)message.message.enodbName);
1520
1521                 sendRmrMessage(rmrMessageBuffer, message, &lspan);
1522 #ifdef __TRACING__
1523                 lspan->Finish();
1524 #endif
1525                 return -3;
1526             }
1527             break;
1528         }
1529         case RIC_ENDC_X2_SETUP_REQ: {
1530             if (connectToCUandSetUp(rmrMessageBuffer, message, epoll_fd, sctpMap, &lspan) != 0) {
1531                 mdclog_write(MDCLOG_ERR, "ERROR in connectToCUandSetUp on RIC_ENDC_X2_SETUP_REQ");
1532                 message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_SCTP_CONNECTION_FAILURE;
1533                 message.message.direction = 'N';
1534                 message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1535                         snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
1536                                  "ERROR in connectToCUandSetUp on RIC_ENDC_X2_SETUP_REQ");
1537                 rmrMessageBuffer.sendMessage->state = 0;
1538                 message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1539
1540                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1541                     mdclog_write(MDCLOG_DEBUG, "start writing to rmr buffer");
1542                 }
1543
1544                 rmr_bytes2xact(rmrMessageBuffer.sendMessage, rmrMessageBuffer.rcvMessage->xaction, RMR_MAX_XID);
1545                 rmr_str2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName);
1546
1547                 sendRmrMessage(rmrMessageBuffer, message, &lspan);
1548 #ifdef __TRACING__
1549                 lspan->Finish();
1550 #endif
1551                 return -3;
1552             }
1553             break;
1554         }
1555         case RIC_ENDC_CONF_UPDATE: {
1556             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1557                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE");
1558 #ifdef __TRACING__
1559                 lspan->Finish();
1560 #endif
1561                 return -4;
1562             }
1563             break;
1564         }
1565         case RIC_ENDC_CONF_UPDATE_ACK: {
1566             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1567                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE_ACK");
1568 #ifdef __TRACING__
1569                 lspan->Finish();
1570 #endif
1571                 return -4;
1572             }
1573             break;
1574         }
1575         case RIC_ENDC_CONF_UPDATE_FAILURE: {
1576             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1577                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE_FAILURE");
1578 #ifdef __TRACING__
1579                 lspan->Finish();
1580 #endif
1581
1582                 return -4;
1583             }
1584             break;
1585         }
1586         case RIC_ENB_CONF_UPDATE: {
1587             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1588                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENDC_CONF_UPDATE");
1589 #ifdef __TRACING__
1590                 lspan->Finish();
1591 #endif
1592                 return -4;
1593             }
1594             break;
1595         }
1596         case RIC_ENB_CONF_UPDATE_ACK: {
1597             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1598                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENB_CONF_UPDATE_ACK");
1599 #ifdef __TRACING__
1600                 lspan->Finish();
1601 #endif
1602                 return -4;
1603             }
1604             break;
1605         }
1606         case RIC_ENB_CONF_UPDATE_FAILURE: {
1607             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1608                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ENB_CONF_UPDATE_FAILURE");
1609 #ifdef __TRACING__
1610                 lspan->Finish();
1611 #endif
1612                 return -4;
1613             }
1614             break;
1615         }
1616         case RIC_RES_STATUS_REQ: {
1617             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1618                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_RES_STATUS_REQ");
1619 #ifdef __TRACING__
1620                 lspan->Finish();
1621 #endif
1622                 return -6;
1623             }
1624             break;
1625         }
1626         case RIC_SUB_REQ: {
1627             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1628                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_REQ");
1629 #ifdef __TRACING__
1630                 lspan->Finish();
1631 #endif
1632                 return -6;
1633             }
1634             break;
1635         }
1636         case RIC_SUB_DEL_REQ: {
1637             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1638                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_DEL_REQ");
1639 #ifdef __TRACING__
1640                 lspan->Finish();
1641 #endif
1642                 return -6;
1643             }
1644             break;
1645         }
1646         case RIC_CONTROL_REQ: {
1647             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1648                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_CONTROL_REQ");
1649 #ifdef __TRACING__
1650                 lspan->Finish();
1651 #endif
1652                 return -6;
1653             }
1654             break;
1655         }
1656         case RIC_SERVICE_QUERY: {
1657             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1658                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_QUERY");
1659 #ifdef __TRACING__
1660                 lspan->Finish();
1661 #endif
1662                 return -6;
1663             }
1664             break;
1665         }
1666         case RIC_SERVICE_UPDATE_ACK: {
1667             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1668                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_ACK");
1669 #ifdef __TRACING__
1670                 lspan->Finish();
1671 #endif
1672                 return -6;
1673             }
1674             break;
1675         }
1676         case RIC_SERVICE_UPDATE_FAILURE: {
1677             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1678                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_FAILURE");
1679 #ifdef __TRACING__
1680                 lspan->Finish();
1681 #endif
1682                 return -6;
1683             }
1684             break;
1685         }
1686         case RIC_X2_RESET: {
1687             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1688                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_X2_RESET");
1689 #ifdef __TRACING__
1690                 lspan->Finish();
1691 #endif
1692                 return -6;
1693             }
1694             break;
1695         }
1696         case RIC_X2_RESET_RESP: {
1697             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap, &lspan) != 0) {
1698                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_X2_RESET_RESP");
1699 #ifdef __TRACING__
1700                 lspan->Finish();
1701 #endif
1702                 return -6;
1703             }
1704             break;
1705         }
1706         case RIC_SCTP_CLEAR_ALL: {
1707             mdclog_write(MDCLOG_INFO, "RIC_SCTP_CLEAR_ALL");
1708             // loop on all keys and close socket and then erase all map.
1709             vector<char *> v;
1710             sctpMap->getKeys(v);
1711             for (auto const &iter : v) { //}; iter != sctpMap.end(); iter++) {
1712                 if (!boost::starts_with((string) (iter), "host:") && !boost::starts_with((string) (iter), "msg:")) {
1713                     auto *peerInfo = (ConnectedCU_t *) sctpMap->find(iter);
1714                     if (peerInfo == nullptr) {
1715                         continue;
1716                     }
1717                     close(peerInfo->fileDescriptor);
1718                     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
1719                     message.message.direction = 'D';
1720                     message.message.time.tv_nsec = ts.tv_nsec;
1721                     message.message.time.tv_sec = ts.tv_sec;
1722
1723                     message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1724                             snprintf((char *)rmrMessageBuffer.sendMessage->payload,
1725                                                                    256,
1726                                                                    "%s|RIC_SCTP_CLEAR_ALL",
1727                                                                    peerInfo->enodbName);
1728                     message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1729                     mdclog_write(MDCLOG_INFO, "%s", message.message.asndata);
1730                     if (sendRequestToXapp(message,
1731                                           RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer, &lspan) != 0) {
1732                         mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
1733                     }
1734                     free(peerInfo);
1735                 }
1736             }
1737
1738             sleep(1);
1739             sctpMap->clear();
1740             break;
1741         }
1742         default:
1743             mdclog_write(MDCLOG_WARN, "Message Type : %d is not seported", rmrMessageBuffer.rcvMessage->mtype);
1744             message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
1745             message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
1746             message.message.time.tv_nsec = ts.tv_nsec;
1747             message.message.time.tv_sec = ts.tv_sec;
1748             message.message.messageType = rmrMessageBuffer.rcvMessage->mtype;
1749
1750             buildJsonMessage(message);
1751
1752
1753 #ifdef __TRACING__
1754             lspan->Finish();
1755 #endif
1756             return -7;
1757     }
1758     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1759         mdclog_write(MDCLOG_DEBUG, "EXIT OK from %s", __FUNCTION__);
1760     }
1761 #ifdef __TRACING__
1762     lspan->Finish();
1763 #endif
1764     return 0;
1765 }
1766
1767 /**
1768  * Send message to the CU that is not expecting for successful or unsuccessful results
1769  * @param messageBuffer
1770  * @param message
1771  * @param failedMsgId
1772  * @param sctpMap
1773  * @param pSpan
1774  * @return
1775  */
1776 int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer,
1777                            ReportingMessages_t &message,
1778                            int failedMsgId,
1779                            Sctp_Map_t *sctpMap,
1780                            otSpan *pSpan) {
1781 #ifdef __TRACING__
1782     auto lspan = opentracing::Tracer::Global()->StartSpan(
1783             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1784 #else
1785     otSpan lspan = 0;
1786 #endif
1787
1788     getRequestMetaData(message, messageBuffer, &lspan);
1789     if (mdclog_level_get() >= MDCLOG_INFO) {
1790         mdclog_write(MDCLOG_INFO, "send message to %s address", message.message.enodbName);
1791     }
1792
1793     auto rc = sendMessagetoCu(sctpMap, messageBuffer, message, failedMsgId, &lspan);
1794 #ifdef __TRACING__
1795     lspan->Finish();
1796 #endif
1797
1798     return rc;
1799 }
1800
1801 /**
1802  *
1803  * @param sctpMap
1804  * @param messageBuffer
1805  * @param message
1806  * @param failedMesgId
1807  * @param pSpan
1808  * @return
1809  */
1810 int sendMessagetoCu(Sctp_Map_t *sctpMap,
1811                     RmrMessagesBuffer_t &messageBuffer,
1812                     ReportingMessages_t &message,
1813                     int failedMesgId,
1814                     otSpan *pSpan) {
1815 #ifdef __TRACING__
1816     auto lspan = opentracing::Tracer::Global()->StartSpan(
1817             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1818 #else
1819     otSpan lspan = 0;
1820 #endif
1821     auto *peerInfo = (ConnectedCU_t *) sctpMap->find(message.message.enodbName);
1822     if (peerInfo == nullptr) {
1823         if (failedMesgId != 0) {
1824             sendFailedSendingMessagetoXapp(messageBuffer, message, failedMesgId, &lspan);
1825         } else {
1826             mdclog_write(MDCLOG_ERR, "Failed to send message no CU entry %s", message.message.enodbName);
1827         }
1828 #ifdef __TRACING__
1829         lspan->Finish();
1830 #endif
1831
1832         return -1;
1833     }
1834
1835     // get the FD
1836     message.message.messageType = messageBuffer.rcvMessage->mtype;
1837     auto rc = sendSctpMsg(peerInfo, message, sctpMap, &lspan);
1838 #ifdef __TRACING__
1839     lspan->Finish();
1840 #endif
1841
1842     return rc;
1843 }
1844
1845 /**
1846  *
1847  * @param rmrCtx the rmr context to send and receive
1848  * @param msg the msg we got fromxApp
1849  * @param metaData data from xApp in ordered struct
1850  * @param failedMesgId the return message type error
1851  */
1852 void
1853 sendFailedSendingMessagetoXapp(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, int failedMesgId,
1854                                otSpan *pSpan) {
1855 #ifdef __TRACING__
1856     auto lspan = opentracing::Tracer::Global()->StartSpan(
1857             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1858 #else
1859     otSpan lspan = 0;
1860 #endif
1861     rmr_mbuf_t *msg = rmrMessageBuffer.sendMessage;
1862     msg->len = snprintf((char *) msg->payload, 200, "the gNb/eNode name %s not found",
1863                         message.message.enodbName);
1864     if (mdclog_level_get() >= MDCLOG_INFO) {
1865         mdclog_write(MDCLOG_INFO, "%s", msg->payload);
1866     }
1867     msg->mtype = failedMesgId;
1868     msg->state = 0;
1869
1870     static unsigned char tx[32];
1871     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1872     rmr_bytes2xact(msg, tx, strlen((const char *) tx));
1873
1874     sendRmrMessage(rmrMessageBuffer, message, &lspan);
1875 #ifdef __TRACING__
1876     lspan->Finish();pLogSink
1877 #endif
1878
1879 }
1880
1881 /**
1882  * Send Response back to xApp, message is used only when there was a request from the xApp
1883  *
1884  * @param enodbName the name of the gNb/eNodeB
1885  * @param msgType  the value of the message to the xApp
1886  * @param requestType The request that was sent by the xAPP
1887  * @param rmrCtx the rmr identifier
1888  * @param sctpMap hash map holds data on the requestrs
1889  * @param buf  the buffer to send to xAPP
1890  * @param size size of the buffer to send
1891  * @return
1892  */
1893 int sendResponseToXapp(ReportingMessages_t &message,
1894                        int msgType,
1895                        int requestType,
1896                        RmrMessagesBuffer_t &rmrMessageBuffer,
1897                        Sctp_Map_t *sctpMap,
1898                        otSpan *pSpan) {
1899 #ifdef __TRACING__
1900     auto lspan = opentracing::Tracer::Global()->StartSpan(
1901             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1902 #else
1903     otSpan lspan = 0;
1904 #endif
1905     char key[MAX_ENODB_NAME_SIZE * 2];
1906     snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName, requestType);
1907
1908     auto xact = sctpMap->find(key);
1909     if (xact == nullptr) {
1910         mdclog_write(MDCLOG_ERR, "NO Request %s found for this response from CU: %s", key,
1911                      message.message.enodbName);
1912 #ifdef __TRACING__
1913         lspan->Finish();
1914 #endif
1915
1916         return -1;
1917     }
1918     sctpMap->erase(key);
1919
1920     message.message.messageType = rmrMessageBuffer.sendMessage->mtype = msgType; //SETUP_RESPONSE_MESSAGE_TYPE;
1921     rmr_bytes2payload(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.asndata,
1922                       message.message.asnLength);
1923     rmr_bytes2xact(rmrMessageBuffer.sendMessage, (const unsigned char *)xact, strlen((const char *)xact));
1924     rmr_str2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName);
1925     rmrMessageBuffer.sendMessage->state = 0;
1926
1927     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1928         mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
1929     }
1930     free(xact);
1931
1932     auto rc = sendRmrMessage(rmrMessageBuffer, message, &lspan);
1933 #ifdef __TRACING__
1934     lspan->Finish();
1935 #endif
1936     return rc;
1937 }
1938
1939 /**
1940  * build the SCTP connection to eNodB or gNb
1941  * @param rmrMessageBuffer
1942  * @param message
1943  * @param epoll_fd
1944  * @param sctpMap
1945  * @param pSpan
1946  * @return
1947  */
1948 int connectToCUandSetUp(RmrMessagesBuffer_t &rmrMessageBuffer,
1949                         ReportingMessages_t &message,
1950                         int epoll_fd,
1951                         Sctp_Map_t *sctpMap,
1952                         otSpan *pSpan) {
1953 #ifdef __TRACING__
1954     auto lspan = opentracing::Tracer::Global()->StartSpan(
1955             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
1956 #else
1957     otSpan lspan = 0;
1958 #endif
1959     struct sockaddr_in6 servaddr{};
1960     struct addrinfo hints{}, *result;
1961     auto msgData = rmrMessageBuffer.rcvMessage->payload;
1962     unsigned char meid[RMR_MAX_MEID]{};
1963     char host[256]{};
1964     uint16_t port = 0;
1965
1966     message.message.messageType = rmrMessageBuffer.rcvMessage->mtype;
1967     rmr_mbuf_t *msg = rmrMessageBuffer.rcvMessage;
1968     rmr_get_meid(msg, meid);
1969
1970     if (mdclog_level_get() >= MDCLOG_INFO) {
1971         mdclog_write(MDCLOG_INFO, "message %d Received for MEID :%s. SETUP/EN-DC Setup Request from xApp, Message = %s",
1972                      msg->mtype, meid, msgData);
1973     }
1974     if (getSetupRequestMetaData(message, (char *)msgData, host, port, &lspan) < 0) {
1975         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1976             mdclog_write(MDCLOG_DEBUG, "Error in setup parameters %s, %d", __func__, __LINE__);
1977         }
1978 #ifdef __TRACING__
1979         lspan->Finish();
1980 #endif
1981         return -1;
1982     }
1983
1984     //// message asndata points to the start of the asndata of the message and not to start of payload
1985     // search if the same host:port but not the same enodbname
1986     char searchBuff[256]{};
1987     snprintf(searchBuff, sizeof searchBuff, "host:%s:%d", host, port);
1988     auto e = (char *)sctpMap->find(searchBuff);
1989     if (e != nullptr) {
1990         // found one compare if not the same
1991         if (strcmp(message.message.enodbName, e) != 0) {
1992             mdclog_write(MDCLOG_ERR,
1993                          "Try to connect CU %s to Host %s but %s already connected",
1994                          message.message.enodbName, host, e);
1995 #ifdef __TRACING__
1996             lspan->Finish();
1997 #endif
1998             return -1;
1999         }
2000     }
2001
2002     // check if not alread connected. if connected send the request and return
2003     auto *peerInfo = (ConnectedCU_t *)sctpMap->find(message.message.enodbName);
2004     if (peerInfo != nullptr) {
2005 //        snprintf(strErr,
2006 //                128,
2007 //                "Device %s already connected please remove and then setup again",
2008 //                message.message.enodbName);
2009         if (mdclog_level_get() >= MDCLOG_INFO) {
2010             mdclog_write(MDCLOG_INFO,
2011                          "Device already connected to %s",
2012                          message.message.enodbName);
2013         }
2014         message.message.messageType = msg->mtype;
2015         auto rc = sendSctpMsg(peerInfo, message, sctpMap, &lspan);
2016         if (rc != 0) {
2017             mdclog_write(MDCLOG_ERR, "failed write to SCTP %s, %d", __func__, __LINE__);
2018 #ifdef __TRACING__
2019             lspan->Finish();
2020 #endif
2021             return -1;
2022         }
2023
2024         char key[MAX_ENODB_NAME_SIZE * 2];
2025         snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName, msg->mtype);
2026         int xaction_len = strlen((const char *) msg->xaction);
2027         auto *xaction = (unsigned char *) calloc(1, xaction_len);
2028         memcpy(xaction, msg->xaction, xaction_len);
2029         sctpMap->setkey(key, xaction);
2030         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2031             mdclog_write(MDCLOG_DEBUG, "set key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2032         }
2033 #ifdef __TRACING__
2034         lspan->Finish();
2035 #endif
2036         return 0;
2037     }
2038
2039     peerInfo = (ConnectedCU_t *) calloc(1, sizeof(ConnectedCU_t));
2040     memcpy(peerInfo->enodbName, message.message.enodbName, sizeof(message.message.enodbName));
2041
2042     // new connection
2043     if ((peerInfo->fileDescriptor = socket(AF_INET6, SOCK_STREAM, IPPROTO_SCTP)) < 0) {
2044         mdclog_write(MDCLOG_ERR, "Socket Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2045 #ifdef __TRACING__
2046         lspan->Finish();
2047 #endif
2048         return -1;
2049     }
2050
2051     auto optval = 1;
2052     if (setsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof optval) != 0) {
2053         mdclog_write(MDCLOG_ERR, "setsockopt SO_REUSEPORT Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2054 #ifdef __TRACING__
2055         lspan->Finish();
2056 #endif
2057         return -1;
2058     }
2059     optval = 1;
2060     if (setsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval) != 0) {
2061         mdclog_write(MDCLOG_ERR, "setsockopt SO_REUSEADDR Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2062 #ifdef __TRACING__
2063         lspan->Finish();
2064 #endif
2065         return -1;
2066     }
2067     servaddr.sin6_family = AF_INET6;
2068
2069     struct sockaddr_in6 localAddr {};
2070     localAddr.sin6_family = AF_INET6;
2071     localAddr.sin6_addr = in6addr_any;
2072     localAddr.sin6_port = htons(SRC_PORT);
2073
2074     if (bind(peerInfo->fileDescriptor, (struct sockaddr*)&localAddr , sizeof(struct sockaddr_in6)) < 0) {
2075         mdclog_write(MDCLOG_ERR, "bind Socket Error, %s %s, %d", strerror(errno), __func__, __LINE__);
2076 #ifdef __TRACING__
2077         lspan->Finish();
2078 #endif
2079         return -1;
2080     }//Ends the binding.
2081
2082     memset(&hints, 0, sizeof hints);
2083     hints.ai_flags = AI_NUMERICHOST;
2084     if (getaddrinfo(host, nullptr, &hints, &result) < 0) {
2085         close(peerInfo->fileDescriptor);
2086         mdclog_write(MDCLOG_ERR, "getaddrinfo error for %s, Error = %s", host, strerror(errno));
2087 #ifdef __TRACING__
2088         lspan->Finish();
2089 #endif
2090         return -1;
2091     }
2092     memcpy(&servaddr, result->ai_addr, sizeof(struct sockaddr_in6));
2093     freeaddrinfo(result);
2094
2095     servaddr.sin6_port = htons(port);      /* daytime server */
2096     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2097         mdclog_write(MDCLOG_DEBUG, "Send Connect FD = %d host : %s port %d",
2098                      peerInfo->fileDescriptor,
2099                      host,
2100                      port);
2101     }
2102
2103     // Add to Epol
2104     if (addToEpoll(epoll_fd, peerInfo, (EPOLLOUT | EPOLLIN | EPOLLET), sctpMap, message.message.enodbName,
2105                    msg->mtype, &lspan) != 0) {
2106 #ifdef __TRACING__
2107         lspan->Finish();
2108 #endif
2109         return -1;
2110     }
2111
2112     char hostBuff[NI_MAXHOST];
2113     char portBuff[NI_MAXHOST];
2114
2115     if (getnameinfo((SA *) &servaddr, sizeof(servaddr),
2116                     hostBuff, sizeof(hostBuff),
2117                     portBuff, sizeof(portBuff),
2118                     (uint) (NI_NUMERICHOST) | (uint) (NI_NUMERICSERV)) != 0) {
2119         mdclog_write(MDCLOG_ERR, "getnameinfo() Error, %s  %s %d", strerror(errno), __func__, __LINE__);
2120 #ifdef __TRACING__
2121         lspan->Finish();
2122 #endif
2123         return -1;
2124     }
2125
2126     if (setSocketNoBlocking(peerInfo->fileDescriptor) != 0) {
2127         mdclog_write(MDCLOG_ERR, "setSocketNoBlocking failed to set new connection %s on sctpPort %s", hostBuff,
2128                      portBuff);
2129         close(peerInfo->fileDescriptor);
2130 #ifdef __TRACING__
2131         lspan->Finish();
2132 #endif
2133         return -1;
2134     }
2135
2136     memcpy(peerInfo->hostName, hostBuff, strlen(hostBuff));
2137     peerInfo->hostName[strlen(hostBuff)] = 0;
2138     memcpy(peerInfo->portNumber, portBuff, strlen(portBuff));
2139     peerInfo->portNumber[strlen(portBuff)] = 0;
2140
2141     // map by enoodb/gnb name
2142     sctpMap->setkey(message.message.enodbName, peerInfo);
2143     //map host and port to enodeb
2144     sctpMap->setkey(searchBuff, message.message.enodbName);
2145
2146     // save message for the return values
2147     char key[MAX_ENODB_NAME_SIZE * 2];
2148     snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName, msg->mtype);
2149     int xaction_len = strlen((const char *) msg->xaction);
2150     auto *xaction = (unsigned char *) calloc(1, xaction_len);
2151     memcpy(xaction, msg->xaction, xaction_len);
2152     sctpMap->setkey(key, xaction);
2153     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2154         mdclog_write(MDCLOG_DEBUG, "End building peerinfo: %s for CU %s", key, message.message.enodbName);
2155     }
2156
2157     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2158         mdclog_write(MDCLOG_DEBUG, "Send connect to FD %d, %s, %d",
2159                      peerInfo->fileDescriptor, __func__, __LINE__);
2160     }
2161     if (connect(peerInfo->fileDescriptor, (SA *) &servaddr, sizeof(servaddr)) < 0) {
2162         if (errno != EINPROGRESS) {
2163             mdclog_write(MDCLOG_ERR, "connect FD %d to host : %s port %d, %s",
2164                          peerInfo->fileDescriptor, host, port, strerror(errno));
2165             close(peerInfo->fileDescriptor);
2166 #ifdef __TRACING__
2167             lspan->Finish();
2168 #endif
2169             return -1;
2170         }
2171         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2172             mdclog_write(MDCLOG_DEBUG,
2173                          "Connect to FD %d returned with EINPROGRESS : %s",
2174                          peerInfo->fileDescriptor, strerror(errno));
2175         }
2176         // since message.message.asndata is pointing to the asndata in the rmr message payload we copy it like this
2177         memcpy(peerInfo->asnData, message.message.asndata, message.message.asnLength);
2178         peerInfo->asnLength = message.message.asnLength;
2179         peerInfo->mtype = msg->mtype;
2180 #ifdef __TRACING__
2181         lspan->Finish();
2182 #endif
2183         return 0;
2184     }
2185
2186     if (mdclog_level_get() >= MDCLOG_INFO) {
2187         mdclog_write(MDCLOG_INFO, "Connect to FD %d returned OK without EINPROGRESS", peerInfo->fileDescriptor);
2188     }
2189
2190     peerInfo->isConnected = true;
2191
2192     if (modifyToEpoll(epoll_fd, peerInfo, (EPOLLIN | EPOLLET), sctpMap, message.message.enodbName, msg->mtype,
2193                       &lspan) != 0) {
2194 #ifdef __TRACING__
2195         lspan->Finish();
2196 #endif
2197         return -1;
2198     }
2199
2200     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2201         mdclog_write(MDCLOG_DEBUG, "Connected to host : %s port %d", host, port);
2202     }
2203
2204     message.message.messageType = msg->mtype;
2205     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2206         mdclog_write(MDCLOG_DEBUG, "Send SCTP message to FD %d", peerInfo->fileDescriptor);
2207     }
2208     if (sendSctpMsg(peerInfo, message, sctpMap, &lspan) != 0) {
2209         mdclog_write(MDCLOG_ERR, "Error write to SCTP  %s %d", __func__, __LINE__);
2210 #ifdef __TRACING__
2211         lspan->Finish();
2212 #endif
2213         return -1;
2214     }
2215     memset(peerInfo->asnData, 0, message.message.asnLength);
2216     peerInfo->asnLength = 0;
2217     peerInfo->mtype = 0;
2218
2219     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2220         mdclog_write(MDCLOG_DEBUG, "Sent message to SCTP for %s", message.message.enodbName);
2221     }
2222 #ifdef __TRACING__
2223     lspan->Finish();
2224 #endif
2225     return 0;
2226 }
2227
2228 /**
2229  *
2230  * @param epoll_fd
2231  * @param peerInfo
2232  * @param events
2233  * @param sctpMap
2234  * @param enodbName
2235  * @param msgType
2236  * @param pSpan
2237  * @return
2238  */
2239 int addToEpoll(int epoll_fd,
2240                ConnectedCU_t *peerInfo,
2241                uint32_t events,
2242                Sctp_Map_t *sctpMap,
2243                char *enodbName,
2244                int msgType,
2245                otSpan *pSpan) {
2246 #ifdef __TRACING__
2247     auto lspan = opentracing::Tracer::Global()->StartSpan(
2248             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2249 #else
2250     otSpan lspan = 0;
2251 #endif
2252     // Add to Epol
2253     struct epoll_event event{};
2254     event.data.ptr = peerInfo;
2255     event.events = events;
2256     if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peerInfo->fileDescriptor, &event) < 0) {
2257         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2258             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here), %s, %s %d",
2259                          strerror(errno), __func__, __LINE__);
2260         }
2261         close(peerInfo->fileDescriptor);
2262         cleanHashEntry(peerInfo, sctpMap, &lspan);
2263         char key[MAX_ENODB_NAME_SIZE * 2];
2264         snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2265         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2266             mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2267         }
2268         auto tmp = sctpMap->find(key);
2269         if (tmp) {
2270             free(tmp);
2271         }
2272         sctpMap->erase(key);
2273         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here)");
2274 #ifdef __TRACING__
2275         lspan->Finish();
2276 #endif
2277         return -1;
2278     }
2279 #ifdef __TRACING__
2280     lspan->Finish();
2281 #endif
2282     return 0;
2283 }
2284
2285 /**
2286  *
2287  * @param epoll_fd
2288  * @param peerInfo
2289  * @param events
2290  * @param sctpMap
2291  * @param enodbName
2292  * @param msgType
2293  * @param pSpan
2294  * @return
2295  */
2296 int modifyToEpoll(int epoll_fd,
2297                   ConnectedCU_t *peerInfo,
2298                   uint32_t events,
2299                   Sctp_Map_t *sctpMap,
2300                   char *enodbName,
2301                   int msgType,
2302                   otSpan *pSpan) {
2303 #ifdef __TRACING__
2304     auto lspan = opentracing::Tracer::Global()->StartSpan(
2305             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2306 #else
2307     otSpan lspan = 0;
2308 #endif
2309     // Add to Epol
2310     struct epoll_event event{};
2311     event.data.ptr = peerInfo;
2312     event.events = events;
2313     if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, peerInfo->fileDescriptor, &event) < 0) {
2314         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2315             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_MOD (may chack not to quit here), %s, %s %d",
2316                          strerror(errno), __func__, __LINE__);
2317         }
2318         close(peerInfo->fileDescriptor);
2319         cleanHashEntry(peerInfo, sctpMap, &lspan);
2320         char key[MAX_ENODB_NAME_SIZE * 2];
2321         snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2322         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2323             mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2324         }
2325         auto tmp = sctpMap->find(key);
2326         if (tmp) {
2327             free(tmp);
2328         }
2329         sctpMap->erase(key);
2330         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may chack not to quit here)");
2331 #ifdef __TRACING__
2332         lspan->Finish();
2333 #endif
2334         return -1;
2335     }
2336 #ifdef __TRACING__
2337     lspan->Finish();
2338 #endif
2339     return 0;
2340 }
2341
2342
2343 int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, otSpan *pSpan) {
2344 #ifdef __TRACING__
2345     auto lspan = opentracing::Tracer::Global()->StartSpan(
2346             __FUNCTION__, { opentracing::ChildOf(&pSpan->get()->context()) });
2347 #else
2348 //    otSpan lspan = 0;
2349 #endif
2350     //serialize the span
2351 #ifdef __TRACING__
2352     std::unordered_map<std::string, std::string> data;
2353     RICCarrierWriter carrier(data);
2354     opentracing::Tracer::Global()->Inject((lspan.get())->context(), carrier);
2355     nlohmann::json j = data;
2356     std::string str = j.dump();
2357     static auto maxTraceLength = 0;
2358
2359     maxTraceLength = str.length() > maxTraceLength ? str.length() : maxTraceLength;
2360     // serialized context can be put to RMR message using function:
2361     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2362         mdclog_write(MDCLOG_DEBUG, "max trace length is %d trace data length = %ld data = %s", maxTraceLength,
2363                      str.length(), str.c_str());
2364     }
2365     rmr_set_trace(rmrMessageBuffer.sendMessage, (const unsigned char *) str.c_str(), str.length());
2366 #endif
2367     buildJsonMessage(message);
2368
2369     rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2370
2371     if (rmrMessageBuffer.sendMessage == nullptr) {
2372         rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2373         mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2374 #ifdef __TRACING__
2375         lspan->Finish();
2376 #endif
2377         return -1;
2378     }
2379
2380     if (rmrMessageBuffer.sendMessage->state != 0) {
2381         char meid[RMR_MAX_MEID]{};
2382         if (rmrMessageBuffer.sendMessage->state == RMR_ERR_RETRY) {
2383             usleep(5);
2384             rmrMessageBuffer.sendMessage->state = 0;
2385             mdclog_write(MDCLOG_INFO, "RETRY sending Message type %d to Xapp from %s",
2386                          rmrMessageBuffer.sendMessage->mtype,
2387                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2388             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2389             if (rmrMessageBuffer.sendMessage == nullptr) {
2390                 mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
2391                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2392 #ifdef __TRACING__
2393                 lspan->Finish();
2394 #endif
2395                 return -1;
2396             } else if (rmrMessageBuffer.sendMessage->state != 0) {
2397                 mdclog_write(MDCLOG_ERR,
2398                              "Message state %s while sending request %d to Xapp from %s after retry of 10 microseconds",
2399                              translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2400                              rmrMessageBuffer.sendMessage->mtype,
2401                              rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2402                 auto rc = rmrMessageBuffer.sendMessage->state;
2403 #ifdef __TRACING__
2404                 lspan->Finish();
2405 #endif
2406                 return rc;
2407             }
2408         } else {
2409             mdclog_write(MDCLOG_ERR, "Message state %s while sending request %d to Xapp from %s",
2410                          translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
2411                          rmrMessageBuffer.sendMessage->mtype,
2412                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
2413 #ifdef __TRACING__
2414             lspan->Finish();
2415 #endif
2416             return rmrMessageBuffer.sendMessage->state;
2417         }
2418     }
2419     return 0;
2420 }
2421
2422 void buildJsonMessage(ReportingMessages_t &message) {
2423         message.outLen = sizeof(message.base64Data);
2424     base64::encode((const unsigned char *)message.message.asndata,
2425                    (const int)message.message.asnLength,
2426                   message.base64Data,
2427                   message.outLen);
2428     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2429         mdclog_write(MDCLOG_DEBUG, "asn data length = %d, base64 message length = %d ",
2430                      (int)message.message.asnLength,
2431                      (int)message.outLen);
2432     }
2433
2434 //    char buff[256];
2435 //    // build day time to seconds from epoc
2436 //    strftime(buff, sizeof message.message.time, "%D %T", gmtime(&message.message.time.tv_sec));
2437 //    // add nanosecond
2438 //    snprintf(buff, sizeof buff, "%s.%09ld UTC\n", buff, message.message.time.tv_nsec);
2439
2440     message.bufferLen = snprintf(message.buffer, sizeof(message.buffer),
2441             "{\"header\": {\"ts\": \"%ld.%09ld\","
2442             "\"ranName\": \"%s\","
2443             "\"messageType\": %d,"
2444             "\"direction\": \"%c\"},"
2445             "\"base64Length\": %d,"
2446             "\"asnBase64\": \"%s\"}",
2447             message.message.time.tv_sec,
2448             message.message.time.tv_nsec,
2449             message.message.enodbName,
2450             message.message.messageType,
2451             message.message.direction,
2452             (int)message.outLen,
2453             message.base64Data);
2454     static src::logger_mt& lg = my_logger::get();
2455
2456     BOOST_LOG(lg) << message.buffer;
2457
2458 }
2459
2460
2461 /**
2462  * take RMR error code to string
2463  * @param state
2464  * @return
2465  */
2466 string translateRmrErrorMessages(int state) {
2467     string str = {};
2468     switch (state) {
2469         case RMR_OK:
2470             str = "RMR_OK - state is good";
2471             break;
2472         case RMR_ERR_BADARG:
2473             str = "RMR_ERR_BADARG - argument passd to function was unusable";
2474             break;
2475         case RMR_ERR_NOENDPT:
2476             str = "RMR_ERR_NOENDPT - send//call could not find an endpoint based on msg type";
2477             break;
2478         case RMR_ERR_EMPTY:
2479             str = "RMR_ERR_EMPTY - msg received had no payload; attempt to send an empty message";
2480             break;
2481         case RMR_ERR_NOHDR:
2482             str = "RMR_ERR_NOHDR - message didn't contain a valid header";
2483             break;
2484         case RMR_ERR_SENDFAILED:
2485             str = "RMR_ERR_SENDFAILED - send failed; errno has nano reason";
2486             break;
2487         case RMR_ERR_CALLFAILED:
2488             str = "RMR_ERR_CALLFAILED - unable to send call() message";
2489             break;
2490         case RMR_ERR_NOWHOPEN:
2491             str = "RMR_ERR_NOWHOPEN - no wormholes are open";
2492             break;
2493         case RMR_ERR_WHID:
2494             str = "RMR_ERR_WHID - wormhole id was invalid";
2495             break;
2496         case RMR_ERR_OVERFLOW:
2497             str = "RMR_ERR_OVERFLOW - operation would have busted through a buffer/field size";
2498             break;
2499         case RMR_ERR_RETRY:
2500             str = "RMR_ERR_RETRY - request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)";
2501             break;
2502         case RMR_ERR_RCVFAILED:
2503             str = "RMR_ERR_RCVFAILED - receive failed (hard error)";
2504             break;
2505         case RMR_ERR_TIMEOUT:
2506             str = "RMR_ERR_TIMEOUT - message processing call timed out";
2507             break;
2508         case RMR_ERR_UNSET:
2509             str = "RMR_ERR_UNSET - the message hasn't been populated with a transport buffer";
2510             break;
2511         case RMR_ERR_TRUNC:
2512             str = "RMR_ERR_TRUNC - received message likely truncated";
2513             break;
2514         case RMR_ERR_INITFAILED:
2515             str = "RMR_ERR_INITFAILED - initialisation of something (probably message) failed";
2516             break;
2517         case RMR_ERR_NOTSUPP:
2518             str = "RMR_ERR_NOTSUPP - the request is not supported, or RMr was not initialised for the request";
2519             break;
2520         default:
2521             char buf[128]{};
2522             snprintf(buf, sizeof buf, "UNDOCUMENTED RMR_ERR : %d", state);
2523             str = buf;
2524             break;
2525     }
2526     return str;
2527 }
2528
2529