Adapt to O-RU use case spec
[nonrtric.git] / test / prodstub / app / prodstub.py
1
2 #  ============LICENSE_START===============================================
3 #  Copyright (C) 2020 Nordix Foundation. All rights reserved.
4 #  ========================================================================
5 #  Licensed under the Apache License, Version 2.0 (the "License");
6 #  you may not use this file except in compliance with the License.
7 #  You may obtain a copy of the License at
8 #
9 #       http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #  Unless required by applicable law or agreed to in writing, software
12 #  distributed under the License is distributed on an "AS IS" BASIS,
13 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #  See the License for the specific language governing permissions and
15 #  limitations under the License.
16 #  ============LICENSE_END=================================================
17 #
18
19 from flask import Flask
20 from flask import request
21
22 import requests
23
24 import json
25 from jsonschema import validate
26
27 import threading
28 import time
29 import datetime
30 import logging
31
32 # Disable all logging of GET on reading counters and status
class AjaxFilter(logging.Filter):
    """Logging filter that suppresses records for counter and status requests."""

    def filter(self, record):
        """Return False when the formatted message contains '/counter/' or '/status'."""
        message = record.getMessage()
        return not ("/counter/" in message or "/status" in message)
36
37 log = logging.getLogger('werkzeug')  # logger named 'werkzeug' (presumably the Flask dev-server request log - TODO confirm)
38 log.addFilter(AjaxFilter())  # suppress records whose message contains /counter/ or /status
39
40 app = Flask(__name__)  # application object that all routes below are registered on
41
42 # # list of callback messages
43 # msg_callbacks={}
44
45 # Server info
46 HOST_IP = "::"  # host argument given to app.run()
47 HOST_PORT = 2222  # port argument given to app.run()
48
49 # Request and response constants (route templates used with @app.route)
50 CALLBACK_CREATE_URL="/callbacks/job/<string:producer_id>"
51 CALLBACK_DELETE_URL="/callbacks/job/<string:producer_id>/<string:job_id>"
52 CALLBACK_SUPERVISION_URL="/callbacks/supervision/<string:producer_id>"
53
54 ARM_CREATE_RESPONSE="/arm/create/<string:producer_id>/<string:job_id>"
55 ARM_DELETE_RESPONSE="/arm/delete/<string:producer_id>/<string:job_id>"
56 ARM_SUPERVISION_RESPONSE="/arm/supervision/<string:producer_id>"
57 ARM_TYPE="/arm/type/<string:producer_id>/<string:type_id>"
58 COUNTER_SUPERVISION="/counter/supervision/<string:producer_id>"
59 COUNTER_CREATE="/counter/create/<string:producer_id>/<string:job_id>"
60 COUNTER_DELETE="/counter/delete/<string:producer_id>/<string:job_id>"
61
62 JOB_DATA="/jobdata/<string:producer_id>/<string:job_id>"
63
64 STATUS="/status"
65
66 # Constants
67 APPL_JSON='application/json'
68 UNKNOWN_QUERY_PARAMETERS="Unknown query parameter(s)"
69 RETURNING_CONFIGURED_RESP="returning configured response code"
70 JOBID_NO_MATCH="job id in stored json does not match request"
71 PRODUCER_OR_JOB_NOT_FOUND="producer or job not found"
72 PRODUCER_NOT_FOUND="producer not found"
73 TYPE_NOT_FOUND="type not found"
74 TYPE_IN_USE="type is in use in a job"
75 JOB_NOT_FOUND="job not found"
76 JOB_DATA_NOT_FOUND="job data not found"
77 JSON_CORRUPT="json in request is corrupt or missing"
78
79 # Producer and job db, including armed responses (layout as built by setup_callback_dict)
80 db={}
81 # <producer_id>
82 #  supervision_response   (armed response code for supervision)
83 #  supervision_counter
84 #  types                  (list of armed type ids)
85 #  <job_id>
86 #    json                 (stored job json, None when nothing is stored)
87 #    create_response      (armed response code for create)
88 #    delete_response      (armed response code for delete)
89 #    create_counter
90 #    delete_counter
91 #    delivering           ("stopped", "delivering" or "hold")
92 #    delivery_attempts
93
94 # Disable the InsecureRequestWarning for unverified https requests (datadelivery posts with verify=False)
95 from requests.packages import urllib3
96
97 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
98
99 # Helper function to populate a callback dict with the basic structure
100 # if job_id is None then only the producer level is setup and the producer dict is returned
101 # if job_id is not None, the job level is setup and the job dict is returned (producer must exist)
def setup_callback_dict(producer_id, job_id):
    """Create (if missing) and return the db entry for a producer or one of its jobs.

    With job_id None the producer entry is created on demand and returned.
    With a job_id the producer must already exist (otherwise None is
    returned); the job entry is created on demand and returned.
    """
    if producer_id not in db:
        if job_id is not None:
            # A job can only be set up under an already known producer.
            return None
        db[producer_id] = {
            'supervision_response': 200,
            'supervision_counter': 0,
            'types': [],
        }
    producer_entry = db[producer_id]

    if job_id is None:
        return producer_entry

    if job_id not in producer_entry:
        # Defaults for a job that has not been created yet.
        producer_entry[job_id] = {
            'create_response': 201,
            'delete_response': 404,
            'json': None,
            'create_counter': 0,
            'delete_counter': 0,
            'delivering': "stopped",
            'delivery_attempts': 0,
        }
    return producer_entry[job_id]
