J release: Release container Image
[ric-plt/e2mgr.git] / Automation / Tests / Scripts / e2mdbscripts.py
1 ##############################################################################
2 #
3 #   Copyright (c) 2019 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #       http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 #
17 ##############################################################################
18
19 import config
20 import json
21 import redis
22 import variables
23
24
25 def get_redis_client_decode_response():
26     c = config.redis_ip_address
27     p = config.redis_ip_port
28     return redis.Redis(host=c, port=p, db=0, decode_responses=True)
29
30
31 def verify_ran_is_associated_with_e2t_instance(ran_name, e2t_address):
32     r = get_redis_client_decode_response()
33     e2t_instance_json = r.get("{e2Manager},E2TInstance:" + e2t_address)
34
35     if e2t_instance_json is None:
36         return False
37
38     e2t_instance_dic = json.loads(e2t_instance_json)
39     assoc_ran_list = e2t_instance_dic.get("associatedRanList")
40     if assoc_ran_list is None:
41         return False
42     else:
43         return ran_name in assoc_ran_list
44
45
46 def verify_e2t_instance_has_no_associated_rans(e2t_address):
47     r = get_redis_client_decode_response()
48     e2t_instance_json = r.get("{e2Manager},E2TInstance:" + e2t_address)
49     e2t_instance_dic = json.loads(e2t_instance_json)
50     assoc_ran_list = e2t_instance_dic.get("associatedRanList")
51     return not assoc_ran_list
52
53
54 def verify_e2t_instance_exists_in_addresses(e2t_address):
55     r = get_redis_client_decode_response()
56     e2t_addresses_json = r.get("{e2Manager},E2TAddresses")
57     e2t_addresses = json.loads(e2t_addresses_json)
58     return e2t_address in e2t_addresses
59
60
61 def verify_e2t_instance_key_exists(e2t_address):
62     r = get_redis_client_decode_response()
63     return r.exists("{e2Manager},E2TInstance:" + e2t_address)
64
65
66 def populate_e2t_instances_in_e2m_db_for_get_e2t_instances_tc():
67     r = get_redis_client_decode_response()
68     r.set("{e2Manager},E2TAddresses", "[\"e2t.att.com:38000\"]")
69     r.set("{e2Manager},E2TInstance:e2t.att.com:38000",
70           "{\"address\":\"e2t.att.com:38000\",\"associatedRanList\":[\"test1\",\"test2\",\"test3\"],"
71           "\"keepAliveTimestamp\":1577619310484022369,\"state\":\"ACTIVE\"}")
72     return True
73
74
75 def verify_e2t_addresses_for_e2t_initialization_tc():
76     r = get_redis_client_decode_response()
77
78     value = "[\"{}\"]".format(variables.e2t_alpha_address)
79
80     return r.get("{e2Manager},E2TAddresses") == value
81
82
83 def verify_e2t_instance_for_e2t_initialization_tc():
84     r = get_redis_client_decode_response()
85
86     e2_address = "\"address\":\"{}\"".format(variables.e2t_alpha_address)
87     e2_associated_ran_list = "\"associatedRanList\":[]"
88     e2_state = "\"state\":\"ACTIVE\""
89
90     e2_db_instance = r.get("{{e2Manager}},E2TInstance:{}".format(variables.e2t_alpha_address))
91
92     if e2_db_instance.find(e2_address) < 0:
93         return False
94     if e2_db_instance.find(e2_associated_ran_list) < 0:
95         return False
96     if e2_db_instance.find(e2_state) < 0:
97         return False
98
99     return True
100
101
102 def set_enable_ric_false():
103     r = get_redis_client_decode_response()
104     r.set("{e2Manager},GENERAL", "{\"enableRic\":false}")
105     return True
106