3 #Original work Copyright 2016-2017 AT&T Intellectual Property, Inc
4 #Modified work Copyright 2021 Xoriant Corporation
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
19 from rest_dispatcher import PathDispatcher, set_404_content
20 from wsgiref.simple_server import make_server
26 from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
28 import logging.handlers
29 from base64 import b64decode
33 from functools import partial
35 from datetime import datetime, date, time
# NOTE(review): these module-level buffers look like leftovers of a status
# display feature; nothing in the visible code reads them — confirm before
# removing.
# Per-VDU identifier slots (six entries, presumably filled at runtime).
vdu_id = ['','','','','','']
# Canned lines for a key-stats summary display.
summary_e = ['***** Summary of key stats *****','','','']
summary_c = ['Collectd agents:']
# Component status strings; slot 0 appears unused.
status = ['','Started','Started','Started']
# Body template for 404 responses (formatted with the requested path).
template_404 = b'''POST {0}'''
53 def __init__(self, d):
# Script metadata consumed by main() when building version/usage strings.
__date__ = '2015-12-04'
__updated__ = '2015-12-04'

#------------------------------------------------------------------------------
# Address of influxdb server.
#------------------------------------------------------------------------------
# Default only — overridden from the config file / --influxdb option in main().
influxdb = '127.0.0.1'

#------------------------------------------------------------------------------
# Credentials we expect clients to authenticate themselves with.
#------------------------------------------------------------------------------

#------------------------------------------------------------------------------
# The JSON schema which we will use to validate events.
#------------------------------------------------------------------------------

#------------------------------------------------------------------------------
# The JSON schema which we will use to validate client throttle state.
#------------------------------------------------------------------------------
throttle_schema = None

#------------------------------------------------------------------------------
# The JSON schema which we will use to provoke throttling commands for testing.
#------------------------------------------------------------------------------
test_control_schema = None

#------------------------------------------------------------------------------
# Pending command list from the testControl API
# This is sent as a response commandList to the next received event.
#------------------------------------------------------------------------------
pending_command_list = None

