Update Release Notes for E Maintenance Release
[nonrtric.git] / test / prodstub / app / prodstub.py
1
2 #  ============LICENSE_START===============================================
3 #  Copyright (C) 2020 Nordix Foundation. All rights reserved.
4 #  ========================================================================
5 #  Licensed under the Apache License, Version 2.0 (the "License");
6 #  you may not use this file except in compliance with the License.
7 #  You may obtain a copy of the License at
8 #
9 #       http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #  Unless required by applicable law or agreed to in writing, software
12 #  distributed under the License is distributed on an "AS IS" BASIS,
13 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #  See the License for the specific language governing permissions and
15 #  limitations under the License.
16 #  ============LICENSE_END=================================================
17 #
18
19 from flask import Flask
20 from flask import request
21
22 import requests
23
24 import json
25 from jsonschema import validate
26
27 import threading
28 import time
29 import datetime
30 import logging
31
# Disable logging of requests that read counters and status
class AjaxFilter(logging.Filter):
    """Logging filter that drops records mentioning '/counter/' or '/status'."""

    def filter(self, record):
        """Return False when the record's message contains a filtered path."""
        message = record.getMessage()
        return not ("/counter/" in message or "/status" in message)
36
# Attach the filter to the 'werkzeug' logger so that matching records are dropped
log = logging.getLogger('werkzeug')
log.addFilter(AjaxFilter())

# The Flask application object on which all routes in this file are registered
app = Flask(__name__)
41
42 # # list of callback messages
43 # msg_callbacks={}
44
# Server info: host and port passed to app.run() in the main section
HOST_IP = "::"
HOST_PORT = 2222

# URL rules for the callback routes
CALLBACK_CREATE_URL="/callbacks/job/<string:producer_id>"
CALLBACK_DELETE_URL="/callbacks/job/<string:producer_id>/<string:job_id>"
CALLBACK_SUPERVISION_URL="/callbacks/supervision/<string:producer_id>"

# URL rules for arming responses/types and for reading counters
ARM_CREATE_RESPONSE="/arm/create/<string:producer_id>/<string:job_id>"
ARM_DELETE_RESPONSE="/arm/delete/<string:producer_id>/<string:job_id>"
ARM_SUPERVISION_RESPONSE="/arm/supervision/<string:producer_id>"
ARM_TYPE="/arm/type/<string:producer_id>/<string:type_id>"
COUNTER_SUPERVISION="/counter/supervision/<string:producer_id>"
COUNTER_CREATE="/counter/create/<string:producer_id>/<string:job_id>"
COUNTER_DELETE="/counter/delete/<string:producer_id>/<string:job_id>"

# URL rule for reading, deleting and starting/stopping delivery of job data
JOB_DATA="/jobdata/<string:producer_id>/<string:job_id>"

# URL rule for dumping the whole db
STATUS="/status"

# Constants: content type and texts returned in response bodies
APPL_JSON='application/json'
UNKNOWN_QUERY_PARAMETERS="Unknown query parameter(s)"
RETURNING_CONFIGURED_RESP="returning configured response code"
JOBID_NO_MATCH="job id in stored json does not match request"
PRODUCER_OR_JOB_NOT_FOUND="producer or job not found"
PRODUCER_NOT_FOUND="producer not found"
TYPE_NOT_FOUND="type not found"
TYPE_IN_USE="type is in use in a job"
JOB_NOT_FOUND="job not found"
JOB_DATA_NOT_FOUND="job data not found"
JSON_CORRUPT="json in request is corrupt or missing"
78
# Producer and job db, including armed responses
# Structure as created by setup_callback_dict():
db={}
# <producer_id>
#   'supervision_response'  armed response for supervision
#   'types'                 list of armed type ids
#   'supervision_counter'   supervision counter
#   <job_id>
#     'json'                job json (None when no job data is stored)
#     'create_response'     armed response for create
#     'delete_response'     armed response for delete
#     'create_counter'      create counter
#     'delete_counter'      delete counter
#     'delivering'          delivering status ("stopped", "delivering" or "hold")
#     'delivery_attempts'   incremented by datadelivery() after each post