134
135
136 # Helper function to get an entry from the callback db
137 # if job_id is None then only the producer dict is returned (or None if producer is not found)
138 # if job_id is not None, the job is returned (or None if producer/job is not found)
def get_callback_dict(producer_id, job_id):
    """Look up a producer entry, or one of its job entries, without creating anything.

    Returns the producer entry when job_id is None, otherwise the job entry.
    Returns None when the producer (or the requested job) is not in the db.
    """
    producer_entry = db[producer_id] if producer_id in db else None
    if producer_entry is None:
        return None

    if job_id is None:
        return producer_entry

    return producer_entry[job_id] if job_id in producer_entry else None
156
157 # Helper function to find out if a key/value pair exists in the dictionary tree
158 # Returns True if found
def recursive_search(s_dict, s_key, s_id):
    """Return True if the pair s_key/s_id occurs anywhere in the dict tree s_dict.

    The search visits s_dict itself and every nested value that is a dict.
    Values of other container types (for example lists) are compared as a
    whole but not descended into.
    """
    for pkey, pvalue in s_dict.items():
        if pkey == s_key and pvalue == s_id:
            return True
        # Propagate a hit from a nested dict; the result of the recursive
        # call must not be dropped, otherwise only top-level pairs match.
        if isinstance(pvalue, dict) and recursive_search(pvalue, s_key, s_id):
            return True

    return False
167
168 # Helper function to find all job dicts
def get_all_jobs():
    """Collect the job entries of all producers into one dict keyed by job id.

    Every dict-valued item of a producer entry is treated as a job. The
    result is keyed by job id only, so a job id used under several producers
    appears once (the entry of the producer visited last).
    """
    collected = {}
    for producer_entry in db.values():
        collected.update(
            (name, value)
            for name, value in producer_entry.items()
            if isinstance(value, dict)
        )
    return collected
178
179 # I'm alive function
180 # response: always 200
@app.route('/', methods=['GET'])
def index():
    """Liveness check: always answers 200 with the body 'OK'."""
    return 'OK', 200
185
186 # Arm the create callback with a response code
187 # Omitting the query parameter resets the response to the standard 200/201 response
188 # URI and parameters (PUT): /arm/create/<producer_id>/<job_id>[?response=<responsecode>]
189 # Setting
190 # response: 200 (400 if incorrect query params)
@app.route(ARM_CREATE_RESPONSE, methods=['PUT'])
def arm_create(producer_id, job_id):
    """Arm the create callback of a job with a response code.

    Query parameter 'response' (optional): the code the create callback shall
    return. When omitted, the code is reset to 201 if no job json is stored
    and to 200 otherwise.

    Returns 200 on success, 400 for unknown query parameters or a
    non-integer response code, and 404 if the producer does not exist.
    """
    arm_response = request.args.get('response')

    # 'response' is the only accepted query parameter.
    expected_param_count = 0 if arm_response is None else 1
    if len(request.args) != expected_param_count:
        return UNKNOWN_QUERY_PARAMETERS, 400

    response_code = None
    if arm_response is not None:
        try:
            response_code = int(arm_response)
        except ValueError:
            # Reject bad input explicitly instead of failing with a 500.
            return "Query parameter 'response' must be an integer", 400

    print("Arm create received for producer: "+str(producer_id)+" and job: "+str(job_id)+" and response: "+str(arm_response))

    job_dict = setup_callback_dict(producer_id, job_id)
    if job_dict is None:
        # setup_callback_dict only creates jobs under an existing producer.
        return PRODUCER_NOT_FOUND, 404

    if response_code is None:
        # Reset the response depending on whether a job exists or not.
        job_dict['create_response'] = 201 if job_dict['json'] is None else 200
    else:
        job_dict['create_response'] = response_code

    return "", 200
