'Against schema: {1}'.format(body, schema))
try:
decoded_body = json.loads(body)
- jsonschema.validate(decoded_body, schema)
+ validate = jsonschema.validate(decoded_body, schema)
+ assert (validate is None), "Invalid event!"
+
logger.info('Event is valid!')
logger.info('Valid body decoded & checked against schema OK:\n'
'{0}'.format(json.dumps(decoded_body,
api_version = args.api_version
config_file = args.config
config_section = args.section
+ assert (config_file != ""), "Config file is missing"
# ----------------------------------------------------------------------
# Now read the config file, using command-line supplied values as
logger.addHandler(handler)
logger.info('Started')
+
+ assert (log_file != ""), "Value of property 'log_file' is missing in config file"
+ assert (vel_schema_file != ""), "Value of property 'schema_file' is missing in config file"
+ assert (kafka_server != ""), "Value of property 'kafka_server' is missing in config file"
+
# ---------------------------------------------------------------------
# Log the details of the configuration.
# ---------------------------------------------------------------------
args = parser.parse_args()
config_file = args.config
config_section = args.section
+ assert (config_file != ""), "Config file is missing"
overrides = {}
config = configparser.ConfigParser()
log_file = config.get(config_section, 'log_file', vars=overrides)
log_level = config.get(config_section, 'log_level', vars=overrides)
+ assert (self.kafka_broker != ""), "Value of property 'kafka_broker' is missing in config file"
+ assert (log_file != ""), "Value of property 'log_file' is missing in config file"
+ assert (log_level != ""), "Value of property 'log_level' is missing in config file"
+
self.setLogger(log_file, log_level)
def getKafkaBroker(self):
logger.debug('Send {} to influxdb at {}: {}'.format(event, influxdb, pdata))
r = requests.post(url, data=pdata, headers={'Content-Type': 'text/plain'})
logger.info('influxdb return code {}'.format(r.status_code))
- if r.status_code != 204:
- logger.debug('*** Influxdb save failed, return code {} ***'.format(r.status_code))
+ assert (r.status_code == 204), logger.debug('*** Influxdb save failed, return code {} ***'.format(r.status_code))
+
def process_additional_measurements(val, domain, eventId, startEpochMicrosec, lastEpochMicrosec):
for additionalMeasurements in val:
domain = jobj['event']['commonEventHeader']['domain']
eventTimestamp = jobj['event']['commonEventHeader']['startEpochMicrosec']
agent = jobj['event']['commonEventHeader']['reportingEntityName'].upper()
+
+ assert (domain != ""), "'domain' in payload is empty"
+ assert(eventTimestamp != ""), "'eventTimestamp' in payload is empty"
+
if "LOCALHOST" in agent:
agent = "computehost"
source = jobj['event']['commonEventHeader']['sourceId'].upper()
verbose = args.verbose
config_section = args.section
+ assert (config_file != ""), "Config file is missing"
# ----------------------------------------------------------------------
# Now read the config file, using command-line supplied values as
# overrides.
influxdb = config.get(config_section, 'influxdb', vars=overrides)
log_file = config.get(config_section, 'log_file', vars=overrides)
- kafka_server=config.get(config_section,'kafka_server',
- vars=overrides)
+ kafka_server = config.get(config_section,'kafka_server', vars=overrides)
+
+ assert (influxdb != ""), "Value of property 'influxdb' is missing in config file"
+ assert (log_file != ""), "Value of property 'log_file' is missing in config file"
+ assert (kafka_server != ""), "Value of property 'kafka_server' is missing in config file"
# ----------------------------------------------------------------------
# Finally we have enough info to start a proper flow trace.
if msg is None:
continue
elif not msg.error():
+ assert (msg.value() is not None), "Invalid message"
logger.debug('Recived message from topic name {} and offset number {}'.format(msg.topic(), msg.offset()))
# saving data in influxdb
save_event_in_db(msg.value())