#------------------------------------------------------------------------------
# Logger for this module.
#------------------------------------------------------------------------------
def listener(environ, start_response, schema):
    # NOTE(review): this extract is missing several source lines (docstring
    # quotes, 'try:' / 'else:' scaffolding, the second argument of
    # string.split(), closing braces of req_error).  Indentation below is a
    # best-effort reconstruction of the visible statements only.
    Handler for the Vendor Event Listener REST API.

    Extract headers and the body and check that:

    1) The client authenticated themselves correctly.
    2) The body validates against the provided schema for the API.
    logger.info('Got a Vendor Event request')
    logger.info('==== ' + time.asctime() + ' ' + '=' * 49)

    #--------------------------------------------------------------------------
    # Extract the content from the request.
    #--------------------------------------------------------------------------
    length = int(environ.get('CONTENT_LENGTH', '0'))
    logger.debug('Content Length: {0}'.format(length))
    body = environ['wsgi.input'].read(length)
    logger.debug('Content Body: {0}'.format(body))

    # Split the 'Basic <base64>' Authorization header into scheme and
    # credentials.  string.split() implies this file targets Python 2.
    mode, b64_credentials = string.split(environ.get('HTTP_AUTHORIZATION',
    # logger.debug('Auth. Mode: {0} Credentials: {1}'.format(mode,
    logger.debug('Auth. Mode: {0} Credentials: ****'.format(mode))
    if (b64_credentials != 'None'):
        credentials = b64decode(b64_credentials)
    # logger.debug('Credentials: {0}'.format(credentials))
    logger.debug('Credentials: ****')

    #--------------------------------------------------------------------------
    # If we have a schema file then check that the event matches that expected.
    #--------------------------------------------------------------------------
    if (schema is not None):
        logger.debug('Attempting to validate data: {0}\n'
                     'Against schema: {1}'.format(body, schema))
        decoded_body = json.loads(body)
        jsonschema.validate(decoded_body, schema)
        logger.info('Event is valid!')
        logger.debug('Valid body decoded & checked against schema OK:\n'
                     '{0}'.format(json.dumps(decoded_body,
                                             separators=(',', ': '))))

        #----------------------------------------------------------------------
        # See whether the user authenticated themselves correctly.
        #----------------------------------------------------------------------
        if (credentials == (vel_username + ':' + vel_password)):
            logger.debug('Authenticated OK')

            #------------------------------------------------------------------
            # Respond to the caller. If we have a pending commandList from the
            # testControl API, send it in response.
            #------------------------------------------------------------------
            global pending_command_list
            if pending_command_list is not None:
                start_response('202 Accepted',
                               [('Content-type', 'application/json')])
                response = pending_command_list
                # The command list is one-shot: clear it once delivered.
                pending_command_list = None
                logger.debug('\n'+ '='*80)
                logger.debug('Sending pending commandList in the response:\n'
                             '{0}'.format(json.dumps(response,
                                                     separators=(',', ': '))))
                logger.debug('='*80 + '\n')
                yield json.dumps(response)
            start_response('202 Accepted', [])
        # Authentication failure path.  NOTE(review): the second warn() call
        # passes extra positional args that the logging API will not format
        # this way, and it logs credentials in clear — pre-existing issues.
        logger.warn('Failed to authenticate OK; creds: ' + credentials)
        logger.warn('Failed to authenticate agent credentials: ', credentials,
                    'against expected ', vel_username, ':', vel_password)

        #----------------------------------------------------------------------
        # Respond to the caller.
        #----------------------------------------------------------------------
        start_response('401 Unauthorized', [ ('Content-type',
                                              'application/json')])
        req_error = { 'requestError': {
                          'messageId': 'POL0001',
                          'text': 'Failed to authenticate'
        yield json.dumps(req_error)
        # Persist the raw event when influxdb storage is configured.
        logger.info("data_storage ={}".format(data_storage))
        if(data_storage == 'influxdb'):
            save_event_in_db(body)
        except jsonschema.SchemaError as e:
            logger.error('Schema is not valid! {0}'.format(e))
        except jsonschema.ValidationError as e:
            logger.warn('Event is not valid against schema! {0}'.format(e))
            logger.warn('Bad JSON body decoded:\n'
                        '{0}'.format(json.dumps(decoded_body,
                                                separators=(',', ': '))))
        except Exception as e:
            logger.error('Event invalid for unexpected reason! {0}'.format(e))
    # No schema configured: just check the body is well-formed JSON.
    logger.debug('No schema so just decode JSON: {0}'.format(body))
    decoded_body = json.loads(body)
    logger.warn('Valid JSON body (no schema checking) decoded:\n'
                '{0}'.format(json.dumps(decoded_body,
                                        separators=(',', ': '))))
    logger.warn('Event is valid JSON but not checked against schema!')
    except Exception as e:
        logger.error('Event invalid for unexpected reason! {0}'.format(e))
230 #--------------------------------------------------------------------------
231 # Send event to influxdb
232 #--------------------------------------------------------------------------
233 def send_to_influxdb(event,pdata):
234 url = 'http://{}/write?db=veseventsdb'.format(influxdb)
235 logger.debug('Send {} to influxdb at {}: {}'.format(event,influxdb,pdata))
236 r = requests.post(url, data=pdata, headers={'Content-Type': 'text/plain'})
237 logger.debug('influxdb return code {}'.format(r.status_code))
238 if r.status_code != 204:
239 logger.debug('*** Influxdb save failed, return code {} ***'.format(r.status_code))
241 #--------------------------------------------------------------------------
242 # Convert timestamp to integer
243 #--------------------------------------------------------------------------
244 def convertTimestampToInt(timestamp, timeFormat="%Y-%m-%dT%H:%M:%S.%fz"):
245 date_time_obj = datetime.datetime.strptime(timestamp, timeFormat)
246 local_timezone = tzlocal.get_localzone();
247 local_timestamp = date_time_obj.replace(tzinfo=pytz.utc).astimezone(local_timezone).strftime(timeFormat)
248 date_time_obj_new = datetime.datetime.strptime(local_timestamp, timeFormat)
249 unixtime = time.mktime(date_time_obj_new.timetuple())
250 return int(float(unixtime) * float(1000000000))
252 #--------------------------------------------------------------------------
254 #--------------------------------------------------------------------------
255 def save_event_in_db(body):
256 jobj = json.loads(body)
257 e = json.loads(body, object_hook=JSONObject)
259 domain = jobj['event']['commonEventHeader']['domain']
260 timestamp = jobj['event']['commonEventHeader']['lastEpochMicrosec']
261 agent = jobj['event']['commonEventHeader']['reportingEntityName'].upper( )
262 if "LOCALHOST" in agent:
263 agent = "computehost"
264 source = jobj['event']['commonEventHeader']['sourceId'].upper( )
266 ###################################################
267 ## processing common header part
270 commonHeaderObj = jobj['event']['commonEventHeader'].items()
271 for key,val in commonHeaderObj:
273 if isinstance(val, unicode):
274 pdata = pdata + ',{}={}'.format(key,val.replace(' ','-'))
276 nonstringpdata = nonstringpdata + '{}={}'.format(key,val) + ','
279 ## processing pnfRegistration events
280 if 'pnfRegistrationFields' in jobj['event']:
281 logger.debug('Found pnfRegistrationFields')
283 d = jobj['event']['pnfRegistrationFields'].items()
285 if key != 'additionalFields' and val != "" :
286 if isinstance(val, unicode):
287 pdata = pdata + ',{}={}'.format(key,val.replace(' ','-'))
289 nonstringpdata = nonstringpdata + '{}={}'.format(key,val) + ','
290 elif key == 'additionalFields':
291 for key2,val2 in val.items():
292 if val2 != "" and isinstance(val2, unicode):
293 pdata = pdata + ',{}={}'.format(key2,val2.replace(' ','-'))
295 nonstringpdata = nonstringpdata + '{}={}'.format(key2,val2) + ','
297 send_to_influxdb(domain, pdata + nonstringpdata[:-1])
300 ## processing thresholdCrossingAlert events
301 if 'thresholdCrossingAlertFields' in jobj['event']:
302 logger.debug('Found thresholdCrossingAlertFields')
304 d = jobj['event']['thresholdCrossingAlertFields'].items()
306 if (key != 'additionalFields' and key != 'additionalParameters' and key != 'associatedAlertIdList') and val != "" :
307 if isinstance(val, unicode):
308 if key == "collectionTimestamp" or key == "eventStartTimestamp" :
309 nonstringpdata = nonstringpdata + '{}={}'.format(key,convertTimestampToInt(val[:-6], "%a, %d %b %Y %H:%M:%S"))+ ','
311 pdata = pdata + ',{}={}'.format(key,val.replace(' ','-'))
313 nonstringpdata = nonstringpdata + '{}={}'.format(key,val) + ','
314 elif key == 'additionalFields':
315 for key2,val2 in val.items():
316 if key2 == 'eventTime' :
317 eventTime = convertTimestampToInt(val2)
319 if val2 != "" and isinstance(val2, unicode):
320 pdata = pdata + ',{}={}'.format(key2,val2.replace(' ','-'))
322 nonstringpdata = nonstringpdata + '{}={}'.format(key2,val2) + ','
323 elif key == 'additionalParameters':
324 for addParameter in val:
325 for key2,val2 in addParameter.items():
326 if key2 != "hashMap" :
327 if val2 != "" and isinstance(val2, unicode):
328 pdata = pdata + ',{}={}'.format(key2,val2.replace(' ','-'))
330 nonstringpdata = nonstringpdata + '{}={}'.format(key2,val2) + ','
331 elif key2 == "hashMap" :
332 for key3,val3 in val2.items():
333 if val3 != "" and isinstance(val3, unicode):
334 pdata = pdata + ',{}={}'.format(key3,val3.replace(' ','-'))
336 nonstringpdata = nonstringpdata + '{}={}'.format(key3,val3) + ','
337 elif key == 'associatedAlertIdList':
338 associatedAlertIdList = ""
339 for associatedAlertId in val:
340 associatedAlertIdList = associatedAlertIdList + associatedAlertId + "|"
341 if(associatedAlertIdList != ""):
342 pdata = pdata + ',{}={}'.format("associatedAlertIdList",associatedAlertIdList.replace(' ','-')[:-1])
344 send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + format(eventTime))
347 ## processing fault events
348 if 'faultFields' in jobj['event']:
349 logger.debug('Found faultFields')
351 d = jobj['event']['faultFields'].items()
353 if key != 'alarmAdditionalInformation' and val != "" :
354 if isinstance(val, unicode):
355 pdata = pdata + ',{}={}'.format(key,val.replace(' ','-'))
357 nonstringpdata = nonstringpdata + '{}={}'.format(key,val) + ','
358 elif key == 'alarmAdditionalInformation':
359 for key2,val2 in val.items():
360 if key2 == 'eventTime' :
361 eventTime = convertTimestampToInt(val2)
363 if val2 != "" and isinstance(val2, unicode):
364 pdata = pdata + ',{}={}'.format(key2,val2.replace(' ','-'))
366 nonstringpdata = nonstringpdata + '{}={}'.format(key2,val2) + ','
368 send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + format(eventTime))
371 ###process heartbeat events
372 if 'heartbeatFields' in jobj['event']:
373 logger.debug('Found Heartbeat')
375 d = jobj['event']['heartbeatFields'].items()
377 if key != 'additionalFields' and val != "" :
378 if isinstance(val, unicode):
379 pdata = pdata + ',{}={}'.format(key,val.replace(' ','-'))
381 nonstringpdata = nonstringpdata + '{}={}'.format(key,val) + ','
382 elif key == 'additionalFields':
383 for key2,val2 in val.items():
384 if key2 == 'eventTime' :
385 eventTime = convertTimestampToInt(val2)
387 if val2 != "" and isinstance(val2, unicode):
388 pdata = pdata + ',{}={}'.format(key2,val2.replace(' ','-'))
390 nonstringpdata = nonstringpdata + '{}={}'.format(key2,val2) + ','
392 send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + format(eventTime))
395 ## processing measurement events
396 if 'measurementFields' in jobj['event']:
397 logger.debug('Found measurementFields')
398 d = jobj['event']['measurementFields'].items()
399 nonstringKey = ["concurrentSessions","configuredEntities","meanRequestLatency","measurementFieldsVersion","measurementInterval",
400 "nfcScalingMetric","numberOfMediaPortsInUse","requestRate"]
404 for nonstrKey in nonstringKey:
406 pdata = pdata + '{}={}'.format(key,val) + ','
408 send_to_influxdb("fault", pdata[:-1])
411 if 'measurementsForVfScalingFields' in jobj['event']:
412 logger.debug('Found measurementsForVfScalingFields')
414 # "measurementsForVfScalingFields": {
415 # "additionalMeasurements": [
419 # "name": "load-longterm",
423 # "name": "load-shortterm",
427 # "name": "load-midterm",
435 if 'additionalMeasurements' in jobj['event']['measurementsForVfScalingFields']:
436 for meas in jobj['event']['measurementsForVfScalingFields']['additionalMeasurements']:
438 eventTime = int(float(meas['eventTime']) * float(1000000000))
440 if name =="kernel4-filterAccounting":
441 data = '{},system={}'.format(name,source)
442 for field in meas['arrayOfFields']:
443 if field['name'] =="ipt-packets-value":
446 data = data + ",{}={}".format(field['name'],field['value'])
448 data = data + ' ' + "ipt-packets-value=" + val + ' ' + format(eventTime)
449 send_to_influxdb("iptables", data)
451 pdata = '{},system={}'.format(name,source)
453 for field in meas['arrayOfFields']:
454 pdata = pdata + ",{}={}".format(field['name'],field['value'])
455 #pdata = pdata + ",{}={}".format("eventTime",meas['eventTime'])
456 i=pdata.find(',', pdata.find('system'))
457 pdata = pdata[:i] + ' ' + pdata[i+1:] + ' ' + format(eventTime)
458 send_to_influxdb("systemLoad", pdata)
462 # "cpuIdentifier": "15",
463 # "cpuIdle": 99.8998998999,
464 # "cpuUsageInterrupt": 0,
466 # "cpuUsageSoftIrq": 0,
467 # "cpuUsageSteal": 0,
468 # "cpuUsageSystem": 0,
469 # "cpuUsageUser": 0.1001001001,
471 # "percentUsage": 0.0
476 if 'cpuUsageArray' in jobj['event']['measurementsForVfScalingFields']:
477 logger.debug('Found cpuUsageArray')
478 for disk in jobj['event']['measurementsForVfScalingFields']['cpuUsageArray']:
479 id=disk['cpuIdentifier']
480 pdata = 'cpuUsage,system={},cpu={}'.format(source,id)
483 if key == 'eventTime':
484 eventTime = int(float(val) * float(1000000000))
485 elif key != 'cpuIdentifier':
486 pdata = pdata + ',{}={}'.format(key,val)
488 i=pdata.find(',', pdata.find('cpu='))
489 pdata = pdata[:i] + ' ' + pdata[i+1:] + ' ' + format(eventTime)
490 send_to_influxdb("cpuUsage", pdata)
492 # "diskUsageArray": [
494 # "diskIdentifier": "sda",
495 # "diskIoTimeLast": 0.3996139893,
496 # "diskMergedReadLast": 0,
497 # "diskMergedWriteLast": 26.1747155344,
498 # "diskOctetsReadLast": 0,
499 # "diskOctetsWriteLast": 309767.93302,
500 # "diskOpsReadLast": 0,
501 # "diskOpsWriteLast": 10.9893839563,
502 # "diskTimeReadLast": 0,
503 # "diskTimeWriteLast": 0.699324445683
506 if 'diskUsageArray' in jobj['event']['measurementsForVfScalingFields']:
507 logger.debug('Found diskUsageArray')
508 for disk in jobj['event']['measurementsForVfScalingFields']['diskUsageArray']:
509 id=disk['diskIdentifier']
510 pdata = 'diskUsage,system={},disk={}'.format(source,id)
513 if key == 'eventTime':
514 eventTime = int(float(val) * float(1000000000))
515 elif key != 'diskIdentifier':
516 pdata = pdata + ',{}={}'.format(key,val)
518 i=pdata.find(',', pdata.find('disk='))
519 pdata = pdata[:i] + ' ' + pdata[i+1:] + ' ' + format(eventTime)
520 send_to_influxdb("diskUsage", pdata)
522 # "memoryUsageArray": [
524 # "memoryBuffered": 269056.0,
525 # "memoryCached": 17636956.0,
526 # "memoryFree": 244731658240,
527 # "memorySlabRecl": 753160.0,
528 # "memorySlabUnrecl": 210800.0,
529 # "memoryUsed": 6240064.0,
530 # "vmIdentifier": "opnfv01"
534 if 'memoryUsageArray' in jobj['event']['measurementsForVfScalingFields']:
535 logger.debug('Found memoryUsageArray')
536 pdata = 'memoryUsage,system={}'.format(source)
537 vmid=e.event.measurementsForVfScalingFields.memoryUsageArray[0].vmIdentifier
538 d = jobj['event']['measurementsForVfScalingFields']['memoryUsageArray'][0].items()
540 if key == 'eventTime':
541 eventTime = int(float(val) * float(1000000000))
542 elif key != 'vmIdentifier':
543 pdata = pdata + ',{}={}'.format(key,val)
545 i=pdata.find(',', pdata.find('system'))
546 pdata = pdata[:i] + ' ' + pdata[i+1:] + ' ' + format(eventTime)
547 send_to_influxdb("memoryUsage", pdata)
549 # "vNicPerformanceArray": [
551 # "receivedDiscardedPacketsAccumulated": 0,
552 # "receivedErrorPacketsAccumulated": 0,
553 # "receivedOctetsAccumulated": 476.801524578,
554 # "receivedTotalPacketsAccumulated": 2.90000899705,
555 # "transmittedDiscardedPacketsAccumulated": 0,
556 # "transmittedErrorPacketsAccumulated": 0,
557 # "transmittedOctetsAccumulated": 230.100735749,
558 # "transmittedTotalPacketsAccumulated": 1.20000372292,
559 # "vNicIdentifier": "eno4",
560 # "valuesAreSuspect": "true"
563 if 'vNicPerformanceArray' in jobj['event']['measurementsForVfScalingFields']:
564 logger.debug('Found vNicPerformanceArray')
565 for vnic in jobj['event']['measurementsForVfScalingFields']['vNicPerformanceArray']:
566 vnid=vnic['vNicIdentifier']
567 pdata = 'vNicPerformance,system={},vnic={}'.format(vmid,vnid)
570 if key == 'eventTime':
571 eventTime = int(float(val) * float(1000000000))
572 elif key != 'vNicIdentifier':
573 pdata = pdata + ',{}={}'.format(key,val)
575 i=pdata.find(',', pdata.find('vnic'))
576 pdata = pdata[:i] + ' ' + pdata[i+1:] + ' ' + format(eventTime)
577 send_to_influxdb("vNicPerformance", pdata)
def test_listener(environ, start_response, schema):
    # NOTE(review): this extract is missing several source lines (docstring
    # quotes, 'try:' lines, 'return' after the GET branch); indentation
    # below is a best-effort reconstruction of the visible statements.
    Handler for the Test Collector Test Control API.

    There is no authentication on this interface.

    This simply stores a commandList which will be sent in response to the next
    incoming event on the EVEL interface.
    global pending_command_list
    logger.info('Got a Test Control input')
    logger.info('============================')
    logger.info('==== TEST CONTROL INPUT ====')

    #--------------------------------------------------------------------------
    # GET allows us to get the current pending request.
    #--------------------------------------------------------------------------
    if environ.get('REQUEST_METHOD') == 'GET':
        start_response('200 OK', [('Content-type', 'application/json')])
        yield json.dumps(pending_command_list)

    #--------------------------------------------------------------------------
    # Extract the content from the request.
    #--------------------------------------------------------------------------
    length = int(environ.get('CONTENT_LENGTH', '0'))
    logger.debug('TestControl Content Length: {0}'.format(length))
    body = environ['wsgi.input'].read(length)
    logger.debug('TestControl Content Body: {0}'.format(body))

    #--------------------------------------------------------------------------
    # If we have a schema file then check that the event matches that expected.
    #--------------------------------------------------------------------------
    if (schema is not None):
        logger.debug('Attempting to validate data: {0}\n'
                     'Against schema: {1}'.format(body, schema))
        decoded_body = json.loads(body)
        jsonschema.validate(decoded_body, schema)
        logger.info('TestControl is valid!')
        logger.info('TestControl:\n'
                    '{0}'.format(json.dumps(decoded_body,
                                            separators=(',', ': '))))
        except jsonschema.SchemaError as e:
            logger.error('TestControl Schema is not valid: {0}'.format(e))
        except jsonschema.ValidationError as e:
            logger.error('TestControl input not valid: {0}'.format(e))
            logger.error('Bad JSON body decoded:\n'
                         '{0}'.format(json.dumps(decoded_body,
                                                 separators=(',', ': '))))
        except Exception as e:
            logger.error('TestControl input not valid: {0}'.format(e))
    # No schema configured: just check the body is well-formed JSON.
    logger.debug('Missing schema just decode JSON: {0}'.format(body))
    decoded_body = json.loads(body)
    logger.info('Valid JSON body (no schema checking) decoded:\n'
                '{0}'.format(json.dumps(decoded_body,
                                        separators=(',', ': '))))
    logger.info('TestControl input not checked against schema!')
    except Exception as e:
        logger.error('TestControl input not valid: {0}'.format(e))

    #--------------------------------------------------------------------------
    # Respond to the caller. If we received otherField 'ThrottleRequest',
    # generate the appropriate canned response.
    #--------------------------------------------------------------------------
    # Stash the decoded command list; listener() delivers it to the next
    # event and clears it.
    pending_command_list = decoded_body
    logger.debug('===== TEST CONTROL END =====')
    logger.debug('============================')
    start_response('202 Accepted', [])
664 Main function for the collector start-up.
666 Called with command-line arguments:
668 * --section *<section>*
673 *<file>* specifies the path to the configuration file.
675 *<section>* specifies the section within that config file.
677 *verbose* generates more information in the log files.
679 The process listens for REST API invocations and checks them. Errors are
680 displayed to stdout and logged.
686 sys.argv.extend(argv)
688 program_name = os.path.basename(sys.argv[0])
689 program_version = 'v{0}'.format(__version__)
690 program_build_date = str(__updated__)
691 program_version_message = '%%(prog)s {0} ({1})'.format(program_version,
693 if (__import__('__main__').__doc__ is not None):
694 program_shortdesc = __import__('__main__').__doc__.split('\n')[1]
696 program_shortdesc = 'Running in test harness'
697 program_license = '''{0}
700 Copyright 2015 Metaswitch Networks Ltd. All rights reserved.
702 Distributed on an "AS IS" basis without warranties
703 or conditions of any kind, either express or implied.
706 '''.format(program_shortdesc, str(__date__))
709 #----------------------------------------------------------------------
710 # Setup argument parser so we can parse the command-line.
711 #----------------------------------------------------------------------
712 parser = ArgumentParser(description=program_license,
713 formatter_class=ArgumentDefaultsHelpFormatter)
714 parser.add_argument('-i', '--influxdb',
717 help='InfluxDB server addresss')
718 parser.add_argument('-v', '--verbose',
721 help='set verbosity level')
722 parser.add_argument('-V', '--version',
724 version=program_version_message,
725 help='Display version information')
726 parser.add_argument('-a', '--api-version',
729 help='set API version')
730 parser.add_argument('-c', '--config',
732 default='/etc/opt/att/collector.conf',
733 help='Use this config file.',
735 parser.add_argument('-s', '--section',
739 help='section to use in the config file')
741 #----------------------------------------------------------------------
742 # Process arguments received.
743 #----------------------------------------------------------------------
744 args = parser.parse_args()
745 verbose = args.verbose
746 api_version = args.api_version
747 config_file = args.config
748 config_section = args.section
750 #----------------------------------------------------------------------
751 # Now read the config file, using command-line supplied values as
753 #----------------------------------------------------------------------
754 defaults = {'log_file': 'collector.log',
760 config = ConfigParser.SafeConfigParser(defaults)
761 config.read(config_file)
763 #----------------------------------------------------------------------
764 # extract the values we want.
765 #----------------------------------------------------------------------
769 global vel_topic_name
772 influxdb = config.get(config_section, 'influxdb', vars=overrides)
773 log_file = config.get(config_section, 'log_file', vars=overrides)
774 vel_port = config.get(config_section, 'vel_port', vars=overrides)
775 vel_path = config.get(config_section, 'vel_path', vars=overrides)
776 data_storage = config.get(config_section, 'data_storage', vars=overrides)
778 vel_topic_name = config.get(config_section,
781 vel_username = config.get(config_section,
784 vel_password = config.get(config_section,
787 vel_schema_file = config.get(config_section,
790 base_schema_file = config.get(config_section,
793 throttle_schema_file = config.get(config_section,
794 'throttle_schema_file',
796 test_control_schema_file = config.get(config_section,
797 'test_control_schema_file',
800 #----------------------------------------------------------------------
801 # Finally we have enough info to start a proper flow trace.
802 #----------------------------------------------------------------------
804 logger = logging.getLogger('monitor')
806 logger.info('Verbose mode on')
807 logger.setLevel(logging.DEBUG)
809 logger.setLevel(logging.INFO)
810 handler = logging.handlers.RotatingFileHandler(log_file,
813 if (platform.system() == 'Windows'):
814 date_format = '%Y-%m-%d %H:%M:%S'
816 date_format = '%Y-%m-%d %H:%M:%S.%f %z'
817 formatter = logging.Formatter('%(asctime)s %(name)s - '
818 '%(levelname)s - %(message)s',
820 handler.setFormatter(formatter)
821 logger.addHandler(handler)
822 logger.info('Started')
824 #----------------------------------------------------------------------
825 # Log the details of the configuration.
826 #----------------------------------------------------------------------
827 logger.debug('Log file = {0}'.format(log_file))
828 logger.debug('Influxdb server = {0}'.format(influxdb))
829 logger.debug('Event Listener Port = {0}'.format(vel_port))
830 logger.debug('Event Listener Path = {0}'.format(vel_path))
831 logger.debug('Event Listener Topic = {0}'.format(vel_topic_name))
832 logger.debug('Event Listener Username = {0}'.format(vel_username))
833 # logger.debug('Event Listener Password = {0}'.format(vel_password))
834 logger.debug('Event Listener JSON Schema File = {0}'.format(
836 logger.debug('Base JSON Schema File = {0}'.format(base_schema_file))
837 logger.debug('Throttle JSON Schema File = {0}'.format(
838 throttle_schema_file))
839 logger.debug('Test Control JSON Schema File = {0}'.format(
840 test_control_schema_file))
842 #----------------------------------------------------------------------
843 # Perform some basic error checking on the config.
844 #----------------------------------------------------------------------
845 if (int(vel_port) < 1024 or int(vel_port) > 65535):
846 logger.error('Invalid Vendor Event Listener port ({0}) '
847 'specified'.format(vel_port))
848 raise RuntimeError('Invalid Vendor Event Listener port ({0}) '
849 'specified'.format(vel_port))
851 if (len(vel_path) > 0 and vel_path[-1] != '/'):
852 logger.warning('Event Listener Path ({0}) should have terminating '
853 '"/"! Adding one on to configured string.'.format(
857 #----------------------------------------------------------------------
858 # Load up the vel_schema, if it exists.
859 #----------------------------------------------------------------------
860 if not os.path.exists(vel_schema_file):
861 logger.warning('Event Listener Schema File ({0}) not found. '
862 'No validation will be undertaken.'.format(
866 global throttle_schema
867 global test_control_schema
868 vel_schema = json.load(open(vel_schema_file, 'r'))
869 logger.debug('Loaded the JSON schema file')
871 #------------------------------------------------------------------
872 # Load up the throttle_schema, if it exists.
873 #------------------------------------------------------------------
874 if (os.path.exists(throttle_schema_file)):
875 logger.debug('Loading throttle schema')
876 throttle_fragment = json.load(open(throttle_schema_file, 'r'))
878 throttle_schema.update(vel_schema)
879 throttle_schema.update(throttle_fragment)
880 logger.debug('Loaded the throttle schema')
882 #------------------------------------------------------------------
883 # Load up the test control _schema, if it exists.
884 #------------------------------------------------------------------
885 if (os.path.exists(test_control_schema_file)):
886 logger.debug('Loading test control schema')
887 test_control_fragment = json.load(
888 open(test_control_schema_file, 'r'))
889 test_control_schema = {}
890 test_control_schema.update(vel_schema)
891 test_control_schema.update(test_control_fragment)
892 logger.debug('Loaded the test control schema')
894 #------------------------------------------------------------------
895 # Load up the base_schema, if it exists.
896 #------------------------------------------------------------------
897 if (os.path.exists(base_schema_file)):
898 logger.debug('Updating the schema with base definition')
899 base_schema = json.load(open(base_schema_file, 'r'))
900 vel_schema.update(base_schema)
901 logger.debug('Updated the JSON schema file')
903 #----------------------------------------------------------------------
904 # We are now ready to get started with processing. Start-up the various
905 # components of the system in order:
907 # 1) Create the dispatcher.
908 # 2) Register the functions for the URLs of interest.
909 # 3) Run the webserver.
910 #----------------------------------------------------------------------
911 root_url = '/{0}eventListener/v{1}{2}'.\
915 if len(vel_topic_name) > 0
917 throttle_url = '/{0}eventListener/v{1}/clientThrottlingState'.\
918 format(vel_path, api_version)
919 set_404_content(root_url)
920 dispatcher = PathDispatcher()
921 vendor_event_listener = partial(listener, schema = vel_schema)
922 dispatcher.register('GET', root_url, vendor_event_listener)
923 dispatcher.register('POST', root_url, vendor_event_listener)
924 vendor_throttle_listener = partial(listener, schema = throttle_schema)
925 dispatcher.register('GET', throttle_url, vendor_throttle_listener)
926 dispatcher.register('POST', throttle_url, vendor_throttle_listener)
928 #----------------------------------------------------------------------
929 # We also add a POST-only mechanism for test control, so that we can
930 # send commands to a single attached client.
931 #----------------------------------------------------------------------
932 test_control_url = '/testControl/v{0}/commandList'.format(api_version)
933 test_control_listener = partial(test_listener,
934 schema = test_control_schema)
935 dispatcher.register('POST', test_control_url, test_control_listener)
936 dispatcher.register('GET', test_control_url, test_control_listener)
938 httpd = make_server('', int(vel_port), dispatcher)
939 logger.info('Serving on port {0}...'.format(vel_port))
940 httpd.serve_forever()
942 logger.error('Main loop exited unexpectedly!')
945 except KeyboardInterrupt:
946 #----------------------------------------------------------------------
947 # handle keyboard interrupt
948 #----------------------------------------------------------------------
949 logger.info('Exiting on keyboard interrupt!')
952 except Exception as e:
953 #----------------------------------------------------------------------
954 # Handle unexpected exceptions.
955 #----------------------------------------------------------------------
958 indent = len(program_name) * ' '
959 sys.stderr.write(program_name + ': ' + repr(e) + '\n')
960 sys.stderr.write(indent + ' for help use --help\n')
961 sys.stderr.write(traceback.format_exc())
962 logger.critical('Exiting because of exception: {0}'.format(e))
963 logger.critical(traceback.format_exc())
966 #------------------------------------------------------------------------------
967 # MAIN SCRIPT ENTRY POINT.
968 #------------------------------------------------------------------------------
969 if __name__ == '__main__':
971 #----------------------------------------------------------------------
972 # Running tests - note that doctest comments haven't been included so
973 # this is a hook for future improvements.
974 #----------------------------------------------------------------------
979 #----------------------------------------------------------------------
980 # Profiling performance. Performance isn't expected to be a major
981 # issue, but this should all work as expected.
982 #----------------------------------------------------------------------
985 profile_filename = 'collector_profile.txt'
986 cProfile.run('main()', profile_filename)
987 statsfile = open('collector_profile_stats.txt', 'wb')
988 p = pstats.Stats(profile_filename, stream=statsfile)
989 stats = p.strip_dirs().sort_stats('cumulative')
994 #--------------------------------------------------------------------------
995 # Normal operation - call through to the main function.
996 #--------------------------------------------------------------------------