Add k8s yml file for oru application
[nonrtric.git] / test / mrstub / app / main.py
1
2 #  ============LICENSE_START===============================================
3 #  Copyright (C) 2020 Nordix Foundation. All rights reserved.
4 #  ========================================================================
5 #  Licensed under the Apache License, Version 2.0 (the "License");
6 #  you may not use this file except in compliance with the License.
7 #  You may obtain a copy of the License at
8 #
9 #       http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #  Unless required by applicable law or agreed to in writing, software
12 #  distributed under the License is distributed on an "AS IS" BASIS,
13 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #  See the License for the specific language governing permissions and
15 #  limitations under the License.
16 #  ============LICENSE_END=================================================
17 #
18
19 from flask import Flask, request
20 from time import sleep
21 import time
22 import datetime
23 import json
24 from flask import Flask
25 from flask import Response
26 import traceback
27 from threading import RLock, Thread
28 import logging
29 import os
30 import requests
31
32 # Disable all logging of GET on reading counters
33 class AjaxFilter(logging.Filter):
34     def filter(self, record):
35         return ("/counter/" not in record.getMessage())
36
37 log = logging.getLogger('werkzeug')
38 log.addFilter(AjaxFilter())
39
40 app = Flask(__name__)
41 lock = RLock()
42 # list of messages to/from Dmaap
43 msg_requests=[]
44 msg_responses={}
45
46 # Server info
47 HOST_IP = "::"
48 HOST_PORT = 2222
49
50 # Metrics vars
51 cntr_msg_requests_submitted=0
52 cntr_msg_requests_fetched=0
53 cntr_msg_responses_submitted=0
54 cntr_msg_responses_fetched=0
55
56 # Request and response constants
57 ORU_WRITE_URL="/events/unauthenticated.SEC_FAULT_OUTPUT"
58 ORU_READ_URL="/events/unauthenticated.SEC_FAULT_OUTPUT/users/test/"
59 AGENT_WRITE_URL="/events/A1-POLICY-AGENT-WRITE"
60 AGENT_READ_URL="/events/A1-POLICY-AGENT-READ/users/policy-agent"
61 APP_WRITE_URL="/send-request"
62 APP_READ_URL="/receive-response"
63 MIME_TEXT="text/plain"
64 MIME_JSON="application/json"
65 CAUGHT_EXCEPTION="Caught exception: "
66 SERVER_ERROR="Server error :"
67
68 topic_write=""
69 topic_read=""
70
71 uploader_thread=None
72 downloader_thread=None
73
74 # Function to download messages from dmaap
75 def dmaap_uploader():
76     global msg_requests
77     global cntr_msg_requests_fetched
78
79     print("Starting uploader")
80
81     headers = {'Content-type': 'application/json', 'Accept': '*/*'}
82     #url="http://"+topic_host+"/events/"+topic_read
83     url=topic_read
84
85     while True:
86         while (len(msg_requests)>0):
87             msg=msg_requests[0]
88             if msg is not None:
89                 try:
90                     print("Sending to dmaap : "+ url)
91                     print("Sending to dmaap : "+ msg)
92                     resp=requests.post(url, data=msg, headers=headers, timeout=10)
93                     if (resp.status_code<199 & resp.status_code > 299):
94                         print("Failed, response code: " + str(resp.status_code))
95                         sleep(1)
96                     else:
97                         print("Dmaap response code: " + str(resp.status_code))
98                         print("Dmaap response text: " + str(resp.text))
99                         with lock:
100                             msg_requests.pop(0)
101                             cntr_msg_requests_fetched += 1
102                 except Exception as e:
103                     print("Failed, exception: "+ str(e))
104                     sleep(1)
105         sleep(0.01)
106
107
108 # Function to download messages from dmaap
109 def dmaap_downloader():
110     global msg_responses
111     global cntr_msg_responses_submitted
112
113     print("Starting uploader")
114
115     while True:
116
117         try :
118             #url="http://"+topic_host+"/events/"+topic_write+"/users/mr-stub?timeout=15000&limit=100"
119             url=topic_write
120             headers = {'Accept': 'application/json'}
121             print("Reading from dmaap: " + url)
122             resp=requests.get(url, headers=headers)
123             if (resp.status_code<199 & resp.status_code > 299):
124                 print("Failed, response code: " + resp.status_code)
125                 sleep(1)
126             else:
127                 print("Recieved data from dmaap mr")
128                 try:
129                     data=resp.json()
130                     print("Recieved data (raw): " + str(resp.text))
131                     if isinstance(data, list):
132                         for item in data:
133                             item=json.loads(item)
134                             corrid=str(item["correlationId"])
135                             status=str(item["status"])
136                             msg=str(item["message"])
137                             item_str=msg+status[0:3]
138                             with lock:
139                                 msg_responses[corrid]=item_str
140                                 cntr_msg_responses_submitted += 1
141                     else:
142                         print("Data from dmaap is not json array: " + str(resp.text))
143                         sleep(1)
144                 except Exception as e:
145                     print("Corrupt data from dmaap mr -  dropping " + str(data))
146                     print("CAUGHT_EXCEPTION" + str(e) + " "+traceback.format_exc())
147                     sleep(1)
148         except Exception as e:
149             sleep(1)
150
151 #I'm alive function
152 @app.route('/',
153     methods=['GET'])
154 def index():
155     return 'OK', 200
156
157
158 # Helper function to create a Dmaap request message
159 # args : <GET|PUT|DELETE> <correlation-id> <json-string-payload - may be None> <url>
160 # response: json formatted string of a complete Dmaap message
161 def create_message(operation, correlation_id, payload, url):
162     if (payload is None):
163         payload="{}"
164     time_stamp=datetime.datetime.utcnow()
165     msg = '{\"apiVersion\":\"1.0\",\"operation\":\"'+operation+'\",\"correlationId\":\"'+correlation_id+'\",\"originatorId\": \"849e6c6b420\",'
166     msg = msg + '\"payload\":'+payload+',\"requestId\":\"23343221\", \"target\":\"policy-agent\", \"timestamp\":\"'+str(time_stamp)+'\", \"type\":\"request\",\"url\":\"'+url+'\"}'
167     return msg
168
169
170 ### MR-stub interface, for MR control
171
172 # Send a message to MR
173 # URI and parameters (PUT or POST): /send-request?operation=<GET|PUT|POST|DELETE>&url=<url>
174 # response: <correlation-id> (http 200) o4 400 for parameter error or 500 for other errors
175 @app.route(APP_WRITE_URL,
176     methods=['PUT','POST'])
177 def sendrequest():
178     global msg_requests
179     global cntr_msg_requests_submitted
180     with lock:
181         print("APP_WRITE_URL lock")
182         try:
183             oper=request.args.get('operation')
184             if (oper is None):
185                 print(APP_WRITE_URL+" parameter 'operation' missing")
186                 return Response('Parameter operation missing in request', status=400, mimetype=MIME_TEXT)
187
188             url=request.args.get('url')
189             if (url is None):
190                 print(APP_WRITE_URL+" parameter 'url' missing")
191                 return Response('Parameter url missing in request', status=400, mimetype=MIME_TEXT)
192
193             if (oper != "GET" and oper != "PUT" and oper != "POST" and oper != "DELETE"):
194                 print(APP_WRITE_URL+" parameter 'operation' need to be: DEL|PUT|POST|DELETE")
195                 return Response('Parameter operation does not contain DEL|PUT|POST|DELETE in request', status=400, mimetype=MIME_TEXT)
196
197             print(APP_WRITE_URL+" operation="+oper+" url="+url)
198             correlation_id=str(time.time_ns())
199             payload=None
200             if (oper == "PUT") and (request.json is not None):
201                 payload=json.dumps(request.json)
202
203             msg=create_message(oper, correlation_id, payload, url)
204             print(msg)
205             print(APP_WRITE_URL+" MSG(correlationid = "+correlation_id+"): " + json.dumps(json.loads(msg), indent=2))
206             msg_requests.append(msg)
207             cntr_msg_requests_submitted += 1
208             return Response(correlation_id, status=200, mimetype=MIME_TEXT)
209         except Exception as e:
210             print(APP_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
211             return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
212
213 # Receive a message response for MR for the included correlation id
214 # URI and parameter, (GET): /receive-response?correlationid=<correlation-id>
215 # response: <json-array of 1 response> 200 or empty 204 or other errors 500
216 @app.route(APP_READ_URL,
217     methods=['GET'])
218 def receiveresponse():
219     global msg_responses
220     global cntr_msg_responses_fetched
221     with lock:
222         print("APP_READ_URL lock")
223         try:
224             cid=request.args.get('correlationid')
225             if (cid is None):
226                 print(APP_READ_URL+" parameter 'correclationid' missing")
227                 return Response('Parameter correlationid missing in json', status=500, mimetype=MIME_TEXT)
228
229             if (cid in msg_responses):
230                 answer=msg_responses[cid]
231                 del msg_responses[cid]
232                 print(APP_READ_URL+" response (correlationid="+cid+"): " + answer)
233                 cntr_msg_responses_fetched += 1
234                 return Response(answer, status=200, mimetype=MIME_JSON)
235
236             print(APP_READ_URL+" - no messages (correlationid="+cid+"): ")
237             return Response('', status=204, mimetype=MIME_JSON)
238         except Exception as e:
239             print(APP_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
240             return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
241
242 ### Dmaap interface ###
243
244 # Read messages stream. URI according to agent configuration.
245 # URI, (GET): /events/A1-POLICY-AGENT-READ/users/policy-agent
246 # response: 200 <json array of request messages>, or 500 for other errors
247 @app.route(AGENT_READ_URL,
248     methods=['GET'])
249 def events_read():
250     global msg_requests
251     global cntr_msg_requests_fetched
252
253     if topic_write or topic_read:
254         return Response('Url not available when running as mrstub frontend', status=404, mimetype=MIME_TEXT)
255
256     limit=request.args.get('limit')
257     if (limit is None):
258         limit=4096
259     else:
260         limit=int(limit)
261     if (limit<0):
262         limit=0
263     if (limit>4096):
264         limit=4096
265     print("Limting number of returned messages to: "+str(limit))
266
267     timeout=request.args.get('timeout')
268     if (timeout is None):
269         timeout=10000
270     else:
271         timeout=min(int(timeout),60000)
272
273     start_time=int(round(time.time() * 1000))
274     current_time=int(round(time.time() * 1000))
275
276     while(current_time<start_time+int(timeout)):
277         with lock:
278             if(len(msg_requests)>0):
279                 try:
280                     msgs=''
281                     cntr=0
282                     while(cntr<limit and len(msg_requests)>0):
283                         if (len(msgs)>1):
284                             msgs=msgs+','
285                         msgs=msgs+msg_requests.pop(0)
286                         cntr_msg_requests_fetched += 1
287                         cntr=cntr+1
288                     msgs='['+msgs+']'
289                     print(AGENT_READ_URL+" MSGs: "+json.dumps(json.loads(msgs), indent=2))
290                     return Response(msgs, status=200, mimetype=MIME_JSON)
291                 except Exception as e:
292                     print(AGENT_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
293                     return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
294         sleep(0.025) # sleep 25 milliseconds
295         current_time=int(round(time.time() * 1000))
296
297     print("timeout: "+str(timeout)+", start_time: "+str(start_time)+", current_time: "+str(current_time))
298     return Response("[]", status=200, mimetype=MIME_JSON)
299
300 # Write messages stream. URI according to agent configuration.
301 # URI and payload, (PUT or POST): /events/A1-POLICY-AGENT-WRITE <json array of response messages>
302 # response: OK 200 or 400 for missing json parameters, 500 for other errors
303 @app.route(AGENT_WRITE_URL,
304     methods=['PUT','POST'])
305 def events_write():
306     global msg_responses
307     global cntr_msg_responses_submitted
308
309     if topic_write or topic_read:
310         return Response('Url not available when running as mrstub frontend', status=404, mimetype=MIME_TEXT)
311
312     with lock:
313         print("AGENT_WRITE_URL lock")
314         try:
315             answer=request.json
316             print(AGENT_WRITE_URL+ " json=" + json.dumps(answer, indent=2))
317             if isinstance(answer, dict):
318                 #Create a an array if the answer is a dict (single message)
319                 answer_list=[]
320                 answer_list.append(answer)
321                 answer=answer_list
322
323             for item in answer:
324                 cid=item['correlationId']
325                 if (cid is None):
326                     print(AGENT_WRITE_URL+" parameter 'correlatonid' missing")
327                     return Response('Parameter <correlationid> missing in json', status=400, mimetype=MIME_TEXT)
328                 msg=item['message']
329                 if (msg is None):
330                     print(AGENT_WRITE_URL+" parameter 'msgs' missing")
331                     return Response('Parameter >message> missing in json', status=400, mimetype=MIME_TEXT)
332                 status=item['status']
333                 if (status is None):
334                     print(AGENT_WRITE_URL+" parameter 'status' missing")
335                     return Response('Parameter <status> missing in json', status=400, mimetype=MIME_TEXT)
336                 if isinstance(msg, list) or isinstance(msg, dict):
337                     msg_str=json.dumps(msg)+status[0:3]
338                 else:
339                     msg_str=msg+status[0:3]
340                 msg_responses[cid]=msg_str
341                 cntr_msg_responses_submitted += 1
342                 print(AGENT_WRITE_URL+ " msg+status (correlationid="+cid+") :" + str(msg_str))
343         except Exception as e:
344             print(AGENT_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
345             return Response('{"message": "' + SERVER_ERROR + ' ' + str(e) + '","status":"500"}', status=200, mimetype=MIME_JSON)
346
347         return Response('{}', status=200, mimetype=MIME_JSON)
348
349 @app.route(ORU_WRITE_URL,
350     methods=['PUT','POST'])
351 def oru_write():
352     global msg_requests
353     msg=json.dumps(request.json)
354     msg_requests.append(msg)
355     return Response('{}', status=200, mimetype=MIME_JSON)
356
357 @app.route(ORU_READ_URL,
358     methods=['GET'])
359 def oru_read():
360     global msg_requests
361     if(len(msg_requests)>0):
362         rsp=msg_requests.pop(0)
363         res=[]
364         res.append(rsp)
365         return Response(json.dumps(res), status=200, mimetype=MIME_JSON)
366     return Response("[]", status=200, mimetype=MIME_JSON)
367
368 ### Functions for metrics read out ###
369
370 @app.route('/counter/requests_submitted',
371     methods=['GET'])
372 def requests_submitted():
373     return Response(str(cntr_msg_requests_submitted), status=200, mimetype=MIME_TEXT)
374
375 @app.route('/counter/requests_fetched',
376     methods=['GET'])
377 def requests_fetched():
378     return Response(str(cntr_msg_requests_fetched), status=200, mimetype=MIME_TEXT)
379
380 @app.route('/counter/responses_submitted',
381     methods=['GET'])
382 def responses_submitted():
383     return Response(str(cntr_msg_responses_submitted), status=200, mimetype=MIME_TEXT)
384
385 @app.route('/counter/responses_fetched',
386     methods=['GET'])
387 def responses_fetched():
388     return Response(str(cntr_msg_responses_fetched), status=200, mimetype=MIME_TEXT)
389
390 @app.route('/counter/current_requests',
391     methods=['GET'])
392 def current_requests():
393     return Response(str(len(msg_requests)), status=200, mimetype=MIME_TEXT)
394
395 @app.route('/counter/current_responses',
396     methods=['GET'])
397 def current_responses():
398     return Response(str(len(msg_responses)), status=200, mimetype=MIME_TEXT)
399
400 ### Admin ###
401
402 # Reset all messsages and counters
403 @app.route('/reset',
404     methods=['GET', 'POST', 'PUT'])
405 def reset():
406     global cntr_msg_requests_submitted
407     global cntr_msg_requests_fetched
408     global cntr_msg_responses_submitted
409     global cntr_msg_responses_fetched
410     global msg_requests
411     global msg_responses
412
413     cntr_msg_requests_submitted=0
414     cntr_msg_requests_fetched=0
415     cntr_msg_responses_submitted=0
416     cntr_msg_responses_fetched=0
417     msg_requests=[]
418     msg_responses={}
419     return Response('OK', status=200, mimetype=MIME_TEXT)
420
421 # Get env vars, if present
422 if os.getenv("TOPIC_READ") is not None:
423
424     print("Env variables:")
425     print("TOPIC_READ:"+os.environ['TOPIC_READ'])
426     print("TOPIC_WRITE:"+os.environ['TOPIC_WRITE'])
427
428     topic_read=os.environ['TOPIC_READ']
429     topic_write=os.environ['TOPIC_WRITE']
430
431
432     if topic_read and downloader_thread is None:
433         downloader_thread=Thread(target=dmaap_downloader)
434         downloader_thread.start()
435
436     if topic_write and uploader_thread is None:
437         uploader_thread=Thread(target=dmaap_uploader)
438         uploader_thread.start()
439
440 else:
441     print("No env variables - OK")
442
443 if __name__ == "__main__":
444     app.run(port=HOST_PORT, host=HOST_IP)