Function test updates
[nonrtric.git] / test / prodstub / app / prodstub.py
1
2 #  ============LICENSE_START===============================================
3 #  Copyright (C) 2020 Nordix Foundation. All rights reserved.
4 #  ========================================================================
5 #  Licensed under the Apache License, Version 2.0 (the "License");
6 #  you may not use this file except in compliance with the License.
7 #  You may obtain a copy of the License at
8 #
9 #       http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #  Unless required by applicable law or agreed to in writing, software
12 #  distributed under the License is distributed on an "AS IS" BASIS,
13 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #  See the License for the specific language governing permissions and
15 #  limitations under the License.
16 #  ============LICENSE_END=================================================
17 #
18
19 from flask import Flask
20 from flask import request
21
22 import requests
23
24 import json
25 from jsonschema import validate
26
27 import threading
28 import time
29 import datetime
30 import logging
31
# Disable all logging of GET on reading counters and status
class AjaxFilter(logging.Filter):
    """Logging filter that rejects records whose message mentions a counter or status URL."""

    # URL fragments whose presence in a log message causes the record to be dropped
    _MUTED_FRAGMENTS = ("/counter/", "/status")

    def filter(self, record):
        """Return False when the formatted message contains a muted fragment, else True."""
        message = record.getMessage()
        return not any(fragment in message for fragment in self._MUTED_FRAGMENTS)
36
# Install the filter on the 'werkzeug' logger so that records matching it are dropped
log = logging.getLogger('werkzeug')
log.addFilter(AjaxFilter())

# The Flask application object on which all routes in this file are registered
app = Flask(__name__)
41
42 # # list of callback messages
43 # msg_callbacks={}
44
# Server info: host address and port handed to app.run() when this file is run as a script
HOST_IP = "::"
HOST_PORT = 2222

# Request and response constants
# URL rules of the callback endpoints (create/update job, delete job, producer supervision)
CALLBACK_CREATE_URL="/callbacks/job/<string:producer_id>"
CALLBACK_DELETE_URL="/callbacks/job/<string:producer_id>/<string:job_id>"
CALLBACK_SUPERVISION_URL="/callbacks/supervision/<string:producer_id>"

# URL rules of the control endpoints: arming response codes/types and reading call counters
ARM_CREATE_RESPONSE="/arm/create/<string:producer_id>/<string:job_id>"
ARM_DELETE_RESPONSE="/arm/delete/<string:producer_id>/<string:job_id>"
ARM_SUPERVISION_RESPONSE="/arm/supervision/<string:producer_id>"
ARM_TYPE="/arm/type/<string:producer_id>/<string:type_id>"
COUNTER_SUPERVISION="/counter/supervision/<string:producer_id>"
COUNTER_CREATE="/counter/create/<string:producer_id>/<string:job_id>"
COUNTER_DELETE="/counter/delete/<string:producer_id>/<string:job_id>"

# URL rule for reading, deleting and starting/stopping delivery of stored job data
JOB_DATA="/jobdata/<string:producer_id>/<string:job_id>"

# URL rule for dumping the whole db
STATUS="/status"

#Constants
# Response body texts returned by the endpoints
APPL_JSON='application/json'
UNKNOWN_QUERY_PARAMETERS="Unknown query parameter(s)"
RETURNING_CONFIGURED_RESP="returning configured response code"
JOBID_NO_MATCH="job id in stored json does not match request"
PRODUCER_OR_JOB_NOT_FOUND="producer or job not found"
PRODUCER_NOT_FOUND="producer not found"
TYPE_NOT_FOUND="type not found"
TYPE_IN_USE="type is in use in a job"
JOB_NOT_FOUND="job not found"
JOB_DATA_NOT_FOUND="job data not found"
JSON_CORRUPT="json in request is corrupt or missing"

