NONRTRIC-955: Add uvicorn, fix for appl/json
[sim/a1-interface.git] / near-rt-ric-simulator / src / STD_1.1.3 / a1.py
index dfc81e6..ece4f39 100644 (file)
@@ -1,5 +1,5 @@
 #  ============LICENSE_START===============================================
-#  Copyright (C) 2020 Nordix Foundation. All rights reserved.
+#  Copyright (C) 2021 Nordix Foundation. All rights reserved.
 #  ========================================================================
 #  Licensed under the Apache License, Version 2.0 (the "License");
 #  you may not use this file except in compliance with the License.
@@ -23,46 +23,63 @@ import collections
 import time
 
 from connexion import NoContent
-from flask import Flask, escape, request, Response, make_response
-from var_declaration import policy_instances, policy_status, callbacks, forced_settings, policy_fingerprint
+from flask import Flask, request, Response
+from var_declaration import policy_instances, policy_status, callbacks, forced_settings, policy_fingerprint, hosts_set
 from utils import calcFingerprint
+from maincommon import check_apipath, apipath, get_supported_interfaces_response, extract_host_name, is_duplicate_check
+
+#Constsants
+APPL_JSON='application/json'
+APPL_PROB_JSON='application/problem+json'
 
 # API Function: Get all policy ids
 def get_all_policy_identities():
 
+  extract_host_name(hosts_set, request)
+
   if ((r := check_modified_response()) is not None):
     return r
 
-  return (list(policy_instances.keys()), 200)
-
+  res = list(policy_instances.keys())
+  return Response(json.dumps(res), 200, mimetype=APPL_JSON)
+  
 # API Function: Create or update a policy
 def put_policy(policyId):
 
+  extract_host_name(hosts_set, request)
+
   if ((r := check_modified_response()) is not None):
     return r
 
   try:
     data = request.data
     data = json.loads(data)
-  except:
+  except Exception:
     pjson=create_problem_json(None, "The policy is corrupt or missing.", 400, None, policyId)
-    return Response(json.dumps(pjson), 400, mimetype='application/problem+json')
+    return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON)
 
-  fpPrevious=None
+  fp_previous=None
   retcode=201
   if policyId in policy_instances.keys():
     retcode=200
-    fpPrevious=calcFingerprint(policy_instances[policyId])
+    if (is_duplicate_check()):
+      fp_previous=calcFingerprint(policy_instances[policyId])
+    else:
+      fp_previous=policyId
+
+  if (is_duplicate_check()):
+    fp=calcFingerprint(data)
+  else:
+    fp=policyId
 
-  fp=calcFingerprint(data)
   if (fp in policy_fingerprint.keys()):
-    id=policy_fingerprint[fp]
-    if (id != policyId):
+    p_id=policy_fingerprint[fp]
+    if (p_id != policyId):
       pjson=create_problem_json(None, "The policy json already exists.", 400, None, policyId)
-      return Response(json.dumps(pjson), 400, mimetype='application/problem+json')
+      return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON)
 
-  if (fpPrevious is not None):
-    del policy_fingerprint[fpPrevious]
+  if (fp_previous is not None):
+    del policy_fingerprint[fp_previous]
 
   policy_fingerprint[fp]=policyId
 
@@ -75,59 +92,69 @@ def put_policy(policyId):
   policy_status[policyId]=ps
 
   if (retcode == 200):
-    return Response(json.dumps(data), 200, mimetype='application/json')
+    return Response(json.dumps(data), 200, mimetype=APPL_JSON)
   else:
     headers={}
     headers['Location']='/A1-P/v1/policies/' + policyId
-    return Response(json.dumps(data), 201, headers=headers, mimetype='application/json')
+    return Response(json.dumps(data), 201, headers=headers, mimetype=APPL_JSON)
 
 # API Function: Get a policy
 def get_policy(policyId):
 
+  extract_host_name(hosts_set, request)
+
   if ((r := check_modified_response()) is not None):
     return r
 
   if policyId in policy_instances.keys():
-    return Response(json.dumps(policy_instances[policyId]), 200, mimetype='application/json')
+    return Response(json.dumps(policy_instances[policyId]), 200, mimetype=APPL_JSON)
 
   pjson=create_problem_json(None, "The requested policy does not exist.", 404, None, policyId)
-  return Response(json.dumps(pjson), 404, mimetype='application/problem+json')
+  return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
 
 # API Function: Delete a policy
 def delete_policy(policyId):
 
+  extract_host_name(hosts_set, request)
+
   if ((r := check_modified_response()) is not None):
     return r
 
   if policyId in policy_instances.keys():
-    fpPrevious=calcFingerprint(policy_instances[policyId])
-    policy_fingerprint.pop(fpPrevious)
+    if (is_duplicate_check()):
+      fp_previous=calcFingerprint(policy_instances[policyId])
+    else:
+      fp_previous=policyId
+
+    policy_fingerprint.pop(fp_previous)
     policy_instances.pop(policyId)
     policy_status.pop(policyId)
     callbacks.pop(policyId)
-    return Response('', 204, mimetype='application/json')
+    return Response('', 204, mimetype=APPL_JSON)
 
   pjson=create_problem_json(None, "The policy identity does not exist.", 404, "No policy instance has been deleted.", policyId)
-  return Response(json.dumps(pjson), 404, mimetype='application/problem+json')
+  return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
 
 # API Function: Get status for a policy
 def get_policy_status(policyId):
 
+  extract_host_name(hosts_set, request)
+
   if ((r := check_modified_response()) is not None):
     return r
 
   if policyId in policy_instances.keys():
-    return Response(json.dumps(policy_status[policyId]), status=200, mimetype='application/json')
+    return Response(json.dumps(policy_status[policyId]), status=200, mimetype=APPL_JSON)
 
   pjson=create_problem_json(None, "The policy identity does not exist.", 404, "There is no existing policy instance with the identity: " + policyId, policyId)
-  return Response(json.dumps(pjson), 404, mimetype='application/problem+json')
+  return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
 
 # Helper: Create a response object if forced http response code is set
 def get_forced_response():
   if (forced_settings['code'] is not None):
     pjson=create_error_response(forced_settings['code'])
     forced_settings['code']=None
-    return Response(json.dumps(pjson), pjson['status'], mimetype='application/problem+json')
+    return Response(json.dumps(pjson), pjson['status'], mimetype=APPL_PROB_JSON)
   return None
 
 # Helper: Delay if delayed response code is set
@@ -136,9 +163,8 @@ def do_delay():
     try:
       val=int(forced_settings['delay'])
       time.sleep(val)
-    except:
+    except Exception:
       return
-  return
 
 # Helper: Check if response shall be delayed or a forced response shall be sent
 def check_modified_response():