# ============LICENSE_START===============================================
-# Copyright (C) 2020 Nordix Foundation. All rights reserved.
+# Copyright (C) 2021 Nordix Foundation. All rights reserved.
# ========================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
import time
from connexion import NoContent
-from flask import Flask, escape, request, Response, make_response
-from var_declaration import policy_instances, policy_status, callbacks, forced_settings, policy_fingerprint
+from flask import Flask, request, Response
+from var_declaration import policy_instances, policy_status, callbacks, forced_settings, policy_fingerprint, hosts_set
from utils import calcFingerprint
+from maincommon import check_apipath, apipath, get_supported_interfaces_response, extract_host_name, is_duplicate_check
+
+#Constsants
+APPL_JSON='application/json'
+APPL_PROB_JSON='application/problem+json'
# API Function: Get all policy ids
def get_all_policy_identities():
+ extract_host_name(hosts_set, request)
+
if ((r := check_modified_response()) is not None):
return r
- return (list(policy_instances.keys()), 200)
-
+ res = list(policy_instances.keys())
+ return Response(json.dumps(res), 200, mimetype=APPL_JSON)
+
# API Function: Create or update a policy
def put_policy(policyId):
+ extract_host_name(hosts_set, request)
+
if ((r := check_modified_response()) is not None):
return r
try:
data = request.data
data = json.loads(data)
- except:
+ except Exception:
pjson=create_problem_json(None, "The policy is corrupt or missing.", 400, None, policyId)
- return Response(json.dumps(pjson), 400, mimetype='application/problem+json')
+ return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON)
- fpPrevious=None
+ fp_previous=None
retcode=201
if policyId in policy_instances.keys():
retcode=200
- fpPrevious=calcFingerprint(policy_instances[policyId])
+ if (is_duplicate_check()):
+ fp_previous=calcFingerprint(policy_instances[policyId])
+ else:
+ fp_previous=policyId
+
+ if (is_duplicate_check()):
+ fp=calcFingerprint(data)
+ else:
+ fp=policyId
- fp=calcFingerprint(data)
if (fp in policy_fingerprint.keys()):
- id=policy_fingerprint[fp]
- if (id != policyId):
+ p_id=policy_fingerprint[fp]
+ if (p_id != policyId):
pjson=create_problem_json(None, "The policy json already exists.", 400, None, policyId)
- return Response(json.dumps(pjson), 400, mimetype='application/problem+json')
+ return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON)
- if (fpPrevious is not None):
- del policy_fingerprint[fpPrevious]
+ if (fp_previous is not None):
+ del policy_fingerprint[fp_previous]
policy_fingerprint[fp]=policyId
policy_status[policyId]=ps
if (retcode == 200):
- return Response(json.dumps(data), 200, mimetype='application/json')
+ return Response(json.dumps(data), 200, mimetype=APPL_JSON)
else:
headers={}
headers['Location']='/A1-P/v1/policies/' + policyId
- return Response(json.dumps(data), 201, headers=headers, mimetype='application/json')
+ return Response(json.dumps(data), 201, headers=headers, mimetype=APPL_JSON)
# API Function: Get a policy
def get_policy(policyId):
+ extract_host_name(hosts_set, request)
+
if ((r := check_modified_response()) is not None):
return r
if policyId in policy_instances.keys():
- return Response(json.dumps(policy_instances[policyId]), 200, mimetype='application/json')
+ return Response(json.dumps(policy_instances[policyId]), 200, mimetype=APPL_JSON)
pjson=create_problem_json(None, "The requested policy does not exist.", 404, None, policyId)
- return Response(json.dumps(pjson), 404, mimetype='application/problem+json')
+ return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
# API Function: Delete a policy
def delete_policy(policyId):
+ extract_host_name(hosts_set, request)
+
if ((r := check_modified_response()) is not None):
return r
if policyId in policy_instances.keys():
- fpPrevious=calcFingerprint(policy_instances[policyId])
- policy_fingerprint.pop(fpPrevious)
+ if (is_duplicate_check()):
+ fp_previous=calcFingerprint(policy_instances[policyId])
+ else:
+ fp_previous=policyId
+
+ policy_fingerprint.pop(fp_previous)
policy_instances.pop(policyId)
policy_status.pop(policyId)
callbacks.pop(policyId)
- return Response('', 204, mimetype='application/json')
+ return Response('', 204, mimetype=APPL_JSON)
pjson=create_problem_json(None, "The policy identity does not exist.", 404, "No policy instance has been deleted.", policyId)
- return Response(json.dumps(pjson), 404, mimetype='application/problem+json')
+ return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
# API Function: Get status for a policy
def get_policy_status(policyId):
+ extract_host_name(hosts_set, request)
+
if ((r := check_modified_response()) is not None):
return r
if policyId in policy_instances.keys():
- return Response(json.dumps(policy_status[policyId]), status=200, mimetype='application/json')
+ return Response(json.dumps(policy_status[policyId]), status=200, mimetype=APPL_JSON)
pjson=create_problem_json(None, "The policy identity does not exist.", 404, "There is no existing policy instance with the identity: " + policyId, policyId)
- return Response(json.dumps(pjson), 404, mimetype='application/problem+json')
+ return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
# Helper: Create a response object if forced http response code is set
def get_forced_response():
if (forced_settings['code'] is not None):
pjson=create_error_response(forced_settings['code'])
forced_settings['code']=None
- return Response(json.dumps(pjson), pjson['status'], mimetype='application/problem+json')
+ return Response(json.dumps(pjson), pjson['status'], mimetype=APPL_PROB_JSON)
return None
# Helper: Delay if delayed response code is set
try:
val=int(forced_settings['delay'])
time.sleep(val)
- except:
+ except Exception:
return
- return
# Helper: Check if response shall be delayed or a forced response shall be sent
def check_modified_response():