94ef606d533115a74e304193c82b9df6de12beb2
[nonrtric.git] / test / cr / app / cr.py
1
2 #  ============LICENSE_START===============================================
3 #  Copyright (C) 2020 Nordix Foundation. All rights reserved.
4 #  ========================================================================
5 #  Licensed under the Apache License, Version 2.0 (the "License");
6 #  you may not use this file except in compliance with the License.
7 #  You may obtain a copy of the License at
8 #
9 #       http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #  Unless required by applicable law or agreed to in writing, software
12 #  distributed under the License is distributed on an "AS IS" BASIS,
13 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #  See the License for the specific language governing permissions and
15 #  limitations under the License.
16 #  ============LICENSE_END=================================================
17 #
18
19 from flask import Flask, request, Response
20 from time import sleep
21 import time
22 from datetime import datetime
23 import json
24 import traceback
25 import logging
26 import socket
27 from threading import RLock
28 from hashlib import md5
29
30 # Disable all logging of GET on reading counters and db
31 class AjaxFilter(logging.Filter):
32     def filter(self, record):
33         return ("/counter/" not in record.getMessage()) and ("/db" not in record.getMessage())
34
35 log = logging.getLogger('werkzeug')
36 log.addFilter(AjaxFilter())
37
38 app = Flask(__name__)
39
40 lock = RLock()
41
42 # list of callback messages
43 msg_callbacks={}
44
45 # Server info
46 HOST_IP = "::"
47 HOST_PORT = 2222
48
49 # Metrics vars
50 cntr_msg_callbacks=0
51 cntr_msg_fetched=0
52 cntr_callbacks={}
53 hosts_set=set()
54
55 # Request and response constants
56 CALLBACK_URL="/callbacks/<string:id>"
57 CALLBACK_MR_URL="/callbacks-mr/<string:id>" #Json list with string encoded items
58 CALLBACK_TEXT_URL="/callbacks-text/<string:id>" # Callback for string of text
59 APP_READ_URL="/get-event/<string:id>"
60 APP_READ_ALL_URL="/get-all-events/<string:id>"
61 DUMP_ALL_URL="/db"
62
63 MIME_TEXT="text/plain"
64 MIME_JSON="application/json"
65 CAUGHT_EXCEPTION="Caught exception: "
66 SERVER_ERROR="Server error :"
67 TIME_STAMP="cr-timestamp"
68
69 forced_settings={}
70 forced_settings['delay']=None
71
72
73 # Remote host lookup and print host name
74 def remote_host_logging(request):
75
76     if request.environ.get('HTTP_X_FORWARDED_FOR') is None:
77         host_ip=str(request.environ['REMOTE_ADDR'])
78     else:
79         host_ip=str(request.environ['HTTP_X_FORWARDED_FOR'])
80     prefix='::ffff:'
81     if (host_ip.startswith('::ffff:')):
82         host_ip=host_ip[len(prefix):]
83     try:
84         name, alias, addresslist = socket.gethostbyaddr(host_ip)
85         print("Calling host: "+str(name))
86         hosts_set.add(name)
87     except Exception:
88         print("Calling host not possible to retrieve IP: "+str(host_ip))
89         hosts_set.add(host_ip)
90
91
92 #I'm alive function
93 @app.route('/',
94     methods=['GET'])
95 def index():
96     return 'OK', 200
97
98 ### Callback interface, for control
99
100 # Fetch the oldest callback message for an id
101 # URI and parameter, (GET): /get-event/<id>
102 # response: message + 200 or just 204 or just 500(error)
103 @app.route(APP_READ_URL,
104     methods=['GET'])
105 def receiveresponse(id):
106     global msg_callbacks
107     global cntr_msg_fetched
108
109     with lock:
110         try:
111             if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)):
112                 cntr_msg_fetched+=1
113                 cntr_callbacks[id][1]+=1
114                 msg=msg_callbacks[id][0]
115                 print("Fetching msg for id: "+id+", msg="+str(msg))
116
117                 if (isinstance(msg,dict)):
118                     del msg[TIME_STAMP]
119                     if ("md5" in msg.keys()):
120                         print("EXTRACTED MD5")
121                         msg=msg["md5"]
122                         print("MD5: "+str(msg))
123
124                 del msg_callbacks[id][0]
125                 return json.dumps(msg),200
126             print("No messages for id: "+id)
127         except Exception as e:
128             print(CAUGHT_EXCEPTION+str(e))
129             traceback.print_exc()
130             return "",500
131
132         return "",204
133
134 # Fetch all callback message for an id in an array
135 # URI and parameter, (GET): /get-all-events/<id>
136 # response: message + 200 or just 500(error)
137 @app.route(APP_READ_ALL_URL,
138     methods=['GET'])
139 def receiveresponse_all(id):
140     global msg_callbacks
141     global cntr_msg_fetched
142
143     with lock:
144         try:
145             if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)):
146                 cntr_msg_fetched+=len(msg_callbacks[id])
147                 cntr_callbacks[id][1]+=len(msg_callbacks[id])
148                 msg=msg_callbacks[id]
149                 print("Fetching all msgs for id: "+id+", msg="+str(msg))
150                 for sub_msg in msg:
151                     if (isinstance(sub_msg, dict)):
152                         del sub_msg[TIME_STAMP]
153                 del msg_callbacks[id]
154                 return json.dumps(msg),200
155             print("No messages for id: "+id)
156         except Exception as e:
157             print(CAUGHT_EXCEPTION+str(e))
158             traceback.print_exc()
159             return "",500
160
161         msg=[]
162         return json.dumps(msg),200
163
164 # Receive a callback message
165 # URI and payload, (PUT or POST): /callbacks/<id> <json messages>
166 # response: OK 200 or 500 for other errors
167 @app.route(CALLBACK_URL,
168     methods=['PUT','POST'])
169 def events_write(id):
170     global msg_callbacks
171     global cntr_msg_callbacks
172
173     try:
174         print("Received callback for id: "+id +", content-type="+request.content_type)
175         remote_host_logging(request)
176         print("raw data: str(request.data): "+str(request.data))
177         do_delay()
178         try:
179             if (request.content_type == MIME_JSON):
180                 data = request.data
181                 msg = json.loads(data)
182                 print("Payload(json): "+str(msg))
183             else:
184                 msg={}
185                 print("Payload(content-type="+request.content_type+"). Setting empty json as payload")
186         except Exception as e:
187             msg={}
188             print("(Exception) Payload does not contain any json, setting empty json as payload")
189             traceback.print_exc()
190
191         with lock:
192             cntr_msg_callbacks += 1
193             if (isinstance(msg, dict)):
194                 msg[TIME_STAMP]=str(datetime.now())
195             if (id in msg_callbacks.keys()):
196                 msg_callbacks[id].append(msg)
197             else:
198                 msg_callbacks[id]=[]
199                 msg_callbacks[id].append(msg)
200
201             if (id in cntr_callbacks.keys()):
202                 cntr_callbacks[id][0] += 1
203             else:
204                 cntr_callbacks[id]=[]
205                 cntr_callbacks[id].append(1)
206                 cntr_callbacks[id].append(0)
207
208     except Exception as e:
209         print(CAUGHT_EXCEPTION+str(e))
210         traceback.print_exc()
211         return 'NOTOK',500
212
213     return 'OK',200
214
215
216 # Receive a json callback message with payload formatted according to output from the message router
217 # Array of stringified json objects
218 # URI and payload, (PUT or POST): /callbacks-mr/<id> <json messages>
219 # json is a list of string encoded json items
220 # response: OK 200 or 500 for other errors
221 @app.route(CALLBACK_MR_URL,
222     methods=['PUT','POST'])
223 def events_write_mr(id):
224     global msg_callbacks
225     global cntr_msg_callbacks
226
227     storeas=request.args.get('storeas') #If set, store payload as a md5 hascode and dont log the payload
228                                         #Large payloads will otherwise overload the server
229     try:
230         print("Received callback (mr) for id: "+id +", content-type="+request.content_type)
231         print("raw data: str(request.data): "+str(request.data))
232         if (storeas is None):
233             print("raw data: str(request.data): "+str(request.data))
234         do_delay()
235         try:
236             #if (request.content_type == MIME_JSON):
237             if (MIME_JSON in request.content_type):
238                 data = request.data
239                 msg_list = json.loads(data)
240                 if (storeas is None):
241                     print("Payload(json): "+str(msg_list))
242             else:
243                 msg_list=[]
244                 print("Payload(content-type="+request.content_type+"). Setting empty json as payload")
245         except Exception as e:
246             msg_list=[]
247             print("(Exception) Payload does not contain any json, setting empty json as payload")
248             traceback.print_exc()
249
250         with lock:
251             remote_host_logging(request)
252             for msg in msg_list:
253                 if (storeas is None):
254                     msg=json.loads(msg)
255                 else:
256                     #Convert to compact json without ws between parameter and value...
257                     #It seem that ws is added somewhere along to way to this server
258                     msg=json.loads(msg)
259                     msg=json.dumps(msg, separators=(',', ':'))
260
261                     md5msg={}
262                     md5msg["md5"]=md5(msg.encode('utf-8')).hexdigest()
263                     msg=md5msg
264                     print("msg (json converted to md5 hash): "+str(msg["md5"]))
265                 cntr_msg_callbacks += 1
266                 if (isinstance(msg, dict)):
267                     msg[TIME_STAMP]=str(datetime.now())
268                 if (id in msg_callbacks.keys()):
269                     msg_callbacks[id].append(msg)
270                 else:
271                     msg_callbacks[id]=[]
272                     msg_callbacks[id].append(msg)
273
274                 if (id in cntr_callbacks.keys()):
275                     cntr_callbacks[id][0] += 1
276                 else:
277                     cntr_callbacks[id]=[]
278                     cntr_callbacks[id].append(1)
279                     cntr_callbacks[id].append(0)
280
281     except Exception as e:
282         print(CAUGHT_EXCEPTION+str(e))
283         traceback.print_exc()
284         return 'NOTOK',500
285
286     return 'OK',200
287
288 # Receive a callback message of a single text message (content type ignored)
289 # or a json array of strings (content type json)
290 # URI and payload, (PUT or POST): /callbacks-text/<id> <text message>
291 # response: OK 200 or 500 for other errors
292 @app.route(CALLBACK_TEXT_URL,
293     methods=['PUT','POST'])
294 def events_write_text(id):
295     global msg_callbacks
296     global cntr_msg_callbacks
297
298     storeas=request.args.get('storeas') #If set, store payload as a md5 hascode and dont log the payload
299                                         #Large payloads will otherwise overload the server
300     try:
301         print("Received callback for id: "+id +", content-type="+request.content_type)
302         remote_host_logging(request)
303         if (storeas is None):
304             print("raw data: str(request.data): "+str(request.data))
305         do_delay()
306
307         try:
308             msg_list=None
309             if (MIME_JSON in request.content_type):  #Json array of strings
310                 msg_list=json.loads(request.data)
311             else:
312                 data=request.data.decode("utf-8")    #Assuming string
313                 msg_list=[]
314                 msg_list.append(data)
315
316             for msg in msg_list:
317                 if (storeas == "md5"):
318                     md5msg={}
319                     print("msg: "+str(msg))
320                     print("msg (endcode str): "+str(msg.encode('utf-8')))
321                     md5msg["md5"]=md5(msg.encode('utf-8')).hexdigest()
322                     msg=md5msg
323                     print("msg (data converted to md5 hash): "+str(msg["md5"]))
324
325                 if (isinstance(msg, dict)):
326                     msg[TIME_STAMP]=str(datetime.now())
327
328                 with lock:
329                     cntr_msg_callbacks += 1
330                     if (id in msg_callbacks.keys()):
331                         msg_callbacks[id].append(msg)
332                     else:
333                         msg_callbacks[id]=[]
334                         msg_callbacks[id].append(msg)
335
336                     if (id in cntr_callbacks.keys()):
337                         cntr_callbacks[id][0] += 1
338                     else:
339                         cntr_callbacks[id]=[]
340                         cntr_callbacks[id].append(1)
341                         cntr_callbacks[id].append(0)
342         except Exception as e:
343             print(CAUGHT_EXCEPTION+str(e))
344             traceback.print_exc()
345             return 'NOTOK',500
346
347
348     except Exception as e:
349         print(CAUGHT_EXCEPTION+str(e))
350         traceback.print_exc()
351         return 'NOTOK',500
352
353     return 'OK',200
354
355 ### Functions for test ###
356
357 # Dump the whole db of current callbacks
358 # URI and parameter, (GET): /db
359 # response: message + 200
360 @app.route(DUMP_ALL_URL,
361     methods=['GET'])
362 def dump_db():
363     return json.dumps(msg_callbacks),200
364
365 ### Functions for metrics read out ###
366
367 @app.route('/counter/received_callbacks',
368     methods=['GET'])
369 def requests_submitted():
370     req_id = request.args.get('id')
371     if (req_id is None):
372         return Response(str(cntr_msg_callbacks), status=200, mimetype=MIME_TEXT)
373
374     if (req_id in cntr_callbacks.keys()):
375         return Response(str(cntr_callbacks[req_id][0]), status=200, mimetype=MIME_TEXT)
376     else:
377         return Response(str("0"), status=200, mimetype=MIME_TEXT)
378
379 @app.route('/counter/fetched_callbacks',
380     methods=['GET'])
381 def requests_fetched():
382     req_id = request.args.get('id')
383     if (req_id is None):
384         return Response(str(cntr_msg_fetched), status=200, mimetype=MIME_TEXT)
385
386     if (req_id in cntr_callbacks.keys()):
387         return Response(str(cntr_callbacks[req_id][1]), status=200, mimetype=MIME_TEXT)
388     else:
389         return Response(str("0"), status=200, mimetype=MIME_TEXT)
390
391 @app.route('/counter/current_messages',
392     methods=['GET'])
393 def current_messages():
394     req_id = request.args.get('id')
395     if (req_id is None):
396         return Response(str(cntr_msg_callbacks-cntr_msg_fetched), status=200, mimetype=MIME_TEXT)
397
398     if (req_id in cntr_callbacks.keys()):
399         return Response(str(cntr_callbacks[req_id][0]-cntr_callbacks[req_id][1]), status=200, mimetype=MIME_TEXT)
400     else:
401         return Response(str("0"), status=200, mimetype=MIME_TEXT)
402
403 @app.route('/counter/remote_hosts',
404     methods=['GET'])
405 def remote_hosts():
406     global hosts_set
407
408     hosts=",".join(hosts_set)
409     return Response(str(hosts), status=200, mimetype=MIME_TEXT)
410
411
412 #Set force delay response, in seconds, for all callbacks
413 #/froceesponse?delay=<seconds>
414 @app.route('/forcedelay', methods=['POST'])
415 def forcedelay():
416
417   try:
418     forced_settings['delay']=int(request.args.get('delay'))
419   except Exception:
420     forced_settings['delay']=None
421   return Response("Force delay: " + str(forced_settings['delay']) + " sec set for all callback responses", 200, mimetype=MIME_TEXT)
422
423 # Helper: Delay if delayed response code is set
424 def do_delay():
425   if (forced_settings['delay'] is not None):
426     try:
427       val=int(forced_settings['delay'])
428       if (val < 1):
429           return Response("Force delay too short: " + str(forced_settings['delay']) + " sec", 500, mimetype=MIME_TEXT)
430       print("Delaying "+str(val)+ " sec.")
431       time.sleep(val)
432     except Exception:
433       return Response("Force delay : " + str(forced_settings['delay']) + " sec failed", 500, mimetype=MIME_TEXT)
434 ### Admin ###
435
436 # Reset all messsages and counters
437 @app.route('/reset',
438     methods=['GET', 'POST', 'PUT'])
439 def reset():
440     global msg_callbacks
441     global cntr_msg_fetched
442     global cntr_msg_callbacks
443     global cntr_callbacks
444     global forced_settings
445
446     with lock:
447         msg_callbacks={}
448         cntr_msg_fetched=0
449         cntr_msg_callbacks=0
450         cntr_callbacks={}
451         forced_settings['delay']=None
452
453         return Response('OK', status=200, mimetype=MIME_TEXT)
454
455 ### Main function ###
456
457 if __name__ == "__main__":
458     app.run(port=HOST_PORT, host=HOST_IP)