version 4.0.3 50/2950/1
authoraa7133@att.com <aa7133@att.com>
Tue, 24 Mar 2020 16:01:51 +0000 (18:01 +0200)
committeraa7133@att.com <aa7133@att.com>
Tue, 24 Mar 2020 16:02:14 +0000 (18:02 +0200)
start internal stats

Change-Id: I521f71e35c49f0d7f9096eac60ff61b64fdda4c0
Signed-off-by: aa7133@att.com <aa7133@att.com>
.gitignore
RIC-E2-TERMINATION/container-tag.yaml
RIC-E2-TERMINATION/mapWrapper.h
RIC-E2-TERMINATION/sctpThread.cpp
RIC-E2-TERMINATION/sctpThread.h
RIC-E2-TERMINATION/statCollector.h [new file with mode: 0644]

index 3801b98..ce363a4 100644 (file)
@@ -56,3 +56,4 @@ RIC-E2-TERMINATION/3rdparty/log/
 RIC-E2-TERMINATION/3rdparty/rapidjson/
 /RIC-E2-TERMINATION/3rdparty/int/
 /RIC-E2-TERMINATION/3rdparty/int/autogen/licenses/codevbak.txt
+!/RIC-E2-TERMINATION/3rdparty/log/3rdparty/googletest/
index f80bc61..968d713 100644 (file)
@@ -1,3 +1,3 @@
 # The Jenkins job requires a tag to build the Docker image.
 # Global-JJB script assumes this file is in the repo root.
-tag: 4.0.2
+tag: 4.0.3
index a219d56..ffa0eb3 100644 (file)
 #include <string>
 #include <iostream>
 
