2 import ricxappframe.xapp_rest
6 proxies = {"http": "", "https": ""} # disable proxy usage
7 return requests.get(url, proxies=proxies)
def doDeleteRequest(url, timeout=10.0):
    """Send an HTTP DELETE request to *url* with proxy usage disabled.

    Args:
        url: Absolute URL the request is sent to.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests`` waits indefinitely, which would hang the
            whole test run if the server never answers.

    Returns:
        The response object returned by ``requests.delete``.

    Raises:
        requests.exceptions.Timeout: If the server does not answer within
            *timeout* seconds.
    """
    # Empty proxy URLs disable proxy usage for both schemes.
    proxies = {"http": "", "https": ""}
    return requests.delete(url, proxies=proxies, timeout=timeout)
def doPostRequest(url, data, timeout=10.0):
    """Send an HTTP POST request carrying *data* to *url*, bypassing proxies.

    Args:
        url: Absolute URL the request is sent to.
        data: Request body, handed to ``requests.post`` unchanged.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests`` waits indefinitely, which would hang the
            whole test run if the server never answers.

    Returns:
        The response object returned by ``requests.post``.

    Raises:
        requests.exceptions.Timeout: If the server does not answer within
            *timeout* seconds.
    """
    # Empty proxy URLs disable proxy usage for both schemes.
    proxies = {"http": "", "https": ""}
    return requests.post(url, data, proxies=proxies, timeout=timeout)
def respPostHandler(name, path, data, ctype):
    """Handle a POST request by echoing the request body back.

    Args:
        name: Not used by this handler.
        path: Not used by this handler.
        data: Request body; must support ``.decode("utf-8")``
            (presumably ``bytes`` -- confirm against the server code).
        ctype: Not used by this handler.

    Returns:
        The response created by ``initResponse()`` with ``payload`` set to
        the UTF-8 decoded request body.
    """
    response = ricxappframe.xapp_rest.initResponse()
    response['payload'] = data.decode("utf-8")
    # The response has to be handed back to the caller; without the return
    # the handler yields None and the echoed payload is lost.
    return response
def respGetHandler(name, path, data, ctype):
    """Handle a GET request by answering with a fixed JSON test document.

    Args:
        name: Not used by this handler.
        path: Not used by this handler.
        data: Not used by this handler.
        ctype: Not used by this handler.

    Returns:
        The response created by ``initResponse()`` with ``payload`` set to
        the constant test string.
    """
    response = ricxappframe.xapp_rest.initResponse()
    response['payload'] = '{ "Testitem": "Testdata"}'
    # The response has to be handed back to the caller; without the return
    # the handler yields None and the payload is lost.
    return response
def respDeleteHandler(name, path, data, ctype):
    """Handle a DELETE request by answering 204 with no payload.

    Args:
        name: Not used by this handler.
        path: Not used by this handler.
        data: Not used by this handler.
        ctype: Not used by this handler.

    Returns:
        The response created by ``initResponse()`` with ``payload`` set to
        ``None`` and ``status`` set to 204.
    """
    response = ricxappframe.xapp_rest.initResponse()
    response['payload'] = None
    response['status'] = 204
    # The response has to be handed back to the caller; without the return
    # the handler yields None and the 204 status is lost.
    return response
def respGetEmptyHandler(name, path, data, ctype):
    """Handle a GET request by answering 204 with no payload.

    Args:
        name: Not used by this handler.
        path: Not used by this handler.
        data: Not used by this handler.
        ctype: Not used by this handler.

    Returns:
        The response created by ``initResponse()`` with ``payload`` set to
        ``None`` and ``status`` set to 204.
    """
    response = ricxappframe.xapp_rest.initResponse()
    response['payload'] = None
    response['status'] = 204
    # The response has to be handed back to the caller; without the return
    # the handler yields None and the 204 status is lost.
    return response
def test_subscribe(monkeypatch):
    """Check GET, POST and DELETE routing of the threaded REST server.

    Registers one handler per route on a server bound to 127.0.0.1:18088,
    then verifies body and status code of each response, including a 404
    for a path no handler was registered for.

    Args:
        monkeypatch: pytest fixture; not used by this test, kept so the
            signature stays unchanged.
    """
    base_url = 'http://127.0.0.1:18088'
    server = ricxappframe.xapp_rest.ThreadedHTTPServer("127.0.0.1", 18088)

    # Trick to register our own handlers: add_handler is called through
    # server.handler with server.handler passed explicitly as first argument.
    handler = server.handler
    handler.add_handler(handler, "GET", "get", "/ric/v1/subscriptions", respGetHandler)
    handler.add_handler(handler, "GET", "getempty", "/ric/v1/empty", respGetEmptyHandler)
    handler.add_handler(handler, "POST", "post", "/ric/v1", respPostHandler)
    handler.add_handler(handler, "DELETE", "delete", "/ric/v1/delete", respDeleteHandler)

    # The server has to be running before the first request is sent.
    server.start()
    try:
        # GET on a registered route returns the handler's payload.
        resp = doGetRequest(base_url + '/ric/v1/subscriptions')
        assert resp.text == '{ "Testitem": "Testdata"}'
        assert resp.status_code == 200

        # GET on the "empty" route returns 204 without a body.
        resp = doGetRequest(base_url + '/ric/v1/empty')
        assert resp.text == ""
        assert resp.status_code == 204

        # POST echoes the request body.
        resp = doPostRequest(base_url + '/ric/v1', '{"Testdataitem": "foobar"}')
        assert resp.text == '{"Testdataitem": "foobar"}'
        assert resp.status_code == 200

        # DELETE returns 204 without a body.
        resp = doDeleteRequest(base_url + '/ric/v1/delete')
        assert resp.text == ""
        assert resp.status_code == 204

        # A path without a registered handler yields 404.
        resp = doGetRequest(base_url + '/ricci/v1/subscriptions')
        assert resp.text == ""
        assert resp.status_code == 404
    finally:
        # Shut the server down even when an assertion fails, so the port
        # is not left occupied for later tests.
        server.stop()