--- /dev/null
import os
import sys
import configparser

# Tests run from <project>/test; sources and docs live one level up.
PROJECT_PATH = os.getcwd()
configfile_name = PROJECT_PATH + '/test_collector.conf'
PROJECT_PATH = PROJECT_PATH[:PROJECT_PATH.rfind('/')]
schema_file_path = os.path.join(
    PROJECT_PATH, "docs/att_interface_definition/CommonEventFormat-v7-2-2.json")
if os.path.isfile(configfile_name):
    # Regenerate the *existing* committed configuration file so that
    # schema_file points at this checkout's absolute path (the committed
    # copy carries a developer-specific /home/... path).
    Config = configparser.ConfigParser()
    Config.add_section("default")
    Config.set('default', 'schema_file', schema_file_path)
    Config.set('default', 'base_schema_file', '/evel-test-collector/docs/att_interface_definition/base_schema.json')
    Config.set('default', 'throttle_schema_file', 'evel-test-collector/docs/att_interface_definition/throttle_schema.json')
    Config.set('default', 'test_control_schema_file', 'evel-test-collector/docs/att_interface_definition/test_control_schema.json')
    Config.set('default', 'log_file', 'collector.log')
    Config.set('default', 'vel_domain', '127.0.0.1')
    Config.set('default', 'vel_port', '9999')
    Config.set('default', 'vel_path', '')
    Config.set('default', 'vel_username', '')
    Config.set('default', 'vel_password', '')
    Config.set('default', 'vel_topic_name', '')
    Config.set('default', 'kafka_server', 'kafka-server')
    Config.set('default', 'kafka_topic', '')
    # "with" guarantees the handle is closed even if write() raises
    # (the original opened the file early and leaked it on error).
    with open(configfile_name, "w") as cfgfile:
        Config.write(cfgfile)

# Make code/collector importable so the tests can `import monitor`.
SOURCE_PATH = os.path.join(PROJECT_PATH, "code/collector")
sys.path.append(SOURCE_PATH)
--- /dev/null
+[pytest]
+addopts = -rA
--- /dev/null
+[default]
+schema_file = /home/ves-dev/ves/ves/collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat-v7-2-2.json
+base_schema_file = /evel-test-collector/docs/att_interface_definition/base_schema.json
+throttle_schema_file = evel-test-collector/docs/att_interface_definition/throttle_schema.json
+test_control_schema_file = evel-test-collector/docs/att_interface_definition/test_control_schema.json
+log_file = collector.log
+vel_domain = 127.0.0.1
+vel_port = 9999
+vel_path =
+vel_username =
+vel_password =
+vel_topic_name =
+kafka_server = kafka-server
+kafka_topic =
+
--- /dev/null
+import os
+import pytest
+import unittest
+import monitor
+import argparse
+from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
+from unittest import mock
+from unittest.mock import patch
+import logging
+from pytest_mock import MockerFixture
+from gevent import socket
+from json import dumps
+from gevent import pywsgi
+import gevent
+import json
+import jsonschema
+from kafka import KafkaProducer
+
def get_path():
    """Return the parent directory of the current working directory."""
    cwd = os.getcwd()
    return cwd[:cwd.rfind('/')]
+
def get_config_path():
    """Absolute path of the collector test configuration file."""
    return os.path.join(get_path(), "test/test_collector.conf")
+
def get_schema_path():
    """Absolute path of the CommonEventFormat JSON schema in the docs tree."""
    return os.path.join(
        get_path(), "docs/att_interface_definition/CommonEventFormat-v7-2-2.json")
+
@pytest.fixture
def body():
    """JSON-serialized VES measurement event used as the POST payload."""
    event = {"event": {"commonEventHeader": {"domain": "measurement","eventId": "11","eventName": "","eventType": "platform","lastEpochMicrosec": 0,"priority": "Normal","reportingEntityId": "localhost","reportingEntityName": "localhost","sequence": 0,"sourceId": "776f3123-30a5-f947-bdf5-099ec3a7577a","sourceName": "776f3123-30a5-f947-bdf5-099ec3a7577a","startEpochMicrosec": 1642961518.919,"version": "4.0","vesEventListenerVersion": "7.2.1"}}}
    return json.dumps(event)
