/*- * ========================LICENSE_START================================= * O-RAN-SC * %% * Copyright (C) 2023 Nordix Foundation * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ========================LICENSE_END=================================== */ package org.oran.pmlog; import com.influxdb.client.InfluxDBClient; import com.influxdb.client.InfluxDBClientFactory; import com.influxdb.client.WriteApiBlocking; import com.influxdb.client.domain.Ready; import com.influxdb.client.domain.WritePrecision; import com.influxdb.client.write.Point; import java.time.Instant; import java.util.ArrayList; import java.util.List; import lombok.Getter; import org.oran.pmlog.configuration.ApplicationConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** * The class receives PM reports and stores these in an Influx DB */ @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally public class InfluxStore { private static final Logger logger = LoggerFactory.getLogger(InfluxStore.class); @Getter private Disposable subscription; private final ApplicationConfig applConfig; private final InfluxDBClient influxClient; public InfluxStore(ApplicationConfig applConfig) { this.applConfig = applConfig; this.influxClient = createInfluxClient(); pingDb(); } private void pingDb() { try { String version = this.influxClient.version(); logger.info("Influx version {} ", version); Ready ready = this.influxClient.ready(); logger.info("Ready {}", ready); logger.info("Onboarding {}", this.influxClient.isOnboardingAllowed()); } catch (Exception e) { logger.error("Could not connect to influx DB, reason: {}", e.getMessage()); } } private InfluxDBClient createInfluxClient() { try { // org = est bucket = pm_data String token = applConfig.getInfluxAccessToken(); if (!token.isBlank()) { return InfluxDBClientFactory.create(applConfig.getInfluxUrl(), token.toCharArray()); } else { return InfluxDBClientFactory.createV1(applConfig.getInfluxUrl(), // applConfig.getInfluxUser(), applConfig.getInfluxPassword().toCharArray(), // applConfig.getInfluxDatabase(), // null); } } catch (Exception e) { logger.error("Exception,could not create influx client {}", e.getMessage()); return null; } } public void start(Flux input) { this.subscription = input.flatMap(this::storeInInflux) // .subscribe(this::handleSentOk, // this::handleExceptionInStream, // this::stop); } private String measType(PmReport.MeasResult measResult, PmReport.MeasInfoList measInfoList) { return measInfoList.getMeasTypes().getMeasType(measResult.getP()); } @SuppressWarnings("java:S1172") private void addCounterFieldToPoint(Point point, PmReport.MeasInfoList measInfoList, PmReport.MeasValuesList measValueList, PmReport.MeasResult measResult) { String measType = measType(measResult, measInfoList); try { Long value = Long.valueOf(measResult.getSValue()); point.addField(measType, value); } catch (Exception e) { point.addField(measType, measResult.getSValue()); } } private Instant endTime(PmReport report) { return Instant.ofEpochMilli(report.lastTimeEpochMili()); } private Mono storeInInflux(DataFromKafkaTopic data) { PmReport report = PmReport.parse(data); List points = new ArrayList<>(); PmReport.MeasDataCollection measDataCollection = report.event.getPerf3gppFields().getMeasDataCollection(); for (PmReport.MeasInfoList measInfoList : measDataCollection.getMeasInfoList()) { for (PmReport.MeasValuesList measValueList : measInfoList.getMeasValuesList()) { if (measValueList.getSuspectFlag().equals("true")) { continue; } Point point = Point.measurement(report.fullDistinguishedName(measValueList)).time(endTime(report), WritePrecision.MS); point.addField("GranularityPeriod", measDataCollection.getGranularityPeriod()); for (PmReport.MeasResult measResult : measValueList.getMeasResults()) { addCounterFieldToPoint(point, measInfoList, measValueList, measResult); } points.add(point); } } store(points, report); logger.info("Processed file from: {}", report.event.getCommonEventHeader().getSourceName()); return Mono.just("ok"); } public void store(List points, PmReport report) { try { WriteApiBlocking writeApi = influxClient.getWriteApiBlocking(); writeApi.writePoints(applConfig.getInfluxBucket(), applConfig.getInfluxOrg(), points); } catch (Exception e) { logger.error("Could not write points from {}, reason: {}", report.event.getCommonEventHeader().getSourceName(), e.getMessage()); } } public synchronized void stop() { if (this.subscription != null) { this.subscription.dispose(); this.subscription = null; } logger.info("InfluxStore stopped"); } private void handleExceptionInStream(Throwable t) { logger.warn(" exception: {}", t.getMessage()); stop(); } public synchronized boolean isRunning() { return this.subscription != null; } private void handleSentOk(String data) { logger.debug("Stored data"); } }