@Function: Get FS Port
"""
return self.fs_dict['fs_port']
+
from lru import LRU
from ConfigHelper import ConfigHelper
from Pipeline import Pipeline
+from source.InfluxSource import InfluxSource
class FeatureEngineeringFactory:
"""
self.modulename = ""
self.default_class = ""
self.hash_table = LRU(10)
+ self.influxdb_dict = {}
self.logger.debug("************Starting Init FeatureEngineeringFactory***********")
try:
self.spark_session = spark_manager
my_class = getattr(importlib.import_module(self.modulename+"."+self.default_class), self.default_class)
my_classinstance = my_class(my_classorflavour)
self.logger.debug("Initialization of Class")
- my_classinstance.init(self.spark_session, self.config_help, inputs_to_class)
+ # If the TM sends the InfluxDB connection-info dict, use the dynamic initializer
+ if any(self.influxdb_dict.values()) and baseclass_name == "Source":
+     my_classinstance.init_dynamic(self.spark_session, self.config_help, inputs_to_class, self.influxdb_dict)
+ else:
+     # When the TM does not send the InfluxDB dict, fall back to the static init
+     my_classinstance.init(self.spark_session, self.config_help, inputs_to_class)
return my_classinstance
return my_instancelist
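# A minimal standalone sketch of the dynamic-import pattern used above,
# assuming a module layout like source/InfluxSource.py that defines a class
# of the same name (matching the import at the top of this file).
# resolve_class is a hypothetical helper, not part of this change.
import importlib

def resolve_class(modulename, classname):
    # Import "<modulename>.<classname>" and return the class object named
    # <classname>, e.g. resolve_class("source", "InfluxSource")
    module = importlib.import_module(modulename + "." + classname)
    return getattr(module, classname)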
- def get_batch_pipeline(self, source_classdict, transform_classdict, sink_classdict, caching_key):
+ def get_batch_pipeline(self, source_classdict, transform_classdict, sink_classdict, influxdb_dict, caching_key):
"""
@Function: Builds and returns the batch Pipeline
@Input : source classes, transform classes, sink classes, InfluxDB info dict, caching key
@Output : Instance of Pipeline Object
"""
+ self.influxdb_dict = influxdb_dict
if self.hash_table.get(caching_key) is None:
self.logger.debug("Cached Instance Not Found, Creating Instance, Key" + caching_key)
source_instancelist = None
source_dict = request_json["source"]
transform_dict = request_json["transform"]
sink_dict = request_json["sink"]
+ influxdb_dict = request_json["influxdb_info"]
c_key = str(source_dict) + str(transform_dict) + str(sink_dict)
logger.debug(c_key)
- feature_engineering_pipeline = factory.get_batch_pipeline(source_dict, transform_dict, sink_dict, c_key)
+ feature_engineering_pipeline = factory.get_batch_pipeline(source_dict, transform_dict, sink_dict, influxdb_dict, c_key)
session = session_helper.get_session()
feature_engineering_pipeline.load_data(session)
feature_engineering_pipeline.transform_data(session)
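# A sketch of the JSON payload this handler expects once influxdb_info is
# part of the request; the keys mirror the test fixture below, and every
# value here is a placeholder.
example_request = {
    "source": {"InfluxSource": {"query": "<flux query>"}},
    "transform": [{"operation": "SQLTransform", "FeatureList": "*", "SQLFilter": ""}],
    "sink": {"CassandraSink": {"CollectionName": "<collection>"}},
    "influxdb_info": {"host": "<host>", "port": "<port>", "token": "<token>",
                      "source_name": "<name>", "db_org": "<org>", "bucket": "<bucket>"},
}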
self.logger.debug("Set variable query"+ self.query)
else:
self.logger.error("Expected variable query"+ self.query)
+
+ def init_dynamic(self, sparkhelper, confighelper, inputdict, influxdb_dict):
+ """
+ @Method: init_dynamic
+ @Input : sparkhelper - Spark session helper
+ confighelper - config helper
+ inputdict - input dictionary
+ influxdb_dict - InfluxDB connection info dictionary
+ @Output: None
+ """
+ self.logger = confighelper.getLogger()
+ self.logger.debug("the init_dynamic is called")
+ classconfig = confighelper.getClassConfig(self)
+ self.url = f"http://{influxdb_dict['host']}:{influxdb_dict['port']}"
+ self.token = influxdb_dict["token"]
+ self.org = influxdb_dict["db_org"]
+ self.timeout = 100000  # request timeout (influxdb-client interprets this in milliseconds)
+ self.ssl = False
+ self.query = classconfig["Query"]
+ if "query" in inputdict.keys():
+ self.query = inputdict["query"]
+ self.logger.debug("Set variable query"+ self.query)
+ else:
+ self.logger.error("Expected variable query"+ self.query)
+
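# A minimal sketch of how a loader might consume the attributes set in
# init_dynamic, assuming the influxdb-client package; _query_influx is a
# hypothetical helper, not the actual load() implementation below.
from influxdb_client import InfluxDBClient

def _query_influx(self):
    client = InfluxDBClient(
        url=self.url,          # "http://<host>:<port>" built in init_dynamic
        token=self.token,
        org=self.org,
        timeout=self.timeout,  # milliseconds
        verify_ssl=self.ssl,
    )
    try:
        # Execute the configured Flux query and return the result as a DataFrame
        return client.query_api().query_data_frame(self.query)
    finally:
        client.close()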
def load(self, sparksession):
"""
@Method: load
class Test_Pipeline:
def setup_method(self):
- api_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check4'}}}
+ api_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check4'}}, 'influxdb_info': {'host': '', 'port': '', 'token': '', 'source_name': '', 'db_org': '', 'bucket': ''}}
load_dotenv('test/test_env.env')
os.environ['CODE_DIR_PATH'] = 'test'
session_helper = SparkSessionManager()
factory = FeatureEngineeringFactory(session_helper)
- (source_dict, transform_dict, sink_dict) = (api_json['source'], api_json['transform'], api_json['sink'])
- self.obj = factory.get_batch_pipeline(source_dict, transform_dict, sink_dict, str(source_dict) + str(transform_dict) + str(sink_dict))
+ (source_dict, transform_dict, sink_dict, influxdb_dict) = (api_json['source'], api_json['transform'], api_json['sink'], api_json['influxdb_info'])
+ self.obj = factory.get_batch_pipeline(source_dict, transform_dict, sink_dict, influxdb_dict, str(source_dict) + str(transform_dict) + str(sink_dict))
self.spark_session = session_helper
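# A hypothetical test method (sketch only, not part of this change) showing
# how the pipeline built in setup_method could be exercised end to end,
# mirroring the handler flow above.
def test_batch_pipeline_runs(self):
    session = self.spark_session.get_session()
    self.obj.load_data(session)
    self.obj.transform_data(session)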
Flask-RESTful
lru-dict
PyYAML
+ Werkzeug==2.2.2
commands =