2 # ============LICENSE_START===============================================
3 # Copyright (C) 2020 Nordix Foundation. All rights reserved.
4 # ========================================================================
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ============LICENSE_END=================================================
import datetime
import json
import logging
import threading

import requests
from flask import Flask
from flask import request
from jsonschema import validate
# Disable all logging of GET on reading counters and status
class AjaxFilter(logging.Filter):
    """Logging filter that drops records for the counter and status URLs."""

    def filter(self, record):
        """Return False when the record's message contains "/counter/" or "/status"."""
        # Format the message once instead of once per substring test
        message = record.getMessage()
        return "/counter/" not in message and "/status" not in message


# Attach the filter to the 'werkzeug' logger
log = logging.getLogger('werkzeug')
log.addFilter(AjaxFilter())
42 # # list of callback messages
# Request and response constants

# Callback URLs
CALLBACK_CREATE_URL = "/callbacks/job/<string:producer_id>"
CALLBACK_DELETE_URL = "/callbacks/job/<string:producer_id>/<string:job_id>"
CALLBACK_SUPERVISION_URL = "/callbacks/supervision/<string:producer_id>"

# URLs for arming responses/types and for reading the call counters
ARM_CREATE_RESPONSE = "/arm/create/<string:producer_id>/<string:job_id>"
ARM_DELETE_RESPONSE = "/arm/delete/<string:producer_id>/<string:job_id>"
ARM_SUPERVISION_RESPONSE = "/arm/supervision/<string:producer_id>"
ARM_TYPE = "/arm/type/<string:producer_id>/<string:type_id>"
COUNTER_SUPERVISION = "/counter/supervision/<string:producer_id>"
COUNTER_CREATE = "/counter/create/<string:producer_id>/<string:job_id>"
COUNTER_DELETE = "/counter/delete/<string:producer_id>/<string:job_id>"

# URL for reading, deleting and starting/stopping delivery of job data
JOB_DATA = "/jobdata/<string:producer_id>/<string:job_id>"

# Content type and response messages
APPL_JSON = 'application/json'
UNKNOWN_QUERY_PARAMETERS = "Unknown query parameter(s)"
RETURNING_CONFIGURED_RESP = "returning configured response code"
JOBID_NO_MATCH = "job id in stored json does not match request"
PRODUCER_OR_JOB_NOT_FOUND = "producer or job not found"
PRODUCER_NOT_FOUND = "producer not found"
TYPE_NOT_FOUND = "type not found"
TYPE_IN_USE = "type is in use in a job"
JOB_NOT_FOUND = "job not found"
JOB_DATA_NOT_FOUND = "job data not found"
JSON_CORRUPT = "json in request is corrupt or missing"
# Producer and job db, including armed responses.
# Layout, as populated by setup_callback_dict:
#   db[producer_id]                -> producer dict
#     'supervision_response'          armed response for supervision
#     'supervision_counter'           number of supervision callbacks received
#     'types'                         list of type ids armed for the producer
#     <job_id>                     -> job dict
#       'create_response'             armed response for create
#       'delete_response'             armed response for delete
#       'json'                        stored job json (None when no job is stored)
#       'create_counter'              number of create callbacks received
#       'delete_counter'              number of delete callbacks received
#       'delivering'                  delivery state: "stopped", "delivering" or "hold"
#       'delivery_attempts'           number of successful data deliveries
# NOTE(review): the assignment itself was lost in extraction; an empty dict is
# implied by how every function in this file indexes and iterates db - confirm.
db = {}
# disable warning about unverified https requests
# (the data delivery code posts with verify=False)
from requests.packages import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Helper function to populate the callback db with the basic structure.
# If job_id is None, only the producer level is set up and the producer dict is returned.
# If job_id is not None, the job level is set up and the job dict is returned
# (the producer must already exist, otherwise None is returned).
def setup_callback_dict(producer_id, job_id):
    producer_dict = db.get(producer_id)
    if producer_dict is None:
        if job_id is not None:
            # NOTE(review): this return was lost in extraction; None follows from
            # the "producer must exist" rule in the header comment - confirm.
            return None
        # New producer: default supervision response, zeroed counter, no types
        producer_dict = {
            'supervision_response': 200,
            'supervision_counter': 0,
            'types': [],
        }
        db[producer_id] = producer_dict

    if job_id is None:
        return producer_dict

    # Jobs live in the producer dict next to the producer-level keys, so a
    # job id equal to one of those keys would return that entry instead.
    job_dict = producer_dict.get(job_id)
    if job_dict is None:
        # New job: create answers 201 and delete answers 404 until a job is stored
        job_dict = {
            'create_response': 201,
            'delete_response': 404,
            'json': None,
            'create_counter': 0,
            'delete_counter': 0,
            'delivering': "stopped",
            'delivery_attempts': 0,
        }
        producer_dict[job_id] = job_dict

    return job_dict