# Disable the warning about unverified https requests
# (datadelivery() posts with verify=False)
from requests.packages import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
98
# Helper function to populate the callback db with the basic structure.
# If job_id is None, only the producer level is set up and the producer dict is returned.
# If job_id is not None, the job level is set up and the job dict is returned.
# A job is never created under an unknown producer: None is returned in that case.
def setup_callback_dict(producer_id, job_id):

    if producer_id in db:
        producer_entry = db[producer_id]
    elif job_id is not None:
        # Unknown producer and a job was requested - nothing is created
        return None
    else:
        # New producer with default supervision response, zeroed counter and no types
        producer_entry = {
            'supervision_response': 200,
            'supervision_counter': 0,
            'types': [],
        }
        db[producer_id] = producer_entry

    if job_id is None:
        return producer_entry

    if job_id not in producer_entry:
        # New job: no job data yet, so create answers 201 and delete answers 404
        producer_entry[job_id] = {
            'create_response': 201,
            'delete_response': 404,
            'json': None,
            'create_counter': 0,
            'delete_counter': 0,
            'delivering': "stopped",
            'delivery_attempts': 0,
        }
    return producer_entry[job_id]
134
135
# Helper function to get an entry from the callback db without creating anything.
# If job_id is None, the producer dict is returned (or None if the producer is not found).
# If job_id is not None, the job dict is returned (or None if the producer/job is not found).
def get_callback_dict(producer_id, job_id):

    if producer_id not in db:
        return None

    producer_entry = db[producer_id]
    if job_id is None:
        return producer_entry

    # dict.get yields None for an unknown job id
    return producer_entry.get(job_id)
156
# Helper function to find out if a key/value pair exists anywhere in a dictionary tree
# Nested dictionaries are searched recursively (lists are not descended into)
# s_dict: dictionary to search, s_key: key to look for, s_id: value the key must have
# Returns True if found, otherwise False
def recursive_search(s_dict, s_key, s_id):
    for pkey, pvalue in s_dict.items():
        if (pkey == s_key) and (pvalue == s_id):
            return True
        # A hit in a nested dictionary must be propagated to the caller,
        # otherwise only the top level would ever be able to match
        if isinstance(pvalue, dict) and recursive_search(pvalue, s_key, s_id):
            return True

    return False
167
# Helper function to collect the job dicts of all producers
# Returns a new dict keyed by job id; only dict-valued producer entries are jobs
# (the other producer entries are ints or a list)
def get_all_jobs():
    return {
        entry_key: entry
        for producer_entry in db.values()
        for entry_key, entry in producer_entry.items()
        if isinstance(entry, dict)
    }
178
# I'm alive function
# URI (GET): /
# response: always 200 with the body 'OK'
@app.route('/', methods=['GET'])
def index():
    alive_response = ('OK', 200)
    return alive_response
185
# Arm the create callback with a response code
# Omitting the query parameter switches the response back to the standard 200/201 response
# URI and parameters (PUT): /arm/create/<producer_id>/<job_id>[?response=<responsecode>]
# The producer must already exist in the db (arming supervision creates it);
# the job entry is created if it does not exist
# response: 200 (400 if incorrect query params, 404 if the producer is not found)
@app.route(ARM_CREATE_RESPONSE,
     methods=['PUT'])
