Implement a version of the O1/VES interface for SMO
[smo/ves.git] / collector / evel-test-collector / code / collector / monitor.py
1 #!/usr/bin/env python
2 #
3 #Original work Copyright 2016-2017 AT&T Intellectual Property, Inc
4 #Modified work Copyright 2021 Xoriant Corporation
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19 from rest_dispatcher import PathDispatcher, set_404_content
20 from wsgiref.simple_server import make_server
21 import sys
22 import os
23 import platform
24 import traceback
25 import time
26 from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
27 import ConfigParser
28 import logging.handlers
29 from base64 import b64decode
30 import string
31 import json
32 import jsonschema
33 from functools import partial
34 import requests
35 from datetime import datetime, date, time
36 import calendar
37 import datetime
38 import time
39 import tzlocal
40 import pytz
41
#------------------------------------------------------------------------------
# Module-level state for the collector's monitor display and HTTP handling.
# NOTE(review): several of these (vdu_id, summary_e, summary_c, status,
# columns, rows) are not referenced in this chunk - presumably consumed by
# display code elsewhere in the file; confirm before removing.
#------------------------------------------------------------------------------
monitor_mode = "f"
vdu_id = ['','','','','','']
summary_e = ['***** Summary of key stats *****','','','']
summary_c = ['Collectd agents:']
status = ['','Started','Started','Started']
base_url = ''
# Body template for 404 responses; the {0} placeholder is presumably filled
# in by rest_dispatcher.set_404_content - TODO confirm.
template_404 = b'''POST {0}'''
columns = 0
rows = 0
51
class JSONObject:
  """Wrap a decoded JSON dict so its keys are readable as attributes.

  Used as the ``object_hook`` for ``json.loads`` so event payloads can be
  navigated with dotted access (e.g. ``e.event.commonEventHeader``).
  """
  def __init__(self, d):
    # Alias the instance __dict__ to the decoded dict directly (no copy).
    self.__dict__ = d
# Packaging / version metadata for the command-line entry point.
__all__ = []
__version__ = 0.1
__date__ = '2015-12-04'
__updated__ = '2015-12-04'

# Build-mode flags - presumably consulted by main()'s profiling/test hooks
# (not referenced in this chunk).
TESTRUN = False
DEBUG = False
PROFILE = False

#------------------------------------------------------------------------------
# Address of influxdb server.
#------------------------------------------------------------------------------

influxdb = '127.0.0.1'

#------------------------------------------------------------------------------
# Credentials we expect clients to authenticate themselves with.
#------------------------------------------------------------------------------
vel_username = ''
vel_password = ''

#------------------------------------------------------------------------------
# The JSON schema which we will use to validate events.
#------------------------------------------------------------------------------
vel_schema = None

#------------------------------------------------------------------------------
# The JSON schema which we will use to validate client throttle state.
#------------------------------------------------------------------------------
throttle_schema = None

#------------------------------------------------------------------------------
# The JSON schema which we will use to provoke throttling commands for testing.
#------------------------------------------------------------------------------
test_control_schema = None

#------------------------------------------------------------------------------
# Pending command list from the testControl API
# This is sent as a response commandList to the next received event.
#------------------------------------------------------------------------------
pending_command_list = None

