+ keepServiceAlive(service);
+ if (ric == null || type == null) {
+ return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND));
+ }
+ Policy policy = ImmutablePolicy.builder() //
+ .id(instanceId) //
+ .json(jsonString) //
+ .type(type) //
+ .ric(ric) //
+ .ownerServiceName(service) //
+ .lastModified(getTimeStampUtc()) //
+ .build();
+
+ final boolean isCreate = this.policies.get(policy.id()) == null;
+
+ return ric.getLock().lock(LockType.SHARED) //
+ .flatMap(notUsed -> assertRicStateIdle(ric)) //
+ .flatMap(notUsed -> checkSupportedType(ric, type)) //
+ .flatMap(notUsed -> validateModifiedPolicy(policy)) //
+ .flatMap(notUsed -> a1ClientFactory.createA1Client(ric)) //
+ .flatMap(client -> client.putPolicy(policy)) //
+ .doOnNext(notUsed -> policies.put(policy)) //
+ .doOnNext(notUsed -> ric.getLock().unlockBlocking()) //
+ .doOnError(trowable -> ric.getLock().unlockBlocking()) //
+ .flatMap(notUsed -> Mono.just(new ResponseEntity<>(isCreate ? HttpStatus.CREATED : HttpStatus.OK))) //
+ .onErrorResume(this::handleException);
+ }
+
+ @SuppressWarnings({"unchecked"})
+ private <T> Mono<ResponseEntity<T>> createResponseEntity(String message, HttpStatus status) {
+ ResponseEntity<T> re = new ResponseEntity<>((T) message, status);
+ return Mono.just(re);
+ }
+
+ private <T> Mono<ResponseEntity<T>> handleException(Throwable throwable) {
+ if (throwable instanceof WebClientResponseException) {
+ WebClientResponseException e = (WebClientResponseException) throwable;
+ return createResponseEntity(e.getResponseBodyAsString(), e.getStatusCode());
+ } else if (throwable instanceof RejectionException) {
+ RejectionException e = (RejectionException) throwable;
+ return createResponseEntity(e.getMessage(), e.getStatus());
+ } else {
+ return createResponseEntity(throwable.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ private Mono<Object> validateModifiedPolicy(Policy policy) {
+ // Check that ric is not updated
+ Policy current = this.policies.get(policy.id());
+ if (current != null && !current.ric().name().equals(policy.ric().name())) {
+ RejectionException e = new RejectionException("Policy cannot change RIC, policyId: " + current.id() + //
+ ", RIC name: " + current.ric().name() + //
+ ", new name: " + policy.ric().name(), HttpStatus.CONFLICT);
+ return Mono.error(e);
+ }
+ return Mono.just("OK");
+ }
+
+ private Mono<Object> checkSupportedType(Ric ric, PolicyType type) {
+ if (!ric.isSupportingType(type.name())) {
+ RejectionException e = new RejectionException(
+ "Type: " + type.name() + " not supported by RIC: " + ric.name(), HttpStatus.NOT_FOUND);
+ return Mono.error(e);
+ }
+ return Mono.just("OK");
+ }
+
+ private Mono<Object> assertRicStateIdle(Ric ric) {
+ if (ric.getState() == Ric.RicState.AVAILABLE) {
+ return Mono.just("OK");
+ } else {
+ RejectionException e = new RejectionException(
+ "Ric is not operational, RIC name: " + ric.name() + ", state: " + ric.getState(), HttpStatus.LOCKED);
+ return Mono.error(e);