# Helper function to get an entry from the callback db.
# If job_id is None, the producer dict is returned (or None if the producer is not found).
# If job_id is not None, the job dict is returned (or None if the producer/job is not found).
# Nothing is created by this lookup.
def get_callback_dict(producer_id, job_id):
    producer_dict = db.get(producer_id)
    if producer_dict is None:
        return None

    if job_id is None:
        return producer_dict

    # dict.get gives None for an unknown job id
    return producer_dict.get(job_id)
# Helper function to find out whether a key/value pair exists anywhere in a dictionary tree.
# s_dict: dictionary to search (nested dictionaries are searched too)
# s_key:  key to look for
# s_id:   value the key must have
# Returns True if the pair is found at any depth, otherwise False.
def recursive_search(s_dict, s_key, s_id):
    for pkey, value in s_dict.items():
        if pkey == s_key and value == s_id:
            return True
        # Propagate a hit from a nested dictionary; the previous code discarded
        # the result of the recursive call, so nested matches were never reported.
        if isinstance(value, dict) and recursive_search(value, s_key, s_id):
            return True
    return False
# Helper function to find all job dicts.
# Returns a dict mapping job id to job dict, collected over all producers.
# Every dict-valued entry of a producer dict is taken to be a job. The result is
# keyed by job id only, so when two producers hold the same job id the one
# visited last wins.
# NOTE(review): the def and return lines were lost in extraction; the name and
# the empty parameter list are taken from the call in the data delivery code.
def get_all_jobs():
    job_dicts = {}
    for producer_dict in db.values():
        for job_key, job_dict in producer_dict.items():
            # Producer-level entries (response code, counter, type list) are not dicts
            if isinstance(job_dict, dict):
                job_dicts[job_key] = job_dict
    return job_dicts
180 # response: always 200
# Arm the create callback with a response code.
# Omitting the query parameter switches the response back to the standard 200/201 response.
# URI and parameters (PUT): /arm/create/<producer_id>/<job_id>[?response=<responsecode>]
# response: 200 (400 if incorrect query params, 404 if the producer is not found)
@app.route(ARM_CREATE_RESPONSE, methods=['PUT'])
def arm_create(producer_id, job_id):
    arm_response = request.args.get('response')

    # 'response' is the only query parameter accepted, and it is optional
    allowed_arg_count = 0 if arm_response is None else 1
    if len(request.args) != allowed_arg_count:
        return UNKNOWN_QUERY_PARAMETERS, 400

    print("Arm create received for producer: "+str(producer_id)+" and job: "+str(job_id)+" and response: "+str(arm_response))

    # Convert before touching the db so that a non-numeric code is answered
    # with 400 instead of raising ValueError inside the handler
    response_code = None
    if arm_response is not None:
        try:
            response_code = int(arm_response)
        except ValueError:
            return UNKNOWN_QUERY_PARAMETERS, 400

    job_dict = setup_callback_dict(producer_id, job_id)
    if job_dict is None:
        # Per its header comment, setup_callback_dict needs an existing producer
        return PRODUCER_NOT_FOUND, 404

    if response_code is None:
        # Reset the response depending on whether a job is stored or not
        job_dict['create_response'] = 201 if job_dict['json'] is None else 200
    else:
        job_dict['create_response'] = response_code

    # NOTE(review): the final return was lost in extraction; status 200 is the
    # documented response, the empty body is assumed - confirm.
    return "", 200
