Adaptation of test env to helm chart
[nonrtric.git] / test / mrstub / app / main.py
1
2 #  ============LICENSE_START===============================================
3 #  Copyright (C) 2020 Nordix Foundation. All rights reserved.
4 #  ========================================================================
5 #  Licensed under the Apache License, Version 2.0 (the "License");
6 #  you may not use this file except in compliance with the License.
7 #  You may obtain a copy of the License at
8 #
9 #       http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #  Unless required by applicable law or agreed to in writing, software
12 #  distributed under the License is distributed on an "AS IS" BASIS,
13 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #  See the License for the specific language governing permissions and
15 #  limitations under the License.
16 #  ============LICENSE_END=================================================
17 #
18
19 from flask import Flask, request
20 from time import sleep
21 import time
22 import datetime
23 import json
24 from flask import Flask
25 from flask import Response
26 import traceback
27 from threading import RLock, Thread
28 import logging
29 import os
30 import requests
31
32 # Disable all logging of GET on reading counters
class AjaxFilter(logging.Filter):
    """Logging filter that hides records mentioning the counter endpoints."""

    def filter(self, record):
        """Return False when the formatted record message contains "/counter/"."""
        message = record.getMessage()
        is_counter_poll = "/counter/" in message
        return not is_counter_poll
36
# Install the filter on the werkzeug logger
log = logging.getLogger('werkzeug')
log.addFilter(AjaxFilter())

app = Flask(__name__)

# Re-entrant lock taken by the handlers and worker threads around the shared state below
lock = RLock()

# Messages to/from Dmaap:
#  msg_requests  - list of request messages (json strings), oldest first
#  msg_responses - response strings keyed by correlation id
msg_requests = []
msg_responses = {}

# Messages written through the generic /events/<topic> urls, one list per url key
generic_messages = {}

# Server info (used by app.run when the module is executed as a script)
HOST_IP = "::"
HOST_PORT = 2222

# Metrics vars
cntr_msg_requests_submitted = 0
cntr_msg_requests_fetched = 0
cntr_msg_responses_submitted = 0
cntr_msg_responses_fetched = 0

# Request and response constants
ORU_WRITE_URL = "/events/unauthenticated.SEC_FAULT_OUTPUT"
ORU_READ_URL = "/events/unauthenticated.SEC_FAULT_OUTPUT/users/test/"
AGENT_WRITE_URL = "/events/A1-POLICY-AGENT-WRITE"
AGENT_READ_URL = "/events/A1-POLICY-AGENT-READ/users/policy-agent"
APP_WRITE_URL = "/send-request"
APP_READ_URL = "/receive-response"
MIME_TEXT = "text/plain"
MIME_JSON = "application/json"
CAUGHT_EXCEPTION = "Caught exception: "
SERVER_ERROR = "Server error :"

# Dmaap urls taken from the env vars TOPIC_WRITE / TOPIC_READ; empty when not configured
topic_write = ""
topic_read = ""

# Worker threads, created at startup only when the topic env vars are set
uploader_thread = None
downloader_thread = None
75
# Function to upload queued request messages to dmaap
def dmaap_uploader():
    """Post the queued request messages, one at a time, to the url in topic_read.

    Runs forever and is used as the target of the uploader thread. A message
    is removed from msg_requests (and cntr_msg_requests_fetched incremented)
    only after a post answered with a 2xx status; on any other status or on an
    exception the same message is retried after one second.
    """
    global msg_requests
    global cntr_msg_requests_fetched

    print("Starting uploader")

    headers = {'Content-type': 'application/json', 'Accept': '*/*'}
    url = topic_read

    while True:
        while msg_requests:
            try:
                # Inside the try: the queue may be emptied by another thread between
                # the check above and this read, which must not kill the uploader.
                msg = msg_requests[0]
                if msg is None:
                    # Nothing to send; drop the entry, otherwise this loop would spin on it forever.
                    with lock:
                        if msg_requests and msg_requests[0] is None:
                            msg_requests.pop(0)
                    continue

                print("Sending to dmaap : " + url)
                print("Sending to dmaap : " + msg)
                resp = requests.post(url, data=msg, headers=headers, timeout=10)
                # The previous test, 'a<199 & b>299', was parsed as a chained comparison on
                # (199 & status) and was never true, so failed posts were treated as delivered.
                if not 200 <= resp.status_code <= 299:
                    print("Failed, response code: " + str(resp.status_code))
                    sleep(1)
                else:
                    print("Dmaap response code: " + str(resp.status_code))
                    print("Dmaap response text: " + str(resp.text))
                    with lock:
                        # Only remove the head if it still is the message just sent
                        if msg_requests and msg_requests[0] is msg:
                            msg_requests.pop(0)
                        cntr_msg_requests_fetched += 1
            except Exception as e:
                print("Failed, exception: " + str(e))
                sleep(1)
        sleep(0.01)
108
109
# Function to download response messages from dmaap
def dmaap_downloader():
    """Poll the url in topic_write and store the received responses, forever.

    Used as the target of the downloader thread. Each poll is expected to
    return a json array of json-encoded strings; every item must carry
    "correlationId", "status" and "message". The stored value per correlation
    id is the message text followed by the first three characters of status.
    """
    global msg_responses
    global cntr_msg_responses_submitted

    print("Starting downloader")

    headers = {'Accept': 'application/json'}
    url = topic_write

    while True:
        try:
            print("Reading from dmaap: " + url)
            # NOTE(review): no client side timeout is set; presumably the url carries a
            # server side long-poll timeout - confirm before adding one here.
            resp = requests.get(url, headers=headers)
            # The previous test, 'a<199 & b>299', was never true due to operator precedence.
            if not 200 <= resp.status_code <= 299:
                print("Failed, response code: " + str(resp.status_code))
                sleep(1)
                continue

            print("Recieved data from dmaap mr")
            try:
                data = resp.json()
                print("Recieved data (raw): " + str(resp.text))
                if isinstance(data, list):
                    for item in data:
                        # Each array element is itself a json document in string form
                        item = json.loads(item)
                        corrid = str(item["correlationId"])
                        status = str(item["status"])
                        msg = str(item["message"])
                        item_str = msg + status[0:3]
                        with lock:
                            msg_responses[corrid] = item_str
                            cntr_msg_responses_submitted += 1
                else:
                    print("Data from dmaap is not json array: " + str(resp.text))
                    sleep(1)
            except Exception as e:
                # Log the raw text: the parsed data does not exist when resp.json() itself failed
                print("Corrupt data from dmaap mr -  dropping " + str(resp.text))
                print(CAUGHT_EXCEPTION + str(e) + " " + traceback.format_exc())
                sleep(1)
        except Exception as e:
            # Keep the thread alive, but do not hide the reason for the failure
            print("Failed, exception: " + str(e))
            sleep(1)
150         except Exception as e:
151             sleep(1)
152
153 #I'm alive function
@app.route('/', methods=['GET'])
def index():
    """Liveness check: always answers 'OK' with http 200."""
    alive_reply = ('OK', 200)
    return alive_reply
158
159
160 # Helper function to create a Dmaap request message
161 # args : <GET|PUT|DELETE> <correlation-id> <json-string-payload - may be None> <url>
162 # response: json formatted string of a complete Dmaap message
def create_message(operation, correlation_id, payload, url):
    """Build a complete Dmaap request message as a json formatted string.

    Args:
        operation: operation name placed in the "operation" field.
        correlation_id: value placed in the "correlationId" field.
        payload: json document as a string, inserted verbatim as the
            "payload" value; None gives an empty json object.
        url: value placed in the "url" field.

    Returns:
        The message as a string. operation, correlation_id and url are json
        escaped, so quotes or backslashes in them no longer produce an invalid
        document. For plain values the output layout is unchanged.
    """
    if payload is None:
        payload = "{}"
    # Naive UTC time, rendered exactly like the former datetime.utcnow() value
    time_stamp = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    parts = [
        '{"apiVersion":"1.0","operation":', json.dumps(str(operation)),
        ',"correlationId":', json.dumps(str(correlation_id)),
        ',"originatorId": "849e6c6b420",',
        '"payload":', payload,
        ',"requestId":"23343221", "target":"policy-agent", "timestamp":"', str(time_stamp),
        '", "type":"request","url":', json.dumps(str(url)),
        '}',
    ]
    return ''.join(parts)
