Issue-ID: NONRTRIC-749
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: I27f0ae89889b41cfc920ba4cf376dcda984947e4
--- /dev/null
+bin/
+lib/
+lib64
+my_venv/
+pyvenv.cfg
+share/
+launch.json
+.project
+.pydevproject
# syntax=docker/dockerfile:1
# syntax=docker/dockerfile:1
-FROM python:3.8-slim-buster
+FROM python:3.10-slim-buster
from datetime import datetime
from jinja2 import Template
from flask import Flask, request
from datetime import datetime
from jinja2 import Template
from flask import Flask, request
import os.path
from os import path
from pygments.util import xrange
import os.path
from os import path
from pygments.util import xrange
import time
SERVICE_NAME = 'HealthCheck'
import time
SERVICE_NAME = 'HealthCheck'
-BASE_URL = 'http://localhost:8081'
+DEFAULT_HOST = "http://localhost:8081"
+BASE_PATH = "/a1-policy/v2"
RIC_CHUNK_SIZE = 10
TIME_BETWEEN_CHECKS = 60
type_to_use = ''
RIC_CHUNK_SIZE = 10
TIME_BETWEEN_CHECKS = 60
type_to_use = ''
+base_url = DEFAULT_HOST + BASE_PATH
type_to_use = "2"
policy_body_path = 'pihw_template.json'
type_to_use = "2"
policy_body_path = 'pihw_template.json'
return page,200
def get_rics_from_agent():
return page,200
def get_rics_from_agent():
- resp = requests.get(BASE_URL + '/rics')
+ resp = requests.get(base_url + '/rics')
if not resp.ok:
verboseprint(f'Unable to get Rics {resp.status_code}')
return {}
if not resp.ok:
verboseprint(f'Unable to get Rics {resp.status_code}')
return {}
def create_ric_dict(rics_as_json):
rics = {}
def create_ric_dict(rics_as_json):
rics = {}
- for ric_info in rics_as_json:
- rics[ric_info["ricName"]] = (Ric(ric_info["ricName"], ric_info["policyTypes"], ric_info['state']))
- verboseprint(f'Adding ric: {rics[ric_info["ricName"]]}')
+ for ric_info in rics_as_json["rics"]:
+ rics[ric_info["ric_id"]] = (Ric(ric_info["ric_id"], ric_info["policytype_ids"], ric_info['state']))
+ verboseprint(f'Adding ric: {rics[ric_info["ric_id"]]}')
return rics
def update_rics():
added_rics = {}
return rics
def update_rics():
added_rics = {}
- for ric_info in get_rics_from_agent():
- if ric_info["ricName"] in rics:
- rics[ric_info["ricName"]].update_supported_types(ric_info["policyTypes"])
- rics[ric_info["ricName"]].state = ric_info['state']
+ for ric_info in get_rics_from_agent()["rics"]:
+ if ric_info["ric_id"] in rics:
+ rics[ric_info["ric_id"]].update_supported_types(ric_info["policytype_ids"])
+ rics[ric_info["ric_id"]].state = ric_info['state']
- added_rics[ric_info["ricName"]] = (Ric(ric_info["ricName"], ric_info["policyTypes"]))
- verboseprint(f'Adding ric: {rics[ric_info["ricName"]]}')
+ added_rics[ric_info["ric_id"]] = (Ric(ric_info["ric_id"], ric_info["policytype_ids"]))
+ verboseprint(f'Adding ric: {rics[ric_info["ric_id"]]}')
rics.update(added_rics)
def put_policy(thread_id, ric_name, update_value=0):
policy_id = f'thread_{thread_id}'
rics.update(added_rics)
def put_policy(thread_id, ric_name, update_value=0):
policy_id = f'thread_{thread_id}'
- complete_url = f'{BASE_URL}/policy?type={type_to_use}&id={policy_id}&ric={ric_name}&service={SERVICE_NAME}'
+ complete_url = base_url + '/policies'
headers = {'content-type': 'application/json'}
headers = {'content-type': 'application/json'}
- resp = requests.put(complete_url, policy_body.replace('XXX', str(thread_id + update_value)), headers=headers, verify=False)
+ policy_obj = json.loads(policy_data.replace('XXX', str(thread_id + update_value)))
+ body = {
+ "ric_id": ric_name,
+ "policy_id": policy_id,
+ "service_id": SERVICE_NAME,
+ "policy_data": policy_obj,
+ "policytype_id": type_to_use
+ }
+ resp = requests.put(complete_url, json=body, headers=headers, verify=False)
if not resp.ok:
verboseprint(f'Unable to create policy {resp}')
if not resp.ok:
verboseprint(f'Unable to create policy {resp}')
def get_policy(thread_id):
policy_id = f'thread_{thread_id}'
def get_policy(thread_id):
policy_id = f'thread_{thread_id}'
- complete_url = f'{BASE_URL}/policy?id={policy_id}'
+ complete_url = f'{base_url}/policies/{policy_id}'
resp = requests.get(complete_url)
if not resp.ok:
resp = requests.get(complete_url)
if not resp.ok:
def delete_policy(thread_id):
policy_id = f'thread_{thread_id}'
def delete_policy(thread_id):
policy_id = f'thread_{thread_id}'
- complete_url = f'{BASE_URL}/policy?id={policy_id}'
+ complete_url = f'{base_url}/policies/{policy_id}'
resp = requests.delete(complete_url)
if not resp.ok:
resp = requests.delete(complete_url)
if not resp.ok:
if __name__ == '__main__':
parser = argparse.ArgumentParser(prog='PROG')
if __name__ == '__main__':
parser = argparse.ArgumentParser(prog='PROG')
+ parser.add_argument('--pmsHost', help='The host of the A1 PMS, e.g. http://localhost:8081')
parser.add_argument('--policyTypeId', help='The ID of the policy type to use')
parser.add_argument('--policyBodyPath', help='The path to the JSON body of the policy to create')
parser.add_argument('-v', '--verbose', action='store_true', help='Turn on verbose printing')
parser.add_argument('--policyTypeId', help='The ID of the policy type to use')
parser.add_argument('--policyBodyPath', help='The path to the JSON body of the policy to create')
parser.add_argument('-v', '--verbose', action='store_true', help='Turn on verbose printing')
- parser.add_argument('--version', action='version', version='%(prog)s 1.0')
+ parser.add_argument('--version', action='version', version='%(prog)s 1.1')
args = vars(parser.parse_args())
if args['verbose']:
args = vars(parser.parse_args())
if args['verbose']:
else:
verboseprint = lambda *a, **k: None # do-nothing function
else:
verboseprint = lambda *a, **k: None # do-nothing function
+ if args["pmsHost"]:
+ base_url = args["pmsHost"] + BASE_PATH
+
if args["policyTypeId"]:
type_to_use = args["policyTypeId"]
if args["policyTypeId"]:
type_to_use = args["policyTypeId"]
verboseprint(f'Using policy file {policy_body_path}')
with open(policy_body_path) as json_file:
verboseprint(f'Using policy file {policy_body_path}')
with open(policy_body_path) as json_file:
- policy_body = json_file.read()
- verboseprint(f'Policy body: {policy_body}')
+ policy_data = json_file.read()
+ verboseprint(f'Policy body: {policy_data}')
try:
rics_from_agent = get_rics_from_agent()
except ConnectionError:
try:
rics_from_agent = get_rics_from_agent()
except ConnectionError:
- print(f'A1PMS is not answering on {BASE_URL}, cannot start!')
+ print(f'A1PMS is not answering on {base_url}, cannot start!')
sys.exit(1)
rics = create_ric_dict(rics_from_agent)
sys.exit(1)
rics = create_ric_dict(rics_from_agent)