beef up the AI/ML framework tests by adding InfluxDB as data source and populate...
[it/test.git] / XTesting / aiml-fw / insert.py
diff --git a/XTesting/aiml-fw/insert.py b/XTesting/aiml-fw/insert.py
new file mode 100644 (file)
index 0000000..25983e4
--- /dev/null
@@ -0,0 +1,51 @@
+import pandas as pd
+from influxdb_client import InfluxDBClient
+from influxdb_client.client.write_api import SYNCHRONOUS
+import datetime
+
+
+class INSERTDATA:
+
+   def __init__(self):
+        self.client = InfluxDBClient(url = "http://localhost:8086", token="xJVlOom1GRUxDNkldo1v")
+
+
+def explode(df):
+     for col in df.columns:
+             if isinstance(df.iloc[0][col], list):
+                     df = df.explode(col)
+             d = df[col].apply(pd.Series)
+             df[d.columns] = d
+             df = df.drop(col, axis=1)
+     return df
+
+
+def jsonToTable(df):
+     df.index = range(len(df))
+     cols = [col for col in df.columns if isinstance(df.iloc[0][col], dict) or isinstance(df.iloc[0][col], list)]
+     if len(cols) == 0:
+             return df
+     for col in cols:
+             d = explode(pd.DataFrame(df[col], columns=[col]))
+             d = d.dropna(axis=1, how='all')
+             df = pd.concat([df, d], axis=1)
+             df = df.drop(col, axis=1).dropna()
+     return jsonToTable(df)
+
+
+def time(df):
+     df.index = pd.date_range(start=datetime.datetime.now(), freq='10ms', periods=len(df))
+     df['measTimeStampRf'] = df['measTimeStampRf'].apply(lambda x: str(x))
+     return df
+
+
+def populatedb():
+     df = pd.read_json('cell.json', lines=True)
+     df = df[['cellMeasReport']].dropna()
+     df = jsonToTable(df)
+     df = time(df)
+     db = INSERTDATA()
+     write_api = db.client.write_api(write_options=SYNCHRONOUS)
+     write_api.write(bucket="UEData",record=df, data_frame_measurement_name="liveCell",org="primary")
+
+populatedb()