Sonar corrections. Unit tests and code coverage added
[sim/a1-interface.git] / near-rt-ric-simulator / src / STD_1.1.3 / a1.py
index d234327..d7169b1 100644 (file)
@@ -26,11 +26,16 @@ from connexion import NoContent
 from flask import Flask, escape, request, Response, make_response
 from var_declaration import policy_instances, policy_status, callbacks, forced_settings, policy_fingerprint, hosts_set
 from utils import calcFingerprint
-from maincommon import *
+from maincommon import check_apipath, apipath, get_supported_interfaces_response, extract_host_name
+
+#Constsants
+APPL_JSON='application/json'
+APPL_PROB_JSON='application/problem+json'
 
 # API Function: Get all policy ids
 def get_all_policy_identities():
 
+  extract_host_name(hosts_set, request)
 
   if ((r := check_modified_response()) is not None):
     return r
@@ -48,25 +53,25 @@ def put_policy(policyId):
   try:
     data = request.data
     data = json.loads(data)
-  except:
+  except Exception:
     pjson=create_problem_json(None, "The policy is corrupt or missing.", 400, None, policyId)
-    return Response(json.dumps(pjson), 400, mimetype='application/problem+json')
+    return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON)
 
-  fpPrevious=None
+  fp_previous=None
   retcode=201
   if policyId in policy_instances.keys():
     retcode=200
-    fpPrevious=calcFingerprint(policy_instances[policyId])
+    fp_previous=calcFingerprint(policy_instances[policyId])
 
   fp=calcFingerprint(data)
   if (fp in policy_fingerprint.keys()):
-    id=policy_fingerprint[fp]
-    if (id != policyId):
+    p_id=policy_fingerprint[fp]
+    if (p_id != policyId):
       pjson=create_problem_json(None, "The policy json already exists.", 400, None, policyId)
-      return Response(json.dumps(pjson), 400, mimetype='application/problem+json')
+      return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON)
 
-  if (fpPrevious is not None):
-    del policy_fingerprint[fpPrevious]
+  if (fp_previous is not None):
+    del policy_fingerprint[fp_previous]
 
   policy_fingerprint[fp]=policyId
 
@@ -79,11 +84,11 @@ def put_policy(policyId):
   policy_status[policyId]=ps
 
   if (retcode == 200):
-    return Response(json.dumps(data), 200, mimetype='application/json')
+    return Response(json.dumps(data), 200, mimetype=APPL_JSON)
   else:
     headers={}
     headers['Location']='/A1-P/v1/policies/' + policyId
-    return Response(json.dumps(data), 201, headers=headers, mimetype='application/json')
+    return Response(json.dumps(data), 201, headers=headers, mimetype=APPL_JSON)
 
 # API Function: Get a policy
 def get_policy(policyId):
@@ -94,10 +99,10 @@ def get_policy(policyId):
     return r
 
   if policyId in policy_instances.keys():
-    return Response(json.dumps(policy_instances[policyId]), 200, mimetype='application/json')
+    return Response(json.dumps(policy_instances[policyId]), 200, mimetype=APPL_JSON)
 
   pjson=create_problem_json(None, "The requested policy does not exist.", 404, None, policyId)
-  return Response(json.dumps(pjson), 404, mimetype='application/problem+json')
+  return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
 
 # API Function: Delete a policy
 def delete_policy(policyId):
@@ -108,15 +113,15 @@ def delete_policy(policyId):
     return r
 
   if policyId in policy_instances.keys():
-    fpPrevious=calcFingerprint(policy_instances[policyId])
-    policy_fingerprint.pop(fpPrevious)
+    fp_previous=calcFingerprint(policy_instances[policyId])
+    policy_fingerprint.pop(fp_previous)
     policy_instances.pop(policyId)
     policy_status.pop(policyId)
     callbacks.pop(policyId)
-    return Response('', 204, mimetype='application/json')
+    return Response('', 204, mimetype=APPL_JSON)
 
   pjson=create_problem_json(None, "The policy identity does not exist.", 404, "No policy instance has been deleted.", policyId)
-  return Response(json.dumps(pjson), 404, mimetype='application/problem+json')
+  return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
 
 # API Function: Get status for a policy
 def get_policy_status(policyId):
@@ -127,17 +132,17 @@ def get_policy_status(policyId):
     return r
 
   if policyId in policy_instances.keys():
-    return Response(json.dumps(policy_status[policyId]), status=200, mimetype='application/json')
+    return Response(json.dumps(policy_status[policyId]), status=200, mimetype=APPL_JSON)
 
   pjson=create_problem_json(None, "The policy identity does not exist.", 404, "There is no existing policy instance with the identity: " + policyId, policyId)
-  return Response(json.dumps(pjson), 404, mimetype='application/problem+json')
+  return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
 
 # Helper: Create a response object if forced http response code is set
 def get_forced_response():
   if (forced_settings['code'] is not None):
     pjson=create_error_response(forced_settings['code'])
     forced_settings['code']=None
-    return Response(json.dumps(pjson), pjson['status'], mimetype='application/problem+json')
+    return Response(json.dumps(pjson), pjson['status'], mimetype=APPL_PROB_JSON)
   return None
 
 # Helper: Delay if delayed response code is set
@@ -146,9 +151,8 @@ def do_delay():
     try:
       val=int(forced_settings['delay'])
       time.sleep(val)
-    except:
+    except Exception:
       return
-  return
 
 # Helper: Check if response shall be delayed or a forced response shall be sent
 def check_modified_response():