#Producer and job db, including armed responses
db={}
# Layout as populated by setup_callback_dict():
# db[<producer_id>]                      - one dict per producer
#   'supervision_response'               - armed response code for supervision
#   'supervision_counter'                - number of supervision callbacks received
#   'types'                              - list of armed type ids
#   <job_id>                             - one dict per job, stored next to the keys above
#     'json'                             - job json from the latest accepted create (or None)
#     'create_response'                  - armed response code for create
#     'delete_response'                  - armed response code for delete
#     'create_counter'                   - number of create callbacks received
#     'delete_counter'                   - number of delete callbacks received
#     'delivering'                       - delivery status: "stopped", "delivering" or "hold"
#     'delivery_attempts'                - number of data deliveries posted for the job
93
# disable warning about unverified https requests
# (the data delivery thread posts with verify=False, which would otherwise emit
#  an InsecureRequestWarning)
from requests.packages import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
98
# Helper function to populate the callback db with the basic structure
# if job_id is None then only the producer level is set up and the producer dict is returned
# if job_id is not None, the job level is set up and the job dict is returned (producer must exist)
def setup_callback_dict(producer_id, job_id):
    """Return the producer dict (job_id is None) or the job dict, creating it with defaults if missing.

    Returns None when a job is requested for a producer that is not in the db;
    a producer is only created when job_id is None.
    """
    if producer_id not in db:
        # A job can only be set up below an already existing producer
        if job_id is not None:
            return None
        db[producer_id] = {
            'supervision_response': 200,
            'supervision_counter': 0,
            'types': [],
        }

    producer_entry = db[producer_id]
    if job_id is None:
        return producer_entry

    if job_id not in producer_entry:
        # Defaults for a job that has not been created yet: create answers 201, delete answers 404
        producer_entry[job_id] = {
            'create_response': 201,
            'delete_response': 404,
            'json': None,
            'create_counter': 0,
            'delete_counter': 0,
            'delivering': "stopped",
            'delivery_attempts': 0,
        }
    return producer_entry[job_id]
134
135
# Helper function to get an entry from the callback db
# if job_id is None then only the producer dict is returned (or None if producer is not found)
# if job_id is not None, the job is returned (or None if producer/job is not found)
def get_callback_dict(producer_id, job_id):
    """Look up a producer dict, or one of its job dicts, without creating anything.

    Returns None when the producer (or the requested job below it) is not in the db.
    """
    producer_entry = db.get(producer_id)
    if producer_entry is None or job_id is None:
        # Either nothing was found, or only the producer level was asked for
        return producer_entry

    return producer_entry.get(job_id)
156
# Helper function to find out if a key/value pair exists anywhere in a dictionary tree
# True if found, otherwise False
def recursive_search(s_dict, s_key, s_id):
    """Return True if s_dict, or any dict nested inside it, maps s_key to s_id.

    Only values that are dicts are descended into; lists and other containers
    are compared as plain values and not searched.
    """
    for pkey, pvalue in s_dict.items():
        if pkey == s_key and pvalue == s_id:
            return True
        # Propagate a hit from a nested dict; previously the result of the
        # recursive call was discarded so nested matches were never reported.
        if isinstance(pvalue, dict) and recursive_search(pvalue, s_key, s_id):
            return True

    return False
167
# Helper function to find all job dicts
def get_all_jobs():
    """Collect the job dicts of all producers into one new dict keyed by job id.

    Every dict-valued entry of a producer dict is taken to be a job. The result
    is keyed by job id only, so when two producers hold the same job id the one
    iterated last replaces the earlier one.
    """
    return {
        entry_key: entry
        for producer_entry in db.values()
        for entry_key, entry in producer_entry.items()
        if isinstance(entry, dict)
    }
178
# I'm alive function
# response: always 200
@app.route('/', methods=['GET'])
def index():
    """Answer every GET on the root URL with the body 'OK' and status 200."""
    alive_response = ('OK', 200)
    return alive_response
185
# Arm the create callback with a response code
# Omitting the query parameter switches the response back to the standard 200/201 response
# URI and parameters (PUT): /arm/create/<producer_id>/<job_id>[?response=<responsecode>]
# response: 200 (400 if incorrect query params, 404 if the producer is unknown)
@app.route(ARM_CREATE_RESPONSE,
     methods=['PUT'])
