e503b849e531b340a9f71a07ab6f693ab3672da6
[nonrtric.git] / sample-services / ics-producer-consumer / producer / src / main / java / com / demo / producer / controllers / ThreadsController.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  *
5  * Copyright (C) 2024: OpenInfra Foundation Europe
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package com.demo.producer.controllers;
22
23 import java.util.concurrent.CompletableFuture;
24
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27 import org.springframework.beans.factory.annotation.Autowired;
28 import org.springframework.scheduling.annotation.Async;
29 import org.springframework.web.bind.annotation.GetMapping;
30 import org.springframework.web.bind.annotation.PathVariable;
31 import org.springframework.web.bind.annotation.RestController;
32 import com.demo.producer.messages.KafkaMessageHandlerImpl;
33 import com.demo.producer.messages.ApplicationMessageHandlerImpl;
34 import com.demo.producer.producer.SimpleProducer;
35
36 @RestController
37 public class ThreadsController {
38     private static final Logger log = LoggerFactory.getLogger(ThreadsController.class);
39
40     @Autowired
41     private SimpleProducer simpleProducer;
42
43     private Thread producerThread;
44
45     @Async
46     @GetMapping("/startProducer/{topicName}")
47     public CompletableFuture<String> startProducer(@PathVariable("topicName") String topicName) {
48         try {
49             producerThread = new Thread(() -> {
50                 try {
51                     simpleProducer.runAlways(topicName, new ApplicationMessageHandlerImpl());
52                 } catch (Exception e) {
53                     log.error("Error starting producer on: " + topicName, e.getMessage());
54                 }
55             });
56             producerThread.start();
57             return CompletableFuture.completedFuture("Producer started for topic: " + topicName);
58         } catch (Exception e) {
59             return CompletableFuture.failedFuture(e);
60         }
61     }
62
63     @GetMapping("/stopProducer")
64     public String stopProducer() {
65         if (producerThread != null && producerThread.isAlive()) {
66             try {
67                 simpleProducer.shutdown();
68             } catch (Exception e) {
69                 log.error("Error stopping producer Thread", e.getMessage());
70             }
71             return "Producer stopped successfully";
72         } else {
73             return "No active producer to stop";
74         }
75     }
76
77     @GetMapping("/publish/{numberOfMessages}/on/{topicName}")
78     public CompletableFuture<String> publishNMessage(@PathVariable int numberOfMessages, @PathVariable String topicName) {
79         try {
80             producerThread = new Thread(() -> {
81                 try {
82                     simpleProducer.run(topicName, numberOfMessages, new KafkaMessageHandlerImpl());
83                 } catch (Exception e) {
84                     log.error("Error producing " + numberOfMessages + "on " + topicName, e.getMessage());
85                 }
86             });
87             producerThread.start();
88             return CompletableFuture.completedFuture("Producer started for topic: " + topicName);
89         } catch (Exception e) {
90             return CompletableFuture.failedFuture(e);
91         }
92     }
93 }