def arm_create(producer_id, job_id):

    arm_response=request.args.get('response')

    # Only the optional 'response' parameter is accepted
    expected_arg_count = 0 if arm_response is None else 1
    if (len(request.args) != expected_arg_count):
        return UNKNOWN_QUERY_PARAMETERS,400

    # Validate the code before touching the db so a bad value does not create a job entry
    response_code=None
    if (arm_response is not None):
        try:
            response_code=int(arm_response)
        except ValueError:
            return "Response code must be an integer",400

    print("Arm create received for producer: "+str(producer_id)+" and job: "+str(job_id)+" and response: "+str(arm_response))

    job_dict=setup_callback_dict(producer_id, job_id)
    if (job_dict is None):
        # setup_callback_dict does not create a job under an unknown producer
        return PRODUCER_NOT_FOUND,404

    if (response_code is None):    #Reset the response depending on whether job data exists or not
        if (job_dict['json'] is None):
            job_dict['create_response']=201
        else:
            job_dict['create_response']=200
    else:
        job_dict['create_response']=response_code

    return "",200
218
# Arm the delete callback with a response code
# Omitting the query parameter switches the response back to the standard response
# (204 when job data is stored, otherwise 404)
# URI and parameters (PUT): /arm/delete/<producer_id>/<job-id>[?response=<responsecode>]
# The producer must already exist in the db (arming supervision creates it);
# the job entry is created if it does not exist
# response: 200 (400 if incorrect query params, 404 if the producer is not found)
@app.route(ARM_DELETE_RESPONSE,
     methods=['PUT'])
def arm_delete(producer_id, job_id):

    arm_response=request.args.get('response')

    # Only the optional 'response' parameter is accepted
    expected_arg_count = 0 if arm_response is None else 1
    if (len(request.args) != expected_arg_count):
        return UNKNOWN_QUERY_PARAMETERS,400

    # Validate the code before touching the db so a bad value does not create a job entry
    response_code=None
    if (arm_response is not None):
        try:
            response_code=int(arm_response)
        except ValueError:
            return "Response code must be an integer",400

    print("Arm delete received for producer: "+str(producer_id)+" and job: "+str(job_id)+" and response: "+str(arm_response))

    job_dict=setup_callback_dict(producer_id, job_id)
    if (job_dict is None):
        # setup_callback_dict does not create a job under an unknown producer
        return PRODUCER_NOT_FOUND,404

    if (response_code is None): #Reset the response depending on whether job data exists or not
        if (job_dict['json'] is None):
            job_dict['delete_response']=404
        else:
            job_dict['delete_response']=204
    else:
        job_dict['delete_response']=response_code

    return "",200
251
# Arm the supervision callback with a response code
# Omitting the query parameter switches the response back to the standard 200 response
# URI and parameters (PUT): /arm/supervision/<producer_id>[?response=<responsecode>]
# The producer entry is created in the db if it does not exist
# response: 200 (400 if incorrect query params)
@app.route(ARM_SUPERVISION_RESPONSE,
     methods=['PUT'])
def arm_supervision(producer_id):

    arm_response=request.args.get('response')

    # Only the optional 'response' parameter is accepted
    expected_arg_count = 0 if arm_response is None else 1
    if (len(request.args) != expected_arg_count):
        return UNKNOWN_QUERY_PARAMETERS,400

    # Default back to 200 when no code is given; reject a non-integer code
    # before the producer entry is created
    response_code=200
    if (arm_response is not None):
        try:
            response_code=int(arm_response)
        except ValueError:
            return "Response code must be an integer",400

    print("Arm supervision received for producer: "+str(producer_id)+" and response: "+str(arm_response))

    producer_dict=setup_callback_dict(producer_id, None)
    producer_dict['supervision_response']=response_code

    return "",200
278
# Arm a producer with a type
# The type id is added to the producer's type list unless it is already present
# URI and parameters (PUT): /arm/type/<string:producer_id>/<string:type-id>
# response: 200 (404 if the producer is not found)
@app.route(ARM_TYPE, methods=['PUT'])
def arm_type(producer_id, type_id):

    print("Arm type received for producer: "+str(producer_id)+" and type: "+str(type_id))

    producer_entry = get_callback_dict(producer_id, None)
    if producer_entry is None:
        return PRODUCER_NOT_FOUND, 404

    armed_types = producer_entry['types']
    already_armed = type_id in armed_types
    if not already_armed:
        armed_types.append(type_id)

    return "", 200
