Adaptation of test env to helm chart
[nonrtric.git] / test / cr / app / cr.py
1
2 #  ============LICENSE_START===============================================
3 #  Copyright (C) 2020 Nordix Foundation. All rights reserved.
4 #  ========================================================================
5 #  Licensed under the Apache License, Version 2.0 (the "License");
6 #  you may not use this file except in compliance with the License.
7 #  You may obtain a copy of the License at
8 #
9 #       http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #  Unless required by applicable law or agreed to in writing, software
12 #  distributed under the License is distributed on an "AS IS" BASIS,
13 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #  See the License for the specific language governing permissions and
15 #  limitations under the License.
16 #  ============LICENSE_END=================================================
17 #
18
19 from flask import Flask, request, Response
20 from time import sleep
21 import time
22 from datetime import datetime
23 import json
24 import traceback
25 import logging
26 import socket
27 from threading import RLock
28
29 # Disable all logging of GET on reading counters and db
30 class AjaxFilter(logging.Filter):
31     def filter(self, record):
32         return ("/counter/" not in record.getMessage()) and ("/db" not in record.getMessage())
33
34 log = logging.getLogger('werkzeug')
35 log.addFilter(AjaxFilter())
36
37 app = Flask(__name__)
38
39 lock = RLock()
40
41 # list of callback messages
42 msg_callbacks={}
43
44 # Server info
45 HOST_IP = "::"
46 HOST_PORT = 2222
47
48 # Metrics vars
49 cntr_msg_callbacks=0
50 cntr_msg_fetched=0
51 cntr_callbacks={}
52 hosts_set=set()
53
54 # Request and response constants
55 CALLBACK_URL="/callbacks/<string:id>"
56 CALLBACK_MR_URL="/callbacks-mr/<string:id>" #Json list with string encoded items
57 APP_READ_URL="/get-event/<string:id>"
58 APP_READ_ALL_URL="/get-all-events/<string:id>"
59 DUMP_ALL_URL="/db"
60
61 MIME_TEXT="text/plain"
62 MIME_JSON="application/json"
63 CAUGHT_EXCEPTION="Caught exception: "
64 SERVER_ERROR="Server error :"
65 TIME_STAMP="cr-timestamp"
66
67 forced_settings={}
68 forced_settings['delay']=None
69
70
71 # Remote host lookup and print host name
72 def remote_host_logging(request):
73
74     if request.environ.get('HTTP_X_FORWARDED_FOR') is None:
75         host_ip=str(request.environ['REMOTE_ADDR'])
76     else:
77         host_ip=str(request.environ['HTTP_X_FORWARDED_FOR'])
78     prefix='::ffff:'
79     if (host_ip.startswith('::ffff:')):
80         host_ip=host_ip[len(prefix):]
81     try:
82         name, alias, addresslist = socket.gethostbyaddr(host_ip)
83         print("Calling host: "+str(name))
84         hosts_set.add(name)
85     except Exception:
86         print("Calling host not possible to retrieve IP: "+str(host_ip))
87         hosts_set.add(host_ip)
88
89
90 #I'm alive function
91 @app.route('/',
92     methods=['GET'])
93 def index():
94     return 'OK', 200
95
96 ### Callback interface, for control
97
98 # Fetch the oldest callback message for an id
99 # URI and parameter, (GET): /get-event/<id>
100 # response: message + 200 or just 204 or just 500(error)
101 @app.route(APP_READ_URL,
102     methods=['GET'])
103 def receiveresponse(id):
104     global msg_callbacks
105     global cntr_msg_fetched
106
107     with lock:
108         try:
109             if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)):
110                 cntr_msg_fetched+=1
111                 cntr_callbacks[id][1]+=1
112                 msg=msg_callbacks[id][0]
113                 print("Fetching msg for id: "+id+", msg="+str(msg))
114                 del msg[TIME_STAMP]
115                 del msg_callbacks[id][0]
116                 return json.dumps(msg),200
117             print("No messages for id: "+id)
118         except Exception as e:
119             print(CAUGHT_EXCEPTION+str(e))
120             traceback.print_exc()
121             return "",500
122
123         return "",204
124
125 # Fetch all callback message for an id in an array
126 # URI and parameter, (GET): /get-all-events/<id>
127 # response: message + 200 or just 500(error)
128 @app.route(APP_READ_ALL_URL,
129     methods=['GET'])
130 def receiveresponse_all(id):
131     global msg_callbacks
132     global cntr_msg_fetched
133
134     with lock:
135         try:
136             if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)):
137                 cntr_msg_fetched+=len(msg_callbacks[id])
138                 cntr_callbacks[id][1]+=len(msg_callbacks[id])
139                 msg=msg_callbacks[id]
140                 print("Fetching all msgs for id: "+id+", msg="+str(msg))
141                 for sub_msg in msg:
142                     del sub_msg[TIME_STAMP]
143                 del msg_callbacks[id]
144                 return json.dumps(msg),200
145             print("No messages for id: "+id)
146         except Exception as e:
147             print(CAUGHT_EXCEPTION+str(e))
148             traceback.print_exc()
149             return "",500
150
151         msg=[]
152         return json.dumps(msg),200
153
154 # Receive a callback message
155 # URI and payload, (PUT or POST): /callbacks/<id> <json messages>
156 # response: OK 200 or 500 for other errors
157 @app.route(CALLBACK_URL,
158     methods=['PUT','POST'])
159 def events_write(id):
160     global msg_callbacks
161     global cntr_msg_callbacks
162
163     try:
164         print("Received callback for id: "+id +", content-type="+request.content_type)
165         remote_host_logging(request)
166         print("raw data: str(request.data): "+str(request.data))
167         do_delay()
168         try:
169             if (request.content_type == MIME_JSON):
170                 data = request.data
171                 msg = json.loads(data)
172                 print("Payload(json): "+str(msg))
173             else:
174                 msg={}
175                 print("Payload(content-type="+request.content_type+"). Setting empty json as payload")
176         except Exception as e:
177             msg={}
178             print("(Exception) Payload does not contain any json, setting empty json as payload")
179             traceback.print_exc()
180
181         with lock:
182             cntr_msg_callbacks += 1
183             msg[TIME_STAMP]=str(datetime.now())
184             if (id in msg_callbacks.keys()):
185                 msg_callbacks[id].append(msg)
186             else:
187                 msg_callbacks[id]=[]
188                 msg_callbacks[id].append(msg)
189
190             if (id in cntr_callbacks.keys()):
191                 cntr_callbacks[id][0] += 1
192             else:
193                 cntr_callbacks[id]=[]
194                 cntr_callbacks[id].append(1)
195                 cntr_callbacks[id].append(0)
196
197     except Exception as e:
198         print(CAUGHT_EXCEPTION+str(e))
199         traceback.print_exc()
200         return 'NOTOK',500
201
202     return 'OK',200
203
204
205 # Receive a json callback message with payload fromatted accoirding to output frm the message router
206 # URI and payload, (PUT or POST): /callbacks/<id> <json messages>
207 # json is a list of string encoded json items
208 # response: OK 200 or 500 for other errors
209 @app.route(CALLBACK_MR_URL,
210     methods=['PUT','POST'])
211 def events_write_mr(id):
212     global msg_callbacks
213     global cntr_msg_callbacks
214
215     try:
216         print("Received callback (mr) for id: "+id +", content-type="+request.content_type)
217         remote_host_logging(request)
218         print("raw data: str(request.data): "+str(request.data))
219         do_delay()
220         try:
221             #if (request.content_type == MIME_JSON):
222             if (MIME_JSON in request.content_type):
223                 data = request.data
224                 msg_list = json.loads(data)
225                 print("Payload(json): "+str(msg_list))
226             else:
227                 msg_list=[]
228                 print("Payload(content-type="+request.content_type+"). Setting empty json as payload")
229         except Exception as e:
230             msg_list=[]
231             print("(Exception) Payload does not contain any json, setting empty json as payload")
232             traceback.print_exc()
233
234         with lock:
235             remote_host_logging(request)
236             for msg in msg_list:
237                 print("msg (str): "+str(msg))
238                 msg=json.loads(msg)
239                 print("msg (json): "+str(msg))
240                 cntr_msg_callbacks += 1
241                 msg[TIME_STAMP]=str(datetime.now())
242                 if (id in msg_callbacks.keys()):
243                     msg_callbacks[id].append(msg)
244                 else:
245                     msg_callbacks[id]=[]
246                     msg_callbacks[id].append(msg)
247
248                 if (id in cntr_callbacks.keys()):
249                     cntr_callbacks[id][0] += 1
250                 else:
251                     cntr_callbacks[id]=[]
252                     cntr_callbacks[id].append(1)
253                     cntr_callbacks[id].append(0)
254
255     except Exception as e:
256         print(CAUGHT_EXCEPTION+str(e))
257         traceback.print_exc()
258         return 'NOTOK',500
259
260     return 'OK',200
261
262 ### Functions for test ###
263
264 # Dump the whole db of current callbacks
265 # URI and parameter, (GET): /db
266 # response: message + 200
267 @app.route(DUMP_ALL_URL,
268     methods=['GET'])
269 def dump_db():
270     return json.dumps(msg_callbacks),200
271
272 ### Functions for metrics read out ###
273
274 @app.route('/counter/received_callbacks',
275     methods=['GET'])
276 def requests_submitted():
277     req_id = request.args.get('id')
278     if (req_id is None):
279         return Response(str(cntr_msg_callbacks), status=200, mimetype=MIME_TEXT)
280
281     if (req_id in cntr_callbacks.keys()):
282         return Response(str(cntr_callbacks[req_id][0]), status=200, mimetype=MIME_TEXT)
283     else:
284         return Response(str("0"), status=200, mimetype=MIME_TEXT)
285
286 @app.route('/counter/fetched_callbacks',
287     methods=['GET'])
288 def requests_fetched():
289     req_id = request.args.get('id')
290     if (req_id is None):
291         return Response(str(cntr_msg_fetched), status=200, mimetype=MIME_TEXT)
292
293     if (req_id in cntr_callbacks.keys()):
294         return Response(str(cntr_callbacks[req_id][1]), status=200, mimetype=MIME_TEXT)
295     else:
296         return Response(str("0"), status=200, mimetype=MIME_TEXT)
297
298 @app.route('/counter/current_messages',
299     methods=['GET'])
300 def current_messages():
301     req_id = request.args.get('id')
302     if (req_id is None):
303         return Response(str(cntr_msg_callbacks-cntr_msg_fetched), status=200, mimetype=MIME_TEXT)
304
305     if (req_id in cntr_callbacks.keys()):
306         return Response(str(cntr_callbacks[req_id][0]-cntr_callbacks[req_id][1]), status=200, mimetype=MIME_TEXT)
307     else:
308         return Response(str("0"), status=200, mimetype=MIME_TEXT)
309
310 @app.route('/counter/remote_hosts',
311     methods=['GET'])
312 def remote_hosts():
313     global hosts_set
314
315     hosts=",".join(hosts_set)
316     return Response(str(hosts), status=200, mimetype=MIME_TEXT)
317
318
319 #Set force delay response, in seconds, for all callbacks
320 #/froceesponse?delay=<seconds>
321 @app.route('/forcedelay', methods=['POST'])
322 def forcedelay():
323
324   try:
325     forced_settings['delay']=int(request.args.get('delay'))
326   except Exception:
327     forced_settings['delay']=None
328   return Response("Force delay: " + str(forced_settings['delay']) + " sec set for all callback responses", 200, mimetype=MIME_TEXT)
329
330 # Helper: Delay if delayed response code is set
331 def do_delay():
332   if (forced_settings['delay'] is not None):
333     try:
334       val=int(forced_settings['delay'])
335       if (val < 1):
336           return Response("Force delay too short: " + str(forced_settings['delay']) + " sec", 500, mimetype=MIME_TEXT)
337       print("Delaying "+str(val)+ " sec.")
338       time.sleep(val)
339     except Exception:
340       return Response("Force delay : " + str(forced_settings['delay']) + " sec failed", 500, mimetype=MIME_TEXT)
341 ### Admin ###
342
343 # Reset all messsages and counters
344 @app.route('/reset',
345     methods=['GET', 'POST', 'PUT'])
346 def reset():
347     global msg_callbacks
348     global cntr_msg_fetched
349     global cntr_msg_callbacks
350     global cntr_callbacks
351     global forced_settings
352
353     with lock:
354         msg_callbacks={}
355         cntr_msg_fetched=0
356         cntr_msg_callbacks=0
357         cntr_callbacks={}
358         forced_settings['delay']=None
359
360         return Response('OK', status=200, mimetype=MIME_TEXT)
361
362 ### Main function ###
363
364 if __name__ == "__main__":
365     app.run(port=HOST_PORT, host=HOST_IP)