--- /dev/null
+# ============LICENSE_START===============================================
+# Copyright (C) 2020 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+import argparse
+from datetime import datetime
+from jinja2 import Template
+from flask import Flask, request
+import os.path
+from os import path
+from pygments.util import xrange
+from requests import ConnectionError
+import requests
+import sys
+import threading
+import time
+
+SERVICE_NAME = 'HealthCheck'
+BASE_URL = 'http://localhost:8081'
+RIC_CHUNK_SIZE = 10
+TIME_BETWEEN_CHECKS = 60
+
+type_to_use = ''
+policy_body = ''
+
+app = Flask(__name__)
+
+# Server info
+HOST_IP = "::"
+HOST_PORT = 9990
+APP_URL = "/stats"
+
+stat_page_template = """
+<!DOCTYPE html>
+<html>
+ <head>
+ <meta http-equiv=\"refresh\" content=\"{{refreshTime}}\">
+ <title>Non-RealTime RIC Health Check</title>
+ </head>
+ <body>
+ <h3>General</h3>
+ <font face=\"monospace\">
+ Policy type ID:...............................{{policyTypeId}}<br>
+ Policy body path:.............................{{policyBodyPath}}<br>
+ Time of last check:...........................{{time}}<br>
+ Duration of check:............................{{duration}}<br>
+ Number of checks:.............................{{noOfChecks}}<br>
+ </font>
+ <h3>Near-RT RICs</h3>
+ <font face=\"monospace\">
+ Number of unavailable Near-RT RICS:...........{{noOfUnavailableRics}}<br>
+ Number of Near-RT RICS not supporting type....{{noOfNotSupportingRics}}<br>
+ Number of Near-RT RICS supporting type:.......{{noOfSupportingRics}}<br>
+ </font>
+ <h3>Policies</h3>
+ <font face=\"monospace\">
+ Number of created policies:...................{{noOfCreatedPolicies}}<br>
+ Number of read policies:......................{{noOfReadPolicies}}<br>
+ Number of updated policies:...................{{noOfUpdatedPolicies}}<br>
+ Number of deleted policies:...................{{noOfDeletedPolicies}}<br>
+ </font>
+ </body>
+</html>
+"""
+type_to_use = "2"
+policy_body_path = 'pihw_template.json'
+
+duration = 0
+no_of_checks = 0
+no_of_unavailable_rics = 0
+no_of_rics_not_supporting_type = 0
+no_of_rics_supporting_type = 0
+no_of_created_policies = 0
+no_of_read_policies = 0
+no_of_updated_policies = 0
+no_of_deleted_policies = 0
+
+class Ric:
+
+ def __init__(self, name, supported_types, state):
+ self.name = name
+ self.supports_type_to_use = self.policy_type_supported(supported_types)
+ self.state = state
+ self.no_of_created_policies = 0
+ self.no_of_read_policies = 0
+ self.no_of_updated_policies = 0
+ self.no_of_deleted_policies = 0
+
+ def update_supported_types(self, supported_types):
+ self.supports_type_to_use = self.policy_type_supported(supported_types)
+
+ def policy_type_supported(self, supported_policy_types):
+ for supported_type in supported_policy_types:
+ if type_to_use == supported_type:
+ return True
+
+ return False
+
+
+class PolicyCheckThread (threading.Thread):
+
+ def __init__(self, thread_id, ric):
+ threading.Thread.__init__(self)
+ self.thread_id = thread_id
+ self.ric = ric
+
+ def run(self):
+ verboseprint(f'Checking ric: {self.ric.name}')
+ if put_policy(self.thread_id, self.ric.name):
+ verboseprint(f'Created policy: {self.thread_id} in ric: {self.ric.name}')
+ self.ric.no_of_created_policies += 1
+ time.sleep(0.5)
+ if get_policy(self.thread_id):
+ verboseprint(f'Read policy: {self.thread_id} from ric: {self.ric.name}')
+ self.ric.no_of_read_policies += 1
+ if put_policy(self.thread_id, self.ric.name, update_value=1):
+ verboseprint(f'Updated policy: {self.thread_id} in ric: {self.ric.name}')
+ self.ric.no_of_updated_policies += 1
+ if delete_policy(self.thread_id):
+ verboseprint(f'Deleted policy: {self.thread_id} from ric: {self.ric.name}')
+ self.ric.no_of_deleted_policies += 1
+
+
+class MonitorServer (threading.Thread):
+ def __init__(self):
+ threading.Thread.__init__(self)
+
+ def run(self):
+ verboseprint('Staring monitor server')
+ app.run(port=HOST_PORT, host=HOST_IP)
+
+
+@app.route(APP_URL,
+ methods=['GET'])
+def produceStatsPage():
+ t = Template(stat_page_template)
+ page = t.render(refreshTime=TIME_BETWEEN_CHECKS, policyTypeId=type_to_use, policyBodyPath=policy_body_path,
+ time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), duration=duration, noOfChecks=no_of_checks,
+ noOfUnavailableRics=no_of_unavailable_rics, noOfNotSupportingRics=no_of_rics_not_supporting_type,
+ noOfSupportingRics=no_of_rics_supporting_type, noOfCreatedPolicies=no_of_created_policies,
+ noOfReadPolicies=no_of_read_policies, noOfUpdatedPolicies=no_of_updated_policies,
+ noOfDeletedPolicies=no_of_deleted_policies)
+ return page,200
+
+def get_rics_from_agent():
+ resp = requests.get(BASE_URL + '/rics')
+ if not resp.ok:
+ verboseprint(f'Unable to get Rics {resp.status_code}')
+ return {}
+ return resp.json()
+
+
+def create_ric_dict(rics_as_json):
+ rics = {}
+ for ric_info in rics_as_json:
+ rics[ric_info["ricName"]] = (Ric(ric_info["ricName"], ric_info["policyTypes"], ric_info['state']))
+ verboseprint(f'Adding ric: {rics[ric_info["ricName"]]}')
+
+ return rics
+
+
+def update_rics():
+ added_rics = {}
+ for ric_info in get_rics_from_agent():
+ if ric_info["ricName"] in rics:
+ rics[ric_info["ricName"]].update_supported_types(ric_info["policyTypes"])
+ rics[ric_info["ricName"]].state = ric_info['state']
+ else:
+ added_rics[ric_info["ricName"]] = (Ric(ric_info["ricName"], ric_info["policyTypes"]))
+ verboseprint(f'Adding ric: {rics[ric_info["ricName"]]}')
+
+ rics.update(added_rics)
+
+
+def put_policy(thread_id, ric_name, update_value=0):
+ policy_id = f'thread_{thread_id}'
+ complete_url = f'{BASE_URL}/policy?type={type_to_use}&id={policy_id}&ric={ric_name}&service={SERVICE_NAME}'
+ headers = {'content-type': 'application/json'}
+ resp = requests.put(complete_url, policy_body.replace('XXX', str(thread_id + update_value)), headers=headers, verify=False)
+
+ if not resp.ok:
+ verboseprint(f'Unable to create policy {resp}')
+ return False
+ else:
+ return True
+
+
+def get_policy(thread_id):
+ policy_id = f'thread_{thread_id}'
+ complete_url = f'{BASE_URL}/policy?id={policy_id}'
+ resp = requests.get(complete_url)
+
+ if not resp.ok:
+ verboseprint(f'Unable to get policy {resp}')
+ return False
+ else:
+ return True
+
+
+def delete_policy(thread_id):
+ policy_id = f'thread_{thread_id}'
+ complete_url = f'{BASE_URL}/policy?id={policy_id}'
+ resp = requests.delete(complete_url)
+
+ if not resp.ok:
+ verboseprint(f'Unable to delete policy for policy ID {policy_id}')
+ return False
+
+ return True
+
+
+def statistics():
+ global duration
+ global no_of_checks
+ global no_of_unavailable_rics
+ global no_of_rics_not_supporting_type
+ global no_of_rics_supporting_type
+ global no_of_created_policies
+ global no_of_read_policies
+ global no_of_updated_policies
+ global no_of_deleted_policies
+
+ # Clear ric data between checks as it may have changed since last check.
+ no_of_unavailable_rics = 0
+ no_of_rics_not_supporting_type = 0
+ no_of_rics_supporting_type = 0
+
+ for ric in rics.values():
+ if not (ric.state == 'AVAILABLE' or ric.state == 'CONSISTENCY_CHECK'):
+ no_of_unavailable_rics += 1
+ elif ric.supports_type_to_use:
+ no_of_rics_supporting_type += 1
+ no_of_created_policies += ric.no_of_created_policies
+ no_of_read_policies += ric.no_of_read_policies
+ no_of_updated_policies += ric.no_of_updated_policies
+ no_of_deleted_policies += ric.no_of_deleted_policies
+ else:
+ no_of_rics_not_supporting_type += 1
+
+ print(f'*********** Statistics {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} *******************')
+ print(f'Duration of check: {duration.total_seconds()} seconds')
+ print(f'Number of checks: {no_of_checks}')
+ print(f'Number of unavailable Near-RT RICS: {no_of_unavailable_rics}')
+ print(f'Number of Near-RT RICS not supporting type: {no_of_rics_not_supporting_type}')
+ print(f'Number of Near-RT RICS supporting type: {no_of_rics_supporting_type}')
+ print(f'Number of created policies: {no_of_created_policies}')
+ print(f'Number of read policies: {no_of_read_policies}')
+ print(f'Number of updated policies: {no_of_updated_policies}')
+ print(f'Number of deleted policies: {no_of_deleted_policies}')
+ print('**************************************************************')
+
+
+def run_check_threads(rics):
+ thread_id = 1
+ threads = []
+ for ric in rics.values():
+ if ric.supports_type_to_use and (ric.state == 'AVAILABLE' or ric.state == 'CONSISTENCY_CHECK'): #or ric.name == 'ric_not_working':
+ policy_checker = PolicyCheckThread(thread_id, ric)
+ policy_checker.start()
+ thread_id += 1
+ threads.append(policy_checker)
+
+ for checker in threads:
+ checker.join()
+
+
+def split_rics_equally(chunks):
+ # prep with empty dicts
+ return_list = [dict() for _ in xrange(chunks)]
+ if len(rics) < RIC_CHUNK_SIZE:
+ return [rics]
+
+ idx = 0
+ for k,v in rics.items():
+ return_list[idx][k] = v
+ if idx < chunks-1: # indexes start at 0
+ idx += 1
+ else:
+ idx = 0
+ return return_list
+
+
+def get_no_of_chunks(size_of_chunks, size_to_chunk):
+ (q, _) = divmod(size_to_chunk, size_of_chunks)
+ return q
+
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser(prog='PROG')
+ parser.add_argument('--policyTypeId', help='The ID of the policy type to use')
+ parser.add_argument('--policyBodyPath', help='The path to the JSON body of the policy to create')
+ parser.add_argument('-v', '--verbose', action='store_true', help='Turn on verbose printing')
+ parser.add_argument('--version', action='version', version='%(prog)s 1.0')
+ args = vars(parser.parse_args())
+
+ if args['verbose']:
+ def verboseprint(*args, **kwargs):
+ print(*args, **kwargs)
+ else:
+ verboseprint = lambda *a, **k: None # do-nothing function
+
+ if args["policyTypeId"]:
+ type_to_use = args["policyTypeId"]
+
+ if args["policyBodyPath"]:
+ policy_body_path = args["policyBodyPath"]
+ if not os.path.exists(policy_body_path):
+ print(f'Policy body {policy_body_path} does not exist.')
+ sys.exit(1)
+
+ verboseprint(f'Using policy type {type_to_use}')
+ verboseprint(f'Using policy file {policy_body_path}')
+
+ with open(policy_body_path) as json_file:
+ policy_body = json_file.read()
+ verboseprint(f'Policy body: {policy_body}')
+
+ try:
+ rics_from_agent = get_rics_from_agent()
+ except ConnectionError:
+ print(f'A1PMS is not answering on {BASE_URL}, cannot start!')
+ sys.exit(1)
+
+ rics = create_ric_dict(rics_from_agent)
+
+ monitor_server = MonitorServer()
+ monitor_server.start()
+
+ while True:
+ start_time = datetime.now()
+ chunked_rics = split_rics_equally(get_no_of_chunks(RIC_CHUNK_SIZE, rics.__len__()))
+ for ric_chunk in chunked_rics:
+ run_check_threads(ric_chunk)
+
+ no_of_checks += 1
+ finish_time = datetime.now()
+ duration = finish_time - start_time
+ statistics()
+ sleep_time = TIME_BETWEEN_CHECKS - duration.total_seconds()
+ verboseprint(f'Sleeping {sleep_time} seconds')
+ time.sleep(sleep_time)
+ update_rics()
+
+ verboseprint('Exiting main')