From d0b08354f126a0ff3c6d887ba2f42c0144280b12 Mon Sep 17 00:00:00 2001 From: dhivarprajakta Date: Tue, 18 Oct 2022 12:57:24 +0530 Subject: [PATCH] Write test cases for Standard Defined Validator SMO-99 Signed-off-by: dhivarprajakta Change-Id: I67feb41df1ff4c2bec69998e043103b153c5c7ac Signed-off-by: dhivarprajakta --- .../evel-test-collector/docs/schema/README.md | 2 +- tests/collector/port_config.conf | 32 - tests/collector/stdDefined.json | 21 + tests/collector/test_collector.conf | 38 + tests/collector/test_monitor.py | 1215 +++++++++++------ tests/collector/test_rest_dispatcher.py | 82 +- tests/collector/wrong_config.conf | 32 - tests/dmaap_adaptor/test_consumer.py | 214 +-- tests/influxdb_connector/events.txt | 3 +- tests/influxdb_connector/test_influxdb_events.py | 1372 +++++++++++++++----- 10 files changed, 2069 insertions(+), 942 deletions(-) delete mode 100644 tests/collector/port_config.conf create mode 100644 tests/collector/stdDefined.json delete mode 100644 tests/collector/wrong_config.conf diff --git a/collector/evel-test-collector/docs/schema/README.md b/collector/evel-test-collector/docs/schema/README.md index d73e2a0..a2e43d8 100644 --- a/collector/evel-test-collector/docs/schema/README.md +++ b/collector/evel-test-collector/docs/schema/README.md @@ -1 +1 @@ -NOTE: This folder contains yaml schema folder \ No newline at end of file +NOTE: This folder contains yaml schema files diff --git a/tests/collector/port_config.conf b/tests/collector/port_config.conf deleted file mode 100644 index 0ec246b..0000000 --- a/tests/collector/port_config.conf +++ /dev/null @@ -1,32 +0,0 @@ -# Copyright 2021 Xoriant Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -[default] -schema_file = "evel-test-collector/docs/att_interface_definition/hello.json" -base_schema_file = /evel-test-collector/docs/att_interface_definition/base_schema.json -throttle_schema_file = evel-test-collector/docs/att_interface_definition/throttle_schema.json -test_control_schema_file = evel-test-collector/docs/att_interface_definition/test_control_schema.json -yaml_schema_path = evel-test-collector/docs/att_interface_definition -log_file =collector.log -vel_domain = 127.0.0.1 -vel_port = 999 -vel_path = -vel_username =user -vel_password =password -vel_topic_name = -kafka_server =kafka -log_level = DEBUG -kafka_topic = -schema_ref =https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/test_faultMns.yaml#components/schemas/NotifyNewAlarm \ No newline at end of file diff --git a/tests/collector/stdDefined.json b/tests/collector/stdDefined.json new file mode 100644 index 0000000..aabdcbf --- /dev/null +++ b/tests/collector/stdDefined.json @@ -0,0 +1,21 @@ +{ + "$id": "https://example.com/person.schema.json", + "$schema": "https://json-schema.org/draft/2020-12/schema", + "title": "Person", + "type": "object", + "properties": { + "firstName": { + "type": "string", + "description": "The person's first name." + }, + "lastName": { + "type": "string", + "description": "The person's last name." + }, + "age": { + "description": "Age in years which must be equal to or greater than zero.", + "type": "integer", + "minimum": 0 + } + } + } \ No newline at end of file diff --git a/tests/collector/test_collector.conf b/tests/collector/test_collector.conf index a1ce17e..6e44289 100755 --- a/tests/collector/test_collector.conf +++ b/tests/collector/test_collector.conf @@ -28,4 +28,42 @@ vel_password =password vel_topic_name = kafka_server = kafka-server kafka_topic =topic +schema_ref =https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/faultMnS.yaml#components/schemas/NotifyNewAlarm + +[invalid_config] +schema_file = "evel-test-collector/docs/att_interface_definition/hello.json" +base_schema_file = /evel-test-collector/docs/att_interface_definition/base_schema.json +throttle_schema_file = evel-test-collector/docs/att_interface_definition/throttle_schema.json +test_control_schema_file = evel-test-collector/docs/att_interface_definition/test_control_schema.json +yaml_schema_path = evel-test-collector/docs/att_interface_definition +log_file =collector.log +vel_domain = 127.0.0.1 +vel_port = 9999 +vel_path = "vendor_event_listener/event" +vel_username = +vel_password =user +vel_topic_name =password +kafka_server =kafka +log_level = ERROR +kafka_topic =topic +schema_ref =https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/test_faultMns.html#components/schemas/NotifyNewAlarm + + +[wrong_port] +schema_file = "evel-test-collector/docs/att_interface_definition/hello.json" +base_schema_file = /evel-test-collector/docs/att_interface_definition/base_schema.json +throttle_schema_file = evel-test-collector/docs/att_interface_definition/throttle_schema.json +test_control_schema_file = evel-test-collector/docs/att_interface_definition/test_control_schema.json +yaml_schema_path = evel-test-collector/docs/att_interface_definition +log_file =collector.log +vel_domain = 127.0.0.1 +vel_port = 999 +vel_path = "vendor_event_listener/event" +vel_username = +vel_password =user +vel_topic_name =password +kafka_server =kafka +log_level = ERROR +kafka_topic =topic +schema_ref =https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/test_faultMns.yaml#components/schemas/NotifyNewAlarm diff --git a/tests/collector/test_monitor.py b/tests/collector/test_monitor.py index d2ab598..940eece 100644 --- a/tests/collector/test_monitor.py +++ b/tests/collector/test_monitor.py @@ -13,526 +13,937 @@ # limitations under the License. # -import shutil import os import pytest import unittest import monitor import argparse import configparser -from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter from unittest import mock from unittest.mock import patch from unittest.mock import MagicMock import logging -from pytest_mock import MockerFixture from gevent import socket -from json import dumps from gevent import pywsgi -import gevent import json -import jsonschema -from kafka import KafkaProducer + + +test_html_schema_file = "test_faultMns.html" + +test_yaml_schema_file = "test_faultMns.yaml" + +test_collector_path = "tests/collector" + +invalid_stdDefined_schema = "tests/collector/stdDefined.json" def get_path(): project_path = os.getcwd() return project_path -def get_config_path(): - project_path=get_path() - config_path = os.path.join( - project_path,"tests/collector/test_collector.conf") - return config_path -def get_wrong_config_path(): - project_path=get_path() - config_path = os.path.join( - project_path,"tests/collector/wrong_config.conf") +def get_config_path(): + project_path = get_path() + config_path = os.path.join(project_path, "tests/collector/test_collector.conf") return config_path -def get_wrong_config_port_path(): - project_path=get_path() - config_path = os.path.join( - project_path,"tests/collector/port_config.conf") - return config_path def get_schema_path(): - project_path=get_path() + project_path = get_path() schema_path = os.path.join( - project_path,"collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat-v7-2-2.json") + project_path, + "collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat-v7-2-2.json", + ) return schema_path + @pytest.fixture def body(): - body={"event": {"commonEventHeader": {"domain": "measurement","eventId": "11","eventName": "","eventType": "platform","lastEpochMicrosec": 0,"priority": "Normal","reportingEntityId": "localhost","reportingEntityName": "localhost","sequence": 0,"sourceId": "776f3123-30a5-f947-bdf5-099ec3a7577a","sourceName": "776f3123-30a5-f947-bdf5-099ec3a7577a","startEpochMicrosec": 1642961518.919,"version": "4.0","vesEventListenerVersion": "7.2.1"}}} - body=json.dumps(body) + body = { + "event": { + "commonEventHeader": { + "domain": "measurement", + "eventId": "11", + "eventName": "", + "eventType": "platform", + "lastEpochMicrosec": 0, + "priority": "Normal", + "reportingEntityId": "localhost", + "reportingEntityName": "localhost", + "sequence": 0, + "sourceId": "776f3123-30a5-f947-bdf5-099ec3a7577a", + "sourceName": "776f3123-30a5-f947-bdf5-099ec3a7577a", + "startEpochMicrosec": 1642961518.919, + "version": "4.0", + "vesEventListenerVersion": "7.2.1", + } + } + } + body = json.dumps(body) return body + @pytest.fixture def start_response(): - sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM) - start_response=pywsgi.WSGIHandler(sock,"","") + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + start_response = pywsgi.WSGIHandler(sock, "", "") return start_response + @pytest.fixture def schema(): schema_path = get_schema_path() - schema=json.load(open(schema_path, 'r')) + schema = json.load(open(schema_path, "r")) return schema + @pytest.fixture def data_set(): - data_set={"event": {"commonEventHeader": {"domain": "topic" }}} + data_set = {"event": {"commonEventHeader": {"domain": "topic"}}} return data_set + @pytest.fixture def topic_name(): - topic_name="topic" + topic_name = "topic" return topic_name def test_init(): - obj=monitor.JSONObject({}) - assert obj.__dict__=={} - - -#@pytest.mark.skip -@patch('monitor.logger',logging.getLogger('monitor')) -@mock.patch('gevent.pywsgi.Input',autospec=True) -@mock.patch('monitor.save_event_in_kafka') -def test_listener(mock_monitor,mock_input,body,start_response,schema): - mock_input.__name__ = 'read' - environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"} - mock_input.read.return_value=body - mock_start_response= mock.Mock(start_response) - logger = logging.getLogger('monitor') - logger.setLevel(logging.DEBUG) - with mock.patch.object(logger,'debug') as mock_debug: - result=list(monitor.listener(environ,mock_start_response,schema)) - assert result==[b''] - - -#test for listener Exception -@patch('monitor.logger',logging.getLogger('monitor')) -@mock.patch('gevent.pywsgi.Input',autospec=True) -@mock.patch('monitor.save_event_in_kafka') -def test_listener_exp(mock_monitor,mock_input,body,start_response,schema): - mock_input.__name__ = 'read' - environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2","PATH_INFO": '/eventListener/v5/events'} - body={} - mock_input.read.return_value=json.dumps(body) - mock_start_response= mock.Mock(start_response) + obj = monitor.JSONObject({}) + assert obj.__dict__ == {} + + +@patch("monitor.logger", logging.getLogger("monitor")) +@mock.patch("gevent.pywsgi.Input", autospec=True) +@mock.patch("monitor.save_event_in_kafka") +def test_listener(mock_monitor, mock_input, body, start_response, schema): + mock_input.__name__ = "read" + environ = { + "REQUEST_METHOD": "POST", + "wsgi.input": mock_input, + "CONTENT_TYPE": "application/json", + "HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", + "CONTENT_LENGTH": "2", + } + mock_input.read.return_value = body + mock_start_response = mock.Mock(start_response) + result = list(monitor.listener(environ, mock_start_response, schema)) + assert result == [b""] + + +# test for listener Exception +@patch("monitor.logger", logging.getLogger("monitor")) +@mock.patch("gevent.pywsgi.Input", autospec=True) +@mock.patch("monitor.save_event_in_kafka") +def test_listener_exp(mock_monitor, mock_input, body, start_response, schema): + mock_input.__name__ = "read" + environ = { + "REQUEST_METHOD": "POST", + "wsgi.input": mock_input, + "CONTENT_TYPE": "application/json", + "HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", + "CONTENT_LENGTH": "2", + "PATH_INFO": "/eventListener/v7/events", + } + body = {} + mock_input.read.return_value = json.dumps(body) + mock_start_response = mock.Mock(start_response) project_path = os.getcwd() - dict_schema = {"v5": os.path.join(project_path,"collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat-v7-2-1.json")} + dict_schema = { + "v7": os.path.join( + project_path, + "collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat-v7-2-2.json", + ) + } try: result = list(monitor.listener(environ, mock_start_response, dict_schema)) except TypeError: assert result == None except Exception: - pytest.fail('unexcepted error') - - -#test b64decode credentials in listener() -@patch('monitor.logger',logging.getLogger('monitor')) -@mock.patch('gevent.pywsgi.Input',autospec=True) -@mock.patch('monitor.save_event_in_kafka') -def test_listener_b64decode(mock_monitor,mock_input,body,start_response,schema): - mock_input.__name__ = 'read' - environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "None None", "CONTENT_LENGTH": "2","PATH_INFO": '/eventListener/v5/events'} - mock_input.read.return_value=body - mock_start_response= mock.Mock(start_response) - logger = logging.getLogger('monitor') + pytest.fail("unexcepted error") + + +# test b64decode credentials in listener() +@patch("monitor.logger", logging.getLogger("monitor")) +@mock.patch("gevent.pywsgi.Input", autospec=True) +@mock.patch("monitor.save_event_in_kafka") +def test_b64_credientials_valid_or_not( + mock_monitor, mock_input, body, start_response, schema +): + mock_input.__name__ = "read" + environ = { + "REQUEST_METHOD": "POST", + "wsgi.input": mock_input, + "CONTENT_TYPE": "application/json", + "HTTP_AUTHORIZATION": "None None", + "CONTENT_LENGTH": "2", + "PATH_INFO": "/eventListener/v7/events", + } + mock_input.read.return_value = body + mock_start_response = mock.Mock(start_response) + logger = logging.getLogger("monitor") logger.setLevel(logging.WARN) project_path = os.getcwd() - dict_schema = {"v5": os.path.join(project_path,"collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat-v7-2-1.json")} - with mock.patch.object(logger,'warn') as mock_warn: + dict_schema = { + "v7": os.path.join( + project_path, + "collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat-v7-2-2.json", + ) + } + with mock.patch.object(logger, "warn") as mock_warn: result = list(monitor.listener(environ, mock_start_response, dict_schema)) mock_monitor.assert_called_with(body) -#test listener pending command list -@patch('monitor.vel_username','user') -@patch('monitor.vel_password','password') -@patch('monitor.logger',logging.getLogger('monitor')) -@patch('monitor.pending_command_list',[1,2,3]) -@mock.patch('gevent.pywsgi.Input',autospec=True) -@mock.patch('monitor.save_event_in_kafka') -def test_listener_command_list(mock_monitor,mock_input,body,start_response,schema,topic_name): - environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2", 'PATH_INFO': '/eventListener/v5/events'} - mock_input.read.return_value=body - mock_start_response= mock.Mock(start_response) - logger = logging.getLogger('monitor') - logger.setLevel(logging.DEBUG) +# test listener pending command list +@patch("monitor.vel_username", "user") +@patch("monitor.vel_password", "password") +@patch("monitor.logger", logging.getLogger("monitor")) +@patch("monitor.pending_command_list", [1, 2, 3]) +@mock.patch("gevent.pywsgi.Input", autospec=True) +@mock.patch("monitor.save_event_in_kafka") +def test_pending_command_list( + mock_monitor, mock_input, body, start_response, schema, topic_name +): + environ = { + "REQUEST_METHOD": "POST", + "wsgi.input": mock_input, + "CONTENT_TYPE": "application/json", + "HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", + "CONTENT_LENGTH": "2", + "PATH_INFO": "/eventListener/v7/events", + } + mock_input.read.return_value = body + mock_start_response = mock.Mock(start_response) project_path = os.getcwd() - dict_schema = {"v5": os.path.join(project_path,"collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat-v7-2-2.json")} - with mock.patch.object(logger,'debug') as mock_debug: - result = list(monitor.listener(environ, mock_start_response, dict_schema)) - assert [b'[1, 2, 3]'] ==result - - -#test listener if pending_command list is none -@patch('monitor.vel_username','user') -@patch('monitor.vel_password','password') -@patch('monitor.logger',logging.getLogger('monitor')) -@patch('monitor.pending_command_list',None) -@mock.patch('gevent.pywsgi.Input',autospec=True) -@mock.patch('monitor.save_event_in_kafka') -def test_listener_command_list_none(mock_monitor,mock_input,body,start_response,schema,topic_name): - environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2", 'PATH_INFO': '/eventListener/v5/events'} - mock_input.read.return_value=body - mock_start_response= mock.Mock(start_response) - logger = logging.getLogger('monitor') + dict_schema = { + "v7": os.path.join( + project_path, + "collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat-v7-2-2.json", + ) + } + result = list(monitor.listener(environ, mock_start_response, dict_schema)) + assert [b"[1, 2, 3]"] == result + + +# test listener if pending_command list is none +@patch("monitor.vel_username", "user") +@patch("monitor.vel_password", "password") +@patch("monitor.logger", logging.getLogger("monitor")) +@patch("monitor.pending_command_list", None) +@mock.patch("gevent.pywsgi.Input", autospec=True) +@mock.patch("monitor.save_event_in_kafka") +def test_pending_command_list_none( + mock_monitor, mock_input, body, start_response, schema, topic_name +): + environ = { + "REQUEST_METHOD": "POST", + "wsgi.input": mock_input, + "CONTENT_TYPE": "application/json", + "HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", + "CONTENT_LENGTH": "2", + "PATH_INFO": "/eventListener/v7/events", + } + mock_input.read.return_value = body + mock_start_response = mock.Mock(start_response) + logger = logging.getLogger("monitor") logger.setLevel(logging.DEBUG) project_path = os.getcwd() - dict_schema = {"v5": os.path.join(project_path,"collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat-v7-2-1.json")} - with mock.patch.object(logger,'debug') as mock_debug: + dict_schema = { + "v7": os.path.join( + project_path, + "collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat-v7-2-2.json", + ) + } + with mock.patch.object(logger, "debug") as mock_debug: result = list(monitor.listener(environ, mock_start_response, dict_schema)) - assert [b'']==result - - -#test jsonschema error -@patch('monitor.vel_username','user') -@patch('monitor.vel_password','password') -@patch('monitor.logger',logging.getLogger('monitor')) -@mock.patch('gevent.pywsgi.Input',autospec=True) -@mock.patch('monitor.save_event_in_kafka') -def test_listener_schema_none(mock_monitor,mock_input,body,start_response,schema): - mock_input.__name__ = 'read' - environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2","PATH_INFO": '/eventListener/v5/events'} - mock_input.read.return_value=body - mock_start_response= mock.Mock(start_response) - project_path=os.getcwd() - dict_schema =os.path.join(project_path,"tests/collector/schema.json") + assert [b""] == result + + +# test jsonschema error +@patch("monitor.vel_username", "user") +@patch("monitor.vel_password", "password") +@patch("monitor.logger", logging.getLogger("monitor")) +@mock.patch("gevent.pywsgi.Input", autospec=True) +@mock.patch("monitor.save_event_in_kafka") +def test_jsonschema_error(mock_monitor, mock_input, body, start_response, schema): + mock_input.__name__ = "read" + environ = { + "REQUEST_METHOD": "POST", + "wsgi.input": mock_input, + "CONTENT_TYPE": "application/json", + "HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", + "CONTENT_LENGTH": "2" + } + mock_input.read.return_value = body + mock_start_response = mock.Mock(start_response) + project_path = os.getcwd() + dict_schema = os.path.join(project_path, "tests/collector/schema.json") os._exit = mock.MagicMock() list(monitor.listener(environ, mock_start_response, dict_schema)) - assert os._exit.called - - - -#test jsonschema validation exception -@patch('monitor.logger',logging.getLogger('monitor')) -@mock.patch('gevent.pywsgi.Input',autospec=True) -@mock.patch('monitor.save_event_in_kafka') -def test_listener_jsonschema_validation(mock_monitor,mock_input,body,start_response,schema): - mock_input.__name__ = 'read' - environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"} - body={"event": {"commonEventHeader": {"domain": 6,"eventId": "11","eventName": "","eventType": "platform","lastEpochMicrosec": 0,"priority": "Normal","reportingEntityId": "localhost","reportingEntityName": "localhost","sequence": 0,"sourceId": "776f3123-30a5-f947-bdf5-099ec3a7577a","sourceName": "776f3123-30a5-f947-bdf5-099ec3a7577a","startEpochMicrosec": 1642961518.919,"version": "4.0","vesEventListenerVersion": "7.2.1"}}} - body=json.dumps(body) - mock_input.read.return_value=body - mock_start_response= mock.Mock(start_response) - result=list(monitor.listener(environ,mock_start_response,schema)) - assert [b'']==result - - - -#test if schema is none -@patch('monitor.logger',logging.getLogger('monitor')) -@mock.patch('gevent.pywsgi.Input',autospec=True) -@mock.patch('monitor.save_event_in_kafka') -def test_listener_schma_is_empty(mock_monitor,mock_input,body,start_response): - mock_input.__name__ = 'read' - environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"} - mock_input.read.return_value=body - mock_start_response= mock.Mock(start_response) - result=list(monitor.listener(environ,mock_start_response,None)) - assert []==result - - - -#test listener() Exception event is invalid for unexpected reason -@patch('monitor.logger',logging.getLogger('monitor')) -@mock.patch('gevent.pywsgi.Input',autospec=True) -@mock.patch('monitor.save_event_in_kafka') -def test_listener_Event_Invalid(mock_monitor,mock_input,body,start_response): - mock_input.__name__ = 'read' - environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"} - body={} - mock_input.read.return_value=body - mock_start_response= mock.Mock(start_response) - result=list(monitor.listener(environ,mock_start_response,None)) - assert []==result - - - -#check main() function -@patch('monitor.logger',logging.getLogger('monitor')) -@mock.patch('argparse.ArgumentParser.parse_args', - return_value=argparse.Namespace(verbose=None, api_version='7',config=get_config_path(),section='default')) -@mock.patch('gevent.pywsgi.WSGIServer.serve_forever') -@mock.patch('monitor.logger', logging.getLogger('monitor')) -def test_main(server,parser,body): - argv=None - result=monitor.main(argv=None) - assert 0==result - - - -#test main() function argv is None -@patch('monitor.logger') -@mock.patch('argparse.ArgumentParser.parse_args', - return_value=argparse.Namespace(verbose=2, api_version='7',config=get_config_path(),section='default')) -@mock.patch('gevent.pywsgi.WSGIServer.serve_forever') -def test_main_argv(server,parser,logger,body): - argv='' - logger.return_value=logging.getLogger('monitor') + assert os._exit.called + + +# test jsonschema validation exception +@patch("monitor.logger", logging.getLogger("monitor")) +@mock.patch("gevent.pywsgi.Input", autospec=True) +@mock.patch("monitor.save_event_in_kafka") +def test_listener_jsonschema_validation( + mock_monitor, mock_input, body, start_response, schema +): + mock_input.__name__ = "read" + environ = { + "REQUEST_METHOD": "POST", + "wsgi.input": mock_input, + "CONTENT_TYPE": "application/json", + "HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", + "CONTENT_LENGTH": "2", + } + body = { + "event": { + "commonEventHeader": { + "domain": 6, + "eventId": "11", + "eventName": "", + "eventType": "platform", + "lastEpochMicrosec": 0, + "priority": "Normal", + "reportingEntityId": "localhost", + "reportingEntityName": "localhost", + "sequence": 0, + "sourceId": "776f3123-30a5-f947-bdf5-099ec3a7577a", + "sourceName": "776f3123-30a5-f947-bdf5-099ec3a7577a", + "startEpochMicrosec": 1642961518.919, + "version": "4.0", + "vesEventListenerVersion": "7.2.1", + } + } + } + body = json.dumps(body) + mock_input.read.return_value = body + mock_start_response = mock.Mock(start_response) + result = list(monitor.listener(environ, mock_start_response, schema)) + assert [b""] == result + + +# test if schema is none +@patch("monitor.logger", logging.getLogger("monitor")) +@mock.patch("gevent.pywsgi.Input", autospec=True) +@mock.patch("monitor.save_event_in_kafka") +def test_listener_schma_is_empty(mock_monitor, mock_input, body, start_response): + mock_input.__name__ = "read" + environ = { + "REQUEST_METHOD": "POST", + "wsgi.input": mock_input, + "CONTENT_TYPE": "application/json", + "HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", + "CONTENT_LENGTH": "2", + } + mock_input.read.return_value = body + mock_start_response = mock.Mock(start_response) + result = list(monitor.listener(environ, mock_start_response, None)) + assert [] == result + + +# test listener() Exception event is invalid for unexpected reason +@patch("monitor.logger", logging.getLogger("monitor")) +@mock.patch("gevent.pywsgi.Input", autospec=True) +@mock.patch("monitor.save_event_in_kafka") +def test_listener_Event_Invalid(mock_monitor, mock_input, body, start_response): + mock_input.__name__ = "read" + environ = { + "REQUEST_METHOD": "POST", + "wsgi.input": mock_input, + "CONTENT_TYPE": "application/json", + "HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", + "CONTENT_LENGTH": "2", + } + body = {} + mock_input.read.return_value = body + mock_start_response = mock.Mock(start_response) + result = list(monitor.listener(environ, mock_start_response, None)) + assert [] == result + + +# check main() function +@patch("monitor.logger", logging.getLogger("monitor")) +@mock.patch( + "argparse.ArgumentParser.parse_args", + return_value=argparse.Namespace( + verbose=None, api_version="7", config=get_config_path(), section="default" + ), +) +@mock.patch("gevent.pywsgi.WSGIServer.serve_forever") +@mock.patch("monitor.logger", logging.getLogger("monitor")) +def test_main(server, parser, body): + argv = None + result = monitor.main(argv) + assert 0 == result + + +# test main() function argv is None +@patch("monitor.logger") +@mock.patch( + "argparse.ArgumentParser.parse_args", + return_value=argparse.Namespace( + verbose=2, api_version="7", config=get_config_path(), section="default" + ), +) +@mock.patch("gevent.pywsgi.WSGIServer.serve_forever") +def test_main_argv(server, parser, logger, body): + argv = "" + logger.return_value = logging.getLogger("monitor") try: - result=monitor.main(argv) + result = monitor.main(argv) except TypeError: assert result == None except Exception: - pytest.fail('unexcepted error') - - - -#test platform.system in main -@patch('monitor.logger',logging.getLogger('monitor')) -@mock.patch('argparse.ArgumentParser.parse_args', - return_value=argparse.Namespace(verbose=None, api_version='7',config=get_config_path(),section='default')) -@mock.patch('gevent.pywsgi.WSGIServer.serve_forever') -def test_main_platform(server,parser,body): - argv=None - sys = mock.MagicMock() + pytest.fail("unexcepted error") + + +# test platform.system in main +@patch("monitor.logger", logging.getLogger("monitor")) +@mock.patch( + "argparse.ArgumentParser.parse_args", + return_value=argparse.Namespace( + verbose=None, api_version="7", config=get_config_path(), section="default" + ), +) +@mock.patch("gevent.pywsgi.WSGIServer.serve_forever") +def test_main_platform(server, parser, body): + argv = None try: - with patch('platform.system', MagicMock(return_value='Windows')): - res=monitor.main(argv) + with patch("platform.system", MagicMock(return_value="Windows")): + res = monitor.main(argv) except RuntimeError: assert res == None except Exception: - pytest.fail('Exiting because of exception') - - -#test vel_port in main -@patch('monitor.logger',logging.getLogger('monitor')) -@mock.patch('argparse.ArgumentParser.parse_args', - return_value=argparse.Namespace(verbose=None, api_version='7',config=get_wrong_config_port_path(),section='default')) -@mock.patch('gevent.pywsgi.WSGIServer.serve_forever') -def test_main_vel_port(server,parser,body): - argv='' - res=monitor.main(argv) + pytest.fail("Exiting because of exception") + + +# test vel_port in main +@patch("monitor.logger", logging.getLogger("monitor")) +@mock.patch( + "argparse.ArgumentParser.parse_args", + return_value=argparse.Namespace( + verbose=None, + api_version="7", + config=get_config_path(), + section="wrong_port", + ), +) +@mock.patch("gevent.pywsgi.WSGIServer.serve_forever") +def test_main_vel_port(server, parser, body): + argv = "" + res = monitor.main(argv) assert res == 2 - # test vel_path in main -@patch('monitor.logger',logging.getLogger('monitor')) -@mock.patch('argparse.ArgumentParser.parse_args', - return_value=argparse.Namespace(verbose=None, api_version='7',config=get_wrong_config_path(),section='default')) -@mock.patch('gevent.pywsgi.WSGIServer.serve_forever') -def test_main_path(server,parser,body): - argv=None +@patch("monitor.logger", logging.getLogger("monitor")) +@mock.patch( + "argparse.ArgumentParser.parse_args", + return_value=argparse.Namespace( + verbose=None, + api_version="7", + config=get_config_path(), + section="invalid_config", + ), +) +@mock.patch("gevent.pywsgi.WSGIServer.serve_forever") +def test_main_path(server, parser, body): + argv = None try: result = monitor.main(argv) except RuntimeError: assert result == None except Exception: - pytest.fail('fail beacuase of exception') - + pytest.fail("fail beacuase of exception") @pytest.fixture -def vel_schema_path(): +def vel_schema_file_path(): config = configparser.ConfigParser() - config_file=get_config_path() + config_file = get_config_path() config.read(config_file) - ref = config.get('default', 'schema_file') + ref = config.get("default", "schema_file") return ref + # check listener() vel_schema, if it exists -@patch('monitor.logger',logging.getLogger('monitor')) -@mock.patch('argparse.ArgumentParser.parse_args', - return_value=argparse.Namespace(verbose=None, api_version='7',config=get_config_path(),section='default')) -@mock.patch('gevent.pywsgi.WSGIServer.serve_forever') -def test_main_vel_schema_path(server,parser,vel_schema_path): - argv=None - with mock.patch('os.path.exists') as m: - m.return_value=vel_schema_path - result=monitor.main(argv) - assert 0==result - - - -#test unhandle exception -@patch('monitor.DEBUG',True) -@mock.patch('argparse.ArgumentParser.parse_args', - return_value=argparse.Namespace(verbose=None, api_version='7',config=get_wrong_config_port_path(),section='default')) -@mock.patch('gevent.pywsgi.WSGIServer.serve_forever') -def test_main_unhandle_exception(server,parser,body): - argv=None - result=None +@patch("monitor.logger", logging.getLogger("monitor")) +@mock.patch( + "argparse.ArgumentParser.parse_args", + return_value=argparse.Namespace( + verbose=None, api_version="7", config=get_config_path(), section="default" + ), +) +@mock.patch("gevent.pywsgi.WSGIServer.serve_forever") +def test_main_vel_schema_file_path(server, parser, vel_schema_file_path): + argv = None + with mock.patch("os.path.exists") as m: + m.return_value = vel_schema_file_path + result = monitor.main(argv) + assert 0 == result + + +# test unhandle exception +@patch("monitor.DEBUG", True) +@mock.patch( + "argparse.ArgumentParser.parse_args", + return_value=argparse.Namespace( + verbose=None, + api_version="7", + config=get_config_path(), + section="wrong_port", + ), +) +@mock.patch("gevent.pywsgi.WSGIServer.serve_forever") +def test_main_unhandle_exception(server, parser, body): + argv = None + result = None try: result = monitor.main(argv) except RuntimeError: assert result == None except Exception: - pytest.fail('Exiting because of exception') - - - -#check test_listener() function -@patch('monitor.logger',logging.getLogger('monitor')) -@mock.patch('gevent.pywsgi.Input',autospec=True) -@mock.patch('monitor.save_event_in_kafka') -def test_TestControl_listener(mock_monitor,mock_input,body,start_response,schema): - mock_input.__name__ = 'read' - environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"} - mock_input.read.return_value=body - mock_start_response= mock.Mock(start_response) - result=list(monitor.test_listener(environ,mock_start_response,schema)) - assert ['']==result - - - -#check test_listener() GET method -@patch('monitor.logger',logging.getLogger('monitor')) -@mock.patch('gevent.pywsgi.Input',autospec=True) -@mock.patch('monitor.save_event_in_kafka') -def test_TestControl_listener_get_method(mock_monitor,mock_input,body,start_response,schema): - mock_input.__name__ = 'read' - environ={"REQUEST_METHOD": "GET","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"} - mock_input.read.return_value=body - mock_start_response= mock.Mock(start_response) - response= ['{"event": {"commonEventHeader": {"domain": "measurement", "eventId": "11", "eventName": "", "eventType": "platform", "lastEpochMicrosec": 0, "priority": "Normal", "reportingEntityId": "localhost", "reportingEntityName": "localhost", "sequence": 0, "sourceId": "776f3123-30a5-f947-bdf5-099ec3a7577a", "sourceName": "776f3123-30a5-f947-bdf5-099ec3a7577a", "startEpochMicrosec": 1642961518.919, "version": "4.0", "vesEventListenerVersion": "7.2.1"}}}'] - result=list(monitor.test_listener(environ,mock_start_response,schema)) - assert response==result - - -#test test_listener() jsonschema error -@patch('monitor.logger',logging.getLogger('monitor')) -@mock.patch('gevent.pywsgi.Input',autospec=True) -@mock.patch('monitor.save_event_in_kafka') -def test_TestControl_listener_schema_error(mocker,mock_input,body,start_response,schema): - mock_input.__name__ = 'read' - environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"} - mock_input.read.return_value=body - mock_start_response= mock.Mock(start_response) - project_path=os.getcwd() - schema_path =os.path.join(project_path,"tests/collector/schema.json") - schema=json.load(open(schema_path, 'r')) - result=list(monitor.test_listener(environ, mock_start_response,schema)) - assert ['']==result - - -#test test_listener() jsonschema validation error -@patch('monitor.logger',logging.getLogger('monitor')) -@mock.patch('gevent.pywsgi.Input',autospec=True) -@mock.patch('monitor.save_event_in_kafka') -def test_TestControl_listener_schema_validation_error(mocker,mock_input,body,start_response,schema): - mock_input.__name__ = 'read' - environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"} - body={"event": {"commonEventHeader": {"domain": 6,"eventId": "11","eventName": "","eventType": "platform","lastEpochMicrosec": 0,"priority": "Normal","reportingEntityId": "localhost","reportingEntityName": "localhost","sequence": 0,"sourceId": "776f3123-30a5-f947-bdf5-099ec3a7577a","sourceName": "776f3123-30a5-f947-bdf5-099ec3a7577a","startEpochMicrosec": 1642961518.919,"version": "4.0","vesEventListenerVersion": "7.2.1"}}} - body=json.dumps(body) - mock_input.read.return_value=body - mock_start_response= mock.Mock(start_response) - result=list(monitor.test_listener(environ, mock_start_response,schema)) - assert ['']==result - + pytest.fail("Exiting because of exception") + + +# check test_listener() function +@patch("monitor.logger", logging.getLogger("monitor")) +@mock.patch("gevent.pywsgi.Input", autospec=True) +@mock.patch("monitor.save_event_in_kafka") +def test_TestControl_listener(mock_monitor, mock_input, body, start_response, schema): + mock_input.__name__ = "read" + environ = { + "REQUEST_METHOD": "POST", + "wsgi.input": mock_input, + "CONTENT_TYPE": "application/json", + "HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", + "CONTENT_LENGTH": "2", + } + mock_input.read.return_value = body + mock_start_response = mock.Mock(start_response) + result = list(monitor.test_listener(environ, mock_start_response, schema)) + assert [""] == result + + +# check test_listener() GET method +@patch("monitor.logger", logging.getLogger("monitor")) +@mock.patch("gevent.pywsgi.Input", autospec=True) +@mock.patch("monitor.save_event_in_kafka") +def test_TestControl_listener_get_method( + mock_monitor, mock_input, body, start_response, schema +): + mock_input.__name__ = "read" + environ = { + "REQUEST_METHOD": "GET", + "wsgi.input": mock_input, + "CONTENT_TYPE": "application/json", + "HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", + "CONTENT_LENGTH": "2", + } + mock_input.read.return_value = body + mock_start_response = mock.Mock(start_response) + response = [ + '{"event": {"commonEventHeader": {"domain": "measurement", "eventId": "11", "eventName": "", "eventType": "platform", "lastEpochMicrosec": 0, "priority": "Normal", "reportingEntityId": "localhost", "reportingEntityName": "localhost", "sequence": 0, "sourceId": "776f3123-30a5-f947-bdf5-099ec3a7577a", "sourceName": "776f3123-30a5-f947-bdf5-099ec3a7577a", "startEpochMicrosec": 1642961518.919, "version": "4.0", "vesEventListenerVersion": "7.2.1"}}}' + ] + result = list(monitor.test_listener(environ, mock_start_response, schema)) + assert response == result + + +# test test_listener() jsonschema error +@patch("monitor.logger", logging.getLogger("monitor")) +@mock.patch("gevent.pywsgi.Input", autospec=True) +@mock.patch("monitor.save_event_in_kafka") +def test_TestControl_listener_schema_error( + mocker, mock_input, body, start_response, schema +): + mock_input.__name__ = "read" + environ = { + "REQUEST_METHOD": "POST", + "wsgi.input": mock_input, + "CONTENT_TYPE": "application/json", + "HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", + "CONTENT_LENGTH": "2", + } + mock_input.read.return_value = body + mock_start_response = mock.Mock(start_response) + project_path = os.getcwd() + schema_path = os.path.join(project_path, "tests/collector/schema.json") + schema = json.load(open(schema_path, "r")) + result = list(monitor.test_listener(environ, mock_start_response, schema)) + assert [""] == result + + +# test test_listener() jsonschema validation error +@patch("monitor.logger", logging.getLogger("monitor")) +@mock.patch("gevent.pywsgi.Input", autospec=True) +@mock.patch("monitor.save_event_in_kafka") +def test_TestControl_listener_schema_validation_error( + mocker, mock_input, body, start_response, schema +): + mock_input.__name__ = "read" + environ = { + "REQUEST_METHOD": "POST", + "wsgi.input": mock_input, + "CONTENT_TYPE": "application/json", + "HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", + "CONTENT_LENGTH": "2", + } + body = { + "event": { + "commonEventHeader": { + "domain": 6, + "eventId": "11", + "eventName": "", + "eventType": "platform", + "lastEpochMicrosec": 0, + "priority": "Normal", + "reportingEntityId": "localhost", + "reportingEntityName": "localhost", + "sequence": 0, + "sourceId": "776f3123-30a5-f947-bdf5-099ec3a7577a", + "sourceName": "776f3123-30a5-f947-bdf5-099ec3a7577a", + "startEpochMicrosec": 1642961518.919, + "version": "4.0", + "vesEventListenerVersion": "7.2.1", + } + } + } + body = json.dumps(body) + mock_input.read.return_value = body + mock_start_response = mock.Mock(start_response) + result = list(monitor.test_listener(environ, mock_start_response, schema)) + assert [""] == result @pytest.fixture def schema_wrong(): project_path = get_path() - schema_path = os.path.join( - project_path, "tests/collector/schema.json") - schema = json.load(open(schema_path, 'r')) + schema_path = os.path.join(project_path, "tests/collector/schema.json") + schema = json.load(open(schema_path, "r")) return schema -#test test_listener() exception TestControl input not valid -@patch('monitor.logger',logging.getLogger('monitor')) -@mock.patch('gevent.pywsgi.Input',autospec=True) -@mock.patch('monitor.save_event_in_kafka') -def test_TestControl_listener_exception(mocker,mock_input,body,start_response,schema_wrong): - mock_input.__name__ = 'read' - environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"} - body={} - mock_input.read.return_value=body - mock_start_response= mock.Mock(start_response) - result=list(monitor.test_listener(environ, mock_start_response,schema_wrong)) - assert ['']==result - - - -#check test_listener() Missing schema -@patch('monitor.logger',logging.getLogger('monitor')) -@mock.patch('gevent.pywsgi.Input',autospec=True) -@mock.patch('monitor.save_event_in_kafka') -def test_TestControl_listener_Missing_schema(mocker,mock_input,body,start_response): - mock_input.__name__ = 'read' - environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"} - mock_input.read.return_value=body - mock_start_response= mock.Mock(start_response) - result=list(monitor.test_listener(environ, mock_start_response,None)) - assert ['']==result - - -#check test_listener() Invalid Input -@patch('monitor.logger',logging.getLogger('monitor')) -@mock.patch('gevent.pywsgi.Input',autospec=True) -@mock.patch('monitor.save_event_in_kafka') -def test_TestControl_Listener_Input_invalid(mocker,mock_input,body,start_response): - mock_input.__name__ = 'read' - environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"} - body={} - mock_input.read.return_value=body - mock_start_response= mock.Mock(start_response) - result=list(monitor.test_listener(environ, mock_start_response,None)) - assert ['']==result - - -#test listener() get method -@patch('monitor.logger',logging.getLogger('monitor')) -@mock.patch('gevent.pywsgi.Input',autospec=True) -@mock.patch('monitor.save_event_in_kafka') -def test_listener_get_method(mock_monitor,mock_input,body,start_response,schema): - mock_input.__name__ = 'read' - environ={"REQUEST_METHOD": "GET","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"} - mock_input.read.return_value=body - mock_start_response= mock.Mock(start_response) +# test test_listener() exception TestControl input not valid +@patch("monitor.logger", logging.getLogger("monitor")) +@mock.patch("gevent.pywsgi.Input", autospec=True) +@mock.patch("monitor.save_event_in_kafka") +def test_TestControl_listener_exception( + mocker, mock_input, body, start_response, schema_wrong +): + mock_input.__name__ = "read" + environ = { + "REQUEST_METHOD": "POST", + "wsgi.input": mock_input, + "CONTENT_TYPE": "application/json", + "HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", + "CONTENT_LENGTH": "2", + } + body = {} + mock_input.read.return_value = body + mock_start_response = mock.Mock(start_response) + result = list(monitor.test_listener(environ, mock_start_response, schema_wrong)) + assert [""] == result + + +# check test_listener() Missing schema +@patch("monitor.logger", logging.getLogger("monitor")) +@mock.patch("gevent.pywsgi.Input", autospec=True) +@mock.patch("monitor.save_event_in_kafka") +def test_TestControl_listener_Missing_schema(mocker, mock_input, body, start_response): + mock_input.__name__ = "read" + environ = { + "REQUEST_METHOD": "POST", + "wsgi.input": mock_input, + "CONTENT_TYPE": "application/json", + "HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", + "CONTENT_LENGTH": "2", + } + mock_input.read.return_value = body + mock_start_response = mock.Mock(start_response) + result = list(monitor.test_listener(environ, mock_start_response, None)) + assert [""] == result + + +# check test_listener() Invalid Input +@patch("monitor.logger", logging.getLogger("monitor")) +@mock.patch("gevent.pywsgi.Input", autospec=True) +@mock.patch("monitor.save_event_in_kafka") +def test_TestControl_Listener_Input_invalid(mocker, mock_input, body, start_response): + mock_input.__name__ = "read" + environ = { + "REQUEST_METHOD": "POST", + "wsgi.input": mock_input, + "CONTENT_TYPE": "application/json", + "HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", + "CONTENT_LENGTH": "2", + } + body = {} + mock_input.read.return_value = body + mock_start_response = mock.Mock(start_response) + result = list(monitor.test_listener(environ, mock_start_response, None)) + assert [""] == result + + +# test listener() get method +@patch("monitor.logger", logging.getLogger("monitor")) +@mock.patch("gevent.pywsgi.Input", autospec=True) +@mock.patch("monitor.save_event_in_kafka") +def test_listener_get_method(mock_monitor, mock_input, body, start_response, schema): + mock_input.__name__ = "read" + environ = { + "REQUEST_METHOD": "GET", + "wsgi.input": mock_input, + "CONTENT_TYPE": "application/json", + "HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", + "CONTENT_LENGTH": "2", + } + mock_input.read.return_value = body + mock_start_response = mock.Mock(start_response) result = list(monitor.listener(environ, mock_start_response, schema)) - assert [b'POST /eventListener/v7'] == result + assert [b"POST /eventListener/v7"] == result - -#check save_event_in_kafka() function -@mock.patch('monitor.kafka_server') -@mock.patch('monitor.logger', logging.getLogger('monitor')) -def test_save_event_in_kafka(mocker,data_set,topic_name): - data_set_string=json.dumps(data_set) - logger = logging.getLogger('monitor') +# check save_event_in_kafka() function +@mock.patch("monitor.kafka_server") +@mock.patch("monitor.logger", logging.getLogger("monitor")) +def test_save_event_in_kafka(mocker, data_set, topic_name): + data_set_string = json.dumps(data_set) + logger = logging.getLogger("monitor") logger.setLevel(logging.INFO) - mocker.patch('monitor.produce_events_in_kafka') - with mock.patch.object(logger,'info') as mock_info: + mocker.patch("monitor.produce_events_in_kafka") + with mock.patch.object(logger, "info") as mock_info: monitor.save_event_in_kafka(data_set_string) - mock_info.assert_called_once_with('Got an event request for topic domain') + mock_info.assert_called_once_with("Got an event request for topic domain") # check save_event_in_kafka() topic length -@patch('monitor.logger',logging.getLogger('monitor')) -@mock.patch('monitor.produce_events_in_kafka') -@mock.patch('monitor.kafka_server') -def test_save_event_in_kafka_topic_len(server,mock_producer,topic_name): - body={'event':{'commonEventHeader':{'domain':''}}} - body=json.dumps(body) +@patch("monitor.logger", logging.getLogger("monitor")) +@mock.patch("monitor.produce_events_in_kafka") +@mock.patch("monitor.kafka_server") +def test_save_event_in_kafka_topic_len(server, mock_producer, topic_name): + body = {"event": {"commonEventHeader": {"domain": ""}}} + body = json.dumps(body) monitor.save_event_in_kafka(body) - data_set={'event': {'commonEventHeader': {'domain': ''}}} - mock_producer.assert_called_once_with(data_set,'') + data_set = {"event": {"commonEventHeader": {"domain": ""}}} + mock_producer.assert_called_once_with(data_set, topic_name) + + +# test listener standardDefineFields +@patch("monitor.logger", logging.getLogger("monitor")) +@mock.patch("monitor.stnd_define_event_validation") +@mock.patch("gevent.pywsgi.Input", autospec=True) +@mock.patch("monitor.save_event_in_kafka") +def test_schema_ref_url( + mock_monitor, mock_input, mock_std, body, start_response, schema, topic_name +): + """Test case for checking schema ref url is from 3gpp domain""" + environ = { + "REQUEST_METHOD": "POST", + "wsgi.input": mock_input, + "CONTENT_TYPE": "application/json", + "HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", + "CONTENT_LENGTH": "2", + } + body = { + "event": {"stndDefinedFields": {"schemaReference": "https://forge.3gpp.org"}} + } + mock_input.read.return_value = json.dumps(body) + mock_start_response = mock.Mock(start_response) + schema = json.load(open(invalid_stdDefined_schema, "r")) + result = list(monitor.listener(environ, mock_start_response, schema)) + mock_std.assert_called_once_with(None, body) + + +# test listener standardDefineFields +@patch("monitor.logger", logging.getLogger("monitor")) +@mock.patch("monitor.stnd_define_event_validation") +@mock.patch("gevent.pywsgi.Input", autospec=True) +@mock.patch("monitor.save_event_in_kafka") +def test_gerrit_schema_ref_url( + mock_monitor, mock_input, mock_std, body, start_response, schema, topic_name +): + """Test case for checking schema ref url is from gerrit domain""" + environ = { + "REQUEST_METHOD": "POST", + "wsgi.input": mock_input, + "CONTENT_TYPE": "application/json", + "HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", + "CONTENT_LENGTH": "2", + } + body = { + "event": { + "stndDefinedFields": {"schemaReference": "https://gerrit.o-ran-sc.org"} + } + } + mock_input.read.return_value = json.dumps(body) + mock_start_response = mock.Mock(start_response) + schema = json.load(open(invalid_stdDefined_schema, "r")) + result = list(monitor.listener(environ, mock_start_response, schema)) + mock_std.assert_called_once_with(None, body) + + +@patch("monitor.logger", logging.getLogger("monitor")) +@mock.patch("gevent.pywsgi.Input", autospec=True) +@mock.patch("monitor.save_event_in_kafka") +def test_listener_stdDefine_schema_ref( + mock_monitor, mock_input, body, start_response, schema, topic_name +): + """test case for standardDefineFields schema_ref not found""" + environ = { + "REQUEST_METHOD": "POST", + "wsgi.input": mock_input, + "CONTENT_TYPE": "application/json", + "HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", + "CONTENT_LENGTH": "2", + } + body = {"event": {"stndDefinedFields": {"schemaReference": ""}}} + mock_input.read.return_value = json.dumps(body) + mock_start_response = mock.Mock(start_response) + schema = json.load(open(invalid_stdDefined_schema, "r")) + result = list(monitor.listener(environ, mock_start_response, schema)) + assert [b""] == result +@pytest.fixture +def vel_schema_path(): + path = os.getcwd() + vel_schema_path = os.path.join(path, "collector/evel-test-collector/docs/schema") + return vel_schema_path -#check produce_event_in_kafka() function -@mock.patch('monitor.KafkaProducer') -@mock.patch('monitor.producer') -@mock.patch('monitor.logger', logging.getLogger('monitor')) -def test_produce_events_in_kafka(mock_pro,mock_producer,data_set,topic_name): - logger = logging.getLogger('monitor') - logger.setLevel(logging.DEBUG) - with mock.patch.object(logger,'debug') as mock_debug: - monitor.produce_events_in_kafka(data_set,topic_name) - mock_pro.send.assert_called_with(topic_name,value=data_set) - mock_debug.assert_called_once_with('Event has been successfully posted into kafka bus') - path=os.getcwd() - os.remove(os.path.join(path,'collector.log')) + +@pytest.fixture +def schema_ref(): + config = configparser.ConfigParser() + config_file = get_config_path() + config.read(config_file) + ref = config.get("default", "schema_ref") + return ref +# test check_schema_file_exist +@patch("monitor.logger", logging.getLogger("monitor")) +def test_check_schema_file_exist(vel_schema_path, schema_ref): + result = monitor.check_schema_file_exist(vel_schema_path, schema_ref) + path = os.getcwd() + assert result == os.path.join( + path, + "collector/evel-test-collector/docs/schema/forge.3gpp.org_rep_sa5_MnS_blob_SA88-Rel16_OpenAPI/faultMnS.yaml", + ) + + +#loading schema_ref from config file from invalid_config section +@pytest.fixture +def html_schema_file_ref(): + config = configparser.ConfigParser() + config_file = get_config_path() + config.read(config_file) + ref = config.get("invalid_config", "schema_ref") + return ref + + +# test check schema file path not exist and checking downloaded file content is yaml or html +@patch("monitor.logger", logging.getLogger("monitor")) +@unittest.mock.patch("os.system") +def test_check_schema_file_contain_html(mock_sys, vel_schema_path, html_schema_file_ref): + """Test case for checking downloded schema ref file is html""" + f = open(test_html_schema_file, "w+") + f.write("") + f.close() + path = os.getcwd() + mock_sys.return_value = 0 + with pytest.raises(Exception): + result = monitor.check_schema_file_exist(test_collector_path, html_schema_file_ref) + assert result == os.path.join( + path, + "tests/collector/forge.3gpp.org_rep_sa5_MnS_blob_SA88-Rel16_OpenAPI/test_faultMns.html", + ) + # after creating new file its deleted to keep code in first stage + os.remove(test_html_schema_file) + + +#loading schema_ref from config file from wrong_port section +@pytest.fixture +def yaml_schema_file_ref(): + config = configparser.ConfigParser() + config_file = get_config_path() + config.read(config_file) + ref = config.get("wrong_port", "schema_ref") + return ref + + +# test if downloaded file content is yaml Create a folder from source url +@patch("monitor.logger", logging.getLogger("monitor")) +@unittest.mock.patch("os.system") +def test_check_schema_file_contents_yaml(mock_sys, vel_schema_path, yaml_schema_file_ref): + """Test case for checking downloded schema ref file is yaml""" + f = open(test_yaml_schema_file, "w+") + f.write("NotifyNewAlarm") + f.close() + mock_sys.return_value = 0 + result = monitor.check_schema_file_exist(test_collector_path, yaml_schema_file_ref) + # after running test case, created file is deleted to keep code in first stage + os.remove(test_yaml_schema_file) + assert ( + result + == "tests/collector/forge.3gpp.org_rep_sa5_MnS_blob_SA88-Rel16_OpenAPI/test_faultMns.yaml" + ) + + +# test stnd_define_event_validation +@patch("monitor.logger", logging.getLogger("monitor")) +@mock.patch("monitor.check_schema_file_exist") +def test_stnd_define_event_validation(mocker_check, vel_schema_path, body, schema_ref): + body = { + "event": { + "stndDefinedFields": { + "schemaReference": schema_ref, + "data": { + "href": "href1", + "notificationId": 0, + "notificationType": "notifyNewAlarm", + "eventTime": "2022-06-22T12:43:50.579315Z", + "systemDN": "xyz", + "alarmType": "COMMUNICATIONS_ALARM", + "alarmId": "lossOfSignal", + "probableCause": "lossOfSignal", + "specificProblem": "lossOfSignal", + "perceivedSeverity": "CRITICAL", + "correlatedNotifications": [], + "rootCauseIndicator": False, + "backedUpStatus": True, + "backUpObject": "xyz", + "trendIndication": "MORE_SEVERE", + }, + } + } + } + path = os.getcwd() + mocker_check.return_value = os.path.join( + path, + "collector/evel-test-collector/docs/schema/forge.3gpp.org_rep_sa5_MnS_blob_SA88-Rel16_OpenAPI/faultMnS.yaml", + ) + result = monitor.stnd_define_event_validation(vel_schema_path, body) + assert result == None + + +# check produce_event_in_kafka()function +@mock.patch("monitor.KafkaProducer") +@mock.patch("monitor.producer") +@mock.patch("monitor.logger", logging.getLogger("monitor")) +def test_produce_events_in_kafka(mock_pro, mock_producer, data_set, topic_name): + logger = logging.getLogger("monitor") + logger.setLevel(logging.DEBUG) + with mock.patch.object(logger, "debug") as mock_debug: + monitor.produce_events_in_kafka(data_set, topic_name) + mock_pro.send.assert_called_with(topic_name, value=data_set) + mock_debug.assert_called_once_with( + "Event has been successfully posted into kafka bus" + ) + path = os.getcwd() + os.remove(os.path.join(path, "collector.log")) diff --git a/tests/collector/test_rest_dispatcher.py b/tests/collector/test_rest_dispatcher.py index 154883c..3e2324f 100644 --- a/tests/collector/test_rest_dispatcher.py +++ b/tests/collector/test_rest_dispatcher.py @@ -15,66 +15,66 @@ import pytest -import unittest -import monitor -from urllib import response from unittest import mock from unittest.mock import patch -from pytest_mock import MockerFixture import logging import rest_dispatcher from gevent import socket from gevent import pywsgi -import gevent + @pytest.fixture def start_response(): - sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM) - start_response=pywsgi.WSGIHandler(sock,"","") + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + start_response = pywsgi.WSGIHandler(sock, "", "") return start_response -#test test_notfound_404 -@patch('rest_dispatcher.base_url','') -@mock.patch('gevent.pywsgi.Input',autospec=True) -@mock.patch('rest_dispatcher.set_404_content') -def test_notfound_404(mocker_dispatcher,mock_input,start_response): - environ={"REQUEST_METHOD": "POST","PATH_INFO":''} - mock_start_response= mock.Mock(start_response) - base_url='' - logger = logging.getLogger('monitor') + +# test test_notfound_404 +@patch("rest_dispatcher.base_url", "") +@mock.patch("gevent.pywsgi.Input", autospec=True) +@mock.patch("rest_dispatcher.set_404_content") +def test_notfound_404(mocker_dispatcher, mock_input, start_response): + environ = {"REQUEST_METHOD": "POST", "PATH_INFO": ""} + mock_start_response = mock.Mock(start_response) + logger = logging.getLogger("monitor") logger.setLevel(logging.DEBUG) - with mock.patch.object(logger,'debug') as mock_debug: - result=rest_dispatcher.notfound_404(environ, mock_start_response) - assert result==['template_404'] - -#test call of -@patch('rest_dispatcher.base_url','') -@mock.patch('gevent.pywsgi.Input',autospec=True) -def test_call(mock_input,start_response): - environ={"REQUEST_METHOD": "POST","PATH_INFO":''} - mock_start_response= mock.Mock(start_response) - rest_obj=rest_dispatcher.PathDispatcher() - res=rest_obj.__call__(environ,mock_start_response) - assert ['template_404'] ==res - - -@patch('rest_dispatcher.base_url') + with mock.patch.object(logger, "debug") as mock_debug: + result = rest_dispatcher.notfound_404(environ, mock_start_response) + assert result == ["template_404"] + + +# test call of +@patch("rest_dispatcher.base_url", "") +@mock.patch("gevent.pywsgi.Input", autospec=True) +def test_call(mock_input, start_response): + environ = {"REQUEST_METHOD": "POST", "PATH_INFO": ""} + mock_start_response = mock.Mock(start_response) + rest_obj = rest_dispatcher.PathDispatcher() + res = rest_obj.__call__(environ, mock_start_response) + assert ["template_404"] == res + + +@patch("rest_dispatcher.base_url") def test_set_404_content(mock_url): - mock_url.return_value='' - result=rest_dispatcher.set_404_content('') - assert result==None + mock_url.return_value = "" + result = rest_dispatcher.set_404_content("") + assert result == None + @pytest.fixture def path(): - path='/eventListener/v5/events' + path = "/eventListener/v7/events" return path + @pytest.fixture def method(): - method='post' + method = "post" return method -def test_register(path,method): - rest_obj=rest_dispatcher.PathDispatcher() - res=rest_obj.register(path,method,None) - assert res==None + +def test_register(path, method): + rest_obj = rest_dispatcher.PathDispatcher() + res = rest_obj.register(path, method, None) + assert res == None diff --git a/tests/collector/wrong_config.conf b/tests/collector/wrong_config.conf deleted file mode 100644 index fe90735..0000000 --- a/tests/collector/wrong_config.conf +++ /dev/null @@ -1,32 +0,0 @@ -# Copyright 2021 Xoriant Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -[default] -schema_file = "evel-test-collector/docs/att_interface_definition/hello.json" -base_schema_file = /evel-test-collector/docs/att_interface_definition/base_schema.json -throttle_schema_file = evel-test-collector/docs/att_interface_definition/throttle_schema.json -test_control_schema_file = evel-test-collector/docs/att_interface_definition/test_control_schema.json -yaml_schema_path = evel-test-collector/docs/att_interface_definition -log_file =collector.log -vel_domain = 127.0.0.1 -vel_port = 9999 -vel_path = "vendor_event_listener/event" -vel_username = -vel_password =user -vel_topic_name =password -kafka_server =kafka -log_level = ERROR -kafka_topic =topic -schema_ref =https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/test_faultMns.html#components/schemas/NotifyNewAlarm diff --git a/tests/dmaap_adaptor/test_consumer.py b/tests/dmaap_adaptor/test_consumer.py index 8395fc4..1fca142 100644 --- a/tests/dmaap_adaptor/test_consumer.py +++ b/tests/dmaap_adaptor/test_consumer.py @@ -15,8 +15,6 @@ import os import argparse -import configparser -from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter import abc import pytest from unittest import mock @@ -24,182 +22,200 @@ from unittest.mock import patch from pytest_mock import MockerFixture from prepare_response import PrepareResponse from confluent_kafka import Consumer, KafkaError -from confluent_kafka.admin import AdminClient from consumer import EventConsumer, TopicConsumer import logging + @pytest.fixture def prepareResponse(): return PrepareResponse() + @pytest.fixture def topic(): topic_name = "test1" return topic_name + @pytest.fixture def topic_list(): - topic_list=ListTopics() + topic_list = ListTopics() return topic_list + @pytest.fixture def empty_topic_list(): - empty_topic_list=EmptyListTopics() + empty_topic_list = EmptyListTopics() return empty_topic_list @pytest.fixture def resCode(): - responseCode=200 + responseCode = 200 return responseCode + def get_path(): project_path = os.getcwd() return project_path + def get_config_path(): - project_path=get_path() - config_path = os.path.join( - project_path,"dmaapadapter/adapter/config/adapter.conf") + project_path = get_path() + config_path = os.path.join(project_path, "dmaapadapter/adapter/config/adapter.conf") return config_path -#test __init__ of EventConsumer -@mock.patch('app_config.AppConfig.setLogger') -@mock.patch('argparse.ArgumentParser.parse_args', -return_value=argparse.Namespace(config=get_config_path(),section='default')) -def test_init_event(parser,mock_setLogger): - EventConsumer.__init__(EventConsumer) - mock_setLogger.assert_called_with('dmaap.log','error') - -#test __init__ of TpoicConsumer -@mock.patch('app_config.AppConfig.setLogger') -@mock.patch('argparse.ArgumentParser.parse_args', -return_value=argparse.Namespace(config=get_config_path(),section='default')) -def test_init_consumer(parser,mock_setLogger): +# test __init__ of TpoicConsumer +@mock.patch("app_config.AppConfig.setLogger") +@mock.patch( + "argparse.ArgumentParser.parse_args", + return_value=argparse.Namespace(config=get_config_path(), section="default"), +) +def test_init_consumer(parser, mock_setLogger): TopicConsumer.__init__(TopicConsumer) - mock_setLogger.assert_called_with('dmaap.log','error') - - -@mock.patch('confluent_kafka.Consumer') -def test_consumeEvents(mock_consumer,prepareResponse,topic,resCode): - consumergroup="test" - consumerid="test1" - limit=10 - timeout=1 - mock_consumer.__name__ = 'subscribe' - mock_consumer.__name__ = 'poll' - mock_consumer.poll.return_value=None - EventConsumer.consumeEvents(EventConsumer, prepareResponse, topic, consumergroup, consumerid,limit, timeout) - resMsg='[]' + mock_setLogger.assert_called_with("dmaap.log", "error") + + +# test __init__ of EventConsumer +@mock.patch("app_config.AppConfig.setLogger") +@mock.patch( + "argparse.ArgumentParser.parse_args", + return_value=argparse.Namespace(config=get_config_path(), section="default"), +) +def test_init_event(parser, mock_setLogger): + EventConsumer.__init__(EventConsumer) + mock_setLogger.assert_called_with("dmaap.log", "error") + + +@mock.patch("confluent_kafka.Consumer") +def test_consumeEvents(mock_consumer, prepareResponse, topic, resCode): + consumergroup = "test" + consumerid = "test1" + limit = 10 + timeout = 1 + mock_consumer.__name__ = "subscribe" + mock_consumer.__name__ = "poll" + mock_consumer.poll.return_value = None + EventConsumer.consumeEvents( + EventConsumer, prepareResponse, topic, consumergroup, consumerid, limit, timeout + ) + resMsg = "[]" assert resCode == prepareResponse.getResponseCode() assert resMsg == prepareResponse.getResponseMsg() -#test consumeEvents for break -@mock.patch('confluent_kafka.Consumer') -def test_consumeEvents_break(mock_consumer,prepareResponse,topic,resCode): - consumergroup="test" - consumerid="test1" - limit=0 - timeout=1 - mock_consumer.__name__ = 'subscribe' - mock_consumer.__name__ = 'poll' - mock_consumer.poll.return_value=None - resMsg='[]' - EventConsumer.consumeEvents(EventConsumer, prepareResponse, topic, consumergroup, consumerid,limit, timeout) +# test consumeEvents for break +@mock.patch("confluent_kafka.Consumer") +def test_consumeEvents_break(mock_consumer, prepareResponse, topic, resCode): + consumergroup = "test" + consumerid = "test1" + limit = 0 + timeout = 1 + mock_consumer.__name__ = "subscribe" + mock_consumer.__name__ = "poll" + mock_consumer.poll.return_value = None + resMsg = "[]" + EventConsumer.consumeEvents( + EventConsumer, prepareResponse, topic, consumergroup, consumerid, limit, timeout + ) assert resCode == prepareResponse.getResponseCode() assert resMsg == prepareResponse.getResponseMsg() -#test consumeEvents for Exception -@mock.patch('confluent_kafka.Consumer') -def test_consumeEvents_Exceptions(mock_consumer,prepareResponse,topic): - consumergroup="test" - consumerid="test1" - limit=abc - timeout=1 - mock_consumer.__name__ = 'subscribe' - mock_consumer.__name__ = 'poll' - mock_consumer.poll.return_value=None - resMsg='"Failed to return the events"' - EventConsumer.consumeEvents(EventConsumer, prepareResponse, topic, consumergroup, consumerid,limit, timeout) +# test consumeEvents for Exception +@mock.patch("confluent_kafka.Consumer") +def test_consumeEvents_Exceptions(mock_consumer, prepareResponse, topic): + consumergroup = "test" + consumerid = "test1" + limit = abc + timeout = 1 + mock_consumer.__name__ = "subscribe" + mock_consumer.__name__ = "poll" + mock_consumer.poll.return_value = None + resMsg = '"Failed to return the events"' + EventConsumer.consumeEvents( + EventConsumer, prepareResponse, topic, consumergroup, consumerid, limit, timeout + ) assert 500 == prepareResponse.getResponseCode() assert resMsg == prepareResponse.getResponseMsg() -def test_getTopics(mocker,prepareResponse,topic_list,resCode): - mocker.patch('confluent_kafka.admin.AdminClient.list_topics', - return_value=topic_list) +def test_getTopics(mocker, prepareResponse, topic_list, resCode): + mocker.patch( + "confluent_kafka.admin.AdminClient.list_topics", return_value=topic_list + ) TopicConsumer.getTopics(TopicConsumer, prepareResponse) - resMsg='{"topics": ["test1", "test2"]}' + resMsg = '{"topics": ["test1", "test2"]}' assert resCode == prepareResponse.getResponseCode() assert resMsg == prepareResponse.getResponseMsg() -#test getTopics Exception -def test_getTopics_Exceptions(mocker,prepareResponse): - mocker.patch('confluent_kafka.admin.AdminClient.list_topics', - return_value='') +# test getTopics Exception +def test_getTopics_Exceptions(mocker, prepareResponse): + mocker.patch("confluent_kafka.admin.AdminClient.list_topics", return_value="") TopicConsumer.getTopics(TopicConsumer, prepareResponse) - resMsg='"Failed to return the topics"' + resMsg = '"Failed to return the topics"' assert 500 == prepareResponse.getResponseCode() assert resMsg == prepareResponse.getResponseMsg() -#test ListALLTopics() function -def test_listAllTopics(mocker,prepareResponse,topic_list,resCode): - mocker.patch('confluent_kafka.admin.AdminClient.list_topics', - return_value=topic_list) +# test ListALLTopics() function +def test_listAllTopics(mocker, prepareResponse, topic_list, resCode): + mocker.patch( + "confluent_kafka.admin.AdminClient.list_topics", return_value=topic_list + ) TopicConsumer.listAllTopics(TopicConsumer, prepareResponse) - resMsg='{"topics": [{"topicName": "test1", "owner": "", "txenabled": false}, {"topicName": "test2", "owner": "", "txenabled": false}]}' + resMsg = '{"topics": [{"topicName": "test1", "owner": "", "txenabled": false}, {"topicName": "test2", "owner": "", "txenabled": false}]}' assert resCode == prepareResponse.getResponseCode() assert resMsg == prepareResponse.getResponseMsg() -#test listAllTopics Exceptions -def test_listAllTopics_Exceptions(mocker,prepareResponse): - mocker.patch('confluent_kafka.admin.AdminClient.list_topics', - return_value='') +# test listAllTopics Exceptions +def test_listAllTopics_Exceptions(mocker, prepareResponse): + mocker.patch("confluent_kafka.admin.AdminClient.list_topics", return_value="") TopicConsumer.listAllTopics(TopicConsumer, prepareResponse) - resMsg='"Failed to return the topics"' + resMsg = '"Failed to return the topics"' assert 500 == prepareResponse.getResponseCode() assert resMsg == prepareResponse.getResponseMsg() -#test getTopicDetails() function -def test_getTopicDetails(mocker,prepareResponse,topic,topic_list,resCode): - mocker.patch('confluent_kafka.admin.AdminClient.list_topics', - return_value=topic_list) - TopicConsumer.getTopicDetails(TopicConsumer, prepareResponse,topic) - resMsg='{"name": "test1", "owner": "", "description": "", "readerAcl": {"enabled": true, "users": []}, "writerAcl": {"enabled": true, "users": []}}' +# test getTopicDetails() function +def test_getTopicDetails(mocker, prepareResponse, topic, topic_list, resCode): + mocker.patch( + "confluent_kafka.admin.AdminClient.list_topics", return_value=topic_list + ) + TopicConsumer.getTopicDetails(TopicConsumer, prepareResponse, topic) + resMsg = '{"name": "test1", "owner": "", "description": "", "readerAcl": {"enabled": true, "users": []}, "writerAcl": {"enabled": true, "users": []}}' assert resCode == prepareResponse.getResponseCode() assert resMsg == prepareResponse.getResponseMsg() -#test getTopicDetails Exceptions -def test_getTopicDetails_Exceptions(mocker,prepareResponse,topic): - mocker.patch('confluent_kafka.admin.AdminClient.list_topics', - return_value='') - TopicConsumer.getTopicDetails(TopicConsumer, prepareResponse,topic) - resMsg='"Failed to return the topics"' +# test getTopicDetails Exceptions +def test_getTopicDetails_Exceptions(mocker, prepareResponse, topic): + mocker.patch("confluent_kafka.admin.AdminClient.list_topics", return_value="") + TopicConsumer.getTopicDetails(TopicConsumer, prepareResponse, topic) + resMsg = '"Failed to return the topics"' assert 500 == prepareResponse.getResponseCode() assert resMsg == prepareResponse.getResponseMsg() -#test getTopicDetails Topic exists -def test_getTopicDetails_Topic_exists(mocker,prepareResponse,topic,empty_topic_list,resCode): - mocker.patch('confluent_kafka.admin.AdminClient.list_topics', - return_value=empty_topic_list) - TopicConsumer.getTopicDetails(TopicConsumer, prepareResponse,topic) - resMsg='"Topic [test1] not found"' +# test getTopicDetails Topic exists +def test_getTopicDetails_Topic_exists( + mocker, prepareResponse, topic, empty_topic_list, resCode +): + mocker.patch( + "confluent_kafka.admin.AdminClient.list_topics", return_value=empty_topic_list + ) + TopicConsumer.getTopicDetails(TopicConsumer, prepareResponse, topic) + resMsg = '"Topic [test1] not found"' assert 404 == prepareResponse.getResponseCode() assert resMsg == prepareResponse.getResponseMsg() class ListTopics: - topics={"test1":"value1", "test2":"value2"} + topics = {"test1": "value1", "test2": "value2"} class EmptyListTopics: - topics={} \ No newline at end of file + topics = {} diff --git a/tests/influxdb_connector/events.txt b/tests/influxdb_connector/events.txt index 1e7413c..4d57cf7 100644 --- a/tests/influxdb_connector/events.txt +++ b/tests/influxdb_connector/events.txt @@ -2,4 +2,5 @@ heartbeat={"event": {"commonEventHeader": {"domain": "heartbeat","eventId": "ORA pnfRegistration={"event": {"commonEventHeader": {"domain": "pnfRegistration","eventId": "ORAN-DEV_ONAP Controller for Radio","eventName": "pnfRegistration_EventType5G", "eventType": "EventType5G", "sequence": 0,"priority": "Low","reportingEntityId": "","reportingEntityName": "ORAN-DEV","sourceId": "", "sourceName": "ORAN-DEV","startEpochMicrosec": 1639985329569087,"lastEpochMicrosec": 1639985329569087,"nfNamingCode": "SDNR", "nfVendorName": "ONAP", "timeZoneOffset": "+00:00","version": "4.1","vesEventListenerVersion": "7.2.1"},"pnfRegistrationFields": {"pnfRegistrationFieldsVersion": "2.1","lastServiceDate": "2021-03-26","macAddress": "02:42:f7:d4:62:ce","manufactureDate": "2021-01-16","modelNumber": "ONAP Controller for Radio","oamV4IpAddress": "127.0.0.1","oamV6IpAddress": "0:0:0:0:0:ffff:a0a:0.1", "serialNumber": "ONAP-SDNR-127.0.0.1-ONAP Controller for Radio","softwareVersion": "2.3.5", "unitFamily": "ONAP-SDNR","unitType": "SDNR", "vendorName": "ONAP","additionalFields": {"oamPort": "830","protocol": "SSH","username": "netconf","reconnectOnChangedSchema": "false","sleep-factor": "1.5","tcpOnly": "false","connectionTimeout": "20000","maxConnectionAttempts": "100","betweenAttemptsTimeout": "2000","keepaliveDelay": "120"}}}} measurement={"event": {"commonEventHeader": {"domain": "measurement", "eventId": "O-RAN-FH-IPv6-01_1639984500_PM15min", "eventName": "measurement_O_RAN_COMPONENT_PM15min","eventType": "O_RAN_COMPONENT_PM15min","sequence": 0, "priority": "Low","reportingEntityId": "", "reportingEntityName": "ORAN-DEV", "sourceId": "", "sourceName": "O-RAN-FH-IPv6-01", "startEpochMicrosec": 1639983600000, "lastEpochMicrosec": 1639984500000, "internalHeaderFields": {"intervalStartTime": "Mon, 20 Dec 2021 07:00:00 +0000","intervalEndTime": "Mon, 20 Dec 2021 07:15:00 +0000"}, "version": "4.1", "vesEventListenerVersion": "7.2.1" }, "measurementFields": {"additionalFields": {}, "additionalMeasurements": [{ "name": "LP-MWPS-RADIO-1","hashMap": {"es": "0","ses": "1", "cses": "0", "unavailability": "0" }},{"name": "LP-MWPS-RADIO-2","hashMap": {"es": "0","ses": "1","cses": "0","unavailability": "0"} }],"additionalObjects": [],"codecUsageArray": [],"concurrentSessions": 2,"configuredEntities": 2, "cpuUsageArray": [], "diskUsageArray": [], "featureUsageArray": { "https://www.itu.int/rec/T-REC-G.841": "true" }, "filesystemUsageArray": [], "hugePagesArray": [], "ipmi": {}, "latencyDistribution": [], "loadArray": [], "machineCheckExceptionArray": [], "meanRequestLatency": 1000, "measurementInterval": 234, "measurementFieldsVersion": "4.0", "memoryUsageArray": [], "numberOfMediaPortsInUse": 234, "requestRate": 23,"nfcScalingMetric": 3,"nicPerformanceArray": [],"processStatsArray": []}}} fault={ "event": { "commonEventHeader": {"domain": "fault","eventId": "LKCYFL79Q01M01FYNG01_LP-MWPS-RADIO_TCA","eventName": "fault_O_RAN_COMPONENT_Alarms_TCA","eventType": "O_RAN_COMPONENT_Alarms", "sequence": 0,"priority": "High","reportingEntityId": "","reportingEntityName": "ORAN-DEV","sourceId": "","sourceName": "LKCYFL79Q01M01FYNG01","startEpochMicrosec": 1639985333218840,"lastEpochMicrosec": 1639985333218840,"nfNamingCode": "FYNG","nfVendorName": "VENDORA","timeZoneOffset": "+00:00", "version": "4.1","vesEventListenerVersion": "7.2.1"},"faultFields": {"faultFieldsVersion": "1.0","alarmCondition": "TCA", "alarmInterfaceA": "LP-MWPS-RADIO","eventSourceType": "O_RAN_COMPONENT","specificProblem": "TCA","eventSeverity": "NORMAL","vfStatus": "Active","alarmAdditionalInformation": {"eventTime": "2021-12-20T07:28:53.218840Z","equipType": "FYNG","vendor": "VENDORA","model": "FancyNextGeneration"}}}} -thresholdCrossingAlert={"event": {"commonEventHeader": {"domain": "thresholdCrossingAlert","eventId": "__TCA","eventName": "thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA","eventType": "O_RAN_COMPONENT_TCA","sequence": 0,"priority": "High","reportingEntityId": "","reportingEntityName": "ORAN-DEV","sourceId": "","sourceName": "","startEpochMicrosec": 1639985336443218,"lastEpochMicrosec": 1639985336443218,"nfNamingCode": "1OSF","nfVendorName": "","timeZoneOffset": "+00:00","version": "4.1","vesEventListenerVersion": "7.2.1"}, "thresholdCrossingAlertFields": {"thresholdCrossingFieldsVersion": "4.0","additionalParameters": [{"criticality": "MAJ","hashMap": { "additionalProperties": "up-and-down" },"thresholdCrossed": "packetLoss" }], "alertAction": "SET","alertDescription": "TCA","alertType": "INTERFACE-ANOMALY","alertValue": "1OSF","associatedAlertIdList": ["loss-of-signal"],"collectionTimestamp": "Mon, 20 Dec 2021 07:28:56 +0000","dataCollector": "data-lake","elementType": "1OSF", "eventSeverity": "WARNING", "eventStartTimestamp": "Mon, 20 Dec 2021 07:15:00 +0000","interfaceName": "", "networkService": "from-a-to-b","possibleRootCause": "always-the-others", "additionalFields": {"eventTime": "2021-12-20T07:28:56.443218Z", "equipType": "1OSF", "vendor": "", "model": ""}}}} \ No newline at end of file +thresholdCrossingAlert={"event": {"commonEventHeader": {"domain": "thresholdCrossingAlert","eventId": "__TCA","eventName": "thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA","eventType": "O_RAN_COMPONENT_TCA","sequence": 0,"priority": "High","reportingEntityId": "","reportingEntityName": "ORAN-DEV","sourceId": "","sourceName": "","startEpochMicrosec": 1639985336443218,"lastEpochMicrosec": 1639985336443218,"nfNamingCode": "1OSF","nfVendorName": "","timeZoneOffset": "+00:00","version": "4.1","vesEventListenerVersion": "7.2.1"}, "thresholdCrossingAlertFields": {"thresholdCrossingFieldsVersion": "4.0","additionalParameters": [{"criticality": "MAJ","hashMap": { "additionalProperties": "up-and-down" },"thresholdCrossed": "packetLoss" }], "alertAction": "SET","alertDescription": "TCA","alertType": "INTERFACE-ANOMALY","alertValue": "1OSF","associatedAlertIdList": ["loss-of-signal"],"collectionTimestamp": "Mon, 20 Dec 2021 07:28:56 +0000","dataCollector": "data-lake","elementType": "1OSF", "eventSeverity": "WARNING", "eventStartTimestamp": "Mon, 20 Dec 2021 07:15:00 +0000","interfaceName": "", "networkService": "from-a-to-b","possibleRootCause": "always-the-others", "additionalFields": {"eventTime": "2021-12-20T07:28:56.443218Z", "equipType": "1OSF", "vendor": "", "model": ""}}}} +stndDefinedFields={"event": {"commonEventHeader": {"domain": "stndDefined","eventId": "O-RAN-FH-IPv6-01_1639984500_PM15min","eventName": "stndDefined_O_RAN_COMPONENT_Alarms_lossOfSignal","eventType": "O_RAN_COMPONENT_Alarms","sequence": 0,"priority": "Low","reportingEntityId": "","reportingEntityName": "ORAN-DEV","sourceId": "","sourceName": "NSHMRIACQ01M01123401","startEpochMicrosec":"1639983600000","lastEpochMicrosec": "1639984500000","nfNamingCode": "1234","nfVendorName": "VENDORA","timeZoneOffset": "+00:00","version": "4.1","stndDefinedNamespace": "3GPP-FaultSupervision","vesEventListenerVersion": "7.2.1"},"stndDefinedFields": {"schemaReference": "https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/faultMnS.yaml#components/schemas/NotifyNewAlarm","data": {"href": "href1","uri": "1","notificationId": 0,"notificationType": "notifyNewAlarm","eventTime": "2022-06-22T12:43:50.579315Z","trendIndication": "MORE_SEVERE","thresholdInfo": {"observedMeasurement": "new","observedValue": 123},"monitoredAttributes": {"interface": "LP-MWPS-RADIO"},"proposedRepairActions": "12345","additionalInformation": {"eventTime": "2022-06-22T12:43:50.579315Z","equipType": "1234","vendor": "VENDORA","model": "1234 BestInClass"}},"stndDefinedFieldsVersion": "1.0"}}} \ No newline at end of file diff --git a/tests/influxdb_connector/test_influxdb_events.py b/tests/influxdb_connector/test_influxdb_events.py index 3919b03..65f0c95 100644 --- a/tests/influxdb_connector/test_influxdb_events.py +++ b/tests/influxdb_connector/test_influxdb_events.py @@ -13,22 +13,17 @@ # limitations under the License. # -import configparser import json -import logging -import os import pytest -import sys import influxdb_connector from unittest import mock from pathlib import Path from unittest.mock import patch -from mock import MagicMock def getEvent(arg): path = Path(__file__).parent - fname = path /'events.txt' + fname = path / "events.txt" event_dictionary = {} with fname.open() as file: @@ -36,60 +31,84 @@ def getEvent(arg): key, value = line.split("=") event_dictionary[key] = value if key == arg: - return value - return 'NA' + return value + return "NA" @pytest.fixture def event_Timestamp(): - eventTimestamp = '1639985333218840' + eventTimestamp = "1639985333218840" return eventTimestamp + # ------------------------------------------------------------------------------ # Address of heart_beat event unit test_case # ------------------------------------------------------------------------------ + @pytest.fixture def hb_json(): - hb_jsonObj = {'additionalFields': {'eventTime': '2021-12-20T07:29:34.292938Z'}, 'heartbeatFieldsVersion': '3.0', - 'heartbeatInterval': 20} - return hb_jsonObj + hb_jsonObj = { + "additionalFields": {"eventTime": "2021-12-20T07:29:34.292938Z"}, + "heartbeatFieldsVersion": "3.0", + "heartbeatInterval": 20, + } + return hb_jsonObj @pytest.fixture def hb_data(): - data = 'heartbeat,domain=heartbeat,eventId=ORAN-DEV_2021-12-20T07:29:34.292938Z,eventName=heartbeat_O_RAN_COMPONENT,eventType=O_RAN_COMPONENT,nfNamingCode=SDN-Controller,nfVendorName=O-RAN-SC-OAM,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1' - return data + data = "heartbeat,domain=heartbeat,eventId=ORAN-DEV_2021-12-20T07:29:34.292938Z,eventName=heartbeat_O_RAN_COMPONENT,eventType=O_RAN_COMPONENT,nfNamingCode=SDN-Controller,nfVendorName=O-RAN-SC-OAM,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1" + return data @pytest.fixture def hb_nonstringpdata(): - nonstringpdata = ' lastEpochMicrosec=1639965574292938,sequence=357,startEpochMicrosec=1639965574292938,' - return nonstringpdata + nonstringpdata = " lastEpochMicrosec=1639965574292938,sequence=357,startEpochMicrosec=1639965574292938," + return nonstringpdata @pytest.fixture def hb_expected_pdata(): - heartbeat_expected_pdata = 'heartbeat,domain=heartbeat,eventId=ORAN-DEV_2021-12-20T07:29:34.292938Z,eventName=heartbeat_O_RAN_COMPONENT,eventType=O_RAN_COMPONENT,nfNamingCode=SDN-Controller,nfVendorName=O-RAN-SC-OAM,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None,eventTime=2021-12-20T07:29:34.292938Z,heartbeatFieldsVersion=3.0 lastEpochMicrosec=1639965574292938,sequence=357,startEpochMicrosec=1639965574292938,heartbeatInterval=20 1639985333218840000' - return heartbeat_expected_pdata - - -@mock.patch('influxdb_connector.send_to_influxdb') -@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000') -def test_process_heartbeat_events_called(mocker_process_time, mocker_send_to_influxdb, hb_json, hb_data, hb_nonstringpdata, hb_expected_pdata, event_Timestamp): + heartbeat_expected_pdata = "heartbeat,domain=heartbeat,eventId=ORAN-DEV_2021-12-20T07:29:34.292938Z,eventName=heartbeat_O_RAN_COMPONENT,eventType=O_RAN_COMPONENT,nfNamingCode=SDN-Controller,nfVendorName=O-RAN-SC-OAM,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None,eventTime=2021-12-20T07:29:34.292938Z,heartbeatFieldsVersion=3.0 lastEpochMicrosec=1639965574292938,sequence=357,startEpochMicrosec=1639965574292938,heartbeatInterval=20 1639985333218840000" + return heartbeat_expected_pdata + + +@mock.patch("influxdb_connector.send_to_influxdb") +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +def test_process_heartbeat_events_called( + mocker_process_time, + mocker_send_to_influxdb, + hb_json, + hb_data, + hb_nonstringpdata, + hb_expected_pdata, + event_Timestamp, +): domain = "heartbeat" - influxdb_connector.process_heartbeat_events(domain, hb_json, hb_data, hb_nonstringpdata) + influxdb_connector.process_heartbeat_events( + domain, hb_json, hb_data, hb_nonstringpdata + ) mocker_send_to_influxdb.assert_called_with(domain, hb_expected_pdata) - -@mock.patch('influxdb_connector.send_to_influxdb') -@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000') -def test_process_heartbeat_events(mocker_process_time, mocker_send_to_influxdb, hb_json, hb_data, hb_nonstringpdata, hb_expected_pdata, event_Timestamp): +@mock.patch("influxdb_connector.send_to_influxdb") +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +def test_process_heartbeat_events( + mocker_process_time, + mocker_send_to_influxdb, + hb_json, + hb_data, + hb_nonstringpdata, + hb_expected_pdata, + event_Timestamp, +): domain = "heartbeat" - jobj={'additionalFields':{'eventTime':6}} - hb_ex='heartbeat,domain=heartbeat,eventId=ORAN-DEV_2021-12-20T07:29:34.292938Z,eventName=heartbeat_O_RAN_COMPONENT,eventType=O_RAN_COMPONENT,nfNamingCode=SDN-Controller,nfVendorName=O-RAN-SC-OAM,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None lastEpochMicrosec=1639965574292938,sequence=357,startEpochMicrosec=1639965574292938,eventTime=6 1639985333218840000' - influxdb_connector.process_heartbeat_events(domain, jobj, hb_data, hb_nonstringpdata) + jobj = {"additionalFields": {"eventTime": 6}} + hb_ex = "heartbeat,domain=heartbeat,eventId=ORAN-DEV_2021-12-20T07:29:34.292938Z,eventName=heartbeat_O_RAN_COMPONENT,eventType=O_RAN_COMPONENT,nfNamingCode=SDN-Controller,nfVendorName=O-RAN-SC-OAM,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None lastEpochMicrosec=1639965574292938,sequence=357,startEpochMicrosec=1639965574292938,eventTime=6 1639985333218840000" + influxdb_connector.process_heartbeat_events( + domain, jobj, hb_data, hb_nonstringpdata + ) mocker_send_to_influxdb.assert_called_with(domain, hb_ex) @@ -97,127 +116,242 @@ def test_process_heartbeat_events(mocker_process_time, mocker_send_to_influxdb, # Address of pnfRegistration event. # ------------------------------------------------------------------------------ + @pytest.fixture def pnf_json(): - jobj = {'pnfRegistrationFieldsVersion': '2.1', 'lastServiceDate': '2021-03-26', 'macAddress': '02:42:f7:d4:62:ce', 'manufactureDate': '2021-01-16', 'modelNumber': 'ONAP Controller for Radio', 'oamV4IpAddress': '127.0.0.1', 'oamV6IpAddress': '0:0:0:0:0:ffff:a0a:0.1', 'serialNumber': 'ONAP-SDNR-127.0.0.1-ONAP Controller for Radio', 'softwareVersion': '2.3.5', 'unitFamily': 'ONAP-SDNR', 'unitType': 'SDNR', 'vendorName': 'ONAP', 'additionalFields': {'oamPort': '830', 'protocol': 'SSH', 'username': 'netconf', 'reconnectOnChangedSchema': 'false', 'sleep-factor': '1.5', 'tcpOnly': 'false', 'connectionTimeout': '20000', 'maxConnectionAttempts': '100', 'betweenAttemptsTimeout': '2000', 'keepaliveDelay': '120'}} - return jobj + jobj = { + "pnfRegistrationFieldsVersion": "2.1", + "lastServiceDate": "2021-03-26", + "macAddress": "02:42:f7:d4:62:ce", + "manufactureDate": "2021-01-16", + "modelNumber": "ONAP Controller for Radio", + "oamV4IpAddress": "127.0.0.1", + "oamV6IpAddress": "0:0:0:0:0:ffff:a0a:0.1", + "serialNumber": "ONAP-SDNR-127.0.0.1-ONAP Controller for Radio", + "softwareVersion": "2.3.5", + "unitFamily": "ONAP-SDNR", + "unitType": "SDNR", + "vendorName": "ONAP", + "additionalFields": { + "oamPort": "830", + "protocol": "SSH", + "username": "netconf", + "reconnectOnChangedSchema": "false", + "sleep-factor": "1.5", + "tcpOnly": "false", + "connectionTimeout": "20000", + "maxConnectionAttempts": "100", + "betweenAttemptsTimeout": "2000", + "keepaliveDelay": "120", + }, + } + return jobj @pytest.fixture def pnf_data(): - data = 'pnfRegistration,domain=pnfRegistration,eventId=ORAN-DEV_ONAP\\ Controller\\ for\\ Radio,eventName=pnfRegistration_EventType5G,eventType=EventType5G,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,nfNamingCode=SDNR,nfVendorName=ONAP,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1' - return data + data = "pnfRegistration,domain=pnfRegistration,eventId=ORAN-DEV_ONAP\\ Controller\\ for\\ Radio,eventName=pnfRegistration_EventType5G,eventType=EventType5G,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,nfNamingCode=SDNR,nfVendorName=ONAP,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1" + return data @pytest.fixture def pnf_nonstringpdata(): - nonstringpdata = ' sequence=0,startEpochMicrosec=1639985329569087,lastEpochMicrosec=1639985329569087,' - return nonstringpdata + nonstringpdata = " sequence=0,startEpochMicrosec=1639985329569087,lastEpochMicrosec=1639985329569087," + return nonstringpdata @pytest.fixture def pnf_expected_pdata(): - pnf_expected_pdata = 'pnfRegistration,domain=pnfRegistration,eventId=ORAN-DEV_ONAP\\ Controller\\ for\\ Radio,eventName=pnfRegistration_EventType5G,eventType=EventType5G,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,nfNamingCode=SDNR,nfVendorName=ONAP,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None,pnfRegistrationFieldsVersion=2.1,lastServiceDate=2021-03-26,macAddress=02:42:f7:d4:62:ce,manufactureDate=2021-01-16,modelNumber=ONAP\\ Controller\\ for\\ Radio,oamV4IpAddress=127.0.0.1,oamV6IpAddress=0:0:0:0:0:ffff:a0a:0.1,serialNumber=ONAP-SDNR-127.0.0.1-ONAP\\ Controller\\ for\\ Radio,softwareVersion=2.3.5,unitFamily=ONAP-SDNR,unitType=SDNR,vendorName=ONAP,oamPort=830,protocol=SSH,username=netconf,reconnectOnChangedSchema=false,sleep-factor=1.5,tcpOnly=false,connectionTimeout=20000,maxConnectionAttempts=100,betweenAttemptsTimeout=2000,keepaliveDelay=120 sequence=0,startEpochMicrosec=1639985329569087,lastEpochMicrosec=1639985329569087 1639985333218840000' - return pnf_expected_pdata - - -@mock.patch('influxdb_connector.send_to_influxdb') -@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000') -def test_process_pnfRegistration_event_called(mock_process_time ,mocker_send_to_influxdb, pnf_json, pnf_data, pnf_nonstringpdata, pnf_expected_pdata, event_Timestamp): + pnf_expected_pdata = "pnfRegistration,domain=pnfRegistration,eventId=ORAN-DEV_ONAP\\ Controller\\ for\\ Radio,eventName=pnfRegistration_EventType5G,eventType=EventType5G,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,nfNamingCode=SDNR,nfVendorName=ONAP,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None,pnfRegistrationFieldsVersion=2.1,lastServiceDate=2021-03-26,macAddress=02:42:f7:d4:62:ce,manufactureDate=2021-01-16,modelNumber=ONAP\\ Controller\\ for\\ Radio,oamV4IpAddress=127.0.0.1,oamV6IpAddress=0:0:0:0:0:ffff:a0a:0.1,serialNumber=ONAP-SDNR-127.0.0.1-ONAP\\ Controller\\ for\\ Radio,softwareVersion=2.3.5,unitFamily=ONAP-SDNR,unitType=SDNR,vendorName=ONAP,oamPort=830,protocol=SSH,username=netconf,reconnectOnChangedSchema=false,sleep-factor=1.5,tcpOnly=false,connectionTimeout=20000,maxConnectionAttempts=100,betweenAttemptsTimeout=2000,keepaliveDelay=120 sequence=0,startEpochMicrosec=1639985329569087,lastEpochMicrosec=1639985329569087 1639985333218840000" + return pnf_expected_pdata + + +@mock.patch("influxdb_connector.send_to_influxdb") +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +def test_process_pnfRegistration_event_called( + mock_process_time, + mocker_send_to_influxdb, + pnf_json, + pnf_data, + pnf_nonstringpdata, + pnf_expected_pdata, + event_Timestamp, +): domain = "pnfRegistration" - influxdb_connector.process_pnfRegistration_event(domain, pnf_json, pnf_data, pnf_nonstringpdata) + influxdb_connector.process_pnfRegistration_event( + domain, pnf_json, pnf_data, pnf_nonstringpdata + ) mocker_send_to_influxdb.assert_called_with(domain, pnf_expected_pdata) -@mock.patch('influxdb_connector.send_to_influxdb') -@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000') -def test_process_pnfRegistration_event(mock_process_time ,mocker_send_to_influxdb, pnf_json, pnf_data, pnf_nonstringpdata, pnf_expected_pdata, event_Timestamp): +@mock.patch("influxdb_connector.send_to_influxdb") +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +def test_process_pnfRegistration_event( + mock_process_time, + mocker_send_to_influxdb, + pnf_json, + pnf_data, + pnf_nonstringpdata, + pnf_expected_pdata, + event_Timestamp, +): domain = "pnfRegistration" - jobj={1:2,2:4} - non_pnf='pnfRegistration,domain=pnfRegistration,eventId=ORAN-DEV_ONAP\\ Controller\\ for\\ Radio,eventName=pnfRegistration_EventType5G,eventType=EventType5G,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,nfNamingCode=SDNR,nfVendorName=ONAP,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639985329569087,lastEpochMicrosec=1639985329569087,1=2,2=4 1639985333218840000' - influxdb_connector.process_pnfRegistration_event(domain, jobj, pnf_data, pnf_nonstringpdata) + jobj = {1: 2, 2: 4} + non_pnf = "pnfRegistration,domain=pnfRegistration,eventId=ORAN-DEV_ONAP\\ Controller\\ for\\ Radio,eventName=pnfRegistration_EventType5G,eventType=EventType5G,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,nfNamingCode=SDNR,nfVendorName=ONAP,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639985329569087,lastEpochMicrosec=1639985329569087,1=2,2=4 1639985333218840000" + influxdb_connector.process_pnfRegistration_event( + domain, jobj, pnf_data, pnf_nonstringpdata + ) mocker_send_to_influxdb.assert_called_with(domain, non_pnf) -@mock.patch('influxdb_connector.send_to_influxdb') -@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000') -def test_process_pnfRegistration_event_elif(mock_process_time ,mocker_send_to_influxdb, pnf_json, pnf_data, pnf_nonstringpdata, pnf_expected_pdata, event_Timestamp): +@mock.patch("influxdb_connector.send_to_influxdb") +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +def test_process_pnfRegistration_event_elif( + mock_process_time, + mocker_send_to_influxdb, + pnf_json, + pnf_data, + pnf_nonstringpdata, + pnf_expected_pdata, + event_Timestamp, +): domain = "pnfRegistration" - jobj={'additionalFields': {'oamPort': 830}} - non_pnf='pnfRegistration,domain=pnfRegistration,eventId=ORAN-DEV_ONAP\\ Controller\\ for\\ Radio,eventName=pnfRegistration_EventType5G,eventType=EventType5G,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,nfNamingCode=SDNR,nfVendorName=ONAP,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639985329569087,lastEpochMicrosec=1639985329569087,oamPort=830 1639985333218840000' - influxdb_connector.process_pnfRegistration_event(domain, jobj, pnf_data, pnf_nonstringpdata) + jobj = {"additionalFields": {"oamPort": 830}} + non_pnf = "pnfRegistration,domain=pnfRegistration,eventId=ORAN-DEV_ONAP\\ Controller\\ for\\ Radio,eventName=pnfRegistration_EventType5G,eventType=EventType5G,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,nfNamingCode=SDNR,nfVendorName=ONAP,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639985329569087,lastEpochMicrosec=1639985329569087,oamPort=830 1639985333218840000" + influxdb_connector.process_pnfRegistration_event( + domain, jobj, pnf_data, pnf_nonstringpdata + ) mocker_send_to_influxdb.assert_called_with(domain, non_pnf) - # ------------------------------------------------------------------------------ # Address of fault event unit test case # ------------------------------------------------------------------------------ + @pytest.fixture def flt_json(): - jobj = {'faultFieldsVersion': '1.0', 'alarmCondition': 'TCA', 'alarmInterfaceA': 'LP-MWPS-RADIO', - 'eventSourceType': 'O_RAN_COMPONENT', 'specificProblem': 'TCA', 'eventSeverity': 'NORMAL', - 'vfStatus': 'Active', 'alarmAdditionalInformation': {'eventTime': '2021-12-20T07:28:53.218840Z', 'equipType': 'FYNG', 'vendor': 'VENDORA', 'model': 'FancyNextGeneration'}} - return jobj + jobj = { + "faultFieldsVersion": "1.0", + "alarmCondition": "TCA", + "alarmInterfaceA": "LP-MWPS-RADIO", + "eventSourceType": "O_RAN_COMPONENT", + "specificProblem": "TCA", + "eventSeverity": "NORMAL", + "vfStatus": "Active", + "alarmAdditionalInformation": { + "eventTime": "2021-12-20T07:28:53.218840Z", + "equipType": "FYNG", + "vendor": "VENDORA", + "model": "FancyNextGeneration", + }, + } + return jobj @pytest.fixture def flt_data(): - data = 'fault,domain=fault,eventId=LKCYFL79Q01M01FYNG01_LP-MWPS-RADIO_TCA,eventName=fault_O_RAN_COMPONENT_Alarms_TCA,eventType=O_RAN_COMPONENT_Alarms,priority=High,reportingEntityName=ORAN-DEV,sourceName=LKCYFL79Q01M01FYNG01,nfNamingCode=FYNG,nfVendorName=VENDORA,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1' - return data + data = "fault,domain=fault,eventId=LKCYFL79Q01M01FYNG01_LP-MWPS-RADIO_TCA,eventName=fault_O_RAN_COMPONENT_Alarms_TCA,eventType=O_RAN_COMPONENT_Alarms,priority=High,reportingEntityName=ORAN-DEV,sourceName=LKCYFL79Q01M01FYNG01,nfNamingCode=FYNG,nfVendorName=VENDORA,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1" + return data @pytest.fixture def flt_nonstringpdata(): - nonstringpdata = ' sequence=0,startEpochMicrosec=1639985333218840,lastEpochMicrosec=1639985333218840,' - return nonstringpdata + nonstringpdata = " sequence=0,startEpochMicrosec=1639985333218840,lastEpochMicrosec=1639985333218840," + return nonstringpdata @pytest.fixture def flt_expected_pdata(): - expected_pdata = 'fault,domain=fault,eventId=LKCYFL79Q01M01FYNG01_LP-MWPS-RADIO_TCA,eventName=fault_O_RAN_COMPONENT_Alarms_TCA,eventType=O_RAN_COMPONENT_Alarms,priority=High,reportingEntityName=ORAN-DEV,sourceName=LKCYFL79Q01M01FYNG01,nfNamingCode=FYNG,nfVendorName=VENDORA,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None,faultFieldsVersion=1.0,alarmCondition=TCA,alarmInterfaceA=LP-MWPS-RADIO,eventSourceType=O_RAN_COMPONENT,specificProblem=TCA,eventSeverity=NORMAL,vfStatus=Active,eventTime=2021-12-20T07:28:53.218840Z,equipType=FYNG,vendor=VENDORA,model=FancyNextGeneration sequence=0,startEpochMicrosec=1639985333218840,lastEpochMicrosec=1639985333218840 1639985333218840000' - return expected_pdata - - -@mock.patch('influxdb_connector.send_to_influxdb') -@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000') -def test_process_fault_event_called(mock_time,mocker_send_to_influxdb, flt_json, flt_data, flt_nonstringpdata, flt_expected_pdata, event_Timestamp): + expected_pdata = "fault,domain=fault,eventId=LKCYFL79Q01M01FYNG01_LP-MWPS-RADIO_TCA,eventName=fault_O_RAN_COMPONENT_Alarms_TCA,eventType=O_RAN_COMPONENT_Alarms,priority=High,reportingEntityName=ORAN-DEV,sourceName=LKCYFL79Q01M01FYNG01,nfNamingCode=FYNG,nfVendorName=VENDORA,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None,faultFieldsVersion=1.0,alarmCondition=TCA,alarmInterfaceA=LP-MWPS-RADIO,eventSourceType=O_RAN_COMPONENT,specificProblem=TCA,eventSeverity=NORMAL,vfStatus=Active,eventTime=2021-12-20T07:28:53.218840Z,equipType=FYNG,vendor=VENDORA,model=FancyNextGeneration sequence=0,startEpochMicrosec=1639985333218840,lastEpochMicrosec=1639985333218840 1639985333218840000" + return expected_pdata + + +@mock.patch("influxdb_connector.send_to_influxdb") +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +def test_process_fault_event_called( + mock_time, + mocker_send_to_influxdb, + flt_json, + flt_data, + flt_nonstringpdata, + flt_expected_pdata, + event_Timestamp, +): domain = "fault" - influxdb_connector.process_fault_event(domain, flt_json, flt_data, flt_nonstringpdata) + influxdb_connector.process_fault_event( + domain, flt_json, flt_data, flt_nonstringpdata + ) mocker_send_to_influxdb.assert_called_with(domain, flt_expected_pdata) -@mock.patch('influxdb_connector.send_to_influxdb') -@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000') -def test_process_fault_event(mock_time,mocker_send_to_influxdb, flt_json, flt_data, flt_nonstringpdata, flt_expected_pdata, event_Timestamp): +@mock.patch("influxdb_connector.send_to_influxdb") +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +def test_process_fault_event( + mock_time, + mocker_send_to_influxdb, + flt_json, + flt_data, + flt_nonstringpdata, + flt_expected_pdata, + event_Timestamp, +): domain = "fault" - payload=flt_json + payload = flt_json for key, val in payload.items(): - if key != 'alarmAdditionalInformation' and val != "": + if key != "alarmAdditionalInformation" and val != "": if isinstance(val, list): - influxdb_connector.process_fault_event(payload.get('alarmAdditionalInformation'),domain, flt_json, flt_data, flt_nonstringpdata) + influxdb_connector.process_fault_event( + payload.get("alarmAdditionalInformation"), + domain, + flt_json, + flt_data, + flt_nonstringpdata, + ) mocker_send_to_influxdb.assert_called_with(domain, flt_expected_pdata) -@mock.patch('influxdb_connector.send_to_influxdb') -@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000') -def test_process_fault_event_nonstr(mock_time,mocker_send_to_influxdb, flt_json, flt_data, flt_nonstringpdata, flt_expected_pdata, event_Timestamp): +@mock.patch("influxdb_connector.send_to_influxdb") +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +def test_process_fault_event_nonstr( + mock_time, + mocker_send_to_influxdb, + flt_json, + flt_data, + flt_nonstringpdata, + flt_expected_pdata, + event_Timestamp, +): domain = "fault" - jobj={2:2} - flt_ex='fault,domain=fault,eventId=LKCYFL79Q01M01FYNG01_LP-MWPS-RADIO_TCA,eventName=fault_O_RAN_COMPONENT_Alarms_TCA,eventType=O_RAN_COMPONENT_Alarms,priority=High,reportingEntityName=ORAN-DEV,sourceName=LKCYFL79Q01M01FYNG01,nfNamingCode=FYNG,nfVendorName=VENDORA,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639985333218840,lastEpochMicrosec=1639985333218840,2=2 1639985333218840000' + jobj = {2: 2} + flt_ex = "fault,domain=fault,eventId=LKCYFL79Q01M01FYNG01_LP-MWPS-RADIO_TCA,eventName=fault_O_RAN_COMPONENT_Alarms_TCA,eventType=O_RAN_COMPONENT_Alarms,priority=High,reportingEntityName=ORAN-DEV,sourceName=LKCYFL79Q01M01FYNG01,nfNamingCode=FYNG,nfVendorName=VENDORA,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639985333218840,lastEpochMicrosec=1639985333218840,2=2 1639985333218840000" influxdb_connector.process_fault_event(domain, jobj, flt_data, flt_nonstringpdata) mocker_send_to_influxdb.assert_called_with(domain, flt_ex) -@mock.patch('influxdb_connector.send_to_influxdb') -@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000') -def test_process_fault_event_nonstr_elif(mock_time,mocker_send_to_influxdb, flt_json, flt_data, flt_nonstringpdata, flt_expected_pdata, event_Timestamp): +@mock.patch("influxdb_connector.send_to_influxdb") +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +def test_process_fault_event_nonstr_elif( + mock_time, + mocker_send_to_influxdb, + flt_json, + flt_data, + flt_nonstringpdata, + flt_expected_pdata, + event_Timestamp, +): domain = "fault" - jobj={'alarmAdditionalInformation':{'eventTime': 234, 'equipType': 345, 'vendor': 'VENDORA', 'model': 'FancyNextGeneration'}} - flt_ex='fault,domain=fault,eventId=LKCYFL79Q01M01FYNG01_LP-MWPS-RADIO_TCA,eventName=fault_O_RAN_COMPONENT_Alarms_TCA,eventType=O_RAN_COMPONENT_Alarms,priority=High,reportingEntityName=ORAN-DEV,sourceName=LKCYFL79Q01M01FYNG01,nfNamingCode=FYNG,nfVendorName=VENDORA,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None,vendor=VENDORA,model=FancyNextGeneration sequence=0,startEpochMicrosec=1639985333218840,lastEpochMicrosec=1639985333218840,eventTime=234,equipType=345 1639985333218840000' + jobj = { + "alarmAdditionalInformation": { + "eventTime": 234, + "equipType": 345, + "vendor": "VENDORA", + "model": "FancyNextGeneration", + } + } + flt_ex = "fault,domain=fault,eventId=LKCYFL79Q01M01FYNG01_LP-MWPS-RADIO_TCA,eventName=fault_O_RAN_COMPONENT_Alarms_TCA,eventType=O_RAN_COMPONENT_Alarms,priority=High,reportingEntityName=ORAN-DEV,sourceName=LKCYFL79Q01M01FYNG01,nfNamingCode=FYNG,nfVendorName=VENDORA,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None,vendor=VENDORA,model=FancyNextGeneration sequence=0,startEpochMicrosec=1639985333218840,lastEpochMicrosec=1639985333218840,eventTime=234,equipType=345 1639985333218840000" influxdb_connector.process_fault_event(domain, jobj, flt_data, flt_nonstringpdata) mocker_send_to_influxdb.assert_called_with(domain, flt_ex) @@ -226,6 +360,7 @@ def test_process_fault_event_nonstr_elif(mock_time,mocker_send_to_influxdb, flt_ # Address of measurement event unit test_cases # ------------------------------------------------------------------------------ + @pytest.fixture def event_Id(): eventId = "O-RAN-FH-IPv6-01_1639984500_PM15min" @@ -243,356 +378,925 @@ def last_Epoch_Microsec(): lastEpochMicrosec = "1639984500000" return lastEpochMicrosec + @pytest.fixture def meas_json(): - jobj = {'additionalFields': {}, 'additionalMeasurements': [{'name': 'LP-MWPS-RADIO-1', 'hashMap': {'es': - '0', 'ses': '1', 'cses': '0', 'unavailability': '0'}}, {'name': 'LP-MWPS-RADIO-2', 'hashMap': {'es': '0', 'ses': '1', - 'cses': '0', 'unavailability': '0'}}], 'additionalObjects': [], 'codecUsageArray': [], 'concurrentSessions': 2, - 'configuredEntities': 2, 'cpuUsageArray': [], 'diskUsageArray': [], 'featureUsageArray': {'https://www.itu.int/rec/T-REC-G.841': 'true'}, 'filesystemUsageArray': [], 'hugePagesArray': [], 'ipmi': {}, - 'latencyDistribution': [], 'loadArray': [], 'machineCheckExceptionArray': [], 'meanRequestLatency': 1000, - 'measurementInterval': 234, 'measurementFieldsVersion': '4.0', 'memoryUsageArray': [], - 'numberOfMediaPortsInUse': 234, 'requestRate': 23, 'nfcScalingMetric': 3, 'nicPerformanceArray': [], - 'processStatsArray': []} - return jobj + jobj = { + "additionalFields": {}, + "additionalMeasurements": [ + { + "name": "LP-MWPS-RADIO-1", + "hashMap": {"es": "0", "ses": "1", "cses": "0", "unavailability": "0"}, + }, + { + "name": "LP-MWPS-RADIO-2", + "hashMap": {"es": "0", "ses": "1", "cses": "0", "unavailability": "0"}, + }, + ], + "additionalObjects": [], + "codecUsageArray": [], + "concurrentSessions": 2, + "configuredEntities": 2, + "cpuUsageArray": [], + "diskUsageArray": [], + "featureUsageArray": {"https://www.itu.int/rec/T-REC-G.841": "true"}, + "filesystemUsageArray": [], + "hugePagesArray": [], + "ipmi": {}, + "latencyDistribution": [], + "loadArray": [], + "machineCheckExceptionArray": [], + "meanRequestLatency": 1000, + "measurementInterval": 234, + "measurementFieldsVersion": "4.0", + "memoryUsageArray": [], + "numberOfMediaPortsInUse": 234, + "requestRate": 23, + "nfcScalingMetric": 3, + "nicPerformanceArray": [], + "processStatsArray": [], + } + return jobj + @pytest.fixture def meas_data(): - data = 'measurement,domain=measurement,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,eventName=measurement_O_RAN_COMPONENT_PM15min,eventType=O_RAN_COMPONENT_PM15min,priority=Low,reportingEntityName=ORAN-DEV,sourceName=O-RAN-FH-IPv6-01,intervalStartTime=Mon\,\ 20\ Dec\ 2021\ 07:00:00\ +0000,intervalEndTime=Mon\,\ 20\ Dec\ 2021\ 07:15:00\ +0000,version=4.1,vesEventListenerVersion=7.2.1' - return data + data = "measurement,domain=measurement,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,eventName=measurement_O_RAN_COMPONENT_PM15min,eventType=O_RAN_COMPONENT_PM15min,priority=Low,reportingEntityName=ORAN-DEV,sourceName=O-RAN-FH-IPv6-01,intervalStartTime=Mon\,\ 20\ Dec\ 2021\ 07:00:00\ +0000,intervalEndTime=Mon\,\ 20\ Dec\ 2021\ 07:15:00\ +0000,version=4.1,vesEventListenerVersion=7.2.1" + return data + @pytest.fixture def meas_nonstringpdata(): - nonstringpdata = ' sequence=0,startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000,' - return nonstringpdata + nonstringpdata = ( + " sequence=0,startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000," + ) + return nonstringpdata @pytest.fixture def add_meas_data(): - data_set = {'additionalMeasurements': [{'name': 'LP-MWPS-RADIO-1', 'hashMap': {'es': - '0', 'ses': '1', 'cses': '0', 'unavailability': '0'}}, {'name': 'LP-MWPS-RADIO-2', 'hashMap': {'es': '0', 'ses': '1', - 'cses': '0', 'unavailability': '0'}}]} + data_set = { + "additionalMeasurements": [ + { + "name": "LP-MWPS-RADIO-1", + "hashMap": {"es": "0", "ses": "1", "cses": "0", "unavailability": "0"}, + }, + { + "name": "LP-MWPS-RADIO-2", + "hashMap": {"es": "0", "ses": "1", "cses": "0", "unavailability": "0"}, + }, + ] + } return data_set + @pytest.fixture def non_add_meas_data(): - data_set = {'measurementcpuusage': [{'name': 'LP-MWPS-RADIO-1', 'hashMap': {'es': - '0', 'ses': '1', 'cses': '0', 'unavailability': '0'}}, {'name': 'LP-MWPS-RADIO-2', 'hashMap': {'es': '0', 'ses': '1', - 'cses': '0', 'unavailability': '0'}}]} + data_set = { + "measurementcpuusage": [ + { + "name": "LP-MWPS-RADIO-1", + "hashMap": {"es": "0", "ses": "1", "cses": "0", "unavailability": "0"}, + }, + { + "name": "LP-MWPS-RADIO-2", + "hashMap": {"es": "0", "ses": "1", "cses": "0", "unavailability": "0"}, + }, + ] + } return data_set + @pytest.fixture def meas_expected_data(): - measurement_expected_pdata = 'measurement,domain=measurement,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,eventName=measurement_O_RAN_COMPONENT_PM15min,eventType=O_RAN_COMPONENT_PM15min,priority=Low,reportingEntityName=ORAN-DEV,sourceName=O-RAN-FH-IPv6-01,intervalStartTime=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:00:00\\ +0000,intervalEndTime=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:15:00\\ +0000,version=4.1,vesEventListenerVersion=7.2.1,system=None,https://www.itu.int/rec/T-REC-G.841=true,measurementFieldsVersion=4.0 sequence=0,startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000,concurrentSessions=2,configuredEntities=2,meanRequestLatency=1000,measurementInterval=234,numberOfMediaPortsInUse=234,requestRate=23,nfcScalingMetric=3 1639985333218840000' - return measurement_expected_pdata + measurement_expected_pdata = "measurement,domain=measurement,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,eventName=measurement_O_RAN_COMPONENT_PM15min,eventType=O_RAN_COMPONENT_PM15min,priority=Low,reportingEntityName=ORAN-DEV,sourceName=O-RAN-FH-IPv6-01,intervalStartTime=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:00:00\\ +0000,intervalEndTime=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:15:00\\ +0000,version=4.1,vesEventListenerVersion=7.2.1,system=None,https://www.itu.int/rec/T-REC-G.841=true,measurementFieldsVersion=4.0 sequence=0,startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000,concurrentSessions=2,configuredEntities=2,meanRequestLatency=1000,measurementInterval=234,numberOfMediaPortsInUse=234,requestRate=23,nfcScalingMetric=3 1639985333218840000" + return measurement_expected_pdata # ## process_measurement_events unit test_cases. -@patch('influxdb_connector.process_nonadditional_measurements') -@patch('influxdb_connector.process_additional_measurements') -@patch('influxdb_connector.send_to_influxdb') -@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000') -def test_process_measurement_events_called(mock_time,mocker_send_to_influxdb, mocker_additional, mocker_nonadditional, meas_json, - meas_data, meas_nonstringpdata, event_Id, start_Epoch_Microsec, last_Epoch_Microsec, - meas_expected_data, non_add_meas_data, add_meas_data, event_Timestamp): +@patch("influxdb_connector.process_nonadditional_measurements") +@patch("influxdb_connector.process_additional_measurements") +@patch("influxdb_connector.send_to_influxdb") +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +def test_process_measurement_events_called( + mock_time, + mocker_send_to_influxdb, + mocker_additional, + mocker_nonadditional, + meas_json, + meas_data, + meas_nonstringpdata, + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + meas_expected_data, + non_add_meas_data, + add_meas_data, + event_Timestamp, +): domain = "measurement" - influxdb_connector.process_measurement_events('measurement', meas_json, meas_data, meas_nonstringpdata, event_Id, - start_Epoch_Microsec, last_Epoch_Microsec) - mocker_additional.process_additional_measurements(add_meas_data.get('additionalMeasurements'), 'measurementadditionalmeasurements', - event_Id, start_Epoch_Microsec, last_Epoch_Microsec) - mocker_additional.assert_called_with(add_meas_data.get('additionalMeasurements'), 'measurementadditionalmeasurements', event_Id, - start_Epoch_Microsec, last_Epoch_Microsec) - - mocker_nonadditional.process_nonadditional_measurements([], 'measurementnicperformance', event_Id, start_Epoch_Microsec, last_Epoch_Microsec) - mocker_nonadditional.assert_called_with([], 'measurementnicperformance', event_Id, start_Epoch_Microsec, last_Epoch_Microsec) + influxdb_connector.process_measurement_events( + "measurement", + meas_json, + meas_data, + meas_nonstringpdata, + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + ) + mocker_additional.process_additional_measurements( + add_meas_data.get("additionalMeasurements"), + "measurementadditionalmeasurements", + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + ) + mocker_additional.assert_called_with( + add_meas_data.get("additionalMeasurements"), + "measurementadditionalmeasurements", + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + ) + + mocker_nonadditional.process_nonadditional_measurements( + [], + "measurementnicperformance", + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + ) + mocker_nonadditional.assert_called_with( + [], + "measurementnicperformance", + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + ) mocker_send_to_influxdb.assert_called_with(domain, meas_expected_data) - -@patch('influxdb_connector.process_nonadditional_measurements') -@patch('influxdb_connector.process_additional_measurements') -@patch('influxdb_connector.send_to_influxdb') -@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000') -def test_process_measurement_events(mock_time,mocker_send_to_influxdb, mocker_additional, mocker_nonadditional, meas_json, - meas_data, meas_nonstringpdata, event_Id, start_Epoch_Microsec, last_Epoch_Microsec, - meas_expected_data, non_add_meas_data, add_meas_data, event_Timestamp): +@patch("influxdb_connector.process_nonadditional_measurements") +@patch("influxdb_connector.process_additional_measurements") +@patch("influxdb_connector.send_to_influxdb") +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +def test_process_measurement_events( + mock_time, + mocker_send_to_influxdb, + mocker_additional, + mocker_nonadditional, + meas_json, + meas_data, + meas_nonstringpdata, + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + meas_expected_data, + non_add_meas_data, + add_meas_data, + event_Timestamp, +): domain = "measurement" - jobj={"test":[1,2,3],'networkSliceArray':[1,2,3]} - means_ex='measurement,domain=measurement,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,eventName=measurement_O_RAN_COMPONENT_PM15min,eventType=O_RAN_COMPONENT_PM15min,priority=Low,reportingEntityName=ORAN-DEV,sourceName=O-RAN-FH-IPv6-01,intervalStartTime=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:00:00\\ +0000,intervalEndTime=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:15:00\\ +0000,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000 1639985333218840000' - influxdb_connector.process_measurement_events('measurement',jobj, meas_data, meas_nonstringpdata, event_Id, - start_Epoch_Microsec, last_Epoch_Microsec) - influxdb_connector.process_additional_measurements(domain,event_Id, start_Epoch_Microsec, last_Epoch_Microsec) - mocker_nonadditional.process_nonadditional_measurements([], 'measurementnicperformance', event_Id, start_Epoch_Microsec, last_Epoch_Microsec) + jobj = {"test": [1, 2, 3], "networkSliceArray": [1, 2, 3]} + means_ex = "measurement,domain=measurement,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,eventName=measurement_O_RAN_COMPONENT_PM15min,eventType=O_RAN_COMPONENT_PM15min,priority=Low,reportingEntityName=ORAN-DEV,sourceName=O-RAN-FH-IPv6-01,intervalStartTime=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:00:00\\ +0000,intervalEndTime=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:15:00\\ +0000,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000 1639985333218840000" + influxdb_connector.process_measurement_events( + "measurement", + jobj, + meas_data, + meas_nonstringpdata, + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + ) + influxdb_connector.process_additional_measurements( + domain, event_Id, start_Epoch_Microsec, last_Epoch_Microsec + ) + mocker_nonadditional.process_nonadditional_measurements( + [], + "measurementnicperformance", + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + ) mocker_send_to_influxdb.assert_called_with(domain, means_ex) - -@patch('influxdb_connector.process_nonadditional_measurements') -@patch('influxdb_connector.process_additional_measurements') -@patch('influxdb_connector.send_to_influxdb') -@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000') -def test_process_measurement_events_elif(mock_time,mocker_send_to_influxdb, mocker_additional, mocker_nonadditional, meas_json, - meas_data, meas_nonstringpdata, event_Id, start_Epoch_Microsec, last_Epoch_Microsec, - meas_expected_data, non_add_meas_data, add_meas_data, event_Timestamp): +@patch("influxdb_connector.process_nonadditional_measurements") +@patch("influxdb_connector.process_additional_measurements") +@patch("influxdb_connector.send_to_influxdb") +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +def test_process_measurement_events_elif( + mock_time, + mocker_send_to_influxdb, + mocker_additional, + mocker_nonadditional, + meas_json, + meas_data, + meas_nonstringpdata, + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + meas_expected_data, + non_add_meas_data, + add_meas_data, + event_Timestamp, +): domain = "measurement" - jobj={"test":{1:26,2:56},'networkSliceArray':{1:4,2:7}} - means_ex='measurement,domain=measurement,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,eventName=measurement_O_RAN_COMPONENT_PM15min,eventType=O_RAN_COMPONENT_PM15min,priority=Low,reportingEntityName=ORAN-DEV,sourceName=O-RAN-FH-IPv6-01,intervalStartTime=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:00:00\\ +0000,intervalEndTime=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:15:00\\ +0000,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000,1=26,2=56,1=4,2=7 1639985333218840000' - influxdb_connector.process_measurement_events('measurement',jobj, meas_data, meas_nonstringpdata, event_Id, - start_Epoch_Microsec, last_Epoch_Microsec) - influxdb_connector.process_additional_measurements(domain,event_Id, start_Epoch_Microsec, last_Epoch_Microsec) - mocker_additional.process_additional_measurements(add_meas_data.get('additionalMeasurements'), 'measurementadditionalmeasurements', - event_Id, start_Epoch_Microsec, last_Epoch_Microsec) - - mocker_nonadditional.process_nonadditional_measurements([], 'measurementnicperformance', event_Id, start_Epoch_Microsec, last_Epoch_Microsec) + jobj = {"test": {1: 26, 2: 56}, "networkSliceArray": {1: 4, 2: 7}} + means_ex = "measurement,domain=measurement,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,eventName=measurement_O_RAN_COMPONENT_PM15min,eventType=O_RAN_COMPONENT_PM15min,priority=Low,reportingEntityName=ORAN-DEV,sourceName=O-RAN-FH-IPv6-01,intervalStartTime=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:00:00\\ +0000,intervalEndTime=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:15:00\\ +0000,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000,1=26,2=56,1=4,2=7 1639985333218840000" + influxdb_connector.process_measurement_events( + "measurement", + jobj, + meas_data, + meas_nonstringpdata, + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + ) + influxdb_connector.process_additional_measurements( + domain, event_Id, start_Epoch_Microsec, last_Epoch_Microsec + ) + mocker_additional.process_additional_measurements( + add_meas_data.get("additionalMeasurements"), + "measurementadditionalmeasurements", + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + ) + + mocker_nonadditional.process_nonadditional_measurements( + [], + "measurementnicperformance", + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + ) mocker_send_to_influxdb.assert_called_with(domain, means_ex) - @pytest.fixture def add_meas_expected_pdata(): - additional_expected_pdata = 'measurementadditionalmeasurements,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,system=None,name=LP-MWPS-RADIO-2,es=0,ses=1,cses=0,unavailability=0 startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000 1639985333218840000' - return additional_expected_pdata - + additional_expected_pdata = "measurementadditionalmeasurements,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,system=None,name=LP-MWPS-RADIO-2,es=0,ses=1,cses=0,unavailability=0 startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000 1639985333218840000" + return additional_expected_pdata # ## process_additional_measurements unit test_case -@mock.patch('influxdb_connector.send_to_influxdb') -@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000') -def test_process_additional_measurements_called(mock_time, mocker_send_to_influxdb, event_Id, start_Epoch_Microsec, last_Epoch_Microsec, - add_meas_data, add_meas_expected_pdata, event_Timestamp): +@mock.patch("influxdb_connector.send_to_influxdb") +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +def test_process_additional_measurements_called( + mock_time, + mocker_send_to_influxdb, + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + add_meas_data, + add_meas_expected_pdata, + event_Timestamp, +): payload = add_meas_data - domain = 'measurementadditionalmeasurements' + domain = "measurementadditionalmeasurements" for key, val in payload.items(): - if isinstance(val, list): - if key == 'additionalMeasurements': - influxdb_connector.process_additional_measurements(payload.get('additionalMeasurements'), domain, - event_Id, start_Epoch_Microsec, last_Epoch_Microsec) - mocker_send_to_influxdb.assert_called_with(domain, add_meas_expected_pdata) - - - -@mock.patch('influxdb_connector.send_to_influxdb') -@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000') -def test_process_additional_measurements(mock_time, mocker_send_to_influxdb, event_Id, start_Epoch_Microsec, last_Epoch_Microsec, - add_meas_data, add_meas_expected_pdata, event_Timestamp): - payload = [{1:23}] - domain = 'measurementadditionalmeasurements' - expected_pdata='measurementadditionalmeasurements,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,system=None startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000,1=23 1639985333218840000' - influxdb_connector.process_additional_measurements(payload, domain, - event_Id, start_Epoch_Microsec, last_Epoch_Microsec) + if isinstance(val, list): + if key == "additionalMeasurements": + influxdb_connector.process_additional_measurements( + payload.get("additionalMeasurements"), + domain, + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + ) + mocker_send_to_influxdb.assert_called_with( + domain, add_meas_expected_pdata + ) + + +@mock.patch("influxdb_connector.send_to_influxdb") +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +def test_process_additional_measurements( + mock_time, + mocker_send_to_influxdb, + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + add_meas_data, + add_meas_expected_pdata, + event_Timestamp, +): + payload = [{1: 23}] + domain = "measurementadditionalmeasurements" + expected_pdata = "measurementadditionalmeasurements,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,system=None startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000,1=23 1639985333218840000" + influxdb_connector.process_additional_measurements( + payload, domain, event_Id, start_Epoch_Microsec, last_Epoch_Microsec + ) mocker_send_to_influxdb.assert_called_with(domain, expected_pdata) - -@mock.patch('influxdb_connector.send_to_influxdb') -@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000') -def test_process_additional_measurements_else(mock_time, mocker_send_to_influxdb, event_Id, start_Epoch_Microsec, last_Epoch_Microsec, - add_meas_data, add_meas_expected_pdata, event_Timestamp): - payload = [{1:{1:{67}}}] - domain = 'measurementadditionalmeasurements' - expected_pdata='measurementadditionalmeasurements,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,system=None startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000,1={67} 1639985333218840000' - influxdb_connector.process_additional_measurements(payload, domain, - event_Id, start_Epoch_Microsec, last_Epoch_Microsec) +@mock.patch("influxdb_connector.send_to_influxdb") +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +def test_process_additional_measurements_else( + mock_time, + mocker_send_to_influxdb, + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + add_meas_data, + add_meas_expected_pdata, + event_Timestamp, +): + payload = [{1: {1: {67}}}] + domain = "measurementadditionalmeasurements" + expected_pdata = "measurementadditionalmeasurements,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,system=None startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000,1={67} 1639985333218840000" + influxdb_connector.process_additional_measurements( + payload, domain, event_Id, start_Epoch_Microsec, last_Epoch_Microsec + ) mocker_send_to_influxdb.assert_called_with(domain, expected_pdata) - - - @pytest.fixture def non_add_expected_data(): - non_additional_expected_pdata = "measurementcpuusage,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,system=None,name=LP-MWPS-RADIO-2 startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000,hashMap={'es': '0', 'ses': '1', 'cses': '0', 'unavailability': '0'} 1639985333218840000" - return non_additional_expected_pdata + non_additional_expected_pdata = "measurementcpuusage,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,system=None,name=LP-MWPS-RADIO-2 startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000,hashMap={'es': '0', 'ses': '1', 'cses': '0', 'unavailability': '0'} 1639985333218840000" + return non_additional_expected_pdata # ## process_nonadditional_measurements unit test_cases. -@mock.patch('influxdb_connector.send_to_influxdb') -@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000') -def test_process_nonadditional_measurements_called(mock_time, mocker_send_to_influxdb, event_Id, start_Epoch_Microsec, - last_Epoch_Microsec, non_add_meas_data, non_add_expected_data, event_Timestamp): - domain = 'measurementcpuusage' - source = 'unkown' - - influxdb_connector.process_nonadditional_measurements(non_add_meas_data.get('measurementcpuusage'), domain, event_Id, - start_Epoch_Microsec, last_Epoch_Microsec) +@mock.patch("influxdb_connector.send_to_influxdb") +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +def test_process_nonadditional_measurements_called( + mock_time, + mocker_send_to_influxdb, + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + non_add_meas_data, + non_add_expected_data, + event_Timestamp, +): + domain = "measurementcpuusage" + source = "unkown" + + influxdb_connector.process_nonadditional_measurements( + non_add_meas_data.get("measurementcpuusage"), + domain, + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + ) mocker_send_to_influxdb.assert_called_with(domain, non_add_expected_data) +# ------------------------------------------------------------------------- +# ## process_stndDefinedFields_events unit test_case +# ------------------------------------------------------------------------ + + +@pytest.fixture +def stndDefined_json(): + std_json = { + "schemaReference": "https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/faultMnS.yaml#components/schemas/NotifyNewAlarm", + "data": { + "href": "href1", + "uri": "1", + "notificationId": 0, + "notificationType": "notifyNewAlarm", + "eventTime": "2022-06-22T12:43:50.579315Z", + "trendIndication": "MORE_SEVERE", + "thresholdInfo": {"observedMeasurement": "new", "observedValue": 123}, + "monitoredAttributes": {"interface": "LP-MWPS-RADIO"}, + "proposedRepairActions": "12345", + "additionalInformation": { + "eventTime": "2022-06-22T12:43:50.579315Z", + "equipType": "1234", + "vendor": "VENDORA", + "model": "1234 BestInClass", + }, + }, + "stndDefinedFieldsVersion": "1.0", + } + return std_json + + +@pytest.fixture +def std_nonstringpdata(): + nonstrdata = " sequence=0,startEpochMicrosec=1639985336443218,lastEpochMicrosec=1639985336443218," + return str(nonstrdata) + + +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +@mock.patch("influxdb_connector.send_to_influxdb") +def test_process_stndDefinedFields_events( + mocker_send_to_influxdb, + mock_time, + std_nonstringpdata, + stndDefined_json, + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + event_Timestamp, +): + domain = "stndDefined" + nonstrdata = "stndDefined,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,system=None,schemaReference=https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/faultMnS.yaml#components/schemas/NotifyNewAlarm,href=href1,uri=1,notificationType=notifyNewAlarm,eventTime=2022-06-22T12:43:50.579315Z,trendIndication=MORE_SEVERE,observedMeasurement=new,interface=LP-MWPS-RADIO,proposedRepairActions=12345,additionalInformation_eventTime=2022-06-22T12:43:50.579315Z,additionalInformation_equipType=1234,additionalInformation_vendor=VENDORA,additionalInformation_model=1234\\ BestInClass,stndDefinedFieldsVersion=1.0 startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000,notificationId=0,observedValue=123 1639985333218840000" + influxdb_connector.process_stndDefinedFields_events( + stndDefined_json, domain, event_Id, start_Epoch_Microsec, last_Epoch_Microsec + ) + mocker_send_to_influxdb.assert_called_with(domain, nonstrdata) + + +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +@mock.patch("influxdb_connector.send_to_influxdb") +def test_process_stndDefinedFields_events_ins( + mocker_send_to_influxdb, + mock_time, + std_nonstringpdata, + stndDefined_json, + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + event_Timestamp, +): + domain = "stndDefined" + stndDefined_json = {"events": "test1"} + nonstrdata = "stndDefined,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,system=None,events=test1 startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000 1639985333218840000" + influxdb_connector.process_stndDefinedFields_events( + stndDefined_json, domain, event_Id, start_Epoch_Microsec, last_Epoch_Microsec + ) + mocker_send_to_influxdb.assert_called_with(domain, nonstrdata) + + +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +@mock.patch("influxdb_connector.send_to_influxdb") +def test_process_stndDefinedFields_events_dic( + mocker_send_to_influxdb, + mock_time, + std_nonstringpdata, + stndDefined_json, + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + event_Timestamp, +): + domain = "stndDefined" + stndDefined_json = {"events": {"commonEventHeader": "test"}} + nonstrdata = "stndDefined,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,system=None,commonEventHeader=test startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000 1639985333218840000" + influxdb_connector.process_stndDefinedFields_events( + stndDefined_json, domain, event_Id, start_Epoch_Microsec, last_Epoch_Microsec + ) + mocker_send_to_influxdb.assert_called_with(domain, nonstrdata) + + +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +@mock.patch("influxdb_connector.send_to_influxdb") +def test_process_stndDefinedFields_events_addinfo( + mocker_send_to_influxdb, + mock_time, + std_nonstringpdata, + stndDefined_json, + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + event_Timestamp, +): + domain = "stndDefined" + stndDefined_json = {"events": {"additionalInformation": {"test1": "test2"}}} + nonstrdata = "stndDefined,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,system=None,additionalInformation_test1=test2 startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000 1639985333218840000" + influxdb_connector.process_stndDefinedFields_events( + stndDefined_json, domain, event_Id, start_Epoch_Microsec, last_Epoch_Microsec + ) + mocker_send_to_influxdb.assert_called_with(domain, nonstrdata) + + +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +@mock.patch("influxdb_connector.send_to_influxdb") +def test_process_stndDefinedFields_events_addinfo_else( + mocker_send_to_influxdb, + std_nonstringpdata, + stndDefined_json, + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + event_Timestamp, +): + domain = "stndDefined" + stndDefined_json = {"events": {"additionalInformation": {"test1": 2}}} + nonstrdata = "stndDefined,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,system=None startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000,test1=2 1639985333218840000" + influxdb_connector.process_stndDefinedFields_events( + stndDefined_json, domain, event_Id, start_Epoch_Microsec, last_Epoch_Microsec + ) + mocker_send_to_influxdb.assert_called_with(domain, nonstrdata) + + +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +@mock.patch("influxdb_connector.send_to_influxdb") +def test_process_stndDefinedFields_events_corel( + mocker_send_to_influxdb, + std_nonstringpdata, + stndDefined_json, + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + event_Timestamp, +): + domain = "stndDefined" + stndDefined_json = {"events": {"correlatedNotifications": [{"test1": "test2"}]}} + nonstrdata = "stndDefined,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,system=None,test1=test2 startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000 1639985333218840000" + influxdb_connector.process_stndDefinedFields_events( + stndDefined_json, domain, event_Id, start_Epoch_Microsec, last_Epoch_Microsec + ) + mocker_send_to_influxdb.assert_called_with(domain, nonstrdata) + + +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +@mock.patch("influxdb_connector.send_to_influxdb") +def test_process_stndDefinedFields_events_corel_else( + mocker_send_to_influxdb, + std_nonstringpdata, + stndDefined_json, + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + event_Timestamp, +): + domain = "stndDefined" + stndDefined_json = {"events": {"correlatedNotifications": [{2: 2}]}} + nonstrdata = "stndDefined,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,system=None startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000,2=2 1639985333218840000" + influxdb_connector.process_stndDefinedFields_events( + stndDefined_json, domain, event_Id, start_Epoch_Microsec, last_Epoch_Microsec + ) + mocker_send_to_influxdb.assert_called_with(domain, nonstrdata) + + +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +@mock.patch("influxdb_connector.send_to_influxdb") +def test_process_stndDefinedFields_events_else( + mocker_send_to_influxdb, + std_nonstringpdata, + stndDefined_json, + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + event_Timestamp, +): + domain = "stndDefined" + stndDefined_json = {"events": {"commonEventHeader": 2}} + nonstrdata = "stndDefined,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,system=None startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000,commonEventHeader=2 1639985333218840000" + influxdb_connector.process_stndDefinedFields_events( + stndDefined_json, domain, event_Id, start_Epoch_Microsec, last_Epoch_Microsec + ) + mocker_send_to_influxdb.assert_called_with(domain, nonstrdata) + + +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +@mock.patch("influxdb_connector.send_to_influxdb") +def test_process_stndDefinedFields( + mocker_send_to_influxdb, + std_nonstringpdata, + stndDefined_json, + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + event_Timestamp, +): + domain = "stndDefined" + stndDefined_json = {"events": 2} + nonstrdata = "stndDefined,eventId=O-RAN-FH-IPv6-01_1639984500_PM15min,system=None startEpochMicrosec=1639983600000,lastEpochMicrosec=1639984500000,events=2 1639985333218840000" + influxdb_connector.process_stndDefinedFields_events( + stndDefined_json, domain, event_Id, start_Epoch_Microsec, last_Epoch_Microsec + ) + mocker_send_to_influxdb.assert_called_with(domain, nonstrdata) + + # ------------------------------------------------------------------------------ # Address of threshold event unit test_case # ------------------------------------------------------------------------------ + @pytest.fixture def thre_json(): - jobj = {'thresholdCrossingFieldsVersion': '4.0', 'additionalParameters': [{'criticality': 'MAJ', 'hashMap': - {'additionalProperties': 'up-and-down'}, 'thresholdCrossed': 'packetLoss'}], 'alertAction': 'SET', - 'alertDescription': 'TCA', 'alertType': 'INTERFACE-ANOMALY', 'alertValue': '1OSF', - 'associatedAlertIdList': ['loss-of-signal'], 'collectionTimestamp': 'Mon, 20 Dec 2021 07:28:56 +0000', - 'dataCollector': 'data-lake', 'elementType': '1OSF', 'eventSeverity': 'WARNING', 'eventStartTimestamp': - 'Mon, 20 Dec 2021 07:15:00 +0000', 'interfaceName': '', 'networkService': 'from-a-to-b', - 'possibleRootCause': 'always-the-others', 'additionalFields': {'eventTime': '2021-12-20T07:28:56.443218Z', - 'equipType': '1OSF', 'vendor': '', 'model': ''}} - return jobj + jobj = { + "thresholdCrossingFieldsVersion": "4.0", + "additionalParameters": [ + { + "criticality": "MAJ", + "hashMap": {"additionalProperties": "up-and-down"}, + "thresholdCrossed": "packetLoss", + } + ], + "alertAction": "SET", + "alertDescription": "TCA", + "alertType": "INTERFACE-ANOMALY", + "alertValue": "1OSF", + "associatedAlertIdList": ["loss-of-signal"], + "collectionTimestamp": "Mon, 20 Dec 2021 07:28:56 +0000", + "dataCollector": "data-lake", + "elementType": "1OSF", + "eventSeverity": "WARNING", + "eventStartTimestamp": "Mon, 20 Dec 2021 07:15:00 +0000", + "interfaceName": "", + "networkService": "from-a-to-b", + "possibleRootCause": "always-the-others", + "additionalFields": { + "eventTime": "2021-12-20T07:28:56.443218Z", + "equipType": "1OSF", + "vendor": "", + "model": "", + }, + } + return jobj @pytest.fixture def threshold_data(): - data = 'thresholdCrossingAlert,domain=thresholdCrossingAlert,eventId=__TCA,eventName=thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA,eventType=O_RAN_COMPONENT_TCA,priority=High,reportingEntityName=ORAN-DEV,nfNamingCode=1OSF,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1' - return str(data) + data = "thresholdCrossingAlert,domain=thresholdCrossingAlert,eventId=__TCA,eventName=thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA,eventType=O_RAN_COMPONENT_TCA,priority=High,reportingEntityName=ORAN-DEV,nfNamingCode=1OSF,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1" + return str(data) @pytest.fixture def thres_nonstringpdata(): - nonstringpdata = ' sequence=0,startEpochMicrosec=1639985336443218,lastEpochMicrosec=1639985336443218,' - return str(nonstringpdata) - + nonstringpdata = " sequence=0,startEpochMicrosec=1639985336443218,lastEpochMicrosec=1639985336443218," + return str(nonstringpdata) -def test_process_thresholdCrossingAlert_event_called(thre_json, threshold_data, thres_nonstringpdata, event_Timestamp): +def test_process_thresholdCrossingAlert_event_called( + thre_json, threshold_data, thres_nonstringpdata, event_Timestamp +): domain = "thresholdCrossingAlert" - with patch('influxdb_connector.process_thresholdCrossingAlert_event') as func: - influxdb_connector.process_thresholdCrossingAlert_event(domain, thre_json, threshold_data, thres_nonstringpdata) - func.assert_called_with(domain, thre_json, threshold_data, thres_nonstringpdata) - - -@mock.patch('influxdb_connector.send_to_influxdb') -@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000') -def test_process_thresholdCrossingAlert_event(mock_pro,mocker_send_to_influxdb,thre_json, threshold_data, thres_nonstringpdata, event_Timestamp): - jobj= {"test":"test"} - pdata= 'thresholdCrossingAlert,domain=thresholdCrossingAlert,eventId=__TCA,eventName=thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA,eventType=O_RAN_COMPONENT_TCA,priority=High,reportingEntityName=ORAN-DEV,nfNamingCode=1OSF,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None' + with patch("influxdb_connector.process_thresholdCrossingAlert_event") as func: + influxdb_connector.process_thresholdCrossingAlert_event( + domain, thre_json, threshold_data, thres_nonstringpdata + ) + func.assert_called_with(domain, thre_json, threshold_data, thres_nonstringpdata) + + +@mock.patch("influxdb_connector.send_to_influxdb") +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +def test_process_thresholdCrossingAlert_event( + mock_pro, + mocker_send_to_influxdb, + thre_json, + threshold_data, + thres_nonstringpdata, + event_Timestamp, +): + jobj = {"test": "test"} + pdata = "thresholdCrossingAlert,domain=thresholdCrossingAlert,eventId=__TCA,eventName=thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA,eventType=O_RAN_COMPONENT_TCA,priority=High,reportingEntityName=ORAN-DEV,nfNamingCode=1OSF,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None" domain = "thresholdCrossingAlert" - thres_data='thresholdCrossingAlert,domain=thresholdCrossingAlert,eventId=__TCA,eventName=thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA,eventType=O_RAN_COMPONENT_TCA,priority=High,reportingEntityName=ORAN-DEV,nfNamingCode=1OSF,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None,system=None,thresholdCrossingFieldsVersion=4.0,criticality=MAJ,additionalProperties=up-and-down,thresholdCrossed=packetLoss,alertAction=SET,alertDescription=TCA,alertType=INTERFACE-ANOMALY,alertValue=1OSF,associatedAlertIdList=loss-of-signal,collectionTimestamp=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:28:56\\ +0000,dataCollector=data-lake,elementType=1OSF,eventSeverity=WARNING,eventStartTimestamp=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:15:00\\ +0000,networkService=from-a-to-b,possibleRootCause=always-the-others,eventTime=2021-12-20T07:28:56.443218Z,equipType=1OSF sequence=0,startEpochMicrosec=1639985336443218,lastEpochMicrosec=1639985336443218 1639985333218840000' - influxdb_connector.process_thresholdCrossingAlert_event(domain,thre_json, pdata, thres_nonstringpdata) + thres_data = "thresholdCrossingAlert,domain=thresholdCrossingAlert,eventId=__TCA,eventName=thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA,eventType=O_RAN_COMPONENT_TCA,priority=High,reportingEntityName=ORAN-DEV,nfNamingCode=1OSF,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None,system=None,thresholdCrossingFieldsVersion=4.0,criticality=MAJ,additionalProperties=up-and-down,thresholdCrossed=packetLoss,alertAction=SET,alertDescription=TCA,alertType=INTERFACE-ANOMALY,alertValue=1OSF,associatedAlertIdList=loss-of-signal,collectionTimestamp=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:28:56\\ +0000,dataCollector=data-lake,elementType=1OSF,eventSeverity=WARNING,eventStartTimestamp=Mon\\,\\ 20\\ Dec\\ 2021\\ 07:15:00\\ +0000,networkService=from-a-to-b,possibleRootCause=always-the-others,eventTime=2021-12-20T07:28:56.443218Z,equipType=1OSF sequence=0,startEpochMicrosec=1639985336443218,lastEpochMicrosec=1639985336443218 1639985333218840000" + influxdb_connector.process_thresholdCrossingAlert_event( + domain, thre_json, pdata, thres_nonstringpdata + ) mocker_send_to_influxdb.assert_called_with(domain, thres_data) -@mock.patch('influxdb_connector.send_to_influxdb') -@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000') -def test_process_thresholdCrossingAlert_elif(mock_pro,mocker_send_to_influxdb,thre_json, threshold_data, thres_nonstringpdata, event_Timestamp): - jobj={'additionalParameters': [{'addParameter': 'MAJ', 'abc': - {'additionalProperties': 'up-and-down'}, 'thresholdCrossed': 'packetLoss'}],} +@mock.patch("influxdb_connector.send_to_influxdb") +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +def test_process_thresholdCrossingAlert_elif( + mock_pro, + mocker_send_to_influxdb, + thre_json, + threshold_data, + thres_nonstringpdata, + event_Timestamp, +): + jobj = { + "additionalParameters": [ + { + "addParameter": "MAJ", + "abc": {"additionalProperties": "up-and-down"}, + "thresholdCrossed": "packetLoss", + } + ], + } domain = "thresholdCrossingAlert" - nonstr="thresholdCrossingAlert,domain=thresholdCrossingAlert,eventId=__TCA,eventName=thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA,eventType=O_RAN_COMPONENT_TCA,priority=High,reportingEntityName=ORAN-DEV,nfNamingCode=1OSF,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None,addParameter=MAJ,thresholdCrossed=packetLoss sequence=0,startEpochMicrosec=1639985336443218,lastEpochMicrosec=1639985336443218,abc={'additionalProperties': 'up-and-down'} 1639985333218840000" - influxdb_connector.process_thresholdCrossingAlert_event(domain,jobj, threshold_data, thres_nonstringpdata) - mocker_send_to_influxdb.assert_called_with(domain, nonstr) - - -@mock.patch('influxdb_connector.send_to_influxdb') -@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000') -def test_process_thresholdCrossingAlert_elif_elif(mock_pro,mocker_send_to_influxdb,thre_json, threshold_data, thres_nonstringpdata, event_Timestamp): - jobj={'additionalParameters': [{'addParameter': 'MAJ', 'hashMap': - {'additionalProperties':67}, 'thresholdCrossed': 'packetLoss'}],} + nonstr = "thresholdCrossingAlert,domain=thresholdCrossingAlert,eventId=__TCA,eventName=thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA,eventType=O_RAN_COMPONENT_TCA,priority=High,reportingEntityName=ORAN-DEV,nfNamingCode=1OSF,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None,addParameter=MAJ,thresholdCrossed=packetLoss sequence=0,startEpochMicrosec=1639985336443218,lastEpochMicrosec=1639985336443218,abc={'additionalProperties': 'up-and-down'} 1639985333218840000" + influxdb_connector.process_thresholdCrossingAlert_event( + domain, jobj, threshold_data, thres_nonstringpdata + ) + mocker_send_to_influxdb.assert_called_with(domain, nonstr) + + +@mock.patch("influxdb_connector.send_to_influxdb") +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +def test_process_thresholdCrossingAlert_elif_elif( + mock_pro, + mocker_send_to_influxdb, + thre_json, + threshold_data, + thres_nonstringpdata, + event_Timestamp, +): + jobj = { + "additionalParameters": [ + { + "addParameter": "MAJ", + "hashMap": {"additionalProperties": 67}, + "thresholdCrossed": "packetLoss", + } + ], + } domain = "thresholdCrossingAlert" - nonstr='thresholdCrossingAlert,domain=thresholdCrossingAlert,eventId=__TCA,eventName=thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA,eventType=O_RAN_COMPONENT_TCA,priority=High,reportingEntityName=ORAN-DEV,nfNamingCode=1OSF,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None,addParameter=MAJ,thresholdCrossed=packetLoss sequence=0,startEpochMicrosec=1639985336443218,lastEpochMicrosec=1639985336443218,additionalProperties=67 1639985333218840000' - influxdb_connector.process_thresholdCrossingAlert_event(domain,jobj, threshold_data, thres_nonstringpdata) - mocker_send_to_influxdb.assert_called_with(domain, nonstr) - - -@mock.patch('influxdb_connector.send_to_influxdb') -@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000') -def test_process_thresholdCrossingAlert_event_elif(mock_pro,mocker_send_to_influxdb,thre_json, threshold_data, thres_nonstringpdata, event_Timestamp): - jobj= {1:2} + nonstr = "thresholdCrossingAlert,domain=thresholdCrossingAlert,eventId=__TCA,eventName=thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA,eventType=O_RAN_COMPONENT_TCA,priority=High,reportingEntityName=ORAN-DEV,nfNamingCode=1OSF,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None,addParameter=MAJ,thresholdCrossed=packetLoss sequence=0,startEpochMicrosec=1639985336443218,lastEpochMicrosec=1639985336443218,additionalProperties=67 1639985333218840000" + influxdb_connector.process_thresholdCrossingAlert_event( + domain, jobj, threshold_data, thres_nonstringpdata + ) + mocker_send_to_influxdb.assert_called_with(domain, nonstr) + + +@mock.patch("influxdb_connector.send_to_influxdb") +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +def test_process_thresholdCrossingAlert_event_elif( + mock_pro, + mocker_send_to_influxdb, + thre_json, + threshold_data, + thres_nonstringpdata, + event_Timestamp, +): + jobj = {1: 2} domain = "thresholdCrossingAlert" - nonstr='thresholdCrossingAlert,domain=thresholdCrossingAlert,eventId=__TCA,eventName=thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA,eventType=O_RAN_COMPONENT_TCA,priority=High,reportingEntityName=ORAN-DEV,nfNamingCode=1OSF,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639985336443218,lastEpochMicrosec=1639985336443218,1=2 1639985333218840000' - influxdb_connector.process_thresholdCrossingAlert_event(domain,jobj, threshold_data, thres_nonstringpdata) - mocker_send_to_influxdb.assert_called_with(domain,nonstr) - - -@mock.patch('influxdb_connector.send_to_influxdb') -@mock.patch('influxdb_connector.process_time', return_value='1639985333218840000') -def test_process_thresholdCrossingAlert_event_nonstr(mock_pro,mocker_send_to_influxdb,thre_json, threshold_data, thres_nonstringpdata, event_Timestamp): - jobj= {'additionalFields': {'eventTime': 2}} + nonstr = "thresholdCrossingAlert,domain=thresholdCrossingAlert,eventId=__TCA,eventName=thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA,eventType=O_RAN_COMPONENT_TCA,priority=High,reportingEntityName=ORAN-DEV,nfNamingCode=1OSF,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639985336443218,lastEpochMicrosec=1639985336443218,1=2 1639985333218840000" + influxdb_connector.process_thresholdCrossingAlert_event( + domain, jobj, threshold_data, thres_nonstringpdata + ) + mocker_send_to_influxdb.assert_called_with(domain, nonstr) + + +@mock.patch("influxdb_connector.send_to_influxdb") +@mock.patch("influxdb_connector.process_time", return_value="1639985333218840000") +def test_process_thresholdCrossingAlert_event_nonstr( + mock_pro, + mocker_send_to_influxdb, + thre_json, + threshold_data, + thres_nonstringpdata, + event_Timestamp, +): + jobj = {"additionalFields": {"eventTime": 2}} domain = "thresholdCrossingAlert" - nonstr='thresholdCrossingAlert,domain=thresholdCrossingAlert,eventId=__TCA,eventName=thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA,eventType=O_RAN_COMPONENT_TCA,priority=High,reportingEntityName=ORAN-DEV,nfNamingCode=1OSF,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639985336443218,lastEpochMicrosec=1639985336443218,eventTime=2 1639985333218840000' - influxdb_connector.process_thresholdCrossingAlert_event(domain,jobj, threshold_data, thres_nonstringpdata) - mocker_send_to_influxdb.assert_called_with(domain,nonstr) + nonstr = "thresholdCrossingAlert,domain=thresholdCrossingAlert,eventId=__TCA,eventName=thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA,eventType=O_RAN_COMPONENT_TCA,priority=High,reportingEntityName=ORAN-DEV,nfNamingCode=1OSF,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None sequence=0,startEpochMicrosec=1639985336443218,lastEpochMicrosec=1639985336443218,eventTime=2 1639985333218840000" + influxdb_connector.process_thresholdCrossingAlert_event( + domain, jobj, threshold_data, thres_nonstringpdata + ) + mocker_send_to_influxdb.assert_called_with(domain, nonstr) -#................................................................................. +# ................................................................................. # ## save_event_in_db unit test_cases. -#.................................................................................... - -@patch('influxdb_connector.logger') -@pytest.mark.parametrize("key", [("heartbeat"), ("pnfRegistration"), ("measurement"), ("fault"), ("thresholdCrossingAlert")]) -def test_save_event_in_db(mock_logger, key, hb_json, hb_data, hb_nonstringpdata, pnf_json, pnf_data, pnf_nonstringpdata, - meas_json, meas_data, meas_nonstringpdata, event_Id, start_Epoch_Microsec, last_Epoch_Microsec, - flt_json, flt_data, flt_nonstringpdata, - thre_json, threshold_data, thres_nonstringpdata): - - if(key == 'heartbeat'): +# .................................................................................... + + +@patch("influxdb_connector.logger") +@pytest.mark.parametrize( + "key", + [ + ("heartbeat"), + ("pnfRegistration"), + ("measurement"), + ("fault"), + ("thresholdCrossingAlert"), + ("stndDefinedFields"), + ], +) +def test_save_event_in_db( + mock_logger, + key, + hb_json, + hb_data, + hb_nonstringpdata, + pnf_json, + pnf_data, + pnf_nonstringpdata, + meas_json, + meas_data, + meas_nonstringpdata, + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + flt_json, + flt_data, + flt_nonstringpdata, + thre_json, + threshold_data, + thres_nonstringpdata, + stndDefined_json, +): + + if key == "heartbeat": data_set = getEvent("heartbeat") - with patch('influxdb_connector.process_heartbeat_events') as func: - influxdb_connector.save_event_in_db(data_set) - func.assert_called_with('heartbeat', hb_json, hb_data, hb_nonstringpdata) - - elif(key == 'pnfRegistration'): - data_set = getEvent("pnfRegistration") - with patch('influxdb_connector.process_pnfRegistration_event') as func: - influxdb_connector.save_event_in_db(data_set) - func.assert_called_with('pnfRegistration', pnf_json, pnf_data, pnf_nonstringpdata) - - elif(key == 'measurement'): - data_set = getEvent("measurement") - with patch('influxdb_connector.process_measurement_events') as func: - influxdb_connector.save_event_in_db(data_set) - func.assert_called_with('measurement', meas_json, meas_data, meas_nonstringpdata, event_Id, int(start_Epoch_Microsec), - int(last_Epoch_Microsec)) - - elif(key == 'fault'): - data_set = getEvent("fault") - with patch('influxdb_connector.process_fault_event') as func: - influxdb_connector.save_event_in_db(data_set) - func.assert_called_with('fault', flt_json, flt_data, flt_nonstringpdata) - - elif(key == 'thresholdCrossingAlert'): - data_set = getEvent("thresholdCrossingAlert") - with patch('influxdb_connector.process_thresholdCrossingAlert_event') as func: - influxdb_connector.save_event_in_db(data_set) - func.assert_called_with('thresholdCrossingAlert', thre_json, threshold_data, thres_nonstringpdata) - - - -@patch('influxdb_connector.logger') + with patch("influxdb_connector.process_heartbeat_events") as func: + influxdb_connector.save_event_in_db(data_set) + func.assert_called_with("heartbeat", hb_json, hb_data, hb_nonstringpdata) + + elif key == "pnfRegistration": + data_set = getEvent("pnfRegistration") + with patch("influxdb_connector.process_pnfRegistration_event") as func: + influxdb_connector.save_event_in_db(data_set) + func.assert_called_with( + "pnfRegistration", pnf_json, pnf_data, pnf_nonstringpdata + ) + + elif key == "measurement": + data_set = getEvent("measurement") + with patch("influxdb_connector.process_measurement_events") as func: + influxdb_connector.save_event_in_db(data_set) + func.assert_called_with( + "measurement", + meas_json, + meas_data, + meas_nonstringpdata, + event_Id, + int(start_Epoch_Microsec), + int(last_Epoch_Microsec), + ) + + elif key == "fault": + data_set = getEvent("fault") + with patch("influxdb_connector.process_fault_event") as func: + influxdb_connector.save_event_in_db(data_set) + func.assert_called_with("fault", flt_json, flt_data, flt_nonstringpdata) + + elif key == "thresholdCrossingAlert": + data_set = getEvent("thresholdCrossingAlert") + with patch("influxdb_connector.process_thresholdCrossingAlert_event") as func: + influxdb_connector.save_event_in_db(data_set) + func.assert_called_with( + "thresholdCrossingAlert", + thre_json, + threshold_data, + thres_nonstringpdata, + ) + + elif key == "stndDefinedFields": + data_set = getEvent("stndDefinedFields") + with patch("influxdb_connector.process_stndDefinedFields_events") as func: + influxdb_connector.save_event_in_db(data_set) + func.assert_called_with( + stndDefined_json, + "stndDefined", + event_Id, + start_Epoch_Microsec, + last_Epoch_Microsec, + ) + + +@patch("influxdb_connector.logger") def test_save_event_in_db_localhost(mock_logger): - data_set = {'event':{'commonEventHeader':{'reportingEntityName':'LOCALHOST','domain':'heartbeat','startEpochMicrosec':'1639965574292938','sourceId':'1223'}}} + data_set = { + "event": { + "commonEventHeader": { + "reportingEntityName": "LOCALHOST", + "domain": "heartbeat", + "startEpochMicrosec": "1639965574292938", + "sourceId": "1223", + } + } + } try: - res=influxdb_connector.save_event_in_db(json.dumps(data_set)) + res = influxdb_connector.save_event_in_db(json.dumps(data_set)) except Exception: - pytest.fail('Exception occured while saving data') - assert res==None + pytest.fail("Exception occured while saving data") + assert res == None -@patch('influxdb_connector.logger') +@patch("influxdb_connector.logger") def test_save_event_in_db_comman(mock_logger): - data_set = {'event':{'commonEventHeader':{'reportingEntityName':'LOCALHOST','domain':'heartbeat','startEpochMicrosec':'1639965574292938','sourceId':'1223','internalHeaderFields':{1:78}}}} + data_set = { + "event": { + "commonEventHeader": { + "reportingEntityName": "LOCALHOST", + "domain": "heartbeat", + "startEpochMicrosec": "1639965574292938", + "sourceId": "1223", + "internalHeaderFields": {1: 78}, + } + } + } try: - res=influxdb_connector.save_event_in_db(json.dumps(data_set)) + res = influxdb_connector.save_event_in_db(json.dumps(data_set)) except Exception: - pytest.fail('Exception occured while saving data') - assert res==None + pytest.fail("Exception occured while saving data") + assert res == None - @pytest.fixture def event(): - event="domain" + event = "domain" return event @pytest.fixture def p_data(): - p_data='heartbeat,domain=heartbeat,eventId=ORAN-DEV_2021-12-20T07:29:34.292938Z,eventName=heartbeat_O_RAN_COMPONENT,eventType=O_RAN_COMPONENT,nfNamingCode=SDN-Controller,nfVendorName=O-RAN-SC-OAM,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1' + p_data = "heartbeat,domain=heartbeat,eventId=ORAN-DEV_2021-12-20T07:29:34.292938Z,eventName=heartbeat_O_RAN_COMPONENT,eventType=O_RAN_COMPONENT,nfNamingCode=SDN-Controller,nfVendorName=O-RAN-SC-OAM,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1" return p_data -#send_to_influxdb unittest -@patch('influxdb_connector.requests.post') -@patch('influxdb_connector.logger') -def test_send_to_influxdb(mock_logger,mock_post,event,p_data): - mock_post.return_value.status_code=201 +# send_to_influxdb unittest +@patch("influxdb_connector.requests.post") +@patch("influxdb_connector.logger") +def test_send_to_influxdb(mock_logger, mock_post, event, p_data): + mock_post.return_value.status_code = 201 try: - res=influxdb_connector.send_to_influxdb(event,p_data) + res = influxdb_connector.send_to_influxdb(event, p_data) except Exception: - pytest.fail('Exception occured while saving data') - assert res==None + pytest.fail("Exception occured while saving data") + assert res == None -- 2.16.6