Merge "Exception test Fix"
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / dmaap / DmaapMessageConsumerImpl.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2019 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.policyagent.dmaap;
22
23 import com.google.common.collect.Iterables;
24 import java.io.IOException;
25 import java.util.Properties;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.TimeUnit;
28 import org.onap.dmaap.mr.client.MRClientFactory;
29 import org.onap.dmaap.mr.client.MRConsumer;
30 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
31 import org.oransc.policyagent.configuration.ApplicationConfig;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34 import org.springframework.beans.factory.annotation.Autowired;
35 import org.springframework.scheduling.annotation.EnableScheduling;
36 import org.springframework.scheduling.annotation.Scheduled;
37 import org.springframework.stereotype.Component;
38
39 @Component
40 @EnableScheduling
41 public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
42
43     private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumerImpl.class);
44
45     private boolean alive = false;
46     private final ApplicationConfig applicationConfig;
47     protected MRConsumer consumer;
48     @Autowired
49     private DmaapMessageHandler dmaapMessageHandler;
50     private final long FETCHTIMEOUT = 30000;
51
52     private CountDownLatch sleep = new CountDownLatch(1);
53
54     @Autowired
55     public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) {
56         this.applicationConfig = applicationConfig;
57     }
58
59     @Scheduled(fixedRate = 1000 * 10)
60     @Override
61     public void run() {
62         if (!alive) {
63             init();
64         }
65         if (this.alive) {
66             try {
67                 Iterable<String> dmaapMsgs = fetchAllMessages();
68                 if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) {
69                     logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
70                     for (String msg : dmaapMsgs) {
71                         processMsg(msg);
72                     }
73                 }
74             } catch (Exception e) {
75                 logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
76             }
77         }
78     }
79
80     private Iterable<String> fetchAllMessages() {
81         MRConsumerResponse response = null;
82         try {
83             response = consumer.fetchWithReturnConsumerResponse();
84         } catch (Exception e) {
85             logger.error("Failed to get message from DMAAP", e);
86         }
87         if (response == null || !"200".equals(response.getResponseCode())) {
88             logger.warn("{}: DMaaP NULL response received", this);
89             sleepAfterFailure();
90         } else {
91             logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), response.getResponseMessage());
92         }
93         return response.getActualMessages();
94     }
95
96     @Override
97     public void init() {
98         Properties dmaapConsumerProperties = applicationConfig.getDmaapConsumerConfig();
99         Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
100         // No need to start if there is no configuration.
101         if (dmaapConsumerProperties == null || dmaapPublisherProperties == null || dmaapConsumerProperties.size() == 0
102                 || dmaapPublisherProperties.size() == 0) {
103             logger.error("DMaaP properties Failed to Load");
104             return;
105         }
106         try {
107             logger.debug("Creating DMAAP Client");
108             logger.debug("dmaapConsumerProperties---> {}", dmaapConsumerProperties.getProperty("topic"));
109             logger.debug("dmaapPublisherProperties---> {}", dmaapPublisherProperties.getProperty("topic"));
110             consumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
111             this.alive = true;
112         } catch (IOException e) {
113             logger.error("Exception occurred while creating Dmaap Consumer", e);
114         }
115     }
116
117     @Override
118     public void processMsg(String msg) throws Exception {
119         logger.debug("Message Reveived from DMAAP : {}", msg);
120         // Call the concurrent Task executor to handle the incoming request
121         dmaapMessageHandler.handleDmaapMsg(msg);
122     }
123
124     @Override
125     public boolean isAlive() {
126         return alive;
127     }
128
129     private void sleepAfterFailure() {
130         logger.warn("DMAAP message Consumer is put to Sleep for {} milliseconds", FETCHTIMEOUT);
131         try {
132             sleep.await(FETCHTIMEOUT, TimeUnit.MILLISECONDS);
133         } catch (InterruptedException e) {
134             logger.error("Failed to put the thread to sleep: {}", e);
135         }
136     }
137 }