#include <string>
#include <iostream>
-using namespace std;
-
class mapWrapper {
public:
void *find(char *key) {
- shared_lock<shared_timed_mutex> read(fence);
+ std::shared_lock<std::shared_timed_mutex> read(fence);
auto entry = keyMap.find(key);
if (entry == keyMap.end()) {
return nullptr;
}
void setkey(char *key, void *val) {
- unique_lock<shared_timed_mutex> write(fence);
+ std::unique_lock<std::shared_timed_mutex> write(fence);
keyMap[key] = val;
}
void *erase(char *key) {
- unique_lock<shared_timed_mutex> write(fence);
+ std::unique_lock<std::shared_timed_mutex> write(fence);
return (void *)keyMap.erase(key);
}
void clear() {
- unique_lock<shared_timed_mutex> write(fence);
+ std::unique_lock<std::shared_timed_mutex> write(fence);
keyMap.clear();
}
- void getKeys(vector<char *> &v) {
- shared_lock<shared_timed_mutex> read(fence);
+ void getKeys(std::vector<char *> &v) {
+ std::shared_lock<std::shared_timed_mutex> read(fence);
for (auto const &e : keyMap) {
v.emplace_back((char *)e.first.c_str());
}
}
-
-
private:
- std::unordered_map<string, void *> keyMap;
- shared_timed_mutex fence;
+ std::unordered_map<std::string, void *> keyMap;
+ std::shared_timed_mutex fence;
};
#endif //E2_MAPWRAPPER_H
return 0;
}
+
+
int main(const int argc, char **argv) {
sctp_params_t sctpParams;
}
}
+ auto statFlag = false;
+ auto statThread = std::thread(statColectorThread, (void *)&statFlag);
+
//loop over term_init until first message from xApp
handleTermInit(sctpParams);
t.join();
}
+ statFlag = true;
+ statThread.join();
+
return 0;
}
rmrMessageBuffer.sendBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
}
+ message.statCollector = StatCollector::GetInstance();
+
while (true) {
if (mdclog_level_get() >= MDCLOG_DEBUG) {
mdclog_write(MDCLOG_DEBUG, "Start EPOLL Wait");
m->erase(key);
return -1;
}
- peerInfo->sentMesgs++;
+ message.statCollector->incSentMessage(string(message.message.enodbName));
message.message.direction = 'D';
// send report.buffer of size
buildJsonMessage(message);
// get the identity of the interface
message.peerInfo = (ConnectedCU_t *)events->data.ptr;
-
+ message.statCollector = StatCollector::GetInstance();
struct timespec start{0, 0};
struct timespec decodestart{0, 0};
struct timespec end{0, 0};
mdclog_write(MDCLOG_DEBUG, "Finish Read from SCTP %d fd message length = %ld",
message.peerInfo->fileDescriptor, message.message.asnLength);
}
- message.peerInfo->rcvMsgs++;
+
memcpy(message.message.enodbName, message.peerInfo->enodbName, sizeof(message.peerInfo->enodbName));
+ message.statCollector->incRecvMessage(string(message.message.enodbName));
message.message.direction = 'U';
message.message.time.tv_nsec = ts.tv_nsec;
message.message.time.tv_sec = ts.tv_sec;
--- /dev/null
+/*
+ * Copyright 2020 AT&T Intellectual Property
+ * Copyright 2020 Nokia
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//
+// Created by adi ENZEL on 3/24/20.
+//
+
+#ifndef E2_STATCOLLECTOR_H
+#define E2_STATCOLLECTOR_H
+
+#include <unordered_map>
+#include <mutex>
+#include <shared_mutex>
+#include <thread>
+#include <string>
+#include <iostream>
+#include <utility>
+#include <chrono>
+#include <ctime>
+#include <iomanip>
+#include <mdclog/mdclog.h>
+
+typedef struct statResult {
+ std::string ranName;
+ uint32_t receivedMessages;
+ uint32_t sentMessages;
+} statResult_t ;
+
+class StatCollector {
+
+ static std::mutex singltonMutex;
+ static std::atomic<StatCollector *> obj;
+
+public:
+ static StatCollector* GetInstance() {
+ StatCollector* pStatCollector = obj.load(std::memory_order_acquire);
+ if (pStatCollector == nullptr) {
+ std::lock_guard<std::mutex> lock(singltonMutex);
+ pStatCollector = obj.load(std::memory_order_relaxed);
+ if (pStatCollector == nullptr) {
+ pStatCollector = new StatCollector();
+ obj.store(pStatCollector, std::memory_order_release);
+ }
+ }
+ return pStatCollector;
+ }
+
+ void incSentMessage(const std::string &key) {
+ increment(sentMessages, key);
+ }
+ void incRecvMessage(const std::string &key) {
+ increment(recvMessages, key);
+ }
+
+ std::vector<statResult_t> &getCurrentStats() {
+ results.clear();
+
+ for (auto const &e : recvMessages) {
+ statResult_t result {};
+ result.ranName = e.first;
+ result.receivedMessages = e.second.load(std::memory_order_acquire);
+ auto found = sentMessages.find(result.ranName);
+ if (found != sentMessages.end()) {
+ result.sentMessages = found->second.load(std::memory_order_acquire);
+ } else {
+ result.sentMessages = 0;
+ }
+
+ results.emplace_back(result);
+ }
+ return results;
+ }
+
+ StatCollector(const StatCollector&)= delete;
+ StatCollector& operator=(const StatCollector&)= delete;
+
+private:
+ std::unordered_map<std::string, std::atomic<int>> sentMessages;
+ std::unordered_map<std::string, std::atomic<int>> recvMessages;
+ std::vector<statResult_t> results;
+
+
+ StatCollector() = default;
+ ~StatCollector() = default;
+
+
+ void inline static increment(std::unordered_map<std::string, std::atomic<int>> &map, const std::string &key) {
+ auto found = map.find(key);
+ if (found != map.end()) { //inc
+ map[key].fetch_add(1, std::memory_order_release);
+ } else { //add
+ //sentMessages.emplace(std::make_pair(std::string(key), std::atomic<int>(0)));
+ map.emplace(std::piecewise_construct,
+ std::forward_as_tuple(key),
+ std::forward_as_tuple(0));
+ }
+
+ }
+
+};
+
+
+// must define this to allow StatCollector private variables to be known to compiler linker
+std::mutex StatCollector::singltonMutex;
+std::atomic<StatCollector *> StatCollector::obj;
+
+
+void statColectorThread(void *runtime) {
+ bool *stop_loop = (bool *)runtime;
+ auto *statCollector = StatCollector::GetInstance();
+ std::time_t tt = std::chrono::system_clock::to_time_t (std::chrono::system_clock::now());
+
+ struct std::tm * ptm = std::localtime(&tt);
+ std::cout << "Waiting for the next minute to begin...\n";
+ ptm->tm_min = ptm->tm_min + (5 - ptm->tm_min % 5);
+ ptm->tm_sec=0;
+
+ std::this_thread::sleep_until(std::chrono::system_clock::from_time_t(mktime(ptm)));
+
+// alligned to 5 minutes
+ while (true) {
+ if (*stop_loop) {
+ break;
+ }
+ for (auto const &e : statCollector->getCurrentStats()) {
+ if (mdclog_level_get() >= MDCLOG_INFO) {
+ mdclog_write(MDCLOG_INFO, "RAN : %s sent messages : %d recived messages : %d\n",
+ e.ranName.c_str(), e.sentMessages, e.receivedMessages);
+ }
+ }
+ std::this_thread::sleep_for(std::chrono::seconds(300));
+ }
+}
+#endif //E2_STATCOLLECTOR_H