X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Fdmaap%2FDmaapMessageHandler.java;h=226b54e9a2093612bd38491bfe3be3b6ff97cb49;hb=6f86ab364ac739951556bf2d5bf70429b518de47;hp=3d5da6297a0e3b12f45ccb6fbfa46eb25f74e36a;hpb=ba50f8809edc7d49a74021e25b4094f4c3174b26;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java index 3d5da629..226b54e9 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java @@ -24,10 +24,8 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonObject; -import java.io.IOException; import java.util.Optional; -import org.onap.dmaap.mr.client.MRBatchingPublisher; import org.oransc.policyagent.clients.AsyncRestClient; import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation; import org.oransc.policyagent.exceptions.ServiceException; @@ -35,23 +33,23 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; +import org.springframework.web.reactive.function.client.WebClientException; import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Mono; /** * The class handles incoming requests from DMAAP. *
- * That means: invoke a REST call towards this services and to send back a
- * response though DMAAP
+ * That means: invoke a REST call towards this services and to send back a response though DMAAP
*/
public class DmaapMessageHandler {
private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class);
private static Gson gson = new GsonBuilder() //
.create(); //
- private final MRBatchingPublisher dmaapClient;
+ private final AsyncRestClient dmaapClient;
private final AsyncRestClient agentClient;
- public DmaapMessageHandler(MRBatchingPublisher dmaapClient, AsyncRestClient agentClient) {
+ public DmaapMessageHandler(AsyncRestClient dmaapClient, AsyncRestClient agentClient) {
this.agentClient = agentClient;
this.dmaapClient = dmaapClient;
}
@@ -69,41 +67,34 @@ public class DmaapMessageHandler {
try {
DmaapRequestMessage dmaapRequestMessage = gson.fromJson(msg, ImmutableDmaapRequestMessage.class);
return this.invokePolicyAgent(dmaapRequestMessage) //
- .onErrorResume(t -> handleAgentCallError(t, msg, dmaapRequestMessage)) //
+ .onErrorResume(t -> handleAgentCallError(t, dmaapRequestMessage)) //
.flatMap(
response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage, response.getStatusCode()));
} catch (Exception e) {
- logger.warn("Received unparsable message from DMAAP: {}", msg);
- return Mono.error(e); // Cannot make any response
+ String errorMsg = "Received unparsable message from DMAAP: \"" + msg + "\", reason: " + e.getMessage();
+ return Mono.error(new ServiceException(errorMsg)); // Cannot make any response
}
}
- private Mono