Updated test env documentation
[nonrtric.git] / test / cr / app / cr.py
1
2 #  ============LICENSE_START===============================================
3 #  Copyright (C) 2020 Nordix Foundation. All rights reserved.
4 #  ========================================================================
5 #  Licensed under the Apache License, Version 2.0 (the "License");
6 #  you may not use this file except in compliance with the License.
7 #  You may obtain a copy of the License at
8 #
9 #       http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #  Unless required by applicable law or agreed to in writing, software
12 #  distributed under the License is distributed on an "AS IS" BASIS,
13 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #  See the License for the specific language governing permissions and
15 #  limitations under the License.
16 #  ============LICENSE_END=================================================
17 #
18
19 from flask import Flask, request, Response
20 from time import sleep
21 import time
22 from datetime import datetime
23 import json
24 import traceback
25 import logging
26 import socket
27 from threading import RLock
28 from hashlib import md5
29
30 # Disable all logging of GET on reading counters and db
31 class AjaxFilter(logging.Filter):
32     def filter(self, record):
33         return ("/counter/" not in record.getMessage()) and ("/db" not in record.getMessage())
34
35 log = logging.getLogger('werkzeug')
36 log.addFilter(AjaxFilter())
37
38 app = Flask(__name__)
39
40 lock = RLock()
41
42 # list of callback messages
43 msg_callbacks={}
44
45 # Server info
46 HOST_IP = "::"
47 HOST_PORT = 2222
48
49 # Metrics vars
50 cntr_msg_callbacks=0
51 cntr_batch_callbacks=0
52 cntr_msg_fetched=0
53 cntr_callbacks={}
54 hosts_set=set()
55
56 # Request and response constants
57 CALLBACK_URL="/callbacks/<string:id>"
58 CALLBACK_MR_URL="/callbacks-mr/<string:id>" #Json list with string encoded items
59 CALLBACK_TEXT_URL="/callbacks-text/<string:id>" # Callback for string of text
60 APP_READ_URL="/get-event/<string:id>"
61 APP_READ_ALL_URL="/get-all-events/<string:id>"
62 DUMP_ALL_URL="/db"
63 NULL_URL="/callbacks-null"  # Url for ignored callback. Callbacks are not checked, counted or stored
64
65 MIME_TEXT="text/plain"
66 MIME_JSON="application/json"
67 CAUGHT_EXCEPTION="Caught exception: "
68 SERVER_ERROR="Server error :"
69 TIME_STAMP="cr-timestamp"
70
71 forced_settings={}
72 forced_settings['delay']=None
73
74
75 # Remote host lookup and print host name
76 def remote_host_logging(request):
77
78     if request.environ.get('HTTP_X_FORWARDED_FOR') is None:
79         host_ip=str(request.environ['REMOTE_ADDR'])
80     else:
81         host_ip=str(request.environ['HTTP_X_FORWARDED_FOR'])
82     prefix='::ffff:'
83     if (host_ip.startswith('::ffff:')):
84         host_ip=host_ip[len(prefix):]
85     try:
86         name, alias, addresslist = socket.gethostbyaddr(host_ip)
87         print("Calling host: "+str(name))
88         hosts_set.add(name)
89     except Exception:
90         print("Calling host not possible to retrieve IP: "+str(host_ip))
91         hosts_set.add(host_ip)
92
93
94 #I'm alive function
95 @app.route('/',
96     methods=['GET'])
97 def index():
98     return 'OK', 200
99
100 ### Callback interface, for control
101
102 # Fetch the oldest callback message for an id
103 # URI and parameter, (GET): /get-event/<id>
104 # response: message + 200 or just 204 or just 500(error)
105 @app.route(APP_READ_URL,
106     methods=['GET'])
107 def receiveresponse(id):
108     global msg_callbacks
109     global cntr_msg_fetched
110
111     with lock:
112         try:
113             if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)):
114                 cntr_msg_fetched+=1
115                 cntr_callbacks[id][1]+=1
116                 msg=msg_callbacks[id][0]
117                 print("Fetching msg for id: "+id+", msg="+str(msg))
118
119                 if (isinstance(msg,dict)):
120                     del msg[TIME_STAMP]
121                     if ("md5" in msg.keys()):
122                         print("EXTRACTED MD5")
123                         msg=msg["md5"]
124                         print("MD5: "+str(msg))
125
126                 del msg_callbacks[id][0]
127                 return json.dumps(msg),200
128             print("No messages for id: "+id)
129         except Exception as e:
130             print(CAUGHT_EXCEPTION+str(e))
131             traceback.print_exc()
132             return "",500
133
134         return "",204
135
136 # Fetch all callback message for an id in an array
137 # URI and parameter, (GET): /get-all-events/<id>
138 # response: message + 200 or just 500(error)
139 @app.route(APP_READ_ALL_URL,
140     methods=['GET'])
141 def receiveresponse_all(id):
142     global msg_callbacks
143     global cntr_msg_fetched
144
145     with lock:
146         try:
147             if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)):
148                 cntr_msg_fetched+=len(msg_callbacks[id])
149                 cntr_callbacks[id][1]+=len(msg_callbacks[id])
150                 msg=msg_callbacks[id]
151                 print("Fetching all msgs for id: "+id+", msg="+str(msg))
152                 for sub_msg in msg:
153                     if (isinstance(sub_msg, dict)):
154                         del sub_msg[TIME_STAMP]
155                 del msg_callbacks[id]
156                 return json.dumps(msg),200
157             print("No messages for id: "+id)
158         except Exception as e:
159             print(CAUGHT_EXCEPTION+str(e))
160             traceback.print_exc()
161             return "",500
162
163         msg=[]
164         return json.dumps(msg),200
165
166 # Receive a callback message
167 # URI and payload, (PUT or POST): /callbacks/<id> <json messages>
168 # response: OK 200 or 500 for other errors
169 @app.route(CALLBACK_URL,
170     methods=['PUT','POST'])
171 def events_write(id):
172     global msg_callbacks
173     global cntr_msg_callbacks
174
175     try:
176         print("Received callback for id: "+id +", content-type="+request.content_type)
177         remote_host_logging(request)
178         print("raw data: str(request.data): "+str(request.data))
179         do_delay()
180         try:
181             if (request.content_type == MIME_JSON):
182                 data = request.data
183                 msg = json.loads(data)
184                 print("Payload(json): "+str(msg))
185             else:
186                 msg={}
187                 print("Payload(content-type="+request.content_type+"). Setting empty json as payload")
188         except Exception as e:
189             msg={}
190             print("(Exception) Payload does not contain any json, setting empty json as payload")
191             traceback.print_exc()
192
193         with lock:
194             cntr_msg_callbacks += 1
195             if (isinstance(msg, dict)):
196                 msg[TIME_STAMP]=str(datetime.now())
197             if (id in msg_callbacks.keys()):
198                 msg_callbacks[id].append(msg)
199             else:
200                 msg_callbacks[id]=[]
201                 msg_callbacks[id].append(msg)
202
203             if (id in cntr_callbacks.keys()):
204                 cntr_callbacks[id][0] += 1
205                 cntr_callbacks[id][2] += 1
206             else:
207                 cntr_callbacks[id]=[]
208                 cntr_callbacks[id].append(1)
209                 cntr_callbacks[id].append(0)
210                 cntr_callbacks[id].append(0)
211
212     except Exception as e:
213         print(CAUGHT_EXCEPTION+str(e))
214         traceback.print_exc()
215         return 'NOTOK',500
216
217     return 'OK',200
218
219
220 # Receive a json callback message with payload formatted according to output from the message router
221 # Array of stringified json objects
222 # URI and payload, (PUT or POST): /callbacks-mr/<id> <json messages>
223 # json is a list of string encoded json items
224 # response: OK 200 or 500 for other errors
225 @app.route(CALLBACK_MR_URL,
226     methods=['PUT','POST'])
227 def events_write_mr(id):
228     global msg_callbacks
229     global cntr_msg_callbacks
230     global cntr_batch_callbacks
231
232     storeas=request.args.get('storeas') #If set, store payload as a md5 hash code and dont log the payload
233                                         #Large payloads will otherwise overload the server
234     try:
235         print("Received callback (mr) for id: "+id +", content-type="+request.content_type)
236         print("raw data: str(request.data): "+str(request.data))
237         if (storeas is None):
238             print("raw data: str(request.data): "+str(request.data))
239         do_delay()
240         list_data=False
241         try:
242             #if (request.content_type == MIME_JSON):
243             if (MIME_JSON in request.content_type):
244                 data = request.data
245                 msg_list = json.loads(data)
246                 if (storeas is None):
247                     print("Payload(json): "+str(msg_list))
248                 list_data=True
249             else:
250                 msg_list=[]
251                 print("Payload(content-type="+request.content_type+"). Setting empty json as payload")
252         except Exception as e:
253             msg_list=[]
254             print("(Exception) Payload does not contain any json, setting empty json as payload")
255             traceback.print_exc()
256
257         with lock:
258             remote_host_logging(request)
259             if (list_data):
260                 cntr_batch_callbacks += 1
261             for msg in msg_list:
262                 if (storeas is None):
263                     msg=json.loads(msg)
264                 else:
265                     #Convert to compact json without ws between parameter and value...
266                     #It seem that ws is added somewhere along to way to this server
267                     msg=json.loads(msg)
268                     msg=json.dumps(msg, separators=(',', ':'))
269
270                     md5msg={}
271                     md5msg["md5"]=md5(msg.encode('utf-8')).hexdigest()
272                     msg=md5msg
273                     print("msg (json converted to md5 hash): "+str(msg["md5"]))
274                 cntr_msg_callbacks += 1
275                 if (isinstance(msg, dict)):
276                     msg[TIME_STAMP]=str(datetime.now())
277                 if (id in msg_callbacks.keys()):
278                     msg_callbacks[id].append(msg)
279                 else:
280                     msg_callbacks[id]=[]
281                     msg_callbacks[id].append(msg)
282
283                 if (id in cntr_callbacks.keys()):
284                     cntr_callbacks[id][0] += 1
285                 else:
286                     cntr_callbacks[id]=[]
287                     cntr_callbacks[id].append(1)
288                     cntr_callbacks[id].append(0)
289                     cntr_callbacks[id].append(0)
290             if (id in msg_callbacks.keys() and list_data):
291                 cntr_callbacks[id][2] += 1
292
293     except Exception as e:
294         print(CAUGHT_EXCEPTION+str(e))
295         traceback.print_exc()
296         return 'NOTOK',500
297
298     return 'OK',200
299
300 # Receive a callback message of a single text message (content type ignored)
301 # or a json array of strings (content type json)
302 # URI and payload, (PUT or POST): /callbacks-text/<id> <text message>
303 # response: OK 200 or 500 for other errors
304 @app.route(CALLBACK_TEXT_URL,
305     methods=['PUT','POST'])
306 def events_write_text(id):
307     global msg_callbacks
308     global cntr_msg_callbacks
309     global cntr_batch_callbacks
310
311     storeas=request.args.get('storeas') #If set, store payload as a md5 hash code and dont log the payload
312                                         #Large payloads will otherwise overload the server
313     try:
314         print("Received callback for id: "+id +", content-type="+request.content_type)
315         remote_host_logging(request)
316         if (storeas is None):
317             print("raw data: str(request.data): "+str(request.data))
318         do_delay()
319
320         try:
321             msg_list=None
322             list_data=False
323             if (MIME_JSON in request.content_type):  #Json array of strings
324                 msg_list=json.loads(request.data)
325                 list_data=True
326             else:
327                 data=request.data.decode("utf-8")    #Assuming string
328                 msg_list=[]
329                 msg_list.append(data)
330             with lock:
331                 cntr_batch_callbacks += 1
332                 for msg in msg_list:
333                     if (storeas == "md5"):
334                         md5msg={}
335                         print("msg: "+str(msg))
336                         print("msg (endcode str): "+str(msg.encode('utf-8')))
337                         md5msg["md5"]=md5(msg.encode('utf-8')).hexdigest()
338                         msg=md5msg
339                         print("msg (data converted to md5 hash): "+str(msg["md5"]))
340
341                     if (isinstance(msg, dict)):
342                         msg[TIME_STAMP]=str(datetime.now())
343
344                     cntr_msg_callbacks += 1
345                     if (id in msg_callbacks.keys()):
346                         msg_callbacks[id].append(msg)
347                     else:
348                         msg_callbacks[id]=[]
349                         msg_callbacks[id].append(msg)
350
351                     if (id in cntr_callbacks.keys()):
352                         cntr_callbacks[id][0] += 1
353                     else:
354                         cntr_callbacks[id]=[]
355                         cntr_callbacks[id].append(1)
356                         cntr_callbacks[id].append(0)
357                         cntr_callbacks[id].append(0)
358                 if (id in cntr_callbacks.keys() and list_data):
359                     cntr_callbacks[id][2] += 1
360         except Exception as e:
361             print(CAUGHT_EXCEPTION+str(e))
362             traceback.print_exc()
363             return 'NOTOK',500
364
365
366     except Exception as e:
367         print(CAUGHT_EXCEPTION+str(e))
368         traceback.print_exc()
369         return 'NOTOK',500
370
371     return 'OK',200
372
373 # Receive a callback message but ignore contents and return 200
374 # URI and payload, (PUT or POST): /callbacks-text/<id> <text message>
375 # response: OK 200
376 @app.route(NULL_URL,
377     methods=['PUT','POST'])
378 def null_url(id):
379     return 'OK',200
380
381 # Dump the whole db of current callbacks
382 # URI and parameter, (GET): /db
383 # response: message + 200
384 @app.route(DUMP_ALL_URL,
385     methods=['GET'])
386 def dump_db():
387     return json.dumps(msg_callbacks),200
388
389 ### Functions for metrics read out ###
390
391 @app.route('/counter/received_callbacks',
392     methods=['GET'])
393 def requests_submitted():
394     req_id = request.args.get('id')
395     if (req_id is None):
396         return Response(str(cntr_msg_callbacks), status=200, mimetype=MIME_TEXT)
397
398     if (req_id in cntr_callbacks.keys()):
399         return Response(str(cntr_callbacks[req_id][0]), status=200, mimetype=MIME_TEXT)
400     else:
401         return Response(str("0"), status=200, mimetype=MIME_TEXT)
402
403 @app.route('/counter/received_callback_batches',
404     methods=['GET'])
405 def batches_submitted():
406     req_id = request.args.get('id')
407     if (req_id is None):
408         return Response(str(cntr_batch_callbacks), status=200, mimetype=MIME_TEXT)
409
410     if (req_id in cntr_callbacks.keys()):
411         return Response(str(cntr_callbacks[req_id][2]), status=200, mimetype=MIME_TEXT)
412     else:
413         return Response(str("0"), status=200, mimetype=MIME_TEXT)
414
415 @app.route('/counter/fetched_callbacks',
416     methods=['GET'])
417 def requests_fetched():
418     req_id = request.args.get('id')
419     if (req_id is None):
420         return Response(str(cntr_msg_fetched), status=200, mimetype=MIME_TEXT)
421
422     if (req_id in cntr_callbacks.keys()):
423         return Response(str(cntr_callbacks[req_id][1]), status=200, mimetype=MIME_TEXT)
424     else:
425         return Response(str("0"), status=200, mimetype=MIME_TEXT)
426
427 @app.route('/counter/current_messages',
428     methods=['GET'])
429 def current_messages():
430     req_id = request.args.get('id')
431     if (req_id is None):
432         return Response(str(cntr_msg_callbacks-cntr_msg_fetched), status=200, mimetype=MIME_TEXT)
433
434     if (req_id in cntr_callbacks.keys()):
435         return Response(str(cntr_callbacks[req_id][0]-cntr_callbacks[req_id][1]), status=200, mimetype=MIME_TEXT)
436     else:
437         return Response(str("0"), status=200, mimetype=MIME_TEXT)
438
439 @app.route('/counter/remote_hosts',
440     methods=['GET'])
441 def remote_hosts():
442     global hosts_set
443
444     hosts=",".join(hosts_set)
445     return Response(str(hosts), status=200, mimetype=MIME_TEXT)
446
447
448 #Set force delay response, in seconds, for all callbacks
449 #/forceresponse?delay=<seconds>
450 @app.route('/forcedelay', methods=['POST'])
451 def forcedelay():
452
453   try:
454     forced_settings['delay']=int(request.args.get('delay'))
455   except Exception:
456     forced_settings['delay']=None
457   return Response("Force delay: " + str(forced_settings['delay']) + " sec set for all callback responses", 200, mimetype=MIME_TEXT)
458
459 # Helper: Delay if delayed response code is set
460 def do_delay():
461   if (forced_settings['delay'] is not None):
462     try:
463       val=int(forced_settings['delay'])
464       if (val < 1):
465           return Response("Force delay too short: " + str(forced_settings['delay']) + " sec", 500, mimetype=MIME_TEXT)
466       print("Delaying "+str(val)+ " sec.")
467       time.sleep(val)
468     except Exception:
469       return Response("Force delay : " + str(forced_settings['delay']) + " sec failed", 500, mimetype=MIME_TEXT)
470 ### Admin ###
471
472 # Reset all messages and counters
473 @app.route('/reset',
474     methods=['GET', 'POST', 'PUT'])
475 def reset():
476     global msg_callbacks
477     global cntr_msg_fetched
478     global cntr_msg_callbacks
479     global cntr_batch_callbacks
480     global cntr_callbacks
481     global forced_settings
482
483     with lock:
484         msg_callbacks={}
485         cntr_msg_fetched=0
486         cntr_msg_callbacks=0
487         cntr_batch_callbacks=0
488         cntr_callbacks={}
489         forced_settings['delay']=None
490
491         return Response('OK', status=200, mimetype=MIME_TEXT)
492
493 ### Main function ###
494
495 if __name__ == "__main__":
496     app.run(port=HOST_PORT, host=HOST_IP)