Add ORAN Python SDK first draft
[it/dep.git] / smo-install / test / pythonsdk / unit-tests / test_dmaap.py
1 #!/usr/bin/env python3
# SPDX-License-Identifier: Apache-2.0
"""Test dmaap module."""
4
5 from unittest import mock
6 from oransdk.dmaap.dmaap import OranDmaap
7 from onapsdk.dmaap.dmaap_service import DmaapService
8
# Fixtures shared by the tests in this module.

# Topic definition handed to OranDmaap.create_topic in test_create_topic.
TOPIC = {
    "topicName": "test.TOPIC",
    "topicDescription": "test topic",
    "partitionCount": 1,
    "replicationCnCount": 1,
    "transactionEnabled": "false",  # a string, not a bool
}

# Headers the mocked POST calls are expected to carry.
HEADER = {
    "accept": "application/json",
    "Content-Type": "application/json",
}

# Credentials fixture; not referenced by the tests visible in this part of
# the file -- presumably consumed further down (confirm).
BASIC_AUTH = {
    "username": "dcae@dcae.onap.org",
    "password": "demo123456!",
}

# Base address every expected endpoint URL is built from.
BASE_URL = "http://localhost:3904"
21
def test_initialization():
    """Check that OranDmaap can be constructed without arguments."""
    instance = OranDmaap()
    assert isinstance(instance, OranDmaap)
26
27
@mock.patch.object(OranDmaap, "send_message")
def test_create_topic(mock_send_message):
    """Verify create_topic makes exactly one POST call to the topic-creation URL.

    OranDmaap.send_message is replaced by a mock, so the test only inspects
    the arguments passed to it: method, action label, URL, payload and headers.
    """
    expected_url = f"{BASE_URL}/topics/create"

    OranDmaap.create_topic(TOPIC)

    mock_send_message.assert_called_once_with(
        "POST",
        "Create Dmaap Topic",
        expected_url,
        data=TOPIC,
        headers=HEADER,
    )
37
@mock.patch.object(OranDmaap, "send_message")
def test_create_service(mock_send_message):
    """Verify create_service makes exactly one POST call to the A1-POLICY-AGENT-READ events URL.

    OranDmaap.send_message is replaced by a mock; an empty dict stands in for
    the event payload and must be forwarded as ``data``.
    """
    payload = {}
    expected_url = f"{BASE_URL}/events/A1-POLICY-AGENT-READ/"

    OranDmaap.create_service(payload)

    mock_send_message.assert_called_once_with(
        "POST",
        "Create Service via Dmaap",
        expected_url,
        data=payload,
        headers=HEADER,
    )
48
@mock.patch.object(OranDmaap, "send_message")
def test_send_link_failure_event(mock_send_message):
    """Verify send_link_failure_event makes exactly one POST call to the SEC_FAULT_OUTPUT events URL.

    OranDmaap.send_message is replaced by a mock; an empty dict stands in for
    the event payload and must be forwarded as ``data``.
    """
    payload = {}
    expected_url = f"{BASE_URL}/events/unauthenticated.SEC_FAULT_OUTPUT/"

    OranDmaap.send_link_failure_event(payload)

    mock_send_message.assert_called_once_with(
        "POST",
        "Send link failure event",
        expected_url,
        data=payload,
        headers=HEADER,
    )
59
@mock.patch.object(OranDmaap, "send_message")
def test_get_result(mock_send_message):
    """Verify get_result makes exactly one GET call to the A1-POLICY-AGENT-WRITE events URL.

    OranDmaap.send_message is replaced by a mock; the call is expected to
    carry only the method, the action label and the URL (no data or headers).
    """
    expected_url = f"{BASE_URL}/events/A1-POLICY-AGENT-WRITE/users/policy-agent?timeout=15000&limit=100"

    OranDmaap.get_result()

    mock_send_message.assert_called_once_with(
        "GET",
        "Get result from previous request",
        expected_url,
    )
67
68 @mock.patch.object(OranDmaap, 'send_message_json')
69 def test_get_all_topics(mock_send_message_json):
70     """Test Dmaap's class method."""
71     assert OranDmaap.get_all_topics_url == f"{BASE_URL}/topics/listAll"