--- /dev/null
+# Copyright (c) 2022 Nokia
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Symptomdata collection is triggered from the trblmgr ricplt pod. This subsystem provides for xapp interface to subscribe the
+# symptomdata collection via lwsd pod. When the symptomdata collection is triggered then the xapp gets the callback to collect
+# the symptomdata.
+#
+# If the dynamic registration is needed, then the xapp needs to use the Symptomdata.subscribe(...) method to indicate symptomdata
+# collection. In case the xapp is set to trblmgr config file then the registration is not needed.
+#
+# If the xapp has the internal data for symptomdata collection REST call response, it can use the helper methods getFileList and collect
+# to get the needed files or readymade zipped package for reponse.
+#
+import os
+import re
+import time
+import requests
+import json
+from requests.exceptions import HTTPError
+from zipfile import ZipFile
+from threading import Timer
+from datetime import datetime
+from mdclogpy import Logger
+
+logging = Logger(name=__name__)
+
+
+class RepeatTimer(Timer):
+ # timer class for housekeeping and file rotating
+ def run(self):
+ while not self.finished.wait(self.interval):
+ self.function(*self.args, **self.kwargs)
+
+
+class Symptomdata(object):
+ # service is the local POD service id, path the temporal storage, host should be the trblmgr service name
+ def __init__(self, service="", servicehost="", path="/tmp/", lwsduri=None, timeout=30):
+ """
+ init
+
+ Parameters
+ ----------
+ service: string
+ xapp service name
+ servicehost: string
+ xapp service host name
+ path:
+ temporal path where the symptomdata collection is stored
+ lwsduri:
+ lwsd uri for symptomdata dynamic registration
+ timeout:
+ timeout for subscription status polling
+ """
+ if not os.path.exists(path):
+ os.mkdir(path)
+ self.service = service
+ self.servicehost = servicehost
+ self.path = path
+ self.lwsduri = lwsduri
+ self.timeout = timeout
+ # runtime attrs
+ self.zipfilename = None
+ logging.info("Symptomdata init service:%s path:%s lwsduri:%s timeout:%d" % (self.service, self.path, self.lwsduri, self.timeout))
+ if self.lwsduri is not None:
+ # do the subscription, set to True so that first the query is triggered
+ self.lwsdok = True
+ self.subscribe(args=("",))
+ self.subscribetimer = RepeatTimer(self.timeout, self.subscribe, args=("",))
+ self.subscribetimer.start()
+
+ # make the symptomdata subscription query to lwsd - dynamic registration (needed if the static config in trblmgr does not have xapp service data)
+ def subscribe(self, args):
+ """
+ subscribe
+ internally used subscription function if the dynamic registration has been set
+ """
+ if self.lwsduri is not None:
+ try:
+ proxies = {"http": "", "https": ""} # disable proxy usage
+ headers = {'Content-type': 'application/json', 'Accept': 'application/json'}
+ if self.lwsdok is False:
+ jsondata = json.dumps({'url': 'http://' + self.servicehost +
+ ':8080/ric/v1/symptomdata', 'service': self.service, 'instance': '1'})
+ response = requests.post(self.lwsduri,
+ data=jsondata,
+ headers=headers,
+ proxies=proxies)
+ logging.info("Symptomdata subscription success")
+ self.lwsdok = True
+ elif self.lwsdok is True:
+ self.lwsdok = False
+ response = requests.get(self.lwsduri, headers=headers, proxies=proxies)
+ for item in response.json():
+ if item.get('service') == self.service:
+ logging.info("Symptomdata subscription request success")
+ self.lwsdok = True
+ if self.lwsdok is False:
+ logging.error("Symptomdata subscription missing")
+ response.raise_for_status()
+ except HTTPError as http_err:
+ logging.error("Symptomdata subscription failed - http error : %s" % (http_err))
+ self.lwsdok = False
+ except Exception as err:
+ logging.error("Symptomdata subscription failed - error : %s" % (err))
+ self.lwsdok = False
+
+ def stop(self):
+ """
+ stop
+ stops the dynamic service registration/polling
+ """
+ if self.subscribetimer is not None:
+ self.subscribetimer.cancel()
+
+ def __del__(self):
+ if self.subscribetimer is not None:
+ self.subscribetimer.cancel()
+
+ def getFileList(self, regex, fromtime, totime):
+ """
+ getFileList
+ internal use only, get the matching files for collect method
+ """
+ fileList = []
+ path, wc = regex.rsplit('/', 1)
+ logging.info("Filtering path: %s using wildcard %s fromtime %d totime %d" % (path + '/', wc, fromtime, totime))
+ try:
+ for root, dirs, files in os.walk((path + '/')):
+ for filename in files:
+ if re.match(wc, filename):
+ file_path = os.path.join(root, filename)
+ filest = os.stat(file_path)
+ if fromtime > 0:
+ logging.info("Filtering file time %d fromtime %d totime %d" % (filest.st_ctime, fromtime, totime))
+ if fromtime <= filest.st_ctime:
+ logging.info("Adding file time %d fromtime %d" % (filest.st_ctime, fromtime))
+ if totime > 0:
+ if totime >= filest.st_ctime:
+ fileList.append(file_path)
+ else:
+ fileList.append(file_path)
+ elif totime > 0:
+ if totime >= filest.st_ctime:
+ logging.info("Filtering file time %d fromtime %d totime %d" % (filest.st_ctime, fromtime, totime))
+ fileList.append(file_path)
+ else:
+ fileList.append(file_path)
+
+ except OSError as e:
+ logging.error("System error %d" % (e.errno))
+ return fileList
+
+ def collect(self, zipfiletmpl, fileregexlist, fromtime, totime):
+ """
+ collect
+ collects the symptomdata based on the file regular expression match and stored the symptomdata. Optionaly
+ caller can use fromtime and totime to choose only files matching the access time
+
+ Parameters
+ ----------
+ zipfiletmpl: string
+ template for zip file name using the strftime format - ex: ``"symptomdata"+'-%Y-%m-%d-%H-%M-%S.zip'``
+ fileregexlist: string array
+ array for file matching - ex: ``('examples/*.csv',)``
+ fromtime: integer
+ time value seconds
+ totime: integer
+ time value seconds
+ Returns
+ -------
+ string
+ zipfile name
+ """
+ zipfilename = self.path + datetime.fromtimestamp(int(time.time())).strftime(zipfiletmpl)
+ logging.info("Compressing files to symptomdata archive: %s" % (zipfilename))
+ zipdata = ZipFile(zipfilename, "w")
+ self.remove()
+ self.zipfilename = None
+ fileCnt = 0
+ for fileregex in fileregexlist:
+ logging.info("Compressing files using %s" % (fileregex))
+ fileList = self.getFileList(fileregex, fromtime, totime)
+ try:
+ if len(fileList) > 0:
+ for file_path in fileList:
+ logging.info("Adding file %s to archive" % (file_path))
+ zipdata.write(file_path, file_path)
+ fileCnt += 1
+ except OSError as e:
+ logging.error("System error %d" % (e.errno))
+ zipdata.close()
+ if fileCnt > 0:
+ self.zipfilename = zipfilename
+ return self.zipfilename
+
+ def read(self):
+ """
+ read
+ reads the stored symptomdata file content
+
+ Returns
+ -------
+ string
+ zipfile name
+ integer
+ data lenght
+ bytes
+ bytes of the file data
+ """
+ data = None
+ with open(self.zipfilename, 'rb') as file:
+ data = file.read()
+ return (self.zipfilename, len(data), data)
+
+ def remove(self):
+ if self.zipfilename is not None:
+ os.remove(self.zipfilename)