2 * Copyright 2019 AT&T Intellectual Property
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 * platform project (RICP).
23 #ifndef X2_SCTP_THREAD_H
24 #define X2_SCTP_THREAD_H
33 #include <sys/socket.h>
34 #include <arpa/inet.h>
35 #include <netinet/in_systm.h>
36 #include <netinet/in.h>
37 #include <netinet/ip.h>
38 #include <netinet/ip_icmp.h>
39 #include <netinet/sctp.h>
42 #include <sys/param.h>
44 #include <sys/types.h>
48 #include <sys/epoll.h>
50 #include <shared_mutex>
53 #include <sys/inotify.h>
58 #include <rmr/RIC_message_types.h>
59 #include <mdclog/mdclog.h>
63 #include <boost/algorithm/string/predicate.hpp>
64 #include <boost/lexical_cast.hpp>
65 #include <boost/move/utility.hpp>
66 #include <boost/log/sources/logger.hpp>
67 #include <boost/log/sources/record_ostream.hpp>
68 #include <boost/log/sources/global_logger_storage.hpp>
69 #include <boost/log/utility/setup/file.hpp>
70 #include <boost/log/utility/setup/common_attributes.hpp>
71 #include <boost/filesystem.hpp>
73 #include <mdclog/mdclog.h>
75 #include "oranE2/E2AP-PDU.h"
76 #include "oranE2/ProtocolIE-Container.h"
77 #include "oranE2/InitiatingMessage.h"
78 #include "oranE2/SuccessfulOutcome.h"
79 #include "oranE2/UnsuccessfulOutcome.h"
80 #include "oranE2/ProtocolIE-Container.h"
81 #include "oranE2/ProtocolIE-Field.h"
82 #include "oranE2/GlobalE2node-gNB-ID.h"
83 #include "oranE2/GlobalE2node-en-gNB-ID.h"
84 #include "oranE2/GlobalE2node-ng-eNB-ID.h"
85 #include "oranE2/GlobalE2node-eNB-ID.h"
87 #include "cxxopts.hpp"
88 //#include "config-cpp/include/config-cpp/config-cpp.h"
90 #include <prometheus/counter.h>
91 #include <prometheus/exposer.h>
92 #include <prometheus/gateway.h>
93 #include <prometheus/registry.h>
95 using namespace prometheus;
97 #include "mapWrapper.h"
101 #include "ReadConfigFile.h"
104 namespace logging = boost::log;
105 namespace src = boost::log::sources;
106 namespace keywords = boost::log::keywords;
107 namespace sinks = boost::log::sinks;
108 namespace posix_time = boost::posix_time;
109 namespace expr = boost::log::expressions;
/* Default SCTP port; it is the initial value of sctp_params::sctpPort. */
#define SRC_PORT                  36422
/* Shorthand for the generic socket address structure. */
#define SA                        struct sockaddr
/* Capacity, in chars, of the enodbName arrays declared in this header. */
#define MAX_ENODB_NAME_SIZE       64

/* NOTE(review): presumably the epoll event-array capacity - confirm at the epoll_wait call site. */
#define MAXEVENTS                 128

