beef up the AI/ML framework tests by adding InfluxDB as data source and populate...
[it/test.git] / XTesting / aiml-fw / insert.py
1 import pandas as pd
2 from influxdb_client import InfluxDBClient
3 from influxdb_client.client.write_api import SYNCHRONOUS
4 import datetime
5
6
class INSERTDATA:
    """Holder for the InfluxDB client used to load data.

    Attributes:
        client: the ``InfluxDBClient`` created in ``__init__``.
    """

    def __init__(self, url="http://localhost:8086", token="xJVlOom1GRUxDNkldo1v"):
        """Create the InfluxDB client.

        Args:
            url: address of the InfluxDB server. The default is the local
                instance this script has always targeted.
            token: API token passed to the client. The default is the value
                that was previously hard-coded here.
        """
        # NOTE(review): the default token is a credential committed to source
        # control; callers should pass their own token where possible.
        self.client = InfluxDBClient(url=url, token=token)
11
12
def explode(df):
    """Expand every column of *df* into one column per nested key.

    For each column: if the value in the first row is a list, the column is
    exploded so each list item gets its own row. The (possibly exploded)
    values are then passed through ``pd.Series`` so that dict keys become
    columns, and the original column is removed.

    Args:
        df: frame whose columns hold dicts, or lists of dicts.

    Returns:
        A new frame containing the expanded columns. An empty frame is
        returned unchanged. The caller's frame is never modified.
    """
    if df.empty:
        # There is no first row to inspect; df.iloc[0] would raise IndexError.
        return df
    for col in df.columns:
        if isinstance(df.iloc[0][col], list):
            df = df.explode(col)
        expanded = df[col].apply(pd.Series)
        # Drop before assigning: drop() returns a new frame, so the column
        # assignment below never writes into the frame the caller passed in.
        df = df.drop(col, axis=1)
        df[expanded.columns] = expanded
    return df
21
22
def jsonToTable(df):
    """Flatten nested JSON columns of *df* into plain columns.

    Columns whose first-row value is a dict or a list are expanded with
    ``explode``; all-NaN columns of the expansion are discarded, the result
    is concatenated next to the remaining columns, the nested column is
    removed, and rows containing any NaN are dropped. The function then
    recurses until the first row holds no dict or list values.

    Args:
        df: frame as produced by ``pd.read_json``; it is not modified.

    Returns:
        A frame with a fresh 0..n-1 index and no nested columns in its first
        row. An empty frame is returned as-is (with a reset index).
    """
    # reset_index returns a new frame, so the caller's index is left alone.
    df = df.reset_index(drop=True)
    if df.empty:
        # No first row to inspect (this also happens when dropna() below
        # removed every row); df.iloc[0] would raise IndexError.
        return df
    first_row = df.iloc[0]
    cols = [col for col in df.columns if isinstance(first_row[col], (dict, list))]
    if not cols:
        return df
    for col in cols:
        d = explode(pd.DataFrame(df[col], columns=[col]))
        d = d.dropna(axis=1, how='all')
        # NOTE(review): explode() repeats index labels when a list holds
        # several items; confirm pd.concat(axis=1) accepts that for the
        # expected input data.
        df = pd.concat([df, d], axis=1)
        df = df.drop(col, axis=1).dropna()
    return jsonToTable(df)
34
35
def time(df):
    """Give *df* a synthetic timestamp index and stringify ``measTimeStampRf``.

    The index is replaced by timestamps starting at the current UTC time and
    spaced 10 ms apart, one per row. The ``measTimeStampRf`` column is
    converted to strings.

    Args:
        df: frame containing a ``measTimeStampRf`` column; modified in place.

    Returns:
        The same frame object that was passed in.

    Raises:
        KeyError: if ``measTimeStampRf`` is not a column of *df*.
    """
    # A timezone-aware start is used because a naive local timestamp carries
    # no zone, so a non-UTC host's wall-clock time could be recorded as if it
    # were UTC.
    start = datetime.datetime.now(datetime.timezone.utc)
    df.index = pd.date_range(start=start, freq='10ms', periods=len(df))
    df['measTimeStampRf'] = df['measTimeStampRf'].apply(str)
    return df
40
41
def populatedb(path='cell.json', bucket="UEData", measurement="liveCell", org="primary"):
    """Load cell measurement reports from a JSON-lines file into InfluxDB.

    Reads *path*, keeps the non-null ``cellMeasReport`` column, flattens it
    with ``jsonToTable``, stamps the rows with ``time`` and writes the frame
    synchronously through the client held by ``INSERTDATA``.

    Args:
        path: JSON-lines file to read.
        bucket: destination bucket.
        measurement: measurement name used for the written rows.
        org: organisation passed to the write call.
    """
    df = pd.read_json(path, lines=True)
    df = df[['cellMeasReport']].dropna()
    df = jsonToTable(df)
    df = time(df)
    db = INSERTDATA()
    try:
        write_api = db.client.write_api(write_options=SYNCHRONOUS)
        write_api.write(bucket=bucket, record=df, data_frame_measurement_name=measurement, org=org)
    finally:
        # Release the client's connections even when the write fails.
        db.client.close()


# Called unconditionally at module level, so this runs on import as well as
# when the file is executed as a script.
populatedb()