RUN apt-get update && apt-get -y upgrade
RUN apt-get install -y git curl python3 python3-pip
-RUN pip3 install requests jsonschema kafka-python gevent
+COPY pip.conf /etc/pip.conf
+
+# --no-cache-dir keeps pip's download cache out of the image layer (hadolint DL3042).
+RUN pip3 install --no-cache-dir requests jsonschema kafka-python gevent PyYAML
RUN mkdir -p /opt/smo/certs
import datetime
import time
from gevent import pywsgi
+import yaml
+import requests
monitor_mode = "f"
vdu_id = ['', '', '', '', '', '']
# ------------------------------------------------------------------------------
vel_schema = None
+#------------------------------------------------------------------------------
+# The path of the YAML schemas used to validate stndDefined events.
+# ------------------------------------------------------------------------------
+vel_yaml_schema = None
+
# ------------------------------------------------------------------------------
# The JSON schema which we will use to validate client throttle state.
# ------------------------------------------------------------------------------
producer = None
-def listener(environ, start_response, schema):
+def listener(environ, start_response, schema, yaml_schema=None):
'''
Handler for the Vendor Event Listener REST API.
decoded_body = json.loads(body)
validate = jsonschema.validate(decoded_body, schema)
assert (validate is None), "Invalid event!"
+ if 'stndDefinedFields' in decoded_body['event'].keys():
+ logger.debug('in yaml validation')
+ schema_ref = decoded_body['event']['stndDefinedFields']["schemaReference"]
+ if "https://forge.3gpp.org" in schema_ref or "https://gerrit.o-ran-sc.org" in schema_ref:
+ stnd_define_event_validation(yaml_schema, decoded_body)
+ else :
+ logger.error("schema reference {0} not supported.".format(schema_ref))
+ raise Exception("schema reference {0} not supported.".format(schema_ref))
logger.info('Event is valid!')
logger.info('Valid body decoded & checked against schema OK:\n'
except Exception as e:
logger.error('Event invalid for unexpected reason! {0}'.format(e))
+# --------------------------------------------------------------------------
+# Check that the referenced YAML schema file exists locally; download it if missing.
+# --------------------------------------------------------------------------
+def check_schema_file_exist(vel_schema_path, schema_ref):
+ logger.debug('in check yaml file')
+ assert (vel_schema_path != ""), "Value of property 'schema_file' is missing in config file"
+ # Fetching file and folder name from url
+ schema_ref = schema_ref.split('#')[0]
+ name_list = schema_ref.split('/')
+ folder_name = '_'.join(name_list[2:-1])
+ file_name = name_list[-1]
+ updated_vel_schema_path = vel_schema_path +'/{0}/{1}'.format(folder_name,file_name)
+ if "https://forge.3gpp.org" in schema_ref:
+ schema_ref = schema_ref.replace("blob","raw")
+ schema_ref = schema_ref + '?inline=false'
+ if not os.path.exists(updated_vel_schema_path):
+ logger.warning('Event Listener Schema File ({0}) not found. ''No validation will be undertaken.'.format(vel_schema_path))
+ logger.info('Start downloading yaml file :{}'.format(schema_ref))
+ result = os.system('curl -JOL "{0}"'.format(schema_ref))
+ logger.debug("result {0}".format(result))
+ assert(result == 0), "Invalid URL {0}".format(schema_ref)
+ logger.info("Download Completed")
+ with open(file_name, "r") as file:
+ first_line = file.readline()
+ # checking downloaded file content is yaml or html
+ if first_line.strip() == "<!DOCTYPE html>":
+ logger.info("Downloaded file is not valid yaml")
+ os.system("del {0} ".format(file_name))
+ logger.info("Downloaded file deleted")
+ assert(first_line.strip() != "<!DOCTYPE html>"), "Invalid Schema File"
+ else:
+ # Create a folder from source url
+ os.system('mkdir {0}/{1}'.format(vel_schema_path, folder_name))
+ # move downloaded file in above created folder
+ os.system("mv {0} {1}/{2}".format(file_name, vel_schema_path, folder_name))
+ return updated_vel_schema_path
# --------------------------------------------------------------------------
+# Second-level validation for stndDefined messages
+# --------------------------------------------------------------------------
+def stnd_define_event_validation(schema_path , body):
+ logger.debug('in second level validation ')
+ schema_ref = body['event']['stndDefinedFields']["schemaReference"]
+ schema_path = check_schema_file_exist(schema_path , schema_ref)
+ logger.debug('end check yaml path ')
+ schema = yaml.full_load(open(schema_path, 'r'))
+ schema_name= schema_ref.split('/')[-1]
+ updated_schema = dict(schema["components"]["schemas"][schema_name], **schema)
+ decoded_body = body['event']['stndDefinedFields']['data']
+ validate_yaml = jsonschema.validate(decoded_body,updated_schema)
+ assert(validate_yaml is None), "Invalid event!"
+ logger.info('standard defined event validated sucessfully ')
+ return validate_yaml
+# --------------------------------------------------------------------------
# Save event data in Kafka
# --------------------------------------------------------------------------
def save_event_in_kafka(body):
test_control_schema_file = config.get(config_section,
'test_control_schema_file',
vars=overrides)
+ vel_yaml_schema = config.get(config_section,
+ 'yaml_schema_path',
+ vars=overrides)
# ----------------------------------------------------------------------
# Finally we have enough info to start a proper flow trace.
global get_info
get_info = root_url
dispatcher = PathDispatcher()
- vendor_event_listener = partial(listener, schema=vel_schema)
+ vendor_event_listener = partial(listener, schema=vel_schema, yaml_schema=vel_yaml_schema)
dispatcher.register('GET', root_url, vendor_event_listener)
dispatcher.register('POST', root_url, vendor_event_listener)
vendor_throttle_listener = partial(listener, schema=throttle_schema)
base_schema_file = evel-test-collector/docs/att_interface_definition/base_schema.json
throttle_schema_file = evel-test-collector/docs/att_interface_definition/throttle_schema.json
test_control_schema_file = evel-test-collector/docs/att_interface_definition/test_control_schema.json
+yaml_schema_path = evel-test-collector/docs/schema
#------------------------------------------------------------------------------
# Details of the Vendor Event Listener REST service.
base_schema_file =
throttle_schema_file = ../../docs/att_interface_definition/throttle_schema.json
test_control_schema_file = ../../docs/att_interface_definition/test_control_schema.json
+yaml_schema_path = ../../docs/schema
#------------------------------------------------------------------------------
# Details of the Vendor Event Listener REST service.
--- /dev/null
+NOTE: This folder contains the downloaded YAML schema files.
\ No newline at end of file
--- /dev/null
+[global]
+timeout = 60
+index-url = https://nexus3.o-ran-sc.org/repository/PyPi/simple
FROM ubuntu:focal
+
RUN apt-get update && apt-get -y upgrade
-RUN apt-get install -y git curl python3 python3-pip
-RUN pip3 install requests jsonschema kafka-python flask confluent-kafka
+RUN apt-get install -y git curl python3 python3-pip
+COPY pip.conf /etc/pip.conf
+
+# --no-cache-dir keeps pip's download cache out of the image layer (hadolint DL3042).
+RUN pip3 install --no-cache-dir requests jsonschema kafka-python flask confluent-kafka
RUN mkdir /opt/smo
@app.route(api_base_url + '/topics/<topic>', methods=['GET'])
def topic_details(topic):
- topic == request.view_args['topic']
+ assert topic == request.view_args['topic']
prepareResponse = PrepareResponse()
topicConsumer = TopicConsumer()
topicConsumer.getTopicDetails(prepareResponse, topic)
@app.route(api_base_url + '/events/<topic>/<consumergroup>/<consumerid>', methods=['GET'])
def get_events(topic, consumergroup, consumerid):
- topic == request.view_args['topic']
- consumergroup == request.view_args['consumergroup']
- consumerid == request.view_args['consumerid']
+ assert topic == request.view_args['topic']
+ assert consumergroup == request.view_args['consumergroup']
+ assert consumerid == request.view_args['consumerid']
limit = ""
timeout = ""
limit = int(limit)
except Exception:
limit = -1
- finally:
- return limit
+ return limit
def getTimeout(timeout):
timeout = 15
except Exception:
timeout = 15
- finally:
- return timeout
+ return timeout
if __name__ == '__main__':
--- /dev/null
+[global]
+timeout = 60
+index-url = https://nexus3.o-ran-sc.org/repository/PyPi/simple
"keepaliveDelay": "120",
"maxConnectionAttempts": "100",
"oamPort": "830",
- "password": "netconf",
"protocol": "SSH",
"reconnectOnChangedSchema": "false",
"sleep-factor": "1.5",
response = requests.post(collector_url + "/eventListener/v7/events",
json=payload,
auth=('user', 'password'),
- verify=False,
headers={"Content-Type": "application/json"})
assert "400" in str(response)
print("Success")
"keepaliveDelay": "120",
"maxConnectionAttempts": "100",
"oamPort": "830",
- "password": "netconf",
"protocol": "SSH",
"reconnectOnChangedSchema": "false",
"sleep-factor": "1.5",
response = requests.post(collector_url + "/eventListener/v7/events",
json=payload,
auth=('user', 'password'),
- verify=False,
headers={"Content-Type": "application/json"})
assert "202" in str(response)
print("Success")
response = requests.post(collector_url + "/eventListener/v7/events",
json=payload,
auth=('user', 'password'),
- verify=False,
headers={"Content-Type": "application/json"})
assert "202" in str(response)
print("Success")
response = requests.post(collector_url + "/eventListener/v7/events",
json=payload,
auth=('user', 'password'),
- verify=False,
headers={"Content-Type": "application/json"})
assert "202" in str(response)
print("Success")
response = requests.post(collector_url + "/eventListener/v7/events",
json=payload,
auth=('user', 'password'),
- verify=False,
headers={"Content-Type": "application/json"})
assert "202" in str(response)
print("Success")
response = requests.post(collector_url + "/eventListener/v7/events",
json=payload,
auth=('user', 'password'),
- verify=False,
headers={"Content-Type": "application/json"})
assert "202" in str(response)
print("Success")
RUN apt-get update && apt-get -y upgrade
RUN apt-get install -y git curl python3 python3-pip
+
+COPY pip.conf /etc/pip.conf
+
RUN pip3 install requests confluent-kafka
# Clone influxdb-connector
except Exception as e:
logger.error('Exception occured while saving data : '.format(e))
+def process_event(jobj, pdata, nonstringpdata):
+    '''
+    Fold the key/value pairs of jobj into the two influxdb line-protocol
+    fragments: string values are appended to pdata, non-string values to
+    nonstringpdata. Keys under 'additionalFields' are flattened one level.
+    Empty string values are skipped. Returns (pdata, nonstringpdata).
+    '''
+    for key, val in jobj.items():
+        if key != 'additionalFields' and val != "":
+            if isinstance(val, str):
+                # Escape spaces/commas so the value is line-protocol safe.
+                pdata = pdata + ',{}={}'.format(key, process_special_char(val))
+            else:
+                nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
+        elif key == 'additionalFields':
+            # Flatten the nested additionalFields map into top-level pairs.
+            for key2, val2 in val.items():
+                if val2 != "" and isinstance(val2, str):
+                    pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
+                elif val2 != "":
+                    nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
+    return pdata, nonstringpdata
def process_additional_measurements(val, domain, eventId, startEpochMicrosec, lastEpochMicrosec):
for additionalMeasurements in val:
def process_pnfRegistration_event(domain, jobj, pdata, nonstringpdata):
pdata = pdata + ",system={}".format(source)
- for key, val in jobj.items():
- if key != 'additionalFields' and val != "":
- if isinstance(val, str):
- pdata = pdata + ',{}={}'.format(key, process_special_char(val))
- else:
- nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
- elif key == 'additionalFields':
- for key2, val2 in val.items():
- if val2 != "" and isinstance(val2, str):
- pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
- elif val2 != "":
- nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
-
+ pdata, nonstringpdata = process_event(jobj, pdata, nonstringpdata)
send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
def process_heartbeat_events(domain, jobj, pdata, nonstringpdata):
pdata = pdata + ",system={}".format(source)
- for key, val in jobj.items():
- if key != 'additionalFields' and val != "":
- if isinstance(val, str):
- pdata = pdata + ',{}={}'.format(key, process_special_char(val))
- else:
- nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
- elif key == 'additionalFields':
- for key2, val2 in val.items():
- if val2 != "" and isinstance(val2, str):
- pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
- elif val2 != "":
- nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
-
+ pdata, nonstringpdata = process_event(jobj, pdata, nonstringpdata)
send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
+def process_stndDefinedFields_events(values, domain, eventId, startEpochMicrosec, lastEpochMicrosec):
+    """
+    Format stndDefined event to store in influx db
+    Values(dict) :- data to store in influxdb,
+    domain(str) :- name of topic ,
+    eventId (str) :- event id,
+    startEpochMicrosec :- Timestamp ,
+    lastEpochMicrosec:- Timestamp
+    """
+    # Tag set (strings) accumulates in pdata; numeric fields in nonstringpdata.
+    pdata = domain + ",eventId={},system={}".format(eventId, source)
+    nonstringpdata = " startEpochMicrosec={},lastEpochMicrosec={},".format(startEpochMicrosec, lastEpochMicrosec)
+    for key, val in values.items():
+        if isinstance(val, str):
+            pdata = pdata + ',{}={}'.format(key, process_special_char(val))
+        elif isinstance(val, dict):
+            # Flatten one nested level; three special sub-keys handled below.
+            for key2, val2 in val.items():
+                if isinstance(val2, str) and val2 != '':
+                    pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
+                elif isinstance(val2, dict) and key2 != 'additionalInformation' :
+                    # Generic nested dict: flatten second level without a prefix.
+                    for key3, val3 in val2.items():
+                        if isinstance(val3, str) and val3 != '':
+                            pdata = pdata + ',{}={}'.format(key3, process_special_char(val3))
+                        elif val3 !='':
+                            nonstringpdata = nonstringpdata + '{}={},'.format(key3, val3)
+                elif key2 == 'additionalInformation':
+                    # String keys get an 'additionalInformation_' prefix.
+                    # NOTE(review): non-string keys below are NOT prefixed —
+                    # looks inconsistent with the string branch; confirm intent.
+                    for key3, val3 in val2.items():
+                        if isinstance(val3, str) and val3 != '' :
+                            pdata = pdata + ',{}={}'.format('additionalInformation_'+key3, process_special_char(val3))
+                        elif val3 !='':
+                            nonstringpdata = nonstringpdata + '{}={},'.format(key3, val3)
+                elif key2 == 'correlatedNotifications':
+                    # List of dicts: flatten each notification's pairs.
+                    for item in val2:
+                        for key4, val4 in item.items():
+                            if isinstance(val4, str) and val4 !='':
+                                pdata = pdata + ',{}={}'.format(key4, process_special_char(val4))
+                            elif val4 !='':
+                                nonstringpdata = nonstringpdata + '{}={},'.format(key4, val4)
+
+                elif val2 !='':
+                    nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
+        elif val !='':
+            nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
+
+    # Trailing comma of nonstringpdata is dropped; timestamp appended last.
+    send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
+
+
def process_special_char(str):
for search_char, replace_char in {" ": "\ ", ",": "\,"}.items():
jobj['event']['commonEventHeader']['startEpochMicrosec'],
jobj['event']['commonEventHeader']['lastEpochMicrosec'])
+ if "stndDefinedFields" in jobj['event']:
+ logger.debug('Found stndDefinedFields')
+ process_stndDefinedFields_events(jobj['event']['stndDefinedFields'],
+ domain,
+ jobj['event']['commonEventHeader']['eventId'],
+ jobj['event']['commonEventHeader']['startEpochMicrosec'],
+ jobj['event']['commonEventHeader']['lastEpochMicrosec'])
+
def main():
c = Consumer(settings)
c.subscribe(['measurement', 'pnfregistration',
- 'fault', 'thresholdcrossingalert', 'heartbeat'])
+ 'fault', 'thresholdcrossingalert', 'heartbeat', 'stnddefined'])
try:
while True:
try:
save_event_in_db(msg.value())
except Exception as e:
- logger.error('Exception occured while saving data : '.format(e))
+ logger.error('Exception occured while saving data : {} '.format(e))
elif msg.error().code() == KafkaError._PARTITION_EOF:
logger.error('End of partition reached {0}/{1}'
.format(msg.topic(), msg.partition()))
--- /dev/null
+[global]
+timeout = 60
+index-url = https://nexus3.o-ran-sc.org/repository/PyPi/simple
--- /dev/null
+attrs==22.1.0
+certifi==2022.6.15
+charset-normalizer==2.1.1
+click==8.1.3
+confluent-kafka==1.9.2
+Flask==2.2.2
+gevent==21.12.0
+greenlet==1.1.3
+idna==3.3
+importlib-metadata==4.12.0
+importlib-resources==5.9.0
+itsdangerous==2.1.2
+Jinja2==3.1.2
+jsonschema==4.15.0
+kafka-python==2.0.2
+MarkupSafe==2.1.1
+pkgutil-resolve-name==1.3.10
+pyrsistent==0.18.1
+requests==2.28.1
+urllib3==1.26.12
+Werkzeug==2.2.2
+zipp==3.8.1
+zope.event==4.5.0
+zope.interface==5.4.0
@mock.patch('argparse.ArgumentParser.parse_args',
return_value=argparse.Namespace(verbose=None, api_version='7',config=get_config_path(),section='default'))
@mock.patch('gevent.pywsgi.WSGIServer.serve_forever')
+@mock.patch('monitor.logger', logging.getLogger('monitor'))
def test_main(server,parser,body):
argv=None
logger = logging.getLogger('monitor')
#@pytest.mark.skip
@mock.patch('monitor.kafka_server')
+@mock.patch('monitor.logger', logging.getLogger('monitor'))
def test_save_event_in_kafka(mocker,data_set,topic_name):
data_set_string=json.dumps(data_set)
logger = logging.getLogger('monitor')
@mock.patch('monitor.KafkaProducer')
@mock.patch('monitor.producer')
+@mock.patch('monitor.logger', logging.getLogger('monitor'))
def test_produce_events_in_kafka(mock_pro,mock_producer,data_set,topic_name):
logger = logging.getLogger('monitor')
logger.setLevel(logging.DEBUG)
heartbeat={"event": {"commonEventHeader": {"domain": "heartbeat","eventId": "ORAN-DEV_2021-12-20T07:29:34.292938Z", "eventName": "heartbeat_O_RAN_COMPONENT","eventType": "O_RAN_COMPONENT","lastEpochMicrosec": 1639965574292938, "nfNamingCode": "SDN-Controller","nfVendorName": "O-RAN-SC-OAM","priority": "Low", "reportingEntityId": "","reportingEntityName": "ORAN-DEV","sequence": 357,"sourceId": "", "sourceName": "ORAN-DEV","startEpochMicrosec": 1639965574292938, "timeZoneOffset": "+00:00", "version": "4.1", "vesEventListenerVersion": "7.2.1"},"heartbeatFields": {"additionalFields": {"eventTime": "2021-12-20T07:29:34.292938Z" },"heartbeatFieldsVersion": "3.0", "heartbeatInterval": 20} } }
-pnfRegistration={"event": {"commonEventHeader": {"domain": "pnfRegistration","eventId": "ORAN-DEV_ONAP Controller for Radio","eventName": "pnfRegistration_EventType5G", "eventType": "EventType5G", "sequence": 0,"priority": "Low","reportingEntityId": "","reportingEntityName": "ORAN-DEV","sourceId": "", "sourceName": "ORAN-DEV","startEpochMicrosec": 1639985329569087,"lastEpochMicrosec": 1639985329569087,"nfNamingCode": "SDNR", "nfVendorName": "ONAP", "timeZoneOffset": "+00:00","version": "4.1","vesEventListenerVersion": "7.2.1"},"pnfRegistrationFields": {"pnfRegistrationFieldsVersion": "2.1","lastServiceDate": "2021-03-26","macAddress": "02:42:f7:d4:62:ce","manufactureDate": "2021-01-16","modelNumber": "ONAP Controller for Radio", "oamV4IpAddress": "127.0.0.1","oamV6IpAddress": "0:0:0:0:0:ffff:a0a:0.1", "serialNumber": "ONAP-SDNR-127.0.0.1-ONAP Controller for Radio","softwareVersion": "2.3.5", "unitFamily": "ONAP-SDNR","unitType": "SDNR", "vendorName": "ONAP","additionalFields": {"oamPort": "830","protocol": "SSH","username": "netconf","password": "netconf","reconnectOnChangedSchema": "false","sleep-factor": "1.5","tcpOnly": "false","connectionTimeout": "20000","maxConnectionAttempts": "100","betweenAttemptsTimeout": "2000","keepaliveDelay": "120"}}}}
+pnfRegistration={"event": {"commonEventHeader": {"domain": "pnfRegistration","eventId": "ORAN-DEV_ONAP Controller for Radio","eventName": "pnfRegistration_EventType5G", "eventType": "EventType5G", "sequence": 0,"priority": "Low","reportingEntityId": "","reportingEntityName": "ORAN-DEV","sourceId": "", "sourceName": "ORAN-DEV","startEpochMicrosec": 1639985329569087,"lastEpochMicrosec": 1639985329569087,"nfNamingCode": "SDNR", "nfVendorName": "ONAP", "timeZoneOffset": "+00:00","version": "4.1","vesEventListenerVersion": "7.2.1"},"pnfRegistrationFields": {"pnfRegistrationFieldsVersion": "2.1","lastServiceDate": "2021-03-26","macAddress": "02:42:f7:d4:62:ce","manufactureDate": "2021-01-16","modelNumber": "ONAP Controller for Radio","oamV4IpAddress": "127.0.0.1","oamV6IpAddress": "0:0:0:0:0:ffff:a0a:0.1", "serialNumber": "ONAP-SDNR-127.0.0.1-ONAP Controller for Radio","softwareVersion": "2.3.5", "unitFamily": "ONAP-SDNR","unitType": "SDNR", "vendorName": "ONAP","additionalFields": {"oamPort": "830","protocol": "SSH","username": "netconf","reconnectOnChangedSchema": "false","sleep-factor": "1.5","tcpOnly": "false","connectionTimeout": "20000","maxConnectionAttempts": "100","betweenAttemptsTimeout": "2000","keepaliveDelay": "120"}}}}
measurement={"event": {"commonEventHeader": {"domain": "measurement", "eventId": "O-RAN-FH-IPv6-01_1639984500_PM15min", "eventName": "measurement_O_RAN_COMPONENT_PM15min","eventType": "O_RAN_COMPONENT_PM15min","sequence": 0, "priority": "Low","reportingEntityId": "", "reportingEntityName": "ORAN-DEV", "sourceId": "", "sourceName": "O-RAN-FH-IPv6-01", "startEpochMicrosec": 1639983600000, "lastEpochMicrosec": 1639984500000, "internalHeaderFields": {"intervalStartTime": "Mon, 20 Dec 2021 07:00:00 +0000","intervalEndTime": "Mon, 20 Dec 2021 07:15:00 +0000"}, "version": "4.1", "vesEventListenerVersion": "7.2.1" }, "measurementFields": {"additionalFields": {}, "additionalMeasurements": [{ "name": "LP-MWPS-RADIO-1","hashMap": {"es": "0","ses": "1", "cses": "0", "unavailability": "0" }},{"name": "LP-MWPS-RADIO-2","hashMap": {"es": "0","ses": "1","cses": "0","unavailability": "0"} }],"additionalObjects": [],"codecUsageArray": [],"concurrentSessions": 2,"configuredEntities": 2, "cpuUsageArray": [], "diskUsageArray": [], "featureUsageArray": { "https://www.itu.int/rec/T-REC-G.841": "true" }, "filesystemUsageArray": [], "hugePagesArray": [], "ipmi": {}, "latencyDistribution": [], "loadArray": [], "machineCheckExceptionArray": [], "meanRequestLatency": 1000, "measurementInterval": 234, "measurementFieldsVersion": "4.0", "memoryUsageArray": [], "numberOfMediaPortsInUse": 234, "requestRate": 23,"nfcScalingMetric": 3,"nicPerformanceArray": [],"processStatsArray": []}}}
fault={ "event": { "commonEventHeader": {"domain": "fault","eventId": "LKCYFL79Q01M01FYNG01_LP-MWPS-RADIO_TCA","eventName": "fault_O_RAN_COMPONENT_Alarms_TCA","eventType": "O_RAN_COMPONENT_Alarms", "sequence": 0,"priority": "High","reportingEntityId": "","reportingEntityName": "ORAN-DEV","sourceId": "","sourceName": "LKCYFL79Q01M01FYNG01","startEpochMicrosec": 1639985333218840,"lastEpochMicrosec": 1639985333218840,"nfNamingCode": "FYNG","nfVendorName": "VENDORA","timeZoneOffset": "+00:00", "version": "4.1","vesEventListenerVersion": "7.2.1"},"faultFields": {"faultFieldsVersion": "1.0","alarmCondition": "TCA", "alarmInterfaceA": "LP-MWPS-RADIO","eventSourceType": "O_RAN_COMPONENT","specificProblem": "TCA","eventSeverity": "NORMAL","vfStatus": "Active","alarmAdditionalInformation": {"eventTime": "2021-12-20T07:28:53.218840Z","equipType": "FYNG","vendor": "VENDORA","model": "FancyNextGeneration"}}}}
thresholdCrossingAlert={"event": {"commonEventHeader": {"domain": "thresholdCrossingAlert","eventId": "__TCA","eventName": "thresholdCrossingAlert_O_RAN_COMPONENT_TCA_TCA","eventType": "O_RAN_COMPONENT_TCA","sequence": 0,"priority": "High","reportingEntityId": "","reportingEntityName": "ORAN-DEV","sourceId": "","sourceName": "","startEpochMicrosec": 1639985336443218,"lastEpochMicrosec": 1639985336443218,"nfNamingCode": "1OSF","nfVendorName": "","timeZoneOffset": "+00:00","version": "4.1","vesEventListenerVersion": "7.2.1"}, "thresholdCrossingAlertFields": {"thresholdCrossingFieldsVersion": "4.0","additionalParameters": [{"criticality": "MAJ","hashMap": { "additionalProperties": "up-and-down" },"thresholdCrossed": "packetLoss" }], "alertAction": "SET","alertDescription": "TCA","alertType": "INTERFACE-ANOMALY","alertValue": "1OSF","associatedAlertIdList": ["loss-of-signal"],"collectionTimestamp": "Mon, 20 Dec 2021 07:28:56 +0000","dataCollector": "data-lake","elementType": "1OSF", "eventSeverity": "WARNING", "eventStartTimestamp": "Mon, 20 Dec 2021 07:15:00 +0000","interfaceName": "", "networkService": "from-a-to-b","possibleRootCause": "always-the-others", "additionalFields": {"eventTime": "2021-12-20T07:28:56.443218Z", "equipType": "1OSF", "vendor": "", "model": ""}}}}
\ No newline at end of file
return data_set
# <Response [204]>
+def test_send_event_to_influxdb_successfully(data_set):
"""
Simply test event should store in influxdb successfully.
"""
-def test_send_event_to_influxdb_successfully(data_set):
with requests_mock.Mocker() as rm:
rm.post('http://localhost:8086/write?db=eventsdb', json=data_set, status_code=204)
response = requests.post(
assert response.status_code == 204
# <Response [400]>
+def test_send_event_to_influxdb_failed(data_set):
"""
Bad Request.
"""
-def test_send_event_to_influxdb_failed(data_set):
with requests_mock.Mocker() as rm:
rm.post('http://localhost:8086/write?db=eventsdb', json=data_set, status_code=400)
response = requests.post(
@pytest.fixture
def pnf_json():
- jobj = {'pnfRegistrationFieldsVersion': '2.1', 'lastServiceDate': '2021-03-26', 'macAddress': '02:42:f7:d4:62:ce', 'manufactureDate': '2021-01-16', 'modelNumber': 'ONAP Controller for Radio', 'oamV4IpAddress': '127.0.0.1', 'oamV6IpAddress': '0:0:0:0:0:ffff:a0a:0.1', 'serialNumber': 'ONAP-SDNR-127.0.0.1-ONAP Controller for Radio', 'softwareVersion': '2.3.5', 'unitFamily': 'ONAP-SDNR', 'unitType': 'SDNR', 'vendorName': 'ONAP', 'additionalFields': {'oamPort': '830', 'protocol': 'SSH', 'username': 'netconf', 'password': 'netconf', 'reconnectOnChangedSchema': 'false', 'sleep-factor': '1.5', 'tcpOnly': 'false', 'connectionTimeout': '20000', 'maxConnectionAttempts': '100', 'betweenAttemptsTimeout': '2000', 'keepaliveDelay': '120'}}
+ jobj = {'pnfRegistrationFieldsVersion': '2.1', 'lastServiceDate': '2021-03-26', 'macAddress': '02:42:f7:d4:62:ce', 'manufactureDate': '2021-01-16', 'modelNumber': 'ONAP Controller for Radio', 'oamV4IpAddress': '127.0.0.1', 'oamV6IpAddress': '0:0:0:0:0:ffff:a0a:0.1', 'serialNumber': 'ONAP-SDNR-127.0.0.1-ONAP Controller for Radio', 'softwareVersion': '2.3.5', 'unitFamily': 'ONAP-SDNR', 'unitType': 'SDNR', 'vendorName': 'ONAP', 'additionalFields': {'oamPort': '830', 'protocol': 'SSH', 'username': 'netconf', 'reconnectOnChangedSchema': 'false', 'sleep-factor': '1.5', 'tcpOnly': 'false', 'connectionTimeout': '20000', 'maxConnectionAttempts': '100', 'betweenAttemptsTimeout': '2000', 'keepaliveDelay': '120'}}
return jobj
@pytest.fixture
def pnf_expected_pdata():
- pnf_expected_pdata = 'pnfRegistration,domain=pnfRegistration,eventId=ORAN-DEV_ONAP\\ Controller\\ for\\ Radio,eventName=pnfRegistration_EventType5G,eventType=EventType5G,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,nfNamingCode=SDNR,nfVendorName=ONAP,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None,pnfRegistrationFieldsVersion=2.1,lastServiceDate=2021-03-26,macAddress=02:42:f7:d4:62:ce,manufactureDate=2021-01-16,modelNumber=ONAP\\ Controller\\ for\\ Radio,oamV4IpAddress=127.0.0.1,oamV6IpAddress=0:0:0:0:0:ffff:a0a:0.1,serialNumber=ONAP-SDNR-127.0.0.1-ONAP\\ Controller\\ for\\ Radio,softwareVersion=2.3.5,unitFamily=ONAP-SDNR,unitType=SDNR,vendorName=ONAP,oamPort=830,protocol=SSH,username=netconf,password=netconf,reconnectOnChangedSchema=false,sleep-factor=1.5,tcpOnly=false,connectionTimeout=20000,maxConnectionAttempts=100,betweenAttemptsTimeout=2000,keepaliveDelay=120 sequence=0,startEpochMicrosec=1639985329569087,lastEpochMicrosec=1639985329569087 1639985333218840000'
+ pnf_expected_pdata = 'pnfRegistration,domain=pnfRegistration,eventId=ORAN-DEV_ONAP\\ Controller\\ for\\ Radio,eventName=pnfRegistration_EventType5G,eventType=EventType5G,priority=Low,reportingEntityName=ORAN-DEV,sourceName=ORAN-DEV,nfNamingCode=SDNR,nfVendorName=ONAP,timeZoneOffset=+00:00,version=4.1,vesEventListenerVersion=7.2.1,system=None,pnfRegistrationFieldsVersion=2.1,lastServiceDate=2021-03-26,macAddress=02:42:f7:d4:62:ce,manufactureDate=2021-01-16,modelNumber=ONAP\\ Controller\\ for\\ Radio,oamV4IpAddress=127.0.0.1,oamV6IpAddress=0:0:0:0:0:ffff:a0a:0.1,serialNumber=ONAP-SDNR-127.0.0.1-ONAP\\ Controller\\ for\\ Radio,softwareVersion=2.3.5,unitFamily=ONAP-SDNR,unitType=SDNR,vendorName=ONAP,oamPort=830,protocol=SSH,username=netconf,reconnectOnChangedSchema=false,sleep-factor=1.5,tcpOnly=false,connectionTimeout=20000,maxConnectionAttempts=100,betweenAttemptsTimeout=2000,keepaliveDelay=120 sequence=0,startEpochMicrosec=1639985329569087,lastEpochMicrosec=1639985329569087 1639985333218840000'
return pnf_expected_pdata