import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
-import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import org.oransc.enrichment.clients.ProducerCallbacks;
import org.oransc.enrichment.configuration.ApplicationConfig;
import org.oransc.enrichment.controllers.ErrorResponse;
+import org.oransc.enrichment.controllers.VoidResponse;
import org.oransc.enrichment.exceptions.ServiceException;
import org.oransc.enrichment.repository.EiJob;
import org.oransc.enrichment.repository.EiJobs;
import org.oransc.enrichment.repository.EiType;
import org.oransc.enrichment.repository.EiTypes;
import org.oransc.enrichment.repository.ImmutableEiJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
@SuppressWarnings("java:S3457") // No need to call "toString()" method as formatting and string ..
@RestController("ConsumerController")
@Api(tags = {ConsumerConsts.CONSUMER_API_NAME})
public class ConsumerController {
- private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
@Autowired
ApplicationConfig applicationConfig;
@ApiOperation(value = "Individual EI Job", notes = "")
@ApiResponses(
value = { //
- @ApiResponse(code = 200, message = "Not used", response = void.class),
- @ApiResponse(code = 204, message = "Job deleted", response = void.class),
+ @ApiResponse(code = 200, message = "Not used", response = VoidResponse.class),
+ @ApiResponse(code = 204, message = "Job deleted", response = VoidResponse.class),
@ApiResponse(
code = 404,
message = "Enrichment Information type or job is not found",
@ApiOperation(value = "Individual EI Job", notes = "")
@ApiResponses(
value = { //
- @ApiResponse(code = 201, message = "Job created", response = void.class), //
- @ApiResponse(code = 200, message = "Job updated", response = void.class), // ,
+ @ApiResponse(code = 201, message = "Job created", response = VoidResponse.class), //
+ @ApiResponse(code = 200, message = "Job updated", response = VoidResponse.class), // ,
@ApiResponse(
code = 404,
message = "Enrichment Information type is not found",
response = ErrorResponse.ErrorInfo.class)})
- public ResponseEntity<Object> putIndividualEiJob( //
+ public Mono<ResponseEntity<Object>> putIndividualEiJob( //
@PathVariable("eiTypeId") String eiTypeId, //
@PathVariable("eiJobId") String eiJobId, //
@RequestBody ConsumerEiJobInfo eiJobInfo) {
+
+ final boolean isNewJob = this.eiJobs.get(eiJobId) == null;
+
+ return validatePutEiJob(eiTypeId, eiJobId, eiJobInfo) //
+ .flatMap(this::notifyProducersNewJob) //
+ .doOnNext(newEiJob -> this.eiJobs.put(newEiJob)) //
+ .flatMap(newEiJob -> Mono.just(new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)))
+ .onErrorResume(throwable -> Mono.just(ErrorResponse.create(throwable, HttpStatus.NOT_FOUND)));
+ }
+
+ private Mono<EiJob> notifyProducersNewJob(EiJob newEiJob) {
+ return this.producerCallbacks.notifyProducersJobStarted(newEiJob) //
+ .flatMap(noOfAcceptingProducers -> {
+ if (noOfAcceptingProducers.intValue() > 0) {
+ return Mono.just(newEiJob);
+ } else {
+ return Mono.error(new ServiceException("Job not accepted by any producers", HttpStatus.CONFLICT));
+ }
+ });
+ }
+
+ private Mono<EiJob> validatePutEiJob(String eiTypeId, String eiJobId, ConsumerEiJobInfo eiJobInfo) {
try {
EiType eiType = this.eiTypes.getType(eiTypeId);
- validateJobData(eiType.getJobDataSchema(), eiJobInfo.jobData);
- final boolean newJob = this.eiJobs.get(eiJobId) == null;
- EiJob eiJob = toEiJob(eiJobInfo, eiJobId, eiType);
- this.eiJobs.put(eiJob);
- this.producerCallbacks.notifyProducersJobCreated(eiJob);
- return new ResponseEntity<>(newJob ? HttpStatus.CREATED : HttpStatus.OK);
+ validateJsonObjectAgainstSchema(eiType.getJobDataSchema(), eiJobInfo.jobData);
+ EiJob existingEiJob = this.eiJobs.get(eiJobId);
+
+ if (existingEiJob != null && !existingEiJob.type().getId().equals(eiTypeId)) {
+ throw new ServiceException("Not allowed to change type for existing EI job", HttpStatus.CONFLICT);
+ }
+ return Mono.just(toEiJob(eiJobInfo, eiJobId, eiType));
} catch (Exception e) {
- return ErrorResponse.create(e, HttpStatus.NOT_FOUND);
+ return Mono.error(e);
}
}
- private void validateJobData(Object schemaObj, Object object) throws ServiceException {
- if (schemaObj == null) {
- return; // schema is optional for now
- }
- try {
- ObjectMapper mapper = new ObjectMapper();
+ private void validateJsonObjectAgainstSchema(Object schemaObj, Object object) throws ServiceException {
+ if (schemaObj != null) { // schema is optional for now
+ try {
+ ObjectMapper mapper = new ObjectMapper();
- String schemaAsString = mapper.writeValueAsString(schemaObj);
- JSONObject schemaJSON = new JSONObject(schemaAsString);
- Schema schema = SchemaLoader.load(schemaJSON);
+ String schemaAsString = mapper.writeValueAsString(schemaObj);
+ JSONObject schemaJSON = new JSONObject(schemaAsString);
+ Schema schema = SchemaLoader.load(schemaJSON);
- String objectAsString = mapper.writeValueAsString(object);
- JSONObject json = new JSONObject(objectAsString);
- schema.validate(json);
- } catch (Exception e) {
- throw new ServiceException("Json validation failure", e);
+ String objectAsString = mapper.writeValueAsString(object);
+ JSONObject json = new JSONObject(objectAsString);
+ schema.validate(json);
+ } catch (Exception e) {
+ throw new ServiceException("Json validation failure " + e.toString(), HttpStatus.CONFLICT);
+ }
}
-
}
// Status TBD
.type(type) //
.owner(info.owner) //
.jobData(info.jobData) //
+ .targetUri(info.targetUri) //
.build();
}
}
private ConsumerEiJobInfo toEiJobInfo(EiJob s) {
- return new ConsumerEiJobInfo(s.jobData(), s.owner());
+ return new ConsumerEiJobInfo(s.jobData(), s.owner(), s.targetUri());
}
}