Merge "Release Maven Policy Agent"
[nonrtric.git] / test / mrstub / app / main.py
1
2 #  ============LICENSE_START===============================================
3 #  Copyright (C) 2020 Nordix Foundation. All rights reserved.
4 #  ========================================================================
5 #  Licensed under the Apache License, Version 2.0 (the "License");
6 #  you may not use this file except in compliance with the License.
7 #  You may obtain a copy of the License at
8 #
9 #       http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #  Unless required by applicable law or agreed to in writing, software
12 #  distributed under the License is distributed on an "AS IS" BASIS,
13 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #  See the License for the specific language governing permissions and
15 #  limitations under the License.
16 #  ============LICENSE_END=================================================
17 #
18
19 from flask import Flask, request
20 from time import sleep
21 import time
22 import datetime
23 import json
24 from flask import Flask
25 from flask import Response
26 import traceback
27 from threading import RLock
28
# Flask application object and the re-entrant lock used by the request handlers
app = Flask(__name__)
lock = RLock()

# Messages to/from Dmaap:
#  - msg_requests: queue (list) of json strings waiting to be fetched
#  - msg_responses: dict of response strings keyed by correlation id
msg_requests = []
msg_responses = {}

# Server info (address and port handed to app.run)
HOST_IP = "::"
HOST_PORT = 2222

# Metrics vars - plain counters, exposed via the /counter/... urls
cntr_msg_requests_submitted = 0
cntr_msg_requests_fetched = 0
cntr_msg_responses_submitted = 0
cntr_msg_responses_fetched = 0

# Request and response constants
AGENT_WRITE_URL = "/events/A1-POLICY-AGENT-WRITE"
AGENT_READ_URL = "/events/A1-POLICY-AGENT-READ/users/policy-agent"
APP_WRITE_URL = "/send-request"
APP_READ_URL = "/receive-response"
MIME_TEXT = "text/plain"
MIME_JSON = "application/json"
CAUGHT_EXCEPTION = "Caught exception: "
SERVER_ERROR = "Server error :"
54
# I'm alive function
@app.route('/', methods=['GET'])
def index():
    """Liveness check: always answer the text 'OK' with http 200."""
    body, http_status = 'OK', 200
    return body, http_status
60
61
# Helper function to create a Dmaap request message
# args : <GET|PUT|POST|DELETE> <correlation-id> <json-string-payload - may be None> <url>
# response: json formatted string of a complete Dmaap message
def create_message(operation, correlation_id, payload, url):
    """Build the json text of one Dmaap request message.

    Args:
        operation: operation name placed in the "operation" field.
        correlation_id: string placed in the "correlationId" field.
        payload: already serialized json text inserted verbatim as the
            "payload" value, or None for an empty json object.
        url: string placed in the "url" field.

    Returns:
        The complete message as a json formatted string.
    """
    if payload is None:
        payload = "{}"

    # Naive UTC timestamp; renders exactly like the deprecated utcnow() did.
    time_stamp = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    def quoted(value):
        # json.dumps adds the surrounding quotes and escapes '"' and '\', so
        # caller supplied text can no longer break the generated json.
        return json.dumps(value, ensure_ascii=False)

    # The fixed parts keep the original layout (including the odd spaces) so
    # the output is unchanged for values that need no escaping.
    parts = [
        '{"apiVersion":"1.0","operation":', quoted(operation),
        ',"correlationId":', quoted(correlation_id),
        ',"originatorId": "849e6c6b420",',
        '"payload":', payload,
        ',"requestId":"23343221", "target":"policy-agent", "timestamp":"', str(time_stamp),
        '", "type":"request","url":', quoted(url),
        '}',
    ]
    return ''.join(parts)
72
73
74 ### MR-stub interface, for MR control
75
# Send a message to MR
# URI and parameters (PUT or POST): /send-request?operation=<GET|PUT|POST|DELETE>&url=<url>
# payload: optional json body, only forwarded when operation is PUT
# response: <correlation-id> (http 200) or 400 for parameter error or 500 for other errors
@app.route(APP_WRITE_URL,
    methods=['PUT','POST'])
