+ .doOnNext(notUsed -> ric.getLock().unlockBlocking()) //
+ .doOnError(t -> ric.getLock().unlockBlocking()) //
+ .flatMap(notUsed -> Mono.just(new ResponseEntity<>(isCreate ? HttpStatus.CREATED : HttpStatus.OK))) //
+ .onErrorResume(this::handleException);
+ }
+
+ return ric == null || type == null ? Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND))
+ : Mono.just(new ResponseEntity<>(HttpStatus.LOCKED)); // Recovering
+ }
+
+ @SuppressWarnings({"unchecked"})
+ private <T> Mono<ResponseEntity<T>> createResponseEntity(String message, HttpStatus status) {
+ ResponseEntity<T> re = new ResponseEntity<>((T) message, status);
+ return Mono.just(re);
+ }
+
+ private <T> Mono<ResponseEntity<T>> handleException(Throwable throwable) {
+ if (throwable instanceof WebClientResponseException) {
+ WebClientResponseException e = (WebClientResponseException) throwable;
+ return createResponseEntity(e.getResponseBodyAsString(), e.getStatusCode());
+ } else if (throwable instanceof RejectionException) {
+ RejectionException e = (RejectionException) throwable;
+ return createResponseEntity(e.getMessage(), e.getStatus());
+ } else {
+ return createResponseEntity(throwable.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR);