import org.oran.dmaapadapter.configuration.WebClientConfig;
import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
import org.oran.dmaapadapter.r1.ConsumerJobInfo;
-import org.oran.dmaapadapter.repository.InfoType;
import org.oran.dmaapadapter.repository.InfoTypes;
+import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
import org.oran.dmaapadapter.tasks.ProducerRegstrationTask;
import org.springframework.beans.factory.annotation.Autowired;
})
class IntegrationWithEcs {
- private static final String EI_JOB_ID = "EI_JOB_ID";
+ private static final String DMAAP_JOB_ID = "DMAAP_JOB_ID";
+ private static final String DMAAP_TYPE_ID = "DmaapInformationType";
@Autowired
private ApplicationConfig applicationConfig;
@AfterEach
void reset() {
this.consumerController.testResults.reset();
- this.jobs.clear();
- this.types.clear();
+ assertThat(this.jobs.size()).isZero();
}
private AsyncRestClient restClient(boolean useTrustValidation) {
}
private String jobUrl(String jobId) {
- return ecsBaseUrl() + "/data-consumer/v1/info-jobs/" + jobId;
+ return ecsBaseUrl() + "/data-consumer/v1/info-jobs/" + jobId + "?typeCheck=true";
}
- private void createInformationJobInEcs(String jobId) {
- String body = gson.toJson(consumerJobInfo());
+ private void createInformationJobInEcs(String typeId, String jobId, String filter) {
+ String body = gson.toJson(consumerJobInfo(typeId, filter));
try {
// Delete the job if it already exists
deleteInformationJobInEcs(jobId);
restClient().delete(jobUrl(jobId)).block();
}
- private ConsumerJobInfo consumerJobInfo() {
- InfoType type = this.types.getAll().iterator().next();
- return consumerJobInfo(type.getId(), EI_JOB_ID);
- }
-
- private Object jsonObject() {
- return jsonObject("{}");
+ private ConsumerJobInfo consumerJobInfo(String typeId, String filter) {
+ return consumerJobInfo(typeId, DMAAP_JOB_ID, filter);
}
private Object jsonObject(String json) {
}
}
- private ConsumerJobInfo consumerJobInfo(String typeId, String infoJobId) {
+ private String quote(String str) {
+ return "\"" + str + "\"";
+ }
+
+ private String consumerUri() {
+ return selfBaseUrl() + ConsumerController.CONSUMER_TARGET_URL;
+ }
+
+ private ConsumerJobInfo consumerJobInfo(String typeId, String infoJobId, String filter) {
try {
- String targetUri = selfBaseUrl() + ConsumerController.CONSUMER_TARGET_URL;
- return new ConsumerJobInfo(typeId, jsonObject(), "owner", targetUri, "");
+
+ String jsonStr = "{ \"filter\" :" + quote(filter) + "}";
+ return new ConsumerJobInfo(typeId, jsonObject(jsonStr), "owner", consumerUri(), "");
} catch (Exception e) {
return null;
}
}
+ @Test
+ void testCreateKafkaJob() {
+ await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue());
+ final String TYPE_ID = "KafkaInformationType";
+
+ Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1);
+
+ ConsumerJobInfo jobInfo =
+ new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), "");
+ String body = gson.toJson(jobInfo);
+
+ restClient().putForEntity(jobUrl("KAFKA_JOB_ID"), body).block();
+
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ deleteInformationJobInEcs("KAFKA_JOB_ID");
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ }
+
@Test
void testWholeChain() throws Exception {
await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue());
- createInformationJobInEcs(EI_JOB_ID);
+ createInformationJobInEcs(DMAAP_TYPE_ID, DMAAP_JOB_ID, ".*DmaapResponse.*");
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
DmaapSimulatorController.dmaapResponses.add("DmaapResponse1");
DmaapSimulatorController.dmaapResponses.add("DmaapResponse2");
+ DmaapSimulatorController.dmaapResponses.add("Junk");
ConsumerController.TestResults results = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(results.receivedBodies.size()).isEqualTo(2));
assertThat(results.receivedBodies.get(0)).isEqualTo("DmaapResponse1");
- deleteInformationJobInEcs(EI_JOB_ID);
+ deleteInformationJobInEcs(DMAAP_JOB_ID);
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());