218
219 # Arm the delete callback with a response code
220 # Omitting the query parameter resets the response to the standard 204 response
221 # URI and parameters (PUT): /arm/delete/<producer_id>/<job-id>[?response=<responsecode>]
222 # response: 200 (400 if incorrect query params)
@app.route(ARM_DELETE_RESPONSE, methods=['PUT'])
def arm_delete(producer_id, job_id):
    """Arm the delete callback of a job with a response code.

    Query parameter 'response' (optional): the code the delete callback shall
    return. When omitted, the code is reset to 404 if no job json is stored
    and to 204 otherwise.

    Returns 200 on success, 400 for unknown query parameters or a
    non-integer response code, and 404 if the producer does not exist.
    """
    arm_response = request.args.get('response')

    # 'response' is the only accepted query parameter.
    expected_param_count = 0 if arm_response is None else 1
    if len(request.args) != expected_param_count:
        return UNKNOWN_QUERY_PARAMETERS, 400

    response_code = None
    if arm_response is not None:
        try:
            response_code = int(arm_response)
        except ValueError:
            # Reject bad input explicitly instead of failing with a 500.
            return "Query parameter 'response' must be an integer", 400

    print("Arm delete received for producer: "+str(producer_id)+" and job: "+str(job_id)+" and response: "+str(arm_response))

    job_dict = setup_callback_dict(producer_id, job_id)
    if job_dict is None:
        # setup_callback_dict only creates jobs under an existing producer.
        return PRODUCER_NOT_FOUND, 404

    if response_code is None:
        # Reset the response depending on whether a job exists or not.
        job_dict['delete_response'] = 404 if job_dict['json'] is None else 204
    else:
        job_dict['delete_response'] = response_code

    return "", 200
251
252 # Arm the supervision callback with a response code
253 # Omitting the query parameter resets the response to the standard 200 response
254 # URI and parameters (PUT): /arm/supervision/<producer_id>[?response=<responsecode>]
255 # response: 200 (400 if incorrect query params)
@app.route(ARM_SUPERVISION_RESPONSE, methods=['PUT'])
def arm_supervision(producer_id):
    """Arm the supervision callback of a producer with a response code.

    The producer entry is created if it does not exist yet. Query parameter
    'response' (optional): the code the supervision callback shall return;
    when omitted the code is reset to 200.

    Returns 200 on success and 400 for unknown query parameters or a
    non-integer response code.
    """
    arm_response = request.args.get('response')

    # 'response' is the only accepted query parameter.
    expected_param_count = 0 if arm_response is None else 1
    if len(request.args) != expected_param_count:
        return UNKNOWN_QUERY_PARAMETERS, 400

    response_code = 200
    if arm_response is not None:
        try:
            response_code = int(arm_response)
        except ValueError:
            # Reject bad input explicitly instead of failing with a 500.
            return "Query parameter 'response' must be an integer", 400

    print("Arm supervision received for producer: "+str(producer_id)+" and response: "+str(arm_response))

    producer_dict = setup_callback_dict(producer_id, None)
    producer_dict['supervision_response'] = response_code

    return "", 200
