ICS sample producer and consumer
[nonrtric.git] / sample-services / ics-producer-consumer / consumer / src / main / java / com / demo / consumer / controllers / ThreadsController.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  *
5  * Copyright (C) 2024: OpenInfra Foundation Europe
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package com.demo.consumer.controllers;
22
23 import java.util.concurrent.CompletableFuture;
24
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27 import org.springframework.beans.factory.annotation.Autowired;
28 import org.springframework.scheduling.annotation.Async;
29 import org.springframework.web.bind.annotation.GetMapping;
30 import org.springframework.web.bind.annotation.PathVariable;
31 import org.springframework.web.bind.annotation.RestController;
32
33 import com.demo.consumer.consumer.SimpleConsumer;
34 import com.demo.consumer.messages.ApplicationMessageHandlerImpl;
35
36 @RestController
37 public class ThreadsController {
38     private static final Logger log = LoggerFactory.getLogger(ThreadsController.class);
39
40     @Autowired
41     private SimpleConsumer simpleConsumer;
42
43     private Thread consumerThread;
44
45     @Async
46     @GetMapping("/startConsumer/{topicName}")
47     public CompletableFuture<String> startConsumer(@PathVariable("topicName") String topicName) {
48         try {
49             Thread consumerThread = new Thread(() -> {
50                 try {
51                     simpleConsumer.runAlways(topicName, new ApplicationMessageHandlerImpl());
52                 } catch (Exception e) {
53                     log.error("Error starting consuming on: " + topicName, e.getMessage());
54                 }
55             });
56             consumerThread.start();
57             return CompletableFuture.completedFuture("Consumer started for topic: " + topicName);
58         } catch (Exception e) {
59             return CompletableFuture.failedFuture(e);
60         }
61     }
62
63     @GetMapping("/stopConsumer")
64     public String stopConsumer() {
65         if (consumerThread != null && consumerThread.isAlive()) {
66             try {
67                 simpleConsumer.shutdown();
68             } catch (Exception e) {
69                 log.error("Error stopping consumer Thread", e.getMessage());
70             }
71             return "Consumer stopped successfully";
72         } else {
73             return "No active consumer to stop";
74         }
75     }
76
77     @GetMapping("/listen/{numberOfMessages}/on/{topicName}")
78     public CompletableFuture<String> listenMessage(@PathVariable int numberOfMessages, @PathVariable String topicName) {
79         try {
80             Thread consumerThread = new Thread(() -> {
81                 try {
82                     simpleConsumer.run(topicName, numberOfMessages, new ApplicationMessageHandlerImpl());
83                 } catch (Exception e) {
84                     log.error("Error starting consuming on: " + topicName, e.getMessage());
85                 }
86             });
87             consumerThread.start();
88             return CompletableFuture.completedFuture("Consumer started for topic: " + topicName);
89         } catch (Exception e) {
90             return CompletableFuture.failedFuture(e);
91         }
92     }
93 }