Merge "4.0.10 upgrade rmr to 4.02"
[ric-plt/e2.git] / RIC-E2-TERMINATION / statCollector.h
1 /*
2  * Copyright 2020 AT&T Intellectual Property
3  * Copyright 2020 Nokia
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 //
19 // Created by adi ENZEL on 3/24/20.
20 //
21
22 #ifndef E2_STATCOLLECTOR_H
23 #define E2_STATCOLLECTOR_H
24
25 #include <unordered_map>
26 #include <mutex>
27 #include <shared_mutex>
28 #include <thread>
29 #include <string>
30 #include <iostream>
31 #include <utility>
32 #include <chrono>
33 #include <ctime>
34 #include <iomanip>
35 #include <mdclog/mdclog.h>
36 //#include <tbb/concurrent_unordered_map.h>
37
38 //using namespace tbb;
39
40 typedef struct statResult {
41     std::string ranName;
42     uint32_t receivedMessages;
43     uint32_t sentMessages;
44 } statResult_t ;
45
46 class StatCollector {
47
48     static std::mutex singltonMutex;
49     static std::atomic<StatCollector *> obj;
50
51 public:
52     static StatCollector* GetInstance() {
53         StatCollector* pStatCollector = obj.load(std::memory_order_acquire);
54         if (pStatCollector == nullptr) {
55             std::lock_guard<std::mutex> lock(singltonMutex);
56             pStatCollector = obj.load(std::memory_order_relaxed);
57             if (pStatCollector == nullptr) {
58                 pStatCollector = new StatCollector();
59                 obj.store(pStatCollector, std::memory_order_release);
60             }
61         }
62         return pStatCollector;
63     }
64
65     void incSentMessage(const std::string &key) {
66         increment(sentMessages, key);
67     }
68     void incRecvMessage(const std::string &key) {
69         increment(recvMessages, key);
70     }
71
72     std::vector<statResult_t> &getCurrentStats() {
73         results.clear();
74
75         for (auto const &e : recvMessages) {
76             statResult_t result {};
77             result.ranName = e.first;
78             result.receivedMessages = e.second.load(std::memory_order_acquire);
79             auto found = sentMessages.find(result.ranName);
80             if (found != sentMessages.end()) {
81                 result.sentMessages = found->second.load(std::memory_order_acquire);
82             } else {
83               result.sentMessages = 0;
84             }
85
86             results.emplace_back(result);
87         }
88         return results;
89     }
90
91     StatCollector(const StatCollector&)= delete;
92     StatCollector& operator=(const StatCollector&)= delete;
93
94 private:
95     //tbb::concurrent_unordered_map<std::string, int> sentMessages;
96     std::unordered_map<std::string, std::atomic<int>> sentMessages;
97     std::unordered_map<std::string, std::atomic<int>> recvMessages;
98 //    tbb::concurrent_unordered_map<std::string, int> recvMessages;
99     std::vector<statResult_t> results;
100
101
102 //    StatCollector() = default;
103     StatCollector() {
104         sentMessages.clear();
105         recvMessages.clear();
106     }
107     ~StatCollector() = default;
108
109
110     void increment(std::unordered_map<std::string, std::atomic<int>> &map, const std::string &key);
111
112 };
113
114 void StatCollector::increment(std::unordered_map<std::string, std::atomic<int>> &map, const std::string &key) {
115     if (map.empty()) {
116         map.emplace(std::piecewise_construct,
117                     std::forward_as_tuple(key),
118                     std::forward_as_tuple(1));
119         return;
120     }
121     auto found = map.find(key);
122     if (found != map.end()) { //inc
123         map[key].fetch_add(1, std::memory_order_release);
124         //map[key]++;
125     } else { //add
126         //sentMessages.emplace(std::make_pair(std::string(key), std::atomic<int>(0)));
127         map.emplace(std::piecewise_construct,
128                     std::forward_as_tuple(key),
129                     std::forward_as_tuple(1));
130     }
131
132 }
133
134
135 // must define this to allow StatCollector private variables to be known to compiler linker
136 std::mutex StatCollector::singltonMutex;
137 std::atomic<StatCollector *> StatCollector::obj;
138
139
140 void statColectorThread(void *runtime) {
141     bool *stop_loop = (bool *)runtime;
142     auto *statCollector = StatCollector::GetInstance();
143     std::time_t tt = std::chrono::system_clock::to_time_t (std::chrono::system_clock::now());
144
145     struct std::tm * ptm = std::localtime(&tt);
146     std::cout << "Waiting for the next minute to begin...\n";
147     ptm->tm_min = ptm->tm_min + (5 - ptm->tm_min % 5);
148     ptm->tm_sec=0;
149
150     std::this_thread::sleep_until(std::chrono::system_clock::from_time_t(mktime(ptm)));
151
152 // alligned to 5 minutes
153     while (true) {
154         if (*stop_loop) {
155             break;
156         }
157         for (auto const &e : statCollector->getCurrentStats()) {
158             if (mdclog_level_get() >= MDCLOG_INFO) {
159                 mdclog_write(MDCLOG_INFO, "RAN : %s sent messages : %d recived messages : %d\n",
160                              e.ranName.c_str(), e.sentMessages, e.receivedMessages);
161             }
162         }
163         std::this_thread::sleep_for(std::chrono::seconds(300));
164     }
165 }
166 #endif //E2_STATCOLLECTOR_H