+
@pytest.fixture
def start_response():
    """A gevent WSGIHandler on a fresh TCP socket, standing in for the
    WSGI start_response callable passed to monitor.listener."""
    tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    return pywsgi.WSGIHandler(tcp_sock, "", "")
+
@pytest.fixture
def schema():
    """The CommonEventFormat JSON schema loaded from the docs tree.

    Uses a context manager so the file handle is closed; the original
    left it open for the garbage collector.
    """
    with open(get_schema_path(), 'r') as schema_file:
        return json.load(schema_file)
+
@pytest.fixture
def data_set():
    """Minimal event whose commonEventHeader.domain names the kafka topic."""
    return {"event": {"commonEventHeader": {"domain": "topic"}}}
+
@pytest.fixture
def topic_name():
    """Topic expected to be derived from the data_set fixture's domain."""
    return "topic"
+
#@pytest.mark.skip
@mock.patch('gevent.pywsgi.Input',autospec=True)
@mock.patch('monitor.save_event_in_kafka')
def test_listener(mock_monitor,mock_input,body,start_response,schema):
    """listener() should accept a valid, authenticated JSON POST without raising.

    Kafka publishing is mocked out and the WSGI input stream is mocked to
    return the fixture payload.  NOTE(review): nothing is asserted on the
    outcome -- consider asserting that mock_monitor was called.
    """
    mock_input.__name__ = 'read'
    # Minimal WSGI environ for an authenticated JSON POST (Basic user:password).
    environ={"REQUEST_METHOD": "POST","wsgi.input": mock_input,"CONTENT_TYPE": "application/json","HTTP_AUTHORIZATION": "Basic dXNlcjpwYXNzd29yZA==", "CONTENT_LENGTH": "2"}
    mock_input.read.return_value=body
    mock_start_response= mock.Mock(start_response)
    logger = logging.getLogger('monitor')
    logger.setLevel(logging.DEBUG)
    with mock.patch.object(logger,'debug') as mock_debug:
        monitor.listener(environ,mock_start_response,schema)
+
@mock.patch('argparse.ArgumentParser.parse_args',
            return_value=argparse.Namespace(verbose=None, api_version='7',config=get_config_path(),section='default'))
@mock.patch('gevent.pywsgi.WSGIServer.serve_forever')
def test_main(server,parser,body):
    """main() should start the WSGI server exactly once and log an error when
    the serve loop returns (serve_forever is mocked, so it returns immediately).
    """
    argv=None
    logger = logging.getLogger('monitor')
    logger.setLevel(logging.ERROR)
    with mock.patch.object(logger,'error') as mock_error:
        monitor.main(argv=None)
        server.assert_called_once_with()
        mock_error.assert_called_once_with('Main loop exited unexpectedly!')
+
#@pytest.mark.skip
def test_save_event_in_kafka(mocker,data_set,topic_name):
    """save_event_in_kafka should log the request at INFO and forward the
    decoded event to produce_events_in_kafka with the topic taken from the
    event's commonEventHeader.domain ("topic" in the fixture).
    """
    data_set_string=json.dumps(data_set)
    logger = logging.getLogger('monitor')
    logger.setLevel(logging.INFO)
    mocker.patch('monitor.produce_events_in_kafka')
    with mock.patch.object(logger,'info') as mock_info:
        monitor.save_event_in_kafka(data_set_string)
        mock_info.assert_called_once_with('Got an event request for topic domain')
        monitor.produce_events_in_kafka.assert_called_once_with(data_set,topic_name)
