Added supervision of producers
[nonrtric.git] / enrichment-coordinator-service / src / main / java / org / oransc / enrichment / controllers / consumer / ConsumerController.java
index a74e487..288421a 100644 (file)
@@ -30,7 +30,6 @@ import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 
-import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -40,14 +39,13 @@ import org.json.JSONObject;
 import org.oransc.enrichment.clients.ProducerCallbacks;
 import org.oransc.enrichment.configuration.ApplicationConfig;
 import org.oransc.enrichment.controllers.ErrorResponse;
+import org.oransc.enrichment.controllers.VoidResponse;
 import org.oransc.enrichment.exceptions.ServiceException;
 import org.oransc.enrichment.repository.EiJob;
 import org.oransc.enrichment.repository.EiJobs;
 import org.oransc.enrichment.repository.EiType;
 import org.oransc.enrichment.repository.EiTypes;
 import org.oransc.enrichment.repository.ImmutableEiJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
@@ -58,14 +56,13 @@ import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.PutMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
 
 @SuppressWarnings("java:S3457") // No need to call "toString()" method as formatting and string ..
 @RestController("ConsumerController")
 @Api(tags = {ConsumerConsts.CONSUMER_API_NAME})
 public class ConsumerController {
 
-    private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
     @Autowired
     ApplicationConfig applicationConfig;
 
@@ -213,8 +210,8 @@ public class ConsumerController {
     @ApiOperation(value = "Individual EI Job", notes = "")
     @ApiResponses(
         value = { //
-            @ApiResponse(code = 200, message = "Not used", response = void.class),
-            @ApiResponse(code = 204, message = "Job deleted", response = void.class),
+            @ApiResponse(code = 200, message = "Not used", response = VoidResponse.class),
+            @ApiResponse(code = 204, message = "Job deleted", response = VoidResponse.class),
             @ApiResponse(
                 code = 404,
                 message = "Enrichment Information type or job is not found",
@@ -239,47 +236,68 @@ public class ConsumerController {
     @ApiOperation(value = "Individual EI Job", notes = "")
     @ApiResponses(
         value = { //
-            @ApiResponse(code = 201, message = "Job created", response = void.class), //
-            @ApiResponse(code = 200, message = "Job updated", response = void.class), // ,
+            @ApiResponse(code = 201, message = "Job created", response = VoidResponse.class), //
+            @ApiResponse(code = 200, message = "Job updated", response = VoidResponse.class), // ,
             @ApiResponse(
                 code = 404,
                 message = "Enrichment Information type is not found",
                 response = ErrorResponse.ErrorInfo.class)})
-    public ResponseEntity<Object> putIndividualEiJob( //
+    public Mono<ResponseEntity<Object>> putIndividualEiJob( //
         @PathVariable("eiTypeId") String eiTypeId, //
         @PathVariable("eiJobId") String eiJobId, //
         @RequestBody ConsumerEiJobInfo eiJobInfo) {
+
+        final boolean isNewJob = this.eiJobs.get(eiJobId) == null;
+
+        return validatePutEiJob(eiTypeId, eiJobId, eiJobInfo) //
+            .flatMap(this::notifyProducersNewJob) //
+            .doOnNext(newEiJob -> this.eiJobs.put(newEiJob)) //
+            .flatMap(newEiJob -> Mono.just(new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)))
+            .onErrorResume(throwable -> Mono.just(ErrorResponse.create(throwable, HttpStatus.NOT_FOUND)));
+    }
+
+    private Mono<EiJob> notifyProducersNewJob(EiJob newEiJob) {
+        return this.producerCallbacks.notifyProducersJobStarted(newEiJob) //
+            .flatMap(noOfAcceptingProducers -> {
+                if (noOfAcceptingProducers.intValue() > 0) {
+                    return Mono.just(newEiJob);
+                } else {
+                    return Mono.error(new ServiceException("Job not accepted by any producers", HttpStatus.CONFLICT));
+                }
+            });
+    }
+
+    private Mono<EiJob> validatePutEiJob(String eiTypeId, String eiJobId, ConsumerEiJobInfo eiJobInfo) {
         try {
             EiType eiType = this.eiTypes.getType(eiTypeId);
-            validateJobData(eiType.getJobDataSchema(), eiJobInfo.jobData);
-            final boolean newJob = this.eiJobs.get(eiJobId) == null;
-            EiJob eiJob = toEiJob(eiJobInfo, eiJobId, eiType);
-            this.eiJobs.put(eiJob);
-            this.producerCallbacks.notifyProducersJobCreated(eiJob);
-            return new ResponseEntity<>(newJob ? HttpStatus.CREATED : HttpStatus.OK);
+            validateJsonObjectAgainstSchema(eiType.getJobDataSchema(), eiJobInfo.jobData);
+            EiJob existingEiJob = this.eiJobs.get(eiJobId);
+
+            if (existingEiJob != null && !existingEiJob.type().getId().equals(eiTypeId)) {
+                throw new ServiceException("Not allowed to change type for existing EI job", HttpStatus.CONFLICT);
+            }
+            return Mono.just(toEiJob(eiJobInfo, eiJobId, eiType));
         } catch (Exception e) {
-            return ErrorResponse.create(e, HttpStatus.NOT_FOUND);
+            return Mono.error(e);
         }
     }
 
-    private void validateJobData(Object schemaObj, Object object) throws ServiceException {
-        if (schemaObj == null) {
-            return; // schema is optional for now
-        }
-        try {
-            ObjectMapper mapper = new ObjectMapper();
+    private void validateJsonObjectAgainstSchema(Object schemaObj, Object object) throws ServiceException {
+        if (schemaObj != null) { // schema is optional for now
+            try {
+                ObjectMapper mapper = new ObjectMapper();
 
-            String schemaAsString = mapper.writeValueAsString(schemaObj);
-            JSONObject schemaJSON = new JSONObject(schemaAsString);
-            Schema schema = SchemaLoader.load(schemaJSON);
+                String schemaAsString = mapper.writeValueAsString(schemaObj);
+                JSONObject schemaJSON = new JSONObject(schemaAsString);
+                Schema schema = SchemaLoader.load(schemaJSON);
 
-            String objectAsString = mapper.writeValueAsString(object);
-            JSONObject json = new JSONObject(objectAsString);
-            schema.validate(json);
-        } catch (Exception e) {
-            throw new ServiceException("Json validation failure", e);
+                String objectAsString = mapper.writeValueAsString(object);
+                JSONObject json = new JSONObject(objectAsString);
+                schema.validate(json);
+            } catch (Exception e) {
+                throw new ServiceException("Json validation failure " + e.toString(), HttpStatus.CONFLICT);
+            }
         }
-
     }
 
     // Status TBD
@@ -290,6 +308,7 @@ public class ConsumerController {
             .type(type) //
             .owner(info.owner) //
             .jobData(info.jobData) //
+            .targetUri(info.targetUri) //
             .build();
     }
 
@@ -298,6 +317,6 @@ public class ConsumerController {
     }
 
     private ConsumerEiJobInfo toEiJobInfo(EiJob s) {
-        return new ConsumerEiJobInfo(s.jobData(), s.owner());
+        return new ConsumerEiJobInfo(s.jobData(), s.owner(), s.targetUri());
     }
 }