1 # Copyright (C) 2021 Wind River Systems, Inc.
\r
3 # Licensed under the Apache License, Version 2.0 (the "License");
\r
4 # you may not use this file except in compliance with the License.
\r
5 # You may obtain a copy of the License at
\r
7 # http://www.apache.org/licenses/LICENSE-2.0
\r
9 # Unless required by applicable law or agreed to in writing, software
\r
10 # distributed under the License is distributed on an "AS IS" BASIS,
\r
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
12 # See the License for the specific language governing permissions and
\r
13 # limitations under the License.
\r
18 from flask import Flask, request
\r
19 from flask.helpers import url_for
\r
21 import mock_smo.config as config
\r
22 import mock_smo.logging as logging
\r
23 logger = logging.get_logger(__name__)
\r
25 apibase = config.get_o2ims_api_base()
\r
26 app = Flask(__name__)
\r
28 r = redis.Redis(**config.get_redis_host_and_port())
\r
29 REDIS_SUB_KEY = 'mock_smo_sub_key'
\r
30 REDIS_O2IMS_URL = 'mock_smo_o2ims_url'
\r
33 @app.route('/', methods=['GET', 'POST'])
\r
35 if request.method == 'POST':
\r
36 url = request.form['url']
\r
37 consumerSubscriptionId = request.form['consumerSubId']
\r
38 sub_id = subscription_ims(url, consumerSubscriptionId)
\r
40 <h1>Subscribed O2IMS</h1>
\r
41 <h3>Subscription ID: %s</h3>
\r
42 <h3>Subscribed O2IMS URL: %s</h3>
\r
44 <input type="button" value="Unsubscription" />
\r
46 """ % (sub_id, url, url_for('unsubscription'))
\r
48 <h1>Subscribe O2IMS</h1>
\r
49 <form method="POST">
\r
50 <label for="url">O2 IMS URL: </label>
\r
51 <input type="text" id="url" name="url" value="api:80"></br></br>
\r
52 <label for="consumerSubId">Consumer Sub ID: </label>
\r
53 <input type="text" id="consumerSubId" name="consumerSubId"></br></br>
\r
54 <input type="submit" value="Submit">
\r
59 @app.route('/unsubscription')
\r
60 def unsubscription():
\r
61 sub_key = r.get(REDIS_SUB_KEY)
\r
62 logger.info('Subscription key is {}'.format(sub_key))
\r
64 return '<h1>Already unsubscribed</h1>'
\r
65 url = r.get(REDIS_O2IMS_URL).decode('utf-8')
\r
66 logger.info('O2 IMS API is: {}'.format(url))
\r
67 unsubscription_ims(url, sub_key.decode('utf-8'))
\r
68 r.delete(REDIS_O2IMS_URL)
\r
69 r.delete(REDIS_SUB_KEY)
\r
71 <h1>Unsubscribed O2IMS</h1>
\r
73 <input type="button" value="Go Back" />
\r
78 @app.route('/callback', methods=['POST'])
\r
80 logger.info('Callback data: {}'.format(request.get_data()))
\r
84 @app.route('/registration', methods=['POST'])
\r
86 logger.info('Registration data: {}'.format(request.get_data()))
\r
90 def subscription_ims(url, consumerSubscriptionId):
\r
91 sub_key = r.get(REDIS_SUB_KEY)
\r
92 logger.info('Subscription key is {}'.format(sub_key))
\r
93 if sub_key is not None:
\r
94 return sub_key.decode('utf-8')
\r
96 logger.info(request.host_url)
\r
97 conn = http.client.HTTPConnection(url)
\r
98 headers = {'Content-type': 'application/json'}
\r
100 'callback': 'http://mock_smo:80' + url_for('callback'),
\r
101 'consumerSubscriptionId': consumerSubscriptionId,
\r
102 'filter': '["pserver"]' # '["pserver","pserver_mem"]'
\r
104 json_val = json.dumps(post_val)
\r
105 conn.request('POST', apibase+'/subscriptions', json_val, headers)
\r
106 resp = conn.getresponse()
\r
107 data = resp.read().decode('utf-8')
\r
108 logger.info('Subscription response: {} {}, data: {}'.format(
\r
109 resp.status, resp.reason, data))
\r
110 json_data = json.loads(data)
\r
112 r.set(REDIS_SUB_KEY, json_data['subscriptionId'])
\r
113 r.set(REDIS_O2IMS_URL, url)
\r
114 return json_data['subscriptionId']
\r
117 def unsubscription_ims(url, subId):
\r
118 conn = http.client.HTTPConnection(url)
\r
119 conn.request('DELETE', apibase + '/subscriptions/' + subId)
\r
120 resp = conn.getresponse()
\r
121 logger.info('Unsubscription response: {} {}'.format(
\r
122 resp.status, resp.reason))
\r