X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=smo-install%2Ftest%2Fpythonsdk%2Fsrc%2Foransdk%2Fpolicy%2Fpolicy.py;fp=smo-install%2Ftest%2Fpythonsdk%2Fsrc%2Foransdk%2Fpolicy%2Fpolicy.py;h=34139097770be7c7cddc35fdbfe825edf85ec943;hb=285d9a5c96b23594b419d95c71c5d6a2cf52052e;hp=0000000000000000000000000000000000000000;hpb=0fd7875b5673d8d9b56c73adff2c8368d95e825b;p=it%2Fdep.git diff --git a/smo-install/test/pythonsdk/src/oransdk/policy/policy.py b/smo-install/test/pythonsdk/src/oransdk/policy/policy.py new file mode 100644 index 00000000..34139097 --- /dev/null +++ b/smo-install/test/pythonsdk/src/oransdk/policy/policy.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# SPDX-License-Identifier: Apache-2.0 +"""Onap Policy module.""" + +from dataclasses import dataclass +from typing import Dict +from oransdk.configuration import settings +from onapsdk.onap_service import OnapService + +@dataclass +class PolicyType: + """PolicyType dataclass.""" + + type: str + version: str + + +class OranPolicy(OnapService): + """Onap Policy library.""" + + pap_url = settings.POLICY_PAP_URL + api_url = settings.POLICY_API_URL + header = {"Accept": "application/json", "Content-Type": "application/json"} + + @classmethod + def get_components_status(cls, + basic_auth: Dict[str, str]) -> Dict: + """ + Get status of Policy component. + + Args: + basic_auth: (Dict[str, str]) for example:{ 'username': 'bob', 'password': 'secret' } + + Returns: + the status of the Policy component + + """ + url = f"{cls.pap_url}/policy/pap/v1/components/healthcheck" + status = cls.send_message_json('GET', + 'Get status of Policy component', + url, + basic_auth=basic_auth) + return status + + @classmethod + def get_policy_status(cls, + basic_auth: Dict[str, str]) -> Dict: + """ + Get status of all the policies. + + Returns: + the status of all the policies + + """ + url = f"{cls.pap_url}/policy/pap/v1/policies/status" + status = cls.send_message_json('GET', + 'Get status of all the policies', + url, + basic_auth=basic_auth) + return status + + @classmethod + def get_policy(cls, + policy_type: PolicyType, + policy_name, + policy_version, + basic_auth: Dict[str, str]) -> Dict: + """ + Get the policy. + + Args: + policy_type: the policy type + policy_name: the policy name + policy_version: the version of the policy + basic_auth: (Dict[str, str]) for example:{ 'username': 'bob', 'password': 'secret' } + + Returns: + the policy + + """ + url = f"{cls.api_url}/policy/api/v1/policytypes/{policy_type.type}/versions/"\ + + f"{policy_type.version}/policies/{policy_name}/versions/{policy_version}" + policy = cls.send_message_json('GET', + 'Get the policy', + url, + basic_auth=basic_auth) + return policy + + @classmethod + def create_policy(cls, + policy_type: PolicyType, + policy_data, + basic_auth: Dict[str, str]) -> None: + """ + Create a policy. + + Args: + policy_type: the policy type + type_version: the version of the policy type + policy_data: the policy to be created, in binary format + + """ + url = f"{cls.api_url}/policy/api/v1/policytypes/{policy_type.type}/"\ + + f"versions/{policy_type.version}/policies" + cls.send_message('POST', + 'Create Policy', + url, + data=policy_data, + headers=cls.header, + basic_auth=basic_auth) + + @classmethod + def deploy_policy(cls, + policy_data, + basic_auth: Dict[str, str]) -> None: + """ + Deploy a policy. + + Args: + policy_data: the policy to be deployed, in binary format + + """ + url = f"{cls.pap_url}/policy/pap/v1/pdps/policies" + cls.send_message('POST', + 'Deploy Policy', + url, + data=policy_data, + headers=cls.header, + basic_auth=basic_auth)