RIC:1060: Change in PTL
[ric-plt/e2.git] / RIC-E2-TERMINATION / sctpThread.h
1 /*
2  * Copyright 2019 AT&T Intellectual Property
3  * Copyright 2019 Nokia
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16 */
17
18 /*
19  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20  * platform project (RICP).
21  */
22
23 #ifndef X2_SCTP_THREAD_H
24 #define X2_SCTP_THREAD_H
25
26 #include <algorithm>
27
28 #include <cstdio>
29 #include <cerrno>
30 #include <cstdlib>
31 #include <cstring>
32 #include <random>
33 #include <sys/socket.h>
34 #include <arpa/inet.h>
35 #include <netinet/in_systm.h>
36 #include <netinet/in.h>
37 #include <netinet/ip.h>
38 #include <netinet/ip_icmp.h>
39 #include <netinet/sctp.h>
40 #include <thread>
41 #include <atomic>
42 #include <sys/param.h>
43 #include <sys/file.h>
44 #include <sys/types.h>
45 #include <ifaddrs.h>
46 #include <ctime>
47 #include <netdb.h>
48 #include <sys/epoll.h>
49 #include <mutex>
50 #include <shared_mutex>
51 #include <iterator>
52 #include <map>
53 #include <sys/inotify.h>
54 #include <csignal>
55 #include <future>
56 #include <bitset>
57
58 #include <rmr/rmr.h>
59 #include <rmr/RIC_message_types.h>
60 #include <mdclog/mdclog.h>
61 #include <functional>
62 #include <iostream>
63
64 #include <boost/algorithm/string/predicate.hpp>
65 #include <boost/lexical_cast.hpp>
66 #include <boost/move/utility.hpp>
67 #include <boost/log/sources/logger.hpp>
68 #include <boost/log/sources/record_ostream.hpp>
69 #include <boost/log/sources/global_logger_storage.hpp>
70 #include <boost/log/utility/setup/file.hpp>
71 #include <boost/log/utility/setup/common_attributes.hpp>
72 #include <boost/filesystem.hpp>
73
74 #include <mdclog/mdclog.h>
75
76 #include "oranE2/E2AP-PDU.h"
77 #include "oranE2/ProtocolIE-Container.h"
78 #include "oranE2/InitiatingMessage.h"
79 #include "oranE2/SuccessfulOutcome.h"
80 #include "oranE2/UnsuccessfulOutcome.h"
81 #include "oranE2/ProtocolIE-Container.h"
82 #include "oranE2/ProtocolIE-Field.h"
83 #include "oranE2/GlobalE2node-gNB-ID.h"
84 #include "oranE2/GlobalE2node-en-gNB-ID.h"
85 #include "oranE2/GlobalE2node-ng-eNB-ID.h"
86 #include "oranE2/GlobalE2node-eNB-ID.h"
87
88 #include "cxxopts.hpp"
89 //#include "config-cpp/include/config-cpp/config-cpp.h"
90 #include <zlib.h>
91 #include <prometheus/counter.h>
92 #include <prometheus/exposer.h>
93 #include <prometheus/gateway.h>
94 #include <prometheus/registry.h>
95
96 using namespace prometheus;
97
98 #include "mapWrapper.h"
99
100 #include "base64.h"
101
102 #include "ReadConfigFile.h"
103
104 using namespace std;
105 namespace logging = boost::log;
106 namespace src = boost::log::sources;
107 namespace keywords = boost::log::keywords;
108 namespace sinks = boost::log::sinks;
109 namespace posix_time = boost::posix_time;
110 namespace expr = boost::log::expressions;
111
/* Networking defaults and fixed buffer sizes shared by the declarations below. */
#define SRC_PORT 36422               /* default value of sctp_params::sctpPort */
#define SA struct sockaddr           /* shorthand for `struct sockaddr` */
#define MAX_ENODB_NAME_SIZE 64       /* size of the enodbName character arrays */

#define MAXEVENTS 128                /* presumably the epoll event batch size - confirm in the .cpp */

