import datetime
import time
from gevent import pywsgi
+import yaml
+import requests
monitor_mode = "f"
vdu_id = ['', '', '', '', '', '']
# ------------------------------------------------------------------------------
vel_schema = None
+#------------------------------------------------------------------------------
+# The yaml schema which we will use to validate events.
+# ------------------------------------------------------------------------------
+vel_yaml_schema = None
+
# ------------------------------------------------------------------------------
# The JSON schema which we will use to validate client throttle state.
# ------------------------------------------------------------------------------
producer = None
-def listener(environ, start_response, schema):
+def listener(environ, start_response, schema, yaml_schema=None):
'''
Handler for the Vendor Event Listener REST API.
decoded_body = json.loads(body)
validate = jsonschema.validate(decoded_body, schema)
assert (validate is None), "Invalid event!"
+ if 'stndDefinedFields' in decoded_body['event'].keys():
+ logger.debug('in yaml validation')
+ schema_ref = decoded_body['event']['stndDefinedFields']["schemaReference"]
+ if "https://forge.3gpp.org" in schema_ref or "https://gerrit.o-ran-sc.org" in schema_ref:
+ stnd_define_event_validation(yaml_schema, decoded_body)
+ else :
+ logger.error("schema reference {0} not supported.".format(schema_ref))
+ raise Exception("schema reference {0} not supported.".format(schema_ref))
logger.info('Event is valid!')
logger.info('Valid body decoded & checked against schema OK:\n'
except Exception as e:
logger.error('Event invalid for unexpected reason! {0}'.format(e))
+# --------------------------------------------------------------------------
+# check yaml schema file exists or not
+# --------------------------------------------------------------------------
+def check_schema_file_exist(vel_schema_path, schema_ref):
+ logger.debug('in check yaml file')
+ assert (vel_schema_path != ""), "Value of property 'schema_file' is missing in config file"
+ # Fetching file and folder name from url
+ schema_ref = schema_ref.split('#')[0]
+ name_list = schema_ref.split('/')
+ folder_name = '_'.join(name_list[2:-1])
+ file_name = name_list[-1]
+ updated_vel_schema_path = vel_schema_path +'/{0}/{1}'.format(folder_name,file_name)
+ if "https://forge.3gpp.org" in schema_ref:
+ schema_ref = schema_ref.replace("blob","raw")
+ schema_ref = schema_ref + '?inline=false'
+ if not os.path.exists(updated_vel_schema_path):
+ logger.warning('Event Listener Schema File ({0}) not found. ''No validation will be undertaken.'.format(vel_schema_path))
+ logger.info('Start downloading yaml file :{}'.format(schema_ref))
+ result = os.system('curl -JOL "{0}"'.format(schema_ref))
+ logger.debug("result {0}".format(result))
+ assert(result == 0), "Invalid URL {0}".format(schema_ref)
+ logger.info("Download Completed")
+ with open(file_name, "r") as file:
+ first_line = file.readline()
+ # checking downloaded file content is yaml or html
+ if first_line.strip() == "<!DOCTYPE html>":
+ logger.info("Downloaded file is not valid yaml")
+ os.system("del {0} ".format(file_name))
+ logger.info("Downloaded file deleted")
+ assert(first_line.strip() != "<!DOCTYPE html>"), "Invalid Schema File"
+ else:
+ # Create a folder from source url
+ os.system('mkdir {0}/{1}'.format(vel_schema_path, folder_name))
+ # move downloaded file in above created folder
+ os.system("mv {0} {1}/{2}".format(file_name, vel_schema_path, folder_name))
+ return updated_vel_schema_path
# --------------------------------------------------------------------------
+# Second level of validation for stnd define message
+# --------------------------------------------------------------------------
+def stnd_define_event_validation(schema_path , body):
+ logger.debug('in second level validation ')
+ schema_ref = body['event']['stndDefinedFields']["schemaReference"]
+ schema_path = check_schema_file_exist(schema_path , schema_ref)
+ logger.debug('end check yaml path ')
+ schema = yaml.full_load(open(schema_path, 'r'))
+ schema_name= schema_ref.split('/')[-1]
+ updated_schema = dict(schema["components"]["schemas"][schema_name], **schema)
+ decoded_body = body['event']['stndDefinedFields']['data']
+ validate_yaml = jsonschema.validate(decoded_body,updated_schema)
+ assert(validate_yaml is None), "Invalid event!"
+ logger.info('standard defined event validated sucessfully ')
+ return validate_yaml
+# --------------------------------------------------------------------------
# Save event data in Kafka
# --------------------------------------------------------------------------
def save_event_in_kafka(body):
test_control_schema_file = config.get(config_section,
'test_control_schema_file',
vars=overrides)
+ vel_yaml_schema = config.get(config_section,
+ 'yaml_schema_path',
+ vars=overrides)
# ----------------------------------------------------------------------
# Finally we have enough info to start a proper flow trace.
global get_info
get_info = root_url
dispatcher = PathDispatcher()
- vendor_event_listener = partial(listener, schema=vel_schema)
+ vendor_event_listener = partial(listener, schema=vel_schema, yaml_schema=vel_yaml_schema)
dispatcher.register('GET', root_url, vendor_event_listener)
dispatcher.register('POST', root_url, vendor_event_listener)
vendor_throttle_listener = partial(listener, schema=throttle_schema)
send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
+def process_stndDefinedFields_events(values, domain, eventId, startEpochMicrosec, lastEpochMicrosec):
+ """
+ Format stndDefined event to store in influx db
+ Values(dict) :- data to store in influxdb,
+ domain(str) :- name of topic ,
+ eventId (str) :- event id,
+ startEpochMicrosec :- Timestamp ,
+ lastEpochMicrosec:- Timestamp
+ """
+ pdata = domain + ",eventId={},system={}".format(eventId, source)
+ nonstringpdata = " startEpochMicrosec={},lastEpochMicrosec={},".format(startEpochMicrosec, lastEpochMicrosec)
+ for key, val in values.items():
+ if isinstance(val, str):
+ pdata = pdata + ',{}={}'.format(key, process_special_char(val))
+ elif isinstance(val, dict):
+ for key2, val2 in val.items():
+ if isinstance(val2, str) and val2 != '':
+ pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
+ elif isinstance(val2, dict) and key2 != 'additionalInformation' :
+ for key3, val3 in val2.items():
+ if isinstance(val3, str) and val3 != '':
+ pdata = pdata + ',{}={}'.format(key3, process_special_char(val3))
+ elif val3 !='':
+ nonstringpdata = nonstringpdata + '{}={},'.format(key3, val3)
+ elif key2 == 'additionalInformation':
+ for key3, val3 in val2.items():
+ if isinstance(val3, str) and val3 != '' :
+ pdata = pdata + ',{}={}'.format('additionalInformation_'+key3, process_special_char(val3))
+ elif val3 !='':
+ nonstringpdata = nonstringpdata + '{}={},'.format(key3, val3)
+ elif key2 == 'correlatedNotifications':
+ for item in val2:
+ for key4, val4 in item.items():
+ if isinstance(val4, str) and val4 !='':
+ pdata = pdata + ',{}={}'.format(key4, process_special_char(val4))
+ elif val4 !='':
+ nonstringpdata = nonstringpdata + '{}={},'.format(key4, val4)
+
+ elif val2 !='':
+ nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
+ elif val !='':
+ nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
+
+ send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
+
+
def process_special_char(str):
for search_char, replace_char in {" ": "\ ", ",": "\,"}.items():
jobj['event']['commonEventHeader']['startEpochMicrosec'],
jobj['event']['commonEventHeader']['lastEpochMicrosec'])
+ if "stndDefinedFields" in jobj['event']:
+ logger.debug('Found stndDefinedFields')
+ process_stndDefinedFields_events(jobj['event']['stndDefinedFields'],
+ domain,
+ jobj['event']['commonEventHeader']['eventId'],
+ jobj['event']['commonEventHeader']['startEpochMicrosec'],
+ jobj['event']['commonEventHeader']['lastEpochMicrosec'])
+
def main():
c = Consumer(settings)
c.subscribe(['measurement', 'pnfregistration',
- 'fault', 'thresholdcrossingalert', 'heartbeat'])
+ 'fault', 'thresholdcrossingalert', 'heartbeat', 'stnddefined'])
try:
while True:
try:
save_event_in_db(msg.value())
except Exception as e:
- logger.error('Exception occured while saving data : '.format(e))
+ logger.error('Exception occured while saving data : {} '.format(e))
elif msg.error().code() == KafkaError._PARTITION_EOF:
logger.error('End of partition reached {0}/{1}'
.format(msg.topic(), msg.partition()))