import json
import requests
+import validators
+
+def create_url_host_port(protocol, host, port, path=''):
+ """
+ This function creates and validates URL based on the inputs.
+ """
+ url = protocol + '://' + host + ':' + port + '/' + path
+ if not validators.url(url):
+ logger.debug('URL validation error: ',url)
+ return None
+ return url
def data_extraction_start(training_config_obj, trainingjob_name, feature_list, query_filter,
datalake_source, _measurement, bucket):
'Accept-Charset': 'UTF-8'})
return response
+
+def create_dme_filtered_data_job(training_config_obj, source_name, db_org, bucket_name,
+ token, features, feature_group_name, host, port):
+ """
+ This function calls Non-RT RIC DME APIs for creating filter PM data jobs.
+ """
+ logger = training_config_obj.logger
+
+ job_json = {
+ "info_type_id": "json-file-data-from-filestore-to-influx",
+ "job_owner": "console",
+ "status_notification_uri": "http://callback.nonrtric:80/post",
+ "job_definition": { "db-url":"http://influxdb.onap:8086",
+ "db-org":db_org,
+ "db-bucket":bucket_name,
+ "db-token": token,
+ "filterType":"pmdata",
+ "filter":
+ {"sourceNames":[source_name],
+ "measTypes":features}}}
+
+ headers = {'Content-type': 'application/json'}
+
+ url = create_url_host_port('http', host, port, 'data-consumer/v1/info-jobs/{}'.format(feature_group_name))
+ logger.debug(url)
+ logger.debug(json.dumps(job_json))
+ response = requests.put(url, data=json.dumps(job_json), headers=headers)
+
+ return response
+
+def delete_dme_filtered_data_job(training_config_obj, feature_group_name, host, port):
+ """
+ This function calls Non-RT RIC DME APIs for deleting filter PM data jobs.
+ """
+ logger = training_config_obj.logger
+
+ url = create_url_host_port('http', host, port, 'data-consumer/v1/info-jobs/{}'.format(feature_group_name))
+ logger.debug(url)
+ response = requests.delete(url)
+ return response