# See the License for the specific language governing permissions and
# limitations under the License.
#
-import sys
-import os
import platform
import json
import logging
import configparser
import logging.handlers
import requests
-import urllib.request as url
from confluent_kafka import Consumer, KafkaError
# ------------------------------------------------------------------------------
logger = None
+
def send_to_influxdb(event, pdata):
url = 'http://{}/write?db=eventsdb'.format(influxdb)
logger.debug('Send {} to influxdb at {}: {}'.format(event, influxdb, pdata))
- r = requests.post(url, data=pdata, headers={'Content-Type': 'text/plain'})
- logger.info('influxdb return code {}'.format(r.status_code))
- assert (r.status_code == 204), logger.debug('*** Influxdb save failed, return code {} ***'.format(r.status_code))
-
+ try:
+ r = requests.post(url, data=pdata, headers={'Content-Type': 'text/plain'})
+ logger.info('influxdb return code {}'.format(r.status_code))
+ assert (r.status_code == 204), logger.debug('*** Influxdb save failed, return code {} ***'.format(r.status_code))
+ except Exception as e:
+ logger.error('Exception occured while saving data : '.format(e))
+
def process_additional_measurements(val, domain, eventId, startEpochMicrosec, lastEpochMicrosec):
for additionalMeasurements in val:
}
config.read(config_file)
-
# ----------------------------------------------------------------------
# extract the values we want.
# ----------------------------------------------------------------------
influxdb = config.get(config_section, 'influxdb', vars=overrides)
log_file = config.get(config_section, 'log_file', vars=overrides)
- kafka_server = config.get(config_section,'kafka_server', vars=overrides)
+ kafka_server = config.get(config_section, 'kafka_server', vars=overrides)
assert (influxdb != ""), "Value of property 'influxdb' is missing in config file"
assert (log_file != ""), "Value of property 'log_file' is missing in config file"
# kafka Consumer code .
# ----------------------------------------------------------------------
-
settings = {
'bootstrap.servers': kafka_server,
'group.id': 'mygroup',
c = Consumer(settings)
- c.subscribe(['measurement','pnfregistration',
- 'fault','thresholdcrossingalert','heartbeat'])
+ c.subscribe(['measurement', 'pnfregistration',
+ 'fault', 'thresholdcrossingalert', 'heartbeat'])
try:
while True:
assert (msg.value() is not None), "Invalid message"
logger.debug('Recived message from topic name {} and offset number {}'.format(msg.topic(), msg.offset()))
# saving data in influxdb
- save_event_in_db(msg.value())
+ try:
+ save_event_in_db(msg.value())
+ except Exception as e:
+ logger.error('Exception occured while saving data : '.format(e))
elif msg.error().code() == KafkaError._PARTITION_EOF:
logger.error('End of partition reached {0}/{1}'
.format(msg.topic(), msg.partition()))
finally:
c.close()
+
if __name__ == '__main__':
main()