From 5ea5016b7eec9415bd0133207a304435c05db8aa Mon Sep 17 00:00:00 2001 From: santanude Date: Tue, 7 Dec 2021 15:31:41 +0530 Subject: [PATCH] Leverage DMaaP adapter to send error code and error message Send error code and error message to the caller if error occurs SMO-28 Signed-off-by: santanude Change-Id: I6c7cb234319581afbd431e2bcbf2bc9a9b9934fd Signed-off-by: santanude --- dmaapadapter/adapter/code/app_config.py | 53 +++++++++------- dmaapadapter/adapter/code/consumer.py | 91 ++++++++++++++++----------- dmaapadapter/adapter/code/dmaap_adapter.py | 34 +++++----- dmaapadapter/adapter/code/prepare_response.py | 34 ++++++++++ 4 files changed, 139 insertions(+), 73 deletions(-) create mode 100644 dmaapadapter/adapter/code/prepare_response.py diff --git a/dmaapadapter/adapter/code/app_config.py b/dmaapadapter/adapter/code/app_config.py index fb5be95..7eb2029 100644 --- a/dmaapadapter/adapter/code/app_config.py +++ b/dmaapadapter/adapter/code/app_config.py @@ -43,34 +43,43 @@ class AppConfig: config = configparser.ConfigParser() config.read(config_file) - self.kafka_broker = config.get(config_section, - 'kafka_broker', - vars=overrides) + self.kafka_broker = config.get(config_section, 'kafka_broker', vars=overrides) log_file = config.get(config_section, 'log_file', vars=overrides) log_level = config.get(config_section, 'log_level', vars=overrides) - handler = logging.handlers.RotatingFileHandler(log_file, - maxBytes=1000000, - backupCount=10) - formatter = logging.Formatter('%(asctime)s %(name)s - ' - '%(levelname)s - %(message)s', - '%Y-%m-%d %H:%M:%S.%f %z') - handler.setFormatter(formatter) - self.logger.addHandler(handler) - - # we are going to set the log level - if (log_level == 'DEBUG'): - self.logger.setLevel(logging.DEBUG) - elif (log_level == 'ERROR'): - self.logger.setLevel(logging.ERROR) - else: - self.logger.setLevel(logging.INFO) - - self.logger.info('Log level {} and log file {} : ' - .format(log_level, log_file)) + self.setLogger(log_file, log_level) def getKafkaBroker(self): return self.kafka_broker def getLogger(self): return self.logger + + def setLogger(self, log_file, log_level): + rfh = logging.handlers.RotatingFileHandler( + filename=log_file, + mode='w', + maxBytes=1000000, + backupCount=10, + encoding=None, + delay=0 + ) + + logging.basicConfig( + format="%(asctime)s %(name)-8s %(levelname)-8s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S.%f %z", + handlers=[rfh] + ) + + logger = logging.getLogger("DMaaP") + + # we are going to set the log level + if (log_level == 'DEBUG'): + logger.setLevel(logging.DEBUG) + elif (log_level == 'ERROR'): + logger.setLevel(logging.ERROR) + else: + logger.setLevel(logging.INFO) + + logger.info('Log level {} and log file {} : '.format(log_level, log_file)) + self.logger = logger diff --git a/dmaapadapter/adapter/code/consumer.py b/dmaapadapter/adapter/code/consumer.py index 62d744c..06f0ba6 100644 --- a/dmaapadapter/adapter/code/consumer.py +++ b/dmaapadapter/adapter/code/consumer.py @@ -14,10 +14,10 @@ # from confluent_kafka.admin import AdminClient -from confluent_kafka import Consumer +from confluent_kafka import Consumer, KafkaError from app_config import AppConfig -import logging import sys +import logging.handlers class EventConsumer: @@ -29,9 +29,9 @@ class EventConsumer: self.logger = appConfig.getLogger() self.broker = appConfig.getKafkaBroker() - def consumeEvents(self, topic, consumergroup, consumerid, limit, timeout): + def consumeEvents(self, prepareResponse, topic, consumergroup, consumerid, limit, timeout): self.logger.debug("topic={}, consumergroup={}, consumerid={}, limit={}, timeout={} " - .format(topic, consumergroup, consumerid, limit, timeout)) + .format(topic, consumergroup, consumerid, limit, timeout)) consumer_config = { 'bootstrap.servers': self.broker, 'group.id': consumergroup, @@ -47,6 +47,7 @@ class EventConsumer: try: ctr = 0 content_size = 0 + response_code = 200 while True: if (ctr == int(limit)): break @@ -56,85 +57,103 @@ class EventConsumer: # read single message at a time msg = consumer.poll(timeout=int(timeout)) if msg is None: - self.logger.debug("No records ") + self.logger.debug("No new records exists in topic {} of broker {}".format(topic, self.broker)) break if msg.error(): + if (msg.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART): + response_code = 409 self.logger.debug("Error reading message : {}".format(msg.error())) break - event = msg.value().decode('utf8').replace("'", '"') - content_size = content_size + sys.getsizeof(event) - event_list.append(event) + + content_size = content_size + sys.getsizeof(msg.value().decode('utf8').replace("'", '"')) + event_list.append(msg.value().decode('utf8').replace("'", '"')) consumer.commit() + prepareResponse.setResponseCode(response_code) + if (response_code == 409): + prepareResponse.setResponseMsg("Unable to read the messages from the topic") + else: + prepareResponse.setResponseMsg(event_list) + except Exception as ex: - self.logger.debug('Failed to get event information due to unexpected reason! {0}'.format(ex)) + self.logger.error("Failed to get event information due to unexpected reason! {0}".format(ex)) + prepareResponse.setResponseCode(500) + prepareResponse.setResponseMsg("Failed to return the events") finally: self.logger.debug("closing consumer") consumer.close() - return event_list class TopicConsumer: broker = "" - logger = logging.getLogger() timeout = 10 + logger = logging.getLogger() def __init__(self): appConfig = AppConfig() self.logger = appConfig.getLogger() self.broker = appConfig.getKafkaBroker() - def getTopics(self): + def getTopics(self, prepareResponse): try: + topic_list = [] adminClient = AdminClient({"bootstrap.servers": self.broker}) ListTopicsResult = adminClient.list_topics(timeout=self.timeout) - topic_list = [] for key, value in ListTopicsResult.topics.items(): topic_list.append(key) - dict = {'topics': topic_list} - return dict + dict = {"topics": topic_list} + prepareResponse.setResponseCode(200) + prepareResponse.setResponseMsg(dict) except Exception as ex: - self.logger.debug('Failed to get topic information due to unexpected reason! {0}'.format(ex)) + self.logger.error('Failed to get topic information due to unexpected reason! {0}'.format(ex)) + prepareResponse.setResponseCode(500) + prepareResponse.setResponseMsg("Failed to return the topics") - def listAllTopics(self): + def listAllTopics(self, prepareResponse): try: topic_list = [] adminClient = AdminClient({"bootstrap.servers": self.broker}) ListTopicsResult = adminClient.list_topics(timeout=self.timeout) for key, value in ListTopicsResult.topics.items(): - dict = {'topicName': key, - 'owner': '', - 'txenabled': False - } + dict = {"topicName": key, "owner": "", "txenabled": False} topic_list.append(dict) - dict2 = {'topics': topic_list} - return dict2 + dict2 = {"topics": topic_list} + prepareResponse.setResponseCode(200) + prepareResponse.setResponseMsg(dict2) except Exception as ex: - self.logger.debug('Failed to get list of topic information due to unexpected reason! {0}'.format(ex)) + self.logger.error('Failed to get list of topic information due to unexpected reason! {0}'.format(ex)) + prepareResponse.setResponseCode(500) + prepareResponse.setResponseMsg("Failed to return the topics") - def getTopicDetails(self, topic): + def getTopicDetails(self, prepareResponse, topic): try: adminClient = AdminClient({"bootstrap.servers": self.broker}) ListTopicsResult = adminClient.list_topics(timeout=self.timeout) + topic_exists = False for key, value in ListTopicsResult.topics.items(): if (key == topic): - dict = {'name': key, - 'owner': '', - 'description': '', - 'readerAcl': {"enabled": True, "users": []}, - 'writerAcl': {"enabled": True, "users": []} - } - return dict - - self.logger.debug("Topic {} does not exists! ".format(topic)) - return "Topic [" + topic + "] does not exists" + topic_exists = True + dict = {"name": key, + "owner": "", + "description": "", + "readerAcl": {"enabled": True, "users": []}, + "writerAcl": {"enabled": True, "users": []}} + prepareResponse.setResponseCode(200) + prepareResponse.setResponseMsg(dict) + + if (topic_exists is False): + self.logger.debug("Topic '{}' does not exists! ".format(topic)) + prepareResponse.setResponseCode(404) + prepareResponse.setResponseMsg("Topic [" + topic + "] not found") except Exception as ex: - self.logger.debug('Failed to get topic detail due to unexpected reason! {0}'.format(ex)) + self.logger.error('Failed to get topic detail due to unexpected reason! {0}'.format(ex)) + prepareResponse.setResponseCode(500) + prepareResponse.setResponseMsg("Failed to return the topics") diff --git a/dmaapadapter/adapter/code/dmaap_adapter.py b/dmaapadapter/adapter/code/dmaap_adapter.py index 4db2935..e3042e2 100644 --- a/dmaapadapter/adapter/code/dmaap_adapter.py +++ b/dmaapadapter/adapter/code/dmaap_adapter.py @@ -16,7 +16,7 @@ import flask from flask import request from consumer import EventConsumer, TopicConsumer -import json +from prepare_response import PrepareResponse app = flask.Flask(__name__) app.config["DEBUG"] = True @@ -30,18 +30,22 @@ def index(): @app.route(api_base_url + '/topics', methods=['GET']) def get_all_topics(): + prepareResponse = PrepareResponse() topicConsumer = TopicConsumer() - response = app.response_class(response=json.dumps(topicConsumer.getTopics()), - status=200, + topicConsumer.getTopics(prepareResponse) + response = app.response_class(response=prepareResponse.getResponseMsg(), + status=prepareResponse.getResponseCode(), mimetype='application/json') return response @app.route(api_base_url + '/topics/listAll', methods=['GET']) def listall_topics(): + prepareResponse = PrepareResponse() topicConsumer = TopicConsumer() - response = app.response_class(response=json.dumps(topicConsumer.listAllTopics()), - status=200, + topicConsumer.listAllTopics(prepareResponse) + response = app.response_class(response=prepareResponse.getResponseMsg(), + status=prepareResponse.getResponseCode(), mimetype='application/json') return response @@ -49,9 +53,11 @@ def listall_topics(): @app.route(api_base_url + '/topics/', methods=['GET']) def topic_details(topic): assert topic == request.view_args['topic'] + prepareResponse = PrepareResponse() topicConsumer = TopicConsumer() - response = app.response_class(response=json.dumps(topicConsumer.getTopicDetails(topic)), - status=200, + topicConsumer.getTopicDetails(prepareResponse, topic) + response = app.response_class(response=prepareResponse.getResponseMsg(), + status=prepareResponse.getResponseCode(), mimetype='application/json') return response @@ -69,15 +75,11 @@ def get_events(topic, consumergroup, consumerid): if 'timeout' in request.args: timeout = request.args['timeout'] + prepareResponse = PrepareResponse() eventConsumer = EventConsumer() - response = app.response_class(response=json.dumps( - eventConsumer.consumeEvents( - topic, - consumergroup, - consumerid, - getLimit(limit), - getTimeout(timeout))), - status=200, + eventConsumer.consumeEvents(prepareResponse, topic, consumergroup, consumerid, getLimit(limit), getTimeout(timeout)) + response = app.response_class(response=prepareResponse.getResponseMsg(), + status=prepareResponse.getResponseCode(), mimetype='application/json') return response @@ -94,6 +96,8 @@ def getLimit(limit): def getTimeout(timeout): try: timeout = int(timeout) + if (timeout < 0): + timeout = 15 except Exception: timeout = 15 finally: diff --git a/dmaapadapter/adapter/code/prepare_response.py b/dmaapadapter/adapter/code/prepare_response.py new file mode 100644 index 0000000..6cce394 --- /dev/null +++ b/dmaapadapter/adapter/code/prepare_response.py @@ -0,0 +1,34 @@ +# Copyright 2021 Xoriant Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import json + + +class PrepareResponse: + + responseCode = None + responseMsg = "" + + def setResponseCode(self, responseCode): + self.responseCode = responseCode + + def getResponseCode(self): + return self.responseCode + + def setResponseMsg(self, responseMsg): + self.responseMsg = json.dumps(responseMsg) + + def getResponseMsg(self): + return self.responseMsg -- 2.16.6