/*
 * Copyright 2020 AT&T Intellectual Property
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
19 // Created by adi ENZEL on 3/24/20.
22 #ifndef E2_STATCOLLECTOR_H
23 #define E2_STATCOLLECTOR_H
#include <atomic>
#include <chrono>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

#include <mdclog/mdclog.h>
36 //#include <tbb/concurrent_unordered_map.h>
38 //using namespace tbb;
/**
 * One row of the statistics report: the message counters recorded for a
 * single RAN. Instances are filled in by StatCollector::getCurrentStats().
 */
typedef struct statResult {
    std::string ranName;            ///< RAN name; taken from the key of the counter maps
    uint32_t receivedMessages = 0;  ///< value of the "received" counter for this RAN
    uint32_t sentMessages = 0;      ///< value of the "sent" counter for this RAN (0 if none recorded)
} statResult_t;
48 static std::mutex singltonMutex;
49 static std::atomic<StatCollector *> obj;
52 static StatCollector* GetInstance() {
53 StatCollector* pStatCollector = obj.load(std::memory_order_acquire);
54 if (pStatCollector == nullptr) {
55 std::lock_guard<std::mutex> lock(singltonMutex);
56 pStatCollector = obj.load(std::memory_order_relaxed);
57 if (pStatCollector == nullptr) {
58 pStatCollector = new StatCollector();
59 obj.store(pStatCollector, std::memory_order_release);
62 return pStatCollector;
65 void incSentMessage(const std::string &key) {
66 increment(sentMessages, key);
68 void incRecvMessage(const std::string &key) {
69 increment(recvMessages, key);
72 std::vector<statResult_t> &getCurrentStats() {
75 for (auto const &e : recvMessages) {
76 statResult_t result {};
77 result.ranName = e.first;
78 result.receivedMessages = e.second.load(std::memory_order_acquire);
79 auto found = sentMessages.find(result.ranName);
80 if (found != sentMessages.end()) {
81 result.sentMessages = found->second.load(std::memory_order_acquire);
83 result.sentMessages = 0;
86 results.emplace_back(result);
91 StatCollector(const StatCollector&)= delete;
92 StatCollector& operator=(const StatCollector&)= delete;
95 //tbb::concurrent_unordered_map<std::string, int> sentMessages;
96 std::unordered_map<std::string, std::atomic<int>> sentMessages;
97 std::unordered_map<std::string, std::atomic<int>> recvMessages;
98 // tbb::concurrent_unordered_map<std::string, int> recvMessages;
99 std::vector<statResult_t> results;
102 // StatCollector() = default;
104 sentMessages.clear();
105 recvMessages.clear();
107 ~StatCollector() = default;
110 void increment(std::unordered_map<std::string, std::atomic<int>> &map, const std::string &key);
114 void StatCollector::increment(std::unordered_map<std::string, std::atomic<int>> &map, const std::string &key) {
116 map.emplace(std::piecewise_construct,
117 std::forward_as_tuple(key),
118 std::forward_as_tuple(1));
121 auto found = map.find(key);
122 if (found != map.end()) { //inc
123 map[key].fetch_add(1, std::memory_order_release);
126 //sentMessages.emplace(std::make_pair(std::string(key), std::atomic<int>(0)));
127 map.emplace(std::piecewise_construct,
128 std::forward_as_tuple(key),
129 std::forward_as_tuple(1));
135 // must define this to allow StatCollector private variables to be known to compiler linker
136 std::mutex StatCollector::singltonMutex;
137 std::atomic<StatCollector *> StatCollector::obj;
140 void statColectorThread(void *runtime) {
141 bool *stop_loop = (bool *)runtime;
142 auto *statCollector = StatCollector::GetInstance();
143 std::time_t tt = std::chrono::system_clock::to_time_t (std::chrono::system_clock::now());
145 struct std::tm * ptm = std::localtime(&tt);
146 std::cout << "Waiting for the next minute to begin...\n";
147 ptm->tm_min = ptm->tm_min + (5 - ptm->tm_min % 5);
150 std::this_thread::sleep_until(std::chrono::system_clock::from_time_t(mktime(ptm)));
152 // alligned to 5 minutes
157 for (auto const &e : statCollector->getCurrentStats()) {
158 if (mdclog_level_get() >= MDCLOG_INFO) {
159 mdclog_write(MDCLOG_INFO, "RAN : %s sent messages : %d recived messages : %d\n",
160 e.ranName.c_str(), e.sentMessages, e.receivedMessages);
163 std::this_thread::sleep_for(std::chrono::seconds(300));
166 #endif //E2_STATCOLLECTOR_H