3 # ============LICENSE_START===================================================
4 # ORAN SMO PACKAGE - PYTHONSDK TESTS
5 # ================================================================================
6 # Copyright (C) 2022 AT&T Intellectual Property. All rights
8 # ============================================================================
9 # Licensed under the Apache License, Version 2.0 (the "License");
10 # you may not use this file except in compliance with the License.
11 # You may obtain a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
21 # SPDX-License-Identifier: Apache-2.0
22 # ============LICENSE_END=====================================================
25 """Prepare OOF for Network Slicing option2 test."""
31 from subprocess import check_output
32 from onapsdk.configuration import settings
33 from oransdk.policy.policy import OranPolicy
35 logging.config.dictConfig(settings.LOG_CONFIG)
36 logger = logging.getLogger("####################### Start OOF Preparation")
38 # Set working dir as python script location
39 abspath = os.path.abspath(__file__)
40 dname = os.path.dirname(abspath)
43 class OofPreparation():
44 """Can be used to prepare OOF for Network Slicing usecase option2."""
47 def prepare_oof(cls, nst_name, an_nsst_name, tn_nsst_name):
48 """Prepare OOF, create optimization policies."""
49 # copy policy creation package to oof pod
50 logger.info("####################### copy policy generation package to OOF pod:%s", dname)
51 oof_pod = subprocess.run("kubectl get pods -n onap | awk '{print $1}' | grep onap-oof-[a-z0-9]*-[a-z0-9]*$", shell=True, check=True, stdout=subprocess.PIPE).stdout.decode('utf-8').strip()
52 cmd = f"kubectl cp ../resources/policies_option2.tar.gz -n onap {oof_pod}:/opt/osdf"
53 check_output(cmd, shell=True).decode('utf-8')
55 cmd = f"kubectl exec -ti -n onap {oof_pod} -- tar -xvf policies_option2.tar.gz"
56 check_output(cmd, shell=True).decode('utf-8')
58 # run python command to create policies
59 cmd = f"kubectl exec -ti -n onap {oof_pod} -- python3 policies_option2/policy_utils.py create_policy_types policy_types"
60 check_output(cmd, shell=True).decode('utf-8')
62 cmd = f"kubectl exec -ti -n onap {oof_pod} -- python3 policies_option2/policy_utils.py create_and_push_policies nst_policies"
63 check_output(cmd, shell=True).decode('utf-8')
65 cmd = f"kubectl exec -ti -n onap {oof_pod} -- python3 policies_option2/policy_utils.py generate_nsi_policies {nst_name}"
66 check_output(cmd, shell=True).decode('utf-8')
68 cmd = f"kubectl exec -ti -n onap {oof_pod} -- python3 policies_option2/policy_utils.py create_and_push_policies gen_nsi_policies"
69 check_output(cmd, shell=True).decode('utf-8')
71 cmd = f"kubectl exec -ti -n onap {oof_pod} -- python3 policies_option2/policy_utils.py generate_nssi_policies {an_nsst_name} minimize latency"
72 check_output(cmd, shell=True).decode('utf-8')
74 cmd = f"kubectl exec -ti -n onap {oof_pod} -- python3 policies_option2/policy_utils.py create_and_push_policies gen_nssi_policies"
75 check_output(cmd, shell=True).decode('utf-8')
77 cmd = f"kubectl exec -ti -n onap {oof_pod} -- python3 policies_option2/policy_utils.py generate_nssi_policies {tn_nsst_name} minimize latency"
78 check_output(cmd, shell=True).decode('utf-8')
80 cmd = f"kubectl exec -ti -n onap {oof_pod} -- python3 policies_option2/policy_utils.py create_and_push_policies gen_nssi_policies"
81 check_output(cmd, shell=True).decode('utf-8')
83 #Verify policies created
85 policy_status_list = policy.get_policy_status(settings.POLICY_BASICAUTH)
86 if len(policy_status_list) != 20:
87 logger.info("####################### Policy created failed. 20 policies expected, but only %s found. Please verify manually.", str(len(policy_status_list)))
88 sys.exit('OOF preparation failed. Exception while creating policies. Please check the policies manually.')
92 """Delete OOF optimization policies."""
93 oof_pod = subprocess.run("kubectl get pods -n onap | awk '{print $1}' | grep onap-oof-[a-z0-9]*-[a-z0-9]*$", shell=True, check=True, stdout=subprocess.PIPE).stdout.decode('utf-8').strip()
95 # run python command to create policies
96 cmd = f"kubectl exec -ti -n onap {oof_pod} -- python3 policies_option2/policy_utils.py delete_policies nst_policies"
97 check_output(cmd, shell=True).decode('utf-8')
99 #python3 policy_utils.py create_and_push_policies gen_nsi_policies
100 cmd = f"kubectl exec -ti -n onap {oof_pod} -- python3 policies_option2/policy_utils.py delete_policies gen_nsi_policies"
101 check_output(cmd, shell=True).decode('utf-8')
103 #python3 policy_utils.py create_and_push_policies gen_nssi_policies
104 cmd = f"kubectl exec -ti -n onap {oof_pod} -- python3 policies_option2/policy_utils.py delete_policies gen_nssi_policies"
105 check_output(cmd, shell=True).decode('utf-8')
107 # run python command to create policies
108 logger.info("####################### copy policy generation package to OOF pod:%s", dname)
109 cmd = f"kubectl exec -ti -n onap {oof_pod} -- rm -rf policies_option2"
110 check_output(cmd, shell=True).decode('utf-8')