Updates to test env and A1function test
[nonrtric.git] / test / prodstub / app / prodstub.py
index c21a7ab..df4898d 100644 (file)
 from flask import Flask
 from flask import request
 
+import requests
+
 import json
 from jsonschema import validate
 
+import threading
+import time
+import datetime
+
 app = Flask(__name__)
 
 # # list of callback messages
@@ -44,7 +50,6 @@ ARM_CREATE_RESPONSE="/arm/create/<string:producer_id>/<string:job_id>"
 ARM_DELETE_RESPONSE="/arm/delete/<string:producer_id>/<string:job_id>"
 ARM_SUPERVISION_RESPONSE="/arm/supervision/<string:producer_id>"
 ARM_TYPE="/arm/type/<string:producer_id>/<string:type_id>"
-
 COUNTER_SUPERVISION="/counter/supervision/<string:producer_id>"
 COUNTER_CREATE="/counter/create/<string:producer_id>/<string:job_id>"
 COUNTER_DELETE="/counter/delete/<string:producer_id>/<string:job_id>"
@@ -62,6 +67,8 @@ PRODUCER_OR_JOB_NOT_FOUND="producer or job not found"
 PRODUCER_NOT_FOUND="producer not found"
 TYPE_NOT_FOUND="type not found"
 TYPE_IN_USE="type is in use in a job"
+JOB_NOT_FOUND="job not found"
+JOB_DATA_NOT_FOUND="job data not found"
 JSON_CORRUPT="json in request is corrupt or missing"
 
 #Producer and job db, including armed responses
@@ -72,6 +79,7 @@ db={}
 #  supervision counter
 #  job
 #    job json
+#    target_type
 #    armed response for create
 #    armed response for delete
 #    create counter
@@ -109,6 +117,8 @@ def setup_callback_dict(producer_id, job_id):
         job_dict['json']=None
         job_dict['create_counter']=0
         job_dict['delete_counter']=0
+        job_dict['delivering']=False
+        job_dict['delivery_attempts']=0
     return job_dict
 
 
@@ -121,6 +131,9 @@ def get_callback_dict(producer_id, job_id):
     if (producer_id in db.keys()):
         producer_dict=db[producer_id]
 
+    if (producer_dict is None):
+        return None
+
     if (job_id is None):
         return producer_dict
 
@@ -141,6 +154,17 @@ def recursive_search(s_dict, s_key, s_id):
 
     return False
 
