/*-
 * ========================LICENSE_START=================================
 * Copyright (C) 2023 Nordix Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ========================LICENSE_END===================================
 */
21 package org.oran.pmlog;
23 import com.influxdb.client.InfluxDBClient;
24 import com.influxdb.client.InfluxDBClientFactory;
25 import com.influxdb.client.WriteApiBlocking;
26 import com.influxdb.client.domain.Ready;
27 import com.influxdb.client.domain.WritePrecision;
28 import com.influxdb.client.write.Point;
30 import java.time.Instant;
31 import java.util.ArrayList;
32 import java.util.List;
36 import org.oran.pmlog.configuration.ApplicationConfig;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
40 import reactor.core.Disposable;
41 import reactor.core.publisher.Flux;
42 import reactor.core.publisher.Mono;
45 * The class receives PM reports and stores these in an Influx DB
47 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
48 public class InfluxStore {
49 private static final Logger logger = LoggerFactory.getLogger(InfluxStore.class);
52 private Disposable subscription;
54 private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
55 private final ApplicationConfig applConfig;
57 private final InfluxDBClient influxClient;
59 public InfluxStore(ApplicationConfig applConfig) {
60 this.applConfig = applConfig;
61 this.influxClient = createInfluxClient();
66 private void pingDb() {
68 String version = this.influxClient.version();
69 logger.info("Influx version {} ", version);
70 Ready ready = this.influxClient.ready();
71 logger.info("Ready {}", ready);
72 logger.info("Onboarding {}", this.influxClient.isOnboardingAllowed());
73 } catch (Exception e) {
74 logger.error("Could not connect to influx DB, reason: {}", e.getMessage());
78 private InfluxDBClient createInfluxClient() {
80 // org = est bucket = pm_data
81 String token = applConfig.getInfluxAccessToken();
82 if (!token.isBlank()) {
83 return InfluxDBClientFactory.create(applConfig.getInfluxUrl(), token.toCharArray());
85 return InfluxDBClientFactory.createV1(applConfig.getInfluxUrl(), //
86 applConfig.getInfluxUser(), applConfig.getInfluxPassword().toCharArray(), //
87 applConfig.getInfluxDatabase(), //
90 } catch (Exception e) {
91 logger.error("Exception,could not create influx client {}", e.getMessage());
96 public void start(Flux<DataFromKafkaTopic> input) {
97 this.subscription = input.flatMap(this::storeInInflux) //
98 .subscribe(this::handleSentOk, //
99 this::handleExceptionInStream, //
104 private String measType(PmReport.MeasResult measResult, PmReport.MeasInfoList measInfoList) {
105 return measInfoList.getMeasTypes().getMeasType(measResult.getP());
108 private void addCounterFieldToPoint(Point point, PmReport.MeasInfoList measInfoList,
109 PmReport.MeasValuesList measValueList, PmReport.MeasResult measResult) {
110 String measType = measType(measResult, measInfoList);
113 Long value = Long.valueOf(measResult.getSValue());
114 point.addField(measType, value);
115 } catch (Exception e) {
116 point.addField(measType, measResult.getSValue());
120 private Instant endTime(PmReport report) {
121 return Instant.ofEpochMilli(report.lastTimeEpochMili());
124 private Mono<String> storeInInflux(DataFromKafkaTopic data) {
125 PmReport report = PmReport.parse(data);
127 List<Point> points = new ArrayList<>();
128 PmReport.MeasDataCollection measDataCollection = report.event.getPerf3gppFields().getMeasDataCollection();
129 for (PmReport.MeasInfoList measInfoList : measDataCollection.getMeasInfoList()) {
131 for (PmReport.MeasValuesList measValueList : measInfoList.getMeasValuesList()) {
132 if (measValueList.getSuspectFlag().equals("true")) {
135 Point point = Point.measurement(report.fullDistinguishedName(measValueList)).time(endTime(report),
138 point.addField("GranularityPeriod", measDataCollection.getGranularityPeriod());
140 for (PmReport.MeasResult measResult : measValueList.getMeasResults()) {
141 addCounterFieldToPoint(point, measInfoList, measValueList, measResult);
147 store(points, report);
149 logger.info("Processed file from: {}", report.event.getCommonEventHeader().getSourceName());
151 return Mono.just("ok");
155 public void store(List<Point> points, PmReport report) {
157 WriteApiBlocking writeApi = influxClient.getWriteApiBlocking();
158 writeApi.writePoints(applConfig.getInfluxBucket(), applConfig.getInfluxOrg(), points);
159 } catch (Exception e) {
160 logger.error("Could not write points from {}, reason: {}",
161 report.event.getCommonEventHeader().getSourceName(), e.getMessage());
165 public synchronized void stop() {
166 if (this.subscription != null) {
167 this.subscription.dispose();
168 this.subscription = null;
170 logger.info("InfluxStore stopped");
173 private void handleExceptionInStream(Throwable t) {
174 logger.warn(" exception: {}", t.getMessage());
178 public synchronized boolean isRunning() {
179 return this.subscription != null;
182 private void handleSentOk(String data) {
183 logger.debug("Stored data");