/* The xApp receive buffer is defined to be exactly as large as the SCTP one. */
#define RECEIVE_SCTP_BUFFER_SIZE (8 * 1024)
#define RECEIVE_XAPP_BUFFER_SIZE RECEIVE_SCTP_BUFFER_SIZE
120
121 typedef mapWrapper Sctp_Map_t;
122
123
124
/* Capacities of the fixed character arrays `volume` and `ka_message`. */
#define VOLUME_URL_SIZE 256
#define KA_MESSAGE_SIZE 2048
127
/**
 * Indices into the e2tInternalCounters arrays.
 * Enumerators are numbered implicitly from 0; the last one is the element
 * count and is used as the array dimension, not as a counter slot.
 */
enum E2T_Internal_Counters {
    SCTP_ABORT_INITIATED_BY_E2NODE,   // 0
    INVALID_MESSAGE_RECEIVED,         // 1
    E2T_MAX_INTERNAL_COUNTER          // 2 - number of internal counters
};
134
135 typedef struct sctp_params {
136     int      epollTimeOut = -1;
137     uint16_t rmrPort = 0;
138     uint16_t sctpPort = SRC_PORT;
139     int      epoll_fd = 0;
140     int      listenFD = 0;
141     int      rmrListenFd = 0;
142     int      inotifyFD = 0;
143     int      inotifyWD = 0;
144     void     *rmrCtx = nullptr;
145     Sctp_Map_t *sctpMap = nullptr;
146     char      ka_message[KA_MESSAGE_SIZE] {};
147     int       ka_message_length = 0;
148     char       rmrAddress[256] {}; // "tcp:port number" "tcp:5566" listen to all address on port 5566
149     char volume[VOLUME_URL_SIZE];
150     string myIP {};
151     string fqdn {};
152     string podName {};
153     string configFilePath {};
154     string configFileName {};
155     bool trace = true;
156     shared_ptr<prometheus::Registry> prometheusRegistry;
157     string prometheusPort {"8088"};
158     Family<Counter> *prometheusFamily;
159     Exposer *prometheusExposer = nullptr;
160     Counter *e2tCounters[6][2][ProcedureCode_id_RICsubscriptionDeleteRequired + 1] {};
161     Counter *e2tInternalCounters[E2T_Internal_Counters::E2T_MAX_INTERNAL_COUNTER] {};
162 } sctp_params_t;
163
/*
 * Index values 0..5 name a direction plus an E2AP message class, and 0..1
 * name a counter kind. The counter arrays in this header are dimensioned
 * [6][2][...], matching these ranges.
 */

// RAN to RIC
#define IN_INITI 0 //INITIATING
#define IN_SUCC 1 //SUCCESSFUL
#define IN_UN_SUCC 2 //UN-Successful

// RIC To RAN
#define OUT_INITI 3 //INITIATING
#define OUT_SUCC 4 //SUCCESSFUL
#define OUT_UN_SUCC 5 //UN-Successful

#define MSG_COUNTER 0
#define BYTES_COUNTER 1

// Parenthesized so the negative literal stays one operand wherever the macro
// is expanded inside a larger expression.
#define INVALID_STREAM_ID (-1)
178
179 typedef struct ConnectedCU {
180     int fileDescriptor = 0;
181     char hostName[NI_MAXHOST] {};
182     char portNumber[NI_MAXSERV] {};
183     char enodbName[MAX_ENODB_NAME_SIZE] {};
184     char asnData[RECEIVE_SCTP_BUFFER_SIZE] {};
185     size_t asnLength = 0;
186     int mtype = 0;
187     bool isConnected = false;
188     bool gotSetup = false;
189     sctp_params_t *sctpParams = nullptr;
190     Counter *counters[6][2][ProcedureCode_id_RICsubscriptionDeleteRequired + 1] {};
191     bool isSingleStream = false;
192     int singleStreamId = 0;
193     Counter *e2tInternalCounters[E2T_Internal_Counters::E2T_MAX_INTERNAL_COUNTER] {};
194 } ConnectedCU_t ;
195
196
197 #define MAX_RMR_BUFF_ARRAY 32
198 typedef struct RmrMessagesBuffer {
199     char ka_message[KA_MESSAGE_SIZE] {};
200     int  ka_message_len = 0;
201     void *rmrCtx = nullptr;
202     rmr_mbuf_t *sendMessage= nullptr;
203     //rmr_mbuf_t *sendBufferedMessages[MAX_RMR_BUFF_ARRAY] {};
204     rmr_mbuf_t *rcvMessage= nullptr;
205     //rmr_mbuf_t *rcvBufferedMessages[MAX_RMR_BUFF_ARRAY] {};
206 } RmrMessagesBuffer_t;
207
208 typedef struct formatedMessage {
209     char enodbName[MAX_ENODB_NAME_SIZE];
210     struct timespec time;
211     int messageType;
212     char direction;
213     ssize_t asnLength;
214     unsigned char *asndata;
215 } FormatedMessage_t;
216
217 typedef struct ReportingMessages {
218     FormatedMessage_t message {};
219     ConnectedCU_t *peerInfo = nullptr;
220     long outLen = 0;
221     unsigned char base64Data[RECEIVE_SCTP_BUFFER_SIZE * 2] {};
222     char buffer[RECEIVE_SCTP_BUFFER_SIZE * 8] {};
223 } ReportingMessages_t;
224
/**
 * Procedure states stored in E2NodeConnectionHandling.
 * Enumerators are numbered implicitly, starting at 0 and increasing by one;
 * the trailing comments give the resulting values.
 */
