Update Release Notes for Bronze
[nonrtric.git] / test / mrstub / app / main.py
1
2 #  ============LICENSE_START===============================================
3 #  Copyright (C) 2020 Nordix Foundation. All rights reserved.
4 #  ========================================================================
5 #  Licensed under the Apache License, Version 2.0 (the "License");
6 #  you may not use this file except in compliance with the License.
7 #  You may obtain a copy of the License at
8 #
9 #       http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #  Unless required by applicable law or agreed to in writing, software
12 #  distributed under the License is distributed on an "AS IS" BASIS,
13 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #  See the License for the specific language governing permissions and
15 #  limitations under the License.
16 #  ============LICENSE_END=================================================
17 #
18
19 from flask import Flask, request
20 from time import sleep
21 import time
22 import datetime
23 import json
24 from flask import Flask
25 from flask import Response
26 import traceback
27 from threading import RLock
28
29 app = Flask(__name__)
30 lock = RLock()
31 # list of messages to/from Dmaap
32 msg_requests=[]
33 msg_responses={}
34
35 # Server info
36 HOST_IP = "::"
37 HOST_PORT = 2222
38
39 # Metrics vars
40 cntr_msg_requests_submitted=0
41 cntr_msg_requests_fetched=0
42 cntr_msg_responses_submitted=0
43 cntr_msg_responses_fetched=0
44
45 # Request and response constants
46 AGENT_WRITE_URL="/events/A1-POLICY-AGENT-WRITE"
47 AGENT_READ_URL="/events/A1-POLICY-AGENT-READ/users/policy-agent"
48 APP_WRITE_URL="/send-request"
49 APP_READ_URL="/receive-response"
50 MIME_TEXT="text/plain"
51 MIME_JSON="application/json"
52 CAUGHT_EXCEPTION="Caught exception: "
53 SERVER_ERROR="Server error :"
54
55 #I'm alive function
56 @app.route('/',
57     methods=['GET'])
58 def index():
59     return 'OK', 200
60
61
62 # Helper function to create a Dmaap request message
63 # args : <GET|PUT|DELETE> <correlation-id> <json-string-payload - may be None> <url>
64 # response: json formatted string of a complete Dmaap message
65 def create_message(operation, correlation_id, payload, url):
66     if (payload is None):
67         payload="{}"
68     time_stamp=datetime.datetime.utcnow()
69     msg = '{\"apiVersion\":\"1.0\",\"operation\":\"'+operation+'\",\"correlationId\":\"'+correlation_id+'\",\"originatorId\": \"849e6c6b420\",'
70     msg = msg + '\"payload\":'+payload+',\"requestId\":\"23343221\", \"target\":\"policy-agent\", \"timestamp\":\"'+str(time_stamp)+'\", \"type\":\"request\",\"url\":\"'+url+'\"}'
71     return msg
72
73
74 ### MR-stub interface, for MR control
75
76 # Send a message to MR
77 # URI and parameters (GET): /send-request?operation=<GET|PUT|POST|DELETE>&url=<url>
78 # response: <correlation-id> (http 200) o4 400 for parameter error or 500 for other errors
79 @app.route(APP_WRITE_URL,
80     methods=['PUT','POST'])
81 def sendrequest():
82     global msg_requests
83     global cntr_msg_requests_submitted
84     with lock:
85         print("APP_WRITE_URL lock")
86         try:
87             oper=request.args.get('operation')
88             if (oper is None):
89                 print(APP_WRITE_URL+" parameter 'operation' missing")
90                 return Response('Parameter operation missing in request', status=400, mimetype=MIME_TEXT)
91
92             url=request.args.get('url')
93             if (url is None):
94                 print(APP_WRITE_URL+" parameter 'url' missing")
95                 return Response('Parameter url missing in request', status=400, mimetype=MIME_TEXT)
96
97             if (oper != "GET" and oper != "PUT" and oper != "POST" and oper != "DELETE"):
98                 print(APP_WRITE_URL+" parameter 'operation' need to be: DEL|PUT|POST|DELETE")
99                 return Response('Parameter operation does not contain DEL|PUT|POST|DELETE in request', status=400, mimetype=MIME_TEXT)
100
101             print(APP_WRITE_URL+" operation="+oper+" url="+url)
102             correlation_id=str(time.time_ns())
103             payload=None
104             if (oper == "PUT") and (request.json is not None):
105                 payload=json.dumps(request.json)
106
107             msg=create_message(oper, correlation_id, payload, url)
108             print(msg)
109             print(APP_WRITE_URL+" MSG(correlationid = "+correlation_id+"): " + json.dumps(json.loads(msg), indent=2))
110             msg_requests.append(msg)
111             cntr_msg_requests_submitted += 1
112             return Response(correlation_id, status=200, mimetype=MIME_TEXT)
113         except Exception as e:
114             print(APP_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
115             return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
116
117 # Receive a message response for MR for the included correlation id
118 # URI and parameter, (GET): /receive-response?correlationid=<correlation-id>
119 # response: <json-array of 1 response> 200 or empty 204 or other errors 500
120 @app.route(APP_READ_URL,
121     methods=['GET'])
122 def receiveresponse():
123     global msg_responses
124     global cntr_msg_responses_fetched
125     with lock:
126         print("APP_READ_URL lock")
127         try:
128             id=request.args.get('correlationid')
129             if (id is None):
130                 print(APP_READ_URL+" parameter 'correclationid' missing")
131                 return Response('Parameter correlationid missing in json', status=500, mimetype=MIME_TEXT)
132
133             if (id in msg_responses):
134                 answer=msg_responses[id]
135                 del msg_responses[id]
136                 print(APP_READ_URL+" response (correlationid="+id+"): " + answer)
137                 cntr_msg_responses_fetched += 1
138                 return Response(answer, status=200, mimetype=MIME_JSON)
139
140             print(APP_READ_URL+" - no messages (correlationid="+id+"): ")
141             return Response('', status=204, mimetype=MIME_JSON)
142         except Exception as e:
143             print(APP_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
144             return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
145
146 ### Dmaap interface ###
147
148 # Read messages stream. URI according to agent configuration.
149 # URI, (GET): /events/A1-POLICY-AGENT-READ/users/policy-agent
150 # response: 200 <json array of request messages>, or 500 for other errors
151 @app.route(AGENT_READ_URL,
152     methods=['GET'])
153 def events_read():
154     global msg_requests
155     global cntr_msg_requests_fetched
156
157     limit=request.args.get('limit')
158     if (limit is None):
159         limit=4096
160     else:
161         limit=int(limit)
162     if (limit<0):
163         limit=0
164     if (limit>4096):
165         limit=4096
166     print("Limting number of returned messages to: "+str(limit))
167
168     timeout=request.args.get('timeout')
169     if (timeout is None):
170         timeout=10000
171     else:
172         timeout=min(int(timeout),60000)
173
174     startTime=int(round(time.time() * 1000))
175     currentTime=int(round(time.time() * 1000))
176
177     while(currentTime<startTime+int(timeout)):
178         with lock:
179             if(len(msg_requests)>0):
180                 try:
181                     msgs=''
182                     cntr=0
183                     while(cntr<limit and len(msg_requests)>0):
184                         if (len(msgs)>1):
185                             msgs=msgs+','
186                         msgs=msgs+msg_requests.pop(0)
187                         cntr_msg_requests_fetched += 1
188                         cntr=cntr+1
189                     msgs='['+msgs+']'
190                     print(AGENT_READ_URL+" MSGs: "+json.dumps(json.loads(msgs), indent=2))
191                     return Response(msgs, status=200, mimetype=MIME_JSON)
192                 except Exception as e:
193                     print(AGENT_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
194                     return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
195         sleep(0.025) # sleep 25 milliseconds
196         currentTime=int(round(time.time() * 1000))
197
198     print("timeout: "+str(timeout)+", startTime: "+str(startTime)+", currentTime: "+str(currentTime))
199     return Response("[]", status=200, mimetype=MIME_JSON)
200
201 # Write messages stream. URI according to agent configuration.
202 # URI and payload, (PUT or POST): /events/A1-POLICY-AGENT-WRITE <json array of response messages>
203 # response: OK 200 or 400 for missing json parameters, 500 for other errors
204 @app.route(AGENT_WRITE_URL,
205     methods=['PUT','POST'])
206 def events_write():
207     global msg_responses
208     global cntr_msg_responses_submitted
209     with lock:
210         print("AGENT_WRITE_URL lock")
211         try:
212             answer=request.json
213             print(AGENT_WRITE_URL+ " json=" + json.dumps(answer, indent=2))
214             for item in answer:
215                 id=item['correlationId']
216                 if (id is None):
217                     print(AGENT_WRITE_URL+" parameter 'correlatonid' missing")
218                     return Response('Parameter <correlationid> missing in json', status=400, mimetype=MIME_TEXT)
219                 msg=item['message']
220                 if (msg is None):
221                     print(AGENT_WRITE_URL+" parameter 'msgs' missing")
222                     return Response('Parameter >message> missing in json', status=400, mimetype=MIME_TEXT)
223                 status=item['status']
224                 if (status is None):
225                     print(AGENT_WRITE_URL+" parameter 'status' missing")
226                     return Response('Parameter <status> missing in json', status=400, mimetype=MIME_TEXT)
227                 if isinstance(msg, list) or isinstance(msg, dict):
228                     msg_str=json.dumps(msg)+status[0:3]
229                 else:
230                     msg_str=msg+status[0:3]
231                 msg_responses[id]=msg_str
232                 cntr_msg_responses_submitted += 1
233                 print(AGENT_WRITE_URL+ " msg+status (correlationid="+id+") :" + str(msg_str))
234         except Exception as e:
235             print(AGENT_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
236             return Response('{"message": "' + SERVER_ERROR + ' ' + str(e) + '","status":"500"}', status=200, mimetype=MIME_JSON)
237
238         return Response('{}', status=200, mimetype=MIME_JSON)
239
240
241 ### Functions for metrics read out ###
242
243 @app.route('/counter/requests_submitted',
244     methods=['GET'])
245 def requests_submitted():
246     return Response(str(cntr_msg_requests_submitted), status=200, mimetype=MIME_TEXT)
247
248 @app.route('/counter/requests_fetched',
249     methods=['GET'])
250 def requests_fetched():
251     return Response(str(cntr_msg_requests_fetched), status=200, mimetype=MIME_TEXT)
252
253 @app.route('/counter/responses_submitted',
254     methods=['GET'])
255 def responses_submitted():
256     return Response(str(cntr_msg_responses_submitted), status=200, mimetype=MIME_TEXT)
257
258 @app.route('/counter/responses_fetched',
259     methods=['GET'])
260 def responses_fetched():
261     return Response(str(cntr_msg_responses_fetched), status=200, mimetype=MIME_TEXT)
262
263 @app.route('/counter/current_requests',
264     methods=['GET'])
265 def current_requests():
266     return Response(str(len(msg_requests)), status=200, mimetype=MIME_TEXT)
267
268 @app.route('/counter/current_responses',
269     methods=['GET'])
270 def current_responses():
271     return Response(str(len(msg_responses)), status=200, mimetype=MIME_TEXT)
272
273 ### Admin ###
274
275 # Reset all messsages and counters
276 @app.route('/reset',
277     methods=['GET', 'POST', 'PUT'])
278 def reset():
279     global cntr_msg_requests_submitted
280     global cntr_msg_requests_fetched
281     global cntr_msg_responses_submitted
282     global cntr_msg_responses_fetched
283     global msg_requests
284     global msg_responses
285
286     cntr_msg_requests_submitted=0
287     cntr_msg_requests_fetched=0
288     cntr_msg_responses_submitted=0
289     cntr_msg_responses_fetched=0
290     msg_requests=[]
291     msg_responses={}
292     return Response('OK', status=200, mimetype=MIME_TEXT)
293
294 if __name__ == "__main__":
295     app.run(port=HOST_PORT, host=HOST_IP)