2 # ============LICENSE_START===============================================
3 # Copyright (C) 2020 Nordix Foundation. All rights reserved.
4 # ========================================================================
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ============LICENSE_END=================================================
19 from flask import Flask
20 from flask import request
25 from jsonschema import validate
33 # # list of callback messages
41 # cntr_msg_callbacks=0
44 # Request and response constants
# URL rules; the <string:...> segments name the arguments handed to the handler functions.
# Callback endpoints (job create, job delete, producer supervision)
45 CALLBACK_CREATE_URL="/callbacks/create/<string:producer_id>"
46 CALLBACK_DELETE_URL="/callbacks/delete/<string:producer_id>"
47 CALLBACK_SUPERVISION_URL="/callbacks/supervision/<string:producer_id>"
# Endpoints for arming response codes and types, and for reading the call counters
49 ARM_CREATE_RESPONSE="/arm/create/<string:producer_id>/<string:job_id>"
50 ARM_DELETE_RESPONSE="/arm/delete/<string:producer_id>/<string:job_id>"
51 ARM_SUPERVISION_RESPONSE="/arm/supervision/<string:producer_id>"
52 ARM_TYPE="/arm/type/<string:producer_id>/<string:type_id>"
53 COUNTER_SUPERVISION="/counter/supervision/<string:producer_id>"
54 COUNTER_CREATE="/counter/create/<string:producer_id>/<string:job_id>"
55 COUNTER_DELETE="/counter/delete/<string:producer_id>/<string:job_id>"
# Endpoint for reading job data and for starting/stopping data delivery
57 JOB_DATA="/jobdata/<string:producer_id>/<string:job_id>"
# Content type and the response body texts returned by the handlers
62 APPL_JSON='application/json'
63 UNKNOWN_QUERY_PARAMETERS="Unknown query parameter(s)"
64 RETURNING_CONFIGURED_RESP="returning configured response code"
65 JOBID_NO_MATCH="job id in stored json does not match request"
66 PRODUCER_OR_JOB_NOT_FOUND="producer or job not found"
67 PRODUCER_NOT_FOUND="producer not found"
68 TYPE_NOT_FOUND="type not found"
69 TYPE_IN_USE="type is in use in a job"
70 JOB_NOT_FOUND="job not found"
71 JOB_DATA_NOT_FOUND="job data not found"
72 JSON_CORRUPT="json in request is corrupt or missing"
74 #Producer and job db, including armed responses
77 # armed response for supervision
83 # armed response for create
84 # armed response for delete
88 # Helper function to populate a callback dict with the basic structure
89 # if job_id is None then only the producer level is setup and the producer dict is returned
90 # if job_id is not None, the job level is setup and the job dict is returned (producer must exist)
# Reads and writes the module-level 'db' dict, which is keyed by producer id.
91 def setup_callback_dict(producer_id, job_id):
# Reuse the producer entry when the producer is already registered
94 if (producer_id in db.keys()):
95 producer_dict=db[producer_id]
# NOTE(review): what happens for an unknown producer combined with a job id is not visible here -
# presumably nothing is created, since the header says the producer must exist; confirm
97 if (job_id is not None):
# Register the new producer entry with its defaults: supervision answers 200, counter at zero, no types armed
100 db[producer_id]=producer_dict
102 producer_dict['supervision_response']=200
103 producer_dict['supervision_counter']=0
104 producer_dict['types']=[]
# Job entries live in the producer dict under the job id, next to the producer-level keys above
110 if (job_id in producer_dict.keys()):
111 job_dict=producer_dict[job_id]
# Register the new job entry with its defaults: create answers 201, delete answers 404,
# no job json stored, counters at zero, no data delivery
114 producer_dict[job_id]=job_dict
115 job_dict['create_response']=201
116 job_dict['delete_response']=404
117 job_dict['json']=None
118 job_dict['create_counter']=0
119 job_dict['delete_counter']=0
120 job_dict['delivering']=False
121 job_dict['delivery_attempts']=0
125 # Helper function to get an entry from the callback db
126 # if job_id is None then only the producer dict is returned (or None if producer is not found)
127 # if job_id is not None, the job is returned (or None if producer/job is not found)
# Read-only lookup in the module-level 'db' dict; nothing is created here.
128 def get_callback_dict(producer_id, job_id):
# Producer level: entry stored in db under the producer id
131 if (producer_id in db.keys()):
132 producer_dict=db[producer_id]
# Unknown producer
134 if (producer_dict is None):
# Job level: entry stored in the producer dict under the job id
141 if (job_id in producer_dict.keys()):
142 job_dict=producer_dict[job_id]
# Helper function to find if a key/value pair exists anywhere in the dictionary tree
def recursive_search(s_dict, s_key, s_id):
    """Return True if s_dict, or any dict nested inside it, maps s_key to s_id.

    s_dict: dict to search; values that are dicts are searched recursively
    s_key:  key to look for
    s_id:   value the key must be equal to
    Returns False when no such key/value pair exists (also for an empty dict).
    Only dict values are descended into; lists and other containers are not.
    """
    for pkey, pvalue in s_dict.items():
        if (pkey == s_key) and (pvalue == s_id):
            return True
        # A hit in a nested dict must be propagated to the caller.
        # A miss must not end the scan, the remaining keys still need to be checked.
        if (isinstance(pvalue, dict)) and recursive_search(pvalue, s_key, s_id):
            return True

    return False