+
+
+@mock.patch('monitor.KafkaProducer')
+@mock.patch('monitor.producer')
+def test_produce_events_in_kafka(mock_pro,mock_producer,data_set,topic_name):
+ logger = logging.getLogger('monitor')
+ logger.setLevel(logging.DEBUG)
+ with mock.patch.object(logger,'debug') as mock_debug:
+ monitor.produce_events_in_kafka(data_set,topic_name)
+ mock_pro.send.assert_called_with(topic_name,value=data_set)
+ mock_debug.assert_called_once_with('Event has been successfully posted into kafka bus')
+
[default]
log_file = dmaap.log
-log_level =
-kafka_broker =
+log_level = error
+kafka_broker = smo-kafka:29092
enable_assert =
--- /dev/null
import os
import sys

# Tests run from a subdirectory of the project; the adapter sources live
# in <project>/code, so put that directory on the import path.
PROJECT_PATH = os.getcwd()
PROJECT_PATH = PROJECT_PATH[:PROJECT_PATH.rfind('/')]
SOURCE_PATH = os.path.join(PROJECT_PATH, "code")
sys.path.append(SOURCE_PATH)
\ No newline at end of file
--- /dev/null
+import argparse
+import os
+import sys
+from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
+import configparser
+from unittest import mock
+from unittest.mock import patch
+from pytest_mock import MockerFixture
+import logging
+from app_config import AppConfig
+import pytest
+
def get_path():
    """Return the parent directory of the current working directory."""
    cwd = os.getcwd()
    return cwd[:cwd.rfind('/')]
+
def get_config_path():
    """Absolute path of the adapter configuration file used by these tests."""
    return os.path.join(get_path(), "config/adapter.conf")
+
@pytest.fixture
def kafkaBroker():
    """Placeholder broker address assigned to AppConfig in the tests."""
    return 'broker'
+
@pytest.fixture
def logger():
    """The shared 'DMaaP' logger, forced to INFO for the assertions below."""
    dmaap_logger = logging.getLogger('DMaaP')
    dmaap_logger.setLevel(logging.INFO)
    return dmaap_logger
+
@mock.patch('app_config.AppConfig.setLogger')
@mock.patch('argparse.ArgumentParser.parse_args',
            return_value=argparse.Namespace(config=get_config_path(),section='default'))
def test___init__(parser,mock_setLogger):
    """AppConfig.__init__ should parse config/adapter.conf and configure the
    logger with that file's log_file/log_level values ('dmaap.log', 'error').

    The unbound __init__ is invoked with the class itself as `self`.
    """
    AppConfig.__init__(AppConfig)
    mock_setLogger.assert_called_with('dmaap.log','error')
+
def test_getKafkaBroker(kafkaBroker):
    """getKafkaBroker should return the stored kafka_broker value.

    The attribute is set on the class and the unbound method is called with
    the class as `self`, which avoids __init__'s config-file I/O.
    """
    AppConfig.kafka_broker=kafkaBroker
    res=AppConfig.getKafkaBroker(AppConfig)
    assert res == kafkaBroker
+
def test_getLogger(logger):
    """getLogger should hand back the logger stored on the config object;
    the fixture set it to INFO, whose numeric level is 20.
    """
    AppConfig.logger=logger
    res=AppConfig.getLogger(AppConfig)
    assert res.getEffectiveLevel()==20
+
def test_setLogger(logger):
    """setLogger should announce the chosen log file and level via an INFO
    record on the 'DMaaP' logger.
    """
    log_file= 'dmaap.log'
    log_level='INFO'
    with mock.patch.object(logger,'info') as mock_info:
        AppConfig.setLogger(AppConfig,log_file,log_level)
        mock_info.assert_called_with('Log level INFO and log file dmaap.log : ')
