2 # ============LICENSE_START===============================================
3 # Copyright (C) 2020 Nordix Foundation. All rights reserved.
4 # ========================================================================
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ============LICENSE_END=================================================
19 from flask import Flask, request
20 from time import sleep
24 from flask import Flask
25 from flask import Response
27 from threading import RLock, Thread
32 # Disable all logging of GET on reading counters
33 class AjaxFilter(logging.Filter):
34 def filter(self, record):
35 return ("/counter/" not in record.getMessage())
37 log = logging.getLogger('werkzeug')
38 log.addFilter(AjaxFilter())
42 # list of messages to/from Dmaap
51 cntr_msg_requests_submitted=0
52 cntr_msg_requests_fetched=0
53 cntr_msg_responses_submitted=0
54 cntr_msg_responses_fetched=0
56 # Request and response constants
57 ORU_WRITE_URL="/events/unauthenticated.SEC_FAULT_OUTPUT"
58 ORU_READ_URL="/events/unauthenticated.SEC_FAULT_OUTPUT/users/test/"
59 AGENT_WRITE_URL="/events/A1-POLICY-AGENT-WRITE"
60 AGENT_READ_URL="/events/A1-POLICY-AGENT-READ/users/policy-agent"
61 APP_WRITE_URL="/send-request"
62 APP_READ_URL="/receive-response"
63 MIME_TEXT="text/plain"
64 MIME_JSON="application/json"
65 CAUGHT_EXCEPTION="Caught exception: "
66 SERVER_ERROR="Server error :"
72 downloader_thread=None
74 # Function to download messages from dmaap
77 global cntr_msg_requests_fetched
79 print("Starting uploader")
81 headers = {'Content-type': 'application/json', 'Accept': '*/*'}
82 #url="http://"+topic_host+"/events/"+topic_read
86 while (len(msg_requests)>0):
90 print("Sending to dmaap : "+ url)
91 print("Sending to dmaap : "+ msg)
92 resp=requests.post(url, data=msg, headers=headers, timeout=10)
93 if (resp.status_code<199 & resp.status_code > 299):
94 print("Failed, response code: " + str(resp.status_code))
97 print("Dmaap response code: " + str(resp.status_code))
98 print("Dmaap response text: " + str(resp.text))
101 cntr_msg_requests_fetched += 1
102 except Exception as e:
103 print("Failed, exception: "+ str(e))
108 # Function to download messages from dmaap
109 def dmaap_downloader():
111 global cntr_msg_responses_submitted
113 print("Starting uploader")
118 #url="http://"+topic_host+"/events/"+topic_write+"/users/mr-stub?timeout=15000&limit=100"
120 headers = {'Accept': 'application/json'}
121 print("Reading from dmaap: " + url)
122 resp=requests.get(url, headers=headers)
123 if (resp.status_code<199 & resp.status_code > 299):
124 print("Failed, response code: " + resp.status_code)
127 print("Recieved data from dmaap mr")
130 print("Recieved data (raw): " + str(resp.text))
131 if isinstance(data, list):
133 item=json.loads(item)
134 corrid=str(item["correlationId"])
135 status=str(item["status"])
136 msg=str(item["message"])
137 item_str=msg+status[0:3]
139 msg_responses[corrid]=item_str
140 cntr_msg_responses_submitted += 1
142 print("Data from dmaap is not json array: " + str(resp.text))
144 except Exception as e:
145 print("Corrupt data from dmaap mr - dropping " + str(data))
146 print("CAUGHT_EXCEPTION" + str(e) + " "+traceback.format_exc())
148 except Exception as e:
158 # Helper function to create a Dmaap request message
159 # args : <GET|PUT|DELETE> <correlation-id> <json-string-payload - may be None> <url>
160 # response: json formatted string of a complete Dmaap message
161 def create_message(operation, correlation_id, payload, url):
162 if (payload is None):
164 time_stamp=datetime.datetime.utcnow()
165 msg = '{\"apiVersion\":\"1.0\",\"operation\":\"'+operation+'\",\"correlationId\":\"'+correlation_id+'\",\"originatorId\": \"849e6c6b420\",'
166 msg = msg + '\"payload\":'+payload+',\"requestId\":\"23343221\", \"target\":\"policy-agent\", \"timestamp\":\"'+str(time_stamp)+'\", \"type\":\"request\",\"url\":\"'+url+'\"}'
170 ### MR-stub interface, for MR control
172 # Send a message to MR
173 # URI and parameters (PUT or POST): /send-request?operation=<GET|PUT|POST|DELETE>&url=<url>
174 # response: <correlation-id> (http 200) o4 400 for parameter error or 500 for other errors
175 @app.route(APP_WRITE_URL,
176 methods=['PUT','POST'])
179 global cntr_msg_requests_submitted
181 print("APP_WRITE_URL lock")
183 oper=request.args.get('operation')
185 print(APP_WRITE_URL+" parameter 'operation' missing")
186 return Response('Parameter operation missing in request', status=400, mimetype=MIME_TEXT)
188 url=request.args.get('url')
190 print(APP_WRITE_URL+" parameter 'url' missing")
191 return Response('Parameter url missing in request', status=400, mimetype=MIME_TEXT)
193 if (oper != "GET" and oper != "PUT" and oper != "POST" and oper != "DELETE"):
194 print(APP_WRITE_URL+" parameter 'operation' need to be: DEL|PUT|POST|DELETE")
195 return Response('Parameter operation does not contain DEL|PUT|POST|DELETE in request', status=400, mimetype=MIME_TEXT)
197 print(APP_WRITE_URL+" operation="+oper+" url="+url)
198 correlation_id=str(time.time_ns())
200 if (oper == "PUT") and (request.json is not None):
201 payload=json.dumps(request.json)
203 msg=create_message(oper, correlation_id, payload, url)
205 print(APP_WRITE_URL+" MSG(correlationid = "+correlation_id+"): " + json.dumps(json.loads(msg), indent=2))
206 msg_requests.append(msg)
207 cntr_msg_requests_submitted += 1
208 return Response(correlation_id, status=200, mimetype=MIME_TEXT)
209 except Exception as e:
210 print(APP_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
211 return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
213 # Receive a message response for MR for the included correlation id
214 # URI and parameter, (GET): /receive-response?correlationid=<correlation-id>
215 # response: <json-array of 1 response> 200 or empty 204 or other errors 500
216 @app.route(APP_READ_URL,
218 def receiveresponse():
220 global cntr_msg_responses_fetched
222 print("APP_READ_URL lock")
224 cid=request.args.get('correlationid')
226 print(APP_READ_URL+" parameter 'correclationid' missing")
227 return Response('Parameter correlationid missing in json', status=500, mimetype=MIME_TEXT)
229 if (cid in msg_responses):
230 answer=msg_responses[cid]
231 del msg_responses[cid]
232 print(APP_READ_URL+" response (correlationid="+cid+"): " + answer)
233 cntr_msg_responses_fetched += 1
234 return Response(answer, status=200, mimetype=MIME_JSON)
236 print(APP_READ_URL+" - no messages (correlationid="+cid+"): ")
237 return Response('', status=204, mimetype=MIME_JSON)
238 except Exception as e:
239 print(APP_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
240 return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
242 ### Dmaap interface ###
244 # Read messages stream. URI according to agent configuration.
245 # URI, (GET): /events/A1-POLICY-AGENT-READ/users/policy-agent
246 # response: 200 <json array of request messages>, or 500 for other errors
247 @app.route(AGENT_READ_URL,
251 global cntr_msg_requests_fetched
253 if topic_write or topic_read:
254 return Response('Url not available when running as mrstub frontend', status=404, mimetype=MIME_TEXT)
256 limit=request.args.get('limit')
265 print("Limting number of returned messages to: "+str(limit))
267 timeout=request.args.get('timeout')
268 if (timeout is None):
271 timeout=min(int(timeout),60000)
273 start_time=int(round(time.time() * 1000))
274 current_time=int(round(time.time() * 1000))
276 while(current_time<start_time+int(timeout)):
278 if(len(msg_requests)>0):
282 while(cntr<limit and len(msg_requests)>0):
285 msgs=msgs+msg_requests.pop(0)
286 cntr_msg_requests_fetched += 1
289 print(AGENT_READ_URL+" MSGs: "+json.dumps(json.loads(msgs), indent=2))
290 return Response(msgs, status=200, mimetype=MIME_JSON)
291 except Exception as e:
292 print(AGENT_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
293 return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
294 sleep(0.025) # sleep 25 milliseconds
295 current_time=int(round(time.time() * 1000))
297 print("timeout: "+str(timeout)+", start_time: "+str(start_time)+", current_time: "+str(current_time))
298 return Response("[]", status=200, mimetype=MIME_JSON)
300 # Write messages stream. URI according to agent configuration.
301 # URI and payload, (PUT or POST): /events/A1-POLICY-AGENT-WRITE <json array of response messages>
302 # response: OK 200 or 400 for missing json parameters, 500 for other errors
303 @app.route(AGENT_WRITE_URL,
304 methods=['PUT','POST'])
307 global cntr_msg_responses_submitted
309 if topic_write or topic_read:
310 return Response('Url not available when running as mrstub frontend', status=404, mimetype=MIME_TEXT)
313 print("AGENT_WRITE_URL lock")
316 print(AGENT_WRITE_URL+ " json=" + json.dumps(answer, indent=2))
317 if isinstance(answer, dict):
318 #Create a an array if the answer is a dict (single message)
320 answer_list.append(answer)
324 cid=item['correlationId']
326 print(AGENT_WRITE_URL+" parameter 'correlatonid' missing")
327 return Response('Parameter <correlationid> missing in json', status=400, mimetype=MIME_TEXT)
330 print(AGENT_WRITE_URL+" parameter 'msgs' missing")
331 return Response('Parameter >message> missing in json', status=400, mimetype=MIME_TEXT)
332 status=item['status']
334 print(AGENT_WRITE_URL+" parameter 'status' missing")
335 return Response('Parameter <status> missing in json', status=400, mimetype=MIME_TEXT)
336 if isinstance(msg, list) or isinstance(msg, dict):
337 msg_str=json.dumps(msg)+status[0:3]
339 msg_str=msg+status[0:3]
340 msg_responses[cid]=msg_str
341 cntr_msg_responses_submitted += 1
342 print(AGENT_WRITE_URL+ " msg+status (correlationid="+cid+") :" + str(msg_str))
343 except Exception as e:
344 print(AGENT_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
345 return Response('{"message": "' + SERVER_ERROR + ' ' + str(e) + '","status":"500"}', status=200, mimetype=MIME_JSON)
347 return Response('{}', status=200, mimetype=MIME_JSON)
349 @app.route(ORU_WRITE_URL,
350 methods=['PUT','POST'])
353 msg=json.dumps(request.json)
354 msg_requests.append(msg)
355 return Response('{}', status=200, mimetype=MIME_JSON)
357 @app.route(ORU_READ_URL,
361 if(len(msg_requests)>0):
362 rsp=msg_requests.pop(0)
365 return Response(json.dumps(res), status=200, mimetype=MIME_JSON)
366 return Response("[]", status=200, mimetype=MIME_JSON)
368 ### Functions for metrics read out ###
370 @app.route('/counter/requests_submitted',
372 def requests_submitted():
373 return Response(str(cntr_msg_requests_submitted), status=200, mimetype=MIME_TEXT)
375 @app.route('/counter/requests_fetched',
377 def requests_fetched():
378 return Response(str(cntr_msg_requests_fetched), status=200, mimetype=MIME_TEXT)
380 @app.route('/counter/responses_submitted',
382 def responses_submitted():
383 return Response(str(cntr_msg_responses_submitted), status=200, mimetype=MIME_TEXT)
385 @app.route('/counter/responses_fetched',
387 def responses_fetched():
388 return Response(str(cntr_msg_responses_fetched), status=200, mimetype=MIME_TEXT)
390 @app.route('/counter/current_requests',
392 def current_requests():
393 return Response(str(len(msg_requests)), status=200, mimetype=MIME_TEXT)
395 @app.route('/counter/current_responses',
397 def current_responses():
398 return Response(str(len(msg_responses)), status=200, mimetype=MIME_TEXT)
402 # Reset all messsages and counters
404 methods=['GET', 'POST', 'PUT'])
406 global cntr_msg_requests_submitted
407 global cntr_msg_requests_fetched
408 global cntr_msg_responses_submitted
409 global cntr_msg_responses_fetched
413 cntr_msg_requests_submitted=0
414 cntr_msg_requests_fetched=0
415 cntr_msg_responses_submitted=0
416 cntr_msg_responses_fetched=0
419 return Response('OK', status=200, mimetype=MIME_TEXT)
421 # Get env vars, if present
422 if os.getenv("TOPIC_READ") is not None:
424 print("Env variables:")
425 print("TOPIC_READ:"+os.environ['TOPIC_READ'])
426 print("TOPIC_WRITE:"+os.environ['TOPIC_WRITE'])
428 topic_read=os.environ['TOPIC_READ']
429 topic_write=os.environ['TOPIC_WRITE']
432 if topic_read and downloader_thread is None:
433 downloader_thread=Thread(target=dmaap_downloader)
434 downloader_thread.start()
436 if topic_write and uploader_thread is None:
437 uploader_thread=Thread(target=dmaap_uploader)
438 uploader_thread.start()
441 print("No env variables - OK")
443 if __name__ == "__main__":
444 app.run(port=HOST_PORT, host=HOST_IP)