+ assertThat(job.getOwner()).isEqualTo("owner");
+ }
+
+ @Test
+ void putEiProducerWithOneType_rejecting() throws JsonMappingException, JsonProcessingException, ServiceException {
+ putEiProducerWithOneTypeRejecting("simulateProducerError", EI_TYPE_ID);
+ String url = ConsumerConsts.API_ROOT + "/eijobs/jobId";
+ String body = gson.toJson(eiJobInfo());
+ testErrorCode(restClient().put(url, body), HttpStatus.CONFLICT, "Job not accepted by any producers");
+
+ ProducerSimulatorController.TestResults simulatorResults = this.producerSimulator.getTestResults();
+ // There is one retry -> 2 calls
+ await().untilAsserted(() -> assertThat(simulatorResults.noOfRejectedCreate).isEqualTo(2));
+ assertThat(simulatorResults.noOfRejectedCreate).isEqualTo(2);
+ }
+
+ @Test
+ void testPutEiJob_jsonSchemavalidationError() throws Exception {
+ putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID);
+
+ String url = ConsumerConsts.API_ROOT + "/eijobs/jobId";
+ // The element with name "property1" is mandatory in the schema
+ ConsumerEiJobInfo jobInfo = new ConsumerEiJobInfo("typeId", jsonObject("{ \"XXstring\" : \"value\" }"), "owner",
+ "targetUri", "jobStatusUrl");
+ String body = gson.toJson(jobInfo);
+
+ testErrorCode(restClient().put(url, body), HttpStatus.CONFLICT, "Json validation failure");
+ }
+
+ @Test
+ void testGetEiProducerTypes() throws Exception {
+ final String EI_TYPE_ID_2 = EI_TYPE_ID + "_2";
+ putEiProducerWithOneType("producer1", EI_TYPE_ID);
+ putEiJob(EI_TYPE_ID, "jobId");
+ putEiProducerWithOneType("producer2", EI_TYPE_ID_2);
+ putEiJob(EI_TYPE_ID_2, "jobId2");
+ String url = ProducerConsts.API_ROOT + "/eitypes";
+
+ ResponseEntity<String> resp = restClient().getForEntity(url).block();
+ assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
+ assertThat(resp.getBody()).contains(EI_TYPE_ID);
+ assertThat(resp.getBody()).contains(EI_TYPE_ID_2);
+ }
+
+ @Test
+ void testReplacingEiProducerTypes() throws Exception {
+ final String REPLACED_TYPE_ID = "replaced";
+ putEiProducerWithOneType(EI_PRODUCER_ID, REPLACED_TYPE_ID);
+ putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID);
+
+ String url = ProducerConsts.API_ROOT + "/eitypes";
+
+ ResponseEntity<String> resp = restClient().getForEntity(url).block();
+ assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
+ assertThat(resp.getBody()).contains(EI_TYPE_ID);
+ assertThat(resp.getBody()).doesNotContain(REPLACED_TYPE_ID);
+ }
+
+ @Test
+ void testChangingEiTypeGetRejected() throws Exception {
+ putEiProducerWithOneType("producer1", "typeId1");
+ putEiProducerWithOneType("producer2", "typeId2");
+ putEiJob("typeId1", "jobId");
+
+ String url = ConsumerConsts.API_ROOT + "/eijobs/jobId";
+ String body = gson.toJson(eiJobInfo("typeId2", "jobId"));
+ testErrorCode(restClient().put(url, body), HttpStatus.CONFLICT,
+ "Not allowed to change type for existing EI job");
+ }
+
+ @Test
+ void testPutEiProducer() throws Exception {
+ String url = ProducerConsts.API_ROOT + "/eiproducers/eiProducerId";
+ String body = gson.toJson(producerEiRegistratioInfo(EI_TYPE_ID));
+
+ ResponseEntity<String> resp = restClient().putForEntity(url, body).block();
+ assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.CREATED);
+
+ assertThat(this.eiTypes.size()).isEqualTo(1);
+ EiType type = this.eiTypes.getType(EI_TYPE_ID);
+ assertThat(type.getProducerIds()).contains("eiProducerId");
+ assertThat(this.eiProducers.size()).isEqualTo(1);
+ assertThat(this.eiProducers.get("eiProducerId").getEiTypes().iterator().next().getId()).isEqualTo(EI_TYPE_ID);
+
+ resp = restClient().putForEntity(url, body).block();
+ assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
+
+ resp = restClient().getForEntity(url).block();
+ assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
+ assertThat(resp.getBody()).isEqualTo(body);
+ }
+
+ @Test
+ void testPutEiProducerExistingJob() throws Exception {
+ putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID);
+ putEiJob(EI_TYPE_ID, "jobId");
+ String url = ProducerConsts.API_ROOT + "/eiproducers/eiProducerId";
+ String body = gson.toJson(producerEiRegistratioInfo(EI_TYPE_ID));
+ restClient().putForEntity(url, body).block();
+
+ ProducerSimulatorController.TestResults simulatorResults = this.producerSimulator.getTestResults();
+ await().untilAsserted(() -> assertThat(simulatorResults.jobsStarted.size()).isEqualTo(2));
+ ProducerJobInfo request = simulatorResults.jobsStarted.get(0);
+ assertThat(request.id).isEqualTo("jobId");
+ }
+
+ @Test
+ void testPutProducerAndEiJob() throws Exception {
+ String url = ProducerConsts.API_ROOT + "/eiproducers/eiProducerId";
+ String body = gson.toJson(producerEiRegistratioInfo(EI_TYPE_ID));
+ restClient().putForEntity(url, body).block();
+ assertThat(this.eiTypes.size()).isEqualTo(1);
+ this.eiTypes.getType(EI_TYPE_ID);
+
+ url = ConsumerConsts.API_ROOT + "/eijobs/jobId";
+ body = gson.toJson(eiJobInfo());
+ restClient().putForEntity(url, body).block();
+
+ ProducerSimulatorController.TestResults simulatorResults = this.producerSimulator.getTestResults();
+ await().untilAsserted(() -> assertThat(simulatorResults.jobsStarted.size()).isEqualTo(1));
+ ProducerJobInfo request = simulatorResults.jobsStarted.get(0);
+ assertThat(request.id).isEqualTo("jobId");
+ }
+
+ @Test
+ void testGetEiJobsForProducer() throws JsonMappingException, JsonProcessingException, ServiceException {
+ putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID);
+ putEiJob(EI_TYPE_ID, "jobId1");
+ putEiJob(EI_TYPE_ID, "jobId2");
+
+ // PUT a consumer
+ String url = ProducerConsts.API_ROOT + "/eiproducers/eiProducerId";
+ String body = gson.toJson(producerEiRegistratioInfo(EI_TYPE_ID));
+ restClient().putForEntity(url, body).block();
+
+ url = ProducerConsts.API_ROOT + "/eiproducers/eiProducerId/eijobs";
+ ResponseEntity<String> resp = restClient().getForEntity(url).block();
+ assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
+
+ ProducerJobInfo[] parsedResp = gson.fromJson(resp.getBody(), ProducerJobInfo[].class);
+ assertThat(parsedResp[0].typeId).isEqualTo(EI_TYPE_ID);
+ assertThat(parsedResp[1].typeId).isEqualTo(EI_TYPE_ID);
+ }
+
+ @Test
+ void testDeleteEiProducer() throws Exception {
+ putEiProducerWithOneType("eiProducerId", EI_TYPE_ID);
+ putEiProducerWithOneType("eiProducerId2", EI_TYPE_ID);
+
+ assertThat(this.eiProducers.size()).isEqualTo(2);
+ EiType type = this.eiTypes.getType(EI_TYPE_ID);
+ assertThat(type.getProducerIds()).contains("eiProducerId");
+ assertThat(type.getProducerIds()).contains("eiProducerId2");
+ putEiJob(EI_TYPE_ID, "jobId");
+ assertThat(this.eiJobs.size()).isEqualTo(1);
+
+ deleteEiProducer("eiProducerId");
+ assertThat(this.eiProducers.size()).isEqualTo(1);
+ assertThat(this.eiTypes.getType(EI_TYPE_ID).getProducerIds()).doesNotContain("eiProducerId");
+ verifyJobStatus("jobId", "ENABLED");
+
+ deleteEiProducer("eiProducerId2");
+ assertThat(this.eiProducers.size()).isZero();
+ assertThat(this.eiTypes.size()).isZero();
+ verifyJobStatus("jobId", "DISABLED");
+ }
+
+ @Test
+ void testJobStatusNotifications() throws JsonMappingException, JsonProcessingException, ServiceException {
+ ConsumerSimulatorController.TestResults consumerCalls = this.consumerSimulator.getTestResults();
+ ProducerSimulatorController.TestResults producerCalls = this.producerSimulator.getTestResults();
+
+ putEiProducerWithOneType("eiProducerId", EI_TYPE_ID);
+ putEiJob(EI_TYPE_ID, "jobId");
+ putEiProducerWithOneType("eiProducerId2", EI_TYPE_ID);
+ await().untilAsserted(() -> assertThat(producerCalls.jobsStarted.size()).isEqualTo(2));
+
+ deleteEiProducer("eiProducerId2");
+ assertThat(this.eiTypes.size()).isEqualTo(1); // The type remains, one producer left
+ deleteEiProducer("eiProducerId");
+ assertThat(this.eiTypes.size()).isZero(); // The type is gone
+ assertThat(this.eiJobs.size()).isEqualTo(1); // The job remains
+ await().untilAsserted(() -> assertThat(consumerCalls.status.size()).isEqualTo(1));
+ assertThat(consumerCalls.status.get(0).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.DISABLED);
+
+ putEiProducerWithOneType("eiProducerId", EI_TYPE_ID);
+ await().untilAsserted(() -> assertThat(consumerCalls.status.size()).isEqualTo(2));
+ assertThat(consumerCalls.status.get(1).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.ENABLED);
+ }
+
+ @Test
+ void testJobStatusNotifications2() throws JsonMappingException, JsonProcessingException, ServiceException {
+ // Test replacing a producer with new and removed types
+
+ // Create a job
+ putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID);
+ putEiJob(EI_TYPE_ID, EI_JOB_ID);
+
+ // change the type for the producer, the EI_TYPE_ID is deleted
+ putEiProducerWithOneType(EI_PRODUCER_ID, "junk");
+ verifyJobStatus(EI_JOB_ID, "DISABLED");
+ ConsumerSimulatorController.TestResults consumerCalls = this.consumerSimulator.getTestResults();
+ await().untilAsserted(() -> assertThat(consumerCalls.status.size()).isEqualTo(1));
+ assertThat(consumerCalls.status.get(0).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.DISABLED);
+
+ putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID);
+ verifyJobStatus(EI_JOB_ID, "ENABLED");
+ await().untilAsserted(() -> assertThat(consumerCalls.status.size()).isEqualTo(2));
+ assertThat(consumerCalls.status.get(1).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.ENABLED);
+ }
+
+ @Test
+ void testGetProducerEiType() throws JsonMappingException, JsonProcessingException, ServiceException {
+ putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID);
+ String url = ProducerConsts.API_ROOT + "/eitypes/" + EI_TYPE_ID;
+ ResponseEntity<String> resp = restClient().getForEntity(url).block();
+ assertThat(resp.getBody()).contains(EI_PRODUCER_ID);