# Arm the delete callback with a response code.
# Omitting the query parameter switches the response back to the standard response
# (204 when a job is stored, 404 otherwise).
# URI and parameters (PUT): /arm/delete/<producer_id>/<job_id>[?response=<responsecode>]
# response: 200 (400 if incorrect query params, 404 if the producer is not found)
@app.route(ARM_DELETE_RESPONSE, methods=['PUT'])
def arm_delete(producer_id, job_id):
    arm_response = request.args.get('response')

    # 'response' is the only query parameter accepted, and it is optional
    allowed_arg_count = 0 if arm_response is None else 1
    if len(request.args) != allowed_arg_count:
        return UNKNOWN_QUERY_PARAMETERS, 400

    print("Arm delete received for producer: "+str(producer_id)+" and job: "+str(job_id)+" and response: "+str(arm_response))

    # Convert before touching the db so that a non-numeric code is answered
    # with 400 instead of raising ValueError inside the handler
    response_code = None
    if arm_response is not None:
        try:
            response_code = int(arm_response)
        except ValueError:
            return UNKNOWN_QUERY_PARAMETERS, 400

    job_dict = setup_callback_dict(producer_id, job_id)
    if job_dict is None:
        # Per its header comment, setup_callback_dict needs an existing producer
        return PRODUCER_NOT_FOUND, 404

    if response_code is None:
        # Reset the response depending on whether a job is stored or not
        job_dict['delete_response'] = 404 if job_dict['json'] is None else 204
    else:
        job_dict['delete_response'] = response_code

    # NOTE(review): the final return was lost in extraction; status 200 is the
    # documented response, the empty body is assumed - confirm.
    return "", 200
# Arm the supervision callback with a response code.
# Omitting the query parameter switches the response back to the standard 200 response.
# The producer entry is created if it does not exist yet.
# URI and parameters (PUT): /arm/supervision/<producer_id>[?response=<responsecode>]
# response: 200 (400 if incorrect query params)
@app.route(ARM_SUPERVISION_RESPONSE, methods=['PUT'])
def arm_supervision(producer_id):
    arm_response = request.args.get('response')

    # 'response' is the only query parameter accepted, and it is optional
    allowed_arg_count = 0 if arm_response is None else 1
    if len(request.args) != allowed_arg_count:
        return UNKNOWN_QUERY_PARAMETERS, 400

    print("Arm supervision received for producer: "+str(producer_id)+" and response: "+str(arm_response))

    # Convert before touching the db so that a non-numeric code is answered
    # with 400 instead of raising ValueError inside the handler
    response_code = 200
    if arm_response is not None:
        try:
            response_code = int(arm_response)
        except ValueError:
            return UNKNOWN_QUERY_PARAMETERS, 400

    producer_dict = setup_callback_dict(producer_id, None)
    producer_dict['supervision_response'] = response_code

    # NOTE(review): the final return was lost in extraction; status 200 is the
    # documented response, the empty body is assumed - confirm.
    return "", 200
# Arm a producer with a type. Arming a type that is already armed changes nothing.
# URI and parameters (PUT): /arm/type/<string:producer_id>/<string:type-id>
# response: 200 (404 if the producer is not found)
# NOTE(review): the route decorator was lost in extraction; it is rebuilt from
# the ARM_TYPE constant and the method named in the comment above - confirm.
@app.route(ARM_TYPE, methods=['PUT'])
def arm_type(producer_id, type_id):
    print("Arm type received for producer: "+str(producer_id)+" and type: "+str(type_id))

    producer_dict = get_callback_dict(producer_id, None)
    if producer_dict is None:
        return PRODUCER_NOT_FOUND, 404

    type_list = producer_dict['types']
    if type_id not in type_list:
        type_list.append(type_id)

    # NOTE(review): the final return was lost in extraction; status 200 is the
    # documented response, the empty body is assumed - confirm.
    return "", 200