enum E2T_Procedure_States {
    E2_SETUP_PROCEDURE_NOT_INITIATED,          // 0
    E2_SETUP_PROCEDURE_ONGOING,                // 1
    E2_SETUP_PROCEDURE_COMPLETED,              // 2
    RIC_SERVICE_UPDATE_PROCEDURE_ONGOING,      // 3
    RIC_SERVICE_UPDATE_PROCEDURE_COMPLETED,    // 4
    RIC_SUBS_PROCEDURE_ONGOING,                // 5
    RIC_SUBS_PROCEDURE_COMPLETED,              // 6
    RIC_INDICATION_PROCEDURE_ONGOING,          // 7
    RIC_INDICATION_PROCEDURE_COMPLETED,        // 8
    RIC_SUBS_DEL_PROCEDURE_ONGOING,            // 9
    RIC_SUBS_DEL_PROCEDURE_COMPLETED,          // 10
    CONTROL_PROCEDURE_ONGOING,                 // 11
    CONTROL_PROCEDURE_COMPLETED,               // 12
    E2_NODE_CONF_UPDATE_PROCEDURE_ONGOING,     // 13
    E2_NODE_CONF_UPDATE_PROCEDURE_COMPLETED,   // 14
    RESET_PROCEDURE_ONGOING,                   // 15
    RESET_PROCEDURE_COMPLETED                  // 16
};
245
246 struct E2NodeConnectionHandling
247 {
248     E2T_Procedure_States e2tProcedureOngoingStatus;
249     long e2SetupProcedureTransactionId;
250 };
251
/* Named integer literals; each name spells out its value. */
constexpr int negativeOne {-1};
constexpr int negativeSix {-6};
constexpr int negativeSeven {-7};
constexpr int numberZero {0};
constexpr int numberOne {1};
constexpr int numberTwo {2};
constexpr int numberThree {3};
constexpr int numberFour {4};
constexpr int numberFive {5};
constexpr int numberTwenty {20};
262
/*
 * Width and bit indices of the "send message to ..." bitset.
 * sendMsgMaxBitPosition is the std::bitset size used in the return type of
 * getSendMsgBitSetValue().
 */
constexpr uint8_t sendMsgMaxBitPosition {2};
constexpr uint8_t sendMsgToE2MBitSetPosition {0};
constexpr uint8_t sendMsgToSubMgrBitSetPosition {1};

/*
 * Width and bit indices of the "required IE present" bitset.
 * requiredIePresentMaxBitSetPosition is the std::bitset size of the
 * isRequiredIesPresent argument of getSendMsgBitSetValue().
 */