--- /dev/null
+import pytest
+from unittest import mock
+from unittest.mock import patch
+from pytest_mock import MockerFixture
+from prepare_response import PrepareResponse
+from confluent_kafka import Consumer, KafkaError
+from confluent_kafka.admin import AdminClient
+from consumer import EventConsumer, TopicConsumer
+import logging
+
@pytest.fixture
def prepareResponse():
    """A fresh PrepareResponse instance for each consumer test."""
    response = PrepareResponse()
    return response
+
@pytest.fixture
def topic():
    """Name of the kafka topic the consumer tests operate on."""
    return "test1"
+
@pytest.fixture
def topic_list():
    """Fake AdminClient.list_topics() result exposing a .topics mapping."""
    return ListTopics()
+
@pytest.fixture
def resCode():
    """Expected HTTP success code for every consumer operation."""
    return 200
+
@mock.patch('confluent_kafka.Consumer')
def test_consumeEvents(mock_consumer,prepareResponse,topic,resCode):
    """With Consumer.poll() mocked to yield nothing, consumeEvents should
    report HTTP 200 and an empty JSON list of events.
    """
    consumergroup="test"
    consumerid="test1"
    limit=10
    timeout=1
    mock_consumer.__name__ = 'subscribe'
    # NOTE(review): this second assignment overwrites the first -- both
    # lines set the same __name__ attribute.
    mock_consumer.__name__ = 'poll'
    mock_consumer.poll.return_value=None
    EventConsumer.consumeEvents(EventConsumer, prepareResponse, topic, consumergroup, consumerid,limit, timeout)
    resMsg='[]'
    assert resCode == prepareResponse.getResponseCode()
    assert resMsg == prepareResponse.getResponseMsg()
+
def test_getTopics(mocker,prepareResponse,topic_list,resCode):
    """getTopics should return the broker's topic names as a JSON object
    with a single "topics" array.
    """
    mocker.patch('confluent_kafka.admin.AdminClient.list_topics',
                 return_value=topic_list)
    TopicConsumer.getTopics(TopicConsumer, prepareResponse)
    resMsg='{"topics": ["test1", "test2"]}'
    assert resCode == prepareResponse.getResponseCode()
    assert resMsg == prepareResponse.getResponseMsg()
+
def test_listAllTopics(mocker,prepareResponse,topic_list,resCode):
    """listAllTopics should return one metadata record per broker topic,
    with default owner/txenabled values.
    """
    mocker.patch('confluent_kafka.admin.AdminClient.list_topics',
                 return_value=topic_list)
    TopicConsumer.listAllTopics(TopicConsumer, prepareResponse)
    resMsg='{"topics": [{"topicName": "test1", "owner": "", "txenabled": false}, {"topicName": "test2", "owner": "", "txenabled": false}]}'
    assert resCode == prepareResponse.getResponseCode()
    assert resMsg == prepareResponse.getResponseMsg()
+
def test_getTopicDetails(mocker,prepareResponse,topic,topic_list,resCode):
    """getTopicDetails should return the detail record for the requested
    topic ("test1") with default description and ACL settings.
    """
    mocker.patch('confluent_kafka.admin.AdminClient.list_topics',
                 return_value=topic_list)
    TopicConsumer.getTopicDetails(TopicConsumer, prepareResponse,topic)
    resMsg='{"name": "test1", "owner": "", "description": "", "readerAcl": {"enabled": true, "users": []}, "writerAcl": {"enabled": true, "users": []}}'
    assert resCode == prepareResponse.getResponseCode()
    assert resMsg == prepareResponse.getResponseMsg()
+
class ListTopics:
    """Stub for the object returned by AdminClient.list_topics(); the
    consumer code under test only reads the .topics mapping's keys."""
    topics={"test1":"value1", "test2":"value2"}
--- /dev/null
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from _pytest.mark import param
+from flask import request
+import flask
+from future.backports.urllib import response
+from mock import MagicMock
+from pathlib import Path
+import pytest
+import requests
+from unittest import mock
+from unittest.mock import patch
+from consumer import EventConsumer, TopicConsumer
+import dmaap_adapter
+from prepare_response import PrepareResponse
+import requests_mock
+
@pytest.fixture
def response_object():
    """A fresh PrepareResponse instance for each test."""
    return PrepareResponse()