#------------------------------------------------------------------------------
# Logger for this module.
#------------------------------------------------------------------------------
logger = None
def listener(environ, start_response, schema):
    '''
    Handler for the Vendor Event Listener REST API.

    Extract headers and the body and check that:

      1)  The client authenticated themselves correctly.
      2)  The body validates against the provided schema for the API.

    This is a WSGI generator function: it invokes start_response() and
    yields the (possibly empty) response body.
    '''
    logger.info('Got a Vendor Event request')
    logger.info('==== ' + time.asctime() + ' ' + '=' * 49)

    #--------------------------------------------------------------------------
    # Extract the content from the request.
    #--------------------------------------------------------------------------
    length = int(environ.get('CONTENT_LENGTH', '0'))
    logger.debug('Content Length: {0}'.format(length))
    body = environ['wsgi.input'].read(length)
    logger.debug('Content Body: {0}'.format(body))

    mode, b64_credentials = string.split(environ.get('HTTP_AUTHORIZATION',
                                                     'None None'))
    # Credentials are deliberately masked in all log output.
    logger.debug('Auth. Mode: {0} Credentials: ****'.format(mode))
    if (b64_credentials != 'None'):
        credentials = b64decode(b64_credentials)
    else:
        credentials = None

    logger.debug('Credentials: ****')

    #--------------------------------------------------------------------------
    # If we have a schema file then check that the event matches that expected.
    #--------------------------------------------------------------------------
    if (schema is not None):
        logger.debug('Attempting to validate data: {0}\n'
                     'Against schema: {1}'.format(body, schema))
        try:
            decoded_body = json.loads(body)
            jsonschema.validate(decoded_body, schema)
            logger.info('Event is valid!')
            logger.debug('Valid body decoded & checked against schema OK:\n'
                  '{0}'.format(json.dumps(decoded_body,
                                          sort_keys=True,
                                          indent=4,
                                          separators=(',', ': '))))

            #------------------------------------------------------------------
            # See whether the user authenticated themselves correctly.
            #------------------------------------------------------------------
            if (credentials == (vel_username + ':' + vel_password)):
                logger.debug('Authenticated OK')

                #--------------------------------------------------------------
                # Respond to the caller. If we have a pending commandList from
                # the testControl API, send it in response.
                #--------------------------------------------------------------
                global pending_command_list
                if pending_command_list is not None:
                    start_response('202 Accepted',
                           [('Content-type', 'application/json')])
                    response = pending_command_list
                    pending_command_list = None

                    logger.debug('\n'+ '='*80)
                    logger.debug('Sending pending commandList in the response:\n'
                          '{0}'.format(json.dumps(response,
                                          sort_keys=True,
                                          indent=4,
                                          separators=(',', ': '))))
                    logger.debug('='*80 + '\n')
                    yield json.dumps(response)
                else:
                    start_response('202 Accepted', [])
                    yield ''
            else:
                # BUG FIX: the previous code logged the plaintext credentials
                # (inconsistent with the masked logging above) and passed bare
                # positional arguments to logger.warn without %-placeholders,
                # which makes the logging module raise a formatting error.
                logger.warn('Failed to authenticate agent against expected '
                            'credentials (values masked)')

                #--------------------------------------------------------------
                # Respond to the caller.
                #--------------------------------------------------------------
                start_response('401 Unauthorized', [ ('Content-type',
                                              'application/json')])
                req_error = { 'requestError': {
                                 'policyException': {
                                     'messageId': 'POL0001',
                                      'text': 'Failed to authenticate'
                            }
                        }
                    }
                yield json.dumps(req_error)

            # Persist the raw event if influxdb storage is configured.
            logger.info("data_storage ={}".format(data_storage))
            if(data_storage == 'influxdb'):
                save_event_in_db(body)

        except jsonschema.SchemaError as e:
            logger.error('Schema is not valid! {0}'.format(e))

        except jsonschema.ValidationError as e:
            logger.warn('Event is not valid against schema! {0}'.format(e))
            logger.warn('Bad JSON body decoded:\n'
                  '{0}'.format(json.dumps(decoded_body,
                                         sort_keys=True,
                                         indent=4,
                                         separators=(',', ': '))))

        except Exception as e:
            logger.error('Event invalid for unexpected reason! {0}'.format(e))
    else:
        #----------------------------------------------------------------------
        # No schema: just check that the body is well-formed JSON.
        #----------------------------------------------------------------------
        logger.debug('No schema so just decode JSON: {0}'.format(body))
        try:
            decoded_body = json.loads(body)
            logger.warn('Valid JSON body (no schema checking) decoded:\n'
                  '{0}'.format(json.dumps(decoded_body,
                                         sort_keys=True,
                                         indent=4,
                                         separators=(',', ': '))))
            logger.warn('Event is valid JSON but not checked against schema!')

        except Exception as e:
            logger.error('Event invalid for unexpected reason! {0}'.format(e))
229
#--------------------------------------------------------------------------
# Send event to influxdb
#--------------------------------------------------------------------------
def send_to_influxdb(event,pdata):
  """POST a line-protocol payload for *event* to the veseventsdb database."""
  write_url = 'http://{}/write?db=veseventsdb'.format(influxdb)
  logger.debug('Send {} to influxdb at {}: {}'.format(event,influxdb,pdata))
  headers = {'Content-Type': 'text/plain'}
  resp = requests.post(write_url, data=pdata, headers=headers)
  logger.debug('influxdb return code {}'.format(resp.status_code))
  # A successful influxdb write returns 204 No Content.
  if resp.status_code != 204:
    logger.debug('*** Influxdb save failed, return code {} ***'.format(resp.status_code))
240
#--------------------------------------------------------------------------
# Convert timestamp to integer
#--------------------------------------------------------------------------
def convertTimestampToInt(timestamp, timeFormat="%Y-%m-%dT%H:%M:%S.%fz"):
  """Convert a UTC timestamp string to integer nanoseconds since the epoch.

  :param timestamp:  the timestamp string, interpreted as UTC.
  :param timeFormat: strptime format of *timestamp*.
  :return: whole nanoseconds since the Unix epoch (sub-second precision is
           dropped, as in the original implementation).
  """
  date_time_obj = datetime.datetime.strptime(timestamp, timeFormat)
  # BUG FIX: the value is UTC, so take the epoch seconds directly with
  # calendar.timegm. The previous strftime/strptime/mktime round-trip
  # through the local timezone relied on mktime guessing the DST flag
  # (timetuple() sets isdst=-1), which picks the wrong offset during DST
  # transitions, and needlessly depended on tzlocal/pytz.
  unixtime = calendar.timegm(date_time_obj.timetuple())
  return int(float(unixtime) * float(1000000000))
251
#--------------------------------------------------------------------------
# Save event data
#--------------------------------------------------------------------------
def save_event_in_db(body):
  """Decode a VES event (JSON string) and persist it to influxdb.

  The commonEventHeader is flattened into influxdb line-protocol form
  (string values become tags appended to ``pdata``, everything else is
  accumulated as fields in ``nonstringpdata``), then each domain-specific
  section present in the event is converted and written with
  send_to_influxdb().

  NOTE(review): uses the Python 2 ``unicode`` builtin throughout.
  NOTE(review): ``eventTime`` and ``vmid`` are only bound on some paths
  (see inline notes) - events missing those fields raise NameError.
  """
  jobj = json.loads(body)
  # Attribute-style view of the same payload; only used below to read
  # memoryUsageArray[0].vmIdentifier.
  e = json.loads(body, object_hook=JSONObject)

  domain = jobj['event']['commonEventHeader']['domain']
  # NOTE(review): timestamp and agent are computed but never used below.
  timestamp = jobj['event']['commonEventHeader']['lastEpochMicrosec']
  agent = jobj['event']['commonEventHeader']['reportingEntityName'].upper()
  if "LOCALHOST" in agent:
    agent = "computehost"
  source = jobj['event']['commonEventHeader']['sourceId'].upper()

