2 # ============LICENSE_START===============================================
3 # Copyright (C) 2020 Nordix Foundation. All rights reserved.
4 # ========================================================================
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ============LICENSE_END=================================================
19 from flask import Flask
20 from flask import request
25 from jsonschema import validate
32 # Disable all logging of GET on reading counters and status
33 class AjaxFilter(logging.Filter):
34 def filter(self, record):
35 return ("/counter/" not in record.getMessage()) and ("/status" not in record.getMessage())
37 log = logging.getLogger('werkzeug')
38 log.addFilter(AjaxFilter())
42 # # list of callback messages
49 # Request and response constants
50 CALLBACK_CREATE_URL="/callbacks/job/<string:producer_id>"
51 CALLBACK_DELETE_URL="/callbacks/job/<string:producer_id>/<string:job_id>"
52 CALLBACK_SUPERVISION_URL="/callbacks/supervision/<string:producer_id>"
54 ARM_CREATE_RESPONSE="/arm/create/<string:producer_id>/<string:job_id>"
55 ARM_DELETE_RESPONSE="/arm/delete/<string:producer_id>/<string:job_id>"
56 ARM_SUPERVISION_RESPONSE="/arm/supervision/<string:producer_id>"
57 ARM_TYPE="/arm/type/<string:producer_id>/<string:type_id>"
58 COUNTER_SUPERVISION="/counter/supervision/<string:producer_id>"
59 COUNTER_CREATE="/counter/create/<string:producer_id>/<string:job_id>"
60 COUNTER_DELETE="/counter/delete/<string:producer_id>/<string:job_id>"
62 JOB_DATA="/jobdata/<string:producer_id>/<string:job_id>"
67 APPL_JSON='application/json'
68 UNKNOWN_QUERY_PARAMETERS="Unknown query parameter(s)"
69 RETURNING_CONFIGURED_RESP="returning configured response code"
70 JOBID_NO_MATCH="job id in stored json does not match request"
71 PRODUCER_OR_JOB_NOT_FOUND="producer or job not found"
72 PRODUCER_NOT_FOUND="producer not found"
73 TYPE_NOT_FOUND="type not found"
74 TYPE_IN_USE="type is in use in a job"
75 JOB_NOT_FOUND="job not found"
76 JOB_DATA_NOT_FOUND="job data not found"
77 JSON_CORRUPT="json in request is corrupt or missing"
79 #Producer and job db, including armed responses
82 # armed response for supervision
88 # armed response for create
89 # armed response for delete
94 # disable warning about unverified https requests
95 from requests.packages import urllib3
97 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
99 # Helper function to populate a callback dict with the basic structure
100 # if job_id is None then only the producer level is setup and the producer dict is returned
101 # if job_id is not None, the job level is setup and the job dict is returned (producer must exist)
102 def setup_callback_dict(producer_id, job_id):
105 if (producer_id in db.keys()):
106 producer_dict=db[producer_id]
108 if (job_id is not None):
111 db[producer_id]=producer_dict
113 producer_dict['supervision_response']=200
114 producer_dict['supervision_counter']=0
115 producer_dict['types']=[]
121 if (job_id in producer_dict.keys()):
122 job_dict=producer_dict[job_id]
125 producer_dict[job_id]=job_dict
126 job_dict['create_response']=201
127 job_dict['delete_response']=404
128 job_dict['json']=None
129 job_dict['create_counter']=0
130 job_dict['delete_counter']=0
131 job_dict['delivering']="stopped"
132 job_dict['delivery_attempts']=0
136 # Helper function to get an entry from the callback db
137 # if job_id is None then only the producer dict is returned (or None if producer is not found)
138 # if job_id is not None, the job is returned (or None if producer/job is not found)
139 def get_callback_dict(producer_id, job_id):
142 if (producer_id in db.keys()):
143 producer_dict=db[producer_id]
145 if (producer_dict is None):
152 if (job_id in producer_dict.keys()):
153 job_dict=producer_dict[job_id]
157 # Helper function find if a key/valye exist in the dictionay tree
159 def recursive_search(s_dict, s_key, s_id):
161 if (pkey == s_key) and (s_dict[pkey] == s_id):
163 if (isinstance(s_dict[pkey], dict)):
164 recursive_search(s_dict[pkey], s_key, s_id)
168 # Helper function to find all job dicts
171 for producer_key in db:
172 producer_dict = db[producer_key]
173 for job_key in producer_dict:
174 job_dict = producer_dict[job_key]
175 if (isinstance(job_dict, dict)):
176 job_dicts[job_key]=job_dict
180 # response: always 200
186 # Arm the create callback with a response code
187 # Omitting the query parameter switch to response back to the standard 200/201 response
188 # URI and parameters (PUT): /arm/create/<producer_id>/<job_id>[?response=<resonsecode>]
190 # response: 200 (400 if incorrect query params)
191 @app.route(ARM_CREATE_RESPONSE,
193 def arm_create(producer_id, job_id):
195 arm_response=request.args.get('response')
197 if (arm_response is None):
198 if (len(request.args) != 0):
199 return UNKNOWN_QUERY_PARAMETERS,400
201 if (len(request.args) != 1):
202 return UNKNOWN_QUERY_PARAMETERS,400
205 print("Arm create received for producer: "+str(producer_id)+" and job: "+str(job_id)+" and response: "+str(arm_response))
207 job_dict=setup_callback_dict(producer_id, job_id)
209 if (arm_response is None): #Reset the response depending if a job exists or not
210 if (job_dict['json'] is None):
211 job_dict['create_response']=201
213 job_dict['create_response']=200
215 job_dict['create_response']=int(arm_response)
219 # Arm the delete callback with a response code
220 # Omitting the query parameter switch to response back to the standard 204 response
221 # URI and parameters (PUT): /arm/delete/<producer_id>/<job-id>[?response=<resonsecode>]
222 # response: 200 (400 if incorrect query params)
223 @app.route(ARM_DELETE_RESPONSE,
225 def arm_delete(producer_id, job_id):
227 arm_response=request.args.get('response')
229 if (arm_response is None):
230 if (len(request.args) != 0):
231 return UNKNOWN_QUERY_PARAMETERS,400
233 if (len(request.args) != 1):
234 return UNKNOWN_QUERY_PARAMETERS,400
236 print("Arm delete received for producer: "+str(producer_id)+" and job: "+str(job_id)+" and response: "+str(arm_response))
238 arm_response=request.args.get('response')
240 job_dict=setup_callback_dict(producer_id, job_id)
242 if (arm_response is None): #Reset the response depening if a job exists or not
243 if (job_dict['json'] is None):
244 job_dict['delete_response']=404
246 job_dict['delete_response']=204
248 job_dict['delete_response']=int(arm_response)
252 # Arm the supervision callback with a response code
253 # Omitting the query parameter switch to response back to the standard 200 response
254 # URI and parameters (PUT): /arm/supervision/<producer_id>[?response=<resonsecode>]
255 # response: 200 (400 if incorrect query params)
256 @app.route(ARM_SUPERVISION_RESPONSE,
258 def arm_supervision(producer_id):
260 arm_response=request.args.get('response')
262 if (arm_response is None):
263 if (len(request.args) != 0):
264 return UNKNOWN_QUERY_PARAMETERS,400
266 if (len(request.args) != 1):
267 return UNKNOWN_QUERY_PARAMETERS,400
269 print("Arm supervision received for producer: "+str(producer_id)+" and response: "+str(arm_response))
271 producer_dict=setup_callback_dict(producer_id, None)
272 if (arm_response is None):
273 producer_dict['supervision_response']=200
275 producer_dict['supervision_response']=int(arm_response)
279 # Arm a producer with a type
280 # URI and parameters (PUT): /arm/type/<string:producer_id>/<string:type-id>
281 # response: 200 (404)
284 def arm_type(producer_id, type_id):
286 print("Arm type received for producer: "+str(producer_id)+" and type: "+str(type_id))
288 producer_dict=get_callback_dict(producer_id, None)
290 if (producer_dict is None):
291 return PRODUCER_NOT_FOUND,404
293 type_list=producer_dict['types']
294 if (type_id not in type_list):
295 type_list.append(type_id)
299 # Disarm a producer with a type
300 # URI and parameters (DELETE): /arm/type/<string:producer_id>/<string:type-id>
301 # response: 200 (404)
304 def disarm_type(producer_id, type_id):
306 print("Disarm type received for producer: "+str(producer_id)+" and type: "+str(type_id))
308 producer_dict=get_callback_dict(producer_id, None)
310 if (producer_dict is None):
311 return PRODUCER_NOT_FOUND,404
313 if (recursive_search(producer_dict, "ei_job_type",type_id) is True):
314 return "TYPE_IN_USE",400
315 elif (recursive_search(producer_dict, "ei_type_identity",type_id) is True):
316 return "TYPE_IN_USE",400
317 elif (recursive_search(producer_dict, "info_type_identity",type_id) is True):
318 return "TYPE_IN_USE",400
320 type_list=producer_dict['types']
321 type_list.remove(type_id)
326 # Callback for create job
327 # URI and parameters (POST): /callbacks/job/<producer_id>
328 # response 201 at create, 200 at update or other configured response code
329 @app.route(CALLBACK_CREATE_URL,
331 def callback_create(producer_id):
335 req_json_dict = json.loads(request.data)
336 with open('job-schema.json') as f:
337 schema = json.load(f)
338 validate(instance=req_json_dict, schema=schema)
340 return JSON_CORRUPT,400
342 producer_dict=get_callback_dict(producer_id, None)
343 if (producer_dict is None):
344 return PRODUCER_OR_JOB_NOT_FOUND,400
345 type_list=producer_dict['types']
348 if 'ei_type_identity' in req_json_dict.keys():
349 type_key_name='ei_type_identity'
350 job_key_name='ei_job_identity'
351 elif 'info_type_identity' in req_json_dict.keys():
352 type_key_name='info_type_identity'
353 job_key_name='info_job_identity'
355 return TYPE_NOT_FOUND, 400
357 type_id=req_json_dict[type_key_name]
358 job_id=req_json_dict[job_key_name]
360 job_dict=get_callback_dict(producer_id, job_id)
361 if (job_dict is None):
362 return PRODUCER_OR_JOB_NOT_FOUND,400
365 if (req_json_dict[job_key_name] == job_id):
366 print("Create callback received for producer: "+str(producer_id)+" and job: "+str(job_id))
367 return_code=job_dict['create_response']
368 if ((job_dict['create_response'] == 200) or (job_dict['create_response'] == 201)):
369 job_dict['json']=req_json_dict
370 job_dict['delivering']="delivering"
371 if (job_dict['create_response'] == 201): #Set up next response code if create was ok
372 job_dict['create_response'] = 200
373 if (job_dict['delete_response'] == 404):
374 job_dict['delete_response'] = 204
376 if(job_dict['delivering'] == "delivering"):
377 job_dict['delivering']="hold"
378 return_msg=RETURNING_CONFIGURED_RESP
380 job_dict['create_counter']=job_dict['create_counter']+1
382 return JOBID_NO_MATCH, 400
384 return return_msg, return_code
386 # Callback for delete job
387 # URI and parameters (DELETE): /callbacks/job/<producer_id>/<job_id>
388 # response: 204 at delete or other configured response code
389 @app.route(CALLBACK_DELETE_URL,
391 def callback_delete(producer_id, job_id):
393 job_dict=get_callback_dict(producer_id, job_id)
394 if (job_dict is None):
395 return PRODUCER_OR_JOB_NOT_FOUND,400
398 print("Delete callback received for producer: "+str(producer_id)+" and job: "+str(job_id))
399 return_code=job_dict['delete_response']
400 if (job_dict['delete_response'] == 204):
401 job_dict['json']=None
402 job_dict['delete_response']=404
403 job_dict['delivering']="stopped"
404 if (job_dict['create_response'] == 200):
405 job_dict['create_response'] = 201 # reset create response if delete was ok
407 return_msg=RETURNING_CONFIGURED_RESP
409 job_dict['delete_counter']=job_dict['delete_counter']+1
411 return return_msg, return_code
413 # Callback for supervision of producer
414 # URI and parameters (GET): /callbacks/supervision/<producer_id>
415 # response: 200 or other configured response code
416 @app.route(CALLBACK_SUPERVISION_URL,
418 def callback_supervision(producer_id):
420 print("Supervision callback received for producer: "+str(producer_id))
422 producer_dict=get_callback_dict(producer_id, None)
423 if (producer_dict is None):
424 return PRODUCER_NOT_FOUND,400
425 return_code=producer_dict['supervision_response']
427 if (return_code != 200):
428 return_msg="returning configured response code"
430 producer_dict['supervision_counter']=producer_dict['supervision_counter']+1
432 return return_msg,producer_dict['supervision_response']
434 # Get the job definition for a job
435 # URI and parameters (GET): "/jobdata/<string:producer_id>/<string:job_id>"
436 # response: 200 or 204
439 def get_jobdata(producer_id, job_id):
441 print("Get job data received for producer: "+str(producer_id)+" and job: "+str(job_id))
443 job_dict=get_callback_dict(producer_id, job_id)
445 if (job_dict is None):
446 return PRODUCER_OR_JOB_NOT_FOUND,400
448 if (job_dict['json'] is None):
451 return json.dumps(job_dict['json']), 200
453 # Delete the job definition for a job
454 # URI and parameters (DELETE): "/jobdata/<string:producer_id>/<string:job_id>"
458 def del_jobdata(producer_id, job_id):
460 print("Delete job data received for producer: "+str(producer_id)+" and job: "+str(job_id))
462 job_dict=get_callback_dict(producer_id, job_id)
464 if (job_dict is None):
465 return PRODUCER_OR_JOB_NOT_FOUND,400
467 job_dict['json']=None
472 # Start data delivery for a job, action : START or STOP
473 # URI and parameters (POST): "/jobdata/<string:producer_id>/<string:job_id>?action=action"
474 # response: 200 or 204
477 def start_jobdata(producer_id, job_id):
479 action=request.args.get('action')
482 return UNKNOWN_QUERY_PARAMETERS,400
484 if (len(request.args) != 1):
485 return UNKNOWN_QUERY_PARAMETERS,400
487 if ((action != "START") and (action != "STOP")):
488 return UNKNOWN_QUERY_PARAMETERS,400
490 print("Job data action received for producer: "+str(producer_id)+" and job: "+str(job_id) + " action: " + action)
492 job_dict=get_callback_dict(producer_id, job_id)
493 if (job_dict is None):
494 return JOB_NOT_FOUND,404
496 if (job_dict['json'] is None):
497 return JOB_DATA_NOT_FOUND, 400
499 if (action == "START"):
500 job_dict['delivering']="delivering"
502 job_dict['delivering']="stopped"
506 # Counter for create calls for a job
507 # URI and parameters (GET): "/counter/create/<string:producer_id>/<string:job_id>"
508 # response: 200 and counter value
509 @app.route(COUNTER_CREATE,
511 def counter_create(producer_id, job_id):
512 job_dict=get_callback_dict(producer_id, job_id)
513 if (job_dict is None):
515 return str(job_dict['create_counter']),200
517 # Counter for delete calls for a job
518 # URI and parameters (GET): "/counter/delete/<string:producer_id>/<string:job_id>"
519 # response: 200 and counter value
520 @app.route(COUNTER_DELETE,
522 def counter_delete(producer_id, job_id):
523 job_dict=get_callback_dict(producer_id, job_id)
524 if (job_dict is None):
526 return str(job_dict['delete_counter']),200
528 # Counter for supervision calls for a producer
529 # URI and parameters (GET): "/counter/supervision/<string:producer_id>"
530 # response: 200 and counter value
531 @app.route(COUNTER_SUPERVISION,
533 def counter_supervision(producer_id):
534 producer_dict=get_callback_dict(producer_id, None)
535 if (producer_dict is None):
537 return str(producer_dict['supervision_counter']),200
540 # URI and parameters (GET): "/status"
546 return json.dumps(db),200
551 methods=['GET', 'POST', 'PUT'])
561 job_dicts=get_all_jobs()
562 for key in job_dicts:
564 if (job['delivering'] == "delivering" and job['json'] != None):
565 url=job['json']['target_uri']
566 if (str(url).find("localhost:") == -1): #Dont deliver to localhost...
568 data["date"]=str(datetime.datetime.now())
570 data["sequence_no"]=""+str(job['delivery_attempts'])
571 data["value"]=str(100)
572 print("Sending to "+url+" payload:"+json.dumps(data))
574 requests.post(url, json=data, verify=False, timeout=2) #NOSONAR
575 job['delivery_attempts'] += 1
576 except Exception as err:
577 print("Error during data delivery: "+ str(err))
581 ### Main function ###
583 print("Starting data delivery thread")
584 thread = threading.Thread(target=datadelivery, args=())
588 if __name__ == "__main__":
589 app.run(port=HOST_PORT, host=HOST_IP)