###
"""Closed Loop Apex usecase tests module."""
# This usecase has limitations due to Clamp issue.
-# 1. make sure using the policy-clamp-be version 6.2.0-snapshot-latest at this the moment
+# 1. make sure using the policy-k8s-participant version is higher than 6.3.0
import time
import logging.config
import subprocess
clcommissioning_utils = ClCommissioningUtils()
clamp = ClampToscaTemplate(settings.CLAMP_BASICAUTH)
-chartmuseum_port = "8080"
usecase_name = "script_usecase"
chartmuseum_ip = "http://test-chartmuseum.test:8080"
-global app_name
@pytest.fixture(autouse=True)
def setup_simulators(request):
logger.info("Add the remote repo to Clamp k8s pod")
k8s_pod = subprocess.run("kubectl get pods -n onap | grep k8s | awk '{print $1}'", shell=True, check=True, stdout=subprocess.PIPE).stdout.decode('utf-8').strip()
- repo_url = subprocess.run("kubectl get services -n test | grep test-chartmuseum | awk '{print $3}'", shell=True, check=True, stdout=subprocess.PIPE).stdout.decode('utf-8').strip()+":8080"
- logger.info("k8s: %s, repo_url:%s", k8s_pod, repo_url)
- cmd = f"kubectl exec -it -n onap {k8s_pod} -- sh -c \"helm repo add chartmuseum http://{repo_url}\""
+ cmd = f"kubectl exec -it -n onap {k8s_pod} -- sh -c \"helm repo add chartmuseum http://test-chartmuseum.test:8080\""
check_output(cmd, shell=True).decode('utf-8')
cmd = f"kubectl exec -it -n onap {k8s_pod} -- sh -c \"helm repo update\""
check_output(cmd, shell=True).decode('utf-8')
yield
# Finish and delete the cl instance
clcommissioning_utils.clean_instance(usecase_name)
- wait(lambda: is_rapp_down(app_name), sleep_seconds=5, timeout_seconds=60, waiting_for="Rapp is down")
+ wait(lambda: is_rapp_down(pytest.app_name), sleep_seconds=5, timeout_seconds=60, waiting_for="Rapp is down")
# Remove the remote repo to Clamp k8s pod
cmd = f"kubectl exec -it -n onap {k8s_pod} -- sh -c \"helm repo remove chartmuseum\""
check_output(cmd, shell=True).decode('utf-8')
return False
def test_cl_oru_app_deploy():
+ """The Closed Loop O-RU Fronthaul Recovery usecase Apex version."""
chart_version = "1.0.0"
chart_name = "oru-app"
- release_name = "oru-app"
- global app_name
- app_name = chart_name
- """The Closed Loop O-RU Fronthaul Recovery usecase Apex version."""
+ release_name = "nonrtric"
+ pytest.app_name = chart_name
logger.info("Upload tosca to commissioning")
- commissioning_payload = jinja_env().get_template("commission_k8s.json.j2").render(chartmuseumIp=chartmuseum_ip, chartmuseumPort=chartmuseum_port, chartVersion=chart_version, chartName=chart_name, releaseName=release_name)
- instance_payload = jinja_env().get_template("create_instance_k8s.json.j2").render(chartmuseumIp=chartmuseum_ip, chartmuseumPort=chartmuseum_port, chartVersion=chart_version, chartName=chart_name, releaseName=release_name, instanceName=usecase_name)
+ commissioning_payload = jinja_env().get_template("commission_k8s.json.j2").render(chartmuseumIp=chartmuseum_ip, chartVersion=chart_version, chartName=chart_name, releaseName=release_name)
+ instance_payload = jinja_env().get_template("create_instance_k8s.json.j2").render(chartmuseumIp=chartmuseum_ip, chartVersion=chart_version, chartName=chart_name, releaseName=release_name, instanceName=usecase_name)
assert clcommissioning_utils.create_instance(usecase_name, commissioning_payload, instance_payload) is True
logger.info("Check if oru-app is up")
wait(lambda: is_rapp_up(chart_name), sleep_seconds=5, timeout_seconds=300, waiting_for="Oru app to be up")
def test_cl_odu_app_smo_deploy():
+ """The O-DU Slice Assurance SMO Version use case."""
chart_version = "1.0.0"
chart_name = "odu-app"
release_name = "odu-app"
- global app_name
- app_name = chart_name
- """The O-DU Slice Assurance SMO Version use case."""
+ pytest.app_name = chart_name
logger.info("Upload tosca to commissioning")
- commissioning_payload = jinja_env().get_template("commission_k8s.json.j2").render(chartmuseumIp=chartmuseum_ip, chartmuseumPort=chartmuseum_port, chartVersion=chart_version, chartName=chart_name, releaseName=release_name)
- instance_payload = jinja_env().get_template("create_instance_k8s.json.j2").render(chartmuseumIp=chartmuseum_ip, chartmuseumPort=chartmuseum_port, chartVersion=chart_version, chartName=chart_name, releaseName=release_name, instanceName=usecase_name)
+ commissioning_payload = jinja_env().get_template("commission_k8s.json.j2").render(chartmuseumIp=chartmuseum_ip, chartVersion=chart_version, chartName=chart_name, releaseName=release_name)
+ instance_payload = jinja_env().get_template("create_instance_k8s.json.j2").render(chartmuseumIp=chartmuseum_ip, chartVersion=chart_version, chartName=chart_name, releaseName=release_name, instanceName=usecase_name)
assert clcommissioning_utils.create_instance(usecase_name, commissioning_payload, instance_payload) is True
logger.info("Check if odu-app smo version is up")
wait(lambda: is_rapp_up(chart_name), sleep_seconds=5, timeout_seconds=300, waiting_for="Odu app smo version to be up")
def test_cl_odu_app_ics_deploy():
+ """The O-DU Slice Assurance ICS Version use case."""
chart_version = "1.0.0"
chart_name = "odu-app-ics-version"
release_name = "odu-app-ics-version"
- global app_name
- app_name = chart_name
- """The O-DU Slice Assurance ICS Version use case."""
+ pytest.app_name = chart_name
logger.info("Upload tosca to commissioning")
- commissioning_payload = jinja_env().get_template("commission_k8s.json.j2").render(chartmuseumIp=chartmuseum_ip, chartmuseumPort=chartmuseum_port, chartVersion=chart_version, chartName=chart_name, releaseName=release_name)
- instance_payload = jinja_env().get_template("create_instance_k8s.json.j2").render(chartmuseumIp=chartmuseum_ip, chartmuseumPort=chartmuseum_port, chartVersion=chart_version, chartName=chart_name, releaseName=release_name, instanceName=usecase_name)
+ commissioning_payload = jinja_env().get_template("commission_k8s.json.j2").render(chartmuseumIp=chartmuseum_ip, chartVersion=chart_version, chartName=chart_name, releaseName=release_name)
+ instance_payload = jinja_env().get_template("create_instance_k8s.json.j2").render(chartmuseumIp=chartmuseum_ip, chartVersion=chart_version, chartName=chart_name, releaseName=release_name, instanceName=usecase_name)
assert clcommissioning_utils.create_instance(usecase_name, commissioning_payload, instance_payload) is True
logger.info("Check if odu-app ics version is up")