E2 changes to fix the E2setup response issue
[ric-plt/e2.git] / RIC-E2-TERMINATION / sctpThread.h
1 /*
2  * Copyright 2019 AT&T Intellectual Property
3  * Copyright 2019 Nokia
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16 */
17
18 /*
19  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20  * platform project (RICP).
21  */
22
23 #ifndef X2_SCTP_THREAD_H
24 #define X2_SCTP_THREAD_H
25
26 #include <algorithm>
27
28 #include <cstdio>
29 #include <cerrno>
30 #include <cstdlib>
31 #include <cstring>
32 #include <random>
33 #include <sys/socket.h>
34 #include <arpa/inet.h>
35 #include <netinet/in_systm.h>
36 #include <netinet/in.h>
37 #include <netinet/ip.h>
38 #include <netinet/ip_icmp.h>
39 #include <netinet/sctp.h>
40 #include <thread>
41 #include <atomic>
42 #include <sys/param.h>
43 #include <sys/file.h>
44 #include <sys/types.h>
45 #include <ifaddrs.h>
46 #include <ctime>
47 #include <netdb.h>
48 #include <sys/epoll.h>
49 #include <mutex>
50 #include <shared_mutex>
51 #include <iterator>
52 #include <map>
53 #include <sys/inotify.h>
54 #include <csignal>
55 #include <future>
56
57 #include <rmr/rmr.h>
58 #include <rmr/RIC_message_types.h>
59 #include <mdclog/mdclog.h>
60 #include <functional>
61 #include <iostream>
62
63 #include <boost/algorithm/string/predicate.hpp>
64 #include <boost/lexical_cast.hpp>
65 #include <boost/move/utility.hpp>
66 #include <boost/log/sources/logger.hpp>
67 #include <boost/log/sources/record_ostream.hpp>
68 #include <boost/log/sources/global_logger_storage.hpp>
69 #include <boost/log/utility/setup/file.hpp>
70 #include <boost/log/utility/setup/common_attributes.hpp>
71 #include <boost/filesystem.hpp>
72
73 #include <mdclog/mdclog.h>
74
75 #include "oranE2/E2AP-PDU.h"
76 #include "oranE2/ProtocolIE-Container.h"
77 #include "oranE2/InitiatingMessage.h"
78 #include "oranE2/SuccessfulOutcome.h"
79 #include "oranE2/UnsuccessfulOutcome.h"
80 #include "oranE2/ProtocolIE-Container.h"
81 #include "oranE2/ProtocolIE-Field.h"
82 #include "oranE2/GlobalE2node-gNB-ID.h"
83 #include "oranE2/GlobalE2node-en-gNB-ID.h"
84 #include "oranE2/GlobalE2node-ng-eNB-ID.h"
85 #include "oranE2/GlobalE2node-eNB-ID.h"
86
87 #include "cxxopts.hpp"
88 //#include "config-cpp/include/config-cpp/config-cpp.h"
89 #include <zlib.h>
90 #include <prometheus/counter.h>
91 #include <prometheus/exposer.h>
92 #include <prometheus/gateway.h>
93 #include <prometheus/registry.h>
94
95 using namespace prometheus;
96
97 #include "mapWrapper.h"
98
99 #include "base64.h"
100
101 #include "ReadConfigFile.h"
102
103 using namespace std;
104 namespace logging = boost::log;
105 namespace src = boost::log::sources;
106 namespace keywords = boost::log::keywords;
107 namespace sinks = boost::log::sinks;
108 namespace posix_time = boost::posix_time;
109 namespace expr = boost::log::expressions;
110
111 #define SRC_PORT 36422
112 #define SA      struct sockaddr
113 #define MAX_ENODB_NAME_SIZE 64
114
115 #define MAXEVENTS 128
116
117 #define RECEIVE_SCTP_BUFFER_SIZE (256 * 1024)
118 #define RECEIVE_XAPP_BUFFER_SIZE RECEIVE_SCTP_BUFFER_SIZE
119
120 typedef mapWrapper Sctp_Map_t;
121
122
123
124 #define VOLUME_URL_SIZE 256
125 #define KA_MESSAGE_SIZE 2048
126
127 typedef struct sctp_params {
128     int      epollTimeOut = -1;
129     uint16_t rmrPort = 0;
130     uint16_t sctpPort = SRC_PORT;
131     int      epoll_fd = 0;
132     int      listenFD = 0;
133     int      rmrListenFd = 0;
134     int      inotifyFD = 0;
135     int      inotifyWD = 0;
136     void     *rmrCtx = nullptr;
137     Sctp_Map_t *sctpMap = nullptr;
138     char      ka_message[KA_MESSAGE_SIZE] {};
139     int       ka_message_length = 0;
140     char       rmrAddress[256] {}; // "tcp:port number" "tcp:5566" listen to all address on port 5566
141     mdclog_severity_t logLevel = MDCLOG_INFO;
142     char volume[VOLUME_URL_SIZE];
143     string myIP {};
144     string fqdn {};
145     string podName {};
146     string configFilePath {};
147     string configFileName {};
148     bool trace = true;
149     shared_ptr<prometheus::Registry> prometheusRegistry;
150     string prometheusPort {"8088"};
151     Family<Counter> *prometheusFamily;
152     Exposer *prometheusExposer = nullptr;
153     Counter *e2tCounters[6][2][ProcedureCode_id_RICsubscriptionDeleteRequired + 1] {};
154 } sctp_params_t;
155
156 // RAN to RIC
157 #define IN_INITI 0 //INITIATING
158 #define IN_SUCC 1 //SUCCESSFUL
159 #define IN_UN_SUCC 2 //UN-Successful
160
161 // RIC To RAN
162 #define OUT_INITI 3 //INITIATING
163 #define OUT_SUCC 4 //SUCCESSFUL
164 #define OUT_UN_SUCC 5 //UN-Successful
165
166 #define MSG_COUNTER 0
167 #define BYTES_COUNTER 1
168
169 #define INVALID_STREAM_ID -1
170
171 typedef struct ConnectedCU {
172     int fileDescriptor = 0;
173     char hostName[NI_MAXHOST] {};
174     char portNumber[NI_MAXSERV] {};
175     char enodbName[MAX_ENODB_NAME_SIZE] {};
176     char asnData[RECEIVE_SCTP_BUFFER_SIZE] {};
177     size_t asnLength = 0;
178     int mtype = 0;
179     bool isConnected = false;
180     bool gotSetup = false;
181     sctp_params_t *sctpParams = nullptr;
182     Counter *counters[6][2][ProcedureCode_id_RICsubscriptionDeleteRequired + 1] {};
183     bool isSingleStream = false;
184     int singleStreamId = 0;
185 } ConnectedCU_t ;
186
187
188 #define MAX_RMR_BUFF_ARRAY 32
189 typedef struct RmrMessagesBuffer {
190     char ka_message[KA_MESSAGE_SIZE] {};
191     int  ka_message_len = 0;
192     void *rmrCtx = nullptr;
193     rmr_mbuf_t *sendMessage= nullptr;
194     //rmr_mbuf_t *sendBufferedMessages[MAX_RMR_BUFF_ARRAY] {};
195     rmr_mbuf_t *rcvMessage= nullptr;
196     //rmr_mbuf_t *rcvBufferedMessages[MAX_RMR_BUFF_ARRAY] {};
197 } RmrMessagesBuffer_t;
198
199 typedef struct formatedMessage {
200     char enodbName[MAX_ENODB_NAME_SIZE];
201     struct timespec time;
202     int messageType;
203     char direction;
204     ssize_t asnLength;
205     unsigned char *asndata;
206 } FormatedMessage_t;
207
208 typedef struct ReportingMessages {
209     FormatedMessage_t message {};
210     ConnectedCU_t *peerInfo = nullptr;
211     long outLen = 0;
212     unsigned char base64Data[RECEIVE_SCTP_BUFFER_SIZE * 2] {};
213     char buffer[RECEIVE_SCTP_BUFFER_SIZE * 8] {};
214 } ReportingMessages_t;
215
216 cxxopts::ParseResult parse(int argc, char *argv[], sctp_params_t &pSctpParams);
217
218 int buildInotify(sctp_params_t &sctpParams);
219
220 void handleTermInit(sctp_params_t &sctpParams);
221
222 void handleConfigChange(sctp_params_t *sctpParams);
223
224 void listener(sctp_params_t *params);
225
226 void sendTermInit(sctp_params_t &sctpParams);
227
228 int setSocketNoBlocking(int socket);
229
230 void handleEinprogressMessages(struct epoll_event &event,
231                                ReportingMessages_t &message,
232                                RmrMessagesBuffer_t &rmrMessageBuffer,
233                                sctp_params_t *params);
234
235 void handlepoll_error(struct epoll_event &event,
236                       ReportingMessages_t &message,
237                       RmrMessagesBuffer_t &rmrMessageBuffer,
238                       sctp_params_t *params);
239
240
241 void cleanHashEntry(ConnectedCU_t *peerInfo, Sctp_Map_t *m);
242
243
244 /**
245  *
246  * @param message
247  * @param rmrMessageBuffer
248  */
249 void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer);
250
251 /**
252  *
253  * @param sctpMap
254  * @param messageBuffer
255  * @param message
256  * @param failedMesgId
257  * @return
258  */
259 int sendMessagetoCu(Sctp_Map_t *sctpMap,
260                     RmrMessagesBuffer_t &messageBuffer,
261                     ReportingMessages_t &message,
262                     int failedMesgId);
263
264 void sendFailedSendingMessagetoXapp(RmrMessagesBuffer_t &rmrMessageBuffer,
265                                     ReportingMessages_t &message,
266                                     int failedMesgId);
267
268 int sendRequestToXapp(ReportingMessages_t &message,
269                       int requestId,
270                       RmrMessagesBuffer_t &rmrMmessageBuffer);
271
272 /**
273  *
274  * @param message
275  * @param msgType
276  * @param requestType
277  * @param rmrMessageBuffer
278  * @param sctpMap
279  * @return
280  */
281 /*
282 int sendResponseToXapp(ReportingMessages_t &message,
283                        int msgType,
284                        int requestType,
285                        RmrMessagesBuffer_t &rmrMessageBuffer,
286                        Sctp_Map_t *sctpMap);
287 */
288
289 /**
290  *
291  * @param peerInfo
292  * @param message
293  * @param m
294  * @return
295  */
296 int sendSctpMsg(ConnectedCU_t *peerInfo,
297                 ReportingMessages_t &message,
298                 Sctp_Map_t *m);
299
300 /**
301  *
302  * @param events
303  * @param sctpMap
304  * @param numOfMessages
305  * @param rmrMessageBuffer
306  * @param ts
307  * @return
308  */
309 int receiveDataFromSctp(struct epoll_event *events,
310                         Sctp_Map_t *sctpMap,
311                         int &numOfMessages,
312                         RmrMessagesBuffer_t &rmrMessageBuffer,
313                         struct timespec &ts);
314
315 /**
316  *
317  * @param rmrAddress
318  * @return
319  */
320 void getRmrContext(sctp_params_t &pSctpParams);
321
322 /**
323  *
324  * @param sctpMap
325  * @param rmrMessageBuffer
326  * @param ts
327  * @return
328  */
329 int receiveXappMessages(Sctp_Map_t *sctpMap,
330                         RmrMessagesBuffer_t &rmrMessageBuffer,
331                         struct timespec &ts);
332
333 /**
334  *
335  * @param messageBuffer
336  * @param failedMsgId
337  * @param sctpMap
338  * @return
339  */
340 int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer,
341                            ReportingMessages_t &message,
342                            int failedMsgId,
343                            Sctp_Map_t *sctpMap);
344 /**
345  *
346  * @param pdu
347  * @param message
348  * @param rmrMessageBuffer
349  */
350 void asnInitiatingRequest(E2AP_PDU_t *pdu,
351                           Sctp_Map_t *sctpMap,
352                           ReportingMessages_t &message,
353                           RmrMessagesBuffer_t &rmrMessageBuffer,int streamId);
354 /**
355  *
356  * @param pdu
357  * @param message
358  * @param sctpMap
359  * @param rmrMessageBuffer
360  */
361 void asnSuccessfulMsg(E2AP_PDU_t *pdu,
362                       Sctp_Map_t *sctpMap,
363                       ReportingMessages_t &message,
364                       RmrMessagesBuffer_t &rmrMessageBuffer);
365 /**
366  *
367  * @param pdu
368  * @param message
369  * @param sctpMap
370  * @param rmrMessageBuffer
371  */
372 void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
373                         Sctp_Map_t *sctpMap,
374                         ReportingMessages_t &message,
375                         RmrMessagesBuffer_t &rmrMessageBuffer);
376
377 /**
378  *
379  * @param rmrMessageBuffer
380  * @param message
381  * @return
382  */
383 int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message);
384 /**
385  *
386  * @param epoll_fd
387  * @param peerInfo
388  * @param events
389  * @param sctpMap
390  * @param enodbName
391  * @param msgType
392  * @returnsrc::logger_mt& lg = my_logger::get();
393  */
394 int addToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType);
395 /**
396  *
397  * @param epoll_fd
398  * @param peerInfo
399  * @param events
400  * @param sctpMap
401  * @param enodbName
402  * @param msgType
403  * @return
404  */
405 int modifyToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType);
406
407 /**
408  *
409  * @param message
410  */
411 void buildJsonMessage(ReportingMessages_t &message);
412
413 /**
414  *
415  *
416  * @param state
417  * @return
418  */
419 string translateRmrErrorMessages(int state);
420
421 int buildConfiguration(sctp_params_t &sctpParams);
422 void startPrometheus(sctp_params_t &sctpParams);
423 static int enable_log_change_notify(const char* fileName);
424 static int register_log_change_notify(const char *fileName);
425 static void * monitor_loglevel_change_handler(void* arg);
426 void  update_mdc_log_level_severity(char* log_level);
427 char* getinterfaceip();
428 static char* parse_file(char* filename);
429
430
431 static inline uint64_t rdtscp(uint32_t &aux) {
432     uint64_t rax,rdx;
433     asm volatile ("rdtscp\n" : "=a" (rax), "=d" (rdx), "=c" (aux) : :);
434     return (rdx << (unsigned)32) + rax;
435 }
436 #ifndef RIC_SCTP_CONNECTION_FAILURE
437 #define RIC_SCTP_CONNECTION_FAILURE  10080
438 #endif
439
440 #ifdef UNIT_TEST
441     #define FILE_DESCRIPTOR 53424 /*Dummy value for file descriptor only when UT is defined*/
442 #endif
443
444 int buildListeningPort(sctp_params_t &sctpParams);
445 void buildE2TPrometheusCounters(sctp_params_t &sctpParams);
446
447 int fetchStreamId(ConnectedCU_t *peerInfo, ReportingMessages_t &message);
448 #endif //X2_SCTP_THREAD_H