###################################################
  ## processing common header part
  # pdata accumulates "measurement,tag=value" text; nonstringpdata
  # accumulates "field=value," pairs (leading space separates the two
  # line-protocol sections when they are joined for the write).
  pdata = domain
  nonstringpdata = " "
  commonHeaderObj = jobj['event']['commonEventHeader'].items()
  for key,val in commonHeaderObj:
     if val != "" :
      if isinstance(val, unicode):
        # Spaces are illegal in influxdb tag values; dash-escape them.
        pdata = pdata + ',{}={}'.format(key,val.replace(' ','-'))
      else:
        nonstringpdata = nonstringpdata + '{}={}'.format(key,val) + ','


  ## processing pnfRegistration events
  if 'pnfRegistrationFields' in jobj['event']:
    logger.debug('Found pnfRegistrationFields')

    d = jobj['event']['pnfRegistrationFields'].items()
    for key,val in d:
      if key != 'additionalFields' and val != "" :
        if isinstance(val, unicode):
          pdata = pdata + ',{}={}'.format(key,val.replace(' ','-'))
        else:
          nonstringpdata = nonstringpdata + '{}={}'.format(key,val) + ','
      elif key == 'additionalFields':
          # additionalFields is a flat map; flatten it the same way.
          for key2,val2 in val.items():
            if val2 != "" and isinstance(val2, unicode):
              pdata = pdata + ',{}={}'.format(key2,val2.replace(' ','-'))
            elif val2 != "" :
              nonstringpdata = nonstringpdata + '{}={}'.format(key2,val2) + ','

    # [:-1] drops the trailing comma from nonstringpdata.
    send_to_influxdb(domain, pdata + nonstringpdata[:-1])


  ## processing thresholdCrossingAlert events
  if 'thresholdCrossingAlertFields' in jobj['event']:
    logger.debug('Found thresholdCrossingAlertFields')

    d = jobj['event']['thresholdCrossingAlertFields'].items()
    for key,val in d:
      if (key != 'additionalFields' and key != 'additionalParameters' and key != 'associatedAlertIdList') and val != "" :
        if isinstance(val, unicode):
          if key == "collectionTimestamp" or key == "eventStartTimestamp" :
            # val[:-6] strips a trailing timezone suffix before parsing -
            # assumes e.g. "Fri, 01 Jan 2021 00:00:00 +0000"; TODO confirm.
            nonstringpdata = nonstringpdata + '{}={}'.format(key,convertTimestampToInt(val[:-6], "%a, %d %b %Y %H:%M:%S"))+ ','
          else:
            pdata = pdata + ',{}={}'.format(key,val.replace(' ','-'))
        else:
          nonstringpdata = nonstringpdata + '{}={}'.format(key,val) + ','
      elif key == 'additionalFields':
        for key2,val2 in val.items():
          if key2 == 'eventTime' :
            # NOTE(review): eventTime is only bound here; if the event has
            # no additionalFields.eventTime, format(eventTime) at the write
            # below raises NameError - confirm senders always include it.
            eventTime = convertTimestampToInt(val2)
          else:
            if val2 != "" and isinstance(val2, unicode):
              pdata = pdata + ',{}={}'.format(key2,val2.replace(' ','-'))
            elif val2 != "" :
              nonstringpdata = nonstringpdata + '{}={}'.format(key2,val2) + ','
      elif key == 'additionalParameters':
        for addParameter in val:
          for key2,val2 in addParameter.items():
            if key2 != "hashMap" :
              if val2 != "" and isinstance(val2, unicode):
                pdata = pdata + ',{}={}'.format(key2,val2.replace(' ','-'))
              elif val2 != "" :
                nonstringpdata = nonstringpdata + '{}={}'.format(key2,val2) + ','
            elif key2 == "hashMap" :
              for key3,val3 in val2.items():
                if val3 != "" and isinstance(val3, unicode):
                  pdata = pdata + ',{}={}'.format(key3,val3.replace(' ','-'))
                elif val3 != "" :
                  nonstringpdata = nonstringpdata + '{}={}'.format(key3,val3) + ','
      elif key == 'associatedAlertIdList':
        # Join the alert ids with '|' into a single tag value.
        associatedAlertIdList = ""
        for associatedAlertId in val:
            associatedAlertIdList = associatedAlertIdList + associatedAlertId + "|"
        if(associatedAlertIdList != ""):
          pdata = pdata + ',{}={}'.format("associatedAlertIdList",associatedAlertIdList.replace(' ','-')[:-1])

    # Trailing term is the explicit line-protocol timestamp (nanoseconds).
    send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + format(eventTime))


  ## processing fault events
  if 'faultFields' in jobj['event']:
    logger.debug('Found faultFields')

    d = jobj['event']['faultFields'].items()
    for key,val in d:
      if key != 'alarmAdditionalInformation' and val != "" :
        if isinstance(val, unicode):
          pdata = pdata + ',{}={}'.format(key,val.replace(' ','-'))
        else:
          nonstringpdata = nonstringpdata + '{}={}'.format(key,val) + ','
      elif key == 'alarmAdditionalInformation':
          for key2,val2 in val.items():
            if key2 == 'eventTime' :
              # NOTE(review): same eventTime hazard as in the TCA branch.
              eventTime = convertTimestampToInt(val2)
            else:
              if val2 != "" and isinstance(val2, unicode):
                pdata = pdata + ',{}={}'.format(key2,val2.replace(' ','-'))
              elif val2 != "" :
                nonstringpdata = nonstringpdata + '{}={}'.format(key2,val2) + ','

    send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + format(eventTime))


  ###process heartbeat events
  if 'heartbeatFields' in jobj['event']:
    logger.debug('Found Heartbeat')

    d = jobj['event']['heartbeatFields'].items()
    for key,val in d:
      if key != 'additionalFields' and val != "" :
        if isinstance(val, unicode):
          pdata = pdata + ',{}={}'.format(key,val.replace(' ','-'))
        else:
          nonstringpdata = nonstringpdata + '{}={}'.format(key,val) + ','
      elif key == 'additionalFields':
          for key2,val2 in val.items():
            if key2 == 'eventTime' :
              # NOTE(review): same eventTime hazard as in the TCA branch.
              eventTime = convertTimestampToInt(val2)
            else:
              if val2 != "" and isinstance(val2, unicode):
                pdata = pdata + ',{}={}'.format(key2,val2.replace(' ','-'))
              elif val2 != "" :
                nonstringpdata = nonstringpdata + '{}={}'.format(key2,val2) + ','

    send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + format(eventTime))


  ## processing measurement events
  if 'measurementFields' in jobj['event']:
    logger.debug('Found measurementFields')
    d = jobj['event']['measurementFields'].items()
    # Only these numeric keys are persisted from measurementFields.
    nonstringKey = ["concurrentSessions","configuredEntities","meanRequestLatency","measurementFieldsVersion","measurementInterval",
    "nfcScalingMetric","numberOfMediaPortsInUse","requestRate"]

    pdata = pdata + ' '
    for key,val in d:
      for nonstrKey in nonstringKey:
        if key == nonstrKey:
          pdata = pdata + '{}={}'.format(key,val) + ','

    # NOTE(review): measurement events are written under the "fault"
    # name here - looks like a copy/paste slip; confirm intent.
    send_to_influxdb("fault", pdata[:-1])


  if 'measurementsForVfScalingFields' in jobj['event']:
    logger.debug('Found measurementsForVfScalingFields')

