summary |
shortlog |
log |
commit | commitdiff |
review |
tree
raw |
patch |
inline | side by side (from parent 1:
f13ce01)
Signed-off-by: DenisGNoonan <denis.noonan@est.tech>
Change-Id: I57090883cb12f602194a43801adfc41741f67c98
RUN apk add --update --no-cache python3=3.10.13-r0 py3-pip nginx nginx-mod-http-lua
RUN apk add --update --no-cache python3=3.10.13-r0 py3-pip nginx nginx-mod-http-lua
-RUN pip3 install Flask==2.2.5 connexion[swagger-ui,flask,uvicorn]
+RUN pip3 install Flask connexion[swagger-ui,flask,uvicorn]
res = list(policy_instances.keys())
res = list(map(int, res))
res = list(policy_instances.keys())
res = list(map(int, res))
+ return Response(json.dumps(res), 200, mimetype=APPL_JSON)
# API Function: Get a policy type
def a1_controller_get_policy_type(policy_type_id):
# API Function: Get a policy type
def a1_controller_get_policy_type(policy_type_id):
if (policy_type_id not in policy_instances.keys()):
log_resp_text("Policy type id not found")
return (None, 404)
if (policy_type_id not in policy_instances.keys()):
log_resp_text("Policy type id not found")
return (None, 404)
- return (list(policy_instances[policy_type_id].keys()), 200)
+
+ res = list(policy_instances[policy_type_id].keys())
+ return Response(json.dumps(res), 200, mimetype=APPL_JSON)
# API Function: Get a policy instance
def a1_controller_get_policy_instance(policy_type_id, policy_instance_id):
# API Function: Get a policy instance
def a1_controller_get_policy_instance(policy_type_id, policy_instance_id):
import time
from connexion import NoContent
import time
from connexion import NoContent
-from flask import Flask, escape, request, Response, make_response
+from flask import Flask, request, Response
from var_declaration import policy_instances, policy_status, callbacks, forced_settings, policy_fingerprint, hosts_set
from utils import calcFingerprint
from maincommon import check_apipath, apipath, get_supported_interfaces_response, extract_host_name, is_duplicate_check
from var_declaration import policy_instances, policy_status, callbacks, forced_settings, policy_fingerprint, hosts_set
from utils import calcFingerprint
from maincommon import check_apipath, apipath, get_supported_interfaces_response, extract_host_name, is_duplicate_check
if ((r := check_modified_response()) is not None):
return r
if ((r := check_modified_response()) is not None):
return r
- return (list(policy_instances.keys()), 200)
-
+ res = list(policy_instances.keys())
+ return Response(json.dumps(res), 200, mimetype=APPL_JSON)
+
# API Function: Create or update a policy
def put_policy(policyId):
# API Function: Create or update a policy
def put_policy(policyId):
-from flask import Flask, escape, request, Response
+from flask import Flask, request, Response
from jsonschema import validate
from var_declaration import policy_instances, policy_status, callbacks, forced_settings, policy_fingerprint, hosts_set, app
from maincommon import check_apipath, apipath, get_supported_interfaces_response, extract_host_name
from jsonschema import validate
from var_declaration import policy_instances, policy_status, callbacks, forced_settings, policy_fingerprint, hosts_set, app
from maincommon import check_apipath, apipath, get_supported_interfaces_response, extract_host_name
import requests
from connexion import NoContent
import requests
from connexion import NoContent
-from flask import Flask, escape, request, Response, make_response
+from flask import Flask, request, Response
from jsonschema import validate
from var_declaration import policy_instances, policy_types, policy_status, callbacks, forced_settings, policy_fingerprint, hosts_set
from utils import calcFingerprint
from jsonschema import validate
from var_declaration import policy_instances, policy_types, policy_status, callbacks, forced_settings, policy_fingerprint, hosts_set
from utils import calcFingerprint
return r
res = list(policy_types.keys())
return r
res = list(policy_types.keys())
+ return Response(json.dumps(res), 200, mimetype=APPL_JSON)
# API Function: Get a policy type
def get_policy_type(policyTypeId):
# API Function: Get a policy type
def get_policy_type(policyTypeId):
pjson=create_problem_json(None, "The policy type does not exist.", 404, None, policy_type_id)
return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
pjson=create_problem_json(None, "The policy type does not exist.", 404, None, policy_type_id)
return Response(json.dumps(pjson), 404, mimetype=APPL_PROB_JSON)
- return (list(policy_instances[policy_type_id].keys()), 200)
+ res = list(policy_instances[policy_type_id].keys())
+ return Response(json.dumps(res), 200, mimetype=APPL_JSON)
# API Function: Create or update a policy
def put_policy(policyTypeId, policyId):
# API Function: Create or update a policy
def put_policy(policyTypeId, policyId):
import json
import sys
import os
import json
import sys
import os
-from flask import Flask, escape, request, Response
+from flask import Flask, request, Response
from jsonschema import validate
from var_declaration import policy_instances, policy_types, policy_status, callbacks, forced_settings, policy_fingerprint, hosts_set, data_delivery_counter, app
from maincommon import check_apipath, apipath, get_supported_interfaces_response, extract_host_name
from jsonschema import validate
from var_declaration import policy_instances, policy_types, policy_status, callbacks, forced_settings, policy_fingerprint, hosts_set, data_delivery_counter, app
from maincommon import check_apipath, apipath, get_supported_interfaces_response, extract_host_name
if isinstance(sys.argv[1], int):
port_number = sys.argv[1]
if isinstance(sys.argv[1], int):
port_number = sys.argv[1]
-app.add_api('ORAN_A1-p_V2.0.0_api.yaml')
-
if __name__ == '__main__':
if __name__ == '__main__':
- app.run(port=port_number, host="127.0.0.1")
\ No newline at end of file
+ # Use Uvicorn to run the combined app
+ uvicorn.run(app, host="127.0.0.1", port=port_number, log_level="info")
\ No newline at end of file
#
from maincommon import apipath
#
from maincommon import apipath
+from flask import Flask
+from connexion import FlaskApp
-#Main app
-app = connexion.App(__name__, specification_dir=apipath)
+flask_app = Flask(__name__)
+
+# Main app
+app = FlaskApp(__name__, specification_dir=apipath)
+app.add_api('ORAN_A1-p_V2.0.0_api.yaml')
+
+# Combine Connexion app with Flask app
+app.app = flask_app
policy_types={}
policy_instances = {}
policy_types={}
policy_instances = {}
import collections
import time
import collections
import time
-from flask import Flask, escape, request, Response, make_response
+from flask import Flask, request, Response
from jsonschema import validate
from var_declaration import a1_policy_instances, forced_settings
from jsonschema import validate
from var_declaration import a1_policy_instances, forced_settings
do_curl GET /a1-p/policytypes/1/policies 200
echo "=== API: Get instances for type 2, shall contain pi2 ==="
do_curl GET /a1-p/policytypes/1/policies 200
echo "=== API: Get instances for type 2, shall contain pi2 ==="
+RESULT="json:[ \"pi2\" ]"
do_curl GET /a1-p/policytypes/2/policies 200
echo "=== Get counter: instances ==="
do_curl GET /a1-p/policytypes/2/policies 200
echo "=== Get counter: instances ==="