6e22f7bf32b5c8d68675d4904c0856fdd93dff2f
[ric-plt/e2.git] / RIC-E2-TERMINATION / sctpThread.h
1 /*
2  * Copyright 2019 AT&T Intellectual Property
3  * Copyright 2019 Nokia
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16 */
17
18 /*
19  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20  * platform project (RICP).
21  */
22
23 #ifndef X2_SCTP_THREAD_H
24 #define X2_SCTP_THREAD_H
25
26 #include <algorithm>
27
28 #include <cstdio>
29 #include <cerrno>
30 #include <cstdlib>
31 #include <cstring>
32 #include <random>
33 #include <sys/socket.h>
34 #include <arpa/inet.h>
35 #include <netinet/in_systm.h>
36 #include <netinet/in.h>
37 #include <netinet/ip.h>
38 #include <netinet/ip_icmp.h>
39 #include <netinet/sctp.h>
40 #include <thread>
41 #include <atomic>
42 #include <sys/param.h>
43 #include <sys/file.h>
44 #include <ctime>
45 #include <netdb.h>
46 #include <sys/epoll.h>
47 #include <mutex>
48 #include <shared_mutex>
49 #include <iterator>
50 #include <map>
51 #include <sys/inotify.h>
52 #include <csignal>
53 #include <future>
54
55 #include <rmr/rmr.h>
56 #include <rmr/RIC_message_types.h>
57 #include <mdclog/mdclog.h>
58 #include <functional>
59 #include <iostream>
60
61 #include <boost/algorithm/string/predicate.hpp>
62 #include <boost/lexical_cast.hpp>
63 #include <boost/move/utility.hpp>
64 #include <boost/log/sources/logger.hpp>
65 #include <boost/log/sources/record_ostream.hpp>
66 #include <boost/log/sources/global_logger_storage.hpp>
67 #include <boost/log/utility/setup/file.hpp>
68 #include <boost/log/utility/setup/common_attributes.hpp>
69 #include <boost/filesystem.hpp>
70
71 #include <mdclog/mdclog.h>
72
73 #include "oranE2/E2AP-PDU.h"
74 #include "oranE2/ProtocolIE-Container.h"
75 #include "oranE2/InitiatingMessage.h"
76 #include "oranE2/SuccessfulOutcome.h"
77 #include "oranE2/UnsuccessfulOutcome.h"
78 #include "oranE2/ProtocolIE-Container.h"
79 #include "oranE2/ProtocolIE-Field.h"
80 #include "oranE2/GlobalE2node-gNB-ID.h"
81 #include "oranE2/GlobalE2node-en-gNB-ID.h"
82 #include "oranE2/GlobalE2node-ng-eNB-ID.h"
83 #include "oranE2/GlobalE2node-eNB-ID.h"
84
85 #include "cxxopts.hpp"
86 //#include "config-cpp/include/config-cpp/config-cpp.h"
87 #include <zlib.h>
88 #include <prometheus/counter.h>
89 #include <prometheus/exposer.h>
90 #include <prometheus/gateway.h>
91 #include <prometheus/registry.h>
92
93 using namespace prometheus;
94
95 #include "mapWrapper.h"
96
97 #include "base64.h"
98
99 #include "ReadConfigFile.h"
100
101 using namespace std;
102 namespace logging = boost::log;
103 namespace src = boost::log::sources;
104 namespace keywords = boost::log::keywords;
105 namespace sinks = boost::log::sinks;
106 namespace posix_time = boost::posix_time;
107 namespace expr = boost::log::expressions;
108
109 #define SRC_PORT 36422
110 #define SA      struct sockaddr
111 #define MAX_ENODB_NAME_SIZE 64
112
113 #define MAXEVENTS 128
114
115 #define RECEIVE_SCTP_BUFFER_SIZE (256 * 1024)
116 #define RECEIVE_XAPP_BUFFER_SIZE RECEIVE_SCTP_BUFFER_SIZE
117
118 typedef mapWrapper Sctp_Map_t;
119
120
121
122 #define VOLUME_URL_SIZE 256
123 #define KA_MESSAGE_SIZE 2048
124
125 typedef struct sctp_params {
126     int      epollTimeOut = -1;
127     uint16_t rmrPort = 0;
128     uint16_t sctpPort = SRC_PORT;
129     int      epoll_fd = 0;
130     int      listenFD = 0;
131     int      rmrListenFd = 0;
132     int      inotifyFD = 0;
133     int      inotifyWD = 0;
134     void     *rmrCtx = nullptr;
135     Sctp_Map_t *sctpMap = nullptr;
136     char      ka_message[KA_MESSAGE_SIZE] {};
137     int       ka_message_length = 0;
138     char       rmrAddress[256] {}; // "tcp:port number" "tcp:5566" listen to all address on port 5566
139     mdclog_severity_t logLevel = MDCLOG_INFO;
140     char volume[VOLUME_URL_SIZE];
141     string myIP {};
142     string fqdn {};
143     string podName {};
144     string configFilePath {};
145     string configFileName {};
146     bool trace = true;
147     shared_ptr<prometheus::Registry> prometheusRegistry;
148     string prometheusPort {"8088"};
149     Family<Counter> *prometheusFamily;
150     Exposer *prometheusExposer = nullptr;
151 } sctp_params_t;
152
153 // RAN to RIC
154 #define IN_INITI 0 //INITIATING
155 #define IN_SUCC 1 //SUCCESSFUL
156 #define IN_UN_SUCC 2 //UN-Successful
157
158 // RIC To RAN
159 #define OUT_INITI 3 //INITIATING
160 #define OUT_SUCC 4 //SUCCESSFUL
161 #define OUT_UN_SUCC 5 //UN-Successful
162
163 #define MSG_COUNTER 0
164 #define BYTES_COUNTER 1
165
166 typedef struct ConnectedCU {
167     int fileDescriptor = 0;
168     char hostName[NI_MAXHOST] {};
169     char portNumber[NI_MAXSERV] {};
170     char enodbName[MAX_ENODB_NAME_SIZE] {};
171     char asnData[RECEIVE_SCTP_BUFFER_SIZE] {};
172     size_t asnLength = 0;
173     int mtype = 0;
174     bool isConnected = false;
175     bool gotSetup = false;
176     sctp_params_t *sctpParams = nullptr;
177     Counter *counters[6][2][ProcedureCode_id_RICsubscriptionDelete + 1] {};
178 } ConnectedCU_t ;
179
180
181 #define MAX_RMR_BUFF_ARRAY 32
182 typedef struct RmrMessagesBuffer {
183     char ka_message[KA_MESSAGE_SIZE] {};
184     int  ka_message_len = 0;
185     void *rmrCtx = nullptr;
186     rmr_mbuf_t *sendMessage= nullptr;
187     //rmr_mbuf_t *sendBufferedMessages[MAX_RMR_BUFF_ARRAY] {};
188     rmr_mbuf_t *rcvMessage= nullptr;
189     //rmr_mbuf_t *rcvBufferedMessages[MAX_RMR_BUFF_ARRAY] {};
190 } RmrMessagesBuffer_t;
191
192 typedef struct formatedMessage {
193     char enodbName[MAX_ENODB_NAME_SIZE];
194     struct timespec time;
195     int messageType;
196     char direction;
197     ssize_t asnLength;
198     unsigned char *asndata;
199 } FormatedMessage_t;
200
201 typedef struct ReportingMessages {
202     FormatedMessage_t message {};
203     ConnectedCU_t *peerInfo = nullptr;
204     long outLen = 0;
205     unsigned char base64Data[RECEIVE_SCTP_BUFFER_SIZE * 2] {};
206     char buffer[RECEIVE_SCTP_BUFFER_SIZE * 8] {};
207 } ReportingMessages_t;
208
209 cxxopts::ParseResult parse(int argc, char *argv[], sctp_params_t &pSctpParams);
210
211 int buildInotify(sctp_params_t &sctpParams);
212
213 void handleTermInit(sctp_params_t &sctpParams);
214
215 void handleConfigChange(sctp_params_t *sctpParams);
216
217 void listener(sctp_params_t *params);
218
219 void sendTermInit(sctp_params_t &sctpParams);
220
221 int setSocketNoBlocking(int socket);
222
223 void handleEinprogressMessages(struct epoll_event &event,
224                                ReportingMessages_t &message,
225                                RmrMessagesBuffer_t &rmrMessageBuffer,
226                                sctp_params_t *params);
227
228 void handlepoll_error(struct epoll_event &event,
229                       ReportingMessages_t &message,
230                       RmrMessagesBuffer_t &rmrMessageBuffer,
231                       sctp_params_t *params);
232
233
234 void cleanHashEntry(ConnectedCU_t *peerInfo, Sctp_Map_t *m);
235
236
237 /**
238  *
239  * @param message
240  * @param rmrMessageBuffer
241  */
242 void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer);
243
244 /**
245  *
246  * @param sctpMap
247  * @param messageBuffer
248  * @param message
249  * @param failedMesgId
250  * @return
251  */
252 int sendMessagetoCu(Sctp_Map_t *sctpMap,
253                     RmrMessagesBuffer_t &messageBuffer,
254                     ReportingMessages_t &message,
255                     int failedMesgId);
256
257 void sendFailedSendingMessagetoXapp(RmrMessagesBuffer_t &rmrMessageBuffer,
258                                     ReportingMessages_t &message,
259                                     int failedMesgId);
260
261 int sendRequestToXapp(ReportingMessages_t &message,
262                       int requestId,
263                       RmrMessagesBuffer_t &rmrMmessageBuffer);
264
265 /**
266  *
267  * @param message
268  * @param msgType
269  * @param requestType
270  * @param rmrMessageBuffer
271  * @param sctpMap
272  * @return
273  */
274 /*
275 int sendResponseToXapp(ReportingMessages_t &message,
276                        int msgType,
277                        int requestType,
278                        RmrMessagesBuffer_t &rmrMessageBuffer,
279                        Sctp_Map_t *sctpMap);
280 */
281
282 /**
283  *
284  * @param peerInfo
285  * @param message
286  * @param m
287  * @return
288  */
289 int sendSctpMsg(ConnectedCU_t *peerInfo,
290                 ReportingMessages_t &message,
291                 Sctp_Map_t *m);
292
293 /**
294  *
295  * @param events
296  * @param sctpMap
297  * @param numOfMessages
298  * @param rmrMessageBuffer
299  * @param ts
300  * @return
301  */
302 int receiveDataFromSctp(struct epoll_event *events,
303                         Sctp_Map_t *sctpMap,
304                         int &numOfMessages,
305                         RmrMessagesBuffer_t &rmrMessageBuffer,
306                         struct timespec &ts);
307
308 /**
309  *
310  * @param rmrAddress
311  * @return
312  */
313 void getRmrContext(sctp_params_t &pSctpParams);
314
315 /**
316  *
317  * @param sctpMap
318  * @param rmrMessageBuffer
319  * @param ts
320  * @return
321  */
322 int receiveXappMessages(Sctp_Map_t *sctpMap,
323                         RmrMessagesBuffer_t &rmrMessageBuffer,
324                         struct timespec &ts);
325
326 /**
327  *
328  * @param messageBuffer
329  * @param failedMsgId
330  * @param sctpMap
331  * @return
332  */
333 int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer,
334                            ReportingMessages_t &message,
335                            int failedMsgId,
336                            Sctp_Map_t *sctpMap);
337 /**
338  *
339  * @param pdu
340  * @param message
341  * @param rmrMessageBuffer
342  */
343 void asnInitiatingRequest(E2AP_PDU_t *pdu,
344                           Sctp_Map_t *sctpMap,
345                           ReportingMessages_t &message,
346                           RmrMessagesBuffer_t &rmrMessageBuffer);
347 /**
348  *
349  * @param pdu
350  * @param message
351  * @param sctpMap
352  * @param rmrMessageBuffer
353  */
354 void asnSuccessfulMsg(E2AP_PDU_t *pdu,
355                       Sctp_Map_t *sctpMap,
356                       ReportingMessages_t &message,
357                       RmrMessagesBuffer_t &rmrMessageBuffer);
358 /**
359  *
360  * @param pdu
361  * @param message
362  * @param sctpMap
363  * @param rmrMessageBuffer
364  */
365 void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
366                         Sctp_Map_t *sctpMap,
367                         ReportingMessages_t &message,
368                         RmrMessagesBuffer_t &rmrMessageBuffer);
369
370 /**
371  *
372  * @param rmrMessageBuffer
373  * @param message
374  * @return
375  */
376 int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message);
377 /**
378  *
379  * @param epoll_fd
380  * @param peerInfo
381  * @param events
382  * @param sctpMap
383  * @param enodbName
384  * @param msgType
385  * @returnsrc::logger_mt& lg = my_logger::get();
386  */
387 int addToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType);
388 /**
389  *
390  * @param epoll_fd
391  * @param peerInfo
392  * @param events
393  * @param sctpMap
394  * @param enodbName
395  * @param msgType
396  * @return
397  */
398 int modifyToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType);
399
400 /**
401  *
402  * @param message
403  */
404 void buildJsonMessage(ReportingMessages_t &message);
405
406 /**
407  *
408  *
409  * @param state
410  * @return
411  */
412 string translateRmrErrorMessages(int state);
413
414
415 static inline uint64_t rdtscp(uint32_t &aux) {
416     uint64_t rax,rdx;
417     asm volatile ("rdtscp\n" : "=a" (rax), "=d" (rdx), "=c" (aux) : :);
418     return (rdx << (unsigned)32) + rax;
419 }
420 #ifndef RIC_SCTP_CONNECTION_FAILURE
421 #define RIC_SCTP_CONNECTION_FAILURE  10080
422 #endif
423
424 #endif //X2_SCTP_THREAD_H