db4956ab39170563abc4eda7c56fe92dbf3b4105
[nonrtric.git] / DmaapMessageConsumerImpl.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2019 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.policyagent.dmaap;
22
23 import com.google.common.collect.Iterables;
24 import java.io.IOException;
25 import java.util.Properties;
26 import org.onap.dmaap.mr.client.MRBatchingPublisher;
27 import org.onap.dmaap.mr.client.MRClientFactory;
28 import org.onap.dmaap.mr.client.MRConsumer;
29 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
30 import org.oransc.policyagent.clients.AsyncRestClient;
31 import org.oransc.policyagent.configuration.ApplicationConfig;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34 import org.springframework.beans.factory.annotation.Autowired;
35 import org.springframework.beans.factory.annotation.Value;
36 import org.springframework.scheduling.annotation.EnableScheduling;
37 import org.springframework.scheduling.annotation.Scheduled;
38 import org.springframework.stereotype.Component;
39
40 @Component
41 @EnableScheduling
42 public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
43
44     private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumerImpl.class);
45
46     private boolean alive = false;
47     private final ApplicationConfig applicationConfig;
48     protected MRConsumer consumer;
49     private MRBatchingPublisher producer;
50
51     @Value("${server.port}")
52     private int localServerPort;
53
54     @Autowired
55     public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) {
56         this.applicationConfig = applicationConfig;
57     }
58
59     @Scheduled(fixedRate = 1000 * 40)
60     @Override
61     public void run() {
62         if (!alive) {
63             init();
64         }
65         if (this.alive) {
66             try {
67                 Iterable<String> dmaapMsgs = fetchAllMessages();
68                 if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) {
69                     logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
70                     for (String msg : dmaapMsgs) {
71                         processMsg(msg);
72                     }
73                 }
74             } catch (Exception e) {
75                 logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
76             }
77         }
78     }
79
80     private Iterable<String> fetchAllMessages() {
81         MRConsumerResponse response = null;
82         try {
83             response = consumer.fetchWithReturnConsumerResponse();
84         } catch (Exception e) {
85             logger.error("Failed to get message from DMAAP", e);
86         }
87         if (response == null || !"200".equals(response.getResponseCode())) {
88             logger.warn("{}: DMaaP NULL response received", this);
89         } else {
90             logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), response.getResponseMessage());
91         }
92         return response.getActualMessages();
93     }
94
95     @Override
96     public void init() {
97         Properties dmaapConsumerProperties = applicationConfig.getDmaapConsumerConfig();
98         Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
99         // No need to start if there is no configuration.
100         if (dmaapConsumerProperties == null || dmaapPublisherProperties == null || dmaapConsumerProperties.size() == 0
101                 || dmaapPublisherProperties.size() == 0) {
102             logger.error("DMaaP properties Failed to Load");
103             return;
104         }
105         try {
106             logger.debug("Creating DMAAP Client");
107             logger.debug("dmaapConsumerProperties---> {}", dmaapConsumerProperties.getProperty("topic"));
108             logger.debug("dmaapPublisherProperties---> {}", dmaapPublisherProperties.getProperty("topic"));
109             consumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
110             producer = MRClientFactory.createBatchingPublisher(dmaapPublisherProperties);
111             this.alive = true;
112         } catch (IOException e) {
113             logger.error("Exception occurred while creating Dmaap Consumer", e);
114         }
115     }
116
117     @Override
118     public void processMsg(String msg) throws Exception {
119         logger.debug("Message Reveived from DMAAP : {}", msg);
120         createDmaapMessageHandler().handleDmaapMsg(msg);
121     }
122
123     protected DmaapMessageHandler createDmaapMessageHandler() {
124         String agentBaseUrl = "http://localhost:" + this.localServerPort;
125         AsyncRestClient agentClient = new AsyncRestClient(agentBaseUrl);
126         return new DmaapMessageHandler(this.producer, this.applicationConfig, agentClient);
127     }
128
129     @Override
130     public boolean isAlive() {
131         return alive;
132     }
133 }