#        "measurementsForVfScalingFields": {
#            "additionalMeasurements": [
#                {
#                    "arrayOfFields": [
#                        {
#                            "name": "load-longterm",
#                            "value": "0.34"
#                        },
#                        {
#                            "name": "load-shortterm",
#                            "value": "0.32"
#                        },
#                        {
#                            "name": "load-midterm",
#                            "value": "0.34"
#                        }
#                    ],
#                    "name": "load"
#                }
#            ],

    if 'additionalMeasurements' in jobj['event']['measurementsForVfScalingFields']:
      for meas in jobj['event']['measurementsForVfScalingFields']['additionalMeasurements']:
        name = meas['name']
        # eventTime arrives as fractional epoch seconds; convert to ns.
        eventTime = int(float(meas['eventTime']) * float(1000000000))

        if name =="kernel4-filterAccounting":
            # iptables accounting: hold back ipt-packets-value so it can be
            # emitted as the single field after the tag section.
            data = '{},system={}'.format(name,source)
            for field in meas['arrayOfFields']:
               if field['name'] =="ipt-packets-value":
                 val=field['value']
               else:
                 data = data + ",{}={}".format(field['name'],field['value'])

            data = data + ' ' + "ipt-packets-value=" + val + ' ' + format(eventTime)
            send_to_influxdb("iptables", data)
        else:
            pdata = '{},system={}'.format(name,source)

            for field in meas['arrayOfFields']:
              pdata = pdata + ",{}={}".format(field['name'],field['value'])
            #pdata = pdata + ",{}={}".format("eventTime",meas['eventTime'])
            # Replace the first comma after the system tag with a space to
            # split the tag section from the field section.
            i=pdata.find(',', pdata.find('system'))
            pdata = pdata[:i] + ' ' + pdata[i+1:] + ' ' + format(eventTime)
            send_to_influxdb("systemLoad", pdata)

#            "cpuUsageArray": [
#                {
#                    "cpuIdentifier": "15",
#                    "cpuIdle": 99.8998998999,
#                    "cpuUsageInterrupt": 0,
#                    "cpuUsageNice": 0,
#                    "cpuUsageSoftIrq": 0,
#                    "cpuUsageSteal": 0,
#                    "cpuUsageSystem": 0,
#                    "cpuUsageUser": 0.1001001001,
#                    "cpuWait": 0,
#                    "percentUsage": 0.0
#                },



    if 'cpuUsageArray' in jobj['event']['measurementsForVfScalingFields']:
      logger.debug('Found cpuUsageArray')
      for disk in jobj['event']['measurementsForVfScalingFields']['cpuUsageArray']:
        id=disk['cpuIdentifier']
        pdata = 'cpuUsage,system={},cpu={}'.format(source,id)
        d = disk.items()
        for key,val in d:
          if key == 'eventTime':
            eventTime = int(float(val) * float(1000000000))
          elif key != 'cpuIdentifier':
            pdata = pdata + ',{}={}'.format(key,val)

        # Split tags from fields at the first comma after the cpu tag.
        i=pdata.find(',', pdata.find('cpu='))
        pdata = pdata[:i] + ' ' + pdata[i+1:] + ' ' + format(eventTime)
        send_to_influxdb("cpuUsage", pdata)

