Merge "[RICPLT-2503] Automation start and stop" into PI3
[ric-plt/e2mgr.git] / Automation / Tests / Scripts / rsmscripts.py
1 ##############################################################################
2 #
3 #   Copyright (c) 2019 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #       http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 #
17 ##############################################################################
18
19 import config
20 import redis
21 import cleanup_db
22 import json
23
24
25 def getRedisClientDecodeResponse():
26
27     c = config.redis_ip_address
28
29     p = config.redis_ip_port
30
31     return redis.Redis(host=c, port=p, db=0, decode_responses=True)
32
33 def set_general_config_resource_status_false():
34
35     r = getRedisClientDecodeResponse()
36     r.set("{rsm},CFG:GENERAL:v1.0.0" , "{\"enableResourceStatus\":false,\"partialSuccessAllowed\":true,\"prbPeriodic\":true,\"tnlLoadIndPeriodic\":true,\"wwLoadIndPeriodic\":true,\"absStatusPeriodic\":true,\"rsrpMeasurementPeriodic\":true,\"csiPeriodic\":true,\"periodicityMs\":1,\"periodicityRsrpMeasurementMs\":3,\"periodicityCsiMs\":3}")
37
38 def verify_rsm_ran_info_start_false():
39
40     r = getRedisClientDecodeResponse()
41     
42     value = "{\"ranName\":\"test1\",\"enb1MeasurementId\":1,\"enb2MeasurementId\":0,\"action\":\"start\",\"actionStatus\":false}"
43
44     if r.get("{rsm},RAN:test1") == value:
45         return True
46     else:
47         return False
48
49
50 def verify_rsm_ran_info_start_true():
51
52     r = getRedisClientDecodeResponse()
53     
54     rsmInfoStr = r.get("{rsm},RAN:test1")
55     rsmInfoJson = json.loads(rsmInfoStr)
56
57     response = rsmInfoJson["ranName"] == "test1" and rsmInfoJson["enb1MeasurementId"] == 1 and rsmInfoJson["enb2MeasurementId"] != 1 and rsmInfoJson["action"] == "start" and rsmInfoJson["actionStatus"] == True
58
59     return response       
60
61
62 def verify_rsm_ran_info_stop_false():
63
64     r = getRedisClientDecodeResponse()
65     
66     rsmInfoStr = r.get("{rsm},RAN:test1")
67     rsmInfoJson = json.loads(rsmInfoStr)
68
69     response = rsmInfoJson["ranName"] == "test1" and rsmInfoJson["enb1MeasurementId"] == 1 and rsmInfoJson["action"] == "stop" and rsmInfoJson["actionStatus"] == False
70
71     return response       
72
73
74 def verify_rsm_ran_info_stop_true():
75
76     r = getRedisClientDecodeResponse()
77     
78     rsmInfoStr = r.get("{rsm},RAN:test1")
79     rsmInfoJson = json.loads(rsmInfoStr)
80
81     response = rsmInfoJson["ranName"] == "test1" and rsmInfoJson["action"] == "stop" and rsmInfoJson["actionStatus"] == True
82
83     return response         
84       
85 def verify_general_config_enable_resource_status_true():
86
87     r = getRedisClientDecodeResponse()
88     
89     configStr = r.get("{rsm},CFG:GENERAL:v1.0.0")
90     configJson = json.loads(configStr)
91
92     if configJson["enableResourceStatus"] == True:
93         return True
94     else:
95         return False
96
97 def verify_general_config_enable_resource_status_false():
98
99     r = getRedisClientDecodeResponse()
100     
101     configStr = r.get("{rsm},CFG:GENERAL:v1.0.0")
102     configJson = json.loads(configStr)
103
104     if configJson["enableResourceStatus"] == False:
105         return True
106     else:
107         return False