Introduce more test cases to improve code coverage 40/9140/5
author dhivarprajakta <prajakta.dhivar@xoriant.com>
Fri, 30 Sep 2022 06:50:27 +0000 (12:20 +0530)
committer dhivarprajakta <prajakta.dhivar@xoriant.com>
Mon, 10 Oct 2022 11:39:47 +0000 (17:09 +0530)
SMO-96

Signed-off-by: dhivarprajakta <prajakta.dhivar@xoriant.com>
Change-Id: I8f513ac052ebd1854fcdf3d1e4bc3bbf26b73b2f
26 files changed:
collector/Dockerfile
collector/evel-test-collector/code/collector/monitor.py
collector/evel-test-collector/code/collector/rest_dispatcher.py
collector/evel-test-collector/config/collector.conf
collector/evel-test-collector/docs/schema/README.md [new file with mode: 0644]
collector/pip.conf [new file with mode: 0644]
dmaapadapter/Dockerfile
dmaapadapter/adapter/code/dmaap_adapter.py
dmaapadapter/pip.conf [new file with mode: 0644]
influxdb-connector/Dockerfile
influxdb-connector/influxdb-connector/code/influxdb_connector.py
influxdb-connector/influxdb-connector/config/influxdb_connector.conf
influxdb-connector/pip.conf [new file with mode: 0644]
requirements.txt [new file with mode: 0644]
tests/collector/__init__.py
tests/collector/port_config.conf [new file with mode: 0644]
tests/collector/schema.json [new file with mode: 0644]
tests/collector/test_collector.conf
tests/collector/test_monitor.py
tests/collector/test_rest_dispatcher.py [new file with mode: 0644]
tests/collector/wrong_config.conf [new file with mode: 0644]
tests/dmaap_adaptor/test_appConfig.py
tests/dmaap_adaptor/test_consumer.py
tests/dmaap_adaptor/test_dmaap_adapter.py
tests/influxdb_connector/test_influxdb_events.py
tox.ini

diff --git a/collector/Dockerfile b/collector/Dockerfile
index 74c65ab..f67e224 100755 (executable)
@@ -22,8 +22,10 @@ FROM ubuntu:focal
 
 RUN apt-get update && apt-get -y upgrade
 RUN apt-get install -y git curl python3 python3-pip
-RUN pip3 install requests jsonschema kafka-python gevent
 
+COPY pip.conf /etc/pip.conf
+
+RUN pip3 install requests jsonschema kafka-python gevent PyYAML
 
 RUN mkdir -p /opt/smo/certs
 
diff --git a/collector/evel-test-collector/code/collector/monitor.py b/collector/evel-test-collector/code/collector/monitor.py
index 9daf78c..3cb7f86 100755 (executable)
@@ -39,6 +39,8 @@ from json import dumps
 import datetime
 import time
 from gevent import pywsgi
+import yaml
+import requests
 
 monitor_mode = "f"
 vdu_id = ['', '', '', '', '', '']
@@ -77,6 +79,11 @@ vel_password = ''
 # ------------------------------------------------------------------------------
 vel_schema = None
 
+# ------------------------------------------------------------------------------
+# The YAML schema which we will use to validate stndDefined events.
+# ------------------------------------------------------------------------------
+vel_yaml_schema = None
+
 # ------------------------------------------------------------------------------
 # The JSON schema which we will use to validate client throttle state.
 # ------------------------------------------------------------------------------
@@ -100,7 +107,7 @@ logger = None
 producer = None
 
 
-def listener(environ, start_response, schema):
+def listener(environ, start_response, schema, yaml_schema=None):
     '''
     Handler for the Vendor Event Listener REST API.
 
@@ -149,6 +156,14 @@ def listener(environ, start_response, schema):
             decoded_body = json.loads(body)
             validate = jsonschema.validate(decoded_body, schema)
             assert (validate is None), "Invalid event!"
+            if 'stndDefinedFields' in decoded_body['event']:
+                logger.debug('in yaml validation')
+                schema_ref = decoded_body['event']['stndDefinedFields']["schemaReference"]
+                if "https://forge.3gpp.org" in schema_ref or "https://gerrit.o-ran-sc.org" in schema_ref:
+                    stnd_define_event_validation(yaml_schema, decoded_body)
+                else:
+                    logger.error("schema reference {0} not supported.".format(schema_ref))
+                    raise Exception("schema reference {0} not supported.".format(schema_ref))
 
             logger.info('Event is valid!')
             logger.info('Valid body decoded & checked against schema OK:\n'
@@ -186,7 +201,7 @@ def listener(environ, start_response, schema):
                     start_response('202 Accepted', [])
                     yield ''.encode()
             else:
-                logger.warn('Failed to authenticate OK; creds: ' + credentials)
+                logger.warn('Failed to authenticate OK; creds: {0}'.format(credentials))
                 logger.warn('Failed to authenticate agent credentials: ',
                             credentials,
                             'against expected ',
@@ -244,8 +259,60 @@ def listener(environ, start_response, schema):
         except Exception as e:
             logger.error('Event invalid for unexpected reason! {0}'.format(e))
 
+# --------------------------------------------------------------------------
+# check whether the yaml schema file exists; download it if missing
+# --------------------------------------------------------------------------
+def check_schema_file_exist(vel_schema_path, schema_ref):
+    logger.debug('in check yaml file')
+    assert (vel_schema_path != ""), "Value of property 'yaml_schema_path' is missing in config file"
+    # Fetch folder and file name from the schema url
+    schema_ref = schema_ref.split('#')[0]
+    name_list = schema_ref.split('/')
+    folder_name = '_'.join(name_list[2:-1])
+    file_name = name_list[-1]
+    updated_vel_schema_path = vel_schema_path + '/{0}/{1}'.format(folder_name, file_name)
+    if "https://forge.3gpp.org" in schema_ref:
+        schema_ref = schema_ref.replace("blob","raw")
+        schema_ref =  schema_ref + '?inline=false'
+    if not os.path.exists(updated_vel_schema_path):
+        logger.warning('Event Listener Schema File ({0}) not found. No validation will be undertaken.'.format(updated_vel_schema_path))
+        logger.info('Start downloading yaml file :{}'.format(schema_ref))
+        result = os.system('curl -JOL "{0}"'.format(schema_ref))
+        logger.debug("result {0}".format(result))
+        assert(result == 0), "Invalid URL {0}".format(schema_ref)
+        logger.info("Download Completed")
+        with open(file_name, "r") as file:
+            first_line = file.readline()
+        # checking downloaded file content is yaml or html
+        if first_line.strip() == "<!DOCTYPE html>":
+            logger.info("Downloaded file is not valid yaml")
+            os.system("del {0} ".format(file_name))
+            logger.info("Downloaded file deleted")
+            assert(first_line.strip() != "<!DOCTYPE html>"), "Invalid Schema File"
+        else:
+            # Create a folder from source url
+            os.system('mkdir {0}/{1}'.format(vel_schema_path, folder_name))
+            # move downloaded file in above created folder
+            os.system("mv {0} {1}/{2}".format(file_name, vel_schema_path, folder_name))
+    return updated_vel_schema_path
 
 # --------------------------------------------------------------------------
+# Second level of validation for stndDefined messages
+# --------------------------------------------------------------------------
+def stnd_define_event_validation(schema_path, body):
+    logger.debug('in second level validation')
+    schema_ref = body['event']['stndDefinedFields']["schemaReference"]
+    schema_path = check_schema_file_exist(schema_path, schema_ref)
+    logger.debug('end check yaml path')
+    with open(schema_path, 'r') as schema_file:
+        schema = yaml.full_load(schema_file)
+    schema_name = schema_ref.split('/')[-1]
+    updated_schema = dict(schema["components"]["schemas"][schema_name], **schema)
+    decoded_body = body['event']['stndDefinedFields']['data']
+    validate_yaml = jsonschema.validate(decoded_body, updated_schema)
+    assert (validate_yaml is None), "Invalid event!"
+    logger.info('standard defined event validated successfully')
+    return validate_yaml
+# --------------------------------------------------------------------------
 # Save event data in Kafka
 # --------------------------------------------------------------------------
 def save_event_in_kafka(body):
@@ -285,6 +352,7 @@ def test_listener(environ, start_response, schema):
     incoming event on the EVEL interface.
     '''
     global pending_command_list
+    global decoded_body
     logger.info('Got a Test Control input')
     logger.info('============================')
     logger.info('==== TEST CONTROL INPUT ====')
@@ -390,7 +458,7 @@ def main(argv=None):
     program_build_date = str(__updated__)
     program_version_message = '%%(prog)s {0} ({1})'.format(program_version,
                                                            program_build_date)
-    if (__import__('__main__').__doc__ is not None):
+    if (__import__('__main__').__doc__ is not None):       # pragma: no cover
         program_shortdesc = __import__('__main__').__doc__.split('\n')[1]
     else:
         program_shortdesc = 'Running in test harness'
@@ -496,6 +564,9 @@ USAGE
         test_control_schema_file = config.get(config_section,
                                              'test_control_schema_file',
                                               vars=overrides)
+        vel_yaml_schema = config.get(config_section,
+                                     'yaml_schema_path',
+                                     vars=overrides)
 
         # ----------------------------------------------------------------------
         # Finally we have enough info to start a proper flow trace.
@@ -625,7 +696,7 @@ USAGE
         global get_info
         get_info = root_url
         dispatcher = PathDispatcher()
-        vendor_event_listener = partial(listener, schema=vel_schema)
+        vendor_event_listener = partial(listener, schema=vel_schema, yaml_schema=vel_yaml_schema)
         dispatcher.register('GET', root_url, vendor_event_listener)
         dispatcher.register('POST', root_url, vendor_event_listener)
         vendor_throttle_listener = partial(listener, schema=throttle_schema)
@@ -649,7 +720,7 @@ USAGE
         logger.error('Main loop exited unexpectedly!')
         return 0
 
-    except KeyboardInterrupt:
+    except KeyboardInterrupt:       # pragma: no cover
         # ----------------------------------------------------------------------
         # handle keyboard interrupt
         # ----------------------------------------------------------------------
@@ -675,7 +746,7 @@ USAGE
 # ------------------------------------------------------------------------------
 
 
-if __name__ == '__main__':
+if __name__ == '__main__':      # pragma: no cover
     if TESTRUN:
         # ----------------------------------------------------------------------
         # Running tests - note that doctest comments haven't been included so
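Taken together, the monitor.py changes above add a second validation pass: when an incoming event carries a stndDefinedFields block, listener() inspects the schemaReference URL and, for the two supported hosts, hands the event to stnd_define_event_validation(), which downloads and caches the referenced YAML schema under yaml_schema_path and validates stndDefinedFields.data against it. A minimal sketch of that routing decision, for illustration only (the event body and schema file name are invented):

    import json

    decoded_body = json.loads('''{
      "event": {
        "commonEventHeader": {"domain": "stndDefined", "eventId": "fault-0001"},
        "stndDefinedFields": {
          "schemaReference": "https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/faultMnS.yaml#components/schemas/NotifyNewAlarm",
          "data": {"href": "nr-cell-1", "alarmId": "alarm-0001"}
        }
      }
    }''')

    event = decoded_body['event']
    if 'stndDefinedFields' in event:
        schema_ref = event['stndDefinedFields']['schemaReference']
        if "https://forge.3gpp.org" in schema_ref or "https://gerrit.o-ran-sc.org" in schema_ref:
            # here monitor.py calls stnd_define_event_validation(yaml_schema, decoded_body)
            print('second-level yaml validation would run')
        else:
            raise Exception("schema reference {0} not supported.".format(schema_ref))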
diff --git a/collector/evel-test-collector/code/collector/rest_dispatcher.py b/collector/evel-test-collector/code/collector/rest_dispatcher.py
index 6911d5e..83d79dc 100644 (file)
@@ -63,7 +63,7 @@ def notfound_404(environ, start_response):
                                              environ['REQUEST_METHOD'].upper(),
                                              environ['PATH_INFO']))
     start_response('404 Not Found', [ ('Content-type', 'text/plain') ])
-    return [template_404.format(base_url)]
+    return ['template_404{}'.format(base_url)]
 
 class PathDispatcher:
     '''
@@ -99,6 +99,5 @@ class PathDispatcher:
         Register a handler for a method/path, adding it to the pathmap.
         '''
         logger.debug('Registering for {0} at {1}'.format(method, path))
-        print('Registering for {0} at {1}'.format(method, path))
         self.pathmap[method.lower(), path] = function
         return function
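For context on the rest_dispatcher.py cleanups: register() and __call__ together implement a small (method, path) dispatch table for the collector's WSGI app. A condensed, self-contained sketch of the pattern (simplified, not the module's exact code):

    class MiniDispatcher:
        '''Maps (http-method, path) pairs to WSGI handler callables.'''
        def __init__(self):
            self.pathmap = {}

        def register(self, method, path, function):
            self.pathmap[method.lower(), path] = function
            return function

        def __call__(self, environ, start_response):
            handler = self.pathmap.get((environ['REQUEST_METHOD'].lower(),
                                        environ['PATH_INFO']))
            if handler is None:
                start_response('404 Not Found', [('Content-type', 'text/plain')])
                return [b'Not Found']
            return handler(environ, start_response)

    dispatcher = MiniDispatcher()
    dispatcher.register('POST', '/eventListener/v7', lambda env, sr: [b'ok'])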
diff --git a/collector/evel-test-collector/config/collector.conf b/collector/evel-test-collector/config/collector.conf
index c5dbe21..ecea678 100755 (executable)
@@ -19,6 +19,7 @@ schema_file = evel-test-collector/docs/att_interface_definition/CommonEventForma
 base_schema_file = evel-test-collector/docs/att_interface_definition/base_schema.json
 throttle_schema_file = evel-test-collector/docs/att_interface_definition/throttle_schema.json
 test_control_schema_file = evel-test-collector/docs/att_interface_definition/test_control_schema.json
+yaml_schema_path = evel-test-collector/docs/schema
 
 #------------------------------------------------------------------------------
 # Details of the Vendor Event Listener REST service.
@@ -57,6 +58,7 @@ schema_file = ../../docs/att_interface_definition/event_format_updated.json
 base_schema_file =
 throttle_schema_file = ../../docs/att_interface_definition/throttle_schema.json
 test_control_schema_file = ../../docs/att_interface_definition/test_control_schema.json
+yaml_schema_path = ../../docs/schema
 
 #------------------------------------------------------------------------------
 # Details of the Vendor Event Listener REST service.
diff --git a/collector/evel-test-collector/docs/schema/README.md b/collector/evel-test-collector/docs/schema/README.md
new file mode 100644 (file)
index 0000000..d73e2a0
--- /dev/null
@@ -0,0 +1 @@
+NOTE: This folder holds the downloaded yaml schema files.
\ No newline at end of file
diff --git a/collector/pip.conf b/collector/pip.conf
new file mode 100644 (file)
index 0000000..6581d0e
--- /dev/null
@@ -0,0 +1,3 @@
+[global]
+timeout = 60
+index-url = https://nexus3.o-ran-sc.org/repository/PyPi/simple
diff --git a/dmaapadapter/Dockerfile b/dmaapadapter/Dockerfile
index 58a8caa..7b29115 100755 (executable)
 
 FROM ubuntu:focal
 
+
 RUN apt-get update && apt-get -y upgrade
-RUN apt-get install -y git curl python3 python3-pip
-RUN pip3 install requests jsonschema kafka-python flask confluent-kafka
+RUN apt-get install -y git curl python3 python3-pip 
 
+COPY pip.conf /etc/pip.conf
+
+RUN pip3 install requests jsonschema kafka-python flask confluent-kafka
 
 RUN mkdir /opt/smo
 
diff --git a/dmaapadapter/adapter/code/dmaap_adapter.py b/dmaapadapter/adapter/code/dmaap_adapter.py
index df8d2a4..80a512d 100644 (file)
@@ -105,7 +105,7 @@ def getTimeout(timeout):
         return timeout
 
 
-if __name__ == '__main__':
+if __name__ == '__main__':      # pragma: no cover
     appConfig = AppConfig()
 
     if(appConfig.getAssertConfigValue() == 'False'):
diff --git a/dmaapadapter/pip.conf b/dmaapadapter/pip.conf
new file mode 100644 (file)
index 0000000..6581d0e
--- /dev/null
@@ -0,0 +1,3 @@
+[global]
+timeout = 60
+index-url = https://nexus3.o-ran-sc.org/repository/PyPi/simple
diff --git a/influxdb-connector/Dockerfile b/influxdb-connector/Dockerfile
index bd91696..ee60aa9 100755 (executable)
@@ -17,6 +17,9 @@ FROM ubuntu:focal
 
 RUN apt-get update && apt-get -y upgrade
 RUN apt-get install -y git curl python3 python3-pip
+
+COPY pip.conf /etc/pip.conf
+
 RUN pip3 install requests confluent-kafka
 
 # Clone influxdb-connector
diff --git a/influxdb-connector/influxdb-connector/code/influxdb_connector.py b/influxdb-connector/influxdb-connector/code/influxdb_connector.py
index 0cf19e3..e253e30 100644 (file)
@@ -199,6 +199,52 @@ def process_measurement_events(domain, jobj, pdata, nonstringpdata, eventId, sta
 
     send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
 
+def process_stndDefinedFields_events(values, domain, eventId, startEpochMicrosec, lastEpochMicrosec):
+    """
+    Format a stndDefined event for storage in InfluxDB.
+    values (dict): data to store in InfluxDB
+    domain (str): name of the topic
+    eventId (str): event id
+    startEpochMicrosec: start timestamp
+    lastEpochMicrosec: last timestamp
+    """
+    pdata = domain + ",eventId={},system={}".format(eventId, source)
+    nonstringpdata = " startEpochMicrosec={},lastEpochMicrosec={},".format(startEpochMicrosec, lastEpochMicrosec)
+    for key, val in values.items():
+        if isinstance(val, str):
+            pdata = pdata + ',{}={}'.format(key, process_special_char(val))
+        elif isinstance(val, dict):
+            for key2, val2 in val.items():
+                if isinstance(val2, str) and val2 != '':
+                    pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
+                elif isinstance(val2, dict) and key2 != 'additionalInformation':
+                    for key3, val3 in val2.items():
+                        if isinstance(val3, str) and val3 != '':
+                            pdata = pdata + ',{}={}'.format(key3, process_special_char(val3))
+                        elif val3 != '':
+                            nonstringpdata = nonstringpdata + '{}={},'.format(key3, val3)
+                elif key2 == 'additionalInformation':
+                    for key3, val3 in val2.items():
+                        if isinstance(val3, str) and val3 != '':
+                            pdata = pdata + ',{}={}'.format('additionalInformation_' + key3, process_special_char(val3))
+                        elif val3 != '':
+                            nonstringpdata = nonstringpdata + '{}={},'.format(key3, val3)
+                elif key2 == 'correlatedNotifications':
+                    for item in val2:
+                        for key4, val4 in item.items():
+                            if isinstance(val4, str) and val4 != '':
+                                pdata = pdata + ',{}={}'.format(key4, process_special_char(val4))
+                            elif val4 != '':
+                                nonstringpdata = nonstringpdata + '{}={},'.format(key4, val4)
+
+                elif val2 != '':
+                    nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
+        elif val != '':
+            nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
+
+    send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
+
+
 
 def process_special_char(str):
     for search_char, replace_char in {" ": "\ ", ",": "\,"}.items():
@@ -290,6 +336,14 @@ def save_event_in_db(body):
                                    jobj['event']['commonEventHeader']['startEpochMicrosec'],
                                    jobj['event']['commonEventHeader']['lastEpochMicrosec'])
 
+    if "stndDefinedFields" in jobj['event']:
+        logger.debug('Found stndDefinedFields')
+        process_stndDefinedFields_events(jobj['event']['stndDefinedFields'],
+                                   domain,
+                                   jobj['event']['commonEventHeader']['eventId'],
+                                   jobj['event']['commonEventHeader']['startEpochMicrosec'],
+                                   jobj['event']['commonEventHeader']['lastEpochMicrosec'])
+
 
 def main():
 
@@ -398,7 +452,7 @@ def main():
     c = Consumer(settings)
 
     c.subscribe(['measurement', 'pnfregistration',
-    'fault', 'thresholdcrossingalert', 'heartbeat'])
+    'fault', 'thresholdcrossingalert', 'heartbeat', 'stnddefined'])
 
     try:
         while True:
@@ -412,7 +466,7 @@ def main():
                 try:
                     save_event_in_db(msg.value())
                 except Exception as e:
-                    logger.error('Exception occured while saving data : '.format(e))
+                    logger.error('Exception occurred while saving data : {}'.format(e))
             elif msg.error().code() == KafkaError._PARTITION_EOF:
                 logger.error('End of partition reached {0}/{1}'
                       .format(msg.topic(), msg.partition()))
@@ -426,5 +480,5 @@ def main():
         c.close()
 
 
-if __name__ == '__main__':
+if __name__ == '__main__':      # pragma: no cover
     main()
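To make the flattening rules in process_stndDefinedFields_events() concrete: string leaves are appended to pdata as InfluxDB tags (additionalInformation keys get an additionalInformation_ prefix), while non-string leaves are appended to nonstringpdata as fields. A hypothetical payload and the approximate line-protocol pieces it would produce (illustration only; all values invented):

    values = {
        "schemaReference": "https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/faultMnS.yaml#components/schemas/NotifyNewAlarm",
        "data": {
            "alarmId": "alarm-0001",
            "additionalInformation": {"vendor": "acme", "severity": 3},
        },
    }
    # With domain='stnddefined', eventId='ev-1' and the module-global source='smo',
    # the nested loops build roughly:
    #   pdata          -> "stnddefined,eventId=ev-1,system=smo,schemaReference=<escaped url>,alarmId=alarm-0001,additionalInformation_vendor=acme"
    #   nonstringpdata -> " startEpochMicrosec=<ts>,lastEpochMicrosec=<ts>,severity=3,"
    # send_to_influxdb() then writes pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp)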
diff --git a/influxdb-connector/influxdb-connector/config/influxdb_connector.conf b/influxdb-connector/influxdb-connector/config/influxdb_connector.conf
index fbd449e..b6ed600 100755 (executable)
@@ -1,3 +1,20 @@
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+
 [default]
 log_file = /opt/smo/influxdbconnector.log
 kafka_server =
diff --git a/influxdb-connector/pip.conf b/influxdb-connector/pip.conf
new file mode 100644 (file)
index 0000000..6581d0e
--- /dev/null
@@ -0,0 +1,3 @@
+[global]
+timeout = 60
+index-url = https://nexus3.o-ran-sc.org/repository/PyPi/simple
diff --git a/requirements.txt b/requirements.txt
new file mode 100644 (file)
index 0000000..a5367a0
--- /dev/null
@@ -0,0 +1,24 @@
+attrs==22.1.0
+certifi==2022.6.15
+charset-normalizer==2.1.1
+click==8.1.3
+confluent-kafka==1.9.2
+Flask==2.2.2
+gevent==21.12.0
+greenlet==1.1.3
+idna==3.3
+importlib-metadata==4.12.0
+importlib-resources==5.9.0
+itsdangerous==2.1.2
+Jinja2==3.1.2
+jsonschema==4.15.0
+kafka-python==2.0.2
+MarkupSafe==2.1.1
+pkgutil-resolve-name==1.3.10
+pyrsistent==0.18.1
+requests==2.28.1
+urllib3==1.26.12
+Werkzeug==2.2.2
+zipp==3.8.1
+zope.event==4.5.0
+zope.interface==5.4.0
diff --git a/tests/collector/__init__.py b/tests/collector/__init__.py
index 2570bf5..03e5222 100755 (executable)
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-## Copyright 2021 Xoriant Corporation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-## Copyright 2021 Xoriant Corporation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
+
 
 import os
 import sys
@@ -70,5 +44,4 @@ if  os.path.isfile(configfile_name):
     cfgfile.close()
 SOURCE_PATH = os.path.join(
     PROJECT_PATH,"collector/evel-test-collector/code/collector")
-print(SOURCE_PATH, PROJECT_PATH,schema_file_path, configfile_name )
 sys.path.append(SOURCE_PATH)
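A note on the bare import monitor / import rest_dispatcher statements in the test modules that follow: tests/collector/__init__.py (above) appends the collector source directory to sys.path when the test package is imported, roughly equivalent to this sketch (paths assume the repo layout; run from the repo root):

    import os
    import sys

    PROJECT_PATH = os.getcwd()
    SOURCE_PATH = os.path.join(PROJECT_PATH, "collector/evel-test-collector/code/collector")
    sys.path.append(SOURCE_PATH)  # lets the tests do a bare 'import monitor'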
diff --git a/tests/collector/port_config.conf b/tests/collector/port_config.conf
new file mode 100644 (file)
index 0000000..0ec246b
--- /dev/null
@@ -0,0 +1,32 @@
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+[default]
+schema_file = "evel-test-collector/docs/att_interface_definition/hello.json"
+base_schema_file = /evel-test-collector/docs/att_interface_definition/base_schema.json
+throttle_schema_file = evel-test-collector/docs/att_interface_definition/throttle_schema.json
+test_control_schema_file = evel-test-collector/docs/att_interface_definition/test_control_schema.json
+yaml_schema_path = evel-test-collector/docs/att_interface_definition
+log_file =collector.log
+vel_domain = 127.0.0.1
+vel_port = 999
+vel_path = 
+vel_username =user
+vel_password =password
+vel_topic_name =
+kafka_server =kafka
+log_level = DEBUG
+kafka_topic =
+schema_ref =https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/test_faultMns.yaml#components/schemas/NotifyNewAlarm
\ No newline at end of file
diff --git a/tests/collector/schema.json b/tests/collector/schema.json
new file mode 100644 (file)
index 0000000..ce4f35e
--- /dev/null
@@ -0,0 +1,63 @@
+{
+    "name": "MyClass",
+    "type": "record",
+    "namespace": "com.acme.avro",
+    "fields": [
+      {
+        "name": "interpretations",
+        "type": {
+          "type": "array",
+          "items": {
+            "name": "interpretations_record",
+            "type": "record",
+            "fields": [
+              {
+                "name": "segments",
+                "type": {
+                  "type": "array",
+                  "items": [
+                    {
+                      "name": "segments_record",
+                      "type": "record",
+                      "fields": [
+                        {
+                          "name": "x",
+                          "type": "string"
+                        },
+                        {
+                          "name": "y",
+                          "type": "string"
+                        },
+                        {
+                          "name": "z",
+                          "type": "string"
+                        }
+                      ]
+                    },
+                    {
+                      "name": "segments_record",
+                      "type": "record",
+                      "fields": [
+                        {
+                          "name": "x",
+                          "type": "string"
+                        },
+                        {
+                          "name": "y",
+                          "type": "string"
+                        },
+                        {
+                          "name": "z",
+                          "type": "string"
+                        }
+                      ]
+                    }
+                  ]
+                }
+              }
+            ]
+          }
+        }
+      }
+    ]
+  }
\ No newline at end of file
diff --git a/tests/collector/test_collector.conf b/tests/collector/test_collector.conf
index 55678a2..a1ce17e 100755 (executable)
 #
 
 [default]
-schema_file = /home/ves-dev/docs/att_interface_definition/CommonEventFormat-v7-2-2.json
-base_schema_file = /evel-test-collector/docs/att_interface_definition/base_schema.json
-throttle_schema_file = evel-test-collector/docs/att_interface_definition/throttle_schema.json
-test_control_schema_file = evel-test-collector/docs/att_interface_definition/test_control_schema.json
-log_file = collector.log
+schema_file = collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat-v7-2-2.json
+base_schema_file = tests/collector/schema.json
+throttle_schema_file = tests/collector/schema.json
+test_control_schema_file = tests/collector/schema.json
+yaml_schema_path = evel-test-collector/docs/att_interface_definition
+log_file = collector.log
 vel_domain = 127.0.0.1
 vel_port = 9999
 vel_path =
-vel_username =
-vel_password =
+vel_username = user
+vel_password = password
 vel_topic_name =
 kafka_server = kafka-server
-kafka_topic =
+kafka_topic = topic
 
diff --git a/tests/collector/test_monitor.py b/tests/collector/test_monitor.py
index e6ff8c5..2106b67 100644 (file)
 # limitations under the License.
 #
 
+import shutil
 import os
 import pytest
 import unittest
 import monitor
 import argparse
+import configparser
 from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
 from unittest import mock
 from unittest.mock import patch
+from unittest.mock import MagicMock
 import logging
 from pytest_mock import MockerFixture
 from gevent import socket
@@ -42,6 +45,18 @@ def get_config_path():
         project_path,"tests/collector/test_collector.conf")
     return config_path
 
+def get_wrong_config_path():
+    project_path=get_path()
+    config_path = os.path.join(
+        project_path,"tests/collector/wrong_config.conf")
+    return config_path
+
+def get_wrong_config_port_path():
+    project_path=get_path()
+    config_path = os.path.join(
+        project_path,"tests/collector/port_config.conf")
+    return config_path
+
 def get_schema_path():
     project_path=get_path()
     schema_path = os.path.join(
@@ -76,7 +91,14 @@ def topic_name():
     topic_name="topic"
     return topic_name
 
+
+def test_init():
+    obj=monitor.JSONObject({})
+    assert obj.__dict__=={}
+
+
 #@pytest.mark.skip
+@patch('monitor.logger',logging.getLogger('monitor'))
 @mock.patch('gevent.pywsgi.Input',autospec=True)
 @mock.patch('monitor.save_event_in_kafka')
 def test_listener(mock_monitor,mock_input,body,start_response,schema):
@@ -87,21 +109,390 @@ def test_listener(mock_monitor,mock_input,body,start_response,schema):
     logger = logging.getLogger('monitor')
     logger.setLevel(logging.DEBUG)
     with mock.patch.object(logger,'debug') as mock_debug:
-        monitor.listener(environ,mock_start_response,schema)
+        result=list(monitor.listener(environ,mock_start_response,schema))
+        assert result==[b'']
+
+
+#test for listener Exception
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_listener_exp(mock_monitor,mock_input,body,start_response,schema):
+    mock_input.__name__ = 'read'
+    environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2","PATH_INFO": '/eventListener/v5/events'}
+    body={}
+    mock_input.read.return_value=json.dumps(body)
+    mock_start_response= mock.Mock(start_response)
+    project_path = os.getcwd()
+    dict_schema = {"v5": os.path.join(project_path,"collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat-v7-2-1.json")}
+    try:
+        result = list(monitor.listener(environ, mock_start_response, dict_schema))
+
+    except TypeError:
+        assert result == None
+    except Exception:
+        pytest.fail('unexpected error')
+
+
+#test b64decode credentials in listener()
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_listener_b64decode(mock_monitor,mock_input,body,start_response,schema):
+    mock_input.__name__ = 'read'
+    environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "None None", "CONTENT_LENGTH": "2","PATH_INFO": '/eventListener/v5/events'}
+    mock_input.read.return_value=body
+    mock_start_response= mock.Mock(start_response)
+    logger = logging.getLogger('monitor')
+    logger.setLevel(logging.WARN)
+    project_path = os.getcwd()
+    dict_schema = {"v5": os.path.join(project_path,"collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat-v7-2-1.json")}
+    with mock.patch.object(logger,'warn') as mock_warn:
+        result = list(monitor.listener(environ, mock_start_response, dict_schema))
+        mock_monitor.assert_called_with(body)
+
+
+#test listener pending command list
+@patch('monitor.vel_username','user')
+@patch('monitor.vel_password','password')
+@patch('monitor.logger',logging.getLogger('monitor'))
+@patch('monitor.pending_command_list',[1,2,3])
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_listener_command_list(mock_monitor,mock_input,body,start_response,schema,topic_name):
+    environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2", 'PATH_INFO': '/eventListener/v5/events'}
+    mock_input.read.return_value=body
+    mock_start_response= mock.Mock(start_response)
+    logger = logging.getLogger('monitor')
+    logger.setLevel(logging.DEBUG)
+    project_path = os.getcwd()
+    dict_schema = {"v5": os.path.join(project_path,"collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat-v7-2-2.json")}
+    with mock.patch.object(logger,'debug') as mock_debug:
+        result = list(monitor.listener(environ, mock_start_response, dict_schema))
+        assert [b'[1, 2, 3]'] ==result
+
+
+#test listener if pending_command list is none
+@patch('monitor.vel_username','user')
+@patch('monitor.vel_password','password')
+@patch('monitor.logger',logging.getLogger('monitor'))
+@patch('monitor.pending_command_list',None)
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_listener_command_list_none(mock_monitor,mock_input,body,start_response,schema,topic_name):
+    environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2", 'PATH_INFO': '/eventListener/v5/events'}
+    mock_input.read.return_value=body
+    mock_start_response= mock.Mock(start_response)
+    logger = logging.getLogger('monitor')
+    logger.setLevel(logging.DEBUG)
+    project_path = os.getcwd()
+    dict_schema = {"v5": os.path.join(project_path,"collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat-v7-2-1.json")}
+    with mock.patch.object(logger,'debug') as mock_debug:
+        result = list(monitor.listener(environ, mock_start_response, dict_schema))
+        assert [b'']==result  
+
+
+#test jsonschema error
+@patch('monitor.vel_username','user')
+@patch('monitor.vel_password','password')
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_listener_schema_none(mock_monitor,mock_input,body,start_response,schema):
+    mock_input.__name__ = 'read'
+    environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2","PATH_INFO": '/eventListener/v5/events'}
+    mock_input.read.return_value=body
+    mock_start_response= mock.Mock(start_response)
+    project_path=os.getcwd()
+    dict_schema =os.path.join(project_path,"tests/collector/schema.json")
+    os._exit = mock.MagicMock()
+    list(monitor.listener(environ, mock_start_response, dict_schema))
+    assert os._exit.called   
+
+
+
+#test jsonschema validation exception
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_listener_jsonschema_validation(mock_monitor,mock_input,body,start_response,schema):
+    mock_input.__name__ = 'read'
+    environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+    body={"event": {"commonEventHeader": {"domain": 6,"eventId": "11","eventName": "","eventType": "platform","lastEpochMicrosec": 0,"priority": "Normal","reportingEntityId": "localhost","reportingEntityName": "localhost","sequence": 0,"sourceId": "776f3123-30a5-f947-bdf5-099ec3a7577a","sourceName": "776f3123-30a5-f947-bdf5-099ec3a7577a","startEpochMicrosec": 1642961518.919,"version": "4.0","vesEventListenerVersion": "7.2.1"}}}
+    body=json.dumps(body)
+    mock_input.read.return_value=body
+    mock_start_response= mock.Mock(start_response)
+    result=list(monitor.listener(environ,mock_start_response,schema))
+    assert [b'']==result
+
 
+
+#test if schema is none
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_listener_schema_is_empty(mock_monitor,mock_input,body,start_response):
+    mock_input.__name__ = 'read'
+    environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+    mock_input.read.return_value=body
+    mock_start_response= mock.Mock(start_response)
+    result=list(monitor.listener(environ,mock_start_response,None))
+    assert []==result
+
+
+
+#test listener() Exception event is invalid for unexpected reason
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_listener_Event_Invalid(mock_monitor,mock_input,body,start_response):
+    mock_input.__name__ = 'read'
+    environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+    body={}
+    mock_input.read.return_value=body
+    mock_start_response= mock.Mock(start_response)
+    result=list(monitor.listener(environ,mock_start_response,None))
+    assert []==result
+
+
+
+#check main() function
+@patch('monitor.logger',logging.getLogger('monitor'))
 @mock.patch('argparse.ArgumentParser.parse_args',
             return_value=argparse.Namespace(verbose=None, api_version='7',config=get_config_path(),section='default'))
 @mock.patch('gevent.pywsgi.WSGIServer.serve_forever')
 def test_main(server,parser,body):
     argv=None
-    logger = logging.getLogger('monitor')
-    logger.setLevel(logging.ERROR)
-    with mock.patch.object(logger,'error') as mock_error:
-        monitor.main(argv=None)
-        #server.assert_called_once_with()
-        mock_error.assert_called_once_with('Main loop exited unexpectedly!')
+    result=monitor.main(argv=None)
+    assert 0==result
+    
 
-#@pytest.mark.skip
+
+#test main() function with empty argv
+@patch('monitor.logger')
+@mock.patch('argparse.ArgumentParser.parse_args',
+            return_value=argparse.Namespace(verbose=2, api_version='7',config=get_config_path(),section='default'))
+@mock.patch('gevent.pywsgi.WSGIServer.serve_forever')
+def test_main_argv(server,parser,logger,body):
+    argv=''
+    logger.return_value=logging.getLogger('monitor')
+    try:
+        result=monitor.main(argv)
+    except TypeError:
+        assert result == None
+    except Exception:
+        pytest.fail('unexpected error')
+
+
+
+#test platform.system in main
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('argparse.ArgumentParser.parse_args',
+            return_value=argparse.Namespace(verbose=None, api_version='7',config=get_config_path(),section='default'))
+@mock.patch('gevent.pywsgi.WSGIServer.serve_forever')
+def test_main_platform(server,parser,body):
+    argv=None
+    sys = mock.MagicMock()
+    try:
+        with patch('platform.system', MagicMock(return_value='Windows')):
+            res=monitor.main(argv)
+    except RuntimeError:
+        assert res == None
+    except Exception:
+        pytest.fail('Exiting because of exception')
+
+
+#test vel_port in main
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('argparse.ArgumentParser.parse_args',
+            return_value=argparse.Namespace(verbose=None, api_version='7',config=get_wrong_config_port_path(),section='default'))
+@mock.patch('gevent.pywsgi.WSGIServer.serve_forever')
+def test_main_vel_port(server,parser,body):
+    argv=''
+    res=monitor.main(argv)
+    assert res == 2
+
+
+
+# test vel_path in main
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('argparse.ArgumentParser.parse_args',
+            return_value=argparse.Namespace(verbose=None, api_version='7',config=get_wrong_config_path(),section='default'))
+@mock.patch('gevent.pywsgi.WSGIServer.serve_forever')
+def test_main_path(server,parser,body):
+    argv=None
+    try:
+        result = monitor.main(argv)
+    except RuntimeError:
+        assert result == None
+    except Exception:
+        pytest.fail('fail because of exception')
+
+
+
+@pytest.fixture
+def vel_schema_path():
+    config = configparser.ConfigParser()
+    config_file=get_config_path()
+    config.read(config_file)
+    ref = config.get('default', 'schema_file')
+    return ref
+
+# check vel_schema handling in main() when the schema file exists
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('argparse.ArgumentParser.parse_args',
+            return_value=argparse.Namespace(verbose=None, api_version='7',config=get_config_path(),section='default'))
+@mock.patch('gevent.pywsgi.WSGIServer.serve_forever')
+def test_main_vel_schema_path(server,parser,vel_schema_path):
+    argv=None
+    with mock.patch('os.path.exists') as m:
+        m.return_value=vel_schema_path
+        result=monitor.main(argv)
+        assert 0==result
+
+
+
+#test unhandled exception
+@patch('monitor.DEBUG',True)
+@mock.patch('argparse.ArgumentParser.parse_args',
+            return_value=argparse.Namespace(verbose=None, api_version='7',config=get_wrong_config_port_path(),section='default'))
+@mock.patch('gevent.pywsgi.WSGIServer.serve_forever')
+def test_main_unhandle_exception(server,parser,body):
+    argv=None
+    result=None
+    try:
+        result = monitor.main(argv)
+    except RuntimeError:
+        assert result == None
+    except Exception:
+        pytest.fail('Exiting because of exception')
+
+
+
+#check test_listener() function
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_TestControl_listener(mock_monitor,mock_input,body,start_response,schema):
+    mock_input.__name__ = 'read'
+    environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+    mock_input.read.return_value=body
+    mock_start_response= mock.Mock(start_response)
+    result=list(monitor.test_listener(environ,mock_start_response,schema))
+    assert ['']==result
+
+
+
+#check test_listener() GET method
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_TestControl_listener_get_method(mock_monitor,mock_input,body,start_response,schema):
+    mock_input.__name__ = 'read'
+    environ={"REQUEST_METHOD": "GET","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+    mock_input.read.return_value=body
+    mock_start_response= mock.Mock(start_response)
+    response= ['{"event": {"commonEventHeader": {"domain": "measurement", "eventId": "11", "eventName": "", "eventType": "platform", "lastEpochMicrosec": 0, "priority": "Normal", "reportingEntityId": "localhost", "reportingEntityName": "localhost", "sequence": 0, "sourceId": "776f3123-30a5-f947-bdf5-099ec3a7577a", "sourceName": "776f3123-30a5-f947-bdf5-099ec3a7577a", "startEpochMicrosec": 1642961518.919, "version": "4.0", "vesEventListenerVersion": "7.2.1"}}}']
+    result=list(monitor.test_listener(environ,mock_start_response,schema))
+    assert response==result
+
+
+#test test_listener() jsonschema error
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_TestControl_listener_schema_error(mocker,mock_input,body,start_response,schema):
+    mock_input.__name__ = 'read'
+    environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+    mock_input.read.return_value=body
+    mock_start_response= mock.Mock(start_response)
+    project_path=os.getcwd()
+    schema_path =os.path.join(project_path,"tests/collector/schema.json")
+    schema=json.load(open(schema_path, 'r'))
+    result=list(monitor.test_listener(environ, mock_start_response,schema))
+    assert ['']==result
+
+
+#test test_listener() jsonschema validation error
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_TestControl_listener_schema_validation_error(mocker,mock_input,body,start_response,schema):
+    mock_input.__name__ = 'read'
+    environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+    body={"event": {"commonEventHeader": {"domain": 6,"eventId": "11","eventName": "","eventType": "platform","lastEpochMicrosec": 0,"priority": "Normal","reportingEntityId": "localhost","reportingEntityName": "localhost","sequence": 0,"sourceId": "776f3123-30a5-f947-bdf5-099ec3a7577a","sourceName": "776f3123-30a5-f947-bdf5-099ec3a7577a","startEpochMicrosec": 1642961518.919,"version": "4.0","vesEventListenerVersion": "7.2.1"}}}
+    body=json.dumps(body)
+    mock_input.read.return_value=body
+    mock_start_response= mock.Mock(start_response)
+    result=list(monitor.test_listener(environ, mock_start_response,schema))
+    assert ['']==result
+
+
+
+@pytest.fixture
+def schema_wrong():
+    schema_path ="/home/ves-dev/ves/tests/collector/schema.json"
+    schema=json.load(open(schema_path, 'r'))
+    return schema
+
+
+#test test_listener() exception TestControl input not valid
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_TestControl_listener_exception(mocker,mock_input,body,start_response,schema_wrong):
+    mock_input.__name__ = 'read'
+    environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+    body={}
+    mock_input.read.return_value=body
+    mock_start_response= mock.Mock(start_response)
+    result=list(monitor.test_listener(environ, mock_start_response,schema_wrong))
+    assert ['']==result
+
+
+
+#check test_listener() Missing schema 
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_TestControl_listener_Missing_schema(mocker,mock_input,body,start_response):
+    mock_input.__name__ = 'read'
+    environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+    mock_input.read.return_value=body
+    mock_start_response= mock.Mock(start_response)
+    result=list(monitor.test_listener(environ, mock_start_response,None))
+    assert ['']==result
+
+
+#check test_listener() Invalid Input
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_TestControl_Listener_Input_invalid(mocker,mock_input,body,start_response):
+    mock_input.__name__ = 'read'
+    environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+    body={}
+    mock_input.read.return_value=body
+    mock_start_response= mock.Mock(start_response)
+    result=list(monitor.test_listener(environ, mock_start_response,None))
+    assert ['']==result
+
+
+#test listener() get method
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_listener_get_method(mock_monitor,mock_input,body,start_response,schema):
+    mock_input.__name__ = 'read'
+    environ={"REQUEST_METHOD": "GET","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+    mock_input.read.return_value=body
+    mock_start_response= mock.Mock(start_response)
+    result = list(monitor.listener(environ, mock_start_response, schema))
+    assert [b'POST  /eventListener/v7'] == result
+
+
+
+#check save_event_in_kafka() function
 @mock.patch('monitor.kafka_server')
 def test_save_event_in_kafka(mocker,data_set,topic_name):
     data_set_string=json.dumps(data_set)
@@ -111,9 +502,22 @@ def test_save_event_in_kafka(mocker,data_set,topic_name):
     with mock.patch.object(logger,'info') as mock_info:
         monitor.save_event_in_kafka(data_set_string)
         mock_info.assert_called_once_with('Got an event request for topic domain')
-        #monitor.produce_events_in_kafka.assert_called_once_with(data_set,topic_name)
 
 
+# check save_event_in_kafka() topic length
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('monitor.produce_events_in_kafka')
+@mock.patch('monitor.kafka_server')
+def test_save_event_in_kafka_topic_len(server,mock_producer,topic_name):
+    body={'event':{'commonEventHeader':{'domain':''}}}
+    body=json.dumps(body)
+    monitor.save_event_in_kafka(body)
+    data_set={'event': {'commonEventHeader': {'domain': ''}}}
+    mock_producer.assert_called_once_with(data_set,'')
+
+
+
+#check produce_events_in_kafka() function
 @mock.patch('monitor.KafkaProducer')
 @mock.patch('monitor.producer')
 def test_produce_events_in_kafka(mock_pro,mock_producer,data_set,topic_name):
@@ -123,4 +527,7 @@ def test_produce_events_in_kafka(mock_pro,mock_producer,data_set,topic_name):
         monitor.produce_events_in_kafka(data_set,topic_name)
         mock_pro.send.assert_called_with(topic_name,value=data_set)
         mock_debug.assert_called_once_with('Event has been successfully posted into kafka bus')
+        path=os.getcwd()
+        os.remove(os.path.join(path,'collector.log'))
+
 
diff --git a/tests/collector/test_rest_dispatcher.py b/tests/collector/test_rest_dispatcher.py
new file mode 100644 (file)
index 0000000..154883c
--- /dev/null
@@ -0,0 +1,80 @@
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import pytest
+import unittest
+import monitor
+from urllib import response
+from unittest import mock
+from unittest.mock import patch
+from pytest_mock import MockerFixture
+import logging
+import rest_dispatcher
+from gevent import socket
+from gevent import pywsgi
+import gevent
+
+@pytest.fixture
+def start_response():
+    sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    start_response=pywsgi.WSGIHandler(sock,"","")
+    return start_response
+
+#test notfound_404
+@patch('rest_dispatcher.base_url','')
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('rest_dispatcher.set_404_content')
+def test_notfound_404(mocker_dispatcher,mock_input,start_response):
+    environ={"REQUEST_METHOD": "POST","PATH_INFO":''}
+    mock_start_response= mock.Mock(start_response)
+    base_url=''
+    logger = logging.getLogger('monitor')
+    logger.setLevel(logging.DEBUG)
+    with mock.patch.object(logger,'debug') as mock_debug:
+        result=rest_dispatcher.notfound_404(environ, mock_start_response)
+        assert result==['template_404']
+
+#test __call__ of PathDispatcher
+@patch('rest_dispatcher.base_url','')
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+def test_call(mock_input,start_response):
+    environ={"REQUEST_METHOD": "POST","PATH_INFO":''}
+    mock_start_response= mock.Mock(start_response)
+    rest_obj=rest_dispatcher.PathDispatcher()
+    res=rest_obj.__call__(environ,mock_start_response)
+    assert ['template_404']==res
+
+
+@patch('rest_dispatcher.base_url')
+def test_set_404_content(mock_url):
+    mock_url.return_value=''
+    result=rest_dispatcher.set_404_content('')
+    assert result==None
+
+@pytest.fixture
+def path():
+    path='/eventListener/v5/events'
+    return path
+
+@pytest.fixture
+def method():
+    method='post'
+    return method
+
+def test_register(path,method):
+    rest_obj=rest_dispatcher.PathDispatcher()
+    res=rest_obj.register(method,path,None)
+    assert res==None
diff --git a/tests/collector/wrong_config.conf b/tests/collector/wrong_config.conf
new file mode 100644 (file)
index 0000000..fe90735
--- /dev/null
@@ -0,0 +1,32 @@
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+[default]
+schema_file = "evel-test-collector/docs/att_interface_definition/hello.json"
+base_schema_file = /evel-test-collector/docs/att_interface_definition/base_schema.json
+throttle_schema_file = evel-test-collector/docs/att_interface_definition/throttle_schema.json
+test_control_schema_file = evel-test-collector/docs/att_interface_definition/test_control_schema.json
+yaml_schema_path = evel-test-collector/docs/att_interface_definition
+log_file =collector.log
+vel_domain = 127.0.0.1
+vel_port = 9999
+vel_path = "vendor_event_listener/event"
+vel_username =
+vel_password =user
+vel_topic_name =password
+kafka_server =kafka
+log_level = ERROR
+kafka_topic =topic
+schema_ref =https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/test_faultMns.html#components/schemas/NotifyNewAlarm
diff --git a/tests/dmaap_adaptor/test_appConfig.py b/tests/dmaap_adaptor/test_appConfig.py
index c3deb11..bffbbb4 100644 (file)
@@ -43,9 +43,18 @@ def kafkaBroker():
 @pytest.fixture
 def logger():
     logger = logging.getLogger('DMaaP')
+    logger.setLevel(logging.DEBUG)
+    logger.setLevel(logging.ERROR)
     logger.setLevel(logging.INFO)
     return logger
 
+@pytest.fixture
+def enable_assert():
+    assert_value='enable'
+    return assert_value
+
+
+#test init function in appconfig
 @mock.patch('app_config.AppConfig.setLogger')
 @mock.patch('argparse.ArgumentParser.parse_args',
 return_value=argparse.Namespace(config=get_config_path(),section='default'))
@@ -53,19 +62,50 @@ def test___init__(parser,mock_setLogger):
     AppConfig.__init__(AppConfig)
     mock_setLogger.assert_called_with('dmaap.log','error')
 
+
+#test kafka broker
 def test_getKafkaBroker(kafkaBroker):
     AppConfig.kafka_broker=kafkaBroker
     res=AppConfig.getKafkaBroker(AppConfig)
     assert res == kafkaBroker
 
+#test getLogger
 def test_getLogger(logger):
     AppConfig.logger=logger
     res=AppConfig.getLogger(AppConfig)
     assert res.getEffectiveLevel()==20
 
+
+#test logger level Info
 def test_setLogger(logger):
     log_file= 'dmaap.log'
     log_level='INFO'
     with mock.patch.object(logger,'info') as mock_info:
         AppConfig.setLogger(AppConfig,log_file,log_level)
         mock_info.assert_called_with('Log level INFO and log file dmaap.log : ')
+
+
+#test setLogger Debug
+def test_setLogger_debug(logger):
+    log_file= 'dmaap.log'
+    log_level= 'DEBUG'
+    with mock.patch.object(logger,'info') as mock_debug:
+        AppConfig.setLogger(AppConfig,log_file,log_level)
+        mock_debug.assert_called_with('Log level DEBUG and log file dmaap.log : ')
+
+
+#test setLogger error
+def test_setLogger_error(logger):
+    log_file= 'dmaap.log'
+    log_level='ERROR'
+    with mock.patch.object(logger,'info') as mock_error:
+        AppConfig.setLogger(AppConfig,log_file,log_level)
+        mock_error.assert_called_with('Log level ERROR and log file dmaap.log : ')
+
+        
+
+#test AssertConfigValue
+def test_getAssertConfigValue(enable_assert):
+    AppConfig.enable_assert=enable_assert
+    res=AppConfig.getAssertConfigValue(AppConfig)
+    assert res==enable_assert
diff --git a/tests/dmaap_adaptor/test_consumer.py b/tests/dmaap_adaptor/test_consumer.py
index cb92957..55a29c9 100644 (file)
 # limitations under the License.
 #
 
-
+import os
+import argparse
+import configparser
+from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
+import abc
 import pytest
 from unittest import mock
 from unittest.mock import patch
@@ -38,11 +42,46 @@ def topic_list():
     topic_list=ListTopics()
     return topic_list
 
+@pytest.fixture
+def empty_topic_list():
+    empty_topic_list=EmptyListTopics()
+    return empty_topic_list
+
+
 @pytest.fixture
 def resCode():
     responseCode=200
     return responseCode
 
+def get_path():
+    project_path = os.getcwd()
+    project_path = project_path[:project_path.rfind('/')]
+    return project_path
+
+def get_config_path():
+    project_path=get_path()
+    config_path = os.path.join(
+        project_path,"ves/dmaapadapter/adapter/config/adapter.conf")
+    return config_path
+
+#test __init__ of EventConsumer
+@mock.patch('app_config.AppConfig.setLogger')
+@mock.patch('argparse.ArgumentParser.parse_args',
+return_value=argparse.Namespace(config=get_config_path(),section='default'))
+def test_init_event(parser,mock_setLogger):
+    EventConsumer.__init__(EventConsumer)
+    mock_setLogger.assert_called_with('dmaap.log','error')
+
+
+#test __init__ of TopicConsumer
+@mock.patch('app_config.AppConfig.setLogger')
+@mock.patch('argparse.ArgumentParser.parse_args',
+return_value=argparse.Namespace(config=get_config_path(),section='default'))
+def test_init_consumer(parser,mock_setLogger):
+    TopicConsumer.__init__(TopicConsumer)
+    mock_setLogger.assert_called_with('dmaap.log','error')
+
+
 @mock.patch('confluent_kafka.Consumer')
 def test_consumeEvents(mock_consumer,prepareResponse,topic,resCode):
     consumergroup="test"
@@ -57,6 +96,39 @@ def test_consumeEvents(mock_consumer,prepareResponse,topic,resCode):
     assert resCode == prepareResponse.getResponseCode()
     assert resMsg == prepareResponse.getResponseMsg()
 
+
+#test consumeEvents for break
+@mock.patch('confluent_kafka.Consumer')
+def test_consumeEvents_break(mock_consumer,prepareResponse,topic,resCode):
+    consumergroup="test"
+    consumerid="test1"
+    limit=0
+    timeout=1
+    mock_consumer.__name__ = 'poll'
+    mock_consumer.poll.return_value=None
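+    # poll() returning None should hit the break condition in consumeEvents and yield an empty event list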
+    resMsg='[]'
+    EventConsumer.consumeEvents(EventConsumer, prepareResponse, topic, consumergroup, consumerid,limit, timeout)
+    assert resCode == prepareResponse.getResponseCode()
+    assert resMsg == prepareResponse.getResponseMsg()
+
+
+#test consumeEvents for Exception
+@mock.patch('confluent_kafka.Consumer')
+def test_consumeEvents_Exceptions(mock_consumer,prepareResponse,topic):
+    consumergroup="test"
+    consumerid="test1"
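+    # pass the imported abc module object as a deliberately invalid limit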
+    limit=abc
+    timeout=1
+    mock_consumer.__name__ = 'poll'
+    mock_consumer.poll.return_value=None
+    resMsg='"Failed to return the events"'
+    EventConsumer.consumeEvents(EventConsumer, prepareResponse, topic, consumergroup, consumerid,limit, timeout)
+    assert 500 == prepareResponse.getResponseCode()
+    assert resMsg == prepareResponse.getResponseMsg()
+
+
 def test_getTopics(mocker,prepareResponse,topic_list,resCode):
     mocker.patch('confluent_kafka.admin.AdminClient.list_topics',
     return_value=topic_list)
@@ -65,6 +137,18 @@ def test_getTopics(mocker,prepareResponse,topic_list,resCode):
     assert resCode == prepareResponse.getResponseCode()
     assert resMsg == prepareResponse.getResponseMsg()
 
+
+#test getTopics Exception
+def test_getTopics_Exceptions(mocker,prepareResponse):
+    mocker.patch('confluent_kafka.admin.AdminClient.list_topics',
+    return_value='')
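+    # an empty string from list_topics has no topics attribute, so getTopics should fall into its exception handler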
+    TopicConsumer.getTopics(TopicConsumer, prepareResponse)
+    resMsg='"Failed to return the topics"'
+    assert 500 == prepareResponse.getResponseCode()
+    assert resMsg == prepareResponse.getResponseMsg()
+
+
+#test listAllTopics() function
 def test_listAllTopics(mocker,prepareResponse,topic_list,resCode):
     mocker.patch('confluent_kafka.admin.AdminClient.list_topics',
     return_value=topic_list)
@@ -73,6 +157,18 @@ def test_listAllTopics(mocker,prepareResponse,topic_list,resCode):
     assert resCode == prepareResponse.getResponseCode()
     assert resMsg == prepareResponse.getResponseMsg()
 
+
+#test listAllTopics Exceptions
+def test_listAllTopics_Exceptions(mocker,prepareResponse):
+    mocker.patch('confluent_kafka.admin.AdminClient.list_topics',
+    return_value='')
+    TopicConsumer.listAllTopics(TopicConsumer, prepareResponse)
+    resMsg='"Failed to return the topics"'
+    assert 500 == prepareResponse.getResponseCode()
+    assert resMsg == prepareResponse.getResponseMsg()
+
+
+#test getTopicDetails() function
 def test_getTopicDetails(mocker,prepareResponse,topic,topic_list,resCode):
     mocker.patch('confluent_kafka.admin.AdminClient.list_topics',
     return_value=topic_list)
@@ -81,5 +177,30 @@ def test_getTopicDetails(mocker,prepareResponse,topic,topic_list,resCode):
     assert resCode == prepareResponse.getResponseCode()
     assert resMsg == prepareResponse.getResponseMsg()
 
+
+#test getTopicDetails Exceptions
+def test_getTopicDetails_Exceptions(mocker,prepareResponse,topic):
+    mocker.patch('confluent_kafka.admin.AdminClient.list_topics',
+    return_value='')
+    TopicConsumer.getTopicDetails(TopicConsumer, prepareResponse,topic)
+    resMsg='"Failed to return the topics"'
+    assert 500 == prepareResponse.getResponseCode()
+    assert resMsg == prepareResponse.getResponseMsg()
+
+
+#test getTopicDetails when the topic is not found
+def test_getTopicDetails_topic_not_found(mocker,prepareResponse,topic,empty_topic_list):
+    mocker.patch('confluent_kafka.admin.AdminClient.list_topics',
+    return_value=empty_topic_list)
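+    # with no topics present, getTopicDetails should report the requested topic as not found (404)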
+    TopicConsumer.getTopicDetails(TopicConsumer, prepareResponse,topic)
+    resMsg='"Topic [test1] not found"'
+    assert 404 == prepareResponse.getResponseCode()
+    assert resMsg == prepareResponse.getResponseMsg()
+
+
 class ListTopics:
     topics={"test1":"value1", "test2":"value2"}
+
+
+class EmptyListTopics:
+    topics={}
\ No newline at end of file
index 1a62bca..2c6e105 100644 (file)
@@ -183,3 +183,56 @@ def data_set():
                }
             }
     return data_set
+
+
+#test index()
+@mock.patch('dmaap_adapter.TopicConsumer')
+def test_index(mock_consumer):
+    res=dmaap_adapter.index()
+    assert res=="Welcome !!"
+
+
+#test get_all_topics
+@mock.patch('flask.Flask.response_class')
+@mock.patch('dmaap_adapter.PrepareResponse')
+@mock.patch('dmaap_adapter.TopicConsumer')
+def test_get_all_topics(mock_consumer, mock_response, mock_app, prepareResponse, data_set):
+    mock_app.return_value = prepareResponse
+    res = dmaap_adapter.get_all_topics()
+    mock_consumer.getTopics(mock_response)
+    mock_consumer.getTopics.assert_called_with(mock_response)
+    assert res.responseCode == prepareResponse.getResponseCode()
+
+
+#test listall_topics
+@mock.patch('flask.Flask.response_class')
+@mock.patch('dmaap_adapter.PrepareResponse')
+@mock.patch('dmaap_adapter.TopicConsumer')
+def test_listall_topics(mock_consumer, mock_response, mock_app, prepareResponse, data_set):
+    mock_app.return_value = prepareResponse
+    res = dmaap_adapter.listall_topics()
+    mock_consumer.listAllTopics(mock_response)
+    mock_consumer.listAllTopics.assert_called_with(mock_response)
+    assert res.responseCode == prepareResponse.getResponseCode()
+
+
+#test getLimit() with a non-numeric value
+def test_getLimit():
+    limit='abc'
+    res=dmaap_adapter.getLimit(limit)
+    assert res == -1
+
+# test getTimeout exception
+def test_getTimeout_exception():
+    timeout='abc'
+    res=dmaap_adapter.getTimeout(timeout)
+    assert res == 15
+
+#test getTimeout with a negative value
+def test_getTimeout():
+    timeout = -1
+    response=dmaap_adapter.getTimeout(timeout)
+    # a negative timeout is expected to fall back to the 15 second default
+    assert response == 15
+
index fcb3212..074fb22 100644 (file)
@@ -82,6 +82,17 @@ def test_process_heartbeat_events_called(mocker_process_time, mocker_send_to_inf
     mocker_send_to_influxdb.assert_called_with(domain, hb_expected_pdata)
 
 
+@mock.patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_heartbeat_events(mocker_process_time, mocker_send_to_influxdb, hb_json, hb_data, hb_nonstringpdata, hb_expected_pdata, event_Timestamp):
+    domain = "heartbeat"
+    jobj={'additionalFields':{'eventTime':6}}
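+    # additionalFields entries should be flattened into the emitted line protocol (eventTime=6)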
+    hb_ex='heartbeat,domain=heartbeat,eventId=ORAN-DEV_2021-12-20T07:29:34.292938Z,eventName=heartbeat_O_RAN_COMPONENT,eventType=O_RAN_COMPONENT,nfNamingCode=SDN-Controller,nfVendorName=O-RAN-SC-OAM,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None lastEpochMicrosec=1639965574292938,sequence=357,startEpochMicrosec=1639965574292938,eventTime=6 1639985333218840000'
+    influxdb_connector.process_heartbeat_events(domain, jobj, hb_data, hb_nonstringpdata)
+    mocker_send_to_influxdb.assert_called_with(domain, hb_ex)
+
+
 # ------------------------------------------------------------------------------
 # Address of pnfRegistration event.
 # ------------------------------------------------------------------------------
@@ -119,6 +130,27 @@ def test_process_pnfRegistration_event_called(mock_process_time ,mocker_send_to_
     mocker_send_to_influxdb.assert_called_with(domain, pnf_expected_pdata)
 
 
+@mock.patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_pnfRegistration_event(mock_process_time ,mocker_send_to_influxdb, pnf_json, pnf_data, pnf_nonstringpdata, pnf_expected_pdata, event_Timestamp):
+    domain = "pnfRegistration"
+    jobj={1:2,2:4}
+    non_pnf='pnfRegistration,domain=pnfRegistration,eventId=ORAN-DEV_ONAP\\ Controller\\ for\\ Radio,eventName=pnfRegistration_EventType5G,eventType=EventType5G,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,nfNamingCode=SDNR,nfVendorName=ONAP,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639985329569087,lastEpochMicrosec=1639985329569087,1=2,2=4 1639985333218840000'
+    influxdb_connector.process_pnfRegistration_event(domain, jobj, pnf_data, pnf_nonstringpdata)
+    mocker_send_to_influxdb.assert_called_with(domain, non_pnf)
+
+
+@mock.patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_pnfRegistration_event_elif(mock_process_time ,mocker_send_to_influxdb, pnf_json, pnf_data, pnf_nonstringpdata, pnf_expected_pdata, event_Timestamp):
+    domain = "pnfRegistration"
+    jobj={'additionalFields': {'oamPort': 830}}
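+    # a nested additionalFields mapping exercises the elif branch of process_pnfRegistration_event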
+    non_pnf='pnfRegistration,domain=pnfRegistration,eventId=ORAN-DEV_ONAP\\ Controller\\ for\\ Radio,eventName=pnfRegistration_EventType5G,eventType=EventType5G,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,nfNamingCode=SDNR,nfVendorName=ONAP,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639985329569087,lastEpochMicrosec=1639985329569087,oamPort=830 1639985333218840000'
+    influxdb_connector.process_pnfRegistration_event(domain, jobj, pnf_data, pnf_nonstringpdata)
+    mocker_send_to_influxdb.assert_called_with(domain, non_pnf)
+
+
 # ------------------------------------------------------------------------------
 # Address of fault event unit test case
 # ------------------------------------------------------------------------------
@@ -158,6 +190,38 @@ def test_process_fault_event_called(mock_time,mocker_send_to_influxdb, flt_json,
     mocker_send_to_influxdb.assert_called_with(domain, flt_expected_pdata)
 
 
+@mock.patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_fault_event(mock_time,mocker_send_to_influxdb, flt_json, flt_data, flt_nonstringpdata, flt_expected_pdata, event_Timestamp):
+    domain = "fault"
+    payload=flt_json
+    for key, val in payload.items():
+        if key != 'alarmAdditionalInformation' and val != "":
+            if isinstance(val, list):
+                influxdb_connector.process_fault_event(domain, flt_json, flt_data, flt_nonstringpdata)
+                mocker_send_to_influxdb.assert_called_with(domain, flt_expected_pdata)
+
+
+@mock.patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_fault_event_nonstr(mock_time,mocker_send_to_influxdb, flt_json, flt_data, flt_nonstringpdata, flt_expected_pdata, event_Timestamp):
+    domain = "fault"
+    jobj={2:2}
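+    # non-string key/value pairs should end up in the non-string section of the line protocol (2=2)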
+    flt_ex='fault,domain=fault,eventId=LKCYFL79Q01M01FYNG01_LP-MWPS-RADIO_TCA,eventName=fault_O_RAN_COMPONENT_Alarms_TCA,eventType=O_RAN_COMPONENT_Alarms,priority=High,reportingEntityName=ORAN-DEV,sourceName=LKCYFL79Q01M01FYNG01,nfNamingCode=FYNG,nfVendorName=VENDORA,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639985333218840,lastEpochMicrosec=1639985333218840,2=2 1639985333218840000'
+    influxdb_connector.process_fault_event(domain, jobj, flt_data, flt_nonstringpdata)
+    mocker_send_to_influxdb.assert_called_with(domain, flt_ex)
+
+
+@mock.patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_fault_event_nonstr_elif(mock_time,mocker_send_to_influxdb, flt_json, flt_data, flt_nonstringpdata, flt_expected_pdata, event_Timestamp):
+    domain = "fault"
+    jobj={'alarmAdditionalInformation':{'eventTime': 234, 'equipType': 345, 'vendor': 'VENDORA', 'model': 'FancyNextGeneration'}}
+    flt_ex='fault,domain=fault,eventId=LKCYFL79Q01M01FYNG01_LP-MWPS-RADIO_TCA,eventName=fault_O_RAN_COMPONENT_Alarms_TCA,eventType=O_RAN_COMPONENT_Alarms,priority=High,reportingEntityName=ORAN-DEV,sourceName=LKCYFL79Q01M01FYNG01,nfNamingCode=FYNG,nfVendorName=VENDORA,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None,vendor=VENDORA,model=FancyNextGeneration sequence=0,startEpochMicrosec=1639985333218840,lastEpochMicrosec=1639985333218840,eventTime=234,equipType=345 1639985333218840000'
+    influxdb_connector.process_fault_event(domain, jobj, flt_data, flt_nonstringpdata)
+    mocker_send_to_influxdb.assert_called_with(domain, flt_ex)
+
+
 # ------------------------------------------------------------------------------
 # Address of measurement event unit test_cases
 # ------------------------------------------------------------------------------
@@ -244,12 +308,53 @@ def test_process_measurement_events_called(mock_time,mocker_send_to_influxdb, mo
     mocker_send_to_influxdb.assert_called_with(domain, meas_expected_data)
 
 
+
+@patch('influxdb_connector.process_nonadditional_measurements')
+@patch('influxdb_connector.process_additional_measurements')
+@patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_measurement_events(mock_time,mocker_send_to_influxdb, mocker_additional, mocker_nonadditional, meas_json,
+                                           meas_data, meas_nonstringpdata, event_Id, start_Epoch_Microsec, last_Epoch_Microsec,
+                                           meas_expected_data, non_add_meas_data, add_meas_data, event_Timestamp):
+    domain = "measurement"
+    jobj={"test":[1,2,3],'networkSliceArray':[1,2,3]}
+    meas_ex='measurement,domain=measurement,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,eventName=measurement_O_RAN_COMPONENT_PM15min,eventType=O_RAN_COMPONENT_PM15min,priority=Low,reportingEntityName=ORAN-DEV,sourceName=O-RAN-FH-IPv6-01,intervalStartTime=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:00:00\\ +0000,intervalEndTime=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:15:00\\ +0000,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000 1639985333218840000'
+    influxdb_connector.process_measurement_events('measurement',jobj, meas_data, meas_nonstringpdata, event_Id,
+                                                  start_Epoch_Microsec, last_Epoch_Microsec)
+    influxdb_connector.process_additional_measurements(domain,event_Id, start_Epoch_Microsec, last_Epoch_Microsec)
+    mocker_nonadditional.process_nonadditional_measurements([], 'measurementnicperformance', event_Id, start_Epoch_Microsec, last_Epoch_Microsec)
+    mocker_send_to_influxdb.assert_called_with(domain, meas_ex)
+
+
+@patch('influxdb_connector.process_nonadditional_measurements')
+@patch('influxdb_connector.process_additional_measurements')
+@patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_measurement_events_elif(mock_time,mocker_send_to_influxdb, mocker_additional, mocker_nonadditional, meas_json,
+                                           meas_data, meas_nonstringpdata, event_Id, start_Epoch_Microsec, last_Epoch_Microsec,
+                                           meas_expected_data, non_add_meas_data, add_meas_data, event_Timestamp):
+    domain = "measurement"
+    jobj={"test":{1:26,2:56},'networkSliceArray':{1:4,2:7}}
+    meas_ex='measurement,domain=measurement,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,eventName=measurement_O_RAN_COMPONENT_PM15min,eventType=O_RAN_COMPONENT_PM15min,priority=Low,reportingEntityName=ORAN-DEV,sourceName=O-RAN-FH-IPv6-01,intervalStartTime=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:00:00\\ +0000,intervalEndTime=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:15:00\\ +0000,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000,1=26,2=56,1=4,2=7 1639985333218840000'
+    influxdb_connector.process_measurement_events('measurement',jobj, meas_data, meas_nonstringpdata, event_Id,
+                                                  start_Epoch_Microsec, last_Epoch_Microsec)
+    influxdb_connector.process_additional_measurements(domain,event_Id, start_Epoch_Microsec, last_Epoch_Microsec)
+    mocker_additional.process_additional_measurements(add_meas_data.get('additionalMeasurements'), 'measurementadditionalmeasurements',
+                                                      event_Id, start_Epoch_Microsec, last_Epoch_Microsec)
+
+    mocker_nonadditional.process_nonadditional_measurements([], 'measurementnicperformance', event_Id, start_Epoch_Microsec, last_Epoch_Microsec)
+    mocker_send_to_influxdb.assert_called_with(domain, meas_ex)
+
+
 @pytest.fixture
 def add_meas_expected_pdata():
             additional_expected_pdata = 'measurementadditionalmeasurements,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,system=None,name=LP-MWPS-RADIO-2,es=0,ses=1,cses=0,unavailability=0 startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000 1639985333218840000'
             return additional_expected_pdata
 
 
 # ## process_additional_measurements unit test_case
 @mock.patch('influxdb_connector.send_to_influxdb')
 @mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
@@ -265,6 +370,35 @@ def test_process_additional_measurements_called(mock_time, mocker_send_to_influx
                     mocker_send_to_influxdb.assert_called_with(domain, add_meas_expected_pdata)
 
 
+
+@mock.patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_additional_measurements(mock_time, mocker_send_to_influxdb, event_Id, start_Epoch_Microsec, last_Epoch_Microsec,
+                                                add_meas_data, add_meas_expected_pdata, event_Timestamp):
+    payload = [{1:23}]
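+    # a numeric value should land among the non-string fields of the emitted point (1=23)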
+    domain = 'measurementadditionalmeasurements'
+    expected_pdata='measurementadditionalmeasurements,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,system=None startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000,1=23 1639985333218840000'
+    influxdb_connector.process_additional_measurements(payload, domain,
+                                                                       event_Id, start_Epoch_Microsec, last_Epoch_Microsec)
+    mocker_send_to_influxdb.assert_called_with(domain, expected_pdata)
+
+
+@mock.patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_additional_measurements_else(mock_time, mocker_send_to_influxdb, event_Id, start_Epoch_Microsec, last_Epoch_Microsec,
+                                                add_meas_data, add_meas_expected_pdata, event_Timestamp):
+    payload = [{1:{1:{67}}}]
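+    # a nested container falls through to the else branch and is serialised verbatim (1={67})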
+    domain = 'measurementadditionalmeasurements'
+    expected_pdata='measurementadditionalmeasurements,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,system=None startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000,1={67} 1639985333218840000'
+    influxdb_connector.process_additional_measurements(payload, domain,
+                                                                       event_Id, start_Epoch_Microsec, last_Epoch_Microsec)
+    mocker_send_to_influxdb.assert_called_with(domain, expected_pdata)
+
+
 @pytest.fixture
 def non_add_expected_data():
             non_additional_expected_pdata = "measurementcpuusage,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,system=None,name=LP-MWPS-RADIO-2 startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000,hashMap={'es': '0', 'ses': '1', 'cses': '0', 'unavailability': '0'} 1639985333218840000"
@@ -322,7 +456,63 @@ def test_process_thresholdCrossingAlert_event_called(thre_json, threshold_data,
          func.assert_called_with(domain, thre_json, threshold_data, thres_nonstringpdata)
 
 
+@mock.patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_thresholdCrossingAlert_event(mock_pro,mocker_send_to_influxdb,thre_json, threshold_data, thres_nonstringpdata, event_Timestamp):
+    jobj= {"test":"test"}
+    pdata= 'thresholdCrossingAlert,domain=thresholdCrossingAlert,eventId=__TCA,eventName=thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA,eventType=O_RAN_COMPONENT_TCA,priority=High,reportingEntityName=ORAN-DEV,nfNamingCode=1OSF,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None'
+    domain = "thresholdCrossingAlert"
+    thres_data='thresholdCrossingAlert,domain=thresholdCrossingAlert,eventId=__TCA,eventName=thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA,eventType=O_RAN_COMPONENT_TCA,priority=High,reportingEntityName=ORAN-DEV,nfNamingCode=1OSF,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None,system=None,thresholdCrossingFieldsVersion=4.0,criticality=MAJ,additionalProperties=up-and-down,thresholdCrossed=packetLoss,alertAction=SET,alertDescription=TCA,alertType=INTERFACE-ANOMALY,alertValue=1OSF,associatedAlertIdList=loss-of-signal,collectionTimestamp=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:28:56\\ +0000,dataCollector=data-lake,elementType=1OSF,eventSeverity=WARNING,eventStartTimestamp=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:15:00\\ +0000,networkService=from-a-to-b,possibleRootCause=always-the-others,eventTime=2021-12-20T07:28:56.443218Z,equipType=1OSF sequence=0,startEpochMicrosec=1639985336443218,lastEpochMicrosec=1639985336443218 1639985333218840000'
+    influxdb_connector.process_thresholdCrossingAlert_event(domain,thre_json, pdata, thres_nonstringpdata)
+    mocker_send_to_influxdb.assert_called_with(domain, thres_data)
+
+
+@mock.patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_thresholdCrossingAlert_elif(mock_pro,mocker_send_to_influxdb,thre_json, threshold_data, thres_nonstringpdata, event_Timestamp):
+    jobj={'additionalParameters': [{'addParameter': 'MAJ', 'abc':
+                     {'additionalProperties': 'up-and-down'}, 'thresholdCrossed': 'packetLoss'}],}
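+    # a nested dict under an unrecognised key ('abc') should be emitted verbatim among the non-string fields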
+    domain = "thresholdCrossingAlert"
+    nonstr="thresholdCrossingAlert,domain=thresholdCrossingAlert,eventId=__TCA,eventName=thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA,eventType=O_RAN_COMPONENT_TCA,priority=High,reportingEntityName=ORAN-DEV,nfNamingCode=1OSF,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None,addParameter=MAJ,thresholdCrossed=packetLoss sequence=0,startEpochMicrosec=1639985336443218,lastEpochMicrosec=1639985336443218,abc={'additionalProperties': 'up-and-down'} 1639985333218840000"
+    influxdb_connector.process_thresholdCrossingAlert_event(domain,jobj, threshold_data, thres_nonstringpdata)
+    mocker_send_to_influxdb.assert_called_with(domain,  nonstr)
+
+
+@mock.patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_thresholdCrossingAlert_elif_elif(mock_pro,mocker_send_to_influxdb,thre_json, threshold_data, thres_nonstringpdata, event_Timestamp):
+    jobj={'additionalParameters': [{'addParameter': 'MAJ', 'hashMap':
+                     {'additionalProperties':67}, 'thresholdCrossed': 'packetLoss'}],}
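+    # values inside a hashMap entry should be flattened into individual fields (additionalProperties=67)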
+    domain = "thresholdCrossingAlert"
+    nonstr='thresholdCrossingAlert,domain=thresholdCrossingAlert,eventId=__TCA,eventName=thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA,eventType=O_RAN_COMPONENT_TCA,priority=High,reportingEntityName=ORAN-DEV,nfNamingCode=1OSF,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None,addParameter=MAJ,thresholdCrossed=packetLoss sequence=0,startEpochMicrosec=1639985336443218,lastEpochMicrosec=1639985336443218,additionalProperties=67 1639985333218840000'
+    influxdb_connector.process_thresholdCrossingAlert_event(domain,jobj, threshold_data, thres_nonstringpdata)
+    mocker_send_to_influxdb.assert_called_with(domain,  nonstr)
+
+
+@mock.patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_thresholdCrossingAlert_event_elif(mock_pro,mocker_send_to_influxdb,thre_json, threshold_data, thres_nonstringpdata, event_Timestamp):
+    jobj= {1:2}
+    domain = "thresholdCrossingAlert"
+    nonstr='thresholdCrossingAlert,domain=thresholdCrossingAlert,eventId=__TCA,eventName=thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA,eventType=O_RAN_COMPONENT_TCA,priority=High,reportingEntityName=ORAN-DEV,nfNamingCode=1OSF,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639985336443218,lastEpochMicrosec=1639985336443218,1=2 1639985333218840000'
+    influxdb_connector.process_thresholdCrossingAlert_event(domain,jobj, threshold_data, thres_nonstringpdata)
+    mocker_send_to_influxdb.assert_called_with(domain,nonstr)
+
+
+@mock.patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_thresholdCrossingAlert_event_nonstr(mock_pro,mocker_send_to_influxdb,thre_json, threshold_data, thres_nonstringpdata, event_Timestamp):
+    jobj= {'additionalFields': {'eventTime': 2}}
+    domain = "thresholdCrossingAlert"
+    nonstr='thresholdCrossingAlert,domain=thresholdCrossingAlert,eventId=__TCA,eventName=thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA,eventType=O_RAN_COMPONENT_TCA,priority=High,reportingEntityName=ORAN-DEV,nfNamingCode=1OSF,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639985336443218,lastEpochMicrosec=1639985336443218,eventTime=2 1639985333218840000'
+    influxdb_connector.process_thresholdCrossingAlert_event(domain,jobj, threshold_data, thres_nonstringpdata)
+    mocker_send_to_influxdb.assert_called_with(domain,nonstr)
+
+
+# ------------------------------------------------------------------------------
 # ## save_event_in_db unit test_cases.
+# ------------------------------------------------------------------------------
+
 @patch('influxdb_connector.logger')
 @pytest.mark.parametrize("key", [("heartbeat"), ("pnfRegistration"), ("measurement"), ("fault"), ("thresholdCrossingAlert")])
 def test_save_event_in_db(mock_logger, key, hb_json, hb_data, hb_nonstringpdata, pnf_json, pnf_data, pnf_nonstringpdata,
@@ -361,3 +551,48 @@ def test_save_event_in_db(mock_logger, key, hb_json, hb_data, hb_nonstringpdata,
                influxdb_connector.save_event_in_db(data_set)
                func.assert_called_with('thresholdCrossingAlert', thre_json, threshold_data, thres_nonstringpdata)
 
+
+
+@patch('influxdb_connector.logger')
+def test_save_event_in_db_localhost(mock_logger):
+    data_set = {'event':{'commonEventHeader':{'reportingEntityName':'LOCALHOST','domain':'heartbeat','startEpochMicrosec':'1639965574292938','sourceId':'1223'}}}
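+    # reportingEntityName 'LOCALHOST' exercises the local-host handling path of save_event_in_db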
+    try:
+        res=influxdb_connector.save_event_in_db(json.dumps(data_set))
+    except Exception:
+        pytest.fail('Exception occurred while saving data')
+    assert res is None
+
+
+@patch('influxdb_connector.logger')
+def test_save_event_in_db_common(mock_logger):
+    data_set = {'event':{'commonEventHeader':{'reportingEntityName':'LOCALHOST','domain':'heartbeat','startEpochMicrosec':'1639965574292938','sourceId':'1223','internalHeaderFields':{1:78}}}}
+    try:
+        res=influxdb_connector.save_event_in_db(json.dumps(data_set))
+    except Exception:
+        pytest.fail('Exception occurred while saving data')
+    assert res is None
+
+
+@pytest.fixture
+def event():
+    event="domain"
+    return event
+
+
+@pytest.fixture
+def p_data():
+    p_data='heartbeat,domain=heartbeat,eventId=ORAN-DEV_2021-12-20T07:29:34.292938Z,eventName=heartbeat_O_RAN_COMPONENT,eventType=O_RAN_COMPONENT,nfNamingCode=SDN-Controller,nfVendorName=O-RAN-SC-OAM,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1'
+    return p_data
+
+
+#send_to_influxdb unittest
+@patch('influxdb_connector.requests.post')
+@patch('influxdb_connector.logger')
+def test_send_to_influxdb(mock_logger,mock_post,event,p_data):
+    mock_post.return_value.status_code=201
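+    # a mocked 201 status from requests.post represents a successful influxdb write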
+    try:
+        res=influxdb_connector.send_to_influxdb(event,p_data)
+    except Exception:
+        pytest.fail('Exception occurred while sending data to influxdb')
+    assert res is None
diff --git a/tox.ini b/tox.ini
index a934567..2123858 100644 (file)
--- a/tox.ini
+++ b/tox.ini
@@ -31,6 +31,12 @@ commands = sphinx-build -W -b linkcheck -d {envtmpdir}/doctrees ./docs/ {toxinid
 whitelist_externals = sh
 commands = sh -c 'pip freeze > requirements.txt'
 
+
+[coverage:run]
+# exclude test folders (paths containing "tests") from the coverage report
+omit = *tests*
+
+
 [testenv:code]
 basepython = python3
 deps=
@@ -56,4 +62,4 @@ deps=
 # which streams the logs as they come in, rather than saving them
 # all for the end of tests
 commands =
-  pytest --ignore=functionaltest --ignore=collector --cov {toxinidir}  --cov-report xml --cov-report term-missing --cov-report html --cov-fail-under=70 --junitxml={toxinidir}/tmp/tests.xml
+  pytest --ignore=functionaltest --ignore=collector --cov {toxinidir}  --cov-report xml --cov-report term-missing --cov-report html --cov-fail-under=70 --junitxml={toxinidir}/tmp/tests.xml
\ No newline at end of file