2 # ============LICENSE_START===============================================
3 # Copyright (C) 2020 Nordix Foundation. All rights reserved.
4 # ========================================================================
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ============LICENSE_END=================================================
19 from flask import Flask, request, Response
20 from time import sleep
22 from datetime import datetime
27 from threading import RLock
29 # Disable all logging of GET on reading counters and db
30 class AjaxFilter(logging.Filter):
31 def filter(self, record):
32 return ("/counter/" not in record.getMessage()) and ("/db" not in record.getMessage())
34 log = logging.getLogger('werkzeug')
35 log.addFilter(AjaxFilter())
41 # list of callback messages
54 # Request and response constants
55 CALLBACK_URL="/callbacks/<string:id>"
56 CALLBACK_MR_URL="/callbacks-mr/<string:id>" #Json list with string encoded items
57 APP_READ_URL="/get-event/<string:id>"
58 APP_READ_ALL_URL="/get-all-events/<string:id>"
61 MIME_TEXT="text/plain"
62 MIME_JSON="application/json"
63 CAUGHT_EXCEPTION="Caught exception: "
64 SERVER_ERROR="Server error :"
65 TIME_STAMP="cr-timestamp"
68 forced_settings['delay']=None
71 # Remote host lookup and print host name
72 def remote_host_logging(request):
74 if request.environ.get('HTTP_X_FORWARDED_FOR') is None:
75 host_ip=str(request.environ['REMOTE_ADDR'])
77 host_ip=str(request.environ['HTTP_X_FORWARDED_FOR'])
79 if (host_ip.startswith('::ffff:')):
80 host_ip=host_ip[len(prefix):]
82 name, alias, addresslist = socket.gethostbyaddr(host_ip)
83 print("Calling host: "+str(name))
86 print("Calling host not possible to retrieve IP: "+str(host_ip))
87 hosts_set.add(host_ip)
96 ### Callback interface, for control
98 # Fetch the oldest callback message for an id
99 # URI and parameter, (GET): /get-event/<id>
100 # response: message + 200 or just 204 or just 500(error)
101 @app.route(APP_READ_URL,
103 def receiveresponse(id):
105 global cntr_msg_fetched
109 if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)):
111 cntr_callbacks[id][1]+=1
112 msg=msg_callbacks[id][0]
113 print("Fetching msg for id: "+id+", msg="+str(msg))
115 del msg_callbacks[id][0]
116 return json.dumps(msg),200
117 print("No messages for id: "+id)
118 except Exception as e:
119 print(CAUGHT_EXCEPTION+str(e))
120 traceback.print_exc()
125 # Fetch all callback message for an id in an array
126 # URI and parameter, (GET): /get-all-events/<id>
127 # response: message + 200 or just 500(error)
128 @app.route(APP_READ_ALL_URL,
130 def receiveresponse_all(id):
132 global cntr_msg_fetched
136 if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)):
137 cntr_msg_fetched+=len(msg_callbacks[id])
138 cntr_callbacks[id][1]+=len(msg_callbacks[id])
139 msg=msg_callbacks[id]
140 print("Fetching all msgs for id: "+id+", msg="+str(msg))
142 del sub_msg[TIME_STAMP]
143 del msg_callbacks[id]
144 return json.dumps(msg),200
145 print("No messages for id: "+id)
146 except Exception as e:
147 print(CAUGHT_EXCEPTION+str(e))
148 traceback.print_exc()
152 return json.dumps(msg),200
154 # Receive a callback message
155 # URI and payload, (PUT or POST): /callbacks/<id> <json messages>
156 # response: OK 200 or 500 for other errors
157 @app.route(CALLBACK_URL,
158 methods=['PUT','POST'])
159 def events_write(id):
161 global cntr_msg_callbacks
164 print("Received callback for id: "+id +", content-type="+request.content_type)
165 remote_host_logging(request)
166 print("raw data: str(request.data): "+str(request.data))
169 if (request.content_type == MIME_JSON):
171 msg = json.loads(data)
172 print("Payload(json): "+str(msg))
175 print("Payload(content-type="+request.content_type+"). Setting empty json as payload")
176 except Exception as e:
178 print("(Exception) Payload does not contain any json, setting empty json as payload")
179 traceback.print_exc()
182 cntr_msg_callbacks += 1
183 msg[TIME_STAMP]=str(datetime.now())
184 if (id in msg_callbacks.keys()):
185 msg_callbacks[id].append(msg)
188 msg_callbacks[id].append(msg)
190 if (id in cntr_callbacks.keys()):
191 cntr_callbacks[id][0] += 1
193 cntr_callbacks[id]=[]
194 cntr_callbacks[id].append(1)
195 cntr_callbacks[id].append(0)
197 except Exception as e:
198 print(CAUGHT_EXCEPTION+str(e))
199 traceback.print_exc()
205 # Receive a json callback message with payload fromatted accoirding to output frm the message router
206 # URI and payload, (PUT or POST): /callbacks/<id> <json messages>
207 # json is a list of string encoded json items
208 # response: OK 200 or 500 for other errors
209 @app.route(CALLBACK_MR_URL,
210 methods=['PUT','POST'])
211 def events_write_mr(id):
213 global cntr_msg_callbacks
216 print("Received callback (mr) for id: "+id +", content-type="+request.content_type)
217 remote_host_logging(request)
218 print("raw data: str(request.data): "+str(request.data))
221 #if (request.content_type == MIME_JSON):
222 if (MIME_JSON in request.content_type):
224 msg_list = json.loads(data)
225 print("Payload(json): "+str(msg_list))
228 print("Payload(content-type="+request.content_type+"). Setting empty json as payload")
229 except Exception as e:
231 print("(Exception) Payload does not contain any json, setting empty json as payload")
232 traceback.print_exc()
235 remote_host_logging(request)
237 print("msg (str): "+str(msg))
239 print("msg (json): "+str(msg))
240 cntr_msg_callbacks += 1
241 msg[TIME_STAMP]=str(datetime.now())
242 if (id in msg_callbacks.keys()):
243 msg_callbacks[id].append(msg)
246 msg_callbacks[id].append(msg)
248 if (id in cntr_callbacks.keys()):
249 cntr_callbacks[id][0] += 1
251 cntr_callbacks[id]=[]
252 cntr_callbacks[id].append(1)
253 cntr_callbacks[id].append(0)
255 except Exception as e:
256 print(CAUGHT_EXCEPTION+str(e))
257 traceback.print_exc()
262 ### Functions for test ###
264 # Dump the whole db of current callbacks
265 # URI and parameter, (GET): /db
266 # response: message + 200
267 @app.route(DUMP_ALL_URL,
270 return json.dumps(msg_callbacks),200
272 ### Functions for metrics read out ###
274 @app.route('/counter/received_callbacks',
276 def requests_submitted():
277 req_id = request.args.get('id')
279 return Response(str(cntr_msg_callbacks), status=200, mimetype=MIME_TEXT)
281 if (req_id in cntr_callbacks.keys()):
282 return Response(str(cntr_callbacks[req_id][0]), status=200, mimetype=MIME_TEXT)
284 return Response(str("0"), status=200, mimetype=MIME_TEXT)
286 @app.route('/counter/fetched_callbacks',
288 def requests_fetched():
289 req_id = request.args.get('id')
291 return Response(str(cntr_msg_fetched), status=200, mimetype=MIME_TEXT)
293 if (req_id in cntr_callbacks.keys()):
294 return Response(str(cntr_callbacks[req_id][1]), status=200, mimetype=MIME_TEXT)
296 return Response(str("0"), status=200, mimetype=MIME_TEXT)
298 @app.route('/counter/current_messages',
300 def current_messages():
301 req_id = request.args.get('id')
303 return Response(str(cntr_msg_callbacks-cntr_msg_fetched), status=200, mimetype=MIME_TEXT)
305 if (req_id in cntr_callbacks.keys()):
306 return Response(str(cntr_callbacks[req_id][0]-cntr_callbacks[req_id][1]), status=200, mimetype=MIME_TEXT)
308 return Response(str("0"), status=200, mimetype=MIME_TEXT)
310 @app.route('/counter/remote_hosts',
315 hosts=",".join(hosts_set)
316 return Response(str(hosts), status=200, mimetype=MIME_TEXT)
319 #Set force delay response, in seconds, for all callbacks
320 #/froceesponse?delay=<seconds>
321 @app.route('/forcedelay', methods=['POST'])
325 forced_settings['delay']=int(request.args.get('delay'))
327 forced_settings['delay']=None
328 return Response("Force delay: " + str(forced_settings['delay']) + " sec set for all callback responses", 200, mimetype=MIME_TEXT)
330 # Helper: Delay if delayed response code is set
332 if (forced_settings['delay'] is not None):
334 val=int(forced_settings['delay'])
336 return Response("Force delay too short: " + str(forced_settings['delay']) + " sec", 500, mimetype=MIME_TEXT)
337 print("Delaying "+str(val)+ " sec.")
340 return Response("Force delay : " + str(forced_settings['delay']) + " sec failed", 500, mimetype=MIME_TEXT)
343 # Reset all messsages and counters
345 methods=['GET', 'POST', 'PUT'])
348 global cntr_msg_fetched
349 global cntr_msg_callbacks
350 global cntr_callbacks
351 global forced_settings
358 forced_settings['delay']=None
360 return Response('OK', status=200, mimetype=MIME_TEXT)
362 ### Main function ###
364 if __name__ == "__main__":
365 app.run(port=HOST_PORT, host=HOST_IP)