3 from zipfile import ZipFile
4 from requests import Response as response
5 from ricxappframe.xapp_symptomdata import Symptomdata
8 def new_loaddata(*args, **kwargs):
9 # Your custom testing override
13 class MockResponse(object):
14 def __init__(self, reponse, jsonout):
15 self.status_code = response
16 self.url = 'http://lwsd.ricplt:8089/ric/v1/lwsd'
17 self.headers = {'Content-type': 'application/json'}
18 self.jsonout = jsonout
23 def raise_for_status(self):
24 return self.status_code
27 class MockStat(object):
28 def __init__(self, st_ctime=None):
29 self.st_ctime = st_ctime
33 def __init__(self, walk=None, st_ctime=None):
35 self.st_ctime = st_ctime
40 def stat(self, filename):
41 st = MockStat(self.st_ctime)
45 def test_symptomdata_subscribe(monkeypatch):
46 def mock_requests_post(uri, data, headers, proxies):
47 print("%s %s" % (uri, data))
48 return MockResponse(200, [{'service': 'xapp.service'}])
50 def mock_requests_get(uri, headers, proxies):
52 return MockResponse(200, [])
54 # mock the http get and post
55 monkeypatch.setattr(requests, 'post', mock_requests_post)
56 monkeypatch.setattr(requests, 'get', mock_requests_get)
58 # this will return not found
59 s = Symptomdata("xapp", "xapp.ricxapp.service", "tmp", "http://lwsd.ricplt:8089/ric/v1/lwsd")
64 assert s.lwsdok is True
67 def test_symptomdata_subscribe_exists(monkeypatch):
68 def mock_requests_get(uri, headers, proxies):
70 return MockResponse(200, [{'service': 'xapp_other'}, {'service': 'xapp'}])
73 monkeypatch.setattr(requests, 'get', mock_requests_get)
75 # this will return not found
76 s = Symptomdata("xapp", "xapp.ricxapp.service", "tmp", "http://lwsd.ricplt:8089/ric/v1/lwsd")
79 assert s.lwsdok is True
82 def test_symptomdata_collect_time(monkeypatch):
83 myos = MockOs(walk=[('mydir', (), ('file1.csv', 'file2.csv', 'file3.txt', 'file.json'))], st_ctime=1647502471)
85 def mock_requests_get(uri, headers, proxies):
86 return MockResponse(200, [{'service': 'xapp_other'}, {'service': 'xapp'}])
88 def mock_os_walk(path):
89 return myos.walk(path)
91 def mock_os_stat(filename):
92 return myos.stat(filename)
94 def mock_zipfile_write(me, fromfile, tofile):
98 monkeypatch.setattr(requests, 'get', mock_requests_get)
100 monkeypatch.setattr(os, 'walk', mock_os_walk)
102 monkeypatch.setattr(os, 'stat', mock_os_stat)
104 # mock the zipfile stat
105 monkeypatch.setattr(ZipFile, 'write', mock_zipfile_write)
107 # this will return not found
108 s = Symptomdata("xapp", "xapp.ricxapp.service", "tmp", "http://lwsd.ricplt:8089/ric/v1/lwsd")
111 assert s.lwsdok is True
113 zipfile = s.collect("zipfile.zip", (r'/tmp/csv/.*\.csv', r'/tmp/json/.*\.json'), 1647502470, 1647502570)
114 assert zipfile is not None
117 def test_symptomdata_collect(monkeypatch):
118 myos = MockOs(walk=[('mydir', (), ('file1.csv', 'file2.csv', 'file3.txt', 'file.json'))], st_ctime=1647502471)
120 def mock_requests_get(uri, headers, proxies):
121 return MockResponse(200, [{'service': 'xapp_other'}, {'service': 'xapp'}])
123 def mock_os_walk(path):
124 return myos.walk(path)
126 def mock_os_stat(filename):
127 return myos.stat(filename)
129 def mock_zipfile_write(me, fromfile, tofile):
133 monkeypatch.setattr(requests, 'get', mock_requests_get)
135 monkeypatch.setattr(os, 'walk', mock_os_walk)
137 monkeypatch.setattr(os, 'stat', mock_os_stat)
139 # mock the zipfile stat
140 monkeypatch.setattr(ZipFile, 'write', mock_zipfile_write)
142 # this will return not found
143 s = Symptomdata("xapp", "xapp.ricxapp.service", "tmp", "http://lwsd.ricplt:8089/ric/v1/lwsd")
146 assert s.lwsdok is True
148 zipfile = s.collect("zipfile.zip", ('/tmp/csv/.*.csv', '/tmp/json/.*.json'), 0, 0)
149 assert zipfile is not None