0bcff53495f1aaf733351dcb54c0de81cdc37742
[ric-plt/e2.git] / RIC-E2-TERMINATION / sctpThread.h
1 /*
2  * Copyright 2019 AT&T Intellectual Property
3  * Copyright 2019 Nokia
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16 */
17
18 /*
19  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20  * platform project (RICP).
21  */
22
23 #ifndef X2_SCTP_THREAD_H
24 #define X2_SCTP_THREAD_H
25
26 #include <algorithm>
27
28 #include <cstdio>
29 #include <cerrno>
30 #include <cstdlib>
31 #include <cstring>
32 #include <random>
33 #include <sys/socket.h>
34 #include <arpa/inet.h>
35 #include <netinet/in_systm.h>
36 #include <netinet/in.h>
37 #include <netinet/ip.h>
38 #include <netinet/ip_icmp.h>
39 #include <netinet/sctp.h>
40 #include <thread>
41 #include <atomic>
42 #include <sys/param.h>
43 #include <sys/file.h>
44 #include <ctime>
45 #include <netdb.h>
46 #include <sys/epoll.h>
47 #include <mutex>
48 #include <shared_mutex>
49 #include <iterator>
50 #include <map>
51 #include <sys/inotify.h>
52 #include <csignal>
53 #include <future>
54
55 #include <rmr/rmr.h>
56 #include <rmr/RIC_message_types.h>
57 #include <mdclog/mdclog.h>
58 #include <functional>
59 #include <iostream>
60
61 #include <boost/algorithm/string/predicate.hpp>
62 #include <boost/lexical_cast.hpp>
63 #include <boost/move/utility.hpp>
64 #include <boost/log/sources/logger.hpp>
65 #include <boost/log/sources/record_ostream.hpp>
66 #include <boost/log/sources/global_logger_storage.hpp>
67 #include <boost/log/utility/setup/file.hpp>
68 #include <boost/log/utility/setup/common_attributes.hpp>
69 #include <boost/filesystem.hpp>
70
71 #include <mdclog/mdclog.h>
72
73 #include "oranE2/E2AP-PDU.h"
74 #include "oranE2/ProtocolIE-Container.h"
75 #include "oranE2/InitiatingMessage.h"
76 #include "oranE2/SuccessfulOutcome.h"
77 #include "oranE2/UnsuccessfulOutcome.h"
78 #include "oranE2/ProtocolIE-Container.h"
79 #include "oranE2/ProtocolIE-Field.h"
80 #include "oranE2/GlobalE2node-gNB-ID.h"
81 #include "oranE2/GlobalE2node-en-gNB-ID.h"
82 #include "oranE2/GlobalE2node-ng-eNB-ID.h"
83 #include "oranE2/GlobalE2node-eNB-ID.h"
84
85 #include "cxxopts.hpp"
86 //#include "config-cpp/include/config-cpp/config-cpp.h"
87 #include <zlib.h>
88 #include <prometheus/counter.h>
89 #include <prometheus/exposer.h>
90 #include <prometheus/gateway.h>
91 #include <prometheus/registry.h>
92
93 using namespace prometheus;
94
95 #include "mapWrapper.h"
96
97 #include "base64.h"
98
99 #include "ReadConfigFile.h"
100
101 using namespace std;
102 namespace logging = boost::log;
103 namespace src = boost::log::sources;
104 namespace keywords = boost::log::keywords;
105 namespace sinks = boost::log::sinks;
106 namespace posix_time = boost::posix_time;
107 namespace expr = boost::log::expressions;
108
109 #define SRC_PORT 36422
110 #define SA      struct sockaddr
111 #define MAX_ENODB_NAME_SIZE 64
112
113 #define MAXEVENTS 128
114
115 #define RECEIVE_SCTP_BUFFER_SIZE (256 * 1024)
116 #define RECEIVE_XAPP_BUFFER_SIZE RECEIVE_SCTP_BUFFER_SIZE
117
118 typedef mapWrapper Sctp_Map_t;
119
120
121
122 #define VOLUME_URL_SIZE 256
123 #define KA_MESSAGE_SIZE 2048
124
125 typedef struct sctp_params {
126     int      epollTimeOut = -1;
127     uint16_t rmrPort = 0;
128     uint16_t sctpPort = SRC_PORT;
129     int      epoll_fd = 0;
130     int      listenFD = 0;
131     int      rmrListenFd = 0;
132     int      inotifyFD = 0;
133     int      inotifyWD = 0;
134     void     *rmrCtx = nullptr;
135     Sctp_Map_t *sctpMap = nullptr;
136     char      ka_message[KA_MESSAGE_SIZE] {};
137     int       ka_message_length = 0;
138     char       rmrAddress[256] {}; // "tcp:port number" "tcp:5566" listen to all address on port 5566
139     mdclog_severity_t logLevel = MDCLOG_INFO;
140     char volume[VOLUME_URL_SIZE];
141     string myIP {};
142     string fqdn {};
143     string podName {};
144     string configFilePath {};
145     string configFileName {};
146     bool trace = true;
147     shared_ptr<prometheus::Registry> prometheusRegistry;
148     string prometheusPort {"8088"};
149     Family<Counter> *prometheusFamily;
150     Exposer *prometheusExposer = nullptr;
151     Counter *e2tCounters[6][2][ProcedureCode_id_RICsubscriptionDelete + 1] {};
152 } sctp_params_t;
153
154 // RAN to RIC
155 #define IN_INITI 0 //INITIATING
156 #define IN_SUCC 1 //SUCCESSFUL
157 #define IN_UN_SUCC 2 //UN-Successful
158
159 // RIC To RAN
160 #define OUT_INITI 3 //INITIATING
161 #define OUT_SUCC 4 //SUCCESSFUL
162 #define OUT_UN_SUCC 5 //UN-Successful
163
164 #define MSG_COUNTER 0
165 #define BYTES_COUNTER 1
166
167 typedef struct ConnectedCU {
168     int fileDescriptor = 0;
169     char hostName[NI_MAXHOST] {};
170     char portNumber[NI_MAXSERV] {};
171     char enodbName[MAX_ENODB_NAME_SIZE] {};
172     char asnData[RECEIVE_SCTP_BUFFER_SIZE] {};
173     size_t asnLength = 0;
174     int mtype = 0;
175     bool isConnected = false;
176     bool gotSetup = false;
177     sctp_params_t *sctpParams = nullptr;
178     Counter *counters[6][2][ProcedureCode_id_RICsubscriptionDelete + 1] {};
179 } ConnectedCU_t ;
180
181
182 #define MAX_RMR_BUFF_ARRAY 32
183 typedef struct RmrMessagesBuffer {
184     char ka_message[KA_MESSAGE_SIZE] {};
185     int  ka_message_len = 0;
186     void *rmrCtx = nullptr;
187     rmr_mbuf_t *sendMessage= nullptr;
188     //rmr_mbuf_t *sendBufferedMessages[MAX_RMR_BUFF_ARRAY] {};
189     rmr_mbuf_t *rcvMessage= nullptr;
190     //rmr_mbuf_t *rcvBufferedMessages[MAX_RMR_BUFF_ARRAY] {};
191 } RmrMessagesBuffer_t;
192
193 typedef struct formatedMessage {
194     char enodbName[MAX_ENODB_NAME_SIZE];
195     struct timespec time;
196     int messageType;
197     char direction;
198     ssize_t asnLength;
199     unsigned char *asndata;
200 } FormatedMessage_t;
201
202 typedef struct ReportingMessages {
203     FormatedMessage_t message {};
204     ConnectedCU_t *peerInfo = nullptr;
205     long outLen = 0;
206     unsigned char base64Data[RECEIVE_SCTP_BUFFER_SIZE * 2] {};
207     char buffer[RECEIVE_SCTP_BUFFER_SIZE * 8] {};
208 } ReportingMessages_t;
209
210 cxxopts::ParseResult parse(int argc, char *argv[], sctp_params_t &pSctpParams);
211
212 int buildInotify(sctp_params_t &sctpParams);
213
214 void handleTermInit(sctp_params_t &sctpParams);
215
216 void handleConfigChange(sctp_params_t *sctpParams);
217
218 void listener(sctp_params_t *params);
219
220 void sendTermInit(sctp_params_t &sctpParams);
221
222 int setSocketNoBlocking(int socket);
223
224 void handleEinprogressMessages(struct epoll_event &event,
225                                ReportingMessages_t &message,
226                                RmrMessagesBuffer_t &rmrMessageBuffer,
227                                sctp_params_t *params);
228
229 void handlepoll_error(struct epoll_event &event,
230                       ReportingMessages_t &message,
231                       RmrMessagesBuffer_t &rmrMessageBuffer,
232                       sctp_params_t *params);
233
234
235 void cleanHashEntry(ConnectedCU_t *peerInfo, Sctp_Map_t *m);
236
237
238 /**
239  *
240  * @param message
241  * @param rmrMessageBuffer
242  */
243 void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer);
244
245 /**
246  *
247  * @param sctpMap
248  * @param messageBuffer
249  * @param message
250  * @param failedMesgId
251  * @return
252  */
253 int sendMessagetoCu(Sctp_Map_t *sctpMap,
254                     RmrMessagesBuffer_t &messageBuffer,
255                     ReportingMessages_t &message,
256                     int failedMesgId);
257
258 void sendFailedSendingMessagetoXapp(RmrMessagesBuffer_t &rmrMessageBuffer,
259                                     ReportingMessages_t &message,
260                                     int failedMesgId);
261
262 int sendRequestToXapp(ReportingMessages_t &message,
263                       int requestId,
264                       RmrMessagesBuffer_t &rmrMmessageBuffer);
265
266 /**
267  *
268  * @param message
269  * @param msgType
270  * @param requestType
271  * @param rmrMessageBuffer
272  * @param sctpMap
273  * @return
274  */
275 /*
276 int sendResponseToXapp(ReportingMessages_t &message,
277                        int msgType,
278                        int requestType,
279                        RmrMessagesBuffer_t &rmrMessageBuffer,
280                        Sctp_Map_t *sctpMap);
281 */
282
283 /**
284  *
285  * @param peerInfo
286  * @param message
287  * @param m
288  * @return
289  */
290 int sendSctpMsg(ConnectedCU_t *peerInfo,
291                 ReportingMessages_t &message,
292                 Sctp_Map_t *m);
293
294 /**
295  *
296  * @param events
297  * @param sctpMap
298  * @param numOfMessages
299  * @param rmrMessageBuffer
300  * @param ts
301  * @return
302  */
303 int receiveDataFromSctp(struct epoll_event *events,
304                         Sctp_Map_t *sctpMap,
305                         int &numOfMessages,
306                         RmrMessagesBuffer_t &rmrMessageBuffer,
307                         struct timespec &ts);
308
309 /**
310  *
311  * @param rmrAddress
312  * @return
313  */
314 void getRmrContext(sctp_params_t &pSctpParams);
315
316 /**
317  *
318  * @param sctpMap
319  * @param rmrMessageBuffer
320  * @param ts
321  * @return
322  */
323 int receiveXappMessages(Sctp_Map_t *sctpMap,
324                         RmrMessagesBuffer_t &rmrMessageBuffer,
325                         struct timespec &ts);
326
327 /**
328  *
329  * @param messageBuffer
330  * @param failedMsgId
331  * @param sctpMap
332  * @return
333  */
334 int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer,
335                            ReportingMessages_t &message,
336                            int failedMsgId,
337                            Sctp_Map_t *sctpMap);
338 /**
339  *
340  * @param pdu
341  * @param message
342  * @param rmrMessageBuffer
343  */
344 void asnInitiatingRequest(E2AP_PDU_t *pdu,
345                           Sctp_Map_t *sctpMap,
346                           ReportingMessages_t &message,
347                           RmrMessagesBuffer_t &rmrMessageBuffer);
348 /**
349  *
350  * @param pdu
351  * @param message
352  * @param sctpMap
353  * @param rmrMessageBuffer
354  */
355 void asnSuccessfulMsg(E2AP_PDU_t *pdu,
356                       Sctp_Map_t *sctpMap,
357                       ReportingMessages_t &message,
358                       RmrMessagesBuffer_t &rmrMessageBuffer);
359 /**
360  *
361  * @param pdu
362  * @param message
363  * @param sctpMap
364  * @param rmrMessageBuffer
365  */
366 void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
367                         Sctp_Map_t *sctpMap,
368                         ReportingMessages_t &message,
369                         RmrMessagesBuffer_t &rmrMessageBuffer);
370
371 /**
372  *
373  * @param rmrMessageBuffer
374  * @param message
375  * @return
376  */
377 int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message);
378 /**
379  *
380  * @param epoll_fd
381  * @param peerInfo
382  * @param events
383  * @param sctpMap
384  * @param enodbName
385  * @param msgType
386  * @returnsrc::logger_mt& lg = my_logger::get();
387  */
388 int addToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType);
389 /**
390  *
391  * @param epoll_fd
392  * @param peerInfo
393  * @param events
394  * @param sctpMap
395  * @param enodbName
396  * @param msgType
397  * @return
398  */
399 int modifyToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType);
400
401 /**
402  *
403  * @param message
404  */
405 void buildJsonMessage(ReportingMessages_t &message);
406
407 /**
408  *
409  *
410  * @param state
411  * @return
412  */
413 string translateRmrErrorMessages(int state);
414
415 int buildConfiguration(sctp_params_t &sctpParams);
416 void startPrometheus(sctp_params_t &sctpParams);
417 static int enable_log_change_notify(const char* fileName);
418 static int register_log_change_notify(const char *fileName);
419 static void * monitor_loglevel_change_handler(void* arg);
420 void  update_mdc_log_level_severity(char* log_level);
421 static char* parse_file(char* filename);
422
423
424 static inline uint64_t rdtscp(uint32_t &aux) {
425     uint64_t rax,rdx;
426     asm volatile ("rdtscp\n" : "=a" (rax), "=d" (rdx), "=c" (aux) : :);
427     return (rdx << (unsigned)32) + rax;
428 }
429 #ifndef RIC_SCTP_CONNECTION_FAILURE
430 #define RIC_SCTP_CONNECTION_FAILURE  10080
431 #endif
432
433 #ifdef UNIT_TEST
434     #define FILE_DESCRIPTOR 53424 /*Dummy value for file descriptor only when UT is defined*/
435 #endif
436
437 int buildListeningPort(sctp_params_t &sctpParams);
438 void buildE2TPrometheusCounters(sctp_params_t &sctpParams);
439
440 #endif //X2_SCTP_THREAD_H