Adapt to latest version of A1 PMS 41/8141/2
authorelinuxhenrik <henrik.b.andersson@est.tech>
Wed, 4 May 2022 08:44:10 +0000 (10:44 +0200)
committerelinuxhenrik <henrik.b.andersson@est.tech>
Wed, 4 May 2022 11:13:03 +0000 (13:13 +0200)
Issue-ID: NONRTRIC-749
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: I27f0ae89889b41cfc920ba4cf376dcda984947e4

.gitignore [new file with mode: 0644]
Dockerfile
src/main.py

diff --git a/.gitignore b/.gitignore
new file mode 100644 (file)
index 0000000..b59a651
--- /dev/null
@@ -0,0 +1,9 @@
+bin/
+lib/
+lib64
+my_venv/
+pyvenv.cfg
+share/
+launch.json
+.project
+.pydevproject
index 541a0e9..4434e70 100644 (file)
@@ -1,6 +1,6 @@
 # syntax=docker/dockerfile:1
 
-FROM python:3.8-slim-buster
+FROM python:3.10-slim-buster
 
 WORKDIR /src
 
index 330c43d..82482b5 100644 (file)
@@ -19,6 +19,7 @@ import argparse
 from datetime import datetime
 from jinja2 import Template
 from flask import Flask, request
+import json
 import os.path
 from os import path
 from pygments.util import xrange
@@ -29,12 +30,13 @@ import threading
 import time
 
 SERVICE_NAME = 'HealthCheck'
-BASE_URL = 'http://localhost:8081'
+DEFAULT_HOST = "http://localhost:8081"
+BASE_PATH = "/a1-policy/v2"
 RIC_CHUNK_SIZE = 10
 TIME_BETWEEN_CHECKS = 60
 
 type_to_use = ''
-policy_body = ''
+policy_data = ''
 
 app = Flask(__name__)
 
@@ -75,6 +77,7 @@ stat_page_template = """
     </body>
 </html>
 """
+base_url = DEFAULT_HOST + BASE_PATH
 type_to_use = "2"
 policy_body_path = 'pihw_template.json'
 
@@ -156,7 +159,7 @@ def produceStatsPage():
     return page,200
 
 def get_rics_from_agent():
-    resp = requests.get(BASE_URL + '/rics')
+    resp = requests.get(base_url + '/rics')
     if not resp.ok:
         verboseprint(f'Unable to get Rics {resp.status_code}')
         return {}
@@ -165,31 +168,39 @@ def get_rics_from_agent():
 
 def create_ric_dict(rics_as_json):
     rics = {}
-    for ric_info in rics_as_json:
-        rics[ric_info["ricName"]] = (Ric(ric_info["ricName"], ric_info["policyTypes"], ric_info['state']))
-        verboseprint(f'Adding ric: {rics[ric_info["ricName"]]}')
+    for ric_info in rics_as_json["rics"]:
+        rics[ric_info["ric_id"]] = (Ric(ric_info["ric_id"], ric_info["policytype_ids"], ric_info['state']))
+        verboseprint(f'Adding ric: {rics[ric_info["ric_id"]]}')
 
     return rics
 
 
 def update_rics():
     added_rics = {}
-    for ric_info in get_rics_from_agent():
-        if ric_info["ricName"] in rics:
-            rics[ric_info["ricName"]].update_supported_types(ric_info["policyTypes"])
-            rics[ric_info["ricName"]].state = ric_info['state']
+    for ric_info in get_rics_from_agent()["rics"]:
+        if ric_info["ric_id"] in rics:
+            rics[ric_info["ric_id"]].update_supported_types(ric_info["policytype_ids"])
+            rics[ric_info["ric_id"]].state = ric_info['state']
         else:
-            added_rics[ric_info["ricName"]] = (Ric(ric_info["ricName"], ric_info["policyTypes"]))
-            verboseprint(f'Adding ric: {rics[ric_info["ricName"]]}')
+            added_rics[ric_info["ric_id"]] = (Ric(ric_info["ric_id"], ric_info["policytype_ids"]))
+            verboseprint(f'Adding ric: {rics[ric_info["ric_id"]]}')
 
     rics.update(added_rics)
 
 
 def put_policy(thread_id, ric_name, update_value=0):
     policy_id = f'thread_{thread_id}'
-    complete_url = f'{BASE_URL}/policy?type={type_to_use}&id={policy_id}&ric={ric_name}&service={SERVICE_NAME}'
+    complete_url = base_url + '/policies'
     headers = {'content-type': 'application/json'}
-    resp = requests.put(complete_url, policy_body.replace('XXX', str(thread_id + update_value)), headers=headers, verify=False)
+    policy_obj = json.loads(policy_data.replace('XXX', str(thread_id + update_value)))
+    body = {
+        "ric_id": ric_name,
+        "policy_id": policy_id,
+        "service_id": SERVICE_NAME,
+        "policy_data": policy_obj,
+        "policytype_id": type_to_use
+    }
+    resp = requests.put(complete_url, json=body, headers=headers, verify=False)
 
     if not resp.ok:
         verboseprint(f'Unable to create policy {resp}')
@@ -200,7 +211,7 @@ def put_policy(thread_id, ric_name, update_value=0):
 
 def get_policy(thread_id):
     policy_id = f'thread_{thread_id}'
-    complete_url = f'{BASE_URL}/policy?id={policy_id}'
+    complete_url = f'{base_url}/policies/{policy_id}'
     resp = requests.get(complete_url)
 
     if not resp.ok:
@@ -212,7 +223,7 @@ def get_policy(thread_id):
 
 def delete_policy(thread_id):
     policy_id = f'thread_{thread_id}'
-    complete_url = f'{BASE_URL}/policy?id={policy_id}'
+    complete_url = f'{base_url}/policies/{policy_id}'
     resp = requests.delete(complete_url)
 
     if not resp.ok:
@@ -300,10 +311,11 @@ def get_no_of_chunks(size_of_chunks, size_to_chunk):
 
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(prog='PROG')
+    parser.add_argument('--pmsHost', help='The host of the A1 PMS, e.g. http://localhost:8081')
     parser.add_argument('--policyTypeId', help='The ID of the policy type to use')
     parser.add_argument('--policyBodyPath', help='The path to the JSON body of the policy to create')
     parser.add_argument('-v', '--verbose', action='store_true', help='Turn on verbose printing')
-    parser.add_argument('--version', action='version', version='%(prog)s 1.0')
+    parser.add_argument('--version', action='version', version='%(prog)s 1.1')
     args = vars(parser.parse_args())
 
     if args['verbose']:
@@ -312,6 +324,9 @@ if __name__ == '__main__':
     else:
         verboseprint = lambda *a, **k: None # do-nothing function
 
+    if args["pmsHost"]:
+        base_url = args["pmsHost"] + BASE_PATH
+
     if args["policyTypeId"]:
         type_to_use = args["policyTypeId"]
 
@@ -325,13 +340,13 @@ if __name__ == '__main__':
     verboseprint(f'Using policy file {policy_body_path}')
 
     with open(policy_body_path) as json_file:
-        policy_body = json_file.read()
-        verboseprint(f'Policy body: {policy_body}')
+        policy_data = json_file.read()
+        verboseprint(f'Policy body: {policy_data}')
 
     try:
         rics_from_agent = get_rics_from_agent()
     except ConnectionError:
-        print(f'A1PMS is not answering on {BASE_URL}, cannot start!')
+        print(f'A1PMS is not answering on {base_url}, cannot start!')
         sys.exit(1)
 
     rics = create_ric_dict(rics_from_agent)