# Disarm a producer with a type. Disarming a type that is not armed changes nothing.
# URI and parameters (DELETE): /arm/type/<string:producer_id>/<string:type-id>
# response: 200 (404 if the producer is not found, 400 if the type is in use in a job)
# NOTE(review): the route decorator was lost in extraction; it is rebuilt from
# the ARM_TYPE constant and the method named in the comment above - confirm.
@app.route(ARM_TYPE, methods=['DELETE'])
def disarm_type(producer_id, type_id):
    print("Disarm type received for producer: "+str(producer_id)+" and type: "+str(type_id))

    producer_dict = get_callback_dict(producer_id, None)
    if producer_dict is None:
        return PRODUCER_NOT_FOUND, 404

    if recursive_search(producer_dict, "ei_job_type", type_id):
        # Return the message constant; the name used to be quoted, so the
        # literal text "TYPE_IN_USE" was sent instead of the message
        return TYPE_IN_USE, 400

    type_list = producer_dict['types']
    # list.remove raises ValueError for a missing element, so check first
    if type_id in type_list:
        type_list.remove(type_id)

    # NOTE(review): the final return was lost in extraction; status 200 is the
    # documented response, the empty body is assumed - confirm.
    return "", 200
# Callback for create job
# URI and parameters (POST): /callbacks/job/<producer_id>
# response: 201 at create, 200 at update, or other configured response code
# (400 if the json is corrupt or the producer, type or job is not found)
@app.route(CALLBACK_CREATE_URL, methods=['POST'])
def callback_create(producer_id):
    # NOTE(review): the try/except lines were lost in extraction; the broad
    # handler is rebuilt from the JSON_CORRUPT return that follows the
    # parse/validate statements - confirm the exception type.
    try:
        req_json_dict = json.loads(request.data)
        with open('job-schema.json') as f:
            schema = json.load(f)
        validate(instance=req_json_dict, schema=schema)
    except Exception:
        return JSON_CORRUPT, 400

    producer_dict = get_callback_dict(producer_id, None)
    if producer_dict is None:
        return PRODUCER_OR_JOB_NOT_FOUND, 400
    # The type of the job must have been armed for this producer
    if req_json_dict['ei_type_identity'] not in producer_dict['types']:
        return TYPE_NOT_FOUND, 400

    # Only a lookup is done here, so the job entry must already exist
    job_id = req_json_dict['ei_job_identity']
    job_dict = get_callback_dict(producer_id, job_id)
    if job_dict is None:
        return PRODUCER_OR_JOB_NOT_FOUND, 400

    # job_id is read from the request json itself, so the former comparison
    # against req_json_dict['ei_job_identity'] was always true; the unreachable
    # JOBID_NO_MATCH branch has been removed.
    print("Create callback received for producer: "+str(producer_id)+" and job: "+str(job_id))
    return_code = job_dict['create_response']
    return_msg = ""
    if return_code in (200, 201):
        # Normal response: store the job and start delivery
        job_dict['json'] = req_json_dict
        job_dict['delivering'] = "delivering"
        if return_code == 201:
            # Set up next response code if create was ok
            job_dict['create_response'] = 200
        if job_dict['delete_response'] == 404:
            job_dict['delete_response'] = 204
    else:
        # Armed with another response code: the job is not stored and an
        # ongoing delivery is put on hold
        if job_dict['delivering'] == "delivering":
            job_dict['delivering'] = "hold"
        return_msg = RETURNING_CONFIGURED_RESP

    job_dict['create_counter'] += 1

    return return_msg, return_code
