From 16c799b750e66b24ff1ac71f5038c18757fd7028 Mon Sep 17 00:00:00 2001 From: "aa7133@att.com" Date: Tue, 24 Mar 2020 18:01:51 +0200 Subject: [PATCH] version 4.0.3 start internal stats Change-Id: I521f71e35c49f0d7f9096eac60ff61b64fdda4c0 Signed-off-by: aa7133@att.com --- .gitignore | 1 + RIC-E2-TERMINATION/container-tag.yaml | 2 +- RIC-E2-TERMINATION/mapWrapper.h | 20 ++--- RIC-E2-TERMINATION/sctpThread.cpp | 17 +++- RIC-E2-TERMINATION/sctpThread.h | 8 +- RIC-E2-TERMINATION/statCollector.h | 148 ++++++++++++++++++++++++++++++++++ 6 files changed, 175 insertions(+), 21 deletions(-) create mode 100644 RIC-E2-TERMINATION/statCollector.h diff --git a/.gitignore b/.gitignore index 3801b98..ce363a4 100644 --- a/.gitignore +++ b/.gitignore @@ -56,3 +56,4 @@ RIC-E2-TERMINATION/3rdparty/log/ RIC-E2-TERMINATION/3rdparty/rapidjson/ /RIC-E2-TERMINATION/3rdparty/int/ /RIC-E2-TERMINATION/3rdparty/int/autogen/licenses/codevbak.txt +!/RIC-E2-TERMINATION/3rdparty/log/3rdparty/googletest/ diff --git a/RIC-E2-TERMINATION/container-tag.yaml b/RIC-E2-TERMINATION/container-tag.yaml index f80bc61..968d713 100644 --- a/RIC-E2-TERMINATION/container-tag.yaml +++ b/RIC-E2-TERMINATION/container-tag.yaml @@ -1,3 +1,3 @@ # The Jenkins job requires a tag to build the Docker image. # Global-JJB script assumes this file is in the repo root. -tag: 4.0.2 +tag: 4.0.3 diff --git a/RIC-E2-TERMINATION/mapWrapper.h b/RIC-E2-TERMINATION/mapWrapper.h index a219d56..ffa0eb3 100644 --- a/RIC-E2-TERMINATION/mapWrapper.h +++ b/RIC-E2-TERMINATION/mapWrapper.h @@ -35,12 +35,10 @@ #include #include -using namespace std; - class mapWrapper { public: void *find(char *key) { - shared_lock read(fence); + std::shared_lock read(fence); auto entry = keyMap.find(key); if (entry == keyMap.end()) { return nullptr; @@ -49,32 +47,30 @@ public: } void setkey(char *key, void *val) { - unique_lock write(fence); + std::unique_lock write(fence); keyMap[key] = val; } void *erase(char *key) { - unique_lock write(fence); + std::unique_lock write(fence); return (void *)keyMap.erase(key); } void clear() { - unique_lock write(fence); + std::unique_lock write(fence); keyMap.clear(); } - void getKeys(vector &v) { - shared_lock read(fence); + void getKeys(std::vector &v) { + std::shared_lock read(fence); for (auto const &e : keyMap) { v.emplace_back((char *)e.first.c_str()); } } - - private: - std::unordered_map keyMap; - shared_timed_mutex fence; + std::unordered_map keyMap; + std::shared_timed_mutex fence; }; #endif //E2_MAPWRAPPER_H diff --git a/RIC-E2-TERMINATION/sctpThread.cpp b/RIC-E2-TERMINATION/sctpThread.cpp index a271d05..9fc72b4 100644 --- a/RIC-E2-TERMINATION/sctpThread.cpp +++ b/RIC-E2-TERMINATION/sctpThread.cpp @@ -277,6 +277,8 @@ int buildConfiguration(sctp_params_t &sctpParams) { return 0; } + + int main(const int argc, char **argv) { sctp_params_t sctpParams; @@ -363,6 +365,9 @@ int main(const int argc, char **argv) { } } + auto statFlag = false; + auto statThread = std::thread(statColectorThread, (void *)&statFlag); + //loop over term_init until first message from xApp handleTermInit(sctpParams); @@ -370,6 +375,9 @@ int main(const int argc, char **argv) { t.join(); } + statFlag = true; + statThread.join(); + return 0; } @@ -544,6 +552,8 @@ void listener(sctp_params_t *params) { rmrMessageBuffer.sendBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE); } + message.statCollector = StatCollector::GetInstance(); + while (true) { if (mdclog_level_get() >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Start EPOLL Wait"); @@ -963,7 +973,7 @@ int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_ m->erase(key); return -1; } - peerInfo->sentMesgs++; + message.statCollector->incSentMessage(string(message.message.enodbName)); message.message.direction = 'D'; // send report.buffer of size buildJsonMessage(message); @@ -1021,7 +1031,7 @@ int receiveDataFromSctp(struct epoll_event *events, // get the identity of the interface message.peerInfo = (ConnectedCU_t *)events->data.ptr; - + message.statCollector = StatCollector::GetInstance(); struct timespec start{0, 0}; struct timespec decodestart{0, 0}; struct timespec end{0, 0}; @@ -1043,8 +1053,9 @@ int receiveDataFromSctp(struct epoll_event *events, mdclog_write(MDCLOG_DEBUG, "Finish Read from SCTP %d fd message length = %ld", message.peerInfo->fileDescriptor, message.message.asnLength); } - message.peerInfo->rcvMsgs++; + memcpy(message.message.enodbName, message.peerInfo->enodbName, sizeof(message.peerInfo->enodbName)); + message.statCollector->incRecvMessage(string(message.message.enodbName)); message.message.direction = 'U'; message.message.time.tv_nsec = ts.tv_nsec; message.message.time.tv_sec = ts.tv_sec; diff --git a/RIC-E2-TERMINATION/sctpThread.h b/RIC-E2-TERMINATION/sctpThread.h index f6b9752..637dd17 100644 --- a/RIC-E2-TERMINATION/sctpThread.h +++ b/RIC-E2-TERMINATION/sctpThread.h @@ -84,11 +84,9 @@ #include "cxxopts.hpp" //#include "config-cpp/include/config-cpp/config-cpp.h" -#ifdef __TRACING__ -#include "openTracing.h" -#endif #include "mapWrapper.h" +#include "statCollector.h" #include "base64.h" @@ -114,6 +112,7 @@ namespace expr = boost::log::expressions; typedef mapWrapper Sctp_Map_t; + #define VOLUME_URL_SIZE 256 #define KA_MESSAGE_SIZE 2048 @@ -147,8 +146,6 @@ typedef struct ConnectedCU { char portNumber[NI_MAXSERV] {}; char enodbName[MAX_ENODB_NAME_SIZE] {}; char asnData[RECEIVE_SCTP_BUFFER_SIZE] {}; - int rcvMsgs = 0; - int sentMesgs = 0; size_t asnLength = 0; int mtype = 0; bool isConnected = false; @@ -182,6 +179,7 @@ typedef struct ReportingMessages { long outLen = 0; unsigned char base64Data[RECEIVE_SCTP_BUFFER_SIZE * 2] {}; char buffer[RECEIVE_SCTP_BUFFER_SIZE * 8] {}; + StatCollector *statCollector = nullptr; } ReportingMessages_t; cxxopts::ParseResult parse(int argc, char *argv[], sctp_params_t &pSctpParams); diff --git a/RIC-E2-TERMINATION/statCollector.h b/RIC-E2-TERMINATION/statCollector.h new file mode 100644 index 0000000..bb4ce5d --- /dev/null +++ b/RIC-E2-TERMINATION/statCollector.h @@ -0,0 +1,148 @@ +/* + * Copyright 2020 AT&T Intellectual Property + * Copyright 2020 Nokia + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// +// Created by adi ENZEL on 3/24/20. +// + +#ifndef E2_STATCOLLECTOR_H +#define E2_STATCOLLECTOR_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +typedef struct statResult { + std::string ranName; + uint32_t receivedMessages; + uint32_t sentMessages; +} statResult_t ; + +class StatCollector { + + static std::mutex singltonMutex; + static std::atomic obj; + +public: + static StatCollector* GetInstance() { + StatCollector* pStatCollector = obj.load(std::memory_order_acquire); + if (pStatCollector == nullptr) { + std::lock_guard lock(singltonMutex); + pStatCollector = obj.load(std::memory_order_relaxed); + if (pStatCollector == nullptr) { + pStatCollector = new StatCollector(); + obj.store(pStatCollector, std::memory_order_release); + } + } + return pStatCollector; + } + + void incSentMessage(const std::string &key) { + increment(sentMessages, key); + } + void incRecvMessage(const std::string &key) { + increment(recvMessages, key); + } + + std::vector &getCurrentStats() { + results.clear(); + + for (auto const &e : recvMessages) { + statResult_t result {}; + result.ranName = e.first; + result.receivedMessages = e.second.load(std::memory_order_acquire); + auto found = sentMessages.find(result.ranName); + if (found != sentMessages.end()) { + result.sentMessages = found->second.load(std::memory_order_acquire); + } else { + result.sentMessages = 0; + } + + results.emplace_back(result); + } + return results; + } + + StatCollector(const StatCollector&)= delete; + StatCollector& operator=(const StatCollector&)= delete; + +private: + std::unordered_map> sentMessages; + std::unordered_map> recvMessages; + std::vector results; + + + StatCollector() = default; + ~StatCollector() = default; + + + void inline static increment(std::unordered_map> &map, const std::string &key) { + auto found = map.find(key); + if (found != map.end()) { //inc + map[key].fetch_add(1, std::memory_order_release); + } else { //add + //sentMessages.emplace(std::make_pair(std::string(key), std::atomic(0))); + map.emplace(std::piecewise_construct, + std::forward_as_tuple(key), + std::forward_as_tuple(0)); + } + + } + +}; + + +// must define this to allow StatCollector private variables to be known to compiler linker +std::mutex StatCollector::singltonMutex; +std::atomic StatCollector::obj; + + +void statColectorThread(void *runtime) { + bool *stop_loop = (bool *)runtime; + auto *statCollector = StatCollector::GetInstance(); + std::time_t tt = std::chrono::system_clock::to_time_t (std::chrono::system_clock::now()); + + struct std::tm * ptm = std::localtime(&tt); + std::cout << "Waiting for the next minute to begin...\n"; + ptm->tm_min = ptm->tm_min + (5 - ptm->tm_min % 5); + ptm->tm_sec=0; + + std::this_thread::sleep_until(std::chrono::system_clock::from_time_t(mktime(ptm))); + +// alligned to 5 minutes + while (true) { + if (*stop_loop) { + break; + } + for (auto const &e : statCollector->getCurrentStats()) { + if (mdclog_level_get() >= MDCLOG_INFO) { + mdclog_write(MDCLOG_INFO, "RAN : %s sent messages : %d recived messages : %d\n", + e.ranName.c_str(), e.sentMessages, e.receivedMessages); + } + } + std::this_thread::sleep_for(std::chrono::seconds(300)); + } +} +#endif //E2_STATCOLLECTOR_H -- 2.16.6