Simulating producer errors 84/4784/3
authorPatrikBuhr <patrik.buhr@est.tech>
Tue, 29 Sep 2020 07:23:18 +0000 (09:23 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Tue, 29 Sep 2020 11:53:31 +0000 (13:53 +0200)
At Job create, error will be returned if no producers accepted the job.

Change-Id: I63422a4d898f21510667d7439fc3fddb6d3a5170
Issue-ID: NONRTRIC-173
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/clients/ProducerCallbacks.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/ErrorResponse.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerController.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerEiJobInfo.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerEiJobStatus.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerEiTypeInfo.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerController.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/exceptions/ServiceException.java
enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java
enrichment-coordinator-service/src/test/java/org/oransc/enrichment/controller/ProducerSimulatorController.java

index 07b188d..f42c6e3 100644 (file)
@@ -34,6 +34,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 /**
  * Callbacks to the EiProducer
  */
@@ -48,12 +51,6 @@ public class ProducerCallbacks {
     @Autowired
     ApplicationConfig applicationConfig;
 
-    public void notifyProducersJobCreated(EiJob eiJob) {
-        for (EiProducer producer : eiJob.type().getProducers()) {
-            notifyProducerJobStarted(producer, eiJob);
-        }
-    }
-
     public void notifyProducersJobDeleted(EiJob eiJob) {
         AsyncRestClient restClient = restClient(false);
         ProducerJobInfo request = new ProducerJobInfo(eiJob);
@@ -65,15 +62,24 @@ public class ProducerCallbacks {
         }
     }
 
-    public void notifyProducerJobStarted(EiProducer producer, EiJob eiJob) {
+    public Mono<Integer> notifyProducersJobStarted(EiJob eiJob) {
+        return Flux.fromIterable(eiJob.type().getProducers()) //
+            .flatMap(eiProducer -> notifyProducerJobStarted(eiProducer, eiJob)) //
+            .collectList() //
+            .flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); //
+    }
+
+    public Mono<String> notifyProducerJobStarted(EiProducer producer, EiJob eiJob) {
         AsyncRestClient restClient = restClient(false);
         ProducerJobInfo request = new ProducerJobInfo(eiJob);
         String body = gson.toJson(request);
 
-        restClient.post(producer.jobCreationCallbackUrl(), body) //
-            .subscribe(notUsed -> logger.debug("Job subscription started OK {}", producer.id()), //
-                throwable -> logger.warn("Job subscription failed {}", producer.id(), throwable.toString()), null);
-
+        return restClient.post(producer.jobCreationCallbackUrl(), body)
+            .doOnNext(resp -> logger.debug("Job subscription started OK {}", producer.id()))
+            .onErrorResume(throwable -> {
+                logger.warn("Job subscription failed {}", producer.id(), throwable.toString());
+                return Mono.empty();
+            });
     }
 
     private AsyncRestClient restClient(boolean useTrustValidation) {
index 20e9f76..4e68c1a 100644 (file)
@@ -27,6 +27,7 @@ import com.google.gson.annotations.SerializedName;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 
+import org.oransc.enrichment.exceptions.ServiceException;
 import org.springframework.http.HttpHeaders;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
@@ -82,13 +83,18 @@ public class ErrorResponse {
         this.message = message;
     }
 
-    public static Mono<ResponseEntity<Object>> createMono(Exception e, HttpStatus code) {
+    public static Mono<ResponseEntity<Object>> createMono(Throwable e, HttpStatus code) {
         return Mono.just(create(e, code));
     }
 
-    public static ResponseEntity<Object> create(Exception e, HttpStatus code) {
+    public static ResponseEntity<Object> create(Throwable e, HttpStatus code) {
         if (e instanceof RuntimeException) {
             code = HttpStatus.INTERNAL_SERVER_ERROR;
+        } else if (e instanceof ServiceException) {
+            ServiceException se = (ServiceException) e;
+            if (se.getHttpStatus() != null) {
+                code = se.getHttpStatus();
+            }
         }
         return create(e.toString(), code);
     }
index ff04522..b120d5f 100644 (file)
@@ -58,6 +58,7 @@ import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.PutMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
 
 @SuppressWarnings("java:S3457") // No need to call "toString()" method as formatting and string ..
 @RestController("ConsumerController")
@@ -245,45 +246,62 @@ public class ConsumerController {
                 code = 404,
                 message = "Enrichment Information type is not found",
                 response = ErrorResponse.ErrorInfo.class)})
-    public ResponseEntity<Object> putIndividualEiJob( //
+    public Mono<ResponseEntity<Object>> putIndividualEiJob( //
         @PathVariable("eiTypeId") String eiTypeId, //
         @PathVariable("eiJobId") String eiJobId, //
         @RequestBody ConsumerEiJobInfo eiJobInfo) {
+
+        final boolean isNewJob = this.eiJobs.get(eiJobId) == null;
+
+        return validatePutEiJob(eiTypeId, eiJobId, eiJobInfo) //
+            .flatMap(this::notifyProducersNewJob) //
+            .doOnNext(newEiJob -> this.eiJobs.put(newEiJob)) //
+            .flatMap(newEiJob -> Mono.just(new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)))
+            .onErrorResume(throwable -> Mono.just(ErrorResponse.create(throwable, HttpStatus.NOT_FOUND)));
+    }
+
+    private Mono<EiJob> notifyProducersNewJob(EiJob newEiJob) {
+        return this.producerCallbacks.notifyProducersJobStarted(newEiJob) //
+            .flatMap(noOfAcceptingProducers -> {
+                if (noOfAcceptingProducers.intValue() > 0) {
+                    return Mono.just(newEiJob);
+                } else {
+                    return Mono.error(new ServiceException("Job not accepted by any producers", HttpStatus.CONFLICT));
+                }
+            });
+    }
+
+    private Mono<EiJob> validatePutEiJob(String eiTypeId, String eiJobId, ConsumerEiJobInfo eiJobInfo) {
         try {
             EiType eiType = this.eiTypes.getType(eiTypeId);
-            validateJobData(eiType.getJobDataSchema(), eiJobInfo.jobData);
+            validateJsonObjectAgainstSchema(eiType.getJobDataSchema(), eiJobInfo.jobData);
             EiJob existingEiJob = this.eiJobs.get(eiJobId);
-            final boolean newJob = existingEiJob == null;
+
             if (existingEiJob != null && !existingEiJob.type().getId().equals(eiTypeId)) {
-                return ErrorResponse.create("Not allowed to change type for existing EI job", HttpStatus.CONFLICT);
+                throw new ServiceException("Not allowed to change type for existing EI job", HttpStatus.CONFLICT);
             }
-            EiJob newEiJob = toEiJob(eiJobInfo, eiJobId, eiType);
-            this.eiJobs.put(newEiJob);
-            this.producerCallbacks.notifyProducersJobCreated(newEiJob);
-            return new ResponseEntity<>(newJob ? HttpStatus.CREATED : HttpStatus.OK);
+            return Mono.just(toEiJob(eiJobInfo, eiJobId, eiType));
         } catch (Exception e) {
-            return ErrorResponse.create(e, HttpStatus.NOT_FOUND);
+            return Mono.error(e);
         }
     }
 
-    private void validateJobData(Object schemaObj, Object object) throws ServiceException {
-        if (schemaObj == null) {
-            return; // schema is optional for now
-        }
-        try {
-            ObjectMapper mapper = new ObjectMapper();
+    private void validateJsonObjectAgainstSchema(Object schemaObj, Object object) throws ServiceException {
+        if (schemaObj != null) { // schema is optional for now
+            try {
+                ObjectMapper mapper = new ObjectMapper();
 
-            String schemaAsString = mapper.writeValueAsString(schemaObj);
-            JSONObject schemaJSON = new JSONObject(schemaAsString);
-            Schema schema = SchemaLoader.load(schemaJSON);
+                String schemaAsString = mapper.writeValueAsString(schemaObj);
+                JSONObject schemaJSON = new JSONObject(schemaAsString);
+                Schema schema = SchemaLoader.load(schemaJSON);
 
-            String objectAsString = mapper.writeValueAsString(object);
-            JSONObject json = new JSONObject(objectAsString);
-            schema.validate(json);
-        } catch (Exception e) {
-            throw new ServiceException("Json validation failure", e);
+                String objectAsString = mapper.writeValueAsString(object);
+                JSONObject json = new JSONObject(objectAsString);
+                schema.validate(json);
+            } catch (Exception e) {
+                throw new ServiceException("Json validation failure " + e.toString(), HttpStatus.CONFLICT);
+            }
         }
-
     }
 
     // Status TBD
index 4d11a84..3111d10 100644 (file)
@@ -29,22 +29,22 @@ import io.swagger.annotations.ApiModelProperty;
 import org.immutables.gson.Gson;
 
 @Gson.TypeAdapters
-@ApiModel(value = "ei_job_info", description = "Information for a Enrichment Information Job")
+@ApiModel(value = "EiJob", description = "Information for an Enrichment Information Job")
 public class ConsumerEiJobInfo {
 
     @ApiModelProperty(value = "Identity of the owner of the job", required = true)
-    @SerializedName("job_owner")
-    @JsonProperty(value = "job_owner", required = true)
+    @SerializedName("jobOwner")
+    @JsonProperty(value = "jobOwner", required = true)
     public String owner;
 
     @ApiModelProperty(value = "EI Type specific job data", required = true)
-    @SerializedName("job_data")
-    @JsonProperty(value = "job_data", required = true)
+    @SerializedName("jobParameters")
+    @JsonProperty(value = "jobParameters", required = true)
     public Object jobData;
 
     @ApiModelProperty(value = "The target of the EI data", required = true)
-    @SerializedName("target_uri")
-    @JsonProperty(value = "target_uri", required = true)
+    @SerializedName("targetUri")
+    @JsonProperty(value = "targetUri", required = true)
     public String targetUri;
 
     public ConsumerEiJobInfo() {
index 282f44d..43643ab 100644 (file)
@@ -29,11 +29,11 @@ import io.swagger.annotations.ApiModelProperty;
 import org.immutables.gson.Gson;
 
 @Gson.TypeAdapters
-@ApiModel(value = "ei_job_status", description = "Status for an EI Job")
+@ApiModel(value = "EiJobStatus", description = "Status for an EI Job")
 public class ConsumerEiJobStatus {
 
     @Gson.TypeAdapters
-    @ApiModel(value = "operational_state", description = "Represents the operational states for a EI Job")
+    @ApiModel(value = "OperationalState", description = "Represents the operational states for a EI Job")
     public enum OperationalState {
         ENABLED, DISABLED
     }
@@ -43,8 +43,8 @@ public class ConsumerEiJobStatus {
         + "DISABLED: TBD.";
 
     @ApiModelProperty(value = OPERATIONAL_STATE_DESCRIPTION, name = "operational_state", required = true)
-    @SerializedName("operational_state")
-    @JsonProperty(value = "operational_state", required = true)
+    @SerializedName("operationalState")
+    @JsonProperty(value = "operationalState", required = true)
     public final OperationalState state;
 
     public ConsumerEiJobStatus(OperationalState state) {
index 05d2326..0d04115 100644 (file)
@@ -29,15 +29,15 @@ import io.swagger.annotations.ApiModelProperty;
 import org.immutables.gson.Gson;
 
 @Gson.TypeAdapters
-@ApiModel(value = "ei_type_info", description = "Information for an EI type")
+@ApiModel(value = "EiType", description = "Information for an EI type")
 public class ConsumerEiTypeInfo {
 
     @ApiModelProperty(value = "Json schema for the job data")
-    @SerializedName("job_data_schema")
-    @JsonProperty("job_data_schema")
-    public final Object jobDataSchema;
+    @SerializedName("eiJobParametersSchema")
+    @JsonProperty("eiJobParametersSchema")
+    public final Object jobParametersSchema;
 
-    ConsumerEiTypeInfo(Object jobDataSchema) {
-        this.jobDataSchema = jobDataSchema;
+    ConsumerEiTypeInfo(Object jobParametersSchema) {
+        this.jobParametersSchema = jobParametersSchema;
     }
 }
index c9b6467..7743bce 100644 (file)
@@ -278,7 +278,11 @@ public class ProducerController {
 
         for (EiType type : types) {
             for (EiJob job : this.eiJobs.getJobsForType(type)) {
-                this.producerCallbacks.notifyProducerJobStarted(producer, job);
+                this.producerCallbacks.notifyProducerJobStarted(producer, job) //
+                    .subscribe(//
+                        response -> logger.debug("Producer notified OK"), //
+                        throwable -> logger.warn("Producer rejected job {}", throwable.getMessage()) //
+                    );
             }
             type.addProducer(producer);
         }
index a14e8de..ffccf80 100644 (file)
 
 package org.oransc.enrichment.exceptions;
 
+import lombok.Getter;
+
+import org.springframework.http.HttpStatus;
+
 public class ServiceException extends Exception {
 
     private static final long serialVersionUID = 1L;
+    @Getter
+    private final HttpStatus httpStatus;
 
     public ServiceException(String message) {
         super(message);
+        this.httpStatus = null;
     }
 
     public ServiceException(String message, Exception originalException) {
         super(message, originalException);
+        this.httpStatus = null;
+    }
+
+    public ServiceException(String message, HttpStatus httpStatus) {
+        super(message);
+        this.httpStatus = httpStatus;
     }
 }
index 62eff12..9a731e6 100644 (file)
@@ -45,6 +45,7 @@ import org.oransc.enrichment.configuration.WebClientConfig;
 import org.oransc.enrichment.controller.ProducerSimulatorController;
 import org.oransc.enrichment.controllers.consumer.ConsumerConsts;
 import org.oransc.enrichment.controllers.consumer.ConsumerEiJobInfo;
+import org.oransc.enrichment.controllers.consumer.ConsumerEiTypeInfo;
 import org.oransc.enrichment.controllers.producer.ProducerConsts;
 import org.oransc.enrichment.controllers.producer.ProducerRegistrationInfo;
 import org.oransc.enrichment.controllers.producer.ProducerRegistrationInfo.ProducerEiTypeRegistrationInfo;
@@ -146,7 +147,8 @@ class ApplicationTest {
         putEiProducerWithOneType(EI_PRODUCER_ID, "test");
         String url = ConsumerConsts.API_ROOT + "/eitypes/test";
         String rsp = restClient().get(url).block();
-        assertThat(rsp).contains("job_data_schema");
+        ConsumerEiTypeInfo info = gson.fromJson(rsp, ConsumerEiTypeInfo.class);
+        assertThat(info.jobParametersSchema).isNotNull();
     }
 
     @Test
@@ -176,7 +178,8 @@ class ApplicationTest {
         putEiJob(EI_TYPE_ID, "jobId");
         String url = ConsumerConsts.API_ROOT + "/eitypes/typeId/eijobs/jobId";
         String rsp = restClient().get(url).block();
-        assertThat(rsp).contains("job_data");
+        ConsumerEiJobInfo info = gson.fromJson(rsp, ConsumerEiJobInfo.class);
+        assertThat(info.owner).isEqualTo("owner");
     }
 
     @Test
@@ -220,7 +223,9 @@ class ApplicationTest {
 
     @Test
     void testPutEiJob() throws Exception {
+        // Test that one producer accepting a job is enough
         putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID);
+        putEiProducerWithOneTypeRejecting("simulateProducerError", EI_TYPE_ID);
 
         String url = ConsumerConsts.API_ROOT + "/eitypes/typeId/eijobs/jobId";
         String body = gson.toJson(eiJobInfo());
@@ -233,12 +238,25 @@ class ApplicationTest {
         ProducerJobInfo request = simulatorResults.jobsStarted.get(0);
         assertThat(request.id).isEqualTo("jobId");
 
+        assertThat(simulatorResults.noOfRejectedCreate).isEqualTo(1);
+
         resp = restClient().putForEntity(url, body).block();
         assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
         EiJob job = this.eiJobs.getJob("jobId");
         assertThat(job.owner()).isEqualTo("owner");
     }
 
+    @Test
+    void putEiProducerWithOneType_rejecting() throws JsonMappingException, JsonProcessingException, ServiceException {
+        putEiProducerWithOneTypeRejecting("simulateProducerError", EI_TYPE_ID);
+        String url = ConsumerConsts.API_ROOT + "/eitypes/typeId/eijobs/jobId";
+        String body = gson.toJson(eiJobInfo());
+        testErrorCode(restClient().put(url, body), HttpStatus.CONFLICT, "Job not accepted by any producers");
+
+        ProducerSimulatorController.TestResults simulatorResults = this.producerSimulator.getTestResults();
+        assertThat(simulatorResults.noOfRejectedCreate).isEqualTo(1);
+    }
+
     @Test
     void testPutEiJob_jsonSchemavalidationError() throws Exception {
         putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID);
@@ -249,7 +267,7 @@ class ApplicationTest {
             new ConsumerEiJobInfo(jsonObject("{ \"XXstring\" : \"value\" }"), "owner", "targetUri");
         String body = gson.toJson(jobInfo);
 
-        testErrorCode(restClient().put(url, body), HttpStatus.NOT_FOUND, "Json validation failure");
+        testErrorCode(restClient().put(url, body), HttpStatus.CONFLICT, "Json validation failure");
     }
 
     @Test
@@ -413,6 +431,14 @@ class ApplicationTest {
         return new ProducerEiTypeRegistrationInfo(jsonSchemaObject(), typeId);
     }
 
+    ProducerRegistrationInfo producerEiRegistratioInfoRejecting(String typeId)
+        throws JsonMappingException, JsonProcessingException {
+        Collection<ProducerEiTypeRegistrationInfo> types = new ArrayList<>();
+        types.add(producerEiTypeRegistrationInfo(typeId));
+        return new ProducerRegistrationInfo(types, baseUrl() + ProducerSimulatorController.JOB_CREATED_ERROR_URL,
+            baseUrl() + ProducerSimulatorController.JOB_DELETED_ERROR_URL);
+    }
+
     ProducerRegistrationInfo producerEiRegistratioInfo(String typeId)
         throws JsonMappingException, JsonProcessingException {
         Collection<ProducerEiTypeRegistrationInfo> types = new ArrayList<>();
@@ -464,6 +490,15 @@ class ApplicationTest {
         return this.eiJobs.getJob(jobId);
     }
 
+    private EiType putEiProducerWithOneTypeRejecting(String producerId, String eiTypeId)
+        throws JsonMappingException, JsonProcessingException, ServiceException {
+        String url = ProducerConsts.API_ROOT + "/eiproducers/" + producerId;
+        String body = gson.toJson(producerEiRegistratioInfoRejecting(eiTypeId));
+
+        restClient().putForEntity(url, body).block();
+        return this.eiTypes.getType(eiTypeId);
+    }
+
     private EiType putEiProducerWithOneType(String producerId, String eiTypeId)
         throws JsonMappingException, JsonProcessingException, ServiceException {
         String url = ProducerConsts.API_ROOT + "/eiproducers/" + producerId;
index b6b8bc3..c44a9ee 100644 (file)
@@ -51,11 +51,15 @@ public class ProducerSimulatorController {
 
     public static final String JOB_CREATED_URL = "/producer_simulator/job_created";
     public static final String JOB_DELETED_URL = "/producer_simulator/job_deleted";
+    public static final String JOB_CREATED_ERROR_URL = "/producer_simulator/job_created_error";
+    public static final String JOB_DELETED_ERROR_URL = "/producer_simulator/job_deleted_error";
 
     public static class TestResults {
 
         public List<ProducerJobInfo> jobsStarted = Collections.synchronizedList(new ArrayList<ProducerJobInfo>());
         public List<ProducerJobInfo> jobsStopped = Collections.synchronizedList(new ArrayList<ProducerJobInfo>());
+        public int noOfRejectedCreate = 0;
+        public int noOfRejectedDelete = 0;
         public boolean errorFound = false;
 
         public TestResults() {
@@ -65,6 +69,8 @@ public class ProducerSimulatorController {
             jobsStarted.clear();
             jobsStopped.clear();
             this.errorFound = false;
+            this.noOfRejectedCreate = 0;
+            this.noOfRejectedDelete = 0;
         }
     }
 
@@ -109,4 +115,30 @@ public class ProducerSimulatorController {
         }
     }
 
+    @PostMapping(path = JOB_CREATED_ERROR_URL, produces = MediaType.APPLICATION_JSON_VALUE)
+    @ApiOperation(value = "Callback for EI job creation, returns error", notes = "")
+    @ApiResponses(
+        value = { //
+            @ApiResponse(code = 200, message = "OK", response = void.class)}//
+    )
+    public ResponseEntity<Object> jobCreatedCallbackReturnError( //
+        @RequestBody ProducerJobInfo request) {
+        logger.info("Job created (returning error) callback {}", request.id);
+        this.testResults.noOfRejectedCreate += 1;
+        return ErrorResponse.create("Producer returns error on create job", HttpStatus.NOT_FOUND);
+    }
+
+    @PostMapping(path = JOB_DELETED_ERROR_URL, produces = MediaType.APPLICATION_JSON_VALUE)
+    @ApiOperation(value = "Callback for EI job creation, returns error", notes = "")
+    @ApiResponses(
+        value = { //
+            @ApiResponse(code = 200, message = "OK", response = void.class)}//
+    )
+    public ResponseEntity<Object> jobDeletedCallbackReturnError( //
+        @RequestBody ProducerJobInfo request) {
+        logger.info("Job created (returning error) callback {}", request.id);
+        this.testResults.noOfRejectedDelete += 1;
+        return ErrorResponse.create("Producer returns error on delete job", HttpStatus.NOT_FOUND);
+    }
+
 }