Added tests and improvements
[nonrtric.git] / test / mrstub / app / main.py
1
2 #  ============LICENSE_START===============================================
3 #  Copyright (C) 2020 Nordix Foundation. All rights reserved.
4 #  ========================================================================
5 #  Licensed under the Apache License, Version 2.0 (the "License");
6 #  you may not use this file except in compliance with the License.
7 #  You may obtain a copy of the License at
8 #
9 #       http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #  Unless required by applicable law or agreed to in writing, software
12 #  distributed under the License is distributed on an "AS IS" BASIS,
13 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #  See the License for the specific language governing permissions and
15 #  limitations under the License.
16 #  ============LICENSE_END=================================================
17 #
18
19 from flask import Flask, request
20 from time import sleep
21 import time
22 import datetime
23 import json
24 from flask import Flask
25 from flask import Response
26 import traceback
27 from threading import RLock, Thread
28 import logging
29 import os
30 import requests
31
32 # Disable all logging of GET on reading counters
33 class AjaxFilter(logging.Filter):
34     def filter(self, record):
35         return ("/counter/" not in record.getMessage())
36
37 log = logging.getLogger('werkzeug')
38 log.addFilter(AjaxFilter())
39
40 app = Flask(__name__)
41 lock = RLock()
42 # list of messages to/from Dmaap
43 msg_requests=[]
44 msg_responses={}
45
46 generic_messages={}
47
48 # Server info
49 HOST_IP = "::"
50 HOST_PORT = 2222
51
52 # Metrics vars
53 cntr_msg_requests_submitted=0
54 cntr_msg_requests_fetched=0
55 cntr_msg_responses_submitted=0
56 cntr_msg_responses_fetched=0
57
58 # Request and response constants
59 ORU_WRITE_URL="/events/unauthenticated.SEC_FAULT_OUTPUT"
60 ORU_READ_URL="/events/unauthenticated.SEC_FAULT_OUTPUT/users/test/"
61 AGENT_WRITE_URL="/events/A1-POLICY-AGENT-WRITE"
62 AGENT_READ_URL="/events/A1-POLICY-AGENT-READ/users/policy-agent"
63 APP_WRITE_URL="/send-request"
64 APP_READ_URL="/receive-response"
65 MIME_TEXT="text/plain"
66 MIME_JSON="application/json"
67 CAUGHT_EXCEPTION="Caught exception: "
68 SERVER_ERROR="Server error :"
69
70 topic_write=""
71 topic_read=""
72 generic_topics_upload_baseurl=""
73
74 uploader_thread=None
75 downloader_thread=None
76 generic_uploader_thread=None
77
78 # Function to upload PMS messages to dmaap
79 def dmaap_uploader():
80     global msg_requests
81     global cntr_msg_requests_fetched
82
83     print("Starting uploader")
84
85     headers = {'Content-type': 'application/json', 'Accept': '*/*'}
86     #url="http://"+topic_host+"/events/"+topic_read
87     url=topic_read
88
89     while True:
90         while (len(msg_requests)>0):
91             msg=msg_requests[0]
92             if msg is not None:
93                 try:
94                     print("Sending to dmaap : "+ url)
95                     print("Sending to dmaap : "+ msg)
96                     resp=requests.post(url, data=msg, headers=headers, timeout=10)
97                     if (resp.status_code<199 & resp.status_code > 299):
98                         print("Failed, response code: " + str(resp.status_code))
99                         sleep(1)
100                     else:
101                         print("Dmaap response code: " + str(resp.status_code))
102                         print("Dmaap response text: " + str(resp.text))
103                         with lock:
104                             msg_requests.pop(0)
105                             cntr_msg_requests_fetched += 1
106                 except Exception as e:
107                     print("Failed, exception: "+ str(e))
108                     sleep(1)
109         sleep(0.01)
110
111
112 # Function to download PMS messages from dmaap
113 def dmaap_downloader():
114     global msg_responses
115     global cntr_msg_responses_submitted
116
117     print("Starting uploader")
118
119     while True:
120
121         try :
122             #url="http://"+topic_host+"/events/"+topic_write+"/users/mr-stub?timeout=15000&limit=100"
123             url=topic_write
124             headers = {'Accept': 'application/json'}
125             print("Reading from dmaap: " + url)
126             resp=requests.get(url, headers=headers)
127             if (resp.status_code<199 & resp.status_code > 299):
128                 print("Failed, response code: " + resp.status_code)
129                 sleep(1)
130             else:
131                 print("Recieved data from dmaap mr")
132                 try:
133                     data=resp.json()
134                     print("Recieved data (raw): " + str(resp.text))
135                     if isinstance(data, list):
136                         for item in data:
137                             item=json.loads(item)
138                             corrid=str(item["correlationId"])
139                             status=str(item["status"])
140                             msg=str(item["message"])
141                             item_str=msg+status[0:3]
142                             with lock:
143                                 msg_responses[corrid]=item_str
144                                 cntr_msg_responses_submitted += 1
145                     else:
146                         print("Data from dmaap is not json array: " + str(resp.text))
147                         sleep(1)
148                 except Exception as e:
149                     print("Corrupt data from dmaap mr -  dropping " + str(data))
150                     print("CAUGHT_EXCEPTION" + str(e) + " "+traceback.format_exc())
151                     sleep(1)
152         except Exception as e:
153             sleep(1)
154
155 # Function to upload generic messages to dmaap
156 def dmaap_generic_uploader():
157     global msg_requests
158     global cntr_msg_requests_fetched
159
160     print("Starting generic uploader")
161
162     headers_json = {'Content-type': 'application/json', 'Accept': '*/*'}
163     headers_text = {'Content-type': 'text/plain', 'Accept': '*/*'}
164
165     while True:
166         if (len(generic_messages)):
167             for topicname in generic_messages.keys():    #topicname contains the path of the topics, eg. "/event/<topic>"
168                 topic_queue=generic_messages[topicname]
169                 if (len(topic_queue)>0):
170                     if (topicname.endswith(".text")):
171                         msg=topic_queue[0]
172                         headers=headers_text
173                     else:
174                         msg=topic_queue[0]
175                         msg=json.dumps(msg)
176                         headers=headers_json
177                     url=generic_topics_upload_baseurl+topicname
178                     print("Sending to dmaap : "+ url)
179                     print("Sending to dmaap : "+ msg)
180                     print("Sending to dmaap : "+ str(headers))
181                     try:
182                         resp=requests.post(url, data=msg, headers=headers, timeout=10)
183                         if (resp.status_code<199 & resp.status_code > 299):
184                             print("Failed, response code: " + str(resp.status_code))
185                             sleep(1)
186                         else:
187                             print("Dmaap response code: " + str(resp.status_code))
188                             print("Dmaap response text: " + str(resp.text))
189                             with lock:
190                                 topic_queue.pop(0)
191                                 cntr_msg_requests_fetched += 1
192                     except Exception as e:
193                         print("Failed, exception: "+ str(e))
194                         sleep(1)
195         sleep(0.01)
196
197 #I'm alive function
198 @app.route('/',
199     methods=['GET'])
200 def index():
201     return 'OK', 200
202
203
204 # Helper function to create a Dmaap PMS request message
205 # args : <GET|PUT|DELETE> <correlation-id> <json-string-payload - may be None> <url>
206 # response: json formatted string of a complete Dmaap message
207 def create_message(operation, correlation_id, payload, url):
208     if (payload is None):
209         payload="{}"
210     time_stamp=datetime.datetime.utcnow()
211     msg = '{\"apiVersion\":\"1.0\",\"operation\":\"'+operation+'\",\"correlationId\":\"'+correlation_id+'\",\"originatorId\": \"849e6c6b420\",'
212     msg = msg + '\"payload\":'+payload+',\"requestId\":\"23343221\", \"target\":\"policy-agent\", \"timestamp\":\"'+str(time_stamp)+'\", \"type\":\"request\",\"url\":\"'+url+'\"}'
213     return msg
214
215
216 ### MR-stub interface, for MR control
217
218 # Send a PMS message to MR
219 # URI and parameters (PUT or POST): /send-request?operation=<GET|PUT|POST|DELETE>&url=<url>
220 # response: <correlation-id> (http 200) o4 400 for parameter error or 500 for other errors
221 @app.route(APP_WRITE_URL,
222     methods=['PUT','POST'])
223 def sendrequest():
224     global msg_requests
225     global cntr_msg_requests_submitted
226     with lock:
227         print("APP_WRITE_URL lock")
228         try:
229             oper=request.args.get('operation')
230             if (oper is None):
231                 print(APP_WRITE_URL+" parameter 'operation' missing")
232                 return Response('Parameter operation missing in request', status=400, mimetype=MIME_TEXT)
233
234             url=request.args.get('url')
235             if (url is None):
236                 print(APP_WRITE_URL+" parameter 'url' missing")
237                 return Response('Parameter url missing in request', status=400, mimetype=MIME_TEXT)
238
239             if (oper != "GET" and oper != "PUT" and oper != "POST" and oper != "DELETE"):
240                 print(APP_WRITE_URL+" parameter 'operation' need to be: DEL|PUT|POST|DELETE")
241                 return Response('Parameter operation does not contain DEL|PUT|POST|DELETE in request', status=400, mimetype=MIME_TEXT)
242
243             print(APP_WRITE_URL+" operation="+oper+" url="+url)
244             correlation_id=str(time.time_ns())
245             payload=None
246             if (oper == "PUT") and (request.json is not None):
247                 payload=json.dumps(request.json)
248
249             msg=create_message(oper, correlation_id, payload, url)
250             print(msg)
251             print(APP_WRITE_URL+" MSG(correlationid = "+correlation_id+"): " + json.dumps(json.loads(msg), indent=2))
252             msg_requests.append(msg)
253             cntr_msg_requests_submitted += 1
254             return Response(correlation_id, status=200, mimetype=MIME_TEXT)
255         except Exception as e:
256             print(APP_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
257             return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
258
259 # Receive a PMS message response for MR for the included correlation id
260 # URI and parameter, (GET): /receive-response?correlationid=<correlation-id>
261 # response: <json-array of 1 response> 200 or empty 204 or other errors 500
262 @app.route(APP_READ_URL,
263     methods=['GET'])
264 def receiveresponse():
265     global msg_responses
266     global cntr_msg_responses_fetched
267     with lock:
268         print("APP_READ_URL lock")
269         try:
270             cid=request.args.get('correlationid')
271             if (cid is None):
272                 print(APP_READ_URL+" parameter 'correclationid' missing")
273                 return Response('Parameter correlationid missing in json', status=500, mimetype=MIME_TEXT)
274
275             if (cid in msg_responses):
276                 answer=msg_responses[cid]
277                 del msg_responses[cid]
278                 print(APP_READ_URL+" response (correlationid="+cid+"): " + answer)
279                 cntr_msg_responses_fetched += 1
280                 return Response(answer, status=200, mimetype=MIME_JSON)
281
282             print(APP_READ_URL+" - no messages (correlationid="+cid+"): ")
283             return Response('', status=204, mimetype=MIME_JSON)
284         except Exception as e:
285             print(APP_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
286             return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
287
288 ### Dmaap interface ###
289
290 # Read PMS messages stream. URI according to agent configuration.
291 # URI, (GET): /events/A1-POLICY-AGENT-READ/users/policy-agent
292 # response: 200 <json array of request messages>, or 500 for other errors
293 @app.route(AGENT_READ_URL,
294     methods=['GET'])
295 def events_read():
296     global msg_requests
297     global cntr_msg_requests_fetched
298
299     if topic_write or topic_read:
300         return Response('Url not available when running as mrstub frontend', status=404, mimetype=MIME_TEXT)
301
302     limit=request.args.get('limit')
303     if (limit is None):
304         limit=4096
305     else:
306         limit=int(limit)
307     if (limit<0):
308         limit=0
309     if (limit>4096):
310         limit=4096
311     print("Limting number of returned messages to: "+str(limit))
312
313     timeout=request.args.get('timeout')
314     if (timeout is None):
315         timeout=10000
316     else:
317         timeout=min(int(timeout),60000)
318
319     start_time=int(round(time.time() * 1000))
320     current_time=int(round(time.time() * 1000))
321
322     while(current_time<start_time+int(timeout)):
323         with lock:
324             if(len(msg_requests)>0):
325                 try:
326                     msgs=''
327                     cntr=0
328                     while(cntr<limit and len(msg_requests)>0):
329                         if (len(msgs)>1):
330                             msgs=msgs+','
331                         msgs=msgs+msg_requests.pop(0)
332                         cntr_msg_requests_fetched += 1
333                         cntr=cntr+1
334                     msgs='['+msgs+']'
335                     print(AGENT_READ_URL+" MSGs: "+json.dumps(json.loads(msgs), indent=2))
336                     return Response(msgs, status=200, mimetype=MIME_JSON)
337                 except Exception as e:
338                     print(AGENT_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
339                     return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
340         sleep(0.025) # sleep 25 milliseconds
341         current_time=int(round(time.time() * 1000))
342
343     print("timeout: "+str(timeout)+", start_time: "+str(start_time)+", current_time: "+str(current_time))
344     return Response("[]", status=200, mimetype=MIME_JSON)
345
346 # Write PMS messages stream. URI according to agent configuration.
347 # URI and payload, (PUT or POST): /events/A1-POLICY-AGENT-WRITE <json array of response messages>
348 # response: OK 200 or 400 for missing json parameters, 500 for other errors
349 @app.route(AGENT_WRITE_URL,
350     methods=['PUT','POST'])
351 def events_write():
352     global msg_responses
353     global cntr_msg_responses_submitted
354
355     if topic_write or topic_read:
356         return Response('Url not available when running as mrstub frontend', status=404, mimetype=MIME_TEXT)
357
358     with lock:
359         print("AGENT_WRITE_URL lock")
360         try:
361             answer=request.json
362             print(AGENT_WRITE_URL+ " json=" + json.dumps(answer, indent=2))
363             if isinstance(answer, dict):
364                 #Create a an array if the answer is a dict (single message)
365                 answer_list=[]
366                 answer_list.append(answer)
367                 answer=answer_list
368
369             for item in answer:
370                 cid=item['correlationId']
371                 if (cid is None):
372                     print(AGENT_WRITE_URL+" parameter 'correlatonid' missing")
373                     return Response('Parameter <correlationid> missing in json', status=400, mimetype=MIME_TEXT)
374                 msg=item['message']
375                 if (msg is None):
376                     print(AGENT_WRITE_URL+" parameter 'msgs' missing")
377                     return Response('Parameter >message> missing in json', status=400, mimetype=MIME_TEXT)
378                 status=item['status']
379                 if (status is None):
380                     print(AGENT_WRITE_URL+" parameter 'status' missing")
381                     return Response('Parameter <status> missing in json', status=400, mimetype=MIME_TEXT)
382                 if isinstance(msg, list) or isinstance(msg, dict):
383                     msg_str=json.dumps(msg)+status[0:3]
384                 else:
385                     msg_str=msg+status[0:3]
386                 msg_responses[cid]=msg_str
387                 cntr_msg_responses_submitted += 1
388                 print(AGENT_WRITE_URL+ " msg+status (correlationid="+cid+") :" + str(msg_str))
389         except Exception as e:
390             print(AGENT_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
391             return Response('{"message": "' + SERVER_ERROR + ' ' + str(e) + '","status":"500"}', status=200, mimetype=MIME_JSON)
392
393         return Response('{}', status=200, mimetype=MIME_JSON)
394
395 @app.route(ORU_WRITE_URL,
396     methods=['PUT','POST'])
397 def oru_write():
398     global msg_requests
399     msg=json.dumps(request.json)
400     msg_requests.append(msg)
401     return Response('{}', status=200, mimetype=MIME_JSON)
402
403 @app.route(ORU_READ_URL,
404     methods=['GET'])
405 def oru_read():
406     global msg_requests
407     if(len(msg_requests)>0):
408         rsp=msg_requests.pop(0)
409         res=[]
410         res.append(rsp)
411         return Response(json.dumps(res), status=200, mimetype=MIME_JSON)
412     return Response("[]", status=200, mimetype=MIME_JSON)
413
414 # Generic POST catching all urls starting with /events/<topic>.
415 # Writes the message in a que for that topic
416 @app.route("/events/<path>",
417     methods=['POST'])
418 def generic_write(path):
419     global generic_messages
420     global cntr_msg_responses_submitted
421     urlkey="/events/"+str(path)
422     write_method=str(request.method)
423     with lock:
424         try:
425             if (urlkey.endswith(".text")):
426                 payload=str(request.data.decode('UTF-8'))
427                 print(write_method+" on "+urlkey+" text=" + payload)
428             else:
429                 payload=request.json
430                 print(write_method+" on "+urlkey+" json=" + json.dumps(payload))
431             topicmsgs=[]
432             if (urlkey in generic_messages.keys()):
433                 topicmsgs=generic_messages[urlkey]
434             else:
435                 generic_messages[urlkey]=topicmsgs
436
437             if isinstance(payload, list):
438                 for listitem in payload:
439                     topicmsgs.append(listitem)
440             else:
441                 topicmsgs.append(payload)
442
443             cntr_msg_responses_submitted += 1
444         except Exception as e:
445             print(write_method + "on "+urlkey+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
446             return Response('{"message": "' + SERVER_ERROR + ' ' + str(e) + '","status":"500"}', status=200, mimetype=MIME_JSON)
447
448         return Response('{}', status=200, mimetype=MIME_JSON)
449
450 # Generic GET catching all urls starting with /events/. Returns max 4096 json msgs in an array.
451 # Returns only the messages previously written to the same urls
452 @app.route("/events/<path:path>",
453     methods=['GET'])
454 def generic_read(path):
455     global generic_messages
456     global cntr_msg_requests_fetched
457
458     if generic_topics_upload_baseurl:
459         return Response('Url not available when running as mrstub frontend', status=404, mimetype=MIME_TEXT)
460
461     urlpath="/events/"+str(path)
462     urlkey="/events/"+str(path).split("/")[0] #Extract topic
463     print("GET on topic"+urlkey)
464     limit=request.args.get('limit')
465     if (limit is None):
466         limit=4096
467     else:
468         limit=int(limit)
469     if (limit<0):
470         limit=0
471     if (limit>4096):
472         limit=4096
473     print("Limting number of returned messages to: "+str(limit))
474
475     timeout=request.args.get('timeout')
476     if (timeout is None):
477         timeout=10000
478     else:
479         timeout=min(int(timeout),60000)
480
481     start_time=int(round(time.time() * 1000))
482     current_time=int(round(time.time() * 1000))
483     topicmsgs=[]
484     if (urlkey in generic_messages.keys()):
485         topicmsgs=generic_messages[urlkey]
486
487     while(current_time<start_time+int(timeout)):
488         with lock:
489             if(len(topicmsgs)>0):
490                 try:
491                     msgs=''
492                     cntr=0
493                     while(cntr<limit and len(topicmsgs)>0):
494                         if (len(msgs)>1):
495                             msgs=msgs+','
496                         msgs=msgs+json.dumps(json.dumps(topicmsgs.pop(0)))
497                         cntr_msg_requests_fetched += 1
498                         cntr=cntr+1
499                     msgs='['+msgs+']'
500                     print("GET on "+urlpath+" MSGs: "+msgs)
501                     return Response(msgs, status=200, mimetype=MIME_JSON)
502                 except Exception as e:
503                     print("GET on "+urlpath+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
504                     return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
505         sleep(0.025) # sleep 25 milliseconds
506         current_time=int(round(time.time() * 1000))
507
508     print("timeout: "+str(timeout)+", start_time: "+str(start_time)+", current_time: "+str(current_time))
509     return Response("[]", status=200, mimetype=MIME_JSON)
510
511
512 ### Functions for metrics read out ###
513
514 @app.route('/counter/requests_submitted',
515     methods=['GET'])
516 def requests_submitted():
517     return Response(str(cntr_msg_requests_submitted), status=200, mimetype=MIME_TEXT)
518
519 @app.route('/counter/requests_fetched',
520     methods=['GET'])
521 def requests_fetched():
522     return Response(str(cntr_msg_requests_fetched), status=200, mimetype=MIME_TEXT)
523
524 @app.route('/counter/responses_submitted',
525     methods=['GET'])
526 def responses_submitted():
527     return Response(str(cntr_msg_responses_submitted), status=200, mimetype=MIME_TEXT)
528
529 @app.route('/counter/responses_fetched',
530     methods=['GET'])
531 def responses_fetched():
532     return Response(str(cntr_msg_responses_fetched), status=200, mimetype=MIME_TEXT)
533
534 @app.route('/counter/current_requests',
535     methods=['GET'])
536 def current_requests():
537     return Response(str(len(msg_requests)), status=200, mimetype=MIME_TEXT)
538
539 @app.route('/counter/current_responses',
540     methods=['GET'])
541 def current_responses():
542     return Response(str(len(msg_responses)), status=200, mimetype=MIME_TEXT)
543
544 ### Admin ###
545
546 # Reset all messsages and counters
547 @app.route('/reset',
548     methods=['GET', 'POST', 'PUT'])
549 def reset():
550     global cntr_msg_requests_submitted
551     global cntr_msg_requests_fetched
552     global cntr_msg_responses_submitted
553     global cntr_msg_responses_fetched
554     global msg_requests
555     global msg_responses
556
557     cntr_msg_requests_submitted=0
558     cntr_msg_requests_fetched=0
559     cntr_msg_responses_submitted=0
560     cntr_msg_responses_fetched=0
561     msg_requests=[]
562     msg_responses={}
563     return Response('OK', status=200, mimetype=MIME_TEXT)
564
565 # Get env vars, if present
566 if os.getenv("TOPIC_READ") is not None:
567
568     print("Env variables:")
569     print("TOPIC_READ:"+os.environ['TOPIC_READ'])
570     print("TOPIC_WRITE:"+os.environ['TOPIC_WRITE'])
571
572     topic_read=os.environ['TOPIC_READ']
573     topic_write=os.environ['TOPIC_WRITE']
574
575
576     if topic_read and downloader_thread is None:
577         downloader_thread=Thread(target=dmaap_downloader)
578         downloader_thread.start()
579
580     if topic_write and uploader_thread is None:
581         uploader_thread=Thread(target=dmaap_uploader)
582         uploader_thread.start()
583
584 if os.environ['GENERIC_TOPICS_UPLOAD_BASEURL'] is not None:
585     print("GENERIC_TOPICS_UPLOAD_BASEURL:"+os.environ['GENERIC_TOPICS_UPLOAD_BASEURL'])
586     generic_topics_upload_baseurl=os.environ['GENERIC_TOPICS_UPLOAD_BASEURL']
587     if generic_topics_upload_baseurl and generic_uploader_thread is None:
588         generic_uploader_thread=Thread(target=dmaap_generic_uploader)
589         generic_uploader_thread.start()
590
591 if os.getenv("TOPIC_READ") is None or os.environ['GENERIC_TOPICS_UPLOAD_BASEURL'] is None:
592     print("No env variables - OK")
593
594 if __name__ == "__main__":
595     app.run(port=HOST_PORT, host=HOST_IP)