From: sahilkoli Date: Tue, 30 Aug 2022 07:55:57 +0000 (+0530) Subject: Implement Standard Defined Validator X-Git-Tag: 6.0.2~5 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=e983725ff83ffb08c5b9658963747ee6b373c1dc;p=smo%2Fves.git Implement Standard Defined Validator SMO-76 Signed-off-by: sahilkoli Change-Id: I51d7fb9e0da93b9f611371607aaa4d61bae04717 Signed-off-by: sahilkoli Signed-off-by: sahil-xoriant --- diff --git a/collector/Dockerfile b/collector/Dockerfile index 4932f55..f67e224 100755 --- a/collector/Dockerfile +++ b/collector/Dockerfile @@ -25,8 +25,7 @@ RUN apt-get install -y git curl python3 python3-pip COPY pip.conf /etc/pip.conf -RUN pip3 install requests jsonschema kafka-python gevent - +RUN pip3 install requests jsonschema kafka-python gevent PyYAML RUN mkdir -p /opt/smo/certs diff --git a/collector/evel-test-collector/code/collector/monitor.py b/collector/evel-test-collector/code/collector/monitor.py index 9daf78c..7cab45f 100755 --- a/collector/evel-test-collector/code/collector/monitor.py +++ b/collector/evel-test-collector/code/collector/monitor.py @@ -39,6 +39,8 @@ from json import dumps import datetime import time from gevent import pywsgi +import yaml +import requests monitor_mode = "f" vdu_id = ['', '', '', '', '', ''] @@ -77,6 +79,11 @@ vel_password = '' # ------------------------------------------------------------------------------ vel_schema = None +#------------------------------------------------------------------------------ +# The yaml schema which we will use to validate events. +# ------------------------------------------------------------------------------ +vel_yaml_schema = None + # ------------------------------------------------------------------------------ # The JSON schema which we will use to validate client throttle state. # ------------------------------------------------------------------------------ @@ -100,7 +107,7 @@ logger = None producer = None -def listener(environ, start_response, schema): +def listener(environ, start_response, schema, yaml_schema=None): ''' Handler for the Vendor Event Listener REST API. @@ -149,6 +156,14 @@ def listener(environ, start_response, schema): decoded_body = json.loads(body) validate = jsonschema.validate(decoded_body, schema) assert (validate is None), "Invalid event!" + if 'stndDefinedFields' in decoded_body['event'].keys(): + logger.debug('in yaml validation') + schema_ref = decoded_body['event']['stndDefinedFields']["schemaReference"] + if "https://forge.3gpp.org" in schema_ref or "https://gerrit.o-ran-sc.org" in schema_ref: + stnd_define_event_validation(yaml_schema, decoded_body) + else : + logger.error("schema reference {0} not supported.".format(schema_ref)) + raise Exception("schema reference {0} not supported.".format(schema_ref)) logger.info('Event is valid!') logger.info('Valid body decoded & checked against schema OK:\n' @@ -244,8 +259,60 @@ def listener(environ, start_response, schema): except Exception as e: logger.error('Event invalid for unexpected reason! {0}'.format(e)) +# -------------------------------------------------------------------------- +# check yaml schema file exists or not +# -------------------------------------------------------------------------- +def check_schema_file_exist(vel_schema_path, schema_ref): + logger.debug('in check yaml file') + assert (vel_schema_path != ""), "Value of property 'schema_file' is missing in config file" + # Fetching file and folder name from url + schema_ref = schema_ref.split('#')[0] + name_list = schema_ref.split('/') + folder_name = '_'.join(name_list[2:-1]) + file_name = name_list[-1] + updated_vel_schema_path = vel_schema_path +'/{0}/{1}'.format(folder_name,file_name) + if "https://forge.3gpp.org" in schema_ref: + schema_ref = schema_ref.replace("blob","raw") + schema_ref = schema_ref + '?inline=false' + if not os.path.exists(updated_vel_schema_path): + logger.warning('Event Listener Schema File ({0}) not found. ''No validation will be undertaken.'.format(vel_schema_path)) + logger.info('Start downloading yaml file :{}'.format(schema_ref)) + result = os.system('curl -JOL "{0}"'.format(schema_ref)) + logger.debug("result {0}".format(result)) + assert(result == 0), "Invalid URL {0}".format(schema_ref) + logger.info("Download Completed") + with open(file_name, "r") as file: + first_line = file.readline() + # checking downloaded file content is yaml or html + if first_line.strip() == "": + logger.info("Downloaded file is not valid yaml") + os.system("del {0} ".format(file_name)) + logger.info("Downloaded file deleted") + assert(first_line.strip() != ""), "Invalid Schema File" + else: + # Create a folder from source url + os.system('mkdir {0}/{1}'.format(vel_schema_path, folder_name)) + # move downloaded file in above created folder + os.system("mv {0} {1}/{2}".format(file_name, vel_schema_path, folder_name)) + return updated_vel_schema_path # -------------------------------------------------------------------------- +# Second level of validation for stnd define message +# -------------------------------------------------------------------------- +def stnd_define_event_validation(schema_path , body): + logger.debug('in second level validation ') + schema_ref = body['event']['stndDefinedFields']["schemaReference"] + schema_path = check_schema_file_exist(schema_path , schema_ref) + logger.debug('end check yaml path ') + schema = yaml.full_load(open(schema_path, 'r')) + schema_name= schema_ref.split('/')[-1] + updated_schema = dict(schema["components"]["schemas"][schema_name], **schema) + decoded_body = body['event']['stndDefinedFields']['data'] + validate_yaml = jsonschema.validate(decoded_body,updated_schema) + assert(validate_yaml is None), "Invalid event!" + logger.info('standard defined event validated sucessfully ') + return validate_yaml +# -------------------------------------------------------------------------- # Save event data in Kafka # -------------------------------------------------------------------------- def save_event_in_kafka(body): @@ -496,6 +563,9 @@ USAGE test_control_schema_file = config.get(config_section, 'test_control_schema_file', vars=overrides) + vel_yaml_schema = config.get(config_section, + 'yaml_schema_path', + vars=overrides) # ---------------------------------------------------------------------- # Finally we have enough info to start a proper flow trace. @@ -625,7 +695,7 @@ USAGE global get_info get_info = root_url dispatcher = PathDispatcher() - vendor_event_listener = partial(listener, schema=vel_schema) + vendor_event_listener = partial(listener, schema=vel_schema, yaml_schema=vel_yaml_schema) dispatcher.register('GET', root_url, vendor_event_listener) dispatcher.register('POST', root_url, vendor_event_listener) vendor_throttle_listener = partial(listener, schema=throttle_schema) diff --git a/collector/evel-test-collector/config/collector.conf b/collector/evel-test-collector/config/collector.conf index c5dbe21..ecea678 100755 --- a/collector/evel-test-collector/config/collector.conf +++ b/collector/evel-test-collector/config/collector.conf @@ -19,6 +19,7 @@ schema_file = evel-test-collector/docs/att_interface_definition/CommonEventForma base_schema_file = evel-test-collector/docs/att_interface_definition/base_schema.json throttle_schema_file = evel-test-collector/docs/att_interface_definition/throttle_schema.json test_control_schema_file = evel-test-collector/docs/att_interface_definition/test_control_schema.json +yaml_schema_path = evel-test-collector/docs/schema #------------------------------------------------------------------------------ # Details of the Vendor Event Listener REST service. @@ -57,6 +58,7 @@ schema_file = ../../docs/att_interface_definition/event_format_updated.json base_schema_file = throttle_schema_file = ../../docs/att_interface_definition/throttle_schema.json test_control_schema_file = ../../docs/att_interface_definition/test_control_schema.json +yaml_schema_path = ../../docs/schema #------------------------------------------------------------------------------ # Details of the Vendor Event Listener REST service. diff --git a/collector/evel-test-collector/docs/schema/README.md b/collector/evel-test-collector/docs/schema/README.md new file mode 100644 index 0000000..d73e2a0 --- /dev/null +++ b/collector/evel-test-collector/docs/schema/README.md @@ -0,0 +1 @@ +NOTE: This folder contains yaml schema folder \ No newline at end of file diff --git a/influxdb-connector/influxdb-connector/code/influxdb_connector.py b/influxdb-connector/influxdb-connector/code/influxdb_connector.py index 0cf19e3..5f06228 100644 --- a/influxdb-connector/influxdb-connector/code/influxdb_connector.py +++ b/influxdb-connector/influxdb-connector/code/influxdb_connector.py @@ -199,6 +199,52 @@ def process_measurement_events(domain, jobj, pdata, nonstringpdata, eventId, sta send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp)) +def process_stndDefinedFields_events(values, domain, eventId, startEpochMicrosec, lastEpochMicrosec): + """ + Format stndDefined event to store in influx db + Values(dict) :- data to store in influxdb, + domain(str) :- name of topic , + eventId (str) :- event id, + startEpochMicrosec :- Timestamp , + lastEpochMicrosec:- Timestamp + """ + pdata = domain + ",eventId={},system={}".format(eventId, source) + nonstringpdata = " startEpochMicrosec={},lastEpochMicrosec={},".format(startEpochMicrosec, lastEpochMicrosec) + for key, val in values.items(): + if isinstance(val, str): + pdata = pdata + ',{}={}'.format(key, process_special_char(val)) + elif isinstance(val, dict): + for key2, val2 in val.items(): + if isinstance(val2, str) and val2 != '': + pdata = pdata + ',{}={}'.format(key2, process_special_char(val2)) + elif isinstance(val2, dict) and key2 != 'additionalInformation' : + for key3, val3 in val2.items(): + if isinstance(val3, str) and val3 != '': + pdata = pdata + ',{}={}'.format(key3, process_special_char(val3)) + elif val3 !='': + nonstringpdata = nonstringpdata + '{}={},'.format(key3, val3) + elif key2 == 'additionalInformation': + for key3, val3 in val2.items(): + if isinstance(val3, str) and val3 != '' : + pdata = pdata + ',{}={}'.format('additionalInformation_'+key3, process_special_char(val3)) + elif val3 !='': + nonstringpdata = nonstringpdata + '{}={},'.format(key3, val3) + elif key2 == 'correlatedNotifications': + for item in val2: + for key4, val4 in item.items(): + if isinstance(val4, str) and val4 !='': + pdata = pdata + ',{}={}'.format(key4, process_special_char(val4)) + elif val4 !='': + nonstringpdata = nonstringpdata + '{}={},'.format(key4, val4) + + elif val2 !='': + nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2) + elif val !='': + nonstringpdata = nonstringpdata + '{}={},'.format(key, val) + + send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp)) + + def process_special_char(str): for search_char, replace_char in {" ": "\ ", ",": "\,"}.items(): @@ -290,6 +336,14 @@ def save_event_in_db(body): jobj['event']['commonEventHeader']['startEpochMicrosec'], jobj['event']['commonEventHeader']['lastEpochMicrosec']) + if "stndDefinedFields" in jobj['event']: + logger.debug('Found stndDefinedFields') + process_stndDefinedFields_events(jobj['event']['stndDefinedFields'], + domain, + jobj['event']['commonEventHeader']['eventId'], + jobj['event']['commonEventHeader']['startEpochMicrosec'], + jobj['event']['commonEventHeader']['lastEpochMicrosec']) + def main(): @@ -398,7 +452,7 @@ def main(): c = Consumer(settings) c.subscribe(['measurement', 'pnfregistration', - 'fault', 'thresholdcrossingalert', 'heartbeat']) + 'fault', 'thresholdcrossingalert', 'heartbeat', 'stnddefined']) try: while True: @@ -412,7 +466,7 @@ def main(): try: save_event_in_db(msg.value()) except Exception as e: - logger.error('Exception occured while saving data : '.format(e)) + logger.error('Exception occured while saving data : {} '.format(e)) elif msg.error().code() == KafkaError._PARTITION_EOF: logger.error('End of partition reached {0}/{1}' .format(msg.topic(), msg.partition()))