import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
import org.oran.dmaapadapter.r1.ConsumerJobInfo;
import org.oran.dmaapadapter.r1.ProducerJobInfo;
-import org.oran.dmaapadapter.repository.InfoType;
import org.oran.dmaapadapter.repository.InfoTypes;
+import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
}
private ConsumerJobInfo consumerJobInfo() {
- InfoType type = this.types.getAll().iterator().next();
- return consumerJobInfo(type.getId(), "EI_JOB_ID");
+ return consumerJobInfo("DmaapInformationType", "EI_JOB_ID");
}
private Object jsonObject() {
// Register producer, Register types
await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1);
+ assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Create a job
this.ecsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient());
String jobUrl = baseUrl() + ProducerCallbacksController.JOB_URL;
String jobs = restClient().get(jobUrl).block();
- assertThat(jobs).contains("ExampleInformationType");
+ assertThat(jobs).contains(JOB_ID);
// Delete the job
this.ecsSimulatorController.deleteJob(JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
-
}
@Test
void testReRegister() throws Exception {
// Wait foir register types and producer
await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1);
+ assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Clear the registration, should trigger a re-register
ecsSimulatorController.testResults.reset();
await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1);
+ assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Just clear the registerred types, should trigger a re-register
ecsSimulatorController.testResults.types.clear();
await().untilAsserted(
- () -> assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1));
+ () -> assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(2));
+ }
+
+ @Test
+ void testCreateKafkaJob() {
+ await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+ final String TYPE_ID = "KafkaInformationType";
+
+ Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1);
+ String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
+ ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, "");
+ // Create a job
+ this.ecsSimulatorController.addJob(jobInfo, "JOB_ID", restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ // Delete the job
+ this.ecsSimulatorController.deleteJob("JOB_ID", restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
}
private void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {
}
return true;
}
-
}