3d00a62f236890f3dce3acfd7b81ba7ebb368393
[ric-plt/e2.git] / RIC-E2-TERMINATION / sctpThread.cpp
1 // Copyright 2019 AT&T Intellectual Property
2 // Copyright 2019 Nokia
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //      http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15
16 //  This source code is part of the near-RT RIC (RAN Intelligent Controller)
17 //  platform project (RICP).
18
19 // TODO: High-level file comment.
20
21
22
23 #include <3rdparty/oranE2/RANfunctions-List.h>
24 #include "sctpThread.h"
25 #include "BuildRunName.h"
26 #include <unistd.h>
27 //#include "3rdparty/oranE2SM/E2SM-gNB-NRT-RANfunction-Definition.h"
28 //#include "BuildXml.h"
29 //#include "pugixml/src/pugixml.hpp"
30 #include <pthread.h>
31 #include <sys/time.h>
32 #include <sys/inotify.h>
33 #include <errno.h>
34 #include <sys/stat.h>
35
36 using namespace std;
37 //using namespace std::placeholders;
38 using namespace boost::filesystem;
39 using namespace prometheus;
40
41
42 //#ifdef __cplusplus
43 //extern "C"
44 //{
45 //#endif
46
47 // need to expose without the include of gcov
48 extern "C" void __gcov_flush(void);
49 #define LOG_FILE_CONFIG_MAP "CONFIG_MAP_NAME"
50
51 static void catch_function(int signal) {
52     __gcov_flush();
53     exit(signal);
54 }
55
56
57 BOOST_LOG_INLINE_GLOBAL_LOGGER_DEFAULT(my_logger, src::logger_mt)
58
59 boost::shared_ptr<sinks::synchronous_sink<sinks::text_file_backend>> boostLogger;
60 double cpuClock = 0.0;
61 bool jsonTrace = false;
62
63
64 static int enable_log_change_notify(const char* fileName)
65 {
66     int ret = -1;
67     struct stat fileInfo;
68     if ( lstat(fileName,&fileInfo) == 0 )
69     {
70         ret = register_log_change_notify(fileName);
71     }
72     return ret;
73 }
74
75
76 static int register_log_change_notify(const char *fileName)
77 {
78     pthread_attr_t cb_attr;
79     pthread_t tid;
80     pthread_attr_init(&cb_attr);
81     pthread_attr_setdetachstate(&cb_attr,PTHREAD_CREATE_DETACHED);
82     return pthread_create(&tid, &cb_attr,&monitor_loglevel_change_handler,(void *)fileName);
83 }
84
85
86 static void * monitor_loglevel_change_handler(void* arg)
87 {
88     char *fileName = (char*) arg;
89     int ifd;                   // the inotify file des
90     int wfd;                   // the watched file des
91     ssize_t n = 0;
92     char rbuf[4096];           // large read buffer as the event is var len
93     fd_set fds;
94     int res = 0;
95     struct timeval timeout;
96     char* dname=NULL;          // directory name
97     char* bname = NULL;        // basename
98     char* tok=NULL;
99     char* log_level=NULL;
100
101     dname = strdup( fileName); // defrock the file name into dir and basename
102     if( (tok = strrchr( dname, '/' )) != NULL ) {
103         *tok = '\0';
104         bname = strdup( tok+1 );
105     }
106
107
108     ifd = inotify_init1( 0 ); // initialise watcher setting blocking read (no option)
109     if( ifd < 0 ) {
110         fprintf( stderr, "### ERR ### unable to initialise file watch %s\n", strerror( errno ) );
111     } else {
112         wfd = inotify_add_watch( ifd, dname, IN_MOVED_TO | IN_CLOSE_WRITE ); // we only care about close write changes
113
114         if( wfd < 0 ) {
115             fprintf( stderr, "### ERR ### unable to add watch on config file %s: %s\n", fileName, strerror( errno ) );
116         } else {
117            
118
119             memset( &timeout, 0, sizeof(timeout) );
120             while( 1 ) {
121                 FD_ZERO (&fds);
122                 FD_SET (ifd, &fds);
123                 timeout.tv_sec=1;
124                 res = select (ifd + 1, &fds, NULL, NULL, &timeout);
125                 if(res)
126                 {
127                     n = read( ifd, rbuf, sizeof( rbuf ) ); // read the event
128                     if( n < 0  ) {
129 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))                        
130                         if( errno == EAGAIN ) {
131                         } else {
132                             fprintf( stderr, "### CRIT ### config listener read err: %s\n", strerror( errno ) );
133                         }
134                         continue;
135 #endif                        
136                     }
137
138                     //Retrieving Log Level from configmap by parsing configmap file
139                     log_level = parse_file(fileName);
140                     update_mdc_log_level_severity(log_level); //setting log level
141                     free(log_level);
142                 }
143             }
144             inotify_rm_watch(ifd,wfd);
145         }
146         close(ifd);
147     }
148     free(bname);
149     free(dname);
150
151     pthread_exit(NULL);
152 }
153
154 void  update_mdc_log_level_severity(char* log_level)
155 {
156     mdclog_severity_t level = MDCLOG_ERR;
157
158     if(log_level == NULL)
159     {
160         printf("### ERR ### Invalid Log-Level Configuration in ConfigMap, Default Log-Level Applied:   %d\n",level);
161     }
162     else if(strcasecmp(log_level,"1")==0)
163     {
164         level = MDCLOG_ERR;
165     }
166     else if(strcasecmp(log_level,"2")==0)
167     {
168         level = MDCLOG_WARN;
169     }
170     else if(strcasecmp(log_level,"3")==0)
171     {
172         level = MDCLOG_INFO;
173     }
174     else if(strcasecmp(log_level,"4")==0)
175     {
176         level = MDCLOG_DEBUG;
177     }
178
179     mdclog_level_set(level);
180 }
181 static char* parse_file(char* filename)
182 {
183     char *token=NULL;
184     char *search = ": ";
185     char *string_match = "log-level";
186     bool found = false;
187     FILE *file = fopen ( filename, "r" );
188     if ( file != NULL )
189     {
190         char line [ 128 ];
191         while ( fgets ( line, sizeof line, file ) != NULL )
192         {
193             token = strtok(line, search);
194             if(strcmp(token,string_match)==0)
195             {
196                 found = true;
197                 token = strtok(NULL, search);
198                 token = strtok(token, "\n");//removing newline if any
199                 break;
200             }
201         }
202         fclose ( file );
203      }
204      if(found)
205          return(strdup(token));
206      else
207          return(NULL);
208 }
209
210 char *read_env_param(const char*envkey)
211 {
212     if(envkey)
213     {
214         char *value = getenv(envkey);
215         if(value)
216             return strdup(value);
217     }
218     return NULL;
219 }
220
221 void dynamic_log_level_change()
222 {
223     char *logFile_Name = read_env_param(LOG_FILE_CONFIG_MAP);
224     char* log_level_init=NULL;
225     if(logFile_Name)
226     {
227         log_level_init = parse_file(logFile_Name);
228         update_mdc_log_level_severity(log_level_init); //setting log level
229         free(log_level_init);
230
231     }
232     enable_log_change_notify(logFile_Name);
233     free(logFile_Name);
234
235 }
236
237 void init_log() {
238     int log_change_monitor = 0;
239     mdclog_attr_t *attr;
240     mdclog_attr_init(&attr);
241     mdclog_attr_set_ident(attr, "E2Terminator");
242     mdclog_init(attr);
243     if(mdclog_format_initialize(log_change_monitor)!=0)
244         mdclog_write(MDCLOG_ERR, "Failed to intialize MDC log format !!!");
245     dynamic_log_level_change();
246     mdclog_attr_destroy(attr);
247 }
248 auto start_time = std::chrono::high_resolution_clock::now();
249 typedef std::chrono::duration<double, std::ratio<1,1>> seconds_t;
250
251 double age() {
252     return seconds_t(std::chrono::high_resolution_clock::now() - start_time).count();
253 }
254
255 double approx_CPU_MHz(unsigned sleepTime) {
256     using namespace std::chrono_literals;
257     uint32_t aux = 0;
258     uint64_t cycles_start = rdtscp(aux);
259     double time_start = age();
260     std::this_thread::sleep_for(sleepTime * 1ms);
261     uint64_t elapsed_cycles = rdtscp(aux) - cycles_start;
262     double elapsed_time = age() - time_start;
263     return elapsed_cycles / elapsed_time;
264 }
265
266 //std::atomic<int64_t> rmrCounter{0};
267 std::atomic<int64_t> num_of_messages{0};
268 std::atomic<int64_t> num_of_XAPP_messages{0};
269 static long transactionCounter = 0;
270
271 int buildListeningPort(sctp_params_t &sctpParams) {
272     sctpParams.listenFD = socket(AF_INET6, SOCK_STREAM, IPPROTO_SCTP);
273     if (sctpParams.listenFD <= 0) {
274 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))        
275         mdclog_write(MDCLOG_ERR, "Error Opening socket, %s", strerror(errno));
276         return -1;
277 #endif        
278     }
279
280     struct sockaddr_in6 serverAddress {};
281     serverAddress.sin6_family = AF_INET6;
282     serverAddress.sin6_addr   = in6addr_any;
283     serverAddress.sin6_port = htons(sctpParams.sctpPort);
284     if (bind(sctpParams.listenFD, (SA *)&serverAddress, sizeof(serverAddress)) < 0 ) {
285 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))        
286         mdclog_write(MDCLOG_ERR, "Error binding port %d. %s", sctpParams.sctpPort, strerror(errno));
287         return -1;
288 #endif        
289     }
290     if (setSocketNoBlocking(sctpParams.listenFD) == -1) {
291         //mdclog_write(MDCLOG_ERR, "Error binding. %s", strerror(errno));
292         return -1;
293     }
294     if (mdclog_level_get() >= MDCLOG_DEBUG) {
295         struct sockaddr_in6 clientAddress {};
296         socklen_t len = sizeof(clientAddress);
297         getsockname(sctpParams.listenFD, (SA *)&clientAddress, &len);
298         char buff[1024] {};
299         inet_ntop(AF_INET6, &clientAddress.sin6_addr, buff, sizeof(buff));
300         mdclog_write(MDCLOG_DEBUG, "My address: %s, port %d\n", buff, htons(clientAddress.sin6_port));
301     }
302
303     if (listen(sctpParams.listenFD, SOMAXCONN) < 0) {
304 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))     
305         mdclog_write(MDCLOG_ERR, "Error listening. %s\n", strerror(errno));
306         return -1;
307 #endif        
308     }
309     struct epoll_event event {};
310     event.events = EPOLLIN | EPOLLET;
311     event.data.fd = sctpParams.listenFD;
312
313     // add listening port to epoll
314     if (epoll_ctl(sctpParams.epoll_fd, EPOLL_CTL_ADD, sctpParams.listenFD, &event)) {
315 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))
316         printf("Failed to add descriptor to epoll\n");
317         mdclog_write(MDCLOG_ERR, "Failed to add descriptor to epoll. %s\n", strerror(errno));
318         return -1;
319 #endif        
320     }
321
322     return 0;
323 }
324
325 int buildConfiguration(sctp_params_t &sctpParams) {
326     path p = (sctpParams.configFilePath + "/" + sctpParams.configFileName).c_str();
327     if (exists(p)) {
328         const int size = 2048;
329         auto fileSize = file_size(p);
330         if (fileSize > size) {
331 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))            
332             mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size);
333             return -1;
334 #endif            
335         }
336     } else {
337 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))        
338         mdclog_write(MDCLOG_ERR, "Configuration File %s not exists", p.string().c_str());
339         return -1;
340 #endif        
341     }
342
343     ReadConfigFile conf;
344     if (conf.openConfigFile(p.string()) == -1) {
345 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))        
346         mdclog_write(MDCLOG_ERR, "Filed to open config file %s, %s",
347                      p.string().c_str(), strerror(errno));
348         return -1;
349 #endif        
350     }
351     int rmrPort = conf.getIntValue("nano");
352     if (rmrPort == -1) {
353 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))
354         mdclog_write(MDCLOG_ERR, "illegal RMR port ");
355         return -1;
356 #endif        
357     }
358     sctpParams.rmrPort = (uint16_t)rmrPort;
359     snprintf(sctpParams.rmrAddress, sizeof(sctpParams.rmrAddress), "%d", (int) (sctpParams.rmrPort));
360     auto tmpStr = conf.getStringValue("volume");
361     if (tmpStr.length() == 0) {
362 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))        
363         mdclog_write(MDCLOG_ERR, "illegal volume.");
364         return -1;
365 #endif        
366     }
367
368     char tmpLogFilespec[VOLUME_URL_SIZE];
369     tmpLogFilespec[0] = 0;
370     sctpParams.volume[0] = 0;
371     snprintf(sctpParams.volume, VOLUME_URL_SIZE, "%s", tmpStr.c_str());
372     // copy the name to temp file as well
373     snprintf(tmpLogFilespec, VOLUME_URL_SIZE, "%s", tmpStr.c_str());
374
375
376     // define the file name in the tmp directory under the volume
377     strcat(tmpLogFilespec,"/tmp/E2Term_%Y-%m-%d_%H-%M-%S.%N.tmpStr");
378
379     sctpParams.myIP = conf.getStringValue("local-ip");
380     if (sctpParams.myIP.length() == 0) {
381 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))        
382         mdclog_write(MDCLOG_ERR, "illegal local-ip.");
383         return -1;
384 #endif        
385     }
386
387     int sctpPort = conf.getIntValue("sctp-port");
388     if (sctpPort == -1) {
389 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))        
390         mdclog_write(MDCLOG_ERR, "illegal SCTP port ");
391         return -1;
392 #endif        
393     }
394     sctpParams.sctpPort = (uint16_t)sctpPort;
395
396     sctpParams.fqdn = conf.getStringValue("external-fqdn");
397     if (sctpParams.fqdn.length() == 0) {
398 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))        
399         mdclog_write(MDCLOG_ERR, "illegal external-fqdn");
400         return -1;
401 #endif        
402     }
403
404     std::string pod = conf.getStringValue("pod_name");
405 #ifndef UNIT_TEST
406     if (pod.length() == 0) {
407         mdclog_write(MDCLOG_ERR, "illegal pod_name in config file");
408         return -1;
409     }
410     auto *podName = getenv(pod.c_str());
411     if (podName == nullptr) {
412         mdclog_write(MDCLOG_ERR, "illegal pod_name or environment variable not exists : %s", pod.c_str());
413         return -1;
414
415     } else {
416         sctpParams.podName.assign(podName);
417         if (sctpParams.podName.length() == 0) {
418             mdclog_write(MDCLOG_ERR, "illegal pod_name");
419             return -1;
420         }
421     }
422 #endif
423     tmpStr = conf.getStringValue("trace");
424     transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
425     if ((tmpStr.compare("start")) == 0) {
426 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))        
427         mdclog_write(MDCLOG_INFO, "Trace set to: start");
428         sctpParams.trace = true;
429 #endif        
430     } else if ((tmpStr.compare("stop")) == 0) {
431         mdclog_write(MDCLOG_INFO, "Trace set to: stop");
432         sctpParams.trace = false;
433     } else {
434 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))      
435         mdclog_write(MDCLOG_ERR, "Trace was set to wrong value %s, set to stop", tmpStr.c_str());
436         sctpParams.trace = false;
437 #endif        
438     }
439     jsonTrace = sctpParams.trace;
440
441     sctpParams.epollTimeOut = -1;
442
443     tmpStr = conf.getStringValue("prometheusPort");
444     if (tmpStr.length() != 0) {
445         sctpParams.prometheusPort = tmpStr;
446     }
447
448     sctpParams.ka_message_length = snprintf(sctpParams.ka_message, KA_MESSAGE_SIZE, "{\"address\": \"%s:%d\","
449                                                                                     "\"fqdn\": \"%s\","
450                                                                                     "\"pod_name\": \"%s\"}",
451                                             (const char *)sctpParams.myIP.c_str(),
452                                             sctpParams.rmrPort,
453                                             sctpParams.fqdn.c_str(),
454                                             sctpParams.podName.c_str());
455
456     if (mdclog_level_get() >= MDCLOG_INFO) {
457         mdclog_write(MDCLOG_DEBUG,"RMR Port: %s", to_string(sctpParams.rmrPort).c_str());
458         mdclog_write(MDCLOG_DEBUG,"LogLevel: %s", to_string(sctpParams.logLevel).c_str());
459         mdclog_write(MDCLOG_DEBUG,"volume: %s", sctpParams.volume);
460         mdclog_write(MDCLOG_DEBUG,"tmpLogFilespec: %s", tmpLogFilespec);
461         mdclog_write(MDCLOG_DEBUG,"my ip: %s", sctpParams.myIP.c_str());
462         mdclog_write(MDCLOG_DEBUG,"pod name: %s", sctpParams.podName.c_str());
463
464         mdclog_write(MDCLOG_INFO, "running parameters for instance : %s", sctpParams.ka_message);
465     }
466
467     // Files written to the current working directory
468     boostLogger = logging::add_file_log(
469             keywords::file_name = tmpLogFilespec, // to temp directory
470             keywords::rotation_size = 10 * 1024 * 1024,
471             keywords::time_based_rotation = sinks::file::rotation_at_time_interval(posix_time::hours(1)),
472             keywords::format = "%Message%"
473             //keywords::format = "[%TimeStamp%]: %Message%" // use each tmpStr with time stamp
474     );
475
476     // Setup a destination folder for collecting rotated (closed) files --since the same volume can use rename()
477     boostLogger->locked_backend()->set_file_collector(sinks::file::make_collector(
478             keywords::target = sctpParams.volume
479     ));
480
481     // Upon restart, scan the directory for files matching the file_name pattern
482     boostLogger->locked_backend()->scan_for_files();
483
484     // Enable auto-flushing after each tmpStr record written
485     if (mdclog_level_get() >= MDCLOG_DEBUG) {
486         boostLogger->locked_backend()->auto_flush(true);
487     }
488
489     return 0;
490 }
491
492 void startPrometheus(sctp_params_t &sctpParams) {
493     auto podName = std::getenv("POD_NAME");
494     string metric = "E2TBeta";
495     if (strstr(podName, "alpha") != NULL) {
496         metric = "E2TAlpha";
497     }
498
499     sctpParams.prometheusFamily = &BuildCounter()
500             .Name(metric.c_str())
501             .Help("E2T instance metrics")
502             .Labels({{"POD_NAME", sctpParams.podName}})
503             .Register(*sctpParams.prometheusRegistry);
504
505     // Build E2T instance level metrics
506     buildE2TPrometheusCounters(sctpParams);
507
508     string prometheusPath = sctpParams.prometheusPort + "," + "[::]:" + sctpParams.prometheusPort;
509     if (mdclog_level_get() >= MDCLOG_DEBUG) {
510         mdclog_write(MDCLOG_DEBUG, "Start Prometheus Pull mode on %s", prometheusPath.c_str());
511     }
512     sctpParams.prometheusExposer = new Exposer(prometheusPath, 1);
513     sctpParams.prometheusExposer->RegisterCollectable(sctpParams.prometheusRegistry);
514 }
515 #ifndef UNIT_TEST
516
517 int main(const int argc, char **argv) {
518     sctp_params_t sctpParams;
519     {
520         std::random_device device{};
521         std::mt19937 generator(device());
522         std::uniform_int_distribution<long> distribution(1, (long) 1e12);
523         transactionCounter = distribution(generator);
524     }
525
526 //    uint64_t st = 0;
527 //    uint32_t aux1 = 0;
528 //   st = rdtscp(aux1);
529
530     unsigned num_cpus = std::thread::hardware_concurrency();
531     init_log();
532     if (std::signal(SIGINT, catch_function) == SIG_ERR) {
533         mdclog_write(MDCLOG_ERR, "Error initializing SIGINT");
534         exit(1);
535     }
536     if (std::signal(SIGABRT, catch_function)== SIG_ERR) {
537         mdclog_write(MDCLOG_ERR, "Error initializing SIGABRT");
538         exit(1);
539     }
540     if (std::signal(SIGTERM, catch_function)== SIG_ERR) {
541         mdclog_write(MDCLOG_ERR, "Error initializing SIGTERM");
542         exit(1);
543     }
544
545     cpuClock = approx_CPU_MHz(100);
546
547     mdclog_write(MDCLOG_DEBUG, "CPU speed %11.11f", cpuClock);
548
549     auto result = parse(argc, argv, sctpParams);
550
551     if (buildConfiguration(sctpParams) != 0) {
552         exit(-1);
553     }
554
555     //auto registry = std::make_shared<Registry>();
556     sctpParams.prometheusRegistry = std::make_shared<Registry>();
557
558     //sctpParams.prometheusFamily = new Family<Counter>("E2T", "E2T message counter", {{"E", sctpParams.podName}});
559
560     startPrometheus(sctpParams);
561
562     // start epoll
563     sctpParams.epoll_fd = epoll_create1(0);
564     if (sctpParams.epoll_fd == -1) {
565         mdclog_write(MDCLOG_ERR, "failed to open epoll descriptor");
566         exit(-1);
567     }
568     getRmrContext(sctpParams);
569     if (sctpParams.rmrCtx == nullptr) {
570         close(sctpParams.epoll_fd);
571         exit(-1);
572     }
573
574     if (buildInotify(sctpParams) == -1) {
575         close(sctpParams.rmrListenFd);
576         rmr_close(sctpParams.rmrCtx);
577         close(sctpParams.epoll_fd);
578         exit(-1);
579     }
580
581     if (buildListeningPort(sctpParams) != 0) {
582         close(sctpParams.rmrListenFd);
583         rmr_close(sctpParams.rmrCtx);
584         close(sctpParams.epoll_fd);
585         exit(-1);
586     }
587
588     sctpParams.sctpMap = new mapWrapper();
589
590     std::vector<std::thread> threads(num_cpus);
591 //    std::vector<std::thread> threads;
592
593     num_cpus = 3;
594     for (unsigned int i = 0; i < num_cpus; i++) {
595         threads[i] = std::thread(listener, &sctpParams);
596
597         cpu_set_t cpuset;
598         CPU_ZERO(&cpuset);
599         CPU_SET(i, &cpuset);
600         int rc = pthread_setaffinity_np(threads[i].native_handle(), sizeof(cpu_set_t), &cpuset);
601         if (rc != 0) {
602             mdclog_write(MDCLOG_ERR, "Error calling pthread_setaffinity_np: %d", rc);
603         }
604     }
605
606
607     //loop over term_init until first message from xApp
608     handleTermInit(sctpParams);
609
610     for (auto &t : threads) {
611         t.join();
612     }
613
614     return 0;
615 }
616 #endif
617 void handleTermInit(sctp_params_t &sctpParams) {
618     sendTermInit(sctpParams);
619     //send to e2 manager init of e2 term
620     //E2_TERM_INIT
621
622     int count = 0;
623     while (true) {
624         auto xappMessages = num_of_XAPP_messages.load(std::memory_order_acquire);
625         if (xappMessages > 0) {
626             if (mdclog_level_get() >=  MDCLOG_INFO) {
627                 mdclog_write(MDCLOG_INFO, "Got a message from some application, stop sending E2_TERM_INIT");
628             }
629             return;
630         }
631         usleep(100000);
632         count++;
633         if (count % 1000 == 0) {
634             mdclog_write(MDCLOG_ERR, "GOT No messages from any xApp");
635             sendTermInit(sctpParams);
636         }
637     }
638 }
639
640 void sendTermInit(sctp_params_t &sctpParams) {
641     rmr_mbuf_t *msg = rmr_alloc_msg(sctpParams.rmrCtx, sctpParams.ka_message_length);
642     auto count = 0;
643     while (true) {
644         msg->mtype = E2_TERM_INIT;
645         msg->state = 0;
646         rmr_bytes2payload(msg, (unsigned char *)sctpParams.ka_message, sctpParams.ka_message_length);
647         static unsigned char tx[32];
648         auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
649         rmr_bytes2xact(msg, tx, txLen);
650         msg = rmr_send_msg(sctpParams.rmrCtx, msg);
651         if (msg == nullptr) {
652             msg = rmr_alloc_msg(sctpParams.rmrCtx, sctpParams.ka_message_length);
653         } else if (msg->state == 0) {
654             rmr_free_msg(msg);
655             if (mdclog_level_get() >=  MDCLOG_INFO) {
656                 mdclog_write(MDCLOG_INFO, "E2_TERM_INIT successfully sent ");
657             }
658             return;
659         } else {
660             if (count % 100 == 0) {
661                 mdclog_write(MDCLOG_ERR, "Error sending E2_TERM_INIT cause : %s ", translateRmrErrorMessages(msg->state).c_str());
662             }
663             sleep(1);
664         }
665         count++;
666     }
667 }
668
669 /**
670  *
671  * @param argc
672  * @param argv
673  * @param sctpParams
674  * @return
675  */
676 cxxopts::ParseResult parse(int argc, char *argv[], sctp_params_t &sctpParams) {
677     cxxopts::Options options(argv[0], "e2 term help");
678     options.positional_help("[optional args]").show_positional_help();
679     options.allow_unrecognised_options().add_options()
680             ("p,path", "config file path", cxxopts::value<std::string>(sctpParams.configFilePath)->default_value("config"))
681             ("f,file", "config file name", cxxopts::value<std::string>(sctpParams.configFileName)->default_value("config.conf"))
682             ("h,help", "Print help");
683
684     auto result = options.parse(argc, (const char **&)argv);
685
686     if (result.count("help")) {
687         std::cout << options.help({""}) << std::endl;
688         exit(0);
689     }
690     return result;
691 }
692
693 /**
694  *
695  * @param sctpParams
696  * @return -1 failed 0 success
697  */
698 int buildInotify(sctp_params_t &sctpParams) {
699     sctpParams.inotifyFD = inotify_init1(IN_NONBLOCK);
700     if (sctpParams.inotifyFD == -1) {
701         mdclog_write(MDCLOG_ERR, "Failed to init inotify (inotify_init1) %s", strerror(errno));
702         return -1;
703     }
704
705     sctpParams.inotifyWD = inotify_add_watch(sctpParams.inotifyFD,
706                                              (const char *)sctpParams.configFilePath.c_str(),
707                                              (unsigned)IN_OPEN | (unsigned)IN_CLOSE_WRITE | (unsigned)IN_CLOSE_NOWRITE); //IN_CLOSE = (IN_CLOSE_WRITE | IN_CLOSE_NOWRITE)
708     if (sctpParams.inotifyWD == -1) {
709         mdclog_write(MDCLOG_ERR, "Failed to add directory : %s to  inotify (inotify_add_watch) %s",
710                      sctpParams.configFilePath.c_str(),
711                      strerror(errno));
712         close(sctpParams.inotifyFD);
713         return -1;
714     }
715
716     struct epoll_event event{};
717     event.events = (EPOLLIN);
718     event.data.fd = sctpParams.inotifyFD;
719     // add listening RMR FD to epoll
720     if (epoll_ctl(sctpParams.epoll_fd, EPOLL_CTL_ADD, sctpParams.inotifyFD, &event)) {
721         mdclog_write(MDCLOG_ERR, "Failed to add inotify FD to epoll");
722         close(sctpParams.inotifyFD);
723         return -1;
724     }
725     return 0;
726 }
727
728 /**
729  *
730  * @param args
731  * @return
732  */
733 void listener(sctp_params_t *params) {
734     int num_of_SCTP_messages = 0;
735     auto totalTime = 0.0;
736     std::thread::id this_id = std::this_thread::get_id();
737     //save cout
738     auto pod_name = std::getenv("POD_NAME");
739     auto container_name = std::getenv("CONTAINER_NAME");
740     auto service_name = std::getenv("SERVICE_NAME");
741     auto host_name = std::getenv("HOST_NAME");
742     auto system_name = std::getenv("SYSTEM_NAME");
743     auto pid = std::to_string(getpid()).c_str();
744     streambuf *oldCout = cout.rdbuf();
745     ostringstream memCout;
746     // create new cout
747     cout.rdbuf(memCout.rdbuf());
748     cout << this_id;
749     //return to the normal cout
750     cout.rdbuf(oldCout);
751
752     char tid[32];
753     memcpy(tid, memCout.str().c_str(), memCout.str().length() < 32 ? memCout.str().length() : 31);
754     tid[memCout.str().length()] = 0;
755     mdclog_mdc_add("SYSTEM_NAME", system_name);
756     mdclog_mdc_add("HOST_NAME", host_name);
757     mdclog_mdc_add("SERVICE_NAME", service_name);
758     mdclog_mdc_add("CONTAINER_NAME", container_name);
759     mdclog_mdc_add("POD_NAME", pod_name);
760     mdclog_mdc_add("PID", pid);
761
762     if (mdclog_level_get() >= MDCLOG_DEBUG) {
763         mdclog_write(MDCLOG_DEBUG, "started thread number %s", tid);
764     }
765
766     RmrMessagesBuffer_t rmrMessageBuffer{};
767     //create and init RMR
768     rmrMessageBuffer.rmrCtx = params->rmrCtx;
769
770     auto *events = (struct epoll_event *) calloc(MAXEVENTS, sizeof(struct epoll_event));
771     struct timespec end{0, 0};
772     struct timespec start{0, 0};
773
774     rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
775     rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
776
777     memcpy(rmrMessageBuffer.ka_message, params->ka_message, params->ka_message_length);
778     rmrMessageBuffer.ka_message_len = params->ka_message_length;
779     rmrMessageBuffer.ka_message[rmrMessageBuffer.ka_message_len] = 0;
780
781     if (mdclog_level_get() >= MDCLOG_DEBUG) {
782         mdclog_write(MDCLOG_DEBUG, "keep alive message is : %s", rmrMessageBuffer.ka_message);
783     }
784
785     ReportingMessages_t message {};
786
787 //    for (int i = 0; i < MAX_RMR_BUFF_ARRAY; i++) {
788 //        rmrMessageBuffer.rcvBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
789 //        rmrMessageBuffer.sendBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
790 //    }
791
792     while (true) {
793         if (mdclog_level_get() >= MDCLOG_DEBUG) {
794             mdclog_write(MDCLOG_DEBUG, "Start EPOLL Wait. Timeout = %d", params->epollTimeOut);
795         }
796 #ifndef UNIT_TEST
797         auto numOfEvents = epoll_wait(params->epoll_fd, events, MAXEVENTS, params->epollTimeOut);
798 #else
799         auto numOfEvents = 1;
800 #endif
801         if (numOfEvents == 0) { // time out
802 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))            
803             if (mdclog_level_get() >= MDCLOG_DEBUG) {
804                 mdclog_write(MDCLOG_DEBUG, "got epoll timeout");
805             }
806             continue;
807         } else if (numOfEvents < 0) {
808             if (errno == EINTR) {
809                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
810                     mdclog_write(MDCLOG_DEBUG, "got EINTR : %s", strerror(errno));
811                 }
812                 continue;
813             }
814             mdclog_write(MDCLOG_ERR, "Epoll wait failed, errno = %s", strerror(errno));
815             return;
816 #endif            
817         }
818         for (auto i = 0; i < numOfEvents; i++) {
819             if (mdclog_level_get() >= MDCLOG_DEBUG) {
820                 mdclog_write(MDCLOG_DEBUG, "handling epoll event %d out of %d", i + 1, numOfEvents);
821             }
822             clock_gettime(CLOCK_MONOTONIC, &message.message.time);
823             start.tv_sec = message.message.time.tv_sec;
824             start.tv_nsec = message.message.time.tv_nsec;
825
826
827             if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP)) {
828                 handlepoll_error(events[i], message, rmrMessageBuffer, params);
829             } else if (events[i].events & EPOLLOUT) {
830                 handleEinprogressMessages(events[i], message, rmrMessageBuffer, params);
831             } else if (params->listenFD == events[i].data.fd) {
832                 if (mdclog_level_get() >= MDCLOG_INFO) {
833                     mdclog_write(MDCLOG_INFO, "New connection request from sctp network\n");
834                 }
835                 // new connection is requested from RAN  start build connection
836                 while (true) {
837                     struct sockaddr in_addr {};
838                     socklen_t in_len;
839                     char hostBuff[NI_MAXHOST];
840                     char portBuff[NI_MAXSERV];
841
842                     in_len = sizeof(in_addr);
843                     auto *peerInfo = (ConnectedCU_t *)calloc(1, sizeof(ConnectedCU_t));
844                     if(peerInfo == nullptr){
845                         mdclog_write(MDCLOG_ERR, "calloc failed");
846                         break;
847                     }
848                     peerInfo->sctpParams = params;
849                     peerInfo->fileDescriptor = accept(params->listenFD, &in_addr, &in_len);
850                     if (peerInfo->fileDescriptor == -1) {
851 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))                        
852                         if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
853                             /* We have processed all incoming connections. */
854                             if(peerInfo)
855                                 free(peerInfo);
856                             break;
857                         } else {
858                             if(peerInfo)
859                                 free(peerInfo);
860                             mdclog_write(MDCLOG_ERR, "Accept error, errno = %s", strerror(errno));
861                             break;
862                         }
863                     }
864                     if (setSocketNoBlocking(peerInfo->fileDescriptor) == -1) {
865                         mdclog_write(MDCLOG_ERR, "setSocketNoBlocking failed to set new connection %s on port %s\n", hostBuff, portBuff);
866                         close(peerInfo->fileDescriptor);
867                         if(peerInfo)
868                             free(peerInfo);
869                         break;
870 #endif                        
871                     }
872                     auto  ans = getnameinfo(&in_addr, in_len,
873                                             peerInfo->hostName, NI_MAXHOST,
874                                             peerInfo->portNumber, NI_MAXSERV, (unsigned )((unsigned int)NI_NUMERICHOST | (unsigned int)NI_NUMERICSERV));
875                     if (ans < 0) {
876                         mdclog_write(MDCLOG_ERR, "Failed to get info on connection request. %s\n", strerror(errno));
877                         close(peerInfo->fileDescriptor);
878                         if(peerInfo)
879                             free(peerInfo);
880                         break;
881                     }
882                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
883                         mdclog_write(MDCLOG_DEBUG, "Accepted connection on descriptor %d (host=%s, port=%s)\n", peerInfo->fileDescriptor, peerInfo->hostName, peerInfo->portNumber);
884                     }
885                     peerInfo->isConnected = false;
886                     peerInfo->gotSetup = false;
887                     if (addToEpoll(params->epoll_fd,
888                                    peerInfo,
889                                    (EPOLLIN | EPOLLET),
890                                    params->sctpMap, nullptr,
891                                    0) != 0) {
892                         if(peerInfo)
893                             free(peerInfo);
894                         break;
895                     }
896                     break;
897                 }
898             } else if (params->rmrListenFd == events[i].data.fd) {
899                 // got message from XAPP
900                 //num_of_XAPP_messages.fetch_add(1, std::memory_order_release);
901                 num_of_messages.fetch_add(1, std::memory_order_release);
902                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
903                     mdclog_write(MDCLOG_DEBUG, "new RMR message");
904                 }
905                 if (receiveXappMessages(params->sctpMap,
906                                         rmrMessageBuffer,
907                                         message.message.time) != 0) {
908                     mdclog_write(MDCLOG_ERR, "Error handling Xapp message");
909                 }
910             } else if (params->inotifyFD == events[i].data.fd) {
911                 mdclog_write(MDCLOG_INFO, "Got event from inotify (configuration update)");
912                 handleConfigChange(params);
913             } else {
914                 /* We RMR_ERR_RETRY have data on the fd waiting to be read. Read and display it.
915                  * We must read whatever data is available completely, as we are running
916                  *  in edge-triggered mode and won't get a notification again for the same data. */
917                 num_of_messages.fetch_add(1, std::memory_order_release);
918                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
919                     mdclog_write(MDCLOG_DEBUG, "new message from SCTP, epoll flags are : %0x", events[i].events);
920                 }
921                 receiveDataFromSctp(&events[i],
922                                     params->sctpMap,
923                                     num_of_SCTP_messages,
924                                     rmrMessageBuffer,
925                                     message.message.time);
926             }
927
928             clock_gettime(CLOCK_MONOTONIC, &end);
929             if (mdclog_level_get() >= MDCLOG_INFO) {
930                 totalTime += ((end.tv_sec + 1.0e-9 * end.tv_nsec) -
931                               ((double) start.tv_sec + 1.0e-9 * start.tv_nsec));
932             }
933             if (mdclog_level_get() >= MDCLOG_DEBUG) {
934                 mdclog_write(MDCLOG_DEBUG, "message handling is %ld seconds %ld nanoseconds",
935                              end.tv_sec - start.tv_sec,
936                              end.tv_nsec - start.tv_nsec);
937             }
938         }
939 #ifdef UNIT_TEST
940     break;
941 #endif
942     }
943 }
944
945 /**
946  *
947  * @param sctpParams
948  */
949 void handleConfigChange(sctp_params_t *sctpParams) {
950     char buf[4096] __attribute__ ((aligned(__alignof__(struct inotify_event))));
951     const struct inotify_event *event;
952     char *ptr;
953 #ifdef UNIT_TEST
954     struct inotify_event tmpEvent;
955 #endif
956     path p = (sctpParams->configFilePath + "/" + sctpParams->configFileName).c_str();
957     auto endlessLoop = true;
958     while (endlessLoop) {
959 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))    
960         auto len = read(sctpParams->inotifyFD, buf, sizeof buf);
961 #else
962     auto len=10;
963 #endif
964         if (len == -1) {
965 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))        
966             if (errno != EAGAIN) {
967                 mdclog_write(MDCLOG_ERR, "read %s ", strerror(errno));
968                 endlessLoop = false;
969                 continue;
970             }
971             else {
972                 endlessLoop = false;
973                 continue;
974             }
975 #endif            
976         }
977
978         for (ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) {
979 #ifndef UNIT_TEST
980     event = (const struct inotify_event *)ptr;
981 #else
982     tmpEvent.mask = (uint32_t)IN_CLOSE_WRITE;
983     event = &tmpEvent;
984 #endif
985             if (event->mask & (uint32_t)IN_ISDIR) {
986                 continue;
987             }
988
989             // the directory name
990             if (sctpParams->inotifyWD == event->wd) {
991                 // not the directory
992             }
993             if (event->len) {
994 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))                
995                 auto  retVal = strcmp(sctpParams->configFileName.c_str(), event->name);
996                 if (retVal != 0) {
997                     continue;
998                 }
999 #endif                
1000             }
1001             // only the file we want
1002             if (event->mask & (uint32_t)IN_CLOSE_WRITE) {
1003                 if (mdclog_level_get() >= MDCLOG_INFO) {
1004                     mdclog_write(MDCLOG_INFO, "Configuration file changed");
1005                 }
1006                 if (exists(p)) {
1007                     const int size = 2048;
1008                     auto fileSize = file_size(p);
1009                     if (fileSize > size) {
1010                         mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size);
1011                         return;
1012                     }
1013                 } else {
1014                     mdclog_write(MDCLOG_ERR, "Configuration File %s not exists", p.string().c_str());
1015                     return;
1016                 }
1017
1018                 ReadConfigFile conf;
1019                 if (conf.openConfigFile(p.string()) == -1) {
1020                     mdclog_write(MDCLOG_ERR, "Filed to open config file %s, %s",
1021                                  p.string().c_str(), strerror(errno));
1022                     return;
1023                 }
1024                 auto tmpStr = conf.getStringValue("loglevel");
1025                 if (tmpStr.length() == 0) {
1026                     mdclog_write(MDCLOG_ERR, "illegal loglevel. Set loglevel to MDCLOG_INFO");
1027                     tmpStr = "info";
1028                 }
1029                 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
1030
1031                 if ((tmpStr.compare("debug")) == 0) {
1032                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_DEBUG");
1033                     sctpParams->logLevel = MDCLOG_DEBUG;
1034                 } 
1035 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))                
1036                 else if ((tmpStr.compare("info")) == 0) {
1037                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_INFO");
1038                     sctpParams->logLevel = MDCLOG_INFO;
1039                 } else if ((tmpStr.compare("warning")) == 0) {
1040                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_WARN");
1041                     sctpParams->logLevel = MDCLOG_WARN;
1042                 } else if ((tmpStr.compare("error")) == 0) {
1043                     mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_ERR");
1044                     sctpParams->logLevel = MDCLOG_ERR;
1045                 } else {
1046                     mdclog_write(MDCLOG_ERR, "illegal loglevel = %s. Set loglevel to MDCLOG_INFO", tmpStr.c_str());
1047                     sctpParams->logLevel = MDCLOG_INFO;
1048                 }
1049 #endif                
1050                 mdclog_level_set(sctpParams->logLevel);
1051                 tmpStr = conf.getStringValue("trace");
1052                 if (tmpStr.length() == 0) {
1053                     mdclog_write(MDCLOG_ERR, "illegal trace. Set trace to stop");
1054                     tmpStr = "stop";
1055                 }
1056
1057                 transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
1058                 if ((tmpStr.compare("start")) == 0) {
1059                     mdclog_write(MDCLOG_INFO, "Trace set to: start");
1060                     sctpParams->trace = true;
1061                 } else if ((tmpStr.compare("stop")) == 0) {
1062                     mdclog_write(MDCLOG_INFO, "Trace set to: stop");
1063                     sctpParams->trace = false;
1064                 } else {
1065                     mdclog_write(MDCLOG_ERR, "Trace was set to wrong value %s, set to stop", tmpStr.c_str());
1066                     sctpParams->trace = false;
1067                 }
1068                 jsonTrace = sctpParams->trace;
1069
1070
1071                 endlessLoop = false;
1072             }
1073 #ifdef UNIT_TEST
1074             break;
1075 #endif
1076         }
1077     }
1078 }
1079
1080 /**
1081  *
1082  * @param event
1083  * @param message
1084  * @param rmrMessageBuffer
1085  * @param params
1086  */
1087 void handleEinprogressMessages(struct epoll_event &event,
1088                                ReportingMessages_t &message,
1089                                RmrMessagesBuffer_t &rmrMessageBuffer,
1090                                sctp_params_t *params) {
1091     auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
1092     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
1093
1094     mdclog_write(MDCLOG_INFO, "file descriptor %d got EPOLLOUT", peerInfo->fileDescriptor);
1095     auto retVal = 0;
1096     socklen_t retValLen = 0;
1097     auto rc = getsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_ERROR, &retVal, &retValLen);
1098     if (rc != 0 || retVal != 0) {
1099 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))        
1100         if (rc != 0) {
1101             rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
1102                                                          "%s|Failed SCTP Connection, after EINPROGRESS the getsockopt%s",
1103                                                          peerInfo->enodbName, strerror(errno));
1104         } else if (retVal != 0) {
1105             rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
1106                                                          "%s|Failed SCTP Connection after EINPROGRESS, SO_ERROR",
1107                                                          peerInfo->enodbName);
1108         }
1109
1110         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1111         message.message.asnLength = rmrMessageBuffer.sendMessage->len;
1112         mdclog_write(MDCLOG_ERR, "%s", rmrMessageBuffer.sendMessage->payload);
1113         message.message.direction = 'N';
1114         if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
1115             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
1116         }
1117 #endif
1118         memset(peerInfo->asnData, 0, peerInfo->asnLength);
1119         peerInfo->asnLength = 0;
1120         peerInfo->mtype = 0;
1121         return;
1122     }
1123 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))
1124     peerInfo->isConnected = true;
1125
1126     if (modifyToEpoll(params->epoll_fd, peerInfo, (EPOLLIN | EPOLLET), params->sctpMap, peerInfo->enodbName,
1127                       peerInfo->mtype) != 0) {
1128         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_MOD");
1129         return;
1130     }
1131
1132     message.message.asndata = (unsigned char *)peerInfo->asnData;
1133     message.message.asnLength = peerInfo->asnLength;
1134     message.message.messageType = peerInfo->mtype;
1135     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
1136     num_of_messages.fetch_add(1, std::memory_order_release);
1137     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1138         mdclog_write(MDCLOG_DEBUG, "send the delayed SETUP/ENDC SETUP to sctp for %s",
1139                      message.message.enodbName);
1140     }
1141     if (sendSctpMsg(peerInfo, message, params->sctpMap) != 0) {
1142         if (mdclog_level_get() >= MDCLOG_DEBUG) {
1143             mdclog_write(MDCLOG_DEBUG, "Error write to SCTP  %s %d", __func__, __LINE__);
1144         }
1145         return;
1146     }
1147
1148     memset(peerInfo->asnData, 0, peerInfo->asnLength);
1149     peerInfo->asnLength = 0;
1150     peerInfo->mtype = 0;
1151 #endif    
1152 }
1153
1154
1155 void handlepoll_error(struct epoll_event &event,
1156                       ReportingMessages_t &message,
1157                       RmrMessagesBuffer_t &rmrMessageBuffer,
1158                       sctp_params_t *params) {
1159     if (event.data.fd != params->rmrListenFd) {
1160         auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
1161         mdclog_write(MDCLOG_ERR, "epoll error, events %0x on fd %d, RAN NAME : %s",
1162                      event.events, peerInfo->fileDescriptor, peerInfo->enodbName);
1163 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))
1164         rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256,
1165                                                      "%s|Failed SCTP Connection",
1166                                                      peerInfo->enodbName);
1167         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1168         message.message.asnLength = rmrMessageBuffer.sendMessage->len;
1169
1170         memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
1171         message.message.direction = 'N';
1172         if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
1173             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
1174         }
1175 #endif
1176         close(peerInfo->fileDescriptor);
1177         params->sctpMap->erase(peerInfo->enodbName);
1178         cleanHashEntry((ConnectedCU_t *) event.data.ptr, params->sctpMap);
1179     } else {
1180         mdclog_write(MDCLOG_ERR, "epoll error, events %0x on RMR FD", event.events);
1181     }
1182 }
1183 /**
1184  *
1185  * @param socket
1186  * @return
1187  */
1188 int setSocketNoBlocking(int socket) {
1189     auto flags = fcntl(socket, F_GETFL, 0);
1190
1191     if (flags == -1) {
1192         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
1193         return -1;
1194     }
1195
1196     flags = (unsigned) flags | (unsigned) O_NONBLOCK;
1197     if (fcntl(socket, F_SETFL, flags) == -1) {
1198         mdclog_write(MDCLOG_ERR, "%s, %s", __FUNCTION__, strerror(errno));
1199         return -1;
1200     }
1201
1202     return 0;
1203 }
1204
1205 /**
1206  *
1207  * @param val
1208  * @param m
1209  */
1210 void cleanHashEntry(ConnectedCU_t *val, Sctp_Map_t *m) {
1211     char *dummy;
1212     auto port = (uint16_t) strtol(val->portNumber, &dummy, 10);
1213     char searchBuff[2048]{};
1214
1215     snprintf(searchBuff, sizeof searchBuff, "host:%s:%d", val->hostName, port);
1216     m->erase(searchBuff);
1217
1218     m->erase(val->enodbName);
1219 #ifndef UNIT_TEST
1220     free(val);
1221 #endif
1222 }
1223
1224 /**
1225  *
1226  * @param fd file descriptor
1227  * @param data the asn data to send
1228  * @param len  length of the data
1229  * @param enodbName the enodbName as in the map for printing purpose
1230  * @param m map host information
1231  * @param mtype message number
1232  * @return 0 success, a negative number on fail
1233  */
1234 int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_t *m) {
1235     auto loglevel = mdclog_level_get();
1236 #ifndef UNIT_TEST    
1237     int fd = peerInfo->fileDescriptor;
1238 #else
1239     int fd = FILE_DESCRIPTOR;
1240 #endif    
1241     if (loglevel >= MDCLOG_DEBUG) {
1242         mdclog_write(MDCLOG_DEBUG, "Send SCTP message for CU %s, %s",
1243                      message.message.enodbName, __FUNCTION__);
1244     }
1245
1246     while (true) {
1247         if (send(fd,message.message.asndata, message.message.asnLength,MSG_NOSIGNAL) < 0) {
1248             if (errno == EINTR) {
1249                 continue;
1250             }
1251             mdclog_write(MDCLOG_ERR, "error writing to CU a message, %s ", strerror(errno));
1252 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))            
1253             if (!peerInfo->isConnected) {
1254                 mdclog_write(MDCLOG_ERR, "connection to CU %s is still in progress.", message.message.enodbName);
1255                 return -1;
1256             }
1257 #endif
1258 #ifndef UNIT_TEST            
1259             cleanHashEntry(peerInfo, m);
1260             close(fd);
1261 #endif            
1262             char key[MAX_ENODB_NAME_SIZE * 2];
1263             snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName,
1264                      message.message.messageType);
1265             if (loglevel >= MDCLOG_DEBUG) {
1266                 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
1267             }
1268             auto tmp = m->find(key);
1269             if (tmp) {
1270                 free(tmp);
1271             }
1272             m->erase(key);
1273 #ifndef UNIT_TEST
1274             return -1;
1275 #endif
1276         }
1277         message.message.direction = 'D';
1278         // send report.buffer of size
1279         buildJsonMessage(message);
1280
1281         if (loglevel >= MDCLOG_DEBUG) {
1282             mdclog_write(MDCLOG_DEBUG,
1283                          "SCTP message for CU %s sent from %s",
1284                          message.message.enodbName,
1285                          __FUNCTION__);
1286         }
1287         return 0;
1288     }
1289 }
1290
1291 /**
1292  *
1293  * @param message
1294  * @param rmrMessageBuffer
1295  */
1296 void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) {
1297     message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
1298     message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
1299
1300     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1301         mdclog_write(MDCLOG_DEBUG, "Message from Xapp RAN name = %s message length = %ld",
1302                      message.message.enodbName, (unsigned long) message.message.asnLength);
1303     }
1304 }
1305
1306
1307
1308 /**
1309  *
1310  * @param events
1311  * @param sctpMap
1312  * @param numOfMessages
1313  * @param rmrMessageBuffer
1314  * @param ts
1315  * @return
1316  */
1317 int receiveDataFromSctp(struct epoll_event *events,
1318                         Sctp_Map_t *sctpMap,
1319                         int &numOfMessages,
1320                         RmrMessagesBuffer_t &rmrMessageBuffer,
1321                         struct timespec &ts) {
1322     /* We have data on the fd waiting to be read. Read and display it.
1323  * We must read whatever data is available completely, as we are running
1324  *  in edge-triggered mode and won't get a notification again for the same data. */
1325     ReportingMessages_t message {};
1326     auto done = 0;
1327     auto loglevel = mdclog_level_get();
1328
1329     // get the identity of the interface
1330     message.peerInfo = (ConnectedCU_t *)events->data.ptr;
1331
1332     struct timespec start{0, 0};
1333     struct timespec decodeStart{0, 0};
1334     struct timespec end{0, 0};
1335
1336     E2AP_PDU_t *pdu = nullptr;
1337
1338     while (true) {
1339         if (loglevel >= MDCLOG_DEBUG) {
1340             mdclog_write(MDCLOG_DEBUG, "Start Read from SCTP %d fd", message.peerInfo->fileDescriptor);
1341             clock_gettime(CLOCK_MONOTONIC, &start);
1342         }
1343         // read the buffer directly to rmr payload
1344         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1345 #ifndef UNIT_TEST        
1346         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1347                 read(message.peerInfo->fileDescriptor, rmrMessageBuffer.sendMessage->payload, RECEIVE_SCTP_BUFFER_SIZE);
1348 #else
1349         message.message.asnLength = rmrMessageBuffer.sendMessage->len;
1350 #endif
1351
1352         if (loglevel >= MDCLOG_DEBUG) {
1353             mdclog_write(MDCLOG_DEBUG, "Finish Read from SCTP %d fd message length = %ld",
1354                          message.peerInfo->fileDescriptor, message.message.asnLength);
1355         }
1356
1357         memcpy(message.message.enodbName, message.peerInfo->enodbName, sizeof(message.peerInfo->enodbName));
1358         message.message.direction = 'U';
1359         message.message.time.tv_nsec = ts.tv_nsec;
1360         message.message.time.tv_sec = ts.tv_sec;
1361
1362         if (message.message.asnLength < 0) {
1363             if (errno == EINTR) {
1364                 continue;
1365             }
1366             /* If errno == EAGAIN, that means we have read all
1367                data. So goReportingMessages_t back to the main loop. */
1368             if (errno != EAGAIN) {
1369                 mdclog_write(MDCLOG_ERR, "Read error, %s ", strerror(errno));
1370                 done = 1;
1371             } else if (loglevel >= MDCLOG_DEBUG) {
1372                 mdclog_write(MDCLOG_DEBUG, "EAGAIN - descriptor = %d", message.peerInfo->fileDescriptor);
1373             }
1374             break;
1375         } else if (message.message.asnLength == 0) {
1376             /* End of file. The remote has closed the connection. */
1377             if (loglevel >= MDCLOG_INFO) {
1378                 mdclog_write(MDCLOG_INFO, "END of File Closed connection - descriptor = %d",
1379                              message.peerInfo->fileDescriptor);
1380             }
1381             done = 1;
1382             break;
1383         }
1384
1385         if (loglevel >= MDCLOG_DEBUG) {
1386             char printBuffer[RECEIVE_SCTP_BUFFER_SIZE]{};
1387             char *tmp = printBuffer;
1388             for (size_t i = 0; i < (size_t)message.message.asnLength; ++i) {
1389                 snprintf(tmp, 3, "%02x", message.message.asndata[i]);
1390                 tmp += 2;
1391             }
1392             printBuffer[message.message.asnLength] = 0;
1393             clock_gettime(CLOCK_MONOTONIC, &end);
1394             mdclog_write(MDCLOG_DEBUG, "Before Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
1395                          message.peerInfo->enodbName, end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
1396             mdclog_write(MDCLOG_DEBUG, "PDU buffer length = %ld, data =  : %s", message.message.asnLength,
1397                          printBuffer);
1398             clock_gettime(CLOCK_MONOTONIC, &decodeStart);
1399         }
1400 #ifndef UNIT_TEST
1401         auto rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void **) &pdu,
1402                         message.message.asndata, message.message.asnLength);
1403 #else
1404         asn_dec_rval_t rval = {RC_OK, 0};
1405         pdu = (E2AP_PDU_t*)rmrMessageBuffer.sendMessage->tp_buf;
1406 #endif
1407         if (rval.code != RC_OK) {
1408             mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) E2AP PDU from RAN : %s", rval.code,
1409                          message.peerInfo->enodbName);
1410             if (pdu != nullptr) {
1411                 ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
1412                 pdu = nullptr;
1413             }
1414             break;
1415         }
1416
1417         if (loglevel >= MDCLOG_DEBUG) {
1418             clock_gettime(CLOCK_MONOTONIC, &end);
1419             mdclog_write(MDCLOG_DEBUG, "After Encoding E2AP PDU for : %s, Read time is : %ld seconds, %ld nanoseconds",
1420                          message.peerInfo->enodbName, end.tv_sec - decodeStart.tv_sec, end.tv_nsec - decodeStart.tv_nsec);
1421             char *printBuffer;
1422             size_t size;
1423             FILE *stream = open_memstream(&printBuffer, &size);
1424             asn_fprint(stream, &asn_DEF_E2AP_PDU, pdu);
1425             mdclog_write(MDCLOG_DEBUG, "Encoding E2AP PDU past : %s", printBuffer);
1426             clock_gettime(CLOCK_MONOTONIC, &decodeStart);
1427
1428             fclose(stream);
1429             free(printBuffer);
1430         }
1431
1432         switch (pdu->present) {
1433             case E2AP_PDU_PR_initiatingMessage: {//initiating message
1434                 asnInitiatingRequest(pdu, sctpMap,message, rmrMessageBuffer);
1435                 break;
1436             }
1437             case E2AP_PDU_PR_successfulOutcome: { //successful outcome
1438                 asnSuccessfulMsg(pdu, sctpMap, message, rmrMessageBuffer);
1439                 break;
1440             }
1441             case E2AP_PDU_PR_unsuccessfulOutcome: { //Unsuccessful Outcome
1442                 asnUnSuccsesfulMsg(pdu, sctpMap, message, rmrMessageBuffer);
1443                 break;
1444             }
1445             default:
1446                 mdclog_write(MDCLOG_ERR, "Unknown index %d in E2AP PDU", pdu->present);
1447                 break;
1448         }
1449         if (loglevel >= MDCLOG_DEBUG) {
1450             clock_gettime(CLOCK_MONOTONIC, &end);
1451             mdclog_write(MDCLOG_DEBUG,
1452                          "After processing message and sent to rmr for : %s, Read time is : %ld seconds, %ld nanoseconds",
1453                          message.peerInfo->enodbName, end.tv_sec - decodeStart.tv_sec, end.tv_nsec - decodeStart.tv_nsec);
1454         }
1455         numOfMessages++;
1456 #ifndef UNIT_TEST
1457         if (pdu != nullptr) {
1458             // ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu); /* With reset we were not freeing the memory and was causing the leak here. */
1459             ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
1460             pdu = nullptr;
1461         }
1462 #else
1463     done = 1;
1464     break;
1465 #endif
1466     }
1467
1468     if (done) {
1469         if (loglevel >= MDCLOG_INFO) {
1470             mdclog_write(MDCLOG_INFO, "Closed connection - descriptor = %d", message.peerInfo->fileDescriptor);
1471         }
1472         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
1473                 snprintf((char *)rmrMessageBuffer.sendMessage->payload,
1474                          256,
1475                          "%s|CU disconnected unexpectedly",
1476                          message.peerInfo->enodbName);
1477         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
1478 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))        
1479         if (sendRequestToXapp(message,
1480                               RIC_SCTP_CONNECTION_FAILURE,
1481                               rmrMessageBuffer) != 0) {
1482             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
1483         }
1484 #endif        
1485
1486         /* Closing descriptor make epoll remove it from the set of descriptors which are monitored. */
1487         close(message.peerInfo->fileDescriptor);
1488         cleanHashEntry((ConnectedCU_t *) events->data.ptr, sctpMap);
1489     }
1490     if (loglevel >= MDCLOG_DEBUG) {
1491         clock_gettime(CLOCK_MONOTONIC, &end);
1492         mdclog_write(MDCLOG_DEBUG, "from receive SCTP to send RMR time is %ld seconds and %ld nanoseconds",
1493                      end.tv_sec - start.tv_sec, end.tv_nsec - start.tv_nsec);
1494
1495     }
1496     return 0;
1497 }
1498
1499 static void buildAndSendSetupRequest(ReportingMessages_t &message,
1500                                      RmrMessagesBuffer_t &rmrMessageBuffer,
1501                                      E2AP_PDU_t *pdu/*,
1502                                      string const &messageName,
1503                                      string const &ieName,
1504                                      vector<string> &functionsToAdd_v,
1505                                      vector<string> &functionsToModified_v*/) {
1506     auto logLevel = mdclog_level_get();
1507     // now we can send the data to e2Mgr
1508
1509     asn_enc_rval_t er;
1510     auto buffer_size = RECEIVE_SCTP_BUFFER_SIZE * 2;
1511     unsigned char *buffer = nullptr;
1512     buffer = (unsigned char *) calloc(buffer_size, sizeof(unsigned char));
1513     if(!buffer)
1514     {
1515 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))        
1516         mdclog_write(MDCLOG_ERR, "Allocating buffer for %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
1517         return;
1518 #endif        
1519     }
1520     while (true) {
1521         er = asn_encode_to_buffer(nullptr, ATS_BASIC_XER, &asn_DEF_E2AP_PDU, pdu, buffer, buffer_size);
1522         if (er.encoded == -1) {
1523 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))     
1524             mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
1525             return;
1526 #endif            
1527         } else if (er.encoded > (ssize_t) buffer_size) {
1528             buffer_size = er.encoded + 128;
1529 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))            
1530             mdclog_write(MDCLOG_WARN, "Buffer of size %d is to small for %s. Reallocate buffer of size %d",
1531                          (int) buffer_size,
1532                          asn_DEF_E2AP_PDU.name, buffer_size);
1533             buffer_size = er.encoded + 128;
1534
1535             unsigned char *newBuffer = nullptr;
1536             newBuffer = (unsigned char *) realloc(buffer, buffer_size);
1537             if (!newBuffer)
1538             {
1539                 // out of memory
1540                 mdclog_write(MDCLOG_ERR, "Reallocating buffer for %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
1541                 free(buffer);
1542                 return;
1543             }
1544             buffer = newBuffer;
1545             continue;
1546 #endif            
1547         }
1548         buffer[er.encoded] = '\0';
1549         break;
1550     }
1551     // encode to xml
1552
1553     string res((char *)buffer);
1554     res.erase(std::remove(res.begin(), res.end(), '\n'), res.end());
1555     res.erase(std::remove(res.begin(), res.end(), '\t'), res.end());
1556     res.erase(std::remove(res.begin(), res.end(), ' '), res.end());
1557
1558 //    string res {};
1559 //    if (!functionsToAdd_v.empty() || !functionsToModified_v.empty()) {
1560 //        res = buildXmlData(messageName, ieName, functionsToAdd_v, functionsToModified_v, buffer, (size_t) er.encoded);
1561 //    }
1562     rmr_mbuf_t *rmrMsg;
1563 //    if (res.length() == 0) {
1564 //        rmrMsg = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, buffer_size + 256);
1565 //        rmrMsg->len = snprintf((char *) rmrMsg->payload, RECEIVE_SCTP_BUFFER_SIZE * 2, "%s:%d|%s",
1566 //                               message.peerInfo->sctpParams->myIP.c_str(),
1567 //                               message.peerInfo->sctpParams->rmrPort,
1568 //                               buffer);
1569 //    } else {
1570         rmrMsg = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, (int)res.length() + 256);
1571         rmrMsg->len = snprintf((char *) rmrMsg->payload, res.length() + 256, "%s:%d|%s",
1572                                message.peerInfo->sctpParams->myIP.c_str(),
1573                                message.peerInfo->sctpParams->rmrPort,
1574                                res.c_str());
1575 //    }
1576
1577     if (logLevel >= MDCLOG_DEBUG) {
1578         mdclog_write(MDCLOG_DEBUG, "Setup request of size %d :\n %s\n", rmrMsg->len, rmrMsg->payload);
1579     }
1580     // send to RMR
1581     rmrMsg->mtype = message.message.messageType;
1582     rmrMsg->state = 0;
1583     rmr_bytes2meid(rmrMsg, (unsigned char *) message.message.enodbName, strlen(message.message.enodbName));
1584
1585     static unsigned char tx[32];
1586     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
1587     rmr_bytes2xact(rmrMsg, tx, strlen((const char *) tx));
1588 #ifndef UNIT_TEST
1589     rmrMsg = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMsg);
1590 #endif
1591     if (rmrMsg == nullptr) {
1592         mdclog_write(MDCLOG_ERR, "RMR failed to send returned nullptr");
1593     } else if (rmrMsg->state != 0) {
1594         char meid[RMR_MAX_MEID]{};
1595         if (rmrMsg->state == RMR_ERR_RETRY) {
1596             usleep(5);
1597             rmrMsg->state = 0;
1598             mdclog_write(MDCLOG_INFO, "RETRY sending Message %d to Xapp from %s",
1599                          rmrMsg->mtype, rmr_get_meid(rmrMsg, (unsigned char *) meid));
1600 #ifndef UNIT_TEST
1601             rmrMsg = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMsg);
1602 #endif
1603             if (rmrMsg == nullptr) {
1604                 mdclog_write(MDCLOG_ERR, "RMR failed send returned nullptr");
1605             } else if (rmrMsg->state != 0) {
1606                 mdclog_write(MDCLOG_ERR,
1607                              "RMR Retry failed %s sending request %d to Xapp from %s",
1608                              translateRmrErrorMessages(rmrMsg->state).c_str(),
1609                              rmrMsg->mtype,
1610                              rmr_get_meid(rmrMsg, (unsigned char *) meid));
1611             }
1612         } else {
1613             mdclog_write(MDCLOG_ERR, "RMR failed: %s. sending request %d to Xapp from %s",
1614                          translateRmrErrorMessages(rmrMsg->state).c_str(),
1615                          rmrMsg->mtype,
1616                          rmr_get_meid(rmrMsg, (unsigned char *) meid));
1617         }
1618     }
1619     message.peerInfo->gotSetup = true;
1620     buildJsonMessage(message);
1621
1622     if (rmrMsg != nullptr) {
1623         rmr_free_msg(rmrMsg);
1624     }
1625     free(buffer);
1626
1627     return;
1628 }
1629
1630 #if 0
1631 int RAN_Function_list_To_Vector(RANfunctions_List_t& list, vector <string> &runFunXML_v) {
1632     auto index = 0;
1633     runFunXML_v.clear();
1634     for (auto j = 0; j < list.list.count; j++) {
1635         auto *raNfunctionItemIEs = (RANfunction_ItemIEs_t *)list.list.array[j];
1636         if (raNfunctionItemIEs->id == ProtocolIE_ID_id_RANfunction_Item &&
1637             (raNfunctionItemIEs->value.present == RANfunction_ItemIEs__value_PR_RANfunction_Item)) {
1638             // encode to xml
1639             E2SM_gNB_NRT_RANfunction_Definition_t *ranFunDef = nullptr;
1640             auto rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER,
1641                                    &asn_DEF_E2SM_gNB_NRT_RANfunction_Definition,
1642                                    (void **)&ranFunDef,
1643                                    raNfunctionItemIEs->value.choice.RANfunction_Item.ranFunctionDefinition.buf,
1644                                    raNfunctionItemIEs->value.choice.RANfunction_Item.ranFunctionDefinition.size);
1645             if (rval.code != RC_OK) {
1646                 mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) E2SM message from : %s",
1647                              rval.code,
1648                              asn_DEF_E2SM_gNB_NRT_RANfunction_Definition.name);
1649                 return -1;
1650             }
1651
1652             auto xml_buffer_size = RECEIVE_SCTP_BUFFER_SIZE * 2;
1653             unsigned char xml_buffer[RECEIVE_SCTP_BUFFER_SIZE * 2];
1654             memset(xml_buffer, 0, RECEIVE_SCTP_BUFFER_SIZE * 2);
1655             // encode to xml
1656             auto er = asn_encode_to_buffer(nullptr,
1657                                            ATS_BASIC_XER,
1658                                            &asn_DEF_E2SM_gNB_NRT_RANfunction_Definition,
1659                                            ranFunDef,
1660                                            xml_buffer,
1661                                            xml_buffer_size);
1662             if (er.encoded == -1) {
1663                 mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s",
1664                              asn_DEF_E2SM_gNB_NRT_RANfunction_Definition.name,
1665                              strerror(errno));
1666             } else if (er.encoded > (ssize_t)xml_buffer_size) {
1667                 mdclog_write(MDCLOG_ERR, "Buffer of size %d is to small for %s, at %s line %d",
1668                              (int) xml_buffer_size,
1669                              asn_DEF_E2SM_gNB_NRT_RANfunction_Definition.name, __func__, __LINE__);
1670             } else {
1671                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1672                     mdclog_write(MDCLOG_DEBUG, "Encoding E2SM %s PDU number %d : %s",
1673                                  asn_DEF_E2SM_gNB_NRT_RANfunction_Definition.name,
1674                                  index++,
1675                                  xml_buffer);
1676                 }
1677
1678                 string runFuncs = (char *)(xml_buffer);
1679                 runFunXML_v.emplace_back(runFuncs);
1680             }
1681         }
1682     }
1683     return 0;
1684 }
1685
1686 int collectServiceUpdate_RequestData(E2AP_PDU_t *pdu,
1687                                      Sctp_Map_t *sctpMap,
1688                                      ReportingMessages_t &message,
1689                                      vector <string> &RANfunctionsAdded_v,
1690                                      vector <string> &RANfunctionsModified_v) {
1691     memset(message.peerInfo->enodbName, 0 , MAX_ENODB_NAME_SIZE);
1692     for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICserviceUpdate.protocolIEs.list.count; i++) {
1693         auto *ie = pdu->choice.initiatingMessage->value.choice.RICserviceUpdate.protocolIEs.list.array[i];
1694         if (ie->id == ProtocolIE_ID_id_RANfunctionsAdded) {
1695             if (ie->value.present == RICserviceUpdate_IEs__value_PR_RANfunctionsID_List) {
1696                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1697                     mdclog_write(MDCLOG_DEBUG, "Run function list have %d entries",
1698                                  ie->value.choice.RANfunctions_List.list.count);
1699                 }
1700                 if (RAN_Function_list_To_Vector(ie->value.choice.RANfunctions_List, RANfunctionsAdded_v) != 0 ) {
1701                     return -1;
1702                 }
1703             }
1704         } else if (ie->id == ProtocolIE_ID_id_RANfunctionsModified) {
1705             if (ie->value.present == RICserviceUpdate_IEs__value_PR_RANfunctions_List) {
1706                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1707                     mdclog_write(MDCLOG_DEBUG, "Run function list have %d entries",
1708                                  ie->value.choice.RANfunctions_List.list.count);
1709                 }
1710                 if (RAN_Function_list_To_Vector(ie->value.choice.RANfunctions_List, RANfunctionsModified_v) != 0 ) {
1711                     return -1;
1712                 }
1713             }
1714         }
1715     }
1716     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1717         mdclog_write(MDCLOG_DEBUG, "Run function vector have %ld entries",
1718                      RANfunctionsAdded_v.size());
1719     }
1720     return 0;
1721 }
1722
1723 #endif
1724
1725
1726 void buildE2TPrometheusCounters(sctp_params_t &sctpParams) {
1727     sctpParams.e2tCounters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_E2setup)] = &sctpParams.prometheusFamily->Add({{"counter", "SetupRequestMsgs"}});
1728     sctpParams.e2tCounters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_E2setup)] = &sctpParams.prometheusFamily->Add({{"counter", "SetupRequestBytes"}});
1729
1730     sctpParams.e2tCounters[OUT_SUCC][MSG_COUNTER][(ProcedureCode_id_E2setup)] = &sctpParams.prometheusFamily->Add({{"counter", "SetupResponseMsgs"}});
1731     sctpParams.e2tCounters[OUT_SUCC][BYTES_COUNTER][(ProcedureCode_id_E2setup)] = &sctpParams.prometheusFamily->Add({{"counter", "SetupResponseBytes"}});
1732
1733     sctpParams.e2tCounters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup] = &sctpParams.prometheusFamily->Add({{"counter", "SetupRequestFailureMsgs"}});
1734     sctpParams.e2tCounters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup] = &sctpParams.prometheusFamily->Add({{"counter", "SetupRequestFailureBytes"}});
1735
1736     sctpParams.e2tCounters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_ErrorIndication)] = &sctpParams.prometheusFamily->Add({{"counter", "ErrorIndicationMsgs"}});
1737     sctpParams.e2tCounters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_ErrorIndication)] = &sctpParams.prometheusFamily->Add({{"counter", "ErrorIndicationBytes"}});
1738
1739     sctpParams.e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_Reset] = &sctpParams.prometheusFamily->Add({{"counter", "ResetRequestMsgs"}});
1740     sctpParams.e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_Reset] = &sctpParams.prometheusFamily->Add({{"counter", "ResetRequestBytes"}});
1741
1742     sctpParams.e2tCounters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_Reset] = &sctpParams.prometheusFamily->Add({{"counter", "ResetAckMsgs"}});
1743     sctpParams.e2tCounters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset] = &sctpParams.prometheusFamily->Add({{"counter", "ResetAckBytes"}});
1744
1745     sctpParams.e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate] = &sctpParams.prometheusFamily->Add({{"counter", "RICServiceUpdateMsgs"}});
1746     sctpParams.e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate] = &sctpParams.prometheusFamily->Add({{"counter", "RICServiceUpdateBytes"}});
1747
1748     sctpParams.e2tCounters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate] = &sctpParams.prometheusFamily->Add({{"counter", "RICServiceUpdateRespMsgs"}});
1749     sctpParams.e2tCounters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate] = &sctpParams.prometheusFamily->Add({{"counter", "RICServiceUpdateRespBytes"}});
1750
1751     sctpParams.e2tCounters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate] = &sctpParams.prometheusFamily->Add({{"counter", "RICServiceUpdateFailureMsgs"}});
1752     sctpParams.e2tCounters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate] = &sctpParams.prometheusFamily->Add({{"counter", "RICServiceUpdateFailureBytes"}});
1753
1754     sctpParams.e2tCounters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICcontrol] = &sctpParams.prometheusFamily->Add({{"counter", "RICControlMsgs"}});
1755     sctpParams.e2tCounters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICcontrol] = &sctpParams.prometheusFamily->Add({{"counter", "RICControlBytes"}});
1756
1757     sctpParams.e2tCounters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol] = &sctpParams.prometheusFamily->Add({{"counter", "RICControlAckMsgs"}});
1758     sctpParams.e2tCounters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol] = &sctpParams.prometheusFamily->Add({{"counter", "RICControlAckBytes"}});
1759
1760     sctpParams.e2tCounters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol] = &sctpParams.prometheusFamily->Add({{"counter", "RICControlFailureMsgs"}});
1761     sctpParams.e2tCounters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol] = &sctpParams.prometheusFamily->Add({{"counter", "RICControlFailureBytes"}});
1762
1763     sctpParams.e2tCounters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscription] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionMsgs"}});
1764     sctpParams.e2tCounters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscription] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionBytes"}});
1765
1766     sctpParams.e2tCounters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionAckMsgs"}});
1767     sctpParams.e2tCounters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionAckBytes"}});
1768
1769     sctpParams.e2tCounters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionFailureMsgs"}});
1770     sctpParams.e2tCounters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionFailureBytes"}});
1771
1772     sctpParams.e2tCounters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionDeleteMsgs"}});
1773     sctpParams.e2tCounters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionDeleteBytes"}});
1774
1775     sctpParams.e2tCounters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionDeleteAckMsgs"}});
1776     sctpParams.e2tCounters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionDeleteAckBytes"}});
1777
1778     sctpParams.e2tCounters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionDeleteFailMsgs"}});
1779     sctpParams.e2tCounters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionDeleteFailBytes"}});
1780
1781     sctpParams.e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICindication] = &sctpParams.prometheusFamily->Add({{"counter", "RICIndicationMsgs"}});
1782     sctpParams.e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICindication] = &sctpParams.prometheusFamily->Add({{"counter", "RICIndicationBytes"}});
1783
1784     sctpParams.e2tCounters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceQuery] = &sctpParams.prometheusFamily->Add({{"counter", "RICServiceQueryMsgs"}});
1785     sctpParams.e2tCounters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceQuery] = &sctpParams.prometheusFamily->Add({{"counter", "RICServiceQueryBytes"}});
1786 }
1787
1788 void buildPrometheusList(ConnectedCU_t *peerInfo, Family<Counter> *prometheusFamily) {
1789     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"SetupRequest", "Messages"}});
1790     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"SetupRequest", "Bytes"}});
1791
1792     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_ErrorIndication)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ErrorIndication", "Messages"}});
1793     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_ErrorIndication)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ErrorIndication", "Bytes"}});
1794
1795     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_RICindication)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICindication", "Messages"}});
1796     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_RICindication)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICindication", "Bytes"}});
1797
1798     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ResetRequest", "Messages"}});
1799     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ResetRequest", "Bytes"}});
1800
1801     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICserviceUpdate", "Messages"}});
1802     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICserviceUpdate", "Bytes"}});
1803     // ---------------------------------------------
1804     peerInfo->counters[IN_SUCC][MSG_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ResetACK", "Messages"}});
1805     peerInfo->counters[IN_SUCC][BYTES_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ResetACK", "Bytes"}});
1806
1807     peerInfo->counters[IN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICcontrol)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICcontrolACK", "Messages"}});
1808     peerInfo->counters[IN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICcontrol)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICcontrolACK", "Bytes"}});
1809
1810     peerInfo->counters[IN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICsubscription)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionACK", "Messages"}});
1811     peerInfo->counters[IN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICsubscription)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionACK", "Bytes"}});
1812
1813     peerInfo->counters[IN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICsubscriptionDelete)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionDeleteACK", "Messages"}});
1814     peerInfo->counters[IN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICsubscriptionDelete)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionDeleteACK", "Bytes"}});
1815     //-------------------------------------------------------------
1816
1817     peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICcontrol)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICcontrolFailure", "Messages"}});
1818     peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICcontrol)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICcontrolFailure", "Bytes"}});
1819
1820     peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICsubscription)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionFailure", "Messages"}});
1821     peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICsubscription)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionFailure", "Bytes"}});
1822
1823     peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICsubscriptionDelete)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionDeleteFailure", "Messages"}});
1824     peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICsubscriptionDelete)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"RICsubscriptionDeleteFailure", "Bytes"}});
1825
1826     //====================================================================================
1827     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_ErrorIndication)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ErrorIndication", "Messages"}});
1828     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_ErrorIndication)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ErrorIndication", "Bytes"}});
1829
1830     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ResetRequest", "Messages"}});
1831     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ResetRequest", "Bytes"}});
1832
1833     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_RICcontrol)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICcontrol", "Messages"}});
1834     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_RICcontrol)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICcontrol", "Bytes"}});
1835
1836     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_RICserviceQuery)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceQuery", "Messages"}});
1837     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_RICserviceQuery)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceQuery", "Bytes"}});
1838
1839     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_RICsubscription)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICsubscription", "Messages"}});
1840     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_RICsubscription)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICsubscription", "Bytes"}});
1841
1842     peerInfo->counters[OUT_INITI][MSG_COUNTER][(ProcedureCode_id_RICsubscriptionDelete)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICsubscriptionDelete", "Messages"}});
1843     peerInfo->counters[OUT_INITI][BYTES_COUNTER][(ProcedureCode_id_RICsubscriptionDelete)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICsubscriptionDelete", "Bytes"}});
1844     //---------------------------------------------------------------------------------------------------------
1845     peerInfo->counters[OUT_SUCC][MSG_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupResponse", "Messages"}});
1846     peerInfo->counters[OUT_SUCC][BYTES_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupResponse", "Bytes"}});
1847
1848     peerInfo->counters[OUT_SUCC][MSG_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ResetACK", "Messages"}});
1849     peerInfo->counters[OUT_SUCC][BYTES_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ResetACK", "Bytes"}});
1850
1851     peerInfo->counters[OUT_SUCC][MSG_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceUpdateResponse", "Messages"}});
1852     peerInfo->counters[OUT_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceUpdateResponse", "Bytes"}});
1853     //----------------------------------------------------------------------------------------------------------------
1854     peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupRequestFailure", "Messages"}});
1855     peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupRequestFailure", "Bytes"}});
1856
1857     peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceUpdateFailure", "Messages"}});
1858     peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceUpdateFailure", "Bytes"}});
1859 }
1860
1861 /**
1862  *
1863  * @param pdu
1864  * @param sctpMap
1865  * @param message
1866  * @param RANfunctionsAdded_v
1867  * @return
1868  */
1869 int collectSetupRequestData(E2AP_PDU_t *pdu,
1870                                      Sctp_Map_t *sctpMap,
1871                                      ReportingMessages_t &message /*, vector <string> &RANfunctionsAdded_v*/) {
1872     memset(message.peerInfo->enodbName, 0 , MAX_ENODB_NAME_SIZE);
1873     for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.E2setupRequest.protocolIEs.list.count; i++) {
1874         auto *ie = pdu->choice.initiatingMessage->value.choice.E2setupRequest.protocolIEs.list.array[i];
1875         if (ie->id == ProtocolIE_ID_id_GlobalE2node_ID) {
1876             // get the ran name for meid
1877             if (ie->value.present == E2setupRequestIEs__value_PR_GlobalE2node_ID) {
1878                 if (buildRanName(message.peerInfo->enodbName, ie) < 0) {
1879                     mdclog_write(MDCLOG_ERR, "Bad param in E2setupRequestIEs GlobalE2node_ID.\n");
1880                     // no message will be sent
1881                     return -1;
1882                 }
1883
1884                 memcpy(message.message.enodbName, message.peerInfo->enodbName, strlen(message.peerInfo->enodbName));
1885                 sctpMap->setkey(message.message.enodbName, message.peerInfo);
1886             }
1887         } /*else if (ie->id == ProtocolIE_ID_id_RANfunctionsAdded) {
1888             if (ie->value.present == E2setupRequestIEs__value_PR_RANfunctions_List) {
1889                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
1890                     mdclog_write(MDCLOG_DEBUG, "Run function list have %d entries",
1891                                  ie->value.choice.RANfunctions_List.list.count);
1892                 }
1893                 if (RAN_Function_list_To_Vector(ie->value.choice.RANfunctions_List, RANfunctionsAdded_v) != 0 ) {
1894                     return -1;
1895                 }
1896             }
1897         } */
1898     }
1899 //    if (mdclog_level_get() >= MDCLOG_DEBUG) {
1900 //        mdclog_write(MDCLOG_DEBUG, "Run function vector have %ld entries",
1901 //                     RANfunctionsAdded_v.size());
1902 //    }
1903     return 0;
1904 }
1905
1906 int XML_From_PER(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) {
1907     E2AP_PDU_t *pdu = nullptr;
1908
1909     if (mdclog_level_get() >= MDCLOG_DEBUG) {
1910         mdclog_write(MDCLOG_DEBUG, "got PER message of size %d is:%s",
1911                      rmrMessageBuffer.sendMessage->len, rmrMessageBuffer.sendMessage->payload);
1912     }
1913     auto rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void **) &pdu,
1914                            rmrMessageBuffer.sendMessage->payload, rmrMessageBuffer.sendMessage->len);
1915     if (rval.code != RC_OK) {
1916         mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) setup response  from E2MGR : %s",
1917                      rval.code,
1918                      message.message.enodbName);
1919         if (pdu != nullptr) {
1920             ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
1921             pdu = nullptr;
1922         }
1923         return -1;
1924     }
1925
1926     int buff_size = RECEIVE_XAPP_BUFFER_SIZE;
1927     auto er = asn_encode_to_buffer(nullptr, ATS_BASIC_XER, &asn_DEF_E2AP_PDU, pdu,
1928                                    rmrMessageBuffer.sendMessage->payload, buff_size);
1929     if (er.encoded == -1) {
1930         mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
1931         if (pdu != nullptr) {
1932             ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
1933             pdu = nullptr;
1934         }
1935         return -1;
1936     } else if (er.encoded > (ssize_t)buff_size) {
1937         mdclog_write(MDCLOG_ERR, "Buffer of size %d is to small for %s, at %s line %d",
1938                      (int)rmrMessageBuffer.sendMessage->len,
1939                      asn_DEF_E2AP_PDU.name,
1940                      __func__,
1941                      __LINE__);
1942         if (pdu != nullptr) {
1943             ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
1944             pdu = nullptr;
1945         }
1946         return -1;
1947     }
1948     rmrMessageBuffer.sendMessage->len = er.encoded;
1949     if (pdu != nullptr) {
1950         ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
1951         pdu = nullptr;
1952     }
1953     return 0;
1954
1955 }
1956
1957 /**
1958  *
1959  * @param pdu
1960  * @param message
1961  * @param rmrMessageBuffer
1962  */
1963 void asnInitiatingRequest(E2AP_PDU_t *pdu,
1964                           Sctp_Map_t *sctpMap,
1965                           ReportingMessages_t &message,
1966                           RmrMessagesBuffer_t &rmrMessageBuffer) {
1967     auto logLevel = mdclog_level_get();
1968     auto procedureCode = ((InitiatingMessage_t *) pdu->choice.initiatingMessage)->procedureCode;
1969     if (logLevel >= MDCLOG_DEBUG) {
1970         mdclog_write(MDCLOG_DEBUG, "Initiating message %ld\n", procedureCode);
1971     }
1972     switch (procedureCode) {
1973         case ProcedureCode_id_E2setup: {
1974             if (logLevel >= MDCLOG_DEBUG) {
1975                 mdclog_write(MDCLOG_DEBUG, "Got E2setup");
1976             }
1977
1978 //            vector <string> RANfunctionsAdded_v;
1979 //            vector <string> RANfunctionsModified_v;
1980 //            RANfunctionsAdded_v.clear();
1981 //            RANfunctionsModified_v.clear();
1982             if (collectSetupRequestData(pdu, sctpMap, message) != 0) {
1983                 break;
1984             }
1985
1986             buildPrometheusList(message.peerInfo, message.peerInfo->sctpParams->prometheusFamily);
1987
1988             string messageName("E2setupRequest");
1989             string ieName("E2setupRequestIEs");
1990             message.message.messageType = RIC_E2_SETUP_REQ;
1991             message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment();
1992             message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment((double)message.message.asnLength);
1993
1994             // Update E2T instance level metrics
1995             message.peerInfo->sctpParams->e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment();
1996             message.peerInfo->sctpParams->e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment((double)message.message.asnLength);
1997
1998             buildAndSendSetupRequest(message, rmrMessageBuffer, pdu);
1999             break;
2000         }
2001         case ProcedureCode_id_RICserviceUpdate: {
2002             if (logLevel >= MDCLOG_DEBUG) {
2003                 mdclog_write(MDCLOG_DEBUG, "Got RICserviceUpdate %s", message.message.enodbName);
2004             }
2005 //            vector <string> RANfunctionsAdded_v;
2006 //            vector <string> RANfunctionsModified_v;
2007 //            RANfunctionsAdded_v.clear();
2008 //            RANfunctionsModified_v.clear();
2009 //            if (collectServiceUpdate_RequestData(pdu, sctpMap, message,
2010 //                                                 RANfunctionsAdded_v, RANfunctionsModified_v) != 0) {
2011 //                break;
2012 //            }
2013
2014             string messageName("RICserviceUpdate");
2015             string ieName("RICserviceUpdateIEs");
2016             message.message.messageType = RIC_SERVICE_UPDATE;
2017 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))            
2018             message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment();
2019             message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment((double)message.message.asnLength);
2020
2021             // Update E2T instance level metrics
2022             message.peerInfo->sctpParams->e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment();
2023             message.peerInfo->sctpParams->e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment((double)message.message.asnLength);
2024 #endif
2025             buildAndSendSetupRequest(message, rmrMessageBuffer, pdu);
2026             break;
2027         }
2028         case ProcedureCode_id_ErrorIndication: {
2029             if (logLevel >= MDCLOG_DEBUG) {
2030                 mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName);
2031             }
2032 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))            
2033             message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication]->Increment();
2034             message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication]->Increment((double)message.message.asnLength);
2035
2036             // Update E2T instance level metrics
2037             message.peerInfo->sctpParams->e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication]->Increment();
2038             message.peerInfo->sctpParams->e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication]->Increment((double)message.message.asnLength);
2039 #endif
2040             if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) {
2041                 mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP");
2042             }
2043             break;
2044         }
2045         case ProcedureCode_id_Reset: {
2046             if (logLevel >= MDCLOG_DEBUG) {
2047                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
2048             }
2049 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))            
2050             message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
2051             message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment((double)message.message.asnLength);
2052
2053             // Update E2T instance level metrics
2054             message.peerInfo->sctpParams->e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
2055             message.peerInfo->sctpParams->e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment((double)message.message.asnLength);
2056 #endif
2057             if (XML_From_PER(message, rmrMessageBuffer) < 0) {
2058                 break;
2059             }
2060
2061             if (sendRequestToXapp(message, RIC_E2_RESET_REQ, rmrMessageBuffer) != 0) {
2062                 mdclog_write(MDCLOG_ERR, "RIC_E2_RESET_REQ message failed to send to xAPP");
2063             }
2064             break;
2065         }
2066         case ProcedureCode_id_RICindication: {
2067             if (logLevel >= MDCLOG_DEBUG) {
2068                 mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName);
2069             }
2070             for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
2071                 auto messageSent = false;
2072                 RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
2073                 if (logLevel >= MDCLOG_DEBUG) {
2074                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
2075                 }
2076                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
2077                     if (logLevel >= MDCLOG_DEBUG) {
2078                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
2079                     }
2080                     if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
2081                         static unsigned char tx[32];
2082                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
2083                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2084                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
2085                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
2086                                        (unsigned char *)message.message.enodbName,
2087                                        strlen(message.message.enodbName));
2088                         rmrMessageBuffer.sendMessage->state = 0;
2089                         rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricInstanceID;
2090
2091                         //ie->value.choice.RICrequestID.ricInstanceID;
2092                         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2093                             mdclog_write(MDCLOG_DEBUG, "sub id = %d, mtype = %d, ric instance id %ld, requestor id = %ld",
2094                                          rmrMessageBuffer.sendMessage->sub_id,
2095                                          rmrMessageBuffer.sendMessage->mtype,
2096                                          ie->value.choice.RICrequestID.ricInstanceID,
2097                                          ie->value.choice.RICrequestID.ricRequestorID);
2098                         }
2099 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))                        
2100                         message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICindication]->Increment();
2101                         message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICindication]->Increment((double)message.message.asnLength);
2102
2103                         // Update E2T instance level metrics
2104                         message.peerInfo->sctpParams->e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICindication]->Increment();
2105                         message.peerInfo->sctpParams->e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICindication]->Increment((double)message.message.asnLength);
2106 #endif
2107                         sendRmrMessage(rmrMessageBuffer, message);
2108                         messageSent = true;
2109                     } else {
2110                         mdclog_write(MDCLOG_ERR, "RIC request id missing illegal request");
2111                     }
2112                 }
2113                 if (messageSent) {
2114                     break;
2115                 }
2116             }
2117             break;
2118         }
2119         default: {
2120             mdclog_write(MDCLOG_ERR, "Undefined or not supported message = %ld", procedureCode);
2121             message.message.messageType = 0; // no RMR message type yet
2122
2123             buildJsonMessage(message);
2124
2125             break;
2126         }
2127     }
2128 }
2129
2130 /**
2131  *
2132  * @param pdu
2133  * @param message
2134  * @param rmrMessageBuffer
2135  */
2136 void asnSuccessfulMsg(E2AP_PDU_t *pdu,
2137                       Sctp_Map_t *sctpMap,
2138                       ReportingMessages_t &message,
2139                       RmrMessagesBuffer_t &rmrMessageBuffer) {
2140     auto procedureCode = pdu->choice.successfulOutcome->procedureCode;
2141     auto logLevel = mdclog_level_get();
2142     if (logLevel >= MDCLOG_INFO) {
2143         mdclog_write(MDCLOG_INFO, "Successful Outcome %ld", procedureCode);
2144     }
2145     switch (procedureCode) {
2146         case ProcedureCode_id_Reset: {
2147             if (logLevel >= MDCLOG_DEBUG) {
2148                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
2149             }
2150 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))            
2151             message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
2152             message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment((double)message.message.asnLength);
2153
2154             // Update E2T instance level metrics
2155             message.peerInfo->sctpParams->e2tCounters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
2156             message.peerInfo->sctpParams->e2tCounters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment((double)message.message.asnLength);
2157 #endif
2158             if (XML_From_PER(message, rmrMessageBuffer) < 0) {
2159                 break;
2160             }
2161             if (sendRequestToXapp(message, RIC_E2_RESET_RESP, rmrMessageBuffer) != 0) {
2162                 mdclog_write(MDCLOG_ERR, "RIC_E2_RESET_RESP message failed to send to xAPP");
2163             }
2164             break;
2165         }
2166         case ProcedureCode_id_RICcontrol: {
2167             if (logLevel >= MDCLOG_DEBUG) {
2168                 mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
2169             }
2170             for (auto i = 0;
2171                  i < pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.count; i++) {
2172                 auto messageSent = false;
2173                 RICcontrolAcknowledge_IEs_t *ie = pdu->choice.successfulOutcome->value.choice.RICcontrolAcknowledge.protocolIEs.list.array[i];
2174                 if (mdclog_level_get() >= MDCLOG_DEBUG) {
2175                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
2176                 }
2177                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
2178                     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2179                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
2180                     }
2181                     if (ie->value.present == RICcontrolAcknowledge_IEs__value_PR_RICrequestID) {
2182                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_ACK;
2183                         rmrMessageBuffer.sendMessage->state = 0;
2184 //                        rmrMessageBuffer.sendMessage->sub_id = (int) ie->value.choice.RICrequestID.ricRequestorID;
2185                         rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricInstanceID;
2186
2187                         static unsigned char tx[32];
2188                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2189                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
2190                         rmr_bytes2meid(rmrMessageBuffer.sendMessage,
2191                                        (unsigned char *)message.message.enodbName,
2192                                        strlen(message.message.enodbName));
2193 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))                        
2194                         message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment();
2195                         message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment((double)message.message.asnLength);
2196
2197                         // Update E2T instance level metrics
2198                         message.peerInfo->sctpParams->e2tCounters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment();
2199                         message.peerInfo->sctpParams->e2tCounters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment((double)message.message.asnLength);
2200 #endif
2201                         sendRmrMessage(rmrMessageBuffer, message);
2202                         messageSent = true;
2203                     } else {
2204                         mdclog_write(MDCLOG_ERR, "RIC request id missing illegal request");
2205                     }
2206                 }
2207                 if (messageSent) {
2208                     break;
2209                 }
2210             }
2211
2212             break;
2213         }
2214         case ProcedureCode_id_RICsubscription: {
2215             if (logLevel >= MDCLOG_DEBUG) {
2216                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
2217             }
2218 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))            
2219             message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment();
2220             message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment((double)message.message.asnLength);
2221
2222             // Update E2T instance level metrics
2223             message.peerInfo->sctpParams->e2tCounters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment();
2224             message.peerInfo->sctpParams->e2tCounters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment((double)message.message.asnLength);
2225 #endif
2226             if (sendRequestToXapp(message, RIC_SUB_RESP, rmrMessageBuffer) != 0) {
2227                 mdclog_write(MDCLOG_ERR, "Subscription successful message failed to send to xAPP");
2228             }
2229             break;
2230         }
2231         case ProcedureCode_id_RICsubscriptionDelete: {
2232             if (logLevel >= MDCLOG_DEBUG) {
2233                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
2234             }
2235 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))            
2236             message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment();
2237             message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment((double)message.message.asnLength);
2238
2239             // Update E2T instance level metrics
2240             message.peerInfo->sctpParams->e2tCounters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment();
2241             message.peerInfo->sctpParams->e2tCounters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment((double)message.message.asnLength);
2242 #endif
2243             if (sendRequestToXapp(message, RIC_SUB_DEL_RESP, rmrMessageBuffer) != 0) {
2244                 mdclog_write(MDCLOG_ERR, "Subscription delete successful message failed to send to xAPP");
2245             }
2246             break;
2247         }
2248         default: {
2249             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
2250             message.message.messageType = 0; // no RMR message type yet
2251             buildJsonMessage(message);
2252
2253             break;
2254         }
2255     }
2256 }
2257
2258 /**
2259  *
2260  * @param pdu
2261  * @param message
2262  * @param rmrMessageBuffer
2263  */
2264 void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
2265                         Sctp_Map_t *sctpMap,
2266                         ReportingMessages_t &message,
2267                         RmrMessagesBuffer_t &rmrMessageBuffer) {
2268     auto procedureCode = pdu->choice.unsuccessfulOutcome->procedureCode;
2269     auto logLevel = mdclog_level_get();
2270     if (logLevel >= MDCLOG_INFO) {
2271         mdclog_write(MDCLOG_INFO, "Unsuccessful Outcome %ld", procedureCode);
2272     }
2273     switch (procedureCode) {
2274         case ProcedureCode_id_RICcontrol: {
2275             if (logLevel >= MDCLOG_DEBUG) {
2276                 mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
2277             }
2278             for (int i = 0;
2279                  i < pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.count; i++) {
2280                 auto messageSent = false;
2281                 RICcontrolFailure_IEs_t *ie = pdu->choice.unsuccessfulOutcome->value.choice.RICcontrolFailure.protocolIEs.list.array[i];
2282                 if (logLevel >= MDCLOG_DEBUG) {
2283                     mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
2284                 }
2285                 if (ie->id == ProtocolIE_ID_id_RICrequestID) {
2286                     if (logLevel >= MDCLOG_DEBUG) {
2287                         mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
2288                     }
2289                     if (ie->value.present == RICcontrolFailure_IEs__value_PR_RICrequestID) {
2290                         message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_CONTROL_FAILURE;
2291                         rmrMessageBuffer.sendMessage->state = 0;
2292 //                        rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricRequestorID;
2293                         rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricInstanceID;
2294                         static unsigned char tx[32];
2295                         snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2296                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
2297                         rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName,
2298                                        strlen(message.message.enodbName));
2299 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))                        
2300                         message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment();
2301                         message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment((double)message.message.asnLength);
2302
2303                         // Update E2T instance level metrics
2304                         message.peerInfo->sctpParams->e2tCounters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment();
2305                         message.peerInfo->sctpParams->e2tCounters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment((double)message.message.asnLength);
2306 #endif
2307                         sendRmrMessage(rmrMessageBuffer, message);
2308                         messageSent = true;
2309                     } else {
2310                         mdclog_write(MDCLOG_ERR, "RIC request id missing illegal request");
2311                     }
2312                 }
2313                 if (messageSent) {
2314                     break;
2315                 }
2316             }
2317             break;
2318         }
2319         case ProcedureCode_id_RICsubscription: {
2320             if (logLevel >= MDCLOG_DEBUG) {
2321                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
2322             }
2323 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))            
2324             message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment();
2325             message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment((double)message.message.asnLength);
2326
2327             // Update E2T instance level metrics
2328             message.peerInfo->sctpParams->e2tCounters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment();
2329             message.peerInfo->sctpParams->e2tCounters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment((double)message.message.asnLength);
2330 #endif
2331             if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer) != 0) {
2332                 mdclog_write(MDCLOG_ERR, "Subscription unsuccessful message failed to send to xAPP");
2333             }
2334             break;
2335         }
2336         case ProcedureCode_id_RICsubscriptionDelete: {
2337             if (logLevel >= MDCLOG_DEBUG) {
2338                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
2339             }
2340 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))            
2341             message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment();
2342             message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment((double)message.message.asnLength);
2343
2344             // Update E2T instance level metrics
2345             message.peerInfo->sctpParams->e2tCounters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment();
2346             message.peerInfo->sctpParams->e2tCounters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment((double)message.message.asnLength);
2347 #endif
2348             if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer) != 0) {
2349                 mdclog_write(MDCLOG_ERR, "Subscription Delete unsuccessful message failed to send to xAPP");
2350             }
2351             break;
2352         }
2353         default: {
2354             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
2355             message.message.messageType = 0; // no RMR message type yet
2356 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))            
2357             buildJsonMessage(message);
2358 #endif            
2359             break;
2360         }
2361     }
2362 }
2363
2364 /**
2365  *
2366  * @param message
2367  * @param requestId
2368  * @param rmrMmessageBuffer
2369  * @return
2370  */
2371 int sendRequestToXapp(ReportingMessages_t &message,
2372                       int requestId,
2373                       RmrMessagesBuffer_t &rmrMmessageBuffer) {
2374     rmr_bytes2meid(rmrMmessageBuffer.sendMessage,
2375                    (unsigned char *)message.message.enodbName,
2376                    strlen(message.message.enodbName));
2377     message.message.messageType = rmrMmessageBuffer.sendMessage->mtype = requestId;
2378     rmrMmessageBuffer.sendMessage->state = 0;
2379     static unsigned char tx[32];
2380     snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2381     rmr_bytes2xact(rmrMmessageBuffer.sendMessage, tx, strlen((const char *) tx));
2382
2383     auto rc = sendRmrMessage(rmrMmessageBuffer, message);
2384     return rc;
2385 }
2386
2387 /**
2388  *
2389  * @param pSctpParams
2390  */
2391 void getRmrContext(sctp_params_t &pSctpParams) {
2392     pSctpParams.rmrCtx = nullptr;
2393     pSctpParams.rmrCtx = rmr_init(pSctpParams.rmrAddress, RECEIVE_XAPP_BUFFER_SIZE, RMRFL_NONE);
2394     if (pSctpParams.rmrCtx == nullptr) {
2395         mdclog_write(MDCLOG_ERR, "Failed to initialize RMR");
2396         return;
2397     }
2398
2399     rmr_set_stimeout(pSctpParams.rmrCtx, 0);    // disable retries for any send operation
2400     // we need to find that routing table exist and we can run
2401     if (mdclog_level_get() >= MDCLOG_INFO) {
2402         mdclog_write(MDCLOG_INFO, "We are after RMR INIT wait for RMR_Ready");
2403     }
2404     int rmrReady = 0;
2405     int count = 0;
2406     while (!rmrReady) {
2407         if ((rmrReady = rmr_ready(pSctpParams.rmrCtx)) == 0) {
2408             sleep(1);
2409         }
2410         count++;
2411         if (count % 60 == 0) {
2412             mdclog_write(MDCLOG_INFO, "waiting to RMR ready state for %d seconds", count);
2413         }
2414     }
2415     if (mdclog_level_get() >= MDCLOG_INFO) {
2416         mdclog_write(MDCLOG_INFO, "RMR running");
2417     }
2418     rmr_init_trace(pSctpParams.rmrCtx, 200);
2419     // get the RMR fd for the epoll
2420     pSctpParams.rmrListenFd = rmr_get_rcvfd(pSctpParams.rmrCtx);
2421     struct epoll_event event{};
2422     // add RMR fd to epoll
2423     event.events = (EPOLLIN);
2424     event.data.fd = pSctpParams.rmrListenFd;
2425     // add listening RMR FD to epoll
2426     if (epoll_ctl(pSctpParams.epoll_fd, EPOLL_CTL_ADD, pSctpParams.rmrListenFd, &event)) {
2427         mdclog_write(MDCLOG_ERR, "Failed to add RMR descriptor to epoll");
2428         close(pSctpParams.rmrListenFd);
2429         rmr_close(pSctpParams.rmrCtx);
2430         pSctpParams.rmrCtx = nullptr;
2431     }
2432 }
2433
2434 /**
2435  *
2436  * @param message
2437  * @param rmrMessageBuffer
2438  * @return
2439  */
2440 int PER_FromXML(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) {
2441     E2AP_PDU_t *pdu = nullptr;
2442
2443     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2444         mdclog_write(MDCLOG_DEBUG, "got xml Format  data from xApp of size %d is:%s",
2445                 rmrMessageBuffer.rcvMessage->len, rmrMessageBuffer.rcvMessage->payload);
2446     }
2447     auto rval = asn_decode(nullptr, ATS_BASIC_XER, &asn_DEF_E2AP_PDU, (void **) &pdu,
2448                            rmrMessageBuffer.rcvMessage->payload, rmrMessageBuffer.rcvMessage->len);
2449     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2450         mdclog_write(MDCLOG_DEBUG, "%s After  decoding the XML to PDU", __func__ );
2451     }
2452     if (rval.code != RC_OK) {
2453 #ifdef UNIT_TEST
2454     return 0;
2455 #endif    
2456         mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) setup response  from E2MGR : %s",
2457                      rval.code,
2458                      message.message.enodbName);
2459         if (pdu != nullptr) {
2460             ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
2461             pdu = nullptr;
2462         }
2463         return -1;
2464     }
2465
2466     int buff_size = RECEIVE_XAPP_BUFFER_SIZE;
2467     auto er = asn_encode_to_buffer(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, pdu,
2468                                    rmrMessageBuffer.rcvMessage->payload, buff_size);
2469     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2470         mdclog_write(MDCLOG_DEBUG, "%s After encoding PDU to PER", __func__ );
2471     }
2472     if (er.encoded == -1) {
2473         mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
2474         if (pdu != nullptr) {
2475             ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
2476             pdu = nullptr;
2477         }
2478         return -1;
2479     } else if (er.encoded > (ssize_t)buff_size) {
2480         mdclog_write(MDCLOG_ERR, "Buffer of size %d is to small for %s, at %s line %d",
2481                      (int)rmrMessageBuffer.rcvMessage->len,
2482                      asn_DEF_E2AP_PDU.name,
2483                      __func__,
2484                      __LINE__);
2485         if (pdu != nullptr) {
2486             ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
2487             pdu = nullptr;
2488         }
2489         return -1;
2490     }
2491     rmrMessageBuffer.rcvMessage->len = er.encoded;
2492     if (pdu != nullptr) {
2493         ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
2494         pdu = nullptr;
2495     }
2496     return 0;
2497 }
2498
2499 /**
2500  *
2501  * @param sctpMap
2502  * @param rmrMessageBuffer
2503  * @param ts
2504  * @return
2505  */
2506 int receiveXappMessages(Sctp_Map_t *sctpMap,
2507                         RmrMessagesBuffer_t &rmrMessageBuffer,
2508                         struct timespec &ts) {
2509     int loglevel = mdclog_level_get();
2510     if (rmrMessageBuffer.rcvMessage == nullptr) {
2511         //we have error
2512         mdclog_write(MDCLOG_ERR, "RMR Allocation message, %s", strerror(errno));
2513         return -1;
2514     }
2515
2516 //    if (loglevel >= MDCLOG_DEBUG) {
2517 //        mdclog_write(MDCLOG_DEBUG, "Call to rmr_rcv_msg");
2518 //    }
2519     rmrMessageBuffer.rcvMessage = rmr_rcv_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.rcvMessage);
2520     if (rmrMessageBuffer.rcvMessage == nullptr) {
2521         mdclog_write(MDCLOG_ERR, "RMR Receiving message with null pointer, Reallocated rmr message buffer");
2522         rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2523         return -2;
2524     }
2525     ReportingMessages_t message;
2526     message.message.direction = 'D';
2527     message.message.time.tv_nsec = ts.tv_nsec;
2528     message.message.time.tv_sec = ts.tv_sec;
2529
2530     // get message payload
2531     //auto msgData = msg->payload;
2532 #ifdef UNIT_TEST
2533     rmrMessageBuffer.rcvMessage->state = 0;
2534 #endif
2535     if (rmrMessageBuffer.rcvMessage->state != 0) {
2536         mdclog_write(MDCLOG_ERR, "RMR Receiving message with stat = %d", rmrMessageBuffer.rcvMessage->state);
2537         return -1;
2538     }
2539     rmr_get_meid(rmrMessageBuffer.rcvMessage, (unsigned char *)message.message.enodbName);
2540     message.peerInfo = (ConnectedCU_t *) sctpMap->find(message.message.enodbName);
2541     if (message.peerInfo == nullptr) {
2542         auto type = rmrMessageBuffer.rcvMessage->mtype;
2543         switch (type) {
2544             case RIC_SCTP_CLEAR_ALL:
2545             case E2_TERM_KEEP_ALIVE_REQ:
2546             case RIC_HEALTH_CHECK_REQ:
2547                 break;
2548             default:
2549 #ifdef UNIT_TEST
2550     break;
2551 #endif    
2552                 mdclog_write(MDCLOG_ERR, "Failed to send message no CU entry %s", message.message.enodbName);
2553                 return -1;
2554         }
2555     }
2556
2557     if (rmrMessageBuffer.rcvMessage->mtype != RIC_HEALTH_CHECK_REQ) {
2558         num_of_XAPP_messages.fetch_add(1, std::memory_order_release);
2559
2560     }
2561     switch (rmrMessageBuffer.rcvMessage->mtype) {
2562         case RIC_E2_SETUP_RESP : {
2563             if (loglevel >= MDCLOG_DEBUG) {
2564                 mdclog_write(MDCLOG_DEBUG, "RIC_E2_SETUP_RESP");
2565             }
2566             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2567                 break;
2568             }
2569 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))            
2570             message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment();
2571             message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment(rmrMessageBuffer.rcvMessage->len);
2572
2573             // Update E2T instance level metrics
2574             message.peerInfo->sctpParams->e2tCounters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment();
2575             message.peerInfo->sctpParams->e2tCounters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment(rmrMessageBuffer.rcvMessage->len);
2576 #endif            
2577             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2578                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_RESP");
2579                 return -6;
2580             }
2581             break;
2582         }
2583         case RIC_E2_SETUP_FAILURE : {
2584             if (loglevel >= MDCLOG_DEBUG) {
2585                 mdclog_write(MDCLOG_DEBUG, "RIC_E2_SETUP_FAILURE");
2586             }
2587             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2588                 break;
2589             }
2590 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))            
2591             message.peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment();
2592             message.peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment(rmrMessageBuffer.rcvMessage->len);
2593
2594             // Update E2T instance level metrics
2595             message.peerInfo->sctpParams->e2tCounters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment();
2596             message.peerInfo->sctpParams->e2tCounters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment(rmrMessageBuffer.rcvMessage->len);
2597 #endif            
2598             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2599                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_FAILURE");
2600                 return -6;
2601             }
2602             break;
2603         }
2604         case RIC_ERROR_INDICATION: {
2605             if (loglevel >= MDCLOG_DEBUG) {
2606                 mdclog_write(MDCLOG_DEBUG, "RIC_ERROR_INDICATION");
2607             }
2608 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))            
2609             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication]->Increment();
2610             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication]->Increment(rmrMessageBuffer.rcvMessage->len);
2611
2612             // Update E2T instance level metrics
2613             message.peerInfo->sctpParams->e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication]->Increment();
2614             message.peerInfo->sctpParams->e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication]->Increment(rmrMessageBuffer.rcvMessage->len);
2615 #endif            
2616             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2617                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ERROR_INDICATION");
2618                 return -6;
2619             }
2620             break;
2621         }
2622         case RIC_SUB_REQ: {
2623             if (loglevel >= MDCLOG_DEBUG) {
2624                 mdclog_write(MDCLOG_DEBUG, "RIC_SUB_REQ");
2625             }
2626 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))            
2627             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment();
2628             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment(rmrMessageBuffer.rcvMessage->len);
2629
2630             // Update E2T instance level metrics
2631             message.peerInfo->sctpParams->e2tCounters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment();
2632             message.peerInfo->sctpParams->e2tCounters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment(rmrMessageBuffer.rcvMessage->len);
2633 #endif            
2634             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2635                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_REQ");
2636                 return -6;
2637             }
2638             break;
2639         }
2640         case RIC_SUB_DEL_REQ: {
2641             if (loglevel >= MDCLOG_DEBUG) {
2642                 mdclog_write(MDCLOG_DEBUG, "RIC_SUB_DEL_REQ");
2643             }
2644 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))            
2645             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment();
2646             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment(rmrMessageBuffer.rcvMessage->len);
2647
2648             // Update E2T instance level metrics
2649             message.peerInfo->sctpParams->e2tCounters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment();
2650             message.peerInfo->sctpParams->e2tCounters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment(rmrMessageBuffer.rcvMessage->len);
2651 #endif            
2652             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2653                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_DEL_REQ");
2654                 return -6;
2655             }
2656             break;
2657         }
2658         case RIC_CONTROL_REQ: {
2659             if (loglevel >= MDCLOG_DEBUG) {
2660                 mdclog_write(MDCLOG_DEBUG, "RIC_CONTROL_REQ");
2661             }
2662 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))            
2663             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment();
2664             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment(rmrMessageBuffer.rcvMessage->len);
2665
2666             // Update E2T instance level metrics
2667             message.peerInfo->sctpParams->e2tCounters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment();
2668             message.peerInfo->sctpParams->e2tCounters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment(rmrMessageBuffer.rcvMessage->len);
2669 #endif            
2670             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2671                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_CONTROL_REQ");
2672                 return -6;
2673             }
2674             break;
2675         }
2676         case RIC_SERVICE_QUERY: {
2677             if (loglevel >= MDCLOG_DEBUG) {
2678                 mdclog_write(MDCLOG_DEBUG, "RIC_SERVICE_QUERY");
2679             }
2680             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2681                 break;
2682             }
2683 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))            
2684             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceQuery]->Increment();
2685             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceQuery]->Increment(rmrMessageBuffer.rcvMessage->len);
2686
2687             // Update E2T instance level metrics
2688             message.peerInfo->sctpParams->e2tCounters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceQuery]->Increment();
2689             message.peerInfo->sctpParams->e2tCounters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceQuery]->Increment(rmrMessageBuffer.rcvMessage->len);
2690 #endif            
2691             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2692                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_QUERY");
2693                 return -6;
2694             }
2695             break;
2696         }
2697         case RIC_SERVICE_UPDATE_ACK: {
2698             if (loglevel >= MDCLOG_DEBUG) {
2699                 mdclog_write(MDCLOG_DEBUG, "RIC_SERVICE_UPDATE_ACK");
2700             }
2701             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2702                 mdclog_write(MDCLOG_ERR, "error in PER_FromXML");
2703                 break;
2704             }
2705 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))            
2706             message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment();
2707             message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment(rmrMessageBuffer.rcvMessage->len);
2708
2709             // Update E2T instance level metrics
2710             message.peerInfo->sctpParams->e2tCounters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment();
2711             message.peerInfo->sctpParams->e2tCounters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment(rmrMessageBuffer.rcvMessage->len);
2712 #endif            
2713             if (loglevel >= MDCLOG_DEBUG) {
2714                 mdclog_write(MDCLOG_DEBUG, "Before sending to CU");
2715             }
2716             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2717                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_ACK");
2718                 return -6;
2719             }
2720             break;
2721         }
2722         case RIC_SERVICE_UPDATE_FAILURE: {
2723             if (loglevel >= MDCLOG_DEBUG) {
2724                 mdclog_write(MDCLOG_DEBUG, "RIC_SERVICE_UPDATE_FAILURE");
2725             }
2726             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2727                 break;
2728             }
2729 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))            
2730             message.peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment();
2731             message.peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment(rmrMessageBuffer.rcvMessage->len);
2732
2733             // Update E2T instance level metrics
2734             message.peerInfo->sctpParams->e2tCounters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment();
2735             message.peerInfo->sctpParams->e2tCounters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment(rmrMessageBuffer.rcvMessage->len);
2736 #endif            
2737             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2738                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_FAILURE");
2739                 return -6;
2740             }
2741             break;
2742         }
2743         case RIC_E2_RESET_REQ: {
2744             if (loglevel >= MDCLOG_DEBUG) {
2745                 mdclog_write(MDCLOG_DEBUG, "RIC_E2_RESET_REQ");
2746             }
2747             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2748                 break;
2749             }
2750 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))            
2751             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
2752             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment(rmrMessageBuffer.rcvMessage->len);
2753
2754             // Update E2T instance level metrics
2755             message.peerInfo->sctpParams->e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
2756             message.peerInfo->sctpParams->e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment(rmrMessageBuffer.rcvMessage->len);
2757 #endif            
2758             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2759                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_RESET");
2760                 return -6;
2761             }
2762             break;
2763         }
2764         case RIC_E2_RESET_RESP: {
2765             if (loglevel >= MDCLOG_DEBUG) {
2766                 mdclog_write(MDCLOG_DEBUG, "RIC_E2_RESET_RESP");
2767             }
2768             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
2769                 break;
2770             }
2771 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))            
2772             message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
2773             message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment(rmrMessageBuffer.rcvMessage->len);
2774
2775             // Update E2T instance level metrics
2776             message.peerInfo->sctpParams->e2tCounters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
2777             message.peerInfo->sctpParams->e2tCounters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment(rmrMessageBuffer.rcvMessage->len);
2778 #endif            
2779             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
2780                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_RESET_RESP");
2781                 return -6;
2782             }
2783             break;
2784         }
2785         case RIC_SCTP_CLEAR_ALL: {
2786             mdclog_write(MDCLOG_INFO, "RIC_SCTP_CLEAR_ALL");
2787             // loop on all keys and close socket and then erase all map.
2788             vector<char *> v;
2789             sctpMap->getKeys(v);
2790             for (auto const &iter : v) { //}; iter != sctpMap.end(); iter++) {
2791                 if (!boost::starts_with((string) (iter), "host:") && !boost::starts_with((string) (iter), "msg:")) {
2792                     auto *peerInfo = (ConnectedCU_t *) sctpMap->find(iter);
2793                     if (peerInfo == nullptr) {
2794                         continue;
2795                     }
2796                     close(peerInfo->fileDescriptor);
2797                     memcpy(message.message.enodbName, peerInfo->enodbName, sizeof(peerInfo->enodbName));
2798                     message.message.direction = 'D';
2799                     message.message.time.tv_nsec = ts.tv_nsec;
2800                     message.message.time.tv_sec = ts.tv_sec;
2801
2802                     message.message.asnLength = rmrMessageBuffer.sendMessage->len =
2803                             snprintf((char *)rmrMessageBuffer.sendMessage->payload,
2804                                      256,
2805                                      "%s|RIC_SCTP_CLEAR_ALL",
2806                                      peerInfo->enodbName);
2807                     message.message.asndata = rmrMessageBuffer.sendMessage->payload;
2808                     mdclog_write(MDCLOG_INFO, "%s", message.message.asndata);
2809                     if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) {
2810                         mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
2811                     }
2812                     free(peerInfo);
2813                 }
2814             }
2815
2816             sleep(1);
2817             sctpMap->clear();
2818             break;
2819         }
2820         case E2_TERM_KEEP_ALIVE_REQ: {
2821             // send message back
2822             rmr_bytes2payload(rmrMessageBuffer.sendMessage,
2823                               (unsigned char *)rmrMessageBuffer.ka_message,
2824                               rmrMessageBuffer.ka_message_len);
2825             rmrMessageBuffer.sendMessage->mtype = E2_TERM_KEEP_ALIVE_RESP;
2826             rmrMessageBuffer.sendMessage->state = 0;
2827             static unsigned char tx[32];
2828             auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2829             rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, txLen);
2830 #if !(defined(UNIT_TEST) || defined(MODULE_TEST))            
2831             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2832 #endif
2833             if (rmrMessageBuffer.sendMessage == nullptr) {
2834                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2835                 mdclog_write(MDCLOG_ERR, "Failed to send E2_TERM_KEEP_ALIVE_RESP RMR message returned NULL");
2836             } else if (rmrMessageBuffer.sendMessage->state != 0)  {
2837                 mdclog_write(MDCLOG_ERR, "Failed to send E2_TERM_KEEP_ALIVE_RESP, on RMR state = %d ( %s)",
2838                              rmrMessageBuffer.sendMessage->state, translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str());
2839             } else if (loglevel >= MDCLOG_DEBUG) {
2840                 mdclog_write(MDCLOG_DEBUG, "Got Keep Alive Request send : %s", rmrMessageBuffer.ka_message);
2841             }
2842
2843             break;
2844         }
2845         case RIC_HEALTH_CHECK_REQ: {
2846             static int counter = 0;
2847             // send message back
2848             rmr_bytes2payload(rmrMessageBuffer.rcvMessage,
2849                               (unsigned char *)"OK",
2850                               2);
2851             rmrMessageBuffer.rcvMessage->mtype = RIC_HEALTH_CHECK_RESP;
2852             rmrMessageBuffer.rcvMessage->state = 0;
2853             static unsigned char tx[32];
2854             auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
2855             rmr_bytes2xact(rmrMessageBuffer.rcvMessage, tx, txLen);
2856             rmrMessageBuffer.rcvMessage = rmr_rts_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.rcvMessage);
2857             //rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
2858             if (rmrMessageBuffer.rcvMessage == nullptr) {
2859                 rmrMessageBuffer.rcvMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
2860                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_HEALTH_CHECK_RESP RMR message returned NULL");
2861             } else if (rmrMessageBuffer.rcvMessage->state != 0)  {
2862                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_HEALTH_CHECK_RESP, on RMR state = %d ( %s)",
2863                              rmrMessageBuffer.rcvMessage->state, translateRmrErrorMessages(rmrMessageBuffer.rcvMessage->state).c_str());
2864             } else if (loglevel >= MDCLOG_DEBUG && ++counter % 100 == 0) {
2865                 mdclog_write(MDCLOG_DEBUG, "Got %d RIC_HEALTH_CHECK_REQ Request send : OK", counter);
2866             }
2867
2868             break;
2869         }
2870
2871         default:
2872             mdclog_write(MDCLOG_WARN, "Message Type : %d is not supported", rmrMessageBuffer.rcvMessage->mtype);
2873             message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
2874             message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
2875             message.message.time.tv_nsec = ts.tv_nsec;
2876             message.message.time.tv_sec = ts.tv_sec;
2877             message.message.messageType = rmrMessageBuffer.rcvMessage->mtype;
2878
2879             buildJsonMessage(message);
2880
2881
2882             return -7;
2883     }
2884     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2885         mdclog_write(MDCLOG_DEBUG, "EXIT OK from %s", __FUNCTION__);
2886     }
2887     return 0;
2888 }
2889
2890 /**
2891  * Send message to the CU that is not expecting for successful or unsuccessful results
2892  * @param messageBuffer
2893  * @param message
2894  * @param failedMsgId
2895  * @param sctpMap
2896  * @return
2897  */
2898 int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer,
2899                            ReportingMessages_t &message,
2900                            int failedMsgId,
2901                            Sctp_Map_t *sctpMap) {
2902     if (mdclog_level_get() >= MDCLOG_DEBUG) {
2903         mdclog_write(MDCLOG_DEBUG, "send message: %d to %s address", message.message.messageType, message.message.enodbName);
2904     }
2905
2906     getRequestMetaData(message, messageBuffer);
2907     if (mdclog_level_get() >= MDCLOG_INFO) {
2908         mdclog_write(MDCLOG_INFO, "send message to %s address", message.message.enodbName);
2909     }
2910
2911     auto rc = sendMessagetoCu(sctpMap, messageBuffer, message, failedMsgId);
2912     return rc;
2913 }
2914
2915 /**
2916  *
2917  * @param sctpMap
2918  * @param messageBuffer
2919  * @param message
2920  * @param failedMesgId
2921  * @return
2922  */
2923 int sendMessagetoCu(Sctp_Map_t *sctpMap,
2924                     RmrMessagesBuffer_t &messageBuffer,
2925                     ReportingMessages_t &message,
2926                     int failedMesgId) {
2927     // get the FD
2928     message.message.messageType = messageBuffer.rcvMessage->mtype;
2929     auto rc = sendSctpMsg(message.peerInfo, message, sctpMap);
2930     return rc;
2931 }
2932
2933
2934 /**
2935  *
2936  * @param epoll_fd
2937  * @param peerInfo
2938  * @param events
2939  * @param sctpMap
2940  * @param enodbName
2941  * @param msgType
2942  * @return
2943  */
2944 int addToEpoll(int epoll_fd,
2945                ConnectedCU_t *peerInfo,
2946                uint32_t events,
2947                Sctp_Map_t *sctpMap,
2948                char *enodbName,
2949                int msgType) {
2950     // Add to Epol
2951     struct epoll_event event{};
2952     event.data.ptr = peerInfo;
2953     event.events = events;
2954     if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peerInfo->fileDescriptor, &event) < 0) {
2955 #if !(defined(UNIT_TEST) || defined(MODULE_TEST)) 
2956         if (mdclog_level_get() >= MDCLOG_DEBUG) {
2957             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_ADD (may check not to quit here), %s, %s %d",
2958                          strerror(errno), __func__, __LINE__);
2959         }
2960         close(peerInfo->fileDescriptor);
2961         if (enodbName != nullptr) {
2962             cleanHashEntry(peerInfo, sctpMap);
2963             char key[MAX_ENODB_NAME_SIZE * 2];
2964             snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
2965             if (mdclog_level_get() >= MDCLOG_DEBUG) {
2966                 mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
2967             }
2968             auto tmp = sctpMap->find(key);
2969             if (tmp) {
2970                 free(tmp);
2971                 sctpMap->erase(key);
2972             }
2973         } else {
2974             peerInfo->enodbName[0] = 0;
2975         }
2976         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may check not to quit here)");
2977         return -1;
2978 #endif
2979     }
2980     return 0;
2981 }
2982
2983 /**
2984  *
2985  * @param epoll_fd
2986  * @param peerInfo
2987  * @param events
2988  * @param sctpMap
2989  * @param enodbName
2990  * @param msgType
2991  * @return
2992  */
2993 int modifyToEpoll(int epoll_fd,
2994                   ConnectedCU_t *peerInfo,
2995                   uint32_t events,
2996                   Sctp_Map_t *sctpMap,
2997                   char *enodbName,
2998                   int msgType) {
2999     // Add to Epol
3000     struct epoll_event event{};
3001     event.data.ptr = peerInfo;
3002     event.events = events;
3003     if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, peerInfo->fileDescriptor, &event) < 0) {
3004         if (mdclog_level_get() >= MDCLOG_DEBUG) {
3005             mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_MOD (may check not to quit here), %s, %s %d",
3006                          strerror(errno), __func__, __LINE__);
3007         }
3008         close(peerInfo->fileDescriptor);
3009         cleanHashEntry(peerInfo, sctpMap);
3010         char key[MAX_ENODB_NAME_SIZE * 2];
3011         snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", enodbName, msgType);
3012         if (mdclog_level_get() >= MDCLOG_DEBUG) {
3013             mdclog_write(MDCLOG_DEBUG, "remove key = %s from %s at line %d", key, __FUNCTION__, __LINE__);
3014         }
3015         auto tmp = sctpMap->find(key);
3016         if (tmp) {
3017             free(tmp);
3018         }
3019         sctpMap->erase(key);
3020         mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may check not to quit here)");
3021         return -1;
3022     }
3023     return 0;
3024 }
3025
3026
3027 int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message) {
3028     buildJsonMessage(message);
3029 #ifndef UNIT_TEST
3030     rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
3031 #else
3032     rmrMessageBuffer.sendMessage->state = RMR_ERR_RETRY;
3033 #endif
3034     if (rmrMessageBuffer.sendMessage == nullptr) {
3035         rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
3036         mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
3037         return -1;
3038     }
3039
3040     if (rmrMessageBuffer.sendMessage->state != 0) {
3041         char meid[RMR_MAX_MEID]{};
3042         if (rmrMessageBuffer.sendMessage->state == RMR_ERR_RETRY) {
3043             usleep(5);
3044             rmrMessageBuffer.sendMessage->state = 0;
3045             mdclog_write(MDCLOG_INFO, "RETRY sending Message type %d to Xapp from %s",
3046                          rmrMessageBuffer.sendMessage->mtype,
3047                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
3048 #ifndef UNIT_TEST
3049             rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
3050 #endif
3051             if (rmrMessageBuffer.sendMessage == nullptr) {
3052                 mdclog_write(MDCLOG_ERR, "RMR failed send message returned with NULL pointer");
3053                 rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
3054                 return -1;
3055             } else if (rmrMessageBuffer.sendMessage->state != 0) {
3056                 mdclog_write(MDCLOG_ERR,
3057                              "Message state %s while sending request %d to Xapp from %s after retry of 10 microseconds",
3058                              translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
3059                              rmrMessageBuffer.sendMessage->mtype,
3060                              rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
3061                 auto rc = rmrMessageBuffer.sendMessage->state;
3062                 return rc;
3063             }
3064         } else {
3065             mdclog_write(MDCLOG_ERR, "Message state %s while sending request %d to Xapp from %s",
3066                          translateRmrErrorMessages(rmrMessageBuffer.sendMessage->state).c_str(),
3067                          rmrMessageBuffer.sendMessage->mtype,
3068                          rmr_get_meid(rmrMessageBuffer.sendMessage, (unsigned char *)meid));
3069             return rmrMessageBuffer.sendMessage->state;
3070         }
3071     }
3072     return 0;
3073 }
3074
3075 void buildJsonMessage(ReportingMessages_t &message) {
3076 #ifdef UNIT_TEST
3077     jsonTrace = true;
3078 #endif
3079     if (jsonTrace) {
3080         message.outLen = sizeof(message.base64Data);
3081         base64::encode((const unsigned char *) message.message.asndata,
3082                        (const int) message.message.asnLength,
3083                        message.base64Data,
3084                        message.outLen);
3085         if (mdclog_level_get() >= MDCLOG_DEBUG) {
3086             mdclog_write(MDCLOG_DEBUG, "Tracing: ASN length = %d, base64 message length = %d ",
3087                          (int) message.message.asnLength,
3088                          (int) message.outLen);
3089         }
3090
3091         snprintf(message.buffer, sizeof(message.buffer),
3092                  "{\"header\": {\"ts\": \"%ld.%09ld\","
3093                  "\"ranName\": \"%s\","
3094                  "\"messageType\": %d,"
3095                  "\"direction\": \"%c\"},"
3096                  "\"base64Length\": %d,"
3097                  "\"asnBase64\": \"%s\"}",
3098                  message.message.time.tv_sec,
3099                  message.message.time.tv_nsec,
3100                  message.message.enodbName,
3101                  message.message.messageType,
3102                  message.message.direction,
3103                  (int) message.outLen,
3104                  message.base64Data);
3105         static src::logger_mt &lg = my_logger::get();
3106
3107         BOOST_LOG(lg) << message.buffer;
3108     }
3109 }
3110
3111
3112 /**
3113  * take RMR error code to string
3114  * @param state
3115  * @return
3116  */
3117 string translateRmrErrorMessages(int state) {
3118     string str = {};
3119     switch (state) {
3120         case RMR_OK:
3121             str = "RMR_OK - state is good";
3122             break;
3123         case RMR_ERR_BADARG:
3124             str = "RMR_ERR_BADARG - argument passed to function was unusable";
3125             break;
3126         case RMR_ERR_NOENDPT:
3127             str = "RMR_ERR_NOENDPT - send//call could not find an endpoint based on msg type";
3128             break;
3129         case RMR_ERR_EMPTY:
3130             str = "RMR_ERR_EMPTY - msg received had no payload; attempt to send an empty message";
3131             break;
3132         case RMR_ERR_NOHDR:
3133             str = "RMR_ERR_NOHDR - message didn't contain a valid header";
3134             break;
3135         case RMR_ERR_SENDFAILED:
3136             str = "RMR_ERR_SENDFAILED - send failed; errno has nano reason";
3137             break;
3138         case RMR_ERR_CALLFAILED:
3139             str = "RMR_ERR_CALLFAILED - unable to send call() message";
3140             break;
3141         case RMR_ERR_NOWHOPEN:
3142             str = "RMR_ERR_NOWHOPEN - no wormholes are open";
3143             break;
3144         case RMR_ERR_WHID:
3145             str = "RMR_ERR_WHID - wormhole id was invalid";
3146             break;
3147         case RMR_ERR_OVERFLOW:
3148             str = "RMR_ERR_OVERFLOW - operation would have busted through a buffer/field size";
3149             break;
3150         case RMR_ERR_RETRY:
3151             str = "RMR_ERR_RETRY - request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)";
3152             break;
3153         case RMR_ERR_RCVFAILED:
3154             str = "RMR_ERR_RCVFAILED - receive failed (hard error)";
3155             break;
3156         case RMR_ERR_TIMEOUT:
3157             str = "RMR_ERR_TIMEOUT - message processing call timed out";
3158             break;
3159         case RMR_ERR_UNSET:
3160             str = "RMR_ERR_UNSET - the message hasn't been populated with a transport buffer";
3161             break;
3162         case RMR_ERR_TRUNC:
3163             str = "RMR_ERR_TRUNC - received message likely truncated";
3164             break;
3165         case RMR_ERR_INITFAILED:
3166             str = "RMR_ERR_INITFAILED - initialisation of something (probably message) failed";
3167             break;
3168         case RMR_ERR_NOTSUPP:
3169             str = "RMR_ERR_NOTSUPP - the request is not supported, or RMr was not initialised for the request";
3170             break;
3171         default:
3172             char buf[128]{};
3173             snprintf(buf, sizeof buf, "UNDOCUMENTED RMR_ERR : %d", state);
3174             str = buf;
3175             break;
3176     }
3177     return str;
3178 }