#            "diskUsageArray": [
#                {
#                    "diskIdentifier": "sda",
#                    "diskIoTimeLast": 0.3996139893,
#                    "diskMergedReadLast": 0,
#                    "diskMergedWriteLast": 26.1747155344,
#                    "diskOctetsReadLast": 0,
#                    "diskOctetsWriteLast": 309767.93302,
#                    "diskOpsReadLast": 0,
#                    "diskOpsWriteLast": 10.9893839563,
#                    "diskTimeReadLast": 0,
#                    "diskTimeWriteLast": 0.699324445683
#                },

    if 'diskUsageArray' in jobj['event']['measurementsForVfScalingFields']:
      logger.debug('Found diskUsageArray')
      for disk in jobj['event']['measurementsForVfScalingFields']['diskUsageArray']:
        id=disk['diskIdentifier']
        pdata = 'diskUsage,system={},disk={}'.format(source,id)
        d = disk.items()
        for key,val in d:
          if key == 'eventTime':
            eventTime = int(float(val) * float(1000000000))
          elif key != 'diskIdentifier':
            pdata = pdata + ',{}={}'.format(key,val)

        i=pdata.find(',', pdata.find('disk='))
        pdata = pdata[:i] + ' ' + pdata[i+1:] + ' ' + format(eventTime)
        send_to_influxdb("diskUsage", pdata)

#            "memoryUsageArray": [
#                {
#                    "memoryBuffered": 269056.0,
#                    "memoryCached": 17636956.0,
#                    "memoryFree": 244731658240,
#                    "memorySlabRecl": 753160.0,
#                    "memorySlabUnrecl": 210800.0,
#                    "memoryUsed": 6240064.0,
#                    "vmIdentifier": "opnfv01"
#                }
#            ],

    if 'memoryUsageArray' in jobj['event']['measurementsForVfScalingFields']:
      logger.debug('Found memoryUsageArray')
      pdata = 'memoryUsage,system={}'.format(source)
      # NOTE(review): vmid is only bound here but is used by the
      # vNicPerformanceArray branch below - an event carrying vNic data
      # without memory data raises NameError. Confirm senders always
      # include memoryUsageArray.
      vmid=e.event.measurementsForVfScalingFields.memoryUsageArray[0].vmIdentifier
      d = jobj['event']['measurementsForVfScalingFields']['memoryUsageArray'][0].items()
      for key,val in d:
        if key == 'eventTime':
          eventTime = int(float(val) * float(1000000000))
        elif key != 'vmIdentifier':
          pdata = pdata + ',{}={}'.format(key,val)

      i=pdata.find(',', pdata.find('system'))
      pdata = pdata[:i] + ' ' + pdata[i+1:] + ' ' + format(eventTime)
      send_to_influxdb("memoryUsage", pdata)

#            "vNicPerformanceArray": [
#                {
#                    "receivedDiscardedPacketsAccumulated": 0,
#                    "receivedErrorPacketsAccumulated": 0,
#                    "receivedOctetsAccumulated": 476.801524578,
#                    "receivedTotalPacketsAccumulated": 2.90000899705,
#                    "transmittedDiscardedPacketsAccumulated": 0,
#                    "transmittedErrorPacketsAccumulated": 0,
#                    "transmittedOctetsAccumulated": 230.100735749,
#                    "transmittedTotalPacketsAccumulated": 1.20000372292,
#                    "vNicIdentifier": "eno4",
#                    "valuesAreSuspect": "true"
#                },

    if 'vNicPerformanceArray' in jobj['event']['measurementsForVfScalingFields']:
      logger.debug('Found vNicPerformanceArray')
      for vnic in jobj['event']['measurementsForVfScalingFields']['vNicPerformanceArray']:
        vnid=vnic['vNicIdentifier']
        pdata = 'vNicPerformance,system={},vnic={}'.format(vmid,vnid)
        d = vnic.items()
        for key,val in d:
          if key == 'eventTime':
            eventTime = int(float(val) * float(1000000000))
          elif key != 'vNicIdentifier':
            pdata = pdata + ',{}={}'.format(key,val)

        i=pdata.find(',', pdata.find('vnic'))
        pdata = pdata[:i] + ' ' + pdata[i+1:] + ' ' + format(eventTime)
        send_to_influxdb("vNicPerformance", pdata)
