1 ##############################################################################
3 # Copyright (c) 2019 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 ##############################################################################
25 def get_redis_client_decode_response():
26 c = config.redis_ip_address
27 p = config.redis_ip_port
28 return redis.Redis(host=c, port=p, db=0, decode_responses=True)
31 def verify_ran_is_associated_with_e2t_instance(ran_name, e2t_address):
32 r = get_redis_client_decode_response()
33 e2t_instance_json = r.get("{e2Manager},E2TInstance:" + e2t_address)
35 if e2t_instance_json is None:
38 e2t_instance_dic = json.loads(e2t_instance_json)
39 assoc_ran_list = e2t_instance_dic.get("associatedRanList")
40 if assoc_ran_list is None:
43 return ran_name in assoc_ran_list
46 def verify_e2t_instance_has_no_associated_rans(e2t_address):
47 r = get_redis_client_decode_response()
48 e2t_instance_json = r.get("{e2Manager},E2TInstance:" + e2t_address)
49 e2t_instance_dic = json.loads(e2t_instance_json)
50 assoc_ran_list = e2t_instance_dic.get("associatedRanList")
51 return not assoc_ran_list
54 def verify_e2t_instance_exists_in_addresses(e2t_address):
55 r = get_redis_client_decode_response()
56 e2t_addresses_json = r.get("{e2Manager},E2TAddresses")
57 e2t_addresses = json.loads(e2t_addresses_json)
58 return e2t_address in e2t_addresses
61 def verify_e2t_instance_key_exists(e2t_address):
62 r = get_redis_client_decode_response()
63 return r.exists("{e2Manager},E2TInstance:" + e2t_address)
66 def populate_e2t_instances_in_e2m_db_for_get_e2t_instances_tc():
67 r = get_redis_client_decode_response()
68 r.set("{e2Manager},E2TAddresses", "[\"e2t.att.com:38000\"]")
69 r.set("{e2Manager},E2TInstance:e2t.att.com:38000",
70 "{\"address\":\"e2t.att.com:38000\",\"associatedRanList\":[\"test1\",\"test2\",\"test3\"],"
71 "\"keepAliveTimestamp\":1577619310484022369,\"state\":\"ACTIVE\"}")
75 def verify_e2t_addresses_for_e2t_initialization_tc():
76 r = get_redis_client_decode_response()
78 value = "[\"{}\"]".format(variables.e2t_alpha_address)
80 return r.get("{e2Manager},E2TAddresses") == value
83 def verify_e2t_instance_for_e2t_initialization_tc():
84 r = get_redis_client_decode_response()
86 e2_address = "\"address\":\"{}\"".format(variables.e2t_alpha_address)
87 e2_associated_ran_list = "\"associatedRanList\":[]"
88 e2_state = "\"state\":\"ACTIVE\""
90 e2_db_instance = r.get("{{e2Manager}},E2TInstance:{}".format(variables.e2t_alpha_address))
92 if e2_db_instance.find(e2_address) < 0:
94 if e2_db_instance.find(e2_associated_ran_list) < 0:
96 if e2_db_instance.find(e2_state) < 0:
102 def set_enable_ric_false():
103 r = get_redis_client_decode_response()
104 r.set("{e2Manager},GENERAL", "{\"enableRic\":false}")