1 # ==================================================================================
2 # Copyright (c) 2019 Nokia
3 # Copyright (c) 2018-2019 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
20 from rmr.rmr_mocks import rmr_mocks
25 ADM_CTRL = "admission_control_policy"
26 ADM_CTRL_POLICIES = "/a1-p/policytypes/20000/policies"
27 ADM_CTRL_INSTANCE = ADM_CTRL_POLICIES + "/" + ADM_CTRL
28 ADM_CTRL_INSTANCE_STATUS = ADM_CTRL_INSTANCE + "/status"
29 ADM_CTRL_TYPE = "/a1-p/policytypes/20000"
30 TEST_TYPE = "/a1-p/policytypes/20001"
33 # http://flask.pocoo.org/docs/1.0/testing/
36 db_fd, app.app.config["DATABASE"] = tempfile.mkstemp()
37 app.app.config["TESTING"] = True
38 cl = app.app.test_client()
43 os.unlink(app.app.config["DATABASE"])
46 def _fake_dequeue(_filter_type):
48 for monkeypatching a1rmnr.dequeue_all_messages with a good status
51 pay = b'{"policy_type_id": 20000, "policy_instance_id": "admission_control_policy", "handler_id": "test_receiver", "status": "OK"}'
52 fake_msg["payload"] = pay
53 new_messages = [fake_msg]
57 def _fake_dequeue_none(_filter_type):
59 for monkeypatching a1rmnr.dequeue_all_messages with no waiting messages
64 def _fake_dequeue_deleted(_filter_type):
66 for monkeypatching a1rmnr.dequeue_all_messages with a DELETED status
69 pay = b'{"policy_type_id": 20000, "policy_instance_id": "admission_control_policy", "handler_id": "test_receiver", "status": "DELETED"}'
70 fake_msg["payload"] = pay
71 new_messages = [fake_msg]
75 def _test_put_patch(monkeypatch):
76 rmr_mocks.patch_rmr(monkeypatch)
77 monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(0)) # good sends for this whole batch
79 # we need to repatch alloc (already patched in patch_rmr) to fix the transactionid, alloc is called in send and recieve
80 def fake_alloc(_unused, _alsounused):
81 sbuf = rmr_mocks.Rmr_mbuf_t()
82 sbuf.contents.xaction = b"d49b53e478b711e9a1130242ac110002"
85 # we also need to repatch set, since in the send function, we alloc, then set a new transid
86 def fake_set_transactionid(sbuf):
87 sbuf.contents.xaction = b"d49b53e478b711e9a1130242ac110002"
89 # Note, we could have just patched summary, but this patches at a "lower level" so is a better test
90 monkeypatch.setattr("rmr.rmr.rmr_alloc_msg", fake_alloc)
91 monkeypatch.setattr("rmr.rmr.generate_and_set_transaction_id", fake_set_transactionid)
97 def test_xapp_put_good(client, monkeypatch, adm_type_good, adm_instance_good):
98 """ test policy put good"""
101 res = client.get(ADM_CTRL_TYPE)
102 assert res.status_code == 404
105 res = client.get("/a1-p/policytypes")
106 assert res.status_code == 200
107 assert res.json == []
109 # instance 404 because type not there yet
110 monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue_none)
111 res = client.get(ADM_CTRL_POLICIES)
112 assert res.status_code == 404
115 res = client.put(ADM_CTRL_TYPE, json=adm_type_good)
116 assert res.status_code == 201
119 res = client.get(ADM_CTRL_TYPE)
120 assert res.status_code == 200
121 assert res.json == adm_type_good
122 res = client.get("/a1-p/policytypes")
123 assert res.status_code == 200
124 assert res.json == [20000]
126 # instance 200 but empty list
127 res = client.get(ADM_CTRL_POLICIES)
128 assert res.status_code == 200
129 assert res.json == []
131 # no instance there yet
132 res = client.get(ADM_CTRL_INSTANCE)
133 assert res.status_code == 404
134 res = client.get(ADM_CTRL_INSTANCE_STATUS)
135 assert res.status_code == 404
137 # create a good instance
138 _test_put_patch(monkeypatch)
139 res = client.put(ADM_CTRL_INSTANCE, json=adm_instance_good)
140 assert res.status_code == 202
142 # instance 200 and in list
143 res = client.get(ADM_CTRL_POLICIES)
144 assert res.status_code == 200
145 assert res.json == [ADM_CTRL]
147 def get_instance_good(expected):
149 res = client.get(ADM_CTRL_INSTANCE)
150 assert res.status_code == 200
151 assert res.json == adm_instance_good
153 # get the instance status
154 res = client.get(ADM_CTRL_INSTANCE_STATUS)
155 assert res.status_code == 200
156 assert res.get_data(as_text=True) == expected
158 # try a status get but pretend we didn't get any ACKs yet to test NOT IN EFFECT
159 monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue_none)
160 get_instance_good("NOT IN EFFECT")
162 # now pretend we did get a good ACK
163 monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue)
164 get_instance_good("IN EFFECT")
167 res = client.delete(ADM_CTRL_INSTANCE)
168 assert res.status_code == 202
169 res = client.delete(ADM_CTRL_INSTANCE) # should be able to do multiple deletes
170 assert res.status_code == 202
172 # status after a delete, but there are no messages yet, should still return
173 monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue)
174 get_instance_good("IN EFFECT")
176 # now pretend we deleted successfully
177 monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue_deleted)
178 res = client.get(ADM_CTRL_INSTANCE_STATUS) # cant get status
179 assert res.status_code == 404
180 res = client.get(ADM_CTRL_INSTANCE) # cant get instance
181 assert res.status_code == 404
182 # list still 200 but no instance
183 res = client.get(ADM_CTRL_POLICIES)
184 assert res.status_code == 200
185 assert res.json == []
188 def test_xapp_put_good_bad_rmr(client, monkeypatch, adm_instance_good):
190 assert that rmr bad states don't cause problems
192 _test_put_patch(monkeypatch)
193 monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(10))
194 res = client.put(ADM_CTRL_INSTANCE, json=adm_instance_good)
195 assert res.status_code == 202
197 monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(5))
198 res = client.put(ADM_CTRL_INSTANCE, json=adm_instance_good)
199 assert res.status_code == 202
202 def test_bad_instances(client, monkeypatch, adm_type_good):
204 Test bad send failures
206 rmr_mocks.patch_rmr(monkeypatch)
208 # TODO: reenable this after delete!
210 # res = client.put(ADM_CTRL_TYPE, json=adm_type_good)
211 # assert res.status_code == 201
214 res = client.put("/a1-p/policytypes/19999", json=adm_type_good)
215 assert res.status_code == 400
216 res = client.put("/a1-p/policytypes/21024", json=adm_type_good)
217 assert res.status_code == 400
220 res = client.put(ADM_CTRL_INSTANCE, json={"not": "expected"})
221 assert res.status_code == 400
224 res = client.put(ADM_CTRL_INSTANCE, data="notajson")
225 assert res.status_code == 415
228 def test_healthcheck(client):
232 res = client.get("/a1-p/healthcheck")
233 assert res.status_code == 200