Add functionality to rAPP Catalogue
[nonrtric.git] / test / prodstub / app / prodstub.py
1
2 #  ============LICENSE_START===============================================
3 #  Copyright (C) 2020 Nordix Foundation. All rights reserved.
4 #  ========================================================================
5 #  Licensed under the Apache License, Version 2.0 (the "License");
6 #  you may not use this file except in compliance with the License.
7 #  You may obtain a copy of the License at
8 #
9 #       http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #  Unless required by applicable law or agreed to in writing, software
12 #  distributed under the License is distributed on an "AS IS" BASIS,
13 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #  See the License for the specific language governing permissions and
15 #  limitations under the License.
16 #  ============LICENSE_END=================================================
17 #
18
19 from flask import Flask
20 from flask import request
21
22 import requests
23
24 import json
25 from jsonschema import validate
26
27 import threading
28 import time
29 import datetime
30
31 app = Flask(__name__)
32
33 # # list of callback messages
34 # msg_callbacks={}
35
36 # Server info
37 HOST_IP = "::"
38 HOST_PORT = 2222
39
40 # # Metrics vars
41 # cntr_msg_callbacks=0
42 # cntr_msg_fetched=0
43
44 # Request and response constants
45 CALLBACK_CREATE_URL="/callbacks/create/<string:producer_id>"
46 CALLBACK_DELETE_URL="/callbacks/delete/<string:producer_id>"
47 CALLBACK_SUPERVISION_URL="/callbacks/supervision/<string:producer_id>"
48
49 ARM_CREATE_RESPONSE="/arm/create/<string:producer_id>/<string:job_id>"
50 ARM_DELETE_RESPONSE="/arm/delete/<string:producer_id>/<string:job_id>"
51 ARM_SUPERVISION_RESPONSE="/arm/supervision/<string:producer_id>"
52 ARM_TYPE="/arm/type/<string:producer_id>/<string:type_id>"
53 COUNTER_SUPERVISION="/counter/supervision/<string:producer_id>"
54 COUNTER_CREATE="/counter/create/<string:producer_id>/<string:job_id>"
55 COUNTER_DELETE="/counter/delete/<string:producer_id>/<string:job_id>"
56
57 JOB_DATA="/jobdata/<string:producer_id>/<string:job_id>"
58
59 STATUS="/status"
60
61 #Constsants
62 APPL_JSON='application/json'
63 UNKNOWN_QUERY_PARAMETERS="Unknown query parameter(s)"
64 RETURNING_CONFIGURED_RESP="returning configured response code"
65 JOBID_NO_MATCH="job id in stored json does not match request"
66 PRODUCER_OR_JOB_NOT_FOUND="producer or job not found"
67 PRODUCER_NOT_FOUND="producer not found"
68 TYPE_NOT_FOUND="type not found"
69 TYPE_IN_USE="type is in use in a job"
70 JOB_NOT_FOUND="job not found"
71 JOB_DATA_NOT_FOUND="job data not found"
72 JSON_CORRUPT="json in request is corrupt or missing"
73
74 #Producer and job db, including armed responses
75 db={}
76 # producer
77 #  armed response for supervision
78 #  armed types
79 #  supervision counter
80 #  job
81 #    job json
82 #    target_type
83 #    armed response for create
84 #    armed response for delete
85 #    create counter
86 #    delete counter
87
88 # Helper function to populate a callback dict with the basic structure
89 # if job_id is None then only the producer level is setup and the producer dict is returned
90 # if job_id is not None, the job level is setup and the job dict is returned (producer must exist)
91 def setup_callback_dict(producer_id, job_id):
92
93     producer_dict=None
94     if (producer_id in db.keys()):
95         producer_dict=db[producer_id]
96     else:
97         if (job_id is not None):
98             return None
99         producer_dict={}
100         db[producer_id]=producer_dict
101
102         producer_dict['supervision_response']=200
103         producer_dict['supervision_counter']=0
104         producer_dict['types']=[]
105
106     if (job_id is None):
107         return producer_dict
108
109     job_dict=None
110     if (job_id in producer_dict.keys()):
111         job_dict=producer_dict[job_id]
112     else:
113         job_dict={}
114         producer_dict[job_id]=job_dict
115         job_dict['create_response']=201
116         job_dict['delete_response']=404
117         job_dict['json']=None
118         job_dict['create_counter']=0
119         job_dict['delete_counter']=0
120         job_dict['delivering']=False
121         job_dict['delivery_attempts']=0
122     return job_dict
123
124
125 # Helper function to get an entry from the callback db
126 # if job_id is None then only the producer dict is returned (or None if producer is not found)
127 # if job_id is not None, the job is returned (or None if producer/job is not found)
128 def get_callback_dict(producer_id, job_id):
129
130     producer_dict=None
131     if (producer_id in db.keys()):
132         producer_dict=db[producer_id]
133
134     if (producer_dict is None):
135         return None
136
137     if (job_id is None):
138         return producer_dict
139
140     job_dict=None
141     if (job_id in producer_dict.keys()):
142         job_dict=producer_dict[job_id]
143
144     return job_dict
145
146 # Helper function find if a key/valye exist in the dictionay tree
147 # True if found
148 def recursive_search(s_dict, s_key, s_id):
149     for pkey in s_dict:
150         if (pkey == s_key) and (s_dict[pkey] == s_id):
151             return True
152         if (isinstance(s_dict[pkey], dict)):
153             recursive_search(s_dict[pkey], s_key, s_id)
154
155     return False
156
157 # Helper function to find all job dicts
158 def get_all_jobs():
159     job_dicts={}
160     for producer_key in db:
161         producer_dict = db[producer_key]
162         for job_key in producer_dict:
163             job_dict = producer_dict[job_key]
164             if (isinstance(job_dict, dict)):
165                 job_dicts[job_key]=job_dict
166     return job_dicts
167
168 # I'm alive function
169 # response: always 200
170 @app.route('/',
171     methods=['GET'])
172 def index():
173     return 'OK', 200
174
175 # Arm the create callback with a response code
176 # Omitting the query parameter switch to response back to the standard 200/201 response
177 # URI and parameters (PUT): /arm/create/<producer_id>/<job_id>[?response=<resonsecode>]
178 # Setting
179 # response: 200 (400 if incorrect query params)
180 @app.route(ARM_CREATE_RESPONSE,
181      methods=['PUT'])
182 def arm_create(producer_id, job_id):
183
184     arm_response=request.args.get('response')
185
186     if (arm_response is None):
187         if (len(request.args) != 0):
188             return UNKNOWN_QUERY_PARAMETERS,400
189     else:
190         if (len(request.args) != 1):
191             return UNKNOWN_QUERY_PARAMETERS,400
192
193
194     print("Arm create received for producer: "+str(producer_id)+" and job: "+str(job_id)+" and response: "+str(arm_response))
195
196     job_dict=setup_callback_dict(producer_id, job_id)
197
198     if (arm_response is None):    #Reset the response depending if a job exists or not
199         if (job_dict['json'] is None):
200             job_dict['create_response']=201
201         else:
202             job_dict['create_response']=200
203     else:
204         job_dict['create_response']=int(arm_response)
205
206     return "",200
207
208 # Arm the delete callback with a response code
209 # Omitting the query parameter switch to response back to the standard 204 response
210 # URI and parameters (PUT): /arm/delete/<producer_id>/<job-id>[?response=<resonsecode>]
211 # response: 200 (400 if incorrect query params)
212 @app.route(ARM_DELETE_RESPONSE,
213      methods=['PUT'])
214 def arm_delete(producer_id, job_id):
215
216     arm_response=request.args.get('response')
217
218     if (arm_response is None):
219         if (len(request.args) != 0):
220             return UNKNOWN_QUERY_PARAMETERS,400
221     else:
222         if (len(request.args) != 1):
223             return UNKNOWN_QUERY_PARAMETERS,400
224
225     print("Arm delete received for producer: "+str(producer_id)+" and job: "+str(job_id)+" and response: "+str(arm_response))
226
227     arm_response=request.args.get('response')
228
229     job_dict=setup_callback_dict(producer_id, job_id)
230
231     if (arm_response is None): #Reset the response depening if a job exists or not
232         if (job_dict['json'] is None):
233             job_dict['delete_response']=404
234         else:
235             job_dict['delete_response']=204
236     else:
237         job_dict['delete_response']=int(arm_response)
238
239     return "",200
240
241 # Arm the supervision callback with a response code
242 # Omitting the query parameter switch to response back to the standard 200 response
243 # URI and parameters (PUT): /arm/supervision/<producer_id>[?response=<resonsecode>]
244 # response: 200 (400 if incorrect query params)
245 @app.route(ARM_SUPERVISION_RESPONSE,
246      methods=['PUT'])
247 def arm_supervision(producer_id):
248
249     arm_response=request.args.get('response')
250
251     if (arm_response is None):
252         if (len(request.args) != 0):
253             return UNKNOWN_QUERY_PARAMETERS,400
254     else:
255         if (len(request.args) != 1):
256             return UNKNOWN_QUERY_PARAMETERS,400
257
258     print("Arm supervision received for producer: "+str(producer_id)+" and response: "+str(arm_response))
259
260     producer_dict=setup_callback_dict(producer_id, None)
261     if (arm_response is None):
262         producer_dict['supervision_response']=200
263     else:
264         producer_dict['supervision_response']=int(arm_response)
265
266     return "",200
267
268 # Arm a producer with a type
269 # URI and parameters (PUT): /arm/type/<string:producer_id>/<string:type-id>
270 # response: 200 (404)
271 @app.route(ARM_TYPE,
272     methods=['PUT'])
273 def arm_type(producer_id, type_id):
274
275     print("Arm type received for producer: "+str(producer_id)+" and type: "+str(type_id))
276
277     producer_dict=get_callback_dict(producer_id, None)
278
279     if (producer_dict is None):
280         return PRODUCER_NOT_FOUND,404
281
282     type_list=producer_dict['types']
283     if (type_id not in type_list):
284         type_list.append(type_id)
285
286     return "",200
287
288 # Disarm a producer with a type
289 # URI and parameters (DELETE): /arm/type/<string:producer_id>/<string:type-id>
290 # response: 200 (404)
291 @app.route(ARM_TYPE,
292     methods=['DELETE'])
293 def disarm_type(producer_id, type_id):
294
295     print("Disarm type received for producer: "+str(producer_id)+" and type: "+str(type_id))
296
297     producer_dict=get_callback_dict(producer_id, None)
298
299     if (producer_dict is None):
300         return PRODUCER_NOT_FOUND,404
301
302     if (recursive_search(producer_dict, "ei_job_type",type_id) is True):
303         return "TYPE_IN_USE",400
304
305     type_list=producer_dict['types']
306     type_list.remove(type_id)
307
308     return "",200
309
310
311 # Callback for create job
312 # URI and parameters (POST): /callbacks/create/<producer_id>
313 # response 201 at create, 200 at update or other configured response code
314 @app.route(CALLBACK_CREATE_URL,
315      methods=['POST'])
316 def callback_create(producer_id):
317
318     req_json_dict=None
319     try:
320         req_json_dict = json.loads(request.data)
321         with open('job-schema.json') as f:
322             schema = json.load(f)
323             validate(instance=req_json_dict, schema=schema)
324     except Exception:
325         return JSON_CORRUPT,400
326
327     producer_dict=get_callback_dict(producer_id, None)
328     if (producer_dict is None):
329         return PRODUCER_OR_JOB_NOT_FOUND,400
330     type_list=producer_dict['types']
331     type_id=req_json_dict['ei_type_identity']
332     if (type_id not in type_list):
333         return TYPE_NOT_FOUND, 400
334
335     job_id=req_json_dict['ei_job_identity']
336     job_dict=get_callback_dict(producer_id, job_id)
337     if (job_dict is None):
338         return PRODUCER_OR_JOB_NOT_FOUND,400
339     return_code=0
340     return_msg=""
341     if (req_json_dict['ei_job_identity'] == job_id):
342         print("Create callback received for producer: "+str(producer_id)+" and job: "+str(job_id))
343         return_code=job_dict['create_response']
344         if ((job_dict['create_response'] == 200) or (job_dict['create_response'] == 201)):
345             job_dict['json']=req_json_dict
346             job_dict['delivering']=True
347             if (job_dict['create_response'] == 201): #Set up next response code if create was ok
348                 job_dict['create_response'] = 200
349             if (job_dict['delete_response'] == 404):
350                 job_dict['delete_response'] = 204
351         else:
352             return_msg=RETURNING_CONFIGURED_RESP
353
354         job_dict['create_counter']=job_dict['create_counter']+1
355     else:
356         return JOBID_NO_MATCH, 400
357
358     return return_msg, return_code
359
360 # Callback for delete job
361 # URI and parameters (POST): /callbacks/delete/<producer_id>
362 # response: 204 at delete or other configured response code
363 @app.route(CALLBACK_DELETE_URL,
364      methods=['POST'])
365 def callback_delete(producer_id):
366
367     req_json_dict=None
368     try:
369         req_json_dict = json.loads(request.data)
370         with open('job-schema.json') as f:
371             schema = json.load(f)
372             validate(instance=req_json_dict, schema=schema)
373     except Exception:
374         return JSON_CORRUPT,400
375
376     job_id=req_json_dict['ei_job_identity']
377     job_dict=get_callback_dict(producer_id, job_id)
378     if (job_dict is None):
379         return PRODUCER_OR_JOB_NOT_FOUND,400
380     return_code=0
381     return_msg=""
382     if (req_json_dict['ei_job_identity'] == job_id):
383         print("Delete callback received for producer: "+str(producer_id)+" and job: "+str(job_id))
384         return_code=job_dict['delete_response']
385         if (job_dict['delete_response'] == 204):
386             job_dict['json']=None
387             job_dict['delete_response']=404
388             job_dict['delivering']=False
389             if (job_dict['create_response'] == 200):
390                 job_dict['create_response'] = 201 # reset create response if delete was ok
391         else:
392             return_msg=RETURNING_CONFIGURED_RESP
393
394         job_dict['delete_counter']=job_dict['delete_counter']+1
395     else:
396         return JOBID_NO_MATCH, 400
397
398     return return_msg, return_code
399
400 # Callback for supervision of producer
401 # URI and parameters (GET): /callbacks/supervision/<producer_id>
402 # response: 200 or other configured response code
403 @app.route(CALLBACK_SUPERVISION_URL,
404      methods=['GET'])
405 def callback_supervision(producer_id):
406
407     print("Supervision callback received for producer: "+str(producer_id))
408
409     producer_dict=get_callback_dict(producer_id, None)
410     if (producer_dict is None):
411         return PRODUCER_NOT_FOUND,400
412     return_code=producer_dict['supervision_response']
413     return_msg=""
414     if (return_code != 200):
415         return_msg="returning configured response code"
416
417     producer_dict['supervision_counter']=producer_dict['supervision_counter']+1
418
419     return return_msg,producer_dict['supervision_response']
420
421 # Get the job definition for a job
422 # URI and parameters (GET): "/jobdata/<string:producer_id>/<string:job_id>"
423 # response: 200 or 204
424 @app.route(JOB_DATA,
425      methods=['GET'])
426 def get_jobdata(producer_id, job_id):
427
428     print("Get job data received for producer: "+str(producer_id)+" and job: "+str(job_id))
429
430     job_dict=get_callback_dict(producer_id, job_id)
431
432     if (job_dict is None):
433         return PRODUCER_OR_JOB_NOT_FOUND,400
434
435     if (job_dict['json'] is None):
436         return "",204
437     else:
438         return json.dumps(job_dict['json']), 200
439
440 # Start data delivery for a job, action : START or STOP
441 # URI and parameters (POST): "/jobdata/<string:producer_id>/<string:job_id>?action=action"
442 # response: 200 or 204
443 @app.route(JOB_DATA,
444      methods=['POST'])
445 def start_jobdata(producer_id, job_id):
446
447     action=request.args.get('action')
448
449     if (action is None):
450         return UNKNOWN_QUERY_PARAMETERS,400
451     else:
452         if (len(request.args) != 1):
453             return UNKNOWN_QUERY_PARAMETERS,400
454         else:
455             if ((action != "START") and (action != "STOP")):
456                 return UNKNOWN_QUERY_PARAMETERS,400
457
458     print("Job data action received for producer: "+str(producer_id)+" and job: "+str(job_id) + " action: " + action)
459
460     job_dict=get_callback_dict(producer_id, job_id)
461     if (job_dict is None):
462         return JOB_NOT_FOUND,404
463
464     if (job_dict['json'] is None):
465         return JOB_DATA_NOT_FOUND, 400
466     else:
467         if (action == "START"):
468             job_dict['delivering']=True
469         else:
470             job_dict['delivering']=False
471         return "",200
472
473
474 # Counter for create calls for a job
475 # URI and parameters (GET): "/counter/create/<string:producer_id>/<string:job_id>"
476 # response: 200 and counter value
477 @app.route(COUNTER_CREATE,
478      methods=['GET'])
479 def counter_create(producer_id, job_id):
480     job_dict=get_callback_dict(producer_id, job_id)
481     if (job_dict is None):
482         return "-1",200
483     return str(job_dict['create_counter']),200
484
485 # Counter for delete calls for a job
486 # URI and parameters (GET): "/counter/delete/<string:producer_id>/<string:job_id>"
487 # response: 200 and counter value
488 @app.route(COUNTER_DELETE,
489      methods=['GET'])
490 def counter_delete(producer_id, job_id):
491     job_dict=get_callback_dict(producer_id, job_id)
492     if (job_dict is None):
493         return "-1",200
494     return str(job_dict['delete_counter']),200
495
496 # Counter for supervision calls for a producer
497 # URI and parameters (GET): "/counter/supervision/<string:producer_id>"
498 # response: 200 and counter value
499 @app.route(COUNTER_SUPERVISION,
500      methods=['GET'])
501 def counter_supervision(producer_id):
502     producer_dict=get_callback_dict(producer_id, None)
503     if (producer_dict is None):
504         return "-1",200
505     return str(producer_dict['supervision_counter']),200
506
507 # Get status info
508 # URI and parameters (GET): "/status"
509 # -
510 @app.route(STATUS,
511     methods=['GET'])
512 def status():
513     global db
514     return json.dumps(db),200
515
516
517 # Reset db
518 @app.route('/reset',
519     methods=['GET', 'POST', 'PUT'])
520 def reset():
521     global db
522     db={}
523     return "",200
524
525
526 def datadelivery() :
527     while True:
528         try:
529             job_dicts=get_all_jobs()
530             for key in job_dicts:
531                 job=job_dicts[key]
532                 if (job['delivering'] == True and job['json'] != None):
533                     url=job['json']['target_uri']
534
535                     data={}
536                     data["date"]=str(datetime.datetime.now())
537                     data["job"]=""+key
538                     data["sequence_no"]=""+str(job['delivery_attempts'])
539                     data["value"]=str(100)
540                     print("Sending "+json.dumps(data))
541
542                     requests.post(url, json=data, verify=False, timeout=2) #NOSONAR
543                     job['delivery_attempts'] += 1
544         except Exception as err:
545             print("Error during data delivery: "+ str(err))
546         time.sleep(1)
547
548
549 ### Main function ###
550
551 print("Starting data delivery thread")
552 thread = threading.Thread(target=datadelivery, args=())
553 thread.daemon = True
554 thread.start()
555
556 if __name__ == "__main__":
557     app.run(port=HOST_PORT, host=HOST_IP)