constexpr uint8_t requiredIePresentMaxBitSetPosition {3};
constexpr uint8_t transactionIdIeBitSetPosition {0};
constexpr uint8_t ricRequestIdIeBitSetPosition {1};
constexpr uint8_t causeIeBitSetPosition {2};
271
272 cxxopts::ParseResult parse(int argc, char *argv[], sctp_params_t &pSctpParams);
273
274 int buildInotify(sctp_params_t &sctpParams);
275
276 void handleTermInit(sctp_params_t &sctpParams);
277
278 void handleConfigChange(sctp_params_t *sctpParams);
279
280 void listener(sctp_params_t *params);
281
282 void sendTermInit(sctp_params_t &sctpParams);
283
284 int setSocketNoBlocking(int socket);
285
286 void handleEinprogressMessages(struct epoll_event &event,
287                                ReportingMessages_t &message,
288                                RmrMessagesBuffer_t &rmrMessageBuffer,
289                                sctp_params_t *params);
290
291 void handlepoll_error(struct epoll_event &event,
292                       ReportingMessages_t &message,
293                       RmrMessagesBuffer_t &rmrMessageBuffer,
294                       sctp_params_t *params);
295
296
297 void cleanHashEntry(ConnectedCU_t *peerInfo, Sctp_Map_t *m);
298
299
300 /**
301  *
302  * @param message
303  * @param rmrMessageBuffer
304  */
305 void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer);
306
307 /**
308  *
309  * @param sctpMap
310  * @param messageBuffer
311  * @param message
312  * @param failedMesgId
313  * @return
314  */
315 int sendMessagetoCu(Sctp_Map_t *sctpMap,
316                     RmrMessagesBuffer_t &messageBuffer,
317                     ReportingMessages_t &message,
318                     int failedMesgId);
319
320 void sendFailedSendingMessagetoXapp(RmrMessagesBuffer_t &rmrMessageBuffer,
321                                     ReportingMessages_t &message,
322                                     int failedMesgId);
323
324 int sendRequestToXapp(ReportingMessages_t &message,
325                       int requestId,
326                       RmrMessagesBuffer_t &rmrMmessageBuffer);
327
328 /**
329  *
330  * @param message
331  * @param msgType
332  * @param requestType
333  * @param rmrMessageBuffer
334  * @param sctpMap
335  * @return
336  */
337 /*
338 int sendResponseToXapp(ReportingMessages_t &message,
339                        int msgType,
340                        int requestType,
341                        RmrMessagesBuffer_t &rmrMessageBuffer,
342                        Sctp_Map_t *sctpMap);
343 */
344
345 /**
346  *
347  * @param peerInfo
348  * @param message
349  * @param m
350  * @return
351  */
352 int sendSctpMsg(ConnectedCU_t *peerInfo,
353                 ReportingMessages_t &message,
354                 Sctp_Map_t *m);
355
356 /**
357  *
358  * @param events
359  * @param sctpMap
360  * @param numOfMessages
361  * @param rmrMessageBuffer
362  * @param ts
363  * @return
364  */
365 int receiveDataFromSctp(struct epoll_event *events,
366                         Sctp_Map_t *sctpMap,
367                         int &numOfMessages,
368                         RmrMessagesBuffer_t &rmrMessageBuffer,
369                         struct timespec &ts);
370
371 /**
372  *
373  * @param rmrAddress
374  * @return
375  */
376 void getRmrContext(sctp_params_t &pSctpParams);
377
378 /**
379  *
380  * @param sctpMap
381  * @param rmrMessageBuffer
382  * @param ts
383  * @return
384  */
385 int receiveXappMessages(Sctp_Map_t *sctpMap,
386                         RmrMessagesBuffer_t &rmrMessageBuffer,
387                         struct timespec &ts);
388
389 /**
390  *
391  * @param messageBuffer
392  * @param failedMsgId
393  * @param sctpMap
394  * @return
395  */
396 int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer,
397                            ReportingMessages_t &message,
398                            int failedMsgId,
399                            Sctp_Map_t *sctpMap);
400 /**
401  *
402  * @param pdu
403  * @param message
404  * @param rmrMessageBuffer
405  */
406 void asnInitiatingRequest(E2AP_PDU_t *pdu,
407                           Sctp_Map_t *sctpMap,
408                           ReportingMessages_t &message,
409                           RmrMessagesBuffer_t &rmrMessageBuffer,int streamId);
410 /**
411  *
412  * @param pdu
413  * @param message
414  * @param sctpMap
415  * @param rmrMessageBuffer
416  */
417 void asnSuccessfulMsg(E2AP_PDU_t *pdu,
418                       Sctp_Map_t *sctpMap,
419                       ReportingMessages_t &message,
420                       RmrMessagesBuffer_t &rmrMessageBuffer);
421 /**
422  *
423  * @param pdu
424  * @param message
425  * @param sctpMap
426  * @param rmrMessageBuffer
427  */
428 void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
429                         Sctp_Map_t *sctpMap,
430                         ReportingMessages_t &message,
431                         RmrMessagesBuffer_t &rmrMessageBuffer);
432
433 /**
434  *
435  * @param rmrMessageBuffer
436  * @param message
437  * @return
438  */
439 int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message);
440 /**
441  *
442  * @param epoll_fd
443  * @param peerInfo
444  * @param events
445  * @param sctpMap
446  * @param enodbName
447  * @param msgType
448  * @returnsrc::logger_mt& lg = my_logger::get();
449  */
450 int addToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType);
451 /**
452  *
453  * @param epoll_fd
454  * @param peerInfo
455  * @param events
456  * @param sctpMap
457  * @param enodbName
458  * @param msgType
459  * @return
460  */
461 int modifyToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType);
462
463 /**
464  *
465  * @param message
466  */
467 void buildJsonMessage(ReportingMessages_t &message);
468
469 /**
470  *
471  *
472  * @param state
473  * @return
474  */
475 string translateRmrErrorMessages(int state);
476
477 int buildConfiguration(sctp_params_t &sctpParams);
478 void startPrometheus(sctp_params_t &sctpParams);
479 static int enable_log_change_notify(const char* fileName);
480 static int register_log_change_notify(const char *fileName);
481 static void * monitor_loglevel_change_handler(void* arg);
482 void  update_mdc_log_level_severity(char* log_level);
483 char* getinterfaceip();
484 static char* parse_file(char* filename);
485
486
/**
 * Executes the x86 `rdtscp` instruction through inline assembly.
 *
 * The asm constraints bind the outputs to the a, d and c registers: a and d
 * provide the low and high halves of the result, and c is stored in aux.
 * `volatile` keeps the compiler from removing or merging the instruction.
 *
 * @param aux  receives the value left in the c register
 * @return (d << 32) + a
 */
