2 # SPDX-License-Identifier: Apache-2.0
3 """Test clamp module."""
5 from unittest import mock
6 from oransdk.dmaap.dmaap import OranDmaap
7 from onapsdk.dmaap.dmaap_service import DmaapService
11 "topicName": "test.TOPIC",
12 "topicDescription": "test topic",
14 "replicationCnCount": 1,
15 "transactionEnabled": "false"
18 HEADER={"accept": "application/json", "Content-Type": "application/json"}
19 BASIC_AUTH = {'username': 'dcae@dcae.onap.org', 'password': 'demo123456!'}
20 BASE_URL = "http://localhost:3904"
22 def test_initialization():
23 """Class initialization test."""
25 assert isinstance(dmaap, OranDmaap)
28 @mock.patch.object(OranDmaap, 'send_message')
29 def test_create_topic(mock_send_message):
30 """Test Dmaap's class method."""
31 OranDmaap.create_topic(TOPIC)
32 mock_send_message.assert_called_once_with('POST',
34 (f"{BASE_URL}/topics/create"),
38 @mock.patch.object(OranDmaap, 'send_message')
39 def test_create_service(mock_send_message):
40 """Test Dmaap's class method."""
42 OranDmaap.create_service(event)
43 mock_send_message.assert_called_once_with('POST',
44 'Create Service via Dmaap',
45 (f"{BASE_URL}/events/A1-POLICY-AGENT-READ/"),
49 @mock.patch.object(OranDmaap, 'send_message')
50 def test_send_link_failure_event(mock_send_message):
51 """Test Dmaap's class method."""
53 OranDmaap.send_link_failure_event(event)
54 mock_send_message.assert_called_once_with('POST',
55 'Send link failure event',
56 (f"{BASE_URL}/events/unauthenticated.SEC_FAULT_OUTPUT/"),
60 @mock.patch.object(OranDmaap, 'send_message')
61 def test_get_result(mock_send_message):
62 """Test Dmaap's class method."""
63 OranDmaap.get_result()
64 mock_send_message.assert_called_once_with('GET',
65 'Get result from previous request',
66 (f"{BASE_URL}/events/A1-POLICY-AGENT-WRITE/users/policy-agent?timeout=15000&limit=100"))
68 @mock.patch.object(OranDmaap, 'send_message_json')
69 def test_get_all_topics(mock_send_message_json):
70 """Test Dmaap's class method."""
71 assert OranDmaap.get_all_topics_url == f"{BASE_URL}/topics/listAll"