[RIC-621] Add redis monitor verification to automation tests
[ric-plt/e2mgr.git] / Automation / deprecated / Scripts / rsmscripts.py
1 ##############################################################################
2 #
3 #   Copyright (c) 2019 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #       http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 #
17 ##############################################################################
18
19 import config
20 import redis
21 import json
22
23
24 def getRedisClientDecodeResponse():
25
26     c = config.redis_ip_address
27
28     p = config.redis_ip_port
29
30     return redis.Redis(host=c, port=p, db=0, decode_responses=True)
31
32 def set_general_config_resource_status_false():
33
34     r = getRedisClientDecodeResponse()
35     r.set("{rsm},CFG:GENERAL:v1.0.0" , "{\"enableResourceStatus\":false,\"partialSuccessAllowed\":true,\"prbPeriodic\":true,\"tnlLoadIndPeriodic\":true,\"wwLoadIndPeriodic\":true,\"absStatusPeriodic\":true,\"rsrpMeasurementPeriodic\":true,\"csiPeriodic\":true,\"periodicityMs\":1,\"periodicityRsrpMeasurementMs\":3,\"periodicityCsiMs\":3}")
36
37 def verify_rsm_ran_info_start_false():
38
39     r = getRedisClientDecodeResponse()
40     
41     value = "{\"ranName\":\"test1\",\"enb1MeasurementId\":1,\"enb2MeasurementId\":0,\"action\":\"start\",\"actionStatus\":false}"
42
43     return r.get("{rsm},RAN:test1") == value
44
45
46 def verify_rsm_ran_info_start_true():
47
48     r = getRedisClientDecodeResponse()
49
50     rsmInfoStr = r.get("{rsm},RAN:test1")
51     rsmInfoJson = json.loads(rsmInfoStr)
52
53     response = rsmInfoJson["ranName"] == "test1" and rsmInfoJson["enb1MeasurementId"] == 1 and rsmInfoJson["enb2MeasurementId"] != 1 and rsmInfoJson["action"] == "start" and rsmInfoJson["actionStatus"] == True
54
55     return response
56
57
58 def verify_rsm_ran_info_stop_false():
59
60     r = getRedisClientDecodeResponse()
61
62     rsmInfoStr = r.get("{rsm},RAN:test1")
63     rsmInfoJson = json.loads(rsmInfoStr)
64
65     response = rsmInfoJson["ranName"] == "test1" and rsmInfoJson["enb1MeasurementId"] == 1 and rsmInfoJson["action"] == "stop" and rsmInfoJson["actionStatus"] == False
66
67     return response
68
69
70 def verify_rsm_ran_info_stop_true():
71
72     r = getRedisClientDecodeResponse()
73
74     rsmInfoStr = r.get("{rsm},RAN:test1")
75     rsmInfoJson = json.loads(rsmInfoStr)
76
77     response = rsmInfoJson["ranName"] == "test1" and rsmInfoJson["action"] == "stop" and rsmInfoJson["actionStatus"] == True
78
79     return response
80
81 def verify_general_config_enable_resource_status_true():
82
83     r = getRedisClientDecodeResponse()
84
85     configStr = r.get("{rsm},CFG:GENERAL:v1.0.0")
86     configJson = json.loads(configStr)
87
88     return configJson["enableResourceStatus"] == True
89
90 def verify_general_config_enable_resource_status_false():
91
92     r = getRedisClientDecodeResponse()
93
94     configStr = r.get("{rsm},CFG:GENERAL:v1.0.0")
95     configJson = json.loads(configStr)
96
97     return configJson["enableResourceStatus"] == False