75b23f125f9510510c3bf3cfbc160cbadb8135e7
[nonrtric.git] / test / mrstub / app / main.py
1
2 #  ============LICENSE_START===============================================
3 #  Copyright (C) 2020 Nordix Foundation. All rights reserved.
4 #  ========================================================================
5 #  Licensed under the Apache License, Version 2.0 (the "License");
6 #  you may not use this file except in compliance with the License.
7 #  You may obtain a copy of the License at
8 #
9 #       http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #  Unless required by applicable law or agreed to in writing, software
12 #  distributed under the License is distributed on an "AS IS" BASIS,
13 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #  See the License for the specific language governing permissions and
15 #  limitations under the License.
16 #  ============LICENSE_END=================================================
17 #
18
19 from flask import Flask, request
20 from time import sleep
21 import time
22 import datetime
23 import json
24 from flask import Flask
25 from flask import Response
26 import traceback
27 from threading import RLock
28 import logging
29
30 # Disable all logging of GET on reading counters
class AjaxFilter(logging.Filter):
    """Logging filter that drops every record mentioning a counter URL."""

    def filter(self, record):
        """Return False for records whose rendered message contains "/counter/"."""
        rendered = record.getMessage()
        is_counter_poll = "/counter/" in rendered
        return not is_counter_poll
34
# Attach the filter to the 'werkzeug' logger so records that mention
# "/counter/" are not logged.
log = logging.getLogger('werkzeug')
log.addFilter(AjaxFilter())

app = Flask(__name__)
# Re-entrant lock taken by the request handlers that modify the message stores
lock = RLock()
# list of messages to/from Dmaap
# msg_requests : queued request messages (json strings), appended by the
#                send-request handler and popped by the agent read handler
# msg_responses: correlation id -> response text with the first three
#                characters of the status appended, written by the agent
#                write handler and removed by the receive-response handler
msg_requests=[]
msg_responses={}

# Server info (passed to app.run when this file is executed as a script)
HOST_IP = "::"
HOST_PORT = 2222

# Metrics vars (exposed via the /counter/... endpoints, zeroed by /reset)
cntr_msg_requests_submitted=0
cntr_msg_requests_fetched=0
cntr_msg_responses_submitted=0
cntr_msg_responses_fetched=0

# Request and response constants
# AGENT_* are the routes used on the Dmaap interface, APP_* the routes used
# on the MR-stub control interface.
AGENT_WRITE_URL="/events/A1-POLICY-AGENT-WRITE"
AGENT_READ_URL="/events/A1-POLICY-AGENT-READ/users/policy-agent"
APP_WRITE_URL="/send-request"
APP_READ_URL="/receive-response"
MIME_TEXT="text/plain"
MIME_JSON="application/json"
# Prefixes used when printing / returning information about caught exceptions
CAUGHT_EXCEPTION="Caught exception: "
SERVER_ERROR="Server error :"
63
64 #I'm alive function
@app.route('/', methods=['GET'])
def index():
    """Alive check: answer with the text 'OK' and http status 200."""
    body, status_code = 'OK', 200
    return body, status_code
69
70
71 # Helper function to create a Dmaap request message
# args : <GET|PUT|POST|DELETE> <correlation-id> <json-string-payload - may be None> <url>
73 # response: json formatted string of a complete Dmaap message
def create_message(operation, correlation_id, payload, url):
    """Create a complete Dmaap request message.

    Args:
        operation: operation name stored in the "operation" field.
        correlation_id: value stored in the "correlationId" field.
        payload: json text inserted verbatim as the "payload" value.
            None is replaced by an empty json object ("{}").
        url: value stored in the "url" field.

    Returns:
        The message as a json formatted string.
    """
    if payload is None:
        payload = "{}"
    # Naive UTC time: renders exactly like the deprecated datetime.utcnow().
    time_stamp = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    # json.dumps() adds the surrounding quotes and escapes quotes, backslashes
    # and control characters, so caller supplied text cannot break the message.
    # ensure_ascii=False keeps plain and non-ascii text byte-identical to what
    # direct concatenation produced.
    def quoted(value):
        return json.dumps(value, ensure_ascii=False)

    msg = '{"apiVersion":"1.0","operation":' + quoted(operation) + ',"correlationId":' + quoted(correlation_id) + ',"originatorId": "849e6c6b420",'
    msg = msg + '"payload":' + payload + ',"requestId":"23343221", "target":"policy-agent", "timestamp":"' + str(time_stamp) + '", "type":"request","url":' + quoted(url) + '}'
    return msg
