From: santanude Date: Thu, 21 Oct 2021 12:12:34 +0000 (+0530) Subject: Add support for measurement event related to network slicing X-Git-Tag: 6.0.2~42 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=c82167dcdcb98260161ecd85510baa568f2f8cdb;p=smo%2Fves.git Add support for measurement event related to network slicing This task will add the JSON schema changes for that measurement event, and the corresponding support for it in the VES collector to validate the event based on the schema. In addition, it will also add support for persisting data into data sinks such as InfluxdB, Kafka, Elasticsearch. Issue-Id: SMO-21 Signed-off-by: santanude Change-Id: I5a866e1b3735c51bb0d0e34c15053b37493d714b Signed-off-by: santanude --- diff --git a/agent/Dockerfile b/agent/Dockerfile index bb2a10e..238d877 100755 --- a/agent/Dockerfile +++ b/agent/Dockerfile @@ -29,7 +29,7 @@ RUN apt-get install -y tzdata # libpthread-stubs0-dev libssl-dev libsasl2-dev liblz4-dev # (or libz-dev?) # Required for building collectd: pkg-config - +RUN apt-get install -y netcat RUN apt-get install -y default-jre zookeeperd \ python3 python3-pip pkg-config git build-essential libpthread-stubs0-dev \ libssl-dev libsasl2-dev liblz4-dev libz-dev diff --git a/agent/start.sh b/agent/start.sh index 3ba501a..522b6a7 100755 --- a/agent/start.sh +++ b/agent/start.sh @@ -16,6 +16,28 @@ # #. What this is: Startup script for the OPNFV VES Agent running under docker. +echo "Ves-agent is trying to connect Kafka Broker.." +timeout 1m bash -c 'until printf "" 2>>/dev/null >>/dev/tcp/$ves_kafka_host/$ves_kafka_port; do sleep 2; done' +success=$? +if [ $success -eq 0 ] + then + echo "Kafka is up.." + else + echo "No Kafka found .. exiting container.." + exit; +fi + +echo "Ves-agent is trying to connect ves-collector.." +timeout 1m bash -c 'until printf "" 2>>/dev/null >>/dev/tcp/$ves_host/$ves_port; do sleep 2; done' +success=$? +if [ $success -eq 0 ] + then + echo "ves-collector is up.." + else + echo "No ves-collector found .. exiting container.." + exit; +fi + echo "$ves_kafka_host $ves_kafka_hostname" >>/etc/hosts echo "ves_kafka_hostname=$ves_kafka_hostname" echo "*** /etc/hosts ***" diff --git a/collector/Dockerfile b/collector/Dockerfile old mode 100644 new mode 100755 index 5ebd8c3..f69561b --- a/collector/Dockerfile +++ b/collector/Dockerfile @@ -1,11 +1,12 @@ -# Copyright 2017-2018 AT&T Intellectual Property, Inc -# +# Original work Copyright 2017-2018 AT&T Intellectual Property, Inc +# Modified work Copyright 2021 Xoriant Corporation + # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,10 +20,10 @@ FROM ubuntu:focal - RUN apt-get update && apt-get -y upgrade RUN apt-get install -y git curl python3 python3-pip -RUN pip3 install requests jsonschema +RUN pip3 install requests jsonschema elasticsearch kafka-python + RUN mkdir /opt/ves diff --git a/collector/evel-test-collector/code/collector/monitor.py b/collector/evel-test-collector/code/collector/monitor.py index 3d40b63..ae9de29 100755 --- a/collector/evel-test-collector/code/collector/monitor.py +++ b/collector/evel-test-collector/code/collector/monitor.py @@ -34,6 +34,11 @@ import json import jsonschema from functools import partial import requests +from datetime import timezone +from elasticsearch import Elasticsearch +from kafka import KafkaProducer +from json import dumps +import datetime import time monitor_mode = "f" @@ -48,6 +53,7 @@ rows = 0 class JSONObject: + def __init__(self, d): self.__dict__ = d @@ -98,6 +104,7 @@ pending_command_list = None # Logger for this module. # ------------------------------------------------------------------------------ logger = None +producer = None def listener(environ, start_response, schema): @@ -130,7 +137,7 @@ def listener(environ, start_response, schema): logger.debug('Content Body: {0}'.format(body)) mode, b64_credentials = str.split(environ.get('HTTP_AUTHORIZATION', - 'None None')) + 'None None')) logger.debug('Auth. Mode: {0} Credentials: ****'.format(mode)) if (b64_credentials != 'None'): credentials = b64decode(b64_credentials) @@ -157,8 +164,8 @@ def listener(environ, start_response, schema): # -------------------------------------------------------------------------- # See whether the user authenticated themselves correctly. # -------------------------------------------------------------------------- - if (credentials == bytes((vel_username - + ':' + vel_password), 'utf-8')): + if (credentials == bytes((vel_username + ':' + vel_password), + 'utf-8')): logger.info('Authenticated OK') # ---------------------------------------------------------------------- @@ -172,20 +179,19 @@ def listener(environ, start_response, schema): response = pending_command_list pending_command_list = None - logger.debug('\n' + '='*80) + logger.debug('\n' + '=' * 80) logger.debug('Sending pending commandList in the response:\n' '{0}'.format(json.dumps(response, sort_keys=True, indent=4, separators=(',', ': ')))) - logger.debug('='*80 + '\n') + logger.debug('=' * 80 + '\n') yield json.dumps(response).encode() else: start_response('202 Accepted', []) yield ''.encode() else: - logger.warn('Failed to authenticate OK; creds: ' - + credentials) + logger.warn('Failed to authenticate OK; creds: ' + credentials) logger.warn('Failed to authenticate agent credentials: ', credentials, 'against expected ', @@ -197,18 +203,26 @@ def listener(environ, start_response, schema): # Respond to the caller. # ---------------------------------------------------------------------- start_response('401 Unauthorized', [('Content-type', - 'application/json')]) + 'application/json')]) req_error = {'requestError': { 'policyException': { - 'messageId': 'POL0001', - 'text': 'Failed to authenticate' - } - } - } + 'messageId': 'POL0001', + 'text': 'Failed to authenticate' + } + } + } yield json.dumps(req_error) + # saving data in Kafka by deafult + save_event_in_kafka(body) + logger.info("data_storage ={}".format(data_storage)) - if(data_storage == 'influxdb'): + data_storageArr = data_storage.split("|") + # if word 'elasticsearch' exists in config file then save data in elasticsearch + if('elasticsearch' in data_storageArr): + save_event_in_elasticsearch(body) + # if word 'influxdb' exists in config file then save data in influxdb + if('influxdb' in data_storageArr): save_event_in_db(body) except jsonschema.SchemaError as e: @@ -230,9 +244,9 @@ def listener(environ, start_response, schema): decoded_body = json.loads(body) logger.warn('Valid JSON body (no schema checking) decoded:\n' '{0}'.format(json.dumps(decoded_body, - sort_keys=True, - indent=4, - separators=(',', ': ')))) + sort_keys=True, + indent=4, + separators=(',', ': ')))) logger.warn('Event is valid JSON but not checked against schema!') except Exception as e: @@ -242,14 +256,149 @@ def listener(environ, start_response, schema): # -------------------------------------------------------------------------- # Send event to influxdb # -------------------------------------------------------------------------- + def send_to_influxdb(event, pdata): url = 'http://{}/write?db=veseventsdb'.format(influxdb) - logger.info('Send {} to influxdb at {}: {}'.format(event, influxdb, pdata)) + logger.debug('Send {} to influxdb at {}: {}'.format(event, influxdb, pdata)) r = requests.post(url, data=pdata, headers={'Content-Type': 'text/plain'}) logger.info('influxdb return code {}'.format(r.status_code)) if r.status_code != 204: logger.debug('*** Influxdb save failed, return code {} ***'.format(r.status_code)) +# -------------------------------------------------------------------------- +# Save event data in Kafka +# -------------------------------------------------------------------------- +def save_event_in_kafka(body): + jobj = json.loads(body) + if 'commonEventHeader' in jobj['event']: + # store each domain information in individual topic + topic = jobj['event']['commonEventHeader']['domain'].lower() + logger.info('Got an event request for {} domain'.format(topic)) + if (len(topic) == 0): + topic = kafka_topic + + logger.debug('Kafka broker ={} and kafka topic={}'.format(kafka_port, topic)) + produce_events_in_kafka(jobj, topic) + + +def produce_events_in_kafka(jobj, topic): + try: + global producer + if producer is None: + logger.debug('Producer is None') + producer = KafkaProducer(bootstrap_servers=[kafka_port], + value_serializer=lambda x: + dumps(x).encode('utf-8')) + producer.send(topic, value=jobj) + logger.debug('Event has been successfully posted into kafka bus') + except Exception as e: + logger.error('Getting error while posting event into kafka bus {0}'.format(e)) + +# -------------------------------------------------------------------------- +# Save event data in Elasticsearch +# -------------------------------------------------------------------------- + +def format_timestamp(EpochMicrosec): + if isinstance(EpochMicrosec, int): + return datetime.datetime.fromtimestamp(int(str(EpochMicrosec)[:10]), tz=timezone.utc) + else: + return datetime.datetime.fromtimestamp(EpochMicrosec, tz=timezone.utc) + + +def save_event_in_elasticsearch(body): + jobj = json.loads(body) + eventId = jobj['event']['commonEventHeader']['eventId'] + sourceId = jobj['event']['commonEventHeader']['sourceId'] + startEpochMicrosec = jobj['event']['commonEventHeader']['startEpochMicrosec'] + lastEpochMicrosec = jobj['event']['commonEventHeader']['lastEpochMicrosec'] + jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp'] = format_timestamp(startEpochMicrosec) + jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp'] = format_timestamp(lastEpochMicrosec) + domain = jobj['event']['commonEventHeader']['domain'].lower() + es = Elasticsearch([{'host': elasticsearch_domain, 'port': elasticsearch_port}]) + + if 'measurementFields' in jobj['event']: + if 'commonEventHeader' in jobj['event']: + es.index(index='measurement', body=jobj['event']['commonEventHeader']) + + if 'additionalMeasurements' in jobj['event']['measurementFields']: + for addmeasure in jobj['event']['measurementFields']['additionalMeasurements']: + addmeasure['eventId'] = eventId + addmeasure['sourceId'] = sourceId + addmeasure['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp'] + addmeasure['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp'] + addmeasure['startEpochMicrosec'] = startEpochMicrosec + addmeasure['lastEpochMicrosec'] = lastEpochMicrosec + es.index(index='measurementaddlmeasurements', body=addmeasure) + + if 'cpuUsageArray' in jobj['event']['measurementFields']: + for cpu in jobj['event']['measurementFields']['cpuUsageArray']: + cpu['eventId'] = eventId + cpu['sourceId'] = sourceId + cpu['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp'] + cpu['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp'] + cpu['startEpochMicrosec'] = startEpochMicrosec + cpu['lastEpochMicrosec'] = lastEpochMicrosec + es.index(index='measurementcpuusage', body=cpu) + + if 'diskUsageArray' in jobj['event']['measurementFields']: + for disk in jobj['event']['measurementFields']['diskUsageArray']: + disk['eventId'] = eventId + disk['sourceId'] = sourceId + disk['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp'] + disk['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp'] + disk['startEpochMicrosec'] = startEpochMicrosec + disk['lastEpochMicrosec'] = lastEpochMicrosec + es.index(index='measurementdiskusage', body=disk) + + if 'nicPerformanceArray' in jobj['event']['measurementFields']: + for vnic in jobj['event']['measurementFields']['nicPerformanceArray']: + vnic['eventId'] = eventId + vnic['sourceId'] = sourceId + vnic['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp'] + vnic['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp'] + vnic['startEpochMicrosec'] = startEpochMicrosec + vnic['lastEpochMicrosec'] = lastEpochMicrosec + es.index(index='measurementnicperformance', body=vnic) + + if 'memoryUsageArray' in jobj['event']['measurementFields']: + for memory in jobj['event']['measurementFields']['memoryUsageArray']: + memory['eventId'] = eventId + memory['sourceId'] = sourceId + memory['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp'] + memory['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp'] + memory['startEpochMicrosec'] = startEpochMicrosec + memory['lastEpochMicrosec'] = lastEpochMicrosec + es.index(index='measurementmemoryusage', body=memory) + + if 'loadArray' in jobj['event']['measurementFields']: + for load in jobj['event']['measurementFields']['loadArray']: + load['eventId'] = eventId + load['sourceId'] = sourceId + load['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp'] + load['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp'] + load['startEpochMicrosec'] = startEpochMicrosec + load['lastEpochMicrosec'] = lastEpochMicrosec + es.index(index='measurementload', body=load) + + if 'networkSliceArray' in jobj['event']['measurementFields']: + for networkslice in jobj['event']['measurementFields']['networkSliceArray']: + networkslice['eventId'] = eventId + networkslice['sourceId'] = sourceId + networkslice['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp'] + networkslice['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp'] + networkslice['startEpochMicrosec'] = startEpochMicrosec + networkslice['lastEpochMicrosec'] = lastEpochMicrosec + es.index(index='measurementnetworkslice', body=networkslice) + + if 'pnfRegistrationFields' in jobj['event']: + es.index(index=domain, body=jobj['event']) + if 'thresholdCrossingAlertFields' in jobj['event']: + es.index(index=domain, body=jobj['event']) + if 'faultFields' in jobj['event']: + es.index(index=domain, body=jobj['event']) + if 'heartbeatFields' in jobj['event']: + es.index(index=domain, body=jobj['event']) + def process_additional_measurements(val, domain, eventId, startEpochMicrosec, lastEpochMicrosec): for additionalMeasurements in val: @@ -333,7 +482,8 @@ def process_thresholdCrossingAlert_event(domain, jobj, pdata, nonstringpdata): for associatedAlertId in val: associatedAlertIdList = associatedAlertIdList + associatedAlertId + "|" if(associatedAlertIdList != ""): - pdata = pdata + ',{}={}'.format("associatedAlertIdList", process_special_char(associatedAlertIdList)[:-1]) + pdata = pdata + ',{}={}'.format("associatedAlertIdList", + process_special_char(associatedAlertIdList)[:-1]) send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp)) @@ -382,41 +532,19 @@ def process_measurement_events(domain, jobj, pdata, nonstringpdata, eventId, sta pdata = pdata + ',{}={}'.format(key, process_special_char(val)) elif isinstance(val, list): if key == 'additionalMeasurements': - process_additional_measurements(val, - domain + "additionalmeasurements", - eventId, - startEpochMicrosec, - lastEpochMicrosec) + process_additional_measurements(val, domain + "additionalmeasurements", eventId, startEpochMicrosec, lastEpochMicrosec) elif key == 'cpuUsageArray': - process_nonadditional_measurements(val, - domain + "cpuusage", - eventId, - startEpochMicrosec, - lastEpochMicrosec) + process_nonadditional_measurements(val, domain + "cpuusage", eventId, startEpochMicrosec, lastEpochMicrosec) elif key == 'diskUsageArray': - process_nonadditional_measurements(val, - domain + "diskusage", - eventId, - startEpochMicrosec, - lastEpochMicrosec) + process_nonadditional_measurements(val, domain + "diskusage", eventId, startEpochMicrosec, lastEpochMicrosec) elif key == 'memoryUsageArray': - process_nonadditional_measurements(val, - domain + "memoryusage", - eventId, - startEpochMicrosec, - lastEpochMicrosec) + process_nonadditional_measurements(val, domain + "memoryusage", eventId, startEpochMicrosec, lastEpochMicrosec) elif key == 'nicPerformanceArray': - process_nonadditional_measurements(val, - domain + "nicperformance", - eventId, - startEpochMicrosec, - lastEpochMicrosec) + process_nonadditional_measurements(val, domain + "nicperformance", eventId, startEpochMicrosec, lastEpochMicrosec) elif key == 'loadArray': - process_nonadditional_measurements(val, - domain + "load", - eventId, - startEpochMicrosec, - lastEpochMicrosec) + process_nonadditional_measurements(val, domain + "load", eventId, startEpochMicrosec, lastEpochMicrosec) + elif key == 'networkSliceArray': + process_nonadditional_measurements(val, domain + "networkslice", eventId, startEpochMicrosec, lastEpochMicrosec) elif isinstance(val, dict): for key2, val2 in val.items(): if isinstance(val2, str): @@ -441,17 +569,16 @@ def process_time(eventTimestamp): eventTimestamp = eventTimestamp + "0" return format(int(eventTimestamp)) + # -------------------------------------------------------------------------- # Save event data # -------------------------------------------------------------------------- - - def save_event_in_db(body): - jobj = json.loads(body) global source global eventTimestamp - source = "unknown" + jobj = json.loads(body) + source = "unknown" domain = jobj['event']['commonEventHeader']['domain'] eventTimestamp = jobj['event']['commonEventHeader']['startEpochMicrosec'] agent = jobj['event']['commonEventHeader']['reportingEntityName'].upper() @@ -474,7 +601,7 @@ def save_event_in_db(body): for key2, val2 in val.items(): if val2 != "": if isinstance(val2, str): - pdata = pdata +',{}={}'.format(key2, process_special_char(val2)) + pdata = pdata + ',{}={}'.format(key2, process_special_char(val2)) else: nonstringpdata = nonstringpdata + '{}={}'.format(key2, val2) + ',' @@ -497,10 +624,7 @@ def save_event_in_db(body): # processing fault events if 'faultFields' in jobj['event']: logger.debug('Found faultFields') - process_fault_event(domain, - jobj['event']['faultFields'], - pdata, - nonstringpdata) + process_fault_event(domain, jobj['event']['faultFields'], pdata, nonstringpdata) # process heartbeat events if 'heartbeatFields' in jobj['event']: @@ -515,7 +639,8 @@ def save_event_in_db(body): logger.debug('Found measurementFields') process_measurement_events(domain, jobj['event']['measurementFields'], - pdata, nonstringpdata, + pdata, + nonstringpdata, jobj['event']['commonEventHeader']['eventId'], jobj['event']['commonEventHeader']['startEpochMicrosec'], jobj['event']['commonEventHeader']['lastEpochMicrosec']) @@ -714,15 +839,30 @@ USAGE global vel_password global vel_topic_name global data_storage + global elasticsearch_domain + global elasticsearch_port + global kafka_port + global kafka_topic influxdb = config.get(config_section, 'influxdb', vars=overrides) log_file = config.get(config_section, 'log_file', vars=overrides) vel_port = config.get(config_section, 'vel_port', vars=overrides) vel_path = config.get(config_section, 'vel_path', vars=overrides) + kafka_port = config.get(config_section, + 'kafka_second_port', + vars=overrides) + kafka_topic = config.get(config_section, + 'kafka_topic', + vars=overrides) data_storage = config.get(config_section, 'data_storage', vars=overrides) - + elasticsearch_domain = config.get(config_section, + 'elasticsearch_domain', + vars=overrides) + elasticsearch_port = config.get(config_section, + 'elasticsearch_port', + vars=overrides) vel_topic_name = config.get(config_section, 'vel_topic_name', vars=overrides) @@ -742,7 +882,7 @@ USAGE 'throttle_schema_file', vars=overrides) test_control_schema_file = config.get(config_section, - 'test_control_schema_file', + 'test_control_schema_file', vars=overrides) # ---------------------------------------------------------------------- @@ -769,9 +909,9 @@ USAGE logger.addHandler(handler) logger.info('Started') - # ---------------------------------------------------------------------- + # --------------------------------------------------------------------- # Log the details of the configuration. - # ---------------------------------------------------------------------- + # --------------------------------------------------------------------- logger.debug('Log file = {0}'.format(log_file)) logger.debug('Influxdb server = {0}'.format(influxdb)) logger.debug('Event Listener Port = {0}'.format(vel_port)) @@ -790,6 +930,7 @@ USAGE # ---------------------------------------------------------------------- # Perform some basic error checking on the config. # ---------------------------------------------------------------------- + if (int(vel_port) < 1024 or int(vel_port) > 65535): logger.error('Invalid Vendor Event Listener port ({0}) ' 'specified'.format(vel_port)) diff --git a/collector/evel-test-collector/config/collector.conf b/collector/evel-test-collector/config/collector.conf old mode 100644 new mode 100755 index 5f05f50..83cfd77 --- a/collector/evel-test-collector/config/collector.conf +++ b/collector/evel-test-collector/config/collector.conf @@ -15,7 +15,7 @@ #------------------------------------------------------------------------------ [default] log_file = collector.log -schema_file = evel-test-collector/docs/att_interface_definition/CommonEventFormat_30.2.1_ONAP.json +schema_file = evel-test-collector/docs/att_interface_definition/CommonEventFormat-v7-2-2.json base_schema_file = evel-test-collector/docs/att_interface_definition/base_schema.json throttle_schema_file = evel-test-collector/docs/att_interface_definition/throttle_schema.json test_control_schema_file = evel-test-collector/docs/att_interface_definition/test_control_schema.json @@ -40,12 +40,15 @@ test_control_schema_file = evel-test-collector/docs/att_interface_definition/tes #------------------------------------------------------------------------------ vel_domain = 127.0.0.1 vel_port = 9999 -vel_path = -vel_username = -vel_password = +vel_path = +vel_username = +vel_password = vel_topic_name = events -data_storage = influxdb - +data_storage = +elasticsearch_domain = +elasticsearch_port= 9200 +kafka_second_port = +kafka_topic = #------------------------------------------------------------------------------ # Settings to be used when running in a windows test environment rather than @@ -79,7 +82,7 @@ test_control_schema_file = ../../docs/att_interface_definition/test_control_sche vel_domain = 127.0.0.1 vel_port = 9999 vel_path = -vel_username = user user user user -vel_password = password password password password +vel_username = +vel_password = vel_topic_name = diff --git a/collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat_30.2.1_ONAP.json b/collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat-v7-2-2.json similarity index 99% rename from collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat_30.2.1_ONAP.json rename to collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat-v7-2-2.json index 6ee76cb..24701ac 100644 --- a/collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat_30.2.1_ONAP.json +++ b/collector/evel-test-collector/docs/att_interface_definition/CommonEventFormat-v7-2-2.json @@ -245,6 +245,28 @@ "thresholdCrossed" ] }, + "networkSlice": { + "description": "PM counters for network slicing", + "type": "object", + "properties": { + "networkSliceIdentifier": { + "description": "network slicing identifier", + "type": "string" + }, + "DRB.UEThpDl.SNSSAI": { + "description": "UE throughput in the downlink direction for each slice", + "type": "number", + "minimum": 0, + "maximum": 4294967295 + }, + "DRB.UEThpUl.SNSSAI": { + "description": "UE throughput in the uplink direction for each slice", + "type": "number", + "minimum": 0, + "maximum": 4294967295 + } + } + }, "cpuUsage": { "description": "usage of an identified CPU", "type": "object", @@ -1851,6 +1873,13 @@ "description": "over the measurementInterval, peak total number of: users, subscribers, devices, adjacencies, etc., for the VM, or subscribers, devices, etc., for the xNF", "type": "integer" }, + "networkSliceArray": { + "description": "usage of an array of network slicing", + "type": "array", + "items": { + "$ref": "#/definitions/networkSlice" + } + }, "cpuUsageArray": { "description": "usage of an array of CPUs", "type": "array", diff --git a/collector/evel-test-collector/docs/att_interface_definition/README.md b/collector/evel-test-collector/docs/att_interface_definition/README.md old mode 100644 new mode 100755 index 552990e..7d8ad61 --- a/collector/evel-test-collector/docs/att_interface_definition/README.md +++ b/collector/evel-test-collector/docs/att_interface_definition/README.md @@ -1,3 +1,2 @@ -NOTE: This folder contains updates for the VES 7.0 release. -* CommonEventFormat_30.2.1_ONAP.json - +NOTE: This folder contains updates for the VES 7.0 release. +* CommonEventFormat-v7-2-2.json diff --git a/collector/start.sh b/collector/start.sh index 534c220..1394cb6 100755 --- a/collector/start.sh +++ b/collector/start.sh @@ -1,6 +1,5 @@ #!/bin/bash -# Original work Copyright 2017-2018 AT&T Intellectual Property, Inc -# Modified work Copyright 2021 Xoriant Corporation +# Copyright 2017-2018 AT&T Intellectual Property, Inc # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,32 +13,54 @@ # See the License for the specific language governing permissions and # limitations under the License. # -# What this is: Startup script for the VES Collector running under docker. +#. What this is: Startup script for the OPNFV VES Collector running under docker. # the variables used below are now passed in as environmental variables # from the docker run command. cd /opt/ves touch monitor.log +config_file="evel-test-collector/config/collector.conf" +schema_file=`grep -w schema_file $config_file | head -1 | cut -d "=" -f 2 | xargs` + +echo "schema_file = " $schema_file > monitor.log +if [ "$schema_file" != "" ]; then + if ! [ -e $schema_file ]; then + echo "Schema file does not exists!" >> monitor.log + exit + fi +else + echo "Schema file path is missing in config file!" >> monitor.log + exit +fi + sed -i -- \ "s~log_file = /var/log/att/collector.log~log_file = /opt/ves/collector.log~" \ - evel-test-collector/config/collector.conf + $config_file sed -i -- "s/vel_domain = 127.0.0.1/vel_domain = $ves_host/g" \ - evel-test-collector/config/collector.conf + $config_file sed -i -- "s/vel_port = 30000/vel_port = $ves_port/g" \ - evel-test-collector/config/collector.conf + $config_file sed -i -- "s/vel_username =/vel_username = $ves_user/g" \ - evel-test-collector/config/collector.conf + $config_file sed -i -- "s/vel_password =/vel_password = $ves_pass/g" \ - evel-test-collector/config/collector.conf + $config_file sed -i -- "s~vel_path = vendor_event_listener/~vel_path = $ves_path~g" \ - evel-test-collector/config/collector.conf + $config_file sed -i -- "s~vel_topic_name = example_vnf~vel_topic_name = $ves_topic~g" \ - evel-test-collector/config/collector.conf + $config_file sed -i -- "/vel_topic_name = /a influxdb = $ves_influxdb_host:$ves_influxdb_port" \ - evel-test-collector/config/collector.conf + $config_file +sed -i -- "s/elasticsearch_domain =/elasticsearch_domain = $elasticsearch_domain/g" \ + $config_file +sed -i -- "s/data_storage =/data_storage = $data_storage/g" \ + $config_file +sed -i -- "s/kafka_second_port =/kafka_second_port = $kafka_host_2:$kafka_port_2/g" \ + $config_file +sed -i -- "s/kafka_topic =/kafka_topic = $kafka_topic/g" \ + $config_file -echo; echo "evel-test-collector/config/collector.conf" -cat evel-test-collector/config/collector.conf +echo; echo $config_file +cat $config_file echo; echo "wait for InfluxDB API at $ves_influxdb_host:$ves_influxdb_port" while ! curl http://$ves_influxdb_host:$ves_influxdb_port/ping ; do diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..5ec3fee --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,173 @@ +# Copyright 2021 Xoriant Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +version: '3' +services: + ves-influxdb: + container_name: ves-influxdb + image: influxdb:1.8.5 + ports: + - 8086:8086 + networks: + - ves-net + ves-grafana: + container_name: ves-grafana + image: grafana/grafana:7.5.11 + ports: + - 8880:3000 + networks: + - ves-net + ves-zookeeper: + container_name: ves-zookeeper + image: confluentinc/cp-zookeeper:5.5.6 + networks: + - ves-net + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + ves-kafka: + container_name: ves-kafka + image: confluentinc/cp-kafka:5.5.6 + networks: + - ves-net + depends_on: + - ves-zookeeper + ports: + - 9092:9092 + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: ves-zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://ves-kafka:9092,PLAINTEXT_HOST://localhost:19092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + smo-zookeeper: + container_name: smo-zookeeper + image: confluentinc/cp-zookeeper:5.5.6 + networks: + - ves-net + environment: + ZOOKEEPER_CLIENT_PORT: 22181 + ZOOKEEPER_TICK_TIME: 2000 + smo-kafka: + container_name: smo-kafka + image: confluentinc/cp-kafka:5.5.6 + networks: + - ves-net + depends_on: + - smo-zookeeper + ports: + - 29092:29092 + environment: + KAFKA_BROKER_ID: 2 + KAFKA_ZOOKEEPER_CONNECT: smo-zookeeper:22181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://smo-kafka:29092,PLAINTEXT_HOST://localhost:39092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + ves-kafdrop: + container_name: ves-kafdrop + image: obsidiandynamics/kafdrop:3.27.0 + networks: + - ves-net + depends_on: + - ves-kafka + ports: + - 9000:9000 + environment: + KAFKA_BROKERCONNECT: ves-kafka:9092 + smo-kafdrop: + container_name: smo-kafdrop + image: obsidiandynamics/kafdrop:3.27.0 + networks: + - ves-net + depends_on: + - smo-kafka + ports: + - 29000:9000 + environment: + KAFKA_BROKERCONNECT: smo-kafka:29092 + ves-collector: + container_name: ves-collector + build: ./collector + image: ves-collector + networks: + - ves-net + ports: + - 9999:9999 + environment: + ves_influxdb_host: "ves-influxdb" + ves_influxdb_port: "8086" + ves_grafana_host: "ves-grafana" + ves_grafana_port: "3000" + data_storage: "influxdb|elasticsearch" + elasticsearch_domain: "ves-elasticsearch" + kafka_host_2: "smo-kafka" + kafka_port_2: "29092" + kafka_topic: "smo-events" + ves_host: "ves-collector" + ves_port: "9999" + ves_grafana_auth: "admin:admin" + ves_user: "user" + ves_pass: "password" + ves_path: "" + ves_topic: "events" + ves_loglevel: "ERROR" + depends_on: + - ves-kafka + - smo-kafka + - ves-influxdb + - ves-grafana + ves-agent: + container_name: ves-agent + build: ./agent + image: ves-agent + networks: + - ves-net + restart: always + environment: + ves_kafka_host: "ves-kafka" + ves_kafka_hostname: "ves-kafka" + ves_host: "ves-collector" + ves_port: "9999" + ves_path: "" + ves_topic: "events" + ves_https: "False" + ves_user: "user" + ves_pass: "password" + ves_interval: "10" + ves_kafka_port: "9092" + ves_mode: "./yaml/host" + ves_version: "5" + ves_loglevel: "ERROR" + depends_on: + - ves-kafka + - ves-collector + ves-elasticsearch: + image: docker.elastic.co/elasticsearch/elasticsearch:7.11.1 + container_name: ves-elasticsearch + restart: always + environment: + discovery.type: "single-node" + ES_JAVA_OPTS: "-Xms1024m -Xmx1024m" + ports: + - 9200:9200 + - 9300:9300 + networks: + - ves-net + +networks: + ves-net: + driver: bridge diff --git a/ves-start.sh b/ves-start.sh deleted file mode 100755 index b5ef2c6..0000000 --- a/ves-start.sh +++ /dev/null @@ -1,201 +0,0 @@ -#!/bin/bash -# Copyright 2021 Xoriant Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# Script to build ves project and its dependent containers -# Maintainer shrinivas.joshi@xoriant.com - -# List of containers for this project - -# ves-kafka -- kafka broker to store events recieved from collectd or other similar services -# ves-agent -- read events forom kafka and send those events to VEL port on ves-collector container -# ves-collector -- Read the event received from ves-agent and write it to influxdb -# grafana -- Read the events written by ves-collector in influxdb and show the graphs on UI -# influxdb -- Store the events in DB sent by ves-agent -# kafdrop -- UI for Kafka - -# Stop all containers if those are running accedently. - -./ves-stop.sh - -# Port allotment on host system for the micro services running in docker. - -influx_port=3330 -grafana_port=8880 -kafka_port=9092 -kafdrop_port=9000 -zookeeper_port=2181 -vel_ves_port=9999 - -OS=`uname -s` -# Check Docker, collectd, ip and git is installed on the VM - -if ! which docker > /dev/null; then - echo -e "Docker not found, please install docker from https://docs.docker.com/engine/install/ubuntu\n" - exit; -fi - -if ! which collectd > /dev/null; then - if [ $OS = 'Darwin' ] - then - echo -e "Collectd not found, please install collectd using brew install collectd\n" - elif [ $OS = 'Linux' ] - then - echo -e "Collectd not found, please install collectd using sudo apt-get install -y collectd\n" - else - echo -e "Could not determine kind of system. Collectd not found, please install collectd using whatever method works.\n" - fi - exit; -fi - -if ! which ip > /dev/null; then - if [ $OS = 'Darwin' ] - then - echo -e "ip not found, please install ip using brew install ip.\n" - elif [ $OS = 'Linux' ] - then - echo -e "/sbin/ip not found, please install ip using sudo apt-get install ip.\n" - else - echo -e "Could not determine kind of system. ip not found, please install ip using whatever method works.\n" - exit 1 - fi - exit; -fi - -clear - -#get local ip address of VM from first interface -if [ $OS = 'Darwin' ] -then - local_ip=`ip -4 addr list | grep en11 | grep inet | awk '{print $2}' | cut -d/ -f1` -elif [ $OS = 'Linux' ] -then - local_ip=`/sbin/ip -o -4 addr list | grep enp | head -n 1 | awk '{print $4}' | cut -d/ -f1` -else - echo -e "Could not determine which OS this.\n" - exit 1 -fi -echo -e "Binding VES Services to local ip address $local_ip \n " -echo "" -echo -e "--------------------------------------------------------------------\n" -#Spin influx DB -echo -e "Starting influxdb container on Local Port Number $influx_port. Please wait..\n" -docker run -d -p $influx_port:8086 -v $PWD/influxdb influxdb:1.8.5 -if [ $? != 0 ] -then - exit 1 -fi - -sleep 5 #Give some time to spin the container and bring service up -echo "Done." -echo"" -echo -e "--------------------------------------------------------------------\n" -#Spin Grafana Cotainer -echo -e "Starting Grafana cotainer on Local port number $grafana_port. Please wait..\n" -docker run -d -p $grafana_port:3000 grafana/grafana -if [ $? != 0 ] -then - exit 1 -fi -sleep 5 #Give some time to spin the container and bring service up -echo "Done." -echo "" -echo -e "--------------------------------------------------------------------\n" -#Spin zookeeper container -echo -e "Starting zookeeper container on Local port number $zookeeper_port. Please wait..\n" -docker run -d --add-host mykafka:$local_ip --add-host myzoo:$local_ip \ - -p $zookeeper_port:2181 -p 2888:2888 -p 3888:3888 \ - -p 8800:8080 zookeeper -if [ $? != 0 ] -then - exit 1 -fi -sleep 5 -echo "Done." -echo "" -echo -e "--------------------------------------------------------------------\n" -#Spin Kafka container. -echo -e "Starting Kafka container on Local port number $kafka_port. Please wait..\n" -docker run -d --add-host mykafka:$local_ip -e zookeeper_host=$local_ip \ - -e zookeeper_hostname='myzoo' -e zookeeper_port=$zookeeper_port \ - -e kafka_hostname='mykafka' -e kafka_port=$kafka_port \ - -p $kafka_port:$kafka_port ves-kafka -if [ $? != 0 ] -then - exit 1 -fi -sleep 7 -echo "Done." -echo "" -echo -e "--------------------------------------------------------------------\n" -#Spin Kafdrop UI container (this is optional componant) -echo -e "Starting kafdrop UI container on Local port numner $kafdrop_port. please wait..\n" -docker run -d --add-host mykafka:$local_ip -p $kafdrop_port:9000 \ - -e KAFKA_BROKERCONNECT=$local_ip:$kafka_port \ - -e JVM_OPTS="-Xms64M -Xmx128M" obsidiandynamics/kafdrop:latest -if [ $? != 0 ] -then - exit 1 -fi -sleep 5 -echo "Done." -echo "" -echo -e "--------------------------------------------------------------------\n" -# Spin ves-collector container. -echo -e "Starting ves collector container on Local port number $vel_ves_port. Please wait\n" -docker run -d -e ves_influxdb_host=$local_ip \ - -e ves_influxdb_port=$influx_port -e ves_grafana_host=$local_ip \ - -e ves_grafana_port=$grafana_port -e ves_host='localhost' \ - -e ves_port=$vel_ves_port -e ves_grafana_auth='admin:admin' \ - -e ves_user='user' -e ves_pass='password' -e ves_path=''\ - -e ves_topic='events' -e ves_loglevel='DEBUG' \ - -p $vel_ves_port:$vel_ves_port ves-collector -if [ $? != 0 ] -then - exit 1 -fi -sleep 6 -echo "Done." -echo "" -echo -e "--------------------------------------------------------------------\n" -#Spin ves agent container. -echo -e "Starting ves agent container. Please wait\n" -docker run -d -e ves_kafka_host=$local_ip \ - -e ves_kafka_hostname='mykafka' -e ves_host=$local_ip \ - -e ves_port=$vel_ves_port -e ves_path='' \ - -e ves_topic='events' -e ves_https='False' -e ves_user='user' \ - -e ves_pass='password' -e ves_interval='10' \ - -e ves_kafka_port=$kafka_port -e ves_mode='./yaml/host' \ - -e ves_version='7' -e ves_loglevel='DEBUG' ves-agent -if [ $? != 0 ] -then - exit 1 -fi -sleep 5 -echo "Done." -echo "" -echo -e "--------------------------------------------------------------------\n" -echo"" -echo -e "ves stack summary\n" -echo -e "===================================================================================================================\n" -echo "" -echo -e "Kafka port: $kafka_port \n" -echo -e "Kafdrop port: $kafdrop_port \n" -echo -e "ves collector listner port: $vel_ves_port \n" -echo -e "Grafana port: $grafana_port \n" -echo -e "To access kafdrop UI use http://$local_ip:$kafdrop_port from your web browser. \n" -echo -e "To access grafana dashboard paste url http://$local_ip:$grafana_port in web browser. " -echo -e "Grafana username/password is admin/admin *** DO NOT CHANGE THE ADMIN PASSWORD, CLICK SKIP OPTION ***\n" -echo "" -echo -e "===================================================================================================================\n" diff --git a/ves-stop.sh b/ves-stop.sh deleted file mode 100755 index 210072a..0000000 --- a/ves-stop.sh +++ /dev/null @@ -1,18 +0,0 @@ -#!/bin/bash -# Copyright 2021 Xoriant Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -echo "Stopping all containers" -docker stop $(docker ps -aq)