+
@pytest.fixture
def prepareResponse(response_object, data_set):
    """response_object preloaded with a 200 code and the sample event payload."""
    response_object.setResponseCode("200")
    response_object.setResponseMsg(data_set)
    return response_object
+
@mock.patch('flask.Flask.response_class')
@mock.patch('dmaap_adapter.PrepareResponse')
@mock.patch('dmaap_adapter.TopicConsumer')
def test_get_all_topics(mock_consumer, mock_response, mock_app, prepareResponse, data_set):
    """get_all_topics should build its HTTP response (response_class is
    mocked to return the fixture) and delegate to TopicConsumer.getTopics.
    """
    mock_app.return_value = prepareResponse
    res = dmaap_adapter.get_all_topics()
    # Exercise and then verify the delegation call on the mocked consumer.
    mock_consumer.getTopics(mock_response)
    mock_consumer.getTopics.assert_called_with(mock_response)
    assert res.responseCode == prepareResponse.getResponseCode()
+
@mock.patch('flask.Flask.response_class')
@mock.patch('dmaap_adapter.PrepareResponse')
@mock.patch('dmaap_adapter.TopicConsumer')
def test_listall_topics(mock_consumer, mock_response, mock_app, prepareResponse, data_set):
    """listall_topics should build its HTTP response from the prepared
    response object and delegate to TopicConsumer.listAllTopics.
    """
    mock_app.return_value = prepareResponse
    res = dmaap_adapter.listall_topics()
    mock_consumer.listAllTopics(mock_response)
    mock_consumer.listAllTopics.assert_called_with(mock_response)
    assert res.responseCode == prepareResponse.getResponseCode()
+
@pytest.mark.skip
@mock.patch('flask.request')
@mock.patch('flask.Flask.response_class')
@mock.patch('dmaap_adapter.PrepareResponse')
@mock.patch('dmaap_adapter.TopicConsumer')
def test_topic_details(mock_consumer, mock_response, mock_app, mock_req, prepareResponse, data_set, topic):
    """topic_details should delegate to TopicConsumer.getTopicDetails for the
    given topic.  Currently skipped -- NOTE(review): presumably because
    flask.request cannot be patched outside a request context; confirm and
    either fix or remove.
    """
    mock_app.return_value = prepareResponse
    res = dmaap_adapter.topic_details(topic)
    mock_consumer.getTopicDetails(mock_response, topic)
    mock_consumer.getTopicDetails.assert_called_with(mock_response, topic)
    assert res.responseCode == prepareResponse.getResponseCode()
+
@pytest.mark.skip
@mock.patch('flask.Flask.response_class')
@mock.patch('dmaap_adapter.PrepareResponse')
@mock.patch('dmaap_adapter.EventConsumer')
def test_get_events(mock_consumer, mock_response, mock_app, prepareResponse, data_set, topic):
    """get_events should delegate to EventConsumer.consumeEvents with the
    topic, consumer group/id and limit/timeout of 10/15.  Currently skipped
    -- NOTE(review): reason undocumented; confirm and either fix or remove.
    """
    mock_app.return_value = prepareResponse
    res = dmaap_adapter.get_events(topic, "consumergroup", "consumerid")
    mock_consumer.consumeEvents(mock_response, topic, "consumergroup", "consumerid", 10, 15)
    mock_consumer.consumeEvents.assert_called_with(mock_response, topic, "consumergroup", "consumerid", 10, 15)
    assert res.responseCode == prepareResponse.getResponseCode()
+
@pytest.fixture
def topic():
    """Topic name matching the data_set fixture's domain."""
    return "measurement"
