40707d62ff5935385ecc8052c6d859f53b31b425
[nonrtric.git] / test / mrstub / app / main.py
1
2 #  ============LICENSE_START===============================================
3 #  Copyright (C) 2020 Nordix Foundation. All rights reserved.
4 #  ========================================================================
5 #  Licensed under the Apache License, Version 2.0 (the "License");
6 #  you may not use this file except in compliance with the License.
7 #  You may obtain a copy of the License at
8 #
9 #       http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #  Unless required by applicable law or agreed to in writing, software
12 #  distributed under the License is distributed on an "AS IS" BASIS,
13 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #  See the License for the specific language governing permissions and
15 #  limitations under the License.
16 #  ============LICENSE_END=================================================
17 #
18
19 from flask import Flask, request
20 from time import sleep
21 import time
22 import datetime
23 import json
24 from flask import Flask
25 from flask import Response
26 import traceback
27 from threading import RLock, Thread
28 import logging
29 import os
30 import requests
31
32 # Disable all logging of GET on reading counters
33 class AjaxFilter(logging.Filter):
34     def filter(self, record):
35         return ("/counter/" not in record.getMessage())
36
37 log = logging.getLogger('werkzeug')
38 log.addFilter(AjaxFilter())
39
40 app = Flask(__name__)
41 lock = RLock()
42 # list of messages to/from Dmaap
43 msg_requests=[]
44 msg_responses={}
45
46 # Server info
47 HOST_IP = "::"
48 HOST_PORT = 2222
49
50 # Metrics vars
51 cntr_msg_requests_submitted=0
52 cntr_msg_requests_fetched=0
53 cntr_msg_responses_submitted=0
54 cntr_msg_responses_fetched=0
55
56 # Request and response constants
57 AGENT_WRITE_URL="/events/A1-POLICY-AGENT-WRITE"
58 AGENT_READ_URL="/events/A1-POLICY-AGENT-READ/users/policy-agent"
59 APP_WRITE_URL="/send-request"
60 APP_READ_URL="/receive-response"
61 MIME_TEXT="text/plain"
62 MIME_JSON="application/json"
63 CAUGHT_EXCEPTION="Caught exception: "
64 SERVER_ERROR="Server error :"
65
66 topic_write=""
67 topic_read=""
68
69 uploader_thread=None
70 downloader_thread=None
71
72 # Function to download messages from dmaap
73 def dmaap_uploader():
74     global msg_requests
75     global cntr_msg_requests_fetched
76
77     print("Starting uploader")
78
79     headers = {'Content-type': 'application/json', 'Accept': '*/*'}
80     #url="http://"+topic_host+"/events/"+topic_read
81     url=topic_read
82
83     while True:
84         while (len(msg_requests)>0):
85             msg=msg_requests[0]
86             if msg is not None:
87                 try:
88                     print("Sending to dmaap : "+ url)
89                     print("Sending to dmaap : "+ msg)
90                     resp=requests.post(url, data=msg, headers=headers, timeout=10)
91                     if (resp.status_code<199 & resp.status_code > 299):
92                         print("Failed, response code: " + str(resp.status_code))
93                         sleep(1)
94                     else:
95                         print("Dmaap response code: " + str(resp.status_code))
96                         print("Dmaap response text: " + str(resp.text))
97                         with lock:
98                             msg_requests.pop(0)
99                             cntr_msg_requests_fetched += 1
100                 except Exception as e:
101                     print("Failed, exception: "+ str(e))
102                     sleep(1)
103         sleep(0.01)
104
105
106 # Function to download messages from dmaap
107 def dmaap_downloader():
108     global msg_responses
109     global cntr_msg_responses_submitted
110
111     print("Starting uploader")
112
113     while True:
114
115         try :
116             #url="http://"+topic_host+"/events/"+topic_write+"/users/mr-stub?timeout=15000&limit=100"
117             url=topic_write
118             headers = {'Accept': 'application/json'}
119             print("Reading from dmaap: " + url)
120             resp=requests.get(url, headers=headers)
121             if (resp.status_code<199 & resp.status_code > 299):
122                 print("Failed, response code: " + resp.status_code)
123                 sleep(1)
124             else:
125                 print("Recieved data from dmaap mr")
126                 try:
127                     data=resp.json()
128                     print("Recieved data (raw): " + str(resp.text))
129                     if isinstance(data, list):
130                         for item in data:
131                             item=json.loads(item)
132                             corrid=str(item["correlationId"])
133                             status=str(item["status"])
134                             msg=str(item["message"])
135                             item_str=msg+status[0:3]
136                             with lock:
137                                 msg_responses[corrid]=item_str
138                                 cntr_msg_responses_submitted += 1
139                     else:
140                         print("Data from dmaap is not json array: " + str(resp.text))
141                         sleep(1)
142                 except Exception as e:
143                     print("Corrupt data from dmaap mr -  dropping " + str(data))
144                     print("CAUGHT_EXCEPTION" + str(e) + " "+traceback.format_exc())
145                     sleep(1)
146         except Exception as e:
147             sleep(1)
148
149 #I'm alive function
150 @app.route('/',
151     methods=['GET'])
152 def index():
153     return 'OK', 200
154
155
156 # Helper function to create a Dmaap request message
157 # args : <GET|PUT|DELETE> <correlation-id> <json-string-payload - may be None> <url>
158 # response: json formatted string of a complete Dmaap message
159 def create_message(operation, correlation_id, payload, url):
160     if (payload is None):
161         payload="{}"
162     time_stamp=datetime.datetime.utcnow()
163     msg = '{\"apiVersion\":\"1.0\",\"operation\":\"'+operation+'\",\"correlationId\":\"'+correlation_id+'\",\"originatorId\": \"849e6c6b420\",'
164     msg = msg + '\"payload\":'+payload+',\"requestId\":\"23343221\", \"target\":\"policy-agent\", \"timestamp\":\"'+str(time_stamp)+'\", \"type\":\"request\",\"url\":\"'+url+'\"}'
165     return msg
166
167
168 ### MR-stub interface, for MR control
169
170 # Send a message to MR
171 # URI and parameters (PUT or POST): /send-request?operation=<GET|PUT|POST|DELETE>&url=<url>
172 # response: <correlation-id> (http 200) o4 400 for parameter error or 500 for other errors
173 @app.route(APP_WRITE_URL,
174     methods=['PUT','POST'])
175 def sendrequest():
176     global msg_requests
177     global cntr_msg_requests_submitted
178     with lock:
179         print("APP_WRITE_URL lock")
180         try:
181             oper=request.args.get('operation')
182             if (oper is None):
183                 print(APP_WRITE_URL+" parameter 'operation' missing")
184                 return Response('Parameter operation missing in request', status=400, mimetype=MIME_TEXT)
185
186             url=request.args.get('url')
187             if (url is None):
188                 print(APP_WRITE_URL+" parameter 'url' missing")
189                 return Response('Parameter url missing in request', status=400, mimetype=MIME_TEXT)
190
191             if (oper != "GET" and oper != "PUT" and oper != "POST" and oper != "DELETE"):
192                 print(APP_WRITE_URL+" parameter 'operation' need to be: DEL|PUT|POST|DELETE")
193                 return Response('Parameter operation does not contain DEL|PUT|POST|DELETE in request', status=400, mimetype=MIME_TEXT)
194
195             print(APP_WRITE_URL+" operation="+oper+" url="+url)
196             correlation_id=str(time.time_ns())
197             payload=None
198             if (oper == "PUT") and (request.json is not None):
199                 payload=json.dumps(request.json)
200
201             msg=create_message(oper, correlation_id, payload, url)
202             print(msg)
203             print(APP_WRITE_URL+" MSG(correlationid = "+correlation_id+"): " + json.dumps(json.loads(msg), indent=2))
204             msg_requests.append(msg)
205             cntr_msg_requests_submitted += 1
206             return Response(correlation_id, status=200, mimetype=MIME_TEXT)
207         except Exception as e:
208             print(APP_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
209             return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
210
211 # Receive a message response for MR for the included correlation id
212 # URI and parameter, (GET): /receive-response?correlationid=<correlation-id>
213 # response: <json-array of 1 response> 200 or empty 204 or other errors 500
214 @app.route(APP_READ_URL,
215     methods=['GET'])
216 def receiveresponse():
217     global msg_responses
218     global cntr_msg_responses_fetched
219     with lock:
220         print("APP_READ_URL lock")
221         try:
222             cid=request.args.get('correlationid')
223             if (cid is None):
224                 print(APP_READ_URL+" parameter 'correclationid' missing")
225                 return Response('Parameter correlationid missing in json', status=500, mimetype=MIME_TEXT)
226
227             if (cid in msg_responses):
228                 answer=msg_responses[cid]
229                 del msg_responses[cid]
230                 print(APP_READ_URL+" response (correlationid="+cid+"): " + answer)
231                 cntr_msg_responses_fetched += 1
232                 return Response(answer, status=200, mimetype=MIME_JSON)
233
234             print(APP_READ_URL+" - no messages (correlationid="+cid+"): ")
235             return Response('', status=204, mimetype=MIME_JSON)
236         except Exception as e:
237             print(APP_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
238             return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
239
240 ### Dmaap interface ###
241
242 # Read messages stream. URI according to agent configuration.
243 # URI, (GET): /events/A1-POLICY-AGENT-READ/users/policy-agent
244 # response: 200 <json array of request messages>, or 500 for other errors
245 @app.route(AGENT_READ_URL,
246     methods=['GET'])
247 def events_read():
248     global msg_requests
249     global cntr_msg_requests_fetched
250
251     if topic_write or topic_read:
252         return Response('Url not available when running as mrstub frontend', status=404, mimetype=MIME_TEXT)
253
254     limit=request.args.get('limit')
255     if (limit is None):
256         limit=4096
257     else:
258         limit=int(limit)
259     if (limit<0):
260         limit=0
261     if (limit>4096):
262         limit=4096
263     print("Limting number of returned messages to: "+str(limit))
264
265     timeout=request.args.get('timeout')
266     if (timeout is None):
267         timeout=10000
268     else:
269         timeout=min(int(timeout),60000)
270
271     start_time=int(round(time.time() * 1000))
272     current_time=int(round(time.time() * 1000))
273
274     while(current_time<start_time+int(timeout)):
275         with lock:
276             if(len(msg_requests)>0):
277                 try:
278                     msgs=''
279                     cntr=0
280                     while(cntr<limit and len(msg_requests)>0):
281                         if (len(msgs)>1):
282                             msgs=msgs+','
283                         msgs=msgs+msg_requests.pop(0)
284                         cntr_msg_requests_fetched += 1
285                         cntr=cntr+1
286                     msgs='['+msgs+']'
287                     print(AGENT_READ_URL+" MSGs: "+json.dumps(json.loads(msgs), indent=2))
288                     return Response(msgs, status=200, mimetype=MIME_JSON)
289                 except Exception as e:
290                     print(AGENT_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
291                     return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
292         sleep(0.025) # sleep 25 milliseconds
293         current_time=int(round(time.time() * 1000))
294
295     print("timeout: "+str(timeout)+", start_time: "+str(start_time)+", current_time: "+str(current_time))
296     return Response("[]", status=200, mimetype=MIME_JSON)
297
298 # Write messages stream. URI according to agent configuration.
299 # URI and payload, (PUT or POST): /events/A1-POLICY-AGENT-WRITE <json array of response messages>
300 # response: OK 200 or 400 for missing json parameters, 500 for other errors
301 @app.route(AGENT_WRITE_URL,
302     methods=['PUT','POST'])
303 def events_write():
304     global msg_responses
305     global cntr_msg_responses_submitted
306
307     if topic_write or topic_read:
308         return Response('Url not available when running as mrstub frontend', status=404, mimetype=MIME_TEXT)
309
310     with lock:
311         print("AGENT_WRITE_URL lock")
312         try:
313             answer=request.json
314             print(AGENT_WRITE_URL+ " json=" + json.dumps(answer, indent=2))
315             if isinstance(answer, dict):
316                 #Create a an array if the answer is a dict (single message)
317                 answer_list=[]
318                 answer_list.append(answer)
319                 answer=answer_list
320
321             for item in answer:
322                 cid=item['correlationId']
323                 if (cid is None):
324                     print(AGENT_WRITE_URL+" parameter 'correlatonid' missing")
325                     return Response('Parameter <correlationid> missing in json', status=400, mimetype=MIME_TEXT)
326                 msg=item['message']
327                 if (msg is None):
328                     print(AGENT_WRITE_URL+" parameter 'msgs' missing")
329                     return Response('Parameter >message> missing in json', status=400, mimetype=MIME_TEXT)
330                 status=item['status']
331                 if (status is None):
332                     print(AGENT_WRITE_URL+" parameter 'status' missing")
333                     return Response('Parameter <status> missing in json', status=400, mimetype=MIME_TEXT)
334                 if isinstance(msg, list) or isinstance(msg, dict):
335                     msg_str=json.dumps(msg)+status[0:3]
336                 else:
337                     msg_str=msg+status[0:3]
338                 msg_responses[cid]=msg_str
339                 cntr_msg_responses_submitted += 1
340                 print(AGENT_WRITE_URL+ " msg+status (correlationid="+cid+") :" + str(msg_str))
341         except Exception as e:
342             print(AGENT_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
343             return Response('{"message": "' + SERVER_ERROR + ' ' + str(e) + '","status":"500"}', status=200, mimetype=MIME_JSON)
344
345         return Response('{}', status=200, mimetype=MIME_JSON)
346
347
348 ### Functions for metrics read out ###
349
350 @app.route('/counter/requests_submitted',
351     methods=['GET'])
352 def requests_submitted():
353     return Response(str(cntr_msg_requests_submitted), status=200, mimetype=MIME_TEXT)
354
355 @app.route('/counter/requests_fetched',
356     methods=['GET'])
357 def requests_fetched():
358     return Response(str(cntr_msg_requests_fetched), status=200, mimetype=MIME_TEXT)
359
360 @app.route('/counter/responses_submitted',
361     methods=['GET'])
362 def responses_submitted():
363     return Response(str(cntr_msg_responses_submitted), status=200, mimetype=MIME_TEXT)
364
365 @app.route('/counter/responses_fetched',
366     methods=['GET'])
367 def responses_fetched():
368     return Response(str(cntr_msg_responses_fetched), status=200, mimetype=MIME_TEXT)
369
370 @app.route('/counter/current_requests',
371     methods=['GET'])
372 def current_requests():
373     return Response(str(len(msg_requests)), status=200, mimetype=MIME_TEXT)
374
375 @app.route('/counter/current_responses',
376     methods=['GET'])
377 def current_responses():
378     return Response(str(len(msg_responses)), status=200, mimetype=MIME_TEXT)
379
380 ### Admin ###
381
382 # Reset all messsages and counters
383 @app.route('/reset',
384     methods=['GET', 'POST', 'PUT'])
385 def reset():
386     global cntr_msg_requests_submitted
387     global cntr_msg_requests_fetched
388     global cntr_msg_responses_submitted
389     global cntr_msg_responses_fetched
390     global msg_requests
391     global msg_responses
392
393     cntr_msg_requests_submitted=0
394     cntr_msg_requests_fetched=0
395     cntr_msg_responses_submitted=0
396     cntr_msg_responses_fetched=0
397     msg_requests=[]
398     msg_responses={}
399     return Response('OK', status=200, mimetype=MIME_TEXT)
400
401 # Get env vars, if present
402 if os.getenv("TOPIC_READ") is not None:
403
404     print("Env variables:")
405     print("TOPIC_READ:"+os.environ['TOPIC_READ'])
406     print("TOPIC_WRITE:"+os.environ['TOPIC_WRITE'])
407
408     topic_read=os.environ['TOPIC_READ']
409     topic_write=os.environ['TOPIC_WRITE']
410
411
412     if topic_read and downloader_thread is None:
413         downloader_thread=Thread(target=dmaap_downloader)
414         downloader_thread.start()
415
416     if topic_write and uploader_thread is None:
417         uploader_thread=Thread(target=dmaap_uploader)
418         uploader_thread.start()
419
420 else:
421     print("No env variables - OK")
422
423 if __name__ == "__main__":
424     app.run(port=HOST_PORT, host=HOST_IP)