# ============LICENSE_START===============================================
-# Copyright (C) 2020 Nordix Foundation. All rights reserved.
+# Copyright (C) 2021 Nordix Foundation. All rights reserved.
# ========================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
from jsonschema import validate
from var_declaration import policy_instances, policy_types, policy_status, policy_fingerprint, forced_settings, hosts_set
from utils import calcFingerprint
-from maincommon import extract_host_name
+from maincommon import extract_host_name, is_duplicate_check
+from payload_logging import is_payload_logging
+#Constsants
+APPL_JSON='application/json'
+
+
+#Helper funtion to log http reponse
+def log_resp_text(msg):
+ global payload_log
+ if (is_payload_logging()):
+ print("-----Error description-----")
+ print(str(msg))
# API Function: Health check
def get_healthcheck():
if ((r := check_modified_response()) is not None):
return r
- return (list(policy_instances.keys()), 200)
+ res = list(policy_instances.keys())
+ res = list(map(int, res))
+ return (res, 200)
# API Function: Get a policy type
def get_policy_type(policy_type_id):
policy_type_id=str(policy_type_id)
if (policy_type_id not in policy_types.keys()):
+ log_resp_text("Policy type id not found")
return (None, 404)
- return Response(json.dumps(policy_types[policy_type_id]), 200, mimetype='application/json')
+ return Response(json.dumps(policy_types[policy_type_id]), 200, mimetype=APPL_JSON)
# API Function: Delete a policy type
def delete_policy_type(policy_type_id):
policy_type_id=str(policy_type_id)
if (policy_type_id not in policy_instances.keys()):
+ log_resp_text("Policy type not found")
return (None, 404)
if (len(policy_instances[policy_type_id]) > 0):
+ log_resp_text("Policy type cannot be removed, instances exists")
return (None, 400)
del policy_instances[policy_type_id]
if ((r := check_modified_response()) is not None):
return r
+ try:
+ int(policy_type_id)
+ except Exception:
+ return Response("The policy type id is not an int", 400, mimetype='text/plain')
+
policy_type_id=str(policy_type_id)
if (policy_type_id in policy_instances.keys()):
if (len(policy_instances[policy_type_id]) > 0):
+ log_resp_text("Policy type id already exists")
return (None, 400)
try:
data = request.data
data = json.loads(data)
- except:
+ except Exception:
+ log_resp_text("Policy type validation failure")
return (None, 400)
if (('name' not in data.keys()) or ('description' not in data.keys()) or ('policy_type_id' not in data.keys()) or ('create_schema' not in data.keys())):
+ log_resp_text("Parameters missing in policy type")
return (None, 400)
if (policy_type_id not in policy_instances.keys()):
policy_type_id=str(policy_type_id)
if (policy_type_id not in policy_instances.keys()):
+ log_resp_text("Policy type id not found")
return (None, 404)
return (list(policy_instances[policy_type_id].keys()), 200)
policy_type_id=str(policy_type_id)
if (policy_type_id not in policy_instances.keys()):
+ log_resp_text("Policy type id not found")
return (None, 404)
if (policy_instance_id not in policy_instances[policy_type_id].keys()):
+ log_resp_text("Policy instance id not found")
return (None, 404)
- return Response(json.dumps(policy_instances[policy_type_id][policy_instance_id]), 200, mimetype='application/json')
+ return Response(json.dumps(policy_instances[policy_type_id][policy_instance_id]), 200, mimetype=APPL_JSON)
# API function: Delete a policy
def delete_policy_instance(policy_type_id, policy_instance_id):
policy_type_id=str(policy_type_id)
if (policy_type_id not in policy_instances.keys()):
+ log_resp_text("Policy type id not found")
return (None, 404)
if (policy_instance_id not in policy_instances[policy_type_id].keys()):
+ log_resp_text("Policy instance id not found")
return (None, 404)
- fpPrevious=calcFingerprint(policy_instances[policy_type_id][policy_instance_id])
- del policy_fingerprint[fpPrevious]
+ if (is_duplicate_check()):
+ fp_previous=calcFingerprint(policy_instances[policy_type_id][policy_instance_id], policy_type_id)
+ else:
+ fp_previous=policy_instance_id
+
+ del policy_fingerprint[fp_previous]
del policy_instances[policy_type_id][policy_instance_id]
del policy_status[policy_instance_id]
policy_type_id=str(policy_type_id)
if (policy_type_id not in policy_instances.keys()):
+ log_resp_text("Policy type id not found")
return (None, 404)
try:
data = request.data
data = json.loads(data)
- except:
+ except Exception:
+ log_resp_text("Policy json error")
return (None, 400)
try:
validate(instance=data, schema=policy_types[policy_type_id]['create_schema'])
- except:
+ except Exception:
+ log_resp_text("Policy validation error")
return (None, 400)
- fpPrevious=None
- retcode=201
+ fp_previous=None
if policy_instance_id in policy_instances[policy_type_id].keys():
- retcode=200
- fpPrevious=calcFingerprint(policy_instances[policy_type_id][policy_instance_id])
+ if (is_duplicate_check()):
+ fp_previous=calcFingerprint(policy_instances[policy_type_id][policy_instance_id], policy_type_id)
+ else:
+ fp_previous=policy_instance_id
+
else:
if (policy_instance_id in policy_fingerprint.values()):
+ log_resp_text("Policy id already exist for other type")
return (None, 400)
- fp=calcFingerprint(data)
- if (fp in policy_fingerprint.keys()):
- id=policy_fingerprint[fp]
- if (id != policy_instance_id):
+ if (is_duplicate_check()):
+ fp=calcFingerprint(data, policy_type_id)
+ else:
+ fp=policy_instance_id
+
+ if ((fp in policy_fingerprint.keys()) and is_duplicate_check()):
+ p_id=policy_fingerprint[fp]
+ if (p_id != policy_instance_id):
+ log_resp_text("Policy json duplicate of other instance")
return (None, 400)
- if (fpPrevious is not None):
- del policy_fingerprint[fpPrevious]
+ if (fp_previous is not None):
+ del policy_fingerprint[fp_previous]
policy_fingerprint[fp]=policy_instance_id
policy_type_id=str(policy_type_id)
if (policy_type_id not in policy_instances.keys()):
+ log_resp_text("Policy type id not found")
return (None, 404)
if (policy_instance_id not in policy_instances[policy_type_id].keys()):
+ log_resp_text("Policy instance id not found")
return (None, 404)
- return Response(json.dumps(policy_status[policy_instance_id]), 200, mimetype='application/json')
+ return Response(json.dumps(policy_status[policy_instance_id]), 200, mimetype=APPL_JSON)
# Helper: Create a response object if forced http response code is set
def get_forced_response():
try:
val=int(forced_settings['delay'])
time.sleep(val)
- except:
+ except Exception:
return
- return
# Helper: Check if response shall be delayed or a forced response shall be sent
def check_modified_response():