Changed the config loading from consul
[nonrtric.git] / test / mrstub / mr.py
1
2 #  ============LICENSE_START===============================================
3 #  Copyright (C) 2020 Nordix Foundation. All rights reserved.
4 #  ========================================================================
5 #  Licensed under the Apache License, Version 2.0 (the "License");
6 #  you may not use this file except in compliance with the License.
7 #  You may obtain a copy of the License at
8 #
9 #       http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #  Unless required by applicable law or agreed to in writing, software
12 #  distributed under the License is distributed on an "AS IS" BASIS,
13 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #  See the License for the specific language governing permissions and
15 #  limitations under the License.
16 #  ============LICENSE_END=================================================
17 #
18
19 from flask import Flask, request
20 from time import sleep
21 import time
22 import datetime
23 import json
24 from flask import Flask
25 from flask import Response
26 import traceback
27
28 app = Flask(__name__)
29
30 # list of messages to/from Dmaap
31 msg_requests={}
32 msg_responses={}
33
34 # Server info
35 HOST_IP = "0.0.0.0"
36 HOST_PORT = 3905
37
38 # Metrics vars
39 cntr_msg_requests_submitted=0
40 cntr_msg_requests_fetched=0
41 cntr_msg_responses_submitted=0
42 cntr_msg_responses_fetched=0
43
44 # Request and response constants
45 AGENT_WRITE_URL="/events/A1-POLICY-AGENT-WRITE"
46 AGENT_READ_URL="/events/A1-POLICY-AGENT-READ/users/policy-agent"
47 APP_WRITE_URL="/send-request"
48 APP_READ_URL="/receive-response"
49 MIME_TEXT="text/plain"
50 MIME_JSON="application/json"
51 CAUGHT_EXCEPTION="Caught exception: "
52 SERVER_ERROR="Server error :"
53
54 #I'm alive function
55 @app.route('/',
56     methods=['GET'])
57 def index():
58     return 'OK', 200
59
60
61 # Helper function to create a Dmaap request message
62 # args : <GET|PUT|DELETE> <correlation-id> <json-string-payload - may be None> <url>
63 # response: json formatted string of a complete Dmaap message
64 def create_message(operation, correlation_id, payload, url):
65     if (payload is None):
66         payload="{}"
67     time_stamp=datetime.datetime.utcnow()
68     msg = '{\"apiVersion\":\"1.0\",\"operation\":\"'+operation+'\",\"correlationId\":\"'+correlation_id+'\",\"originatorId\": \"849e6c6b420\",'
69     msg = msg + '\"payload\":'+payload+',\"requestId\":\"23343221\", \"target\":\"policy-agent\", \"timestamp\":\"'+str(time_stamp)+'\", \"type\":\"request\",\"url\":\"'+url+'\"}'
70     return msg
71
72
73 ### MR-stub interface, for MR control
74
75 # Send a message to MR
76 # URI and parameters (GET): /send-request?operation=<GET|PUT|POST|DELETE>&url=<url>
77 # response: <correlation-id> (http 200) o4 400 for parameter error or 500 for other errors
78 @app.route(APP_WRITE_URL,
79     methods=['PUT','POST'])
80 def sendrequest():
81     global msg_requests
82     global cntr_msg_requests_submitted
83
84     try:
85
86         oper=request.args.get('operation')
87         if (oper is None):
88             print(APP_WRITE_URL+" parameter 'operation' missing")
89             return Response('Parameter operation missing in request', status=400, mimetype=MIME_TEXT)
90
91         url=request.args.get('url')
92         if (url is None):
93             print(APP_WRITE_URL+" parameter 'url' missing")
94             return Response('Parameter url missing in request', status=400, mimetype=MIME_TEXT)
95
96         if (oper != "GET" and oper != "PUT" and oper != "POST" and oper != "DELETE"):
97             print(APP_WRITE_URL+" parameter 'operation' need to be: DEL|PUT|POST|DELETE")
98             return Response('Parameter operation does not contain DEL|PUT|POST|DELETE in request', status=400, mimetype=MIME_TEXT)
99
100         print(APP_WRITE_URL+" operation="+oper+" url="+url)
101         correlation_id=str(time.time_ns())
102         payload=None
103         if (oper == "PUT") and (request.json is not None):
104             payload=json.dumps(request.json)
105
106         msg=create_message(oper, correlation_id, payload, url)
107         print(msg)
108         print(APP_WRITE_URL+" MSG(correlationid = "+correlation_id+"): " + json.dumps(json.loads(msg), indent=2))
109         msg_requests[correlation_id]=msg
110         cntr_msg_requests_submitted += 1
111         return Response(correlation_id, status=200, mimetype=MIME_TEXT)
112     except Exception as e:
113         print(APP_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
114         return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
115
116 # Receive a message response for MR for the included correlation id
117 # URI and parameter, (GET): /receive-response?correlationid=<correlation-id>
118 # response: <json-array of 1 response> 200 or empty 204 or other errors 500
119 @app.route(APP_READ_URL,
120     methods=['GET'])
121 def receiveresponse():
122     global msg_responses
123     global cntr_msg_responses_fetched
124
125     try:
126         id=request.args.get('correlationid')
127         if (id is None):
128             print(APP_READ_URL+" parameter 'correclationid' missing")
129             return Response('Parameter correlationid missing in json', status=500, mimetype=MIME_TEXT)
130
131         if (id in msg_responses):
132             answer=msg_responses[id]
133             del msg_responses[id]
134             print(APP_READ_URL+" response (correlationid="+id+"): " + answer)
135             cntr_msg_responses_fetched += 1
136             return Response(answer, status=200, mimetype=MIME_JSON)
137
138         print(APP_READ_URL+" - no messages (correlationid="+id+"): ")
139         return Response('', status=204, mimetype=MIME_JSON)
140     except Exception as e:
141         print(APP_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
142         return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
143
144 ### Dmaap interface ###
145
146 # Read messages stream. URI according to agent configuration.
147 # URI, (GET): /events/A1-POLICY-AGENT-READ/users/policy-agent
148 # response: 200 <json array of request messages>, or 500 for other errors
149 @app.route(AGENT_READ_URL,
150     methods=['GET'])
151 def events_read():
152     global msg_requests
153     global cntr_msg_requests_fetched
154     try:
155         msgs=''
156         for item in msg_requests:
157             if (len(msgs)>1):
158                 msgs=msgs+','
159             msgs=msgs+msg_requests[item]
160             cntr_msg_requests_fetched += 1
161
162         msg_requests={}
163         msgs='['+msgs+']'
164         print(AGENT_READ_URL+" MSGs: "+json.dumps(json.loads(msgs), indent=2))
165         return Response(msgs, status=200, mimetype=MIME_JSON)
166     except Exception as e:
167         print(AGENT_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
168         return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
169
170 # Write messages stream. URI according to agent configuration.
171 # URI and payload, (PUT or POST): /events/A1-POLICY-AGENT-WRITE <json array of response messages>
172 # response: OK 200 or 400 for missing json parameters, 500 for other errors
173 @app.route(AGENT_WRITE_URL,
174     methods=['PUT','POST'])
175 def events_write():
176     global msg_responses
177     global cntr_msg_responses_submitted
178
179     try:
180         answer=request.json
181         print(AGENT_WRITE_URL+ " json=" + json.dumps(answer, indent=2))
182         for item in answer:
183             id=item['correlationId']
184             if (id is None):
185                 print(AGENT_WRITE_URL+" parameter 'correlatonid' missing")
186                 return Response('Parameter <correlationid> missing in json', status=400, mimetype=MIME_TEXT)
187             msg=item['message']
188             if (msg is None):
189                 print(AGENT_WRITE_URL+" parameter 'msgs' missing")
190                 return Response('Parameter >message> missing in json', status=400, mimetype=MIME_TEXT)
191             status=item['status']
192             if (status is None):
193                 print(AGENT_WRITE_URL+" parameter 'status' missing")
194                 return Response('Parameter <status> missing in json', status=400, mimetype=MIME_TEXT)
195             if isinstance(msg, list) or isinstance(msg, dict):
196                 msg_str=json.dumps(msg)+status[0:3]
197             else:
198                 msg_str=msg+status[0:3]
199             msg_responses[id]=msg_str
200             cntr_msg_responses_submitted += 1
201             print(AGENT_WRITE_URL+ " msg+status (correlationid="+id+") :" + str(msg_str))
202     except Exception as e:
203         print(AGENT_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
204         return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
205
206     return Response('OK', status=200, mimetype=MIME_TEXT)
207
208
209 ### Functions for metrics read out ###
210
211 @app.route('/counter/requests_submitted',
212     methods=['GET'])
213 def requests_submitted():
214     return Response(str(cntr_msg_requests_submitted), status=200, mimetype=MIME_TEXT)
215
216 @app.route('/counter/requests_fetched',
217     methods=['GET'])
218 def requests_fetched():
219     return Response(str(cntr_msg_requests_fetched), status=200, mimetype=MIME_TEXT)
220
221 @app.route('/counter/responses_submitted',
222     methods=['GET'])
223 def responses_submitted():
224     return Response(str(cntr_msg_responses_submitted), status=200, mimetype=MIME_TEXT)
225
226 @app.route('/counter/responses_fetched',
227     methods=['GET'])
228 def responses_fetched():
229     return Response(str(cntr_msg_responses_fetched), status=200, mimetype=MIME_TEXT)
230
231 @app.route('/counter/current_requests',
232     methods=['GET'])
233 def current_requests():
234     return Response(str(len(msg_requests)), status=200, mimetype=MIME_TEXT)
235
236 @app.route('/counter/current_responses',
237     methods=['GET'])
238 def current_responses():
239     return Response(str(len(msg_responses)), status=200, mimetype=MIME_TEXT)
240
241 ### Admin ###
242
243 # Reset all messsages and counters
244 @app.route('/reset',
245     methods=['GET', 'POST', 'PUT'])
246 def reset():
247     global cntr_msg_requests_submitted
248     global cntr_msg_requests_fetched
249     global cntr_msg_responses_submitted
250     global cntr_msg_responses_fetched
251     global msg_requests
252     global msg_responses
253
254     cntr_msg_requests_submitted=0
255     cntr_msg_requests_fetched=0
256     cntr_msg_responses_submitted=0
257     cntr_msg_responses_fetched=0
258     msg_requests={}
259     msg_responses={}
260     return Response('OK', status=200, mimetype=MIME_TEXT)
261
262 ### Main function ###
263
264 if __name__ == "__main__":
265     app.run(port=HOST_PORT, host=HOST_IP)