+
@pytest.fixture
def data_set():
    """A full VES measurement event, returned as a dict (not serialized)."""
    data_set = {
        "event": {
            "commonEventHeader": {
                "domain": "measurement",
                "eventId": "5",
                "eventName": "",
                "eventType": "platform",
                "lastEpochMicrosec": 0,
                "priority": "Normal",
                "reportingEntityId": "localhost",
                "reportingEntityName": "localhost",
                "sequence": 0,
                "sourceName": "bf9006ed-a735-064a-871c-4b4debe57935",
                "startEpochMicrosec": 1643798824.813,
                "version": "4.0",
                "vesEventListenerVersion": "7.2.1",
                "sourceId": "bf9006ed-a735-064a-871c-4b4debe57935"
            },
            "measurementFields": {
                "measurementFieldsVersion": "4.0",
                "measurementInterval": 10,
                "loadArray": [
                    {
                        "midTerm": 4.12,
                        "shortTerm": 5.14,
                        "longTerm": 2.22
                    }
                ],
                "memoryUsageArray": [
                    {
                        "vmIdentifier": "bf9006ed-a735-064a-871c-4b4debe57935",
                        "memoryFree": 489902080,
                        "memoryUsed": 5010788,
                        "memoryBuffered": 249216,
                        "memoryCached": 2080804,
                        "memorySlabRecl": 175884,
                        "memorySlabUnrecl": 153208
                    }
                ],
                "cpuUsageArray": [
                    {
                        "cpuIdentifier": "3",
                        "cpuIdle": 90.8722109533469,
                        "percentUsage": 0,
                        "cpuUsageUser": 6.08519269776876,
                        "cpuWait": 2.23123732251521,
                        "cpuUsageInterrupt": 0,
                        "cpuUsageNice": 0,
                        "cpuUsageSoftIrq": 0.202839756592292,
                        "cpuUsageSteal": 0,
                        "cpuUsageSystem": 0.608519269776876
                    }
                ],
                "nicPerformanceArray": [
                    {
                        "valuesAreSuspect": "true",
                        "nicIdentifier": "vethad656aa",
                        "receivedTotalPacketsAccumulated": 6.60006078986562,
                        "transmittedTotalPacketsAccumulated": 15.1001390798441,
                        "receivedOctetsAccumulated": 2453.82247547105,
                        "transmittedOctetsAccumulated": 7411.46788438591,
                        "receivedErrorPacketsAccumulated": 0,
                        "transmittedErrorPacketsAccumulated": 0,
                        "receivedDiscardedPacketsAccumulated": 0,
                        "transmittedDiscardedPacketsAccumulated": 0
                    }
                ],
                "diskUsageArray": [
                    {
                        "diskIdentifier": "loop12",
                        "diskOctetsReadLast": 0,
                        "diskOctetsWriteLast": 0,
                        "diskOpsReadLast": 0,
                        "diskOpsWriteLast": 0,
                        "diskIoTimeLast": 0,
                        "diskMergedReadLast": 0,
                        "diskMergedWriteLast": 0,
                        "diskTimeReadLast": 0,
                        "diskTimeWriteLast": 0
                    },
                    {
                        "diskIdentifier": "loop13",
                        "diskOctetsReadLast": 0,
                        "diskOctetsWriteLast": 0,
                        "diskOpsReadLast": 0,
                        "diskOpsWriteLast": 0,
                        "diskIoTimeLast": 0,
                        "diskMergedReadLast": 0,
                        "diskMergedWriteLast": 0,
                        "diskTimeReadLast": 0,
                        "diskTimeWriteLast": 0
                    }
                ]
            }
        }
    }
    return data_set
\ No newline at end of file
--- /dev/null
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import json
+import pytest
+from prepare_response import PrepareResponse
+from pytest_mock import MockerFixture
+
+
@pytest.fixture
def response_object():
    """A fresh PrepareResponse instance for each test."""
    return PrepareResponse()
