Simulating producer errors
[nonrtric.git] / enrichment-coordinator-service / src / main / java / org / oransc / enrichment / controllers / consumer / ConsumerController.java
index ff04522..b120d5f 100644 (file)
@@ -58,6 +58,7 @@ import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.PutMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
 
 @SuppressWarnings("java:S3457") // No need to call "toString()" method as formatting and string ..
 @RestController("ConsumerController")
@@ -245,45 +246,62 @@ public class ConsumerController {
                 code = 404,
                 message = "Enrichment Information type is not found",
                 response = ErrorResponse.ErrorInfo.class)})
-    public ResponseEntity<Object> putIndividualEiJob( //
+    public Mono<ResponseEntity<Object>> putIndividualEiJob( //
         @PathVariable("eiTypeId") String eiTypeId, //
         @PathVariable("eiJobId") String eiJobId, //
         @RequestBody ConsumerEiJobInfo eiJobInfo) {
+
+        final boolean isNewJob = this.eiJobs.get(eiJobId) == null;
+
+        return validatePutEiJob(eiTypeId, eiJobId, eiJobInfo) //
+            .flatMap(this::notifyProducersNewJob) //
+            .doOnNext(newEiJob -> this.eiJobs.put(newEiJob)) //
+            .flatMap(newEiJob -> Mono.just(new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)))
+            .onErrorResume(throwable -> Mono.just(ErrorResponse.create(throwable, HttpStatus.NOT_FOUND)));
+    }
+
+    private Mono<EiJob> notifyProducersNewJob(EiJob newEiJob) {
+        return this.producerCallbacks.notifyProducersJobStarted(newEiJob) //
+            .flatMap(noOfAcceptingProducers -> {
+                if (noOfAcceptingProducers.intValue() > 0) {
+                    return Mono.just(newEiJob);
+                } else {
+                    return Mono.error(new ServiceException("Job not accepted by any producers", HttpStatus.CONFLICT));
+                }
+            });
+    }
+
+    private Mono<EiJob> validatePutEiJob(String eiTypeId, String eiJobId, ConsumerEiJobInfo eiJobInfo) {
         try {
             EiType eiType = this.eiTypes.getType(eiTypeId);
-            validateJobData(eiType.getJobDataSchema(), eiJobInfo.jobData);
+            validateJsonObjectAgainstSchema(eiType.getJobDataSchema(), eiJobInfo.jobData);
             EiJob existingEiJob = this.eiJobs.get(eiJobId);
-            final boolean newJob = existingEiJob == null;
+
             if (existingEiJob != null && !existingEiJob.type().getId().equals(eiTypeId)) {
-                return ErrorResponse.create("Not allowed to change type for existing EI job", HttpStatus.CONFLICT);
+                throw new ServiceException("Not allowed to change type for existing EI job", HttpStatus.CONFLICT);
             }
-            EiJob newEiJob = toEiJob(eiJobInfo, eiJobId, eiType);
-            this.eiJobs.put(newEiJob);
-            this.producerCallbacks.notifyProducersJobCreated(newEiJob);
-            return new ResponseEntity<>(newJob ? HttpStatus.CREATED : HttpStatus.OK);
+            return Mono.just(toEiJob(eiJobInfo, eiJobId, eiType));
         } catch (Exception e) {
-            return ErrorResponse.create(e, HttpStatus.NOT_FOUND);
+            return Mono.error(e);
         }
     }
 
-    private void validateJobData(Object schemaObj, Object object) throws ServiceException {
-        if (schemaObj == null) {
-            return; // schema is optional for now
-        }
-        try {
-            ObjectMapper mapper = new ObjectMapper();
+    private void validateJsonObjectAgainstSchema(Object schemaObj, Object object) throws ServiceException {
+        if (schemaObj != null) { // schema is optional for now
+            try {
+                ObjectMapper mapper = new ObjectMapper();
 
-            String schemaAsString = mapper.writeValueAsString(schemaObj);
-            JSONObject schemaJSON = new JSONObject(schemaAsString);
-            Schema schema = SchemaLoader.load(schemaJSON);
+                String schemaAsString = mapper.writeValueAsString(schemaObj);
+                JSONObject schemaJSON = new JSONObject(schemaAsString);
+                Schema schema = SchemaLoader.load(schemaJSON);
 
-            String objectAsString = mapper.writeValueAsString(object);
-            JSONObject json = new JSONObject(objectAsString);
-            schema.validate(json);
-        } catch (Exception e) {
-            throw new ServiceException("Json validation failure", e);
+                String objectAsString = mapper.writeValueAsString(object);
+                JSONObject json = new JSONObject(objectAsString);
+                schema.validate(json);
+            } catch (Exception e) {
+                throw new ServiceException("Json validation failure " + e.toString(), HttpStatus.CONFLICT);
+            }
         }
-
     }
 
     // Status TBD