157 # Helper function to find all job dicts
# Walks every producer entry in the module-level 'db' and collects each dict-valued entry of a
# producer into job_dicts under its job id. The producer-level values written by
# setup_callback_dict (response code, counter, list of types) are not dicts and are skipped.
# NOTE(review): job_dicts is keyed by the job id only, so the same job id used under two producers
# would keep a single entry - confirm that job ids are unique across producers
160 for producer_key in db:
161 producer_dict = db[producer_key]
162 for job_key in producer_dict:
163 job_dict = producer_dict[job_key]
164 if (isinstance(job_dict, dict)):
165 job_dicts[job_key]=job_dict
169 # response: always 200
# Arm the create callback with a response code
# Omitting the query parameter switches the response back to the standard 200/201 response
# URI and parameters (PUT): /arm/create/<producer_id>/<job_id>[?response=<responsecode>]
# response: 200 (400 if incorrect query params, 404 if the producer does not exist)
@app.route(ARM_CREATE_RESPONSE,
    methods=['PUT'])
def arm_create(producer_id, job_id):
    """Set the response code that the create callback shall return for a job.

    producer_id, job_id: taken from the URL; the job entry is set up if it does not exist yet.
    Query parameter 'response' (optional): the response code to arm. Without it the
    default is restored: 201 when no job json is stored, otherwise 200.
    Returns an empty body with 200, or an error text with 400/404.
    """
    arm_response=request.args.get('response')

    # 'response' is the only query parameter accepted
    expected_args=0 if (arm_response is None) else 1
    if (len(request.args) != expected_args):
        return UNKNOWN_QUERY_PARAMETERS,400

    # Reject a non-numeric response code up front instead of failing with a server error
    response_code=None
    if (arm_response is not None):
        try:
            response_code=int(arm_response)
        except ValueError:
            return UNKNOWN_QUERY_PARAMETERS,400

    print("Arm create received for producer: "+str(producer_id)+" and job: "+str(job_id)+" and response: "+str(arm_response))

    job_dict=setup_callback_dict(producer_id, job_id)
    # The job level can only be set up for an existing producer
    if (job_dict is None):
        return PRODUCER_NOT_FOUND,404

    if (response_code is None): #Reset the response depending on whether a job exists or not
        if (job_dict['json'] is None):
            job_dict['create_response']=201
        else:
            job_dict['create_response']=200
    else:
        job_dict['create_response']=response_code

    return "",200