278
279 # Arm a producer with a type
280 # URI and parameters (PUT): /arm/type/<string:producer_id>/<string:type-id>
281 # response: 200 (404)
@app.route(ARM_TYPE, methods=['PUT'])
def arm_type(producer_id, type_id):
    """Add a type id to the producer's list of armed types.

    Arming an already armed type is a no-op. Returns 200, or 404 when the
    producer is not in the db.
    """
    print(f"Arm type received for producer: {producer_id} and type: {type_id}")

    producer_entry = get_callback_dict(producer_id, None)
    if producer_entry is None:
        return PRODUCER_NOT_FOUND, 404

    armed_types = producer_entry['types']
    if type_id not in armed_types:
        armed_types.append(type_id)

    return "", 200
298
299 # Disarm a producer with a type
300 # URI and parameters (DELETE): /arm/type/<string:producer_id>/<string:type-id>
301 # response: 200 (404)
@app.route(ARM_TYPE, methods=['DELETE'])
def disarm_type(producer_id, type_id):
    """Remove a type id from the producer's list of armed types.

    Returns 200 when the type was removed, 404 when the producer is unknown
    or the type is not armed for it, and 400 when the type is found in the
    producer's stored data.
    """
    print("Disarm type received for producer: "+str(producer_id)+" and type: "+str(type_id))

    producer_dict = get_callback_dict(producer_id, None)

    if producer_dict is None:
        return PRODUCER_NOT_FOUND, 404

    # NOTE(review): callback_create reads the type from 'ei_type_identity';
    # confirm that 'ei_job_type' is the key actually present in stored job json.
    if recursive_search(producer_dict, "ei_job_type", type_id):
        return TYPE_IN_USE, 400

    type_list = producer_dict['types']
    if type_id not in type_list:
        # list.remove() would raise ValueError and surface as a 500.
        return TYPE_NOT_FOUND, 404
    type_list.remove(type_id)

    return "", 200
320
321
322 # Callback for create job
323 # URI and parameters (POST): /callbacks/job/<producer_id>
324 # response 201 at create, 200 at update or other configured response code
@app.route(CALLBACK_CREATE_URL, methods=['POST'])
def callback_create(producer_id):
    """Callback for create (or update) of a job.

    The request body must be json that validates against 'job-schema.json'.
    The producer, the job's type and the job itself must already be set up
    in the db. The armed create response of the job is returned; when it is
    200 or 201 the job json is stored and delivery is switched on, otherwise
    an ongoing delivery is put on hold.

    Returns the armed response code, or 400 when the json is corrupt or the
    producer, type or job is not found.
    """
    try:
        req_json_dict = json.loads(request.data)
        with open('job-schema.json') as f:
            schema = json.load(f)
        validate(instance=req_json_dict, schema=schema)
    except Exception:
        # Any parse, file or validation problem is reported as corrupt json.
        return JSON_CORRUPT, 400

    producer_dict = get_callback_dict(producer_id, None)
    if producer_dict is None:
        return PRODUCER_OR_JOB_NOT_FOUND, 400

    type_id = req_json_dict['ei_type_identity']
    if type_id not in producer_dict['types']:
        return TYPE_NOT_FOUND, 400

    # The job id is taken from the body, so no separate id-match check is
    # needed (the former comparison against the same value was always true).
    job_id = req_json_dict['ei_job_identity']
    job_dict = get_callback_dict(producer_id, job_id)
    if job_dict is None:
        return PRODUCER_OR_JOB_NOT_FOUND, 400

    print("Create callback received for producer: "+str(producer_id)+" and job: "+str(job_id))

    return_code = job_dict['create_response']
    return_msg = ""
    if return_code in (200, 201):
        job_dict['json'] = req_json_dict
        job_dict['delivering'] = "delivering"
        if return_code == 201:  # Set up next response code if create was ok
            job_dict['create_response'] = 200
        if job_dict['delete_response'] == 404:
            job_dict['delete_response'] = 204
    else:
        if job_dict['delivering'] == "delivering":
            job_dict['delivering'] = "hold"
        return_msg = RETURNING_CONFIGURED_RESP

    job_dict['create_counter'] += 1

    return return_msg, return_code
