from datetime import datetime
from jinja2 import Template
from flask import Flask, request
+import json
import os.path
from os import path
from pygments.util import xrange
import time
SERVICE_NAME = 'HealthCheck'
-BASE_URL = 'http://localhost:8081'
+DEFAULT_HOST = "http://localhost:8081"
+BASE_PATH = "/a1-policy/v2"
RIC_CHUNK_SIZE = 10
TIME_BETWEEN_CHECKS = 60
type_to_use = ''
-policy_body = ''
+policy_data = ''
app = Flask(__name__)
</body>
</html>
"""
+base_url = DEFAULT_HOST + BASE_PATH
type_to_use = "2"
policy_body_path = 'pihw_template.json'
return page,200
def get_rics_from_agent():
- resp = requests.get(BASE_URL + '/rics')
+ resp = requests.get(base_url + '/rics')
if not resp.ok:
verboseprint(f'Unable to get Rics {resp.status_code}')
return {}
def create_ric_dict(rics_as_json):
rics = {}
- for ric_info in rics_as_json:
- rics[ric_info["ricName"]] = (Ric(ric_info["ricName"], ric_info["policyTypes"], ric_info['state']))
- verboseprint(f'Adding ric: {rics[ric_info["ricName"]]}')
+ for ric_info in rics_as_json["rics"]:
+ rics[ric_info["ric_id"]] = (Ric(ric_info["ric_id"], ric_info["policytype_ids"], ric_info['state']))
+ verboseprint(f'Adding ric: {rics[ric_info["ric_id"]]}')
return rics
def update_rics():
added_rics = {}
- for ric_info in get_rics_from_agent():
- if ric_info["ricName"] in rics:
- rics[ric_info["ricName"]].update_supported_types(ric_info["policyTypes"])
- rics[ric_info["ricName"]].state = ric_info['state']
+ for ric_info in get_rics_from_agent()["rics"]:
+ if ric_info["ric_id"] in rics:
+ rics[ric_info["ric_id"]].update_supported_types(ric_info["policytype_ids"])
+ rics[ric_info["ric_id"]].state = ric_info['state']
else:
- added_rics[ric_info["ricName"]] = (Ric(ric_info["ricName"], ric_info["policyTypes"]))
- verboseprint(f'Adding ric: {rics[ric_info["ricName"]]}')
+ added_rics[ric_info["ric_id"]] = (Ric(ric_info["ric_id"], ric_info["policytype_ids"]))
+ verboseprint(f'Adding ric: {rics[ric_info["ric_id"]]}')
rics.update(added_rics)
def put_policy(thread_id, ric_name, update_value=0):
policy_id = f'thread_{thread_id}'
- complete_url = f'{BASE_URL}/policy?type={type_to_use}&id={policy_id}&ric={ric_name}&service={SERVICE_NAME}'
+ complete_url = base_url + '/policies'
headers = {'content-type': 'application/json'}
- resp = requests.put(complete_url, policy_body.replace('XXX', str(thread_id + update_value)), headers=headers, verify=False)
+ policy_obj = json.loads(policy_data.replace('XXX', str(thread_id + update_value)))
+ body = {
+ "ric_id": ric_name,
+ "policy_id": policy_id,
+ "service_id": SERVICE_NAME,
+ "policy_data": policy_obj,
+ "policytype_id": type_to_use
+ }
+ resp = requests.put(complete_url, json=body, headers=headers, verify=False)
if not resp.ok:
verboseprint(f'Unable to create policy {resp}')
def get_policy(thread_id):
policy_id = f'thread_{thread_id}'
- complete_url = f'{BASE_URL}/policy?id={policy_id}'
+ complete_url = f'{base_url}/policies/{policy_id}'
resp = requests.get(complete_url)
if not resp.ok:
def delete_policy(thread_id):
policy_id = f'thread_{thread_id}'
- complete_url = f'{BASE_URL}/policy?id={policy_id}'
+ complete_url = f'{base_url}/policies/{policy_id}'
resp = requests.delete(complete_url)
if not resp.ok:
if __name__ == '__main__':
parser = argparse.ArgumentParser(prog='PROG')
+ parser.add_argument('--pmsHost', help='The host of the A1 PMS, e.g. http://localhost:8081')
parser.add_argument('--policyTypeId', help='The ID of the policy type to use')
parser.add_argument('--policyBodyPath', help='The path to the JSON body of the policy to create')
parser.add_argument('-v', '--verbose', action='store_true', help='Turn on verbose printing')
- parser.add_argument('--version', action='version', version='%(prog)s 1.0')
+ parser.add_argument('--version', action='version', version='%(prog)s 1.1')
args = vars(parser.parse_args())
if args['verbose']:
else:
verboseprint = lambda *a, **k: None # do-nothing function
+ if args["pmsHost"]:
+ base_url = args["pmsHost"] + BASE_PATH
+
if args["policyTypeId"]:
type_to_use = args["policyTypeId"]
verboseprint(f'Using policy file {policy_body_path}')
with open(policy_body_path) as json_file:
- policy_body = json_file.read()
- verboseprint(f'Policy body: {policy_body}')
+ policy_data = json_file.read()
+ verboseprint(f'Policy body: {policy_data}')
try:
rics_from_agent = get_rics_from_agent()
except ConnectionError:
- print(f'A1PMS is not answering on {BASE_URL}, cannot start!')
+ print(f'A1PMS is not answering on {base_url}, cannot start!')
sys.exit(1)
rics = create_ric_dict(rics_from_agent)