2 * ========================LICENSE_START=================================
5 * Copyright (C) 2024: OpenInfra Foundation Europe
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package com.demo.consumer.controllers;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 import org.springframework.beans.factory.annotation.Autowired;
26 import org.springframework.web.bind.annotation.PathVariable;
27 import org.springframework.web.bind.annotation.PostMapping;
28 import org.springframework.web.bind.annotation.RequestBody;
29 import org.springframework.web.bind.annotation.RequestMapping;
30 import org.springframework.web.bind.annotation.RestController;
32 import com.demo.consumer.repository.InfoType;
33 import com.demo.consumer.repository.InfoTypes;
34 import com.demo.consumer.repository.Job.Parameters;
35 import com.demo.consumer.dme.ConsumerJobInfo;
36 import com.demo.consumer.dme.ConsumerStatusInfo;
37 import com.demo.consumer.repository.Jobs;
38 import com.google.gson.Gson;
39 import com.google.gson.GsonBuilder;
42 @RequestMapping(path = "/consumer", produces = "application/json")
43 public class ConsumerController {
44 private static final Logger log = LoggerFactory.getLogger(ConsumerController.class);
46 private static Gson gson = new GsonBuilder().create();
48 private final Jobs jobs;
49 private final InfoTypes types;
51 public ConsumerController(@Autowired Jobs jobs, @Autowired InfoTypes types) {
54 InfoType type1 = InfoType.builder().build();
55 Parameters p = Parameters.builder().build();
57 type1.setKafkaInputTopic("mytopic");
58 type1.setInputJobType("type1");
59 type1.setInputJobDefinition(p);
63 @PostMapping("/job/{infoJobId}")
64 public void startinfojob(@RequestBody String requestBody, @PathVariable String infoJobId) {
65 ConsumerJobInfo request = gson.fromJson(requestBody, ConsumerJobInfo.class);
66 log.info("Add Job Info" + infoJobId, request);
68 this.jobs.addJob(request.infoTypeId, types.getType(request.infoTypeId), request.owner,
69 toJobParameters(request.jobDefinition));
70 } catch (Exception e) {
71 log.error("Error adding the job" + infoJobId, e.getMessage());
75 @PostMapping("/info-type-status")
76 public void statusChange(@RequestBody String requestBody) {
77 ConsumerStatusInfo request = gson.fromJson(requestBody, ConsumerStatusInfo.class);
78 log.info("Add Status Job Info", request);
81 private Parameters toJobParameters(Object jobData) {
82 String json = gson.toJson(jobData);
83 return gson.fromJson(json, Parameters.class);