Updated test env documentation
[nonrtric.git] / test / mrstub / app / main.py
1
2 #  ============LICENSE_START===============================================
3 #  Copyright (C) 2020 Nordix Foundation. All rights reserved.
4 #  ========================================================================
5 #  Licensed under the Apache License, Version 2.0 (the "License");
6 #  you may not use this file except in compliance with the License.
7 #  You may obtain a copy of the License at
8 #
9 #       http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #  Unless required by applicable law or agreed to in writing, software
12 #  distributed under the License is distributed on an "AS IS" BASIS,
13 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #  See the License for the specific language governing permissions and
15 #  limitations under the License.
16 #  ============LICENSE_END=================================================
17 #
18
19 from flask import Flask, request
20 from time import sleep
21 import time
22 import datetime
23 import json
24 from flask import Flask
25 from flask import Response
26 import traceback
27 from threading import RLock, Thread
28 import logging
29 import os
30 import requests
31
32 # Disable all logging of GET on reading counters
33 class AjaxFilter(logging.Filter):
34     def filter(self, record):
35         return ("/counter/" not in record.getMessage())
36
37 log = logging.getLogger('werkzeug')
38 log.addFilter(AjaxFilter())
39
40 app = Flask(__name__)
41 lock = RLock()
42 # list of messages to/from Dmaap
43 msg_requests=[]
44 msg_responses={}
45
46 generic_messages={}
47
48 # Server info
49 HOST_IP = "::"
50 HOST_PORT = 2222
51
52 # Metrics vars
53 cntr_msg_requests_submitted=0
54 cntr_msg_requests_fetched=0
55 cntr_msg_responses_submitted=0
56 cntr_msg_responses_fetched=0
57
58 # Request and response constants
59 ORU_WRITE_URL="/events/unauthenticated.SEC_FAULT_OUTPUT"
60 ORU_READ_URL="/events/unauthenticated.SEC_FAULT_OUTPUT/users/test/"
61 A1PMS_WRITE_URL="/events/A1-POLICY-AGENT-WRITE"
62 A1PMS_READ_URL="/events/A1-POLICY-AGENT-READ/users/policy-agent"
63 APP_WRITE_URL="/send-request"
64 APP_READ_URL="/receive-response"
65 MIME_TEXT="text/plain"
66 MIME_JSON="application/json"
67 CAUGHT_EXCEPTION="Caught exception: "
68 SERVER_ERROR="Server error :"
69
70 topic_write=""
71 topic_read=""
72 generic_topics_upload_baseurl=""
73
74 uploader_thread=None
75 downloader_thread=None
76 generic_uploader_thread=None
77
78 # Function to upload A1PMS messages to dmaap
79 def dmaap_uploader():
80     global msg_requests
81     global cntr_msg_requests_fetched
82
83     print("Starting uploader")
84
85     headers = {'Content-type': 'application/json', 'Accept': '*/*'}
86     #url="http://"+topic_host+"/events/"+topic_read
87     url=topic_read
88
89     while True:
90         while (len(msg_requests)>0):
91             msg=msg_requests[0]
92             if msg is not None:
93                 try:
94                     print("Sending to dmaap : "+ url)
95                     print("Sending to dmaap : "+ msg)
96                     resp=requests.post(url, data=msg, headers=headers, timeout=10)
97                     if (resp.status_code<199 & resp.status_code > 299):
98                         print("Failed, response code: " + str(resp.status_code))
99                         sleep(1)
100                     else:
101                         print("Dmaap response code: " + str(resp.status_code))
102                         print("Dmaap response text: " + str(resp.text))
103                         with lock:
104                             msg_requests.pop(0)
105                             cntr_msg_requests_fetched += 1
106                 except Exception as e:
107                     print("Failed, exception: "+ str(e))
108                     sleep(1)
109         sleep(0.01)
110
111
112 # Function to download A1PMS messages from dmaap
113 def dmaap_downloader():
114     global msg_responses
115     global cntr_msg_responses_submitted
116
117     print("Starting uploader")
118
119     while True:
120
121         try :
122             #url="http://"+topic_host+"/events/"+topic_write+"/users/mr-stub?timeout=15000&limit=100"
123             url=topic_write
124             headers = {'Accept': 'application/json'}
125             print("Reading from dmaap: " + url)
126             resp=requests.get(url, headers=headers)
127             if (resp.status_code<199 & resp.status_code > 299):
128                 print("Failed, response code: " + resp.status_code)
129                 sleep(1)
130             else:
131                 print("Recieved data from dmaap mr")
132                 try:
133                     data=resp.json()
134                     print("Recieved data (raw): " + str(resp.text))
135                     if isinstance(data, list):
136                         for item in data:
137                             item=json.loads(item)
138                             corrid=str(item["correlationId"])
139                             status=str(item["status"])
140                             msg=str(item["message"])
141                             item_str=msg+status[0:3]
142                             with lock:
143                                 msg_responses[corrid]=item_str
144                                 cntr_msg_responses_submitted += 1
145                     else:
146                         print("Data from dmaap is not json array: " + str(resp.text))
147                         sleep(1)
148                 except Exception as e:
149                     print("Corrupt data from dmaap mr -  dropping " + str(data))
150                     print("CAUGHT_EXCEPTION" + str(e) + " "+traceback.format_exc())
151                     sleep(1)
152         except Exception as e:
153             sleep(1)
154
155 # Function to upload generic messages to dmaap
156 def dmaap_generic_uploader():
157     global msg_requests
158     global cntr_msg_requests_fetched
159
160     print("Starting generic uploader")
161
162     headers_json = {'Content-type': 'application/json', 'Accept': '*/*'}
163     headers_text = {'Content-type': 'text/plain', 'Accept': '*/*'}
164
165     while True:
166         if (len(generic_messages)):
167             keys_copy = list(generic_messages.keys())
168             for topicname in keys_copy:    #topicname contains the path of the topics, eg. "/event/<topic>"
169                 topic_queue=generic_messages[topicname]
170                 if (len(topic_queue)>0):
171                     if (topicname.endswith(".text")):
172                         msg=topic_queue[0]
173                         headers=headers_text
174                     else:
175                         msg=topic_queue[0]
176                         msg=json.dumps(msg)
177                         headers=headers_json
178                     url=generic_topics_upload_baseurl+topicname
179                     print("Sending to dmaap : "+ url)
180                     print("Sending to dmaap : "+ msg)
181                     print("Sending to dmaap : "+ str(headers))
182                     try:
183                         resp=requests.post(url, data=msg, headers=headers, timeout=10)
184                         if (resp.status_code<199 & resp.status_code > 299):
185                             print("Failed, response code: " + str(resp.status_code))
186                             sleep(1)
187                         else:
188                             print("Dmaap response code: " + str(resp.status_code))
189                             print("Dmaap response text: " + str(resp.text))
190                             with lock:
191                                 topic_queue.pop(0)
192                                 cntr_msg_requests_fetched += 1
193                     except Exception as e:
194                         print("Failed, exception: "+ str(e))
195                         sleep(1)
196         sleep(0.01)
197
198 #I'm alive function
199 @app.route('/',
200     methods=['GET'])
201 def index():
202     return 'OK', 200
203
204
205 # Helper function to create a Dmaap A1PMS request message
206 # args : <GET|PUT|DELETE> <correlation-id> <json-string-payload - may be None> <url>
207 # response: json formatted string of a complete Dmaap message
208 def create_message(operation, correlation_id, payload, url):
209     if (payload is None):
210         payload="{}"
211     time_stamp=datetime.datetime.utcnow()
212     msg = '{\"apiVersion\":\"1.0\",\"operation\":\"'+operation+'\",\"correlationId\":\"'+correlation_id+'\",\"originatorId\": \"849e6c6b420\",'
213     msg = msg + '\"payload\":'+payload+',\"requestId\":\"23343221\", \"target\":\"policy-agent\", \"timestamp\":\"'+str(time_stamp)+'\", \"type\":\"request\",\"url\":\"'+url+'\"}'
214     return msg
215
216
217 ### MR-stub interface, for MR control
218
219 # Send a A1PMS message to MR
220 # URI and parameters (PUT or POST): /send-request?operation=<GET|PUT|POST|DELETE>&url=<url>
221 # response: <correlation-id> (http 200) o4 400 for parameter error or 500 for other errors
222 @app.route(APP_WRITE_URL,
223     methods=['PUT','POST'])
224 def sendrequest():
225     global msg_requests
226     global cntr_msg_requests_submitted
227     with lock:
228         print("APP_WRITE_URL lock")
229         try:
230             oper=request.args.get('operation')
231             if (oper is None):
232                 print(APP_WRITE_URL+" parameter 'operation' missing")
233                 return Response('Parameter operation missing in request', status=400, mimetype=MIME_TEXT)
234
235             url=request.args.get('url')
236             if (url is None):
237                 print(APP_WRITE_URL+" parameter 'url' missing")
238                 return Response('Parameter url missing in request', status=400, mimetype=MIME_TEXT)
239
240             if (oper != "GET" and oper != "PUT" and oper != "POST" and oper != "DELETE"):
241                 print(APP_WRITE_URL+" parameter 'operation' need to be: DEL|PUT|POST|DELETE")
242                 return Response('Parameter operation does not contain DEL|PUT|POST|DELETE in request', status=400, mimetype=MIME_TEXT)
243
244             print(APP_WRITE_URL+" operation="+oper+" url="+url)
245             correlation_id=str(time.time_ns())
246             payload=None
247             if (oper == "PUT") and len(request.data) > 0:
248                 payload=json.dumps(request.json)
249
250             msg=create_message(oper, correlation_id, payload, url)
251             print(msg)
252             print(APP_WRITE_URL+" MSG(correlationid = "+correlation_id+"): " + json.dumps(json.loads(msg), indent=2))
253             msg_requests.append(msg)
254             cntr_msg_requests_submitted += 1
255             return Response(correlation_id, status=200, mimetype=MIME_TEXT)
256         except Exception as e:
257             print(APP_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
258             return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
259
260 # Receive a A1PMS message response for MR for the included correlation id
261 # URI and parameter, (GET): /receive-response?correlationid=<correlation-id>
262 # response: <json-array of 1 response> 200 or empty 204 or other errors 500
263 @app.route(APP_READ_URL,
264     methods=['GET'])
265 def receiveresponse():
266     global msg_responses
267     global cntr_msg_responses_fetched
268     with lock:
269         print("APP_READ_URL lock")
270         try:
271             cid=request.args.get('correlationid')
272             if (cid is None):
273                 print(APP_READ_URL+" parameter 'correclationid' missing")
274                 return Response('Parameter correlationid missing in json', status=500, mimetype=MIME_TEXT)
275
276             if (cid in msg_responses):
277                 answer=msg_responses[cid]
278                 del msg_responses[cid]
279                 print(APP_READ_URL+" response (correlationid="+cid+"): " + answer)
280                 cntr_msg_responses_fetched += 1
281                 return Response(answer, status=200, mimetype=MIME_JSON)
282
283             print(APP_READ_URL+" - no messages (correlationid="+cid+"): ")
284             return Response('', status=204, mimetype=MIME_JSON)
285         except Exception as e:
286             print(APP_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
287             return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
288
289 ### Dmaap interface ###
290
291 # Read A1PMS messages stream. URI according to agent configuration.
292 # URI, (GET): /events/A1-POLICY-AGENT-READ/users/policy-agent
293 # response: 200 <json array of request messages>, or 500 for other errors
294 @app.route(A1PMS_READ_URL,
295     methods=['GET'])
296 def events_read():
297     global msg_requests
298     global cntr_msg_requests_fetched
299
300     if topic_write or topic_read:
301         return Response('Url not available when running as mrstub frontend', status=404, mimetype=MIME_TEXT)
302
303     limit=request.args.get('limit')
304     if (limit is None):
305         limit=4096
306     else:
307         limit=int(limit)
308     if (limit<0):
309         limit=0
310     if (limit>4096):
311         limit=4096
312     print("Limting number of returned messages to: "+str(limit))
313
314     timeout=request.args.get('timeout')
315     if (timeout is None):
316         timeout=10000
317     else:
318         timeout=min(int(timeout),60000)
319
320     start_time=int(round(time.time() * 1000))
321     current_time=int(round(time.time() * 1000))
322
323     while(current_time<start_time+int(timeout)):
324         with lock:
325             if(len(msg_requests)>0):
326                 try:
327                     msgs=''
328                     cntr=0
329                     while(cntr<limit and len(msg_requests)>0):
330                         if (len(msgs)>1):
331                             msgs=msgs+','
332                         msgs=msgs+msg_requests.pop(0)
333                         cntr_msg_requests_fetched += 1
334                         cntr=cntr+1
335                     msgs='['+msgs+']'
336                     print(A1PMS_READ_URL+" MSGs: "+json.dumps(json.loads(msgs), indent=2))
337                     return Response(msgs, status=200, mimetype=MIME_JSON)
338                 except Exception as e:
339                     print(A1PMS_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
340                     return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
341         sleep(0.025) # sleep 25 milliseconds
342         current_time=int(round(time.time() * 1000))
343
344     print("timeout: "+str(timeout)+", start_time: "+str(start_time)+", current_time: "+str(current_time))
345     return Response("[]", status=200, mimetype=MIME_JSON)
346
347 # Write A1PMS messages stream. URI according to agent configuration.
348 # URI and payload, (PUT or POST): /events/A1-POLICY-AGENT-WRITE <json array of response messages>
349 # response: OK 200 or 400 for missing json parameters, 500 for other errors
350 @app.route(A1PMS_WRITE_URL,
351     methods=['PUT','POST'])
352 def events_write():
353     global msg_responses
354     global cntr_msg_responses_submitted
355
356     if topic_write or topic_read:
357         return Response('Url not available when running as mrstub frontend', status=404, mimetype=MIME_TEXT)
358
359     with lock:
360         print("A1PMS_WRITE_URL lock")
361         try:
362             answer=request.json
363             print(A1PMS_WRITE_URL+ " json=" + json.dumps(answer, indent=2))
364             if isinstance(answer, dict):
365                 #Create a an array if the answer is a dict (single message)
366                 answer_list=[]
367                 answer_list.append(answer)
368                 answer=answer_list
369
370             for item in answer:
371                 cid=item['correlationId']
372                 if (cid is None):
373                     print(A1PMS_WRITE_URL+" parameter 'correlatonid' missing")
374                     return Response('Parameter <correlationid> missing in json', status=400, mimetype=MIME_TEXT)
375                 msg=item['message']
376                 if (msg is None):
377                     print(A1PMS_WRITE_URL+" parameter 'msgs' missing")
378                     return Response('Parameter >message> missing in json', status=400, mimetype=MIME_TEXT)
379                 status=item['status']
380                 if (status is None):
381                     print(A1PMS_WRITE_URL+" parameter 'status' missing")
382                     return Response('Parameter <status> missing in json', status=400, mimetype=MIME_TEXT)
383                 if isinstance(msg, list) or isinstance(msg, dict):
384                     msg_str=json.dumps(msg)+status[0:3]
385                 else:
386                     msg_str=msg+status[0:3]
387                 msg_responses[cid]=msg_str
388                 cntr_msg_responses_submitted += 1
389                 print(A1PMS_WRITE_URL+ " msg+status (correlationid="+cid+") :" + str(msg_str))
390         except Exception as e:
391             print(A1PMS_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
392             return Response('{"message": "' + SERVER_ERROR + ' ' + str(e) + '","status":"500"}', status=200, mimetype=MIME_JSON)
393
394         return Response('{}', status=200, mimetype=MIME_JSON)
395
396 @app.route(ORU_WRITE_URL,
397     methods=['PUT','POST'])
398 def oru_write():
399     global msg_requests
400     msg=json.dumps(request.json)
401     msg_requests.append(msg)
402     return Response('{}', status=200, mimetype=MIME_JSON)
403
404 @app.route(ORU_READ_URL,
405     methods=['GET'])
406 def oru_read():
407     global msg_requests
408     if(len(msg_requests)>0):
409         rsp=msg_requests.pop(0)
410         res=[]
411         res.append(rsp)
412         return Response(json.dumps(res), status=200, mimetype=MIME_JSON)
413     return Response("[]", status=200, mimetype=MIME_JSON)
414
415 # Generic POST catching all urls starting with /events/<topic>.
416 # Writes the message in a que for that topic
417 @app.route("/events/<path>",
418     methods=['POST'])
419 def generic_write(path):
420     global generic_messages
421     global cntr_msg_responses_submitted
422     urlkey="/events/"+str(path)
423     write_method=str(request.method)
424     with lock:
425         try:
426             if (urlkey.endswith(".text")):
427                 payload=str(request.data.decode('UTF-8'))
428                 print(write_method+" on "+urlkey+" text=" + payload)
429             else:
430                 payload=request.json
431                 print(write_method+" on "+urlkey+" json=" + json.dumps(payload))
432             topicmsgs=[]
433             if (urlkey in generic_messages.keys()):
434                 topicmsgs=generic_messages[urlkey]
435             else:
436                 generic_messages[urlkey]=topicmsgs
437
438             if isinstance(payload, list):
439                 for listitem in payload:
440                     topicmsgs.append(listitem)
441             else:
442                 topicmsgs.append(payload)
443
444             cntr_msg_responses_submitted += 1
445         except Exception as e:
446             print(write_method + "on "+urlkey+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
447             return Response('{"message": "' + SERVER_ERROR + ' ' + str(e) + '","status":"500"}', status=200, mimetype=MIME_JSON)
448
449         return Response('{}', status=200, mimetype=MIME_JSON)
450
451 # Generic GET catching all urls starting with /events/. Returns max 4096 json msgs in an array.
452 # Returns only the messages previously written to the same urls
453 @app.route("/events/<path:path>",
454     methods=['GET'])
455 def generic_read(path):
456     global generic_messages
457     global cntr_msg_requests_fetched
458
459     if generic_topics_upload_baseurl:
460         return Response('Url not available when running as mrstub frontend', status=404, mimetype=MIME_TEXT)
461
462     urlpath="/events/"+str(path)
463     urlkey="/events/"+str(path).split("/")[0] #Extract topic
464     print("GET on topic"+urlkey)
465     limit=request.args.get('limit')
466     if (limit is None):
467         limit=4096
468     else:
469         limit=int(limit)
470     if (limit<0):
471         limit=0
472     if (limit>4096):
473         limit=4096
474     print("Limting number of returned messages to: "+str(limit))
475
476     timeout=request.args.get('timeout')
477     if (timeout is None):
478         timeout=10000
479     else:
480         timeout=min(int(timeout),60000)
481
482     start_time=int(round(time.time() * 1000))
483     current_time=int(round(time.time() * 1000))
484     topicmsgs=[]
485     if (urlkey in generic_messages.keys()):
486         topicmsgs=generic_messages[urlkey]
487
488     while(current_time<start_time+int(timeout)):
489         with lock:
490             if(len(topicmsgs)>0):
491                 try:
492                     msgs=''
493                     cntr=0
494                     while(cntr<limit and len(topicmsgs)>0):
495                         if (len(msgs)>1):
496                             msgs=msgs+','
497                         msgs=msgs+json.dumps(json.dumps(topicmsgs.pop(0)))
498                         cntr_msg_requests_fetched += 1
499                         cntr=cntr+1
500                     msgs='['+msgs+']'
501                     print("GET on "+urlpath+" MSGs: "+msgs)
502                     return Response(msgs, status=200, mimetype=MIME_JSON)
503                 except Exception as e:
504                     print("GET on "+urlpath+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
505                     return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
506         sleep(0.025) # sleep 25 milliseconds
507         current_time=int(round(time.time() * 1000))
508
509     print("timeout: "+str(timeout)+", start_time: "+str(start_time)+", current_time: "+str(current_time))
510     return Response("[]", status=200, mimetype=MIME_JSON)
511
512
513 ### Functions for metrics read out ###
514
515 @app.route('/counter/requests_submitted',
516     methods=['GET'])
517 def requests_submitted():
518     return Response(str(cntr_msg_requests_submitted), status=200, mimetype=MIME_TEXT)
519
520 @app.route('/counter/requests_fetched',
521     methods=['GET'])
522 def requests_fetched():
523     return Response(str(cntr_msg_requests_fetched), status=200, mimetype=MIME_TEXT)
524
525 @app.route('/counter/responses_submitted',
526     methods=['GET'])
527 def responses_submitted():
528     return Response(str(cntr_msg_responses_submitted), status=200, mimetype=MIME_TEXT)
529
530 @app.route('/counter/responses_fetched',
531     methods=['GET'])
532 def responses_fetched():
533     return Response(str(cntr_msg_responses_fetched), status=200, mimetype=MIME_TEXT)
534
535 @app.route('/counter/current_requests',
536     methods=['GET'])
537 def current_requests():
538     return Response(str(len(msg_requests)), status=200, mimetype=MIME_TEXT)
539
540 @app.route('/counter/current_responses',
541     methods=['GET'])
542 def current_responses():
543     return Response(str(len(msg_responses)), status=200, mimetype=MIME_TEXT)
544
545 ### Admin ###
546
547 # Reset all messages and counters
548 @app.route('/reset',
549     methods=['GET', 'POST', 'PUT'])
550 def reset():
551     global cntr_msg_requests_submitted
552     global cntr_msg_requests_fetched
553     global cntr_msg_responses_submitted
554     global cntr_msg_responses_fetched
555     global msg_requests
556     global msg_responses
557
558     cntr_msg_requests_submitted=0
559     cntr_msg_requests_fetched=0
560     cntr_msg_responses_submitted=0
561     cntr_msg_responses_fetched=0
562     msg_requests=[]
563     msg_responses={}
564     return Response('OK', status=200, mimetype=MIME_TEXT)
565
566 # Get env vars, if present
567 if os.getenv("TOPIC_READ") is not None:
568
569     print("Env variables:")
570     print("TOPIC_READ:"+os.environ['TOPIC_READ'])
571     print("TOPIC_WRITE:"+os.environ['TOPIC_WRITE'])
572
573     topic_read=os.environ['TOPIC_READ']
574     topic_write=os.environ['TOPIC_WRITE']
575
576
577     if topic_read and downloader_thread is None:
578         downloader_thread=Thread(target=dmaap_downloader)
579         downloader_thread.start()
580
581     if topic_write and uploader_thread is None:
582         uploader_thread=Thread(target=dmaap_uploader)
583         uploader_thread.start()
584
585 if 'GENERIC_TOPICS_UPLOAD_BASEURL' in os.environ:
586     print("GENERIC_TOPICS_UPLOAD_BASEURL:"+os.environ['GENERIC_TOPICS_UPLOAD_BASEURL'])
587     generic_topics_upload_baseurl=os.environ['GENERIC_TOPICS_UPLOAD_BASEURL']
588     if generic_topics_upload_baseurl and generic_uploader_thread is None:
589         generic_uploader_thread=Thread(target=dmaap_generic_uploader)
590         generic_uploader_thread.start()
591
592 if os.getenv("TOPIC_READ") is None or os.environ['GENERIC_TOPICS_UPLOAD_BASEURL'] is None:
593     print("No env variables - OK")
594
595 if __name__ == "__main__":
596     app.run(port=HOST_PORT, host=HOST_IP)