# Callback for delete job
# URI and parameters (DELETE): /callbacks/job/<producer_id>/<job_id>
# response: 204 at delete or other configured response code
# (400 if the producer or job is not found)
@app.route(CALLBACK_DELETE_URL, methods=['DELETE'])
def callback_delete(producer_id, job_id):
    job_dict = get_callback_dict(producer_id, job_id)
    if job_dict is None:
        return PRODUCER_OR_JOB_NOT_FOUND, 400

    print("Delete callback received for producer: "+str(producer_id)+" and job: "+str(job_id))
    return_code = job_dict['delete_response']
    return_msg = ""
    if return_code == 204:
        # Normal delete: drop the stored job and stop delivery
        job_dict['json'] = None
        job_dict['delete_response'] = 404
        job_dict['delivering'] = "stopped"
        if job_dict['create_response'] == 200:
            # reset create response if delete was ok
            job_dict['create_response'] = 201
    else:
        return_msg = RETURNING_CONFIGURED_RESP

    job_dict['delete_counter'] += 1

    return return_msg, return_code
# Callback for supervision of producer
# URI and parameters (GET): /callbacks/supervision/<producer_id>
# response: 200 or other configured response code (400 if the producer is not found)
@app.route(CALLBACK_SUPERVISION_URL, methods=['GET'])
def callback_supervision(producer_id):
    print("Supervision callback received for producer: "+str(producer_id))

    producer_dict = get_callback_dict(producer_id, None)
    if producer_dict is None:
        return PRODUCER_NOT_FOUND, 400

    return_code = producer_dict['supervision_response']
    # Use the shared message constant (same text as the former inline literal)
    return_msg = "" if return_code == 200 else RETURNING_CONFIGURED_RESP

    producer_dict['supervision_counter'] += 1

    return return_msg, return_code
# Get the job definition for a job
# URI and parameters (GET): "/jobdata/<string:producer_id>/<string:job_id>"
# response: 200 with the stored job json, or 204 when no json is stored
# (400 if the producer or job is not found)
# NOTE(review): the route decorator was lost in extraction; it is rebuilt from
# the JOB_DATA constant and the method named in the comment above - confirm.
@app.route(JOB_DATA, methods=['GET'])
def get_jobdata(producer_id, job_id):
    print("Get job data received for producer: "+str(producer_id)+" and job: "+str(job_id))

    job_dict = get_callback_dict(producer_id, job_id)
    if job_dict is None:
        return PRODUCER_OR_JOB_NOT_FOUND, 400

    if job_dict['json'] is None:
        # NOTE(review): this return was lost in extraction; 204 is the only
        # other status documented for this endpoint - confirm.
        return "", 204

    return json.dumps(job_dict['json']), 200
# Delete the job definition for a job. Only the stored json is cleared; the job
# entry with its armed responses and counters is kept.
# URI and parameters (DELETE): "/jobdata/<string:producer_id>/<string:job_id>"
# (400 if the producer or job is not found)
# NOTE(review): the route decorator was lost in extraction; it is rebuilt from
# the JOB_DATA constant and the method named in the comment above - confirm.
@app.route(JOB_DATA, methods=['DELETE'])
def del_jobdata(producer_id, job_id):
    print("Delete job data received for producer: "+str(producer_id)+" and job: "+str(job_id))

    job_dict = get_callback_dict(producer_id, job_id)
    if job_dict is None:
        return PRODUCER_OR_JOB_NOT_FOUND, 400

    job_dict['json'] = None

    # NOTE(review): the final return and the comment documenting the response
    # code were lost in extraction; 204 (no content) is assumed - confirm.
    return "", 204
