RIC-642 related changes: REST subscription, rnib enhancements, symptomdata, rest...
[ric-plt/xapp-frame-py.git] / tests / test_symptomdata.py
1 import os
2 import requests
3 from zipfile import ZipFile
4 from requests import Response as response
5 from ricxappframe.xapp_symptomdata import Symptomdata
6
7
8 def new_loaddata(*args, **kwargs):
9     # Your custom testing override
10     return ""
11
12
13 class MockResponse(object):
14     def __init__(self, reponse, jsonout):
15         self.status_code = response
16         self.url = 'http://lwsd.ricplt:8089/ric/v1/lwsd'
17         self.headers = {'Content-type': 'application/json'}
18         self.jsonout = jsonout
19
20     def json(self):
21         return self.jsonout
22
23     def raise_for_status(self):
24         return self.status_code
25
26
27 class MockStat(object):
28     def __init__(self, st_ctime=None):
29         self.st_ctime = st_ctime
30
31
32 class MockOs(object):
33     def __init__(self, walk=None, st_ctime=None):
34         self.walkresp = walk
35         self.st_ctime = st_ctime
36
37     def walk(self, path):
38         return self.walkresp
39
40     def stat(self, filename):
41         st = MockStat(self.st_ctime)
42         return st
43
44
45 def test_symptomdata_subscribe(monkeypatch):
46     def mock_requests_post(uri, data, headers, proxies):
47         print("%s %s" % (uri, data))
48         return MockResponse(200, [{'service': 'xapp.service'}])
49
50     def mock_requests_get(uri, headers, proxies):
51         print("%s" % (uri))
52         return MockResponse(200, [])
53
54     # mock the http get and post
55     monkeypatch.setattr(requests, 'post', mock_requests_post)
56     monkeypatch.setattr(requests, 'get', mock_requests_get)
57
58     # this will return not found
59     s = Symptomdata("xapp", "xapp.ricxapp.service", "tmp", "http://lwsd.ricplt:8089/ric/v1/lwsd")
60     # stop timer loop
61     s.stop()
62     # make subscription
63     s.subscribe(None)
64     assert s.lwsdok is True
65
66
67 def test_symptomdata_subscribe_exists(monkeypatch):
68     def mock_requests_get(uri, headers, proxies):
69         print("%s" % (uri))
70         return MockResponse(200, [{'service': 'xapp_other'}, {'service': 'xapp'}])
71
72     # mock the http get
73     monkeypatch.setattr(requests, 'get', mock_requests_get)
74
75     # this will return not found
76     s = Symptomdata("xapp", "xapp.ricxapp.service", "tmp", "http://lwsd.ricplt:8089/ric/v1/lwsd")
77     # stop timer loop
78     s.stop()
79     assert s.lwsdok is True
80
81
82 def test_symptomdata_collect_time(monkeypatch):
83     myos = MockOs(walk=[('mydir', (), ('file1.csv', 'file2.csv', 'file3.txt', 'file.json'))], st_ctime=1647502471)
84
85     def mock_requests_get(uri, headers, proxies):
86         return MockResponse(200, [{'service': 'xapp_other'}, {'service': 'xapp'}])
87
88     def mock_os_walk(path):
89         return myos.walk(path)
90
91     def mock_os_stat(filename):
92         return myos.stat(filename)
93
94     def mock_zipfile_write(me, fromfile, tofile):
95         return
96
97     # mock the http get
98     monkeypatch.setattr(requests, 'get', mock_requests_get)
99     # mock the os walk
100     monkeypatch.setattr(os, 'walk', mock_os_walk)
101     # mock the os stat
102     monkeypatch.setattr(os, 'stat', mock_os_stat)
103
104     # mock the zipfile stat
105     monkeypatch.setattr(ZipFile, 'write', mock_zipfile_write)
106
107     # this will return not found
108     s = Symptomdata("xapp", "xapp.ricxapp.service", "tmp", "http://lwsd.ricplt:8089/ric/v1/lwsd")
109     # stop timer loop
110     s.stop()
111     assert s.lwsdok is True
112
113     zipfile = s.collect("zipfile.zip", (r'/tmp/csv/.*\.csv', r'/tmp/json/.*\.json'), 1647502470, 1647502570)
114     assert zipfile is not None
115
116
117 def test_symptomdata_collect(monkeypatch):
118     myos = MockOs(walk=[('mydir', (), ('file1.csv', 'file2.csv', 'file3.txt', 'file.json'))], st_ctime=1647502471)
119
120     def mock_requests_get(uri, headers, proxies):
121         return MockResponse(200, [{'service': 'xapp_other'}, {'service': 'xapp'}])
122
123     def mock_os_walk(path):
124         return myos.walk(path)
125
126     def mock_os_stat(filename):
127         return myos.stat(filename)
128
129     def mock_zipfile_write(me, fromfile, tofile):
130         return
131
132     # mock the http get
133     monkeypatch.setattr(requests, 'get', mock_requests_get)
134     # mock the os walk
135     monkeypatch.setattr(os, 'walk', mock_os_walk)
136     # mock the os stat
137     monkeypatch.setattr(os, 'stat', mock_os_stat)
138
139     # mock the zipfile stat
140     monkeypatch.setattr(ZipFile, 'write', mock_zipfile_write)
141
142     # this will return not found
143     s = Symptomdata("xapp", "xapp.ricxapp.service", "tmp", "http://lwsd.ricplt:8089/ric/v1/lwsd")
144     # stop timer loop
145     s.stop()
146     assert s.lwsdok is True
147
148     zipfile = s.collect("zipfile.zip", ('/tmp/csv/.*.csv', '/tmp/json/.*.json'), 0, 0)
149     assert zipfile is not None