2 * Copyright 2020 AT&T Intellectual Property
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 // Created by adi ENZEL on 3/24/20.
22 #ifndef E2_STATCOLLECTOR_H
23 #define E2_STATCOLLECTOR_H
25 #include <unordered_map>
27 #include <shared_mutex>
35 #include <mdclog/mdclog.h>
// One row of the statistics snapshot produced by StatCollector::getCurrentStats():
// the message counters recorded for a single RAN.
typedef struct statResult {
    std::string ranName;        // key under which the counters are stored
    uint32_t receivedMessages;  // value of the receive counter at snapshot time
    uint32_t sentMessages;      // value of the send counter at snapshot time (0 if none)
} statResult_t;
45 static std::mutex singltonMutex;
46 static std::atomic<StatCollector *> obj;
49 static StatCollector* GetInstance() {
50 StatCollector* pStatCollector = obj.load(std::memory_order_acquire);
51 if (pStatCollector == nullptr) {
52 std::lock_guard<std::mutex> lock(singltonMutex);
53 pStatCollector = obj.load(std::memory_order_relaxed);
54 if (pStatCollector == nullptr) {
55 pStatCollector = new StatCollector();
56 obj.store(pStatCollector, std::memory_order_release);
59 return pStatCollector;
62 void incSentMessage(const std::string &key) {
63 increment(sentMessages, key);
65 void incRecvMessage(const std::string &key) {
66 increment(recvMessages, key);
69 std::vector<statResult_t> &getCurrentStats() {
72 for (auto const &e : recvMessages) {
73 statResult_t result {};
74 result.ranName = e.first;
75 result.receivedMessages = e.second.load(std::memory_order_acquire);
76 auto found = sentMessages.find(result.ranName);
77 if (found != sentMessages.end()) {
78 result.sentMessages = found->second.load(std::memory_order_acquire);
80 result.sentMessages = 0;
83 results.emplace_back(result);
88 StatCollector(const StatCollector&)= delete;
89 StatCollector& operator=(const StatCollector&)= delete;
92 std::unordered_map<std::string, std::atomic<int>> sentMessages;
93 std::unordered_map<std::string, std::atomic<int>> recvMessages;
94 std::vector<statResult_t> results;
97 StatCollector() = default;
98 ~StatCollector() = default;
// Adds one to the counter stored under key in map, creating the counter first
// if key has not been seen before. The very first call for a key therefore
// leaves the counter at 1, not 0.
//
// map  counter table to update (sentMessages or recvMessages).
// key  name under which the counter is stored.
//
// NOTE(review): the fetch_add itself is atomic, but inserting a new key
// modifies the unordered_map without any lock — confirm that first-time
// insertions cannot race with other users of the same map.
void inline static increment(std::unordered_map<std::string, std::atomic<int>> &map, const std::string &key) {
    auto found = map.find(key);
    if (found == map.end()) {
        // std::atomic can be neither copied nor moved, so build the entry in place.
        found = map.emplace(std::piecewise_construct,
                            std::forward_as_tuple(key),
                            std::forward_as_tuple(0)).first;
    }
    // Reuse the iterator instead of a second lookup through operator[].
    found->second.fetch_add(1, std::memory_order_release);
}
117 // must define this to allow StatCollector private variables to be known to compiler linker
118 std::mutex StatCollector::singltonMutex;
119 std::atomic<StatCollector *> StatCollector::obj;
122 void statColectorThread(void *runtime) {
123 bool *stop_loop = (bool *)runtime;
124 auto *statCollector = StatCollector::GetInstance();
125 std::time_t tt = std::chrono::system_clock::to_time_t (std::chrono::system_clock::now());
127 struct std::tm * ptm = std::localtime(&tt);
128 std::cout << "Waiting for the next minute to begin...\n";
129 ptm->tm_min = ptm->tm_min + (5 - ptm->tm_min % 5);
132 std::this_thread::sleep_until(std::chrono::system_clock::from_time_t(mktime(ptm)));
134 // alligned to 5 minutes
139 for (auto const &e : statCollector->getCurrentStats()) {
140 if (mdclog_level_get() >= MDCLOG_INFO) {
141 mdclog_write(MDCLOG_INFO, "RAN : %s sent messages : %d recived messages : %d\n",
142 e.ranName.c_str(), e.sentMessages, e.receivedMessages);
145 std::this_thread::sleep_for(std::chrono::seconds(300));
148 #endif //E2_STATCOLLECTOR_H