DmaapMessageHandler using the agent NBI
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / dmaap / DmaapMessageConsumerImpl.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2019 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.policyagent.dmaap;
22
23 import com.google.common.collect.Iterables;
24
25 import java.io.IOException;
26 import java.util.Properties;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29
30 import org.onap.dmaap.mr.client.MRBatchingPublisher;
31 import org.onap.dmaap.mr.client.MRClientFactory;
32 import org.onap.dmaap.mr.client.MRConsumer;
33 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
34 import org.oransc.policyagent.clients.AsyncRestClient;
35 import org.oransc.policyagent.configuration.ApplicationConfig;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38 import org.springframework.beans.factory.annotation.Autowired;
39 import org.springframework.beans.factory.annotation.Value;
40 import org.springframework.scheduling.annotation.EnableScheduling;
41 import org.springframework.scheduling.annotation.Scheduled;
42 import org.springframework.stereotype.Component;
43
44 @Component
45 @EnableScheduling
46 public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
47
48     private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumerImpl.class);
49
50     private boolean alive = false;
51     private final ApplicationConfig applicationConfig;
52     protected MRConsumer consumer;
53     private MRBatchingPublisher producer;
54     private final long FETCHTIMEOUT = 30000;
55
56     private CountDownLatch sleep = new CountDownLatch(1);
57
58     @Value("${server.port}")
59     private int localServerPort;
60
61     @Autowired
62     public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) {
63         this.applicationConfig = applicationConfig;
64     }
65
66     @Scheduled(fixedRate = 1000 * 10)
67     @Override
68     public void run() {
69         if (!alive) {
70             init();
71         }
72         if (this.alive) {
73             try {
74                 Iterable<String> dmaapMsgs = fetchAllMessages();
75                 if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) {
76                     logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
77                     for (String msg : dmaapMsgs) {
78                         processMsg(msg);
79                     }
80                 }
81             } catch (Exception e) {
82                 logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
83             }
84         }
85     }
86
87     private Iterable<String> fetchAllMessages() {
88         MRConsumerResponse response = null;
89         try {
90             response = consumer.fetchWithReturnConsumerResponse();
91         } catch (Exception e) {
92             logger.error("Failed to get message from DMAAP", e);
93         }
94         if (response == null || !"200".equals(response.getResponseCode())) {
95             logger.warn("{}: DMaaP NULL response received", this);
96             sleepAfterFailure();
97         } else {
98             logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), response.getResponseMessage());
99         }
100         return response.getActualMessages();
101     }
102
103     @Override
104     public void init() {
105         Properties dmaapConsumerProperties = applicationConfig.getDmaapConsumerConfig();
106         Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
107         // No need to start if there is no configuration.
108         if (dmaapConsumerProperties == null || dmaapPublisherProperties == null || dmaapConsumerProperties.size() == 0
109             || dmaapPublisherProperties.size() == 0) {
110             logger.error("DMaaP properties Failed to Load");
111             return;
112         }
113         try {
114             logger.debug("Creating DMAAP Client");
115             logger.debug("dmaapConsumerProperties---> {}", dmaapConsumerProperties.getProperty("topic"));
116             logger.debug("dmaapPublisherProperties---> {}", dmaapPublisherProperties.getProperty("topic"));
117             consumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
118             producer = MRClientFactory.createBatchingPublisher(dmaapConsumerProperties);
119             this.alive = true;
120         } catch (IOException e) {
121             logger.error("Exception occurred while creating Dmaap Consumer", e);
122         }
123     }
124
125     @Override
126     public void processMsg(String msg) throws Exception {
127         logger.debug("Message Reveived from DMAAP : {}", msg);
128         createDmaapMessageHandler().handleDmaapMsg(msg);
129     }
130
131     protected DmaapMessageHandler createDmaapMessageHandler() {
132         String agentBaseUrl = "http://localhost:" + this.localServerPort;
133         AsyncRestClient agentClient = new AsyncRestClient(agentBaseUrl);
134         return new DmaapMessageHandler(this.producer, this.applicationConfig, agentClient);
135     }
136
137     @Override
138     public boolean isAlive() {
139         return alive;
140     }
141
142     private void sleepAfterFailure() {
143         logger.warn("DMAAP message Consumer is put to Sleep for {} milliseconds", FETCHTIMEOUT);
144         try {
145             sleep.await(FETCHTIMEOUT, TimeUnit.MILLISECONDS);
146         } catch (InterruptedException e) {
147             logger.error("Failed to put the thread to sleep: {}", e);
148         }
149     }
150 }