import threading
import time
import datetime
+import logging
+
+# Disable all logging of GET on reading counters and status
+class AjaxFilter(logging.Filter):
+ def filter(self, record):
+ return ("/counter/" not in record.getMessage()) and ("/status" not in record.getMessage())
+
+log = logging.getLogger('werkzeug')
+log.addFilter(AjaxFilter())
app = Flask(__name__)
HOST_IP = "::"
HOST_PORT = 2222
-# # Metrics vars
-# cntr_msg_callbacks=0
-# cntr_msg_fetched=0
-
# Request and response constants
-CALLBACK_CREATE_URL="/callbacks/create/<string:producer_id>"
-CALLBACK_DELETE_URL="/callbacks/delete/<string:producer_id>"
+CALLBACK_CREATE_URL="/callbacks/job/<string:producer_id>"
+CALLBACK_DELETE_URL="/callbacks/job/<string:producer_id>/<string:job_id>"
CALLBACK_SUPERVISION_URL="/callbacks/supervision/<string:producer_id>"
ARM_CREATE_RESPONSE="/arm/create/<string:producer_id>/<string:job_id>"
# armed response for delete
# create counter
# delete counter
+# delivering status
+
+# disable warning about unverified https requests
+from requests.packages import urllib3
+
+urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Helper function to populate a callback dict with the basic structure
# if job_id is None then only the producer level is setup and the producer dict is returned
job_dict['json']=None
job_dict['create_counter']=0
job_dict['delete_counter']=0
- job_dict['delivering']=False
+ job_dict['delivering']="stopped"
job_dict['delivery_attempts']=0
return job_dict
if (recursive_search(producer_dict, "ei_job_type",type_id) is True):
return "TYPE_IN_USE",400
+ elif (recursive_search(producer_dict, "ei_type_identity",type_id) is True):
+ return "TYPE_IN_USE",400
+ elif (recursive_search(producer_dict, "info_type_identity",type_id) is True):
+ return "TYPE_IN_USE",400
type_list=producer_dict['types']
type_list.remove(type_id)
# Callback for create job
-# URI and parameters (POST): /callbacks/create/<producer_id>
+# URI and parameters (POST): /callbacks/job/<producer_id>
# response 201 at create, 200 at update or other configured response code
@app.route(CALLBACK_CREATE_URL,
methods=['POST'])
if (producer_dict is None):
return PRODUCER_OR_JOB_NOT_FOUND,400
type_list=producer_dict['types']
- type_id=req_json_dict['ei_type_identity']
- if (type_id not in type_list):
+
+
+ if 'ei_type_identity' in req_json_dict.keys():
+ type_key_name='ei_type_identity'
+ job_key_name='ei_job_identity'
+ elif 'info_type_identity' in req_json_dict.keys():
+ type_key_name='info_type_identity'
+ job_key_name='info_job_identity'
+ else:
return TYPE_NOT_FOUND, 400
- job_id=req_json_dict['ei_job_identity']
+ type_id=req_json_dict[type_key_name]
+ job_id=req_json_dict[job_key_name]
+
job_dict=get_callback_dict(producer_id, job_id)
if (job_dict is None):
return PRODUCER_OR_JOB_NOT_FOUND,400
return_code=0
return_msg=""
- if (req_json_dict['ei_job_identity'] == job_id):
+ if (req_json_dict[job_key_name] == job_id):
print("Create callback received for producer: "+str(producer_id)+" and job: "+str(job_id))
return_code=job_dict['create_response']
if ((job_dict['create_response'] == 200) or (job_dict['create_response'] == 201)):
job_dict['json']=req_json_dict
- job_dict['delivering']=True
+ job_dict['delivering']="delivering"
if (job_dict['create_response'] == 201): #Set up next response code if create was ok
job_dict['create_response'] = 200
if (job_dict['delete_response'] == 404):
job_dict['delete_response'] = 204
else:
+ if(job_dict['delivering'] == "delivering"):
+ job_dict['delivering']="hold"
return_msg=RETURNING_CONFIGURED_RESP
job_dict['create_counter']=job_dict['create_counter']+1
return return_msg, return_code
# Callback for delete job
-# URI and parameters (POST): /callbacks/delete/<producer_id>
+# URI and parameters (DELETE): /callbacks/job/<producer_id>/<job_id>
# response: 204 at delete or other configured response code
@app.route(CALLBACK_DELETE_URL,
- methods=['POST'])
-def callback_delete(producer_id):
-
- req_json_dict=None
- try:
- req_json_dict = json.loads(request.data)
- with open('job-schema.json') as f:
- schema = json.load(f)
- validate(instance=req_json_dict, schema=schema)
- except Exception:
- return JSON_CORRUPT,400
+ methods=['DELETE'])
+def callback_delete(producer_id, job_id):
- job_id=req_json_dict['ei_job_identity']
job_dict=get_callback_dict(producer_id, job_id)
if (job_dict is None):
return PRODUCER_OR_JOB_NOT_FOUND,400
return_code=0
return_msg=""
- if (req_json_dict['ei_job_identity'] == job_id):
- print("Delete callback received for producer: "+str(producer_id)+" and job: "+str(job_id))
- return_code=job_dict['delete_response']
- if (job_dict['delete_response'] == 204):
- job_dict['json']=None
- job_dict['delete_response']=404
- job_dict['delivering']=False
- if (job_dict['create_response'] == 200):
- job_dict['create_response'] = 201 # reset create response if delete was ok
- else:
- return_msg=RETURNING_CONFIGURED_RESP
-
- job_dict['delete_counter']=job_dict['delete_counter']+1
+ print("Delete callback received for producer: "+str(producer_id)+" and job: "+str(job_id))
+ return_code=job_dict['delete_response']
+ if (job_dict['delete_response'] == 204):
+ job_dict['json']=None
+ job_dict['delete_response']=404
+ job_dict['delivering']="stopped"
+ if (job_dict['create_response'] == 200):
+ job_dict['create_response'] = 201 # reset create response if delete was ok
else:
- return JOBID_NO_MATCH, 400
+ return_msg=RETURNING_CONFIGURED_RESP
+
+ job_dict['delete_counter']=job_dict['delete_counter']+1
return return_msg, return_code
else:
return json.dumps(job_dict['json']), 200
+# Delete the job definition for a job
+# URI and parameters (DELETE): "/jobdata/<string:producer_id>/<string:job_id>"
+# response: 204
+@app.route(JOB_DATA,
+ methods=['DELETE'])
+def del_jobdata(producer_id, job_id):
+
+ print("Delete job data received for producer: "+str(producer_id)+" and job: "+str(job_id))
+
+ job_dict=get_callback_dict(producer_id, job_id)
+
+ if (job_dict is None):
+ return PRODUCER_OR_JOB_NOT_FOUND,400
+
+ job_dict['json']=None
+
+ return "",204
+
+
# Start data delivery for a job, action : START or STOP
# URI and parameters (POST): "/jobdata/<string:producer_id>/<string:job_id>?action=action"
# response: 200 or 204
return JOB_DATA_NOT_FOUND, 400
else:
if (action == "START"):
- job_dict['delivering']=True
+ job_dict['delivering']="delivering"
else:
- job_dict['delivering']=False
+ job_dict['delivering']="stopped"
return "",200
job_dicts=get_all_jobs()
for key in job_dicts:
job=job_dicts[key]
- if (job['delivering'] == True and job['json'] != None):
+ if (job['delivering'] == "delivering" and job['json'] != None):
url=job['json']['target_uri']
-
- data={}
- data["date"]=str(datetime.datetime.now())
- data["job"]=""+key
- data["sequence_no"]=""+str(job['delivery_attempts'])
- data["value"]=str(100)
- print("Sending to "+url+" payload:"+json.dumps(data))
-
- requests.post(url, json=data, verify=False, timeout=2) #NOSONAR
- job['delivery_attempts'] += 1
+ if (str(url).find("localhost:") == -1): #Dont deliver to localhost...
+ data={}
+ data["date"]=str(datetime.datetime.now())
+ data["job"]=""+key
+ data["sequence_no"]=""+str(job['delivery_attempts'])
+ data["value"]=str(100)
+ print("Sending to "+url+" payload:"+json.dumps(data))
+
+ requests.post(url, json=data, verify=False, timeout=2) #NOSONAR
+ job['delivery_attempts'] += 1
except Exception as err:
print("Error during data delivery: "+ str(err))
time.sleep(1)