#!/usr/bin/env python3 # -*- coding: utf-8 -*- # SPDX-License-Identifier: Apache-2.0 """Onap Policy module.""" from dataclasses import dataclass from typing import Dict from oransdk.configuration import settings from onapsdk.onap_service import OnapService @dataclass class PolicyType: """PolicyType dataclass.""" type: str version: str class OranPolicy(OnapService): """Onap Policy library.""" pap_url = settings.POLICY_PAP_URL api_url = settings.POLICY_API_URL header = {"Accept": "application/json", "Content-Type": "application/json"} @classmethod def get_components_status(cls, basic_auth: Dict[str, str]) -> Dict: """ Get status of Policy component. Args: basic_auth: (Dict[str, str]) for example:{ 'username': 'bob', 'password': 'secret' } Returns: the status of the Policy component """ url = f"{cls.pap_url}/policy/pap/v1/components/healthcheck" status = cls.send_message_json('GET', 'Get status of Policy component', url, basic_auth=basic_auth) return status @classmethod def get_policy_status(cls, basic_auth: Dict[str, str]) -> Dict: """ Get status of all the policies. Returns: the status of all the policies """ url = f"{cls.pap_url}/policy/pap/v1/policies/status" status = cls.send_message_json('GET', 'Get status of all the policies', url, basic_auth=basic_auth) return status @classmethod def get_policy(cls, policy_type: PolicyType, policy_name, policy_version, basic_auth: Dict[str, str]) -> Dict: """ Get the policy. Args: policy_type: the policy type policy_name: the policy name policy_version: the version of the policy basic_auth: (Dict[str, str]) for example:{ 'username': 'bob', 'password': 'secret' } Returns: the policy """ url = f"{cls.api_url}/policy/api/v1/policytypes/{policy_type.type}/versions/"\ + f"{policy_type.version}/policies/{policy_name}/versions/{policy_version}" policy = cls.send_message_json('GET', 'Get the policy', url, basic_auth=basic_auth) return policy @classmethod def create_policy(cls, policy_type: PolicyType, policy_data, basic_auth: Dict[str, str]) -> None: """ Create a policy. Args: policy_type: the policy type type_version: the version of the policy type policy_data: the policy to be created, in binary format """ url = f"{cls.api_url}/policy/api/v1/policytypes/{policy_type.type}/"\ + f"versions/{policy_type.version}/policies" cls.send_message('POST', 'Create Policy', url, data=policy_data, headers=cls.header, basic_auth=basic_auth) @classmethod def deploy_policy(cls, policy_data, basic_auth: Dict[str, str]) -> None: """ Deploy a policy. Args: policy_data: the policy to be deployed, in binary format """ url = f"{cls.pap_url}/policy/pap/v1/pdps/policies" cls.send_message('POST', 'Deploy Policy', url, data=policy_data, headers=cls.header, basic_auth=basic_auth)