Merge "Fix Sonar and CheckStyle warnings"
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / dmaap / DmaapMessageConsumer.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2020 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.policyagent.dmaap;
22
23 import com.google.common.collect.Iterables;
24
25 import java.io.IOException;
26 import java.time.Duration;
27 import java.util.Properties;
28
29 import org.onap.dmaap.mr.client.MRBatchingPublisher;
30 import org.onap.dmaap.mr.client.MRClientFactory;
31 import org.onap.dmaap.mr.client.MRConsumer;
32 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
33 import org.oransc.policyagent.clients.AsyncRestClient;
34 import org.oransc.policyagent.configuration.ApplicationConfig;
35 import org.oransc.policyagent.exceptions.ServiceException;
36 import org.oransc.policyagent.tasks.RefreshConfigTask;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39 import org.springframework.beans.factory.annotation.Autowired;
40 import org.springframework.beans.factory.annotation.Value;
41 import org.springframework.stereotype.Component;
42
43 /**
44  * The class fetches incoming requests from DMAAP. It uses the timeout parameter that lets the MessageRouter keep the
45  * connection with the Kafka open until requests are sent in.
46  *
47  * <p>
48  * If there is no DMaaP configuration in the application configuration, then this service will regularly check the
49  * configuration and start polling DMaaP if the configuration is added. If the DMaaP configuration is removed, then the
50  * service will stop polling and resume checking for configuration.
51  *
52  * <p>
53  * Each received request is processed by {@link DmaapMessageHandler}.
54  */
55 @Component
56 public class DmaapMessageConsumer {
57
58     protected static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10);
59
60     private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class);
61
62     private final ApplicationConfig applicationConfig;
63
64     @Value("${server.port}")
65     private int localServerPort;
66
67     @Autowired
68     public DmaapMessageConsumer(ApplicationConfig applicationConfig) {
69         this.applicationConfig = applicationConfig;
70     }
71
72     /**
73      * Starts the consumer. If there is a DMaaP configuration, it will start polling for messages. Otherwise it will
74      * check regularly for the configuration.
75      *
76      * @return the running thread, for test purposes.
77      */
78     public Thread start() {
79         Thread thread = new Thread(this::checkConfigLoop);
80         thread.start();
81         return thread;
82     }
83
84     private void checkConfigLoop() {
85         while (!isStopped()) {
86             if (isDmaapConfigured()) {
87                 messageHandlingLoop();
88             } else {
89                 sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL);
90             }
91         }
92     }
93
94     private void messageHandlingLoop() {
95         while (!isStopped() && isDmaapConfigured()) {
96             try {
97                 Iterable<String> dmaapMsgs = fetchAllMessages();
98                 if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) {
99                     logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
100                     for (String msg : dmaapMsgs) {
101                         processMsg(msg);
102                     }
103                 }
104             } catch (Exception e) {
105                 logger.warn("Cannot fetch because of {}", e.getMessage());
106                 sleep(TIME_BETWEEN_DMAAP_RETRIES);
107             }
108         }
109     }
110
111     protected boolean isStopped() {
112         return false;
113     }
114
115     protected boolean isDmaapConfigured() {
116         Properties consumerCfg = applicationConfig.getDmaapConsumerConfig();
117         Properties producerCfg = applicationConfig.getDmaapPublisherConfig();
118         return (consumerCfg != null && consumerCfg.size() > 0 && producerCfg != null && producerCfg.size() > 0);
119     }
120
121     protected Iterable<String> fetchAllMessages() throws ServiceException, IOException {
122         Properties dmaapConsumerProperties = this.applicationConfig.getDmaapConsumerConfig();
123         MRConsumer consumer = getMessageRouterConsumer(dmaapConsumerProperties);
124         MRConsumerResponse response = consumer.fetchWithReturnConsumerResponse();
125         if (response == null || !"200".equals(response.getResponseCode())) {
126             String errorMessage = "DMaaP NULL response received";
127             if (response != null) {
128                 errorMessage = "Error respons " + response.getResponseCode() + " " + response.getResponseMessage()
129                     + " from DMaaP.";
130             }
131             throw new ServiceException(errorMessage);
132         } else {
133             logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), response.getResponseMessage());
134             return response.getActualMessages();
135         }
136     }
137
138     private void processMsg(String msg) throws IOException {
139         logger.debug("Message Reveived from DMAAP : {}", msg);
140         getDmaapMessageHandler().handleDmaapMsg(msg);
141     }
142
143     private DmaapMessageHandler getDmaapMessageHandler() throws IOException {
144         String agentBaseUrl = "https://localhost:" + this.localServerPort;
145         AsyncRestClient agentClient = createRestClient(agentBaseUrl);
146         Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
147         MRBatchingPublisher producer = getMessageRouterPublisher(dmaapPublisherProperties);
148
149         return createDmaapMessageHandler(agentClient, producer);
150     }
151
152     protected void sleep(Duration duration) {
153         try {
154             Thread.sleep(duration.toMillis());
155         } catch (Exception e) {
156             logger.error("Failed to put the thread to sleep", e);
157         }
158     }
159
160     protected MRConsumer getMessageRouterConsumer(Properties dmaapConsumerProperties) throws IOException {
161         return MRClientFactory.createConsumer(dmaapConsumerProperties);
162     }
163
164     protected DmaapMessageHandler createDmaapMessageHandler(AsyncRestClient agentClient, MRBatchingPublisher producer) {
165         return new DmaapMessageHandler(producer, agentClient);
166     }
167
168     protected AsyncRestClient createRestClient(String agentBaseUrl) {
169         return new AsyncRestClient(agentBaseUrl);
170     }
171
172     protected MRBatchingPublisher getMessageRouterPublisher(Properties dmaapPublisherProperties) throws IOException {
173         return MRClientFactory.createBatchingPublisher(dmaapPublisherProperties);
174     }
175 }