# Start or stop data delivery for a job, action: START or STOP
# URI and parameters (POST): "/jobdata/<string:producer_id>/<string:job_id>?action=action"
# response: 200 or 204
# (400 if incorrect query params or no job data is stored, 404 if the job is not found)
# NOTE(review): the route decorator was lost in extraction; it is rebuilt from
# the JOB_DATA constant and the method named in the comment above - confirm.
@app.route(JOB_DATA, methods=['POST'])
def start_jobdata(producer_id, job_id):
    action = request.args.get('action')

    # 'action' is mandatory, must be the only parameter, and must be START or STOP
    if action is None or len(request.args) != 1 or action not in ("START", "STOP"):
        return UNKNOWN_QUERY_PARAMETERS, 400

    print("Job data action received for producer: "+str(producer_id)+" and job: "+str(job_id) + " action: " + action)

    job_dict = get_callback_dict(producer_id, job_id)
    if job_dict is None:
        return JOB_NOT_FOUND, 404

    # Delivery can only be controlled for a job that has stored job data
    if job_dict['json'] is None:
        return JOB_DATA_NOT_FOUND, 400

    job_dict['delivering'] = "delivering" if action == "START" else "stopped"

    # NOTE(review): the final return was lost in extraction; the header comment
    # documents "200 or 204" and 200 with an empty body is assumed - confirm.
    return "", 200
# Counter for create calls for a job
# URI and parameters (GET): "/counter/create/<string:producer_id>/<string:job_id>"
# response: 200 and counter value
@app.route(COUNTER_CREATE, methods=['GET'])
def counter_create(producer_id, job_id):
    job_dict = get_callback_dict(producer_id, job_id)
    if job_dict is None:
        # NOTE(review): the not-found return was lost in extraction; 200 is the
        # only documented status and "-1" is an assumed sentinel - confirm.
        return "-1", 200
    return str(job_dict['create_counter']), 200
# Counter for delete calls for a job
# URI and parameters (GET): "/counter/delete/<string:producer_id>/<string:job_id>"
# response: 200 and counter value
@app.route(COUNTER_DELETE, methods=['GET'])
def counter_delete(producer_id, job_id):
    job_dict = get_callback_dict(producer_id, job_id)
    if job_dict is None:
        # NOTE(review): the not-found return was lost in extraction; 200 is the
        # only documented status and "-1" is an assumed sentinel - confirm.
        return "-1", 200
    return str(job_dict['delete_counter']), 200
# Counter for supervision calls for a producer
# URI and parameters (GET): "/counter/supervision/<string:producer_id>"
# response: 200 and counter value
@app.route(COUNTER_SUPERVISION, methods=['GET'])
def counter_supervision(producer_id):
    producer_dict = get_callback_dict(producer_id, None)
    if producer_dict is None:
        # NOTE(review): the not-found return was lost in extraction; 200 is the
        # only documented status and "-1" is an assumed sentinel - confirm.
        return "-1", 200
    return str(producer_dict['supervision_counter']), 200
527 # URI and parameters (GET): "/status"
533 return json.dumps(db),200
538 methods=['GET', 'POST', 'PUT'])
548 job_dicts=get_all_jobs()
549 for key in job_dicts:
551 if (job['delivering'] == "delivering" and job['json'] != None):
552 url=job['json']['target_uri']
553 if (str(url).find("localhost:") == -1): #Dont deliver to localhost...
555 data["date"]=str(datetime.datetime.now())
557 data["sequence_no"]=""+str(job['delivery_attempts'])
558 data["value"]=str(100)
559 print("Sending to "+url+" payload:"+json.dumps(data))
561 requests.post(url, json=data, verify=False, timeout=2) #NOSONAR
562 job['delivery_attempts'] += 1
563 except Exception as err:
564 print("Error during data delivery: "+ str(err))
### Main function ###

# The delivery thread is set up at module level, so it is created on import
# as well as when the file is run as a script.
print("Starting data delivery thread")
thread = threading.Thread(target=datadelivery, args=())
# NOTE(review): the lines between the thread creation and the __main__ guard
# were lost in extraction. Starting the thread is implied by the message
# printed above; the daemon flag is an assumption - confirm.
thread.daemon = True
thread.start()

if __name__ == "__main__":
    app.run(port=HOST_PORT, host=HOST_IP)