# Arm the delete callback with a response code
# Omitting the query parameter switches the response back to the standard 204 response
# URI and parameters (PUT): /arm/delete/<producer_id>/<job-id>[?response=<responsecode>]
# response: 200 (400 if incorrect query params, 404 if the producer does not exist)
@app.route(ARM_DELETE_RESPONSE,
    methods=['PUT'])
def arm_delete(producer_id, job_id):
    """Set the response code that the delete callback shall return for a job.

    producer_id, job_id: taken from the URL; the job entry is set up if it does not exist yet.
    Query parameter 'response' (optional): the response code to arm. Without it the
    default is restored: 404 when no job json is stored, otherwise 204.
    Returns an empty body with 200, or an error text with 400/404.
    """
    # Read once; the value is used both for validation and for arming
    arm_response=request.args.get('response')

    # 'response' is the only query parameter accepted
    expected_args=0 if (arm_response is None) else 1
    if (len(request.args) != expected_args):
        return UNKNOWN_QUERY_PARAMETERS,400

    # Reject a non-numeric response code up front instead of failing with a server error
    response_code=None
    if (arm_response is not None):
        try:
            response_code=int(arm_response)
        except ValueError:
            return UNKNOWN_QUERY_PARAMETERS,400

    print("Arm delete received for producer: "+str(producer_id)+" and job: "+str(job_id)+" and response: "+str(arm_response))

    job_dict=setup_callback_dict(producer_id, job_id)
    # The job level can only be set up for an existing producer
    if (job_dict is None):
        return PRODUCER_NOT_FOUND,404

    if (response_code is None): #Reset the response depending on whether a job exists or not
        if (job_dict['json'] is None):
            job_dict['delete_response']=404
        else:
            job_dict['delete_response']=204
    else:
        job_dict['delete_response']=response_code

    return "",200
# Arm the supervision callback with a response code
# Omitting the query parameter switches the response back to the standard 200 response
# URI and parameters (PUT): /arm/supervision/<producer_id>[?response=<responsecode>]
# response: 200 (400 if incorrect query params)
@app.route(ARM_SUPERVISION_RESPONSE,
    methods=['PUT'])
def arm_supervision(producer_id):
    """Set the response code that the supervision callback shall return for a producer.

    producer_id: taken from the URL; the producer entry is set up if it does not exist yet.
    Query parameter 'response' (optional): the response code to arm, 200 is restored without it.
    Returns an empty body with 200, or an error text with 400.
    """
    arm_response=request.args.get('response')

    # 'response' is the only query parameter accepted
    expected_args=0 if (arm_response is None) else 1
    if (len(request.args) != expected_args):
        return UNKNOWN_QUERY_PARAMETERS,400

    # Reject a non-numeric response code up front instead of failing with a server error
    response_code=200
    if (arm_response is not None):
        try:
            response_code=int(arm_response)
        except ValueError:
            return UNKNOWN_QUERY_PARAMETERS,400

    print("Arm supervision received for producer: "+str(producer_id)+" and response: "+str(arm_response))

    producer_dict=setup_callback_dict(producer_id, None)
    producer_dict['supervision_response']=response_code

    return "",200
268 # Arm a producer with a type
269 # URI and parameters (PUT): /arm/type/<string:producer_id>/<string:type-id>
270 # response: 200 (404)
# The producer must already be set up; the type id is added to the producer's 'types' list.
273 def arm_type(producer_id, type_id):
275 print("Arm type received for producer: "+str(producer_id)+" and type: "+str(type_id))
277 producer_dict=get_callback_dict(producer_id, None)
# Unknown producer gives 404
279 if (producer_dict is None):
280 return PRODUCER_NOT_FOUND,404
# Arming the same type again is a no-op, the list holds each type id once
282 type_list=producer_dict['types']
283 if (type_id not in type_list):
284 type_list.append(type_id)
# Disarm a producer with a type
# URI and parameters (DELETE): /arm/type/<string:producer_id>/<string:type-id>
# response: 200 (400 if the type is in use in a job, 404 if the producer or the type is not found)
@app.route(ARM_TYPE,
    methods=['DELETE'])