170
171
172 ### MR-stub interface, for MR control
173
# Send a message to MR
# URI and parameters (PUT or POST): /send-request?operation=<GET|PUT|POST|DELETE>&url=<url>
# response: <correlation-id> (http 200) or 400 for parameter error or 500 for other errors
@app.route(APP_WRITE_URL,
    methods=['PUT','POST'])
def sendrequest():
    """Queue a new request message and return its generated correlation id.

    Query parameters 'operation' (GET, PUT, POST or DELETE) and 'url' are
    mandatory. For operation PUT the json body of the http request, if any,
    becomes the payload of the message.
    """
    global msg_requests
    global cntr_msg_requests_submitted
    with lock:
        print("APP_WRITE_URL lock")
        try:
            oper = request.args.get('operation')
            if oper is None:
                print(APP_WRITE_URL+" parameter 'operation' missing")
                return Response('Parameter operation missing in request', status=400, mimetype=MIME_TEXT)

            url = request.args.get('url')
            if url is None:
                print(APP_WRITE_URL+" parameter 'url' missing")
                return Response('Parameter url missing in request', status=400, mimetype=MIME_TEXT)

            if oper not in ("GET", "PUT", "POST", "DELETE"):
                # The texts previously listed 'DEL' although the accepted value is 'GET'
                print(APP_WRITE_URL+" parameter 'operation' need to be: GET|PUT|POST|DELETE")
                return Response('Parameter operation does not contain GET|PUT|POST|DELETE in request', status=400, mimetype=MIME_TEXT)

            print(APP_WRITE_URL+" operation="+oper+" url="+url)
            # Nanosecond clock value serves as correlation id
            correlation_id = str(time.time_ns())
            payload = None
            # NOTE(review): only PUT carries the request body as payload; a POST body is ignored - confirm intended
            if (oper == "PUT") and (request.json is not None):
                payload = json.dumps(request.json)

            msg = create_message(oper, correlation_id, payload, url)
            print(msg)
            # json.loads also validates the message; a failure ends up in the 500 branch below
            print(APP_WRITE_URL+" MSG(correlationid = "+correlation_id+"): " + json.dumps(json.loads(msg), indent=2))
            msg_requests.append(msg)
            cntr_msg_requests_submitted += 1
            return Response(correlation_id, status=200, mimetype=MIME_TEXT)
        except Exception as e:
            print(APP_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
            return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
214
215 # Receive a message response for MR for the included correlation id
216 # URI and parameter, (GET): /receive-response?correlationid=<correlation-id>
217 # response: <json-array of 1 response> 200 or empty 204 or other errors 500
@app.route(APP_READ_URL, methods=['GET'])
def receiveresponse():
    """Hand out, and remove, the stored response for query parameter 'correlationid'.

    Answers 200 with the stored response, 204 when nothing is stored for the
    id, and 500 when the parameter is missing or an exception occurs.
    """
    global msg_responses
    global cntr_msg_responses_fetched
    with lock:
        print("APP_READ_URL lock")
        try:
            cid = request.args.get('correlationid')
            if cid is None:
                print(APP_READ_URL+" parameter 'correclationid' missing")
                return Response('Parameter correlationid missing in json', status=500, mimetype=MIME_TEXT)

            if cid not in msg_responses:
                print(APP_READ_URL+" - no messages (correlationid="+cid+"): ")
                return Response('', status=204, mimetype=MIME_JSON)

            # Fetch and delete in one step
            answer = msg_responses.pop(cid)
            print(APP_READ_URL+" response (correlationid="+cid+"): " + answer)
            cntr_msg_responses_fetched += 1
            return Response(answer, status=200, mimetype=MIME_JSON)
        except Exception as e:
            print(APP_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
            return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
243
244 ### Dmaap interface ###
245
246 # Read messages stream. URI according to agent configuration.
247 # URI, (GET): /events/A1-POLICY-AGENT-READ/users/policy-agent
248 # response: 200 <json array of request messages>, or 500 for other errors
@app.route(AGENT_READ_URL,
    methods=['GET'])
def events_read():
    """Long-poll for queued request messages and return them as a json array.

    Query parameters:
        limit: max number of messages to return, clamped to 0..4096 (default 4096).
        timeout: max wait in milliseconds, capped at 60000 (default 10000).

    Answers 404 when the stub runs as frontend (a topic url is configured),
    200 with the messages as soon as the queue is non-empty, 200 with "[]"
    when the timeout expires, and 500 on errors while building the reply.
    """
    global msg_requests
    global cntr_msg_requests_fetched

    if topic_write or topic_read:
        return Response('Url not available when running as mrstub frontend', status=404, mimetype=MIME_TEXT)

    limit = request.args.get('limit')
    limit = 4096 if limit is None else int(limit)
    limit = max(0, min(limit, 4096))
    print("Limting number of returned messages to: "+str(limit))

    timeout = request.args.get('timeout')
    timeout = 10000 if timeout is None else min(int(timeout), 60000)

    start_time = int(round(time.time() * 1000))
    current_time = int(round(time.time() * 1000))

    while current_time < start_time + timeout:
        with lock:
            if msg_requests:
                try:
                    # Take up to 'limit' messages off the head of the queue
                    batch = msg_requests[:limit]
                    del msg_requests[:limit]
                    cntr_msg_requests_fetched += len(batch)
                    # join() always separates the items; the former length based
                    # test omitted the comma after a one-character first message.
                    msgs = '[' + ','.join(batch) + ']'
                    print(AGENT_READ_URL+" MSGs: "+json.dumps(json.loads(msgs), indent=2))
                    return Response(msgs, status=200, mimetype=MIME_JSON)
                except Exception as e:
                    print(AGENT_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
                    return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
        sleep(0.025) # sleep 25 milliseconds
        current_time = int(round(time.time() * 1000))

    print("timeout: "+str(timeout)+", start_time: "+str(start_time)+", current_time: "+str(current_time))
    return Response("[]", status=200, mimetype=MIME_JSON)
301
302 # Write messages stream. URI according to agent configuration.
303 # URI and payload, (PUT or POST): /events/A1-POLICY-AGENT-WRITE <json array of response messages>
304 # response: OK 200 or 400 for missing json parameters, 500 for other errors
@app.route(AGENT_WRITE_URL,
    methods=['PUT','POST'])
def events_write():
    """Store response messages posted as a json object or a json array of objects.

    Every message needs 'correlationId', 'message' and 'status'. The stored
    value is the message (json encoded when it is a list or dict) followed by
    the first three characters of status. Answers 404 when the stub runs as
    frontend, 400 when one of the fields is missing or null, 200 with '{}' on
    success. Other errors are reported as http 200 with a json body carrying
    "status":"500". Messages stored before a failing one are kept.
    """
    global msg_responses
    global cntr_msg_responses_submitted

    if topic_write or topic_read:
        return Response('Url not available when running as mrstub frontend', status=404, mimetype=MIME_TEXT)

    with lock:
        print("AGENT_WRITE_URL lock")
        try:
            answer = request.json
            print(AGENT_WRITE_URL+ " json=" + json.dumps(answer, indent=2))
            if isinstance(answer, dict):
                # A single message: wrap it so that the loop below handles both forms
                answer = [answer]

            for item in answer:
                # .get() makes an absent key reach the 400 replies below; indexing
                # raised KeyError and ended up in the generic error branch instead.
                cid = item.get('correlationId')
                if cid is None:
                    print(AGENT_WRITE_URL+" parameter 'correlationId' missing")
                    return Response('Parameter <correlationid> missing in json', status=400, mimetype=MIME_TEXT)
                msg = item.get('message')
                if msg is None:
                    print(AGENT_WRITE_URL+" parameter 'message' missing")
                    return Response('Parameter <message> missing in json', status=400, mimetype=MIME_TEXT)
                status = item.get('status')
                if status is None:
                    print(AGENT_WRITE_URL+" parameter 'status' missing")
                    return Response('Parameter <status> missing in json', status=400, mimetype=MIME_TEXT)
                if isinstance(msg, (list, dict)):
                    msg_str = json.dumps(msg)+status[0:3]
                else:
                    msg_str = msg+status[0:3]
                msg_responses[cid] = msg_str
                cntr_msg_responses_submitted += 1
                print(AGENT_WRITE_URL+ " msg+status (correlationid="+str(cid)+") :" + str(msg_str))
        except Exception as e:
            print(AGENT_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
            # json.dumps keeps the body valid json also when the exception text contains quotes
            error_body = json.dumps({"message": SERVER_ERROR + ' ' + str(e), "status": "500"})
            return Response(error_body, status=200, mimetype=MIME_JSON)

        return Response('{}', status=200, mimetype=MIME_JSON)
350
@app.route(ORU_WRITE_URL, methods=['PUT','POST'])
def oru_write():
    """Append the json body of the request, re-encoded as a string, to msg_requests."""
    global msg_requests
    msg_requests.append(json.dumps(request.json))
    empty_reply = Response('{}', status=200, mimetype=MIME_JSON)
    return empty_reply
358
@app.route(ORU_READ_URL,
    methods=['GET'])
def oru_read():
    """Return the oldest queued message, if any, as a json array of at most one string.

    The message is removed from msg_requests. Answers 200 with "[]" when the
    queue is empty.
    """
    global msg_requests
    # Check and pop under the lock, like the other consumers of msg_requests;
    # without it another thread could empty the queue between the two steps.
    with lock:
        res = [msg_requests.pop(0)] if msg_requests else []
    return Response(json.dumps(res), status=200, mimetype=MIME_JSON)
369
370 # Generic POST/PUT catching all urls starting with /events/<topic>.
371 # Writes the message in a que for that topic
@app.route("/events/<path>", methods=['PUT','POST'])
def generic_write(path):
    """Queue the json body of the request under the url key "/events/<path>".

    A json array is queued item by item, any other body as one message.
    Answers 200 with '{}' on success; errors are reported as http 200 with a
    body carrying "status":"500".
    """
    global generic_messages
    global cntr_msg_responses_submitted
    topic_url = "/events/" + str(path)
    http_method = str(request.method)
    with lock:
        try:
            body = request.json
            print(http_method+" on "+topic_url+" json=" + json.dumps(body))

            # Fetch the queue of the topic, creating it on first use
            queue = generic_messages.setdefault(topic_url, [])
            if isinstance(body, list):
                queue.extend(body)
            else:
                queue.append(body)

            cntr_msg_responses_submitted += 1
        except Exception as e:
            print(http_method + "on "+topic_url+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
            return Response('{"message": "' + SERVER_ERROR + ' ' + str(e) + '","status":"500"}', status=200, mimetype=MIME_JSON)

        return Response('{}', status=200, mimetype=MIME_JSON)
401
402 # Generic GET catching all urls starting with /events/. Returns max 4096 json msgs in an array.
403 # Returns only the messages previously written to the same urls
@app.route("/events/<path:path>",
    methods=['GET'])
def generic_read(path):
    """Long-poll for messages written to the topic given by the first segment of path.

    Query parameters:
        limit: max number of messages to return, clamped to 0..4096 (default 4096).
        timeout: max wait in milliseconds, capped at 60000 (default 10000).

    Answers 200 with a json array in which every message is a json-encoded
    string, 200 with "[]" when the timeout expires, and 500 on errors while
    building the reply.
    """
    global generic_messages
    global cntr_msg_requests_fetched

    urlpath = "/events/"+str(path)
    urlkey = "/events/"+str(path).split("/")[0] #Extract topic
    print("GET on topic"+urlkey)

    limit = request.args.get('limit')
    limit = 4096 if limit is None else int(limit)
    limit = max(0, min(limit, 4096))
    print("Limting number of returned messages to: "+str(limit))

    timeout = request.args.get('timeout')
    timeout = 10000 if timeout is None else min(int(timeout), 60000)

    start_time = int(round(time.time() * 1000))
    current_time = int(round(time.time() * 1000))

    while current_time < start_time + timeout:
        with lock:
            # Resolve the queue on every turn: a topic that is first written to
            # while this request is waiting gets its list created only then, and
            # a lookup done once before the loop would never see it.
            topicmsgs = generic_messages.get(urlkey)
            if topicmsgs:
                try:
                    batch = topicmsgs[:limit]
                    del topicmsgs[:limit]
                    cntr_msg_requests_fetched += len(batch)
                    # Double encoding: each message is delivered as a json string
                    msgs = '[' + ','.join(json.dumps(json.dumps(m)) for m in batch) + ']'
                    print("GET on "+urlpath+" MSGs: "+msgs)
                    return Response(msgs, status=200, mimetype=MIME_JSON)
                except Exception as e:
                    print("GET on "+urlpath+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
                    return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
        sleep(0.025) # sleep 25 milliseconds
        current_time = int(round(time.time() * 1000))

    print("timeout: "+str(timeout)+", start_time: "+str(start_time)+", current_time: "+str(current_time))
    return Response("[]", status=200, mimetype=MIME_JSON)
459
460
461 ### Functions for metrics read out ###
462
@app.route('/counter/requests_submitted', methods=['GET'])
def requests_submitted():
    """Return the value of cntr_msg_requests_submitted as plain text."""
    counter_text = str(cntr_msg_requests_submitted)
    return Response(counter_text, status=200, mimetype=MIME_TEXT)
467
@app.route('/counter/requests_fetched', methods=['GET'])
def requests_fetched():
    """Return the value of cntr_msg_requests_fetched as plain text."""
    counter_text = str(cntr_msg_requests_fetched)
    return Response(counter_text, status=200, mimetype=MIME_TEXT)
472
@app.route('/counter/responses_submitted', methods=['GET'])
def responses_submitted():
    """Return the value of cntr_msg_responses_submitted as plain text."""
    counter_text = str(cntr_msg_responses_submitted)
    return Response(counter_text, status=200, mimetype=MIME_TEXT)
477
@app.route('/counter/responses_fetched', methods=['GET'])
def responses_fetched():
    """Return the value of cntr_msg_responses_fetched as plain text."""
    counter_text = str(cntr_msg_responses_fetched)
    return Response(counter_text, status=200, mimetype=MIME_TEXT)
482
@app.route('/counter/current_requests', methods=['GET'])
def current_requests():
    """Return the number of messages currently held in msg_requests as plain text."""
    queued = len(msg_requests)
    return Response(str(queued), status=200, mimetype=MIME_TEXT)
487
@app.route('/counter/current_responses', methods=['GET'])
def current_responses():
    """Return the number of responses currently held in msg_responses as plain text."""
    stored = len(msg_responses)
    return Response(str(stored), status=200, mimetype=MIME_TEXT)
492
493 ### Admin ###
494
495 # Reset all messsages and counters
@app.route('/reset',
    methods=['GET', 'POST', 'PUT'])
def reset():
    """Zero all counters and drop all queued messages; always answers 'OK' (200).

    The messages written through the generic /events/<topic> urls are dropped
    as well; they previously survived a reset.
    """
    global cntr_msg_requests_submitted
    global cntr_msg_requests_fetched
    global cntr_msg_responses_submitted
    global cntr_msg_responses_fetched
    global msg_requests
    global msg_responses
    global generic_messages

    # Hold the lock so that no handler or worker sees a half reset state
    with lock:
        cntr_msg_requests_submitted = 0
        cntr_msg_requests_fetched = 0
        cntr_msg_responses_submitted = 0
        cntr_msg_responses_fetched = 0
        msg_requests = []
        msg_responses = {}
        # Empty the topic queues in place: a reader waiting in a long poll may
        # hold a reference to one of these lists and must keep seeing new writes.
        for topic_queue in generic_messages.values():
            topic_queue.clear()
    return Response('OK', status=200, mimetype=MIME_TEXT)
513
514 # Get env vars, if present
if os.getenv("TOPIC_READ") is not None:

    topic_read = os.environ['TOPIC_READ']
    # TOPIC_WRITE may be absent although TOPIC_READ is set; treat that as "not
    # configured" instead of failing the module import with a KeyError.
    topic_write = os.getenv('TOPIC_WRITE', "")

    print("Env variables:")
    print("TOPIC_READ:" + topic_read)
    print("TOPIC_WRITE:" + topic_write)

    # Each worker is started only when the url it actually uses is configured:
    # dmaap_downloader polls the topic_write url, dmaap_uploader posts to the
    # topic_read url. With both variables set this is the same as before.
    if topic_write and downloader_thread is None:
        downloader_thread = Thread(target=dmaap_downloader)
        downloader_thread.start()

    if topic_read and uploader_thread is None:
        uploader_thread = Thread(target=dmaap_uploader)
        uploader_thread.start()

else:
    print("No env variables - OK")

if __name__ == "__main__":
    app.run(port=HOST_PORT, host=HOST_IP)