def arm_create(producer_id, job_id):
    """Configure the response code the create callback returns for a job.

    The job entry is created with defaults if it does not exist yet; the
    producer must already exist. Without the 'response' query parameter the
    code is reset to 201 when no job json is stored and to 200 otherwise.
    """
    arm_response=request.args.get('response')

    # 'response' is the only accepted query parameter, and it is optional
    expected_arg_count = 0 if arm_response is None else 1
    if (len(request.args) != expected_arg_count):
        return UNKNOWN_QUERY_PARAMETERS,400

    print("Arm create received for producer: "+str(producer_id)+" and job: "+str(job_id)+" and response: "+str(arm_response))

    # Validate the code before touching the db so a bad request leaves no trace
    response_code=None
    if (arm_response is not None):
        try:
            response_code=int(arm_response)
        except ValueError:
            return "response must be an integer status code",400

    job_dict=setup_callback_dict(producer_id, job_id)
    if (job_dict is None):
        # setup_callback_dict does not create producers when a job id is given
        return PRODUCER_NOT_FOUND,404

    if (response_code is None):    #Reset the response depending on whether a job exists or not
        job_dict['create_response']=201 if job_dict['json'] is None else 200
    else:
        job_dict['create_response']=response_code

    return "",200
218
# Arm the delete callback with a response code
# Omitting the query parameter switches the response back to the standard 204 response
# URI and parameters (PUT): /arm/delete/<producer_id>/<job-id>[?response=<responsecode>]
# response: 200 (400 if incorrect query params, 404 if the producer is unknown)
@app.route(ARM_DELETE_RESPONSE,
     methods=['PUT'])
def arm_delete(producer_id, job_id):
    """Configure the response code the delete callback returns for a job.

    The job entry is created with defaults if it does not exist yet; the
    producer must already exist. Without the 'response' query parameter the
    code is reset to 404 when no job json is stored and to 204 otherwise.
    """
    arm_response=request.args.get('response')

    # 'response' is the only accepted query parameter, and it is optional
    expected_arg_count = 0 if arm_response is None else 1
    if (len(request.args) != expected_arg_count):
        return UNKNOWN_QUERY_PARAMETERS,400

    print("Arm delete received for producer: "+str(producer_id)+" and job: "+str(job_id)+" and response: "+str(arm_response))

    # Validate the code before touching the db so a bad request leaves no trace
    response_code=None
    if (arm_response is not None):
        try:
            response_code=int(arm_response)
        except ValueError:
            return "response must be an integer status code",400

    job_dict=setup_callback_dict(producer_id, job_id)
    if (job_dict is None):
        # setup_callback_dict does not create producers when a job id is given
        return PRODUCER_NOT_FOUND,404

    if (response_code is None): #Reset the response depending on whether a job exists or not
        job_dict['delete_response']=404 if job_dict['json'] is None else 204
    else:
        job_dict['delete_response']=response_code

    return "",200
251
# Arm the supervision callback with a response code
# Omitting the query parameter switches the response back to the standard 200 response
# URI and parameters (PUT): /arm/supervision/<producer_id>[?response=<responsecode>]
# response: 200 (400 if incorrect query params)
@app.route(ARM_SUPERVISION_RESPONSE,
     methods=['PUT'])
def arm_supervision(producer_id):
    """Configure the response code of the supervision callback for a producer.

    The producer entry is created with defaults if it does not exist yet.
    Without the 'response' query parameter the code is reset to 200.
    """
    arm_response=request.args.get('response')

    # 'response' is the only accepted query parameter, and it is optional
    expected_arg_count = 0 if arm_response is None else 1
    if (len(request.args) != expected_arg_count):
        return UNKNOWN_QUERY_PARAMETERS,400

    print("Arm supervision received for producer: "+str(producer_id)+" and response: "+str(arm_response))

    # Validate the code first so that a bad request does not create the producer
    response_code=200
    if (arm_response is not None):
        try:
            response_code=int(arm_response)
        except ValueError:
            return "response must be an integer status code",400

    producer_dict=setup_callback_dict(producer_id, None)
    producer_dict['supervision_response']=response_code

    return "",200
278
# Arm a producer with a type
# URI and parameters (PUT): /arm/type/<string:producer_id>/<string:type-id>
# response: 200 (404 if the producer is not found)
@app.route(ARM_TYPE, methods=['PUT'])
def arm_type(producer_id, type_id):
    """Add type_id to the producer's list of armed types unless it is already listed."""
    print("Arm type received for producer: "+str(producer_id)+" and type: "+str(type_id))

    producer_entry = get_callback_dict(producer_id, None)
    if producer_entry is None:
        return PRODUCER_NOT_FOUND,404

    armed_types = producer_entry['types']
    if type_id not in armed_types:
        armed_types.append(type_id)

    return "",200
