+# ============LICENSE_START===============================================
+# Copyright (C) 2023 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
# coding: utf-8
from __future__ import absolute_import
from var_declaration import policy_instances, policy_types, policy_status, callbacks, forced_settings, policy_fingerprint, hosts_set
from utils import calcFingerprint
from maincommon import check_apipath, apipath, get_supported_interfaces_response, extract_host_name, is_duplicate_check
+from models.enforceStatus import EnforceStatus
-#Constsants
+# Constants
APPL_JSON='application/json'
APPL_PROB_JSON='application/problem+json'
pjson=create_error_response(resp)
return Response(json.dumps(pjson), 500, mimetype=APPL_PROB_JSON)
- #Callout hooks for external server
- #When it fails, break and return 419 HTTP status code
+ # Callout hooks for external server
+ # When it fails, break and return HTTP status code 500
if (EXT_SRV_URL is not None):
resp = callout_external_server(policy_id, data, 'PUT')
if (resp != retcode):
policy_instances[policy_type_id][policy_id]=data
if (policy_types[policy_type_id]['statusSchema'] is not None):
- ps = {}
- ps["enforceStatus"] = ""
- ps["enforceReason"] = ""
- policy_status[policy_id] = ps
+ enforceStatus = EnforceStatus("NOT_ENFORCED", "OTHER_REASON")
+ policy_status[policy_id] = enforceStatus.to_dict()
if (retcode == 200):
return Response(json.dumps(data), 200, mimetype=APPL_JSON)
pjson=create_error_response(resp)
return Response(json.dumps(pjson), 500, mimetype=APPL_PROB_JSON)
- #Callout hooks for external server
- #When it fails, break and return 419 HTTP status code
+ # Callout hooks for external server
+ # When it fails, break and return HTTP status code 500
if (EXT_SRV_URL is not None):
resp = callout_external_server(policy_id, None, 'DELETE')
if (resp != 204):
--- /dev/null
+# ============LICENSE_START===============================================
+# Copyright (C) 2023 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+# coding: utf-8
+
+from __future__ import absolute_import
+from datetime import date, datetime # noqa: F401
+
+from typing import List, Dict # noqa: F401
+
+
+class EnforceStatus():
+
+ def __init__(self, enforce_status: str=None, enforce_reason: str=None): # noqa: E501
+ """EnforceStatus
+
+ :param enforce_status: The enforce_status of this EnforceStatus. # noqa: E501
+ :type enforce_status: str
+ :param enforce_reason: The enforce_reason of this EnforceStatus. # noqa: E501
+ :type enforce_reason: str
+ """
+ self._enforce_status = enforce_status
+ self._enforce_reason = enforce_reason
+
+ @property
+ def enforce_status(self) -> str:
+ """Gets the enforce_status of this EnforceStatus.
+
+ :return: The enforce_status of this EnforceStatus.
+ :rtype: str
+ """
+ return self._enforce_status
+
+ @enforce_status.setter
+ def enforce_status(self, enforce_status: str):
+ """Sets the enforce_status of this EnforceStatus.
+
+ :param enforce_status: The enforce_status of this EnforceStatus.
+ :type enforce_status: str
+ """
+ allowed_values = ["ENFORCED", "NOT_ENFORCED"] # noqa: E501
+ if enforce_status not in allowed_values:
+ raise ValueError(
+ "Invalid value for `enforce_status` ({0}), must be one of {1}"
+ .format(enforce_status, allowed_values)
+ )
+
+ self._enforce_status = enforce_status
+
+ @property
+ def enforce_reason(self) -> str:
+ """Gets the enforce_reason of this EnforceStatus.
+
+ :return: The enforce_reason of this EnforceStatus.
+ :rtype: str
+ """
+ return self._enforce_reason
+
+ @enforce_reason.setter
+ def enforce_reason(self, enforce_reason: str):
+ """Sets the enforce_reason of this EnforceStatus.
+
+ :param enforce_reason: The enforce_reason of this EnforceStatus.
+ :type enforce_reason: str
+ """
+ allowed_values = ["SCOPE_NOT_APPLICABLE", "STATEMENT_NOT_APPLICABLE", "OTHER_REASON"] # noqa: E501
+ if enforce_reason not in allowed_values:
+ raise ValueError(
+ "Invalid value for `enforce_reason` ({0}), must be one of {1}"
+ .format(enforce_reason, allowed_values)
+ )
+
+ self._enforce_reason = enforce_reason
+
+ def to_dict(self):
+ """Returns the model properties as a dict
+
+ :rtype: dict
+ """
+ result = {
+ 'enforceStatus': self._enforce_status,
+ 'enforceReason': self._enforce_reason
+ }
+ return result
do_curl GET /A1-P/v2/policytypes/STD_1/policies 409
echo "=== API: Get policy status ==="
-RESULT="json:{\"enforceStatus\": \"\", \"enforceReason\": \"\"}"
+RESULT="json:{\"enforceStatus\": \"NOT_ENFORCED\", \"enforceReason\": \"OTHER_REASON\"}"
do_curl GET /A1-P/v2/policytypes/STD_1/policies/pi1/status 200
echo "=== API: Create policy instance pi2 of type: STD_1 ==="
fi
echo "=== API: Get policy status ==="
-RESULT="json:{\"enforceStatus\": \"\", \"enforceReason\": \"\"}"
+RESULT="json:{\"enforceStatus\": \"NOT_ENFORCED\", \"enforceReason\": \"OTHER_REASON\"}"
do_curl GET /A1-P/v2/policytypes/STD_1/policies/pi2/status 200
echo "=== Set status for policy instance pi2 ==="
-RESULT="Status set to OK for policy: pi2"
-do_curl PUT '/status?policyid=pi2&status=OK' 200
+RESULT="Status set to ENFORCED for policy: pi2"
+do_curl PUT '/status?policyid=pi2&status=ENFORCED' 200
echo "=== API: Get policy status ==="
-RESULT="json:{\"enforceStatus\": \"OK\"}"
+RESULT="json:{\"enforceStatus\": \"ENFORCED\"}"
do_curl GET /A1-P/v2/policytypes/STD_1/policies/pi2/status 200
echo "=== Set status for policy instance pi2 ==="
-RESULT="Status set to NOTOK and notok_reason for policy: pi2"
-do_curl PUT '/status?policyid=pi2&status=NOTOK&reason=notok_reason' 200
+RESULT="Status set to NOT_ENFORCED and SCOPE_NOT_APPLICABLE for policy: pi2"
+do_curl PUT '/status?policyid=pi2&status=NOT_ENFORCED&reason=SCOPE_NOT_APPLICABLE' 200
echo "=== API: Get policy status ==="
-RESULT="json:{\"enforceStatus\": \"NOTOK\", \"enforceReason\":\"notok_reason\"}"
+RESULT="json:{\"enforceStatus\": \"NOT_ENFORCED\", \"enforceReason\":\"SCOPE_NOT_APPLICABLE\"}"
do_curl GET /A1-P/v2/policytypes/STD_1/policies/pi2/status 200
echo "=== Send status for pi2==="
-RESULT="json:{\"enforceStatus\": \"NOTOK\", \"enforceReason\": \"notok_reason\"}"
+RESULT=""
do_curl POST '/sendstatus?policyid=pi2' 204
echo "=== Get counter: datadelivery ==="
# This test case tests the STD_2.0.0 version of the simulator.
import json
+import pytest
import time
import multiprocessing
from unittest_setup import SERVER_URL, HOST_IP, PORT_NUMBER, setup_env, get_testdata_dir, client
setup_env(INTERFACE_VERSION)
from compare_json import compare
+from models.enforceStatus import EnforceStatus
+
+def test_enforce_reason(client):
+ """
+ Test that we can set a valid enforce status and reason, and that we reject invalid cases.
+ """
+ enforceStatus = EnforceStatus()
+
+ enforceStatus.enforce_status = 'NOT_ENFORCED'
+ enforceStatus.enforce_reason = 'SCOPE_NOT_APPLICABLE'
+ enforce_dict = enforceStatus.to_dict()
+ assert enforce_dict['enforceStatus'] == 'NOT_ENFORCED'
+ assert enforce_dict['enforceReason'] == 'SCOPE_NOT_APPLICABLE'
+
+ enforceStatus.enforce_status = 'ENFORCED'
+ enforceStatus.enforce_reason = 'STATEMENT_NOT_APPLICABLE'
+ enforce_dict = enforceStatus.to_dict()
+ assert enforce_dict['enforceStatus'] == 'ENFORCED'
+ assert enforce_dict['enforceReason'] == 'STATEMENT_NOT_APPLICABLE'
+
+ enforceStatus.enforce_reason = 'OTHER_REASON'
+ enforce_dict = enforceStatus.to_dict()
+ assert enforce_dict['enforceReason'] == 'OTHER_REASON'
+
+ enforce_status = enforceStatus.enforce_status
+ assert str(enforce_status) == 'ENFORCED'
+
+ enforce_reason = enforceStatus.enforce_reason
+ assert str(enforce_reason) == 'OTHER_REASON'
+
+ with pytest.raises(ValueError):
+ enforceStatus.enforce_status = 'ERROR'
+
+ with pytest.raises(ValueError):
+ enforceStatus.enforce_reason = 'ERROR'
+
def test_apis(client):
assert res == True
# API: API: Get policy status
- data_response = {"enforceStatus" : "", "enforceReason" : ""}
+ data_response = {"enforceStatus" : "NOT_ENFORCED", "enforceReason" : "OTHER_REASON"}
response=client.get(SERVER_URL+'A1-P/v2/policytypes/STD_1/policies/pi1/status')
assert response.status_code == 200
result=json.loads(response.data)
assert res == True
# API: API: Get policy status
- data_response = {"enforceStatus" : "", "enforceReason" : ""}
+ data_response = {"enforceStatus" : "NOT_ENFORCED", "enforceReason" : "OTHER_REASON"}
response=client.get(SERVER_URL+'A1-P/v2/policytypes/STD_1/policies/pi2/status')
assert response.status_code == 200
result=json.loads(response.data)
assert res == True
# Set status for policy instance pi2
- response=client.put(SERVER_URL+'status?policyid=pi2&status=OK')
+ response=client.put(SERVER_URL+'status?policyid=pi2&status=NOT_ENFORCED&reason=STATEMENT_NOT_APPLICABLE')
assert response.status_code == 200
# API: API: Get policy status
- data_response = {"enforceStatus" : "OK"}
+ data_response = {"enforceStatus" : "NOT_ENFORCED", "enforceReason" : "STATEMENT_NOT_APPLICABLE"}
response=client.get(SERVER_URL+'A1-P/v2/policytypes/STD_1/policies/pi2/status')
assert response.status_code == 200
result=json.loads(response.data)
assert res == True
# Set status for policy instance pi2
- response=client.put(SERVER_URL+'status?policyid=pi2&status=NOTOK&reason=notok_reason')
+ response=client.put(SERVER_URL+'status?policyid=pi2&status=NOT_ENFORCED&reason=OTHER_REASON')
assert response.status_code == 200
# API: API: Get policy status
- data_response = {"enforceStatus" : "NOTOK", "enforceReason" : "notok_reason"}
+ data_response = {"enforceStatus" : "NOT_ENFORCED", "enforceReason" : "OTHER_REASON"}
response=client.get(SERVER_URL+'A1-P/v2/policytypes/STD_1/policies/pi2/status')
assert response.status_code == 200
result=json.loads(response.data)