# ============LICENSE_START=============================================== # Copyright (C) 2020 Nordix Foundation. All rights reserved. # ======================================================================== # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============LICENSE_END================================================= # import argparse from datetime import datetime from jinja2 import Template from flask import Flask, request import json import os.path from os import path from pygments.util import xrange from requests import ConnectionError import requests import sys import threading import time SERVICE_NAME = 'HealthCheck' DEFAULT_HOST = "http://localhost:8081" BASE_PATH = "/a1-policy/v2" RIC_CHUNK_SIZE = 10 TIME_BETWEEN_CHECKS = 60 type_to_use = '' policy_data = '' app = Flask(__name__) # Server info HOST_IP = "::" HOST_PORT = 9990 APP_URL = "/stats" stat_page_template = """ Non-RealTime RIC Health Check

General

Policy type ID:...............................{{policyTypeId}}
Policy body path:.............................{{policyBodyPath}}
Time of last check:...........................{{time}}
Duration of check:............................{{duration}}
Number of checks:.............................{{noOfChecks}}

Near-RT RICs

Number of unavailable Near-RT RICS:...........{{noOfUnavailableRics}}
Number of Near-RT RICS not supporting type....{{noOfNotSupportingRics}}
Number of Near-RT RICS supporting type:.......{{noOfSupportingRics}}

Policies

