Implement Standard Defined Validator 83/8983/6
author sahilkoli <sahil.koli@xoriant.com>
Tue, 30 Aug 2022 07:55:57 +0000 (13:25 +0530)
committer sahil-xoriant <sahil.koli@xoriant.com>
Fri, 7 Oct 2022 12:10:56 +0000 (17:40 +0530)
SMO-76

Signed-off-by: sahilkoli <sahil.koli@xoriant.com>
Change-Id: I51d7fb9e0da93b9f611371607aaa4d61bae04717
Signed-off-by: sahil-xoriant <sahil.koli@xoriant.com>
collector/Dockerfile
collector/evel-test-collector/code/collector/monitor.py
collector/evel-test-collector/config/collector.conf
collector/evel-test-collector/docs/schema/README.md [new file with mode: 0644]
influxdb-connector/influxdb-connector/code/influxdb_connector.py

diff --git a/collector/Dockerfile b/collector/Dockerfile
index 4932f55..f67e224 100755 (executable)
@@ -25,8 +25,7 @@ RUN apt-get install -y git curl python3 python3-pip
 
 COPY pip.conf /etc/pip.conf
 
-RUN pip3 install requests jsonschema kafka-python gevent
-
+RUN pip3 install requests jsonschema kafka-python gevent PyYAML
 
 RUN mkdir -p /opt/smo/certs
 
diff --git a/collector/evel-test-collector/code/collector/monitor.py b/collector/evel-test-collector/code/collector/monitor.py
index 9daf78c..7cab45f 100755 (executable)
@@ -39,6 +39,8 @@ from json import dumps
 import datetime
 import time
 from gevent import pywsgi
+import yaml
+import requests
 
 monitor_mode = "f"
 vdu_id = ['', '', '', '', '', '']
@@ -77,6 +79,11 @@ vel_password = ''
 # ------------------------------------------------------------------------------
 vel_schema = None
 
+# ------------------------------------------------------------------------------
+# Path to the YAML schemas used to validate stndDefined events.
+# ------------------------------------------------------------------------------
+vel_yaml_schema = None
+
 # ------------------------------------------------------------------------------
 # The JSON schema which we will use to validate client throttle state.
 # ------------------------------------------------------------------------------
@@ -100,7 +107,7 @@ logger = None
 producer = None
 
 
-def listener(environ, start_response, schema):
+def listener(environ, start_response, schema, yaml_schema=None):
     '''
     Handler for the Vendor Event Listener REST API.
 
@@ -149,6 +156,14 @@ def listener(environ, start_response, schema):
             decoded_body = json.loads(body)
             validate = jsonschema.validate(decoded_body, schema)
             assert (validate is None), "Invalid event!"
+            if 'stndDefinedFields' in decoded_body['event'].keys():
+                logger.debug('Performing second-level validation of stndDefined event')
+                schema_ref = decoded_body['event']['stndDefinedFields']["schemaReference"]
+                if "https://forge.3gpp.org" in schema_ref or "https://gerrit.o-ran-sc.org" in schema_ref:
+                    stnd_define_event_validation(yaml_schema, decoded_body)
+                else:
+                    logger.error("schema reference {0} not supported.".format(schema_ref))
+                    raise Exception("schema reference {0} not supported.".format(schema_ref))
 
             logger.info('Event is valid!')
             logger.info('Valid body decoded & checked against schema OK:\n'
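
Note: a minimal sketch of a stndDefined event that would exercise the new validation branch above; the endpoint, port, credentials and payload fields are illustrative only, not taken from this change:

    import requests

    event = {
        "event": {
            "commonEventHeader": {
                "domain": "stndDefined",
                "eventId": "stndDefined-0000000001",
                "startEpochMicrosec": 1661846400000000,
                "lastEpochMicrosec": 1661846400000000
            },
            "stndDefinedFields": {
                # Only forge.3gpp.org and gerrit.o-ran-sc.org references pass the check above
                "schemaReference": "https://forge.3gpp.org/rep/sa5/MnS/blob/Rel-16/OpenAPI/"
                                   "faultMnS.yaml#components/schemas/NotifyNewAlarm",
                "data": {"notificationType": "notifyNewAlarm"},
                "stndDefinedFieldsVersion": "1.0"
            }
        }
    }
    # Hypothetical collector endpoint; host, port and auth depend on the deployment
    resp = requests.post("http://localhost:9999/eventListener/v7",
                         json=event, auth=("user", "password"))
    print(resp.status_code)
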
@@ -244,8 +259,60 @@ def listener(environ, start_response, schema):
         except Exception as e:
             logger.error('Event invalid for unexpected reason! {0}'.format(e))
 
+# --------------------------------------------------------------------------
+# Check whether the YAML schema file exists locally; download it if missing
+# --------------------------------------------------------------------------
+def check_schema_file_exist(vel_schema_path, schema_ref):
+    logger.debug('Checking for local copy of YAML schema file')
+    assert (vel_schema_path != ""), "Value of property 'yaml_schema_path' is missing in config file"
+    # Derive the cache folder and file name from the schema reference URL
+    schema_ref = schema_ref.split('#')[0]
+    name_list = schema_ref.split('/')
+    folder_name = '_'.join(name_list[2:-1])
+    file_name = name_list[-1]
+    updated_vel_schema_path = vel_schema_path + '/{0}/{1}'.format(folder_name, file_name)
+    if "https://forge.3gpp.org" in schema_ref:
+        schema_ref = schema_ref.replace("blob", "raw")
+        schema_ref = schema_ref + '?inline=false'
+    if not os.path.exists(updated_vel_schema_path):
+        logger.warning('Event Listener Schema File ({0}) not found. '
+                       'Downloading it now.'.format(updated_vel_schema_path))
+        logger.info('Start downloading YAML schema file: {}'.format(schema_ref))
+        result = os.system('curl -JOL "{0}"'.format(schema_ref))
+        logger.debug("result {0}".format(result))
+        assert (result == 0), "Invalid URL {0}".format(schema_ref)
+        logger.info("Download completed")
+        with open(file_name, "r") as file:
+            first_line = file.readline()
+        # Check whether the downloaded content is YAML or an HTML error page
+        if first_line.strip() == "<!DOCTYPE html>":
+            logger.info("Downloaded file is not valid YAML")
+            os.system("rm {0}".format(file_name))
+            logger.info("Downloaded file deleted")
+            assert (first_line.strip() != "<!DOCTYPE html>"), "Invalid Schema File"
+        else:
+            # Create a cache folder named after the source URL
+            os.system('mkdir -p {0}/{1}'.format(vel_schema_path, folder_name))
+            # Move the downloaded file into the folder created above
+            os.system("mv {0} {1}/{2}".format(file_name, vel_schema_path, folder_name))
+    return updated_vel_schema_path
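
Note: the cache layout produced above is derived purely from the schema reference URL, so each source URL gets its own folder under yaml_schema_path. A standalone sketch of the mapping, with an illustrative URL:

    ref = ("https://forge.3gpp.org/rep/sa5/MnS/blob/Rel-16/OpenAPI/"
           "faultMnS.yaml#components/schemas/NotifyNewAlarm")
    base = ref.split('#')[0]             # drop the fragment
    parts = base.split('/')
    folder_name = '_'.join(parts[2:-1])  # forge.3gpp.org_rep_sa5_MnS_blob_Rel-16_OpenAPI
    file_name = parts[-1]                # faultMnS.yaml
    # Cached at <yaml_schema_path>/<folder_name>/<file_name>
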
 
 # --------------------------------------------------------------------------
+# Second-level validation for stndDefined events
+# --------------------------------------------------------------------------
+def stnd_define_event_validation(schema_path, body):
+    logger.debug('In second-level validation')
+    schema_ref = body['event']['stndDefinedFields']["schemaReference"]
+    schema_path = check_schema_file_exist(schema_path, schema_ref)
+    logger.debug('YAML schema path resolved')
+    with open(schema_path, 'r') as schema_file:
+        schema = yaml.full_load(schema_file)
+    schema_name = schema_ref.split('/')[-1]
+    updated_schema = dict(schema["components"]["schemas"][schema_name], **schema)
+    decoded_body = body['event']['stndDefinedFields']['data']
+    validate_yaml = jsonschema.validate(decoded_body, updated_schema)
+    assert (validate_yaml is None), "Invalid event!"
+    logger.info('Standard defined event validated successfully')
+    return validate_yaml
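
Note: the dict(...) merge above promotes the referenced component schema to the top level while keeping the whole OpenAPI document beside it, so internal "#/components/schemas/..." $refs still resolve during validation. A standalone sketch, with the file and schema names illustrative:

    import yaml
    import jsonschema

    with open("faultMnS.yaml", "r") as f:  # hypothetical downloaded schema file
        doc = yaml.full_load(f)
    target = doc["components"]["schemas"]["NotifyNewAlarm"]
    merged = dict(target, **doc)           # component keys plus full doc for $ref resolution
    # Raises jsonschema.ValidationError if the payload does not conform
    jsonschema.validate({"notificationType": "notifyNewAlarm"}, merged)
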
+# --------------------------------------------------------------------------
 # Save event data in Kafka
 # --------------------------------------------------------------------------
 def save_event_in_kafka(body):
@@ -496,6 +563,9 @@ USAGE
         test_control_schema_file = config.get(config_section,
                                              'test_control_schema_file',
                                               vars=overrides)
+        vel_yaml_schema = config.get(config_section,
+                                     'yaml_schema_path',
+                                     vars=overrides)
 
         # ----------------------------------------------------------------------
         # Finally we have enough info to start a proper flow trace.
@@ -625,7 +695,7 @@ USAGE
         global get_info
         get_info = root_url
         dispatcher = PathDispatcher()
-        vendor_event_listener = partial(listener, schema=vel_schema)
+        vendor_event_listener = partial(listener, schema=vel_schema, yaml_schema=vel_yaml_schema)
         dispatcher.register('GET', root_url, vendor_event_listener)
         dispatcher.register('POST', root_url, vendor_event_listener)
         vendor_throttle_listener = partial(listener, schema=throttle_schema)
diff --git a/collector/evel-test-collector/config/collector.conf b/collector/evel-test-collector/config/collector.conf
index c5dbe21..ecea678 100755 (executable)
@@ -19,6 +19,7 @@ schema_file = evel-test-collector/docs/att_interface_definition/CommonEventForma
 base_schema_file = evel-test-collector/docs/att_interface_definition/base_schema.json
 throttle_schema_file = evel-test-collector/docs/att_interface_definition/throttle_schema.json
 test_control_schema_file = evel-test-collector/docs/att_interface_definition/test_control_schema.json
+yaml_schema_path = evel-test-collector/docs/schema
 
 #------------------------------------------------------------------------------
 # Details of the Vendor Event Listener REST service.
@@ -57,6 +58,7 @@ schema_file = ../../docs/att_interface_definition/event_format_updated.json
 base_schema_file =
 throttle_schema_file = ../../docs/att_interface_definition/throttle_schema.json
 test_control_schema_file = ../../docs/att_interface_definition/test_control_schema.json
+yaml_schema_path = ../../docs/schema
 
 #------------------------------------------------------------------------------
 # Details of the Vendor Event Listener REST service.
diff --git a/collector/evel-test-collector/docs/schema/README.md b/collector/evel-test-collector/docs/schema/README.md
new file mode 100644 (file)
index 0000000..d73e2a0
--- /dev/null
@@ -0,0 +1 @@
+NOTE: This folder contains the YAML schema folders downloaded by the collector.
\ No newline at end of file
diff --git a/influxdb-connector/influxdb-connector/code/influxdb_connector.py b/influxdb-connector/influxdb-connector/code/influxdb_connector.py
index 0cf19e3..5f06228 100644 (file)
@@ -199,6 +199,52 @@ def process_measurement_events(domain, jobj, pdata, nonstringpdata, eventId, sta
 
     send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
 
+def process_stndDefinedFields_events(values, domain, eventId, startEpochMicrosec, lastEpochMicrosec):
+    """
+    Format a stndDefined event for storage in InfluxDB.
+
+    values (dict): data to store in InfluxDB
+    domain (str): name of the topic
+    eventId (str): event id
+    startEpochMicrosec: start timestamp
+    lastEpochMicrosec: last timestamp
+    """
+    pdata = domain + ",eventId={},system={}".format(eventId, source)
+    nonstringpdata = " startEpochMicrosec={},lastEpochMicrosec={},".format(startEpochMicrosec, lastEpochMicrosec)
+    for key, val in values.items():
+        if isinstance(val, str):
+            pdata = pdata + ',{}={}'.format(key, process_special_char(val))
+        elif isinstance(val, dict):
+            for key2, val2 in val.items():
+                if isinstance(val2, str) and val2 != '':
+                    pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
+                elif isinstance(val2, dict) and key2 != 'additionalInformation':
+                    for key3, val3 in val2.items():
+                        if isinstance(val3, str) and val3 != '':
+                            pdata = pdata + ',{}={}'.format(key3, process_special_char(val3))
+                        elif val3 != '':
+                            nonstringpdata = nonstringpdata + '{}={},'.format(key3, val3)
+                elif key2 == 'additionalInformation':
+                    for key3, val3 in val2.items():
+                        if isinstance(val3, str) and val3 != '':
+                            pdata = pdata + ',{}={}'.format('additionalInformation_' + key3, process_special_char(val3))
+                        elif val3 != '':
+                            nonstringpdata = nonstringpdata + '{}={},'.format(key3, val3)
+                elif key2 == 'correlatedNotifications':
+                    for item in val2:
+                        for key4, val4 in item.items():
+                            if isinstance(val4, str) and val4 != '':
+                                pdata = pdata + ',{}={}'.format(key4, process_special_char(val4))
+                            elif val4 != '':
+                                nonstringpdata = nonstringpdata + '{}={},'.format(key4, val4)
+                elif val2 != '':
+                    nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
+        elif val != '':
+            nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
+
+    send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
+
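
Note: for a small stndDefined payload, the function above emits a line-protocol string roughly like the commented one below; string values land in the tag section, numeric values become fields, and `source` and `eventTimestamp` are globals set elsewhere in the connector. All values are illustrative:

    values = {
        "schemaReference": "https://forge.3gpp.org/rep/sa5/MnS/raw/Rel-16/OpenAPI/faultMnS.yaml",
        "data": {"notificationType": "notifyNewAlarm", "perceivedSeverity": "CRITICAL"},
        "stndDefinedFieldsVersion": "1.0"
    }
    # process_stndDefinedFields_events(values, "stnddefined", "ev-1",
    #                                  1661846400000000, 1661846400000001)
    # would send roughly:
    #   stnddefined,eventId=ev-1,system=<source>,schemaReference=<escaped URL>,
    #   notificationType=notifyNewAlarm,perceivedSeverity=CRITICAL,
    #   stndDefinedFieldsVersion=1.0 startEpochMicrosec=1661846400000000,
    #   lastEpochMicrosec=1661846400000001 <timestamp>
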
 
 def process_special_char(str):
     for search_char, replace_char in {" ": "\ ", ",": "\,"}.items():
@@ -290,6 +336,14 @@ def save_event_in_db(body):
                                    jobj['event']['commonEventHeader']['startEpochMicrosec'],
                                    jobj['event']['commonEventHeader']['lastEpochMicrosec'])
 
+    if "stndDefinedFields" in jobj['event']:
+        logger.debug('Found stndDefinedFields')
+        process_stndDefinedFields_events(jobj['event']['stndDefinedFields'],
+                                   domain,
+                                   jobj['event']['commonEventHeader']['eventId'],
+                                   jobj['event']['commonEventHeader']['startEpochMicrosec'],
+                                   jobj['event']['commonEventHeader']['lastEpochMicrosec'])
+
 
 def main():
 
@@ -398,7 +452,7 @@ def main():
     c = Consumer(settings)
 
     c.subscribe(['measurement', 'pnfregistration',
-    'fault', 'thresholdcrossingalert', 'heartbeat'])
+    'fault', 'thresholdcrossingalert', 'heartbeat', 'stnddefined'])
 
     try:
         while True:
@@ -412,7 +466,7 @@ def main():
                 try:
                     save_event_in_db(msg.value())
                 except Exception as e:
-                    logger.error('Exception occured while saving data : '.format(e))
+                    logger.error('Exception occurred while saving data: {}'.format(e))
             elif msg.error().code() == KafkaError._PARTITION_EOF:
                 logger.error('End of partition reached {0}/{1}'
                       .format(msg.topic(), msg.partition()))