bb4ce5d382b0ec1b1ead344b030e7f3d937af6d9
[ric-plt/e2.git] / RIC-E2-TERMINATION / statCollector.h
1 /*
2  * Copyright 2020 AT&T Intellectual Property
3  * Copyright 2020 Nokia
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 //
19 // Created by adi ENZEL on 3/24/20.
20 //
21
22 #ifndef E2_STATCOLLECTOR_H
23 #define E2_STATCOLLECTOR_H
24
25 #include <unordered_map>
26 #include <mutex>
27 #include <shared_mutex>
28 #include <thread>
29 #include <string>
30 #include <iostream>
31 #include <utility>
32 #include <chrono>
33 #include <ctime>
34 #include <iomanip>
35 #include <mdclog/mdclog.h>
36
37 typedef struct statResult {
38     std::string ranName;
39     uint32_t receivedMessages;
40     uint32_t sentMessages;
41 } statResult_t ;
42
43 class StatCollector {
44
45     static std::mutex singltonMutex;
46     static std::atomic<StatCollector *> obj;
47
48 public:
49     static StatCollector* GetInstance() {
50         StatCollector* pStatCollector = obj.load(std::memory_order_acquire);
51         if (pStatCollector == nullptr) {
52             std::lock_guard<std::mutex> lock(singltonMutex);
53             pStatCollector = obj.load(std::memory_order_relaxed);
54             if (pStatCollector == nullptr) {
55                 pStatCollector = new StatCollector();
56                 obj.store(pStatCollector, std::memory_order_release);
57             }
58         }
59         return pStatCollector;
60     }
61
62     void incSentMessage(const std::string &key) {
63         increment(sentMessages, key);
64     }
65     void incRecvMessage(const std::string &key) {
66         increment(recvMessages, key);
67     }
68
69     std::vector<statResult_t> &getCurrentStats() {
70         results.clear();
71
72         for (auto const &e : recvMessages) {
73             statResult_t result {};
74             result.ranName = e.first;
75             result.receivedMessages = e.second.load(std::memory_order_acquire);
76             auto found = sentMessages.find(result.ranName);
77             if (found != sentMessages.end()) {
78                 result.sentMessages = found->second.load(std::memory_order_acquire);
79             } else {
80               result.sentMessages = 0;
81             }
82
83             results.emplace_back(result);
84         }
85         return results;
86     }
87
88     StatCollector(const StatCollector&)= delete;
89     StatCollector& operator=(const StatCollector&)= delete;
90
91 private:
92     std::unordered_map<std::string, std::atomic<int>> sentMessages;
93     std::unordered_map<std::string, std::atomic<int>> recvMessages;
94     std::vector<statResult_t> results;
95
96
97     StatCollector() = default;
98     ~StatCollector() = default;
99
100
101     void inline static increment(std::unordered_map<std::string, std::atomic<int>> &map, const std::string &key) {
102         auto found = map.find(key);
103         if (found != map.end()) { //inc
104             map[key].fetch_add(1, std::memory_order_release);
105         } else { //add
106             //sentMessages.emplace(std::make_pair(std::string(key), std::atomic<int>(0)));
107             map.emplace(std::piecewise_construct,
108                                  std::forward_as_tuple(key),
109                                  std::forward_as_tuple(0));
110         }
111
112     }
113
114 };
115
116
117 // must define this to allow StatCollector private variables to be known to compiler linker
118 std::mutex StatCollector::singltonMutex;
119 std::atomic<StatCollector *> StatCollector::obj;
120
121
122 void statColectorThread(void *runtime) {
123     bool *stop_loop = (bool *)runtime;
124     auto *statCollector = StatCollector::GetInstance();
125     std::time_t tt = std::chrono::system_clock::to_time_t (std::chrono::system_clock::now());
126
127     struct std::tm * ptm = std::localtime(&tt);
128     std::cout << "Waiting for the next minute to begin...\n";
129     ptm->tm_min = ptm->tm_min + (5 - ptm->tm_min % 5);
130     ptm->tm_sec=0;
131
132     std::this_thread::sleep_until(std::chrono::system_clock::from_time_t(mktime(ptm)));
133
134 // alligned to 5 minutes
135     while (true) {
136         if (*stop_loop) {
137             break;
138         }
139         for (auto const &e : statCollector->getCurrentStats()) {
140             if (mdclog_level_get() >= MDCLOG_INFO) {
141                 mdclog_write(MDCLOG_INFO, "RAN : %s sent messages : %d recived messages : %d\n",
142                              e.ranName.c_str(), e.sentMessages, e.receivedMessages);
143             }
144         }
145         std::this_thread::sleep_for(std::chrono::seconds(300));
146     }
147 }
148 #endif //E2_STATCOLLECTOR_H