Number of created policies:...................{{noOfCreatedPolicies}}
Number of read policies:......................{{noOfReadPolicies}}
Number of updated policies:...................{{noOfUpdatedPolicies}}
Number of deleted policies:...................{{noOfDeletedPolicies}}
""" base_url = DEFAULT_HOST + BASE_PATH type_to_use = "2" policy_body_path = 'pihw_template.json' duration = 0 no_of_checks = 0 no_of_unavailable_rics = 0 no_of_rics_not_supporting_type = 0 no_of_rics_supporting_type = 0 no_of_created_policies = 0 no_of_read_policies = 0 no_of_updated_policies = 0 no_of_deleted_policies = 0 class Ric: def __init__(self, name, supported_types, state): self.name = name self.supports_type_to_use = self.policy_type_supported(supported_types) self.state = state self.no_of_created_policies = 0 self.no_of_read_policies = 0 self.no_of_updated_policies = 0 self.no_of_deleted_policies = 0 def update_supported_types(self, supported_types): self.supports_type_to_use = self.policy_type_supported(supported_types) def policy_type_supported(self, supported_policy_types): for supported_type in supported_policy_types: if type_to_use == supported_type: return True return False class PolicyCheckThread (threading.Thread): def __init__(self, thread_id, ric): threading.Thread.__init__(self) self.thread_id = thread_id self.ric = ric def run(self): verboseprint(f'Checking ric: {self.ric.name}') if put_policy(self.thread_id, self.ric.name): verboseprint(f'Created policy: {self.thread_id} in ric: {self.ric.name}') self.ric.no_of_created_policies += 1 time.sleep(0.5) if get_policy(self.thread_id): verboseprint(f'Read policy: {self.thread_id} from ric: {self.ric.name}') self.ric.no_of_read_policies += 1 if put_policy(self.thread_id, self.ric.name, update_value=1): verboseprint(f'Updated policy: {self.thread_id} in ric: {self.ric.name}') self.ric.no_of_updated_policies += 1 if delete_policy(self.thread_id): verboseprint(f'Deleted policy: {self.thread_id} from ric: {self.ric.name}') self.ric.no_of_deleted_policies += 1 class MonitorServer (threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): verboseprint('Staring monitor server') app.run(port=HOST_PORT, host=HOST_IP) @app.route(APP_URL, methods=['GET']) def produceStatsPage(): t = Template(stat_page_template) page = t.render(refreshTime=TIME_BETWEEN_CHECKS, policyTypeId=type_to_use, policyBodyPath=policy_body_path, time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), duration=duration, noOfChecks=no_of_checks, noOfUnavailableRics=no_of_unavailable_rics, noOfNotSupportingRics=no_of_rics_not_supporting_type, noOfSupportingRics=no_of_rics_supporting_type, noOfCreatedPolicies=no_of_created_policies, noOfReadPolicies=no_of_read_policies, noOfUpdatedPolicies=no_of_updated_policies, noOfDeletedPolicies=no_of_deleted_policies) return page,200 def get_rics_from_agent(): resp = requests.get(base_url + '/rics') if not resp.ok: verboseprint(f'Unable to get Rics {resp.status_code}') return {} return resp.json() def create_ric_dict(rics_as_json): rics = {} for ric_info in rics_as_json["rics"]: rics[ric_info["ric_id"]] = (Ric(ric_info["ric_id"], ric_info["policytype_ids"], ric_info['state'])) verboseprint(f'Adding ric: {rics[ric_info["ric_id"]]}') return rics def update_rics(): added_rics = {} for ric_info in get_rics_from_agent()["rics"]: if ric_info["ric_id"] in rics: rics[ric_info["ric_id"]].update_supported_types(ric_info["policytype_ids"]) rics[ric_info["ric_id"]].state = ric_info['state'] else: added_rics[ric_info["ric_id"]] = (Ric(ric_info["ric_id"], ric_info["policytype_ids"])) verboseprint(f'Adding ric: {rics[ric_info["ric_id"]]}') rics.update(added_rics) def put_policy(thread_id, ric_name, update_value=0): policy_id = f'thread_{thread_id}' complete_url = base_url + '/policies' headers = {'content-type': 'application/json'} policy_obj = json.loads(policy_data.replace('XXX', str(thread_id + update_value))) body = { "ric_id": ric_name, "policy_id": policy_id, "service_id": SERVICE_NAME, "policy_data": policy_obj, "policytype_id": type_to_use } resp = requests.put(complete_url, json=body, headers=headers, verify=False) if not resp.ok: verboseprint(f'Unable to create policy {resp}') return False else: return True def get_policy(thread_id): policy_id = f'thread_{thread_id}' complete_url = f'{base_url}/policies/{policy_id}' resp = requests.get(complete_url) if not resp.ok: verboseprint(f'Unable to get policy {resp}') return False else: return True def delete_policy(thread_id): policy_id = f'thread_{thread_id}' complete_url = f'{base_url}/policies/{policy_id}' resp = requests.delete(complete_url) if not resp.ok: verboseprint(f'Unable to delete policy for policy ID {policy_id}') return False return True def statistics(): global duration global no_of_checks global no_of_unavailable_rics global no_of_rics_not_supporting_type global no_of_rics_supporting_type global no_of_created_policies global no_of_read_policies global no_of_updated_policies global no_of_deleted_policies # Clear ric data between checks as it may have changed since last check. no_of_unavailable_rics = 0 no_of_rics_not_supporting_type = 0 no_of_rics_supporting_type = 0 for ric in rics.values(): if not (ric.state == 'AVAILABLE' or ric.state == 'CONSISTENCY_CHECK'): no_of_unavailable_rics += 1 elif ric.supports_type_to_use: no_of_rics_supporting_type += 1 no_of_created_policies += ric.no_of_created_policies no_of_read_policies += ric.no_of_read_policies no_of_updated_policies += ric.no_of_updated_policies no_of_deleted_policies += ric.no_of_deleted_policies else: no_of_rics_not_supporting_type += 1 print(f'*********** Statistics {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} *******************') print(f'Duration of check: {duration.total_seconds()} seconds') print(f'Number of checks: {no_of_checks}') print(f'Number of unavailable Near-RT RICS: {no_of_unavailable_rics}') print(f'Number of Near-RT RICS not supporting type: {no_of_rics_not_supporting_type}') print(f'Number of Near-RT RICS supporting type: {no_of_rics_supporting_type}') print(f'Number of created policies: {no_of_created_policies}') print(f'Number of read policies: {no_of_read_policies}') print(f'Number of updated policies: {no_of_updated_policies}') print(f'Number of deleted policies: {no_of_deleted_policies}') print('**************************************************************') def run_check_threads(rics): thread_id = 1 threads = [] for ric in rics.values(): if ric.supports_type_to_use and (ric.state == 'AVAILABLE' or ric.state == 'CONSISTENCY_CHECK'): #or ric.name == 'ric_not_working': policy_checker = PolicyCheckThread(thread_id, ric) policy_checker.start() thread_id += 1 threads.append(policy_checker) for checker in threads: checker.join() def split_rics_equally(chunks): # prep with empty dicts return_list = [dict() for _ in xrange(chunks)] if len(rics) < RIC_CHUNK_SIZE: return [rics] idx = 0 for k,v in rics.items(): return_list[idx][k] = v if idx < chunks-1: # indexes start at 0 idx += 1 else: idx = 0 return return_list def get_no_of_chunks(size_of_chunks, size_to_chunk): (q, _) = divmod(size_to_chunk, size_of_chunks) return q if __name__ == '__main__': parser = argparse.ArgumentParser(prog='PROG') parser.add_argument('--pmsHost', help='The host of the A1 PMS, e.g. http://localhost:8081') parser.add_argument('--policyTypeId', help='The ID of the policy type to use') parser.add_argument('--policyBodyPath', help='The path to the JSON body of the policy to create') parser.add_argument('-v', '--verbose', action='store_true', help='Turn on verbose printing') parser.add_argument('--version', action='version', version='%(prog)s 1.1') args = vars(parser.parse_args()) if args['verbose']: def verboseprint(*args, **kwargs): print(*args, **kwargs) else: verboseprint = lambda *a, **k: None # do-nothing function if args["pmsHost"]: base_url = args["pmsHost"] + BASE_PATH if args["policyTypeId"]: type_to_use = args["policyTypeId"] if args["policyBodyPath"]: policy_body_path = args["policyBodyPath"] if not os.path.exists(policy_body_path): print(f'Policy body {policy_body_path} does not exist.') sys.exit(1) verboseprint(f'Using policy type {type_to_use}') verboseprint(f'Using policy file {policy_body_path}') with open(policy_body_path) as json_file: policy_data = json_file.read() verboseprint(f'Policy body: {policy_data}') try: rics_from_agent = get_rics_from_agent() except ConnectionError: print(f'A1PMS is not answering on {base_url}, cannot start!') sys.exit(1) rics = create_ric_dict(rics_from_agent) monitor_server = MonitorServer() monitor_server.start() while True: start_time = datetime.now() chunked_rics = split_rics_equally(get_no_of_chunks(RIC_CHUNK_SIZE, rics.__len__())) for ric_chunk in chunked_rics: run_check_threads(ric_chunk) no_of_checks += 1 finish_time = datetime.now() duration = finish_time - start_time statistics() sleep_time = TIME_BETWEEN_CHECKS - duration.total_seconds() verboseprint(f'Sleeping {sleep_time} seconds') time.sleep(sleep_time) update_rics() verboseprint('Exiting main')