Bugfix: handling of historical PM data was broken; fixed.

Kafka keys and values are now handled as byte[] instead of String,
job output can optionally be gzip compressed (new 'gzip' job
parameter), Job.Parameters is built with a Lombok builder, and
dependency versions are bumped.
Change-Id: I7ba95f962676e69ebd35b3ff467ac47cc1786b2d
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
- <version>1.3.12</version>
+ <version>1.3.13</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
- <version>2.13.73</version>
+ <version>2.17.292</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
- <version>1.11.795</version>
+ <version>1.12.321</version>
</dependency>
</dependencies>
<build>
@ToString
public static class FilteredData {
- public final String key;
- public final String value;
- private static final FilteredData emptyData = new FilteredData("", "");
+ public final byte[] key;
+ public final byte[] value;
+ private static final FilteredData emptyData = new FilteredData(null, null);
public boolean isEmpty() {
- return value.isEmpty() && key.isEmpty();
+ return (key == null || key.length == 0) && (value == null || value.length == 0);
}
- public FilteredData(String key, String value) {
+ public FilteredData(byte[] key, byte[] value) {
this.key = key;
this.value = value;
}
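+ // Decodes the value bytes with the platform default charset; returns "" when the value is null.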
+ public String getValueAString() {
+ return value == null ? "" : new String(this.value);
+ }
+
public static FilteredData empty() {
return emptyData;
}
if (filteredNode == NullNode.instance) {
return FilteredData.empty();
}
- return new FilteredData(data.key, mapper.writeValueAsString(filteredNode));
+ return new FilteredData(data.key, mapper.writeValueAsBytes(filteredNode));
} catch (Exception e) {
return FilteredData.empty();
}
@Override
public FilteredData filter(DataFromTopic data) {
try {
- Object o = JsonPath.parse(data.value).read(this.expression, Object.class);
- return o == null ? FilteredData.empty() : new FilteredData(data.key, gson.toJson(o));
+ String str = new String(data.value);
+ Object o = JsonPath.parse(str).read(this.expression, Object.class);
+ String json = gson.toJson(o);
+ return o == null ? FilteredData.empty() : new FilteredData(data.key, json.getBytes());
} catch (Exception e) {
return FilteredData.empty();
}
if (!filter(report, this.filterData)) {
return FilteredData.empty();
}
- return new FilteredData(data.key, gson.toJson(report));
+ return new FilteredData(data.key, gson.toJson(report).getBytes());
} catch (Exception e) {
logger.warn("Could not parse PM data. {}, reason: {}", data, e.getMessage());
return FilteredData.empty();
private PmReport createPmReport(DataFromTopic data) {
synchronized (data) {
if (data.getCachedPmReport() == null) {
- data.setCachedPmReport(gsonParse.fromJson(data.value, PmReport.class));
+ data.setCachedPmReport(gsonParse.fromJson(data.valueAsString(), PmReport.class));
}
return data.getCachedPmReport();
}
if (regexp == null) {
return new FilteredData(data.key, data.value);
}
- Matcher matcher = regexp.matcher(data.value);
+ Matcher matcher = regexp.matcher(data.valueAsString());
boolean match = matcher.find();
if (match) {
return new FilteredData(data.key, data.value);
@Builder.Default
int noOfSentBytes = 0;
- public void received(String str) {
- noOfReceivedBytes += str.length();
+ public void received(byte[] bytes) {
+ noOfReceivedBytes += bytes.length;
noOfReceivedObjects += 1;
}
- public void filtered(String str) {
- noOfSentBytes += str.length();
+ public void filtered(byte[] bytes) {
+ noOfSentBytes += bytes.length;
noOfSentObjects += 1;
}
}
+ @Builder
public static class Parameters {
public static final String REGEXP_TYPE = "regexp";
public static final String PM_FILTER_TYPE = "pmdata";
public static final String JSON_PATH_FILTER_TYPE = "json-path";
@Setter
+ @Builder.Default
private String filterType = REGEXP_TYPE;
+
@Getter
private Object filter;
+
@Getter
private BufferTimeout bufferTimeout;
@Getter
private String kafkaOutputTopic;
- public Parameters() {}
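+ // When true, the job output is gzip compressed before it is sent to the consumer.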
+ @Getter
+ private Boolean gzip;
- public Parameters(Object filter, String filterType, BufferTimeout bufferTimeout, Integer maxConcurrency,
- String kafkaOutputTopic) {
- this.filter = filter;
- this.bufferTimeout = bufferTimeout;
- this.maxConcurrency = maxConcurrency;
- this.filterType = filterType;
- this.kafkaOutputTopic = kafkaOutputTopic;
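+ // Null or 0 means that no concurrency limit is configured; default to 1.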
+ public int getMaxConcurrency() {
+ return maxConcurrency == null || maxConcurrency == 0 ? 1 : maxConcurrency;
}
- public int getMaxConcurrency() {
- return maxConcurrency == null || maxConcurrency == 0 ? 1 : maxConcurrency;
+ public boolean isGzip() {
+ return gzip != null && gzip;
}
public Filter.Type getFilterType() {
.doOnNext(input -> logger.debug("Received from DMaap: {} :{}", this.type.getDmaapTopicUrl(), input)) //
.doOnError(t -> logger.error("DmaapTopicListener error: {}", t.getMessage())) //
.doFinally(sig -> logger.error("DmaapTopicListener stopped, reason: {}", sig)) //
- .map(input -> new DataFromTopic("", input))
+ .map(input -> new DataFromTopic(null, input.getBytes()))
.flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100)
.publish() //
.autoConnect();
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
+
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
public class HttpJobDataDistributor extends JobDataDistributor {
private static final Logger logger = LoggerFactory.getLogger(HttpJobDataDistributor.class);
- public HttpJobDataDistributor(Job job, ApplicationConfig config) {
- super(job, config);
+ public HttpJobDataDistributor(Job job, ApplicationConfig config, Flux<TopicListener.DataFromTopic> input) {
+ super(job, config, input);
}
@Override
Job job = this.getJob();
logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output);
MediaType contentType = job.isBuffered() || job.getType().isJson() ? MediaType.APPLICATION_JSON : null;
- return job.getConsumerRestClient().post("", output.value, contentType);
+ return job.getConsumerRestClient().post("", output.getValueAString(), contentType);
}
}
package org.oran.dmaapadapter.tasks;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
+import java.util.zip.GZIPOutputStream;
import lombok.Getter;
}
}
- protected JobDataDistributor(Job job, ApplicationConfig applConfig) {
+ protected JobDataDistributor(Job job, ApplicationConfig applConfig, Flux<TopicListener.DataFromTopic> input) {
this.job = job;
this.applConfig = applConfig;
this.dataStore = DataStore.create(applConfig);
this.dataStore.create(DataStore.Bucket.FILES).subscribe();
this.dataStore.create(DataStore.Bucket.LOCKS).subscribe();
- }
-
- public synchronized void start(Flux<TopicListener.DataFromTopic> input) {
- collectHistoricalData();
this.errorStats.resetIrrecoverableErrors();
this.subscription = filterAndBuffer(input, this.job) //
}
}
- private void collectHistoricalData() {
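+ // Distributes PM files already present in the file store that match the job's pmRopStartTime filter; invoked when the job is added.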
+ public void collectHistoricalData() {
PmReportFilter filter = job.getFilter() instanceof PmReportFilter ? (PmReportFilter) job.getFilter() : null;
if (filter != null && filter.getFilterData().getPmRopStartTime() != null) {
.flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, this.job.getType(),
dataStore), 100)
.map(job::filter) //
+ .map(this::gzip) //
.flatMap(this::sendToClient, 1) //
.onErrorResume(this::handleCollectHistoricalDataError) //
.subscribe();
}
}
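+ // GZIP-compresses the filtered payload when the job has gzip enabled; returns the data unchanged otherwise or if compression fails.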
+ private Filter.FilteredData gzip(Filter.FilteredData data) {
+ if (job.getParameters().isGzip()) {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
+ gzip.write(data.value);
+ return new Filter.FilteredData(data.key, out.toByteArray());
+ } catch (IOException e) {
+ logger.error("Unexpected exception when zipping: {}", e.getMessage());
+ return data;
+ }
+ } else {
+ return data;
+ }
+ }
+
private Mono<String> handleCollectHistoricalDataError(Throwable t) {
if (t instanceof LockedException) {
logger.debug("Locked exception: {} job: {}", t.getMessage(), job.getId());
return Mono.empty(); // Ignore
} else {
+ logger.error("Exception: {} job: {}", t.getMessage(), job.getId());
return tryDeleteLockFile() //
.map(bool -> "OK");
}
NewFileEvent ev = new NewFileEvent(fileName);
- return new TopicListener.DataFromTopic("", gson.toJson(ev));
+ return new TopicListener.DataFromTopic(null, gson.toJson(ev).getBytes());
}
private boolean filterStartTime(String startTimeStr, String fileName) {
- // A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json
+ // A20000626.2315+0200-2330+0200_HTTPS-6-73.json
try {
if (fileName.endsWith(".json") || fileName.endsWith(".json.gz")) {
Flux<Filter.FilteredData> filtered = //
inputFlux.doOnNext(data -> job.getStatistics().received(data.value)) //
.map(job::filter) //
+ .map(this::gzip) //
.filter(f -> !f.isEmpty()) //
.doOnNext(f -> job.getStatistics().filtered(f.value)); //
if (job.isBuffered()) {
- filtered = filtered.map(input -> quoteNonJson(input.value, job)) //
+ filtered = filtered.map(input -> quoteNonJson(input.getValueAString(), job)) //
.bufferTimeout( //
job.getParameters().getBufferTimeout().getMaxSize(), //
job.getParameters().getBufferTimeout().getMaxTime()) //
- .map(buffered -> new Filter.FilteredData("", buffered.toString()));
+ .map(buffered -> new Filter.FilteredData(null, buffered.toString().getBytes()));
}
return filtered;
}
import java.util.HashMap;
import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.filter.Filter;
import org.oran.dmaapadapter.repository.Job;
public class KafkaJobDataDistributor extends JobDataDistributor {
private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataDistributor.class);
- private KafkaSender<String, String> sender;
+ private KafkaSender<byte[], byte[]> sender;
private final ApplicationConfig appConfig;
- public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig) {
- super(job, appConfig);
+ public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig, Flux<TopicListener.DataFromTopic> input) {
+ super(job, appConfig, input);
this.appConfig = appConfig;
+ SenderOptions<byte[], byte[]> senderOptions = senderOptions(appConfig);
+ this.sender = KafkaSender.create(senderOptions);
}
@Override
protected Mono<String> sendToClient(Filter.FilteredData data) {
Job job = this.getJob();
- SenderRecord<String, String, Integer> senderRecord = senderRecord(data, job);
+ SenderRecord<byte[], byte[], Integer> senderRecord = senderRecord(data, job);
- logger.trace("Sending data '{}' to Kafka topic: {}", data, job.getParameters().getKafkaOutputTopic());
+ logger.trace("Sending data '{}' to Kafka topic: {}", StringUtils.truncate(data.getValueAString(), 10),
+ job.getParameters().getKafkaOutputTopic());
return this.sender.send(Mono.just(senderRecord)) //
.doOnNext(n -> logger.debug("Sent data to Kafka topic: {}", job.getParameters().getKafkaOutputTopic())) //
t -> logger.warn("Failed to send to Kafka, job: {}, reason: {}", job.getId(), t.getMessage())) //
.onErrorResume(t -> Mono.empty()) //
.collectList() //
- .map(x -> data.value);
- }
+ .map(x -> "ok");
- @Override
- public synchronized void start(Flux<TopicListener.DataFromTopic> input) {
- super.start(input);
- SenderOptions<String, String> senderOptions = senderOptions(appConfig);
- this.sender = KafkaSender.create(senderOptions);
}
@Override
}
}
- private static SenderOptions<String, String> senderOptions(ApplicationConfig config) {
+ private static SenderOptions<byte[], byte[]> senderOptions(ApplicationConfig config) {
String bootstrapServers = config.getKafkaBootStrapServers();
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.ACKS_CONFIG, "all");
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
return SenderOptions.create(props);
}
- private SenderRecord<String, String, Integer> senderRecord(Filter.FilteredData output, Job infoJob) {
+ private SenderRecord<byte[], byte[], Integer> senderRecord(Filter.FilteredData output, Job infoJob) {
int correlationMetadata = 2;
String topic = infoJob.getParameters().getKafkaOutputTopic();
return SenderRecord.create(new ProducerRecord<>(topic, output.key, output.value), correlationMetadata);
import java.util.zip.GZIPInputStream;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.datastore.DataStore;
import org.oran.dmaapadapter.repository.InfoType;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
-
/**
* The class streams incoming requests from a Kafka topic and sends them further
* to a multi cast sink, which several other streams can connect to.
return KafkaReceiver.create(kafkaInputProperties(clientId)) //
.receiveAutoAck() //
.concatMap(consumerRecord -> consumerRecord) //
- .doOnNext(input -> logger.trace("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(),
- input.value())) //
+ .doOnNext(input -> logger.trace("Received from kafka topic: {}", this.type.getKafkaInputTopic())) //
.doOnError(t -> logger.error("KafkaTopicReceiver error: {}", t.getMessage())) //
.doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) //
- .filter(t -> !t.value().isEmpty() || !t.key().isEmpty()) //
+ .filter(t -> t.value().length > 0 || t.key().length > 0) //
.map(input -> new DataFromTopic(input.key(), input.value())) //
- .flatMap(data -> getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100) //
- .publish() //
+ .flatMap(data -> getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100).publish() //
.autoConnect(1);
}
- private ReceiverOptions<String, String> kafkaInputProperties(String clientId) {
+ private ReceiverOptions<byte[], byte[]> kafkaInputProperties(String clientId) {
Map<String, Object> consumerProps = new HashMap<>();
if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
logger.error("No kafka boostrap server is setup");
}
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.type.getKafkaGroupId());
- consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.applicationConfig.getKafkaMaxPollRecords());
- return ReceiverOptions.<String, String>create(consumerProps)
+ return ReceiverOptions.<byte[], byte[]>create(consumerProps)
.subscription(Collections.singleton(this.type.getKafkaInputTopic()));
}
public static Mono<DataFromTopic> getDataFromFileIfNewPmFileEvent(DataFromTopic data, InfoType type,
DataStore fileStore) {
- if (type.getDataType() != InfoType.DataType.PM_DATA || data.value.length() > 1000) {
+ if (type.getDataType() != InfoType.DataType.PM_DATA || data.value.length > 1000) {
return Mono.just(data);
}
try {
- NewFileEvent ev = gson.fromJson(data.value, NewFileEvent.class);
+ NewFileEvent ev = gson.fromJson(data.valueAsString(), NewFileEvent.class);
if (ev.getFilename() == null) {
logger.warn("Ignoring received message: {}", data);
package org.oran.dmaapadapter.tasks;
-import java.nio.charset.StandardCharsets;
import lombok.Getter;
import lombok.Setter;
@ToString
public static class DataFromTopic {
- public final String key;
- public final String value;
+ public final byte[] key;
+ public final byte[] value;
+
+ private static byte[] noBytes = new byte[0];
@Getter
@Setter
@ToString.Exclude
private PmReport cachedPmReport;
- public DataFromTopic(String key, String value) {
- this.key = key;
- this.value = value;
+ public DataFromTopic(byte[] key, byte[] value) {
+ this.key = key == null ? noBytes : key;
+ this.value = value == null ? noBytes : value;
}
- public DataFromTopic(String key, byte[] value) {
- this.key = key;
- this.value = new String(value, StandardCharsets.UTF_8);
+ public String valueAsString() {
+ return new String(this.value);
}
}
}
}
- private JobDataDistributor createConsumer(Job job) {
- return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaJobDataDistributor(job, appConfig)
- : new HttpJobDataDistributor(job, appConfig);
+ private JobDataDistributor createConsumer(Job job, TopicListener topicListener) {
+ return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic())
+ ? new KafkaJobDataDistributor(job, appConfig, topicListener.getFlux())
+ : new HttpJobDataDistributor(job, appConfig, topicListener.getFlux());
}
private void addConsumer(Job job, MultiMap<JobDataDistributor> distributors,
Map<String, TopicListener> topicListeners) {
TopicListener topicListener = topicListeners.get(job.getType().getId());
- JobDataDistributor distributor = createConsumer(job);
- distributor.start(topicListener.getFlux());
+ JobDataDistributor distributor = createConsumer(job, topicListener);
+
+ distributor.collectHistoricalData();
+
distributors.put(job.getType().getId(), job.getId(), distributor);
}
"kafkaOutputTopic": {
"type": "string"
},
+ "gzip": {
+ "type": "boolean"
+ },
"bufferTimeout": {
"type": "object",
"additionalProperties": false,
import org.oran.dmaapadapter.repository.InfoTypes;
import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
-import org.oran.dmaapadapter.tasks.JobDataDistributor;
import org.oran.dmaapadapter.tasks.NewFileEvent;
import org.oran.dmaapadapter.tasks.ProducerRegstrationTask;
-import org.oran.dmaapadapter.tasks.TopicListener;
import org.oran.dmaapadapter.tasks.TopicListeners;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.test.context.TestPropertySource;
import org.springframework.web.reactive.function.client.WebClientResponseException;
-import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
"Could not find type");
}
- @Test
- void testReceiveAndPostDataFromKafka() throws Exception {
- final String JOB_ID = "ID";
- final String TYPE_ID = "PmDataOverKafka";
- waitForRegistration();
-
- // Create a job
- Job.Parameters param = new Job.Parameters(null, null, new Job.BufferTimeout(123, 456), 1, null);
- String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
- ConsumerJobInfo kafkaJobInfo = new ConsumerJobInfo(TYPE_ID, toJson(gson.toJson(param)), "owner", targetUri, "");
-
- this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient());
- await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
-
- JobDataDistributor kafkaConsumer = this.topicListeners.getDataDistributors().get(TYPE_ID, JOB_ID);
-
- // Handle received data from Kafka, check that it has been posted to the
- // consumer
- kafkaConsumer.start(Flux.just(new TopicListener.DataFromTopic("key", "data")));
-
- ConsumerController.TestResults consumer = this.consumerController.testResults;
- await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
- assertThat(consumer.receivedBodies.get(0)).isEqualTo("[data]");
- assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json");
-
- // This only works in debugger. Removed for now.
- assertThat(this.icsSimulatorController.testResults.createdJob).isNotNull();
- assertThat(this.icsSimulatorController.testResults.createdJob.infoTypeId)
- .isEqualTo("xml-file-data-to-filestore");
-
- }
-
@Test
void testReceiveAndPostDataFromDmaapBuffering() throws Exception {
final String JOB_ID = "testReceiveAndPostDataFromDmaap";
waitForRegistration();
// Create a job
- Job.Parameters param = new Job.Parameters(null, null, new Job.BufferTimeout(123, 456), 1, null);
+ Job.Parameters param = Job.Parameters.builder().bufferTimeout(new Job.BufferTimeout(123, 456)).build();
+
ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, toJson(gson.toJson(param)));
this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
waitForRegistration();
// Create a job
- Job.Parameters param = new Job.Parameters(null, null, null, 1, null);
+ Job.Parameters param = Job.Parameters.builder().build();
ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, toJson(gson.toJson(param)));
this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
filterData.getMeasObjInstIds().add("UtranCell=Gbg-997");
filterData.getSourceNames().add("O-DU-1122");
filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1");
- Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE,
- new Job.BufferTimeout(123, 456), null, null);
+ Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE)
+ .bufferTimeout(new Job.BufferTimeout(123, 456)).build();
String paramJson = gson.toJson(param);
ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", "EI_PM_JOB_ID", toJson(paramJson));
DataStore fileStore = DataStore.create(this.applicationConfig);
fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"),
- "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json").block();
+ "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.json").block();
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
filterData.getSourceNames().add("O-DU-1122");
filterData.setPmRopStartTime("1999-12-27T10:50:44.000-08:00");
- Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE,
- new Job.BufferTimeout(123, 456), null, null);
+ Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE)
+ .bufferTimeout(new Job.BufferTimeout(123, 456)).build();
+
String paramJson = gson.toJson(param);
ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", "EI_PM_JOB_ID", toJson(paramJson));
// Create a job with a PM filter
String expresssion = "if(.event.commonEventHeader.sourceName == \"O-DU-1122\")" //
+ ".";
- Job.Parameters param = new Job.Parameters(expresssion, Job.Parameters.JSLT_FILTER_TYPE, null, null, null);
+ Job.Parameters param =
+ Job.Parameters.builder().filter(expresssion).filterType(Job.Parameters.JSLT_FILTER_TYPE).build();
String paramJson = gson.toJson(param);
ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", JOB_ID, toJson(paramJson));
// Create a job with a PM filter
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
filterData.getMeasTypes().add("succImmediateAssignProcs");
- Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, null, null);
+ Job.Parameters param =
+ Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE).build();
+
String paramJson = gson.toJson(param);
ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverKafka", "EI_PM_JOB_ID", toJson(paramJson));
await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
final String TYPE_ID = "KafkaInformationType";
- Job.Parameters param =
- new Job.Parameters("filter", Job.Parameters.REGEXP_TYPE, new Job.BufferTimeout(123, 456), 1, null);
+ Job.Parameters param = Job.Parameters.builder().filter("filter").filterType(Job.Parameters.REGEXP_TYPE)
+ .bufferTimeout(new Job.BufferTimeout(123, 456)).build();
ConsumerJobInfo jobInfo =
new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), "");
await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
final String TYPE_ID = "KafkaInformationType";
- Job.Parameters param = new Job.Parameters("filter", Job.Parameters.REGEXP_TYPE,
- new Job.BufferTimeout(123, 170 * 1000), 1, null);
+ Job.Parameters param = Job.Parameters.builder().filter("filter").filterType(Job.Parameters.REGEXP_TYPE)
+ .bufferTimeout(new Job.BufferTimeout(123, 170 * 1000)).build();
ConsumerJobInfo jobInfo =
new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), "");
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
}
synchronized String lastKey() {
- return this.receivedKafkaOutput.key;
+ return new String(this.receivedKafkaOutput.key);
}
synchronized String lastValue() {
- return this.receivedKafkaOutput.value;
+ return new String(this.receivedKafkaOutput.value);
}
void reset() {
- count = 0;
- this.receivedKafkaOutput = new TopicListener.DataFromTopic("", "");
+ this.receivedKafkaOutput = new TopicListener.DataFromTopic(null, null);
}
}
private static Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize,
int maxConcurrency) {
Job.BufferTimeout buffer = maxSize > 0 ? new Job.BufferTimeout(maxSize, maxTimeMiliseconds) : null;
- Job.Parameters param = new Job.Parameters(filter, Job.Parameters.REGEXP_TYPE, buffer, maxConcurrency, null);
+ Job.Parameters param = Job.Parameters.builder().filter(filter).filterType(Job.Parameters.REGEXP_TYPE)
+ .bufferTimeout(buffer).maxConcurrency(maxConcurrency).build();
+
String str = gson.toJson(param);
return jsonObject(str);
}
}
}
- ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) {
+ ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData, boolean gzip) {
try {
- Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, 1, topic);
+ Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE)
+ .kafkaOutputTopic(topic).gzip(gzip).build();
String str = gson.toJson(param);
Object parametersObj = jsonObject(str);
}
}
+ ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) {
+ return consumerJobInfoKafka(topic, filterData, false);
+ }
+
ConsumerJobInfo consumerJobInfoKafka(String topic) {
try {
- Job.Parameters param = new Job.Parameters(null, null, null, 1, topic);
+ Job.Parameters param = Job.Parameters.builder().kafkaOutputTopic(topic).build();
String str = gson.toJson(param);
Object parametersObj = jsonObject(str);
}
}
- private SenderOptions<String, String> kafkaSenderOptions() {
+ private SenderOptions<byte[], byte[]> kafkaSenderOptions() {
String bootstrapServers = this.applicationConfig.getKafkaBootStrapServers();
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
props.put(ProducerConfig.ACKS_CONFIG, "all");
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
return SenderOptions.create(props);
}
- private SenderRecord<String, String, Integer> kafkaSenderRecord(String data, String key, String typeId) {
+ private SenderRecord<byte[], byte[], Integer> kafkaSenderRecord(String data, String key, String typeId) {
final InfoType infoType = this.types.get(typeId);
int correlationMetadata = 2;
- return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), key, data), correlationMetadata);
+ return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), key.getBytes(), data.getBytes()),
+ correlationMetadata);
}
- private void sendDataToKafka(Flux<SenderRecord<String, String, Integer>> dataToSend) {
- final KafkaSender<String, String> sender = KafkaSender.create(kafkaSenderOptions());
+ private void sendDataToKafka(Flux<SenderRecord<byte[], byte[], Integer>> dataToSend) {
+ final KafkaSender<byte[], byte[]> sender = KafkaSender.create(kafkaSenderOptions());
sender.send(dataToSend) //
.doOnError(e -> logger.error("Send failed", e)) //
private void printStatistics() {
String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL;
String stats = restClient().get(targetUri).block();
- logger.info("Stats : {}", stats);
+ logger.info("Stats : {}", org.apache.commons.lang3.StringUtils.truncate(stats, 1000));
}
@SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
- filterData.getMeasTypes().add("succImmediateAssignProcs");
- filterData.getMeasObjClass().add("UtranCell");
+ filterData.getMeasTypes().add("pmAnrNcgiMeasFailUeCap");
+ filterData.getMeasTypes().add("pmAnrNcgiMeasRcvDrx");
+ filterData.getMeasObjInstIds().add("ManagedElement=seliitdus00487,GNBCUCPFunction=1,NRCellCU=32");
final int NO_OF_JOBS = 150;
ArrayList<KafkaReceiver> receivers = new ArrayList<>();
for (int i = 0; i < NO_OF_JOBS; ++i) {
final String outputTopic = "manyJobs_" + i;
- this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), outputTopic,
+ this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData, false), outputTopic,
restClient());
KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext);
receivers.add(receiver);
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS));
waitForKafkaListener();
- final int NO_OF_OBJECTS = 500;
+ final int NO_OF_OBJECTS = 1000;
Instant startTime = Instant.now();
- final String FILE_NAME = "pm_report.json.gz";
+ final String FILE_NAME = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz";
DataStore fileStore = dataStore();
+ fileStore.deleteBucket(DataStore.Bucket.FILES).block();
fileStore.create(DataStore.Bucket.FILES).block();
fileStore.copyFileTo(Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block();
sendDataToKafka(dataToSend);
while (receivers.get(0).count != NO_OF_OBJECTS) {
- logger.info("sleeping {}", kafkaReceiver.count);
- Thread.sleep(1000 * 1);
+ // logger.info("sleeping {}", kafkaReceiver.count);
+ Thread.sleep(100 * 1);
}
final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
}
}
- // printStatistics();
+ printStatistics();
}
private String newFileEvent(String fileName) {
@Test
void testHistoricalData() throws Exception {
// test
- waitForKafkaListener();
final String JOB_ID = "testHistoricalData";
DataStore fileStore = dataStore();
+ fileStore.deleteBucket(DataStore.Bucket.FILES).block();
fileStore.create(DataStore.Bucket.FILES).block();
fileStore.create(DataStore.Bucket.LOCKS).block();
fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"),
- "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json").block();
+ "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.json").block();
fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"), "OTHER_SOURCENAME/test.json").block();
restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
- await().untilAsserted(() -> assertThat(kafkaReceiver.count).isEqualTo(1));
+ await().untilAsserted(() -> assertThat(kafkaReceiver.count).isPositive());
}
@Test
import java.nio.file.Path;
import org.junit.jupiter.api.Test;
+import org.oran.dmaapadapter.filter.Filter.FilteredData;
import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
class JsltFilterTest {
private String filterReport(JsltFilter filter) throws Exception {
- return filter.filter(new DataFromTopic("", loadReport())).value;
+ DataFromTopic data = new DataFromTopic(null, loadReport().getBytes());
+ FilteredData filtered = filter.filter(data);
+ return filtered.getValueAString();
}
@Test
import java.nio.file.Path;
import org.junit.jupiter.api.Test;
+import org.oran.dmaapadapter.filter.Filter.FilteredData;
import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
class JsonPathFilterTest {
void testJsonPath() throws Exception {
String exp = ("$.event.perf3gppFields.measDataCollection.measInfoList[0].measTypes.sMeasTypesList[0]");
JsonPathFilter filter = new JsonPathFilter(exp);
- String res = filter.filter(new DataFromTopic("", loadReport())).value;
+ DataFromTopic data = new DataFromTopic(null, loadReport().getBytes());
+ FilteredData filtered = filter.filter(data);
+ String res = filtered.getValueAString();
assertThat(res).isEqualTo("\"attTCHSeizures\"");
}
import java.nio.file.Path;
import org.junit.jupiter.api.Test;
+import org.oran.dmaapadapter.filter.Filter.FilteredData;
import org.oran.dmaapadapter.tasks.TopicListener;
class PmReportFilterTest {
private String filterReport(PmReportFilter filter) throws Exception {
- return filter.filter(new TopicListener.DataFromTopic("", loadReport())).value;
+ TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes());
+ FilteredData filtered = filter.filter(data);
+ return filtered.getValueAString();
}
@Test
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
PmReportFilter filter = new PmReportFilter(filterData);
- String filtered = filter.filter(new TopicListener.DataFromTopic("", "junk")).value;
- assertThat(filtered).isEmpty();
+ FilteredData filtered = filter.filter(new TopicListener.DataFromTopic(null, "junk".getBytes()));
+ assertThat(filtered.isEmpty()).isTrue();
- filtered = filter.filter(new TopicListener.DataFromTopic("", reQuote("{'msg': 'test'}"))).value;
- assertThat(filtered).isEmpty();
+ filtered = filter.filter(new TopicListener.DataFromTopic(null, reQuote("{'msg': 'test'}").getBytes()));
+ assertThat(filtered.isEmpty()).isTrue();
}