Stepping to springboot 3
[nonrtric.git] / pmlog / src / main / java / org / oran / pmlog / InfluxStore.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2023 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oran.pmlog;
22
23 import com.influxdb.client.InfluxDBClient;
24 import com.influxdb.client.InfluxDBClientFactory;
25 import com.influxdb.client.WriteApiBlocking;
26 import com.influxdb.client.domain.Ready;
27 import com.influxdb.client.domain.WritePrecision;
28 import com.influxdb.client.write.Point;
29
30 import java.time.Instant;
31 import java.util.ArrayList;
32 import java.util.List;
33
34 import lombok.Getter;
35
36 import org.oran.pmlog.configuration.ApplicationConfig;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 import reactor.core.Disposable;
41 import reactor.core.publisher.Flux;
42 import reactor.core.publisher.Mono;
43
44 /**
45  * The class receives PM reports and stores these in an Influx DB
46  */
47 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
48 public class InfluxStore {
49     private static final Logger logger = LoggerFactory.getLogger(InfluxStore.class);
50
51     @Getter
52     private Disposable subscription;
53
54     private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
55     private final ApplicationConfig applConfig;
56
57     private final InfluxDBClient influxClient;
58
59     public InfluxStore(ApplicationConfig applConfig) {
60         this.applConfig = applConfig;
61         this.influxClient = createInfluxClient();
62
63         pingDb();
64     }
65
66     private void pingDb() {
67         try {
68             String version = this.influxClient.version();
69             logger.info("Influx version {} ", version);
70             Ready ready = this.influxClient.ready();
71             logger.info("Ready {}", ready);
72             logger.info("Onboarding {}", this.influxClient.isOnboardingAllowed());
73         } catch (Exception e) {
74             logger.error("Could not connect to influx DB, reason: {}", e.getMessage());
75         }
76     }
77
78     private InfluxDBClient createInfluxClient() {
79         try {
80             // org = est bucket = pm_data
81             String token = applConfig.getInfluxAccessToken();
82             if (!token.isBlank()) {
83                 return InfluxDBClientFactory.create(applConfig.getInfluxUrl(), token.toCharArray());
84             } else {
85                 return InfluxDBClientFactory.createV1(applConfig.getInfluxUrl(), //
86                         applConfig.getInfluxUser(), applConfig.getInfluxPassword().toCharArray(), //
87                         applConfig.getInfluxDatabase(), //
88                         null);
89             }
90         } catch (Exception e) {
91             logger.error("Exception,could not create influx client {}", e.getMessage());
92             return null;
93         }
94     }
95
96     public void start(Flux<DataFromKafkaTopic> input) {
97         this.subscription = input.flatMap(this::storeInInflux) //
98                 .subscribe(this::handleSentOk, //
99                         this::handleExceptionInStream, //
100                         () -> stop());
101
102     }
103
104     private String measType(PmReport.MeasResult measResult, PmReport.MeasInfoList measInfoList) {
105         return measInfoList.getMeasTypes().getMeasType(measResult.getP());
106     }
107
108     private void addCounterFieldToPoint(Point point, PmReport.MeasInfoList measInfoList,
109             PmReport.MeasValuesList measValueList, PmReport.MeasResult measResult) {
110         String measType = measType(measResult, measInfoList);
111
112         try {
113             Long value = Long.valueOf(measResult.getSValue());
114             point.addField(measType, value);
115         } catch (Exception e) {
116             point.addField(measType, measResult.getSValue());
117         }
118     }
119
120     private Instant endTime(PmReport report) {
121         return Instant.ofEpochMilli(report.lastTimeEpochMili());
122     }
123
124     private Mono<String> storeInInflux(DataFromKafkaTopic data) {
125         PmReport report = PmReport.parse(data);
126
127         List<Point> points = new ArrayList<>();
128         PmReport.MeasDataCollection measDataCollection = report.event.getPerf3gppFields().getMeasDataCollection();
129         for (PmReport.MeasInfoList measInfoList : measDataCollection.getMeasInfoList()) {
130
131             for (PmReport.MeasValuesList measValueList : measInfoList.getMeasValuesList()) {
132                 if (measValueList.getSuspectFlag().equals("true")) {
133                     continue;
134                 }
135                 Point point = Point.measurement(report.fullDistinguishedName(measValueList)).time(endTime(report),
136                         WritePrecision.MS);
137
138                 point.addField("GranularityPeriod", measDataCollection.getGranularityPeriod());
139
140                 for (PmReport.MeasResult measResult : measValueList.getMeasResults()) {
141                     addCounterFieldToPoint(point, measInfoList, measValueList, measResult);
142                 }
143                 points.add(point);
144             }
145         }
146
147         store(points, report);
148
149         logger.info("Processed file from: {}", report.event.getCommonEventHeader().getSourceName());
150
151         return Mono.just("ok");
152
153     }
154
155     public void store(List<Point> points, PmReport report) {
156         try {
157             WriteApiBlocking writeApi = influxClient.getWriteApiBlocking();
158             writeApi.writePoints(applConfig.getInfluxBucket(), applConfig.getInfluxOrg(), points);
159         } catch (Exception e) {
160             logger.error("Could not write points from {}, reason: {}",
161                     report.event.getCommonEventHeader().getSourceName(), e.getMessage());
162         }
163     }
164
165     public synchronized void stop() {
166         if (this.subscription != null) {
167             this.subscription.dispose();
168             this.subscription = null;
169         }
170         logger.info("InfluxStore stopped");
171     }
172
173     private void handleExceptionInStream(Throwable t) {
174         logger.warn(" exception: {}", t.getMessage());
175         stop();
176     }
177
178     public synchronized boolean isRunning() {
179         return this.subscription != null;
180     }
181
182     private void handleSentOk(String data) {
183         logger.debug("Stored data");
184     }
185
186 }