The decision to gzip the output is made by the producer instead of the consumer.
The consumer will get a Kafka header named "gzip" if the output is gzipped.
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: I3016313bbb8a833c14c651492da7a87c37e71e31
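A minimal consumer-side sketch (illustrative only, not part of this change) of how a downstream Kafka consumer could honor the "gzip" header set by the producer; the class name GzipAwareConsumer and method payloadOf are hypothetical, while the header key "gzip" matches DataFromTopic.ZIP_PROPERTY introduced below.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;

import org.apache.kafka.clients.consumer.ConsumerRecord;

public class GzipAwareConsumer {

    // Returns the record value, decompressing it when the producer has marked it with a "gzip" header.
    static byte[] payloadOf(ConsumerRecord<byte[], byte[]> record) throws IOException {
        boolean zipped = record.headers().lastHeader("gzip") != null;
        if (!zipped) {
            return record.value();
        }
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(record.value()))) {
            return in.readAllBytes();
        }
    }
}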
# If the file name is empty, no authorization token is used
auth-token-file:
pm-files-path: /tmp
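+ # If true, the produced output is gzipped; gzipped Kafka output is marked with a "gzip" header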
+ zip-output: false
s3:
endpointOverride: http://localhost:9000
accessKeyId: minio
private String kafkaBootStrapServers;
@Getter
- @Value("${app.kafka.max-poll-records:100}")
+ @Value("${app.kafka.max-poll-records:300}")
private int kafkaMaxPollRecords;
@Getter
@Value("${app.s3.bucket:}")
private String s3Bucket;
+ @Getter
+ @Setter
+ @Value("${app.zip-output:}")
+ private boolean zipOutput;
+
private WebClientConfig webClientConfig = null;
public WebClientConfig getWebClientConfig() {
package org.oran.dmaapadapter.filter;
+import java.util.ArrayList;
+
+import lombok.Getter;
import lombok.ToString;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
public interface Filter {
public static class FilteredData {
public final byte[] key;
public final byte[] value;
+
+ @Getter
+ private final boolean isZipped;
+
private static final FilteredData emptyData = new FilteredData(null, null);
public boolean isEmpty() {
}
public FilteredData(byte[] key, byte[] value) {
+ this(key, value, false);
+ }
+
+ public FilteredData(byte[] key, byte[] value, boolean isZipped) {
this.key = key;
this.value = value;
+ this.isZipped = isZipped;
}
public String getValueAString() {
public static FilteredData empty() {
return emptyData;
}
+
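+ // Kafka headers to attach to the produced record; contains a "gzip" header when the payload is zipped.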
+ public Iterable<Header> headers() {
+ ArrayList<Header> result = new ArrayList<>();
+ if (isZipped()) {
+ Header h = new RecordHeader(DataFromTopic.ZIP_PROPERTY, null);
+ result.add(h);
+ }
+ return result;
+ }
}
public FilteredData filter(DataFromTopic data);
@Getter
private BufferTimeout bufferTimeout;
- private Integer maxConcurrency;
-
@Getter
private String kafkaOutputTopic;
- @Getter
- private Boolean gzip;
-
- public int getMaxConcurrency() {
- return maxConcurrency == null || maxConcurrency == 1 ? 1 : maxConcurrency;
- }
-
- public boolean isGzip() {
- return gzip != null && gzip;
- }
-
public Filter.Type getFilterType() {
if (filter == null || filterType == null) {
return Filter.Type.NONE;
.doOnNext(input -> logger.debug("Received from DMaap: {} :{}", this.type.getDmaapTopicUrl(), input)) //
.doOnError(t -> logger.error("DmaapTopicListener error: {}", t.getMessage())) //
.doFinally(sig -> logger.error("DmaapTopicListener stopped, reason: {}", sig)) //
- .map(input -> new DataFromTopic(null, input.getBytes()))
+ .map(input -> new DataFromTopic(null, input.getBytes(), false))
.flatMap(data -> KafkaTopicListener.getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100)
.publish() //
.autoConnect();
private final DataStore dataStore;
private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
+ private final ApplicationConfig applConfig;
private class ErrorStats {
private int consumerFaultCounter = 0;
}
protected JobDataDistributor(Job job, ApplicationConfig applConfig) {
+ this.applConfig = applConfig;
this.job = job;
this.dataStore = DataStore.create(applConfig);
this.dataStore.create(DataStore.Bucket.FILES).subscribe();
PmReportFilter filter = job.getFilter() instanceof PmReportFilter ? (PmReportFilter) job.getFilter() : null;
if (filter == null || filter.getFilterData().getPmRopEndTime() == null) {
- this.subscription = filterAndBuffer(input, this.job) //
- .flatMap(this::sendToClient, job.getParameters().getMaxConcurrency()) //
+ this.subscription = Flux.just(input) //
+ .flatMap(in -> filterAndBuffer(in, this.job)) //
+ .flatMap(this::sendToClient) //
.onErrorResume(this::handleError) //
.subscribe(this::handleSentOk, //
this::handleExceptionInStream, //
}
private Filter.FilteredData gzip(Filter.FilteredData data) {
- if (job.getParameters().isGzip()) {
+ if (this.applConfig.isZipOutput()) {
try {
ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(out);
gzip.flush();
gzip.close();
byte[] zipped = out.toByteArray();
- return new Filter.FilteredData(data.key, zipped);
+ return new Filter.FilteredData(data.key, zipped, true);
} catch (IOException e) {
logger.error("Unexpected exception when zipping: {}", e.getMessage());
return data;
NewFileEvent ev = new NewFileEvent(fileName);
- return new TopicListener.DataFromTopic(null, gson.toJson(ev).getBytes());
+ return new TopicListener.DataFromTopic(null, gson.toJson(ev).getBytes(), false);
}
private static String fileTimePartFromRopFileName(String fileName) {
private void handleExceptionInStream(Throwable t) {
logger.warn("JobDataDistributor exception: {}, jobId: {}", t.getMessage(), job.getId());
- stop();
}
protected abstract Mono<String> sendToClient(Filter.FilteredData output);
public synchronized void stop() {
if (this.subscription != null) {
+ logger.debug("Stopped, job: {}", job.getId());
this.subscription.dispose();
this.subscription = null;
}
private SenderRecord<byte[], byte[], Integer> senderRecord(Filter.FilteredData output, Job infoJob) {
int correlationMetadata = 2;
String topic = infoJob.getParameters().getKafkaOutputTopic();
- return SenderRecord.create(new ProducerRecord<>(topic, output.key, output.value), correlationMetadata);
+ var producerRecord = new ProducerRecord<>(topic, null, null, output.key, output.value, output.headers());
+ return SenderRecord.create(producerRecord, correlationMetadata);
}
}
.receiveAutoAck() //
.concatMap(consumerRecord -> consumerRecord) //
.doOnNext(input -> logger.trace("Received from kafka topic: {}", this.type.getKafkaInputTopic())) //
- .doOnError(t -> logger.error("KafkaTopicReceiver error: {}", t.getMessage())) //
- .doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) //
+ .doOnError(t -> logger.error("Received error: {}", t.getMessage())) //
+ .onErrorResume(t -> Mono.empty()) //
+ .doFinally(
+ sig -> logger.error("KafkaTopicListener stopped, type: {}, reason: {}", this.type.getId(), sig)) //
.filter(t -> t.value().length > 0 || t.key().length > 0) //
- .map(input -> new DataFromTopic(input.key(), input.value())) //
- .flatMap(data -> getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100).publish() //
+ .map(input -> new DataFromTopic(input.key(), input.value(), DataFromTopic.findZipped(input.headers()))) //
+ .flatMap(data -> getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100) //
+ .publish() //
.autoConnect(1);
}
return fileStore.readObject(DataStore.Bucket.FILES, ev.getFilename()) //
.map(bytes -> unzip(bytes, ev.getFilename())) //
- .map(bytes -> new DataFromTopic(data.key, bytes));
+ .map(bytes -> new DataFromTopic(data.key, bytes, false));
} catch (Exception e) {
return Mono.just(data);
}
}
- public static byte[] unzip(byte[] bytes, String fileName) {
- if (!fileName.endsWith(".gz")) {
- return bytes;
- }
-
+ public static byte[] unzip(byte[] bytes) throws IOException {
try (final GZIPInputStream gzipInput = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
-
return gzipInput.readAllBytes();
+ }
+ }
+
+ private static byte[] unzip(byte[] bytes, String fileName) {
+ try {
+ return fileName.endsWith(".gz") ? unzip(bytes) : bytes;
} catch (IOException e) {
logger.error("Error while decompression, file: {}, reason: {}", fileName, e.getMessage());
return new byte[0];
}
}
}
package org.oran.dmaapadapter.tasks;
-
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
+import org.apache.kafka.common.header.Header;
import org.oran.dmaapadapter.filter.PmReport;
import reactor.core.publisher.Flux;
public static class DataFromTopic {
public final byte[] key;
public final byte[] value;
+ public final boolean isZipped;
private static byte[] noBytes = new byte[0];
@ToString.Exclude
private PmReport cachedPmReport;
- public DataFromTopic(byte[] key, byte[] value) {
+ public DataFromTopic(byte[] key, byte[] value, boolean isZipped) {
this.key = key == null ? noBytes : key;
this.value = value == null ? noBytes : value;
+ this.isZipped = isZipped;
}
public String valueAsString() {
return new String(this.value);
}
+ public static final String ZIP_PROPERTY = "gzip";
+
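+ // Returns true if a "gzip" header is present among the received record headers.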
+ public static boolean findZipped(Iterable<Header> headers) {
+ if (headers == null) {
+ return false;
+ }
+ for (Header h : headers) {
+ if (h.key().equals(ZIP_PROPERTY)) {
+ return true;
+ }
+ }
+ return false;
+ }
}
public Flux<DataFromTopic> getFlux();
},
"kafkaOutputTopic": {
"type": "string"
- },
- "gzip": {
- "type": "boolean"
}
}
}
\ No newline at end of file
import com.google.gson.JsonParser;
+import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
import java.time.Duration;
"app.pm-files-path=./src/test/resources/", //
"app.s3.locksBucket=ropfilelocks", //
"app.pm-files-path=/tmp/dmaapadaptor", //
- "app.s3.bucket=" //
+ "app.s3.bucket=dmaaptest" //
}) //
class IntegrationWithKafka {
public final String OUTPUT_TOPIC;
private TopicListener.DataFromTopic receivedKafkaOutput;
private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final ApplicationConfig applicationConfig;
int count = 0;
public KafkaReceiver(ApplicationConfig applicationConfig, String outputTopic, SecurityContext securityContext) {
+ this.applicationConfig = applicationConfig;
this.OUTPUT_TOPIC = outputTopic;
// Create a listener to the output topic. The KafkaTopicListener happens to be
.subscribe();
}
- boolean isUnzip = false;
-
private TopicListener.DataFromTopic unzip(TopicListener.DataFromTopic receivedKafkaOutput) {
- if (!this.isUnzip) {
+ assertThat(this.applicationConfig.isZipOutput()).isEqualTo(receivedKafkaOutput.isZipped);
+ if (!receivedKafkaOutput.isZipped) {
return receivedKafkaOutput;
}
- byte[] unzipped = KafkaTopicListener.unzip(receivedKafkaOutput.value, "junk.gz");
- return new TopicListener.DataFromTopic(unzipped, receivedKafkaOutput.key);
- }
-
- public void setUnzip(boolean unzip) {
- this.isUnzip = unzip;
+ try {
+ byte[] unzipped = KafkaTopicListener.unzip(receivedKafkaOutput.value);
+ return new TopicListener.DataFromTopic(receivedKafkaOutput.key, unzipped, false);
+ } catch (IOException e) {
+ logger.error("Error while unzipping received data: {}", e.getMessage());
+ return null;
+ }
}
private void set(TopicListener.DataFromTopic receivedKafkaOutput) {
}
void reset() {
- this.receivedKafkaOutput = new TopicListener.DataFromTopic(null, null);
+ this.receivedKafkaOutput = new TopicListener.DataFromTopic(null, null, false);
this.count = 0;
}
}
@BeforeEach
void init() {
+ this.applicationConfig.setZipOutput(false);
+
if (kafkaReceiver == null) {
kafkaReceiver = new KafkaReceiver(this.applicationConfig, "ouputTopic", this.securityContext);
kafkaReceiver2 = new KafkaReceiver(this.applicationConfig, "ouputTopic2", this.securityContext);
return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort();
}
- private static Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize,
- int maxConcurrency) {
+ private static Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize) {
Job.BufferTimeout buffer = maxSize > 0 ? new Job.BufferTimeout(maxSize, maxTimeMiliseconds) : null;
Job.Parameters param = Job.Parameters.builder().filter(filter).filterType(Job.Parameters.REGEXP_TYPE)
- .bufferTimeout(buffer).maxConcurrency(maxConcurrency).build();
+ .bufferTimeout(buffer).build();
String str = gson.toJson(param);
return jsonObject(str);
}
}
- ConsumerJobInfo consumerJobInfo(String filter, Duration maxTime, int maxSize, int maxConcurrency) {
+ ConsumerJobInfo consumerJobInfo(String filter, Duration maxTime, int maxSize) {
try {
String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
- return new ConsumerJobInfo(KAFKA_TYPE_ID,
- jobParametersAsJsonObject(filter, maxTime.toMillis(), maxSize, maxConcurrency), "owner", targetUri,
- "");
+ return new ConsumerJobInfo(KAFKA_TYPE_ID, jobParametersAsJsonObject(filter, maxTime.toMillis(), maxSize),
+ "owner", targetUri, "");
} catch (Exception e) {
return null;
}
}
- ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData, boolean gzip) {
+ ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) {
try {
Job.Parameters param = Job.Parameters.builder().filter(filterData).filterType(Job.Parameters.PM_FILTER_TYPE)
- .kafkaOutputTopic(topic).gzip(gzip).build();
+ .kafkaOutputTopic(topic).build();
String str = gson.toJson(param);
Object parametersObj = jsonObject(str);
}
}
- ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) {
- return consumerJobInfoKafka(topic, filterData, false);
- }
-
ConsumerJobInfo consumerJobInfoKafka(String topic) {
try {
Job.Parameters param = Job.Parameters.builder().kafkaOutputTopic(topic).build();
await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
- this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID, restClient());
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0), JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
waitForKafkaListener();
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Create two jobs. One buffering and one with a filter
- this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10, 20), JOB_ID1,
- restClient());
- this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10), JOB_ID1, restClient());
+ this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0), JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
waitForKafkaListener();
filterData.getMeasTypes().add("pmCounterNumber1");
filterData.getMeasObjClass().add("NRCellCU");
- final boolean USE_GZIP = true;
+ this.applicationConfig.setZipOutput(true);
final int NO_OF_JOBS = 150;
ArrayList<KafkaReceiver> receivers = new ArrayList<>();
for (int i = 0; i < NO_OF_JOBS; ++i) {
final String outputTopic = "manyJobs_" + i;
- this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData, USE_GZIP), outputTopic,
+ this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), outputTopic,
restClient());
KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext);
- receiver.setUnzip(USE_GZIP);
receivers.add(receiver);
}
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS));
waitForKafkaListener();
- final int NO_OF_OBJECTS = 100;
+ final int NO_OF_OBJECTS = 1000;
Instant startTime = Instant.now();
var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID));
sendDataToKafka(dataToSend);
- while (receivers.get(0).count != NO_OF_OBJECTS) {
+ logger.info("sleeping {}", kafkaReceiver.count);
+ while (receivers.get(0).count < NO_OF_OBJECTS) {
if (kafkaReceiver.count > 0) {
- logger.info("sleeping {}", kafkaReceiver.count);
+ logger.info("sleeping {}", receivers.get(0).count);
}
+
Thread.sleep(1000 * 1);
}
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Create two jobs.
- this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 1), JOB_ID1,
- restClient());
- this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient());
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000), JOB_ID1, restClient());
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0), JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
this.icsSimulatorController.deleteJob(JOB_ID1, restClient());
this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
- this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient());
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0), JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
dataToSend = Flux.just(kafkaSenderRecord("Howdy", "", KAFKA_TYPE_ID));
class JsltFilterTest {
private String filterReport(JsltFilter filter) throws Exception {
- DataFromTopic data = new DataFromTopic(null, loadReport().getBytes());
+ DataFromTopic data = new DataFromTopic(null, loadReport().getBytes(), false);
FilteredData filtered = filter.filter(data);
return filtered.getValueAString();
}
void testJsonPath() throws Exception {
String exp = ("$.event.perf3gppFields.measDataCollection.measInfoList[0].measTypes.sMeasTypesList[0]");
JsonPathFilter filter = new JsonPathFilter(exp);
- DataFromTopic data = new DataFromTopic(null, loadReport().getBytes());
+ DataFromTopic data = new DataFromTopic(null, loadReport().getBytes(), false);
FilteredData filtered = filter.filter(data);
String res = filtered.getValueAString();
assertThat(res).isEqualTo("\"attTCHSeizures\"");
private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private String filterReport(PmReportFilter filter) throws Exception {
- TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes());
+ TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes(), false);
FilteredData filtered = filter.filter(data);
return filtered.getValueAString();
}
}
{
- TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes());
+ TopicListener.DataFromTopic data = new TopicListener.DataFromTopic(null, loadReport().getBytes(), false);
PmReportFilter.FilterData utranCellFilter = new PmReportFilter.FilterData();
utranCellFilter.measObjClass.add("UtranCell");
Instant startTime = Instant.now();
for (int i = 0; i < TIMES; ++i) {
- KafkaTopicListener.unzip(pmReportZipped, "junk.gz");
+ KafkaTopicListener.unzip(pmReportZipped);
}
printDuration("Unzip", startTime, TIMES);
filterData.getMeasTypes().add("pmCounterNumber0");
filterData.getMeasObjClass().add("NRCellCU");
PmReportFilter filter = new PmReportFilter(filterData);
- DataFromTopic topicData = new DataFromTopic(null, pmReportJson.getBytes());
+ DataFromTopic topicData = new DataFromTopic(null, pmReportJson.getBytes(), false);
Instant startTime = Instant.now();
for (int i = 0; i < TIMES; ++i) {
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
PmReportFilter filter = new PmReportFilter(filterData);
- FilteredData filtered = filter.filter(new TopicListener.DataFromTopic(null, "junk".getBytes()));
+ FilteredData filtered = filter.filter(new TopicListener.DataFromTopic(null, "junk".getBytes(), false));
assertThat(filtered.isEmpty()).isTrue();
- filtered = filter.filter(new TopicListener.DataFromTopic(null, reQuote("{'msg': 'test'}").getBytes()));
+ filtered = filter.filter(new TopicListener.DataFromTopic(null, reQuote("{'msg': 'test'}").getBytes(), false));
assertThat(filtered.isEmpty()).isTrue();
}