@RequestBody String body) {
try {
ProducerJobInfo request = gson.fromJson(body, ProducerJobInfo.class);
- logger.debug("Job started callback {}", request.id);
+ logger.debug("Job started callback id: {}, body: {}", request.id, body);
this.jobs.addJob(request.id, request.targetUri, types.getType(request.typeId), request.owner,
request.lastUpdated, toJobParameters(request.jobData));
return new ResponseEntity<>(HttpStatus.OK);
public String filter(String data) {
if (filter == null) {
+ logger.debug("No filter used");
return data;
}
return filter.filter(data);
package org.oran.dmaapadapter.tasks;
import java.time.Duration;
+import java.util.Collection;
+import java.util.LinkedList;
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
protected final ApplicationConfig applicationConfig;
protected final InfoType type;
protected final Jobs jobs;
+ private final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
public DmaapTopicConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) {
AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
.flatMap(notUsed -> Mono.empty());
}
- private Mono<String> getFromMessageRouter(String topicUrl) {
+ private Flux<String> getFromMessageRouter(String topicUrl) {
logger.trace("getFromMessageRouter {}", topicUrl);
return dmaapRestClient.get(topicUrl) //
.filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
+ .flatMapMany(body -> toMessages(body)) //
.doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) //
.onErrorResume(this::handleDmaapErrorResponse); //
}
+ private Flux<String> toMessages(String body) {
+ Collection<String> messages = gson.fromJson(body, LinkedList.class);
+ return Flux.fromIterable(messages);
+ }
+
private Mono<String> handleConsumerErrorResponse(Throwable t) {
logger.warn("error from CONSUMER {}", t.getMessage());
return Mono.empty();
}
- protected Flux<String> pushDataToConsumers(String body) {
- logger.debug("Received data {}", body);
+ protected Flux<String> pushDataToConsumers(String input) {
+ logger.debug("Received data {}", input);
final int CONCURRENCY = 50;
// Distibute the body to all jobs for this type
return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
- .map(job -> Tuples.of(job, job.filter(body))) //
+ .map(job -> Tuples.of(job, job.filter(input))) //
.filter(t -> !t.getT2().isEmpty()) //
.doOnNext(touple -> logger.debug("Sending to consumer {}", touple.getT1().getCallbackUrl())) //
.flatMap(touple -> touple.getT1().getConsumerRestClient().post("", touple.getT2(),
MediaType.APPLICATION_JSON), CONCURRENCY) //
.onErrorResume(this::handleConsumerErrorResponse);
}
+
}
public synchronized void start(Flux<String> input) {
stop();
this.errorStats.resetKafkaErrors();
- this.subscription = getMessagesFromKafka(input, job) //
+ this.subscription = handleMessagesFromKafka(input, job) //
.flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
.onErrorResume(this::handleError) //
.subscribe(this::handleConsumerSentOk, //
return this.subscription != null;
}
- private Flux<String> getMessagesFromKafka(Flux<String> input, Job job) {
+ private Flux<String> handleMessagesFromKafka(Flux<String> input, Job job) {
Flux<String> result = input.map(job::filter) //
.filter(t -> !t.isEmpty()); //
// Return two messages from DMAAP and verify that these are sent to the owner of
// the job (consumer)
- DmaapSimulatorController.dmaapResponses.add("DmaapResponse1");
- DmaapSimulatorController.dmaapResponses.add("DmaapResponse2");
+ DmaapSimulatorController.addResponse("DmaapResponse1");
+ DmaapSimulatorController.addResponse("DmaapResponse2");
ConsumerController.TestResults consumer = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(2));
assertThat(consumer.receivedBodies.get(0)).isEqualTo("DmaapResponse1");
// filtered PM message
String path = "./src/test/resources/pm_report.json";
String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
- DmaapSimulatorController.dmaapPmResponses.add(pmReportJson);
+ DmaapSimulatorController.addPmResponse(pmReportJson);
ConsumerController.TestResults consumer = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
// filtered PM message
String path = "./src/test/resources/pm_report.json";
String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
- DmaapSimulatorController.dmaapPmResponses.add(pmReportJson);
+ DmaapSimulatorController.addPmResponse(pmReportJson);
ConsumerController.TestResults consumer = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
// Register producer, Register types
waitForRegistration();
- // Create a job with a JsonPath
+ // Create a job with atestJsonPathFiltering JsonPath
ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", JOB_ID, this.jsonObjectJsonPath());
this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
// filtered PM message
String path = "./src/test/resources/pm_report.json";
String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
- DmaapSimulatorController.dmaapPmResponses.add(pmReportJson);
+ DmaapSimulatorController.addPmResponse(pmReportJson);
ConsumerController.TestResults consumer = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
public static final String DMAAP_TOPIC_URL = "/dmaap-topic-1";
public static final String DMAAP_TOPIC_PM_URL = "/dmaap-topic-2";
- public static List<String> dmaapResponses = Collections.synchronizedList(new LinkedList<String>());
+ private static List<String> dmaapResponses = Collections.synchronizedList(new LinkedList<String>());
- public static List<String> dmaapPmResponses = Collections.synchronizedList(new LinkedList<String>());
+ private static List<String> dmaapPmResponses = Collections.synchronizedList(new LinkedList<String>());
+
+ public static void addPmResponse(String response) {
+ response = response.replace("\"", "\\\"");
+ dmaapPmResponses.add("[\"" + response + "\"]");
+ }
+
+ public static void addResponse(String response) {
+ dmaapResponses.add("[\"" + response + "\"]");
+ }
@GetMapping(path = DMAAP_TOPIC_URL, produces = MediaType.APPLICATION_JSON_VALUE)
@Operation(summary = "GET from topic",
String resp = dmaapPmResponses.remove(0);
return new ResponseEntity<>(resp, HttpStatus.OK);
}
-
}
}
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParser;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
import org.oran.dmaapadapter.tasks.ProducerRegstrationTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
private static final String DMAAP_JOB_ID = "DMAAP_JOB_ID";
private static final String DMAAP_TYPE_ID = "DmaapInformationType";
+ private static final Logger logger = LoggerFactory.getLogger(Application.class);
@Autowired
private ApplicationConfig applicationConfig;
}
private void createInformationJobInIcs(String typeId, String jobId, String filter) {
- String body = gson.toJson(consumerJobInfo(typeId, filter));
- try {
- // Delete the job if it already exists
- deleteInformationJobInIcs(jobId);
- } catch (Exception e) {
- }
+ createInformationJobInIcs(jobId, consumerJobInfo(typeId, filter));
+ }
+
+ private void createInformationJobInIcs(String jobId, ConsumerJobInfo jobInfo) {
+ String body = gson.toJson(jobInfo);
restClient().putForEntity(jobUrl(jobId), body).block();
+ logger.info("Created job {}, {}", jobId, body);
}
private void deleteInformationJobInIcs(String jobId) {
- restClient().delete(jobUrl(jobId)).block();
+ try {
+ restClient().delete(jobUrl(jobId)).block();
+ } catch (Exception e) {
+ logger.warn("Couldnot delete job: {} reason: {}", jobId, e.getMessage());
+ }
}
private ConsumerJobInfo consumerJobInfo(String typeId, String filter) {
return "\"" + str + "\"";
}
+ private String reQuote(String str) {
+ return str.replaceAll("'", "\\\"");
+ }
+
private String consumerUri() {
return selfBaseUrl() + ConsumerController.CONSUMER_TARGET_URL;
}
String jsonStr = "{ \"filter\" :" + quote(filter) + "}";
return new ConsumerJobInfo(typeId, jsonObject(jsonStr), "owner", consumerUri(), "");
} catch (Exception e) {
+ logger.error("Error {}", e.getMessage());
return null;
}
}
ApplicationTest.testErrorCode(restClient().put(jobUrl("KAFKA_JOB_ID"), body), HttpStatus.BAD_REQUEST,
"Json validation failure");
-
}
@Test
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
- DmaapSimulatorController.dmaapResponses.add("DmaapResponse1");
- DmaapSimulatorController.dmaapResponses.add("DmaapResponse2");
- DmaapSimulatorController.dmaapResponses.add("Junk");
+ DmaapSimulatorController.addResponse("DmaapResponse1");
+ DmaapSimulatorController.addResponse("DmaapResponse2");
+ DmaapSimulatorController.addResponse("Junk");
ConsumerController.TestResults results = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(results.receivedBodies).hasSize(2));
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
}
+ @Test
+ void testPmFilter() throws Exception {
+ await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
+ final String TYPE_ID = "PmInformationType";
+
+ String jsonStr =
+ reQuote("{ 'filterType' : 'pmdata', 'filter': { 'measTypes': [ 'succImmediateAssignProcs' ] } }");
+
+ ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(jsonStr), "owner", consumerUri(), "");
+
+ createInformationJobInIcs(DMAAP_JOB_ID, jobInfo);
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ String path = "./src/test/resources/pm_report.json";
+ String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
+ DmaapSimulatorController.addPmResponse(pmReportJson);
+
+ ConsumerController.TestResults results = this.consumerController.testResults;
+ await().untilAsserted(() -> assertThat(results.receivedBodies).hasSize(1));
+
+ String filtered = results.receivedBodies.get(0);
+ assertThat(filtered).contains("succImmediateAssignProcs").doesNotContain("attTCHSeizures");
+
+ logger.info(filtered);
+
+ deleteInformationJobInIcs(DMAAP_JOB_ID);
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ }
}