Fix/add use cases under SMO package
[it/dep.git] / smo-install / test / pythonsdk / src / orantests / network_slicing / preparation / oof_preparation.py
1 #!/usr/bin/env python3
2 ###
3 # ============LICENSE_START===================================================
4 # ORAN SMO PACKAGE - PYTHONSDK TESTS
5 # ================================================================================
6 #  Copyright (C) 2022 AT&T Intellectual Property. All rights
7 #                             reserved.
8 # ============================================================================
9 # Licensed under the Apache License, Version 2.0 (the "License");
10 # you may not use this file except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #      http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 # SPDX-License-Identifier: Apache-2.0
22 # ============LICENSE_END=====================================================
23 #
24 ###
25 """Prepare OOF for Network Slicing option2 test."""
26 import os
27 import logging
28 import logging.config
29 import subprocess
30 import sys
31 from subprocess import check_output
32 from onapsdk.configuration import settings
33 from oransdk.policy.policy import OranPolicy
34
35 logging.config.dictConfig(settings.LOG_CONFIG)
36 logger = logging.getLogger("####################### Start OOF Preparation")
37
38 # Set working dir as python script location
39 abspath = os.path.abspath(__file__)
40 dname = os.path.dirname(abspath)
41 os.chdir(dname)
42
43 class OofPreparation():
44     """Can be used to prepare OOF for Network Slicing usecase option2."""
45
46     @classmethod
47     def prepare_oof(cls, nst_name, an_nsst_name, tn_nsst_name):
48         """Prepare OOF, create optimization policies."""
49         # copy policy creation package to oof pod
50         logger.info("####################### copy policy generation package to OOF pod:%s", dname)
51         oof_pod = subprocess.run("kubectl get pods -n onap | awk '{print $1}' | grep  onap-oof-[a-z0-9]*-[a-z0-9]*$", shell=True, check=True, stdout=subprocess.PIPE).stdout.decode('utf-8').strip()
52         cmd = f"kubectl cp ../resources/policies_option2.tar.gz -n onap {oof_pod}:/opt/osdf"
53         check_output(cmd, shell=True).decode('utf-8')
54
55         cmd = f"kubectl exec -ti -n onap {oof_pod} -- tar -xvf policies_option2.tar.gz"
56         check_output(cmd, shell=True).decode('utf-8')
57
58         # run python command to create policies
59         cmd = f"kubectl exec -ti -n onap {oof_pod} -- python3 policies_option2/policy_utils.py create_policy_types policy_types"
60         check_output(cmd, shell=True).decode('utf-8')
61
62         cmd = f"kubectl exec -ti -n onap {oof_pod} -- python3 policies_option2/policy_utils.py create_and_push_policies nst_policies"
63         check_output(cmd, shell=True).decode('utf-8')
64
65         cmd = f"kubectl exec -ti -n onap {oof_pod} -- python3 policies_option2/policy_utils.py generate_nsi_policies {nst_name}"
66         check_output(cmd, shell=True).decode('utf-8')
67
68         cmd = f"kubectl exec -ti -n onap {oof_pod} -- python3 policies_option2/policy_utils.py create_and_push_policies gen_nsi_policies"
69         check_output(cmd, shell=True).decode('utf-8')
70
71         cmd = f"kubectl exec -ti -n onap {oof_pod} -- python3 policies_option2/policy_utils.py generate_nssi_policies {an_nsst_name} minimize latency"
72         check_output(cmd, shell=True).decode('utf-8')
73
74         cmd = f"kubectl exec -ti -n onap {oof_pod} -- python3 policies_option2/policy_utils.py create_and_push_policies gen_nssi_policies"
75         check_output(cmd, shell=True).decode('utf-8')
76
77         cmd = f"kubectl exec -ti -n onap {oof_pod} -- python3 policies_option2/policy_utils.py generate_nssi_policies {tn_nsst_name}  minimize latency"
78         check_output(cmd, shell=True).decode('utf-8')
79
80         cmd = f"kubectl exec -ti -n onap {oof_pod} -- python3 policies_option2/policy_utils.py create_and_push_policies gen_nssi_policies"
81         check_output(cmd, shell=True).decode('utf-8')
82
83         #Verify policies created
84         policy = OranPolicy()
85         policy_status_list = policy.get_policy_status(settings.POLICY_BASICAUTH)
86         if len(policy_status_list) != 20:
87             logger.info("####################### Policy created failed. 20 policies expected, but only %s found. Please verify manually.", str(len(policy_status_list)))
88             sys.exit('OOF preparation failed. Exception while creating policies. Please check the policies manually.')
89
90     @classmethod
91     def cleanup_oof(cls):
92         """Delete OOF optimization policies."""
93         oof_pod = subprocess.run("kubectl get pods -n onap | awk '{print $1}' | grep  onap-oof-[a-z0-9]*-[a-z0-9]*$", shell=True, check=True, stdout=subprocess.PIPE).stdout.decode('utf-8').strip()
94
95         # run python command to create policies
96         cmd = f"kubectl exec -ti -n onap {oof_pod} -- python3 policies_option2/policy_utils.py delete_policies nst_policies"
97         check_output(cmd, shell=True).decode('utf-8')
98
99         #python3 policy_utils.py create_and_push_policies gen_nsi_policies
100         cmd = f"kubectl exec -ti -n onap {oof_pod} -- python3 policies_option2/policy_utils.py delete_policies gen_nsi_policies"
101         check_output(cmd, shell=True).decode('utf-8')
102
103         #python3 policy_utils.py create_and_push_policies gen_nssi_policies
104         cmd = f"kubectl exec -ti -n onap {oof_pod} -- python3 policies_option2/policy_utils.py delete_policies gen_nssi_policies"
105         check_output(cmd, shell=True).decode('utf-8')
106
107         # run python command to create policies
108         logger.info("####################### copy policy generation package to OOF pod:%s", dname)
109         cmd = f"kubectl exec -ti -n onap {oof_pod} -- rm -rf policies_option2"
110         check_output(cmd, shell=True).decode('utf-8')