298
# Disarm a producer with a type
# URI and parameters (DELETE): /arm/type/<string:producer_id>/<string:type-id>
# response: 200 (404 if the producer is not found or the type is not armed,
#                400 if the type is reported as in use)
@app.route(ARM_TYPE,
    methods=['DELETE'])
def disarm_type(producer_id, type_id):

    print("Disarm type received for producer: "+str(producer_id)+" and type: "+str(type_id))

    producer_dict=get_callback_dict(producer_id, None)

    if (producer_dict is None):
        return PRODUCER_NOT_FOUND,404

    # Refuse to remove a type that recursive_search reports under any of the type keys
    for type_key in ("ei_job_type", "ei_type_identity", "info_type_identity"):
        if (recursive_search(producer_dict, type_key, type_id) is True):
            return TYPE_IN_USE,400

    type_list=producer_dict['types']
    # list.remove raises ValueError for a missing element, so check first
    if (type_id not in type_list):
        return TYPE_NOT_FOUND,404
    type_list.remove(type_id)

    return "",200
324
325
# Callback for create job
# URI and parameters (POST): /callbacks/job/<producer_id>
# The request body is job json, validated against the schema in 'job-schema.json'
# Both the producer and the job must already exist in the db (set up via the arm functions)
# response: 201 at create, 200 at update or other configured response code
#           (400 if the json is corrupt, or the producer, job or type identity is not found)
@app.route(CALLBACK_CREATE_URL,
     methods=['POST'])
def callback_create(producer_id):

    req_json_dict=None
    try:
        req_json_dict = json.loads(request.data)
        with open('job-schema.json') as f:
            schema = json.load(f)
        validate(instance=req_json_dict, schema=schema)
    except Exception:
        # Any parse, schema-file or validation problem is reported the same way
        return JSON_CORRUPT,400

    if (not isinstance(req_json_dict, dict)):
        return JSON_CORRUPT,400

    producer_dict=get_callback_dict(producer_id, None)
    if (producer_dict is None):
        return PRODUCER_OR_JOB_NOT_FOUND,400

    # The job identity key depends on which type identity key the json uses
    if ('ei_type_identity' in req_json_dict):
        job_key_name='ei_job_identity'
    elif ('info_type_identity' in req_json_dict):
        job_key_name='info_job_identity'
    else:
        return TYPE_NOT_FOUND, 400

    job_id=req_json_dict.get(job_key_name)
    if (job_id is None):
        # Without a job id, get_callback_dict would return the producer dict instead of a job
        return JSON_CORRUPT,400

    job_dict=get_callback_dict(producer_id, job_id)
    if (job_dict is None):
        return PRODUCER_OR_JOB_NOT_FOUND,400

    print("Create callback received for producer: "+str(producer_id)+" and job: "+str(job_id))
    return_code=job_dict['create_response']
    return_msg=""
    if (return_code in (200, 201)):
        # Standard response: store the job json and start delivering
        job_dict['json']=req_json_dict
        job_dict['delivering']="delivering"
        if (return_code == 201): #Set up next response code if create was ok
            job_dict['create_response'] = 200
        if (job_dict['delete_response'] == 404):
            job_dict['delete_response'] = 204
    else:
        # Configured (non-standard) response: the job json is not stored
        # and an ongoing delivery is put on hold
        if (job_dict['delivering'] == "delivering"):
            job_dict['delivering']="hold"
        return_msg=RETURNING_CONFIGURED_RESP

    job_dict['create_counter']=job_dict['create_counter']+1

    return return_msg, return_code
