Fix major bugs, vulnerabilities, and security issues per Sonar job report 58/9058/2
author sahilkoli <sahil.koli@xoriant.com>
Wed, 21 Sep 2022 09:59:35 +0000 (15:29 +0530)
committer sahil-xoriant <sahil.koli@xoriant.com>
Tue, 11 Oct 2022 05:02:07 +0000 (10:32 +0530)
SMO-94

Signed-off-by: sahilkoli <sahil.koli@xoriant.com>
Change-Id: Ia4d119abb20ca0085d3bf4c1baa288d02193ea14
Signed-off-by: sahil-xoriant <sahil.koli@xoriant.com>
18 files changed:
collector/Dockerfile
collector/evel-test-collector/code/collector/monitor.py
collector/evel-test-collector/config/collector.conf
collector/evel-test-collector/docs/schema/README.md [new file with mode: 0644]
collector/pip.conf [new file with mode: 0644]
dmaapadapter/Dockerfile
dmaapadapter/adapter/code/dmaap_adapter.py
dmaapadapter/pip.conf [new file with mode: 0644]
functionaltest/test_negative_testsuit.py
functionaltest/test_positive_testsuit.py
influxdb-connector/Dockerfile
influxdb-connector/influxdb-connector/code/influxdb_connector.py
influxdb-connector/pip.conf [new file with mode: 0644]
requirements.txt [new file with mode: 0644]
tests/collector/test_monitor.py
tests/influxdb_connector/events.txt
tests/influxdb_connector/test_influxdb_connector.py
tests/influxdb_connector/test_influxdb_events.py

index 74c65ab..f67e224 100755 (executable)
@@ -22,8 +22,10 @@ FROM ubuntu:focal
 
 RUN apt-get update && apt-get -y upgrade
 RUN apt-get install -y git curl python3 python3-pip
-RUN pip3 install requests jsonschema kafka-python gevent
 
+COPY pip.conf /etc/pip.conf
+
+RUN pip3 install requests jsonschema kafka-python gevent PyYAML
 
 RUN mkdir -p /opt/smo/certs
 
index 9daf78c..7cab45f 100755 (executable)
@@ -39,6 +39,8 @@ from json import dumps
 import datetime
 import time
 from gevent import pywsgi
+import yaml
+import requests
 
 monitor_mode = "f"
 vdu_id = ['', '', '', '', '', '']
@@ -77,6 +79,11 @@ vel_password = ''
 # ------------------------------------------------------------------------------
 vel_schema = None
 
+# ------------------------------------------------------------------------------
+# The yaml schema which we will use to validate events.
+# ------------------------------------------------------------------------------
+vel_yaml_schema = None
+
 # ------------------------------------------------------------------------------
 # The JSON schema which we will use to validate client throttle state.
 # ------------------------------------------------------------------------------
@@ -100,7 +107,7 @@ logger = None
 producer = None
 
 
-def listener(environ, start_response, schema):
+def listener(environ, start_response, schema, yaml_schema=None):
     '''
     Handler for the Vendor Event Listener REST API.
 
@@ -149,6 +156,14 @@ def listener(environ, start_response, schema):
             decoded_body = json.loads(body)
             validate = jsonschema.validate(decoded_body, schema)
             assert (validate is None), "Invalid event!"
+            if 'stndDefinedFields' in decoded_body['event']:
+                logger.debug('in yaml validation')
+                schema_ref = decoded_body['event']['stndDefinedFields']["schemaReference"]
+                if "https://forge.3gpp.org" in schema_ref or "https://gerrit.o-ran-sc.org" in schema_ref:
+                    stnd_define_event_validation(yaml_schema, decoded_body)
+                else:
+                    logger.error("schema reference {0} not supported.".format(schema_ref))
+                    raise Exception("schema reference {0} not supported.".format(schema_ref))
 
             logger.info('Event is valid!')
             logger.info('Valid body decoded & checked against schema OK:\n'
@@ -244,8 +259,60 @@ def listener(environ, start_response, schema):
         except Exception as e:
             logger.error('Event invalid for unexpected reason! {0}'.format(e))
 
+# --------------------------------------------------------------------------
+# Check whether the yaml schema file already exists locally
+# --------------------------------------------------------------------------
+def check_schema_file_exist(vel_schema_path, schema_ref):
+    logger.debug('in check yaml file')
+    assert vel_schema_path != "", "Value of property 'schema_file' is missing in config file"
+    # Derive the folder and file name from the schema reference url
+    schema_ref = schema_ref.split('#')[0]
+    name_list = schema_ref.split('/')
+    folder_name = '_'.join(name_list[2:-1])
+    file_name = name_list[-1]
+    updated_vel_schema_path = vel_schema_path + '/{0}/{1}'.format(folder_name, file_name)
+    if "https://forge.3gpp.org" in schema_ref:
+        schema_ref = schema_ref.replace("blob", "raw")
+        schema_ref = schema_ref + '?inline=false'
+    if not os.path.exists(updated_vel_schema_path):
+        logger.warning('Event Listener Schema File ({0}) not found; '
+                       'attempting to download it.'.format(updated_vel_schema_path))
+        logger.info('Start downloading yaml file: {}'.format(schema_ref))
+        result = os.system('curl -JOL "{0}"'.format(schema_ref))
+        logger.debug("result {0}".format(result))
+        assert result == 0, "Invalid URL {0}".format(schema_ref)
+        logger.info("Download completed")
+        with open(file_name, "r") as file:
+            first_line = file.readline()
+        # Check whether the downloaded content is yaml or an HTML error page
+        if first_line.strip() == "<!DOCTYPE html>":
+            logger.info("Downloaded file is not valid yaml")
+            os.system("rm -f {0}".format(file_name))
+            logger.info("Downloaded file deleted")
+            assert first_line.strip() != "<!DOCTYPE html>", "Invalid Schema File"
+        else:
+            # Create a folder derived from the source url
+            os.system('mkdir -p {0}/{1}'.format(vel_schema_path, folder_name))
+            # Move the downloaded file into the folder created above
+            os.system("mv {0} {1}/{2}".format(file_name, vel_schema_path, folder_name))
+    return updated_vel_schema_path
 
 # --------------------------------------------------------------------------
+# Second level of validation for stndDefined messages
+# --------------------------------------------------------------------------
+def stnd_define_event_validation(schema_path, body):
+    logger.debug('in second level validation')
+    schema_ref = body['event']['stndDefinedFields']["schemaReference"]
+    schema_path = check_schema_file_exist(schema_path, schema_ref)
+    logger.debug('end check yaml path')
+    with open(schema_path, 'r') as schema_file:
+        schema = yaml.full_load(schema_file)
+    schema_name = schema_ref.split('/')[-1]
+    updated_schema = dict(schema["components"]["schemas"][schema_name], **schema)
+    decoded_body = body['event']['stndDefinedFields']['data']
+    validate_yaml = jsonschema.validate(decoded_body, updated_schema)
+    assert validate_yaml is None, "Invalid event!"
+    logger.info('standard defined event validated successfully')
+    return validate_yaml
+# --------------------------------------------------------------------------
 # Save event data in Kafka
 # --------------------------------------------------------------------------
 def save_event_in_kafka(body):
@@ -496,6 +563,9 @@ USAGE
         test_control_schema_file = config.get(config_section,
                                              'test_control_schema_file',
                                               vars=overrides)
+        vel_yaml_schema = config.get(config_section,
+                                     'yaml_schema_path',
+                                     vars=overrides)
 
         # ----------------------------------------------------------------------
         # Finally we have enough info to start a proper flow trace.
@@ -625,7 +695,7 @@ USAGE
         global get_info
         get_info = root_url
         dispatcher = PathDispatcher()
-        vendor_event_listener = partial(listener, schema=vel_schema)
+        vendor_event_listener = partial(listener, schema=vel_schema, yaml_schema=vel_yaml_schema)
         dispatcher.register('GET', root_url, vendor_event_listener)
         dispatcher.register('POST', root_url, vendor_event_listener)
         vendor_throttle_listener = partial(listener, schema=throttle_schema)
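
In outline, the new stndDefined path above resolves the event's schemaReference to a locally cached yaml file (downloading it on first use), merges the referenced component schema into the full document, and validates stndDefinedFields.data against the result. A condensed sketch of the validation step, assuming the local schema file has already been resolved (hypothetical helper, not the full download logic):

    import yaml
    import jsonschema

    def validate_stnd_defined(event, local_schema_path):
        schema_ref = event['event']['stndDefinedFields']['schemaReference']
        with open(local_schema_path, 'r') as f:
            schema = yaml.full_load(f)
        # Last '/'-segment of the reference, i.e. the component schema name
        # from the URL fragment (e.g. ...#components/schemas/NotifyNewAlarm).
        schema_name = schema_ref.split('/')[-1]
        merged = dict(schema['components']['schemas'][schema_name], **schema)
        # Raises jsonschema.exceptions.ValidationError on an invalid payload.
        jsonschema.validate(event['event']['stndDefinedFields']['data'], merged)
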
index c5dbe21..ecea678 100755 (executable)
@@ -19,6 +19,7 @@ schema_file = evel-test-collector/docs/att_interface_definition/CommonEventForma
 base_schema_file = evel-test-collector/docs/att_interface_definition/base_schema.json
 throttle_schema_file = evel-test-collector/docs/att_interface_definition/throttle_schema.json
 test_control_schema_file = evel-test-collector/docs/att_interface_definition/test_control_schema.json
+yaml_schema_path = evel-test-collector/docs/schema
 
 #------------------------------------------------------------------------------
 # Details of the Vendor Event Listener REST service.
@@ -57,6 +58,7 @@ schema_file = ../../docs/att_interface_definition/event_format_updated.json
 base_schema_file =
 throttle_schema_file = ../../docs/att_interface_definition/throttle_schema.json
 test_control_schema_file = ../../docs/att_interface_definition/test_control_schema.json
+yaml_schema_path = ../../docs/schema
 
 #------------------------------------------------------------------------------
 # Details of the Vendor Event Listener REST service.
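
For reference, a minimal sketch of how the collector reads the new yaml_schema_path key with configparser (the section name 'default' and file path are assumptions here; monitor.py takes both from its command line and overrides):

    import configparser

    config = configparser.ConfigParser()
    config.read('collector.conf')   # config file path assumed
    yaml_schema_path = config.get('default', 'yaml_schema_path')
    # e.g. 'evel-test-collector/docs/schema'
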
diff --git a/collector/evel-test-collector/docs/schema/README.md b/collector/evel-test-collector/docs/schema/README.md
new file mode 100644 (file)
index 0000000..d73e2a0
--- /dev/null
@@ -0,0 +1 @@
+NOTE: This folder contains the downloaded yaml schema folders.
\ No newline at end of file
diff --git a/collector/pip.conf b/collector/pip.conf
new file mode 100644 (file)
index 0000000..6581d0e
--- /dev/null
@@ -0,0 +1,3 @@
+[global]
+timeout = 60
+index-url = https://nexus3.o-ran-sc.org/repository/PyPi/simple
index 58a8caa..7b29115 100755 (executable)
 
 FROM ubuntu:focal
 
+
 RUN apt-get update && apt-get -y upgrade
-RUN apt-get install -y git curl python3 python3-pip
-RUN pip3 install requests jsonschema kafka-python flask confluent-kafka
+RUN apt-get install -y git curl python3 python3-pip
 
+COPY pip.conf /etc/pip.conf
+
+RUN pip3 install requests jsonschema kafka-python flask confluent-kafka
 
 RUN mkdir /opt/smo
 
index df8d2a4..f6c1b0d 100644 (file)
@@ -53,7 +53,7 @@ def listall_topics():
 
 @app.route(api_base_url + '/topics/<topic>', methods=['GET'])
 def topic_details(topic):
-    topic == request.view_args['topic']
+    assert topic == request.view_args['topic']
     prepareResponse = PrepareResponse()
     topicConsumer = TopicConsumer()
     topicConsumer.getTopicDetails(prepareResponse, topic)
@@ -65,9 +65,9 @@ def topic_details(topic):
 
 @app.route(api_base_url + '/events/<topic>/<consumergroup>/<consumerid>', methods=['GET'])
 def get_events(topic, consumergroup, consumerid):
-    topic == request.view_args['topic']
-    consumergroup == request.view_args['consumergroup']
-    consumerid == request.view_args['consumerid']
+    assert topic == request.view_args['topic']
+    assert consumergroup == request.view_args['consumergroup']
+    assert consumerid == request.view_args['consumerid']
     limit = ""
     timeout = ""
 
@@ -90,8 +90,7 @@ def getLimit(limit):
         limit = int(limit)
     except Exception:
         limit = -1
-    finally:
-        return limit
+    return limit
 
 
 def getTimeout(timeout):
@@ -101,8 +100,7 @@ def getTimeout(timeout):
             timeout = 15
     except Exception:
         timeout = 15
-    finally:
-        return timeout
+    return timeout
 
 
 if __name__ == '__main__':
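
Removing the return statements from the finally blocks addresses the Sonar rule that a return in finally silently discards both the try block's result and any in-flight exception. An illustrative sketch with hypothetical helpers:

    def to_int_masked(value):
        try:
            return int(value)
        finally:
            return -1           # always wins: masks results and exceptions

    def to_int_fixed(value):
        try:
            return int(value)
        except Exception:
            return -1           # only on failure; normal results pass through

    assert to_int_masked('7') == -1   # surprising behaviour Sonar flags
    assert to_int_fixed('7') == 7
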
diff --git a/dmaapadapter/pip.conf b/dmaapadapter/pip.conf
new file mode 100644 (file)
index 0000000..6581d0e
--- /dev/null
@@ -0,0 +1,3 @@
+[global]
+timeout = 60
+index-url = https://nexus3.o-ran-sc.org/repository/PyPi/simple
index 2e81cff..4ac35f4 100644 (file)
@@ -55,7 +55,6 @@ def test_invalid_domain():
                         "keepaliveDelay": "120",
                         "maxConnectionAttempts": "100",
                         "oamPort": "830",
-                        "password": "netconf",
                         "protocol": "SSH",
                         "reconnectOnChangedSchema": "false",
                         "sleep-factor": "1.5",
@@ -81,7 +80,6 @@ def test_invalid_domain():
     response = requests.post(collector_url + "/eventListener/v7/events",
                              json=payload,
                              auth=('user', 'password'),
-                             verify=False,
                              headers={"Content-Type": "application/json"})
     assert "400" in str(response)
     print("Success")
index d325728..febcec9 100644 (file)
@@ -64,7 +64,6 @@ def test_generate_pnfRegistration_event():
                     "keepaliveDelay": "120",
                     "maxConnectionAttempts": "100",
                     "oamPort": "830",
-                    "password": "netconf",
                     "protocol": "SSH",
                     "reconnectOnChangedSchema": "false",
                     "sleep-factor": "1.5",
@@ -90,7 +89,6 @@ def test_generate_pnfRegistration_event():
     response = requests.post(collector_url + "/eventListener/v7/events",
                              json=payload,
                              auth=('user', 'password'),
-                             verify=False,
                              headers={"Content-Type": "application/json"})
     assert "202" in str(response)
     print("Success")
@@ -140,7 +138,6 @@ def test_generate_heartBeat_event():
     response = requests.post(collector_url + "/eventListener/v7/events",
                              json=payload,
                              auth=('user', 'password'),
-                             verify=False,
                              headers={"Content-Type": "application/json"})
     assert "202" in str(response)
     print("Success")
@@ -198,7 +195,6 @@ def test_generate_fault_event():
     response = requests.post(collector_url + "/eventListener/v7/events",
                              json=payload,
                              auth=('user', 'password'),
-                             verify=False,
                              headers={"Content-Type": "application/json"})
     assert "202" in str(response)
     print("Success")
@@ -274,7 +270,6 @@ def test_generate_thresholdCrossingAlert_event():
     response = requests.post(collector_url + "/eventListener/v7/events",
                              json=payload,
                              auth=('user', 'password'),
-                             verify=False,
                              headers={"Content-Type": "application/json"})
     assert "202" in str(response)
     print("Success")
@@ -382,7 +377,6 @@ def test_generate_measurement_event():
     response = requests.post(collector_url + "/eventListener/v7/events",
                              json=payload,
                              auth=('user', 'password'),
-                             verify=False,
                              headers={"Content-Type": "application/json"})
     assert "202" in str(response)
     print("Success")
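
Dropping verify=False from these requests.post calls clears the Sonar security hotspot for disabled certificate validation; with the argument omitted, requests verifies TLS certificates by default. A minimal sketch (hypothetical URL):

    import requests

    # verify defaults to True, so the server certificate is checked
    response = requests.post('https://collector.example.org/eventListener/v7/events',
                             json={'event': {}},
                             auth=('user', 'password'),
                             headers={'Content-Type': 'application/json'})
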
index bd91696..ee60aa9 100755 (executable)
@@ -17,6 +17,9 @@ FROM ubuntu:focal
 
 RUN apt-get update && apt-get -y upgrade
 RUN apt-get install -y git curl python3 python3-pip
+
+COPY pip.conf /etc/pip.conf
+
 RUN pip3 install requests confluent-kafka
 
 # Clone influxdb-connector
index 0cf19e3..dd8b838 100644 (file)
@@ -42,6 +42,20 @@ def send_to_influxdb(event, pdata):
     except Exception as e:
         logger.error('Exception occured while saving data : '.format(e))
 
+def process_event(jobj, pdata, nonstringpdata):
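+    """
+    Split a flat event object into InfluxDB line-protocol parts:
+    non-empty string values are appended to pdata as escaped tags,
+    other non-empty values are appended to nonstringpdata, with one
+    level of 'additionalFields' flattened into the same output.
+    """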
+    for key, val in jobj.items():
+        if key != 'additionalFields' and val != "":
+            if isinstance(val, str):
+                pdata = pdata + ',{}={}'.format(key, process_special_char(val))
+            else:
+                nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
+        elif key == 'additionalFields':
+            for key2, val2 in val.items():
+                if val2 != "" and isinstance(val2, str):
+                    pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
+                elif val2 != "":
+                    nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
+    return pdata, nonstringpdata
 
 def process_additional_measurements(val, domain, eventId, startEpochMicrosec, lastEpochMicrosec):
     for additionalMeasurements in val:
@@ -76,19 +90,7 @@ def process_nonadditional_measurements(val, domain, eventId, startEpochMicrosec,
 
 def process_pnfRegistration_event(domain, jobj, pdata, nonstringpdata):
     pdata = pdata + ",system={}".format(source)
-    for key, val in jobj.items():
-        if key != 'additionalFields' and val != "":
-            if isinstance(val, str):
-                pdata = pdata + ',{}={}'.format(key, process_special_char(val))
-            else:
-                nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
-        elif key == 'additionalFields':
-            for key2, val2 in val.items():
-                if val2 != "" and isinstance(val2, str):
-                    pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
-                elif val2 != "":
-                    nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
-
+    pdata, nonstringpdata = process_event(jobj, pdata, nonstringpdata)
     send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
 
 
@@ -151,19 +153,7 @@ def process_fault_event(domain, jobj, pdata, nonstringpdata):
 
 def process_heartbeat_events(domain, jobj, pdata, nonstringpdata):
     pdata = pdata + ",system={}".format(source)
-    for key, val in jobj.items():
-        if key != 'additionalFields' and val != "":
-            if isinstance(val, str):
-                pdata = pdata + ',{}={}'.format(key, process_special_char(val))
-            else:
-                nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
-        elif key == 'additionalFields':
-            for key2, val2 in val.items():
-                if val2 != "" and isinstance(val2, str):
-                    pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
-                elif val2 != "":
-                    nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
-
+    pdata, nonstringpdata = process_event(jobj, pdata, nonstringpdata)
     send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
 
 
@@ -199,6 +189,52 @@ def process_measurement_events(domain, jobj, pdata, nonstringpdata, eventId, sta
 
     send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
 
+def process_stndDefinedFields_events(values, domain, eventId, startEpochMicrosec, lastEpochMicrosec):
+    """
+    Format a stndDefined event for storage in InfluxDB.
+    values (dict): data to store in InfluxDB
+    domain (str): topic name
+    eventId (str): event id
+    startEpochMicrosec: start timestamp
+    lastEpochMicrosec: last timestamp
+    """
+    pdata = domain + ",eventId={},system={}".format(eventId, source)
+    nonstringpdata = " startEpochMicrosec={},lastEpochMicrosec={},".format(startEpochMicrosec, lastEpochMicrosec)
+    for key, val in values.items():
+        if isinstance(val, str):
+            pdata = pdata + ',{}={}'.format(key, process_special_char(val))
+        elif isinstance(val, dict):
+            for key2, val2 in val.items():
+                if isinstance(val2, str) and val2 != '':
+                    pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
+                elif isinstance(val2, dict) and key2 != 'additionalInformation':
+                    for key3, val3 in val2.items():
+                        if isinstance(val3, str) and val3 != '':
+                            pdata = pdata + ',{}={}'.format(key3, process_special_char(val3))
+                        elif val3 != '':
+                            nonstringpdata = nonstringpdata + '{}={},'.format(key3, val3)
+                elif key2 == 'additionalInformation':
+                    for key3, val3 in val2.items():
+                        if isinstance(val3, str) and val3 != '':
+                            pdata = pdata + ',{}={}'.format('additionalInformation_' + key3, process_special_char(val3))
+                        elif val3 != '':
+                            nonstringpdata = nonstringpdata + '{}={},'.format(key3, val3)
+                elif key2 == 'correlatedNotifications':
+                    for item in val2:
+                        for key4, val4 in item.items():
+                            if isinstance(val4, str) and val4 != '':
+                                pdata = pdata + ',{}={}'.format(key4, process_special_char(val4))
+                            elif val4 != '':
+                                nonstringpdata = nonstringpdata + '{}={},'.format(key4, val4)
+                elif val2 != '':
+                    nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
+        elif val != '':
+            nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
+
+    send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
+
 
 def process_special_char(str):
     for search_char, replace_char in {" ": "\ ", ",": "\,"}.items():
@@ -290,6 +326,14 @@ def save_event_in_db(body):
                                    jobj['event']['commonEventHeader']['startEpochMicrosec'],
                                    jobj['event']['commonEventHeader']['lastEpochMicrosec'])
 
+    if "stndDefinedFields" in jobj['event']:
+        logger.debug('Found stndDefinedFields')
+        process_stndDefinedFields_events(jobj['event']['stndDefinedFields'],
+                                         domain,
+                                         jobj['event']['commonEventHeader']['eventId'],
+                                         jobj['event']['commonEventHeader']['startEpochMicrosec'],
+                                         jobj['event']['commonEventHeader']['lastEpochMicrosec'])
+
 
 def main():
 
@@ -398,7 +442,7 @@ def main():
     c = Consumer(settings)
 
     c.subscribe(['measurement', 'pnfregistration',
-    'fault', 'thresholdcrossingalert', 'heartbeat'])
+    'fault', 'thresholdcrossingalert', 'heartbeat', 'stnddefined'])
 
     try:
         while True:
@@ -412,7 +456,7 @@ def main():
                 try:
                     save_event_in_db(msg.value())
                 except Exception as e:
-                    logger.error('Exception occured while saving data : '.format(e))
+                    logger.error('Exception occurred while saving data: {}'.format(e))
             elif msg.error().code() == KafkaError._PARTITION_EOF:
                 logger.error('End of partition reached {0}/{1}'
                       .format(msg.topic(), msg.partition()))
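
For illustration, the tag/field split that the refactored process_event applies produces line-protocol fragments like the following (a standalone sketch with hypothetical values, mirroring the rule above):

    def split_fields(jobj, pdata, nonstringpdata):
        # Non-empty strings become escaped tag-style pairs on pdata;
        # other non-empty values accumulate as non-string field data.
        for key, val in jobj.items():
            if isinstance(val, str) and val != '':
                pdata += ',{}={}'.format(key, val.replace(' ', '\\ '))
            elif val != '':
                nonstringpdata += '{}={},'.format(key, val)
        return pdata, nonstringpdata

    pdata, nonstring = split_fields(
        {'alarmCondition': 'TCA', 'sequence': 0, 'eventSeverity': 'NORMAL'},
        'fault,system=ORAN-DEV', ' ')
    # pdata     -> 'fault,system=ORAN-DEV,alarmCondition=TCA,eventSeverity=NORMAL'
    # nonstring -> ' sequence=0,'  (trailing comma trimmed before the write)
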
diff --git a/influxdb-connector/pip.conf b/influxdb-connector/pip.conf
new file mode 100644 (file)
index 0000000..6581d0e
--- /dev/null
@@ -0,0 +1,3 @@
+[global]
+timeout = 60
+index-url = https://nexus3.o-ran-sc.org/repository/PyPi/simple
diff --git a/requirements.txt b/requirements.txt
new file mode 100644 (file)
index 0000000..a5367a0
--- /dev/null
@@ -0,0 +1,24 @@
+attrs==22.1.0
+certifi==2022.6.15
+charset-normalizer==2.1.1
+click==8.1.3
+confluent-kafka==1.9.2
+Flask==2.2.2
+gevent==21.12.0
+greenlet==1.1.3
+idna==3.3
+importlib-metadata==4.12.0
+importlib-resources==5.9.0
+itsdangerous==2.1.2
+Jinja2==3.1.2
+jsonschema==4.15.0
+kafka-python==2.0.2
+MarkupSafe==2.1.1
+pkgutil-resolve-name==1.3.10
+pyrsistent==0.18.1
+requests==2.28.1
+urllib3==1.26.12
+Werkzeug==2.2.2
+zipp==3.8.1
+zope.event==4.5.0
+zope.interface==5.4.0
index e6ff8c5..4838b5c 100644 (file)
@@ -92,6 +92,7 @@ def test_listener(mock_monitor,mock_input,body,start_response,schema):
 @mock.patch('argparse.ArgumentParser.parse_args',
             return_value=argparse.Namespace(verbose=None, api_version='7',config=get_config_path(),section='default'))
 @mock.patch('gevent.pywsgi.WSGIServer.serve_forever')
+@mock.patch('monitor.logger', logging.getLogger('monitor'))
 def test_main(server,parser,body):
     argv=None
     logger = logging.getLogger('monitor')
@@ -103,6 +104,7 @@ def test_main(server,parser,body):
 
 #@pytest.mark.skip
 @mock.patch('monitor.kafka_server')
+@mock.patch('monitor.logger', logging.getLogger('monitor'))
 def test_save_event_in_kafka(mocker,data_set,topic_name):
     data_set_string=json.dumps(data_set)
     logger = logging.getLogger('monitor')
@@ -116,6 +118,7 @@ def test_save_event_in_kafka(mocker,data_set,topic_name):
 
 @mock.patch('monitor.KafkaProducer')
 @mock.patch('monitor.producer')
+@mock.patch('monitor.logger', logging.getLogger('monitor'))
 def test_produce_events_in_kafka(mock_pro,mock_producer,data_set,topic_name):
     logger = logging.getLogger('monitor')
     logger.setLevel(logging.DEBUG)
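
The added mock.patch('monitor.logger', ...) decorators swap in a configured logger for the duration of each test, so module-level code in monitor never logs through an unpatched global. A minimal sketch of the pattern (assuming the monitor module is importable in the test environment):

    import logging
    from unittest import mock

    import monitor  # module under test

    @mock.patch('monitor.logger', logging.getLogger('monitor'))
    def test_logs_with_patched_logger():
        # Within this test, monitor.logger is the logger supplied above.
        monitor.logger.debug('patched logger in use')
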
index b15aec5..1e7413c 100644 (file)
@@ -1,5 +1,5 @@
 heartbeat={"event": {"commonEventHeader": {"domain": "heartbeat","eventId": "ORAN-DEV_2021-12-20T07:29:34.292938Z", "eventName": "heartbeat_O_RAN_COMPONENT","eventType": "O_RAN_COMPONENT","lastEpochMicrosec": 1639965574292938, "nfNamingCode": "SDN-Controller","nfVendorName": "O-RAN-SC-OAM","priority": "Low", "reportingEntityId": "","reportingEntityName": "ORAN-DEV","sequence": 357,"sourceId": "", "sourceName": "ORAN-DEV","startEpochMicrosec": 1639965574292938, "timeZoneOffset": "+00:00", "version": "4.1", "vesEventListenerVersion": "7.2.1"},"heartbeatFields": {"additionalFields": {"eventTime": "2021-12-20T07:29:34.292938Z" },"heartbeatFieldsVersion": "3.0", "heartbeatInterval": 20} } }
-pnfRegistration={"event": {"commonEventHeader": {"domain": "pnfRegistration","eventId": "ORAN-DEV_ONAP Controller for Radio","eventName": "pnfRegistration_EventType5G",       "eventType": "EventType5G",     "sequence": 0,"priority": "Low","reportingEntityId": "","reportingEntityName": "ORAN-DEV","sourceId": "",       "sourceName": "ORAN-DEV","startEpochMicrosec": 1639985329569087,"lastEpochMicrosec": 1639985329569087,"nfNamingCode": "SDNR",   "nfVendorName": "ONAP", "timeZoneOffset": "+00:00","version": "4.1","vesEventListenerVersion": "7.2.1"},"pnfRegistrationFields": {"pnfRegistrationFieldsVersion": "2.1","lastServiceDate": "2021-03-26","macAddress": "02:42:f7:d4:62:ce","manufactureDate": "2021-01-16","modelNumber": "ONAP Controller for Radio",   "oamV4IpAddress": "127.0.0.1","oamV6IpAddress": "0:0:0:0:0:ffff:a0a:0.1",       "serialNumber": "ONAP-SDNR-127.0.0.1-ONAP Controller for Radio","softwareVersion": "2.3.5",     "unitFamily": "ONAP-SDNR","unitType": "SDNR",   "vendorName": "ONAP","additionalFields": {"oamPort": "830","protocol": "SSH","username": "netconf","password": "netconf","reconnectOnChangedSchema": "false","sleep-factor": "1.5","tcpOnly": "false","connectionTimeout": "20000","maxConnectionAttempts": "100","betweenAttemptsTimeout": "2000","keepaliveDelay": "120"}}}}
+pnfRegistration={"event": {"commonEventHeader": {"domain": "pnfRegistration","eventId": "ORAN-DEV_ONAP Controller for Radio","eventName": "pnfRegistration_EventType5G",       "eventType": "EventType5G",     "sequence": 0,"priority": "Low","reportingEntityId": "","reportingEntityName": "ORAN-DEV","sourceId": "",       "sourceName": "ORAN-DEV","startEpochMicrosec": 1639985329569087,"lastEpochMicrosec": 1639985329569087,"nfNamingCode": "SDNR",   "nfVendorName": "ONAP", "timeZoneOffset": "+00:00","version": "4.1","vesEventListenerVersion": "7.2.1"},"pnfRegistrationFields": {"pnfRegistrationFieldsVersion": "2.1","lastServiceDate": "2021-03-26","macAddress": "02:42:f7:d4:62:ce","manufactureDate": "2021-01-16","modelNumber": "ONAP Controller for Radio","oamV4IpAddress": "127.0.0.1","oamV6IpAddress": "0:0:0:0:0:ffff:a0a:0.1",  "serialNumber": "ONAP-SDNR-127.0.0.1-ONAP Controller for Radio","softwareVersion": "2.3.5",     "unitFamily": "ONAP-SDNR","unitType": "SDNR",   "vendorName": "ONAP","additionalFields": {"oamPort": "830","protocol": "SSH","username": "netconf","reconnectOnChangedSchema": "false","sleep-factor": "1.5","tcpOnly": "false","connectionTimeout": "20000","maxConnectionAttempts": "100","betweenAttemptsTimeout": "2000","keepaliveDelay": "120"}}}}
 measurement={"event": {"commonEventHeader": {"domain": "measurement", "eventId": "O-RAN-FH-IPv6-01_1639984500_PM15min", "eventName": "measurement_O_RAN_COMPONENT_PM15min","eventType": "O_RAN_COMPONENT_PM15min","sequence": 0, "priority": "Low","reportingEntityId": "", "reportingEntityName": "ORAN-DEV", "sourceId": "", "sourceName": "O-RAN-FH-IPv6-01", "startEpochMicrosec": 1639983600000, "lastEpochMicrosec": 1639984500000, "internalHeaderFields": {"intervalStartTime": "Mon, 20 Dec 2021 07:00:00 +0000","intervalEndTime": "Mon, 20 Dec 2021 07:15:00 +0000"}, "version": "4.1", "vesEventListenerVersion": "7.2.1" }, "measurementFields": {"additionalFields": {}, "additionalMeasurements": [{ "name": "LP-MWPS-RADIO-1","hashMap": {"es": "0","ses": "1", "cses": "0", "unavailability": "0" }},{"name": "LP-MWPS-RADIO-2","hashMap": {"es": "0","ses": "1","cses": "0","unavailability": "0"} }],"additionalObjects": [],"codecUsageArray": [],"concurrentSessions": 2,"configuredEntities": 2, "cpuUsageArray": [], "diskUsageArray": [], "featureUsageArray": { "https://www.itu.int/rec/T-REC-G.841": "true" }, "filesystemUsageArray": [], "hugePagesArray": [], "ipmi": {}, "latencyDistribution": [], "loadArray": [], "machineCheckExceptionArray": [], "meanRequestLatency": 1000, "measurementInterval": 234, "measurementFieldsVersion": "4.0", "memoryUsageArray": [], "numberOfMediaPortsInUse": 234, "requestRate": 23,"nfcScalingMetric": 3,"nicPerformanceArray": [],"processStatsArray": []}}}
 fault={ "event": { "commonEventHeader": {"domain": "fault","eventId": "LKCYFL79Q01M01FYNG01_LP-MWPS-RADIO_TCA","eventName": "fault_O_RAN_COMPONENT_Alarms_TCA","eventType": "O_RAN_COMPONENT_Alarms", "sequence": 0,"priority": "High","reportingEntityId": "","reportingEntityName": "ORAN-DEV","sourceId": "","sourceName": "LKCYFL79Q01M01FYNG01","startEpochMicrosec": 1639985333218840,"lastEpochMicrosec": 1639985333218840,"nfNamingCode": "FYNG","nfVendorName": "VENDORA","timeZoneOffset": "+00:00", "version": "4.1","vesEventListenerVersion": "7.2.1"},"faultFields": {"faultFieldsVersion": "1.0","alarmCondition": "TCA", "alarmInterfaceA": "LP-MWPS-RADIO","eventSourceType": "O_RAN_COMPONENT","specificProblem": "TCA","eventSeverity": "NORMAL","vfStatus": "Active","alarmAdditionalInformation": {"eventTime": "2021-12-20T07:28:53.218840Z","equipType": "FYNG","vendor": "VENDORA","model": "FancyNextGeneration"}}}}
 thresholdCrossingAlert={"event": {"commonEventHeader": {"domain": "thresholdCrossingAlert","eventId": "__TCA","eventName": "thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA","eventType": "O_RAN_COMPONENT_TCA","sequence": 0,"priority": "High","reportingEntityId": "","reportingEntityName": "ORAN-DEV","sourceId": "","sourceName": "","startEpochMicrosec": 1639985336443218,"lastEpochMicrosec": 1639985336443218,"nfNamingCode": "1OSF","nfVendorName": "","timeZoneOffset": "+00:00","version": "4.1","vesEventListenerVersion": "7.2.1"}, "thresholdCrossingAlertFields": {"thresholdCrossingFieldsVersion": "4.0","additionalParameters": [{"criticality": "MAJ","hashMap": { "additionalProperties": "up-and-down" },"thresholdCrossed": "packetLoss" }], "alertAction": "SET","alertDescription": "TCA","alertType": "INTERFACE-ANOMALY","alertValue": "1OSF","associatedAlertIdList": ["loss-of-signal"],"collectionTimestamp": "Mon, 20 Dec 2021 07:28:56 +0000","dataCollector": "data-lake","elementType": "1OSF", "eventSeverity": "WARNING", "eventStartTimestamp": "Mon, 20 Dec 2021 07:15:00 +0000","interfaceName": "", "networkService": "from-a-to-b","possibleRootCause": "always-the-others", "additionalFields": {"eventTime": "2021-12-20T07:28:56.443218Z", "equipType": "1OSF", "vendor": "", "model": ""}}}}
\ No newline at end of file
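
Each line of events.txt is a name=JSON pair consumed by the tests; one way to load such lines (a sketch, not the tests' actual loader):

    import json

    def load_events(path):
        events = {}
        with open(path) as f:
            for line in f:
                name, _, payload = line.partition('=')
                if payload.strip():
                    events[name.strip()] = json.loads(payload)
        return events
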
index 070d036..30624ca 100644 (file)
@@ -80,10 +80,10 @@ def data_set():
     return data_set
 
 # <Response [204]>
+def test_send_event_to_influxdb_successfully(data_set):
     """
     Simply test event should store in influxdb successfully.
     """
-def test_send_event_to_influxdb_successfully(data_set):
     with requests_mock.Mocker() as rm:
         rm.post('http://localhost:8086/write?db=eventsdb', json=data_set, status_code=204)
         response = requests.post(
@@ -92,10 +92,10 @@ def test_send_event_to_influxdb_successfully(data_set):
         assert response.status_code == 204
 
 # <Response [400]>
+def test_send_event_to_influxdb_failed(data_set):
     """
     Bad Request.
     """
-def test_send_event_to_influxdb_failed(data_set):
     with requests_mock.Mocker() as rm:
         rm.post('http://localhost:8086/write?db=eventsdb', json=data_set, status_code=400)
         response = requests.post(
index fcb3212..ab55ec8 100644 (file)
@@ -88,7 +88,7 @@ def test_process_heartbeat_events_called(mocker_process_time, mocker_send_to_inf
 
 @pytest.fixture
 def pnf_json():
-            jobj = {'pnfRegistrationFieldsVersion': '2.1', 'lastServiceDate': '2021-03-26', 'macAddress': '02:42:f7:d4:62:ce', 'manufactureDate': '2021-01-16', 'modelNumber': 'ONAP Controller for Radio', 'oamV4IpAddress': '127.0.0.1', 'oamV6IpAddress': '0:0:0:0:0:ffff:a0a:0.1', 'serialNumber': 'ONAP-SDNR-127.0.0.1-ONAP Controller for Radio', 'softwareVersion': '2.3.5', 'unitFamily': 'ONAP-SDNR', 'unitType': 'SDNR', 'vendorName': 'ONAP', 'additionalFields': {'oamPort': '830', 'protocol': 'SSH', 'username': 'netconf', 'password': 'netconf', 'reconnectOnChangedSchema': 'false', 'sleep-factor': '1.5', 'tcpOnly': 'false', 'connectionTimeout': '20000', 'maxConnectionAttempts': '100', 'betweenAttemptsTimeout': '2000', 'keepaliveDelay': '120'}}
+            jobj = {'pnfRegistrationFieldsVersion': '2.1', 'lastServiceDate': '2021-03-26', 'macAddress': '02:42:f7:d4:62:ce', 'manufactureDate': '2021-01-16', 'modelNumber': 'ONAP Controller for Radio', 'oamV4IpAddress': '127.0.0.1', 'oamV6IpAddress': '0:0:0:0:0:ffff:a0a:0.1', 'serialNumber': 'ONAP-SDNR-127.0.0.1-ONAP Controller for Radio', 'softwareVersion': '2.3.5', 'unitFamily': 'ONAP-SDNR', 'unitType': 'SDNR', 'vendorName': 'ONAP', 'additionalFields': {'oamPort': '830', 'protocol': 'SSH', 'username': 'netconf', 'reconnectOnChangedSchema': 'false', 'sleep-factor': '1.5', 'tcpOnly': 'false', 'connectionTimeout': '20000', 'maxConnectionAttempts': '100', 'betweenAttemptsTimeout': '2000', 'keepaliveDelay': '120'}}
             return jobj
 
 
@@ -106,7 +106,7 @@ def pnf_nonstringpdata():
 
 @pytest.fixture
 def pnf_expected_pdata():
-            pnf_expected_pdata = 'pnfRegistration,domain=pnfRegistration,eventId=ORAN-DEV_ONAP\\ Controller\\ for\\ Radio,eventName=pnfRegistration_EventType5G,eventType=EventType5G,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,nfNamingCode=SDNR,nfVendorName=ONAP,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None,pnfRegistrationFieldsVersion=2.1,lastServiceDate=2021-03-26,macAddress=02:42:f7:d4:62:ce,manufactureDate=2021-01-16,modelNumber=ONAP\\ Controller\\ for\\ Radio,oamV4IpAddress=127.0.0.1,oamV6IpAddress=0:0:0:0:0:ffff:a0a:0.1,serialNumber=ONAP-SDNR-127.0.0.1-ONAP\\ Controller\\ for\\ Radio,softwareVersion=2.3.5,unitFamily=ONAP-SDNR,unitType=SDNR,vendorName=ONAP,oamPort=830,protocol=SSH,username=netconf,password=netconf,reconnectOnChangedSchema=false,sleep-factor=1.5,tcpOnly=false,connectionTimeout=20000,maxConnectionAttempts=100,betweenAttemptsTimeout=2000,keepaliveDelay=120 sequence=0,startEpochMicrosec=1639985329569087,lastEpochMicrosec=1639985329569087 1639985333218840000'
+            pnf_expected_pdata = 'pnfRegistration,domain=pnfRegistration,eventId=ORAN-DEV_ONAP\\ Controller\\ for\\ Radio,eventName=pnfRegistration_EventType5G,eventType=EventType5G,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,nfNamingCode=SDNR,nfVendorName=ONAP,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None,pnfRegistrationFieldsVersion=2.1,lastServiceDate=2021-03-26,macAddress=02:42:f7:d4:62:ce,manufactureDate=2021-01-16,modelNumber=ONAP\\ Controller\\ for\\ Radio,oamV4IpAddress=127.0.0.1,oamV6IpAddress=0:0:0:0:0:ffff:a0a:0.1,serialNumber=ONAP-SDNR-127.0.0.1-ONAP\\ Controller\\ for\\ Radio,softwareVersion=2.3.5,unitFamily=ONAP-SDNR,unitType=SDNR,vendorName=ONAP,oamPort=830,protocol=SSH,username=netconf,reconnectOnChangedSchema=false,sleep-factor=1.5,tcpOnly=false,connectionTimeout=20000,maxConnectionAttempts=100,betweenAttemptsTimeout=2000,keepaliveDelay=120 sequence=0,startEpochMicrosec=1639985329569087,lastEpochMicrosec=1639985329569087 1639985333218840000'
             return pnf_expected_pdata