def disarm_type(producer_id, type_id):
    """Remove a type from the list of types armed for a producer.

    producer_id, type_id: taken from the URL.
    Returns an empty body with 200, TYPE_IN_USE with 400 when a stored job refers to the
    type, PRODUCER_NOT_FOUND or TYPE_NOT_FOUND with 404.
    """
    print("Disarm type received for producer: "+str(producer_id)+" and type: "+str(type_id))

    producer_dict=get_callback_dict(producer_id, None)

    if (producer_dict is None):
        return PRODUCER_NOT_FOUND,404

    # NOTE(review): the create callback reads the type from 'ei_type_identity' - confirm that
    # "ei_job_type" is the key actually present in the stored job json
    if (recursive_search(producer_dict, "ei_job_type",type_id)):
        return TYPE_IN_USE,400

    type_list=producer_dict['types']
    # list.remove raises ValueError for a missing element; answer 404 instead of a server error
    if (type_id not in type_list):
        return TYPE_NOT_FOUND,404
    type_list.remove(type_id)

    return "",200
311 # Callback for create job
312 # URI and parameters (POST): /callbacks/create/<producer_id>
313 # response 201 at create, 200 at update or other configured response code
# The producer, the type and the job entry must all have been set up in advance, otherwise 400 is returned.
314 @app.route(CALLBACK_CREATE_URL,
316 def callback_create(producer_id):
# Parse the request body as json and validate it against the schema in 'job-schema.json'
# (path is relative to the working directory).
# NOTE(review): the schema file is opened and parsed on every request
320 req_json_dict = json.loads(request.data)
321 with open('job-schema.json') as f:
322 schema = json.load(f)
323 validate(instance=req_json_dict, schema=schema)
# presumably the failure path of the parsing/validation above - the surrounding handling is not visible here
325 return JSON_CORRUPT,400
# The producer must exist and the type of the job must be armed for it
327 producer_dict=get_callback_dict(producer_id, None)
328 if (producer_dict is None):
329 return PRODUCER_OR_JOB_NOT_FOUND,400
330 type_list=producer_dict['types']
331 type_id=req_json_dict['ei_type_identity']
332 if (type_id not in type_list):
333 return TYPE_NOT_FOUND, 400
# The job entry is only looked up here, not created
335 job_id=req_json_dict['ei_job_identity']
336 job_dict=get_callback_dict(producer_id, job_id)
337 if (job_dict is None):
338 return PRODUCER_OR_JOB_NOT_FOUND,400
# NOTE(review): job_id was read from req_json_dict['ei_job_identity'] just above, so this comparison
# always holds and the JOBID_NO_MATCH return further down looks unreachable - confirm
341 if (req_json_dict['ei_job_identity'] == job_id):
342 print("Create callback received for producer: "+str(producer_id)+" and job: "+str(job_id))
# The armed code is what gets returned; only 200/201 actually store the job and start delivery
343 return_code=job_dict['create_response']
344 if ((job_dict['create_response'] == 200) or (job_dict['create_response'] == 201)):
345 job_dict['json']=req_json_dict
346 job_dict['delivering']=True
347 if (job_dict['create_response'] == 201): #Set up next response code if create was ok
348 job_dict['create_response'] = 200
# A stored job can now be deleted: switch the delete default from 404 to 204
349 if (job_dict['delete_response'] == 404):
350 job_dict['delete_response'] = 204
352 return_msg=RETURNING_CONFIGURED_RESP
# Count the call
354 job_dict['create_counter']=job_dict['create_counter']+1
356 return JOBID_NO_MATCH, 400
358 return return_msg, return_code
360 # Callback for delete job
361 # URI and parameters (POST): /callbacks/delete/<producer_id>
362 # response: 204 at delete or other configured response code
# The job entry must have been set up in advance, otherwise 400 is returned.
363 @app.route(CALLBACK_DELETE_URL,
365 def callback_delete(producer_id):
# Parse the request body as json and validate it against the schema in 'job-schema.json'
# (path is relative to the working directory).
# NOTE(review): the schema file is opened and parsed on every request
369 req_json_dict = json.loads(request.data)
370 with open('job-schema.json') as f:
371 schema = json.load(f)
372 validate(instance=req_json_dict, schema=schema)
# presumably the failure path of the parsing/validation above - the surrounding handling is not visible here
374 return JSON_CORRUPT,400
# The job entry is only looked up here, not created
376 job_id=req_json_dict['ei_job_identity']
377 job_dict=get_callback_dict(producer_id, job_id)
378 if (job_dict is None):
379 return PRODUCER_OR_JOB_NOT_FOUND,400
# NOTE(review): job_id was read from req_json_dict['ei_job_identity'] just above, so this comparison
# always holds and the JOBID_NO_MATCH return further down looks unreachable - confirm
382 if (req_json_dict['ei_job_identity'] == job_id):
383 print("Delete callback received for producer: "+str(producer_id)+" and job: "+str(job_id))
# The armed code is what gets returned; only 204 actually removes the stored job and stops delivery
384 return_code=job_dict['delete_response']
385 if (job_dict['delete_response'] == 204):
386 job_dict['json']=None
387 job_dict['delete_response']=404
388 job_dict['delivering']=False
389 if (job_dict['create_response'] == 200):
390 job_dict['create_response'] = 201 # reset create response if delete was ok
392 return_msg=RETURNING_CONFIGURED_RESP
# Count the call
394 job_dict['delete_counter']=job_dict['delete_counter']+1
396 return JOBID_NO_MATCH, 400
398 return return_msg, return_code
400 # Callback for supervision of producer
401 # URI and parameters (GET): /callbacks/supervision/<producer_id>
402 # response: 200 or other configured response code
403 @app.route(CALLBACK_SUPERVISION_URL,
405 def callback_supervision(producer_id):
407 print("Supervision callback received for producer: "+str(producer_id))
# NOTE(review): an unknown producer gives 400 here while the arm/disarm type handlers use 404
409 producer_dict=get_callback_dict(producer_id, None)
410 if (producer_dict is None):
411 return PRODUCER_NOT_FOUND,400
412 return_code=producer_dict['supervision_response']
# A message text is only set for an armed, non-default code
414 if (return_code != 200):
415 return_msg="returning configured response code"
# Count the call
417 producer_dict['supervision_counter']=producer_dict['supervision_counter']+1
# Same value as return_code read above
419 return return_msg,producer_dict['supervision_response']
421 # Get the job definition for a job
422 # URI and parameters (GET): "/jobdata/<string:producer_id>/<string:job_id>"
423 # response: 200 or 204
# In addition 400 is returned when the producer or the job is not found.
426 def get_jobdata(producer_id, job_id):
428 print("Get job data received for producer: "+str(producer_id)+" and job: "+str(job_id))
430 job_dict=get_callback_dict(producer_id, job_id)
432 if (job_dict is None):
433 return PRODUCER_OR_JOB_NOT_FOUND,400
# No job json stored - presumably the 204 case from the header; the return is not visible here
435 if (job_dict['json'] is None):
# The stored job json is returned serialized, with 200
438 return json.dumps(job_dict['json']), 200
440 # Start data delivery for a job, action : START or STOP
441 # URI and parameters (POST): "/jobdata/<string:producer_id>/<string:job_id>?action=action"
442 # response: 200 or 204
# In addition 400 is returned for bad query parameters or when no job json is stored,
# and 404 when the job is not found.
445 def start_jobdata(producer_id, job_id):
# 'action' must be the one and only query parameter and must be START or STOP
447 action=request.args.get('action')
450 return UNKNOWN_QUERY_PARAMETERS,400
452 if (len(request.args) != 1):
453 return UNKNOWN_QUERY_PARAMETERS,400
455 if ((action != "START") and (action != "STOP")):
456 return UNKNOWN_QUERY_PARAMETERS,400
458 print("Job data action received for producer: "+str(producer_id)+" and job: "+str(job_id) + " action: " + action)
460 job_dict=get_callback_dict(producer_id, job_id)
461 if (job_dict is None):
462 return JOB_NOT_FOUND,404
# Delivery can only be switched for a job that has a stored job json
464 if (job_dict['json'] is None):
465 return JOB_DATA_NOT_FOUND, 400
# START turns delivery on; the remaining accepted action (STOP) turns it off
467 if (action == "START"):
468 job_dict['delivering']=True
470 job_dict['delivering']=False
474 # Counter for create calls for a job
475 # URI and parameters (GET): "/counter/create/<string:producer_id>/<string:job_id>"
476 # response: 200 and counter value
# The counter is incremented by the create callback and returned here as a string.
477 @app.route(COUNTER_CREATE,
479 def counter_create(producer_id, job_id):
480 job_dict=get_callback_dict(producer_id, job_id)
# NOTE(review): what is returned for an unknown producer/job is not visible here
481 if (job_dict is None):
483 return str(job_dict['create_counter']),200
485 # Counter for delete calls for a job
486 # URI and parameters (GET): "/counter/delete/<string:producer_id>/<string:job_id>"
487 # response: 200 and counter value
# The counter is incremented by the delete callback and returned here as a string.
488 @app.route(COUNTER_DELETE,
490 def counter_delete(producer_id, job_id):
491 job_dict=get_callback_dict(producer_id, job_id)
# NOTE(review): what is returned for an unknown producer/job is not visible here
492 if (job_dict is None):
494 return str(job_dict['delete_counter']),200
496 # Counter for supervision calls for a producer
497 # URI and parameters (GET): "/counter/supervision/<string:producer_id>"
498 # response: 200 and counter value
# The counter is incremented by the supervision callback and returned here as a string.
499 @app.route(COUNTER_SUPERVISION,
501 def counter_supervision(producer_id):
502 producer_dict=get_callback_dict(producer_id, None)
# NOTE(review): what is returned for an unknown producer is not visible here
503 if (producer_dict is None):
505 return str(producer_dict['supervision_counter']),200
508 # URI and parameters (GET): "/status"
514 return json.dumps(db),200
519 methods=['GET', 'POST', 'PUT'])
# Data delivery: for every job that is delivering and has a stored job json, post a data
# sample to the 'target_uri' given in the job json.
529 job_dicts=get_all_jobs()
530 for key in job_dicts:
# NOTE(review): '== True' and '!= None' work here, but 'is not None' is the idiomatic None test
532 if (job['delivering'] == True and job['json'] != None):
533 url=job['json']['target_uri']
# Payload: current local time, the number of deliveries done so far as sequence number, and a fixed value
536 data["date"]=str(datetime.datetime.now())
538 data["sequence_no"]=""+str(job['delivery_attempts'])
539 data["value"]=str(100)
540 print("Sending "+json.dumps(data))
# verify=False turns off TLS certificate verification; the request times out after 2 seconds
542 requests.post(url, json=data, verify=False, timeout=2) #NOSONAR
# presumably only reached when the post did not raise, so failed posts are not counted - confirm
543 job['delivery_attempts'] += 1
# A failed delivery is only logged
544 except Exception as err:
545 print("Error during data delivery: "+ str(err))
549 ### Main function ###
# Creates the thread that runs datadelivery and, when the file is run as a script, starts the
# Flask app on HOST_IP:HOST_PORT.
# NOTE(review): the start of the thread is not visible here - confirm it is started before app.run
551 print("Starting data delivery thread")
552 thread = threading.Thread(target=datadelivery, args=())
556 if __name__ == "__main__":
557 app.run(port=HOST_PORT, host=HOST_IP)