298
# Disarm a producer with a type
# URI and parameters (DELETE): /arm/type/<string:producer_id>/<string:type-id>
# response: 200 (404 if the producer or the type is not found, 400 if the type is in use)
@app.route(ARM_TYPE,
    methods=['DELETE'])
def disarm_type(producer_id, type_id):
    """Remove type_id from the producer's list of armed types.

    The removal is refused with 400 when recursive_search finds the type id
    under one of the type keys anywhere in the producer's dict tree.
    """
    print("Disarm type received for producer: "+str(producer_id)+" and type: "+str(type_id))

    producer_dict=get_callback_dict(producer_id, None)

    if (producer_dict is None):
        return PRODUCER_NOT_FOUND,404

    # Keys under which a type id is looked for in the producer's data
    type_keys=("ei_job_type", "ei_type_identity", "info_type_identity")
    if (any(recursive_search(producer_dict, type_key, type_id) for type_key in type_keys)):
        # Return the message constant (the quoted constant name was returned before)
        return TYPE_IN_USE,400

    type_list=producer_dict['types']
    if (type_id not in type_list):
        # list.remove() would raise ValueError for a type that was never armed
        return TYPE_NOT_FOUND,404
    type_list.remove(type_id)

    return "",200
324
325
# Callback for create job
# URI and parameters (POST): /callbacks/job/<producer_id>
# response 201 at create, 200 at update or other configured response code
# (400 if the json is invalid or the producer/job has not been set up)
@app.route(CALLBACK_CREATE_URL,
     methods=['POST'])
def callback_create(producer_id):
    """Handle a create/update job callback and answer with the armed response code.

    The request body is validated against 'job-schema.json'. The job id is read
    from 'ei_job_identity' or 'info_job_identity', depending on which type
    identity key is present. The job must already have an entry in the db.
    With an armed code of 200/201 the job json is stored and delivery is
    started; with any other code a running delivery is put on "hold".
    The create counter is incremented in both cases.
    """
    try:
        req_json_dict = json.loads(request.data)
        with open('job-schema.json') as f:
            schema = json.load(f)
        validate(instance=req_json_dict, schema=schema)
    except Exception:
        # Deliberately broad: parse errors, schema violations and problems
        # reading the schema file are all reported as a corrupt request
        return JSON_CORRUPT,400

    # The key lookups below require a json object
    if (not isinstance(req_json_dict, dict)):
        return JSON_CORRUPT,400

    if (get_callback_dict(producer_id, None) is None):
        return PRODUCER_OR_JOB_NOT_FOUND,400

    if ('ei_type_identity' in req_json_dict):
        job_key_name='ei_job_identity'
    elif ('info_type_identity' in req_json_dict):
        job_key_name='info_job_identity'
    else:
        return TYPE_NOT_FOUND, 400

    # A missing, null or non-string job id cannot identify a job entry
    job_id=req_json_dict.get(job_key_name)
    if (not isinstance(job_id, str)):
        return JSON_CORRUPT,400

    # Job dicts share the producer dict with bookkeeping entries such as
    # 'types', so only a dict-valued entry is accepted as a job
    job_dict=get_callback_dict(producer_id, job_id)
    if (not isinstance(job_dict, dict)):
        return PRODUCER_OR_JOB_NOT_FOUND,400

    print("Create callback received for producer: "+str(producer_id)+" and job: "+str(job_id))
    print(req_json_dict)

    return_msg=""
    return_code=job_dict['create_response']
    if (return_code in (200, 201)):
        job_dict['json']=req_json_dict
        job_dict['delivering']="delivering"
        if (return_code == 201): #Set up next response code if create was ok
            job_dict['create_response'] = 200
        if (job_dict['delete_response'] == 404):
            job_dict['delete_response'] = 204
    else:
        if (job_dict['delivering'] == "delivering"):
            job_dict['delivering']="hold"
        return_msg=RETURNING_CONFIGURED_RESP

    job_dict['create_counter'] += 1

    return return_msg, return_code