372
373 # Callback for delete job
374 # URI and parameters (DELETE): /callbacks/job/<producer_id>/<job_id>
375 # response: 204 at delete or other configured response code
@app.route(CALLBACK_DELETE_URL, methods=['DELETE'])
def callback_delete(producer_id, job_id):
    """Callback for delete of a job.

    Returns the job's armed delete response. When that is 204 the stored job
    json is dropped, delivery is stopped and the armed responses are reset
    (delete to 404, create from 200 back to 201). Returns 400 when the
    producer or job is not in the db. The delete counter is incremented for
    every call that reaches a known job.
    """
    job_entry = get_callback_dict(producer_id, job_id)
    if job_entry is None:
        return PRODUCER_OR_JOB_NOT_FOUND, 400

    print(f"Delete callback received for producer: {producer_id} and job: {job_id}")

    configured_code = job_entry['delete_response']
    if configured_code != 204:
        body = RETURNING_CONFIGURED_RESP
    else:
        body = ""
        job_entry['json'] = None
        job_entry['delete_response'] = 404
        job_entry['delivering'] = "stopped"
        if job_entry['create_response'] == 200:
            # reset create response if delete was ok
            job_entry['create_response'] = 201

    job_entry['delete_counter'] += 1

    return body, configured_code
399
400 # Callback for supervision of producer
401 # URI and parameters (GET): /callbacks/supervision/<producer_id>
402 # response: 200 or other configured response code
@app.route(CALLBACK_SUPERVISION_URL, methods=['GET'])
def callback_supervision(producer_id):
    """Callback for supervision of a producer.

    Returns the producer's armed supervision response code (with an
    explanatory body when it is not 200) and increments the supervision
    counter. Returns 400 when the producer is not in the db.
    """
    print("Supervision callback received for producer: "+str(producer_id))

    producer_dict = get_callback_dict(producer_id, None)
    if producer_dict is None:
        return PRODUCER_NOT_FOUND, 400

    return_code = producer_dict['supervision_response']
    # Use the shared constant, as the create/delete callbacks do.
    return_msg = "" if return_code == 200 else RETURNING_CONFIGURED_RESP

    producer_dict['supervision_counter'] += 1

    return return_msg, return_code
420
421 # Get the job definition for a job
422 # URI and parameters (GET): "/jobdata/<string:producer_id>/<string:job_id>"
423 # response: 200 or 204
@app.route(JOB_DATA, methods=['GET'])
def get_jobdata(producer_id, job_id):
    """Return the stored job json for a job.

    Returns 200 with the json, 204 when no json is stored, or 400 when the
    producer or job is not in the db.
    """
    print(f"Get job data received for producer: {producer_id} and job: {job_id}")

    job_entry = get_callback_dict(producer_id, job_id)
    if job_entry is None:
        return PRODUCER_OR_JOB_NOT_FOUND, 400

    stored_json = job_entry['json']
    if stored_json is None:
        return "", 204
    return json.dumps(stored_json), 200
439
440 # Delete the job definition for a job
441 # URI and parameters (DELETE): "/jobdata/<string:producer_id>/<string:job_id>"
442 # response: 204
@app.route(JOB_DATA, methods=['DELETE'])
def del_jobdata(producer_id, job_id):
    """Drop the stored job json for a job.

    Returns 204, or 400 when the producer or job is not in the db.
    """
    print(f"Delete job data received for producer: {producer_id} and job: {job_id}")

    job_entry = get_callback_dict(producer_id, job_id)
    if job_entry is None:
        return PRODUCER_OR_JOB_NOT_FOUND, 400

    job_entry['json'] = None
    return "", 204
457
458
459 # Start data delivery for a job, action : START or STOP
460 # URI and parameters (POST): "/jobdata/<string:producer_id>/<string:job_id>?action=action"
461 # response: 200 or 204
@app.route(JOB_DATA, methods=['POST'])
def start_jobdata(producer_id, job_id):
    """Start or stop data delivery for a job.

    Requires exactly one query parameter, 'action', with the value START or
    STOP. Returns 200 on success, 400 for a missing/unknown parameter or
    when no job json is stored, and 404 when the job is not in the db.
    """
    action = request.args.get('action')

    # 'action' must be present, be the only parameter and hold a known value.
    if action is None or len(request.args) != 1 or action not in ("START", "STOP"):
        return UNKNOWN_QUERY_PARAMETERS, 400

    print(f"Job data action received for producer: {producer_id} and job: {job_id} action: {action}")

    job_entry = get_callback_dict(producer_id, job_id)
    if job_entry is None:
        return JOB_NOT_FOUND, 404

    if job_entry['json'] is None:
        return JOB_DATA_NOT_FOUND, 400

    job_entry['delivering'] = "delivering" if action == "START" else "stopped"
    return "", 200
