from flask import Flask, escape, request, Response, make_response
from var_declaration import policy_instances, policy_status, callbacks, forced_settings, policy_fingerprint, hosts_set
from utils import calcFingerprint
-from maincommon import *
+from maincommon import check_apipath, apipath, get_supported_interfaces_response, extract_host_name
+
+#Constsants
+APPL_JSON='application/json'
+APPL_PROB_JSON='application/problem+json'
# API Function: Get all policy ids
def get_all_policy_identities():
+ extract_host_name(hosts_set, request)
if ((r := check_modified_response()) is not None):
return r
try:
data = request.data
data = json.loads(data)
- except:
+ except Exception:
pjson=create_problem_json(None, "The policy is corrupt or missing.", 400, None, policyId)
- return Response(json.dumps(pjson), 400, mimetype='application/problem+json')
+ return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON)
- fpPrevious=None
+ fp_previous=None
retcode=201
if policyId in policy_instances.keys():
retcode=200
- fpPrevious=calcFingerprint(policy_instances[policyId])
+ fp_previous=calcFingerprint(policy_instances[policyId])
fp=calcFingerprint(data)
if (fp in policy_fingerprint.keys()):
- id=policy_fingerprint[fp]
- if (id != policyId):
+ p_id=policy_fingerprint[fp]
+ if (p_id != policyId):
pjson=create_problem_json(None, "The policy json already exists.", 400, None, policyId)
- return Response(json.dumps(pjson), 400, mimetype='application/problem+json')
+ return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON)
- if (fpPrevious is not None):
- del policy_fingerprint[fpPrevious]
+ if (fp_previous is not None):
+ del policy_fingerprint[fp_previous]
policy_fingerprint[fp]=policyId
policy_status[policyId]=ps
if (retcode == 200):
- return Response(json.dumps(data), 200, mimetype='application/json')
+ return Response(json.dumps(data), 200, mimetype=APPL_JSON)
else:
headers={}
headers['Location']='/A1-P/v1/policies/' + policyId
- return Response(json.dumps(data), 201, headers=headers, mimetype='application/json')
+ return Response(json.dumps(data), 201, headers=headers, mimetype=APPL_JSON)
# API Function: Get a policy
def get_policy(policyId):
return r
if policyId in policy_instances.keys():
- return Response(json.dumps(policy_instances[policyId]), 200, mimetype='application/json')
+ return Response(json.dumps(policy_instances[policyId]), 200, mimetype=APPL_JSON)
pjson=create_problem_json(None, "The requested policy does not exist.", 404, None, policyId)
- return Response(json.dumps(pjson), 404, mimetype='application/problem+json')
+ return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
# API Function: Delete a policy
def delete_policy(policyId):
return r
if policyId in policy_instances.keys():
- fpPrevious=calcFingerprint(policy_instances[policyId])
- policy_fingerprint.pop(fpPrevious)
+ fp_previous=calcFingerprint(policy_instances[policyId])
+ policy_fingerprint.pop(fp_previous)
policy_instances.pop(policyId)
policy_status.pop(policyId)
callbacks.pop(policyId)
- return Response('', 204, mimetype='application/json')
+ return Response('', 204, mimetype=APPL_JSON)
pjson=create_problem_json(None, "The policy identity does not exist.", 404, "No policy instance has been deleted.", policyId)
- return Response(json.dumps(pjson), 404, mimetype='application/problem+json')
+ return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
# API Function: Get status for a policy
def get_policy_status(policyId):
return r
if policyId in policy_instances.keys():
- return Response(json.dumps(policy_status[policyId]), status=200, mimetype='application/json')
+ return Response(json.dumps(policy_status[policyId]), status=200, mimetype=APPL_JSON)
pjson=create_problem_json(None, "The policy identity does not exist.", 404, "There is no existing policy instance with the identity: " + policyId, policyId)
- return Response(json.dumps(pjson), 404, mimetype='application/problem+json')
+ return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
# Helper: Create a response object if forced http response code is set
def get_forced_response():
if (forced_settings['code'] is not None):
pjson=create_error_response(forced_settings['code'])
forced_settings['code']=None
- return Response(json.dumps(pjson), pjson['status'], mimetype='application/problem+json')
+ return Response(json.dumps(pjson), pjson['status'], mimetype=APPL_PROB_JSON)
return None
# Helper: Delay if delayed response code is set
try:
val=int(forced_settings['delay'])
time.sleep(val)
- except:
+ except Exception:
return
- return
# Helper: Check if response shall be delayed or a forced response shall be sent
def check_modified_response():