2 * Copyright 2020 AT&T Intellectual Property
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 // Created by adi ENZEL on 3/24/20.
22 #ifndef E2_STATCOLLECTOR_H
23 #define E2_STATCOLLECTOR_H
25 #include <unordered_map>
27 #include <shared_mutex>
35 #include <mdclog/mdclog.h>
36 #include <tbb/concurrent_unordered_map.h>
40 typedef struct statResult {
42 uint32_t receivedMessages;
43 uint32_t sentMessages;
48 static std::mutex singltonMutex;
49 static std::atomic<StatCollector *> obj;
52 static StatCollector* GetInstance() {
53 StatCollector* pStatCollector = obj.load(std::memory_order_acquire);
54 if (pStatCollector == nullptr) {
55 std::lock_guard<std::mutex> lock(singltonMutex);
56 pStatCollector = obj.load(std::memory_order_relaxed);
57 if (pStatCollector == nullptr) {
58 pStatCollector = new StatCollector();
59 obj.store(pStatCollector, std::memory_order_release);
62 return pStatCollector;
65 void incSentMessage(const std::string &key) {
66 increment(sentMessages, key);
68 void incRecvMessage(const std::string &key) {
69 increment(recvMessages, key);
72 std::vector<statResult_t> &getCurrentStats() {
75 for (auto const &e : recvMessages) {
76 statResult_t result {};
77 result.ranName = e.first;
78 result.receivedMessages = e.second;
79 auto found = sentMessages.find(result.ranName);
80 if (found != sentMessages.end()) {
81 result.sentMessages = found->second;
83 result.sentMessages = 0;
86 results.emplace_back(result);
91 StatCollector(const StatCollector&)= delete;
92 StatCollector& operator=(const StatCollector&)= delete;
95 tbb::concurrent_unordered_map<std::string, int> sentMessages;
96 //std::unordered_map<std::string, int> sentMessages;
97 //std::unordered_map<std::string, int> recvMessages;
98 tbb::concurrent_unordered_map<std::string, int> recvMessages;
99 std::vector<statResult_t> results;
102 // StatCollector() = default;
104 sentMessages.clear();
105 recvMessages.clear();
107 ~StatCollector() = default;
110 void increment(tbb::concurrent_unordered_map<std::string, int> &map, const std::string &key);
114 void StatCollector::increment(tbb::concurrent_unordered_map<std::string, int> &map, const std::string &key) {
115 if (mdclog_level_get() >= MDCLOG_DEBUG) {
116 mdclog_write(MDCLOG_INFO, "in file %s at finction %s in line %d", __FILE__, __func__, __LINE__);
119 if (mdclog_level_get() >= MDCLOG_DEBUG) {
120 mdclog_write(MDCLOG_INFO, "in file %s at finction %s in line %d", __FILE__, __func__, __LINE__);
122 map.emplace(std::piecewise_construct,
123 std::forward_as_tuple(key),
124 std::forward_as_tuple(1));
125 if (mdclog_level_get() >= MDCLOG_DEBUG) {
126 mdclog_write(MDCLOG_INFO, "in file %s at finction %s in line %d", __FILE__, __func__, __LINE__);
130 if (mdclog_level_get() >= MDCLOG_DEBUG) {
131 mdclog_write(MDCLOG_INFO, "in file %s at finction %s in line %d", __FILE__, __func__, __LINE__);
133 auto found = map.find(key);
134 if (mdclog_level_get() >= MDCLOG_DEBUG) {
135 mdclog_write(MDCLOG_INFO, "in file %s at finction %s in line %d", __FILE__, __func__, __LINE__);
137 if (found != map.end()) { //inc
138 if (mdclog_level_get() >= MDCLOG_DEBUG) {
139 mdclog_write(MDCLOG_INFO, "in file %s at finction %s in line %d", __FILE__, __func__, __LINE__);
143 //sentMessages.emplace(std::make_pair(std::string(key), std::atomic<int>(0)));
144 if (mdclog_level_get() >= MDCLOG_DEBUG) {
145 mdclog_write(MDCLOG_INFO, "in file %s at finction %s in line %d", __FILE__, __func__, __LINE__);
148 map.emplace(std::piecewise_construct,
149 std::forward_as_tuple(key),
150 std::forward_as_tuple(1));
151 if (mdclog_level_get() >= MDCLOG_DEBUG) {
152 mdclog_write(MDCLOG_INFO, "in file %s at finction %s in line %d", __FILE__, __func__, __LINE__);
159 // must define this to allow StatCollector private variables to be known to compiler linker
160 std::mutex StatCollector::singltonMutex;
161 std::atomic<StatCollector *> StatCollector::obj;
164 void statColectorThread(void *runtime) {
165 bool *stop_loop = (bool *)runtime;
166 auto *statCollector = StatCollector::GetInstance();
167 std::time_t tt = std::chrono::system_clock::to_time_t (std::chrono::system_clock::now());
169 struct std::tm * ptm = std::localtime(&tt);
170 std::cout << "Waiting for the next minute to begin...\n";
171 ptm->tm_min = ptm->tm_min + (5 - ptm->tm_min % 5);
174 std::this_thread::sleep_until(std::chrono::system_clock::from_time_t(mktime(ptm)));
176 // alligned to 5 minutes
181 for (auto const &e : statCollector->getCurrentStats()) {
182 if (mdclog_level_get() >= MDCLOG_INFO) {
183 mdclog_write(MDCLOG_INFO, "RAN : %s sent messages : %d recived messages : %d\n",
184 e.ranName.c_str(), e.sentMessages, e.receivedMessages);
187 std::this_thread::sleep_for(std::chrono::seconds(300));
190 #endif //E2_STATCOLLECTOR_H