491
492
493 # Counter for create calls for a job
494 # URI and parameters (GET): "/counter/create/<string:producer_id>/<string:job_id>"
495 # response: 200 and counter value
@app.route(COUNTER_CREATE, methods=['GET'])
def counter_create(producer_id, job_id):
    """Return 200 with the job's create counter, or "-1" when the job is not in the db."""
    job_entry = get_callback_dict(producer_id, job_id)
    counter_text = "-1" if job_entry is None else str(job_entry['create_counter'])
    return counter_text, 200
503
504 # Counter for delete calls for a job
505 # URI and parameters (GET): "/counter/delete/<string:producer_id>/<string:job_id>"
506 # response: 200 and counter value
@app.route(COUNTER_DELETE, methods=['GET'])
def counter_delete(producer_id, job_id):
    """Return 200 with the job's delete counter, or "-1" when the job is not in the db."""
    job_entry = get_callback_dict(producer_id, job_id)
    counter_text = "-1" if job_entry is None else str(job_entry['delete_counter'])
    return counter_text, 200
514
515 # Counter for supervision calls for a producer
516 # URI and parameters (GET): "/counter/supervision/<string:producer_id>"
517 # response: 200 and counter value
@app.route(COUNTER_SUPERVISION, methods=['GET'])
def counter_supervision(producer_id):
    """Return 200 with the producer's supervision counter, or "-1" when the producer is not in the db."""
    producer_entry = get_callback_dict(producer_id, None)
    counter_text = "-1" if producer_entry is None else str(producer_entry['supervision_counter'])
    return counter_text, 200
525
526 # Get status info
527 # URI and parameters (GET): "/status"
528 # -
@app.route(STATUS, methods=['GET'])
def status():
    """Return 200 with the whole db serialized as json."""
    snapshot = json.dumps(db)
    return snapshot, 200
534
535
536 # Reset db
@app.route('/reset', methods=['GET', 'POST', 'PUT'])
def reset():
    """Replace the db with a new empty dict and return 200."""
    global db
    # Rebind rather than clear, so the previous dict object is left untouched.
    db = {}
    return "", 200
543
544
def _deliver_job_data(job_id, job):
    """Post one data sample to the job's target_uri if the job is delivering."""
    job_json = job['json']
    if job['delivering'] != "delivering" or job_json is None:
        return

    url = job_json['target_uri']
    if "localhost:" in str(url):  # Dont deliver to localhost...
        return

    data = {
        "date": str(datetime.datetime.now()),
        "job": str(job_id),
        "sequence_no": str(job['delivery_attempts']),
        "value": str(100),
    }
    print("Sending to "+str(url)+" payload:"+json.dumps(data))

    requests.post(url, json=data, verify=False, timeout=2) #NOSONAR
    # Only reached when the post did not raise.
    job['delivery_attempts'] += 1


def datadelivery():
    """Endless loop that posts data for every delivering job about once per second.

    A failure for one job is printed and does not stop delivery to the other
    jobs in the same pass. This function never returns.
    """
    while True:
        try:
            job_dicts = get_all_jobs()
        except Exception as err:
            # Collecting the jobs failed; report it and retry on the next pass.
            print("Error during data delivery: " + str(err))
            job_dicts = {}

        for key, job in job_dicts.items():
            try:
                _deliver_job_data(key, job)
            except Exception as err:
                # Best effort per job: an unreachable target must not block the rest.
                print("Error during data delivery: " + str(err))
        time.sleep(1)
566
567
568 ### Main function ###
569
570 print("Starting data delivery thread")
571 thread = threading.Thread(target=datadelivery, args=())  # created at module level, i.e. also when the module is imported
572 thread.daemon = True  # daemon thread: does not keep the process alive on exit
573 thread.start()
574
575 if __name__ == "__main__":
576     app.run(port=HOST_PORT, host=HOST_IP)  # run the Flask server only when executed as a script