+# Helper function to find all job dicts
+def get_all_jobs():
+    job_dicts={}
+    for producer_key in db:
+        producer_dict = db[producer_key]
+        for job_key in producer_dict:
+            job_dict = producer_dict[job_key]
+            if (isinstance(job_dict, dict)):
+                job_dicts[job_key]=job_dict
+    return job_dicts
+
 # I'm alive function
 # response: always 200
 @app.route('/',
@@ -150,7 +174,7 @@ def index():
 
 # Arm the create callback with a response code
 # Omitting the query parameter switch to response back to the standard 200/201 response
-# URI and parameters (PUT): /arm/create/<producer_id>/<job-id>[?response=<resonsecode>]
+# URI and parameters (PUT): /arm/create/<producer_id>/<job_id>[?response=<resonsecode>]
 # Setting
 # response: 200 (400 if incorrect query params)
 @app.route(ARM_CREATE_RESPONSE,
@@ -166,6 +190,7 @@ def arm_create(producer_id, job_id):
         if (len(request.args) != 1):
             return UNKNOWN_QUERY_PARAMETERS,400
 
+
     print("Arm create received for producer: "+str(producer_id)+" and job: "+str(job_id)+" and response: "+str(arm_response))
 
     job_dict=setup_callback_dict(producer_id, job_id)
@@ -176,7 +201,7 @@ def arm_create(producer_id, job_id):
         else:
             job_dict['create_response']=200
     else:
-        job_dict['create_response']=arm_response
+        job_dict['create_response']=int(arm_response)
 
     return "",200
 
@@ -209,7 +234,7 @@ def arm_delete(producer_id, job_id):
         else:
             job_dict['delete_response']=204
     else:
-        job_dict['delete_response']=arm_response
+        job_dict['delete_response']=int(arm_response)
 
     return "",200
 
@@ -236,7 +261,7 @@ def arm_supervision(producer_id):
     if (arm_response is None):
         producer_dict['supervision_response']=200
     else:
-        producer_dict['supervision_response']=arm_response
+        producer_dict['supervision_response']=int(arm_response)
 
     return "",200
 
@@ -282,6 +307,7 @@ def disarm_type(producer_id, type_id):
 
     return "",200
 
+
 # Callback for create job
 # URI and parameters (POST): /callbacks/create/<producer_id>
 # response 201 at create, 200 at update or other configured response code
@@ -304,7 +330,7 @@ def callback_create(producer_id):
     type_list=producer_dict['types']
     type_id=req_json_dict['ei_type_identity']
     if (type_id not in type_list):
-        return TYPE_NOT_FOUND
+        return TYPE_NOT_FOUND, 400
 
     job_id=req_json_dict['ei_job_identity']
     job_dict=get_callback_dict(producer_id, job_id)
@@ -317,6 +343,7 @@ def callback_create(producer_id):
         return_code=job_dict['create_response']
         if ((job_dict['create_response'] == 200) or (job_dict['create_response'] == 201)):
             job_dict['json']=req_json_dict
+            job_dict['delivering']=True
             if (job_dict['create_response'] == 201): #Set up next response code if create was ok
                 job_dict['create_response'] = 200
             if (job_dict['delete_response'] == 404):
@@ -328,7 +355,7 @@ def callback_create(producer_id):
     else:
         return JOBID_NO_MATCH, 400
 
-    return return_msg,return_code
+    return return_msg, return_code
 
 # Callback for delete job
 # URI and parameters (POST): /callbacks/delete/<producer_id>
@@ -358,6 +385,7 @@ def callback_delete(producer_id):
         if (job_dict['delete_response'] == 204):
             job_dict['json']=None
             job_dict['delete_response']=404
+            job_dict['delivering']=False
             if (job_dict['create_response'] == 200):
                 job_dict['create_response'] = 201 # reset create response if delete was ok
         else:
@@ -390,7 +418,7 @@ def callback_supervision(producer_id):
 
     return return_msg,producer_dict['supervision_response']
 
-# Callback for supervision of producer
+# Get the job definition for a job
 # URI and parameters (GET): "/jobdata/<string:producer_id>/<string:job_id>"
 # response: 200 or 204
 @app.route(JOB_DATA,
@@ -399,12 +427,49 @@ def get_jobdata(producer_id, job_id):
 
     print("Get job data received for producer: "+str(producer_id)+" and job: "+str(job_id))
 
-    job_dict=setup_callback_dict(producer_id, job_id)
+    job_dict=get_callback_dict(producer_id, job_id)
+
+    if (job_dict is None):
+        return PRODUCER_OR_JOB_NOT_FOUND,400
+
     if (job_dict['json'] is None):
         return "",204
     else:
         return json.dumps(job_dict['json']), 200
 
+# Start data delivery for a job, action : START or STOP
+# URI and parameters (POST): "/jobdata/<string:producer_id>/<string:job_id>?action=action"
+# response: 200 or 204
+@app.route(JOB_DATA,
+     methods=['POST'])
+def start_jobdata(producer_id, job_id):
+
+    action=request.args.get('action')
+
+    if (action is None):
+        return UNKNOWN_QUERY_PARAMETERS,400
+    else:
+        if (len(request.args) != 1):
+            return UNKNOWN_QUERY_PARAMETERS,400
+        else:
+            if ((action != "START") and (action != "STOP")):
+                return UNKNOWN_QUERY_PARAMETERS,400
+
+    print("Job data action received for producer: "+str(producer_id)+" and job: "+str(job_id) + " action: " + action)
+
+    job_dict=get_callback_dict(producer_id, job_id)
+    if (job_dict is None):
+        return JOB_NOT_FOUND,404
+
+    if (job_dict['json'] is None):
+        return JOB_DATA_NOT_FOUND, 400
+    else:
+        if (action == "START"):
+            job_dict['delivering']=True
+        else:
+            job_dict['delivering']=False
+        return "",200
+
 
 # Counter for create calls for a job
 # URI and parameters (GET): "/counter/create/<string:producer_id>/<string:job_id>"
@@ -414,7 +479,7 @@ def get_jobdata(producer_id, job_id):
 def counter_create(producer_id, job_id):
     job_dict=get_callback_dict(producer_id, job_id)
     if (job_dict is None):
-        return -1,200
+        return "-1",200
     return str(job_dict['create_counter']),200
 
 # Counter for delete calls for a job
@@ -425,7 +490,7 @@ def counter_create(producer_id, job_id):
 def counter_delete(producer_id, job_id):
     job_dict=get_callback_dict(producer_id, job_id)
     if (job_dict is None):
-        return -1,200
+        return "-1",200
     return str(job_dict['delete_counter']),200
 
 # Counter for supervision calls for a producer
@@ -436,7 +501,7 @@ def counter_delete(producer_id, job_id):
 def counter_supervision(producer_id):
     producer_dict=get_callback_dict(producer_id, None)
     if (producer_dict is None):
-        return -1,200
+        return "-1",200
     return str(producer_dict['supervision_counter']),200
 
 # Get status info
@@ -457,7 +522,36 @@ def reset():
     db={}
     return "",200
 
+
+def datadelivery() :
+    while True:
+        try:
+            job_dicts=get_all_jobs()
+            for key in job_dicts:
+                job=job_dicts[key]
+                if (job['delivering'] == True and job['json'] != None):
+                    url=job['json']['target_uri']
+
+                    data={}
+                    data["date"]=str(datetime.datetime.now())
+                    data["job"]=""+key
+                    data["sequence_no"]=""+str(job['delivery_attempts'])
+                    data["value"]=str(100)
+                    print("Sending "+json.dumps(data))
+
+                    requests.post(url, json=data, verify=False, timeout=2) #NOSONAR
+                    job['delivery_attempts'] += 1
+        except Exception as err:
+            print("Error during data delivery: "+ str(err))
+        time.sleep(1)
+
+
 ### Main function ###
 
+print("Starting data delivery thread")
+thread = threading.Thread(target=datadelivery, args=())
+thread.daemon = True
+thread.start()
+
 if __name__ == "__main__":
     app.run(port=HOST_PORT, host=HOST_IP)