385
# Callback for delete job
# URI and parameters (DELETE): /callbacks/job/<producer_id>/<job_id>
# response: 204 at delete or other configured response code
#           (400 if the producer or job is not found)
@app.route(CALLBACK_DELETE_URL, methods=['DELETE'])
def callback_delete(producer_id, job_id):

    job_entry = get_callback_dict(producer_id, job_id)
    if job_entry is None:
        return PRODUCER_OR_JOB_NOT_FOUND, 400

    print("Delete callback received for producer: "+str(producer_id)+" and job: "+str(job_id))

    configured_code = job_entry['delete_response']
    body = ""
    if configured_code != 204:
        body = RETURNING_CONFIGURED_RESP
    else:
        # Standard delete: drop the job data, stop delivery and make the next delete answer 404
        job_entry['json'] = None
        job_entry['delete_response'] = 404
        job_entry['delivering'] = "stopped"
        if job_entry['create_response'] == 200:
            # reset create response if delete was ok
            job_entry['create_response'] = 201

    # Every delete call is counted, whatever the response
    job_entry['delete_counter'] += 1

    return body, configured_code
412
# Callback for supervision of producer
# URI and parameters (GET): /callbacks/supervision/<producer_id>
# response: 200 or other configured response code (400 if the producer is not found)
@app.route(CALLBACK_SUPERVISION_URL, methods=['GET'])
def callback_supervision(producer_id):

    print("Supervision callback received for producer: "+str(producer_id))

    producer_entry = get_callback_dict(producer_id, None)
    if producer_entry is None:
        return PRODUCER_NOT_FOUND, 400

    configured_code = producer_entry['supervision_response']
    # A body text is only returned together with a non-standard code
    body = "" if configured_code == 200 else "returning configured response code"

    producer_entry['supervision_counter'] += 1

    return body, configured_code
433
# Get the job definition for a job
# URI and parameters (GET): "/jobdata/<string:producer_id>/<string:job_id>"
# response: 200 with the stored job json, or 204 if no job json is stored
#           (400 if the producer or job is not found)
@app.route(JOB_DATA, methods=['GET'])
def get_jobdata(producer_id, job_id):

    print("Get job data received for producer: "+str(producer_id)+" and job: "+str(job_id))

    job_entry = get_callback_dict(producer_id, job_id)
    if job_entry is None:
        return PRODUCER_OR_JOB_NOT_FOUND, 400

    stored_json = job_entry['json']
    if stored_json is not None:
        return json.dumps(stored_json), 200
    return "", 204
452
# Delete the job definition for a job
# Only the stored job json is cleared; the job entry itself stays in the db
# URI and parameters (DELETE): "/jobdata/<string:producer_id>/<string:job_id>"
# response: 204 (400 if the producer or job is not found)
@app.route(JOB_DATA, methods=['DELETE'])
def del_jobdata(producer_id, job_id):

    print("Delete job data received for producer: "+str(producer_id)+" and job: "+str(job_id))

    job_entry = get_callback_dict(producer_id, job_id)
    if job_entry is not None:
        job_entry['json'] = None
        return "", 204

    return PRODUCER_OR_JOB_NOT_FOUND, 400
470
471
# Start or stop data delivery for a job, action : START or STOP
# URI and parameters (POST): "/jobdata/<string:producer_id>/<string:job_id>?action=action"
# response: 200 (400 if incorrect query params or no job data is stored,
#                404 if the producer or job is not found)
@app.route(JOB_DATA, methods=['POST'])
def start_jobdata(producer_id, job_id):

    action = request.args.get('action')

    # Exactly one query parameter, 'action', with one of the two known values is accepted
    valid_request = (
        action is not None
        and len(request.args) == 1
        and action in ("START", "STOP")
    )
    if not valid_request:
        return UNKNOWN_QUERY_PARAMETERS, 400

    print("Job data action received for producer: "+str(producer_id)+" and job: "+str(job_id) + " action: " + action)

    job_entry = get_callback_dict(producer_id, job_id)
    if job_entry is None:
        return JOB_NOT_FOUND, 404

    # Delivery can only be controlled for a job that has job data stored
    if job_entry['json'] is None:
        return JOB_DATA_NOT_FOUND, 400

    job_entry['delivering'] = "delivering" if action == "START" else "stopped"
    return "", 200
