RUN apt-get update && apt-get -y upgrade
RUN apt-get install -y git curl python3 python3-pip
-RUN pip3 install requests jsonschema kafka-python gevent
+COPY pip.conf /etc/pip.conf
+
+RUN pip3 install requests jsonschema kafka-python gevent PyYAML
RUN mkdir -p /opt/smo/certs
import datetime
import time
from gevent import pywsgi
+import yaml
+import requests
monitor_mode = "f"
vdu_id = ['', '', '', '', '', '']
# ------------------------------------------------------------------------------
vel_schema = None
+#------------------------------------------------------------------------------
+# The yaml schema which we will use to validate events.
+# ------------------------------------------------------------------------------
+vel_yaml_schema = None
+
# ------------------------------------------------------------------------------
# The JSON schema which we will use to validate client throttle state.
# ------------------------------------------------------------------------------
producer = None
-def listener(environ, start_response, schema):
+def listener(environ, start_response, schema, yaml_schema=None):
'''
Handler for the Vendor Event Listener REST API.
decoded_body = json.loads(body)
validate = jsonschema.validate(decoded_body, schema)
assert (validate is None), "Invalid event!"
+ if 'stndDefinedFields' in decoded_body['event'].keys():
+ logger.debug('in yaml validation')
+ schema_ref = decoded_body['event']['stndDefinedFields']["schemaReference"]
+ if "https://forge.3gpp.org" in schema_ref or "https://gerrit.o-ran-sc.org" in schema_ref:
+ stnd_define_event_validation(yaml_schema, decoded_body)
+ else :
+ logger.error("schema reference {0} not supported.".format(schema_ref))
+ raise Exception("schema reference {0} not supported.".format(schema_ref))
logger.info('Event is valid!')
logger.info('Valid body decoded & checked against schema OK:\n'
start_response('202 Accepted', [])
yield ''.encode()
else:
- logger.warn('Failed to authenticate OK; creds: ' + credentials)
+ logger.warn('Failed to authenticate OK; creds: ' , credentials)
logger.warn('Failed to authenticate agent credentials: ',
credentials,
'against expected ',
except Exception as e:
logger.error('Event invalid for unexpected reason! {0}'.format(e))
+# --------------------------------------------------------------------------
+# check yaml schema file exists or not
+# --------------------------------------------------------------------------
+def check_schema_file_exist(vel_schema_path, schema_ref):
+ logger.debug('in check yaml file')
+ assert (vel_schema_path != ""), "Value of property 'schema_file' is missing in config file"
+ # Fetching file and folder name from url
+ schema_ref = schema_ref.split('#')[0]
+ name_list = schema_ref.split('/')
+ folder_name = '_'.join(name_list[2:-1])
+ file_name = name_list[-1]
+ updated_vel_schema_path = vel_schema_path +'/{0}/{1}'.format(folder_name,file_name)
+ if "https://forge.3gpp.org" in schema_ref:
+ schema_ref = schema_ref.replace("blob","raw")
+ schema_ref = schema_ref + '?inline=false'
+ if not os.path.exists(updated_vel_schema_path):
+ logger.warning('Event Listener Schema File ({0}) not found. ''No validation will be undertaken.'.format(vel_schema_path))
+ logger.info('Start downloading yaml file :{}'.format(schema_ref))
+ result = os.system('curl -JOL "{0}"'.format(schema_ref))
+ logger.debug("result {0}".format(result))
+ assert(result == 0), "Invalid URL {0}".format(schema_ref)
+ logger.info("Download Completed")
+ with open(file_name, "r") as file:
+ first_line = file.readline()
+ # checking downloaded file content is yaml or html
+ if first_line.strip() == "<!DOCTYPE html>":
+ logger.info("Downloaded file is not valid yaml")
+ os.system("del {0} ".format(file_name))
+ logger.info("Downloaded file deleted")
+ assert(first_line.strip() != "<!DOCTYPE html>"), "Invalid Schema File"
+ else:
+ # Create a folder from source url
+ os.system('mkdir {0}/{1}'.format(vel_schema_path, folder_name))
+ # move downloaded file in above created folder
+ os.system("mv {0} {1}/{2}".format(file_name, vel_schema_path, folder_name))
+ return updated_vel_schema_path
# --------------------------------------------------------------------------
+# Second level of validation for stnd define message
+# --------------------------------------------------------------------------
+def stnd_define_event_validation(schema_path , body):
+ logger.debug('in second level validation ')
+ schema_ref = body['event']['stndDefinedFields']["schemaReference"]
+ schema_path = check_schema_file_exist(schema_path , schema_ref)
+ logger.debug('end check yaml path ')
+ schema = yaml.full_load(open(schema_path, 'r'))
+ schema_name= schema_ref.split('/')[-1]
+ updated_schema = dict(schema["components"]["schemas"][schema_name], **schema)
+ decoded_body = body['event']['stndDefinedFields']['data']
+ validate_yaml = jsonschema.validate(decoded_body,updated_schema)
+ assert(validate_yaml is None), "Invalid event!"
+ logger.info('standard defined event validated sucessfully ')
+ return validate_yaml
+# --------------------------------------------------------------------------
# Save event data in Kafka
# --------------------------------------------------------------------------
def save_event_in_kafka(body):
incoming event on the EVEL interface.
'''
global pending_command_list
+ global decoded_body
logger.info('Got a Test Control input')
logger.info('============================')
logger.info('==== TEST CONTROL INPUT ====')
program_build_date = str(__updated__)
program_version_message = '%%(prog)s {0} ({1})'.format(program_version,
program_build_date)
- if (__import__('__main__').__doc__ is not None):
+ if (__import__('__main__').__doc__ is not None): # pragma: no cover
program_shortdesc = __import__('__main__').__doc__.split('\n')[1]
else:
program_shortdesc = 'Running in test harness'
test_control_schema_file = config.get(config_section,
'test_control_schema_file',
vars=overrides)
+ vel_yaml_schema = config.get(config_section,
+ 'yaml_schema_path',
+ vars=overrides)
# ----------------------------------------------------------------------
# Finally we have enough info to start a proper flow trace.
global get_info
get_info = root_url
dispatcher = PathDispatcher()
- vendor_event_listener = partial(listener, schema=vel_schema)
+ vendor_event_listener = partial(listener, schema=vel_schema, yaml_schema=vel_yaml_schema)
dispatcher.register('GET', root_url, vendor_event_listener)
dispatcher.register('POST', root_url, vendor_event_listener)
vendor_throttle_listener = partial(listener, schema=throttle_schema)
logger.error('Main loop exited unexpectedly!')
return 0
- except KeyboardInterrupt:
+ except KeyboardInterrupt: # pragma: no cover
# ----------------------------------------------------------------------
# handle keyboard interrupt
# ----------------------------------------------------------------------
# ------------------------------------------------------------------------------
-if __name__ == '__main__':
+if __name__ == '__main__': # pragma: no cover
if TESTRUN:
# ----------------------------------------------------------------------
# Running tests - note that doctest comments haven't been included so
environ['REQUEST_METHOD'].upper(),
environ['PATH_INFO']))
start_response('404 Not Found', [ ('Content-type', 'text/plain') ])
- return [template_404.format(base_url)]
+ return ['template_404{}'.format(base_url)]
class PathDispatcher:
'''
Register a handler for a method/path, adding it to the pathmap.
'''
logger.debug('Registering for {0} at {1}'.format(method, path))
- print('Registering for {0} at {1}'.format(method, path))
self.pathmap[method.lower(), path] = function
return function
base_schema_file = evel-test-collector/docs/att_interface_definition/base_schema.json
throttle_schema_file = evel-test-collector/docs/att_interface_definition/throttle_schema.json
test_control_schema_file = evel-test-collector/docs/att_interface_definition/test_control_schema.json
+yaml_schema_path = evel-test-collector/docs/schema
#------------------------------------------------------------------------------
# Details of the Vendor Event Listener REST service.
base_schema_file =
throttle_schema_file = ../../docs/att_interface_definition/throttle_schema.json
test_control_schema_file = ../../docs/att_interface_definition/test_control_schema.json
+yaml_schema_path = ../../docs/schema
#------------------------------------------------------------------------------
# Details of the Vendor Event Listener REST service.
--- /dev/null
+NOTE: This folder contains yaml schema folder
\ No newline at end of file
--- /dev/null
+[global]
+timeout = 60
+index-url = https://nexus3.o-ran-sc.org/repository/PyPi/simple
FROM ubuntu:focal
+
RUN apt-get update && apt-get -y upgrade
-RUN apt-get install -y git curl python3 python3-pip
-RUN pip3 install requests jsonschema kafka-python flask confluent-kafka
+RUN apt-get install -y git curl python3 python3-pip
+COPY pip.conf /etc/pip.conf
+
+RUN pip3 install requests jsonschema kafka-python flask confluent-kafka
RUN mkdir /opt/smo
return timeout
-if __name__ == '__main__':
+if __name__ == '__main__': # pragma: no cover
appConfig = AppConfig()
if(appConfig.getAssertConfigValue() == 'False'):
--- /dev/null
+[global]
+timeout = 60
+index-url = https://nexus3.o-ran-sc.org/repository/PyPi/simple
RUN apt-get update && apt-get -y upgrade
RUN apt-get install -y git curl python3 python3-pip
+
+COPY pip.conf /etc/pip.conf
+
RUN pip3 install requests confluent-kafka
# Clone influxdb-connector
send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
+def process_stndDefinedFields_events(values, domain, eventId, startEpochMicrosec, lastEpochMicrosec):
+ """
+ Format stndDefined event to store in influx db
+ Values(dict) :- data to store in influxdb,
+ domain(str) :- name of topic ,
+ eventId (str) :- event id,
+ startEpochMicrosec :- Timestamp ,
+ lastEpochMicrosec:- Timestamp
+ """
+ pdata = domain + ",eventId={},system={}".format(eventId, source)
+ nonstringpdata = " startEpochMicrosec={},lastEpochMicrosec={},".format(startEpochMicrosec, lastEpochMicrosec)
+ for key, val in values.items():
+ if isinstance(val, str):
+ pdata = pdata + ',{}={}'.format(key, process_special_char(val))
+ elif isinstance(val, dict):
+ for key2, val2 in val.items():
+ if isinstance(val2, str) and val2 != '':
+ pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
+ elif isinstance(val2, dict) and key2 != 'additionalInformation' :
+ for key3, val3 in val2.items():
+ if isinstance(val3, str) and val3 != '':
+ pdata = pdata + ',{}={}'.format(key3, process_special_char(val3))
+ elif val3 !='':
+ nonstringpdata = nonstringpdata + '{}={},'.format(key3, val3)
+ elif key2 == 'additionalInformation':
+ for key3, val3 in val2.items():
+ if isinstance(val3, str) and val3 != '' :
+ pdata = pdata + ',{}={}'.format('additionalInformation_'+key3, process_special_char(val3))
+ elif val3 !='':
+ nonstringpdata = nonstringpdata + '{}={},'.format(key3, val3)
+ elif key2 == 'correlatedNotifications':
+ for item in val2:
+ for key4, val4 in item.items():
+ if isinstance(val4, str) and val4 !='':
+ pdata = pdata + ',{}={}'.format(key4, process_special_char(val4))
+ elif val4 !='':
+ nonstringpdata = nonstringpdata + '{}={},'.format(key4, val4)
+
+ elif val2 !='':
+ nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
+ elif val !='':
+ nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
+
+ send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
+
+
def process_special_char(str):
for search_char, replace_char in {" ": "\ ", ",": "\,"}.items():
jobj['event']['commonEventHeader']['startEpochMicrosec'],
jobj['event']['commonEventHeader']['lastEpochMicrosec'])
+ if "stndDefinedFields" in jobj['event']:
+ logger.debug('Found stndDefinedFields')
+ process_stndDefinedFields_events(jobj['event']['stndDefinedFields'],
+ domain,
+ jobj['event']['commonEventHeader']['eventId'],
+ jobj['event']['commonEventHeader']['startEpochMicrosec'],
+ jobj['event']['commonEventHeader']['lastEpochMicrosec'])
+
def main():
c = Consumer(settings)
c.subscribe(['measurement', 'pnfregistration',
- 'fault', 'thresholdcrossingalert', 'heartbeat'])
+ 'fault', 'thresholdcrossingalert', 'heartbeat', 'stnddefined'])
try:
while True:
try:
save_event_in_db(msg.value())
except Exception as e:
- logger.error('Exception occured while saving data : '.format(e))
+ logger.error('Exception occured while saving data : {} '.format(e))
elif msg.error().code() == KafkaError._PARTITION_EOF:
logger.error('End of partition reached {0}/{1}'
.format(msg.topic(), msg.partition()))
c.close()
-if __name__ == '__main__':
+if __name__ == '__main__': # pragma: no cover
main()
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+
[default]
log_file = /opt/smo/influxdbconnector.log
kafka_server =
--- /dev/null
+[global]
+timeout = 60
+index-url = https://nexus3.o-ran-sc.org/repository/PyPi/simple
--- /dev/null
+attrs==22.1.0
+certifi==2022.6.15
+charset-normalizer==2.1.1
+click==8.1.3
+confluent-kafka==1.9.2
+Flask==2.2.2
+gevent==21.12.0
+greenlet==1.1.3
+idna==3.3
+importlib-metadata==4.12.0
+importlib-resources==5.9.0
+itsdangerous==2.1.2
+Jinja2==3.1.2
+jsonschema==4.15.0
+kafka-python==2.0.2
+MarkupSafe==2.1.1
+pkgutil-resolve-name==1.3.10
+pyrsistent==0.18.1
+requests==2.28.1
+urllib3==1.26.12
+Werkzeug==2.2.2
+zipp==3.8.1
+zope.event==4.5.0
+zope.interface==5.4.0
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-## Copyright 2021 Xoriant Corporation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-## Copyright 2021 Xoriant Corporation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
+
import os
import sys
cfgfile.close()
SOURCE_PATH = os.path.join(
PROJECT_PATH,"collector/evel-test-collector/code/collector")
-print(SOURCE_PATH, PROJECT_PATH,schema_file_path, configfile_name )
sys.path.append(SOURCE_PATH)
--- /dev/null
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+[default]
+schema_file = "evel-test-collector/docs/att_interface_definition/hello.json"
+base_schema_file = /evel-test-collector/docs/att_interface_definition/base_schema.json
+throttle_schema_file = evel-test-collector/docs/att_interface_definition/throttle_schema.json
+test_control_schema_file = evel-test-collector/docs/att_interface_definition/test_control_schema.json
+yaml_schema_path = evel-test-collector/docs/att_interface_definition
+log_file =collector.log
+vel_domain = 127.0.0.1
+vel_port = 999
+vel_path =
+vel_username =user
+vel_password =password
+vel_topic_name =
+kafka_server =kafka
+log_level = DEBUG
+kafka_topic =
+schema_ref =https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/test_faultMns.yaml#components/schemas/NotifyNewAlarm
\ No newline at end of file
--- /dev/null
+{
+ "name": "MyClass",
+ "type": "record",
+ "namespace": "com.acme.avro",
+ "fields": [
+ {
+ "name": "interpretations",
+ "type": {
+ "type": "array",
+ "items": {
+ "name": "interpretations_record",
+ "type": "record",
+ "fields": [
+ {
+ "name": "segments",
+ "type": {
+ "type": "array",
+ "items": [
+ {
+ "name": "segments_record",
+ "type": "record",
+ "fields": [
+ {
+ "name": "x",
+ "type": "string"
+ },
+ {
+ "name": "y",
+ "type": "string"
+ },
+ {
+ "name": "z",
+ "type": "string"
+ }
+ ]
+ },
+ {
+ "name": "segments_record",
+ "type": "record",
+ "fields": [
+ {
+ "name": "x",
+ "type": "string"
+ },
+ {
+ "name": "y",
+ "type": "string"
+ },
+ {
+ "name": "z",
+ "type": "string"
+ }
+ ]
+ }
+ ]
+ }
+ }
+ ]
+ }
+ }
+ }
+ ]
+ }
\ No newline at end of file
#
[default]
-schema_file = /home/ves-dev/docs/att_interface_definition/CommonEventFormat-v7-2-2.json
-base_schema_file = /evel-test-collector/docs/att_interface_definition/base_schema.json
-throttle_schema_file = evel-test-collector/docs/att_interface_definition/throttle_schema.json
-test_control_schema_file = evel-test-collector/docs/att_interface_definition/test_control_schema.json
-log_file = collector.log
+schema_file =collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat-v7-2-2.json
+base_schema_file = tests/collector/schema.json
+throttle_schema_file =tests/collector/schema.json
+test_control_schema_file = tests/collector/schema.json
+yaml_schema_path = evel-test-collector/docs/att_interface_definition
+log_file =collector.log
vel_domain = 127.0.0.1
vel_port = 9999
vel_path =
-vel_username =
-vel_password =
+vel_username =user
+vel_password =password
vel_topic_name =
kafka_server = kafka-server
-kafka_topic =
+kafka_topic =topic
# limitations under the License.
#
+import shutil
import os
import pytest
import unittest
import monitor
import argparse
+import configparser
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from unittest import mock
from unittest.mock import patch
+from unittest.mock import MagicMock
import logging
from pytest_mock import MockerFixture
from gevent import socket
project_path,"tests/collector/test_collector.conf")
return config_path
+def get_wrong_config_path():
+ project_path=get_path()
+ config_path = os.path.join(
+ project_path,"tests/collector/wrong_config.conf")
+ return config_path
+
+def get_wrong_config_port_path():
+ project_path=get_path()
+ config_path = os.path.join(
+ project_path,"tests/collector/port_config.conf")
+ return config_path
+
def get_schema_path():
project_path=get_path()
schema_path = os.path.join(
topic_name="topic"
return topic_name
+
+def test_init():
+ obj=monitor.JSONObject({})
+ assert obj.__dict__=={}
+
+
#@pytest.mark.skip
+@patch('monitor.logger',logging.getLogger('monitor'))
@mock.patch('gevent.pywsgi.Input',autospec=True)
@mock.patch('monitor.save_event_in_kafka')
def test_listener(mock_monitor,mock_input,body,start_response,schema):
logger = logging.getLogger('monitor')
logger.setLevel(logging.DEBUG)
with mock.patch.object(logger,'debug') as mock_debug:
- monitor.listener(environ,mock_start_response,schema)
+ result=list(monitor.listener(environ,mock_start_response,schema))
+ assert result==[b'']
+
+
+#test for listener Exception
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_listener_exp(mock_monitor,mock_input,body,start_response,schema):
+ mock_input.__name__ = 'read'
+ environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2","PATH_INFO": '/eventListener/v5/events'}
+ body={}
+ mock_input.read.return_value=json.dumps(body)
+ mock_start_response= mock.Mock(start_response)
+ project_path = os.getcwd()
+ dict_schema = {"v5": os.path.join(project_path,"collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat-v7-2-1.json")}
+ try:
+ result = list(monitor.listener(environ, mock_start_response, dict_schema))
+
+ except TypeError:
+ assert result == None
+ except Exception:
+ pytest.fail('unexcepted error')
+
+
+#test b64decode credentials in listener()
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_listener_b64decode(mock_monitor,mock_input,body,start_response,schema):
+ mock_input.__name__ = 'read'
+ environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "None None", "CONTENT_LENGTH": "2","PATH_INFO": '/eventListener/v5/events'}
+ mock_input.read.return_value=body
+ mock_start_response= mock.Mock(start_response)
+ logger = logging.getLogger('monitor')
+ logger.setLevel(logging.WARN)
+ project_path = os.getcwd()
+ dict_schema = {"v5": os.path.join(project_path,"collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat-v7-2-1.json")}
+ with mock.patch.object(logger,'warn') as mock_warn:
+ result = list(monitor.listener(environ, mock_start_response, dict_schema))
+ mock_monitor.assert_called_with(body)
+
+
+#test listener pending command list
+@patch('monitor.vel_username','user')
+@patch('monitor.vel_password','password')
+@patch('monitor.logger',logging.getLogger('monitor'))
+@patch('monitor.pending_command_list',[1,2,3])
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_listener_command_list(mock_monitor,mock_input,body,start_response,schema,topic_name):
+ environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2", 'PATH_INFO': '/eventListener/v5/events'}
+ mock_input.read.return_value=body
+ mock_start_response= mock.Mock(start_response)
+ logger = logging.getLogger('monitor')
+ logger.setLevel(logging.DEBUG)
+ project_path = os.getcwd()
+ dict_schema = {"v5": os.path.join(project_path,"collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat-v7-2-2.json")}
+ with mock.patch.object(logger,'debug') as mock_debug:
+ result = list(monitor.listener(environ, mock_start_response, dict_schema))
+ assert [b'[1, 2, 3]'] ==result
+
+
+#test listener if pending_command list is none
+@patch('monitor.vel_username','user')
+@patch('monitor.vel_password','password')
+@patch('monitor.logger',logging.getLogger('monitor'))
+@patch('monitor.pending_command_list',None)
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_listener_command_list_none(mock_monitor,mock_input,body,start_response,schema,topic_name):
+ environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2", 'PATH_INFO': '/eventListener/v5/events'}
+ mock_input.read.return_value=body
+ mock_start_response= mock.Mock(start_response)
+ logger = logging.getLogger('monitor')
+ logger.setLevel(logging.DEBUG)
+ project_path = os.getcwd()
+ dict_schema = {"v5": os.path.join(project_path,"collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat-v7-2-1.json")}
+ with mock.patch.object(logger,'debug') as mock_debug:
+ result = list(monitor.listener(environ, mock_start_response, dict_schema))
+ assert [b'']==result
+
+
+#test jsonschema error
+@patch('monitor.vel_username','user')
+@patch('monitor.vel_password','password')
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_listener_schema_none(mock_monitor,mock_input,body,start_response,schema):
+ mock_input.__name__ = 'read'
+ environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2","PATH_INFO": '/eventListener/v5/events'}
+ mock_input.read.return_value=body
+ mock_start_response= mock.Mock(start_response)
+ project_path=os.getcwd()
+ dict_schema =os.path.join(project_path,"tests/collector/schema.json")
+ os._exit = mock.MagicMock()
+ list(monitor.listener(environ, mock_start_response, dict_schema))
+ assert os._exit.called
+
+
+
+#test jsonschema validation exception
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_listener_jsonschema_validation(mock_monitor,mock_input,body,start_response,schema):
+ mock_input.__name__ = 'read'
+ environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+ body={"event": {"commonEventHeader": {"domain": 6,"eventId": "11","eventName": "","eventType": "platform","lastEpochMicrosec": 0,"priority": "Normal","reportingEntityId": "localhost","reportingEntityName": "localhost","sequence": 0,"sourceId": "776f3123-30a5-f947-bdf5-099ec3a7577a","sourceName": "776f3123-30a5-f947-bdf5-099ec3a7577a","startEpochMicrosec": 1642961518.919,"version": "4.0","vesEventListenerVersion": "7.2.1"}}}
+ body=json.dumps(body)
+ mock_input.read.return_value=body
+ mock_start_response= mock.Mock(start_response)
+ result=list(monitor.listener(environ,mock_start_response,schema))
+ assert [b'']==result
+
+
+#test if schema is none
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_listener_schma_is_empty(mock_monitor,mock_input,body,start_response):
+ mock_input.__name__ = 'read'
+ environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+ mock_input.read.return_value=body
+ mock_start_response= mock.Mock(start_response)
+ result=list(monitor.listener(environ,mock_start_response,None))
+ assert []==result
+
+
+
+#test listener() Exception event is invalid for unexpected reason
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_listener_Event_Invalid(mock_monitor,mock_input,body,start_response):
+ mock_input.__name__ = 'read'
+ environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+ body={}
+ mock_input.read.return_value=body
+ mock_start_response= mock.Mock(start_response)
+ result=list(monitor.listener(environ,mock_start_response,None))
+ assert []==result
+
+
+
+#check main() function
+@patch('monitor.logger',logging.getLogger('monitor'))
@mock.patch('argparse.ArgumentParser.parse_args',
return_value=argparse.Namespace(verbose=None, api_version='7',config=get_config_path(),section='default'))
@mock.patch('gevent.pywsgi.WSGIServer.serve_forever')
def test_main(server,parser,body):
argv=None
- logger = logging.getLogger('monitor')
- logger.setLevel(logging.ERROR)
- with mock.patch.object(logger,'error') as mock_error:
- monitor.main(argv=None)
- #server.assert_called_once_with()
- mock_error.assert_called_once_with('Main loop exited unexpectedly!')
+ result=monitor.main(argv=None)
+ assert 0==result
+
-#@pytest.mark.skip
+
+#test main() function argv is None
+@patch('monitor.logger')
+@mock.patch('argparse.ArgumentParser.parse_args',
+ return_value=argparse.Namespace(verbose=2, api_version='7',config=get_config_path(),section='default'))
+@mock.patch('gevent.pywsgi.WSGIServer.serve_forever')
+def test_main_argv(server,parser,logger,body):
+ argv=''
+ logger.return_value=logging.getLogger('monitor')
+ try:
+ result=monitor.main(argv)
+ except TypeError:
+ assert result == None
+ except Exception:
+ pytest.fail('unexcepted error')
+
+
+
+#test platform.system in main
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('argparse.ArgumentParser.parse_args',
+ return_value=argparse.Namespace(verbose=None, api_version='7',config=get_config_path(),section='default'))
+@mock.patch('gevent.pywsgi.WSGIServer.serve_forever')
+def test_main_platform(server,parser,body):
+ argv=None
+ sys = mock.MagicMock()
+ try:
+ with patch('platform.system', MagicMock(return_value='Windows')):
+ res=monitor.main(argv)
+ except RuntimeError:
+ assert res == None
+ except Exception:
+ pytest.fail('Exiting because of exception')
+
+
+#test vel_port in main
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('argparse.ArgumentParser.parse_args',
+ return_value=argparse.Namespace(verbose=None, api_version='7',config=get_wrong_config_port_path(),section='default'))
+@mock.patch('gevent.pywsgi.WSGIServer.serve_forever')
+def test_main_vel_port(server,parser,body):
+ argv=''
+ res=monitor.main(argv)
+ assert res == 2
+
+
+
+# test vel_path in main
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('argparse.ArgumentParser.parse_args',
+ return_value=argparse.Namespace(verbose=None, api_version='7',config=get_wrong_config_path(),section='default'))
+@mock.patch('gevent.pywsgi.WSGIServer.serve_forever')
+def test_main_path(server,parser,body):
+ argv=None
+ try:
+ result = monitor.main(argv)
+ except RuntimeError:
+ assert result == None
+ except Exception:
+ pytest.fail('fail beacuase of exception')
+
+
+
+@pytest.fixture
+def vel_schema_path():
+ config = configparser.ConfigParser()
+ config_file=get_config_path()
+ config.read(config_file)
+ ref = config.get('default', 'schema_file')
+ return ref
+
+# check listener() vel_schema, if it exists
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('argparse.ArgumentParser.parse_args',
+ return_value=argparse.Namespace(verbose=None, api_version='7',config=get_config_path(),section='default'))
+@mock.patch('gevent.pywsgi.WSGIServer.serve_forever')
+def test_main_vel_schema_path(server,parser,vel_schema_path):
+ argv=None
+ with mock.patch('os.path.exists') as m:
+ m.return_value=vel_schema_path
+ result=monitor.main(argv)
+ assert 0==result
+
+
+
+#test unhandle exception
+@patch('monitor.DEBUG',True)
+@mock.patch('argparse.ArgumentParser.parse_args',
+ return_value=argparse.Namespace(verbose=None, api_version='7',config=get_wrong_config_port_path(),section='default'))
+@mock.patch('gevent.pywsgi.WSGIServer.serve_forever')
+def test_main_unhandle_exception(server,parser,body):
+ argv=None
+ result=None
+ try:
+ result = monitor.main(argv)
+ except RuntimeError:
+ assert result == None
+ except Exception:
+ pytest.fail('Exiting because of exception')
+
+
+
+#check test_listener() function
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_TestControl_listener(mock_monitor,mock_input,body,start_response,schema):
+ mock_input.__name__ = 'read'
+ environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+ mock_input.read.return_value=body
+ mock_start_response= mock.Mock(start_response)
+ result=list(monitor.test_listener(environ,mock_start_response,schema))
+ assert ['']==result
+
+
+
+#check test_listener() GET method
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_TestControl_listener_get_method(mock_monitor,mock_input,body,start_response,schema):
+ mock_input.__name__ = 'read'
+ environ={"REQUEST_METHOD": "GET","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+ mock_input.read.return_value=body
+ mock_start_response= mock.Mock(start_response)
+ response= ['{"event": {"commonEventHeader": {"domain": "measurement", "eventId": "11", "eventName": "", "eventType": "platform", "lastEpochMicrosec": 0, "priority": "Normal", "reportingEntityId": "localhost", "reportingEntityName": "localhost", "sequence": 0, "sourceId": "776f3123-30a5-f947-bdf5-099ec3a7577a", "sourceName": "776f3123-30a5-f947-bdf5-099ec3a7577a", "startEpochMicrosec": 1642961518.919, "version": "4.0", "vesEventListenerVersion": "7.2.1"}}}']
+ result=list(monitor.test_listener(environ,mock_start_response,schema))
+ assert response==result
+
+
+#test test_listener() jsonschema error
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_TestControl_listener_schema_error(mocker,mock_input,body,start_response,schema):
+ mock_input.__name__ = 'read'
+ environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+ mock_input.read.return_value=body
+ mock_start_response= mock.Mock(start_response)
+ project_path=os.getcwd()
+ schema_path =os.path.join(project_path,"tests/collector/schema.json")
+ schema=json.load(open(schema_path, 'r'))
+ result=list(monitor.test_listener(environ, mock_start_response,schema))
+ assert ['']==result
+
+
+#test test_listener() jsonschema validation error
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_TestControl_listener_schema_validation_error(mocker,mock_input,body,start_response,schema):
+ mock_input.__name__ = 'read'
+ environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+ body={"event": {"commonEventHeader": {"domain": 6,"eventId": "11","eventName": "","eventType": "platform","lastEpochMicrosec": 0,"priority": "Normal","reportingEntityId": "localhost","reportingEntityName": "localhost","sequence": 0,"sourceId": "776f3123-30a5-f947-bdf5-099ec3a7577a","sourceName": "776f3123-30a5-f947-bdf5-099ec3a7577a","startEpochMicrosec": 1642961518.919,"version": "4.0","vesEventListenerVersion": "7.2.1"}}}
+ body=json.dumps(body)
+ mock_input.read.return_value=body
+ mock_start_response= mock.Mock(start_response)
+ result=list(monitor.test_listener(environ, mock_start_response,schema))
+ assert ['']==result
+
+
+
+@pytest.fixture
+def schema_wrong():
+ schema_path ="/home/ves-dev/ves/tests/collector/schema.json"
+ schema=json.load(open(schema_path, 'r'))
+ return schema
+
+
+#test test_listener() exception TestControl input not valid
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_TestControl_listener_exception(mocker,mock_input,body,start_response,schema_wrong):
+ mock_input.__name__ = 'read'
+ environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+ body={}
+ mock_input.read.return_value=body
+ mock_start_response= mock.Mock(start_response)
+ result=list(monitor.test_listener(environ, mock_start_response,schema_wrong))
+ assert ['']==result
+
+
+
+#check test_listener() Missing schema
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_TestControl_listener_Missing_schema(mocker,mock_input,body,start_response):
+ mock_input.__name__ = 'read'
+ environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+ mock_input.read.return_value=body
+ mock_start_response= mock.Mock(start_response)
+ result=list(monitor.test_listener(environ, mock_start_response,None))
+ assert ['']==result
+
+
+#check test_listener() Invalid Input
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_TestControl_Listener_Input_invalid(mocker,mock_input,body,start_response):
+ mock_input.__name__ = 'read'
+ environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+ body={}
+ mock_input.read.return_value=body
+ mock_start_response= mock.Mock(start_response)
+ result=list(monitor.test_listener(environ, mock_start_response,None))
+ assert ['']==result
+
+
+#test listener() get method
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_listener_get_method(mock_monitor,mock_input,body,start_response,schema):
+ mock_input.__name__ = 'read'
+ environ={"REQUEST_METHOD": "GET","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+ mock_input.read.return_value=body
+ mock_start_response= mock.Mock(start_response)
+ result = list(monitor.listener(environ, mock_start_response, schema))
+ assert [b'POST /eventListener/v7'] == result
+
+
+
+#check save_event_in_kafka() function
@mock.patch('monitor.kafka_server')
def test_save_event_in_kafka(mocker,data_set,topic_name):
data_set_string=json.dumps(data_set)
with mock.patch.object(logger,'info') as mock_info:
monitor.save_event_in_kafka(data_set_string)
mock_info.assert_called_once_with('Got an event request for topic domain')
- #monitor.produce_events_in_kafka.assert_called_once_with(data_set,topic_name)
+# check save_event_in_kafka() topic length
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('monitor.produce_events_in_kafka')
+@mock.patch('monitor.kafka_server')
+def test_save_event_in_kafka_topic_len(server,mock_producer,topic_name):
+ body={'event':{'commonEventHeader':{'domain':''}}}
+ body=json.dumps(body)
+ monitor.save_event_in_kafka(body)
+ data_set={'event': {'commonEventHeader': {'domain': ''}}}
+ mock_producer.assert_called_once_with(data_set,'')
+
+
+
+#check produce_event_in_kafka() function
@mock.patch('monitor.KafkaProducer')
@mock.patch('monitor.producer')
def test_produce_events_in_kafka(mock_pro,mock_producer,data_set,topic_name):
monitor.produce_events_in_kafka(data_set,topic_name)
mock_pro.send.assert_called_with(topic_name,value=data_set)
mock_debug.assert_called_once_with('Event has been successfully posted into kafka bus')
+ path=os.getcwd()
+ os.remove(os.path.join(path,'collector.log'))
+
--- /dev/null
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import pytest
+import unittest
+import monitor
+from urllib import response
+from unittest import mock
+from unittest.mock import patch
+from pytest_mock import MockerFixture
+import logging
+import rest_dispatcher
+from gevent import socket
+from gevent import pywsgi
+import gevent
+
+@pytest.fixture
+def start_response():
+ sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ start_response=pywsgi.WSGIHandler(sock,"","")
+ return start_response
+
+#test test_notfound_404
+@patch('rest_dispatcher.base_url','')
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('rest_dispatcher.set_404_content')
+def test_notfound_404(mocker_dispatcher,mock_input,start_response):
+ environ={"REQUEST_METHOD": "POST","PATH_INFO":''}
+ mock_start_response= mock.Mock(start_response)
+ base_url=''
+ logger = logging.getLogger('monitor')
+ logger.setLevel(logging.DEBUG)
+ with mock.patch.object(logger,'debug') as mock_debug:
+ result=rest_dispatcher.notfound_404(environ, mock_start_response)
+ assert result==['template_404']
+
+#test call of
+@patch('rest_dispatcher.base_url','')
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+def test_call(mock_input,start_response):
+ environ={"REQUEST_METHOD": "POST","PATH_INFO":''}
+ mock_start_response= mock.Mock(start_response)
+ rest_obj=rest_dispatcher.PathDispatcher()
+ res=rest_obj.__call__(environ,mock_start_response)
+ assert ['template_404'] ==res
+
+
+@patch('rest_dispatcher.base_url')
+def test_set_404_content(mock_url):
+ mock_url.return_value=''
+ result=rest_dispatcher.set_404_content('')
+ assert result==None
+
+@pytest.fixture
+def path():
+ path='/eventListener/v5/events'
+ return path
+
+@pytest.fixture
+def method():
+ method='post'
+ return method
+
+def test_register(path,method):
+ rest_obj=rest_dispatcher.PathDispatcher()
+ res=rest_obj.register(path,method,None)
+ assert res==None
--- /dev/null
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+[default]
+schema_file = "evel-test-collector/docs/att_interface_definition/hello.json"
+base_schema_file = /evel-test-collector/docs/att_interface_definition/base_schema.json
+throttle_schema_file = evel-test-collector/docs/att_interface_definition/throttle_schema.json
+test_control_schema_file = evel-test-collector/docs/att_interface_definition/test_control_schema.json
+yaml_schema_path = evel-test-collector/docs/att_interface_definition
+log_file =collector.log
+vel_domain = 127.0.0.1
+vel_port = 9999
+vel_path = "vendor_event_listener/event"
+vel_username =
+vel_password =user
+vel_topic_name =password
+kafka_server =kafka
+log_level = ERROR
+kafka_topic =topic
+schema_ref =https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/test_faultMns.html#components/schemas/NotifyNewAlarm
@pytest.fixture
def logger():
logger = logging.getLogger('DMaaP')
+ logger.setLevel(logging.DEBUG)
+ logger.setLevel(logging.ERROR)
logger.setLevel(logging.INFO)
return logger
+@pytest.fixture
+def enable_assert():
+ assert_value='enable'
+ return assert_value
+
+
+#test init function in appconfig
@mock.patch('app_config.AppConfig.setLogger')
@mock.patch('argparse.ArgumentParser.parse_args',
return_value=argparse.Namespace(config=get_config_path(),section='default'))
AppConfig.__init__(AppConfig)
mock_setLogger.assert_called_with('dmaap.log','error')
+
+#test kafka broker
def test_getKafkaBroker(kafkaBroker):
AppConfig.kafka_broker=kafkaBroker
res=AppConfig.getKafkaBroker(AppConfig)
assert res == kafkaBroker
+#test getLogger
def test_getLogger(logger):
AppConfig.logger=logger
res=AppConfig.getLogger(AppConfig)
assert res.getEffectiveLevel()==20
+
+#test logger level Info
def test_setLogger(logger):
log_file= 'dmaap.log'
log_level='INFO'
with mock.patch.object(logger,'info') as mock_info:
AppConfig.setLogger(AppConfig,log_file,log_level)
mock_info.assert_called_with('Log level INFO and log file dmaap.log : ')
+
+
+#test setLogger Debug
+def test_setLogger_debug(logger):
+ log_file= 'dmaap.log'
+ log_level= 'DEBUG'
+ with mock.patch.object(logger,'info') as mock_debug:
+ AppConfig.setLogger(AppConfig,log_file,log_level)
+ mock_debug.assert_called_with('Log level DEBUG and log file dmaap.log : ')
+
+
+#test setLogger error
+def test_setLogger_error(logger):
+ log_file= 'dmaap.log'
+ log_level='ERROR'
+ with mock.patch.object(logger,'info') as mock_error:
+ AppConfig.setLogger(AppConfig,log_file,log_level)
+ mock_error.assert_called_with('Log level ERROR and log file dmaap.log : ')
+
+
+
+#test AssertConfigValue
+def test_getAssertConfigValue(enable_assert):
+ AppConfig.enable_assert=enable_assert
+ res=AppConfig.getAssertConfigValue(AppConfig)
+ assert res==enable_assert
# limitations under the License.
#
-
+import os
+import argparse
+import configparser
+from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
+import abc
import pytest
from unittest import mock
from unittest.mock import patch
topic_list=ListTopics()
return topic_list
+@pytest.fixture
+def empty_topic_list():
+ empty_topic_list=EmptyListTopics()
+ return empty_topic_list
+
+
@pytest.fixture
def resCode():
responseCode=200
return responseCode
+def get_path():
+ project_path = os.getcwd()
+ project_path = project_path[:project_path.rfind('/')]
+ return project_path
+
+def get_config_path():
+ project_path=get_path()
+ config_path = os.path.join(
+ project_path,"ves/dmaapadapter/adapter/config/adapter.conf")
+ return config_path
+
+#test __init__ of EventConsumer
+@mock.patch('app_config.AppConfig.setLogger')
+@mock.patch('argparse.ArgumentParser.parse_args',
+return_value=argparse.Namespace(config=get_config_path(),section='default'))
+def test_init_event(parser,mock_setLogger):
+ EventConsumer.__init__(EventConsumer)
+ mock_setLogger.assert_called_with('dmaap.log','error')
+
+
+#test __init__ of TpoicConsumer
+@mock.patch('app_config.AppConfig.setLogger')
+@mock.patch('argparse.ArgumentParser.parse_args',
+return_value=argparse.Namespace(config=get_config_path(),section='default'))
+def test_init_consumer(parser,mock_setLogger):
+ TopicConsumer.__init__(TopicConsumer)
+ mock_setLogger.assert_called_with('dmaap.log','error')
+
+
@mock.patch('confluent_kafka.Consumer')
def test_consumeEvents(mock_consumer,prepareResponse,topic,resCode):
consumergroup="test"
assert resCode == prepareResponse.getResponseCode()
assert resMsg == prepareResponse.getResponseMsg()
+
+#test consumeEvents for break
+@mock.patch('confluent_kafka.Consumer')
+def test_consumeEvents_break(mock_consumer,prepareResponse,topic,resCode):
+ consumergroup="test"
+ consumerid="test1"
+ limit=0
+ timeout=1
+ mock_consumer.__name__ = 'subscribe'
+ mock_consumer.__name__ = 'poll'
+ mock_consumer.poll.return_value=None
+ resMsg='[]'
+ EventConsumer.consumeEvents(EventConsumer, prepareResponse, topic, consumergroup, consumerid,limit, timeout)
+ assert resCode == prepareResponse.getResponseCode()
+ assert resMsg == prepareResponse.getResponseMsg()
+
+
+#test consumeEvents for Exception
+@mock.patch('confluent_kafka.Consumer')
+def test_consumeEvents_Exceptions(mock_consumer,prepareResponse,topic):
+ consumergroup="test"
+ consumerid="test1"
+ limit=abc
+ timeout=1
+ mock_consumer.__name__ = 'subscribe'
+ mock_consumer.__name__ = 'poll'
+ mock_consumer.poll.return_value=None
+ resMsg='"Failed to return the events"'
+ EventConsumer.consumeEvents(EventConsumer, prepareResponse, topic, consumergroup, consumerid,limit, timeout)
+ assert 500 == prepareResponse.getResponseCode()
+ assert resMsg == prepareResponse.getResponseMsg()
+
+
def test_getTopics(mocker,prepareResponse,topic_list,resCode):
mocker.patch('confluent_kafka.admin.AdminClient.list_topics',
return_value=topic_list)
assert resCode == prepareResponse.getResponseCode()
assert resMsg == prepareResponse.getResponseMsg()
+
+#test getTopics Exception
+def test_getTopics_Exceptions(mocker,prepareResponse):
+ mocker.patch('confluent_kafka.admin.AdminClient.list_topics',
+ return_value='')
+ TopicConsumer.getTopics(TopicConsumer, prepareResponse)
+ resMsg='"Failed to return the topics"'
+ assert 500 == prepareResponse.getResponseCode()
+ assert resMsg == prepareResponse.getResponseMsg()
+
+
+#test ListALLTopics() function
def test_listAllTopics(mocker,prepareResponse,topic_list,resCode):
mocker.patch('confluent_kafka.admin.AdminClient.list_topics',
return_value=topic_list)
assert resCode == prepareResponse.getResponseCode()
assert resMsg == prepareResponse.getResponseMsg()
+
+#test listAllTopics Exceptions
+def test_listAllTopics_Exceptions(mocker,prepareResponse):
+ mocker.patch('confluent_kafka.admin.AdminClient.list_topics',
+ return_value='')
+ TopicConsumer.listAllTopics(TopicConsumer, prepareResponse)
+ resMsg='"Failed to return the topics"'
+ assert 500 == prepareResponse.getResponseCode()
+ assert resMsg == prepareResponse.getResponseMsg()
+
+
+#test getTopicDetails() function
def test_getTopicDetails(mocker,prepareResponse,topic,topic_list,resCode):
mocker.patch('confluent_kafka.admin.AdminClient.list_topics',
return_value=topic_list)
assert resCode == prepareResponse.getResponseCode()
assert resMsg == prepareResponse.getResponseMsg()
+
+#test getTopicDetails Exceptions
+def test_getTopicDetails_Exceptions(mocker,prepareResponse,topic):
+ mocker.patch('confluent_kafka.admin.AdminClient.list_topics',
+ return_value='')
+ TopicConsumer.getTopicDetails(TopicConsumer, prepareResponse,topic)
+ resMsg='"Failed to return the topics"'
+ assert 500 == prepareResponse.getResponseCode()
+ assert resMsg == prepareResponse.getResponseMsg()
+
+
+#test getTopicDetails Topic exists
+def test_getTopicDetails_Topic_exists(mocker,prepareResponse,topic,empty_topic_list,resCode):
+ mocker.patch('confluent_kafka.admin.AdminClient.list_topics',
+ return_value=empty_topic_list)
+ TopicConsumer.getTopicDetails(TopicConsumer, prepareResponse,topic)
+ resMsg='"Topic [test1] not found"'
+ assert 404 == prepareResponse.getResponseCode()
+ assert resMsg == prepareResponse.getResponseMsg()
+
+
class ListTopics:
topics={"test1":"value1", "test2":"value2"}
+
+
+class EmptyListTopics:
+ topics={}
\ No newline at end of file
}
}
return data_set
+
+
+#test index()
+@mock.patch('dmaap_adapter.TopicConsumer')
+def test_index(self):
+ res=dmaap_adapter.index()
+ assert res=="Welcome !!"
+
+
+#test get_all_topics
+@mock.patch('flask.Flask.response_class')
+@mock.patch('dmaap_adapter.PrepareResponse')
+@mock.patch('dmaap_adapter.TopicConsumer')
+def test_get_all_topics(mock_consumer, mock_response, mock_app, prepareResponse, data_set):
+ mock_app.return_value = prepareResponse
+ res = dmaap_adapter.get_all_topics()
+ mock_consumer.getTopics(mock_response)
+ mock_consumer.getTopics.assert_called_with(mock_response)
+ assert res.responseCode == prepareResponse.getResponseCode()
+
+
+#test listall_topics
+@mock.patch('flask.Flask.response_class')
+@mock.patch('dmaap_adapter.PrepareResponse')
+@mock.patch('dmaap_adapter.TopicConsumer')
+def test_listall_topics(mock_consumer, mock_response, mock_app, prepareResponse, data_set):
+ mock_app.return_value = prepareResponse
+ res = dmaap_adapter.listall_topics()
+ mock_consumer.listAllTopics(mock_response)
+ mock_consumer.listAllTopics.assert_called_with(mock_response)
+ assert res.responseCode == prepareResponse.getResponseCode()
+
+
+#test getLimit()
+def test_getLimit():
+ limit ='abc'
+ res=dmaap_adapter.getLimit(limit)
+ assert res == -1
+
+# test getTimeout exception
+def test_getTimeout_exception():
+ timeout= 'abc'
+ res=dmaap_adapter.getTimeout(timeout)
+ assert res == 15
+
+#test getTimeout
+def test_getTimeout():
+ timeout = -1
+ response=dmaap_adapter.getTimeout(timeout)
+ if timeout<response:
+ timeout = 15
+ assert response==timeout
+
mocker_send_to_influxdb.assert_called_with(domain, hb_expected_pdata)
+
+@mock.patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_heartbeat_events(mocker_process_time, mocker_send_to_influxdb, hb_json, hb_data, hb_nonstringpdata, hb_expected_pdata, event_Timestamp):
+ domain = "heartbeat"
+ jobj={'additionalFields':{'eventTime':6}}
+ hb_ex='heartbeat,domain=heartbeat,eventId=ORAN-DEV_2021-12-20T07:29:34.292938Z,eventName=heartbeat_O_RAN_COMPONENT,eventType=O_RAN_COMPONENT,nfNamingCode=SDN-Controller,nfVendorName=O-RAN-SC-OAM,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None lastEpochMicrosec=1639965574292938,sequence=357,startEpochMicrosec=1639965574292938,eventTime=6 1639985333218840000'
+ influxdb_connector.process_heartbeat_events(domain, jobj, hb_data, hb_nonstringpdata)
+ mocker_send_to_influxdb.assert_called_with(domain, hb_ex)
+
+
# ------------------------------------------------------------------------------
# Address of pnfRegistration event.
# ------------------------------------------------------------------------------
mocker_send_to_influxdb.assert_called_with(domain, pnf_expected_pdata)
+@mock.patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_pnfRegistration_event(mock_process_time ,mocker_send_to_influxdb, pnf_json, pnf_data, pnf_nonstringpdata, pnf_expected_pdata, event_Timestamp):
+ domain = "pnfRegistration"
+ jobj={1:2,2:4}
+ non_pnf='pnfRegistration,domain=pnfRegistration,eventId=ORAN-DEV_ONAP\\ Controller\\ for\\ Radio,eventName=pnfRegistration_EventType5G,eventType=EventType5G,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,nfNamingCode=SDNR,nfVendorName=ONAP,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639985329569087,lastEpochMicrosec=1639985329569087,1=2,2=4 1639985333218840000'
+ influxdb_connector.process_pnfRegistration_event(domain, jobj, pnf_data, pnf_nonstringpdata)
+ mocker_send_to_influxdb.assert_called_with(domain, non_pnf)
+
+
+@mock.patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_pnfRegistration_event_elif(mock_process_time ,mocker_send_to_influxdb, pnf_json, pnf_data, pnf_nonstringpdata, pnf_expected_pdata, event_Timestamp):
+ domain = "pnfRegistration"
+ jobj={'additionalFields': {'oamPort': 830}}
+ non_pnf='pnfRegistration,domain=pnfRegistration,eventId=ORAN-DEV_ONAP\\ Controller\\ for\\ Radio,eventName=pnfRegistration_EventType5G,eventType=EventType5G,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,nfNamingCode=SDNR,nfVendorName=ONAP,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639985329569087,lastEpochMicrosec=1639985329569087,oamPort=830 1639985333218840000'
+ influxdb_connector.process_pnfRegistration_event(domain, jobj, pnf_data, pnf_nonstringpdata)
+ mocker_send_to_influxdb.assert_called_with(domain, non_pnf)
+
+
+
# ------------------------------------------------------------------------------
# Address of fault event unit test case
# ------------------------------------------------------------------------------
mocker_send_to_influxdb.assert_called_with(domain, flt_expected_pdata)
+@mock.patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_fault_event(mock_time,mocker_send_to_influxdb, flt_json, flt_data, flt_nonstringpdata, flt_expected_pdata, event_Timestamp):
+ domain = "fault"
+ payload=flt_json
+ for key, val in payload.items():
+ if key != 'alarmAdditionalInformation' and val != "":
+ if isinstance(val, list):
+ influxdb_connector.process_fault_event(payload.get('alarmAdditionalInformation'),domain, flt_json, flt_data, flt_nonstringpdata)
+ mocker_send_to_influxdb.assert_called_with(domain, flt_expected_pdata)
+
+
+@mock.patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_fault_event_nonstr(mock_time,mocker_send_to_influxdb, flt_json, flt_data, flt_nonstringpdata, flt_expected_pdata, event_Timestamp):
+ domain = "fault"
+ jobj={2:2}
+ flt_ex='fault,domain=fault,eventId=LKCYFL79Q01M01FYNG01_LP-MWPS-RADIO_TCA,eventName=fault_O_RAN_COMPONENT_Alarms_TCA,eventType=O_RAN_COMPONENT_Alarms,priority=High,reportingEntityName=ORAN-DEV,sourceName=LKCYFL79Q01M01FYNG01,nfNamingCode=FYNG,nfVendorName=VENDORA,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639985333218840,lastEpochMicrosec=1639985333218840,2=2 1639985333218840000'
+ influxdb_connector.process_fault_event(domain, jobj, flt_data, flt_nonstringpdata)
+ mocker_send_to_influxdb.assert_called_with(domain, flt_ex)
+
+
+@mock.patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_fault_event_nonstr_elif(mock_time,mocker_send_to_influxdb, flt_json, flt_data, flt_nonstringpdata, flt_expected_pdata, event_Timestamp):
+ domain = "fault"
+ jobj={'alarmAdditionalInformation':{'eventTime': 234, 'equipType': 345, 'vendor': 'VENDORA', 'model': 'FancyNextGeneration'}}
+ flt_ex='fault,domain=fault,eventId=LKCYFL79Q01M01FYNG01_LP-MWPS-RADIO_TCA,eventName=fault_O_RAN_COMPONENT_Alarms_TCA,eventType=O_RAN_COMPONENT_Alarms,priority=High,reportingEntityName=ORAN-DEV,sourceName=LKCYFL79Q01M01FYNG01,nfNamingCode=FYNG,nfVendorName=VENDORA,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None,vendor=VENDORA,model=FancyNextGeneration sequence=0,startEpochMicrosec=1639985333218840,lastEpochMicrosec=1639985333218840,eventTime=234,equipType=345 1639985333218840000'
+ influxdb_connector.process_fault_event(domain, jobj, flt_data, flt_nonstringpdata)
+ mocker_send_to_influxdb.assert_called_with(domain, flt_ex)
+
+
# ------------------------------------------------------------------------------
# Address of measurement event unit test_cases
# ------------------------------------------------------------------------------
mocker_send_to_influxdb.assert_called_with(domain, meas_expected_data)
+
+@patch('influxdb_connector.process_nonadditional_measurements')
+@patch('influxdb_connector.process_additional_measurements')
+@patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_measurement_events(mock_time,mocker_send_to_influxdb, mocker_additional, mocker_nonadditional, meas_json,
+ meas_data, meas_nonstringpdata, event_Id, start_Epoch_Microsec, last_Epoch_Microsec,
+ meas_expected_data, non_add_meas_data, add_meas_data, event_Timestamp):
+ domain = "measurement"
+ jobj={"test":[1,2,3],'networkSliceArray':[1,2,3]}
+ means_ex='measurement,domain=measurement,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,eventName=measurement_O_RAN_COMPONENT_PM15min,eventType=O_RAN_COMPONENT_PM15min,priority=Low,reportingEntityName=ORAN-DEV,sourceName=O-RAN-FH-IPv6-01,intervalStartTime=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:00:00\\ +0000,intervalEndTime=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:15:00\\ +0000,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000 1639985333218840000'
+ influxdb_connector.process_measurement_events('measurement',jobj, meas_data, meas_nonstringpdata, event_Id,
+ start_Epoch_Microsec, last_Epoch_Microsec)
+ influxdb_connector.process_additional_measurements(domain,event_Id, start_Epoch_Microsec, last_Epoch_Microsec)
+ mocker_nonadditional.process_nonadditional_measurements([], 'measurementnicperformance', event_Id, start_Epoch_Microsec, last_Epoch_Microsec)
+ mocker_send_to_influxdb.assert_called_with(domain, means_ex)
+
+
+
+@patch('influxdb_connector.process_nonadditional_measurements')
+@patch('influxdb_connector.process_additional_measurements')
+@patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_measurement_events_elif(mock_time,mocker_send_to_influxdb, mocker_additional, mocker_nonadditional, meas_json,
+ meas_data, meas_nonstringpdata, event_Id, start_Epoch_Microsec, last_Epoch_Microsec,
+ meas_expected_data, non_add_meas_data, add_meas_data, event_Timestamp):
+ domain = "measurement"
+ jobj={"test":{1:26,2:56},'networkSliceArray':{1:4,2:7}}
+ means_ex='measurement,domain=measurement,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,eventName=measurement_O_RAN_COMPONENT_PM15min,eventType=O_RAN_COMPONENT_PM15min,priority=Low,reportingEntityName=ORAN-DEV,sourceName=O-RAN-FH-IPv6-01,intervalStartTime=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:00:00\\ +0000,intervalEndTime=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:15:00\\ +0000,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000,1=26,2=56,1=4,2=7 1639985333218840000'
+ influxdb_connector.process_measurement_events('measurement',jobj, meas_data, meas_nonstringpdata, event_Id,
+ start_Epoch_Microsec, last_Epoch_Microsec)
+ influxdb_connector.process_additional_measurements(domain,event_Id, start_Epoch_Microsec, last_Epoch_Microsec)
+ mocker_additional.process_additional_measurements(add_meas_data.get('additionalMeasurements'), 'measurementadditionalmeasurements',
+ event_Id, start_Epoch_Microsec, last_Epoch_Microsec)
+
+ mocker_nonadditional.process_nonadditional_measurements([], 'measurementnicperformance', event_Id, start_Epoch_Microsec, last_Epoch_Microsec)
+ mocker_send_to_influxdb.assert_called_with(domain, means_ex)
+
+
+
@pytest.fixture
def add_meas_expected_pdata():
additional_expected_pdata = 'measurementadditionalmeasurements,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,system=None,name=LP-MWPS-RADIO-2,es=0,ses=1,cses=0,unavailability=0 startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000 1639985333218840000'
return additional_expected_pdata
+
# ## process_additional_measurements unit test_case
@mock.patch('influxdb_connector.send_to_influxdb')
@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
mocker_send_to_influxdb.assert_called_with(domain, add_meas_expected_pdata)
+
+@mock.patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_additional_measurements(mock_time, mocker_send_to_influxdb, event_Id, start_Epoch_Microsec, last_Epoch_Microsec,
+ add_meas_data, add_meas_expected_pdata, event_Timestamp):
+ payload = [{1:23}]
+ domain = 'measurementadditionalmeasurements'
+ expected_pdata='measurementadditionalmeasurements,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,system=None startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000,1=23 1639985333218840000'
+ influxdb_connector.process_additional_measurements(payload, domain,
+ event_Id, start_Epoch_Microsec, last_Epoch_Microsec)
+ mocker_send_to_influxdb.assert_called_with(domain, expected_pdata)
+
+
+
+@mock.patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_additional_measurements_else(mock_time, mocker_send_to_influxdb, event_Id, start_Epoch_Microsec, last_Epoch_Microsec,
+ add_meas_data, add_meas_expected_pdata, event_Timestamp):
+ payload = [{1:{1:{67}}}]
+ domain = 'measurementadditionalmeasurements'
+ expected_pdata='measurementadditionalmeasurements,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,system=None startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000,1={67} 1639985333218840000'
+ influxdb_connector.process_additional_measurements(payload, domain,
+ event_Id, start_Epoch_Microsec, last_Epoch_Microsec)
+ mocker_send_to_influxdb.assert_called_with(domain, expected_pdata)
+
+
+
+
+
@pytest.fixture
def non_add_expected_data():
non_additional_expected_pdata = "measurementcpuusage,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,system=None,name=LP-MWPS-RADIO-2 startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000,hashMap={'es': '0', 'ses': '1', 'cses': '0', 'unavailability': '0'} 1639985333218840000"
func.assert_called_with(domain, thre_json, threshold_data, thres_nonstringpdata)
+@mock.patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_thresholdCrossingAlert_event(mock_pro,mocker_send_to_influxdb,thre_json, threshold_data, thres_nonstringpdata, event_Timestamp):
+ jobj= {"test":"test"}
+ pdata= 'thresholdCrossingAlert,domain=thresholdCrossingAlert,eventId=__TCA,eventName=thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA,eventType=O_RAN_COMPONENT_TCA,priority=High,reportingEntityName=ORAN-DEV,nfNamingCode=1OSF,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None'
+ domain = "thresholdCrossingAlert"
+ thres_data='thresholdCrossingAlert,domain=thresholdCrossingAlert,eventId=__TCA,eventName=thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA,eventType=O_RAN_COMPONENT_TCA,priority=High,reportingEntityName=ORAN-DEV,nfNamingCode=1OSF,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None,system=None,thresholdCrossingFieldsVersion=4.0,criticality=MAJ,additionalProperties=up-and-down,thresholdCrossed=packetLoss,alertAction=SET,alertDescription=TCA,alertType=INTERFACE-ANOMALY,alertValue=1OSF,associatedAlertIdList=loss-of-signal,collectionTimestamp=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:28:56\\ +0000,dataCollector=data-lake,elementType=1OSF,eventSeverity=WARNING,eventStartTimestamp=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:15:00\\ +0000,networkService=from-a-to-b,possibleRootCause=always-the-others,eventTime=2021-12-20T07:28:56.443218Z,equipType=1OSF sequence=0,startEpochMicrosec=1639985336443218,lastEpochMicrosec=1639985336443218 1639985333218840000'
+ influxdb_connector.process_thresholdCrossingAlert_event(domain,thre_json, pdata, thres_nonstringpdata)
+ mocker_send_to_influxdb.assert_called_with(domain, thres_data)
+
+
+@mock.patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_thresholdCrossingAlert_elif(mock_pro,mocker_send_to_influxdb,thre_json, threshold_data, thres_nonstringpdata, event_Timestamp):
+ jobj={'additionalParameters': [{'addParameter': 'MAJ', 'abc':
+ {'additionalProperties': 'up-and-down'}, 'thresholdCrossed': 'packetLoss'}],}
+ domain = "thresholdCrossingAlert"
+ nonstr="thresholdCrossingAlert,domain=thresholdCrossingAlert,eventId=__TCA,eventName=thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA,eventType=O_RAN_COMPONENT_TCA,priority=High,reportingEntityName=ORAN-DEV,nfNamingCode=1OSF,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None,addParameter=MAJ,thresholdCrossed=packetLoss sequence=0,startEpochMicrosec=1639985336443218,lastEpochMicrosec=1639985336443218,abc={'additionalProperties': 'up-and-down'} 1639985333218840000"
+ influxdb_connector.process_thresholdCrossingAlert_event(domain,jobj, threshold_data, thres_nonstringpdata)
+ mocker_send_to_influxdb.assert_called_with(domain, nonstr)
+
+
+@mock.patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_thresholdCrossingAlert_elif_elif(mock_pro,mocker_send_to_influxdb,thre_json, threshold_data, thres_nonstringpdata, event_Timestamp):
+ jobj={'additionalParameters': [{'addParameter': 'MAJ', 'hashMap':
+ {'additionalProperties':67}, 'thresholdCrossed': 'packetLoss'}],}
+ domain = "thresholdCrossingAlert"
+ nonstr='thresholdCrossingAlert,domain=thresholdCrossingAlert,eventId=__TCA,eventName=thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA,eventType=O_RAN_COMPONENT_TCA,priority=High,reportingEntityName=ORAN-DEV,nfNamingCode=1OSF,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None,addParameter=MAJ,thresholdCrossed=packetLoss sequence=0,startEpochMicrosec=1639985336443218,lastEpochMicrosec=1639985336443218,additionalProperties=67 1639985333218840000'
+ influxdb_connector.process_thresholdCrossingAlert_event(domain,jobj, threshold_data, thres_nonstringpdata)
+ mocker_send_to_influxdb.assert_called_with(domain, nonstr)
+
+
+@mock.patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_thresholdCrossingAlert_event_elif(mock_pro,mocker_send_to_influxdb,thre_json, threshold_data, thres_nonstringpdata, event_Timestamp):
+ jobj= {1:2}
+ domain = "thresholdCrossingAlert"
+ nonstr='thresholdCrossingAlert,domain=thresholdCrossingAlert,eventId=__TCA,eventName=thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA,eventType=O_RAN_COMPONENT_TCA,priority=High,reportingEntityName=ORAN-DEV,nfNamingCode=1OSF,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639985336443218,lastEpochMicrosec=1639985336443218,1=2 1639985333218840000'
+ influxdb_connector.process_thresholdCrossingAlert_event(domain,jobj, threshold_data, thres_nonstringpdata)
+ mocker_send_to_influxdb.assert_called_with(domain,nonstr)
+
+
+@mock.patch('influxdb_connector.send_to_influxdb')
+@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000')
+def test_process_thresholdCrossingAlert_event_nonstr(mock_pro,mocker_send_to_influxdb,thre_json, threshold_data, thres_nonstringpdata, event_Timestamp):
+ jobj= {'additionalFields': {'eventTime': 2}}
+ domain = "thresholdCrossingAlert"
+ nonstr='thresholdCrossingAlert,domain=thresholdCrossingAlert,eventId=__TCA,eventName=thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA,eventType=O_RAN_COMPONENT_TCA,priority=High,reportingEntityName=ORAN-DEV,nfNamingCode=1OSF,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639985336443218,lastEpochMicrosec=1639985336443218,eventTime=2 1639985333218840000'
+ influxdb_connector.process_thresholdCrossingAlert_event(domain,jobj, threshold_data, thres_nonstringpdata)
+ mocker_send_to_influxdb.assert_called_with(domain,nonstr)
+
+
+#.................................................................................
# ## save_event_in_db unit test_cases.
+#....................................................................................
+
@patch('influxdb_connector.logger')
@pytest.mark.parametrize("key", [("heartbeat"), ("pnfRegistration"), ("measurement"), ("fault"), ("thresholdCrossingAlert")])
def test_save_event_in_db(mock_logger, key, hb_json, hb_data, hb_nonstringpdata, pnf_json, pnf_data, pnf_nonstringpdata,
influxdb_connector.save_event_in_db(data_set)
func.assert_called_with('thresholdCrossingAlert', thre_json, threshold_data, thres_nonstringpdata)
+
+
+@patch('influxdb_connector.logger')
+def test_save_event_in_db_localhost(mock_logger):
+ data_set = {'event':{'commonEventHeader':{'reportingEntityName':'LOCALHOST','domain':'heartbeat','startEpochMicrosec':'1639965574292938','sourceId':'1223'}}}
+ try:
+ res=influxdb_connector.save_event_in_db(json.dumps(data_set))
+ except Exception:
+ pytest.fail('Exception occured while saving data')
+ assert res==None
+
+
+@patch('influxdb_connector.logger')
+def test_save_event_in_db_comman(mock_logger):
+ data_set = {'event':{'commonEventHeader':{'reportingEntityName':'LOCALHOST','domain':'heartbeat','startEpochMicrosec':'1639965574292938','sourceId':'1223','internalHeaderFields':{1:78}}}}
+ try:
+ res=influxdb_connector.save_event_in_db(json.dumps(data_set))
+ except Exception:
+ pytest.fail('Exception occured while saving data')
+ assert res==None
+
+
+
+@pytest.fixture
+def event():
+ event="domain"
+ return event
+
+
+@pytest.fixture
+def p_data():
+ p_data='heartbeat,domain=heartbeat,eventId=ORAN-DEV_2021-12-20T07:29:34.292938Z,eventName=heartbeat_O_RAN_COMPONENT,eventType=O_RAN_COMPONENT,nfNamingCode=SDN-Controller,nfVendorName=O-RAN-SC-OAM,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1'
+ return p_data
+
+
+#send_to_influxdb unittest
+@patch('influxdb_connector.requests.post')
+@patch('influxdb_connector.logger')
+def test_send_to_influxdb(mock_logger,mock_post,event,p_data):
+ mock_post.return_value.status_code=201
+ try:
+ res=influxdb_connector.send_to_influxdb(event,p_data)
+ except Exception:
+ pytest.fail('Exception occured while saving data')
+ assert res==None
whitelist_externals = sh
commands = sh -c 'pip freeze > requirements.txt'
+
+[coverage:run]
+# exclude test folder from coverage report
+omit = *tests*
+
+
[testenv:code]
basepython = python3
deps=
# which streams the logs as they come in, rather than saving them
# all for the end of tests
commands =
- pytest --ignore=functionaltest --ignore=collector --cov {toxinidir} --cov-report xml --cov-report term-missing --cov-report html --cov-fail-under=70 --junitxml={toxinidir}/tmp/tests.xml
+ pytest --ignore=functionaltest --ignore=collector --cov {toxinidir} --cov-report xml --cov-report term-missing --cov-report html --cov-fail-under=70 --junitxml={toxinidir}/tmp/tests.xml
\ No newline at end of file