import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Map;
import org.json.JSONObject;
import org.oran.dmaapadapter.repository.InfoTypes;
import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
+import org.oran.dmaapadapter.repository.filters.PmReport;
import org.oran.dmaapadapter.repository.filters.PmReportFilter;
import org.oran.dmaapadapter.tasks.JobDataConsumer;
import org.oran.dmaapadapter.tasks.ProducerRegstrationTask;
ConsumerController.TestResults consumer = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"data\"]");
+ assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json");
// Test send an exception
kafkaConsumer.start(Flux.error(new NullPointerException()));
}
@Test
- void testReceiveAndPostDataFromDmaap() throws Exception {
+ void testReceiveAndPostDataFromDmaapBuffering() throws Exception {
final String JOB_ID = "testReceiveAndPostDataFromDmaap";
// Register producer, Register types
// Return two messages from DMAAP and verify that these are sent to the owner of
// the job (consumer)
- DmaapSimulatorController.addResponse("DmaapResponse1");
- DmaapSimulatorController.addResponse("DmaapResponse2");
+ DmaapSimulatorController.addResponse("[\"DmaapResponse1\", \"DmaapResponse2\"]");
ConsumerController.TestResults consumer = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
- assertThat(consumer.receivedBodies.get(0)).contains("[\"DmaapResponse1");
- assertThat(consumer.receivedBodies.get(0)).contains("DmaapResponse2\"]");
+ assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"DmaapResponse1\", \"DmaapResponse2\"]");
+ assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json");
String jobUrl = baseUrl() + ProducerCallbacksController.JOB_URL;
String jobs = restClient().get(jobUrl).block();
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
}
+ @Test
+ void testReceiveAndPostDataFromDmaapNonBuffering() throws Exception {
+ final String JOB_ID = "testReceiveAndPostDataFromDmaapNonBuffering";
+
+ // Register producer, Register types
+ waitForRegistration();
+
+ // Create a job
+ Job.Parameters param = new Job.Parameters(null, null, null, 1);
+ ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, toJson(gson.toJson(param)));
+ this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ // Return two messages from DMAAP and verify that these are sent to the owner of
+ // the job (consumer)
+ DmaapSimulatorController.addResponse("[\"DmaapResponse1\", \"DmaapResponse2\"]");
+ ConsumerController.TestResults consumer = this.consumerController.testResults;
+ await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(2));
+ assertThat(consumer.receivedBodies.get(0)).isEqualTo("DmaapResponse1");
+ assertThat(consumer.receivedBodies.get(1)).isEqualTo("DmaapResponse2");
+ assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8");
+
+ // Delete the job
+ this.icsSimulatorController.deleteJob(JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ }
+
+ static class PmReportArray extends ArrayList<PmReport> {
+ };
+
@Test
void testPmFiltering() throws Exception {
// Create a job
filterData.getMeasObjInstIds().add("UtranCell=Gbg-997");
filterData.getSourceNames().add("O-DU-1122");
filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1");
- Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, null);
+ Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE,
+ new Job.BufferTimeout(123, 456), null);
String paramJson = gson.toJson(param);
ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", "EI_PM_JOB_ID", toJson(paramJson));
ConsumerController.TestResults consumer = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
+ assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json");
String receivedFiltered = consumer.receivedBodies.get(0);
assertThat(receivedFiltered).contains("succImmediateAssignProcs").doesNotContain("\"p\":2").contains("\"p\":1");
+
+ PmReportArray reportsParsed = gson.fromJson(receivedFiltered, PmReportArray.class);
+ assertThat(reportsParsed).hasSize(1);
+
}
@Test
ConsumerController.TestResults consumer = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
String receivedFiltered = consumer.receivedBodies.get(0);
+ assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json");
assertThat(receivedFiltered).contains("event");
}
this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
- DmaapSimulatorController.addResponse("Hello");
+ DmaapSimulatorController.addResponse("[\"Hello\"]");
ConsumerController.TestResults consumer = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(consumer.receivedHeaders).hasSize(1));
String received = consumer.receivedBodies.get(0);
assertThat(received).isEqualTo("Hello");
+ // This is the only time it is verified that mime type is plaintext when isJson
+ // is false and buffering is not used
+ assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8");
// Check that the auth token was received by the consumer
assertThat(consumer.receivedHeaders).hasSize(1);
ConsumerController.TestResults consumer = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
String receivedFiltered = consumer.receivedBodies.get(0);
+ assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json");
assertThat(receivedFiltered).contains("event");
}