@mock.patch('argparse.ArgumentParser.parse_args',
return_value=argparse.Namespace(verbose=None, api_version='7',config=get_config_path(),section='default'))
@mock.patch('gevent.pywsgi.WSGIServer.serve_forever')
+ @mock.patch('monitor.logger', logging.getLogger('monitor'))
def test_main(server,parser,body):
argv=None
- logger = logging.getLogger('monitor')
- logger.setLevel(logging.ERROR)
- with mock.patch.object(logger,'error') as mock_error:
- monitor.main(argv=None)
- #server.assert_called_once_with()
- mock_error.assert_called_once_with('Main loop exited unexpectedly!')
+ result=monitor.main(argv=None)
+ assert 0==result
+
-#@pytest.mark.skip
+
+#test main() function argv is None
+@patch('monitor.logger')
+@mock.patch('argparse.ArgumentParser.parse_args',
+ return_value=argparse.Namespace(verbose=2, api_version='7',config=get_config_path(),section='default'))
+@mock.patch('gevent.pywsgi.WSGIServer.serve_forever')
+def test_main_argv(server,parser,logger,body):
+ argv=''
+ logger.return_value=logging.getLogger('monitor')
+ try:
+ result=monitor.main(argv)
+ except TypeError:
+ assert result == None
+ except Exception:
+ pytest.fail('unexcepted error')
+
+
+
+#test platform.system in main
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('argparse.ArgumentParser.parse_args',
+ return_value=argparse.Namespace(verbose=None, api_version='7',config=get_config_path(),section='default'))
+@mock.patch('gevent.pywsgi.WSGIServer.serve_forever')
+def test_main_platform(server,parser,body):
+ argv=None
+ sys = mock.MagicMock()
+ try:
+ with patch('platform.system', MagicMock(return_value='Windows')):
+ res=monitor.main(argv)
+ except RuntimeError:
+ assert res == None
+ except Exception:
+ pytest.fail('Exiting because of exception')
+
+
+#test vel_port in main
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('argparse.ArgumentParser.parse_args',
+ return_value=argparse.Namespace(verbose=None, api_version='7',config=get_wrong_config_port_path(),section='default'))
+@mock.patch('gevent.pywsgi.WSGIServer.serve_forever')
+def test_main_vel_port(server,parser,body):
+ argv=''
+ res=monitor.main(argv)
+ assert res == 2
+
+
+
+# test vel_path in main
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('argparse.ArgumentParser.parse_args',
+ return_value=argparse.Namespace(verbose=None, api_version='7',config=get_wrong_config_path(),section='default'))
+@mock.patch('gevent.pywsgi.WSGIServer.serve_forever')
+def test_main_path(server,parser,body):
+ argv=None
+ try:
+ result = monitor.main(argv)
+ except RuntimeError:
+ assert result == None
+ except Exception:
+ pytest.fail('fail beacuase of exception')
+
+
+
+@pytest.fixture
+def vel_schema_path():
+ config = configparser.ConfigParser()
+ config_file=get_config_path()
+ config.read(config_file)
+ ref = config.get('default', 'schema_file')
+ return ref
+
+# check listener() vel_schema, if it exists
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('argparse.ArgumentParser.parse_args',
+ return_value=argparse.Namespace(verbose=None, api_version='7',config=get_config_path(),section='default'))
+@mock.patch('gevent.pywsgi.WSGIServer.serve_forever')
+def test_main_vel_schema_path(server,parser,vel_schema_path):
+ argv=None
+ with mock.patch('os.path.exists') as m:
+ m.return_value=vel_schema_path
+ result=monitor.main(argv)
+ assert 0==result
+
+
+
+#test unhandle exception
+@patch('monitor.DEBUG',True)
+@mock.patch('argparse.ArgumentParser.parse_args',
+ return_value=argparse.Namespace(verbose=None, api_version='7',config=get_wrong_config_port_path(),section='default'))
+@mock.patch('gevent.pywsgi.WSGIServer.serve_forever')
+def test_main_unhandle_exception(server,parser,body):
+ argv=None
+ result=None
+ try:
+ result = monitor.main(argv)
+ except RuntimeError:
+ assert result == None
+ except Exception:
+ pytest.fail('Exiting because of exception')
+
+
+
+#check test_listener() function
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_TestControl_listener(mock_monitor,mock_input,body,start_response,schema):
+ mock_input.__name__ = 'read'
+ environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+ mock_input.read.return_value=body
+ mock_start_response= mock.Mock(start_response)
+ result=list(monitor.test_listener(environ,mock_start_response,schema))
+ assert ['']==result
+
+
+
+#check test_listener() GET method
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_TestControl_listener_get_method(mock_monitor,mock_input,body,start_response,schema):
+ mock_input.__name__ = 'read'
+ environ={"REQUEST_METHOD": "GET","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+ mock_input.read.return_value=body
+ mock_start_response= mock.Mock(start_response)
+ response= ['{"event": {"commonEventHeader": {"domain": "measurement", "eventId": "11", "eventName": "", "eventType": "platform", "lastEpochMicrosec": 0, "priority": "Normal", "reportingEntityId": "localhost", "reportingEntityName": "localhost", "sequence": 0, "sourceId": "776f3123-30a5-f947-bdf5-099ec3a7577a", "sourceName": "776f3123-30a5-f947-bdf5-099ec3a7577a", "startEpochMicrosec": 1642961518.919, "version": "4.0", "vesEventListenerVersion": "7.2.1"}}}']
+ result=list(monitor.test_listener(environ,mock_start_response,schema))
+ assert response==result
+
+
+#test test_listener() jsonschema error
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_TestControl_listener_schema_error(mocker,mock_input,body,start_response,schema):
+ mock_input.__name__ = 'read'
+ environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+ mock_input.read.return_value=body
+ mock_start_response= mock.Mock(start_response)
+ project_path=os.getcwd()
+ schema_path =os.path.join(project_path,"tests/collector/schema.json")
+ schema=json.load(open(schema_path, 'r'))
+ result=list(monitor.test_listener(environ, mock_start_response,schema))
+ assert ['']==result
+
+
+#test test_listener() jsonschema validation error
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_TestControl_listener_schema_validation_error(mocker,mock_input,body,start_response,schema):
+ mock_input.__name__ = 'read'
+ environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+ body={"event": {"commonEventHeader": {"domain": 6,"eventId": "11","eventName": "","eventType": "platform","lastEpochMicrosec": 0,"priority": "Normal","reportingEntityId": "localhost","reportingEntityName": "localhost","sequence": 0,"sourceId": "776f3123-30a5-f947-bdf5-099ec3a7577a","sourceName": "776f3123-30a5-f947-bdf5-099ec3a7577a","startEpochMicrosec": 1642961518.919,"version": "4.0","vesEventListenerVersion": "7.2.1"}}}
+ body=json.dumps(body)
+ mock_input.read.return_value=body
+ mock_start_response= mock.Mock(start_response)
+ result=list(monitor.test_listener(environ, mock_start_response,schema))
+ assert ['']==result
+
+
+
+@pytest.fixture
+def schema_wrong():
+ schema_path ="/home/ves-dev/ves/tests/collector/schema.json"
+ schema=json.load(open(schema_path, 'r'))
+ return schema
+
+
+#test test_listener() exception TestControl input not valid
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_TestControl_listener_exception(mocker,mock_input,body,start_response,schema_wrong):
+ mock_input.__name__ = 'read'
+ environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+ body={}
+ mock_input.read.return_value=body
+ mock_start_response= mock.Mock(start_response)
+ result=list(monitor.test_listener(environ, mock_start_response,schema_wrong))
+ assert ['']==result
+
+
+
+#check test_listener() Missing schema
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_TestControl_listener_Missing_schema(mocker,mock_input,body,start_response):
+ mock_input.__name__ = 'read'
+ environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+ mock_input.read.return_value=body
+ mock_start_response= mock.Mock(start_response)
+ result=list(monitor.test_listener(environ, mock_start_response,None))
+ assert ['']==result
+
+
+#check test_listener() Invalid Input
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_TestControl_Listener_Input_invalid(mocker,mock_input,body,start_response):
+ mock_input.__name__ = 'read'
+ environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+ body={}
+ mock_input.read.return_value=body
+ mock_start_response= mock.Mock(start_response)
+ result=list(monitor.test_listener(environ, mock_start_response,None))
+ assert ['']==result
+
+
+#test listener() get method
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('gevent.pywsgi.Input',autospec=True)
+@mock.patch('monitor.save_event_in_kafka')
+def test_listener_get_method(mock_monitor,mock_input,body,start_response,schema):
+ mock_input.__name__ = 'read'
+ environ={"REQUEST_METHOD": "GET","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
+ mock_input.read.return_value=body
+ mock_start_response= mock.Mock(start_response)
+ result = list(monitor.listener(environ, mock_start_response, schema))
+ assert [b'POST /eventListener/v7'] == result
+
+
+
+#check save_event_in_kafka() function
@mock.patch('monitor.kafka_server')
+ @mock.patch('monitor.logger', logging.getLogger('monitor'))
def test_save_event_in_kafka(mocker,data_set,topic_name):
data_set_string=json.dumps(data_set)
logger = logging.getLogger('monitor')
with mock.patch.object(logger,'info') as mock_info:
monitor.save_event_in_kafka(data_set_string)
mock_info.assert_called_once_with('Got an event request for topic domain')
- #monitor.produce_events_in_kafka.assert_called_once_with(data_set,topic_name)
+# check save_event_in_kafka() topic length
+@patch('monitor.logger',logging.getLogger('monitor'))
+@mock.patch('monitor.produce_events_in_kafka')
+@mock.patch('monitor.kafka_server')
+def test_save_event_in_kafka_topic_len(server,mock_producer,topic_name):
+ body={'event':{'commonEventHeader':{'domain':''}}}
+ body=json.dumps(body)
+ monitor.save_event_in_kafka(body)
+ data_set={'event': {'commonEventHeader': {'domain': ''}}}
+ mock_producer.assert_called_once_with(data_set,'')
+
+
+
+#check produce_event_in_kafka() function
@mock.patch('monitor.KafkaProducer')
@mock.patch('monitor.producer')
+ @mock.patch('monitor.logger', logging.getLogger('monitor'))
def test_produce_events_in_kafka(mock_pro,mock_producer,data_set,topic_name):
logger = logging.getLogger('monitor')
logger.setLevel(logging.DEBUG)