Added pmRopEndTime in the PM filtering.
Simplified the PM job schema.
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: Idd5d501e65643cf37b101f9e4e64d446b2d9940f
}
@Schema(name = "statistics_info", description = "Statistics information")
- public class Statistics {
+ public class StatisticsCollection {
@Schema(description = "Statistics per job")
public final Collection<Job.Statistics> jobStatistics;
- public Statistics(Collection<Job.Statistics> stats) {
+ public StatisticsCollection(Collection<Job.Statistics> stats) {
this.jobStatistics = stats;
}
-
}
@GetMapping(path = STATISTICS_URL, produces = MediaType.APPLICATION_JSON_VALUE)
@Operation(summary = "Returns statistics", description = "")
@ApiResponses(value = { //
@ApiResponse(responseCode = "200", description = "OK", //
- content = @Content(schema = @Schema(implementation = Statistics.class))) //
+ content = @Content(schema = @Schema(implementation = StatisticsCollection.class))) //
})
public ResponseEntity<Object> getStatistics() {
List<Job.Statistics> res = new ArrayList<>();
res.add(job.getStatistics());
}
- return new ResponseEntity<>(gson.toJson(new Statistics(res)), HttpStatus.OK);
+ return new ResponseEntity<>(gson.toJson(new StatisticsCollection(res)), HttpStatus.OK);
}
}
public class PmReport {
@Expose
- Event event = new Event();
+ public Event event = new Event();
public static class CommonEventHeader {
@Expose
@Setter
String pmRopStartTime;
+
+ @Setter
+ String pmRopEndTime;
}
private static class MeasTypesIndexed extends PmReport.MeasTypes {
reportFiltered.event.perf3gppFields = report.event.perf3gppFields.toBuilder().build();
reportFiltered.event.perf3gppFields.measDataCollection =
report.event.perf3gppFields.measDataCollection.toBuilder().build();
+
reportFiltered.event.perf3gppFields.measDataCollection.measInfoList = filteredMeasObjs;
return !filteredMeasObjs.isEmpty();
}
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Builder
+ @Getter
@Schema(name = "job_statistics", description = "Statistics information for one job")
public static class Statistics {
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
-
-import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
public class HttpJobDataDistributor extends JobDataDistributor {
private static final Logger logger = LoggerFactory.getLogger(HttpJobDataDistributor.class);
- public HttpJobDataDistributor(Job job, ApplicationConfig config, Flux<TopicListener.DataFromTopic> input) {
- super(job, config, input);
+ public HttpJobDataDistributor(Job job, ApplicationConfig config) {
+ super(job, config);
}
@Override
private final Job job;
private Disposable subscription;
private final ErrorStats errorStats = new ErrorStats();
- private final ApplicationConfig applConfig;
private final DataStore dataStore;
private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
}
}
- protected JobDataDistributor(Job job, ApplicationConfig applConfig, Flux<TopicListener.DataFromTopic> input) {
+ protected JobDataDistributor(Job job, ApplicationConfig applConfig) {
this.job = job;
- this.applConfig = applConfig;
this.dataStore = DataStore.create(applConfig);
this.dataStore.create(DataStore.Bucket.FILES).subscribe();
this.dataStore.create(DataStore.Bucket.LOCKS).subscribe();
this.errorStats.resetIrrecoverableErrors();
- this.subscription = filterAndBuffer(input, this.job) //
- .flatMap(this::sendToClient, job.getParameters().getMaxConcurrency()) //
- .onErrorResume(this::handleError) //
- .subscribe(this::handleSentOk, //
- this::handleExceptionInStream, //
- () -> logger.warn("HttpDataConsumer stopped jobId: {}", job.getId()));
}
static class LockedException extends ServiceException {
}
}
- public void collectHistoricalData() {
+ public void start(Flux<TopicListener.DataFromTopic> input) {
PmReportFilter filter = job.getFilter() instanceof PmReportFilter ? (PmReportFilter) job.getFilter() : null;
+ if (filter == null || filter.getFilterData().getPmRopEndTime() == null) {
+ this.subscription = filterAndBuffer(input, this.job) //
+ .flatMap(this::sendToClient, job.getParameters().getMaxConcurrency()) //
+ .onErrorResume(this::handleError) //
+ .subscribe(this::handleSentOk, //
+ this::handleExceptionInStream, //
+ () -> logger.warn("HttpDataConsumer stopped jobId: {}", job.getId()));
+ }
+
if (filter != null && filter.getFilterData().getPmRopStartTime() != null) {
this.dataStore.createLock(collectHistoricalDataLockName()) //
.flatMap(isLockGranted -> Boolean.TRUE.equals(isLockGranted) ? Mono.just(isLockGranted)
.doOnNext(sourceName -> logger.debug("Checking source name: {}, jobId: {}", sourceName,
this.job.getId())) //
.flatMap(sourceName -> dataStore.listObjects(DataStore.Bucket.FILES, sourceName), 1) //
- .filter(fileName -> filterStartTime(filter.getFilterData().getPmRopStartTime(), fileName)) //
+ .filter(this::isRopFile).filter(fileName -> filterStartTime(filter.getFilterData(), fileName)) //
+ .filter(fileName -> filterEndTime(filter.getFilterData(), fileName)) //
.map(this::createFakeEvent) //
.flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, this.job.getType(),
dataStore), 100)
return new TopicListener.DataFromTopic(null, gson.toJson(ev).getBytes());
}
- private boolean filterStartTime(String startTimeStr, String fileName) {
+ private static String fileTimePartFromRopFileName(String fileName) {
+ return fileName.substring(fileName.lastIndexOf("/") + 2);
+ }
+
+ private static boolean filterStartTime(PmReportFilter.FilterData filter, String fileName) {
// A20000626.2315+0200-2330+0200_HTTPS-6-73.json
try {
- if (fileName.endsWith(".json") || fileName.endsWith(".json.gz")) {
-
- String fileTimePart = fileName.substring(fileName.lastIndexOf("/") + 2);
- fileTimePart = fileTimePart.substring(0, 18);
+ String fileTimePart = fileTimePartFromRopFileName(fileName);
+ fileTimePart = fileTimePart.substring(0, 18);
+ OffsetDateTime fileStartTime = parseFileDate(fileTimePart);
+ OffsetDateTime startTime = OffsetDateTime.parse(filter.getPmRopStartTime());
+ boolean isMatch = fileStartTime.isAfter(startTime);
+ logger.debug("Checking file: {}, fileStartTime: {}, filterStartTime: {}, isAfter: {}", fileName,
+ fileStartTime, startTime, isMatch);
+ return isMatch;
+ } catch (Exception e) {
+ logger.warn("Time parsing exception: {}", e.getMessage());
+ return false;
+ }
+ }
- DateTimeFormatter formatter =
- new DateTimeFormatterBuilder().appendPattern("yyyyMMdd.HHmmZ").toFormatter();
+ private boolean isRopFile(String fileName) {
+ return fileName.endsWith(".json") || fileName.endsWith(".json.gz");
+ }
- OffsetDateTime fileStartTime = OffsetDateTime.parse(fileTimePart, formatter);
- OffsetDateTime startTime = OffsetDateTime.parse(startTimeStr);
- boolean isBefore = startTime.isBefore(fileStartTime);
- logger.debug("Checking file: {}, fileStartTime: {}, filterStartTime: {}, isBefore: {}", fileName,
- fileStartTime, startTime, isBefore);
- return isBefore;
- } else {
- return false;
- }
+ private static boolean filterEndTime(PmReportFilter.FilterData filter, String fileName) {
+ // A20000626.2315+0200-2330+0200_HTTPS-6-73.json
+ if (filter.getPmRopEndTime() == null) {
+ return true;
+ }
+ try {
+ String fileTimePart = fileTimePartFromRopFileName(fileName);
+ fileTimePart = fileTimePart.substring(0, 9) + fileTimePart.substring(19, 28);
+ OffsetDateTime fileEndTime = parseFileDate(fileTimePart);
+ OffsetDateTime endTime = OffsetDateTime.parse(filter.getPmRopEndTime());
+ boolean isMatch = fileEndTime.isBefore(endTime);
+ logger.debug("Checking file: {}, fileEndTime: {}, endTime: {}, isBefore: {}", fileName, fileEndTime,
+ endTime, isMatch);
+ return isMatch;
} catch (Exception e) {
logger.warn("Time parsing exception: {}", e.getMessage());
}
}
+ private static OffsetDateTime parseFileDate(String timeStr) {
+ DateTimeFormatter startTimeFormatter =
+ new DateTimeFormatterBuilder().appendPattern("yyyyMMdd.HHmmZ").toFormatter();
+ return OffsetDateTime.parse(timeStr, startTimeFormatter);
+ }
+
private void handleExceptionInStream(Throwable t) {
logger.warn("JobDataDistributor exception: {}, jobId: {}", t.getMessage(), job.getId());
stop();
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataDistributor.class);
private KafkaSender<byte[], byte[]> sender;
- private final ApplicationConfig appConfig;
- public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig, Flux<TopicListener.DataFromTopic> input) {
- super(job, appConfig, input);
- this.appConfig = appConfig;
+ public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig) {
+ super(job, appConfig);
+
SenderOptions<byte[], byte[]> senderOptions = senderOptions(appConfig);
this.sender = KafkaSender.create(senderOptions);
}
}
}
- static byte[] unzip(byte[] bytes, String fileName) {
+ public static byte[] unzip(byte[] bytes, String fileName) {
if (!fileName.endsWith(".gz")) {
return bytes;
}
}
}
- private JobDataDistributor createConsumer(Job job, TopicListener topicListener) {
- return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic())
- ? new KafkaJobDataDistributor(job, appConfig, topicListener.getFlux())
- : new HttpJobDataDistributor(job, appConfig, topicListener.getFlux());
+ private JobDataDistributor createConsumer(Job job) {
+ return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaJobDataDistributor(job, appConfig)
+ : new HttpJobDataDistributor(job, appConfig);
}
private void addConsumer(Job job, MultiMap<JobDataDistributor> distributors,
Map<String, TopicListener> topicListeners) {
TopicListener topicListener = topicListeners.get(job.getType().getId());
- JobDataDistributor distributor = createConsumer(job, topicListener);
+ JobDataDistributor distributor = createConsumer(job);
- distributor.collectHistoricalData();
+ distributor.start(topicListener.getFlux());
distributors.put(job.getType().getId(), job.getId(), distributor);
}
"additionalProperties": false,
"properties": {
"filter": {
- "anyOf": [
- {
- "type": "string"
+ "type": "object",
+ "additionalProperties": false,
+ "properties": {
+ "sourceNames": {
+ "type": "array",
+ "items": [
+ {
+ "type": "string"
+ }
+ ]
+ },
+ "measObjInstIds": {
+ "type": "array",
+ "items": [
+ {
+ "type": "string"
+ }
+ ]
+ },
+ "measObjClass": {
+ "type": "array",
+ "items": [
+ {
+ "type": "string"
+ }
+ ]
+ },
+ "measTypes": {
+ "type": "array",
+ "items": [
+ {
+ "type": "string"
+ }
+ ]
},
- {
- "type": "object",
- "additionalProperties": false,
- "properties": {
- "sourceNames": {
- "type": "array",
- "items": [
- {
- "type": "string"
- }
- ]
- },
- "measObjInstIds": {
- "type": "array",
- "items": [
- {
- "type": "string"
- }
- ]
- },
- "measObjClass": {
- "type": "array",
- "items": [
- {
- "type": "string"
- }
- ]
- },
- "measTypes": {
- "type": "array",
- "items": [
- {
- "type": "string"
- }
- ]
- },
- "measuredEntityDns": {
- "type": "array",
- "items": [
- {
- "type": "string"
- }
- ]
- },
- "pmRopStartTime": {
+ "measuredEntityDns": {
+ "type": "array",
+ "items": [
+ {
"type": "string"
}
- }
+ ]
+ },
+ "pmRopStartTime": {
+ "type": "string"
+ },
+ "pmRopEndTime": {
+ "type": "string"
}
- ]
+ }
},
"filterType": {
"type": "string",
"enum": [
- "jslt",
- "regexp",
- "pmdata",
- "json-path"
+ "pmdata"
]
},
- "maxConcurrency": {
- "type": "integer",
- "minimum": 1
- },
"kafkaOutputTopic": {
"type": "string"
},
"gzip": {
"type": "boolean"
- },
- "bufferTimeout": {
- "type": "object",
- "additionalProperties": false,
- "properties": {
- "maxSize": {
- "type": "integer",
- "minimum": 1
- },
- "maxTimeMiliseconds": {
- "type": "integer",
- "minimum": 0,
- "maximum": 160000
- }
- },
- "required": [
- "maxSize",
- "maxTimeMiliseconds"
- ]
}
}
}
\ No newline at end of file
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.google.gson.JsonParser;
-import com.google.protobuf.AbstractMessage.Builder;
-import com.google.protobuf.Message;
-import com.google.protobuf.MessageOrBuilder;
-import com.google.protobuf.util.JsonFormat;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.invoke.MethodHandles;
-import java.lang.reflect.InvocationTargetException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.time.Instant;
-import java.util.ArrayList;
+import java.time.OffsetDateTime;
import java.util.Map;
import org.json.JSONObject;
"app.webclient.trust-store-used=true", //
"app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
"app.pm-files-path=/tmp/dmaapadaptor", //
- "app.s3.endpointOverride="})
+ "app.s3.endpointOverride=" //
+})
class ApplicationTest {
@Autowired
private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- public static class ProtoJsonUtil {
-
- /**
- * Makes a Json from a given message or builder
- *
- * @param messageOrBuilder is the instance
- * @return The string representation
- * @throws IOException if any error occurs
- */
- public static String toJson(MessageOrBuilder messageOrBuilder) throws IOException {
- return JsonFormat.printer().print(messageOrBuilder);
- }
-
- /**
- * Makes a new instance of message based on the json and the class
- *
- * @param <T> is the class type
- * @param json is the json instance
- * @param clazz is the class instance
- * @return An instance of T based on the json values
- * @throws IOException if any error occurs
- */
- @SuppressWarnings({"unchecked", "rawtypes"})
- public static <T extends Message> T fromJson(String json, Class<T> clazz) throws IOException {
- // https://stackoverflow.com/questions/27642021/calling-parsefrom-method-for-generic-protobuffer-class-in-java/33701202#33701202
- Builder builder = null;
- try {
- // Since we are dealing with a Message type, we can call newBuilder()
- builder = (Builder) clazz.getMethod("newBuilder").invoke(null);
-
- } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
- | NoSuchMethodException | SecurityException e) {
- return null;
- }
-
- // The instance is placed into the builder values
- JsonFormat.parser().ignoringUnknownFields().merge(json, builder);
-
- // the instance will be from the build
- return (T) builder.build();
- }
- }
-
- // @Test
- void testProtoBuf() throws Exception {
- String path = "./src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json";
-
- String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
-
- PmProtoGenerated.PmRopFile proto = ProtoJsonUtil.fromJson(pmReportJson, PmProtoGenerated.PmRopFile.class);
- byte[] bytes = proto.toByteArray();
-
- int TIMES = 100000;
- {
- Instant startTime = Instant.now();
- for (int i = 0; i < TIMES; ++i) {
- PmProtoGenerated.PmRopFile.parseFrom(bytes);
- }
- long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
- logger.info("*** Duration PROTO :" + durationSeconds + ", objects/second: " + TIMES / durationSeconds
- + " time: " + (float) durationSeconds / TIMES);
- }
- {
- Instant startTime = Instant.now();
- for (int i = 0; i < TIMES; ++i) {
- PmReport reportsParsed = gson.fromJson(pmReportJson, PmReport.class);
- }
- long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
- logger.info("*** Duration GSON :" + durationSeconds + ", objects/second: " + TIMES / durationSeconds
- + " time: " + (float) durationSeconds / TIMES);
- }
-
- }
-
static class TestApplicationConfig extends ApplicationConfig {
@Override
TestApplicationConfig cfg = new TestApplicationConfig();
return cfg;
}
+
}
@BeforeEach
@AfterEach
void reset() {
+ DmaapSimulatorController.reset();
for (Job job : this.jobs.getAll()) {
this.icsSimulatorController.deleteJob(job.getId(), restClient());
}
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ DmaapSimulatorController.reset();
this.consumerController.testResults.reset();
this.icsSimulatorController.testResults.reset();
// Return two messages from DMAAP and verify that these are sent to the owner of
// the job (consumer)
- DmaapSimulatorController.addResponse("[\"DmaapResponse123\", \"DmaapResponse223\"]");
+ DmaapSimulatorController.addResponse("[\"{}\", \"{}\"]");
ConsumerController.TestResults consumer = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
- assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"DmaapResponse123\", \"DmaapResponse223\"]");
+ assertThat(consumer.receivedBodies.get(0)).isEqualTo("[{}, {}]");
assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json");
String jobUrl = baseUrl() + ProducerCallbacksController.JOB_URL;
await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(2));
assertThat(consumer.receivedBodies.get(0)).isEqualTo("DmaapResponse11");
assertThat(consumer.receivedBodies.get(1)).isEqualTo("DmaapResponse22");
- assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8");
// Delete the job
this.icsSimulatorController.deleteJob(JOB_ID, restClient());
await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(4));
}
- static class PmReportArray extends ArrayList<PmReport> {
- };
-
@Test
void testPmFiltering() throws Exception {
// Create a job
filterData.getMeasObjInstIds().add("UtranCell=Gbg-997");
filterData.getSourceNames().add("O-DU-1122");
filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1");
- Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE)
- .bufferTimeout(new Job.BufferTimeout(123, 456)).build();
+ Job.Parameters param =
+ Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE).build();
+
String paramJson = gson.toJson(param);
ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", "EI_PM_JOB_ID", toJson(paramJson));
String receivedFiltered = consumer.receivedBodies.get(0);
assertThat(receivedFiltered).contains("succImmediateAssignProcs").doesNotContain("\"p\":2").contains("\"p\":1");
- PmReportArray reportsParsed = gson.fromJson(receivedFiltered, PmReportArray.class);
- assertThat(reportsParsed).hasSize(1);
+ PmReport reportsParsed = gson.fromJson(receivedFiltered, PmReport.class);
+ assertThat(reportsParsed.event).isNotNull();
}
@Test
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
filterData.getSourceNames().add("O-DU-1122");
filterData.setPmRopStartTime("1999-12-27T10:50:44.000-08:00");
+ filterData.setPmRopEndTime(OffsetDateTime.now().toString());
- Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE)
- .bufferTimeout(new Job.BufferTimeout(123, 456)).build();
+ Job.Parameters param =
+ Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE).build();
String paramJson = gson.toJson(param);
ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", "EI_PM_JOB_ID", toJson(paramJson));
Job.Parameters param =
Job.Parameters.builder().filter(expresssion).filterType(Job.Parameters.JSLT_FILTER_TYPE).build();
String paramJson = gson.toJson(param);
- ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", JOB_ID, toJson(paramJson));
+ ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, toJson(paramJson));
this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
// filtered PM message
String path = "./src/test/resources/pm_report.json";
String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
- DmaapSimulatorController.addPmResponse(pmReportJson);
+ DmaapSimulatorController.addResponse(json2dmaapResp(pmReportJson));
ConsumerController.TestResults consumer = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
assertThat(receivedFiltered).contains("event");
}
+ private String json2dmaapResp(String json) {
+ return "[" + quote(json) + "]";
+ }
+
@Test
void testAuthToken() throws Exception {
Map<String, String> headers = consumer.receivedHeaders.get(0);
assertThat(headers).containsEntry("authorization", "Bearer " + AUTH_TOKEN);
- // This is the only time it is verified that mime type is plaintext when isJson
- // is false and buffering is not used
- assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8");
-
Files.delete(authFile);
this.securityContext.setAuthTokenFilePath(null);
}
waitForRegistration();
// Create a job with JsonPath Filtering
- ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", JOB_ID, this.jsonObjectJsonPath());
+ ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, this.jsonObjectJsonPath());
this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
// filtered PM message
String path = "./src/test/resources/pm_report.json";
String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
- DmaapSimulatorController.addPmResponse(pmReportJson);
+ DmaapSimulatorController.addResponse(json2dmaapResp(pmReportJson));
ConsumerController.TestResults consumer = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
package org.oran.dmaapadapter;
+import static org.assertj.core.api.Assertions.assertThat;
+
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
dmaapResponses.add(response);
}
+ public static void reset() {
+ assertThat(dmaapPmResponses).isEmpty();
+ assertThat(dmaapResponses).isEmpty();
+ dmaapPmResponses.clear();
+ dmaapResponses.clear();
+ }
+
private static String quote(String str) {
final String q = "\"";
return q + str.replace(q, "\\\"") + q;
return nothing();
} else {
String resp = dmaapResponses.remove(0);
- logger.info("DMAAP simulator returned: {}", resp);
+ logger.trace("DMAAP simulator returned: {}", resp);
return new ResponseEntity<>(resp, HttpStatus.OK);
}
ProducerInfoTypeInfo type = testResults.types.get(job.infoTypeId);
if (type == null) {
logger.error("type not found: {} size: {}", job.infoTypeId, testResults.types.size());
+ } else {
+ assertThat(type).isNotNull();
+ validateJsonObjectAgainstSchema(job.jobDefinition, type.jobDataSchema);
+ logger.debug("ICS Simulator PUT job: {}", body);
+ restClient.post(url, body, MediaType.APPLICATION_JSON).block();
}
- assertThat(type).isNotNull();
- validateJsonObjectAgainstSchema(job.jobDefinition, type.jobDataSchema);
- logger.debug("ICS Simulator PUT job: {}", body);
- restClient.post(url, body, MediaType.APPLICATION_JSON).block();
}
private void validateJsonObjectAgainstSchema(Object object, Object schemaObj) throws ServiceException {
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
+import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.oran.dmaapadapter.configuration.WebClientConfig;
import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
+import org.oran.dmaapadapter.controllers.ProducerCallbacksController.StatisticsCollection;
import org.oran.dmaapadapter.datastore.DataStore;
import org.oran.dmaapadapter.exceptions.ServiceException;
import org.oran.dmaapadapter.filter.PmReportFilter;
import org.oran.dmaapadapter.repository.InfoType;
import org.oran.dmaapadapter.repository.InfoTypes;
import org.oran.dmaapadapter.repository.Job;
+import org.oran.dmaapadapter.repository.Job.Statistics;
import org.oran.dmaapadapter.repository.Jobs;
import org.oran.dmaapadapter.tasks.KafkaTopicListener;
import org.oran.dmaapadapter.tasks.NewFileEvent;
"app.pm-files-path=./src/test/resources/", //
"app.s3.locksBucket=ropfilelocks", //
"app.pm-files-path=/tmp/dmaapadaptor", //
- "app.s3.bucket="}) //
+ "app.s3.bucket=" //
+}) //
class IntegrationWithKafka {
final String KAFKA_TYPE_ID = "KafkaInformationType";
KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type);
topicListener.getFlux() //
+ .map(this::unzip) //
.doOnNext(this::set) //
.doFinally(sig -> logger.info("Finally " + sig)) //
.subscribe();
}
+ boolean isUnzip = false;
+
+ private TopicListener.DataFromTopic unzip(TopicListener.DataFromTopic receivedKafkaOutput) {
+ if (!this.isUnzip) {
+ return receivedKafkaOutput;
+ }
+ byte[] unzipped = KafkaTopicListener.unzip(receivedKafkaOutput.value, "junk.gz");
+ return new TopicListener.DataFromTopic(unzipped, receivedKafkaOutput.key);
+ }
+
+ public void setUnzip(boolean unzip) {
+ this.isUnzip = unzip;
+ }
+
private void set(TopicListener.DataFromTopic receivedKafkaOutput) {
this.receivedKafkaOutput = receivedKafkaOutput;
this.count++;
void reset() {
this.receivedKafkaOutput = new TopicListener.DataFromTopic(null, null);
+ this.count = 0;
}
}
Thread.sleep(4000);
}
+ private void printStatistics() {
+ String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL;
+ String statsResp = restClient().get(targetUri).block();
+ StatisticsCollection stats = gson.fromJson(statsResp, StatisticsCollection.class);
+ int noOfSentBytes = 0;
+ int noOfSentObjs = 0;
+ for (Statistics s : stats.jobStatistics) {
+ noOfSentBytes += s.getNoOfSentBytes();
+ noOfSentObjs += s.getNoOfSentObjects();
+ }
+ logger.error(" Stats noOfSentBytes: {}, noOfSentObjects: {}, noOfTopics: {}", noOfSentBytes, noOfSentObjs,
+ stats.jobStatistics.size());
+ }
+
+ private void printCharacteristicsResult(String str, Instant startTime, int noOfIterations) {
+ final long durationMs = Instant.now().toEpochMilli() - startTime.toEpochMilli();
+ logger.error("*** {} Duration ({} ms), objects/second: {}", str, durationMs,
+ (noOfIterations * 1000) / durationMs);
+ printStatistics();
+ }
+
@Test
void simpleCase() throws Exception {
final String JOB_ID = "ID";
printStatistics();
}
- private void printStatistics() {
- String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL;
- String stats = restClient().get(targetUri).block();
- logger.info("Stats : {}", org.apache.commons.lang3.StringUtils.truncate(stats, 1000));
- }
-
@SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
@Test
void kafkaCharacteristics() throws Exception {
Thread.sleep(1000 * 1);
}
- final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
- logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds);
+ printCharacteristicsResult("kafkaCharacteristics", startTime, NO_OF_OBJECTS);
}
@SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
- filterData.getMeasTypes().add("succImmediateAssignProcs");
- filterData.getMeasObjClass().add("UtranCell");
+ filterData.getMeasTypes().add("pmCounterNumber0");
+ filterData.getMeasObjClass().add("NRCellCU");
this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID,
restClient());
Instant startTime = Instant.now();
- final String FILE_NAME = "pm_report.json.gz";
+ final String FILE_NAME = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json";
DataStore fileStore = dataStore();
Thread.sleep(1000 * 1);
}
- final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
- logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds);
+ printCharacteristicsResult("kafkaCharacteristics_pmFilter_s3", startTime, NO_OF_OBJECTS);
logger.info("*** kafkaReceiver2 :" + kafkaReceiver.count);
-
- printStatistics();
- }
-
- @Test
- void clear() {
-
}
@SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
@Test
void kafkaCharacteristics_manyPmJobs() throws Exception {
- // Filter PM reports and sent to two jobs over Kafka
+ // Filter PM reports and sent to many jobs over Kafka
// Register producer, Register types
await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
filterData.getMeasTypes().add("pmCounterNumber1");
filterData.getMeasObjClass().add("NRCellCU");
+ final boolean USE_GZIP = true;
final int NO_OF_JOBS = 150;
ArrayList<KafkaReceiver> receivers = new ArrayList<>();
for (int i = 0; i < NO_OF_JOBS; ++i) {
final String outputTopic = "manyJobs_" + i;
- this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData, false), outputTopic,
+ this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData, USE_GZIP), outputTopic,
restClient());
KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext);
+ receiver.setUnzip(USE_GZIP);
receivers.add(receiver);
}
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS));
waitForKafkaListener();
- final int NO_OF_OBJECTS = 1000;
+ final int NO_OF_OBJECTS = 100;
Instant startTime = Instant.now();
sendDataToKafka(dataToSend);
while (receivers.get(0).count != NO_OF_OBJECTS) {
- logger.info("sleeping {}", kafkaReceiver.count);
+ if (kafkaReceiver.count > 0) {
+ logger.info("sleeping {}", kafkaReceiver.count);
+ }
Thread.sleep(1000 * 1);
}
- final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
- logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds);
+ printCharacteristicsResult("kafkaCharacteristics_manyPmJobs", startTime, NO_OF_OBJECTS);
for (KafkaReceiver receiver : receivers) {
if (receiver.count != NO_OF_OBJECTS) {
System.out.println("** Unexpected" + receiver.OUTPUT_TOPIC + " " + receiver.count);
}
}
-
- printStatistics();
}
private String newFileEvent(String fileName) {
filterData.getSourceNames().add("O-DU-1122");
filterData.setPmRopStartTime("1999-12-27T10:50:44.000-08:00");
+ filterData.setPmRopEndTime(OffsetDateTime.now().toString());
+
this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID,
restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+import com.google.protobuf.AbstractMessage.Builder;
+import com.google.protobuf.Message;
+import com.google.protobuf.MessageOrBuilder;
+import com.google.protobuf.util.JsonFormat;
+import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.lang.reflect.InvocationTargetException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import org.junit.jupiter.api.Test;
+import org.oran.dmaapadapter.PmProtoGenerated;
import org.oran.dmaapadapter.filter.Filter.FilteredData;
+import org.oran.dmaapadapter.tasks.KafkaTopicListener;
import org.oran.dmaapadapter.tasks.TopicListener;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class PmReportFilterTest {
+
+ public static class ProtoJsonUtil {
+
+ /**
+ * Makes a Json from a given message or builder
+ *
+ * @param messageOrBuilder is the instance
+ * @return The string representation
+ * @throws IOException if any error occurs
+ */
+ public static String toJson(MessageOrBuilder messageOrBuilder) throws IOException {
+ return JsonFormat.printer().print(messageOrBuilder);
+ }
+
+ /**
+ * Makes a new instance of message based on the json and the class
+ *
+ * @param <T> is the class type
+ * @param json is the json instance
+ * @param clazz is the class instance
+ * @return An instance of T based on the json values
+ * @throws IOException if any error occurs
+ */
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public static <T extends Message> T fromJson(String json, Class<T> clazz) throws IOException {
+ // https://stackoverflow.com/questions/27642021/calling-parsefrom-method-for-generic-protobuffer-class-in-java/33701202#33701202
+ Builder builder = null;
+ try {
+ // Since we are dealing with a Message type, we can call newBuilder()
+ builder = (Builder) clazz.getMethod("newBuilder").invoke(null);
+
+ } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
+ | NoSuchMethodException | SecurityException e) {
+ return null;
+ }
+
+ // The instance is placed into the builder values
+ JsonFormat.parser().ignoringUnknownFields().merge(json, builder);
+
+ // the instance will be from the build
+ return (T) builder.build();
+ }
+ }
+
private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private String filterReport(PmReportFilter filter) throws Exception {
assertThat(filtered).contains("O-DU-1122");
}
- void testCharacteristics() throws Exception {
+ // @Test
+ void testSomeCharacteristics() throws Exception {
Gson gson = new GsonBuilder() //
.disableHtmlEscaping() //
.create(); //
String path = "./src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json";
- String report = Files.readString(Path.of(path), Charset.defaultCharset());
- TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, report.getBytes());
+ String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
- Instant startTime = Instant.now();
+ PmProtoGenerated.PmRopFile proto = ProtoJsonUtil.fromJson(pmReportJson, PmProtoGenerated.PmRopFile.class);
+ byte[] bytes = proto.toByteArray();
- int CNT = 100000;
- for (int i = 0; i < CNT; ++i) {
- gson.fromJson(data.valueAsString(), PmReport.class);
+ int TIMES = 100000;
+
+ {
+ path = "./src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz";
+ byte[] pmReportZipped = Files.readAllBytes(Path.of(path));
+
+ Instant startTime = Instant.now();
+ for (int i = 0; i < TIMES; ++i) {
+ KafkaTopicListener.unzip(pmReportZipped, "junk.gz");
+ }
+
+ printDuration("Unzip", startTime, TIMES);
}
- printDuration("Parse", startTime, CNT);
+ {
- startTime = Instant.now();
+ PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+ filterData.getMeasTypes().add("pmCounterNumber0");
+ filterData.getMeasObjClass().add("NRCellCU");
+ PmReportFilter filter = new PmReportFilter(filterData);
+ DataFromTopic topicData = new DataFromTopic(null, pmReportJson.getBytes());
- PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
- filterData.measTypes.add("pmCounterNumber0");
- PmReportFilter filter = new PmReportFilter(filterData);
- for (int i = 0; i < CNT; ++i) {
- FilteredData filtered = filter.filter(data);
+ Instant startTime = Instant.now();
+ for (int i = 0; i < TIMES; ++i) {
+ filter.filter(topicData);
+ }
+ printDuration("PM Filter", startTime, TIMES);
+ }
+
+ {
+ Instant startTime = Instant.now();
+ for (int i = 0; i < TIMES; ++i) {
+ PmProtoGenerated.PmRopFile.parseFrom(bytes);
+ }
+
+ printDuration("Protobuf parsing", startTime, TIMES);
+ }
+ {
+ Instant startTime = Instant.now();
+ for (int i = 0; i < TIMES; ++i) {
+ gson.fromJson(pmReportJson, PmReport.class);
+ }
+ printDuration("Json parsing", startTime, TIMES);
}
- printDuration("Filter", startTime, CNT);
}
void printDuration(String str, Instant startTime, int noOfIterations) {
- final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
- logger.info("*** Duration " + str + " :" + durationSeconds + ", objects/second: "
- + noOfIterations / durationSeconds);
+ final long durationMs = Instant.now().toEpochMilli() - startTime.toEpochMilli();
+ logger.info("*** Duration (ms) " + str + " :" + durationMs + ", objects/second: "
+ + (noOfIterations * 1000) / durationMs);
}
@Test
"id": "DmaapInformationType",
"dmaapTopicUrl": "/dmaap-topic-1",
"useHttpProxy": false,
- "isJson": false
+ "isJson": true
},
{
"id": "KafkaInformationType",