386
# Callback for delete job
# URI and parameters (DELETE): /callbacks/job/<producer_id>/<job_id>
# response: 204 at delete or other configured response code (400 if producer/job is not found)
@app.route(CALLBACK_DELETE_URL, methods=['DELETE'])
def callback_delete(producer_id, job_id):
    """Handle a delete job callback and answer with the armed delete response code.

    With an armed code of 204 the stored job json is cleared, delivery is
    stopped and the armed codes are reset for a not yet created job. With any
    other code only the configured code is returned. The delete counter is
    incremented in both cases.
    """
    job_entry = get_callback_dict(producer_id, job_id)
    if job_entry is None:
        return PRODUCER_OR_JOB_NOT_FOUND,400

    print("Delete callback received for producer: "+str(producer_id)+" and job: "+str(job_id))

    configured_code = job_entry['delete_response']
    if configured_code != 204:
        body = RETURNING_CONFIGURED_RESP
    else:
        body = ""
        job_entry['json'] = None
        job_entry['delete_response'] = 404
        job_entry['delivering'] = "stopped"
        # reset create response if delete was ok
        if job_entry['create_response'] == 200:
            job_entry['create_response'] = 201

    job_entry['delete_counter'] += 1

    return body, configured_code
413
# Callback for supervision of producer
# URI and parameters (GET): /callbacks/supervision/<producer_id>
# response: 200 or other configured response code (400 if the producer is not found)
@app.route(CALLBACK_SUPERVISION_URL, methods=['GET'])
def callback_supervision(producer_id):
    """Answer a supervision callback with the armed code and count the call."""
    print("Supervision callback received for producer: "+str(producer_id))

    producer_entry = get_callback_dict(producer_id, None)
    if producer_entry is None:
        return PRODUCER_NOT_FOUND,400

    configured_code = producer_entry['supervision_response']
    producer_entry['supervision_counter'] += 1

    # An explanatory body is only sent when the code differs from the standard 200
    body = "" if configured_code == 200 else "returning configured response code"
    return body, configured_code
434
# Get the job definition for a job
# URI and parameters (GET): "/jobdata/<string:producer_id>/<string:job_id>"
# response: 200 with the stored json, or 204 if no json is stored (400 if producer/job is not found)
@app.route(JOB_DATA, methods=['GET'])
def get_jobdata(producer_id, job_id):
    """Return the json stored for the job, serialized with json.dumps."""
    print("Get job data received for producer: "+str(producer_id)+" and job: "+str(job_id))

    job_entry = get_callback_dict(producer_id, job_id)
    if job_entry is None:
        return PRODUCER_OR_JOB_NOT_FOUND,400

    stored_json = job_entry['json']
    if stored_json is not None:
        return json.dumps(stored_json), 200
    return "",204
453
# Delete the job definition for a job
# URI and parameters (DELETE): "/jobdata/<string:producer_id>/<string:job_id>"
# response: 204 (400 if producer/job is not found)
@app.route(JOB_DATA, methods=['DELETE'])
def del_jobdata(producer_id, job_id):
    """Clear the json stored for the job; all other fields of the job entry are kept."""
    print("Delete job data received for producer: "+str(producer_id)+" and job: "+str(job_id))

    job_entry = get_callback_dict(producer_id, job_id)
    if job_entry is None:
        return PRODUCER_OR_JOB_NOT_FOUND,400

    job_entry.update(json=None)
    return "",204
471
472
# Start or stop data delivery for a job, action : START or STOP
# URI and parameters (POST): "/jobdata/<string:producer_id>/<string:job_id>?action=action"
# response: 200 (400 if incorrect query params or no job data is stored, 404 if the job is not found)
@app.route(JOB_DATA, methods=['POST'])
def start_jobdata(producer_id, job_id):
    """Set the delivery status of a job to "delivering" (START) or "stopped" (STOP)."""
    action = request.args.get('action')

    # Exactly one query parameter is accepted: action=START or action=STOP
    if action is None or len(request.args) != 1 or action not in ("START", "STOP"):
        return UNKNOWN_QUERY_PARAMETERS,400

    print("Job data action received for producer: "+str(producer_id)+" and job: "+str(job_id) + " action: " + action)

    job_entry = get_callback_dict(producer_id, job_id)
    if job_entry is None:
        return JOB_NOT_FOUND,404

    # Delivery can only be controlled for a job that has stored json
    if job_entry['json'] is None:
        return JOB_DATA_NOT_FOUND, 400

    job_entry['delivering'] = "delivering" if action == "START" else "stopped"
    return "",200
