2 * ========================LICENSE_START=================================
5 * Copyright (C) 2023 Nordix Foundation
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.oran.pmlog;
23 import com.influxdb.client.InfluxDBClient;
24 import com.influxdb.client.InfluxDBClientFactory;
25 import com.influxdb.client.WriteApiBlocking;
26 import com.influxdb.client.domain.Ready;
27 import com.influxdb.client.domain.WritePrecision;
28 import com.influxdb.client.write.Point;
30 import java.time.Instant;
31 import java.util.ArrayList;
32 import java.util.List;
36 import org.oran.pmlog.configuration.ApplicationConfig;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
40 import reactor.core.Disposable;
41 import reactor.core.publisher.Flux;
42 import reactor.core.publisher.Mono;
45 * The class receives PM reports and stores these in an Influx DB
47 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
48 public class InfluxStore {
49 private static final Logger logger = LoggerFactory.getLogger(InfluxStore.class);
// Handle of the reactive pipeline created by start(). stop() disposes it and resets it to null;
// isRunning() reports whether it is non-null.
52 private Disposable subscription;
// Gson instance with HTML escaping disabled.
// NOTE(review): not referenced by any of the code shown in this class - confirm it is still needed.
54 private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
// Supplies the Influx settings read by this class: URL, access token, user, password, database, bucket and org.
55 private final ApplicationConfig applConfig;
// Client used for the version/ready checks and for all writes; assigned once in the constructor.
57 private final InfluxDBClient influxClient;
/**
 * Creates the store and builds its Influx client from the given configuration.
 *
 * @param applConfig configuration that createInfluxClient() reads the connection settings from
 */
59 public InfluxStore(ApplicationConfig applConfig) {
60 this.applConfig = applConfig;
// Order matters: createInfluxClient() reads this.applConfig, so it must be assigned first.
61 this.influxClient = createInfluxClient();
/**
 * Queries the Influx server through the client and logs, at INFO level, its version, its
 * ready status and whether onboarding is allowed.
 * An exception from these calls is caught and reported as an ERROR log entry.
 */
66 private void pingDb() {
68 String version = this.influxClient.version();
69 logger.info("Influx version {} ", version);
70 Ready ready = this.influxClient.ready();
71 logger.info("Ready {}", ready);
72 logger.info("Onboarding {}", this.influxClient.isOnboardingAllowed());
73 } catch (Exception e) {
// Only the exception message is logged; the stack trace is not passed to the logger.
74 logger.error("Could not connect to influx DB, reason: {}", e.getMessage());
/**
 * Builds the Influx client from the application configuration.
 * A non-blank access token selects InfluxDBClientFactory.create(url, token); otherwise the
 * client is built with InfluxDBClientFactory.createV1 from the configured user, password and
 * database.
 * An exception raised while creating the client is caught and logged (message only).
 * NOTE(review): the value returned after a caught exception is not shown here - confirm that
 * callers can cope with it.
 *
 * @return the client created for the configured Influx URL
 */
78 private InfluxDBClient createInfluxClient() {
80 // org = est bucket = pm_data
81 String token = applConfig.getInfluxAccessToken();
// NOTE(review): token is dereferenced without a null check; presumably the configuration
// yields an empty string when no token is set - verify against ApplicationConfig.
82 if (!token.isBlank()) {
83 return InfluxDBClientFactory.create(applConfig.getInfluxUrl(), token.toCharArray());
// No token configured: use the user/password/database based factory method instead.
85 return InfluxDBClientFactory.createV1(applConfig.getInfluxUrl(), //
86 applConfig.getInfluxUser(), applConfig.getInfluxPassword().toCharArray(), //
87 applConfig.getInfluxDatabase(), //
90 } catch (Exception e) {
91 logger.error("Exception,could not create influx client {}", e.getMessage());
/**
 * Subscribes to the input stream and passes every received item to storeInInflux.
 * The Disposable returned by subscribe is kept in this.subscription so that stop() can
 * dispose it. Each result is handed to handleSentOk and a stream error to
 * handleExceptionInStream.
 * NOTE(review): unlike stop() and isRunning(), this method is not synchronized although it
 * writes this.subscription - confirm that this is intentional.
 *
 * @param input stream of DataFromKafkaTopic items to store
 */
96 public void start(Flux<DataFromKafkaTopic> input) {
97 this.subscription = input.flatMap(this::storeInInflux) //
98 .subscribe(this::handleSentOk, //
99 this::handleExceptionInStream, //
/**
 * Returns the measurement type that belongs to a measurement result, by passing the result's
 * p value to the measurement types of the given measInfoList.
 * NOTE(review): presumably p is the position of the counter within the type list - verify
 * against PmReport.
 *
 * @param measResult the result whose type is wanted
 * @param measInfoList the list that holds the measurement types for the result
 * @return the measurement type returned by getMeasType for the result's p value
 */
104 private String measType(PmReport.MeasResult measResult, PmReport.MeasInfoList measInfoList) {
105 return measInfoList.getMeasTypes().getMeasType(measResult.getP());
/**
 * Builds the name used for the Influx measurement of one value list: the measured entity DN
 * of the report, a comma, and the measObjInstId of the value list.
 *
 * @param report the report that holds the measured entity DN
 * @param measValueList the value list that holds the measObjInstId
 * @return the two parts joined by ","
 */
108 private String fdn(PmReport report, PmReport.MeasValuesList measValueList) {
109 return report.event.getPerf3gppFields().getMeasDataCollection().getMeasuredEntityDn() + ","
110 + measValueList.getMeasObjInstId();
/**
 * Adds one counter value to the point, using the counter's measurement type as field name.
 * The sValue of the result is converted with Long.valueOf; if that throws, the unconverted
 * sValue is added instead.
 * NOTE(review): the same field name can therefore be written as a Long or as the raw value,
 * depending on the data - confirm that the target bucket accepts both.
 *
 * @param point the point that receives the field
 * @param measInfoList supplies the measurement types used to name the field
 * @param measValueList not used by the code shown here
 * @param measResult supplies the p value and the sValue of the counter
 */
113 private void addCounterFieldToPoint(Point point, PmReport.MeasInfoList measInfoList,
114 PmReport.MeasValuesList measValueList, PmReport.MeasResult measResult) {
115 String measType = measType(measResult, measInfoList);
118 Long value = Long.valueOf(measResult.getSValue());
119 point.addField(measType, value);
120 } catch (Exception e) {
// Conversion failed: keep the value by storing it unconverted.
121 point.addField(measType, measResult.getSValue());
/**
 * Returns the timestamp applied to the points of a report: the value of
 * report.lastTimeEpochMili() interpreted as milliseconds since the epoch.
 *
 * @param report the report to take the time from
 * @return the corresponding Instant
 */
125 private Instant endTime(PmReport report) {
126 return Instant.ofEpochMilli(report.lastTimeEpochMili());
/**
 * Converts one received item into Influx points and writes them.
 * The item is parsed into a PmReport. For the value lists of every measInfoList a Point is
 * built whose measurement name comes from fdn(), whose time is endTime() in millisecond
 * precision, and whose fields are "GranularityPeriod" plus one field per measurement result.
 * The points are then passed to store() and the source name of the report is logged.
 * store() catches and logs write failures itself, so "ok" is emitted whether or not the
 * write succeeded.
 *
 * @param data the item received from the input stream
 * @return a Mono emitting "ok" once the report has been processed
 */
129 private Mono<String> storeInInflux(DataFromKafkaTopic data) {
130 PmReport report = PmReport.parse(data);
132 List<Point> points = new ArrayList<>();
133 PmReport.MeasDataCollection measDataCollection = report.event.getPerf3gppFields().getMeasDataCollection();
134 for (PmReport.MeasInfoList measInfoList : measDataCollection.getMeasInfoList()) {
136 for (PmReport.MeasValuesList measValueList : measInfoList.getMeasValuesList()) {
// NOTE(review): the body of this branch is not shown here; presumably value lists flagged
// as suspect are skipped - confirm. getSuspectFlag() is dereferenced without a null check.
137 if (measValueList.getSuspectFlag().equals("true")) {
140 Point point = Point.measurement(fdn(report, measValueList)).time(endTime(report), WritePrecision.MS);
142 point.addField("GranularityPeriod", measDataCollection.getGranularityPeriod());
144 for (PmReport.MeasResult measResult : measValueList.getMeasResults()) {
145 addCounterFieldToPoint(point, measInfoList, measValueList, measResult);
// store() uses the blocking write API, so the write attempt has finished before the
// Mono below is created.
151 store(points, report);
153 logger.info("Processed file from: {}", report.event.getCommonEventHeader().getSourceName());
155 return Mono.just("ok");
/**
 * Writes the points to the configured Influx bucket and org using the blocking write API.
 * An exception from the write is caught and logged as an ERROR together with the source
 * name of the report; only the exception message is logged.
 *
 * @param points the points to write
 * @param report the report the points were built from; used only for the error log
 */
159 public void store(List<Point> points, PmReport report) {
161 WriteApiBlocking writeApi = influxClient.getWriteApiBlocking();
162 writeApi.writePoints(applConfig.getInfluxBucket(), applConfig.getInfluxOrg(), points);
163 } catch (Exception e) {
164 logger.error("Could not write points from {}, reason: {}",
165 report.event.getCommonEventHeader().getSourceName(), e.getMessage());
/**
 * Stops the store: disposes the current subscription, if there is one, resets the field to
 * null so that isRunning() returns false, and logs that the store has stopped.
 * Synchronized on this instance, like isRunning().
 */
169 public synchronized void stop() {
170 if (this.subscription != null) {
171 this.subscription.dispose();
172 this.subscription = null;
174 logger.info("InfluxStore stopped");
/**
 * Error callback of the subscription created in start(); logs the message of the throwable
 * at WARN level.
 *
 * @param t the error signalled by the stream
 */
177 private void handleExceptionInStream(Throwable t) {
178 logger.warn(" exception: {}", t.getMessage());
/**
 * Tells whether a subscription is currently held.
 *
 * @return true when this.subscription is non-null, false otherwise (for example after stop())
 */
182 public synchronized boolean isRunning() {
183 return this.subscription != null;
/**
 * Per-item callback of the subscription created in start(); logs at DEBUG level that data
 * was stored.
 *
 * @param data the value emitted by storeInInflux; not used
 */
186 private void handleSentOk(String data) {
187 logger.debug("Stored data");