578
def test_listener(environ, start_response, schema):
    '''
    Handler for the Test Collector Test Control API.

    There is no authentication on this interface.

    This simply stores a commandList which will be sent in response to the next
    incoming event on the EVEL interface.

    This is a WSGI generator function: it invokes start_response() and
    yields the (possibly empty) response body.
    '''
    global pending_command_list
    logger.info('Got a Test Control input')
    logger.info('============================')
    logger.info('==== TEST CONTROL INPUT ====')

    #--------------------------------------------------------------------------
    # GET allows us to get the current pending request.
    #--------------------------------------------------------------------------
    if environ.get('REQUEST_METHOD') == 'GET':
        start_response('200 OK', [('Content-type', 'application/json')])
        yield json.dumps(pending_command_list)
        return

    #--------------------------------------------------------------------------
    # Extract the content from the request.
    #--------------------------------------------------------------------------
    length = int(environ.get('CONTENT_LENGTH', '0'))
    logger.debug('TestControl Content Length: {0}'.format(length))
    body = environ['wsgi.input'].read(length)
    logger.debug('TestControl Content Body: {0}'.format(body))

    #--------------------------------------------------------------------------
    # If we have a schema file then check that the event matches that expected.
    # BUG FIX: decoded_body is initialised up front so that an unparseable
    # body can no longer raise NameError at the store below; in that case the
    # previously pending commandList is preserved.
    #--------------------------------------------------------------------------
    decoded_body = None
    if (schema is not None):
        logger.debug('Attempting to validate data: {0}\n'
                     'Against schema: {1}'.format(body, schema))
        try:
            decoded_body = json.loads(body)
            jsonschema.validate(decoded_body, schema)
            logger.info('TestControl is valid!')
            logger.info('TestControl:\n'
                  '{0}'.format(json.dumps(decoded_body,
                                          sort_keys=True,
                                          indent=4,
                                          separators=(',', ': '))))

        except jsonschema.SchemaError as e:
            logger.error('TestControl Schema is not valid: {0}'.format(e))

        except jsonschema.ValidationError as e:
            logger.error('TestControl input not valid: {0}'.format(e))
            logger.error('Bad JSON body decoded:\n'
                  '{0}'.format(json.dumps(decoded_body,
                                          sort_keys=True,
                                          indent=4,
                                          separators=(',', ': '))))

        except Exception as e:
            logger.error('TestControl input not valid: {0}'.format(e))
    else:
        logger.debug('Missing schema just decode JSON: {0}'.format(body))
        try:
            decoded_body = json.loads(body)
            logger.info('Valid JSON body (no schema checking) decoded:\n'
                  '{0}'.format(json.dumps(decoded_body,
                                          sort_keys=True,
                                          indent=4,
                                          separators=(',', ': '))))
            logger.info('TestControl input not checked against schema!')

        except Exception as e:
            logger.error('TestControl input not valid: {0}'.format(e))

    #--------------------------------------------------------------------------
    # Respond to the caller. If we received otherField 'ThrottleRequest',
    # generate the appropriate canned response.
    #--------------------------------------------------------------------------
    if decoded_body is not None:
        pending_command_list = decoded_body
    logger.debug('===== TEST CONTROL END =====')
    logger.debug('============================')
    start_response('202 Accepted', [])
    yield ''