505
506
# Counter for create calls for a job
# URI and parameters (GET): "/counter/create/<string:producer_id>/<string:job_id>"
# response: 200 and counter value (-1 if producer/job is not found)
@app.route(COUNTER_CREATE, methods=['GET'])
def counter_create(producer_id, job_id):
    """Return the number of create callbacks received for the job, as text."""
    job_entry = get_callback_dict(producer_id, job_id)
    counter = -1 if job_entry is None else job_entry['create_counter']
    return str(counter),200
517
# Counter for delete calls for a job
# URI and parameters (GET): "/counter/delete/<string:producer_id>/<string:job_id>"
# response: 200 and counter value (-1 if producer/job is not found)
@app.route(COUNTER_DELETE, methods=['GET'])
def counter_delete(producer_id, job_id):
    """Return the number of delete callbacks received for the job, as text."""
    job_entry = get_callback_dict(producer_id, job_id)
    counter = -1 if job_entry is None else job_entry['delete_counter']
    return str(counter),200
528
# Counter for supervision calls for a producer
# URI and parameters (GET): "/counter/supervision/<string:producer_id>"
# response: 200 and counter value (-1 if the producer is not found)
@app.route(COUNTER_SUPERVISION, methods=['GET'])
def counter_supervision(producer_id):
    """Return the number of supervision callbacks received for the producer, as text."""
    producer_entry = get_callback_dict(producer_id, None)
    counter = -1 if producer_entry is None else producer_entry['supervision_counter']
    return str(counter),200
539
# Get status info
# URI and parameters (GET): "/status"
# response: 200 and the db as json
@app.route(STATUS, methods=['GET'])
def status():
    """Return the complete producer/job db serialized with json.dumps."""
    db_dump = json.dumps(db)
    return db_dump,200
548
549
# Reset db
# response: always 200
@app.route('/reset', methods=['GET', 'POST', 'PUT'])
def reset():
    """Replace the module-level db with a new empty dict, dropping all producers and jobs."""
    global db
    # Rebind the name instead of clearing in place; the old dict object is left untouched
    db = dict()
    return "",200
557
558
def datadelivery() :
    """Endless loop that posts a data sample once per second for every delivering job.

    A sample is posted to the job's 'target_uri' when the job is in state
    "delivering", has stored json and the uri does not contain "localhost:".
    'delivery_attempts' is incremented after each post that did not raise.
    Errors are printed and never propagate, so the loop keeps running.
    """
    while True:
        try:
            job_dicts=get_all_jobs()
        except Exception as err:
            # E.g. the db being modified by a request handler while it is iterated
            print("Error during data delivery: "+ str(err))
            job_dicts={}

        for key, job in job_dicts.items():
            # Errors are handled per job, so one failing target (e.g. a timeout)
            # does not prevent delivery to the remaining jobs in this round
            try:
                if (job['delivering'] != "delivering" or job['json'] is None):
                    continue
                url=job['json']['target_uri']
                if ("localhost:" in str(url)):   #Dont deliver to localhost...
                    continue
                data={}
                data["date"]=str(datetime.datetime.now())
                data["job"]=str(key)
                data["sequence_no"]=str(job['delivery_attempts'])
                data["value"]=str(100)
                print("Sending to "+str(url)+" payload:"+json.dumps(data))

                requests.post(url, json=data, verify=False, timeout=2) #NOSONAR
                job['delivery_attempts'] += 1
            except Exception as err:
                print("Error during data delivery: "+ str(err))
        time.sleep(1)
580
581
### Main function ###

# The delivery thread is started whenever this module is loaded, not only when it is run as a script.
# It is a daemon thread, so it does not keep the process alive on its own.
print("Starting data delivery thread")
thread = threading.Thread(target=datadelivery, daemon=True)
thread.start()

# Start the Flask server only when the file is executed directly
if __name__ == "__main__":
    app.run(host=HOST_IP, port=HOST_PORT)