1 # ==================================================================================
2 # Copyright (c) 2019 Nokia
3 # Copyright (c) 2018-2019 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
19 from rmr.rmr_mocks import rmr_mocks
21 from a1 import exceptions
23 import testing_helpers
26 # http://flask.pocoo.org/docs/1.0/testing/
29 db_fd, app.app.config["DATABASE"] = tempfile.mkstemp()
30 app.app.config["TESTING"] = True
31 cl = app.app.test_client()
36 os.unlink(app.app.config["DATABASE"])
41 msg_payload={"status": "SUCCESS", "foo": "bar"},
45 unexpected_first=True,
48 generates a mock rmr message response (returns a function that does; uses closures to set params)
51 # stick a message we don't want at the front of the queue, then stick the message we want
53 monkeypatch.setattr("rmr.rmr.rmr_torcv_msg", rmr_mocks.rcv_mock_generator(msg_payload, -1, msg_state, jsonb))
54 sbuf = rmr.rmr_alloc_msg(None, None)
55 sbuf = rmr.rmr_torcv_msg(None, sbuf, None)
56 summary = rmr.message_summary(sbuf)
57 new_messages.append(summary)
59 monkeypatch.setattr("rmr.rmr.rmr_torcv_msg", rmr_mocks.rcv_mock_generator(msg_payload, msg_type, msg_state, jsonb))
60 sbuf = rmr.rmr_alloc_msg(None, None)
61 sbuf = rmr.rmr_torcv_msg(None, sbuf, None)
62 summary = rmr.message_summary(sbuf)
63 new_messages.append(summary)
74 def _test_put_patch(monkeypatch):
75 testing_helpers.patch_all(monkeypatch)
76 monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(0)) # good sends for this whole batch
78 # we need to repatch alloc (already patched in patch_rmr) to fix the transactionid, alloc is called in send and recieve
79 def fake_alloc(_unused, _alsounused):
80 sbuf = rmr_mocks.Rmr_mbuf_t()
81 sbuf.contents.xaction = b"d49b53e478b711e9a1130242ac110002"
84 # we also need to repatch set, since in the send function, we alloc, then set a new transid
85 def fake_set_transactionid(sbuf):
86 sbuf.contents.xaction = b"d49b53e478b711e9a1130242ac110002"
88 # Note, we could have just patched summary, but this patches at a "lower level" so is a better test
89 monkeypatch.setattr("rmr.rmr.rmr_alloc_msg", fake_alloc)
90 monkeypatch.setattr("rmr.rmr.generate_and_set_transaction_id", fake_set_transactionid)
93 def test_xapp_put_good(client, monkeypatch):
94 """ test policy put good"""
95 _test_put_patch(monkeypatch)
96 monkeypatch.setattr("a1.a1rmr._dequeue_all_waiting_messages", _fake_dequeue(monkeypatch))
97 res = client.put("/ric/policies/admission_control_policy", json=testing_helpers.good_payload())
98 assert res.status_code == 200
99 assert res.json == {"status": "SUCCESS", "foo": "bar"}
102 def test_xapp_put_bad(client, monkeypatch):
103 """Test policy put fails"""
104 _test_put_patch(monkeypatch)
105 # return from policy handler has a status indicating FAIL
107 "a1.a1rmr._dequeue_all_waiting_messages", _fake_dequeue(monkeypatch, msg_payload={"status": "FAIL", "foo": "bar"})
109 res = client.put("/ric/policies/admission_control_policy", json=testing_helpers.good_payload())
110 assert res.status_code == 502
111 assert res.json["reason"] == "BAD STATUS"
112 assert res.json["return_payload"] == {"status": "FAIL", "foo": "bar"}
114 # return from policy handler has no status field
115 monkeypatch.setattr("a1.a1rmr._dequeue_all_waiting_messages", _fake_dequeue(monkeypatch, msg_payload={"foo": "bar"}))
116 res = client.put("/ric/policies/admission_control_policy", json=testing_helpers.good_payload())
117 assert res.status_code == 502
118 assert res.json["reason"] == "NO STATUS"
119 assert res.json["return_payload"] == {"foo": "bar"}
121 # return from policy handler not a json
123 "a1.a1rmr._dequeue_all_waiting_messages", _fake_dequeue(monkeypatch, msg_payload="booger", jsonb=False)
125 res = client.put("/ric/policies/admission_control_policy", json=testing_helpers.good_payload())
126 assert res.status_code == 502
127 assert res.json["reason"] == "NOT JSON"
128 assert res.json["return_payload"] == "booger"
131 monkeypatch.setattr("a1.a1rmr._dequeue_all_waiting_messages", _fake_dequeue(monkeypatch, msg_type=666))
132 res = client.put("/ric/policies/admission_control_policy", json=testing_helpers.good_payload())
133 assert res.status_code == 504
134 assert res.data == b"\"A1 was expecting an ACK back but it didn't receive one or didn't recieve the expected ACK\"\n"
137 monkeypatch.setattr("a1.a1rmr._dequeue_all_waiting_messages", _fake_dequeue(monkeypatch, msg_state=666))
138 res = client.put("/ric/policies/admission_control_policy", json=testing_helpers.good_payload())
139 assert res.status_code == 504
140 assert res.data == b"\"A1 was expecting an ACK back but it didn't receive one or didn't recieve the expected ACK\"\n"
143 def test_xapp_put_bad_send(client, monkeypatch):
145 Test bad send failures
147 testing_helpers.patch_all(monkeypatch)
149 monkeypatch.setattr("a1.a1rmr._dequeue_all_waiting_messages", _fake_dequeue(monkeypatch))
150 res = client.put("/ric/policies/admission_control_policy", json={"not": "expected"})
151 assert res.status_code == 400
153 monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(10))
154 res = client.put("/ric/policies/admission_control_policy", json=testing_helpers.good_payload())
155 assert res.status_code == 504
156 assert res.data == b'"A1 was unable to send a needed message to a downstream subscriber"\n'
158 monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(5))
159 res = client.put("/ric/policies/admission_control_policy", json=testing_helpers.good_payload())
160 assert res.status_code == 504
161 assert res.data == b'"A1 was unable to send a needed message to a downstream subscriber"\n'
164 def test_bad_requests(client, monkeypatch):
165 """Test bad requests"""
166 testing_helpers.patch_all(monkeypatch)
169 res = client.put("/ric/policies/noexist", json=testing_helpers.good_payload())
170 assert res.status_code == 404
173 res = client.put("/ric/policies/admission_control_policy", data="notajson")
174 assert res.status_code == 415
176 # test a PUT body against a poliucy not expecting one
177 res = client.put("/ric/policies/test_policy", json=testing_helpers.good_payload())
178 assert res.status_code == 400
179 assert res.data == b'"BODY SUPPLIED BUT POLICY HAS NO EXPECTED BODY"\n'
182 def test_missing_manifest(client, monkeypatch):
184 test that we get a 500 with an approrpiate message on a missing manifest
188 raise exceptions.MissingManifest()
190 monkeypatch.setattr("a1.utils.get_ric_manifest", f)
192 res = client.put("/ric/policies/admission_control_policy", json=testing_helpers.good_payload())
193 assert res.status_code == 500
194 assert res.data == b'"A1 was unable to find the required RIC manifest. report this!"\n'
197 def test_missing_rmr(client, monkeypatch):
199 test that we get a 500 with an approrpiate message on a missing rmr rmr_string
201 testing_helpers.patch_all(monkeypatch, nonexisting_rmr=True)
202 res = client.put("/ric/policies/admission_control_policy", json=testing_helpers.good_payload())
203 assert res.status_code == 500
204 assert res.data == b'"A1 does not have a mapping for the desired rmr string. report this!"\n'