def sendrequest():
    """Queue a new request message and return its correlation id.

    Reads the query parameters 'operation' and 'url', builds a Dmaap message
    with create_message() and appends it to msg_requests.

    Returns:
        200 with the generated correlation id as plain text,
        400 when a parameter is missing or 'operation' is not supported,
        500 when any exception is raised while handling the request.
    """
    global msg_requests
    global cntr_msg_requests_submitted
    with lock:
        print("APP_WRITE_URL lock")
        try:
            oper = request.args.get('operation')
            if oper is None:
                print(APP_WRITE_URL+" parameter 'operation' missing")
                return Response('Parameter operation missing in request', status=400, mimetype=MIME_TEXT)

            url = request.args.get('url')
            if url is None:
                print(APP_WRITE_URL+" parameter 'url' missing")
                return Response('Parameter url missing in request', status=400, mimetype=MIME_TEXT)

            if oper not in ("GET", "PUT", "POST", "DELETE"):
                # The texts now list the operations that are actually accepted
                # (they used to say DEL instead of GET).
                print(APP_WRITE_URL+" parameter 'operation' need to be: GET|PUT|POST|DELETE")
                return Response('Parameter operation does not contain GET|PUT|POST|DELETE in request', status=400, mimetype=MIME_TEXT)

            print(APP_WRITE_URL+" operation="+oper+" url="+url)
            # Nanosecond clock value doubles as the correlation id
            correlation_id = str(time.time_ns())
            payload = None
            # NOTE(review): a json body is forwarded for PUT only; a body sent
            # with operation=POST is dropped - confirm this is intended.
            if (oper == "PUT") and (request.json is not None):
                payload = json.dumps(request.json)

            msg = create_message(oper, correlation_id, payload, url)
            print(msg)
            # json.loads also validates the generated message before it is queued
            print(APP_WRITE_URL+" MSG(correlationid = "+correlation_id+"): " + json.dumps(json.loads(msg), indent=2))
            msg_requests.append(msg)
            cntr_msg_requests_submitted += 1
            return Response(correlation_id, status=200, mimetype=MIME_TEXT)
        except Exception as e:
            print(APP_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
            return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
116
# Receive a message response for MR for the included correlation id
# URI and parameter, (GET): /receive-response?correlationid=<correlation-id>
# response: <json-array of 1 response> 200 or empty 204 or other errors 500
@app.route(APP_READ_URL,
    methods=['GET'])
def receiveresponse():
    """Hand out, and remove, the stored response for one correlation id.

    Returns:
        200 with the stored response text when one exists for the id,
        204 with an empty body when nothing is stored for the id,
        500 when the 'correlationid' parameter is missing or an exception
        is raised.
    """
    global msg_responses
    global cntr_msg_responses_fetched
    with lock:
        print("APP_READ_URL lock")
        try:
            # Named correlation_id rather than 'id' to avoid shadowing the builtin
            correlation_id = request.args.get('correlationid')
            if correlation_id is None:
                print(APP_READ_URL+" parameter 'correlationid' missing")
                # NOTE(review): the sibling /send-request answers 400 for a
                # missing parameter; 500 is kept here so existing callers
                # see the same status as before.
                return Response('Parameter correlationid missing in json', status=500, mimetype=MIME_TEXT)

            if correlation_id in msg_responses:
                # A response is delivered once only: pop removes it from the store
                answer = msg_responses.pop(correlation_id)
                print(APP_READ_URL+" response (correlationid="+correlation_id+"): " + answer)
                cntr_msg_responses_fetched += 1
                return Response(answer, status=200, mimetype=MIME_JSON)

            print(APP_READ_URL+" - no messages (correlationid="+correlation_id+"): ")
            return Response('', status=204, mimetype=MIME_JSON)
        except Exception as e:
            print(APP_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
            return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
145
146 ### Dmaap interface ###
147
# Read messages stream. URI according to agent configuration.
# URI, (GET): /events/A1-POLICY-AGENT-READ/users/policy-agent
# optional parameters: limit=<max number of messages, 0-4096> timeout=<milliseconds, max 60000>
# response: 200 <json array of request messages>, or 500 for other errors
@app.route(AGENT_READ_URL,
    methods=['GET'])
def events_read():
    """Return queued request messages, polling until some arrive or time is up.

    Query parameters:
        limit: max number of messages to return, clamped to 0..4096
            (default 4096).
        timeout: max time to wait in milliseconds, capped at 60000
            (default 10000).

    Returns:
        200 with a json array of up to 'limit' messages, removed from the
        queue in the order they were queued, 200 with "[]" when the timeout
        expires with nothing queued, or 500 when an exception is raised
        while the answer is built.
    """
    global msg_requests
    global cntr_msg_requests_fetched

    limit = request.args.get('limit')
    limit = 4096 if limit is None else int(limit)
    limit = max(0, min(limit, 4096))
    print("Limiting number of returned messages to: "+str(limit))

    timeout = request.args.get('timeout')
    timeout = 10000 if timeout is None else min(int(timeout), 60000)

    # All times below are in milliseconds
    start_time = int(round(time.time() * 1000))
    current_time = start_time

    while current_time < start_time + timeout:
        # The lock is taken per poll only, so other handlers can run in between
        with lock:
            if msg_requests:
                try:
                    # Take the oldest 'limit' messages off the queue in one go
                    batch = msg_requests[:limit]
                    del msg_requests[:limit]
                    cntr_msg_requests_fetched += len(batch)
                    # The queued items are already json texts: joining them
                    # with commas inside brackets gives a json array
                    msgs = '[' + ','.join(batch) + ']'
                    print(AGENT_READ_URL+" MSGs: "+json.dumps(json.loads(msgs), indent=2))
                    return Response(msgs, status=200, mimetype=MIME_JSON)
                except Exception as e:
                    print(AGENT_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
                    return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
        sleep(0.025) # sleep 25 milliseconds
        current_time = int(round(time.time() * 1000))

    print("timeout: "+str(timeout)+", startTime: "+str(start_time)+", currentTime: "+str(current_time))
    return Response("[]", status=200, mimetype=MIME_JSON)
200
# Write messages stream. URI according to agent configuration.
# URI and payload, (PUT or POST): /events/A1-POLICY-AGENT-WRITE <json array of response messages>
# response: OK 200 or 400 for missing json parameters; other errors are reported
#           with http 200 and a json body carrying "status":"500"
@app.route(AGENT_WRITE_URL,
    methods=['PUT','POST'])
def events_write():
    """Store response messages so they can be fetched per correlation id.

    The json body is a list of messages, or a single message object. Each
    message needs the fields 'correlationId', 'message' and 'status'. The
    stored value is the message text (json-dumped when it is a list or dict)
    followed by the first three characters of the status.

    Returns:
        200 with '{}' when all messages were stored,
        400 when a message lacks one of the fields (or the field is null);
        messages placed before it in the body have already been stored,
        200 with a json error body when any exception is raised.
    """
    global msg_responses
    global cntr_msg_responses_submitted
    with lock:
        print("AGENT_WRITE_URL lock")
        try:
            answer = request.json
            print(AGENT_WRITE_URL+ " json=" + json.dumps(answer, indent=2))
            if isinstance(answer, dict):
                # A single message (dict) is handled as a list of one
                answer = [answer]

            for item in answer:
                # .get() instead of [] so that an absent field gives the
                # documented 400 instead of a KeyError
                correlation_id = item.get('correlationId')
                if correlation_id is None:
                    print(AGENT_WRITE_URL+" parameter 'correlationId' missing")
                    return Response('Parameter <correlationid> missing in json', status=400, mimetype=MIME_TEXT)
                msg = item.get('message')
                if msg is None:
                    print(AGENT_WRITE_URL+" parameter 'message' missing")
                    return Response('Parameter <message> missing in json', status=400, mimetype=MIME_TEXT)
                status = item.get('status')
                if status is None:
                    print(AGENT_WRITE_URL+" parameter 'status' missing")
                    return Response('Parameter <status> missing in json', status=400, mimetype=MIME_TEXT)
                # Only the first three characters of the status are appended
                if isinstance(msg, (list, dict)):
                    msg_str = json.dumps(msg)+status[0:3]
                else:
                    msg_str = msg+status[0:3]
                msg_responses[correlation_id] = msg_str
                cntr_msg_responses_submitted += 1
                print(AGENT_WRITE_URL+ " msg+status (correlationid="+correlation_id+") :" + str(msg_str))
        except Exception as e:
            print(AGENT_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
            # json.dumps escapes the exception text so the body is always valid json
            error_body = json.dumps({"message": SERVER_ERROR + ' ' + str(e), "status": "500"})
            return Response(error_body, status=200, mimetype=MIME_JSON)

        return Response('{}', status=200, mimetype=MIME_JSON)
245
246
247 ### Functions for metrics read out ###
248
@app.route('/counter/requests_submitted', methods=['GET'])
def requests_submitted():
    """Return the value of cntr_msg_requests_submitted as plain text."""
    body = str(cntr_msg_requests_submitted)
    return Response(body, status=200, mimetype=MIME_TEXT)
253
@app.route('/counter/requests_fetched', methods=['GET'])
def requests_fetched():
    """Return the value of cntr_msg_requests_fetched as plain text."""
    body = str(cntr_msg_requests_fetched)
    return Response(body, status=200, mimetype=MIME_TEXT)
258
@app.route('/counter/responses_submitted', methods=['GET'])
def responses_submitted():
    """Return the value of cntr_msg_responses_submitted as plain text."""
    body = str(cntr_msg_responses_submitted)
    return Response(body, status=200, mimetype=MIME_TEXT)
263
@app.route('/counter/responses_fetched', methods=['GET'])
def responses_fetched():
    """Return the value of cntr_msg_responses_fetched as plain text."""
    body = str(cntr_msg_responses_fetched)
    return Response(body, status=200, mimetype=MIME_TEXT)
268
@app.route('/counter/current_requests', methods=['GET'])
def current_requests():
    """Return the number of messages currently held in msg_requests as plain text."""
    queued = len(msg_requests)
    return Response(str(queued), status=200, mimetype=MIME_TEXT)
273
@app.route('/counter/current_responses', methods=['GET'])
def current_responses():
    """Return the number of entries currently held in msg_responses as plain text."""
    stored = len(msg_responses)
    return Response(str(stored), status=200, mimetype=MIME_TEXT)
278
279 ### Admin ###
280
# Reset all messages and counters
@app.route('/reset',
    methods=['GET', 'POST', 'PUT'])
def reset():
    """Drop all queued requests and stored responses and zero the counters.

    Returns:
        200 with the text 'OK'.
    """
    global cntr_msg_requests_submitted
    global cntr_msg_requests_fetched
    global cntr_msg_responses_submitted
    global cntr_msg_responses_fetched
    global msg_requests
    global msg_responses

    # Take the same lock as the other handlers that modify this state, so the
    # containers are not swapped while one of them is between a check and
    # the update that depends on it.
    with lock:
        cntr_msg_requests_submitted = 0
        cntr_msg_requests_fetched = 0
        cntr_msg_responses_submitted = 0
        cntr_msg_responses_fetched = 0
        msg_requests = []
        msg_responses = {}
    return Response('OK', status=200, mimetype=MIME_TEXT)
299
if __name__ == "__main__":
    # Start the Flask server on the address and port defined by HOST_IP / HOST_PORT
    app.run(host=HOST_IP, port=HOST_PORT)