-# Copyright (C) 2021 Wind River Systems, Inc.\r
-#\r
-# Licensed under the Apache License, Version 2.0 (the "License");\r
-# you may not use this file except in compliance with the License.\r
-# You may obtain a copy of the License at\r
-#\r
-# http://www.apache.org/licenses/LICENSE-2.0\r
-#\r
-# Unless required by applicable law or agreed to in writing, software\r
-# distributed under the License is distributed on an "AS IS" BASIS,\r
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
-# See the License for the specific language governing permissions and\r
-# limitations under the License.\r
-\r
-import json\r
-import redis\r
-import http.client\r
-from flask import Flask, request\r
-from flask.helpers import url_for\r
-\r
-import mock_smo.config as config\r
-import mock_smo.logging as logging\r
-logger = logging.get_logger(__name__)\r
-\r
-apibase = config.get_o2ims_api_base()\r
-app = Flask(__name__)\r
-\r
-r = redis.Redis(**config.get_redis_host_and_port())\r
-REDIS_SUB_KEY = 'mock_smo_sub_key'\r
-REDIS_O2IMS_URL = 'mock_smo_o2ims_url'\r
-\r
-\r
-@app.route('/', methods=['GET', 'POST'])\r
-def index():\r
- if request.method == 'POST':\r
- url = request.form['url']\r
- consumerSubscriptionId = request.form['consumerSubId']\r
- sub_id = subscription_ims(url, consumerSubscriptionId)\r
- return """\r
-<h1>Subscribed O2IMS</h1>\r
-<h3>Subscription ID: %s</h3>\r
-<h3>Subscribed O2IMS URL: %s</h3>\r
-<a href="%s">\r
- <input type="button" value="Unsubscription" />\r
-</a>\r
-""" % (sub_id, url, url_for('unsubscription'))\r
- return """\r
-<h1>Subscribe O2IMS</h1>\r
-<form method="POST">\r
- <label for="url">O2 IMS URL: </label>\r
- <input type="text" id="url" name="url" value="api:80"></br></br>\r
- <label for="consumerSubId">Consumer Sub ID: </label>\r
- <input type="text" id="consumerSubId" name="consumerSubId"></br></br>\r
- <input type="submit" value="Submit">\r
-</form>\r
-"""\r
-\r
-\r
-@app.route('/unsubscription')\r
-def unsubscription():\r
- sub_key = r.get(REDIS_SUB_KEY)\r
- logger.info('Subscription key is {}'.format(sub_key))\r
- if sub_key is None:\r
- return '<h1>Already unsubscribed</h1>'\r
- url = r.get(REDIS_O2IMS_URL).decode('utf-8')\r
- logger.info('O2 IMS API is: {}'.format(url))\r
- unsubscription_ims(url, sub_key.decode('utf-8'))\r
- r.delete(REDIS_O2IMS_URL)\r
- r.delete(REDIS_SUB_KEY)\r
- return """\r
-<h1>Unsubscribed O2IMS</h1>\r
-<a href="/">\r
- <input type="button" value="Go Back" />\r
-</a>\r
-"""\r
-\r
-\r
-@app.route('/callback', methods=['POST'])\r
-def callback():\r
- logger.info('Callback data: {}'.format(request.get_data()))\r
- return '', 202\r
-\r
-\r
-def subscription_ims(url, consumerSubscriptionId):\r
- sub_key = r.get(REDIS_SUB_KEY)\r
- logger.info('Subscription key is {}'.format(sub_key))\r
- if sub_key is not None:\r
- return sub_key.decode('utf-8')\r
-\r
- logger.info(request.host_url)\r
- conn = http.client.HTTPConnection(url)\r
- headers = {'Content-type': 'application/json'}\r
- post_val = {\r
- 'callback': 'http://mock_smo:80' + url_for('callback'),\r
- 'consumerSubscriptionId': consumerSubscriptionId,\r
- 'filter': '["pserver"]' # '["pserver","pserver_mem"]'\r
- }\r
- json_val = json.dumps(post_val)\r
- conn.request('POST', apibase+'/subscriptions', json_val, headers)\r
- resp = conn.getresponse()\r
- data = resp.read().decode('utf-8')\r
- logger.info('Subscription response: {} {}, data: {}'.format(\r
- resp.status, resp.reason, data))\r
- json_data = json.loads(data)\r
-\r
- r.set(REDIS_SUB_KEY, json_data['subscriptionId'])\r
- r.set(REDIS_O2IMS_URL, url)\r
- return json_data['subscriptionId']\r
-\r
-\r
-def unsubscription_ims(url, subId):\r
- conn = http.client.HTTPConnection(url)\r
- conn.request('DELETE', apibase + '/subscriptions/' + subId)\r
- resp = conn.getresponse()\r
- logger.info('Unsubscription response: {} {}'.format(\r
- resp.status, resp.reason))\r
+# Copyright (C) 2021 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import redis
+import http.client
+from flask import Flask, request
+from flask.helpers import url_for
+
+import mock_smo.config as config
+import mock_smo.logging as logging
+logger = logging.get_logger(__name__)
+
+apibase = config.get_o2ims_api_base()
+app = Flask(__name__)
+
+r = redis.Redis(**config.get_redis_host_and_port())
+REDIS_SUB_KEY = 'mock_smo_sub_key'
+REDIS_O2IMS_URL = 'mock_smo_o2ims_url'
+
+
+@app.route('/', methods=['GET', 'POST'])
+def index():
+ if request.method == 'POST':
+ url = request.form['url']
+ consumerSubscriptionId = request.form['consumerSubId']
+ sub_id = subscription_ims(url, consumerSubscriptionId)
+ return """
+<h1>Subscribed O2IMS</h1>
+<h3>Subscription ID: %s</h3>
+<h3>Subscribed O2IMS URL: %s</h3>
+<a href="%s">
+ <input type="button" value="Unsubscription" />
+</a>
+""" % (sub_id, url, url_for('unsubscription'))
+ return """
+<h1>Subscribe O2IMS</h1>
+<form method="POST">
+ <label for="url">O2 IMS URL: </label>
+ <input type="text" id="url" name="url" value="api:80"></br></br>
+ <label for="consumerSubId">Consumer Sub ID: </label>
+ <input type="text" id="consumerSubId" name="consumerSubId"></br></br>
+ <input type="submit" value="Submit">
+</form>
+"""
+
+
+@app.route('/unsubscription')
+def unsubscription():
+ sub_key = r.get(REDIS_SUB_KEY)
+ logger.info('Subscription key is {}'.format(sub_key))
+ if sub_key is None:
+ return '<h1>Already unsubscribed</h1>'
+ url = r.get(REDIS_O2IMS_URL).decode('utf-8')
+ logger.info('O2 IMS API is: {}'.format(url))
+ unsubscription_ims(url, sub_key.decode('utf-8'))
+ r.delete(REDIS_O2IMS_URL)
+ r.delete(REDIS_SUB_KEY)
+ return """
+<h1>Unsubscribed O2IMS</h1>
+<a href="/">
+ <input type="button" value="Go Back" />
+</a>
+"""
+
+
+@app.route('/callback', methods=['POST'])
+def callback():
+ logger.info('Callback data: {}'.format(request.get_data()))
+ return '', 202
+
+
+@app.route('/registration', methods=['POST'])
+def registration():
+ logger.info('Registration data: {}'.format(request.get_data()))
+ return '', 200
+
+
+def subscription_ims(url, consumerSubscriptionId):
+ sub_key = r.get(REDIS_SUB_KEY)
+ logger.info('Subscription key is {}'.format(sub_key))
+ if sub_key is not None:
+ return sub_key.decode('utf-8')
+
+ logger.info(request.host_url)
+ conn = http.client.HTTPConnection(url)
+ headers = {'Content-type': 'application/json'}
+ post_val = {
+ 'callback': 'http://mock_smo:80' + url_for('callback'),
+ 'consumerSubscriptionId': consumerSubscriptionId,
+ 'filter': '["pserver"]' # '["pserver","pserver_mem"]'
+ }
+ json_val = json.dumps(post_val)
+ conn.request('POST', apibase+'/subscriptions', json_val, headers)
+ resp = conn.getresponse()
+ data = resp.read().decode('utf-8')
+ logger.info('Subscription response: {} {}, data: {}'.format(
+ resp.status, resp.reason, data))
+ json_data = json.loads(data)
+
+ r.set(REDIS_SUB_KEY, json_data['subscriptionId'])
+ r.set(REDIS_O2IMS_URL, url)
+ return json_data['subscriptionId']
+
+
+def unsubscription_ims(url, subId):
+ conn = http.client.HTTPConnection(url)
+ conn.request('DELETE', apibase + '/subscriptions/' + subId)
+ resp = conn.getresponse()
+ logger.info('Unsubscription response: {} {}'.format(
+ resp.status, resp.reason))