81
82
83 ### MR-stub interface, for MR control
84
85 # Send a message to MR
# URI and parameters (PUT or POST): /send-request?operation=<GET|PUT|POST|DELETE>&url=<url>
# response: <correlation-id> (http 200) or 400 for parameter error or 500 for other errors
@app.route(APP_WRITE_URL,
    methods=['PUT','POST'])
def sendrequest():
    """Queue a new request message for the agent.

    Query parameters:
        operation: one of GET, PUT, POST or DELETE.
        url: the url stored in the created message.

    Returns:
        200 with the generated correlation id as plain text,
        400 when a parameter is missing or 'operation' has an unsupported value,
        500 when any exception is caught.
    """
    global msg_requests
    global cntr_msg_requests_submitted
    with lock:
        print("APP_WRITE_URL lock")
        try:
            oper = request.args.get('operation')
            if oper is None:
                print(APP_WRITE_URL+" parameter 'operation' missing")
                return Response('Parameter operation missing in request', status=400, mimetype=MIME_TEXT)

            url = request.args.get('url')
            if url is None:
                print(APP_WRITE_URL+" parameter 'url' missing")
                return Response('Parameter url missing in request', status=400, mimetype=MIME_TEXT)

            # The messages now list the values that are really accepted
            # (they previously said DEL instead of GET).
            if oper not in ("GET", "PUT", "POST", "DELETE"):
                print(APP_WRITE_URL+" parameter 'operation' need to be: GET|PUT|POST|DELETE")
                return Response('Parameter operation does not contain GET|PUT|POST|DELETE in request', status=400, mimetype=MIME_TEXT)

            print(APP_WRITE_URL+" operation="+oper+" url="+url)
            # Nanosecond timestamp used as correlation id
            correlation_id = str(time.time_ns())
            # Only a PUT forwards the json body of this request; every other
            # operation gets the default (empty) payload from create_message.
            payload = None
            if (oper == "PUT") and (request.json is not None):
                payload = json.dumps(request.json)

            msg = create_message(oper, correlation_id, payload, url)
            print(msg)
            # json.loads also verifies that the created message is valid json
            # before it is queued; a failure ends up in the except branch.
            print(APP_WRITE_URL+" MSG(correlationid = "+correlation_id+"): " + json.dumps(json.loads(msg), indent=2))
            msg_requests.append(msg)
            cntr_msg_requests_submitted += 1
            return Response(correlation_id, status=200, mimetype=MIME_TEXT)
        except Exception as e:
            print(APP_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
            return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
125
126 # Receive a message response for MR for the included correlation id
127 # URI and parameter, (GET): /receive-response?correlationid=<correlation-id>
128 # response: <json-array of 1 response> 200 or empty 204 or other errors 500
@app.route(APP_READ_URL,
    methods=['GET'])
def receiveresponse():
    """Fetch (and remove) the stored response for one correlation id.

    Query parameters:
        correlationid: id returned when the request was submitted.

    Returns:
        200 with the stored response text when one exists,
        204 with an empty body when no response is stored for the id,
        500 when the parameter is missing or an exception is caught.
    """
    global msg_responses
    global cntr_msg_responses_fetched
    with lock:
        print("APP_READ_URL lock")
        try:
            # Named corr_id so the builtin id() is not shadowed
            corr_id = request.args.get('correlationid')
            if corr_id is None:
                print(APP_READ_URL+" parameter 'correlationid' missing")
                return Response('Parameter correlationid missing in json', status=500, mimetype=MIME_TEXT)

            if corr_id in msg_responses:
                # A response can only be fetched once: pop removes it from the store
                answer = msg_responses.pop(corr_id)
                print(APP_READ_URL+" response (correlationid="+corr_id+"): " + answer)
                cntr_msg_responses_fetched += 1
                return Response(answer, status=200, mimetype=MIME_JSON)

            print(APP_READ_URL+" - no messages (correlationid="+corr_id+"): ")
            return Response('', status=204, mimetype=MIME_JSON)
        except Exception as e:
            print(APP_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
            return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
154
155 ### Dmaap interface ###
156
157 # Read messages stream. URI according to agent configuration.
158 # URI, (GET): /events/A1-POLICY-AGENT-READ/users/policy-agent
159 # response: 200 <json array of request messages>, or 500 for other errors
@app.route(AGENT_READ_URL,
    methods=['GET'])
def events_read():
    """Return queued request messages, waiting for some to arrive if needed.

    Query parameters:
        limit: max number of messages to return, clamped to 0..4096
            (default 4096).
        timeout: max time to wait in milliseconds, capped at 60000
            (default 10000).

    Returns:
        200 with a json array of messages (an empty array after a timeout),
        400 when 'limit' or 'timeout' is not an integer,
        500 when an exception is caught while building the reply.
    """
    global msg_requests
    global cntr_msg_requests_fetched

    # A non-numeric value used to escape as an unhandled ValueError; it is
    # now reported as a parameter error.
    try:
        limit = request.args.get('limit')
        limit = 4096 if limit is None else int(limit)
        timeout = request.args.get('timeout')
        timeout = 10000 if timeout is None else min(int(timeout), 60000)
    except ValueError:
        print(AGENT_READ_URL+" parameters 'limit' and 'timeout' need to be integers")
        return Response("Parameters limit and timeout need to be integers", status=400, mimetype=MIME_TEXT)

    limit = max(0, min(limit, 4096))
    print("Limiting number of returned messages to: "+str(limit))

    start_time = int(round(time.time() * 1000))
    current_time = start_time

    # Poll the queue until a message shows up or the timeout has passed.
    # The lock is released between polls so other handlers can add messages.
    while current_time < start_time + timeout:
        with lock:
            if msg_requests:
                try:
                    # Take at most 'limit' messages from the front of the queue
                    batch = msg_requests[:limit]
                    del msg_requests[:limit]
                    cntr_msg_requests_fetched += len(batch)
                    # Each queued message is already a json string
                    msgs = '[' + ','.join(batch) + ']'
                    print(AGENT_READ_URL+" MSGs: "+json.dumps(json.loads(msgs), indent=2))
                    return Response(msgs, status=200, mimetype=MIME_JSON)
                except Exception as e:
                    print(AGENT_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
                    return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
        sleep(0.025) # sleep 25 milliseconds
        current_time = int(round(time.time() * 1000))

    print("timeout: "+str(timeout)+", startTime: "+str(start_time)+", currentTime: "+str(current_time))
    return Response("[]", status=200, mimetype=MIME_JSON)
209
210 # Write messages stream. URI according to agent configuration.
211 # URI and payload, (PUT or POST): /events/A1-POLICY-AGENT-WRITE <json array of response messages>
212 # response: OK 200 or 400 for missing json parameters, 500 for other errors
@app.route(AGENT_WRITE_URL,
    methods=['PUT','POST'])
def events_write():
    """Store response messages written by the agent.

    The json body is either one message object or an array of them. Each
    message needs the fields 'correlationId', 'message' and 'status'. The
    stored text is the message followed by the first three characters of
    the status.

    Returns:
        200 with '{}' when all messages were stored,
        400 when a message lacks one of the required fields (messages
        earlier in the array have already been stored at that point),
        200 with a json error object (status "500") when an exception is caught.
    """
    global msg_responses
    global cntr_msg_responses_submitted
    with lock:
        print("AGENT_WRITE_URL lock")
        try:
            answer = request.json
            print(AGENT_WRITE_URL+ " json=" + json.dumps(answer, indent=2))
            if isinstance(answer, dict):
                # Create an array if the answer is a dict (single message)
                answer = [answer]

            for item in answer:
                # .get() returns None for an absent key, so a missing field
                # reaches the 400 replies below instead of raising KeyError.
                corr_id = item.get('correlationId')
                if corr_id is None:
                    print(AGENT_WRITE_URL+" parameter 'correlationId' missing")
                    return Response('Parameter <correlationid> missing in json', status=400, mimetype=MIME_TEXT)
                msg = item.get('message')
                if msg is None:
                    print(AGENT_WRITE_URL+" parameter 'message' missing")
                    return Response('Parameter <message> missing in json', status=400, mimetype=MIME_TEXT)
                status = item.get('status')
                if status is None:
                    print(AGENT_WRITE_URL+" parameter 'status' missing")
                    return Response('Parameter <status> missing in json', status=400, mimetype=MIME_TEXT)
                # str() keeps string values unchanged and lets numeric
                # status/message values through without a TypeError.
                status_code = str(status)[0:3]
                if isinstance(msg, (list, dict)):
                    msg_str = json.dumps(msg) + status_code
                else:
                    msg_str = str(msg) + status_code
                msg_responses[corr_id] = msg_str
                cntr_msg_responses_submitted += 1
                print(AGENT_WRITE_URL+ " msg+status (correlationid="+str(corr_id)+") :" + msg_str)
        except Exception as e:
            print(AGENT_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
            # json.dumps escapes the exception text so the reply is valid json
            error_body = json.dumps({"message": SERVER_ERROR + ' ' + str(e), "status": "500"})
            return Response(error_body, status=200, mimetype=MIME_JSON)

        return Response('{}', status=200, mimetype=MIME_JSON)
254
255
256 ### Functions for metrics read out ###
257
@app.route('/counter/requests_submitted', methods=['GET'])
def requests_submitted():
    """Return the current value of cntr_msg_requests_submitted as plain text."""
    count_text = str(cntr_msg_requests_submitted)
    return Response(count_text, status=200, mimetype=MIME_TEXT)
262
@app.route('/counter/requests_fetched', methods=['GET'])
def requests_fetched():
    """Return the current value of cntr_msg_requests_fetched as plain text."""
    count_text = str(cntr_msg_requests_fetched)
    return Response(count_text, status=200, mimetype=MIME_TEXT)
267
@app.route('/counter/responses_submitted', methods=['GET'])
def responses_submitted():
    """Return the current value of cntr_msg_responses_submitted as plain text."""
    count_text = str(cntr_msg_responses_submitted)
    return Response(count_text, status=200, mimetype=MIME_TEXT)
272
@app.route('/counter/responses_fetched', methods=['GET'])
def responses_fetched():
    """Return the current value of cntr_msg_responses_fetched as plain text."""
    count_text = str(cntr_msg_responses_fetched)
    return Response(count_text, status=200, mimetype=MIME_TEXT)
277
@app.route('/counter/current_requests', methods=['GET'])
def current_requests():
    """Return the number of entries currently in msg_requests as plain text."""
    queued = len(msg_requests)
    return Response(str(queued), status=200, mimetype=MIME_TEXT)
282
@app.route('/counter/current_responses', methods=['GET'])
def current_responses():
    """Return the number of entries currently in msg_responses as plain text."""
    stored = len(msg_responses)
    return Response(str(stored), status=200, mimetype=MIME_TEXT)
287
288 ### Admin ###
289
# Reset all messages and counters
@app.route('/reset',
    methods=['GET', 'POST', 'PUT'])
def reset():
    """Clear all stored messages and set every counter back to zero.

    Returns:
        200 with the text 'OK'.
    """
    global cntr_msg_requests_submitted
    global cntr_msg_requests_fetched
    global cntr_msg_responses_submitted
    global cntr_msg_responses_fetched
    global msg_requests
    global msg_responses

    # Take the same lock as the handlers that update this state, so a reset
    # cannot interleave with a message being queued, fetched or stored.
    with lock:
        cntr_msg_requests_submitted = 0
        cntr_msg_requests_fetched = 0
        cntr_msg_responses_submitted = 0
        cntr_msg_responses_fetched = 0
        msg_requests = []
        msg_responses = {}
    return Response('OK', status=200, mimetype=MIME_TEXT)
308
if __name__ == "__main__":
    # Start the Flask server on the configured address and port
    # when this file is run as a script.
    app.run(host=HOST_IP, port=HOST_PORT)