-using namespace std;
-
 class mapWrapper {
 public:
     void *find(char *key) {
-        shared_lock<shared_timed_mutex> read(fence);
+        std::shared_lock<std::shared_timed_mutex> read(fence);
         auto entry = keyMap.find(key);
         if (entry == keyMap.end()) {
             return nullptr;
@@ -49,32 +47,30 @@ public:
     }
 
     void setkey(char *key, void *val) {
-        unique_lock<shared_timed_mutex> write(fence);
+        std::unique_lock<std::shared_timed_mutex> write(fence);
         keyMap[key] = val;
     }
 
     void *erase(char *key) {
-        unique_lock<shared_timed_mutex> write(fence);
+        std::unique_lock<std::shared_timed_mutex> write(fence);
         return (void *)keyMap.erase(key);
     }
 
     void clear() {
-        unique_lock<shared_timed_mutex> write(fence);
+        std::unique_lock<std::shared_timed_mutex> write(fence);
         keyMap.clear();
     }
 
-    void getKeys(vector<char *> &v) {
-        shared_lock<shared_timed_mutex> read(fence);
+    void getKeys(std::vector<char *> &v) {
+        std::shared_lock<std::shared_timed_mutex> read(fence);
         for (auto const &e : keyMap) {
             v.emplace_back((char *)e.first.c_str());
         }
     }
 
-
-
 private:
-    std::unordered_map<string, void *> keyMap;
-    shared_timed_mutex fence;
+    std::unordered_map<std::string, void *> keyMap;
+    std::shared_timed_mutex fence;
 
 };
 #endif //E2_MAPWRAPPER_H
index a271d05..9fc72b4 100644 (file)
@@ -277,6 +277,8 @@ int buildConfiguration(sctp_params_t &sctpParams) {
     return 0;
 }
 
+
+
 int main(const int argc, char **argv) {
     sctp_params_t sctpParams;
 
@@ -363,6 +365,9 @@ int main(const int argc, char **argv) {
         }
     }
 
+    auto statFlag = false;
+    auto statThread = std::thread(statColectorThread, (void *)&statFlag);
+
     //loop over term_init until first message from xApp
     handleTermInit(sctpParams);
 
@@ -370,6 +375,9 @@ int main(const int argc, char **argv) {
         t.join();
     }
 
+    statFlag = true;
+    statThread.join();
+
     return 0;
 }
 
@@ -544,6 +552,8 @@ void listener(sctp_params_t *params) {
         rmrMessageBuffer.sendBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
     }
 
+    message.statCollector = StatCollector::GetInstance();
+
     while (true) {
         if (mdclog_level_get() >= MDCLOG_DEBUG) {
             mdclog_write(MDCLOG_DEBUG, "Start EPOLL Wait");
@@ -963,7 +973,7 @@ int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_
             m->erase(key);
             return -1;
         }
-        peerInfo->sentMesgs++;
+        message.statCollector->incSentMessage(string(message.message.enodbName));
         message.message.direction = 'D';
         // send report.buffer of size
         buildJsonMessage(message);
@@ -1021,7 +1031,7 @@ int receiveDataFromSctp(struct epoll_event *events,
     // get the identity of the interface
     message.peerInfo = (ConnectedCU_t *)events->data.ptr;
 
-
+    message.statCollector = StatCollector::GetInstance();
     struct timespec start{0, 0};
     struct timespec decodestart{0, 0};
     struct timespec end{0, 0};
@@ -1043,8 +1053,9 @@ int receiveDataFromSctp(struct epoll_event *events,
             mdclog_write(MDCLOG_DEBUG, "Finish Read from SCTP %d fd message length = %ld",
                     message.peerInfo->fileDescriptor, message.message.asnLength);
         }
-        message.peerInfo->rcvMsgs++;
+
         memcpy(message.message.enodbName, message.peerInfo->enodbName, sizeof(message.peerInfo->enodbName));
+        message.statCollector->incRecvMessage(string(message.message.enodbName));
         message.message.direction = 'U';
         message.message.time.tv_nsec = ts.tv_nsec;
         message.message.time.tv_sec = ts.tv_sec;
index f6b9752..637dd17 100644 (file)
 #include "cxxopts.hpp"
 //#include "config-cpp/include/config-cpp/config-cpp.h"
 
-#ifdef __TRACING__
-#include "openTracing.h"
-#endif
 
 #include "mapWrapper.h"
+#include "statCollector.h"
 
 #include "base64.h"
 
@@ -114,6 +112,7 @@ namespace expr = boost::log::expressions;
 typedef mapWrapper Sctp_Map_t;
 
 
+
 #define VOLUME_URL_SIZE 256
 #define KA_MESSAGE_SIZE 2048
 
@@ -147,8 +146,6 @@ typedef struct ConnectedCU {
     char portNumber[NI_MAXSERV] {};
     char enodbName[MAX_ENODB_NAME_SIZE] {};
     char asnData[RECEIVE_SCTP_BUFFER_SIZE] {};
-    int rcvMsgs = 0;
-    int sentMesgs = 0;
     size_t asnLength = 0;
     int mtype = 0;
     bool isConnected = false;
@@ -182,6 +179,7 @@ typedef struct ReportingMessages {
     long outLen = 0;
     unsigned char base64Data[RECEIVE_SCTP_BUFFER_SIZE * 2] {};
     char buffer[RECEIVE_SCTP_BUFFER_SIZE * 8] {};
+    StatCollector *statCollector = nullptr;
 } ReportingMessages_t;
 
 cxxopts::ParseResult parse(int argc, char *argv[], sctp_params_t &pSctpParams);
diff --git a/RIC-E2-TERMINATION/statCollector.h b/RIC-E2-TERMINATION/statCollector.h
new file mode 100644 (file)
index 0000000..bb4ce5d
--- /dev/null
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2020 AT&T Intellectual Property
+ * Copyright 2020 Nokia
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//
+// Created by adi ENZEL on 3/24/20.
+//
+
+#ifndef E2_STATCOLLECTOR_H
+#define E2_STATCOLLECTOR_H
+
+#include <unordered_map>
+#include <mutex>
+#include <shared_mutex>
+#include <thread>
+#include <string>
+#include <iostream>
+#include <utility>
+#include <chrono>
+#include <ctime>
+#include <iomanip>
+#include <mdclog/mdclog.h>
+
+typedef struct statResult {
+    std::string ranName;
+    uint32_t receivedMessages;
+    uint32_t sentMessages;
+} statResult_t ;
+
+class StatCollector {
+
+    static std::mutex singltonMutex;
+    static std::atomic<StatCollector *> obj;
+
+public:
+    static StatCollector* GetInstance() {
+        StatCollector* pStatCollector = obj.load(std::memory_order_acquire);
+        if (pStatCollector == nullptr) {
+            std::lock_guard<std::mutex> lock(singltonMutex);
+            pStatCollector = obj.load(std::memory_order_relaxed);
+            if (pStatCollector == nullptr) {
+                pStatCollector = new StatCollector();
+                obj.store(pStatCollector, std::memory_order_release);
+            }
+        }
+        return pStatCollector;
+    }
+
+    void incSentMessage(const std::string &key) {
+        increment(sentMessages, key);
+    }
+    void incRecvMessage(const std::string &key) {
+        increment(recvMessages, key);
+    }
+
+    std::vector<statResult_t> &getCurrentStats() {
+        results.clear();
+
+        for (auto const &e : recvMessages) {
+            statResult_t result {};
+            result.ranName = e.first;
+            result.receivedMessages = e.second.load(std::memory_order_acquire);
+            auto found = sentMessages.find(result.ranName);
+            if (found != sentMessages.end()) {
+                result.sentMessages = found->second.load(std::memory_order_acquire);
+            } else {
+              result.sentMessages = 0;
+            }
+
+            results.emplace_back(result);
+        }
+        return results;
+    }
+
+    StatCollector(const StatCollector&)= delete;
+    StatCollector& operator=(const StatCollector&)= delete;
+
+private:
+    std::unordered_map<std::string, std::atomic<int>> sentMessages;
+    std::unordered_map<std::string, std::atomic<int>> recvMessages;
+    std::vector<statResult_t> results;
+
+
+    StatCollector() = default;
+    ~StatCollector() = default;
+
+
+    void inline static increment(std::unordered_map<std::string, std::atomic<int>> &map, const std::string &key) {
+        auto found = map.find(key);
+        if (found != map.end()) { //inc
+            map[key].fetch_add(1, std::memory_order_release);
+        } else { //add
+            //sentMessages.emplace(std::make_pair(std::string(key), std::atomic<int>(0)));
+            map.emplace(std::piecewise_construct,
+                                 std::forward_as_tuple(key),
+                                 std::forward_as_tuple(0));
+        }
+
+    }
+
+};
+
+
+// must define this to allow StatCollector private variables to be known to compiler linker
+std::mutex StatCollector::singltonMutex;
+std::atomic<StatCollector *> StatCollector::obj;
+
+
+void statColectorThread(void *runtime) {
+    bool *stop_loop = (bool *)runtime;
+    auto *statCollector = StatCollector::GetInstance();
+    std::time_t tt = std::chrono::system_clock::to_time_t (std::chrono::system_clock::now());
+
+    struct std::tm * ptm = std::localtime(&tt);
+    std::cout << "Waiting for the next minute to begin...\n";
+    ptm->tm_min = ptm->tm_min + (5 - ptm->tm_min % 5);
+    ptm->tm_sec=0;
+
+    std::this_thread::sleep_until(std::chrono::system_clock::from_time_t(mktime(ptm)));
+
+// alligned to 5 minutes
+    while (true) {
+        if (*stop_loop) {
+            break;
+        }
+        for (auto const &e : statCollector->getCurrentStats()) {
+            if (mdclog_level_get() >= MDCLOG_INFO) {
+                mdclog_write(MDCLOG_INFO, "RAN : %s sent messages : %d recived messages : %d\n",
+                             e.ranName.c_str(), e.sentMessages, e.receivedMessages);
+            }
+        }
+        std::this_thread::sleep_for(std::chrono::seconds(300));
+    }
+}
+#endif //E2_STATCOLLECTOR_H