661
def main(argv=None):
    '''
    Main function for the collector start-up.

    Called with command-line arguments:
        *    --config *<file>*
        *    --section *<section>*
        *    --verbose

    Where:

        *<file>* specifies the path to the configuration file.

        *<section>* specifies the section within that config file.

        *verbose* generates more information in the log files.

    The process listens for REST API invocations and checks them. Errors are
    displayed to stdout and logged.

    Returns 0 on clean exit (including keyboard interrupt), 2 on an
    unexpected exception (unless DEBUG/TESTRUN, in which case it re-raises).
    '''

    if argv is None:
        argv = sys.argv
    else:
        sys.argv.extend(argv)

    program_name = os.path.basename(sys.argv[0])
    program_version = 'v{0}'.format(__version__)
    program_build_date = str(__updated__)
    program_version_message = '%%(prog)s {0} ({1})'.format(program_version,
                                                         program_build_date)
    # When run under a test harness __main__ may have no docstring, so fall
    # back to a fixed short description.
    if (__import__('__main__').__doc__ is not None):
        program_shortdesc = __import__('__main__').__doc__.split('\n')[1]
    else:
        program_shortdesc = 'Running in test harness'
    program_license = '''{0}

  Created  on {1}.
  Copyright 2015 Metaswitch Networks Ltd. All rights reserved.

  Distributed on an "AS IS" basis without warranties
  or conditions of any kind, either express or implied.

USAGE
'''.format(program_shortdesc, str(__date__))

    try:
        #----------------------------------------------------------------------
        # Setup argument parser so we can parse the command-line.
        #----------------------------------------------------------------------
        parser = ArgumentParser(description=program_license,
                                formatter_class=ArgumentDefaultsHelpFormatter)
        parser.add_argument('-i', '--influxdb',
                            dest='influxdb',
                            default='localhost',
                            help='InfluxDB server addresss')
        parser.add_argument('-v', '--verbose',
                            dest='verbose',
                            action='count',
                            help='set verbosity level')
        parser.add_argument('-V', '--version',
                            action='version',
                            version=program_version_message,
                            help='Display version information')
        parser.add_argument('-a', '--api-version',
                            dest='api_version',
                            default='7',
                            help='set API version')
        parser.add_argument('-c', '--config',
                            dest='config',
                            default='/etc/opt/att/collector.conf',
                            help='Use this config file.',
                            metavar='<file>')
        parser.add_argument('-s', '--section',
                            dest='section',
                            default='default',
                            metavar='<section>',
                            help='section to use in the config file')

        #----------------------------------------------------------------------
        # Process arguments received.
        #----------------------------------------------------------------------
        args = parser.parse_args()
        verbose = args.verbose
        api_version = args.api_version
        config_file = args.config
        config_section = args.section

        #----------------------------------------------------------------------
        # Now read the config file, using command-line supplied values as
        # overrides.
        #----------------------------------------------------------------------
        defaults = {'log_file': 'collector.log',
                    'vel_port': '12233',
                    'vel_path': '',
                    'vel_topic_name': ''
                   }
        overrides = {}
        config = ConfigParser.SafeConfigParser(defaults)
        config.read(config_file)

        #----------------------------------------------------------------------
        # extract the values we want.
        #----------------------------------------------------------------------
        global influxdb
        global vel_username
        global vel_password
        global vel_topic_name
        global data_storage

        influxdb = config.get(config_section, 'influxdb', vars=overrides)
        log_file = config.get(config_section, 'log_file', vars=overrides)
        vel_port = config.get(config_section, 'vel_port', vars=overrides)
        vel_path = config.get(config_section, 'vel_path', vars=overrides)
        data_storage = config.get(config_section, 'data_storage', vars=overrides)

        vel_topic_name = config.get(config_section,
                                    'vel_topic_name',
                                    vars=overrides)
        vel_username = config.get(config_section,
                                  'vel_username',
                                  vars=overrides)
        vel_password = config.get(config_section,
                                  'vel_password',
                                  vars=overrides)
        vel_schema_file = config.get(config_section,
                                     'schema_file',
                                     vars=overrides)
        base_schema_file = config.get(config_section,
                                      'base_schema_file',
                                      vars=overrides)
        throttle_schema_file = config.get(config_section,
                                          'throttle_schema_file',
                                          vars=overrides)
        test_control_schema_file = config.get(config_section,
                                           'test_control_schema_file',
                                           vars=overrides)

        #----------------------------------------------------------------------
        # Finally we have enough info to start a proper flow trace.
        # Attach the rotating-file handler *before* the first log call so the
        # 'Verbose mode on' message is not silently dropped.
        #----------------------------------------------------------------------
        global logger
        logger = logging.getLogger('monitor')
        handler = logging.handlers.RotatingFileHandler(log_file,
                                                       maxBytes=1000000,
                                                       backupCount=10)
        # Windows' strftime does not support %f/%z in this context.
        if (platform.system() == 'Windows'):
            date_format = '%Y-%m-%d %H:%M:%S'
        else:
            date_format = '%Y-%m-%d %H:%M:%S.%f %z'
        formatter = logging.Formatter('%(asctime)s %(name)s - '
                                      '%(levelname)s - %(message)s',
                                      date_format)
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        # 'verbose' is None when -v was not supplied; a plain truthiness test
        # handles None and 0 alike (and avoids a Py3 'None > 0' TypeError).
        if verbose:
            logger.setLevel(logging.DEBUG)
            logger.info('Verbose mode on')
        else:
            logger.setLevel(logging.INFO)
        logger.info('Started')

        #----------------------------------------------------------------------
        # Log the details of the configuration.
        #----------------------------------------------------------------------
        logger.debug('Log file = {0}'.format(log_file))
        logger.debug('Influxdb server = {0}'.format(influxdb))
        logger.debug('Event Listener Port = {0}'.format(vel_port))
        logger.debug('Event Listener Path = {0}'.format(vel_path))
        logger.debug('Event Listener Topic = {0}'.format(vel_topic_name))
        logger.debug('Event Listener Username = {0}'.format(vel_username))
        # logger.debug('Event Listener Password = {0}'.format(vel_password))
        logger.debug('Event Listener JSON Schema File = {0}'.format(
                                                              vel_schema_file))
        logger.debug('Base JSON Schema File = {0}'.format(base_schema_file))
        logger.debug('Throttle JSON Schema File = {0}'.format(
                                                         throttle_schema_file))
        logger.debug('Test Control JSON Schema File = {0}'.format(
                                                     test_control_schema_file))

        #----------------------------------------------------------------------
        # Perform some basic error checking on the config.
        #----------------------------------------------------------------------
        if (int(vel_port) < 1024 or int(vel_port) > 65535):
            logger.error('Invalid Vendor Event Listener port ({0}) '
                         'specified'.format(vel_port))
            raise RuntimeError('Invalid Vendor Event Listener port ({0}) '
                               'specified'.format(vel_port))

        if (len(vel_path) > 0 and vel_path[-1] != '/'):
            logger.warning('Event Listener Path ({0}) should have terminating '
                           '"/"!  Adding one on to configured string.'.format(
                                                                     vel_path))
            vel_path += '/'

        #----------------------------------------------------------------------
        # Load up the vel_schema, if it exists.
        #
        # Bind all three schema globals to None first: previously a missing
        # schema file left them unbound and the partial(...) registrations
        # below raised NameError, contradicting the 'No validation will be
        # undertaken' warning.  A None schema makes the listeners skip
        # validation (presumably — confirm against listener()).
        #----------------------------------------------------------------------
        global vel_schema
        global throttle_schema
        global test_control_schema
        vel_schema = None
        throttle_schema = None
        test_control_schema = None
        if not os.path.exists(vel_schema_file):
            logger.warning('Event Listener Schema File ({0}) not found. '
                           'No validation will be undertaken.'.format(
                                                              vel_schema_file))
        else:
            vel_schema = json.load(open(vel_schema_file, 'r'))
            logger.debug('Loaded the JSON schema file')

            #------------------------------------------------------------------
            # Load up the throttle_schema, if it exists.  The fragment is
            # layered over a copy of the main event schema.
            #------------------------------------------------------------------
            if (os.path.exists(throttle_schema_file)):
                logger.debug('Loading throttle schema')
                throttle_fragment = json.load(open(throttle_schema_file, 'r'))
                throttle_schema = {}
                throttle_schema.update(vel_schema)
                throttle_schema.update(throttle_fragment)
                logger.debug('Loaded the throttle schema')

            #------------------------------------------------------------------
            # Load up the test control _schema, if it exists.
            #------------------------------------------------------------------
            if (os.path.exists(test_control_schema_file)):
                logger.debug('Loading test control schema')
                test_control_fragment = json.load(
                    open(test_control_schema_file, 'r'))
                test_control_schema = {}
                test_control_schema.update(vel_schema)
                test_control_schema.update(test_control_fragment)
                logger.debug('Loaded the test control schema')

            #------------------------------------------------------------------
            # Load up the base_schema, if it exists.
            #------------------------------------------------------------------
            if (os.path.exists(base_schema_file)):
                logger.debug('Updating the schema with base definition')
                base_schema = json.load(open(base_schema_file, 'r'))
                vel_schema.update(base_schema)
                logger.debug('Updated the JSON schema file')

        #----------------------------------------------------------------------
        # We are now ready to get started with processing. Start-up the various
        # components of the system in order:
        #
        #  1) Create the dispatcher.
        #  2) Register the functions for the URLs of interest.
        #  3) Run the webserver.
        #----------------------------------------------------------------------
        root_url = '/{0}eventListener/v{1}{2}'.\
                   format(vel_path,
                          api_version,
                          '/' + vel_topic_name
                          if len(vel_topic_name) > 0
                          else '')
        throttle_url = '/{0}eventListener/v{1}/clientThrottlingState'.\
                       format(vel_path, api_version)
        set_404_content(root_url)
        dispatcher = PathDispatcher()
        vendor_event_listener = partial(listener, schema = vel_schema)
        dispatcher.register('GET', root_url, vendor_event_listener)
        dispatcher.register('POST', root_url, vendor_event_listener)
        vendor_throttle_listener = partial(listener, schema = throttle_schema)
        dispatcher.register('GET', throttle_url, vendor_throttle_listener)
        dispatcher.register('POST', throttle_url, vendor_throttle_listener)

        #----------------------------------------------------------------------
        # We also add a POST-only mechanism for test control, so that we can
        # send commands to a single attached client.
        #----------------------------------------------------------------------
        test_control_url = '/testControl/v{0}/commandList'.format(api_version)
        test_control_listener = partial(test_listener,
                                        schema = test_control_schema)
        dispatcher.register('POST', test_control_url, test_control_listener)
        dispatcher.register('GET', test_control_url, test_control_listener)

        httpd = make_server('', int(vel_port), dispatcher)
        logger.info('Serving on port {0}...'.format(vel_port))
        httpd.serve_forever()

        # serve_forever() only returns on error, so reaching here is abnormal.
        logger.error('Main loop exited unexpectedly!')
        return 0

    except KeyboardInterrupt:
        #----------------------------------------------------------------------
        # handle keyboard interrupt
        #----------------------------------------------------------------------
        logger.info('Exiting on keyboard interrupt!')
        return 0

    except Exception as e:
        #----------------------------------------------------------------------
        # Handle unexpected exceptions.
        #----------------------------------------------------------------------
        if DEBUG or TESTRUN:
            raise(e)
        indent = len(program_name) * ' '
        sys.stderr.write(program_name + ': ' + repr(e) + '\n')
        sys.stderr.write(indent + '  for help use --help\n')
        sys.stderr.write(traceback.format_exc())
        logger.critical('Exiting because of exception: {0}'.format(e))
        logger.critical(traceback.format_exc())
        return 2
965
966 #------------------------------------------------------------------------------
967 # MAIN SCRIPT ENTRY POINT.
968 #------------------------------------------------------------------------------
969 if __name__ == '__main__':
970     if TESTRUN:
971         #----------------------------------------------------------------------
972         # Running tests - note that doctest comments haven't been included so
973         # this is a hook for future improvements.
974         #----------------------------------------------------------------------
975         import doctest
976         doctest.testmod()
977
978     if PROFILE:
979         #----------------------------------------------------------------------
980         # Profiling performance.  Performance isn't expected to be a major
981         # issue, but this should all work as expected.
982         #----------------------------------------------------------------------
983         import cProfile
984         import pstats
985         profile_filename = 'collector_profile.txt'
986         cProfile.run('main()', profile_filename)
987         statsfile = open('collector_profile_stats.txt', 'wb')
988         p = pstats.Stats(profile_filename, stream=statsfile)
989         stats = p.strip_dirs().sort_stats('cumulative')
990         stats.print_stats()
991         statsfile.close()
992         sys.exit(0)
993
994     #--------------------------------------------------------------------------
995     # Normal operation - call through to the main function.
996     #--------------------------------------------------------------------------
997     sys.exit(main())