NONRTRIC-955: Add uvicorn, fix for appl/json
[sim/a1-interface.git] / near-rt-ric-simulator / src / STD_1.1.3 / a1.py
1 #  ============LICENSE_START===============================================
2 #  Copyright (C) 2021 Nordix Foundation. All rights reserved.
3 #  ========================================================================
4 #  Licensed under the Apache License, Version 2.0 (the "License");
5 #  you may not use this file except in compliance with the License.
6 #  You may obtain a copy of the License at
7 #
8 #       http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under the License is distributed on an "AS IS" BASIS,
12 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #  See the License for the specific language governing permissions and
14 #  limitations under the License.
15 #  ============LICENSE_END=================================================
16 #
17
18 import copy
19 import datetime
20 import json
21 import logging
22 import collections
23 import time
24
25 from connexion import NoContent
26 from flask import Flask, request, Response
27 from var_declaration import policy_instances, policy_status, callbacks, forced_settings, policy_fingerprint, hosts_set
28 from utils import calcFingerprint
29 from maincommon import check_apipath, apipath, get_supported_interfaces_response, extract_host_name, is_duplicate_check
30
31 #Constsants
32 APPL_JSON='application/json'
33 APPL_PROB_JSON='application/problem+json'
34
35 # API Function: Get all policy ids
36 def get_all_policy_identities():
37
38   extract_host_name(hosts_set, request)
39
40   if ((r := check_modified_response()) is not None):
41     return r
42
43   res = list(policy_instances.keys())
44   return Response(json.dumps(res), 200, mimetype=APPL_JSON)
45   
46 # API Function: Create or update a policy
47 def put_policy(policyId):
48
49   extract_host_name(hosts_set, request)
50
51   if ((r := check_modified_response()) is not None):
52     return r
53
54   try:
55     data = request.data
56     data = json.loads(data)
57   except Exception:
58     pjson=create_problem_json(None, "The policy is corrupt or missing.", 400, None, policyId)
59     return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON)
60
61   fp_previous=None
62   retcode=201
63   if policyId in policy_instances.keys():
64     retcode=200
65     if (is_duplicate_check()):
66       fp_previous=calcFingerprint(policy_instances[policyId])
67     else:
68       fp_previous=policyId
69
70   if (is_duplicate_check()):
71     fp=calcFingerprint(data)
72   else:
73     fp=policyId
74
75   if (fp in policy_fingerprint.keys()):
76     p_id=policy_fingerprint[fp]
77     if (p_id != policyId):
78       pjson=create_problem_json(None, "The policy json already exists.", 400, None, policyId)
79       return Response(json.dumps(pjson), 400, mimetype=APPL_PROB_JSON)
80
81   if (fp_previous is not None):
82     del policy_fingerprint[fp_previous]
83
84   policy_fingerprint[fp]=policyId
85
86   noti=request.args.get('notificationDestination')
87   callbacks[policyId]=noti
88
89   policy_instances[policyId]=data
90   ps={}
91   ps["enforceStatus"] = "UNDEFINED"
92   policy_status[policyId]=ps
93
94   if (retcode == 200):
95     return Response(json.dumps(data), 200, mimetype=APPL_JSON)
96   else:
97     headers={}
98     headers['Location']='/A1-P/v1/policies/' + policyId
99     return Response(json.dumps(data), 201, headers=headers, mimetype=APPL_JSON)
100
101 # API Function: Get a policy
102 def get_policy(policyId):
103
104   extract_host_name(hosts_set, request)
105
106   if ((r := check_modified_response()) is not None):
107     return r
108
109   if policyId in policy_instances.keys():
110     return Response(json.dumps(policy_instances[policyId]), 200, mimetype=APPL_JSON)
111
112   pjson=create_problem_json(None, "The requested policy does not exist.", 404, None, policyId)
113   return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
114
115 # API Function: Delete a policy
116 def delete_policy(policyId):
117
118   extract_host_name(hosts_set, request)
119
120   if ((r := check_modified_response()) is not None):
121     return r
122
123   if policyId in policy_instances.keys():
124     if (is_duplicate_check()):
125       fp_previous=calcFingerprint(policy_instances[policyId])
126     else:
127       fp_previous=policyId
128
129     policy_fingerprint.pop(fp_previous)
130     policy_instances.pop(policyId)
131     policy_status.pop(policyId)
132     callbacks.pop(policyId)
133     return Response('', 204, mimetype=APPL_JSON)
134
135   pjson=create_problem_json(None, "The policy identity does not exist.", 404, "No policy instance has been deleted.", policyId)
136   return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
137
138 # API Function: Get status for a policy
139 def get_policy_status(policyId):
140
141   extract_host_name(hosts_set, request)
142
143   if ((r := check_modified_response()) is not None):
144     return r
145
146   if policyId in policy_instances.keys():
147     return Response(json.dumps(policy_status[policyId]), status=200, mimetype=APPL_JSON)
148
149   pjson=create_problem_json(None, "The policy identity does not exist.", 404, "There is no existing policy instance with the identity: " + policyId, policyId)
150   return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
151
152 # Helper: Create a response object if forced http response code is set
153 def get_forced_response():
154   if (forced_settings['code'] is not None):
155     pjson=create_error_response(forced_settings['code'])
156     forced_settings['code']=None
157     return Response(json.dumps(pjson), pjson['status'], mimetype=APPL_PROB_JSON)
158   return None
159
160 # Helper: Delay if delayed response code is set
161 def do_delay():
162   if (forced_settings['delay'] is not None):
163     try:
164       val=int(forced_settings['delay'])
165       time.sleep(val)
166     except Exception:
167       return
168
169 # Helper: Check if response shall be delayed or a forced response shall be sent
170 def check_modified_response():
171   do_delay()
172   return get_forced_response()
173
174 # Helper: Create a problem json object
175 def create_problem_json(type_of, title, status, detail, instance):
176
177   error = {}
178   if type_of is not None:
179     error["type"] = type_of
180   if title is not None:
181     error["title"] = title
182   if status is not None:
183     error["status"] = status
184   if detail is not None:
185     error["detail"] = detail
186   if instance is not None:
187     error["instance"] = instance
188   return error
189
190 # Helper: Create a problem json based on a generic http response code
191 def create_error_response(code):
192
193     if code == '400':
194       return(create_problem_json(None, "Bad request", 400, "Object in payload not properly formulated or not related to the method", None))
195     elif code == '404':
196       return(create_problem_json(None, "Not found", 404, "No resource found at the URI", None))
197     elif code == '405':
198       return(create_problem_json(None, "Method not allowed", 405, "Method not allowed for the URI", None))
199     elif code == '409':
200       return(create_problem_json(None, "Conflict", 409, "Request could not be processed in the current state of the resource", None))
201     elif code == '429':
202       return(create_problem_json(None, "Too many requests", 429, "Too many requests have been sent in a given amount of time", None))
203     elif code == '507':
204       return(create_problem_json(None, "Insufficient storage", 507, "The method could not be performed on the resource because the provider is unable to store the representation needed to successfully complete the request", None))
205     elif code == '503':
206       return(create_problem_json(None, "Service unavailable", 503, "The provider is currently unable to handle the request due to a temporary overload", None))
207     else:
208       return(create_problem_json(None, "Unknown", code, "Not implemented response code", None))