/* 256 KiB. Sizes ConnectedCU::asnData and scales the ReportingMessages buffers. */
#define RECEIVE_SCTP_BUFFER_SIZE  (256 * 1024)
/* The xApp-side receive size is defined to be the same as the SCTP one. */
#define RECEIVE_XAPP_BUFFER_SIZE  RECEIVE_SCTP_BUFFER_SIZE
120 typedef mapWrapper Sctp_Map_t;
/* Capacity, in chars, of sctp_params::volume. */
#define VOLUME_URL_SIZE  256
/* Capacity, in chars, of the ka_message buffers (sctp_params and RmrMessagesBuffer). */
#define KA_MESSAGE_SIZE  2048
127 typedef struct sctp_params {
128 int epollTimeOut = -1;
129 uint16_t rmrPort = 0;
130 uint16_t sctpPort = SRC_PORT;
136 void *rmrCtx = nullptr;
137 Sctp_Map_t *sctpMap = nullptr;
138 char ka_message[KA_MESSAGE_SIZE] {};
139 int ka_message_length = 0;
140 char rmrAddress[256] {}; // "tcp:port number" "tcp:5566" listen to all address on port 5566
141 mdclog_severity_t logLevel = MDCLOG_INFO;
142 char volume[VOLUME_URL_SIZE];
146 string configFilePath {};
147 string configFileName {};
149 shared_ptr<prometheus::Registry> prometheusRegistry;
150 string prometheusPort {"8088"};
151 Family<Counter> *prometheusFamily;
152 Exposer *prometheusExposer = nullptr;
153 Counter *e2tCounters[6][2][ProcedureCode_id_RICsubscriptionDeleteRequired + 1] {};
/*
 * Values 0..5 match the 6-wide first dimension of the Counter* tables
 * (sctp_params::e2tCounters, ConnectedCU::counters).
 * NOTE(review): presumably used as that index, with IN_/OUT_ denoting the
 * message direction - confirm at the use sites.
 */
#define IN_INITI     0  /* INITIATING */
#define IN_SUCC      1  /* SUCCESSFUL */
#define IN_UN_SUCC   2  /* UN-Successful */

#define OUT_INITI    3  /* INITIATING */
#define OUT_SUCC     4  /* SUCCESSFUL */
#define OUT_UN_SUCC  5  /* UN-Successful */

/*
 * Values 0..1 match the 2-wide second dimension of the same tables.
 * NOTE(review): presumably "message count" versus "byte count" - confirm.
 */
#define MSG_COUNTER    0
#define BYTES_COUNTER  1

/* Sentinel stream id; the value is negative, so it cannot collide with a non-negative id. */
#define INVALID_STREAM_ID -1
171 typedef struct ConnectedCU {
172 int fileDescriptor = 0;
173 char hostName[NI_MAXHOST] {};
174 char portNumber[NI_MAXSERV] {};
175 char enodbName[MAX_ENODB_NAME_SIZE] {};
176 char asnData[RECEIVE_SCTP_BUFFER_SIZE] {};
177 size_t asnLength = 0;
179 bool isConnected = false;
180 bool gotSetup = false;
181 sctp_params_t *sctpParams = nullptr;
182 Counter *counters[6][2][ProcedureCode_id_RICsubscriptionDeleteRequired + 1] {};
183 bool isSingleStream = false;
184 int singleStreamId = 0;
188 #define MAX_RMR_BUFF_ARRAY 32
189 typedef struct RmrMessagesBuffer {
190 char ka_message[KA_MESSAGE_SIZE] {};
191 int ka_message_len = 0;
192 void *rmrCtx = nullptr;
193 rmr_mbuf_t *sendMessage= nullptr;
194 //rmr_mbuf_t *sendBufferedMessages[MAX_RMR_BUFF_ARRAY] {};
195 rmr_mbuf_t *rcvMessage= nullptr;
196 //rmr_mbuf_t *rcvBufferedMessages[MAX_RMR_BUFF_ARRAY] {};
197 } RmrMessagesBuffer_t;
199 typedef struct formatedMessage {
200 char enodbName[MAX_ENODB_NAME_SIZE];
201 struct timespec time;
205 unsigned char *asndata;
208 typedef struct ReportingMessages {
209 FormatedMessage_t message {};
210 ConnectedCU_t *peerInfo = nullptr;
212 unsigned char base64Data[RECEIVE_SCTP_BUFFER_SIZE * 2] {};
213 char buffer[RECEIVE_SCTP_BUFFER_SIZE * 8] {};
214 } ReportingMessages_t;
216 cxxopts::ParseResult parse(int argc, char *argv[], sctp_params_t &pSctpParams);
218 int buildInotify(sctp_params_t &sctpParams);
220 void handleTermInit(sctp_params_t &sctpParams);
222 void handleConfigChange(sctp_params_t *sctpParams);
224 void listener(sctp_params_t *params);
226 void sendTermInit(sctp_params_t &sctpParams);
228 int setSocketNoBlocking(int socket);
230 void handleEinprogressMessages(struct epoll_event &event,
231 ReportingMessages_t &message,
232 RmrMessagesBuffer_t &rmrMessageBuffer,
233 sctp_params_t *params);
235 void handlepoll_error(struct epoll_event &event,
236 ReportingMessages_t &message,
237 RmrMessagesBuffer_t &rmrMessageBuffer,
238 sctp_params_t *params);
241 void cleanHashEntry(ConnectedCU_t *peerInfo, Sctp_Map_t *m);
247 * @param rmrMessageBuffer
249 void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer);
254 * @param messageBuffer
256 * @param failedMesgId
259 int sendMessagetoCu(Sctp_Map_t *sctpMap,
260 RmrMessagesBuffer_t &messageBuffer,
261 ReportingMessages_t &message,
264 void sendFailedSendingMessagetoXapp(RmrMessagesBuffer_t &rmrMessageBuffer,
265 ReportingMessages_t &message,
268 int sendRequestToXapp(ReportingMessages_t &message,
270 RmrMessagesBuffer_t &rmrMmessageBuffer);
277 * @param rmrMessageBuffer
282 int sendResponseToXapp(ReportingMessages_t &message,
285 RmrMessagesBuffer_t &rmrMessageBuffer,
286 Sctp_Map_t *sctpMap);
296 int sendSctpMsg(ConnectedCU_t *peerInfo,
297 ReportingMessages_t &message,
304 * @param numOfMessages
305 * @param rmrMessageBuffer
309 int receiveDataFromSctp(struct epoll_event *events,
312 RmrMessagesBuffer_t &rmrMessageBuffer,
313 struct timespec &ts);
320 void getRmrContext(sctp_params_t &pSctpParams);
325 * @param rmrMessageBuffer
329 int receiveXappMessages(Sctp_Map_t *sctpMap,
330 RmrMessagesBuffer_t &rmrMessageBuffer,
331 struct timespec &ts);
335 * @param messageBuffer
340 int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer,
341 ReportingMessages_t &message,
343 Sctp_Map_t *sctpMap);
348 * @param rmrMessageBuffer
350 void asnInitiatingRequest(E2AP_PDU_t *pdu,
352 ReportingMessages_t &message,
353 RmrMessagesBuffer_t &rmrMessageBuffer,int streamId);
359 * @param rmrMessageBuffer
361 void asnSuccessfulMsg(E2AP_PDU_t *pdu,
363 ReportingMessages_t &message,
364 RmrMessagesBuffer_t &rmrMessageBuffer);
370 * @param rmrMessageBuffer
372 void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
374 ReportingMessages_t &message,
375 RmrMessagesBuffer_t &rmrMessageBuffer);
379 * @param rmrMessageBuffer
383 int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message);
392 * @returnsrc::logger_mt& lg = my_logger::get();
394 int addToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType);
405 int modifyToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType);
411 void buildJsonMessage(ReportingMessages_t &message);
/**
 * NOTE(review): presumably maps an RMR state code to readable text - confirm
 * against the definition.
 *
 * @param state integer state code
 * @return a string, returned by value
 */
string translateRmrErrorMessages(int state);
421 int buildConfiguration(sctp_params_t &sctpParams);
422 void startPrometheus(sctp_params_t &sctpParams);
423 static int enable_log_change_notify(const char* fileName);
424 static int register_log_change_notify(const char *fileName);
425 static void * monitor_loglevel_change_handler(void* arg);
426 void update_mdc_log_level_severity(char* log_level);
427 char* getinterfaceip();
428 static char* parse_file(char* filename);
431 static inline uint64_t rdtscp(uint32_t &aux) {
433 asm volatile ("rdtscp\n" : "=a" (rax), "=d" (rdx), "=c" (aux) : :);
434 return (rdx << (unsigned)32) + rax;
436 #ifndef RIC_SCTP_CONNECTION_FAILURE
437 #define RIC_SCTP_CONNECTION_FAILURE 10080
441 #define FILE_DESCRIPTOR 53424 /*Dummy value for file descriptor only when UT is defined*/
444 int buildListeningPort(sctp_params_t &sctpParams);
445 void buildE2TPrometheusCounters(sctp_params_t &sctpParams);
447 int fetchStreamId(ConnectedCU_t *peerInfo, ReportingMessages_t &message);
448 #endif //X2_SCTP_THREAD_H