import jsonschema
from functools import partial
import requests
+from datetime import timezone
+from elasticsearch import Elasticsearch
+from kafka import KafkaProducer
+from json import dumps
+import datetime
import time
monitor_mode = "f"
class JSONObject:
+
def __init__(self, d):
self.__dict__ = d
# Logger for this module.
# ------------------------------------------------------------------------------
logger = None
+producer = None
def listener(environ, start_response, schema):
logger.debug('Content Body: {0}'.format(body))
mode, b64_credentials = str.split(environ.get('HTTP_AUTHORIZATION',
- 'None None'))
+ 'None None'))
logger.debug('Auth. Mode: {0} Credentials: ****'.format(mode))
if (b64_credentials != 'None'):
credentials = b64decode(b64_credentials)
# --------------------------------------------------------------------------
# See whether the user authenticated themselves correctly.
# --------------------------------------------------------------------------
- if (credentials == bytes((vel_username
- + ':' + vel_password), 'utf-8')):
+ if (credentials == bytes((vel_username + ':' + vel_password),
+ 'utf-8')):
logger.info('Authenticated OK')
# ----------------------------------------------------------------------
response = pending_command_list
pending_command_list = None
- logger.debug('\n' + '='*80)
+ logger.debug('\n' + '=' * 80)
logger.debug('Sending pending commandList in the response:\n'
'{0}'.format(json.dumps(response,
sort_keys=True,
indent=4,
separators=(',', ': '))))
- logger.debug('='*80 + '\n')
+ logger.debug('=' * 80 + '\n')
yield json.dumps(response).encode()
else:
start_response('202 Accepted', [])
yield ''.encode()
else:
- logger.warn('Failed to authenticate OK; creds: '
- + credentials)
+            logger.warn('Failed to authenticate OK; creds: {0}'.format(credentials))
logger.warn('Failed to authenticate agent credentials: ',
credentials,
'against expected ',
# Respond to the caller.
# ----------------------------------------------------------------------
start_response('401 Unauthorized', [('Content-type',
- 'application/json')])
+ 'application/json')])
req_error = {'requestError': {
'policyException': {
- 'messageId': 'POL0001',
- 'text': 'Failed to authenticate'
- }
- }
- }
+ 'messageId': 'POL0001',
+ 'text': 'Failed to authenticate'
+ }
+ }
+ }
yield json.dumps(req_error)
+            # Save the event to Kafka by default
+ save_event_in_kafka(body)
+
logger.info("data_storage ={}".format(data_storage))
- if(data_storage == 'influxdb'):
+ data_storageArr = data_storage.split("|")
+            # If 'elasticsearch' is listed in data_storage, save the event in Elasticsearch
+ if('elasticsearch' in data_storageArr):
+ save_event_in_elasticsearch(body)
+            # If 'influxdb' is listed in data_storage, save the event in InfluxDB
+ if('influxdb' in data_storageArr):
save_event_in_db(body)
except jsonschema.SchemaError as e:
decoded_body = json.loads(body)
logger.warn('Valid JSON body (no schema checking) decoded:\n'
'{0}'.format(json.dumps(decoded_body,
- sort_keys=True,
- indent=4,
- separators=(',', ': '))))
+ sort_keys=True,
+ indent=4,
+ separators=(',', ': '))))
logger.warn('Event is valid JSON but not checked against schema!')
except Exception as e:
# --------------------------------------------------------------------------
# Send event to influxdb
# --------------------------------------------------------------------------
+
def send_to_influxdb(event, pdata):
url = 'http://{}/write?db=veseventsdb'.format(influxdb)
- logger.info('Send {} to influxdb at {}: {}'.format(event, influxdb, pdata))
+ logger.debug('Send {} to influxdb at {}: {}'.format(event, influxdb, pdata))
r = requests.post(url, data=pdata, headers={'Content-Type': 'text/plain'})
logger.info('influxdb return code {}'.format(r.status_code))
if r.status_code != 204:
logger.debug('*** Influxdb save failed, return code {} ***'.format(r.status_code))
+# --------------------------------------------------------------------------
+# Save event data in Kafka
+# --------------------------------------------------------------------------
+def save_event_in_kafka(body):
+ jobj = json.loads(body)
+ if 'commonEventHeader' in jobj['event']:
+        # Publish each domain's events to a Kafka topic named after the domain
+ topic = jobj['event']['commonEventHeader']['domain'].lower()
+ logger.info('Got an event request for {} domain'.format(topic))
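+        # Fall back to the configured default topic when the domain is empty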
+ if (len(topic) == 0):
+ topic = kafka_topic
+
+        logger.debug('Kafka broker = {} and Kafka topic = {}'.format(kafka_port, topic))
+ produce_events_in_kafka(jobj, topic)
+
+
+def produce_events_in_kafka(jobj, topic):
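+    # Reuse a single module-level KafkaProducer (created on first use) that
+    # JSON-serialises each event before sending it to the topic.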
+ try:
+ global producer
+ if producer is None:
+ logger.debug('Producer is None')
+ producer = KafkaProducer(bootstrap_servers=[kafka_port],
+ value_serializer=lambda x:
+ dumps(x).encode('utf-8'))
+ producer.send(topic, value=jobj)
+        logger.debug('Event has been successfully posted to the Kafka bus')
+ except Exception as e:
+        logger.error('Error while posting event to the Kafka bus: {0}'.format(e))
+
+# --------------------------------------------------------------------------
+# Save event data in Elasticsearch
+# --------------------------------------------------------------------------
+
+def format_timestamp(EpochMicrosec):
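+    # Integers are truncated to their first 10 digits (the whole-second part of
+    # an epoch-microsecond value); other values are passed to fromtimestamp()
+    # as seconds. The result is a timezone-aware UTC datetime.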
+ if isinstance(EpochMicrosec, int):
+ return datetime.datetime.fromtimestamp(int(str(EpochMicrosec)[:10]), tz=timezone.utc)
+ else:
+ return datetime.datetime.fromtimestamp(EpochMicrosec, tz=timezone.utc)
+
+
+def save_event_in_elasticsearch(body):
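+    # Extract the identifiers and timestamps from commonEventHeader so they can
+    # be copied onto every sub-record indexed below.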
+ jobj = json.loads(body)
+ eventId = jobj['event']['commonEventHeader']['eventId']
+ sourceId = jobj['event']['commonEventHeader']['sourceId']
+ startEpochMicrosec = jobj['event']['commonEventHeader']['startEpochMicrosec']
+ lastEpochMicrosec = jobj['event']['commonEventHeader']['lastEpochMicrosec']
+ jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp'] = format_timestamp(startEpochMicrosec)
+ jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp'] = format_timestamp(lastEpochMicrosec)
+ domain = jobj['event']['commonEventHeader']['domain'].lower()
+ es = Elasticsearch([{'host': elasticsearch_domain, 'port': elasticsearch_port}])
+
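+    # Measurement events are split up: the common header goes into the
+    # 'measurement' index and each per-resource array is indexed separately.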
+ if 'measurementFields' in jobj['event']:
+ if 'commonEventHeader' in jobj['event']:
+ es.index(index='measurement', body=jobj['event']['commonEventHeader'])
+
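+        # Copy the event identifiers and timestamps onto each additional
+        # measurement before indexing it individually.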
+ if 'additionalMeasurements' in jobj['event']['measurementFields']:
+ for addmeasure in jobj['event']['measurementFields']['additionalMeasurements']:
+ addmeasure['eventId'] = eventId
+ addmeasure['sourceId'] = sourceId
+ addmeasure['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp']
+ addmeasure['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp']
+ addmeasure['startEpochMicrosec'] = startEpochMicrosec
+ addmeasure['lastEpochMicrosec'] = lastEpochMicrosec
+ es.index(index='measurementaddlmeasurements', body=addmeasure)
+
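+        # The remaining per-resource arrays (CPU, disk, NIC, memory, load and
+        # network slice) are enriched the same way and indexed into their own
+        # per-metric indices.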
+ if 'cpuUsageArray' in jobj['event']['measurementFields']:
+ for cpu in jobj['event']['measurementFields']['cpuUsageArray']:
+ cpu['eventId'] = eventId
+ cpu['sourceId'] = sourceId
+ cpu['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp']
+ cpu['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp']
+ cpu['startEpochMicrosec'] = startEpochMicrosec
+ cpu['lastEpochMicrosec'] = lastEpochMicrosec
+ es.index(index='measurementcpuusage', body=cpu)
+
+ if 'diskUsageArray' in jobj['event']['measurementFields']:
+ for disk in jobj['event']['measurementFields']['diskUsageArray']:
+ disk['eventId'] = eventId
+ disk['sourceId'] = sourceId
+ disk['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp']
+ disk['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp']
+ disk['startEpochMicrosec'] = startEpochMicrosec
+ disk['lastEpochMicrosec'] = lastEpochMicrosec
+ es.index(index='measurementdiskusage', body=disk)
+
+ if 'nicPerformanceArray' in jobj['event']['measurementFields']:
+ for vnic in jobj['event']['measurementFields']['nicPerformanceArray']:
+ vnic['eventId'] = eventId
+ vnic['sourceId'] = sourceId
+ vnic['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp']
+ vnic['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp']
+ vnic['startEpochMicrosec'] = startEpochMicrosec
+ vnic['lastEpochMicrosec'] = lastEpochMicrosec
+ es.index(index='measurementnicperformance', body=vnic)
+
+ if 'memoryUsageArray' in jobj['event']['measurementFields']:
+ for memory in jobj['event']['measurementFields']['memoryUsageArray']:
+ memory['eventId'] = eventId
+ memory['sourceId'] = sourceId
+ memory['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp']
+ memory['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp']
+ memory['startEpochMicrosec'] = startEpochMicrosec
+ memory['lastEpochMicrosec'] = lastEpochMicrosec
+ es.index(index='measurementmemoryusage', body=memory)
+
+ if 'loadArray' in jobj['event']['measurementFields']:
+ for load in jobj['event']['measurementFields']['loadArray']:
+ load['eventId'] = eventId
+ load['sourceId'] = sourceId
+ load['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp']
+ load['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp']
+ load['startEpochMicrosec'] = startEpochMicrosec
+ load['lastEpochMicrosec'] = lastEpochMicrosec
+ es.index(index='measurementload', body=load)
+
+ if 'networkSliceArray' in jobj['event']['measurementFields']:
+ for networkslice in jobj['event']['measurementFields']['networkSliceArray']:
+ networkslice['eventId'] = eventId
+ networkslice['sourceId'] = sourceId
+ networkslice['startEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['startEpochMicrosecTimestamp']
+ networkslice['lastEpochMicrosecTimestamp'] = jobj['event']['commonEventHeader']['lastEpochMicrosecTimestamp']
+ networkslice['startEpochMicrosec'] = startEpochMicrosec
+ networkslice['lastEpochMicrosec'] = lastEpochMicrosec
+ es.index(index='measurementnetworkslice', body=networkslice)
+
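+    # pnfRegistration, thresholdCrossingAlert, fault and heartbeat events are
+    # indexed whole, under an index named after their domain.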
+ if 'pnfRegistrationFields' in jobj['event']:
+ es.index(index=domain, body=jobj['event'])
+ if 'thresholdCrossingAlertFields' in jobj['event']:
+ es.index(index=domain, body=jobj['event'])
+ if 'faultFields' in jobj['event']:
+ es.index(index=domain, body=jobj['event'])
+ if 'heartbeatFields' in jobj['event']:
+ es.index(index=domain, body=jobj['event'])
+
def process_additional_measurements(val, domain, eventId, startEpochMicrosec, lastEpochMicrosec):
for additionalMeasurements in val:
for associatedAlertId in val:
associatedAlertIdList = associatedAlertIdList + associatedAlertId + "|"
if(associatedAlertIdList != ""):
- pdata = pdata + ',{}={}'.format("associatedAlertIdList", process_special_char(associatedAlertIdList)[:-1])
+ pdata = pdata + ',{}={}'.format("associatedAlertIdList",
+ process_special_char(associatedAlertIdList)[:-1])
send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
pdata = pdata + ',{}={}'.format(key, process_special_char(val))
elif isinstance(val, list):
if key == 'additionalMeasurements':
- process_additional_measurements(val,
- domain + "additionalmeasurements",
- eventId,
- startEpochMicrosec,
- lastEpochMicrosec)
+ process_additional_measurements(val, domain + "additionalmeasurements", eventId, startEpochMicrosec, lastEpochMicrosec)
elif key == 'cpuUsageArray':
- process_nonadditional_measurements(val,
- domain + "cpuusage",
- eventId,
- startEpochMicrosec,
- lastEpochMicrosec)
+ process_nonadditional_measurements(val, domain + "cpuusage", eventId, startEpochMicrosec, lastEpochMicrosec)
elif key == 'diskUsageArray':
- process_nonadditional_measurements(val,
- domain + "diskusage",
- eventId,
- startEpochMicrosec,
- lastEpochMicrosec)
+ process_nonadditional_measurements(val, domain + "diskusage", eventId, startEpochMicrosec, lastEpochMicrosec)
elif key == 'memoryUsageArray':
- process_nonadditional_measurements(val,
- domain + "memoryusage",
- eventId,
- startEpochMicrosec,
- lastEpochMicrosec)
+ process_nonadditional_measurements(val, domain + "memoryusage", eventId, startEpochMicrosec, lastEpochMicrosec)
elif key == 'nicPerformanceArray':
- process_nonadditional_measurements(val,
- domain + "nicperformance",
- eventId,
- startEpochMicrosec,
- lastEpochMicrosec)
+ process_nonadditional_measurements(val, domain + "nicperformance", eventId, startEpochMicrosec, lastEpochMicrosec)
elif key == 'loadArray':
- process_nonadditional_measurements(val,
- domain + "load",
- eventId,
- startEpochMicrosec,
- lastEpochMicrosec)
+ process_nonadditional_measurements(val, domain + "load", eventId, startEpochMicrosec, lastEpochMicrosec)
+ elif key == 'networkSliceArray':
+ process_nonadditional_measurements(val, domain + "networkslice", eventId, startEpochMicrosec, lastEpochMicrosec)
elif isinstance(val, dict):
for key2, val2 in val.items():
if isinstance(val2, str):
eventTimestamp = eventTimestamp + "0"
return format(int(eventTimestamp))
+
# --------------------------------------------------------------------------
# Save event data
# --------------------------------------------------------------------------
-
-
def save_event_in_db(body):
- jobj = json.loads(body)
global source
global eventTimestamp
- source = "unknown"
+ jobj = json.loads(body)
+ source = "unknown"
domain = jobj['event']['commonEventHeader']['domain']
eventTimestamp = jobj['event']['commonEventHeader']['startEpochMicrosec']
agent = jobj['event']['commonEventHeader']['reportingEntityName'].upper()
for key2, val2 in val.items():
if val2 != "":
if isinstance(val2, str):
- pdata = pdata +',{}={}'.format(key2, process_special_char(val2))
+ pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
else:
nonstringpdata = nonstringpdata + '{}={}'.format(key2, val2) + ','
# processing fault events
if 'faultFields' in jobj['event']:
logger.debug('Found faultFields')
- process_fault_event(domain,
- jobj['event']['faultFields'],
- pdata,
- nonstringpdata)
+ process_fault_event(domain, jobj['event']['faultFields'], pdata, nonstringpdata)
# process heartbeat events
if 'heartbeatFields' in jobj['event']:
logger.debug('Found measurementFields')
process_measurement_events(domain,
jobj['event']['measurementFields'],
- pdata, nonstringpdata,
+ pdata,
+ nonstringpdata,
jobj['event']['commonEventHeader']['eventId'],
jobj['event']['commonEventHeader']['startEpochMicrosec'],
jobj['event']['commonEventHeader']['lastEpochMicrosec'])
global vel_password
global vel_topic_name
global data_storage
+ global elasticsearch_domain
+ global elasticsearch_port
+ global kafka_port
+ global kafka_topic
influxdb = config.get(config_section, 'influxdb', vars=overrides)
log_file = config.get(config_section, 'log_file', vars=overrides)
vel_port = config.get(config_section, 'vel_port', vars=overrides)
vel_path = config.get(config_section, 'vel_path', vars=overrides)
+ kafka_port = config.get(config_section,
+ 'kafka_second_port',
+ vars=overrides)
+ kafka_topic = config.get(config_section,
+ 'kafka_topic',
+ vars=overrides)
data_storage = config.get(config_section,
'data_storage',
vars=overrides)
-
+ elasticsearch_domain = config.get(config_section,
+ 'elasticsearch_domain',
+ vars=overrides)
+ elasticsearch_port = config.get(config_section,
+ 'elasticsearch_port',
+ vars=overrides)
vel_topic_name = config.get(config_section,
'vel_topic_name',
vars=overrides)
'throttle_schema_file',
vars=overrides)
test_control_schema_file = config.get(config_section,
- 'test_control_schema_file',
+ 'test_control_schema_file',
vars=overrides)
# ----------------------------------------------------------------------
logger.addHandler(handler)
logger.info('Started')
- # ----------------------------------------------------------------------
+ # ---------------------------------------------------------------------
# Log the details of the configuration.
- # ----------------------------------------------------------------------
+ # ---------------------------------------------------------------------
logger.debug('Log file = {0}'.format(log_file))
logger.debug('Influxdb server = {0}'.format(influxdb))
logger.debug('Event Listener Port = {0}'.format(vel_port))
# ----------------------------------------------------------------------
# Perform some basic error checking on the config.
# ----------------------------------------------------------------------
+
if (int(vel_port) < 1024 or int(vel_port) > 65535):
logger.error('Invalid Vendor Event Listener port ({0}) '
'specified'.format(vel_port))
+++ /dev/null
-#!/bin/bash
-# Copyright 2021 Xoriant Corporation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Script to build ves project and its dependent containers
-# Maintainer shrinivas.joshi@xoriant.com
-
-# List of containers for this project
-
-# ves-kafka -- kafka broker to store events recieved from collectd or other similar services
-# ves-agent -- read events forom kafka and send those events to VEL port on ves-collector container
-# ves-collector -- Read the event received from ves-agent and write it to influxdb
-# grafana -- Read the events written by ves-collector in influxdb and show the graphs on UI
-# influxdb -- Store the events in DB sent by ves-agent
-# kafdrop -- UI for Kafka
-
-# Stop all containers if those are running accedently.
-
-./ves-stop.sh
-
-# Port allotment on host system for the micro services running in docker.
-
-influx_port=3330
-grafana_port=8880
-kafka_port=9092
-kafdrop_port=9000
-zookeeper_port=2181
-vel_ves_port=9999
-
-OS=`uname -s`
-# Check Docker, collectd, ip and git is installed on the VM
-
-if ! which docker > /dev/null; then
- echo -e "Docker not found, please install docker from https://docs.docker.com/engine/install/ubuntu\n"
- exit;
-fi
-
-if ! which collectd > /dev/null; then
- if [ $OS = 'Darwin' ]
- then
- echo -e "Collectd not found, please install collectd using brew install collectd\n"
- elif [ $OS = 'Linux' ]
- then
- echo -e "Collectd not found, please install collectd using sudo apt-get install -y collectd\n"
- else
- echo -e "Could not determine kind of system. Collectd not found, please install collectd using whatever method works.\n"
- fi
- exit;
-fi
-
-if ! which ip > /dev/null; then
- if [ $OS = 'Darwin' ]
- then
- echo -e "ip not found, please install ip using brew install ip.\n"
- elif [ $OS = 'Linux' ]
- then
- echo -e "/sbin/ip not found, please install ip using sudo apt-get install ip.\n"
- else
- echo -e "Could not determine kind of system. ip not found, please install ip using whatever method works.\n"
- exit 1
- fi
- exit;
-fi
-
-clear
-
-#get local ip address of VM from first interface
-if [ $OS = 'Darwin' ]
-then
- local_ip=`ip -4 addr list | grep en11 | grep inet | awk '{print $2}' | cut -d/ -f1`
-elif [ $OS = 'Linux' ]
-then
- local_ip=`/sbin/ip -o -4 addr list | grep enp | head -n 1 | awk '{print $4}' | cut -d/ -f1`
-else
- echo -e "Could not determine which OS this.\n"
- exit 1
-fi
-echo -e "Binding VES Services to local ip address $local_ip \n "
-echo ""
-echo -e "--------------------------------------------------------------------\n"
-#Spin influx DB
-echo -e "Starting influxdb container on Local Port Number $influx_port. Please wait..\n"
-docker run -d -p $influx_port:8086 -v $PWD/influxdb influxdb:1.8.5
-if [ $? != 0 ]
-then
- exit 1
-fi
-
-sleep 5 #Give some time to spin the container and bring service up
-echo "Done."
-echo""
-echo -e "--------------------------------------------------------------------\n"
-#Spin Grafana Cotainer
-echo -e "Starting Grafana cotainer on Local port number $grafana_port. Please wait..\n"
-docker run -d -p $grafana_port:3000 grafana/grafana
-if [ $? != 0 ]
-then
- exit 1
-fi
-sleep 5 #Give some time to spin the container and bring service up
-echo "Done."
-echo ""
-echo -e "--------------------------------------------------------------------\n"
-#Spin zookeeper container
-echo -e "Starting zookeeper container on Local port number $zookeeper_port. Please wait..\n"
-docker run -d --add-host mykafka:$local_ip --add-host myzoo:$local_ip \
- -p $zookeeper_port:2181 -p 2888:2888 -p 3888:3888 \
- -p 8800:8080 zookeeper
-if [ $? != 0 ]
-then
- exit 1
-fi
-sleep 5
-echo "Done."
-echo ""
-echo -e "--------------------------------------------------------------------\n"
-#Spin Kafka container.
-echo -e "Starting Kafka container on Local port number $kafka_port. Please wait..\n"
-docker run -d --add-host mykafka:$local_ip -e zookeeper_host=$local_ip \
- -e zookeeper_hostname='myzoo' -e zookeeper_port=$zookeeper_port \
- -e kafka_hostname='mykafka' -e kafka_port=$kafka_port \
- -p $kafka_port:$kafka_port ves-kafka
-if [ $? != 0 ]
-then
- exit 1
-fi
-sleep 7
-echo "Done."
-echo ""
-echo -e "--------------------------------------------------------------------\n"
-#Spin Kafdrop UI container (this is optional componant)
-echo -e "Starting kafdrop UI container on Local port numner $kafdrop_port. please wait..\n"
-docker run -d --add-host mykafka:$local_ip -p $kafdrop_port:9000 \
- -e KAFKA_BROKERCONNECT=$local_ip:$kafka_port \
- -e JVM_OPTS="-Xms64M -Xmx128M" obsidiandynamics/kafdrop:latest
-if [ $? != 0 ]
-then
- exit 1
-fi
-sleep 5
-echo "Done."
-echo ""
-echo -e "--------------------------------------------------------------------\n"
-# Spin ves-collector container.
-echo -e "Starting ves collector container on Local port number $vel_ves_port. Please wait\n"
-docker run -d -e ves_influxdb_host=$local_ip \
- -e ves_influxdb_port=$influx_port -e ves_grafana_host=$local_ip \
- -e ves_grafana_port=$grafana_port -e ves_host='localhost' \
- -e ves_port=$vel_ves_port -e ves_grafana_auth='admin:admin' \
- -e ves_user='user' -e ves_pass='password' -e ves_path=''\
- -e ves_topic='events' -e ves_loglevel='DEBUG' \
- -p $vel_ves_port:$vel_ves_port ves-collector
-if [ $? != 0 ]
-then
- exit 1
-fi
-sleep 6
-echo "Done."
-echo ""
-echo -e "--------------------------------------------------------------------\n"
-#Spin ves agent container.
-echo -e "Starting ves agent container. Please wait\n"
-docker run -d -e ves_kafka_host=$local_ip \
- -e ves_kafka_hostname='mykafka' -e ves_host=$local_ip \
- -e ves_port=$vel_ves_port -e ves_path='' \
- -e ves_topic='events' -e ves_https='False' -e ves_user='user' \
- -e ves_pass='password' -e ves_interval='10' \
- -e ves_kafka_port=$kafka_port -e ves_mode='./yaml/host' \
- -e ves_version='7' -e ves_loglevel='DEBUG' ves-agent
-if [ $? != 0 ]
-then
- exit 1
-fi
-sleep 5
-echo "Done."
-echo ""
-echo -e "--------------------------------------------------------------------\n"
-echo""
-echo -e "ves stack summary\n"
-echo -e "===================================================================================================================\n"
-echo ""
-echo -e "Kafka port: $kafka_port \n"
-echo -e "Kafdrop port: $kafdrop_port \n"
-echo -e "ves collector listner port: $vel_ves_port \n"
-echo -e "Grafana port: $grafana_port \n"
-echo -e "To access kafdrop UI use http://$local_ip:$kafdrop_port from your web browser. \n"
-echo -e "To access grafana dashboard paste url http://$local_ip:$grafana_port in web browser. "
-echo -e "Grafana username/password is admin/admin *** DO NOT CHANGE THE ADMIN PASSWORD, CLICK SKIP OPTION ***\n"
-echo ""
-echo -e "===================================================================================================================\n"