504
505
# Counter for create calls for a job
# URI and parameters (GET): "/counter/create/<string:producer_id>/<string:job_id>"
# response: 200 and counter value (-1 if the producer or job is not found)
@app.route(COUNTER_CREATE, methods=['GET'])
def counter_create(producer_id, job_id):
    job_entry = get_callback_dict(producer_id, job_id)
    count = -1 if job_entry is None else job_entry['create_counter']
    return str(count), 200
516
# Counter for delete calls for a job
# URI and parameters (GET): "/counter/delete/<string:producer_id>/<string:job_id>"
# response: 200 and counter value (-1 if the producer or job is not found)
@app.route(COUNTER_DELETE, methods=['GET'])
def counter_delete(producer_id, job_id):
    job_entry = get_callback_dict(producer_id, job_id)
    count = -1 if job_entry is None else job_entry['delete_counter']
    return str(count), 200
527
# Counter for supervision calls for a producer
# URI and parameters (GET): "/counter/supervision/<string:producer_id>"
# response: 200 and counter value (-1 if the producer is not found)
@app.route(COUNTER_SUPERVISION, methods=['GET'])
def counter_supervision(producer_id):
    producer_entry = get_callback_dict(producer_id, None)
    count = -1 if producer_entry is None else producer_entry['supervision_counter']
    return str(count), 200
538
# Get status info
# URI and parameters (GET): "/status"
# response: 200 and the whole db serialized as json
@app.route(STATUS, methods=['GET'])
def status():
    # db is only read here, so no global declaration is needed
    db_dump = json.dumps(db)
    return db_dump, 200
547
548
# Reset db
# URI (GET, POST or PUT): "/reset"
# response: always 200
@app.route('/reset', methods=['GET', 'POST', 'PUT'])
def reset():
    # Rebind the module-level name to a fresh, empty dict
    global db
    db = dict()
    return "", 200
556
557
# Post one data item for a job to the 'target_uri' in its stored job json
# Jobs that are not in state "delivering", have no job json or target localhost are skipped
# The delivery_attempts counter doubles as sequence number and is incremented after the post
# Any exception (missing target_uri, connection error, timeout) is left to the caller
def _deliver_job_data(job_id, job):
    if (job['delivering'] != "delivering" or job['json'] is None):
        return

    url=job['json']['target_uri']
    if ("localhost:" in str(url)):   #Dont deliver to localhost...
        return

    data={}
    data["date"]=str(datetime.datetime.now())
    data["job"]=str(job_id)
    data["sequence_no"]=str(job['delivery_attempts'])
    data["value"]=str(100)
    print("Sending to "+str(url)+" payload:"+json.dumps(data))

    requests.post(url, json=data, verify=False, timeout=2) #NOSONAR
    job['delivery_attempts'] += 1


# Data delivery loop, intended to run in a background thread - never returns
# Once per second, one data item is posted for every job that is delivering
# A failure for one job is logged and does not stop delivery to the remaining jobs
def datadelivery() :
    while True:
        try:
            job_dicts=get_all_jobs()
        except Exception as err:
            # Iterating the db can fail if it is modified by a request at the same time;
            # skip this cycle and try again
            print("Error during data delivery: "+ str(err))
            job_dicts={}

        for key, job in job_dicts.items():
            try:
                _deliver_job_data(key, job)
            except Exception as err:
                print("Error during data delivery: "+ str(err))
        time.sleep(1)
579
580
### Main function ###

# Start the background thread running datadelivery()
# Note: this is module-level code, so it also runs when the module is imported
print("Starting data delivery thread")
thread = threading.Thread(target=datadelivery, args=())
thread.daemon = True  # daemon thread, does not keep the process alive at exit
thread.start()

# Start the Flask server only when the file is executed as a script
if __name__ == "__main__":
    app.run(port=HOST_PORT, host=HOST_IP)