static inline uint64_t rdtscp(uint32_t &aux) {
    uint64_t lowPart;
    uint64_t highPart;
    asm volatile ("rdtscp\n"
                  : "=a" (lowPart), "=d" (highPart), "=c" (aux)
                  :
                  :);
    const uint64_t shiftedHigh = highPart << (unsigned)32;
    return shiftedHigh + lowPart;
}
/* Fallback definition, used only when no earlier header has defined the name. */
#if !defined(RIC_SCTP_CONNECTION_FAILURE)
#define RIC_SCTP_CONNECTION_FAILURE 10080
#endif

/* Dummy value for file descriptor only when UT is defined. */
#if defined(UNIT_TEST)
#define FILE_DESCRIPTOR 53424
#endif
499
500 int buildListeningPort(sctp_params_t &sctpParams);
501 void buildE2TPrometheusCounters(sctp_params_t &sctpParams);
502
503 int fetchStreamId(ConnectedCU_t *peerInfo, ReportingMessages_t &message);
504 void removeE2ConnectionEntryFromMap(char* eNBName);
505 bool getE2tProcedureOngoingStatus(char *enbName, E2NodeConnectionHandling &e2NodeConnectionHandling);
506 void setE2ProcedureOngoingStatus(char *enbName, E2T_Procedure_States state);
507 void insertE2ProcedureOngoing(char *enbName, long &transactionID);
508 E2T_Procedure_States currentE2tProcedureOngoingStatus(char *enbName);
509 void printEntryPresentInMap();
510 void handleE2SetupReq(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer, E2AP_PDU_t *pdu, long &transactionID, int streamId, Sctp_Map_t *sctpMap);
511 bitset<sendMsgMaxBitPosition> getSendMsgBitSetValue(int procedureCode, bitset<requiredIePresentMaxBitSetPosition> isRequiredIesPresent, char* enbName);
512 #endif //X2_SCTP_THREAD_H