+
+
def test_setResponseCode(response_object):
    """setResponseCode should store the code, retrievable via getResponseCode."""
    response_object.setResponseCode("200")
    assert response_object.getResponseCode() == '200'
+
def test_setResponseMsg(response_object, data_set):
    """setResponseMsg should store the payload so that the stored message
    JSON-decodes back to the fixture value.
    """
    response_object.setResponseMsg(data_set)
    assert json.loads(response_object.getResponseMsg()) == data_set
+
+
def test_getResponseCode(response_object):
    """getResponseCode should return the previously stored response code."""
    response_object.setResponseCode("200")
    assert response_object.getResponseCode() == '200'
+
+
def test_getResponseMsg(response_object, data_set):
    """getResponseMsg should return JSON text that decodes to the stored payload."""
    response_object.setResponseMsg(data_set)
    assert json.loads(response_object.getResponseMsg()) == data_set
+
+
@pytest.fixture
def data_set():
    """A full VES measurement event, returned as a JSON string (json.dumps)."""
    data_set = {
        "event": {
            "commonEventHeader": {
                "domain": "measurement",
                "eventId": "5",
                "eventName": "",
                "eventType": "platform",
                "lastEpochMicrosec": 0,
                "priority": "Normal",
                "reportingEntityId": "localhost",
                "reportingEntityName": "localhost",
                "sequence": 0,
                "sourceName": "bf9006ed-a735-064a-871c-4b4debe57935",
                "startEpochMicrosec": 1643798824.813,
                "version": "4.0",
                "vesEventListenerVersion": "7.2.1",
                "sourceId": "bf9006ed-a735-064a-871c-4b4debe57935"
            },
            "measurementFields": {
                "measurementFieldsVersion": "4.0",
                "measurementInterval": 10,
                "loadArray": [
                    {
                        "midTerm": 4.12,
                        "shortTerm": 5.14,
                        "longTerm": 2.22
                    }
                ],
                "memoryUsageArray": [
                    {
                        "vmIdentifier": "bf9006ed-a735-064a-871c-4b4debe57935",
                        "memoryFree": 489902080,
                        "memoryUsed": 5010788,
                        "memoryBuffered": 249216,
                        "memoryCached": 2080804,
                        "memorySlabRecl": 175884,
                        "memorySlabUnrecl": 153208
                    }
                ],
                "cpuUsageArray": [
                    {
                        "cpuIdentifier": "0",
                        "cpuIdle": 92.6977687626775,
                        "percentUsage": 0,
                        "cpuUsageUser": 6.08519269776876,
                        "cpuWait": 0.304259634888438,
                        "cpuUsageInterrupt": 0,
                        "cpuUsageNice": 0,
                        "cpuUsageSoftIrq": 0.202839756592292,
                        "cpuUsageSteal": 0,
                        "cpuUsageSystem": 0.709939148073022
                    },
                    {
                        "cpuIdentifier": "3",
                        "cpuIdle": 90.8722109533469,
                        "percentUsage": 0,
                        "cpuUsageUser": 6.08519269776876,
                        "cpuWait": 2.23123732251521,
                        "cpuUsageInterrupt": 0,
                        "cpuUsageNice": 0,
                        "cpuUsageSoftIrq": 0.202839756592292,
                        "cpuUsageSteal": 0,
                        "cpuUsageSystem": 0.608519269776876
                    }
                ]
            }
        }
    }
    return json.dumps(data_set)
+
config_file="adapter/config/adapter.conf"
-sed -i -- "s/kafka_broker =/kafka_broker = $kafka_host:$kafka_port/g" \
+sed -i -- "s/kafka_broker = smo-kafka:29092/kafka_broker = $kafka_host:$kafka_port/g" \
$config_file
-sed -i -- "s/log_level =/log_level = $log_level/g" \
+sed -i -- "s/log_level = error/log_level = $log_level/g" \
$config_file
sed -i -- "s/enable_assert =/enable_assert = $enable_assert/g" \
$config_file