Merge "ICS sample producer and consumer"
[nonrtric.git] / sample-services / ics-producer-consumer / producer / src / main / java / com / demo / producer / controllers / ProducerController.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  *
5  * Copyright (C) 2024: OpenInfra Foundation Europe
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package com.demo.producer.controllers;
22
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 import org.springframework.beans.factory.annotation.Autowired;
26 import org.springframework.http.HttpStatus;
27 import org.springframework.http.ResponseEntity;
28 import org.springframework.web.bind.annotation.DeleteMapping;
29 import org.springframework.web.bind.annotation.GetMapping;
30 import org.springframework.web.bind.annotation.PathVariable;
31 import org.springframework.web.bind.annotation.PostMapping;
32 import org.springframework.web.bind.annotation.RequestBody;
33 import org.springframework.web.bind.annotation.RequestMapping;
34 import org.springframework.web.bind.annotation.RestController;
35
36 import com.demo.producer.repository.InfoType;
37 import com.demo.producer.repository.InfoTypes;
38 import com.demo.producer.repository.Job.Parameters;
39 import com.demo.producer.repository.Jobs;
40 import com.demo.producer.dme.ProducerJobInfo;
41 import com.demo.producer.messages.KafkaMessageHandlerImpl;
42 import com.demo.producer.producer.SimpleProducer;
43 import com.google.gson.Gson;
44 import com.google.gson.GsonBuilder;
45
46 @RestController
47 @RequestMapping(path = "/producer", produces = "application/json")
48 public class ProducerController {
49     private static final Logger log = LoggerFactory.getLogger(ProducerController.class);
50
51     private static Gson gson = new GsonBuilder().create();
52
53     private final Jobs jobs;
54     private final InfoTypes types;
55     private String topicName = "mytopic";
56
57
58     public ProducerController(@Autowired Jobs jobs, @Autowired InfoTypes types) {
59         this.jobs = jobs;
60         this.types = types;
61         InfoType type1 = InfoType.builder().build();
62         type1.setId("type1");
63         type1.setKafkaInputTopic(topicName);
64         type1.setInputJobType("type1");
65         type1.setInputJobDefinition(null);
66         types.put(type1);
67     }
68
69     @GetMapping("/publish/{numberOfMessages}")
70     public ResponseEntity<?> publishMessage(@PathVariable int numberOfMessages) {
71         try {
72             new SimpleProducer().run(topicName, numberOfMessages, new KafkaMessageHandlerImpl());
73             return ResponseEntity.ok("message published successfully ..");
74         } catch (Exception ex) {
75             return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
76                     .build();
77         }
78     }
79
80     @PostMapping("/job/{infoJobId}")
81     public void jobCallback(@RequestBody String requestBody, @PathVariable String infoJobId) {
82         ProducerJobInfo request = gson.fromJson(requestBody, ProducerJobInfo.class);
83         try {
84             log.info("Adding producer job info " + request.toString());
85             this.jobs.addJob(request.id, types.getType(request.typeId), request.owner,
86                     toJobParameters(request.jobData));
87         } catch (Exception e) {
88             log.error("Error adding producer job info: " + request.toString(), e.getMessage());
89         }
90     }
91
92     @PostMapping("/job")
93     public void jobCallbackNoId(@RequestBody String requestBody) {
94         ProducerJobInfo request = gson.fromJson(requestBody, ProducerJobInfo.class);
95         try {
96             log.info("Adding producer job info "+request.toString());
97             this.jobs.addJob(request.id, types.getType(request.typeId), request.owner,
98                     toJobParameters(request.jobData));
99         } catch (Exception e) {
100             log.error("Error adding producer job info: " + request.toString(), e.getMessage());
101         }
102     }
103
104     private Parameters toJobParameters(Object jobData) {
105         String json = gson.toJson(jobData);
106         return gson.fromJson(json, Parameters.class);
107     }
108
109     @GetMapping("/job")
110     public ResponseEntity<String> getJobs() {
111         try {
112             log.info("Get all jobs");
113             return new ResponseEntity<>(this.jobs.getAll().toString(), HttpStatus.OK);
114         } catch (Exception e) {
115             log.error("Error finding jobs", e.getMessage());
116             return new ResponseEntity<>(e.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR);
117         }
118     }
119
120     @DeleteMapping("/job/{infoJobId}")
121     public ResponseEntity<String> deleteJob(@PathVariable String infoJobId) {
122         try {
123             log.info("Delete Job" + infoJobId);
124             this.jobs.delete(infoJobId);
125             return new ResponseEntity<>("Deleted job:" + infoJobId, HttpStatus.OK);
126         } catch (Exception e) {
127             log.error("Error finding job " + infoJobId, e.getMessage());
128             return new ResponseEntity<>(e.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR);
129         }
130     }
131
132     @GetMapping("/supervision")
133     public ResponseEntity<String> getSupervision() {
134         log.info("Get Supervision");
135         return new ResponseEntity<>("Ok", HttpStatus.OK);
136     }
137 }