1 # ==================================================================================
2 # Copyright (c) 2019 Nokia
3 # Copyright (c) 2018-2019 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
20 from rmr.rmr_mocks import rmr_mocks
25 ADM_CTRL = "admission_control_policy"
26 ADM_CTRL_POLICIES = "/a1-p/policytypes/20000/policies"
27 ADM_CTRL_INSTANCE = ADM_CTRL_POLICIES + "/" + ADM_CTRL
28 ADM_CTRL_INSTANCE_STATUS = ADM_CTRL_INSTANCE + "/status"
29 ADM_CTRL_TYPE = "/a1-p/policytypes/20000"
30 TEST_TYPE = "/a1-p/policytypes/20001"
33 # http://flask.pocoo.org/docs/1.0/testing/
36 db_fd, app.app.config["DATABASE"] = tempfile.mkstemp()
37 app.app.config["TESTING"] = True
38 cl = app.app.test_client()
43 os.unlink(app.app.config["DATABASE"])
46 def _fake_dequeue(_filter_type):
48 for monkeypatching a1rmnr.dequeue_all_messages
51 pay = b'{"policy_type_id": 20000, "policy_instance_id": "admission_control_policy", "handler_id": "test_receiver", "status": "OK"}'
52 fake_msg["payload"] = pay
53 new_messages = [fake_msg]
57 def _test_put_patch(monkeypatch):
58 rmr_mocks.patch_rmr(monkeypatch)
59 monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(0)) # good sends for this whole batch
61 # we need to repatch alloc (already patched in patch_rmr) to fix the transactionid, alloc is called in send and recieve
62 def fake_alloc(_unused, _alsounused):
63 sbuf = rmr_mocks.Rmr_mbuf_t()
64 sbuf.contents.xaction = b"d49b53e478b711e9a1130242ac110002"
67 # we also need to repatch set, since in the send function, we alloc, then set a new transid
68 def fake_set_transactionid(sbuf):
69 sbuf.contents.xaction = b"d49b53e478b711e9a1130242ac110002"
71 # Note, we could have just patched summary, but this patches at a "lower level" so is a better test
72 monkeypatch.setattr("rmr.rmr.rmr_alloc_msg", fake_alloc)
73 monkeypatch.setattr("rmr.rmr.generate_and_set_transaction_id", fake_set_transactionid)
79 def test_xapp_put_good(client, monkeypatch, adm_type_good, adm_instance_good):
80 """ test policy put good"""
83 res = client.get(ADM_CTRL_TYPE)
84 assert res.status_code == 404
87 res = client.get("/a1-p/policytypes")
88 assert res.status_code == 200
91 # instance 404 because type not there yet
92 res = client.get(ADM_CTRL_POLICIES)
93 assert res.status_code == 404
96 res = client.put(ADM_CTRL_TYPE, json=adm_type_good)
97 assert res.status_code == 201
100 res = client.get(ADM_CTRL_TYPE)
101 assert res.status_code == 200
102 assert res.json == adm_type_good
103 res = client.get("/a1-p/policytypes")
104 assert res.status_code == 200
105 assert res.json == [20000]
107 # instance 200 but empty list
108 res = client.get(ADM_CTRL_POLICIES)
109 assert res.status_code == 200
110 assert res.json == []
112 # no instance there yet
113 res = client.get(ADM_CTRL_INSTANCE)
114 assert res.status_code == 404
115 res = client.get(ADM_CTRL_INSTANCE_STATUS)
116 assert res.status_code == 404
118 # create a good instance
119 _test_put_patch(monkeypatch)
120 res = client.put(ADM_CTRL_INSTANCE, json=adm_instance_good)
121 assert res.status_code == 201
123 # instance 200 and in list
124 res = client.get(ADM_CTRL_POLICIES)
125 assert res.status_code == 200
126 assert res.json == [ADM_CTRL]
129 res = client.get(ADM_CTRL_INSTANCE)
130 assert res.status_code == 200
131 assert res.json == adm_instance_good
133 # get the instance status
134 monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue)
135 res = client.get(ADM_CTRL_INSTANCE_STATUS)
136 assert res.status_code == 200
137 assert res.json == [{"handler_id": "test_receiver", "status": "OK"}]
139 # assert that rmr bad states don't cause problems
140 monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(10))
141 res = client.put(ADM_CTRL_INSTANCE, json=adm_instance_good)
142 assert res.status_code == 201
144 monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(5))
145 res = client.put(ADM_CTRL_INSTANCE, json=adm_instance_good)
146 assert res.status_code == 201
149 def test_bad_instances(client, monkeypatch, adm_type_good):
151 Test bad send failures
153 rmr_mocks.patch_rmr(monkeypatch)
155 # TODO: reenable this after delete!
157 # res = client.put(ADM_CTRL_TYPE, json=adm_type_good)
158 # assert res.status_code == 201
161 res = client.put("/a1-p/policytypes/19999", json=adm_type_good)
162 assert res.status_code == 400
163 res = client.put("/a1-p/policytypes/21024", json=adm_type_good)
164 assert res.status_code == 400
167 res = client.put(ADM_CTRL_INSTANCE, json={"not": "expected"})
168 assert res.status_code == 400
171 res = client.put(ADM_CTRL_INSTANCE, data="notajson")
172 assert res.status_code == 415
175 def test_healthcheck(client):
179 res = client.get("/a1-p/healthcheck")
180 assert res.status_code == 200